24 #include <netcomm/fawkes/client.h> 25 #include <netcomm/fawkes/client_handler.h> 26 #include <netcomm/fawkes/message_queue.h> 27 #include <netcomm/fawkes/transceiver.h> 28 #include <netcomm/socket/stream.h> 29 #include <netcomm/utils/exceptions.h> 31 #include <core/threading/thread.h> 32 #include <core/threading/mutex.h> 33 #include <core/threading/mutex_locker.h> 34 #include <core/threading/wait_condition.h> 35 #include <core/exceptions/system.h> 54 :
Exception(
"A handler for this component has already been registered")
74 :
Thread(
"FawkesNetworkClientSendThread",
Thread::OPMODE_WAITFORWAKEUP)
78 __outbound_mutex =
new Mutex();
81 __outbound_active = 0;
82 __outbound_msgq = __outbound_msgqs[0];
88 for (
unsigned int i = 0; i < 2; ++i) {
89 while ( ! __outbound_msgqs[i]->empty() ) {
92 __outbound_msgqs[i]->pop();
95 delete __outbound_msgqs[0];
96 delete __outbound_msgqs[1];
97 delete __outbound_mutex;
102 __parent->set_send_slave_alive();
107 if ( ! __parent->connected() )
return;
109 while ( __outbound_havemore ) {
110 __outbound_mutex->lock();
111 __outbound_havemore =
false;
113 __outbound_active = 1 - __outbound_active;
114 __outbound_msgq = __outbound_msgqs[__outbound_active];
115 __outbound_mutex->
unlock();
117 if ( ! q->empty() ) {
121 __parent->connection_died();
133 if ( loop_mutex->try_lock() ) {
135 loop_mutex->unlock();
152 __outbound_mutex->lock();
153 __outbound_msgq->push(message);
154 __outbound_havemore =
true;
155 __outbound_mutex->unlock();
165 Mutex *__outbound_mutex;
166 unsigned int __outbound_active;
167 bool __outbound_havemore;
190 :
Thread(
"FawkesNetworkClientRecvThread")
195 __recv_mutex = recv_mutex;
201 while ( ! __inbound_msgq->empty() ) {
204 __inbound_msgq->pop();
206 delete __inbound_msgq;
212 std::list<unsigned int> wakeup_list;
219 __inbound_msgq->lock();
220 while ( ! __inbound_msgq->empty() ) {
222 wakeup_list.push_back(m->
cid());
223 __parent->dispatch_message(m);
225 __inbound_msgq->pop();
227 __inbound_msgq->unlock();
232 wakeup_list.unique();
233 for (std::list<unsigned int>::iterator i = wakeup_list.begin(); i != wakeup_list.end(); ++i) {
234 __parent->wake_handlers(*i);
243 __parent->set_recv_slave_alive();
261 __parent->connection_died();
268 __parent->connection_died();
300 __hostname = strdup(hostname);
301 __ip = ip ? strdup(ip) : NULL;
308 connection_died_recently =
false;
309 __send_slave_alive =
false;
310 __recv_slave_alive =
false;
312 slave_status_mutex =
new Mutex();
317 __recv_mutex =
new Mutex();
319 __connest_mutex =
new Mutex();
322 __connest_interrupted =
false;
340 connection_died_recently =
false;
341 __send_slave_alive =
false;
342 __recv_slave_alive =
false;
344 slave_status_mutex =
new Mutex();
349 __recv_mutex =
new Mutex();
351 __connest_mutex =
new Mutex();
354 __connest_interrupted =
false;
365 unsigned short int port,
const char *ip)
367 __hostname = strdup(hostname);
368 __ip = ip ? strdup(ip) : NULL;
375 connection_died_recently =
false;
376 __send_slave_alive =
false;
377 __recv_slave_alive =
false;
379 slave_status_mutex =
new Mutex();
384 __recv_mutex =
new Mutex();
386 __connest_mutex =
new Mutex();
389 __connest_interrupted =
false;
399 if (__hostname) free(__hostname);
400 if (__ip) free(__ip);
401 delete slave_status_mutex;
403 delete __connest_waitcond;
404 delete __connest_mutex;
405 delete __recv_waitcond;
417 if ( __hostname == NULL && __ip == NULL) {
426 connection_died_recently =
false;
430 s->connect(__ip ? __ip : __hostname, __port);
432 __send_slave->start();
434 __recv_slave->start();
436 connection_died_recently =
true;
437 if ( __send_slave ) {
438 __send_slave->cancel();
439 __send_slave->join();
443 if ( __recv_slave ) {
444 __recv_slave->cancel();
445 __recv_slave->join();
449 __send_slave_alive =
false;
450 __recv_slave_alive =
false;
456 __connest_mutex->lock();
457 while ( ! __connest && ! __connest_interrupted ) {
458 __connest_waitcond->wait();
460 bool interrupted = __connest_interrupted;
461 __connest_interrupted =
false;
462 __connest_mutex->unlock();
467 notify_of_connection_established();
480 connect(hostname, NULL, port);
493 if (__hostname) free(__hostname);
494 if (__ip) free(__ip);
495 __hostname = strdup(hostname);
496 __ip = ip ? strdup(ip) : NULL;
505 if ( s == NULL )
return;
507 if ( __send_slave_alive ) {
508 if ( ! connection_died_recently ) {
509 __send_slave->force_send();
513 __send_slave->cancel();
514 __send_slave->join();
518 if ( __recv_slave_alive ) {
519 __recv_slave->cancel();
520 __recv_slave->join();
524 __send_slave_alive =
false;
525 __recv_slave_alive =
false;
529 if (! connection_died_recently) {
542 __connest_mutex->lock();
543 __connest_interrupted =
true;
544 __connest_waitcond->wake_all();
545 __connest_mutex->unlock();
563 if (__send_slave) __send_slave->enqueue(message);
580 unsigned int timeout_sec)
582 if (__send_slave && __recv_slave) {
583 __recv_mutex->lock();
584 if ( __recv_received.find(message->
cid()) != __recv_received.end()) {
585 __recv_mutex->unlock();
586 unsigned int cid = message->
cid();
587 throw Exception(
"There is already a thread waiting for messages of " 588 "component id %u", cid);
590 __send_slave->enqueue(message);
591 unsigned int cid = message->
cid();
592 __recv_received[cid] =
false;
593 while (!__recv_received[cid] && ! connection_died_recently) {
594 if (!__recv_waitcond->reltimed_wait(timeout_sec, 0)) {
595 __recv_received.erase(cid);
596 __recv_mutex->unlock();
597 throw TimeoutException(
"Timeout reached while waiting for incoming message " 598 "(outgoing was %u:%u)", message->
cid(), message->
msgid());
601 __recv_received.erase(cid);
602 __recv_mutex->unlock();
604 unsigned int cid = message->
cid();
605 unsigned int msgid = message->
msgid();
606 throw Exception(
"Cannot enqueue given message %u:%u, sender or " 607 "receiver missing", cid, msgid);
621 unsigned int component_id)
624 if ( handlers.find(component_id) != handlers.end() ) {
628 handlers[component_id] = handler;
642 if ( handlers.find(component_id) != handlers.end() ) {
643 handlers[component_id]->deregistered(_id);
644 handlers.erase(component_id);
647 __recv_mutex->lock();
648 if (__recv_received.find(component_id) != __recv_received.end()) {
649 __recv_received[component_id] =
true;
650 __recv_waitcond->wake_all();
652 __recv_mutex->unlock();
659 unsigned int cid = m->
cid();
661 if (handlers.find(cid) != handlers.end()) {
662 handlers[cid]->inbound_received(m, _id);
669 FawkesNetworkClient::wake_handlers(
unsigned int cid)
671 __recv_mutex->lock();
672 if (__recv_received.find(cid) != __recv_received.end()) {
673 __recv_received[cid] =
true;
675 __recv_waitcond->wake_all();
676 __recv_mutex->unlock();
680 FawkesNetworkClient::notify_of_connection_dead()
682 __connest_mutex->lock();
684 __connest_mutex->unlock();
687 for ( HandlerMap::iterator i = handlers.begin(); i != handlers.end(); ++i ) {
688 i->second->connection_died(_id);
692 __recv_mutex->lock();
693 __recv_waitcond->wake_all();
694 __recv_mutex->unlock();
698 FawkesNetworkClient::notify_of_connection_established()
701 for ( HandlerMap::iterator i = handlers.begin(); i != handlers.end(); ++i ) {
702 i->second->connection_established(_id);
709 FawkesNetworkClient::connection_died()
711 connection_died_recently =
true;
712 notify_of_connection_dead();
717 FawkesNetworkClient::set_send_slave_alive()
719 slave_status_mutex->lock();
720 __send_slave_alive =
true;
721 if ( __send_slave_alive && __recv_slave_alive ) {
722 __connest_mutex->lock();
724 __connest_waitcond->wake_all();
725 __connest_mutex->unlock();
727 slave_status_mutex->unlock();
732 FawkesNetworkClient::set_recv_slave_alive()
734 slave_status_mutex->lock();
735 __recv_slave_alive =
true;
736 if ( __send_slave_alive && __recv_slave_alive ) {
737 __connest_mutex->lock();
739 __connest_waitcond->wake_all();
740 __connest_mutex->unlock();
742 slave_status_mutex->unlock();
756 __recv_mutex->lock();
757 if ( __recv_received.find(component_id) != __recv_received.end()) {
758 __recv_mutex->unlock();
759 throw Exception(
"There is already a thread waiting for messages of " 760 "component id %u", component_id);
762 __recv_received[component_id] =
false;
763 while (! __recv_received[component_id] && ! connection_died_recently) {
764 if (!__recv_waitcond->reltimed_wait(timeout_sec, 0)) {
765 __recv_received.erase(component_id);
766 __recv_mutex->unlock();
767 throw TimeoutException(
"Timeout reached while waiting for incoming message " 768 "(component %u)", component_id);
771 __recv_received.erase(component_id);
772 __recv_mutex->unlock();
784 __recv_mutex->lock();
785 if ( __recv_received.find(component_id) != __recv_received.end()) {
786 __recv_received[component_id] =
true;
788 __recv_waitcond->wake_all();
789 __recv_mutex->unlock();
799 return (! connection_died_recently && (s != NULL));
820 throw Exception(
"Trying to get the ID of a client that has no ID");
~FawkesNetworkClient()
Destructor.
void unlock() const
Unlock list.
Message handler for FawkesNetworkClient.
static const short POLL_ERR
Error condition.
void wake(unsigned int component_id)
Wake a waiting thread.
unsigned int id() const
Get the client's ID.
void interrupt_connect()
Interrupt connect().
bool has_id() const
Check whether the client has an id.
const char * get_hostname() const
Get the client's hostname.
Wait until a given condition holds.
void enqueue(FawkesNetworkMessage *message)
Enqueue message to send and take ownership.
A LockQueue of FawkesNetworkMessage to hold messages in inbound and outbound queues.
void enqueue_and_wait(FawkesNetworkMessage *message, unsigned int timeout_sec=15)
Enqueue message to send and wait for answer.
Simple Fawkes network client.
void unref()
Decrement reference count and conditionally delete this instance.
const char * get_ip() const
Get the client's ip.
Fawkes library namespace.
static void recv(StreamSocket *s, FawkesNetworkMessageQueue *msgq, unsigned int max_num_msgs=8)
Receive data.
virtual void run()
Stub to see name in backtrace for easier debugging.
Exception()
Constructor for subclasses.
virtual void run()
Code to execute in the thread.
void disconnect()
Disconnect socket.
void register_handler(FawkesNetworkClientHandler *handler, unsigned int component_id)
Register handler.
static const short POLL_IN
Data can be read.
void enqueue(FawkesNetworkMessage *message)
Enqueue message to send.
void wait(unsigned int component_id, unsigned int timeout_sec=15)
Wait for messages for component ID.
Representation of a message that is sent over the network.
void connect()
Connect to remote.
The current system call has timed out before completion.
unsigned short int msgid() const
Get message type ID.
Fawkes network client send thread.
A NULL pointer was supplied where not allowed.
Thread class encapsulation of pthreads.
~FawkesNetworkClientRecvThread()
Destructor.
virtual void loop()
Code to execute in the thread.
TCP stream socket over IP.
void unlock()
Unlock the mutex.
HandlerAlreadyRegisteredException()
Constructor.
virtual void loop()
Code to execute in the thread.
Base class for exceptions in Fawkes.
void recv()
Receive and process messages.
virtual void once()
Execute an action exactly once.
bool connected() const
Check if connection is alive.
The current system call has been interrupted (for instance by a signal).
FawkesNetworkClient()
Constructor.
Thrown if the connection died during an operation.
static const short POLL_RDHUP
Stream socket peer closed connection, or shut down writing half of connection.
FawkesNetworkClientRecvThread(StreamSocket *s, FawkesNetworkClient *parent, Mutex *recv_mutex)
Constructor.
~FawkesNetworkClientSendThread()
Destructor.
void deregister_handler(unsigned int component_id)
Deregister handler.
static void send(StreamSocket *s, FawkesNetworkMessageQueue *msgq)
Send messages.
static const short POLL_HUP
Hang up.
unsigned short int cid() const
Get component ID.
virtual void run()
Stub to see name in backtrace for easier debugging.
Mutex mutual exclusion lock.
FawkesNetworkClientSendThread(StreamSocket *s, FawkesNetworkClient *parent)
Constructor.
Fawkes network client receive thread.
void force_send()
Force sending of messages.
virtual void once()
Execute an action exactly once.