OpenSync  0.22
opensync_queue.c
1 /*
2  * libosengine - A synchronization engine for the opensync framework
3  * Copyright (C) 2004-2005 Armin Bauer <armin.bauer@opensync.org>
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2.1 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General Public
16  * License along with this library; if not, write to the Free Software
17  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18  *
19  */
20 
21 #include <fcntl.h>
22 #include <sys/poll.h>
23 
24 #include "opensync.h"
25 #include "opensync_internals.h"
26 
27 #include <sys/time.h>
28 #include <signal.h>
29 
30 typedef struct OSyncPendingMessage {
31  long long int id1;
32  int id2;
36  gpointer user_data;
38 
46 
47 static
48 gboolean _incoming_prepare(GSource *source, gint *timeout_)
49 {
50  *timeout_ = 1;
51  return FALSE;
52 }
53 
54 static
55 gboolean _incoming_check(GSource *source)
56 {
57  OSyncQueue *queue = *((OSyncQueue **)(source + 1));
58  if (g_async_queue_length(queue->incoming) > 0)
59  return TRUE;
60 
61  return FALSE;
62 }
63 
64 /* This function is called from the master thread. The function dispatched incoming data from
65  * the remote end */
66 static
67 gboolean _incoming_dispatch(GSource *source, GSourceFunc callback, gpointer user_data)
68 {
69  osync_trace(TRACE_ENTRY, "%s(%p)", __func__, user_data);
70  OSyncQueue *queue = user_data;
71 
72  OSyncMessage *message = NULL;
73  while ((message = g_async_queue_try_pop(queue->incoming))) {
74  /* We check of the message is a reply to something */
75  if (message->cmd == OSYNC_MESSAGE_REPLY || message->cmd == OSYNC_MESSAGE_ERRORREPLY) {
76 
77  /* Search for the pending reply. We have to lock the
78  * list since another thread might be duing the updates */
79  g_mutex_lock(queue->pendingLock);
80 
81  OSyncPendingMessage *found = NULL;
82 
83  GList *p = NULL;
84  for (p = queue->pendingReplies; p; p = p->next) {
85  OSyncPendingMessage *pending = p->data;
86 
87  if (pending->id1 == message->id1 && pending->id2 == message->id2) {
88 
89  /* Get the pending message from the queue */
90  queue->pendingReplies = g_list_remove(queue->pendingReplies, pending);
91  found = pending;
92  break;
93  }
94  }
95  g_mutex_unlock(queue->pendingLock);
96 
97  if (found) {
98  /* Call the callback of the pending message and free the message */
99  osync_assert(found->callback);
100  found->callback(message, found->user_data);
101 
102  g_free(found);
103  } else
104  osync_trace(TRACE_INTERNAL, "%s: No pending message for %lld:%d", __func__, message->id1, message->id2);
105 
106  } else
107  queue->message_handler(message, queue->user_data);
108 
109  osync_message_unref(message);
110  }
111 
112  osync_trace(TRACE_EXIT, "%s: Done dispatching", __func__);
113  return TRUE;
114 }
115 
116 static void _osync_queue_stop_incoming(OSyncQueue *queue)
117 {
118  if (queue->incoming_source) {
119  g_source_destroy(queue->incoming_source);
120  queue->incoming_source = NULL;
121  }
122 
123  if (queue->incomingContext) {
124  g_main_context_unref(queue->incomingContext);
125  queue->incomingContext = NULL;
126  }
127 
128  if (queue->incoming_functions) {
129  g_free(queue->incoming_functions);
130  queue->incoming_functions = NULL;
131  }
132 }
133 
134 static
135 gboolean _queue_prepare(GSource *source, gint *timeout_)
136 {
137  *timeout_ = 1;
138  return FALSE;
139 }
140 
141 static
142 gboolean _queue_check(GSource *source)
143 {
144  OSyncQueue *queue = *((OSyncQueue **)(source + 1));
145  if (g_async_queue_length(queue->outgoing) > 0)
146  return TRUE;
147  return FALSE;
148 }
149 
150 int _osync_queue_write_data(OSyncQueue *queue, const void *vptr, size_t n, OSyncError **error)
151 {
152  ssize_t nwritten = 0;
153 
154  while (n > 0) {
155  if ((nwritten = write(queue->fd, vptr, n)) <= 0) {
156  if (errno == EINTR)
157  nwritten = 0; /* and call write() again */
158  else {
159  osync_error_set(error, OSYNC_ERROR_IO_ERROR, "Unable to write IPC data: %i: %s", errno, strerror(errno));
160  return (-1); /* error */
161  }
162  }
163 
164  n -= nwritten;
165  vptr += nwritten;
166  }
167  return (nwritten);
168 }
169 
170 osync_bool _osync_queue_write_long_long_int(OSyncQueue *queue, const long long int message, OSyncError **error)
171 {
172  if (_osync_queue_write_data(queue, &message, sizeof(long long int), error) < 0)
173  return FALSE;
174 
175  return TRUE;
176 }
177 
178 osync_bool _osync_queue_write_int(OSyncQueue *queue, const int message, OSyncError **error)
179 {
180  if (_osync_queue_write_data(queue, &message, sizeof(int), error) < 0)
181  return FALSE;
182 
183  return TRUE;
184 }
185 
186 /* This function sends the data to the remote side. If there is an error, it sends an error
187  * message to the incoming queue */
188 static
189 gboolean _queue_dispatch(GSource *source, GSourceFunc callback, gpointer user_data)
190 {
191  OSyncQueue *queue = user_data;
192  OSyncError *error = NULL;
193 
194  OSyncMessage *message = NULL;
195 
196  while ((message = g_async_queue_try_pop(queue->outgoing))) {
197  /* Check if the queue is connected */
198  if (!queue->connected) {
199  osync_error_set(&error, OSYNC_ERROR_GENERIC, "Trying to send to a queue thats not connected");
200  goto error;
201  }
202 
203  /*FIXME: review usage of osync_marshal_get_size_message() */
204  if (!_osync_queue_write_int(queue, message->buffer->len + osync_marshal_get_size_message(message), &error))
205  goto error;
206 
207  if (!_osync_queue_write_int(queue, message->cmd, &error))
208  goto error;
209 
210  if (!_osync_queue_write_long_long_int(queue, message->id1, &error))
211  goto error;
212 
213  if (!_osync_queue_write_int(queue, message->id2, &error))
214  goto error;
215 
216  if (message->buffer->len) {
217  int sent = 0;
218  do {
219  int written = _osync_queue_write_data(queue, message->buffer->data + sent, message->buffer->len - sent, &error);
220  if (written < 0)
221  goto error;
222 
223  sent += written;
224  } while (sent < message->buffer->len);
225  }
226 
227  osync_message_unref(message);
228  }
229 
230  return TRUE;
231 
232 error:
233  if (message)
234  osync_message_unref(message);
235 
236  if (error) {
237  message = osync_message_new(OSYNC_MESSAGE_QUEUE_ERROR, 0, &error);
238  if (message) {
239  osync_marshal_error(message, error);
240  g_async_queue_push(queue->incoming, message);
241  }
242 
243  osync_error_free(&error);
244  }
245  return FALSE;
246 }
247 
248 static
249 gboolean _source_prepare(GSource *source, gint *timeout_)
250 {
251  *timeout_ = 1;
252  return FALSE;
253 }
254 
255 static
256 int _osync_queue_read_data(OSyncQueue *queue, void *vptr, size_t n, OSyncError **error)
257 {
258  size_t nleft;
259  ssize_t nread = 0;
260 
261  nleft = n;
262  while (n > 0) {
263  if ((nread = read(queue->fd, vptr, nleft)) < 0) {
264  if (errno == EINTR)
265  nread = 0; /* and call read() again */
266  else {
267  osync_error_set(error, OSYNC_ERROR_IO_ERROR, "Unable to read IPC data: %i: %s", errno, strerror(errno));
268  return (-1);
269  }
270  } else if (nread == 0)
271  break; /* EOF */
272 
273  nleft -= nread;
274  vptr += nread;
275  }
276  return (n - nleft); /* return >= 0 */
277 }
278 
279 static
280 osync_bool _osync_queue_read_int(OSyncQueue *queue, int *message, OSyncError **error)
281 {
282  int read = _osync_queue_read_data(queue, message, sizeof(int), error);
283 
284  if (read < 0)
285  return FALSE;
286 
287  if (read != sizeof(int)) {
288  osync_error_set(error, OSYNC_ERROR_IO_ERROR, "Unable to read int. EOF");
289  return FALSE;
290  }
291 
292  return TRUE;
293 }
294 
295 static
296 osync_bool _osync_queue_read_long_long_int(OSyncQueue *queue, long long int *message, OSyncError **error)
297 {
298  int read = _osync_queue_read_data(queue, message, sizeof(long long int), error);
299 
300  if (read < 0)
301  return FALSE;
302 
303  if (read != sizeof(long long int)) {
304  osync_error_set(error, OSYNC_ERROR_IO_ERROR, "Unable to read int. EOF");
305  return FALSE;
306  }
307 
308  return TRUE;
309 }
310 
311 static
312 gboolean _source_check(GSource *source)
313 {
314  OSyncQueue *queue = *((OSyncQueue **)(source + 1));
315  OSyncMessage *message = NULL;
316  OSyncError *error = NULL;
317 
318  if (queue->connected == FALSE) {
319  /* Ok. so we arent connected. lets check if there are pending replies. We cannot
320  * receive any data on the pipe, therefore, any pending replies will never
321  * be answered. So we return error messages for all of them. */
322  if (queue->pendingReplies) {
323  g_mutex_lock(queue->pendingLock);
324  osync_error_set(&error, OSYNC_ERROR_IO_ERROR, "Broken Pipe");
325  GList *p = NULL;
326  for (p = queue->pendingReplies; p; p = p->next) {
327  OSyncPendingMessage *pending = p->data;
328 
329  message = osync_message_new(OSYNC_MESSAGE_ERRORREPLY, 0, NULL);
330  if (message) {
331  osync_marshal_error(message, error);
332 
333  message->id1 = pending->id1;
334  message->id2 = pending->id2;
335 
336  g_async_queue_push(queue->incoming, message);
337  }
338  }
339 
340  osync_error_free(&error);
341  g_mutex_unlock(queue->pendingLock);
342  }
343 
344  return FALSE;
345  }
346 
347  switch (osync_queue_poll(queue)) {
348  case OSYNC_QUEUE_EVENT_NONE:
349  return FALSE;
350  case OSYNC_QUEUE_EVENT_READ:
351  return TRUE;
352  case OSYNC_QUEUE_EVENT_HUP:
353  case OSYNC_QUEUE_EVENT_ERROR:
354  queue->connected = FALSE;
355 
356  /* Now we can send the hup message, and wake up the consumer thread so
357  * it can pickup the messages in the incoming queue */
358  message = osync_message_new(OSYNC_MESSAGE_QUEUE_HUP, 0, &error);
359  if (!message)
360  goto error;
361 
362  g_async_queue_push(queue->incoming, message);
363 
364  if (queue->incomingContext)
365  g_main_context_wakeup(queue->incomingContext);
366  return FALSE;
367  }
368 
369  return FALSE;
370 
371 error:
372  message = osync_message_new(OSYNC_MESSAGE_QUEUE_ERROR, 0, &error);
373  if (message) {
374  osync_marshal_error(message, error);
375  g_async_queue_push(queue->incoming, message);
376  }
377  osync_error_free(&error);
378  return FALSE;
379 }
380 
381 /* This function reads from the file descriptor and inserts incoming data into the
382  * incoming queue */
383 static
384 gboolean _source_dispatch(GSource *source, GSourceFunc callback, gpointer user_data)
385 {
386  OSyncQueue *queue = user_data;
387  OSyncMessage *message = NULL;
388  OSyncError *error = NULL;
389 
390  do {
391  int size = 0;
392  int cmd = 0;
393  long long int id1 = 0;
394  int id2 = 0;
395 
396  if (!_osync_queue_read_int(queue, &size, &error))
397  goto error;
398 
399  if (!_osync_queue_read_int(queue, &cmd, &error))
400  goto error;
401 
402  if (!_osync_queue_read_long_long_int(queue, &id1, &error))
403  goto error;
404 
405  if (!_osync_queue_read_int(queue, &id2, &error))
406  goto error;
407 
408  message = osync_message_new(cmd, size, &error);
409  if (!message)
410  goto error;
411 
412  message->id1 = id1;
413  message->id2 = id2;
414 
415  if (size) {
416  int read = 0;
417  do {
418  int inc = _osync_queue_read_data(queue, message->buffer->data + read, size - read, &error);
419 
420  if (inc < 0)
421  goto error_free_message;
422 
423  if (inc == 0) {
424  osync_error_set(&error, OSYNC_ERROR_IO_ERROR, "Encountered EOF while data was missing");
425  goto error_free_message;
426  }
427 
428  read += inc;
429  } while (read < size);
430  }
431 
432  g_async_queue_push(queue->incoming, message);
433 
434  if (queue->incomingContext)
435  g_main_context_wakeup(queue->incomingContext);
436  } while (_source_check(queue->read_source));
437 
438  return TRUE;
439 
440 error_free_message:
441  osync_message_unref(message);
442 error:
443  if (error) {
444  message = osync_message_new(OSYNC_MESSAGE_QUEUE_ERROR, 0, &error);
445  if (message) {
446  osync_marshal_error(message, error);
447  g_async_queue_push(queue->incoming, message);
448  }
449 
450  osync_error_free(&error);
451  }
452 
453  return FALSE;
454 }
455 
461 OSyncQueue *osync_queue_new(const char *name, OSyncError **error)
462 {
463  osync_trace(TRACE_ENTRY, "%s(%s, %p)", __func__, name, error);
464 
465  OSyncQueue *queue = osync_try_malloc0(sizeof(OSyncQueue), error);
466  if (!queue)
467  goto error;
468 
469  if (name)
470  queue->name = g_strdup(name);
471  queue->fd = -1;
472 
473  if (!g_thread_supported ())
474  g_thread_init (NULL);
475 
476  queue->pendingLock = g_mutex_new();
477 
478  queue->context = g_main_context_new();
479 
480  queue->outgoing = g_async_queue_new();
481  queue->incoming = g_async_queue_new();
482 
483  osync_trace(TRACE_EXIT, "%s: %p", __func__, queue);
484  return queue;
485 
486 error:
487  osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
488  return NULL;
489 }
490 
491 /* Creates anonymous pipes which dont have to be created and are automatically connected.
492  *
493  * Lets assume parent wants to send, child wants to receive
494  *
495  * osync_queue_new_pipes()
496  * fork()
497  *
498  * Parent:
499  * connect(write_queue)
500  * disconnect(read_queue)
501  *
502  * Child:
503  * connect(read_queue)
504  * close(write_queue)
505  *
506  *
507  * */
508 osync_bool osync_queue_new_pipes(OSyncQueue **read_queue, OSyncQueue **write_queue, OSyncError **error)
509 {
510  osync_trace(TRACE_ENTRY, "%s(%p, %p, %p)", __func__, read_queue, write_queue, error);
511 
512  *read_queue = osync_queue_new(NULL, error);
513  if (!*read_queue)
514  goto error;
515 
516  *write_queue = osync_queue_new(NULL, error);
517  if (!*write_queue)
518  goto error_free_read_queue;
519 
520  int filedes[2];
521 
522  if (pipe(filedes) < 0) {
523  osync_error_set(error, OSYNC_ERROR_GENERIC, "Unable to create pipes");
524  goto error_free_write_queue;
525  }
526 
527  (*read_queue)->fd = filedes[0];
528  (*write_queue)->fd = filedes[1];
529 
530  osync_trace(TRACE_EXIT, "%s", __func__);
531  return TRUE;
532 
533 error_free_write_queue:
534  osync_queue_free(*write_queue);
535 error_free_read_queue:
536  osync_queue_free(*read_queue);
537 error:
538  osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
539  return FALSE;
540 }
541 
542 void osync_queue_free(OSyncQueue *queue)
543 {
544  osync_trace(TRACE_ENTRY, "%s(%p)", __func__, queue);
545  OSyncMessage *message = NULL;
546  OSyncPendingMessage *pending = NULL;
547 
548  g_mutex_free(queue->pendingLock);
549 
550  g_main_context_unref(queue->context);
551 
552  _osync_queue_stop_incoming(queue);
553 
554  while ((message = g_async_queue_try_pop(queue->incoming))) {
555  osync_message_unref(message);
556  }
557  g_async_queue_unref(queue->incoming);
558 
559  while ((message = g_async_queue_try_pop(queue->outgoing))) {
560  osync_message_unref(message);
561  }
562  g_async_queue_unref(queue->outgoing);
563 
564  while (queue->pendingReplies) {
565  pending = queue->pendingReplies->data;
566  g_free(pending);
567  queue->pendingReplies = g_list_remove(queue->pendingReplies, pending);
568  }
569 
570  if (queue->name)
571  g_free(queue->name);
572 
573  g_free(queue);
574 
575  osync_trace(TRACE_EXIT, "%s", __func__);
576 }
577 
578 osync_bool osync_queue_exists(OSyncQueue *queue)
579 {
580  return g_file_test(queue->name, G_FILE_TEST_EXISTS) ? TRUE : FALSE;
581 }
582 
583 osync_bool osync_queue_create(OSyncQueue *queue, OSyncError **error)
584 {
585  if (mkfifo(queue->name, 0600) != 0) {
586  osync_error_set(error, OSYNC_ERROR_GENERIC, "Unable to create fifo");
587  return FALSE;
588  }
589 
590  return TRUE;
591 }
592 
593 osync_bool osync_queue_remove(OSyncQueue *queue, OSyncError **error)
594 {
595  osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, queue, error);
596 
597  if (unlink(queue->name) != 0) {
598  osync_error_set(error, OSYNC_ERROR_GENERIC, "Unable to remove queue");
599  osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
600  return FALSE;
601  }
602 
603  osync_trace(TRACE_EXIT, "%s", __func__);
604  return TRUE;
605 }
606 
607 static osync_bool __osync_queue_connect(OSyncQueue *queue, OSyncQueueType type, osync_bool nonblocking, OSyncError **error)
608 {
609  osync_assert(queue);
610  osync_assert(queue->connected == FALSE);
611  OSyncQueue **queueptr = NULL;
612 
613  queue->type = type;
614 
615  if (queue->fd == -1) {
616  /* First, open the queue with the flags provided by the user */
617  int fd = open(queue->name, (type == OSYNC_QUEUE_SENDER ? O_WRONLY : O_RDONLY) | (nonblocking ? O_NONBLOCK : 0));
618  if (fd == -1) {
619  osync_error_set(error, OSYNC_ERROR_GENERIC, "Unable to open fifo");
620  goto error;
621  }
622  queue->fd = fd;
623 
624  int oldflags = fcntl(queue->fd, F_GETFD);
625  if (oldflags == -1) {
626  osync_error_set(error, OSYNC_ERROR_GENERIC, "Unable to get fifo flags");
627  goto error_close;
628  }
629  if (fcntl(queue->fd, F_SETFD, oldflags|FD_CLOEXEC) == -1) {
630  osync_error_set(error, OSYNC_ERROR_GENERIC, "Unable to set fifo flags");
631  goto error_close;
632  }
633  }
634 
635  queue->connected = TRUE;
636  signal(SIGPIPE, SIG_IGN);
637 
638  /* now we start a thread which handles reading/writing of the queue */
639  queue->thread = osync_thread_new(queue->context, error);
640 
641  if (!queue->thread)
642  goto error;
643 
644  queue->write_functions = g_malloc0(sizeof(GSourceFuncs));
645  queue->write_functions->prepare = _queue_prepare;
646  queue->write_functions->check = _queue_check;
647  queue->write_functions->dispatch = _queue_dispatch;
648  queue->write_functions->finalize = NULL;
649 
650  queue->write_source = g_source_new(queue->write_functions, sizeof(GSource) + sizeof(OSyncQueue *));
651  queueptr = (OSyncQueue **)(queue->write_source + 1);
652  *queueptr = queue;
653  g_source_set_callback(queue->write_source, NULL, queue, NULL);
654  g_source_attach(queue->write_source, queue->context);
655  g_main_context_ref(queue->context);
656 
657  queue->read_functions = g_malloc0(sizeof(GSourceFuncs));
658  queue->read_functions->prepare = _source_prepare;
659  queue->read_functions->check = _source_check;
660  queue->read_functions->dispatch = _source_dispatch;
661  queue->read_functions->finalize = NULL;
662 
663  queue->read_source = g_source_new(queue->read_functions, sizeof(GSource) + sizeof(OSyncQueue *));
664  queueptr = (OSyncQueue **)(queue->read_source + 1);
665  *queueptr = queue;
666  g_source_set_callback(queue->read_source, NULL, queue, NULL);
667  g_source_attach(queue->read_source, queue->context);
668  g_main_context_ref(queue->context);
669 
670  osync_thread_start(queue->thread);
671 
672  return TRUE;
673 
674 error_close:
675  close(queue->fd);
676 error:
677  return FALSE;
678 }
679 
680 
681 osync_bool osync_queue_connect(OSyncQueue *queue, OSyncQueueType type, OSyncError **error)
682 {
683  return __osync_queue_connect(queue, type, FALSE, error);
684 }
685 
686 osync_bool osync_queue_try_connect(OSyncQueue *queue, OSyncQueueType type, OSyncError **error)
687 {
688  return __osync_queue_connect(queue, type, TRUE, error);
689 }
690 
691 osync_bool osync_queue_disconnect(OSyncQueue *queue, OSyncError **error)
692 {
693  osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, queue, error);
694  osync_assert(queue);
695 
696  if (queue->thread) {
697  osync_thread_stop(queue->thread);
698  osync_thread_free(queue->thread);
699  queue->thread = NULL;
700  }
701 
702  //g_source_unref(queue->write_source);
703 
704  if (queue->write_functions)
705  g_free(queue->write_functions);
706 
707  //g_source_unref(queue->read_source);
708 
709  _osync_queue_stop_incoming(queue);
710 
711  /* We have to empty the incoming queue if we disconnect the queue. Otherwise, the
712  * consumer threads might try to pick up messages even after we are done. */
713  OSyncMessage *message = NULL;
714  while ((message = g_async_queue_try_pop(queue->incoming))) {
715  osync_message_unref(message);
716  }
717 
718  if (close(queue->fd) != 0) {
719  osync_error_set(error, OSYNC_ERROR_GENERIC, "Unable to close queue");
720  osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
721  return FALSE;
722  }
723 
724  queue->fd = -1;
725  queue->connected = FALSE;
726 
727  osync_trace(TRACE_EXIT, "%s", __func__);
728  return TRUE;
729 }
730 
731 osync_bool osync_queue_is_connected(OSyncQueue *queue)
732 {
733  osync_assert(queue);
734  return queue->connected;
735 }
736 
747 {
748  osync_trace(TRACE_ENTRY, "%s(%p, %p, %p)", __func__, queue, handler, user_data);
749 
750  queue->message_handler = handler;
751  queue->user_data = user_data;
752 
753  osync_trace(TRACE_EXIT, "%s", __func__);
754 }
755 
766 void osync_queue_setup_with_gmainloop(OSyncQueue *queue, GMainContext *context)
767 {
768  osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, queue, context);
769 
770  queue->incoming_functions = g_malloc0(sizeof(GSourceFuncs));
771  queue->incoming_functions->prepare = _incoming_prepare;
772  queue->incoming_functions->check = _incoming_check;
773  queue->incoming_functions->dispatch = _incoming_dispatch;
774  queue->incoming_functions->finalize = NULL;
775 
776  queue->incoming_source = g_source_new(queue->incoming_functions, sizeof(GSource) + sizeof(OSyncQueue *));
777  OSyncQueue **queueptr = (OSyncQueue **)(queue->incoming_source + 1);
778  *queueptr = queue;
779  g_source_set_callback(queue->incoming_source, NULL, queue, NULL);
780  g_source_attach(queue->incoming_source, context);
781  queue->incomingContext = context;
782  // For the source
783  g_main_context_ref(context);
784 
785  //To unref it later
786  g_main_context_ref(context);
787 
788  osync_trace(TRACE_EXIT, "%s", __func__);
789 }
790 
791 osync_bool osync_queue_dispatch(OSyncQueue *queue, OSyncError **error)
792 {
793  _incoming_dispatch(NULL, NULL, queue);
794  return TRUE;
795 }
796 
797 OSyncQueueEvent osync_queue_poll(OSyncQueue *queue)
798 {
799  struct pollfd pfd;
800  pfd.fd = queue->fd;
801  pfd.events = POLLIN;
802 
803  /* Here we poll on the queue. If we read on the queue, we either receive a
804  * POLLIN or POLLHUP. Since we cannot write to the queue, we can block pretty long here.
805  *
806  * If we are sending, we can only receive a POLLERR which means that the remote side has
807  * disconnected. Since we mainly dispatch the write IO, we dont want to block here. */
808  int ret = poll(&pfd, 1, queue->type == OSYNC_QUEUE_SENDER ? 0 : 100);
809 
810  if (ret < 0 && errno == EINTR)
811  return OSYNC_QUEUE_EVENT_NONE;
812 
813  if (ret == 0)
814  return OSYNC_QUEUE_EVENT_NONE;
815 
816  if (pfd.revents & POLLERR)
817  return OSYNC_QUEUE_EVENT_ERROR;
818  else if (pfd.revents & POLLHUP)
819  return OSYNC_QUEUE_EVENT_HUP;
820  else if (pfd.revents & POLLIN)
821  return OSYNC_QUEUE_EVENT_READ;
822 
823  return OSYNC_QUEUE_EVENT_ERROR;
824 }
825 
828 {
829  return g_async_queue_pop(queue->incoming);
830 }
831 
/* Generates a two-part message id.
 *
 * part1 - receives the current wall-clock time in microseconds.
 * part2 - receives a value built from random() shifted left by 16 bits,
 *         OR-ed with the process id.
 *
 * The time is widened to long long before the multiplication so that it
 * cannot overflow where time_t is 32 bits wide, and the shift is done on an
 * unsigned value because left-shifting a large signed int is undefined. */
