Node.hh
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2014 Open Source Robotics Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16 */
17 
18 #ifndef __IGN_TRANSPORT_NODE_HH_INCLUDED__
19 #define __IGN_TRANSPORT_NODE_HH_INCLUDED__
20 
21 #ifdef _MSC_VER
22 #pragma warning(push, 0)
23 #endif
24 #include <google/protobuf/message.h>
25 #ifdef _MSC_VER
26 #pragma warning(pop)
27 #endif
28 
29 #include <algorithm>
30 #include <functional>
31 #include <map>
32 #include <memory>
33 #include <mutex>
34 #include <string>
35 #include <unordered_set>
36 #include <vector>
37 
48 
49 namespace ignition
50 {
51  namespace transport
52  {
53  class NodePrivate;
54 
59 
65  {
68  public: Node(const NodeOptions &_options = NodeOptions());
69 
71  public: virtual ~Node();
72 
78  public: template<typename T> bool Advertise(const std::string &_topic,
79  const AdvertiseOptions &_options = AdvertiseOptions())
80  {
81  std::string fullyQualifiedTopic;
82  if (!TopicUtils::FullyQualifiedName(this->Options().Partition(),
83  this->Options().NameSpace(), _topic, fullyQualifiedTopic))
84  {
85  std::cerr << "Topic [" << _topic << "] is not valid." << std::endl;
86  return false;
87  }
88 
89  std::lock_guard<std::recursive_mutex> lk(this->Shared()->mutex);
90 
91  // Add the topic to the list of advertised topics (if it was not before)
92  this->TopicsAdvertised().insert(fullyQualifiedTopic);
93 
94  // Notify the discovery service to register and advertise my topic.
95  MessagePublisher publisher(fullyQualifiedTopic,
96  this->Shared()->myAddress,
97  this->Shared()->myControlAddress,
98  this->Shared()->pUuid, this->NodeUuid(), _options.Scope(),
99  T().GetTypeName());
100 
101  if (!this->Shared()->discovery->AdvertiseMsg(publisher))
102  {
103  std::cerr << "Node::Advertise(): Error advertising a topic. "
104  << "Did you forget to start the discovery service?"
105  << std::endl;
106  return false;
107  }
108 
109  return true;
110  }
111 
114  public: std::vector<std::string> AdvertisedTopics() const;
115 
119  public: bool Unadvertise(const std::string &_topic);
120 
125  public: bool Publish(const std::string &_topic,
126  const ProtoMsg &_msg);
127 
135  public: template<typename T> bool Subscribe(
136  const std::string &_topic,
137  void(*_cb)(const T &_msg))
138  {
139  std::function<void(const T &)> f = [_cb](const T & _internalMsg)
140  {
141  (*_cb)(_internalMsg);
142  };
143 
144  return this->Subscribe<T>(_topic, f);
145  }
146 
153  public: template<typename T> bool Subscribe(
154  const std::string &_topic,
155  std::function<void(const T &_msg)> &_cb)
156  {
157  std::string fullyQualifiedTopic;
158  if (!TopicUtils::FullyQualifiedName(this->Options().Partition(),
159  this->Options().NameSpace(), _topic, fullyQualifiedTopic))
160  {
161  std::cerr << "Topic [" << _topic << "] is not valid." << std::endl;
162  return false;
163  }
164 
165  // Create a new subscription handler.
166  std::shared_ptr<SubscriptionHandler<T>> subscrHandlerPtr(
167  new SubscriptionHandler<T>(this->NodeUuid()));
168 
169  // Insert the callback into the handler.
170  subscrHandlerPtr->SetCallback(_cb);
171 
172  std::lock_guard<std::recursive_mutex> lk(this->Shared()->mutex);
173 
174  // Store the subscription handler. Each subscription handler is
175  // associated with a topic. When the receiving thread gets new data,
176  // it will recover the subscription handler associated to the topic and
177  // will invoke the callback.
178  this->Shared()->localSubscriptions.AddHandler(
179  fullyQualifiedTopic, this->NodeUuid(), subscrHandlerPtr);
180 
181  // Add the topic to the list of subscribed topics (if it was not before)
182  this->TopicsSubscribed().insert(fullyQualifiedTopic);
183 
184  // Discover the list of nodes that publish on the topic.
185  if (!this->Shared()->discovery->DiscoverMsg(fullyQualifiedTopic))
186  {
187  std::cerr << "Node::Subscribe(): Error discovering a topic. "
188  << "Did you forget to start the discovery service?"
189  << std::endl;
190  return false;
191  }
192 
193  return true;
194  }
195 
204  public: template<typename C, typename T> bool Subscribe(
205  const std::string &_topic,
206  void(C::*_cb)(const T &_msg),
207  C *_obj)
208  {
209  std::function<void(const T &)> f = [_cb, _obj](const T & _internalMsg)
210  {
211  auto cb = std::bind(_cb, _obj, std::placeholders::_1);
212  cb(_internalMsg);
213  };
214 
215  return this->Subscribe<T>(_topic, f);
216  }
217 
223  public: std::vector<std::string> SubscribedTopics() const;
224 
228  public: bool Unsubscribe(const std::string &_topic);
229 
242  public: template<typename T1, typename T2> bool Advertise(
243  const std::string &_topic,
244  void(*_cb)(const T1 &_req, T2 &_rep, bool &_result),
245  const AdvertiseOptions &_options = AdvertiseOptions())
246  {
247  std::function<void(const T1 &, T2 &, bool &)> f =
248  [_cb](const T1 &_internalReq, T2 &_internalRep, bool &_internalResult)
249  {
250  (*_cb)(_internalReq, _internalRep, _internalResult);
251  };
252 
253  return this->Advertise<T1, T2>(_topic, f, _options);
254  }
255 
268  public: template<typename T1, typename T2> bool Advertise(
269  const std::string &_topic,
270  std::function<void(const T1 &_req, T2 &_rep, bool &_result)> &_cb,
271  const AdvertiseOptions &_options = AdvertiseOptions())
272  {
273  std::string fullyQualifiedTopic;
274  if (!TopicUtils::FullyQualifiedName(this->Options().Partition(),
275  this->Options().NameSpace(), _topic, fullyQualifiedTopic))
276  {
277  std::cerr << "Topic [" << _topic << "] is not valid." << std::endl;
278  return false;
279  }
280 
281  // Create a new service reply handler.
282  std::shared_ptr<RepHandler<T1, T2>> repHandlerPtr(
283  new RepHandler<T1, T2>());
284 
285  // Insert the callback into the handler.
286  repHandlerPtr->SetCallback(_cb);
287 
288  std::lock_guard<std::recursive_mutex> lk(this->Shared()->mutex);
289 
290  // Add the topic to the list of advertised services.
291  this->SrvsAdvertised().insert(fullyQualifiedTopic);
292 
293  // Store the replier handler. Each replier handler is
294  // associated with a topic. When the receiving thread gets new requests,
295  // it will recover the replier handler associated to the topic and
296  // will invoke the service call.
297  this->Shared()->repliers.AddHandler(
298  fullyQualifiedTopic, this->NodeUuid(), repHandlerPtr);
299 
300  // Notify the discovery service to register and advertise my responser.
301  ServicePublisher publisher(fullyQualifiedTopic,
302  this->Shared()->myReplierAddress,
303  this->Shared()->replierId.ToString(),
304  this->Shared()->pUuid, this->NodeUuid(), _options.Scope(),
305  T1().GetTypeName(), T2().GetTypeName());
306 
307  if (!this->Shared()->discovery->AdvertiseSrv(publisher))
308  {
309  std::cerr << "Node::Advertise(): Error advertising a service. "
310  << "Did you forget to start the discovery service?"
311  << std::endl;
312  return false;
313  }
314 
315  return true;
316  }
317 
331  public: template<typename C, typename T1, typename T2> bool Advertise(
332  const std::string &_topic,
333  void(C::*_cb)(const T1 &_req, T2 &_rep, bool &_result),
334  C *_obj,
335  const AdvertiseOptions &_options = AdvertiseOptions())
336  {
337  std::function<void(const T1 &, T2 &, bool &)> f =
338  [_cb, _obj](const T1 &_internalReq,
339  T2 &_internalRep,
340  bool &_internalResult)
341  {
342  auto cb = std::bind(_cb, _obj, std::placeholders::_1,
343  std::placeholders::_2, std::placeholders::_3);
344  cb(_internalReq, _internalRep, _internalResult);
345  };
346 
347  return this->Advertise<T1, T2>(_topic, f, _options);
348  }
349 
352  public: std::vector<std::string> AdvertisedServices() const;
353 
364  public: template<typename T1, typename T2> bool Request(
365  const std::string &_topic,
366  const T1 &_req,
367  void(*_cb)(const T2 &_rep, const bool _result))
368  {
369  std::function<void(const T2 &, const bool)> f =
370  [_cb](const T2 &_internalRep, const bool _internalResult)
371  {
372  (*_cb)(_internalRep, _internalResult);
373  };
374 
375  return this->Request<T1, T2>(_topic, _req, f);
376  }
377 
388  public: template<typename T1, typename T2> bool Request(
389  const std::string &_topic,
390  const T1 &_req,
391  std::function<void(const T2 &_rep, const bool _result)> &_cb)
392  {
393  std::string fullyQualifiedTopic;
394  if (!TopicUtils::FullyQualifiedName(this->Options().Partition(),
395  this->Options().NameSpace(), _topic, fullyQualifiedTopic))
396  {
397  std::cerr << "Topic [" << _topic << "] is not valid." << std::endl;
398  return false;
399  }
400 
401  bool localResponserFound;
402  IRepHandlerPtr repHandler;
403  {
404  std::lock_guard<std::recursive_mutex> lk(this->Shared()->mutex);
405  localResponserFound = this->Shared()->repliers.FirstHandler(
406  fullyQualifiedTopic, T1().GetTypeName(), T2().GetTypeName(),
407  repHandler);
408  }
409 
410  // If the responser is within my process.
411  if (localResponserFound)
412  {
413  // There is a responser in my process, let's use it.
414  T2 rep;
415  bool result;
416  repHandler->RunLocalCallback(_req, rep, result);
417 
418  _cb(rep, result);
419  return true;
420  }
421 
422  // Create a new request handler.
423  std::shared_ptr<ReqHandler<T1, T2>> reqHandlerPtr(
424  new ReqHandler<T1, T2>(this->NodeUuid()));
425 
426  // Insert the request's parameters.
427  reqHandlerPtr->SetMessage(_req);
428 
429  // Insert the callback into the handler.
430  reqHandlerPtr->SetCallback(_cb);
431 
432  {
433  std::lock_guard<std::recursive_mutex> lk(this->Shared()->mutex);
434 
435  // Store the request handler.
436  this->Shared()->requests.AddHandler(
437  fullyQualifiedTopic, this->NodeUuid(), reqHandlerPtr);
438 
439  // If the responser's address is known, make the request.
440  SrvAddresses_M addresses;
441  if (this->Shared()->discovery->SrvPublishers(
442  fullyQualifiedTopic, addresses))
443  {
444  this->Shared()->SendPendingRemoteReqs(fullyQualifiedTopic,
445  T1().GetTypeName(), T2().GetTypeName());
446  }
447  else
448  {
449  // Discover the service responser.
450  if (!this->Shared()->discovery->DiscoverSrv(fullyQualifiedTopic))
451  {
452  std::cerr << "Node::Request(): Error discovering a service. "
453  << "Did you forget to start the discovery service?"
454  << std::endl;
455  return false;
456  }
457  }
458  }
459 
460  return true;
461  }
462 
474  public: template<typename C, typename T1, typename T2> bool Request(
475  const std::string &_topic,
476  const T1 &_req,
477  void(C::*_cb)(const T2 &_rep, const bool _result),
478  C *_obj)
479  {
480  std::function<void(const T2 &, const bool)> f =
481  [_cb, _obj](const T2 &_internalRep, const bool _internalResult)
482  {
483  auto cb = std::bind(_cb, _obj, std::placeholders::_1,
484  std::placeholders::_2);
485  cb(_internalRep, _internalResult);
486  };
487 
488  return this->Request<T1, T2>(_topic, _req, f);
489  }
490 
499  public: template<typename T1, typename T2> bool Request(
500  const std::string &_topic,
501  const T1 &_req,
502  const unsigned int &_timeout,
503  T2 &_rep,
504  bool &_result)
505  {
506  std::string fullyQualifiedTopic;
507  if (!TopicUtils::FullyQualifiedName(this->Options().Partition(),
508  this->Options().NameSpace(), _topic, fullyQualifiedTopic))
509  {
510  std::cerr << "Topic [" << _topic << "] is not valid." << std::endl;
511  return false;
512  }
513 
514  // Create a new request handler.
515  std::shared_ptr<ReqHandler<T1, T2>> reqHandlerPtr(
516  new ReqHandler<T1, T2>(this->NodeUuid()));
517 
518  // Insert the request's parameters.
519  reqHandlerPtr->SetMessage(_req);
520 
521  std::unique_lock<std::recursive_mutex> lk(this->Shared()->mutex);
522 
523  // If the responser is within my process.
524  IRepHandlerPtr repHandler;
525  if (this->Shared()->repliers.FirstHandler(fullyQualifiedTopic,
526  T1().GetTypeName(), T2().GetTypeName(), repHandler))
527  {
528  // There is a responser in my process, let's use it.
529  repHandler->RunLocalCallback(_req, _rep, _result);
530  return true;
531  }
532 
533  // Store the request handler.
534  this->Shared()->requests.AddHandler(
535  fullyQualifiedTopic, this->NodeUuid(), reqHandlerPtr);
536 
537  // If the responser's address is known, make the request.
538  SrvAddresses_M addresses;
539  if (this->Shared()->discovery->SrvPublishers(
540  fullyQualifiedTopic, addresses))
541  {
542  this->Shared()->SendPendingRemoteReqs(fullyQualifiedTopic,
543  T1().GetTypeName(), T2().GetTypeName());
544  }
545  else
546  {
547  // Discover the service responser.
548  if (!this->Shared()->discovery->DiscoverSrv(fullyQualifiedTopic))
549  {
550  std::cerr << "Node::Request(): Error discovering a service. "
551  << "Did you forget to start the discovery service?"
552  << std::endl;
553  return false;
554  }
555  }
556 
557  // Wait until the REP is available.
558  bool executed = reqHandlerPtr->WaitUntil(lk, _timeout);
559 
560  // The request was not executed.
561  if (!executed)
562  return false;
563 
564  // The request was executed but did not succeed.
565  if (!reqHandlerPtr->Result())
566  {
567  _result = false;
568  return true;
569  }
570 
571  // Parse the response.
572  if (!_rep.ParseFromString(reqHandlerPtr->Response()))
573  {
574  std::cerr << "Node::Request(): Error Parsing the response"
575  << std::endl;
576  _result = false;
577  return true;
578  }
579 
580  _result = true;
581  return true;
582  }
583 
587  public: bool UnadvertiseSrv(const std::string &_topic);
588 
595  public: void TopicList(std::vector<std::string> &_topics) const;
596 
601  public: bool TopicInfo(const std::string &_topic,
602  std::vector<MessagePublisher> &_publishers) const;
603 
610  public: void ServiceList(std::vector<std::string> &_services) const;
611 
616  public: bool ServiceInfo(const std::string &_service,
617  std::vector<ServicePublisher> &_publishers) const;
618 
621  private: const std::string &Partition() const;
622 
625  private: const std::string &NameSpace() const;
626 
630  private: NodeShared *Shared() const;
631 
634  private: const std::string &NodeUuid() const;
635 
638  private: std::unordered_set<std::string> &TopicsAdvertised() const;
639 
642  private: std::unordered_set<std::string> &TopicsSubscribed() const;
643 
646  private: std::unordered_set<std::string> &SrvsAdvertised() const;
647 
650  private: NodeOptions &Options() const;
651 
654  protected: std::unique_ptr<transport::NodePrivate> dataPtr;
655  };
656  }
657 }
658 #endif
bool Advertise(const std::string &_topic, const AdvertiseOptions &_options=AdvertiseOptions())
Advertise a new topic.
Definition: Node.hh:78
static bool FullyQualifiedName(const std::string &_partition, const std::string &_ns, const std::string &_topic, std::string &_name)
Get the full topic path given a namespace and a topic name.
#define IGNITION_VISIBLE
Use to represent "symbol visible" if supported.
Definition: Helpers.hh:56
bool Request(const std::string &_topic, const T1 &_req, void(*_cb)(const T2 &_rep, const bool _result))
Request a new service using a non-blocking call.
Definition: Node.hh:364
A class for customizing the behavior of the Node.
Definition: NodeOptions.hh:35
std::map< std::string, std::vector< ServicePublisher >> SrvAddresses_M
Definition: TransportTypes.hh:58
It creates a reply handler for the specific protobuf messages used.
Definition: ReqHandler.hh:175
bool Subscribe(const std::string &_topic, void(*_cb)(const T &_msg))
Subscribe to a topic registering a callback.
Definition: Node.hh:135
ignition/transport/AdvertiseOptions.hh
Definition: AdvertiseOptions.hh:50
bool Advertise(const std::string &_topic, std::function< void(const T1 &_req, T2 &_rep, bool &_result)> &_cb, const AdvertiseOptions &_options=AdvertiseOptions())
Advertise a new service.
Definition: Node.hh:268
Private data for the Node class.
Definition: NodeShared.hh:53
google::protobuf::Message ProtoMsg
Definition: TransportTypes.hh:62
std::shared_ptr< IRepHandler > IRepHandlerPtr
Definition: TransportTypes.hh:82
bool Request(const std::string &_topic, const T1 &_req, std::function< void(const T2 &_rep, const bool _result)> &_cb)
Request a new service using a non-blocking call.
Definition: Node.hh:388
bool Advertise(const std::string &_topic, void(C::*_cb)(const T1 &_req, T2 &_rep, bool &_result), C *_obj, const AdvertiseOptions &_options=AdvertiseOptions())
Advertise a new service.
Definition: Node.hh:331
bool Subscribe(const std::string &_topic, std::function< void(const T &_msg)> &_cb)
Subscribe to a topic registering a callback.
Definition: Node.hh:153
with the service response.
Definition: RepHandler.hh:98
std::unique_ptr< transport::NodePrivate > dataPtr
Definition: Node.hh:654
A class that allows a client to communicate with other peers.
Definition: Node.hh:64
ignition/transport/Publisher.hh
Definition: Publisher.hh:175
ignition/transport/Publisher.hh
Definition: Publisher.hh:264
bool Advertise(const std::string &_topic, void(*_cb)(const T1 &_req, T2 &_rep, bool &_result), const AdvertiseOptions &_options=AdvertiseOptions())
Advertise a new service.
Definition: Node.hh:242
It creates a subscription handler for a specific protobuf message.
Definition: SubscriptionHandler.hh:102
bool Request(const std::string &_topic, const T1 &_req, const unsigned int &_timeout, T2 &_rep, bool &_result)
Request a new service using a blocking call.
Definition: Node.hh:499
Definition: AdvertiseOptions.hh:25
bool Request(const std::string &_topic, const T1 &_req, void(C::*_cb)(const T2 &_rep, const bool _result), C *_obj)
Request a new service using a non-blocking call.
Definition: Node.hh:474
IGNITION_VISIBLE void waitForShutdown()
Block the current thread until a SIGINT or SIGTERM is received.
bool Subscribe(const std::string &_topic, void(C::*_cb)(const T &_msg), C *_obj)
Subscribe to a topic registering a callback.
Definition: Node.hh:204