diff --git a/include/crm/cib.h b/include/crm/cib.h index 820ef1769a..ae3b3d349e 100644 --- a/include/crm/cib.h +++ b/include/crm/cib.h @@ -1,315 +1,317 @@ /* * Copyright (C) 2004 Andrew Beekhof * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #ifndef CIB__H #define CIB__H #include #include #include #include #define CIB_FEATURE_SET "2.0" #define USE_PESKY_FRAGMENTS 1 /* use compare_version() for doing comparisons */ enum cib_variant { + cib_undefined, cib_native, cib_file, cib_remote, cib_database, cib_edir }; enum cib_state { cib_connected_command, cib_connected_query, cib_disconnected }; enum cib_conn_type { cib_command, cib_query, cib_query_synchronous, cib_command_synchronous, cib_no_connection }; enum cib_call_options { cib_none = 0x00000000, cib_verbose = 0x00000001, cib_discard_reply = 0x00000010, cib_scope_local = 0x00000100, cib_sync_call = 0x00001000, cib_inhibit_notify = 0x00010000, cib_quorum_override = 0x00100000, cib_inhibit_bcast = 0x01000000, cib_force_diff = 0x10000000 }; #define cib_default_options = cib_none enum cib_errors { cib_ok = 0, cib_operation = -1, cib_create_msg = -2, cib_not_connected = -3, cib_not_authorized = -4, cib_send_failed = -5, cib_reply_failed = -6, cib_return_code = -7, cib_output_ptr = -8, cib_output_data = -9, cib_connection = -10, cib_authentication = -11, cib_missing = -12, cib_variant = -28, CIBRES_MISSING_ID = -13, CIBRES_MISSING_TYPE = -14, CIBRES_MISSING_FIELD = -15, CIBRES_OBJTYPE_MISMATCH = -16, CIBRES_CORRUPT = -17, CIBRES_OTHER = -18, cib_unknown = -19, cib_STALE = -20, cib_EXISTS = -21, cib_NOTEXISTS = -22, cib_ACTIVATION = -23, cib_NOSECTION = -24, cib_NOOBJECT = -25, cib_NOPARENT = -26, cib_NODECOPY = -27, cib_NOTSUPPORTED = -29, cib_registration_msg = -30, cib_callback_token = -31, cib_callback_register = -32, cib_msg_field_add = -33, cib_client_gone = -34, cib_not_master = -35, cib_client_corrupt = -36, cib_master_timeout = -37, cib_revision_unsupported= -38, cib_revision_unknown = -39, cib_missing_data = -40, cib_remote_timeout = -41, cib_no_quorum = -42, cib_diff_failed = -43, cib_diff_resync = -44, cib_old_data = -45, cib_id_check = -46, cib_dtd_validation = -47, cib_bad_section = -48, cib_bad_digest = -49, cib_bad_permissions = -50, cib_bad_config = -51, cib_invalid_argument = -52 }; enum cib_update_op { CIB_UPDATE_OP_NONE = 0, CIB_UPDATE_OP_ADD, CIB_UPDATE_OP_MODIFY, CIB_UPDATE_OP_DELETE, CIB_UPDATE_OP_MAX }; enum cib_section { cib_section_none, cib_section_all, cib_section_nodes, cib_section_constraints, cib_section_resources, cib_section_crmconfig, cib_section_status }; #define CIB_OP_SLAVE "cib_slave" #define CIB_OP_SLAVEALL "cib_slave_all" #define CIB_OP_MASTER "cib_master" #define CIB_OP_SYNC "cib_sync" #define CIB_OP_SYNC_ONE "cib_sync_one" #define CIB_OP_ISMASTER "cib_ismaster" #define CIB_OP_BUMP "cib_bump" #define CIB_OP_QUERY "cib_query" #define CIB_OP_CREATE "cib_create" #define CIB_OP_UPDATE "cib_update" #define CIB_OP_MODIFY "cib_modify" #define CIB_OP_DELETE "cib_delete" #define CIB_OP_DELETE_ALT "cib_delete_alt" #define CIB_OP_ERASE "cib_erase" #define CIB_OP_REPLACE "cib_replace" #define CIB_OP_NOTIFY "cib_notify" #define CIB_OP_APPLY_DIFF "cib_apply_diff" #define F_CIB_CLIENTID "cib_clientid" #define F_CIB_CALLOPTS "cib_callopt" #define F_CIB_CALLID "cib_callid" #define F_CIB_CALLDATA "cib_calldata" #define F_CIB_OPERATION "cib_op" #define F_CIB_ISREPLY "cib_isreplyto" #define F_CIB_SECTION "cib_section" #define F_CIB_HOST "cib_host" #define F_CIB_RC "cib_rc" #define F_CIB_DELEGATED "cib_delegated_from" #define F_CIB_OBJID "cib_object" #define F_CIB_OBJTYPE "cib_object_type" #define F_CIB_EXISTING "cib_existing_object" #define F_CIB_SEENCOUNT "cib_seen" #define F_CIB_TIMEOUT "cib_timeout" #define F_CIB_UPDATE "cib_update" #define F_CIB_CALLBACK_TOKEN "cib_callback_token" #define F_CIB_GLOBAL_UPDATE "cib_update" #define F_CIB_UPDATE_RESULT "cib_update_result" #define F_CIB_CLIENTNAME "cib_clientname" #define F_CIB_NOTIFY_TYPE "cib_notify_type" #define F_CIB_NOTIFY_ACTIVATE "cib_notify_activate" #define F_CIB_UPDATE_DIFF "cib_update_diff" #define T_CIB "cib" #define T_CIB_NOTIFY "cib_notify" /* notify sub-types */ #define T_CIB_PRE_NOTIFY "cib_pre_notify" #define T_CIB_POST_NOTIFY "cib_post_notify" #define T_CIB_UPDATE_CONFIRM "cib_update_confirmation" #define T_CIB_DIFF_NOTIFY "cib_diff_notify" #define T_CIB_REPLACE_NOTIFY "cib_refresh_notify" #define cib_channel_ro "cib_ro" #define cib_channel_rw "cib_rw" #define cib_channel_callback "cib_callback" #define cib_channel_ro_synchronous "cib_ro_syncronous" #define cib_channel_rw_synchronous "cib_rw_syncronous" typedef struct cib_s cib_t; typedef struct cib_api_operations_s { int (*variant_op)( cib_t *cib, const char *op, const char *host, const char *section, xmlNode *data, xmlNode **output_data, int call_options); int (*signon) ( cib_t *cib, const char *name, enum cib_conn_type type); int (*signoff)(cib_t *cib); int (*free) (cib_t *cib); int (*set_op_callback)( cib_t *cib, void (*callback)( const xmlNode *msg, int callid , int rc, xmlNode *output)); int (*add_notify_callback)( cib_t *cib, const char *event, void (*callback)( const char *event, xmlNode *msg)); int (*del_notify_callback)( cib_t *cib, const char *event, void (*callback)( const char *event, xmlNode *msg)); int (*set_connection_dnotify)( cib_t *cib, void (*dnotify)(gpointer user_data)); IPC_Channel *(*channel)(cib_t* cib); int (*inputfd)(cib_t* cib); int (*noop)(cib_t *cib, int call_options); int (*ping)( cib_t *cib, xmlNode **output_data, int call_options); int (*query)(cib_t *cib, const char *section, xmlNode **output_data, int call_options); int (*query_from)( cib_t *cib, const char *host, const char *section, xmlNode **output_data, int call_options); int (*is_master) (cib_t *cib); int (*set_master)(cib_t *cib, int call_options); int (*set_slave) (cib_t *cib, int call_options); int (*set_slave_all)(cib_t *cib, int call_options); int (*sync)(cib_t *cib, const char *section, int call_options); int (*sync_from)( cib_t *cib, const char *host, const char *section, int call_options); int (*bump_epoch)(cib_t *cib, int call_options); int (*create)(cib_t *cib, const char *section, xmlNode *data, xmlNode **output_data, int call_options); int (*modify)(cib_t *cib, const char *section, xmlNode *data, xmlNode **output_data, int call_options); int (*update)(cib_t *cib, const char *section, xmlNode *data, xmlNode **output_data, int call_options); int (*replace)(cib_t *cib, const char *section, xmlNode *data, xmlNode **output_data, int call_options); int (*delete)(cib_t *cib, const char *section, xmlNode *data, xmlNode **output_data, int call_options); int (*delete_absolute)( cib_t *cib, const char *section, xmlNode *data, xmlNode **output_data, int call_options); int (*erase)( cib_t *cib, xmlNode **output_data, int call_options); int (*quit)(cib_t *cib, int call_options); gboolean (*msgready)(cib_t* cib); int (*rcvmsg)(cib_t* cib, int blocking); gboolean (*dispatch)(IPC_Channel *channel, gpointer user_data); int (*register_callback)( cib_t* cib, const char *callback, int enabled); } cib_api_operations_t; struct cib_s { enum cib_state state; enum cib_conn_type type; enum cib_variant variant; int call_id; int call_timeout; void *variant_opaque; GList *notify_list; void (*op_callback)(const xmlNode *msg, int call_id, int rc, xmlNode *output); cib_api_operations_t *cmds; }; /* Core functions */ extern cib_t *cib_new(void); -extern cib_t *cib_new_variant(enum cib_variant variant); +extern cib_t *cib_native_new(void); +extern cib_t *cib_file_new(const char *filename); extern void cib_delete(cib_t *cib); extern int num_cib_op_callbacks(void); extern void remove_cib_op_callback(int call_id, gboolean all_callbacks); extern gboolean add_cib_op_callback_timeout( int call_id, int timeout, gboolean only_success, void *user_data, void (*callback)(xmlNode*, int, int, xmlNode*,void*)); #define add_cib_op_callback(id, flag, data, fn) add_cib_op_callback_timeout(id, 0, flag, data, fn) #include #include #endif diff --git a/lib/crm/cib/cib_client.c b/lib/crm/cib/cib_client.c index 3f2d4faab9..c6cfca273e 100644 --- a/lib/crm/cib/cib_client.c +++ b/lib/crm/cib/cib_client.c @@ -1,515 +1,499 @@ /* * Copyright (c) 2004 Andrew Beekhof * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA * */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include GHashTable *cib_op_callback_table = NULL; int cib_client_set_op_callback( cib_t *cib, void (*callback)(const xmlNode *msg, int call_id, int rc, xmlNode *output)); int cib_client_add_notify_callback( cib_t *cib, const char *event, void (*callback)( const char *event, xmlNode *msg)); int cib_client_del_notify_callback( cib_t *cib, const char *event, void (*callback)( const char *event, xmlNode *msg)); gint ciblib_GCompareFunc(gconstpointer a, gconstpointer b); -extern cib_t *cib_native_new(cib_t *cib); -extern cib_t *cib_file_new(cib_t *cib, const char *filename); - #define op_common(cib) \ if(cib == NULL) { \ return cib_missing; \ } else if(cib->cmds->variant_op == NULL) { \ return cib_variant; \ } static int cib_client_noop(cib_t *cib, int call_options) { op_common(cib) return cib->cmds->variant_op( cib, CRM_OP_NOOP, NULL, NULL, NULL, NULL, call_options); } static int cib_client_ping(cib_t *cib, xmlNode **output_data, int call_options) { op_common(cib) return cib->cmds->variant_op( cib, CRM_OP_PING, NULL,NULL,NULL, output_data, call_options); } static int cib_client_query(cib_t *cib, const char *section, xmlNode **output_data, int call_options) { return cib->cmds->query_from( cib, NULL, section, output_data, call_options); } static int cib_client_query_from(cib_t *cib, const char *host, const char *section, xmlNode **output_data, int call_options) { op_common(cib) return cib->cmds->variant_op(cib, CIB_OP_QUERY, host, section, NULL, output_data, call_options); } static int cib_client_is_master(cib_t *cib) { op_common(cib) return cib->cmds->variant_op( cib, CIB_OP_ISMASTER, NULL, NULL,NULL,NULL, cib_scope_local|cib_sync_call); } static int cib_client_set_slave(cib_t *cib, int call_options) { op_common(cib) return cib->cmds->variant_op( cib, CIB_OP_SLAVE, NULL,NULL,NULL,NULL, call_options); } static int cib_client_set_slave_all(cib_t *cib, int call_options) { op_common(cib) return cib->cmds->variant_op( cib, CIB_OP_SLAVEALL, NULL,NULL,NULL,NULL, call_options); } static int cib_client_set_master(cib_t *cib, int call_options) { op_common(cib) crm_debug_3("Adding cib_scope_local to options"); return cib->cmds->variant_op( cib, CIB_OP_MASTER, NULL,NULL,NULL,NULL, call_options|cib_scope_local); } static int cib_client_bump_epoch(cib_t *cib, int call_options) { op_common(cib) return cib->cmds->variant_op( cib, CIB_OP_BUMP, NULL, NULL, NULL, NULL, call_options); } static int cib_client_sync(cib_t *cib, const char *section, int call_options) { return cib->cmds->sync_from(cib, NULL, section, call_options); } static int cib_client_sync_from( cib_t *cib, const char *host, const char *section, int call_options) { op_common(cib) return cib->cmds->variant_op( cib, CIB_OP_SYNC, host, section, NULL, NULL, call_options); } static int cib_client_create(cib_t *cib, const char *section, xmlNode *data, xmlNode **output_data, int call_options) { op_common(cib) return cib->cmds->variant_op(cib, CIB_OP_CREATE, NULL, section, data, output_data, call_options); } static int cib_client_modify(cib_t *cib, const char *section, xmlNode *data, xmlNode **output_data, int call_options) { op_common(cib) return cib->cmds->variant_op(cib, CIB_OP_MODIFY, NULL, section, data, output_data, call_options); } static int cib_client_update(cib_t *cib, const char *section, xmlNode *data, xmlNode **output_data, int call_options) { op_common(cib) return cib->cmds->variant_op(cib, CIB_OP_UPDATE, NULL, section, data, output_data, call_options); } static int cib_client_replace(cib_t *cib, const char *section, xmlNode *data, xmlNode **output_data, int call_options) { op_common(cib) return cib->cmds->variant_op(cib, CIB_OP_REPLACE, NULL, section, data, output_data, call_options); } static int cib_client_delete(cib_t *cib, const char *section, xmlNode *data, xmlNode **output_data, int call_options) { op_common(cib) return cib->cmds->variant_op(cib, CIB_OP_DELETE, NULL, section, data, output_data, call_options); } static int cib_client_delete_absolute( cib_t *cib, const char *section, xmlNode *data, xmlNode **output_data, int call_options) { op_common(cib) return cib->cmds->variant_op(cib, CIB_OP_DELETE_ALT, NULL, section, data, output_data, call_options); } static int cib_client_erase( cib_t *cib, xmlNode **output_data, int call_options) { op_common(cib) return cib->cmds->variant_op(cib, CIB_OP_ERASE, NULL, NULL, NULL, output_data, call_options); } static int cib_client_quit(cib_t *cib, int call_options) { op_common(cib) return cib->cmds->variant_op( cib, CRM_OP_QUIT, NULL, NULL, NULL, NULL, call_options); } static void cib_destroy_op_callback(gpointer data) { cib_callback_client_t *blob = data; if(blob->timer && blob->timer->ref > 0) { g_source_remove(blob->timer->ref); } crm_free(blob); } cib_t* cib_new(void) { - if(getenv("CIB_file")) { - return cib_new_variant(cib_file); + const char *file = getenv("CIB_file"); + if(file) { + return cib_file_new(file); } - return cib_new_variant(cib_native); + return cib_native_new(); } /* this is backwards... cib_*_new should call this not the other way around */ cib_t* -cib_new_variant(enum cib_variant variant) +cib_new_variant(void) { cib_t* new_cib = NULL; crm_malloc0(new_cib, sizeof(cib_t)); if(cib_op_callback_table != NULL) { g_hash_table_destroy(cib_op_callback_table); cib_op_callback_table = NULL; } if(cib_op_callback_table == NULL) { cib_op_callback_table = g_hash_table_new_full( g_direct_hash, g_direct_equal, NULL, cib_destroy_op_callback); } new_cib->call_id = 1; + new_cib->variant = cib_undefined; new_cib->type = cib_none; new_cib->state = cib_disconnected; - new_cib->variant = variant; new_cib->op_callback = NULL; new_cib->variant_opaque = NULL; new_cib->notify_list = NULL; /* the rest will get filled in by the variant constructor */ crm_malloc0(new_cib->cmds, sizeof(cib_api_operations_t)); new_cib->cmds->set_op_callback = cib_client_set_op_callback; new_cib->cmds->add_notify_callback = cib_client_add_notify_callback; new_cib->cmds->del_notify_callback = cib_client_del_notify_callback; new_cib->cmds->noop = cib_client_noop; new_cib->cmds->ping = cib_client_ping; new_cib->cmds->query = cib_client_query; new_cib->cmds->sync = cib_client_sync; new_cib->cmds->query_from = cib_client_query_from; new_cib->cmds->sync_from = cib_client_sync_from; new_cib->cmds->is_master = cib_client_is_master; new_cib->cmds->set_master = cib_client_set_master; new_cib->cmds->set_slave = cib_client_set_slave; new_cib->cmds->set_slave_all = cib_client_set_slave_all; new_cib->cmds->bump_epoch = cib_client_bump_epoch; new_cib->cmds->create = cib_client_create; new_cib->cmds->modify = cib_client_modify; new_cib->cmds->update = cib_client_update; new_cib->cmds->replace = cib_client_replace; new_cib->cmds->delete = cib_client_delete; new_cib->cmds->erase = cib_client_erase; new_cib->cmds->quit = cib_client_quit; new_cib->cmds->delete_absolute = cib_client_delete_absolute; - switch(variant) { - case cib_native: - cib_native_new(new_cib); - break; - - case cib_file: - cib_file_new(new_cib, NULL); - break; - default: - crm_err("Only the native CIB type is currently implemented"); - crm_free(new_cib); - return NULL; - } - return new_cib; } void cib_delete(cib_t *cib) { GList *list = cib->notify_list; while(list != NULL) { cib_notify_client_t *client = g_list_nth_data(list, 0); list = g_list_remove(list, client); crm_free(client); } g_hash_table_destroy(cib_op_callback_table); cib->cmds->free(cib); cib = NULL; } int cib_client_set_op_callback( cib_t *cib, void (*callback)(const xmlNode *msg, int call_id, int rc, xmlNode *output)) { if(callback == NULL) { crm_info("Un-Setting operation callback"); } else { crm_debug_3("Setting operation callback"); } cib->op_callback = callback; return cib_ok; } int cib_client_add_notify_callback( cib_t *cib, const char *event, void (*callback)( const char *event, xmlNode *msg)) { GList *list_item = NULL; cib_notify_client_t *new_client = NULL; if(cib->variant != cib_native) { crm_err("Not supported"); return cib_NOTSUPPORTED; } crm_debug_2("Adding callback for %s events (%d)", event, g_list_length(cib->notify_list)); crm_malloc0(new_client, sizeof(cib_notify_client_t)); new_client->event = event; new_client->callback = callback; list_item = g_list_find_custom( cib->notify_list, new_client, ciblib_GCompareFunc); if(list_item != NULL) { crm_warn("Callback already present"); crm_free(new_client); } else { cib->notify_list = g_list_append( cib->notify_list, new_client); cib->cmds->register_callback(cib, event, 1); crm_debug_3("Callback added (%d)", g_list_length(cib->notify_list)); } return cib_ok; } int cib_client_del_notify_callback( cib_t *cib, const char *event, void (*callback)( const char *event, xmlNode *msg)) { GList *list_item = NULL; cib_notify_client_t *new_client = NULL; if(cib->variant != cib_native) { crm_err("Not supported"); return cib_NOTSUPPORTED; } crm_debug("Removing callback for %s events", event); crm_malloc0(new_client, sizeof(cib_notify_client_t)); new_client->event = event; new_client->callback = callback; list_item = g_list_find_custom( cib->notify_list, new_client, ciblib_GCompareFunc); cib->cmds->register_callback(cib, event, 0); if(list_item != NULL) { cib_notify_client_t *list_client = list_item->data; cib->notify_list = g_list_remove(cib->notify_list, list_client); crm_free(list_client); crm_debug_3("Removed callback"); } else { crm_debug_3("Callback not present"); } crm_free(new_client); return cib_ok; } gint ciblib_GCompareFunc(gconstpointer a, gconstpointer b) { const cib_notify_client_t *a_client = a; const cib_notify_client_t *b_client = b; if(a_client->callback == b_client->callback && safe_str_neq(a_client->event, b_client->event)) { return 0; } else if(((long)a_client->callback) < ((long)b_client->callback)) { return -1; } return 1; } static gboolean cib_async_timeout_handler(gpointer data) { struct timer_rec_s *timer = data; int call_id = timer->call_id; cib_callback_client_t *blob = NULL; crm_err("Async call %d timed out after %ds", call_id, timer->timeout); /* Send an async reply with rc=cib_remote_timeout */ blob = g_hash_table_lookup(cib_op_callback_table, GINT_TO_POINTER(call_id)); if(blob != NULL && blob->callback != NULL) { crm_debug_3("Callback found for call %d", call_id); blob->callback(NULL, call_id, cib_remote_timeout, NULL, blob->user_data); } remove_cib_op_callback(timer->call_id, FALSE); /* Always return TRUE, never remove the handler * We do that in remove_cib_op_callback() */ return TRUE; } gboolean add_cib_op_callback_timeout( int call_id, int timeout, gboolean only_success, void *user_data, void (*callback)(xmlNode*, int, int, xmlNode*,void*)) { cib_callback_client_t *blob = NULL; if(call_id < 0) { crm_warn("CIB call failed: %s", cib_error2string(call_id)); if(only_success == FALSE) { callback(NULL, call_id, call_id, NULL, user_data); } return FALSE; } crm_malloc0(blob, sizeof(cib_callback_client_t)); blob->only_success = only_success; blob->user_data = user_data; blob->callback = callback; if(timeout > 0) { struct timer_rec_s *async_timer = NULL; crm_malloc0(async_timer, sizeof(struct timer_rec_s)); blob->timer = async_timer; async_timer->call_id = call_id; async_timer->timeout = timeout*1000; async_timer->ref = Gmain_timeout_add( async_timer->timeout, cib_async_timeout_handler, async_timer); } g_hash_table_insert(cib_op_callback_table, GINT_TO_POINTER(call_id), blob); return TRUE; } void remove_cib_op_callback(int call_id, gboolean all_callbacks) { if(all_callbacks) { if(cib_op_callback_table != NULL) { g_hash_table_destroy(cib_op_callback_table); } cib_op_callback_table = g_hash_table_new_full( g_direct_hash, g_direct_equal, NULL, cib_destroy_op_callback); } else { g_hash_table_remove( cib_op_callback_table, GINT_TO_POINTER(call_id)); } } int num_cib_op_callbacks(void) { if(cib_op_callback_table == NULL) { return 0; } return g_hash_table_size(cib_op_callback_table); } diff --git a/lib/crm/cib/cib_file.c b/lib/crm/cib/cib_file.c index 2940d79fd5..a1d49df63d 100644 --- a/lib/crm/cib/cib_file.c +++ b/lib/crm/cib/cib_file.c @@ -1,318 +1,320 @@ /* * Copyright (c) 2004 International Business Machines * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA * */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include typedef struct cib_file_opaque_s { int flags; char *filename; } cib_file_opaque_t; int cib_file_perform_op( cib_t *cib, const char *op, const char *host, const char *section, xmlNode *data, xmlNode **output_data, int call_options); int cib_file_signon(cib_t* cib, const char *name, enum cib_conn_type type); int cib_file_signoff(cib_t* cib); int cib_file_free(cib_t* cib); static gboolean cib_file_msgready(cib_t* cib) { return FALSE; } static IPC_Channel *cib_file_channel(cib_t* cib) { return NULL; } static int cib_file_inputfd(cib_t* cib) { return cib_NOTSUPPORTED; } static int cib_file_rcvmsg(cib_t* cib, int blocking) { return cib_NOTSUPPORTED; } static gboolean cib_file_dispatch(IPC_Channel *channel, gpointer user_data) { return FALSE; } static int cib_file_set_connection_dnotify( cib_t *cib, void (*dnotify)(gpointer user_data)) { return cib_NOTSUPPORTED; } static int cib_file_register_callback(cib_t* cib, const char *callback, int enabled) { return cib_NOTSUPPORTED; } -cib_t *cib_file_new (cib_t *cib, const char *cib_location); - cib_t* -cib_file_new (cib_t *cib, const char *cib_location) +cib_file_new (const char *cib_location) { cib_file_opaque_t *private = NULL; + cib_t *cib = cib_new_variant(); + crm_malloc0(private, sizeof(cib_file_opaque_t)); + cib->variant = cib_file; cib->variant_opaque = private; if(cib_location == NULL) { cib_location = getenv("CIB_file"); } private->filename = crm_strdup(cib_location); /* assign variant specific ops*/ cib->cmds->variant_op = cib_file_perform_op; cib->cmds->signon = cib_file_signon; cib->cmds->signoff = cib_file_signoff; cib->cmds->free = cib_file_free; cib->cmds->channel = cib_file_channel; cib->cmds->inputfd = cib_file_inputfd; cib->cmds->msgready = cib_file_msgready; cib->cmds->rcvmsg = cib_file_rcvmsg; cib->cmds->dispatch = cib_file_dispatch; cib->cmds->register_callback = cib_file_register_callback; cib->cmds->set_connection_dnotify = cib_file_set_connection_dnotify; return cib; } + static xmlNode *in_mem_cib = NULL; static int load_file_cib(const char *filename) { int rc = cib_ok; struct stat buf; xmlNode *root = NULL; FILE *cib_file = NULL; gboolean dtd_ok = TRUE; const char *ignore_dtd = NULL; xmlNode *status = NULL; rc = stat(filename, &buf); if (rc == 0) { cib_file = fopen(filename, "r"); if(cib_file == NULL) { cl_perror("Could not open config file %s for reading", filename); return cib_not_authorized; } else { root = file2xml(cib_file, FALSE); fclose(cib_file); } } rc = 0; if(root == NULL) { crm_warn("Continuing with an empty configuration."); root = createEmptyCib(); } status = find_xml_node(root, XML_CIB_TAG_STATUS, FALSE); if(status == NULL) { create_xml_node(root, XML_CIB_TAG_STATUS); } ignore_dtd = crm_element_value(root, "ignore_dtd"); dtd_ok = validate_with_dtd(root, TRUE, DTD_DIRECTORY"/crm.dtd"); if(dtd_ok == FALSE) { crm_err("CIB does not validate against "DTD_DIRECTORY"/crm.dtd"); if(ignore_dtd != NULL && crm_is_true(ignore_dtd) == FALSE) { rc = cib_dtd_validation; goto bail; } } if(do_id_check(root, NULL, TRUE, FALSE)) { crm_err("%s does not contain a vaild configuration:" " ID check failed", filename); rc = cib_id_check; goto bail; } in_mem_cib = root; return rc; bail: free_xml(root); root = NULL; return rc; } int cib_file_signon(cib_t* cib, const char *name, enum cib_conn_type type) { int rc = cib_ok; cib_file_opaque_t *private = cib->variant_opaque; if(private->filename == FALSE) { rc = cib_missing; } else { rc = load_file_cib(private->filename); } if(rc == cib_ok) { fprintf(stderr, "%s: Opened connection to local file '%s'\n", name, private->filename); cib->state = cib_connected_command; cib->type = cib_command; } else { fprintf(stderr, "%s: Connection to local file '%s' failed: %s\n", name, private->filename, cib_error2string(rc)); } return rc; } int cib_file_signoff(cib_t* cib) { int rc = cib_ok; cib_file_opaque_t *private = cib->variant_opaque; crm_debug("Signing out of the CIB Service"); rc = write_xml_file(in_mem_cib, private->filename, FALSE); if(rc > 0) { crm_info("Wrote CIB to %s", private->filename); } else { crm_err("Could not write CIB to %s", private->filename); } crm_free(in_mem_cib); cib->state = cib_disconnected; cib->type = cib_none; return rc; } int cib_file_free (cib_t* cib) { int rc = cib_ok; crm_warn("Freeing CIB"); if(cib->state != cib_disconnected) { rc = cib_file_signoff(cib); if(rc == cib_ok) { cib_file_opaque_t *private = cib->variant_opaque; crm_free(private->filename); crm_free(cib->cmds); crm_free(private); crm_free(cib); } } return rc; } struct cib_func_entry { const char *op; gboolean read_only; cib_op_t fn; }; static struct cib_func_entry cib_file_ops[] = { {CIB_OP_QUERY, TRUE, cib_process_query}, {CIB_OP_MODIFY, FALSE, cib_process_modify}, /* {CIB_OP_UPDATE, FALSE, cib_process_change}, */ {CIB_OP_APPLY_DIFF, FALSE, cib_process_diff}, {CIB_OP_BUMP, FALSE, cib_process_bump}, {CIB_OP_REPLACE, FALSE, cib_process_replace}, /* {CIB_OP_CREATE, FALSE, cib_process_change}, */ {CIB_OP_DELETE, FALSE, cib_process_delete}, {CIB_OP_ERASE, FALSE, cib_process_erase}, }; int cib_file_perform_op( cib_t *cib, const char *op, const char *host, const char *section, xmlNode *data, xmlNode **output_data, int call_options) { int rc = cib_ok; gboolean query = FALSE; gboolean changed = FALSE; xmlNode *output = NULL; xmlNode *result_cib = NULL; cib_op_t *fn = NULL; int lpc = 0; static int max_msg_types = DIMOF(cib_file_ops); crm_info("%s on %s", op, section); if(cib->state == cib_disconnected) { return cib_not_connected; } if(output_data != NULL) { *output_data = NULL; } if(op == NULL) { crm_err("No operation specified"); return cib_operation; } for (lpc = 0; lpc < max_msg_types; lpc++) { if (safe_str_eq(op, cib_file_ops[lpc].op)) { fn = &(cib_file_ops[lpc].fn); query = cib_file_ops[lpc].read_only; break; } } if(fn == NULL) { return cib_NOTSUPPORTED; } cib->call_id++; rc = cib_perform_op(op, call_options, fn, query, section, NULL, data, TRUE, &changed, in_mem_cib, &result_cib, &output); if(rc != cib_ok) { crm_err("Call failed: %s", cib_error2string(rc)); crm_log_xml(LOG_WARNING, "failed", output); free_xml(result_cib); } else if(query == FALSE) { free_xml(in_mem_cib); in_mem_cib = result_cib; } if(output_data && output) { *output_data = copy_xml(output); } if(cib->op_callback != NULL) { cib->op_callback(NULL, cib->call_id, rc, output); } return rc; } diff --git a/lib/crm/cib/cib_native.c b/lib/crm/cib/cib_native.c index bb587aedf3..e45d9ee898 100644 --- a/lib/crm/cib/cib_native.c +++ b/lib/crm/cib/cib_native.c @@ -1,881 +1,883 @@ /* * Copyright (c) 2004 International Business Machines * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA * */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include typedef struct cib_native_opaque_s { IPC_Channel *command_channel; IPC_Channel *callback_channel; GCHSource *callback_source; } cib_native_opaque_t; int cib_native_perform_op( cib_t *cib, const char *op, const char *host, const char *section, xmlNode *data, xmlNode **output_data, int call_options); int cib_native_signon(cib_t* cib, const char *name, enum cib_conn_type type); int cib_native_signoff(cib_t* cib); int cib_native_free(cib_t* cib); IPC_Channel *cib_native_channel(cib_t* cib); int cib_native_inputfd(cib_t* cib); gboolean cib_native_msgready(cib_t* cib); int cib_native_rcvmsg(cib_t* cib, int blocking); gboolean cib_native_dispatch(IPC_Channel *channel, gpointer user_data); -cib_t *cib_native_new (cib_t *cib); int cib_native_set_connection_dnotify( cib_t *cib, void (*dnotify)(gpointer user_data)); void cib_native_notify(gpointer data, gpointer user_data); void cib_native_callback(cib_t *cib, xmlNode *msg); int cib_native_register_callback(cib_t* cib, const char *callback, int enabled); cib_t* -cib_native_new (cib_t *cib) +cib_native_new (void) { cib_native_opaque_t *native = NULL; + cib_t *cib = cib_new_variant(); + crm_malloc0(native, sizeof(cib_native_opaque_t)); + + cib->variant = cib_native; + cib->variant_opaque = native; native->command_channel = NULL; native->callback_channel = NULL; - - cib->variant_opaque = native; /* assign variant specific ops*/ cib->cmds->variant_op = cib_native_perform_op; cib->cmds->signon = cib_native_signon; cib->cmds->signoff = cib_native_signoff; cib->cmds->free = cib_native_free; cib->cmds->channel = cib_native_channel; cib->cmds->inputfd = cib_native_inputfd; cib->cmds->msgready = cib_native_msgready; cib->cmds->rcvmsg = cib_native_rcvmsg; cib->cmds->dispatch = cib_native_dispatch; cib->cmds->register_callback = cib_native_register_callback; cib->cmds->set_connection_dnotify = cib_native_set_connection_dnotify; return cib; } int cib_native_signon(cib_t* cib, const char *name, enum cib_conn_type type) { int rc = cib_ok; char *uuid_ticket = NULL; xmlNode *reg_msg = NULL; cib_native_opaque_t *native = cib->variant_opaque; crm_debug_4("Connecting command channel"); if(type == cib_command) { cib->state = cib_connected_command; native->command_channel = init_client_ipc_comms_nodispatch( cib_channel_rw); } else if(type == cib_query) { cib->state = cib_connected_query; native->command_channel = init_client_ipc_comms_nodispatch( cib_channel_ro); } else if(type == cib_query_synchronous) { cib->state = cib_connected_query; native->command_channel = init_client_ipc_comms_nodispatch( cib_channel_ro_synchronous); } else if(type == cib_command_synchronous) { cib->state = cib_connected_query; native->command_channel = init_client_ipc_comms_nodispatch( cib_channel_rw_synchronous); } else { return cib_not_connected; } if(native->command_channel == NULL) { crm_debug("Connection to command channel failed"); rc = cib_connection; } else if(native->command_channel->ch_status != IPC_CONNECT) { crm_err("Connection may have succeeded," " but authentication to command channel failed"); rc = cib_authentication; } if(type == cib_query_synchronous || type == cib_command_synchronous) { return rc; } if(rc == cib_ok) { crm_debug_4("Connecting callback channel"); native->callback_source = init_client_ipc_comms( cib_channel_callback, cib_native_dispatch, cib, &(native->callback_channel)); if(native->callback_channel == NULL) { crm_debug("Connection to callback channel failed"); rc = cib_connection; } else if(native->callback_channel->ch_status != IPC_CONNECT) { crm_err("Connection may have succeeded," " but authentication to callback channel failed"); rc = cib_authentication; } else if(native->callback_source == NULL) { crm_err("Callback source not recorded"); rc = cib_connection; } else { native->callback_channel->send_queue->max_qlen = 500; } } if(rc == cib_ok) { crm_debug_4("Waiting for msg on command channel"); reg_msg = xmlfromIPC(native->command_channel, 0); if(native->command_channel->ops->get_chan_status( native->command_channel) != IPC_CONNECT) { crm_err("No reply message - disconnected - %d", rc); rc = cib_not_connected; } else if(rc != IPC_OK) { crm_err("No reply message - failed - %d", rc); rc = cib_reply_failed; } else if(reg_msg == NULL) { crm_err("No reply message - empty - %d", rc); rc = cib_reply_failed; } } if(rc == cib_ok) { const char *msg_type = NULL; msg_type = crm_element_value(reg_msg, F_CIB_OPERATION); if(safe_str_neq(msg_type, CRM_OP_REGISTER) ) { crm_err("Invalid registration message: %s", msg_type); rc = cib_registration_msg; } else { const char *tmp_ticket = NULL; crm_debug_4("Retrieving callback channel ticket"); tmp_ticket = crm_element_value( reg_msg, F_CIB_CALLBACK_TOKEN); if(tmp_ticket == NULL) { rc = cib_callback_token; } else { uuid_ticket = crm_strdup(tmp_ticket); } } } if(reg_msg != NULL) { free_xml(reg_msg); reg_msg = NULL; } if(rc == cib_ok) { crm_debug_4("Registering callback channel with ticket %s", crm_str(uuid_ticket)); reg_msg = create_xml_node(NULL, __FUNCTION__); crm_xml_add(reg_msg, F_CIB_OPERATION, CRM_OP_REGISTER); crm_xml_add(reg_msg, F_CIB_CALLBACK_TOKEN, uuid_ticket); crm_xml_add(reg_msg, F_CIB_CLIENTNAME, name); if(send_ipc_message( native->callback_channel, reg_msg) == FALSE) { rc = cib_callback_register; } free_xml(reg_msg); crm_free(uuid_ticket); } if(rc == cib_ok) { /* In theory IPC_INTR could trip us up here */ crm_debug_4("wait for the callback channel setup to complete"); reg_msg = xmlfromIPC(native->callback_channel, 0); if(native->callback_channel->ops->get_chan_status( native->callback_channel) != IPC_CONNECT) { crm_err("No reply message - disconnected - %d", rc); rc = cib_not_connected; } else if(reg_msg == NULL) { crm_err("No reply message - empty - %d", rc); rc = cib_reply_failed; } free_xml(reg_msg); } if(rc == cib_ok) { crm_debug("Connection to CIB successful"); return cib_ok; } crm_debug("Connection to CIB failed: %s", cib_error2string(rc)); cib_native_signoff(cib); return rc; } int cib_native_signoff(cib_t* cib) { cib_native_opaque_t *native = cib->variant_opaque; crm_debug("Signing out of the CIB Service"); /* close channels */ if (native->command_channel != NULL) { native->command_channel->ops->destroy( native->command_channel); native->command_channel = NULL; } if (native->callback_source != NULL) { G_main_del_IPC_Channel(native->callback_source); native->callback_source = NULL; } if (native->callback_channel != NULL) { #ifdef BUG native->callback_channel->ops->destroy( native->callback_channel); #endif native->callback_channel = NULL; } cib->state = cib_disconnected; cib->type = cib_none; return cib_ok; } int cib_native_free (cib_t* cib) { int rc = cib_ok; crm_warn("Freeing CIB"); if(cib->state != cib_disconnected) { rc = cib_native_signoff(cib); if(rc == cib_ok) { crm_free(cib->variant_opaque); crm_free(cib->cmds); crm_free(cib); } } return rc; } IPC_Channel * cib_native_channel(cib_t* cib) { cib_native_opaque_t *native = NULL; if(cib == NULL) { crm_err("Missing cib object"); return NULL; } native = cib->variant_opaque; if(native != NULL) { return native->callback_channel; } crm_err("couldnt find variant specific data in %p", cib); return NULL; } int cib_native_inputfd(cib_t* cib) { IPC_Channel *ch = cib_native_channel(cib); return ch->ops->get_recv_select_fd(ch); } static xmlNode * cib_create_op( int call_id, const char *op, const char *host, const char *section, xmlNode *data, int call_options) { int rc = HA_OK; xmlNode *op_msg = create_xml_node(NULL, "cib-op"); CRM_CHECK(op_msg != NULL, return NULL); crm_xml_add(op_msg, F_XML_TAGNAME, "cib_command"); crm_xml_add(op_msg, F_TYPE, T_CIB); crm_xml_add(op_msg, F_CIB_OPERATION, op); crm_xml_add(op_msg, F_CIB_HOST, host); crm_xml_add(op_msg, F_CIB_SECTION, section); crm_xml_add_int(op_msg, F_CIB_CALLID, call_id); crm_debug_4("Sending call options: %.8lx, %d", (long)call_options, call_options); crm_xml_add_int(op_msg, F_CIB_CALLOPTS, call_options); if(data != NULL) { #if 0 const char *tag = crm_element_name(data); xmlNode *cib = data; if(safe_str_neq(tag, XML_TAG_CIB)) { cib = find_xml_node(data, XML_TAG_CIB, FALSE); if(cib != NULL) { tag = XML_TAG_CIB; } } if(safe_str_eq(tag, XML_TAG_CIB)) { const char *version = feature_set(cib); crm_xml_add(cib, XML_ATTR_CIB_REVISION, version); } else { crm_info("Skipping feature check for %s tag", tag); } #endif add_message_xml(op_msg, F_CIB_CALLDATA, data); } if (rc != HA_OK) { crm_err("Failed to create CIB operation message"); crm_log_xml(LOG_ERR, "op", op_msg); free_xml(op_msg); return NULL; } if(call_options & cib_inhibit_bcast) { CRM_CHECK((call_options & cib_scope_local), return NULL); } return op_msg; } static gboolean timer_expired = FALSE; static struct timer_rec_s *sync_timer = NULL; static gboolean cib_timeout_handler(gpointer data) { struct timer_rec_s *timer = data; timer_expired = TRUE; crm_err("Call %d timed out after %ds", timer->call_id, timer->timeout); /* Always return TRUE, never remove the handler * We do that after the while-loop in cib_native_perform_op() */ return TRUE; } int cib_native_perform_op( cib_t *cib, const char *op, const char *host, const char *section, xmlNode *data, xmlNode **output_data, int call_options) { int rc = HA_OK; xmlNode *op_msg = NULL; xmlNode *op_reply = NULL; cib_native_opaque_t *native = cib->variant_opaque; if(sync_timer == NULL) { crm_malloc0(sync_timer, sizeof(struct timer_rec_s)); } if(cib->state == cib_disconnected) { return cib_not_connected; } if(output_data != NULL) { *output_data = NULL; } if(op == NULL) { crm_err("No operation specified"); return cib_operation; } cib->call_id++; /* prevent call_id from being negative (or zero) and conflicting * with the cib_errors enum * use 2 because we use it as (cib->call_id - 1) below */ if(cib->call_id < 1) { cib->call_id = 1; } op_msg = cib_create_op( cib->call_id, op, host, section, data, call_options); if(op_msg == NULL) { return cib_create_msg; } crm_debug_3("Sending %s message to CIB service", op); if(send_ipc_message(native->command_channel, op_msg) == FALSE) { crm_err("Sending message to CIB service FAILED"); free_xml(op_msg); return cib_send_failed; } else { crm_debug_3("Message sent"); } free_xml(op_msg); if((call_options & cib_discard_reply)) { crm_debug_3("Discarding reply"); return cib_ok; } else if(!(call_options & cib_sync_call)) { crm_debug_3("Async call, returning"); CRM_CHECK(cib->call_id != 0, return cib_reply_failed); return cib->call_id; } rc = IPC_OK; crm_debug_3("Waiting for a syncronous reply"); if(cib->call_timeout > 0) { /* We need this, even with msgfromIPC_timeout(), because we might * get other/older replies that don't match the active request */ timer_expired = FALSE; sync_timer->call_id = cib->call_id; sync_timer->timeout = cib->call_timeout*1000; sync_timer->ref = Gmain_timeout_add( sync_timer->timeout, cib_timeout_handler, sync_timer); } while(timer_expired == FALSE && IPC_ISRCONN(native->command_channel)) { int reply_id = -1; int msg_id = cib->call_id; op_reply = xmlfromIPC(native->command_channel, cib->call_timeout); if(op_reply == NULL) { break; } crm_element_value_int(op_reply, F_CIB_CALLID, &reply_id); CRM_CHECK(reply_id > 0, free_xml(op_reply); if(sync_timer->ref > 0) { g_source_remove(sync_timer->ref); sync_timer->ref = 0; } return cib_reply_failed); if(reply_id == msg_id) { break; } else if(reply_id < msg_id) { crm_debug("Recieved old reply: %d (wanted %d)", reply_id, msg_id); crm_log_xml( LOG_MSG, "Old reply", op_reply); } else if((reply_id - 10000) > msg_id) { /* wrap-around case */ crm_debug("Recieved old reply: %d (wanted %d)", reply_id, msg_id); crm_log_xml( LOG_MSG, "Old reply", op_reply); } else { crm_err("Received a __future__ reply:" " %d (wanted %d)", reply_id, msg_id); } free_xml(op_reply); op_reply = NULL; } if(sync_timer->ref > 0) { g_source_remove(sync_timer->ref); sync_timer->ref = 0; } if(timer_expired) { return cib_remote_timeout; } if(op_reply == NULL) { if(IPC_ISRCONN(native->command_channel) == FALSE) { crm_err("No reply message - disconnected - %d", native->command_channel->ch_status); cib->state = cib_disconnected; return cib_not_connected; } crm_err("No reply message - empty - %d", rc); return cib_reply_failed; } if(IPC_ISRCONN(native->command_channel) == FALSE) { crm_err("CIB disconnected: %d", native->command_channel->ch_status); cib->state = cib_disconnected; } crm_debug_3("Syncronous reply recieved"); rc = cib_ok; /* Start processing the reply... */ if(crm_element_value_int(op_reply, F_CIB_RC, &rc) != 0) { rc = cib_return_code; } if(rc == cib_diff_resync) { /* This is an internal value that clients do not and should not care about */ rc = cib_ok; } if(rc == cib_ok || rc == cib_not_master || rc == cib_master_timeout) { crm_log_xml(LOG_MSG, "passed", op_reply); } else { /* } else if(rc == cib_remote_timeout) { */ crm_err("Call failed: %s", cib_error2string(rc)); crm_log_xml(LOG_WARNING, "failed", op_reply); } if(output_data == NULL) { /* do nothing more */ } else if(!(call_options & cib_discard_reply)) { xmlNode *tmp = get_message_xml(op_reply, F_CIB_CALLDATA); if(tmp == NULL) { crm_debug_3("No output in reply to \"%s\" command %d", op, cib->call_id - 1); } else { *output_data = copy_xml(tmp); } } free_xml(op_reply); return rc; } gboolean cib_native_msgready(cib_t* cib) { cib_native_opaque_t *native = NULL; if (cib == NULL) { crm_err("No CIB!"); return FALSE; } native = cib->variant_opaque; if(native->command_channel != NULL) { /* drain the channel */ IPC_Channel *cmd_ch = native->command_channel; xmlNode *cmd_msg = NULL; while(cmd_ch->ch_status != IPC_DISCONNECT && cmd_ch->ops->is_message_pending(cmd_ch)) { /* this will happen when the CIB exited from beneath us */ cmd_msg = xmlfromIPC(cmd_ch, 0); free_xml(cmd_msg); } } else { crm_err("No command channel"); } if(native->callback_channel == NULL) { crm_err("No callback channel"); return FALSE; } else if(native->callback_channel->ch_status == IPC_DISCONNECT) { crm_info("Lost connection to the CIB service [%d].", native->callback_channel->farside_pid); return FALSE; } else if(native->callback_channel->ops->is_message_pending( native->callback_channel)) { crm_debug_4("Message pending on command channel [%d]", native->callback_channel->farside_pid); return TRUE; } crm_debug_3("No message pending"); return FALSE; } int cib_native_rcvmsg(cib_t* cib, int blocking) { const char *type = NULL; xmlNode* msg = NULL; cib_native_opaque_t *native = NULL; if (cib == NULL) { crm_err("No CIB!"); return FALSE; } native = cib->variant_opaque; /* if it is not blocking mode and no message in the channel, return */ if (blocking == 0 && cib_native_msgready(cib) == FALSE) { crm_debug_3("No message ready and non-blocking..."); return 0; } else if (cib_native_msgready(cib) == FALSE) { crm_debug("Waiting for message from CIB service..."); if(native->callback_channel == NULL) { return 0; } else if(native->callback_channel->ch_status != IPC_CONNECT) { return 0; } else if(native->command_channel && native->command_channel->ch_status != IPC_CONNECT){ return 0; } native->callback_channel->ops->waitin(native->callback_channel); } /* IPC_INTR is not a factor here */ msg = xmlfromIPC(native->callback_channel, 0); if (msg == NULL) { crm_warn("Received a NULL msg from CIB service."); return 0; } /* do callbacks */ type = crm_element_value(msg, F_TYPE); crm_debug_4("Activating %s callbacks...", type); if(safe_str_eq(type, T_CIB)) { cib_native_callback(cib, msg); } else if(safe_str_eq(type, T_CIB_NOTIFY)) { g_list_foreach(cib->notify_list, cib_native_notify, msg); } else { crm_err("Unknown message type: %s", type); } free_xml(msg); return 1; } void cib_native_callback(cib_t *cib, xmlNode *msg) { int rc = 0; int call_id = 0; xmlNode *output = NULL; cib_callback_client_t *blob = NULL; cib_callback_client_t local_blob; /* gcc4 had a point... make sure (at least) local_blob.callback * is initialized before use */ local_blob.callback = NULL; local_blob.user_data = NULL; local_blob.only_success = FALSE; crm_element_value_int(msg, F_CIB_CALLID, &call_id); blob = g_hash_table_lookup( cib_op_callback_table, GINT_TO_POINTER(call_id)); if(blob != NULL) { crm_debug_3("Callback found for call %d", call_id); /* local_blob.callback = blob->callback; */ /* local_blob.user_data = blob->user_data; */ /* local_blob.only_success = blob->only_success; */ local_blob = *blob; blob = NULL; remove_cib_op_callback(call_id, FALSE); } else { crm_debug_3("No callback found for call %d", call_id); local_blob.callback = NULL; } crm_element_value_int(msg, F_CIB_RC, &rc); if(rc == cib_diff_resync) { /* This is an internal value that clients do not and should not care about */ rc = cib_ok; } output = get_message_xml(msg, F_CIB_CALLDATA); if(local_blob.callback != NULL && (rc == cib_ok || local_blob.only_success == FALSE)) { local_blob.callback( msg, call_id, rc, output, local_blob.user_data); } else if(cib->op_callback == NULL && rc != cib_ok) { crm_warn("CIB command failed: %s", cib_error2string(rc)); crm_log_xml(LOG_DEBUG, "Failed CIB Update", msg); } if(cib->op_callback == NULL) { crm_debug_3("No OP callback set, ignoring reply"); } else { cib->op_callback(msg, call_id, rc, output); } crm_debug_4("OP callback activated."); } void cib_native_notify(gpointer data, gpointer user_data) { xmlNode *msg = user_data; cib_notify_client_t *entry = data; const char *event = NULL; if(msg == NULL) { crm_warn("Skipping callback - NULL message"); return; } event = crm_element_value(msg, F_SUBTYPE); if(entry == NULL) { crm_warn("Skipping callback - NULL callback client"); return; } else if(entry->callback == NULL) { crm_warn("Skipping callback - NULL callback"); return; } else if(safe_str_neq(entry->event, event)) { crm_debug_4("Skipping callback - event mismatch %p/%s vs. %s", entry, entry->event, event); return; } crm_debug_4("Invoking callback for %p/%s event...", entry, event); entry->callback(event, msg); crm_debug_4("Callback invoked..."); } gboolean cib_native_dispatch(IPC_Channel *channel, gpointer user_data) { int lpc = 0; cib_t *cib = user_data; cib_native_opaque_t *native = NULL; crm_debug_3("Received callback"); if(user_data == NULL){ crm_err("user_data field must contain the CIB struct"); return FALSE; } native = cib->variant_opaque; while(cib_native_msgready(cib)) { lpc++; /* invoke the callbacks but dont block */ if(cib_native_rcvmsg(cib, 0) < 1) { break; } } crm_debug_3("%d CIB messages dispatched", lpc); if(native->callback_channel && native->callback_channel->ch_status != IPC_CONNECT) { crm_crit("Lost connection to the CIB service [%d/callback].", channel->farside_pid); if(native->callback_source != NULL) { G_main_del_IPC_Channel(native->callback_source); native->callback_source = NULL; } return FALSE; } else if(native->command_channel && native->command_channel->ch_status != IPC_CONNECT) { crm_crit("Lost connection to the CIB service [%d/command].", channel->farside_pid); return FALSE; } return TRUE; } int cib_native_set_connection_dnotify( cib_t *cib, void (*dnotify)(gpointer user_data)) { cib_native_opaque_t *native = NULL; if (cib == NULL) { crm_err("No CIB!"); return FALSE; } native = cib->variant_opaque; if(dnotify == NULL) { crm_warn("Setting dnotify back to default value"); set_IPC_Channel_dnotify(native->callback_source, default_ipc_connection_destroy); } else { crm_debug_3("Setting dnotify"); set_IPC_Channel_dnotify(native->callback_source, dnotify); } return cib_ok; } int cib_native_register_callback(cib_t* cib, const char *callback, int enabled) { xmlNode *notify_msg = create_xml_node(NULL, "cib-callback"); cib_native_opaque_t *native = cib->variant_opaque; /* short term hack - should make this generic somehow */ crm_xml_add(notify_msg, F_CIB_OPERATION, T_CIB_NOTIFY); crm_xml_add(notify_msg, F_CIB_NOTIFY_TYPE, callback); crm_xml_add_int(notify_msg, F_CIB_NOTIFY_ACTIVATE, enabled); send_ipc_message(native->callback_channel, notify_msg); free_xml(notify_msg); return cib_ok; } diff --git a/lib/crm/cib/cib_private.h b/lib/crm/cib/cib_private.h index 94692ec636..cdd3a31585 100644 --- a/lib/crm/cib/cib_private.h +++ b/lib/crm/cib/cib_private.h @@ -1,59 +1,61 @@ /* * Copyright (C) 2004 Andrew Beekhof * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #ifndef CIB_PRIVATE__H #define CIB_PRIVATE__H #include extern GHashTable *cib_op_callback_table; typedef struct cib_notify_client_s { const char *event; const char *obj_id; /* implement one day */ const char *obj_type; /* implement one day */ void (*callback)(const char *event, xmlNode *msg); } cib_notify_client_t; typedef struct cib_callback_client_s { void (*callback)(xmlNode*, int, int, xmlNode*, void*); void *user_data; gboolean only_success; struct timer_rec_s *timer; } cib_callback_client_t; struct timer_rec_s { int call_id; int timeout; guint ref; }; typedef enum cib_errors (*cib_op_t)(const char *, int, const char *, xmlNode *, xmlNode*, xmlNode*, xmlNode**, xmlNode**); +extern cib_t *cib_new_variant(void); + enum cib_errors cib_perform_op(const char *op, int call_options, cib_op_t *fn, gboolean is_query, const char *section, xmlNode *req, xmlNode *input, gboolean manage_counters, gboolean *config_changed, xmlNode *current_cib, xmlNode **result_cib, xmlNode **output); #endif