00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019 #include "adchpp.h"
00020
00021 #include "SocketManager.h"
00022
00023 #include "LogManager.h"
00024 #include "TimerManager.h"
00025 #include "ManagedSocket.h"
00026 #include "ServerInfo.h"
00027 #include "SimpleXML.h"
00028
00029 #ifdef HAVE_OPENSSL
00030 #include <boost/asio/ssl.hpp>
00031 #endif
00032 #include <boost/date_time/posix_time/time_parsers.hpp>
00033
00034 namespace adchpp {
00035
00036 using namespace std;
00037 using namespace std::placeholders;
00038 using namespace boost::asio;
00039 using boost::system::error_code;
00040 using boost::system::system_error;
00041
00042 SocketManager::SocketManager() {
00043
00044 }
00045
00046 SocketManager::~SocketManager() {
00047
00048 }
00049
00050 SocketManager* SocketManager::instance = 0;
00051 const string SocketManager::className = "SocketManager";
00052
00053 template<typename T>
00054 class SocketStream : public AsyncStream {
00055 public:
00056 template<typename X>
00057 SocketStream(X& x) : sock(x) { }
00058
00059 template<typename X, typename Y>
00060 SocketStream(X& x, Y& y) : sock(x, y) {
00061 }
00062
00063 virtual size_t available() {
00064 return sock.lowest_layer().available();
00065 }
00066
00067 virtual void prepareRead(const BufferPtr& buf, const Handler& handler) {
00068 if(buf) {
00069 sock.async_read_some(boost::asio::buffer(buf->data(), buf->size()), handler);
00070 } else {
00071 sock.async_read_some(boost::asio::null_buffers(), handler);
00072 }
00073 }
00074
00075 virtual size_t read(const BufferPtr& buf) {
00076 return sock.read_some(boost::asio::buffer(buf->data(), buf->size()));
00077 }
00078
00079 virtual void write(const BufferList& bufs, const Handler& handler) {
00080 if(bufs.size() == 1) {
00081 sock.async_write_some(boost::asio::buffer(bufs[0]->data(), bufs[0]->size()), handler);
00082 } else {
00083 size_t n = std::min(bufs.size(), static_cast<size_t>(64));
00084 std::vector<boost::asio::const_buffer> buffers;
00085 buffers.reserve(n);
00086
00087 const size_t maxBytes = 1024;
00088
00089 for(size_t i = 0, total = 0; i < n && total < maxBytes; ++i) {
00090 size_t left = maxBytes - total;
00091 size_t bytes = min(bufs[i]->size(), left);
00092 buffers.push_back(boost::asio::const_buffer(bufs[i]->data(), bytes));
00093 total += bytes;
00094 }
00095
00096 sock.async_write_some(buffers, handler);
00097 }
00098 }
00099
00100 virtual void close() {
00101
00102 if(sock.lowest_layer().is_open()) {
00103 boost::system::error_code ec;
00104 sock.lowest_layer().set_option(socket_base::linger(true, 10), ec);
00105 sock.lowest_layer().close(ec);
00106 }
00107 }
00108
00109 T sock;
00110 };
00111
00112 typedef SocketStream<ip::tcp::socket> SimpleSocketStream;
00113
00114 #ifdef HAVE_OPENSSL
00115 typedef SocketStream<ssl::stream<ip::tcp::socket> > TLSSocketStream;
00116 #endif
00117
00118
00119
00120 static const int SOCKET_BUFFER_SIZE = 1024;
00121
00122 class SocketFactory : public enable_shared_from_this<SocketFactory> {
00123 public:
00124 SocketFactory(io_service& io_, const SocketManager::IncomingHandler& handler_, const ServerInfoPtr& info) :
00125 io(io_),
00126 acceptor(io_, ip::tcp::endpoint(boost::asio::ip::tcp::v4(), info->port)),
00127 serverInfo(info),
00128 handler(handler_)
00129 {
00130 #ifdef HAVE_OPENSSL
00131 if(info->secure()) {
00132 context = make_shared<boost::asio::ssl::context>(io, ssl::context::tlsv1_server);
00133 context->set_options(
00134 boost::asio::ssl::context::no_sslv2
00135 | boost::asio::ssl::context::no_sslv3
00136 | boost::asio::ssl::context::single_dh_use);
00137
00138 context->use_certificate_chain_file(info->TLSParams.cert);
00139 context->use_private_key_file(info->TLSParams.pkey, boost::asio::ssl::context::pem);
00140 context->use_tmp_dh_file(info->TLSParams.dh);
00141 }
00142 #endif
00143 }
00144
00145 void prepareAccept() {
00146 if(!SocketManager::getInstance()->work.get()) {
00147 return;
00148 }
00149 #ifdef HAVE_OPENSSL
00150 if(serverInfo->secure()) {
00151 shared_ptr<TLSSocketStream> s = make_shared<TLSSocketStream>(io, *context);
00152 ManagedSocketPtr socket = make_shared<ManagedSocket>(s);
00153 acceptor.async_accept(s->sock.lowest_layer(), std::bind(&SocketFactory::prepareHandshake, shared_from_this(), std::placeholders::_1, socket));
00154 } else {
00155 #endif
00156 shared_ptr<SimpleSocketStream> s = make_shared<SimpleSocketStream>(io);
00157 ManagedSocketPtr socket = make_shared<ManagedSocket>(s);
00158 acceptor.async_accept(s->sock.lowest_layer(), std::bind(&SocketFactory::handleAccept, shared_from_this(), std::placeholders::_1, socket));
00159 #ifdef HAVE_OPENSSL
00160 }
00161 #endif
00162 }
00163
00164 #ifdef HAVE_OPENSSL
00165 void prepareHandshake(const error_code& ec, const ManagedSocketPtr& socket) {
00166 if(!ec) {
00167 TLSSocketStream* tls = static_cast<TLSSocketStream*>(socket->sock.get());
00168
00169
00170 tls->sock.lowest_layer().set_option(socket_base::linger(true, 30));
00171 tls->sock.lowest_layer().set_option(boost::asio::socket_base::receive_buffer_size(SOCKET_BUFFER_SIZE));
00172 tls->sock.lowest_layer().set_option(boost::asio::socket_base::send_buffer_size(SOCKET_BUFFER_SIZE));
00173 try {
00174 socket->setIp(tls->sock.lowest_layer().remote_endpoint().address().to_string());
00175 } catch(const system_error&) { }
00176 tls->sock.async_handshake(ssl::stream_base::server, std::bind(&SocketFactory::completeAccept, shared_from_this(), std::placeholders::_1, socket));
00177 }
00178
00179 prepareAccept();
00180 }
00181 #endif
00182
00183 void handleAccept(const error_code& ec, const ManagedSocketPtr& socket) {
00184 if(!ec) {
00185 shared_ptr<SimpleSocketStream> s = SHARED_PTR_NS::static_pointer_cast<SimpleSocketStream>(socket->sock);
00186
00187
00188 s->sock.lowest_layer().set_option(socket_base::linger(true, 30));
00189 s->sock.lowest_layer().set_option(boost::asio::socket_base::receive_buffer_size(SOCKET_BUFFER_SIZE));
00190 s->sock.lowest_layer().set_option(boost::asio::socket_base::send_buffer_size(SOCKET_BUFFER_SIZE));
00191 try {
00192 socket->setIp(s->sock.lowest_layer().remote_endpoint().address().to_string());
00193 } catch(const system_error&) { }
00194 }
00195
00196 completeAccept(ec, socket);
00197
00198 prepareAccept();
00199 }
00200
00201 void completeAccept(const error_code& ec, const ManagedSocketPtr& socket) {
00202 handler(socket);
00203 socket->completeAccept(ec);
00204 }
00205
00206 void close() { acceptor.close(); }
00207
00208 io_service& io;
00209 ip::tcp::acceptor acceptor;
00210 ServerInfoPtr serverInfo;
00211 SocketManager::IncomingHandler handler;
00212
00213 #ifdef HAVE_OPENSSL
00214 shared_ptr<boost::asio::ssl::context> context;
00215 #endif
00216
00217 };
00218
00219 int SocketManager::run() {
00220 LOG(SocketManager::className, "Starting");
00221
00222 for(std::vector<ServerInfoPtr>::iterator i = servers.begin(), iend = servers.end(); i != iend; ++i) {
00223 const ServerInfoPtr& si = *i;
00224
00225 try {
00226 SocketFactoryPtr factory = make_shared<SocketFactory>(io, incomingHandler, si);
00227 factory->prepareAccept();
00228 factories.push_back(factory);
00229 } catch(const system_error& se) {
00230 LOG(SocketManager::className, "Error while loading server on port " + Util::toString(si->port) +": " + se.what());
00231 }
00232 }
00233
00234 io.run();
00235
00236 io.reset();
00237
00238 return 0;
00239 }
00240
00241 void SocketManager::closeFactories() {
00242 for(std::vector<SocketFactoryPtr>::iterator i = factories.begin(), iend = factories.end(); i != iend; ++i) {
00243 (*i)->close();
00244 }
00245 factories.clear();
00246 }
00247
00248 void SocketManager::addJob(const Callback& callback) throw() {
00249 io.post(callback);
00250 }
00251
00252 void SocketManager::addJob(const long msec, const Callback& callback) {
00253 addJob(boost::posix_time::milliseconds(msec), callback);
00254 }
00255
00256 void SocketManager::addJob(const std::string& time, const Callback& callback) {
00257 addJob(boost::posix_time::duration_from_string(time), callback);
00258 }
00259
00260 SocketManager::Callback SocketManager::addTimedJob(const long msec, const Callback& callback) {
00261 return addTimedJob(boost::posix_time::milliseconds(msec), callback);
00262 }
00263
00264 SocketManager::Callback SocketManager::addTimedJob(const std::string& time, const Callback& callback) {
00265 return addTimedJob(boost::posix_time::duration_from_string(time), callback);
00266 }
00267
00268 void SocketManager::addJob(const deadline_timer::duration_type& duration, const Callback& callback) {
00269 setTimer(make_shared<timer_ptr::element_type>(io, duration), deadline_timer::duration_type(), new Callback(callback));
00270 }
00271
00272 SocketManager::Callback SocketManager::addTimedJob(const deadline_timer::duration_type& duration, const Callback& callback) {
00273 timer_ptr timer = make_shared<timer_ptr::element_type>(io, duration);
00274 Callback* pCallback = new Callback(callback);
00275 setTimer(timer, duration, pCallback);
00276 return std::bind(&SocketManager::cancelTimer, this, timer, pCallback);
00277 }
00278
00279 void SocketManager::setTimer(timer_ptr timer, const deadline_timer::duration_type& duration, Callback* callback) {
00280 timer->async_wait(std::bind(&SocketManager::handleWait, this, timer, duration, std::placeholders::_1, callback));
00281 }
00282
00283 void SocketManager::handleWait(timer_ptr timer, const deadline_timer::duration_type& duration, const error_code& error, Callback* callback) {
00284 bool run_on = duration.ticks();
00285
00286 if(!error) {
00287 if(run_on) {
00288
00289 timer->expires_at(timer->expires_at() + duration);
00290 setTimer(timer, duration, callback);
00291 }
00292
00293 addJob(*callback);
00294 }
00295
00296 if(!run_on) {
00297
00298 delete callback;
00299 }
00300 }
00301
00302 void SocketManager::cancelTimer(timer_ptr timer, Callback* callback) {
00303 if(timer.get()) {
00304 error_code ec;
00305 timer->cancel(ec);
00306 }
00307
00308 delete callback;
00309 }
00310
00311 void SocketManager::startup() throw(ThreadException) {
00312 work.reset(new io_service::work(io));
00313 start();
00314 }
00315
00316 void SocketManager::shutdown() {
00317 work.reset();
00318 addJob(std::bind(&SocketManager::closeFactories, this));
00319 io.stop();
00320 join();
00321 }
00322
00323 void SocketManager::onLoad(const SimpleXML& xml) throw() {
00324 servers.clear();
00325 }
00326
00327 }