Fawkes API  Fawkes Development Version
thread_manager.cpp
1 
2 /***************************************************************************
3  * thread_manager.cpp - Thread manager
4  *
5  * Created: Thu Nov 3 19:11:31 2006 (on train to Cologne)
6  * Copyright 2006-2009 Tim Niemueller [www.niemueller.de]
7  *
8  ****************************************************************************/
9 
10 /* This program is free software; you can redistribute it and/or modify
11  * it under the terms of the GNU General Public License as published by
12  * the Free Software Foundation; either version 2 of the License, or
13  * (at your option) any later version. A runtime exception applies to
14  * this software (see LICENSE.GPL_WRE file mentioned below for details).
15  *
16  * This program is distributed in the hope that it will be useful,
17  * but WITHOUT ANY WARRANTY; without even the implied warranty of
18  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19  * GNU Library General Public License for more details.
20  *
21  * Read the full text in the LICENSE.GPL_WRE file in the doc directory.
22  */
23 
24 #include <baseapp/thread_manager.h>
25 #include <core/threading/thread.h>
26 #include <core/threading/mutex_locker.h>
27 #include <core/threading/wait_condition.h>
28 #include <core/threading/thread_initializer.h>
29 #include <core/threading/thread_finalizer.h>
30 #include <core/exceptions/software.h>
31 #include <core/exceptions/system.h>
32 
33 #include <aspect/blocked_timing.h>
34 
35 namespace fawkes {
36 #if 0 /* just to make Emacs auto-indent happy */
37 }
38 #endif
39 
40 /** @class ThreadManager <baseapp/thread_manager.h>
41  * Base application thread manager.
42  * This class provides a manager for the threads. Threads are memorized by
43  * their wakeup hook. When the thread manager is deleted, all threads are
44  * appropriately cancelled, joined and deleted. Thus the thread manager
45  * can be used for "garbage collection" of threads.
46  *
47  * The thread manager allows easy wakeup of threads of a given wakeup hook.
48  *
49  * The thread manager needs a thread initializer. Each thread that is added
50  * to the thread manager is initialized with this. The runtime type information
51  * (RTTI) supplied by C++ can be used to initialize threads if appropriate
52  * (if the thread has certain aspects that need special treatment).
53  *
54  * @author Tim Niemueller
55  */
56 
57 /** Constructor.
58  * @param parent_manager parent thread manager
59  */
60 ThreadManager::ThreadManagerAspectCollector::ThreadManagerAspectCollector(ThreadManager *parent_manager)
61 {
62  __parent_manager = parent_manager;
63 }
64 
65 
66 void
67 ThreadManager::ThreadManagerAspectCollector::add(ThreadList &tl)
68 {
69  BlockedTimingAspect *timed_thread;
70 
71  for (ThreadList::iterator i = tl.begin(); i != tl.end(); ++i) {
72  if ( (timed_thread = dynamic_cast<BlockedTimingAspect *>(*i)) != NULL ) {
73  throw IllegalArgumentException("ThreadProducerAspect may not add threads with BlockedTimingAspect");
74  }
75  }
76 
77  __parent_manager->add_maybelocked(tl, /* lock */ false);
78 }
79 
80 
81 void
82 ThreadManager::ThreadManagerAspectCollector::add(Thread *t)
83 {
84  BlockedTimingAspect *timed_thread;
85 
86  if ( (timed_thread = dynamic_cast<BlockedTimingAspect *>(t)) != NULL ) {
87  throw IllegalArgumentException("ThreadProducerAspect may not add threads with BlockedTimingAspect");
88  }
89 
90  __parent_manager->add_maybelocked(t, /* lock */ false);
91 }
92 
93 
94 void
95 ThreadManager::ThreadManagerAspectCollector::remove(ThreadList &tl)
96 {
97  BlockedTimingAspect *timed_thread;
98 
99  for (ThreadList::iterator i = tl.begin(); i != tl.end(); ++i) {
100  if ( (timed_thread = dynamic_cast<BlockedTimingAspect *>(*i)) != NULL ) {
101  throw IllegalArgumentException("ThreadProducerAspect may not remove threads with BlockedTimingAspect");
102  }
103  }
104 
105  __parent_manager->remove_maybelocked(tl, /* lock */ false);
106 }
107 
108 
109 void
110 ThreadManager::ThreadManagerAspectCollector::remove(Thread *t)
111 {
112  BlockedTimingAspect *timed_thread;
113 
114  if ( (timed_thread = dynamic_cast<BlockedTimingAspect *>(t)) != NULL ) {
115  throw IllegalArgumentException("ThreadProducerAspect may not remove threads with BlockedTimingAspect");
116  }
117 
118  __parent_manager->remove_maybelocked(t, /* lock */ false);
119 }
120 
121 
122 void
123 ThreadManager::ThreadManagerAspectCollector::force_remove(fawkes::ThreadList &tl)
124 {
125  throw AccessViolationException("ThreadManagerAspect threads may not force removal of threads");
126 }
127 
128 void
129 ThreadManager::ThreadManagerAspectCollector::force_remove(fawkes::Thread *t)
130 {
131  throw AccessViolationException("ThreadManagerAspect threads may not force removal of threads");
132 }
133 
134 
135 /** Constructor.
136  * When using this constructor you need to make sure to call set_inifin()
137  * before any thread is added.
138  */
140 {
141  __initializer = NULL;
142  __finalizer = NULL;
143  __threads.clear();
144  __waitcond_timedthreads = new WaitCondition();
145  __interrupt_timed_thread_wait = false;
146  __aspect_collector = new ThreadManagerAspectCollector(this);
147 }
148 
149 /** Constructor.
150  * This contsructor is equivalent to the one without parameters followed
151  * by a call to set_inifins().
152  * @param initializer thread initializer
153  * @param finalizer thread finalizer
154  */
156  ThreadFinalizer *finalizer)
157 {
158  __initializer = NULL;
159  __finalizer = NULL;
160  __threads.clear();
161  __waitcond_timedthreads = new WaitCondition();
162  __interrupt_timed_thread_wait = false;
163  __aspect_collector = new ThreadManagerAspectCollector(this);
164  set_inifin(initializer, finalizer);
165 }
166 
167 
168 /** Destructor. */
170 {
171  // stop all threads, we call finalize, and we run through it as long as there are
172  // still running threads, after that, we force the thread's death.
173  for (__tit = __threads.begin(); __tit != __threads.end(); ++__tit) {
174  __tit->second.force_stop(__finalizer);
175  }
176  __untimed_threads.force_stop(__finalizer);
177  __threads.clear();
178 
179  delete __waitcond_timedthreads;
180  delete __aspect_collector;
181 }
182 
183 
184 /** Set initializer/finalizer.
185  * This method has to be called before any thread is added/removed.
186  * @param initializer thread initializer
187  * @param finalizer thread finalizer
188  */
189 void
191 {
192  __initializer = initializer;
193  __finalizer = finalizer;
194 }
195 
196 
197 /** Remove the given thread from internal structures.
198  * Thread is removed from the internal structures. If the thread has the
199  * BlockedTimingAspect then the hook is added to the changed list.
200  *
201  * @param t thread to remove
202  * @param changed list of changed hooks, appropriate hook is added if necessary
203  */
204 void
205 ThreadManager::internal_remove_thread(Thread *t)
206 {
207  BlockedTimingAspect *timed_thread;
208 
209  if ( (timed_thread = dynamic_cast<BlockedTimingAspect *>(t)) != NULL ) {
210  // find thread and remove
212  if ( __threads.find(hook) != __threads.end() ) {
213  __threads[hook].remove_locked(t);
214  if (__threads[hook].empty()) __threads.erase(hook);
215  }
216  } else {
217  __untimed_threads.remove_locked(t);
218  }
219 }
220 
221 
222 /** Add the given thread to internal structures.
223  * Thread is added to the internal structures. If the thread has the
224  * BlockedTimingAspect then the hook is added to the changed list.
225  *
226  * @param t thread to add
227  * @param changed list of changed hooks, appropriate hook is added if necessary
228  */
229 void
230 ThreadManager::internal_add_thread(Thread *t)
231 {
232  BlockedTimingAspect *timed_thread;
233  if ( (timed_thread = dynamic_cast<BlockedTimingAspect *>(t)) != NULL ) {
235 
236  if ( __threads.find(hook) == __threads.end() ) {
237  __threads[hook].set_name("ThreadManagerList Hook %i", hook);
238  __threads[hook].set_maintain_barrier(true);
239  }
240  __threads[hook].push_back_locked(t);
241 
242  __waitcond_timedthreads->wake_all();
243  } else {
244  __untimed_threads.push_back_locked(t);
245  }
246 }
247 
248 
249 /** Add threads.
250  * Add the given threads to the thread manager. The threads are initialised
251  * as appropriate and started. See the class documentation for supported
252  * specialisations of threads and the performed initialisation steps.
253  * If the thread initializer cannot initalize one or more threads no thread
254  * is added. In this regard the operation is atomic, either all threads are
255  * added or none.
256  * @param tl thread list with threads to add
257  * @exception CannotInitializeThreadException thrown if at least one of the
258  * threads could not be initialised
259  */
260 void
261 ThreadManager::add_maybelocked(ThreadList &tl, bool lock)
262 {
263  if ( ! (__initializer && __finalizer) ) {
264  throw NullPointerException("ThreadManager: initializer/finalizer not set");
265  }
266 
267  if ( tl.sealed() ) {
268  throw Exception("Not accepting new threads from list that is not fresh, "
269  "list '%s' already sealed", tl.name());
270  }
271 
272  tl.lock();
273 
274  // Try to initialise all threads
275  try {
276  tl.init(__initializer, __finalizer);
277  } catch (Exception &e) {
278  tl.unlock();
279  throw;
280  }
281 
282  tl.seal();
283  tl.start();
284 
285  // All thread initialized, now add threads to internal structure
286  MutexLocker locker(__threads.mutex(), lock);
287  for (ThreadList::iterator i = tl.begin(); i != tl.end(); ++i) {
288  internal_add_thread(*i);
289  }
290 
291  tl.unlock();
292 }
293 
294 
295 /** Add one thread.
296  * Add the given thread to the thread manager. The thread is initialized
297  * as appropriate and started. See the class documentation for supported
298  * specialisations of threads and the performed initialisation steps.
299  * If the thread initializer cannot initalize the thread it is not added.
300  * @param thread thread to add
301  * @param lock if true the environment is locked before adding the thread
302  * @exception CannotInitializeThreadException thrown if at least the
303  * thread could not be initialised
304  */
305 void
306 ThreadManager::add_maybelocked(Thread *thread, bool lock)
307 {
308  if ( thread == NULL ) {
309  throw NullPointerException("FawkesThreadMananger: cannot add NULL as thread");
310  }
311 
312  if ( ! (__initializer && __finalizer) ) {
313  throw NullPointerException("ThreadManager: initializer/finalizer not set");
314  }
315 
316  try {
317  __initializer->init(thread);
318  } catch (CannotInitializeThreadException &e) {
319  e.append("Adding thread in ThreadManager failed");
320  throw;
321  }
322 
323  thread->start();
324  MutexLocker locker(__threads.mutex(), lock);
325  internal_add_thread(thread);
326 }
327 
328 
329 /** Remove the given threads.
330  * The thread manager tries to finalize and stop the threads and then removes the
331  * threads from the internal structures.
332  *
333  * This may fail if at least one thread of the given list cannot be finalized, for
334  * example if prepare_finalize() returns false or if the thread finalizer cannot
335  * finalize the thread. In this case a CannotFinalizeThreadException is thrown.
336  *
337  * @param tl threads to remove.
338  * @exception CannotFinalizeThreadException At least one thread cannot be safely
339  * finalized
340  * @exception ThreadListNotSealedException if the given thread lits tl is not
341  * sealed the thread manager will refuse to remove it
342  */
343 void
344 ThreadManager::remove_maybelocked(ThreadList &tl, bool lock)
345 {
346  if ( ! (__initializer && __finalizer) ) {
347  throw NullPointerException("ThreadManager: initializer/finalizer not set");
348  }
349 
350 
351  if ( ! tl.sealed() ) {
352  throw ThreadListNotSealedException("(ThreadManager) Cannot remove unsealed thread "
353  "list. Not accepting unsealed list '%s' for removal",
354  tl.name());
355  }
356 
357  tl.lock();
358  MutexLocker locker(__threads.mutex(), lock);
359 
360  try {
361  if ( ! tl.prepare_finalize(__finalizer) ) {
362  tl.cancel_finalize();
363  tl.unlock();
364  throw CannotFinalizeThreadException("One or more threads in list '%s' cannot be "
365  "finalized", tl.name());
366  }
367  } catch (CannotFinalizeThreadException &e) {
368  tl.unlock();
369  throw;
370  } catch (Exception &e) {
371  tl.unlock();
372  e.append("One or more threads in list '%s' cannot be finalized", tl.name());
374  }
375 
376  tl.stop();
377  try {
378  tl.finalize(__finalizer);
379  } catch (Exception &e) {
380  tl.unlock();
381  throw;
382  }
383 
384  for (ThreadList::iterator i = tl.begin(); i != tl.end(); ++i) {
385  internal_remove_thread(*i);
386  }
387 
388  tl.unlock();
389 }
390 
391 
392 /** Remove the given thread.
393  * The thread manager tries to finalize and stop the thread and then removes the
394  * thread from the internal structures.
395  *
396  * This may fail if the thread cannot be finalized, for
397  * example if prepare_finalize() returns false or if the thread finalizer cannot
398  * finalize the thread. In this case a CannotFinalizeThreadException is thrown.
399  *
400  * @param thread thread to remove.
401  * @exception CannotFinalizeThreadException At least one thread cannot be safely
402  * finalized
403  */
404 void
405 ThreadManager::remove_maybelocked(Thread *thread, bool lock)
406 {
407  if ( thread == NULL ) return;
408 
409  if ( ! (__initializer && __finalizer) ) {
410  throw NullPointerException("ThreadManager: initializer/finalizer not set");
411  }
412 
413  MutexLocker locker(__threads.mutex(), lock);
414  try {
415  if ( ! thread->prepare_finalize() ) {
416  thread->cancel_finalize();
417  throw CannotFinalizeThreadException("Thread '%s'cannot be finalized", thread->name());
418  }
419  } catch (CannotFinalizeThreadException &e) {
420  e.append("ThreadManager cannot stop thread '%s'", thread->name());
421  thread->cancel_finalize();
422  throw;
423  }
424 
425  thread->cancel();
426  thread->join();
427  __finalizer->finalize(thread);
428  thread->finalize();
429 
430  internal_remove_thread(thread);
431 }
432 
433 
434 
435 
436 /** Force removal of the given threads.
437  * The thread manager tries to finalize and stop the threads and then removes the
438  * threads from the internal structures.
439  *
440  * This will succeed even if a thread of the given list cannot be finalized, for
441  * example if prepare_finalize() returns false or if the thread finalizer cannot
442  * finalize the thread.
443  *
444  * <b>Caution, using this function may damage your robot.</b>
445  *
446  * @param tl threads to remove.
447  * @exception ThreadListNotSealedException if the given thread lits tl is not
448  * sealed the thread manager will refuse to remove it
449  * The threads are removed from thread manager control. The threads will be stopped
450  * before they are removed (may cause unpredictable results otherwise).
451  */
452 void
454 {
455  if ( ! tl.sealed() ) {
456  throw ThreadListNotSealedException("Not accepting unsealed list '%s' for removal",
457  tl.name());
458  }
459 
460  tl.lock();
461  __threads.mutex()->stopby();
462  tl.force_stop(__finalizer);
463 
464  for (ThreadList::iterator i = tl.begin(); i != tl.end(); ++i) {
465  internal_remove_thread(*i);
466  }
467 
468  tl.unlock();
469 }
470 
471 
472 /** Force removal of the given thread.
473  * The thread manager tries to finalize and stop the thread and then removes the
474  * thread from the internal structures.
475  *
476  * This will succeed even if the thread cannot be finalized, for
477  * example if prepare_finalize() returns false or if the thread finalizer cannot
478  * finalize the thread.
479  *
480  * <b>Caution, using this function may damage your robot.</b>
481  *
482  * @param thread thread to remove.
483  * @exception ThreadListNotSealedException if the given thread lits tl is not
484  * sealed the thread manager will refuse to remove it
485  * The threads are removed from thread manager control. The threads will be stopped
486  * before they are removed (may cause unpredictable results otherwise).
487  */
488 void
490 {
491  MutexLocker lock(__threads.mutex());
492  try {
493  thread->prepare_finalize();
494  } catch (Exception &e) {
495  // ignore
496  }
497 
498  thread->cancel();
499  thread->join();
500  if (__finalizer) __finalizer->finalize(thread);
501  thread->finalize();
502 
503  internal_remove_thread(thread);
504 }
505 
506 
507 void
509  unsigned int timeout_usec)
510 {
511  MutexLocker lock(__threads.mutex());
512 
513  unsigned int timeout_sec = 0;
514  if (timeout_usec >= 1000000) {
515  timeout_sec = timeout_usec / 1000000;
516  timeout_usec -= timeout_sec * 1000000;
517  }
518 
519  // Note that the following lines might throw an exception, we just pass it on
520  if ( __threads.find(hook) != __threads.end() ) {
521  __threads[hook].wakeup_and_wait(timeout_sec, timeout_usec * 1000);
522  }
523 }
524 
525 
526 void
528 {
529  MutexLocker lock(__threads.mutex());
530 
531  if ( __threads.find(hook) != __threads.end() ) {
532  if ( barrier ) {
533  __threads[hook].wakeup(barrier);
534  } else {
535  __threads[hook].wakeup();
536  }
537  if ( __threads[hook].size() == 0 ) {
538  __threads.erase(hook);
539  }
540  }
541 }
542 
543 
544 void
545 ThreadManager::try_recover(std::list<std::string> &recovered_threads)
546 {
547  __threads.lock();
548  for (__tit = __threads.begin(); __tit != __threads.end(); ++__tit) {
549  __tit->second.try_recover(recovered_threads);
550  }
551  __threads.unlock();
552 }
553 
554 
555 bool
557 {
558  return (__threads.size() > 0);
559 }
560 
561 
562 void
564 {
565  __interrupt_timed_thread_wait = false;
566  __waitcond_timedthreads->wait();
567  if ( __interrupt_timed_thread_wait ) {
568  __interrupt_timed_thread_wait = false;
569  throw InterruptedException("Waiting for timed threads was interrupted");
570  }
571 }
572 
573 void
575 {
576  __interrupt_timed_thread_wait = true;
577  __waitcond_timedthreads->wake_all();
578 }
579 
580 
581 
582 /** Get a thread collector to be used for an aspect initializer.
583  * @return thread collector instance to use for ThreadProducerAspect.
584  */
587 {
588  return __aspect_collector;
589 }
590 
591 } // end namespace fawkes
bool sealed()
Check if list is sealed.
void set_inifin(ThreadInitializer *initializer, ThreadFinalizer *finalizer)
Set initializer/finalizer.
Wait until a given condition holds.
const char * name()
Name of the thread list.
ThreadCollector * aspect_collector() const
Get a thread collector to be used for an aspect initializer.
virtual void init(Thread *thread)=0
This method is called by the ThreadManager for each newly added Thread.
void start()
Start threads.
void cancel_finalize()
Cancel finalization on all threads.
Fawkes library namespace.
virtual void unlock() const
Unlock list.
Definition: lock_list.h:144
void wake_all()
Wake up all waiting threads.
void push_back_locked(Thread *thread)
Add thread to the end with lock protection.
Mutex locking helper.
Definition: mutex_locker.h:33
void seal()
Seal the list.
virtual ~ThreadManager()
Destructor.
virtual void finalize(Thread *thread)=0
Finalize a thread.
void cancel_finalize()
Cancel finalization.
Definition: thread.cpp:488
Thread collector.
void force_stop(ThreadFinalizer *finalizer)
Force stop of all threads.
A NULL pointer was supplied where not allowed.
Definition: software.h:34
virtual bool timed_threads_exist()
Check if any timed threads exist.
Thread class encapsulation of pthreads.
Definition: thread.h:42
virtual void lock() const
Lock list.
Definition: lock_list.h:128
virtual void force_remove(ThreadList &tl)
Force removal of the given threads.
bool prepare_finalize(ThreadFinalizer *finalizer)
Prepare finalize.
ThreadManager()
Constructor.
Thread aspect to use blocked timing.
virtual void wait_for_timed_threads()
Wait for timed threads.
List of threads.
Definition: thread_list.h:57
Thread cannot be initialized.
WakeupHook
Type to define at which hook the thread is woken up.
Base class for exceptions in Fawkes.
Definition: exception.h:36
void remove_locked(Thread *thread)
Remove with lock protection.
void init(ThreadInitializer *initializer, ThreadFinalizer *finalizer)
Initialize threads.
void stop()
Stop threads.
Thread initializer interface.
virtual void finalize()
Finalize the thread.
Definition: thread.cpp:469
bool prepare_finalize()
Prepare finalization.
Definition: thread.cpp:379
void finalize(ThreadFinalizer *finalizer)
Finalize Threads.
The current system call has been interrupted (for instance by a signal).
Definition: system.h:39
void wait()
Wait for the condition forever.
const char * name() const
Get name of thread.
Definition: thread.h:95
virtual void wakeup(BlockedTimingAspect::WakeupHook hook, Barrier *barrier=0)
Wakeup thread for given hook.
virtual void wakeup_and_wait(BlockedTimingAspect::WakeupHook hook, unsigned int timeout_usec=0)
Wakeup thread for given hook and wait for completion.
void cancel()
Cancel a thread.
Definition: thread.cpp:640
Thread list not sealed exception.
Definition: thread_list.h:50
virtual void interrupt_timed_thread_wait()
Interrupt any currently running wait_for_timed_threads() and cause it to throw an InterruptedExceptio...
Thread cannot be finalized.
void join()
Join the thread.
Definition: thread.cpp:599
virtual void try_recover(std::list< std::string > &recovered_threads)
Try to recover threads.
WakeupHook blockedTimingAspectHook() const
Get the wakeup hook.
A barrier is a synchronization tool which blocks until a given number of threads have reached the bar...
Definition: barrier.h:32
void append(const char *format,...)
Append messages to the message list.
Definition: exception.cpp:341
void start(bool wait=true)
Call this method to start the thread.
Definition: thread.cpp:507
Thread finalizer interface.