18 #ifndef __IGN_TRANSPORT_NODE_HH_INCLUDED__ 19 #define __IGN_TRANSPORT_NODE_HH_INCLUDED__ 22 #pragma warning(push, 0) 24 #include <google/protobuf/message.h> 35 #include <unordered_set> 71 public:
virtual ~
Node();
78 public:
template<
typename T>
bool Advertise(
const std::string &_topic,
81 std::string fullyQualifiedTopic;
83 this->Options().NameSpace(), _topic, fullyQualifiedTopic))
85 std::cerr <<
"Topic [" << _topic <<
"] is not valid." << std::endl;
89 std::lock_guard<std::recursive_mutex> lk(this->Shared()->mutex);
92 this->TopicsAdvertised().insert(fullyQualifiedTopic);
96 this->Shared()->myAddress,
97 this->Shared()->myControlAddress,
98 this->Shared()->pUuid, this->NodeUuid(), _options.Scope(),
101 if (!this->Shared()->discovery->AdvertiseMsg(publisher))
103 std::cerr <<
"Node::Advertise(): Error advertising a topic. " 104 <<
"Did you forget to start the discovery service?" 114 public: std::vector<std::string> AdvertisedTopics()
const;
119 public:
bool Unadvertise(
const std::string &_topic);
125 public:
bool Publish(
const std::string &_topic,
136 const std::string &_topic,
137 void(*_cb)(
const T &_msg))
139 std::function<void(const T &)> f = [_cb](
const T & _internalMsg)
141 (*_cb)(_internalMsg);
144 return this->Subscribe<T>(_topic, f);
154 const std::string &_topic,
155 std::function<
void(
const T &_msg)> &_cb)
157 std::string fullyQualifiedTopic;
159 this->Options().NameSpace(), _topic, fullyQualifiedTopic))
161 std::cerr <<
"Topic [" << _topic <<
"] is not valid." << std::endl;
166 std::shared_ptr<SubscriptionHandler<T>> subscrHandlerPtr(
170 subscrHandlerPtr->SetCallback(_cb);
172 std::lock_guard<std::recursive_mutex> lk(this->Shared()->mutex);
178 this->Shared()->localSubscriptions.AddHandler(
179 fullyQualifiedTopic, this->NodeUuid(), subscrHandlerPtr);
182 this->TopicsSubscribed().insert(fullyQualifiedTopic);
185 if (!this->Shared()->discovery->DiscoverMsg(fullyQualifiedTopic))
187 std::cerr <<
"Node::Subscribe(): Error discovering a topic. " 188 <<
"Did you forget to start the discovery service?" 204 public:
template<
typename C,
typename T>
bool Subscribe(
205 const std::string &_topic,
206 void(C::*_cb)(
const T &_msg),
209 std::function<void(const T &)> f = [_cb, _obj](
const T & _internalMsg)
211 auto cb = std::bind(_cb, _obj, std::placeholders::_1);
215 return this->Subscribe<T>(_topic, f);
223 public: std::vector<std::string> SubscribedTopics()
const;
228 public:
bool Unsubscribe(
const std::string &_topic);
242 public:
template<
typename T1,
typename T2>
bool Advertise(
243 const std::string &_topic,
244 void(*_cb)(
const T1 &_req, T2 &_rep,
bool &_result),
247 std::function<void(const T1 &, T2 &, bool &)> f =
248 [_cb](
const T1 &_internalReq, T2 &_internalRep,
bool &_internalResult)
250 (*_cb)(_internalReq, _internalRep, _internalResult);
253 return this->Advertise<T1, T2>(_topic, f, _options);
268 public:
template<
typename T1,
typename T2>
bool Advertise(
269 const std::string &_topic,
270 std::function<
void(
const T1 &_req, T2 &_rep,
bool &_result)> &_cb,
273 std::string fullyQualifiedTopic;
275 this->Options().NameSpace(), _topic, fullyQualifiedTopic))
277 std::cerr <<
"Topic [" << _topic <<
"] is not valid." << std::endl;
282 std::shared_ptr<RepHandler<T1, T2>> repHandlerPtr(
286 repHandlerPtr->SetCallback(_cb);
288 std::lock_guard<std::recursive_mutex> lk(this->Shared()->mutex);
291 this->SrvsAdvertised().insert(fullyQualifiedTopic);
297 this->Shared()->repliers.AddHandler(
298 fullyQualifiedTopic, this->NodeUuid(), repHandlerPtr);
302 this->Shared()->myReplierAddress,
303 this->Shared()->replierId.ToString(),
304 this->Shared()->pUuid, this->NodeUuid(), _options.Scope(),
305 T1().GetTypeName(), T2().GetTypeName());
307 if (!this->Shared()->discovery->AdvertiseSrv(publisher))
309 std::cerr <<
"Node::Advertise(): Error advertising a service. " 310 <<
"Did you forget to start the discovery service?" 331 public:
template<
typename C,
typename T1,
typename T2>
bool Advertise(
332 const std::string &_topic,
333 void(C::*_cb)(
const T1 &_req, T2 &_rep,
bool &_result),
337 std::function<void(const T1 &, T2 &, bool &)> f =
338 [_cb, _obj](
const T1 &_internalReq,
340 bool &_internalResult)
342 auto cb = std::bind(_cb, _obj, std::placeholders::_1,
343 std::placeholders::_2, std::placeholders::_3);
344 cb(_internalReq, _internalRep, _internalResult);
347 return this->Advertise<T1, T2>(_topic, f, _options);
352 public: std::vector<std::string> AdvertisedServices()
const;
364 public:
template<
typename T1,
typename T2>
bool Request(
365 const std::string &_topic,
367 void(*_cb)(
const T2 &_rep,
const bool _result))
369 std::function<void(const T2 &, const bool)> f =
370 [_cb](
const T2 &_internalRep,
const bool _internalResult)
372 (*_cb)(_internalRep, _internalResult);
375 return this->Request<T1, T2>(_topic, _req, f);
388 public:
template<
typename T1,
typename T2>
bool Request(
389 const std::string &_topic,
391 std::function<
void(
const T2 &_rep,
const bool _result)> &_cb)
393 std::string fullyQualifiedTopic;
395 this->Options().NameSpace(), _topic, fullyQualifiedTopic))
397 std::cerr <<
"Topic [" << _topic <<
"] is not valid." << std::endl;
401 bool localResponserFound;
404 std::lock_guard<std::recursive_mutex> lk(this->Shared()->mutex);
405 localResponserFound = this->Shared()->repliers.FirstHandler(
406 fullyQualifiedTopic, T1().GetTypeName(), T2().GetTypeName(),
411 if (localResponserFound)
416 repHandler->RunLocalCallback(_req, rep, result);
423 std::shared_ptr<ReqHandler<T1, T2>> reqHandlerPtr(
427 reqHandlerPtr->SetMessage(_req);
430 reqHandlerPtr->SetCallback(_cb);
433 std::lock_guard<std::recursive_mutex> lk(this->Shared()->mutex);
436 this->Shared()->requests.AddHandler(
437 fullyQualifiedTopic, this->NodeUuid(), reqHandlerPtr);
441 if (this->Shared()->discovery->SrvPublishers(
442 fullyQualifiedTopic, addresses))
444 this->Shared()->SendPendingRemoteReqs(fullyQualifiedTopic,
445 T1().GetTypeName(), T2().GetTypeName());
450 if (!this->Shared()->discovery->DiscoverSrv(fullyQualifiedTopic))
452 std::cerr <<
"Node::Request(): Error discovering a service. " 453 <<
"Did you forget to start the discovery service?" 474 public:
template<
typename C,
typename T1,
typename T2>
bool Request(
475 const std::string &_topic,
477 void(C::*_cb)(
const T2 &_rep,
const bool _result),
480 std::function<void(const T2 &, const bool)> f =
481 [_cb, _obj](
const T2 &_internalRep,
const bool _internalResult)
483 auto cb = std::bind(_cb, _obj, std::placeholders::_1,
484 std::placeholders::_2);
485 cb(_internalRep, _internalResult);
488 return this->Request<T1, T2>(_topic, _req, f);
499 public:
template<
typename T1,
typename T2>
bool Request(
500 const std::string &_topic,
502 const unsigned int &_timeout,
506 std::string fullyQualifiedTopic;
508 this->Options().NameSpace(), _topic, fullyQualifiedTopic))
510 std::cerr <<
"Topic [" << _topic <<
"] is not valid." << std::endl;
515 std::shared_ptr<ReqHandler<T1, T2>> reqHandlerPtr(
519 reqHandlerPtr->SetMessage(_req);
521 std::unique_lock<std::recursive_mutex> lk(this->Shared()->mutex);
525 if (this->Shared()->repliers.FirstHandler(fullyQualifiedTopic,
526 T1().GetTypeName(), T2().GetTypeName(), repHandler))
529 repHandler->RunLocalCallback(_req, _rep, _result);
534 this->Shared()->requests.AddHandler(
535 fullyQualifiedTopic, this->NodeUuid(), reqHandlerPtr);
539 if (this->Shared()->discovery->SrvPublishers(
540 fullyQualifiedTopic, addresses))
542 this->Shared()->SendPendingRemoteReqs(fullyQualifiedTopic,
543 T1().GetTypeName(), T2().GetTypeName());
548 if (!this->Shared()->discovery->DiscoverSrv(fullyQualifiedTopic))
550 std::cerr <<
"Node::Request(): Error discovering a service. " 551 <<
"Did you forget to start the discovery service?" 558 bool executed = reqHandlerPtr->WaitUntil(lk, _timeout);
565 if (!reqHandlerPtr->Result())
572 if (!_rep.ParseFromString(reqHandlerPtr->Response()))
574 std::cerr <<
"Node::Request(): Error Parsing the response" 587 public:
bool UnadvertiseSrv(
const std::string &_topic);
595 public:
void TopicList(std::vector<std::string> &_topics)
const;
601 public:
bool TopicInfo(
const std::string &_topic,
602 std::vector<MessagePublisher> &_publishers)
const;
610 public:
void ServiceList(std::vector<std::string> &_services)
const;
616 public:
bool ServiceInfo(
const std::string &_service,
617 std::vector<ServicePublisher> &_publishers)
const;
621 private:
const std::string &Partition()
const;
625 private:
const std::string &NameSpace()
const;
634 private:
const std::string &NodeUuid()
const;
638 private: std::unordered_set<std::string> &TopicsAdvertised()
const;
642 private: std::unordered_set<std::string> &TopicsSubscribed()
const;
646 private: std::unordered_set<std::string> &SrvsAdvertised()
const;
654 protected: std::unique_ptr<transport::NodePrivate>
dataPtr;
bool Advertise(const std::string &_topic, const AdvertiseOptions &_options=AdvertiseOptions())
Advertise a new topic.
Definition: Node.hh:78
static bool FullyQualifiedName(const std::string &_partition, const std::string &_ns, const std::string &_topic, std::string &_name)
Get the full topic path given a namespace and a topic name.
#define IGNITION_VISIBLE
Use to represent "symbol visible" if supported.
Definition: Helpers.hh:56
bool Request(const std::string &_topic, const T1 &_req, void(*_cb)(const T2 &_rep, const bool _result))
Request a new service using a non-blocking call.
Definition: Node.hh:364
A class for customizing the behavior of the Node.
Definition: NodeOptions.hh:35
std::map< std::string, std::vector< ServicePublisher >> SrvAddresses_M
Definition: TransportTypes.hh:58
It creates a request handler for the specific protobuf messages used.
Definition: ReqHandler.hh:175
bool Subscribe(const std::string &_topic, void(*_cb)(const T &_msg))
Subscribe to a topic registering a callback.
Definition: Node.hh:135
ignition/transport/AdvertiseOptions.hh
Definition: AdvertiseOptions.hh:50
bool Advertise(const std::string &_topic, std::function< void(const T1 &_req, T2 &_rep, bool &_result)> &_cb, const AdvertiseOptions &_options=AdvertiseOptions())
Advertise a new service.
Definition: Node.hh:268
Private data for the Node class.
Definition: NodeShared.hh:53
google::protobuf::Message ProtoMsg
Definition: TransportTypes.hh:62
std::shared_ptr< IRepHandler > IRepHandlerPtr
Definition: TransportTypes.hh:82
bool Request(const std::string &_topic, const T1 &_req, std::function< void(const T2 &_rep, const bool _result)> &_cb)
Request a new service using a non-blocking call.
Definition: Node.hh:388
bool Advertise(const std::string &_topic, void(C::*_cb)(const T1 &_req, T2 &_rep, bool &_result), C *_obj, const AdvertiseOptions &_options=AdvertiseOptions())
Advertise a new service.
Definition: Node.hh:331
bool Subscribe(const std::string &_topic, std::function< void(const T &_msg)> &_cb)
Subscribe to a topic registering a callback.
Definition: Node.hh:153
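It creates a service reply handler for a pair of protobuf messages: the request, and the reply that is filled with the service response.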
Definition: RepHandler.hh:98
std::unique_ptr< transport::NodePrivate > dataPtr
Definition: Node.hh:654
A class that allows a client to communicate with other peers.
Definition: Node.hh:64
ignition/transport/Publisher.hh
Definition: Publisher.hh:175
ignition/transport/Publisher.hh
Definition: Publisher.hh:264
bool Advertise(const std::string &_topic, void(*_cb)(const T1 &_req, T2 &_rep, bool &_result), const AdvertiseOptions &_options=AdvertiseOptions())
Advertise a new service.
Definition: Node.hh:242
It creates a subscription handler for a specific protobuf message.
Definition: SubscriptionHandler.hh:102
bool Request(const std::string &_topic, const T1 &_req, const unsigned int &_timeout, T2 &_rep, bool &_result)
Request a new service using a blocking call.
Definition: Node.hh:499
Definition: AdvertiseOptions.hh:25
bool Request(const std::string &_topic, const T1 &_req, void(C::*_cb)(const T2 &_rep, const bool _result), C *_obj)
Request a new service using a non-blocking call.
Definition: Node.hh:474
IGNITION_VISIBLE void waitForShutdown()
Block the current thread until a SIGINT or SIGTERM is received.
bool Subscribe(const std::string &_topic, void(C::*_cb)(const T &_msg), C *_obj)
Subscribe to a topic registering a callback.
Definition: Node.hh:204