Fawkes API  Fawkes Development Version
log_thread.cpp
1 
2 /***************************************************************************
3  * log_thread.cpp - BB Logger Thread
4  *
5  * Created: Sun Nov 08 00:02:09 2009
6  * Copyright 2006-2009 Tim Niemueller [www.niemueller.de]
7  *
8  ****************************************************************************/
9 
10 /* This program is free software; you can redistribute it and/or modify
11  * it under the terms of the GNU General Public License as published by
12  * the Free Software Foundation; either version 2 of the License, or
13  * (at your option) any later version.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18  * GNU Library General Public License for more details.
19  *
20  * Read the full text in the LICENSE.GPL file in the doc directory.
21  */
22 
#include "log_thread.h"
#include "file.h"

#include <blackboard/blackboard.h>
#include <logging/logger.h>
#include <core/exceptions/system.h>
#include <interfaces/SwitchInterface.h>

#include <memory>
#include <cstring>
#include <cstdlib>
#include <cstdio>
#include <cerrno>
#include <fcntl.h>
#include <unistd.h>
#ifdef __FreeBSD__
#  include <sys/endian.h>
#elif defined(__MACH__) && defined(__APPLE__)
#  include <sys/_endian.h>
#else
#  include <endian.h>
#endif
#include <arpa/inet.h>
#include <sys/stat.h>
#include <sys/mman.h>
47 
48 using namespace fawkes;
49 
50 /** @class BBLoggerThread "log_thread.h"
51  * BlackBoard logger thread.
52  * One instance of this thread handles logging of one specific interface.
53  * The plugin will spawn as many threads as there are interfaces to log. This
54  * allows for maximum concurrency of the writers and avoids a serialization
55  * bottleneck.
56  * The log thread can operate in buffering mode. If this mode is disabled, the
57  * data is written to the file within the blackboard data changed event, and
58  * thus the writing operation can slow down the overall system, but memory
59  * requirements are low. This is useful if a lot of data is written or if the
60  * storage device is slow. If the mode is enabled, during the event the BB data
61  * will be copied into another memory segment and the thread will be woken up.
62  * Once the thread is running it stores all of the BB data segments buffered
63  * up to then.
64  * The interface listener listens for events for a particular interface and
65  * then writes the changes to the file.
66  * @author Tim Niemueller
67  */
68 
69 /** Constructor.
70  * @param iface_uid interface UID which to log
71  * @param logdir directory to store config files, must exist
72  * @param buffering enable log buffering?
73  * @param flushing true to flush after each written chunk
74  * @param scenario ID of the log scenario
75  * @param start_time time to use as start time for the log
76  */
77 BBLoggerThread::BBLoggerThread(const char *iface_uid,
78  const char *logdir, bool buffering, bool flushing,
79  const char *scenario, fawkes::Time *start_time)
80  : Thread("BBLoggerThread", Thread::OPMODE_WAITFORWAKEUP),
81  BlackBoardInterfaceListener("BBLoggerThread(%s)", iface_uid)
82 {
84  set_name("BBLoggerThread(%s)", iface_uid);
85 
86  __buffering = buffering;
87  __flushing = flushing;
88  __uid = strdup(iface_uid);
89  __logdir = strdup(logdir);
90  __scenario = strdup(scenario);
91  __start = new Time(start_time);
92  __filename = NULL;
93  __queue_mutex = new Mutex();
94  __data_size = 0;
95  __is_master = false;
96  __enabled = true;
97 
98  __now = NULL;
99 
100  // Parse UID
101  Interface::parse_uid(__uid, &__type, &__id);
102 
103  char date[21];
104  Time now;
105  struct tm *tmp = localtime(&(now.get_timeval()->tv_sec));
106  strftime(date, 21, "%F-%H-%M-%S", tmp);
107 
108  if (asprintf(&__filename, "%s/%s-%s-%s-%s.log", LOGDIR, __scenario,
109  __type, __id, date) == -1) {
110  throw OutOfMemoryException("Cannot generate log name");
111  }
112 }
113 
114 
115 /** Destructor. */
117 {
118  free(__uid);
119  free(__type);
120  free(__id);
121  free(__logdir);
122  free(__scenario);
123  free(__filename);
124  delete __queue_mutex;
125  delete __start;
126 }
127 
128 
129 void
131 {
132  __queues[0].clear();
133  __queues[1].clear();
134  __act_queue = 0;
135 
136  __queue_mutex = new Mutex();
137  __data_size = 0;
138 
139  __now = NULL;
140  __num_data_items = 0;
141  __session_start = 0;
142 
143  // use open because fopen does not provide O_CREAT | O_EXCL
144  // open read/write because of usage of mmap
145  mode_t m = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
146  int fd = open(__filename, O_RDWR | O_CREAT | O_EXCL, m);
147  if ( ! fd ) {
148  throw CouldNotOpenFileException(__filename, errno, "Failed to open log 1");
149  } else {
150  __f_data = fdopen(fd, "w+");
151  if ( ! __f_data ) {
152  throw CouldNotOpenFileException(__filename, errno, "Failed to open log 2");
153  }
154  }
155 
156  try {
157  __iface = blackboard->open_for_reading(__type, __id);
158  __data_size = __iface->datasize();
159  } catch (Exception &e) {
160  fclose(__f_data);
161  throw;
162  }
163 
164  try {
165  write_header();
166  } catch (FileWriteException &e) {
167  blackboard->close(__iface);
168  fclose(__f_data);
169  throw;
170  }
171 
172  __now = new Time(clock);
173 
174  if (__is_master) {
175  try {
176  __switch_if = blackboard->open_for_writing<SwitchInterface>("BBLogger");
177  __switch_if->set_enabled(__enabled);
178  __switch_if->write();
179  bbil_add_message_interface(__switch_if);
180  } catch (Exception &e) {
181  fclose(__f_data);
182  throw;
183  }
184  }
185 
186  bbil_add_data_interface(__iface);
187  bbil_add_writer_interface(__iface);
188 
190 
191  logger->log_info(name(), "Logging %s to %s%s", __iface->uid(), __filename,
192  __is_master ? " as master" : "");
193 }
194 
195 
196 void
198 {
200  if (__is_master) {
201  blackboard->close(__switch_if);
202  }
203  update_header();
204  fclose(__f_data);
205  for (unsigned int q = 0; q < 2; ++q) {
206  while (!__queues[q].empty()) {
207  void *t = __queues[q].front();
208  free(t);
209  __queues[q].pop();
210  }
211  }
212  delete __now;
213  __now = NULL;
214 }
215 
216 
217 /** Get filename.
218  * @return file name, valid after object instantiated, but before init() does not
219  * mean that the file has been or can actually be opened
220  */
221 const char *
223 {
224  return __filename;
225 }
226 
227 
228 /** Enable or disable logging.
229  * @param enabled true to enable logging, false to disable
230  */
231 void
233 {
234  if (enabled && !__enabled) {
235  logger->log_info(name(), "Logging enabled",
236  (__num_data_items - __session_start));
237  __session_start = __num_data_items;
238  } else if (!enabled && __enabled) {
239  logger->log_info(name(), "Logging disabled (wrote %u entries), flushing",
240  (__num_data_items - __session_start));
241  update_header();
242  fflush(__f_data);
243  }
244 
245  __enabled = enabled;
246 }
247 
248 
249 /** Set threadlist and master status.
250  * This copies the thread list and sets this thread as master thread.
251  * If you intend to use this method you must do so before the thread is
252  * initialized. You may only ever declare one thread as master.
253  * @param thread_list list of threads to notify on enable/disable events
254  */
255 void
257 {
258  __is_master = true;
259  __threads = thread_list;
260 }
261 
262 void
263 BBLoggerThread::write_header()
264 {
265  bblog_file_header header;
266  memset(&header, 0, sizeof(header));
267  header.file_magic = htonl(BBLOGGER_FILE_MAGIC);
268  header.file_version = htonl(BBLOGGER_FILE_VERSION);
269 #if __BYTE_ORDER == __BIG_ENDIAN
270  header.endianess = BBLOG_BIG_ENDIAN;
271 #else
272  header.endianess = BBLOG_LITTLE_ENDIAN;
273 #endif
274  header.num_data_items = __num_data_items;
275  strncpy(header.scenario, (const char *)__scenario, BBLOG_SCENARIO_SIZE);
276  strncpy(header.interface_type, __iface->type(), BBLOG_INTERFACE_TYPE_SIZE);
277  strncpy(header.interface_id, __iface->id(), BBLOG_INTERFACE_ID_SIZE);
278  memcpy(header.interface_hash, __iface->hash(), BBLOG_INTERFACE_HASH_SIZE);
279  header.data_size = __iface->datasize();
280  long start_time_sec, start_time_usec;
281  __start->get_timestamp(start_time_sec, start_time_usec);
282  header.start_time_sec = start_time_sec;
283  header.start_time_usec = start_time_usec;
284  if (fwrite(&header, sizeof(header), 1, __f_data) != 1) {
285  throw FileWriteException(__filename, "Failed to write header");
286  }
287  fflush(__f_data);
288 }
289 
290 /** Updates the num_data_items field in the header. */
291 void
292 BBLoggerThread::update_header()
293 {
294  // write updated num_data_items field
295 #if _POSIX_MAPPED_FILES
296  void *h = mmap(NULL, sizeof(bblog_file_header), PROT_WRITE, MAP_SHARED,
297  fileno(__f_data), 0);
298  if (h == MAP_FAILED) {
299  logger->log_warn(name(), "Failed to mmap log (%s), "
300  "not updating number of data items",
301  strerror(errno));
302  } else {
303  bblog_file_header *header = (bblog_file_header *)h;
304  header->num_data_items = __num_data_items;
305  munmap(h, sizeof(bblog_file_header));
306  }
307 #else
308  logger->log_warn(name(), "Memory mapped files not available, "
309  "not updating number of data items on close");
310 #endif
311 }
312 
313 void
314 BBLoggerThread::write_chunk(const void *chunk)
315 {
316  bblog_entry_header ehead;
317  __now->stamp();
318  Time d = *__now - *__start;
319  long rel_time_sec, rel_time_usec;
320  d.get_timestamp(rel_time_sec, rel_time_usec);
321  ehead.rel_time_sec = rel_time_sec;
322  ehead.rel_time_usec = rel_time_usec;
323  if ( (fwrite(&ehead, sizeof(ehead), 1, __f_data) == 1) &&
324  (fwrite(chunk, __data_size, 1, __f_data) == 1) ) {
325  if (__flushing) fflush(__f_data);
326  __num_data_items += 1;
327  } else {
328  logger->log_warn(name(), "Failed to write chunk");
329  }
330 }
331 
332 
333 void
335 {
336  unsigned int write_queue = __act_queue;
337  __queue_mutex->lock();
338  __act_queue = 1 - __act_queue;
339  __queue_mutex->unlock();
340  LockQueue<void *> &queue = __queues[write_queue];
341  //logger->log_debug(name(), "Writing %zu entries", queue.size());
342  while (! queue.empty() ) {
343  void *c = queue.front();
344  write_chunk(c);
345  free(c);
346  queue.pop();
347  }
348 }
349 
350 bool
352  Message *message) throw()
353 {
356 
357  bool enabled = true;
358  if ((enm = dynamic_cast<SwitchInterface::EnableSwitchMessage *>(message)) != NULL) {
359  enabled = true;
360  } else if ((dism = dynamic_cast<SwitchInterface::DisableSwitchMessage *>(message)) != NULL) {
361  enabled = false;
362  } else {
363  logger->log_debug(name(), "Unhandled message type: %s via %s",
364  message->type(), interface->uid());
365  }
366 
367  for (ThreadList::iterator i = __threads.begin(); i != __threads.end(); ++i) {
368  BBLoggerThread *bblt = dynamic_cast<BBLoggerThread *>(*i);
369  bblt->set_enabled(enabled);
370  }
371 
372  __switch_if->set_enabled(__enabled);
373  __switch_if->write();
374 
375  return false;
376 }
377 
378 
379 void
381 {
382  if (!__enabled) return;
383 
384  try {
385  __iface->read();
386 
387  if ( __buffering ) {
388  void *c = malloc(__iface->datasize());
389  memcpy(c, __iface->datachunk(), __iface->datasize());
390  __queue_mutex->lock();
391  __queues[__act_queue].push_locked(c);
392  __queue_mutex->unlock();
393  wakeup();
394  } else {
395  __queue_mutex->lock();
396  write_chunk(__iface->datachunk());
397  __queue_mutex->unlock();
398  }
399 
400  } catch (Exception &e) {
401  logger->log_error(name(), "Exception when data changed");
402  logger->log_error(name(), e);
403  }
404 }
405 
406 void
408  unsigned int instance_serial) throw()
409 {
410  __session_start = __num_data_items;
411 }
412 
413 void
415  unsigned int instance_serial) throw()
416 {
417  logger->log_info(name(), "Writer removed (wrote %u entries), flushing",
418  (__num_data_items - __session_start));
419  update_header();
420  fflush(__f_data);
421 }
uint32_t rel_time_sec
time since start time, seconds
Definition: file.h:76
uint64_t start_time_sec
Start time, timestamp seconds.
Definition: file.h:68
File could not be opened.
Definition: system.h:53
char interface_id[BBLOG_INTERFACE_ID_SIZE]
Interface ID.
Definition: file.h:65
virtual bool bb_interface_message_received(fawkes::Interface *interface, fawkes::Message *message)
BlackBoard message received notification.
Definition: log_thread.cpp:351
BBLogger file header definition.
Definition: file.h:53
void clear()
Clear the queue.
Definition: lock_queue.h:158
Base class for all messages passed through interfaces in Fawkes BlackBoard.
Definition: message.h:43
virtual void log_info(const char *component, const char *format,...)=0
Log informational message.
const timeval * get_timeval() const
Obtain the timeval where the time is stored.
Definition: time.h:109
char scenario[BBLOG_SCENARIO_SIZE]
Scenario as defined in config.
Definition: file.h:62
Fawkes library namespace.
void unlock()
Unlock the mutex.
Definition: mutex.cpp:135
void bbil_add_writer_interface(Interface *interface)
Add an interface to the writer addition/removal watch list.
virtual void finalize()
Finalize the thread.
Definition: log_thread.cpp:197
const unsigned char * hash() const
Get interface hash.
Definition: interface.cpp:292
A class for handling time.
Definition: time.h:91
virtual void init()
Initialize the thread.
Definition: log_thread.cpp:130
virtual void unregister_listener(BlackBoardInterfaceListener *listener)
Unregister BB interface listener.
Definition: blackboard.cpp:200
Thread class encapsulation of pthreads.
Definition: thread.h:42
uint32_t endianess
Endianess, 0 little endian, 1 big endian.
Definition: file.h:58
void write()
Write from local copy into BlackBoard memory.
Definition: interface.cpp:495
Base class for all Fawkes BlackBoard interfaces.
Definition: interface.h:80
virtual ~BBLoggerThread()
Destructor.
Definition: log_thread.cpp:116
BlackBoard logger thread.
Definition: log_thread.h:46
Logger * logger
This is the Logger member used to access the logger.
Definition: logging.h:44
virtual Interface * open_for_writing(const char *interface_type, const char *identifier)=0
Open interface for writing.
const char * uid() const
Get unique identifier of interface.
Definition: interface.cpp:660
virtual void bb_interface_writer_added(fawkes::Interface *interface, unsigned int instance_serial)
A writing instance has been opened for a watched interface.
Definition: log_thread.cpp:407
Clock * clock
By means of this member access to the clock is given.
Definition: clock.h:45
const char * id() const
Get identifier of interface.
Definition: interface.cpp:645
virtual void bb_interface_data_changed(fawkes::Interface *interface)
BlackBoard data changed notification.
Definition: log_thread.cpp:380
virtual void register_listener(BlackBoardInterfaceListener *listener, ListenerRegisterFlag flag=BBIL_FLAG_ALL)
Register BB event listener.
Definition: blackboard.cpp:174
List of threads.
Definition: thread_list.h:57
SwitchInterface Fawkes BlackBoard Interface.
unsigned char interface_hash[BBLOG_INTERFACE_HASH_SIZE]
Interface Hash.
Definition: file.h:66
unsigned int datasize() const
Get data size.
Definition: interface.cpp:529
void wakeup()
Wake up thread.
Definition: thread.cpp:979
uint32_t file_magic
Magic value to identify file, must be 0xFFBBFFBB (big endian)
Definition: file.h:54
void set_name(const char *format,...)
Set name of thread.
Definition: thread.cpp:749
Base class for exceptions in Fawkes.
Definition: exception.h:36
void read()
Read from BlackBoard into local copy.
Definition: interface.cpp:472
virtual void bb_interface_writer_removed(fawkes::Interface *interface, unsigned int instance_serial)
A writing instance has been closed for a watched interface.
Definition: log_thread.cpp:414
void set_enabled(const bool new_enabled)
Set enabled value.
Could not write to file.
Definition: system.h:68
BBLogger entry header.
Definition: file.h:75
DisableSwitchMessage Fawkes BlackBoard Interface Message.
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
virtual void log_error(const char *component, const char *format,...)=0
Log error message.
void get_timestamp(long &sec, long &usec) const
Get time stamp.
Definition: time.h:114
uint32_t num_data_items
Number of data items in file, if set to zero reader must scan the file for this number.
Definition: file.h:60
uint32_t data_size
size of one interface data block
Definition: file.h:67
void set_coalesce_wakeups(bool coalesce=true)
Set wakeup coalescing.
Definition: thread.cpp:729
void set_enabled(bool enabled)
Enable or disable logging.
Definition: log_thread.cpp:232
const char * name() const
Get name of thread.
Definition: thread.h:95
virtual void loop()
Code to execute in the thread.
Definition: log_thread.cpp:334
BBLoggerThread(const char *iface_uid, const char *logdir, bool buffering, bool flushing, const char *scenario, fawkes::Time *start_time)
Constructor.
Definition: log_thread.cpp:77
void set_threadlist(fawkes::ThreadList &thread_list)
Set threadlist and master status.
Definition: log_thread.cpp:256
EnableSwitchMessage Fawkes BlackBoard Interface Message.
virtual void log_debug(const char *component, const char *format,...)=0
Log debug message.
void push_locked(const Type &x)
Push element to queue with lock protection.
Definition: lock_queue.h:139
uint32_t rel_time_usec
time since start time, microseconds
Definition: file.h:77
virtual Interface * open_for_reading(const char *interface_type, const char *identifier)=0
Open interface for reading.
uint64_t start_time_usec
Start time, timestamp microseconds.
Definition: file.h:69
void lock()
Lock this mutex.
Definition: mutex.cpp:89
Time & stamp()
Set this time to the current time.
Definition: time.cpp:783
const void * datachunk() const
Get data chunk.
Definition: interface.cpp:425
const char * get_filename() const
Get filename.
Definition: log_thread.cpp:222
Mutex mutual exclusion lock.
Definition: mutex.h:32
uint32_t file_version
File version, set to BBLOGGER_FILE_VERSION on write and verify on read (big endian) ...
Definition: file.h:56
void bbil_add_message_interface(Interface *interface)
Add an interface to the message received watch list.
const char * type() const
Get type of interface.
Definition: interface.cpp:635
char interface_type[BBLOG_INTERFACE_TYPE_SIZE]
Interface type.
Definition: file.h:64
System ran out of memory and desired operation could not be fulfilled.
Definition: system.h:32
BlackBoard interface listener.
BlackBoard * blackboard
This is the BlackBoard instance you can use to interact with the BlackBoard.
Definition: blackboard.h:43
void bbil_add_data_interface(Interface *interface)
Add an interface to the data modification watch list.
virtual void close(Interface *interface)=0
Close interface.