TopicManager.hh
00001 /*
00002  * Copyright 2011 Nate Koenig & Andrew Howard
00003  *
00004  * Licensed under the Apache License, Version 2.0 (the "License");
00005  * you may not use this file except in compliance with the License.
00006  * You may obtain a copy of the License at
00007  *
00008  *     http://www.apache.org/licenses/LICENSE-2.0
00009  *
00010  * Unless required by applicable law or agreed to in writing, software
00011  * distributed under the License is distributed on an "AS IS" BASIS,
00012  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00013  * See the License for the specific language governing permissions and
00014  * limitations under the License.
00015  *
00016 */
00017 #ifndef TOPICMANAGER_HH
00018 #define TOPICMANAGER_HH
00019 
00020 #include <map>
00021 #include <list>
00022 #include <boost/bind.hpp>
00023 
00024 #include "common/Exception.hh"
00025 #include "msgs/msgs.h"
00026 #include "common/SingletonT.hh"
00027 
00028 #include "transport/TransportTypes.hh"
00029 #include "transport/SubscribeOptions.hh"
00030 #include "transport/SubscriptionTransport.hh"
00031 #include "transport/PublicationTransport.hh"
00032 #include "transport/ConnectionManager.hh"
00033 //#include "transport/Connection.hh"
00034 #include "transport/Publisher.hh"
00035 #include "transport/Publication.hh"
00036 #include "transport/Subscriber.hh"
00037 
00038 namespace gazebo
00039 {
00040   namespace transport
00041   {
00044 
00045 
00047     class TopicManager : public SingletonT<TopicManager>
00048     {
00049       private: TopicManager();
00050       private: virtual ~TopicManager();
00051 
00052       public: void Init();
00053 
00054       public: void Fini();
00055 
00056       public: PublicationPtr FindPublication(const std::string &topic);
00057 
00058       public: void AddNode( NodePtr _node );
00059 
00060       public: void RemoveNode( unsigned int _id );
00061 
00062       public: void ProcessNodes();
00063 
00067       public: bool IsAdvertised(const std::string &_topic);
00068 
00070       public: SubscriberPtr Subscribe(const SubscribeOptions &options);
00071 
00074       public: void Unsubscribe(const std::string &topic, 
00075                                const CallbackHelperPtr &_sub);
00076 
00077       public: void Unsubscribe(const std::string &_topic, const NodePtr &_sub);
00078 
00081       public: template<typename M>
00082               PublisherPtr Advertise(const std::string &_topic,
00083                                      unsigned int _queueLimit,
00084                                      bool _latch)
00085               {
00086                 google::protobuf::Message *msg = NULL;
00087                 M msgtype;
00088                 msg = dynamic_cast<google::protobuf::Message *>(&msgtype);
00089                 if (!msg)
00090                   gzthrow("Advertise requires a google protobuf type");
00091 
00092                 this->UpdatePublications(_topic, msg->GetTypeName());
00093               
00094                 PublisherPtr pub = PublisherPtr(new Publisher(_topic,
00095                       msg->GetTypeName(), _queueLimit, _latch));
00096 
00097                 std::string msgTypename;
00098                 PublicationPtr publication;
00099 
00100                 // Connect all local subscription to the publisher
00101                 for (int i=0; i < 2; i ++)
00102                 { 
00103                   std::string t;
00104                   if (i==0)
00105                   {
00106                     t = _topic;
00107                     msgTypename = msg->GetTypeName();
00108                   }
00109                   else
00110                   {
00111                     t = _topic + "/__dbg";
00112                     msgs::String tmp;
00113                     msgTypename = tmp.GetTypeName();
00114                   }
00115 
00116                   publication = this->FindPublication(t);
00117                   publication->AddPublisher(pub);
00118                   if (!publication->GetLocallyAdvertised())
00119                   {
00120                     ConnectionManager::Instance()->Advertise(t, msgTypename);
00121                   }
00122 
00123                   publication->SetLocallyAdvertised(true);
00124                   pub->SetPublication(publication, i);
00125 
00126                   SubNodeMap::iterator iter2;
00127                   SubNodeMap::iterator st_end2 = this->subscribedNodes.end();
00128                   for (iter2 = this->subscribedNodes.begin(); 
00129                        iter2 != st_end2; iter2++)
00130                   {
00131                     if (iter2->first == t)
00132                     {
00133                       std::list<NodePtr>::iterator liter;
00134                       std::list<NodePtr>::iterator l_end = iter2->second.end();
00135                       for (liter = iter2->second.begin(); 
00136                            liter != l_end; liter++)
00137                       {
00138                         publication->AddSubscription(*liter);
00139                       }
00140                     }
00141                   }
00142                 }
00143 
00144                 return pub;
00145               }
00146 
00148       public: void Unadvertise(const std::string &topic);
00149 
00155       public: void Publish( const std::string &topic, 
00156                             const google::protobuf::Message &message,
00157                             const boost::function<void()> &cb = NULL);
00158 
00160       public: void ConnectPubToSub( const std::string &topic,
00161                                     const SubscriptionTransportPtr &sublink );
00162 
00164       public: void ConnectSubToPub( const msgs::Publish &_pub );
00165 
00167       public: void DisconnectPubFromSub( const std::string &topic, 
00168                                          const std::string &host, 
00169                                          unsigned int port);
00170 
00172       public: void DisconnectSubFromPub( const std::string &topic, 
00173                                          const std::string &host, 
00174                                          unsigned int port);
00175 
00177       public: void ConnectSubscribers(const std::string &topic);
00178 
00181       public: PublicationPtr UpdatePublications( const std::string &topic, 
00182                                                  const std::string &msgType );
00183 
00185       public: void RegisterTopicNamespace(const std::string &_name);
00186 
00188       public: void GetTopicNamespaces(std::list<std::string> &_namespaces);
00189 
00190       public: void ClearBuffers();
00191 
00192       public: void PauseIncoming(bool _pause);
00193 
00194       private: void HandleIncoming();
00195 
00196       typedef std::map<std::string, std::list<NodePtr> > SubNodeMap;
00197 
00198       private: typedef std::map<std::string, PublicationPtr> PublicationPtr_M;
00199       private: PublicationPtr_M advertisedTopics;
00200       private: PublicationPtr_M::iterator advertisedTopicsEnd;
00201       private: SubNodeMap subscribedNodes; 
00202       private: std::vector<NodePtr> nodes;
00203 
00204       private: boost::recursive_mutex *nodeMutex;
00205 
00206       private: bool pauseIncoming;
00207 
00208       //Singleton implementation
00209       private: friend class SingletonT<TopicManager>;
00210     };
00212   }
00213 }
00214 
00215 #endif
00216