OpenSync  0.22
osengine_engine.c
1 /*
2  * libosengine - A synchronization engine for the opensync framework
3  * Copyright (C) 2004-2005 Armin Bauer <armin.bauer@opensync.org>
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2.1 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General Public
16  * License along with this library; if not, write to the Free Software
17  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18  *
19  */
20 
21 #include "engine.h"
22 
23 #include <errno.h>
24 #include <sys/stat.h>
25 #include <sys/types.h>
26 
27 #include <glib.h>
28 
29 #include <opensync/opensync_support.h>
30 #include "opensync/opensync_message_internals.h"
31 #include "opensync/opensync_queue_internals.h"
32 #include "opensync/opensync_format_internals.h"
33 
34 #include "engine_internals.h"
35 #include <opensync/opensync_user_internals.h>
36 
37 OSyncMappingEntry *osengine_mappingtable_find_entry(OSyncMappingTable *table, const char *uid, const char *objtype, long long int memberid);
56 
57 void _new_change_receiver(OSyncEngine *engine, OSyncClient *client, OSyncChange *change)
58 {
59  osync_trace(TRACE_ENTRY, "%s(%p, %p, %p)", __func__, engine, client, change);
60 
61  OSyncError *error = NULL;
62  OSyncChangeType change_type = osync_change_get_changetype(change);
63  OSyncFormatEnv *format_env = osync_group_get_format_env(engine->group);
64  OSyncObjType *objtype = osync_change_get_objtype(change);
65  const char* uid = osync_change_get_uid(change);
66  OSyncObjFormat *objformat = osync_change_get_objformat(change);
67 
68  osync_change_set_member(change, client->member);
69 
70  osync_trace(TRACE_INTERNAL, "Handling new change with uid %s, changetype %i, objtype %s and format %s from member %lli", uid, change_type,
71  objtype ? osync_objtype_get_name(objtype) : "None", osync_change_get_objformat(change) ? osync_objformat_get_name(osync_change_get_objformat(change)) : "None",
72  osync_member_get_id(client->member));
73 
74 
81  if ( (change_type != CHANGE_DELETED) &&
82  (osync_change_has_data(change))) {
83  osync_bool is_file_objformat = FALSE;
84  if(objformat)
85  is_file_objformat =
86  ((!strcmp(objformat->name, "file"))?(TRUE):(FALSE));
87  if ( (!objtype) || (!objformat) ||
88  (!strcmp(osync_objtype_get_name(objtype), "data")) ||
89  (!strcmp(objformat->name, "plain"))) {
90  OSyncObjType *objtype_test = osync_change_detect_objtype_full(format_env, change, &error);
91  objtype = (objtype_test)?(objtype_test):(objtype);
92  }
93  if (objtype) {
94  osync_trace(TRACE_INTERNAL, "Detected the object to be of type %s", osync_objtype_get_name(objtype));
95 
96  osync_change_set_objtype(change, objtype);
97 
102  if ( ( (osync_group_get_slow_sync(engine->group,
103  osync_objtype_get_name(objtype))) ||
104  ( (!is_file_objformat) &&
105  (!osengine_mappingtable_find_entry(
106  engine->maptable, uid,
107  osync_objtype_get_name(objtype),
108  osync_member_get_id(client->member))) )
109  ) && (change_type == CHANGE_MODIFIED) ){
111  change_type = osync_change_get_changetype(change);
112  }
113  }
114  } else
115  if (change_type == CHANGE_DELETED){
121  if ( !objtype ||
122  (( !strcmp(osync_objtype_get_name(objtype), "data") ) &&
123  ( !osengine_mappingtable_find_entry(
124  engine->maptable, uid,
125  osync_objtype_get_name(objtype),
126  osync_member_get_id(client->member)) )) ){
127 
128  OSyncMappingEntry *entry =
129  osengine_mappingtable_find_entry(
130  engine->maptable, uid, NULL,
131  osync_member_get_id(client->member)
132  );
133  if (entry) {
136  entry->change));
137  objtype=osync_change_get_objtype(change);
138  } else {
139  osync_error_set(&error, OSYNC_ERROR_GENERIC,
140  "Could not find one entry with UID=%s to delete.", uid);
141  goto error;
142  }
143  }
144  } else {
145  osync_trace(TRACE_INTERNAL, "Change has no data!");
146  }
147 
148  osync_trace(TRACE_INTERNAL, "Handling new change with uid %s, changetype %i, data %p, size %i, objtype %s and format %s from member %lli", uid, change_type, osync_change_get_data(change), osync_change_get_datasize(change), objtype ? osync_objtype_get_name(objtype) : "None", osync_change_get_objformat(change) ? osync_objformat_get_name(osync_change_get_objformat(change)) : "None", osync_member_get_id(client->member));
149 
150  if (!objtype){
151  osync_error_set(&error, OSYNC_ERROR_GENERIC,
152  "ObjType not set for uid %s.", uid);
153  goto error;
154  }
155 
156 
157  OSyncMappingEntry *entry = osengine_mappingtable_store_change(engine->maptable, change);
158  change = entry->change;
159  if (!osync_change_save(change, TRUE, &error)) {
160  osync_error_duplicate(&engine->error, &error);
161  osync_status_update_change(engine, change, CHANGE_RECV_ERROR, &error);
162  osync_error_update(&engine->error, "Unable to receive one or more objects");
163  osync_flag_unset(entry->fl_has_data);
164  goto error;
165  }
166 
167  osync_group_remove_changelog(engine->group, change, &error);
168 
169  //We convert to the common format here to make sure we always pass it
170  osync_change_convert_to_common(change, NULL);
171 
172  if (!entry->mapping) {
173  osync_flag_attach(entry->fl_mapped, engine->cmb_entries_mapped);
174  osync_flag_unset(entry->fl_mapped);
175  osync_debug("ENG", 3, "+It has no mapping");
176  } else {
177  osync_debug("ENG", 3, "+It has mapping");
178  osync_flag_set(entry->fl_mapped);
179  osync_flag_unset(entry->mapping->fl_solved);
180  osync_flag_unset(entry->mapping->fl_chkconflict);
181  osync_flag_unset(entry->mapping->fl_multiplied);
182  }
183 
184  if (osync_change_has_data(change)) {
185  osync_debug("ENG", 3, "+It has data");
186  osync_flag_set(entry->fl_has_data);
187  osync_status_update_change(engine, change, CHANGE_RECEIVED, NULL);
188  } else {
189  osync_debug("ENG", 3, "+It has no data");
190  osync_flag_unset(entry->fl_has_data);
191  osync_status_update_change(engine, change, CHANGE_RECEIVED_INFO, NULL);
192  }
193 
195  osync_flag_set(entry->fl_deleted);
196 
197  osync_flag_set(entry->fl_has_info);
198  osync_flag_unset(entry->fl_synced);
199 
200  osengine_mappingentry_decider(engine, entry);
201 
202  osync_trace(TRACE_EXIT, "%s", __func__);
203  return;
204 
205 error:
206  osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(&error));
207  osync_error_free(&error);
208  return;
209 }
210 
211 OSyncClient *osengine_get_client(OSyncEngine *engine, long long int memberId)
212 {
213  GList *c = NULL;
214  for (c = engine->clients; c; c = c->next) {
215  OSyncClient *client = c->data;
216  if (osync_member_get_id(client->member) == memberId)
217  return client;
218  }
219  return NULL;
220 }
221 
222 
223 void send_engine_changed(OSyncEngine *engine)
224 {
225  if (!engine->is_initialized)
226  return;
227 
228  OSyncMessage *message = osync_message_new(OSYNC_MESSAGE_ENGINE_CHANGED, 0, NULL);
229  /*FIXME: Handle errors here */
230 
231  osync_debug("ENG", 4, "Sending message %p:\"ENGINE_CHANGED\"", message);
232  osync_queue_send_message(engine->commands_to_self, NULL, message, NULL);
233 }
234 
235 void send_mapping_changed(OSyncEngine *engine, OSyncMapping *mapping)
236 {
237  OSyncMessage *message = osync_message_new(OSYNC_MESSAGE_MAPPING_CHANGED, sizeof(long long), NULL);
238  osync_message_write_long_long_int(message, mapping->id);
239  /*FIXME: Handle errors here */
240 
241  osync_queue_send_message(engine->commands_to_self, NULL, message, NULL);
242  /*FIXME: Handle errors here, too */
243 }
244 
245 void send_mappingentry_changed(OSyncEngine *engine, OSyncMappingEntry *entry)
246 {
247  OSyncMessage *message = osync_message_new(OSYNC_MESSAGE_MAPPINGENTRY_CHANGED, sizeof(long long)*2, NULL);
248 
249  /*FIXME: don't pass a pointer through the messaging system */
250  long long ptr = (long long)(long)entry;
251  osync_message_write_long_long_int(message, ptr);
252  /*FIXME: Handle errors here */
253 
254  osync_queue_send_message(engine->commands_to_self, NULL, message, NULL);
255  /*FIXME: Handle errors here, too */
256 }
257 
265 static void engine_message_handler(OSyncMessage *message, OSyncEngine *engine)
266 {
267  osync_trace(TRACE_ENTRY, "engine_message_handler(%p:%lli-%i, %p)", message, message->id1, message->id2, engine);
268 
269  OSyncChange *change = NULL;
270 
271  osync_trace(TRACE_INTERNAL, "engine received command %i", osync_message_get_command(message));
272 
273  switch (osync_message_get_command(message)) {
274  case OSYNC_MESSAGE_SYNCHRONIZE:
275  osync_trace(TRACE_INTERNAL, "all deciders");
276  osengine_client_all_deciders(engine);
277  break;
278  case OSYNC_MESSAGE_NEW_CHANGE:
279  osync_demarshal_change(message, osync_group_get_format_env(engine->group), &change);
280 
281  long long int member_id = 0;
282  osync_message_read_long_long_int(message, &member_id);
283  OSyncClient *sender = osengine_get_client(engine, member_id);
284 
285  _new_change_receiver(engine, sender, change);
286  break;
287  case OSYNC_MESSAGE_ENGINE_CHANGED:
288  osengine_client_all_deciders(engine);
289  osengine_mapping_all_deciders(engine);
290  GList *u;
291  for (u = engine->maptable->unmapped; u; u = u->next) {
292  OSyncMappingEntry *unmapped = u->data;
293  send_mappingentry_changed(engine, unmapped);
294  }
295  break;
296  case OSYNC_MESSAGE_MAPPING_CHANGED:
297  {
298  long long id;
299  osync_message_read_long_long_int(message, &id);
300  /*FIXME: check errors by read_long_long_int */
301  OSyncMapping *mapping = osengine_mappingtable_mapping_from_id(engine->maptable, id);
302 
303  if (!g_list_find(engine->maptable->mappings, mapping)) {
304  osync_trace(TRACE_EXIT, "%s: Mapping %p is dead", __func__, mapping);
305  return;
306  }
307 
308  osengine_mapping_decider(engine, mapping);
309  }
310  break;
311  case OSYNC_MESSAGE_MAPPINGENTRY_CHANGED:
312  {
313  long long ptr;
314  osync_message_read_long_long_int(message, &ptr);
315  OSyncMappingEntry *entry = (OSyncMappingEntry*)(long)ptr;
316 
317  if (!g_list_find(engine->maptable->entries, entry) && !g_list_find(engine->maptable->unmapped, entry)) {
318  osync_trace(TRACE_EXIT, "%s: Entry %p is dead", __func__, entry);
319  return;
320  }
321 
322  osengine_mappingentry_decider(engine, entry);
323  }
324  break;
325  case OSYNC_MESSAGE_SYNC_ALERT:
326  if (engine->allow_sync_alert)
327  osync_flag_set(engine->fl_running);
328  else
329  osync_trace(TRACE_INTERNAL, "Sync Alert not allowed");
330  break;
331 
332  default:
333  break;
334  }
335 
336  /*TODO: Implement handling of the messages listed below, on commented code */
337 
338  /*
339  if (osync_message_is_signal (message, "CLIENT_CHANGED")) {
340  OSyncClient *client = osync_message_get_data(message, "client");
341 
342  if (!g_list_find(engine->clients, client)) {
343  osync_trace(TRACE_EXIT, "%s: Client %p is dead", __func__, client);
344  return;
345  }
346 
347  osengine_client_decider(engine, client);
348  osync_trace(TRACE_EXIT, "engine_message_handler");
349  return;
350  }
351 
352  if (osync_message_is_signal (message, "PLUGIN_MESSAGE")) {
353  char *name = osync_message_get_data(message, "name");
354  void *data = osync_message_get_data(message, "data");
355  engine->plgmsg_callback(engine, sender, name, data, engine->plgmsg_userdata);
356  osync_trace(TRACE_EXIT, "engine_message_handler");
357  return;
358  }
359 
360  osync_debug("ENG", 0, "Unknown message \"%s\"", osync_message_get_msgname(message));
361  osync_trace(TRACE_EXIT_ERROR, "engine_message_handler: Unknown message");
362  g_assert_not_reached();*/
363  osync_trace(TRACE_EXIT, "%s", __func__);
364 }
365 
366 static void trigger_clients_sent_changes(OSyncEngine *engine)
367 {
368  osync_trace(TRACE_ENTRY, "%s(%p)", __func__, engine);
369  osync_status_update_engine(engine, ENG_ENDPHASE_READ, NULL);
370 
371  g_mutex_lock(engine->info_received_mutex);
372  g_cond_signal(engine->info_received);
373  g_mutex_unlock(engine->info_received_mutex);
374 
375  //Load the old mappings
376  osengine_mappingtable_inject_changes(engine->maptable);
377 
378  send_engine_changed(engine);
379  osync_trace(TRACE_EXIT, "%s", __func__);
380 }
381 
382 static void trigger_clients_read_all(OSyncEngine *engine)
383 {
384  osync_trace(TRACE_ENTRY, "%s(%p)", __func__, engine);
385 
386  send_engine_changed(engine);
387  osync_trace(TRACE_EXIT, "%s", __func__);
388 }
389 
390 static void trigger_status_end_conflicts(OSyncEngine *engine)
391 {
392  osync_trace(TRACE_ENTRY, "%s(%p)", __func__, engine);
393  osync_status_update_engine(engine, ENG_END_CONFLICTS, NULL);
394 
395  osync_trace(TRACE_EXIT, "%s", __func__);
396 }
397 
398 static void trigger_clients_connected(OSyncEngine *engine)
399 {
400  osync_trace(TRACE_ENTRY, "%s(%p)", __func__, engine);
401  osync_status_update_engine(engine, ENG_ENDPHASE_CON, NULL);
402  osengine_client_all_deciders(engine);
403 
404  osync_trace(TRACE_EXIT, "%s", __func__);
405 }
406 
407 static void trigger_clients_comitted_all(OSyncEngine *engine)
408 {
409  osync_trace(TRACE_ENTRY, "%s(%p)", __func__, engine);
410  osync_status_update_engine(engine, ENG_ENDPHASE_WRITE, NULL);
411 
412  osync_trace(TRACE_EXIT, "%s", __func__);
413 }
414 
415 
416 /*void send_engine_committed_all(OSyncEngine *engine)
417 {
418  osync_trace(TRACE_ENTRY, "%s(%p)", __func__, engine);
419 
420  engine->committed_all_sent = TRUE;
421 
422  osync_trace(TRACE_INTERNAL, "++++ ENGINE COMMAND: Committed all ++++");
423 
424  GList *c = NULL;
425  for (c = engine->clients; c; c = c->next) {
426  OSyncClient *client = c->data;
427  if (osync_flag_is_not_set(client->fl_committed_all))
428  send_committed_all(client, engine);
429  }
430 
431  osync_trace(TRACE_EXIT, "%s", __func__);
432 }
433 
434 static void trigger_engine_committed_all(OSyncEngine *engine)
435 {
436  osync_trace(TRACE_ENTRY, "%s(%p)", __func__, engine);
437 
438  if (osync_flag_is_not_set(engine->cmb_multiplied)) {
439  osync_trace(TRACE_EXIT, "%s: Not multiplied yet", __func__);
440  return;
441  }
442 
443  send_engine_committed_all(engine);
444 
445  osync_trace(TRACE_EXIT, "%s", __func__);
446 }*/
447 
448 static gboolean startupfunc(gpointer data)
449 {
450  OSyncEngine *engine = data;
451  osync_trace(TRACE_INTERNAL, "+++++++++ This is the engine of group \"%s\" +++++++++", osync_group_get_name(engine->group));
452 
453  OSyncError *error = NULL;
454  if (!osengine_mappingtable_load(engine->maptable, &error)) {
455  osync_error_duplicate(&engine->error, &error);
456  osync_status_update_engine(engine, ENG_ERROR, &error);
457  osync_error_update(&engine->error, "Unable to connect one of the members");
458  osync_flag_set(engine->fl_stop);
459  }
460 
461  g_mutex_lock(engine->started_mutex);
462  g_cond_signal(engine->started);
463  g_mutex_unlock(engine->started_mutex);
464  return FALSE;
465 }
466 
478 
489 osync_bool osengine_reset(OSyncEngine *engine, OSyncError **error)
490 {
491  //FIXME Check if engine is running
492  osync_trace(TRACE_ENTRY, "osengine_reset(%p, %p)", engine, error);
493  GList *c = NULL;
494  for (c = engine->clients; c; c = c->next) {
495  OSyncClient *client = c->data;
496  osync_client_reset(client);
497  }
498 
499  osync_flag_set_state(engine->fl_running, FALSE);
500  osync_flag_set_state(engine->fl_stop, FALSE);
501  osync_flag_set_state(engine->cmb_sent_changes, FALSE);
502  osync_flag_set_state(engine->cmb_entries_mapped, TRUE);
503  osync_flag_set_state(engine->cmb_synced, TRUE);
504  osync_flag_set_state(engine->cmb_chkconflict, TRUE);
505  osync_flag_set_state(engine->cmb_finished, FALSE);
506  osync_flag_set_state(engine->cmb_connected, FALSE);
507  osync_flag_set_state(engine->cmb_read_all, TRUE);
508  osync_flag_set_state(engine->cmb_committed_all, TRUE);
509  osync_flag_set_state(engine->cmb_committed_all_sent, FALSE);
510 
511  osync_status_update_engine(engine, ENG_ENDPHASE_DISCON, NULL);
512 
513  engine->committed_all_sent = FALSE;
514 
515  osengine_mappingtable_reset(engine->maptable);
516 
517  if (engine->error) {
518  //FIXME We might be leaking memory here
519  OSyncError *newerror = NULL;
520  osync_error_duplicate(&newerror, &engine->error);
521  osync_status_update_engine(engine, ENG_ERROR, &newerror);
522  osync_group_set_slow_sync(engine->group, "data", TRUE);
523  } else {
524  osync_status_update_engine(engine, ENG_SYNC_SUCCESSFULL, NULL);
525  osync_group_reset_slow_sync(engine->group, "data");
526  }
527 
528  osync_trace(TRACE_INTERNAL, "engine error is %p", engine->error);
529 
530  g_mutex_lock(engine->syncing_mutex);
531  g_cond_signal(engine->syncing);
532  g_mutex_unlock(engine->syncing_mutex);
533 
534  osync_trace(TRACE_EXIT, "osengine_reset");
535  return TRUE;
536 }
537 
538 /* Implementation of g_mkdir_with_parents()
539  *
540  * This function overwrite the contents of the 'dir' parameter
541  */
542 static int __mkdir_with_parents(char *dir, int mode)
543 {
544  if (g_file_test(dir, G_FILE_TEST_IS_DIR))
545  return 0;
546 
547  char *slash = strrchr(dir, '/');
548  if (slash && slash != dir) {
549  /* Create parent directory if needed */
550 
551  /* This is a trick: I don't want to allocate a new string
552  * for the parent directory. So, just put a NUL char
553  * in the last slash, and restore it after creating the
554  * parent directory
555  */
556  *slash = '\0';
557  if (__mkdir_with_parents(dir, mode) < 0)
558  return -1;
559  *slash = '/';
560  }
561 
562  if (mkdir(dir, mode) < 0)
563  return -1;
564 
565  return 0;
566 }
567 
568 static int mkdir_with_parents(const char *dir, int mode)
569 {
570  int r;
571  char *mydir = strdup(dir);
572  if (!mydir)
573  return -1;
574 
575  r = __mkdir_with_parents(mydir, mode);
576  free(mydir);
577  return r;
578 }
579 
589 OSyncEngine *osengine_new(OSyncGroup *group, OSyncError **error)
590 {
591  osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, group, error);
592 
593  g_assert(group);
594  OSyncEngine *engine = g_malloc0(sizeof(OSyncEngine));
595  osync_group_set_data(group, engine);
596 
597  if (!g_thread_supported ())
598  g_thread_init (NULL);
599 
600  engine->context = g_main_context_new();
601  engine->syncloop = g_main_loop_new(engine->context, FALSE);
602  engine->group = group;
603 
604  OSyncUserInfo *user = osync_user_new(error);
605  if (!user)
606  goto error;
607 
608  char *enginesdir = g_strdup_printf("%s/engines", osync_user_get_confdir(user));
609  char *path = g_strdup_printf("%s/enginepipe", enginesdir);
610 
611  if (mkdir_with_parents(enginesdir, 0755) < 0) {
612  osync_error_set(error, OSYNC_ERROR_GENERIC, "Couldn't create engines directory: %s", strerror(errno));
613  goto error_free_paths;
614  }
615 
616  engine->syncing_mutex = g_mutex_new();
617  engine->info_received_mutex = g_mutex_new();
618  engine->syncing = g_cond_new();
619  engine->info_received = g_cond_new();
620  engine->started_mutex = g_mutex_new();
621  engine->started = g_cond_new();
622 
623  //Set the default start flags
624  engine->fl_running = osync_flag_new(NULL);
625  osync_flag_set_pos_trigger(engine->fl_running, (OSyncFlagTriggerFunc)osengine_client_all_deciders, engine, NULL);
626 
627  engine->fl_sync = osync_flag_new(NULL);
628  engine->fl_stop = osync_flag_new(NULL);
629  osync_flag_set_pos_trigger(engine->fl_stop, (OSyncFlagTriggerFunc)osengine_client_all_deciders, engine, NULL);
630 
631  //The combined flags
632  engine->cmb_sent_changes = osync_comb_flag_new(FALSE, FALSE);
633  osync_flag_set_pos_trigger(engine->cmb_sent_changes, (OSyncFlagTriggerFunc)trigger_clients_sent_changes, engine, NULL);
634 
635  engine->cmb_read_all = osync_comb_flag_new(FALSE, TRUE);
636  osync_flag_set_pos_trigger(engine->cmb_read_all, (OSyncFlagTriggerFunc)trigger_clients_read_all, engine, NULL);
637 
638  engine->cmb_entries_mapped = osync_comb_flag_new(FALSE, FALSE);
639  osync_flag_set_pos_trigger(engine->cmb_entries_mapped, (OSyncFlagTriggerFunc)send_engine_changed, engine, NULL);
640 
641 
642  engine->cmb_synced = osync_comb_flag_new(FALSE, TRUE);
643  osync_flag_set_pos_trigger(engine->cmb_synced, (OSyncFlagTriggerFunc)send_engine_changed, engine, NULL);
644 
645 
646  engine->cmb_finished = osync_comb_flag_new(FALSE, TRUE);
647  osync_flag_set_pos_trigger(engine->cmb_finished, (OSyncFlagTriggerFunc)osengine_reset, engine, NULL);
648 
649  engine->cmb_connected = osync_comb_flag_new(FALSE, FALSE);
650  osync_flag_set_pos_trigger(engine->cmb_connected, (OSyncFlagTriggerFunc)trigger_clients_connected, engine, NULL);
651 
652  engine->cmb_chkconflict = osync_comb_flag_new(FALSE, TRUE);
653  osync_flag_set_pos_trigger(engine->cmb_chkconflict, (OSyncFlagTriggerFunc)trigger_status_end_conflicts, engine, NULL);
654 
655  engine->cmb_multiplied = osync_comb_flag_new(FALSE, TRUE);
656 
657  engine->cmb_committed_all = osync_comb_flag_new(FALSE, TRUE);
658  osync_flag_set_pos_trigger(engine->cmb_committed_all, (OSyncFlagTriggerFunc)send_engine_changed, engine, NULL);
659 
660 
661  engine->cmb_committed_all_sent = osync_comb_flag_new(FALSE, TRUE);
662  osync_flag_set_pos_trigger(engine->cmb_committed_all_sent, (OSyncFlagTriggerFunc)trigger_clients_comitted_all, engine, NULL);
663 
664  osync_flag_set(engine->fl_sync);
665 
666  int i;
667  for (i = 0; i < osync_group_num_members(group); i++) {
668  OSyncMember *member = osync_group_nth_member(group, i);
669  if (!osync_client_new(engine, member, error))
670  goto error_free_paths;
671  }
672 
673  engine->maptable = osengine_mappingtable_new(engine);
674 
675  osync_trace(TRACE_EXIT, "osengine_new: %p", engine);
676  return engine;
677 
678 error_free_paths:
679  g_free(path);
680  g_free(enginesdir);
681 error:
682  osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
683  return NULL;
684 }
685 
693 void osengine_free(OSyncEngine *engine)
694 {
695  osync_trace(TRACE_ENTRY, "osengine_free(%p)", engine);
696 
697  GList *c = NULL;
698  for (c = engine->clients; c; c = c->next) {
699  OSyncClient *client = c->data;
700  osync_client_free(client);
701  }
702 
703  osengine_mappingtable_free(engine->maptable);
704  engine->maptable = NULL;
705 
706  osync_flag_free(engine->fl_running);
707  osync_flag_free(engine->fl_sync);
708  osync_flag_free(engine->fl_stop);
709  osync_flag_free(engine->cmb_sent_changes);
710  osync_flag_free(engine->cmb_entries_mapped);
711  osync_flag_free(engine->cmb_synced);
712  osync_flag_free(engine->cmb_chkconflict);
713  osync_flag_free(engine->cmb_finished);
714  osync_flag_free(engine->cmb_connected);
715  osync_flag_free(engine->cmb_read_all);
716  osync_flag_free(engine->cmb_multiplied);
717  osync_flag_free(engine->cmb_committed_all);
718  osync_flag_free(engine->cmb_committed_all_sent);
719 
720  g_list_free(engine->clients);
721  g_main_loop_unref(engine->syncloop);
722 
723  g_main_context_unref(engine->context);
724 
725  g_mutex_free(engine->syncing_mutex);
726  g_mutex_free(engine->info_received_mutex);
727  g_cond_free(engine->syncing);
728  g_cond_free(engine->info_received);
729  g_mutex_free(engine->started_mutex);
730  g_cond_free(engine->started);
731 
732  g_free(engine);
733  osync_trace(TRACE_EXIT, "osengine_free");
734 }
735 
745 void osengine_set_conflict_callback(OSyncEngine *engine, void (* function) (OSyncEngine *, OSyncMapping *, void *), void *user_data)
746 {
747  engine->conflict_callback = function;
748  engine->conflict_userdata = user_data;
749 }
750 
760 void osengine_set_changestatus_callback(OSyncEngine *engine, void (* function) (OSyncEngine *, OSyncChangeUpdate *, void *), void *user_data)
761 {
762  engine->changestat_callback = function;
763  engine->changestat_userdata = user_data;
764 }
765 
775 void osengine_set_mappingstatus_callback(OSyncEngine *engine, void (* function) (OSyncMappingUpdate *, void *), void *user_data)
776 {
777  engine->mapstat_callback = function;
778  engine->mapstat_userdata = user_data;
779 }
780 
790 void osengine_set_enginestatus_callback(OSyncEngine *engine, void (* function) (OSyncEngine *, OSyncEngineUpdate *, void *), void *user_data)
791 {
792  engine->engstat_callback = function;
793  engine->engstat_userdata = user_data;
794 }
795 
805 void osengine_set_memberstatus_callback(OSyncEngine *engine, void (* function) (OSyncMemberUpdate *, void *), void *user_data)
806 {
807  engine->mebstat_callback = function;
808  engine->mebstat_userdata = user_data;
809 }
810 
820 void osengine_set_message_callback(OSyncEngine *engine, void *(* function) (OSyncEngine *, OSyncClient *, const char *, void *, void *), void *user_data)
821 {
822  engine->plgmsg_callback = function;
823  engine->plgmsg_userdata = user_data;
824 }
825 
837 osync_bool osengine_init(OSyncEngine *engine, OSyncError **error)
838 {
839  osync_trace(TRACE_ENTRY, "osengine_init(%p, %p)", engine, error);
840 
841  if (engine->is_initialized) {
842  osync_error_set(error, OSYNC_ERROR_MISCONFIGURATION, "This engine was already initialized");
843  osync_trace(TRACE_EXIT_ERROR, "osengine_init: %s", osync_error_print(error));
844  return FALSE;
845  }
846 
847  switch (osync_group_lock(engine->group)) {
848  case OSYNC_LOCKED:
849  osync_error_set(error, OSYNC_ERROR_LOCKED, "Group is locked");
850  osync_trace(TRACE_EXIT_ERROR, "osengine_init: %s", osync_error_print(error));
851  return FALSE;
852  case OSYNC_LOCK_STALE:
853  osync_debug("ENG", 1, "Detected stale lock file. Slow-syncing");
854  osync_status_update_engine(engine, ENG_PREV_UNCLEAN, NULL);
855  osync_group_set_slow_sync(engine->group, "data", TRUE);
856  break;
857  default:
858  break;
859  }
860 
861  osync_flag_set(engine->cmb_entries_mapped);
862  osync_flag_set(engine->cmb_synced);
863  engine->allow_sync_alert = TRUE;
864 
865  //OSyncMember *member = NULL;
866  OSyncGroup *group = engine->group;
867 
868  if (osync_group_num_members(group) < 2) {
869  //Not enough members!
870  osync_error_set(error, OSYNC_ERROR_MISCONFIGURATION, "You only configured %i members, but at least 2 are needed", osync_group_num_members(group));
871  osync_group_unlock(engine->group, TRUE);
872  osync_trace(TRACE_EXIT_ERROR, "osengine_init: %s", osync_error_print(error));
873  return FALSE;
874  }
875 
876  engine->is_initialized = TRUE;
877 
878  osync_trace(TRACE_INTERNAL, "Spawning clients");
879  GList *c = NULL;
880  for (c = engine->clients; c; c = c->next) {
881  OSyncClient *client = c->data;
882  osync_queue_create(client->commands_from_osplugin, NULL);
883 
884  if (!osync_client_spawn(client, engine, error)) {
885  osync_group_unlock(engine->group, TRUE);
886  osync_trace(TRACE_EXIT_ERROR, "osengine_init: %s", osync_error_print(error));
887  return FALSE;
888  }
889 
890  osync_queue_set_message_handler(client->commands_from_osplugin, (OSyncMessageHandler)engine_message_handler, engine);
891  if (!(engine->man_dispatch))
892  osync_queue_setup_with_gmainloop(client->commands_from_osplugin, engine->context);
893  osync_trace(TRACE_INTERNAL, "opening client queue");
894  if (!osync_queue_connect(client->commands_from_osplugin, OSYNC_QUEUE_RECEIVER, 0 )) {
895  osync_group_unlock(engine->group, TRUE);
896  osync_trace(TRACE_EXIT_ERROR, "osengine_init: %s", osync_error_print(error));
897  return FALSE;
898  }
899  }
900 
901  osync_trace(TRACE_INTERNAL, "opening engine queue");
902  if (!osync_queue_new_pipes(&engine->commands_from_self, &engine->commands_to_self, error)) {
903  osync_group_unlock(engine->group, TRUE);
904  osync_trace(TRACE_EXIT_ERROR, "osengine_init: %s", osync_error_print(error));
905  return FALSE;
906  }
907 
908  if (!osync_queue_connect(engine->commands_from_self, OSYNC_QUEUE_RECEIVER, 0 )) {
909  osync_group_unlock(engine->group, TRUE);
910  osync_trace(TRACE_EXIT_ERROR, "osengine_init: %s", osync_error_print(error));
911  return FALSE;
912  }
913 
914  if (!osync_queue_connect(engine->commands_to_self, OSYNC_QUEUE_SENDER, 0 )) {
915  osync_group_unlock(engine->group, TRUE);
916  osync_trace(TRACE_EXIT_ERROR, "osengine_init: %s", osync_error_print(error));
917  return FALSE;
918  }
919 
920  osync_queue_set_message_handler(engine->commands_from_self, (OSyncMessageHandler)engine_message_handler, engine);
921  if (!(engine->man_dispatch))
922  osync_queue_setup_with_gmainloop(engine->commands_from_self, engine->context);
923 
924  osync_trace(TRACE_INTERNAL, "initializing clients");
925  for (c = engine->clients; c; c = c->next) {
926  OSyncClient *client = c->data;
927  if (!osync_client_init(client, engine, error)) {
928  osengine_finalize(engine);
929  osync_group_unlock(engine->group, TRUE);
930  osync_trace(TRACE_EXIT_ERROR, "osengine_init: %s", osync_error_print(error));
931  return FALSE;
932  }
933  }
934 
935  osync_debug("ENG", 3, "Running the main loop");
936 
937  //Now we can run the main loop
938  //We protect the startup by a g_cond
939  g_mutex_lock(engine->started_mutex);
940  GSource *idle = g_idle_source_new();
941  g_source_set_priority(idle, G_PRIORITY_HIGH);
942  g_source_set_callback(idle, startupfunc, engine, NULL);
943  g_source_attach(idle, engine->context);
944  engine->thread = g_thread_create ((GThreadFunc)g_main_loop_run, engine->syncloop, TRUE, NULL);
945  g_cond_wait(engine->started, engine->started_mutex);
946  g_mutex_unlock(engine->started_mutex);
947 
948  osync_trace(TRACE_EXIT, "osengine_init");
949  return TRUE;
950 }
951 
960 void osengine_finalize(OSyncEngine *engine)
961 {
962  //FIXME check if engine is running
963  osync_trace(TRACE_ENTRY, "osengine_finalize(%p)", engine);
964 
965  if (!engine->is_initialized) {
966  osync_trace(TRACE_EXIT_ERROR, "osengine_finalize: Not initialized");
967  return;
968  }
969 
970  g_assert(engine);
971  osync_debug("ENG", 3, "finalizing engine %p", engine);
972 
973  if (engine->thread) {
974  g_main_loop_quit(engine->syncloop);
975  g_thread_join(engine->thread);
976  }
977 
978  GList *c = NULL;
979  for (c = engine->clients; c; c = c->next) {
980  OSyncClient *client = c->data;
981  osync_queue_disconnect(client->commands_from_osplugin, NULL);
982  osync_client_finalize(client, NULL);
983  }
984 
985  osync_queue_disconnect(engine->commands_from_self, NULL);
986  osync_queue_disconnect(engine->commands_to_self, NULL);
987 
988  osync_queue_free(engine->commands_from_self);
989  engine->commands_from_self = NULL;
990  osync_queue_free(engine->commands_to_self);
991  engine->commands_to_self = NULL;
992 
993  osengine_mappingtable_close(engine->maptable);
994 
995  if (engine->error) {
996  /* If the error occured during connect, we
997  * dont want to trigger a slow-sync the next
998  * time. In the case the we have a slow-sync
999  * right in the beginning, we also dont remove
1000  * the lockfile to trigger a slow-sync again
1001  * next time */
1002  if (!osync_flag_is_set(engine->cmb_connected) && !engine->slowsync)
1003  osync_group_unlock(engine->group, TRUE);
1004  else
1005  osync_group_unlock(engine->group, FALSE);
1006  } else
1007  osync_group_unlock(engine->group, TRUE);
1008 
1009  engine->is_initialized = FALSE;
1010  osync_trace(TRACE_EXIT, "osengine_finalize");
1011 }
1012 
1023 osync_bool osengine_synchronize(OSyncEngine *engine, OSyncError **error)
1024 {
1025  osync_trace(TRACE_INTERNAL, "synchronize now");
1026  osync_trace(TRACE_ENTRY, "%s(%p)", __func__, engine);
1027  g_assert(engine);
1028 
1029  if (!engine->is_initialized) {
1030  osync_error_set(error, OSYNC_ERROR_GENERIC, "osengine_synchronize: Not initialized");
1031  goto error;
1032  }
1033 
1034  /* We now remember if slow-sync is set right from the start.
1035  * If it is, we dont remove the lock file in the case of
1036  * a error during connect. */
1037  if (osync_group_get_slow_sync(engine->group, "data")) {
1038  engine->slowsync = TRUE;
1039  } else {
1040  engine->slowsync = FALSE;
1041  }
1042 
1043  engine->wasted = 0;
1044  engine->alldeciders = 0;
1045 
1046  osync_flag_set(engine->fl_running);
1047 
1048  OSyncMessage *message = osync_message_new(OSYNC_MESSAGE_SYNCHRONIZE, 0, error);
1049  if (!message)
1050  goto error;
1051 
1052  if (!osync_queue_send_message(engine->commands_to_self, NULL, message, error))
1053  goto error_free_message;
1054 
1055  osync_message_unref(message);
1056 
1057  osync_trace(TRACE_EXIT, "%s", __func__);
1058  return TRUE;
1059 
1060 error_free_message:
1061  osync_message_unref(message);
1062 error:
1063  osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
1064  return FALSE;
1065 }
1066 
1074 void osengine_flag_only_info(OSyncEngine *engine)
1075 {
1076  osync_flag_unset(engine->fl_sync);
1077 }
1078 
1086 void osengine_flag_manual(OSyncEngine *engine)
1087 {
1088  if (engine->syncloop) {
1089  g_warning("Unable to flag manual since engine is already initialized\n");
1090  }
1091  engine->man_dispatch = TRUE;
1092 }
1093 
1100 void osengine_pause(OSyncEngine *engine)
1101 {
1102  osync_flag_unset(engine->fl_running);
1103 }
1104 
1112 void osengine_abort(OSyncEngine *engine)
1113 {
1114  osync_flag_set(engine->fl_stop);
1115 }
1116 
1123 void osengine_allow_sync_alert(OSyncEngine *engine)
1124 {
1125  engine->allow_sync_alert = TRUE;
1126 }
1127 
1134 void osengine_deny_sync_alert(OSyncEngine *engine)
1135 {
1136  engine->allow_sync_alert = FALSE;
1137 }
1138 
1149 osync_bool osengine_sync_and_block(OSyncEngine *engine, OSyncError **error)
1150 {
1151  osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, engine, error);
1152 
1153  g_mutex_lock(engine->syncing_mutex);
1154 
1155  if (!osengine_synchronize(engine, error)) {
1156  g_mutex_unlock(engine->syncing_mutex);
1157  goto error;
1158  }
1159 
1160  g_cond_wait(engine->syncing, engine->syncing_mutex);
1161  g_mutex_unlock(engine->syncing_mutex);
1162 
1163  if (engine->error) {
1164  osync_error_duplicate(error, &(engine->error));
1165  goto error;
1166  }
1167 
1168  osync_trace(TRACE_EXIT, "%s", __func__);
1169  return TRUE;
1170 
1171 error:
1172  osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
1173  return FALSE;
1174 }
1175 
1186 osync_bool osengine_wait_sync_end(OSyncEngine *engine, OSyncError **error)
1187 {
1188  g_mutex_lock(engine->syncing_mutex);
1189  g_cond_wait(engine->syncing, engine->syncing_mutex);
1190  g_mutex_unlock(engine->syncing_mutex);
1191 
1192  if (engine->error) {
1193  osync_error_duplicate(error, &(engine->error));
1194  return FALSE;
1195  }
1196  return TRUE;
1197 }
1198 
1205 void osengine_wait_info_end(OSyncEngine *engine)
1206 {
1207  g_mutex_lock(engine->info_received_mutex);
1208  g_cond_wait(engine->info_received, engine->info_received_mutex);
1209  g_mutex_unlock(engine->info_received_mutex);
1210 }
1211 
1216 void osengine_one_iteration(OSyncEngine *engine)
1217 {
1218  /*TODO: Reimplement support to stepping mode on engine */
1219  abort();//osync_queue_dispatch(engine->incoming);
1220 }
1221 
1228 OSyncMapping *osengine_mapping_from_id(OSyncEngine *engine, long long int id)
1229 {
1230  return osengine_mappingtable_mapping_from_id(engine->maptable, id);
1231 }
1232 
OSyncChangeType osync_change_get_changetype(OSyncChange *change)
Gets the changetype of a change.
void osengine_wait_info_end(OSyncEngine *engine)
This function will block until all change object information has been received.
Struct for the engine status callback.
void osengine_set_memberstatus_callback(OSyncEngine *engine, void(*function)(OSyncMemberUpdate *, void *), void *user_data)
This will set the member status handler for the given engine.
osync_bool osengine_init(OSyncEngine *engine, OSyncError **error)
This will initialize a engine.
int osync_change_get_datasize(OSyncChange *change)
Gets the size of the data of a change.
void osync_error_duplicate(OSyncError **target, OSyncError **source)
Duplicates the error into the target.
Represent a user.
void osengine_set_conflict_callback(OSyncEngine *engine, void(*function)(OSyncEngine *, OSyncMapping *, void *), void *user_data)
This will set the conflict handler for the given engine.
Represent an error.
int osync_group_num_members(OSyncGroup *group)
Counts the members of the group.
osync_bool osengine_synchronize(OSyncEngine *engine, OSyncError **error)
Starts to synchronize the given OSyncEngine.
OSyncMapping * osengine_mapping_from_id(OSyncEngine *engine, long long int id)
Searches for a mapping by its id.
OSyncObjType * osync_change_detect_objtype_full(OSyncFormatEnv *env, OSyncChange *change, OSyncError **error)
Tries to detect the encapsulated object type of the given change.
osync_bool osengine_wait_sync_end(OSyncEngine *engine, OSyncError **error)
This function will block until a synchronization has ended.
void osync_group_set_data(OSyncGroup *group, void *data)
Sets the custom data of a group.
void osync_queue_set_message_handler(OSyncQueue *queue, OSyncMessageHandler handler, gpointer user_data)
Sets the message handler for a queue.
void osengine_allow_sync_alert(OSyncEngine *engine)
Allows that the engine can be started by a member.
void osengine_flag_only_info(OSyncEngine *engine)
Sets a flag on the engine that the engine should only request the info about sync objects...
void(* OSyncMessageHandler)(OSyncMessage *message, void *user_data)
Function which can receive messages.
Represent a group of members that should be synchronized.
void osengine_pause(OSyncEngine *engine)
This will pause the engine.
OSyncUserInfo * osync_user_new(OSyncError **error)
This will create a new user.
Definition: opensync_user.c:40
void osengine_free(OSyncEngine *engine)
This will free a engine and all resources associated.
void osengine_finalize(OSyncEngine *engine)
This will finalize a engine.
Struct for the member status callback.
const char * osync_change_get_uid(OSyncChange *change)
Gets the uid of a change.
Struct for the change status callback.
void osync_error_free(OSyncError **error)
Frees the error so it can be reused.
osync_bool osync_change_has_data(OSyncChange *change)
Returns wether the complete data already has been set.
A member of a group which represent a single device.
OSyncObjFormat * osync_change_get_objformat(OSyncChange *change)
Gets the object format of a change.
void osync_group_unlock(OSyncGroup *group, osync_bool remove)
Unlocks a group.
void osync_queue_setup_with_gmainloop(OSyncQueue *queue, GMainContext *context)
Sets the queue to use the gmainloop with the given context.
void osync_group_reset_slow_sync(OSyncGroup *group, const char *objtypestr)
Reset slow-sync for this group.
void osengine_set_enginestatus_callback(OSyncEngine *engine, void(*function)(OSyncEngine *, OSyncEngineUpdate *, void *), void *user_data)
This will set the engine status handler for the given engine.
void osync_change_set_changetype(OSyncChange *change, OSyncChangeType type)
Sets the changetype of a change.
void _new_change_receiver(OSyncEngine *engine, OSyncClient *client, OSyncChange *change)
osync_bool osengine_sync_and_block(OSyncEngine *engine, OSyncError **error)
This function will synchronize once and block until the sync has finished.
const char * osync_user_get_confdir(OSyncUserInfo *user)
This will get the configdir for the given user.
Definition: opensync_user.c:93
void osync_debug(const char *subpart, int level, const char *message,...)
Used for debugging.
void osync_error_update(OSyncError **error, const char *format,...)
Updates the error message.
const char * osync_objformat_get_name(OSyncObjFormat *format)
Returns the name of a object format.
void osengine_abort(OSyncEngine *engine)
Sets a flag on the engine that the engine should do single stepping (For debugging) ...
void osengine_set_mappingstatus_callback(OSyncEngine *engine, void(*function)(OSyncMappingUpdate *, void *), void *user_data)
This will set the mapping status handler for the given engine.
A change object.
const char * osync_group_get_name(OSyncGroup *group)
Returns the name of a group.
OSyncMessageCommand osync_message_get_command(OSyncMessage *message)
Gets the command from a message.
long long int osync_member_get_id(OSyncMember *member)
Gets the unique id of a member.
void osengine_one_iteration(OSyncEngine *engine)
Does one iteration of the engine (For debugging)
void osengine_set_changestatus_callback(OSyncEngine *engine, void(*function)(OSyncEngine *, OSyncChangeUpdate *, void *), void *user_data)
This will set the change status handler for the given engine.
void osync_change_set_objtype(OSyncChange *change, OSyncObjType *type)
Sets the object type of a change.
void osync_change_set_member(OSyncChange *change, OSyncMember *member)
Sets the member of a change.
void osync_error_set(OSyncError **error, OSyncErrorType type, const char *format,...)
Sets the error.
const char * osync_error_print(OSyncError **error)
Returns the message of the error.
Struct for the mapping status callback.
void osengine_set_message_callback(OSyncEngine *engine, void *(*function)(OSyncEngine *, OSyncClient *, const char *, void *, void *), void *user_data)
This will set the callback handler for a custom message.
void osync_trace(OSyncTraceType type, const char *message,...)
Used for tracing the application.
osync_bool osync_group_get_slow_sync(OSyncGroup *group, const char *objtype)
Returns if the group will perform a slow-sync for the object type.
osync_bool osync_change_convert_to_common(OSyncChange *change, OSyncError **error)
Convert a change to the specified common format.
const char * osync_objtype_get_name(OSyncObjType *type)
Returns the name of a object type.
osync_bool osync_group_remove_changelog(OSyncGroup *group, OSyncChange *change, OSyncError **error)
Removes a change from the changelog.
OSyncObjType * osync_change_get_objtype(OSyncChange *change)
Gets the object type of a change.
Represent a abstract object type (like "contact")
osync_bool osengine_reset(OSyncEngine *engine, OSyncError **error)
This will reset the engine to its initial state.
OSyncLockState osync_group_lock(OSyncGroup *group)
Locks a group.
OSyncEngine * osengine_new(OSyncGroup *group, OSyncError **error)
This will create a new engine for the given group.
Represent a format for a object type.
OSyncChangeType
The changetypes of a change object.
Definition: opensync.h:28
The environment used for conversions.
OSyncMember * osync_group_nth_member(OSyncGroup *group, int nth)
Returns the nth member of the group.
void osync_group_set_slow_sync(OSyncGroup *group, const char *objtypestr, osync_bool slow_sync)
Sets if the group requires slow-sync for the given object type.
OSyncMessage * osync_message_new(OSyncMessageCommand cmd, int size, OSyncError **error)
A Message used by the inter thread messaging library.
OSyncFormatEnv * osync_group_get_format_env(OSyncGroup *group)
Returns the format environment of a group.
char * osync_change_get_data(OSyncChange *change)
Gets the data of a change.
osync_bool osync_change_save(OSyncChange *change, osync_bool save_format, OSyncError **error)
This will save a change into the database.
void osengine_deny_sync_alert(OSyncEngine *engine)
Do not allow that the engine can be started by a member.
void osengine_flag_manual(OSyncEngine *engine)
Sets a flag on the engine that the engine should do single stepping (For debugging) ...