// Connection.hh
00001 /* 00002 * Copyright 2011 Nate Koenig & Andrew Howard 00003 * 00004 * Licensed under the Apache License, Version 2.0 (the "License"); 00005 * you may not use this file except in compliance with the License. 00006 * You may obtain a copy of the License at 00007 * 00008 * http://www.apache.org/licenses/LICENSE-2.0 00009 * 00010 * Unless required by applicable law or agreed to in writing, software 00011 * distributed under the License is distributed on an "AS IS" BASIS, 00012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 00013 * See the License for the specific language governing permissions and 00014 * limitations under the License. 00015 * 00016 */ 00017 #ifndef CONNECTION_HH 00018 #define CONNECTION_HH 00019 00020 #include <boost/asio.hpp> 00021 #include <boost/bind.hpp> 00022 #include <boost/function.hpp> 00023 #include <boost/thread.hpp> 00024 #include <boost/tuple/tuple.hpp> 00025 #include <iostream> 00026 #include <iomanip> 00027 #include <deque> 00028 00029 #include <google/protobuf/message.h> 00030 00031 #include "common/Event.hh" 00032 #include "common/Console.hh" 00033 #include "common/Exception.hh" 00034 00035 #define HEADER_LENGTH 8 00036 00037 namespace gazebo 00038 { 00039 namespace transport 00040 { 00041 extern bool is_stopped(); 00042 00043 class IOManager; 00044 class Connection; 00045 typedef boost::shared_ptr<Connection> ConnectionPtr; 00046 00049 00051 class Connection : public boost::enable_shared_from_this<Connection> 00052 { 00054 public: Connection(); 00055 00057 public: virtual ~Connection(); 00058 00060 public: bool Connect(const std::string &host, unsigned short port); 00061 00062 typedef boost::function<void(const ConnectionPtr&)> AcceptCallback; 00063 00065 public: void Listen(unsigned short port, const AcceptCallback &accept_cb); 00066 00067 typedef boost::function<void(const std::string &data)> ReadCallback; 00070 public: void StartRead(const ReadCallback &cb); 00071 00073 public: void StopRead(); 
00074 00076 public: void Shutdown(); 00077 00079 public: bool IsOpen() const; 00080 00082 private: void Close(); 00083 00085 public: void Cancel(); 00086 00088 public: bool Read(std::string &data); 00089 00091 public: void EnqueueMsg(const std::string &_buffer, bool _force = false); 00092 00094 public: std::string GetLocalURI() const; 00095 00097 public: std::string GetRemoteURI() const; 00098 00100 public: std::string GetLocalAddress() const; 00101 00103 public: unsigned short GetLocalPort() const; 00104 00106 public: std::string GetRemoteAddress() const; 00107 00109 public: unsigned short GetRemotePort() const; 00110 00112 public: std::string GetRemoteHostname() const; 00113 00115 public: std::string GetLocalHostname() const; 00116 00118 public: template<typename Handler> 00119 void AsyncRead(Handler handler) 00120 { 00121 if (!this->IsOpen()) 00122 { 00123 gzerr << "AsyncRead on a closed socket\n"; 00124 return; 00125 } 00126 00127 void (Connection::*f)(const boost::system::error_code &, 00128 boost::tuple<Handler>) = &Connection::OnReadHeader<Handler>; 00129 00130 this->inbound_header.resize(HEADER_LENGTH); 00131 boost::asio::async_read(*this->socket, 00132 boost::asio::buffer(this->inbound_header), 00133 boost::bind(f, this, 00134 boost::asio::placeholders::error, 00135 boost::make_tuple(handler)) ); 00136 } 00137 00138 // Handle a completed read of a message header. 
The handler is passed 00139 // using a tuple since boost::bind seems to have trouble binding 00140 // a function object created using boost::bind as a parameter 00141 private: template<typename Handler> 00142 void OnReadHeader(const boost::system::error_code &_e, 00143 boost::tuple<Handler> _handler) 00144 { 00145 if (_e) 00146 { 00147 if (_e.message() != "End of File") 00148 { 00149 this->Close(); 00150 // This will occur when the other side closes the 00151 // connection 00152 } 00153 } 00154 else 00155 { 00156 std::size_t inbound_data_size = 0; 00157 std::string header(&this->inbound_header[0], 00158 this->inbound_header.size()); 00159 this->inbound_header.clear(); 00160 00161 inbound_data_size = this->ParseHeader(header); 00162 00163 if (inbound_data_size > 0) 00164 { 00165 // Start the asynchronous call to receive data 00166 this->inbound_data.resize(inbound_data_size); 00167 00168 void (Connection::*f)(const boost::system::error_code &e, 00169 boost::tuple<Handler>) = &Connection::OnReadData<Handler>; 00170 00171 boost::asio::async_read( *this->socket, 00172 boost::asio::buffer(this->inbound_data), 00173 boost::bind(f, this, 00174 boost::asio::placeholders::error, 00175 _handler) ); 00176 } 00177 else 00178 { 00179 gzerr << "Header is empty\n"; 00180 boost::get<0>(_handler)(""); 00181 // This code tries to read the header again. We should 00182 // never get here. 
00183 //this->inbound_header.resize(HEADER_LENGTH); 00184 00185 //void (Connection::*f)(const boost::system::error_code &, 00186 // boost::tuple<Handler>) = &Connection::OnReadHeader<Handler>; 00187 00188 //boost::asio::async_read(*this->socket, 00189 // boost::asio::buffer(this->inbound_header), 00190 // boost::bind(f, this, 00191 // boost::asio::placeholders::error, _handler) ); 00192 } 00193 } 00194 } 00195 00196 private: template<typename Handler> 00197 void OnReadData(const boost::system::error_code &e, 00198 boost::tuple<Handler> handler) 00199 { 00200 if (e) 00201 gzerr << "Error Reading data!\n"; 00202 00203 // Inform caller that data has been received 00204 std::string data(&this->inbound_data[0], 00205 this->inbound_data.size()); 00206 this->inbound_data.clear(); 00207 00208 if (data.empty()) 00209 gzerr << "OnReadData got empty data!!!\n"; 00210 00211 if (!e && !transport::is_stopped()) 00212 { 00213 boost::get<0>(handler)(data); 00214 } 00215 } 00216 00217 public: event::ConnectionPtr ConnectToShutdown( boost::function<void()> subscriber_ ) 00218 { return this->shutdown.Connect(subscriber_); } 00219 00220 public: void DisconnectShutdown( event::ConnectionPtr subscriber_) 00221 {this->shutdown.Disconnect(subscriber_);} 00222 00223 00225 public: void ProcessWriteQueue(); 00226 00227 private: void OnWrite(const boost::system::error_code &e, 00228 boost::asio::streambuf *_b); 00229 //std::list<boost::asio::const_buffer> *_buffer); 00230 00232 private: void OnAccept(const boost::system::error_code &e); 00233 00235 private: std::size_t ParseHeader( const std::string &header ); 00236 00238 private: void ReadLoop(const ReadCallback &cb); 00239 00241 private: boost::asio::ip::tcp::endpoint GetLocalEndpoint() const; 00242 00244 private: boost::asio::ip::tcp::endpoint GetRemoteEndpoint() const; 00245 00246 private: static std::string GetHostname(boost::asio::ip::tcp::endpoint ep); 00247 00248 private: boost::asio::ip::tcp::socket *socket; 00249 private: 
boost::asio::ip::tcp::acceptor *acceptor; 00250 00251 private: std::deque<std::string> writeQueue; 00252 private: std::deque<unsigned int> writeCounts; 00253 private: boost::recursive_mutex *writeMutex; 00254 private: boost::recursive_mutex *readMutex; 00255 00256 // Called when a new connection is received 00257 private: AcceptCallback acceptCB; 00258 00259 //private: char inbound_header[HEADER_LENGTH]; 00260 private: std::vector<char> inbound_header; 00261 private: std::vector<char> inbound_data; 00262 00263 private: boost::thread *readThread; 00264 private: bool readQuit; 00265 00266 public: unsigned int id; 00267 private: static unsigned int idCounter; 00268 private: ConnectionPtr acceptConn; 00269 00270 private: event::EventT<void()> shutdown; 00271 private: static IOManager *iomanager; 00272 00273 public: unsigned int writeCount; 00274 00275 private: std::string localURI; 00276 private: std::string localAddress; 00277 private: std::string remoteURI; 00278 private: std::string remoteAddress; 00279 }; 00281 } 00282 } 00283 00284 #endif

// (listing footer) 1.7.5.1