24 #include <baseapp/thread_manager.h> 25 #include <core/threading/thread.h> 26 #include <core/threading/mutex_locker.h> 27 #include <core/threading/wait_condition.h> 28 #include <core/threading/thread_initializer.h> 29 #include <core/threading/thread_finalizer.h> 30 #include <core/exceptions/software.h> 31 #include <core/exceptions/system.h> 33 #include <aspect/blocked_timing.h> 60 ThreadManager::ThreadManagerAspectCollector::ThreadManagerAspectCollector(ThreadManager *parent_manager)
62 __parent_manager = parent_manager;
67 ThreadManager::ThreadManagerAspectCollector::add(ThreadList &tl)
69 BlockedTimingAspect *timed_thread;
71 for (ThreadList::iterator i = tl.begin(); i != tl.end(); ++i) {
72 if ( (timed_thread = dynamic_cast<BlockedTimingAspect *>(*i)) != NULL ) {
73 throw IllegalArgumentException(
"ThreadProducerAspect may not add threads with BlockedTimingAspect");
77 __parent_manager->add_maybelocked(tl,
false);
82 ThreadManager::ThreadManagerAspectCollector::add(Thread *t)
84 BlockedTimingAspect *timed_thread;
86 if ( (timed_thread = dynamic_cast<BlockedTimingAspect *>(t)) != NULL ) {
87 throw IllegalArgumentException(
"ThreadProducerAspect may not add threads with BlockedTimingAspect");
90 __parent_manager->add_maybelocked(t,
false);
95 ThreadManager::ThreadManagerAspectCollector::remove(ThreadList &tl)
97 BlockedTimingAspect *timed_thread;
99 for (ThreadList::iterator i = tl.begin(); i != tl.end(); ++i) {
100 if ( (timed_thread = dynamic_cast<BlockedTimingAspect *>(*i)) != NULL ) {
101 throw IllegalArgumentException(
"ThreadProducerAspect may not remove threads with BlockedTimingAspect");
105 __parent_manager->remove_maybelocked(tl,
false);
110 ThreadManager::ThreadManagerAspectCollector::remove(Thread *t)
112 BlockedTimingAspect *timed_thread;
114 if ( (timed_thread = dynamic_cast<BlockedTimingAspect *>(t)) != NULL ) {
115 throw IllegalArgumentException(
"ThreadProducerAspect may not remove threads with BlockedTimingAspect");
118 __parent_manager->remove_maybelocked(t,
false);
125 throw AccessViolationException(
"ThreadManagerAspect threads may not force removal of threads");
129 ThreadManager::ThreadManagerAspectCollector::force_remove(
fawkes::Thread *t)
131 throw AccessViolationException(
"ThreadManagerAspect threads may not force removal of threads");
141 __initializer = NULL;
145 __interrupt_timed_thread_wait =
false;
146 __aspect_collector =
new ThreadManagerAspectCollector(
this);
158 __initializer = NULL;
162 __interrupt_timed_thread_wait =
false;
163 __aspect_collector =
new ThreadManagerAspectCollector(
this);
173 for (__tit = __threads.begin(); __tit != __threads.end(); ++__tit) {
174 __tit->second.force_stop(__finalizer);
179 delete __waitcond_timedthreads;
180 delete __aspect_collector;
192 __initializer = initializer;
193 __finalizer = finalizer;
205 ThreadManager::internal_remove_thread(
Thread *t)
209 if ( (timed_thread = dynamic_cast<BlockedTimingAspect *>(t)) != NULL ) {
212 if ( __threads.find(hook) != __threads.end() ) {
213 __threads[hook].remove_locked(t);
214 if (__threads[hook].empty()) __threads.erase(hook);
230 ThreadManager::internal_add_thread(
Thread *t)
233 if ( (timed_thread = dynamic_cast<BlockedTimingAspect *>(t)) != NULL ) {
236 if ( __threads.find(hook) == __threads.end() ) {
237 __threads[hook].set_name(
"ThreadManagerList Hook %i", hook);
238 __threads[hook].set_maintain_barrier(
true);
240 __threads[hook].push_back_locked(t);
242 __waitcond_timedthreads->
wake_all();
261 ThreadManager::add_maybelocked(
ThreadList &tl,
bool lock)
263 if ( ! (__initializer && __finalizer) ) {
268 throw Exception(
"Not accepting new threads from list that is not fresh, " 269 "list '%s' already sealed", tl.
name());
276 tl.
init(__initializer, __finalizer);
287 for (ThreadList::iterator i = tl.begin(); i != tl.end(); ++i) {
288 internal_add_thread(*i);
306 ThreadManager::add_maybelocked(
Thread *thread,
bool lock)
308 if ( thread == NULL ) {
312 if ( ! (__initializer && __finalizer) ) {
317 __initializer->
init(thread);
319 e.
append(
"Adding thread in ThreadManager failed");
325 internal_add_thread(thread);
344 ThreadManager::remove_maybelocked(
ThreadList &tl,
bool lock)
346 if ( ! (__initializer && __finalizer) ) {
353 "list. Not accepting unsealed list '%s' for removal",
365 "finalized", tl.
name());
372 e.
append(
"One or more threads in list '%s' cannot be finalized", tl.
name());
384 for (ThreadList::iterator i = tl.begin(); i != tl.end(); ++i) {
385 internal_remove_thread(*i);
405 ThreadManager::remove_maybelocked(
Thread *thread,
bool lock)
407 if ( thread == NULL )
return;
409 if ( ! (__initializer && __finalizer) ) {
420 e.
append(
"ThreadManager cannot stop thread '%s'", thread->
name());
430 internal_remove_thread(thread);
461 __threads.mutex()->stopby();
464 for (ThreadList::iterator i = tl.begin(); i != tl.end(); ++i) {
465 internal_remove_thread(*i);
500 if (__finalizer) __finalizer->
finalize(thread);
503 internal_remove_thread(thread);
509 unsigned int timeout_usec)
513 unsigned int timeout_sec = 0;
514 if (timeout_usec >= 1000000) {
515 timeout_sec = timeout_usec / 1000000;
516 timeout_usec -= timeout_sec * 1000000;
520 if ( __threads.find(hook) != __threads.end() ) {
521 __threads[hook].wakeup_and_wait(timeout_sec, timeout_usec * 1000);
531 if ( __threads.find(hook) != __threads.end() ) {
533 __threads[hook].wakeup(barrier);
535 __threads[hook].wakeup();
537 if ( __threads[hook].size() == 0 ) {
538 __threads.erase(hook);
548 for (__tit = __threads.begin(); __tit != __threads.end(); ++__tit) {
549 __tit->second.try_recover(recovered_threads);
558 return (__threads.size() > 0);
565 __interrupt_timed_thread_wait =
false;
566 __waitcond_timedthreads->
wait();
567 if ( __interrupt_timed_thread_wait ) {
568 __interrupt_timed_thread_wait =
false;
576 __interrupt_timed_thread_wait =
true;
577 __waitcond_timedthreads->
wake_all();
588 return __aspect_collector;
bool sealed()
Check if list is sealed.
void set_inifin(ThreadInitializer *initializer, ThreadFinalizer *finalizer)
Set initializer/finalizer.
Wait until a given condition holds.
const char * name()
Name of the thread list.
ThreadCollector * aspect_collector() const
Get a thread collector to be used for an aspect initializer.
virtual void init(Thread *thread)=0
This method is called by the ThreadManager for each newly added Thread.
void start()
Start threads.
void cancel_finalize()
Cancel finalization on all threads.
Fawkes library namespace.
virtual void unlock() const
Unlock list.
void wake_all()
Wake up all waiting threads.
void push_back_locked(Thread *thread)
Add thread to the end with lock protection.
void seal()
Seal the list.
virtual ~ThreadManager()
Destructor.
virtual void finalize(Thread *thread)=0
Finalize a thread.
void cancel_finalize()
Cancel finalization.
void force_stop(ThreadFinalizer *finalizer)
Force stop of all threads.
A NULL pointer was supplied where not allowed.
virtual bool timed_threads_exist()
Check if any timed threads exist.
Thread class encapsulation of pthreads.
virtual void lock() const
Lock list.
virtual void force_remove(ThreadList &tl)
Force removal of the given threads.
bool prepare_finalize(ThreadFinalizer *finalizer)
Prepare finalization.
ThreadManager()
Constructor.
Thread aspect to use blocked timing.
virtual void wait_for_timed_threads()
Wait for timed threads.
Thread cannot be initialized.
WakeupHook
Type to define at which hook the thread is woken up.
Base class for exceptions in Fawkes.
void remove_locked(Thread *thread)
Remove with lock protection.
void init(ThreadInitializer *initializer, ThreadFinalizer *finalizer)
Initialize threads.
Thread initializer interface.
virtual void finalize()
Finalize the thread.
bool prepare_finalize()
Prepare finalization.
void finalize(ThreadFinalizer *finalizer)
Finalize Threads.
The current system call has been interrupted (for instance by a signal).
void wait()
Wait for the condition forever.
const char * name() const
Get name of thread.
virtual void wakeup(BlockedTimingAspect::WakeupHook hook, Barrier *barrier=0)
Wakeup thread for given hook.
virtual void wakeup_and_wait(BlockedTimingAspect::WakeupHook hook, unsigned int timeout_usec=0)
Wakeup thread for given hook and wait for completion.
void cancel()
Cancel a thread.
Thread list not sealed exception.
virtual void interrupt_timed_thread_wait()
Interrupt any currently running wait_for_timed_threads() and cause it to throw an InterruptedException.
Thread cannot be finalized.
void join()
Join the thread.
virtual void try_recover(std::list< std::string > &recovered_threads)
Try to recover threads.
WakeupHook blockedTimingAspectHook() const
Get the wakeup hook.
A barrier is a synchronization tool which blocks until a given number of threads have reached the barrier.
void append(const char *format,...)
Append messages to the message list.
void start(bool wait=true)
Call this method to start the thread.
Thread finalizer interface.