Connection.hh
00001 /*
00002  * Copyright 2011 Nate Koenig & Andrew Howard
00003  *
00004  * Licensed under the Apache License, Version 2.0 (the "License");
00005  * you may not use this file except in compliance with the License.
00006  * You may obtain a copy of the License at
00007  *
00008  *     http://www.apache.org/licenses/LICENSE-2.0
00009  *
00010  * Unless required by applicable law or agreed to in writing, software
00011  * distributed under the License is distributed on an "AS IS" BASIS,
00012  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00013  * See the License for the specific language governing permissions and
00014  * limitations under the License.
00015  *
00016 */
00017 #ifndef CONNECTION_HH
00018 #define CONNECTION_HH
00019 
00020 #include <boost/asio.hpp>
00021 #include <boost/bind.hpp>
00022 #include <boost/function.hpp>
00023 #include <boost/thread.hpp>
00024 #include <boost/tuple/tuple.hpp>
00025 #include <iostream>
00026 #include <iomanip>
00027 #include <deque>
00028 
00029 #include <google/protobuf/message.h>
00030 
00031 #include "common/Event.hh"
00032 #include "common/Console.hh"
00033 #include "common/Exception.hh"
00034 
00035 #define HEADER_LENGTH 8
00036 
00037 namespace gazebo
00038 {
00039   namespace transport
00040   {
00041     extern bool is_stopped();
00042 
00043     class IOManager;
00044     class Connection;
00045     typedef boost::shared_ptr<Connection> ConnectionPtr;
00046 
00049 
00051     class Connection : public boost::enable_shared_from_this<Connection>
00052     {
00054       public: Connection();
00055 
00057       public: virtual ~Connection();
00058 
00060       public: bool Connect(const std::string &host,  unsigned short port);
00061 
00062       typedef boost::function<void(const ConnectionPtr&)> AcceptCallback;
00063 
00065       public: void Listen(unsigned short port, const AcceptCallback &accept_cb);
00066 
00067       typedef boost::function<void(const std::string &data)> ReadCallback;
00070       public: void StartRead(const ReadCallback &cb);
00071              
00073       public: void StopRead();
00074 
00076       public: void Shutdown();
00077 
00079       public: bool IsOpen() const;
00080 
00082       private: void Close();
00083 
00085       public: void Cancel();
00086 
00088       public: bool Read(std::string &data);
00089 
00091       public: void EnqueueMsg(const std::string &_buffer, bool _force = false);
00092 
00094       public: std::string GetLocalURI() const;
00095               
00097       public: std::string GetRemoteURI() const;
00098 
00100       public: std::string GetLocalAddress() const;
00101 
00103       public: unsigned short GetLocalPort() const;
00104 
00106       public: std::string GetRemoteAddress() const;
00107 
00109       public: unsigned short GetRemotePort() const;
00110 
00112       public: std::string GetRemoteHostname() const;
00113 
00115       public: std::string GetLocalHostname() const;
00116 
00118       public: template<typename Handler>
00119               void AsyncRead(Handler handler)
00120               {
00121                 if (!this->IsOpen())
00122                 {
00123                   gzerr << "AsyncRead on a closed socket\n";
00124                   return;
00125                 }
00126 
00127                 void (Connection::*f)(const boost::system::error_code &,
00128                     boost::tuple<Handler>) = &Connection::OnReadHeader<Handler>;
00129 
00130                 this->inbound_header.resize(HEADER_LENGTH);
00131                 boost::asio::async_read(*this->socket,
00132                     boost::asio::buffer(this->inbound_header),
00133                     boost::bind(f, this, 
00134                                 boost::asio::placeholders::error,
00135                                 boost::make_tuple(handler)) );
00136               }
00137 
00138       // Handle a completed read of a message header. The handler is passed
00139       // using a tuple since boost::bind seems to have trouble binding
00140       // a function object created using boost::bind as a parameter
00141       private: template<typename Handler>
00142                void OnReadHeader(const boost::system::error_code &_e,
00143                                  boost::tuple<Handler> _handler)
00144               {
00145                 if (_e)
00146                 {
00147                   if (_e.message() != "End of File")
00148                   {
00149                     this->Close();
00150                     // This will occur when the other side closes the
00151                     // connection
00152                   }
00153                 }
00154                 else
00155                 {
00156                   std::size_t inbound_data_size = 0;
00157                   std::string header(&this->inbound_header[0], 
00158                                       this->inbound_header.size());
00159                   this->inbound_header.clear();
00160 
00161                   inbound_data_size = this->ParseHeader(header);
00162 
00163                  if (inbound_data_size > 0)
00164                   {
00165                     // Start the asynchronous call to receive data
00166                     this->inbound_data.resize(inbound_data_size);
00167 
00168                     void (Connection::*f)(const boost::system::error_code &e,
00169                         boost::tuple<Handler>) = &Connection::OnReadData<Handler>;
00170 
00171                     boost::asio::async_read( *this->socket, 
00172                         boost::asio::buffer(this->inbound_data), 
00173                         boost::bind(f, this, 
00174                                     boost::asio::placeholders::error, 
00175                                     _handler) );
00176                   }
00177                   else
00178                   {
00179                     gzerr << "Header is empty\n";
00180                     boost::get<0>(_handler)("");
00181                     // This code tries to read the header again. We should
00182                     // never get here.
00183                     //this->inbound_header.resize(HEADER_LENGTH);
00184 
00185                     //void (Connection::*f)(const boost::system::error_code &,
00186                     //    boost::tuple<Handler>) = &Connection::OnReadHeader<Handler>;
00187 
00188                     //boost::asio::async_read(*this->socket,
00189                     //    boost::asio::buffer(this->inbound_header),
00190                     //    boost::bind(f, this, 
00191                     //      boost::asio::placeholders::error, _handler) );
00192                   }
00193                 }
00194               }
00195 
00196      private: template<typename Handler>
00197               void OnReadData(const boost::system::error_code &e,
00198                               boost::tuple<Handler> handler)
00199               {
00200                 if (e)
00201                   gzerr << "Error Reading data!\n";
00202 
00203                 // Inform caller that data has been received
00204                 std::string data(&this->inbound_data[0], 
00205                                   this->inbound_data.size());
00206                 this->inbound_data.clear();
00207 
00208                 if (data.empty())
00209                   gzerr << "OnReadData got empty data!!!\n";
00210 
00211                 if (!e && !transport::is_stopped()) 
00212                 {
00213                   boost::get<0>(handler)(data);
00214                 }
00215               }
00216 
00217      public: event::ConnectionPtr ConnectToShutdown( boost::function<void()> subscriber_ ) 
00218              { return this->shutdown.Connect(subscriber_); }
00219 
00220      public: void DisconnectShutdown( event::ConnectionPtr subscriber_)
00221              {this->shutdown.Disconnect(subscriber_);}
00222 
00223 
00225      public: void ProcessWriteQueue();
00226 
00227      private: void OnWrite(const boost::system::error_code &e,
00228                   boost::asio::streambuf *_b);
00229            //std::list<boost::asio::const_buffer> *_buffer);
00230 
00232      private: void OnAccept(const boost::system::error_code &e);
00233 
00235      private: std::size_t ParseHeader( const std::string &header );
00236 
00238      private: void ReadLoop(const ReadCallback &cb);
00239 
00241      private: boost::asio::ip::tcp::endpoint GetLocalEndpoint() const;
00242 
00244      private: boost::asio::ip::tcp::endpoint GetRemoteEndpoint() const;
00245 
00246      private: static std::string GetHostname(boost::asio::ip::tcp::endpoint ep);
00247 
00248       private: boost::asio::ip::tcp::socket *socket;
00249       private: boost::asio::ip::tcp::acceptor *acceptor;
00250 
00251       private: std::deque<std::string> writeQueue;
00252       private: std::deque<unsigned int> writeCounts;
00253       private: boost::recursive_mutex *writeMutex;
00254       private: boost::recursive_mutex *readMutex;
00255 
00256       // Called when a new connection is received
00257       private: AcceptCallback acceptCB;
00258 
00259       //private: char inbound_header[HEADER_LENGTH];
00260       private: std::vector<char> inbound_header;
00261       private: std::vector<char> inbound_data;
00262 
00263       private: boost::thread *readThread;
00264       private: bool readQuit;
00265 
00266       public: unsigned int id;
00267       private: static unsigned int idCounter;
00268       private: ConnectionPtr acceptConn;
00269 
00270       private: event::EventT<void()> shutdown;
00271       private: static IOManager *iomanager;
00272 
00273       public: unsigned int writeCount;
00274 
00275       private: std::string localURI;
00276       private: std::string localAddress;
00277       private: std::string remoteURI;
00278       private: std::string remoteAddress;
00279     };
00281   }
00282 }
00283 
00284 #endif