void gen_id(long long int *part1, int *part2)
{
	struct timeval tv;

	gettimeofday(&tv, NULL);

	long long int now = (long long int)tv.tv_sec * 1000000LL + (long long int)tv.tv_usec;

	unsigned int rnd = (unsigned int)random();
	rnd = (rnd << 16) | (unsigned int)getpid();

	*part1 = now;
	*part2 = (int)rnd;
}
847 
848 osync_bool osync_queue_send_message(OSyncQueue *queue, OSyncQueue *replyqueue, OSyncMessage *message, OSyncError **error)
849 {
850  osync_trace(TRACE_ENTRY, "%s(%p, %p, %p, %p)", __func__, queue, replyqueue, message, error);
851 
852  if (message->callback) {
853  osync_assert(replyqueue);
854  OSyncPendingMessage *pending = osync_try_malloc0(sizeof(OSyncPendingMessage), error);
855  if (!pending)
856  goto error;
857 
858  gen_id(&(message->id1), &(message->id2));
859  pending->id1 = message->id1;
860  pending->id2 = message->id2;
861 
862  pending->callback = message->callback;
863  pending->user_data = message->user_data;
864 
865  g_mutex_lock(replyqueue->pendingLock);
866  replyqueue->pendingReplies = g_list_append(replyqueue->pendingReplies, pending);
867  g_mutex_unlock(replyqueue->pendingLock);
868  }
869 
870  osync_message_ref(message);
871  g_async_queue_push(queue->outgoing, message);
872 
873  g_main_context_wakeup(queue->context);
874 
875  osync_trace(TRACE_EXIT, "%s", __func__);
876  return TRUE;
877 
878 error:
879  osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
880  return FALSE;
881 }
882 
883 osync_bool osync_queue_send_message_with_timeout(OSyncQueue *queue, OSyncQueue *replyqueue, OSyncMessage *message, int timeout, OSyncError **error)
884 {
885  osync_trace(TRACE_ENTRY, "%s(%p, %p, %p)", __func__, queue, message, error);
886 
887  /*TODO: add timeout handling */
888 
889  osync_bool ret = osync_queue_send_message(queue, replyqueue, message, error);
890 
891  osync_trace(ret ? TRACE_EXIT : TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
892  return ret;
893 }
894 
895 osync_bool osync_queue_is_alive(OSyncQueue *queue)
896 {
897 
898  if (!osync_queue_try_connect(queue, OSYNC_QUEUE_SENDER, NULL)) {
899  return FALSE;
900  }
901 
902  OSyncMessage *message = osync_message_new(OSYNC_MESSAGE_NOOP, 0, NULL);
903  if (!message) {
904  return FALSE;
905  }
906 
907  if (!osync_queue_send_message(queue, NULL, message, NULL)) {
908  return FALSE;
909  }
910 
911  osync_queue_disconnect(queue, NULL);
912 
913  return TRUE;
914 }
OSyncMessageHandler message_handler
Represent an error.
void osync_queue_set_message_handler(OSyncQueue *queue, OSyncMessageHandler handler, gpointer user_data)
Sets the message handler for a queue.
OSyncMessageHandler callback
void(* OSyncMessageHandler)(OSyncMessage *message, void *user_data)
Function which can receive messages.
OSyncMessageCommand cmd
void osync_error_free(OSyncError **error)
Frees the error so it can be reused.
void osync_queue_setup_with_gmainloop(OSyncQueue *queue, GMainContext *context)
Sets the queue to use the gmainloop with the given context.
OSyncQueue * osync_queue_new(const char *name, OSyncError **error)
Creates a new asynchronous queue.
void * osync_try_malloc0(unsigned int size, OSyncError **error)
Safely tries to malloc memory.
Definition: opensync_env.c:796
GMainContext * context
GSourceFuncs * incoming_functions
void osync_error_set(OSyncError **error, OSyncErrorType type, const char *format,...)
Sets the error.
const char * osync_error_print(OSyncError **error)
Returns the message of the error.
void osync_trace(OSyncTraceType type, const char *message,...)
Used for tracing the application.
OSyncMessageHandler callback
Represents a Queue which can be used to receive messages.
OSyncMessage * osync_queue_get_message(OSyncQueue *queue)
OSyncMessage * osync_message_new(OSyncMessageCommand cmd, int size, OSyncError **error)
A Message used by the inter thread messaging library.