ManagedSocket.cpp
Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019 #include "adchpp.h"
00020
00021 #include "ManagedSocket.h"
00022
00023 #include "SocketManager.h"
00024 #include "TimerManager.h"
00025
00026 namespace adchpp {
00027
00028 using namespace std;
00029
00030 using namespace boost::asio;
00031
00032 size_t ManagedSocket::defaultMaxBufferSize = 16 * 1024;
00033 time_t ManagedSocket::overflowTimeout = 60;
00034
00035 ManagedSocket::~ManagedSocket() throw() {
00036 dcdebug("ManagedSocket deleted\n");
00037 }
00038
00039 static size_t sum(const BufferList& l) {
00040 size_t bytes = 0;
00041 for(BufferList::const_iterator i = l.begin(); i != l.end(); ++i) {
00042 bytes += (*i)->size();
00043 }
00044 return bytes;
00045 }
00046
00047 size_t ManagedSocket::getQueuedBytes() const {
00048 return sum(outBuf);
00049 }
00050
00051 void ManagedSocket::write(const BufferPtr& buf, bool lowPrio ) throw() {
00052 if((buf->size() == 0) || (disc > 0))
00053 return;
00054
00055 size_t queued = getQueuedBytes();
00056
00057 if(getMaxBufferSize() > 0 && queued + buf->size() > getMaxBufferSize()) {
00058 if(lowPrio) {
00059 return;
00060 } else if(overflow > 0 && overflow + getOverflowTimeout() < GET_TIME()) {
00061 disconnect(5000, Util::REASON_WRITE_OVERFLOW);
00062 return;
00063 } else {
00064 overflow = GET_TIME();
00065 }
00066 }
00067
00068 Stats::queueBytes += buf->size();
00069 Stats::queueCalls++;
00070
00071 outBuf.push_back(buf);
00072
00073 prepareWrite();
00074 }
00075
00076
00077 namespace {
00078 template<void (ManagedSocket::*F)(const boost::system::error_code&, size_t)>
00079 struct Handler {
00080 Handler(const ManagedSocketPtr& ms_) : ms(ms_) { }
00081 ManagedSocketPtr ms;
00082
00083 void operator()(const boost::system::error_code& ec, size_t bytes) {
00084 (ms.get()->*F)(ec, bytes);
00085 }
00086 };
00087 }
00088
00089 void ManagedSocket::prepareWrite() throw() {
00090 if(lastWrite != 0 && TimerManager::getTime() > lastWrite + 60) {
00091 disconnect(5000, Util::REASON_WRITE_TIMEOUT);
00092 } else if(!outBuf.empty() && lastWrite == 0) {
00093 lastWrite = TimerManager::getTime();
00094 sock->write(outBuf, Handler<&ManagedSocket::completeWrite>(shared_from_this()));
00095 }
00096 }
00097
00098 void ManagedSocket::completeWrite(const boost::system::error_code& ec, size_t bytes) throw() {
00099 lastWrite = 0;
00100
00101 if(!ec) {
00102 Stats::sendBytes += bytes;
00103 Stats::sendCalls++;
00104
00105 while(bytes > 0) {
00106 BufferPtr& p = *outBuf.begin();
00107 if(p->size() <= bytes) {
00108 bytes -= p->size();
00109 outBuf.erase(outBuf.begin());
00110 } else {
00111 p = make_shared<Buffer>(p->data() + bytes, p->size() - bytes);
00112 bytes = 0;
00113 }
00114 }
00115
00116 if(overflow > 0) {
00117 size_t left = getQueuedBytes();
00118 if(left < getMaxBufferSize()) {
00119 overflow = 0;
00120 }
00121 }
00122
00123 if(disc && outBuf.empty()) {
00124 sock->close();
00125 } else {
00126 prepareWrite();
00127 }
00128 } else {
00129 failSocket(ec);
00130 }
00131 }
00132
00133 void ManagedSocket::prepareRead() throw() {
00134
00135 sock->prepareRead(BufferPtr(), Handler<&ManagedSocket::prepareRead2>(shared_from_this()));
00136 }
00137
00138 void ManagedSocket::prepareRead2(const boost::system::error_code& ec, size_t) throw() {
00139 if(!ec) {
00140
00141
00142
00143
00144
00145
00146
00147
00148
00149
00150 inBuf = make_shared<Buffer>(64);
00151
00152 sock->prepareRead(inBuf, Handler<&ManagedSocket::completeRead>(shared_from_this()));
00153 } else {
00154 failSocket(ec);
00155 }
00156 }
00157
00158 void ManagedSocket::completeRead(const boost::system::error_code& ec, size_t bytes) throw() {
00159 if(!ec) {
00160 try {
00161 Stats::recvBytes += bytes;
00162 Stats::recvCalls++;
00163
00164 inBuf->resize(bytes);
00165
00166 if(dataHandler) {
00167 dataHandler(inBuf);
00168 }
00169 inBuf.reset();
00170 prepareRead();
00171 } catch(const boost::system::system_error& e) {
00172 failSocket(e.code());
00173 }
00174 } else {
00175 inBuf.reset();
00176 failSocket(ec);
00177 }
00178 }
00179
00180 void ManagedSocket::completeAccept(const boost::system::error_code& ec) throw() {
00181 if(!ec) {
00182 if(connectedHandler)
00183 connectedHandler();
00184 prepareRead();
00185 } else {
00186 failSocket(ec);
00187 }
00188 }
00189
00190 void ManagedSocket::failSocket(const boost::system::error_code& code) throw() {
00191 SocketManager::getInstance()->errors[code.message()]++;
00192 if(failedHandler) {
00193 failedHandler(code);
00194 failedHandler = FailedHandler();
00195 dataHandler = DataHandler();
00196 connectedHandler = ConnectedHandler();
00197 }
00198 }
00199
00200 struct Disconnector {
00201 Disconnector(const AsyncStreamPtr& stream_) : stream(stream_) { }
00202 void operator()() { stream->close(); }
00203 AsyncStreamPtr stream;
00204 };
00205
00206 void ManagedSocket::disconnect(size_t timeout, Util::Reason reason) throw() {
00207 if(!disc) {
00208 disc = GET_TICK() + timeout;
00209
00210 Util::reasons[reason]++;
00211 if(!lastWrite) {
00212 sock->close();
00213 } else {
00214 SocketManager::getInstance()->addJob(timeout, Disconnector(sock));
00215 }
00216 }
00217 }
00218
00219 }