TopicManager.hh
00001 /* 00002 * Copyright 2011 Nate Koenig & Andrew Howard 00003 * 00004 * Licensed under the Apache License, Version 2.0 (the "License"); 00005 * you may not use this file except in compliance with the License. 00006 * You may obtain a copy of the License at 00007 * 00008 * http://www.apache.org/licenses/LICENSE-2.0 00009 * 00010 * Unless required by applicable law or agreed to in writing, software 00011 * distributed under the License is distributed on an "AS IS" BASIS, 00012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 00013 * See the License for the specific language governing permissions and 00014 * limitations under the License. 00015 * 00016 */ 00017 #ifndef TOPICMANAGER_HH 00018 #define TOPICMANAGER_HH 00019 00020 #include <map> 00021 #include <list> 00022 #include <boost/bind.hpp> 00023 00024 #include "common/Exception.hh" 00025 #include "msgs/msgs.h" 00026 #include "common/SingletonT.hh" 00027 00028 #include "transport/TransportTypes.hh" 00029 #include "transport/SubscribeOptions.hh" 00030 #include "transport/SubscriptionTransport.hh" 00031 #include "transport/PublicationTransport.hh" 00032 #include "transport/ConnectionManager.hh" 00033 //#include "transport/Connection.hh" 00034 #include "transport/Publisher.hh" 00035 #include "transport/Publication.hh" 00036 #include "transport/Subscriber.hh" 00037 00038 namespace gazebo 00039 { 00040 namespace transport 00041 { 00044 00045 00047 class TopicManager : public SingletonT<TopicManager> 00048 { 00049 private: TopicManager(); 00050 private: virtual ~TopicManager(); 00051 00052 public: void Init(); 00053 00054 public: void Fini(); 00055 00056 public: PublicationPtr FindPublication(const std::string &topic); 00057 00058 public: void AddNode( NodePtr _node ); 00059 00060 public: void RemoveNode( unsigned int _id ); 00061 00062 public: void ProcessNodes(); 00063 00067 public: bool IsAdvertised(const std::string &_topic); 00068 00070 public: SubscriberPtr Subscribe(const SubscribeOptions &options); 00071 00074 public: void Unsubscribe(const std::string &topic, 00075 const CallbackHelperPtr &_sub); 00076 00077 public: void Unsubscribe(const std::string &_topic, const NodePtr &_sub); 00078 00081 public: template<typename M> 00082 PublisherPtr Advertise(const std::string &_topic, 00083 unsigned int _queueLimit, 00084 bool _latch) 00085 { 00086 google::protobuf::Message *msg = NULL; 00087 M msgtype; 00088 msg = dynamic_cast<google::protobuf::Message *>(&msgtype); 00089 if (!msg) 00090 gzthrow("Advertise requires a google protobuf type"); 00091 00092 this->UpdatePublications(_topic, msg->GetTypeName()); 00093 00094 PublisherPtr pub = PublisherPtr(new Publisher(_topic, 00095 msg->GetTypeName(), _queueLimit, _latch)); 00096 00097 std::string msgTypename; 00098 PublicationPtr publication; 00099 00100 // Connect all local subscription to the publisher 00101 for (int i=0; i < 2; i ++) 00102 { 00103 std::string t; 00104 if (i==0) 00105 { 00106 t = _topic; 00107 msgTypename = msg->GetTypeName(); 00108 } 00109 else 00110 { 00111 t = _topic + "/__dbg"; 00112 msgs::String tmp; 00113 msgTypename = tmp.GetTypeName(); 00114 } 00115 00116 publication = this->FindPublication(t); 00117 publication->AddPublisher(pub); 00118 if (!publication->GetLocallyAdvertised()) 00119 { 00120 ConnectionManager::Instance()->Advertise(t, msgTypename); 00121 } 00122 00123 publication->SetLocallyAdvertised(true); 00124 pub->SetPublication(publication, i); 00125 00126 SubNodeMap::iterator iter2; 00127 SubNodeMap::iterator st_end2 = this->subscribedNodes.end(); 00128 for (iter2 = this->subscribedNodes.begin(); 00129 iter2 != st_end2; iter2++) 00130 { 00131 if (iter2->first == t) 00132 { 00133 std::list<NodePtr>::iterator liter; 00134 std::list<NodePtr>::iterator l_end = iter2->second.end(); 00135 for (liter = iter2->second.begin(); 00136 liter != l_end; liter++) 00137 { 00138 publication->AddSubscription(*liter); 00139 } 00140 } 00141 } 00142 } 00143 00144 return pub; 00145 } 00146 00148 public: void Unadvertise(const std::string &topic); 00149 00155 public: void Publish( const std::string &topic, 00156 const google::protobuf::Message &message, 00157 const boost::function<void()> &cb = NULL); 00158 00160 public: void ConnectPubToSub( const std::string &topic, 00161 const SubscriptionTransportPtr &sublink ); 00162 00164 public: void ConnectSubToPub( const msgs::Publish &_pub ); 00165 00167 public: void DisconnectPubFromSub( const std::string &topic, 00168 const std::string &host, 00169 unsigned int port); 00170 00172 public: void DisconnectSubFromPub( const std::string &topic, 00173 const std::string &host, 00174 unsigned int port); 00175 00177 public: void ConnectSubscribers(const std::string &topic); 00178 00181 public: PublicationPtr UpdatePublications( const std::string &topic, 00182 const std::string &msgType ); 00183 00185 public: void RegisterTopicNamespace(const std::string &_name); 00186 00188 public: void GetTopicNamespaces(std::list<std::string> &_namespaces); 00189 00190 public: void ClearBuffers(); 00191 00192 public: void PauseIncoming(bool _pause); 00193 00194 private: void HandleIncoming(); 00195 00196 typedef std::map<std::string, std::list<NodePtr> > SubNodeMap; 00197 00198 private: typedef std::map<std::string, PublicationPtr> PublicationPtr_M; 00199 private: PublicationPtr_M advertisedTopics; 00200 private: PublicationPtr_M::iterator advertisedTopicsEnd; 00201 private: SubNodeMap subscribedNodes; 00202 private: std::vector<NodePtr> nodes; 00203 00204 private: boost::recursive_mutex *nodeMutex; 00205 00206 private: bool pauseIncoming; 00207 00208 //Singleton implementation 00209 private: friend class SingletonT<TopicManager>; 00210 }; 00212 } 00213 } 00214 00215 #endif 00216

1.7.5.1