24 #include <opensync/opensync_support.h> 25 #include "opensync/opensync_format_internals.h" 26 #include "opensync/opensync_member_internals.h" 27 #include "opensync/opensync_message_internals.h" 28 #include "opensync/opensync_queue_internals.h" 30 #include "engine_internals.h" 33 #include <sys/types.h> 43 void _get_changes_reply_receiver(
OSyncMessage *message, OSyncClient *sender)
46 OSyncEngine *engine = sender->engine;
50 osync_demarshal_error(message, &error);
55 osync_flag_unset(sender->fl_sent_changes);
57 osync_flag_set(sender->fl_done);
63 osync_flag_set(engine->fl_stop);
67 osync_flag_set(sender->fl_sent_changes);
70 osengine_client_decider(engine, sender);
79 void _connect_reply_receiver(
OSyncMessage *message, OSyncClient *sender)
84 OSyncEngine *engine = sender->engine;
88 osync_demarshal_error(message, &error);
93 osync_flag_unset(sender->fl_connected);
94 osync_flag_set(sender->fl_finished);
95 osync_flag_set(sender->fl_sent_changes);
96 osync_flag_set(sender->fl_done);
102 osync_flag_set(engine->fl_stop);
105 osync_member_read_sink_info(sender->member, message);
108 osync_flag_set(sender->fl_connected);
111 osengine_client_decider(engine, sender);
115 void _sync_done_reply_receiver(
OSyncMessage *message, OSyncClient *sender)
119 OSyncEngine *engine = sender->engine;
123 osync_demarshal_error(message, &error);
127 osync_error_update(&engine->error,
"Unable to finish the sync for one of the members");
130 osync_flag_set(sender->fl_done);
131 osengine_client_decider(engine, sender);
135 void _committed_all_reply_receiver(
OSyncMessage *message, OSyncClient *sender)
139 OSyncEngine *engine = sender->engine;
143 osync_demarshal_error(message, &error);
151 osync_flag_set(sender->fl_committed_all);
152 osengine_client_decider(engine, sender);
156 void _disconnect_reply_receiver(
OSyncMessage *message, OSyncClient *sender)
160 OSyncEngine *engine = sender->engine;
164 osync_demarshal_error(message, &error);
170 osync_flag_unset(sender->fl_connected);
171 osync_flag_set(sender->fl_finished);
172 osengine_client_decider(engine, sender);
176 void _get_change_data_reply_receiver(
OSyncMessage *message, OSyncMappingEntry *entry)
179 OSyncEngine *engine = entry->client->engine;
183 osync_demarshal_error(message, &error);
193 osync_demarshal_changedata(message, entry->change);
195 osync_flag_set(entry->fl_has_data);
196 osync_status_update_change(engine, entry->change,
CHANGE_RECEIVED, NULL);
200 osengine_mappingentry_decider(engine, entry);
204 void _read_change_reply_receiver(OSyncClient *sender,
OSyncMessage *message, OSyncEngine *engine)
230 void _commit_change_reply_receiver(
OSyncMessage *message, OSyncMappingEntry *entry)
233 OSyncEngine *engine = entry->client->engine;
237 osync_demarshal_error(message, &error);
247 osync_flag_unset(entry->fl_dirty);
248 osync_flag_set(entry->fl_synced);
255 osync_message_read_string(message, &newuid);
258 osync_status_update_change(engine, entry->change,
CHANGE_SENT, NULL);
259 osync_flag_unset(entry->fl_dirty);
260 osync_flag_set(entry->fl_synced);
264 osync_flag_set(entry->fl_deleted);
271 osengine_mappingentry_decider(engine, entry);
282 client->member = member;
284 client->engine = engine;
285 engine->clients = g_list_append(engine->clients, client);
295 if (!client->commands_to_osplugin || !client->commands_from_osplugin)
296 goto error_free_client;
298 client->fl_connected = osync_flag_new(engine->cmb_connected);
299 client->fl_sent_changes = osync_flag_new(engine->cmb_sent_changes);
300 client->fl_done = osync_flag_new(NULL);
301 client->fl_committed_all = osync_flag_new(engine->cmb_committed_all_sent);
302 client->fl_finished = osync_flag_new(engine->cmb_finished);
314 void osync_client_reset(OSyncClient *client)
317 osync_flag_set_state(client->fl_connected, FALSE);
318 osync_flag_set_state(client->fl_sent_changes, FALSE);
319 osync_flag_set_state(client->fl_done, FALSE);
320 osync_flag_set_state(client->fl_finished, FALSE);
321 osync_flag_set_state(client->fl_committed_all, FALSE);
325 void osync_client_free(OSyncClient *client)
328 osync_queue_free(client->commands_to_osplugin);
329 osync_queue_free(client->commands_from_osplugin);
331 osync_flag_free(client->fl_connected);
332 osync_flag_free(client->fl_sent_changes);
333 osync_flag_free(client->fl_done);
334 osync_flag_free(client->fl_finished);
335 osync_flag_free(client->fl_committed_all);
341 void *osync_client_message_sink(
OSyncMember *member,
const char *name,
void *data, osync_bool synchronous)
344 OSyncEngine *engine = client->engine;
353 return engine->plgmsg_callback(engine, client, name, data, engine->plgmsg_userdata);
362 void osync_client_call_plugin(OSyncClient *client,
char *
function,
void *data, OSyncPluginReplyHandler replyhandler,
void *userdata)
364 osync_trace(
TRACE_ENTRY,
"%s(%p, %s, %p, %p, %p)", __func__, client,
function, data, replyhandler, userdata);
386 osync_bool osync_client_get_changes(OSyncClient *target, OSyncEngine *sender,
OSyncError **error)
390 osync_flag_changing(target->fl_sent_changes);
398 osync_member_write_sink_info(target->member, message);
401 if (!osync_queue_send_message_with_timeout(target->commands_to_osplugin, target->commands_from_osplugin, message, timeouts.
get_changeinfo_timeout, error))
402 goto error_free_message;
404 osync_message_unref(message);
410 osync_message_unref(message);
416 osync_bool osync_client_get_change_data(OSyncClient *target, OSyncEngine *sender, OSyncMappingEntry *entry,
OSyncError **error)
418 osync_flag_changing(entry->fl_has_data);
426 osync_marshal_change(message, entry->change);
428 osync_debug(
"ENG", 3,
"Sending get_changedata message %p to client %p", message, entry->client);
431 if (!osync_queue_send_message_with_timeout(target->commands_to_osplugin, target->commands_from_osplugin, message, timeouts.
get_data_timeout, error))
432 goto error_free_message;
434 osync_message_unref(message);
440 osync_message_unref(message);
459 osync_bool osync_client_connect(OSyncClient *target, OSyncEngine *sender,
OSyncError **error)
463 osync_flag_changing(target->fl_connected);
469 osync_member_write_sink_info(target->member, message);
474 if (!osync_queue_send_message_with_timeout(target->commands_to_osplugin, target->commands_from_osplugin, message, timeouts.
connect_timeout, error))
475 goto error_free_message;
477 osync_message_unref(message);
483 osync_message_unref(message);
489 osync_bool osync_client_commit_change(OSyncClient *target, OSyncEngine *sender, OSyncMappingEntry *entry,
OSyncError **error)
492 osync_trace(
TRACE_INTERNAL,
"Committing change with uid %s, changetype %i, data %p, size %i, objtype %s and format %s from member %lli",
osync_change_get_uid(entry->change),
osync_change_get_changetype(entry->change),
osync_change_get_data(entry->change),
osync_change_get_datasize(entry->change),
osync_change_get_objtype(entry->change) ?
osync_objtype_get_name(
osync_change_get_objtype(entry->change)) :
"None",
osync_change_get_objformat(entry->change) ?
osync_objformat_get_name(
osync_change_get_objformat(entry->change)) :
"None",
osync_member_get_id(entry->client->member));
494 osync_flag_changing(entry->fl_dirty);
503 OSyncMappingView *view = osengine_mappingtable_find_view(sender->maptable, target->member);
504 while (!osengine_mappingview_uid_is_unique(view, entry, TRUE)) {
505 if (!osync_change_elevate(sender, entry->change, 1))
521 osync_marshal_change(message, entry->change);
526 if (!osync_queue_send_message_with_timeout(target->commands_to_osplugin, target->commands_from_osplugin, message, timeouts.
commit_timeout, error))
527 goto error_free_message;
529 osync_message_unref(message);
531 g_assert(osync_flag_is_attached(entry->fl_committed) == TRUE);
532 osync_flag_detach(entry->fl_committed);
538 osync_message_unref(message);
544 osync_bool osync_client_sync_done(OSyncClient *target, OSyncEngine *sender,
OSyncError **error)
548 osync_flag_changing(target->fl_done);
556 if (!osync_queue_send_message_with_timeout(target->commands_to_osplugin, target->commands_from_osplugin, message, timeouts.
sync_done_timeout, error))
557 goto error_free_message;
559 osync_message_unref(message);
565 osync_message_unref(message);
571 osync_bool osync_client_committed_all(OSyncClient *target, OSyncEngine *sender,
OSyncError **error)
575 osync_flag_changing(target->fl_committed_all);
585 if (!osync_queue_send_message(target->commands_to_osplugin, target->commands_from_osplugin, message, error))
586 goto error_free_message;
588 osync_message_unref(message);
594 osync_message_unref(message);
600 osync_bool osync_client_disconnect(OSyncClient *target, OSyncEngine *sender,
OSyncError **error)
604 osync_flag_changing(target->fl_connected);
613 if (!osync_queue_send_message_with_timeout(target->commands_to_osplugin, target->commands_from_osplugin, message, timeouts.
disconnect_timeout, error))
614 goto error_free_message;
616 osync_message_unref(message);
622 osync_message_unref(message);
640 char *osync_client_pid_filename(OSyncClient *client)
642 return g_strdup_printf(
"%s/osplugin.pid", client->member->configdir);
645 osync_bool osync_client_remove_pidfile(OSyncClient *client,
OSyncError **error)
647 osync_bool ret = FALSE;
648 char *pidpath = osync_client_pid_filename(client);
650 if (unlink(pidpath) < 0) {
651 osync_error_set(error, OSYNC_ERROR_GENERIC,
"Couldn't remove pid file: %s", strerror(errno));
664 osync_bool osync_client_create_pidfile(OSyncClient *client,
OSyncError **error)
666 osync_bool ret = FALSE;
667 char *pidpath = osync_client_pid_filename(client);
668 char *pidstr = g_strdup_printf(
"%ld", (
long)client->child_pid);
671 goto out_free_pidstr;
684 osync_bool osync_client_kill_old_osplugin(OSyncClient *client,
OSyncError **error)
686 osync_bool ret = FALSE;
692 char *pidpath = osync_client_pid_filename(client);
695 if (!g_file_test(pidpath, G_FILE_TEST_EXISTS)) {
709 if (kill(pid, SIGTERM) < 0) {
715 while (osync_queue_is_alive(client->commands_to_osplugin)) {
726 if (unlink(pidpath) < 0) {
727 osync_error_set(error, OSYNC_ERROR_GENERIC,
"Couldn't erase PID file: %s", strerror(errno));
743 osync_bool osync_client_spawn(OSyncClient *client, OSyncEngine *engine,
OSyncError **error)
749 if (!osync_client_kill_old_osplugin(client, error))
752 if (!osync_queue_exists(client->commands_to_osplugin) || !osync_queue_is_alive(client->commands_to_osplugin)) {
763 setenv(
"OSYNC_MODULE_LIST", path, 1);
770 if (errno == ENOENT) {
778 client->child_pid = cpid;
781 while (!osync_queue_exists(client->commands_to_osplugin) && waiting <= 5) {
791 if (client->child_pid) {
792 if (!osync_client_create_pidfile(client, error))
796 if (!osync_queue_connect(client->commands_to_osplugin, OSYNC_QUEUE_SENDER, error))
801 goto error_disconnect;
803 osync_message_write_string(message, client->commands_from_osplugin->name);
805 if (!osync_queue_send_message(client->commands_to_osplugin, NULL, message, error))
806 goto error_free_message;
808 osync_message_unref(message);
814 osync_message_unref(message);
816 osync_queue_disconnect(client->commands_to_osplugin, NULL);
822 osync_bool osync_client_init(OSyncClient *client, OSyncEngine *engine,
OSyncError **error)
829 if (reply->
cmd == OSYNC_MESSAGE_ERRORREPLY) {
831 osync_demarshal_error(reply, error);
832 goto error_free_reply;
835 if (reply->
cmd != OSYNC_MESSAGE_REPLY) {
836 osync_error_set(error, OSYNC_ERROR_GENERIC,
"Invalid answer from plugin process");
837 goto error_free_reply;
840 osync_message_unref(reply);
846 osync_message_unref(reply);
851 osync_bool osync_client_finalize(OSyncClient *client,
OSyncError **error)
859 if (!osync_queue_send_message(client->commands_to_osplugin, NULL, message, error))
860 goto error_free_message;
862 osync_message_unref(message);
864 if (client->child_pid) {
866 if (waitpid(client->child_pid, &status, 0) == -1) {
867 osync_error_set(error, OSYNC_ERROR_GENERIC,
"Error waiting for osplugin process: %s", strerror(errno));
871 if (!WIFEXITED(status))
873 else if (WEXITSTATUS(status) != 0)
876 if (!osync_client_remove_pidfile(client, error))
880 osync_queue_disconnect(client->commands_to_osplugin, NULL);
887 osync_message_unref(message);
OSyncChangeType osync_change_get_changetype(OSyncChange *change)
Gets the changetype of a change.
unsigned int get_data_timeout
OSyncPluginTimeouts osync_plugin_get_timeouts(OSyncPlugin *plugin)
Returns the timeouts of the plugin.
int osync_change_get_datasize(OSyncChange *change)
Gets the size of the data of a change.
void osync_env_export_loaded_modules(OSyncEnv *env)
void osync_error_duplicate(OSyncError **target, OSyncError **source)
Duplicates the error into the target.
void osync_env_export_all_options(OSyncEnv *env)
void osync_change_set_uid(OSyncChange *change, const char *uid)
Sets the uid of a change.
OSyncEnv * osync_group_get_env(OSyncGroup *group)
Returns the environment in which a group is registered.
Represent a synchronzation plugin.
void(* OSyncMessageHandler)(OSyncMessage *message, void *user_data)
Function which can receive messages.
void osync_message_set_handler(OSyncMessage *message, OSyncMessageHandler handler, gpointer user_data)
Sets the handler that will receive the reply.
osync_bool osync_file_write(const char *filename, const char *data, int size, int mode, OSyncError **oserror)
Writes data to a file.
unsigned int connect_timeout
const char * osync_change_get_uid(OSyncChange *change)
Gets the uid of a change.
A member of a group which represent a single device.
void osync_member_set_data(OSyncMember *member, void *data)
Sets the custom data on a member.
OSyncObjFormat * osync_change_get_objformat(OSyncChange *change)
Gets the object format of a change.
OSyncQueue * osync_queue_new(const char *name, OSyncError **error)
Creates a new asynchronous queue.
unsigned int sync_done_timeout
void * osync_member_get_data(OSyncMember *member)
Returns the custom data of a member.
void * osync_try_malloc0(unsigned int size, OSyncError **error)
Safely tries to malloc memory.
void osync_debug(const char *subpart, int level, const char *message,...)
Used for debugging.
unsigned int disconnect_timeout
void osync_error_update(OSyncError **error, const char *format,...)
Updates the error message.
const char * osync_objformat_get_name(OSyncObjFormat *format)
Returns the name of a object format.
osync_bool osync_file_read(const char *filename, char **data, int *size, OSyncError **oserror)
Reads a file.
unsigned int get_changeinfo_timeout
long long int osync_member_get_id(OSyncMember *member)
Gets the unique id of a member.
void osync_change_reset(OSyncChange *change)
Resets a change.
const char * osync_plugin_get_path(OSyncPlugin *plugin)
Get full path for plugin module.
OSyncPlugin * osync_member_get_plugin(OSyncMember *member)
Returns the plugin of member.
void osync_trace_reset_indent(void)
void osync_error_set(OSyncError **error, OSyncErrorType type, const char *format,...)
Sets the error.
gboolean osync_message_is_error(OSyncMessage *message)
Checks if the message is a error.
const char * osync_error_print(OSyncError **error)
Returns the message of the error.
void osync_trace(OSyncTraceType type, const char *message,...)
Used for tracing the application.
osync_bool osync_change_convert_member_sink(OSyncFormatEnv *env, OSyncChange *change, OSyncMember *member, OSyncError **error)
Convert a change to the nearest sink on a member.
const char * osync_member_get_configdir(OSyncMember *member)
Returns the configuration directory where this member is stored.
const char * osync_objtype_get_name(OSyncObjType *type)
Returns the name of a object type.
OSyncObjType * osync_change_get_objtype(OSyncChange *change)
Gets the object type of a change.
unsigned int commit_timeout
OSyncMessage * osync_queue_get_message(OSyncQueue *queue)
OSyncMessage * osync_message_new(OSyncMessageCommand cmd, int size, OSyncError **error)
A Message used by the inter thread messaging library.
const char * osync_group_get_configdir(OSyncGroup *group)
Returns the configdir for the group.
OSyncFormatEnv * osync_group_get_format_env(OSyncGroup *group)
Returns the format environment of a group.
char * osync_change_get_data(OSyncChange *change)
Gets the data of a change.
osync_bool osync_change_save(OSyncChange *change, osync_bool save_format, OSyncError **error)
This will save a change into the database.
The timeouts for the asynchronous functions of a plugin.