ManagedSocket.cpp

Go to the documentation of this file.
00001 /*
00002  * Copyright (C) 2006-2010 Jacek Sieka, arnetheduck on gmail point com
00003  *
00004  * This program is free software; you can redistribute it and/or modify
00005  * it under the terms of the GNU General Public License as published by
00006  * the Free Software Foundation; either version 2 of the License, or
00007  * (at your option) any later version.
00008  *
00009  * This program is distributed in the hope that it will be useful,
00010  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00011  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00012  * GNU General Public License for more details.
00013  *
00014  * You should have received a copy of the GNU General Public License
00015  * along with this program; if not, write to the Free Software
00016  * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
00017  */
00018 
00019 #include "adchpp.h"
00020 
00021 #include "ManagedSocket.h"
00022 
00023 #include "SocketManager.h"
00024 #include "TimerManager.h"
00025 
00026 namespace adchpp {
00027 
00028 using namespace std;
00029 
00030 using namespace boost::asio;
00031 
00032 size_t ManagedSocket::defaultMaxBufferSize = 16 * 1024;
00033 time_t ManagedSocket::overflowTimeout = 60;
00034 
00035 ManagedSocket::~ManagedSocket() throw() {
00036     dcdebug("ManagedSocket deleted\n");
00037 }
00038 
00039 static size_t sum(const BufferList& l) {
00040     size_t bytes = 0;
00041     for(BufferList::const_iterator i = l.begin(); i != l.end(); ++i) {
00042         bytes += (*i)->size();
00043     }
00044     return bytes;
00045 }
00046 
00047 size_t ManagedSocket::getQueuedBytes() const {
00048     return sum(outBuf);
00049 }
00050 
00051 void ManagedSocket::write(const BufferPtr& buf, bool lowPrio /* = false */) throw() {
00052     if((buf->size() == 0) || (disc > 0))
00053         return;
00054 
00055     size_t queued = getQueuedBytes();
00056 
00057     if(getMaxBufferSize() > 0 && queued + buf->size() > getMaxBufferSize()) {
00058         if(lowPrio) {
00059             return;
00060         } else if(overflow > 0 && overflow + getOverflowTimeout() < GET_TIME()) {
00061             disconnect(5000, Util::REASON_WRITE_OVERFLOW);
00062             return;
00063         } else {
00064             overflow = GET_TIME();
00065         }
00066     }
00067 
00068     Stats::queueBytes += buf->size();
00069     Stats::queueCalls++;
00070 
00071     outBuf.push_back(buf);
00072 
00073     prepareWrite();
00074 }
00075 
00076 // Simplified handler to avoid bind complexity
00077 namespace {
00078 template<void (ManagedSocket::*F)(const boost::system::error_code&, size_t)>
00079 struct Handler {
00080     Handler(const ManagedSocketPtr& ms_) : ms(ms_) { }
00081     ManagedSocketPtr ms;
00082 
00083     void operator()(const boost::system::error_code& ec, size_t bytes) {
00084         (ms.get()->*F)(ec, bytes);
00085     }
00086 };
00087 }
00088 
00089 void ManagedSocket::prepareWrite() throw() {
00090     if(lastWrite != 0 && TimerManager::getTime() > lastWrite + 60) {
00091         disconnect(5000, Util::REASON_WRITE_TIMEOUT);
00092     } else if(!outBuf.empty() && lastWrite == 0) {
00093         lastWrite = TimerManager::getTime();
00094         sock->write(outBuf, Handler<&ManagedSocket::completeWrite>(shared_from_this()));
00095     }
00096 }
00097 
00098 void ManagedSocket::completeWrite(const boost::system::error_code& ec, size_t bytes) throw() {
00099     lastWrite = 0;
00100 
00101     if(!ec) {
00102         Stats::sendBytes += bytes;
00103         Stats::sendCalls++;
00104 
00105         while(bytes > 0) {
00106             BufferPtr& p = *outBuf.begin();
00107             if(p->size() <= bytes) {
00108                 bytes -= p->size();
00109                 outBuf.erase(outBuf.begin());
00110             } else {
00111                 p = make_shared<Buffer>(p->data() + bytes, p->size() - bytes);
00112                 bytes = 0;
00113             }
00114         }
00115 
00116         if(overflow > 0) {
00117             size_t left = getQueuedBytes();
00118             if(left < getMaxBufferSize()) {
00119                 overflow = 0;
00120             }
00121         }
00122 
00123         if(disc && outBuf.empty()) {
00124             sock->close();
00125         } else {
00126             prepareWrite();
00127         }
00128     } else {
00129         failSocket(ec);
00130     }
00131 }
00132 
00133 void ManagedSocket::prepareRead() throw() {
00134     // We first send in an empty buffer to get notification when there's data available
00135     sock->prepareRead(BufferPtr(), Handler<&ManagedSocket::prepareRead2>(shared_from_this()));
00136 }
00137 
00138 void ManagedSocket::prepareRead2(const boost::system::error_code& ec, size_t) throw() {
00139     if(!ec) {
00140         // ADC commands are typically small - using a small buffer
00141         // helps with fairness
00142         // Calling available() on an ASIO socket seems to be terribly slow
00143         // Also, we might end up here if the socket has been closed, in which
00144         // case available would return 0 bytes...
00145         // We can't make a synchronous receive here because when using SSL
00146         // there might be data on the socket that won't translate into user data
00147         // and thus read_some will block
00148         // If there's no user data, this will effectively post a read operation
00149         // with a buffer and waste memory...to be continued.
00150         inBuf = make_shared<Buffer>(64);
00151 
00152         sock->prepareRead(inBuf, Handler<&ManagedSocket::completeRead>(shared_from_this()));
00153     } else {
00154         failSocket(ec);
00155     }
00156 }
00157 
00158 void ManagedSocket::completeRead(const boost::system::error_code& ec, size_t bytes) throw() {
00159     if(!ec) {
00160         try {
00161             Stats::recvBytes += bytes;
00162             Stats::recvCalls++;
00163 
00164             inBuf->resize(bytes);
00165 
00166             if(dataHandler) {
00167                 dataHandler(inBuf);
00168             }
00169             inBuf.reset();
00170             prepareRead();
00171         } catch(const boost::system::system_error& e) {
00172             failSocket(e.code());
00173         }
00174     } else {
00175         inBuf.reset();
00176         failSocket(ec);
00177     }
00178 }
00179 
00180 void ManagedSocket::completeAccept(const boost::system::error_code& ec) throw() {
00181     if(!ec) {
00182         if(connectedHandler)
00183             connectedHandler();
00184         prepareRead();
00185     } else {
00186         failSocket(ec);
00187     }
00188 }
00189 
00190 void ManagedSocket::failSocket(const boost::system::error_code& code) throw() {
00191     SocketManager::getInstance()->errors[code.message()]++;
00192     if(failedHandler) {
00193         failedHandler(code);
00194         failedHandler = FailedHandler();
00195         dataHandler = DataHandler();
00196         connectedHandler = ConnectedHandler();
00197     }
00198 }
00199 
00200 struct Disconnector {
00201     Disconnector(const AsyncStreamPtr& stream_) : stream(stream_) { }
00202     void operator()() { stream->close(); }
00203     AsyncStreamPtr stream;
00204 };
00205 
00206 void ManagedSocket::disconnect(size_t timeout, Util::Reason reason) throw() {
00207     if(!disc) {
00208         disc = GET_TICK() + timeout;
00209 
00210         Util::reasons[reason]++;
00211         if(!lastWrite) {
00212             sock->close();
00213         } else {
00214             SocketManager::getInstance()->addJob(timeout, Disconnector(sock));
00215         }
00216     }
00217 }
00218 
00219 }
Generated on Sat Nov 27 23:37:53 2010 for adchpp by  doxygen 1.6.3