diff --git a/cib/callbacks.c b/cib/callbacks.c index fc61bc051b..dcf4854faf 100644 --- a/cib/callbacks.c +++ b/cib/callbacks.c @@ -1,1391 +1,1345 @@ /* * Copyright (C) 2004 Andrew Beekhof * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "common.h" extern GMainLoop *mainloop; extern gboolean cib_shutdown_flag; extern gboolean stand_alone; extern const char *cib_root; +qb_ipcs_service_t *ipcs_ro = NULL; +qb_ipcs_service_t *ipcs_rw = NULL; + #if SUPPORT_HEARTBEAT extern ll_cluster_t *hb_conn; #endif -extern void cib_ha_connection_destroy(gpointer user_data); - extern enum cib_errors cib_update_counter(xmlNode * xml_obj, const char *field, gboolean reset); extern void GHFunc_count_peers(gpointer key, gpointer value, gpointer user_data); -void initiate_exit(void); void terminate_cib(const char *caller, gboolean fast); gint cib_GCompareFunc(gconstpointer a, gconstpointer b); gboolean can_write(int flags); void send_cib_replace(const xmlNode * sync_request, const char *host); void cib_process_request(xmlNode * request, gboolean privileged, gboolean force_synchronous, gboolean from_peer, cib_client_t * cib_client); -void cib_common_callback_worker(xmlNode * op_request, cib_client_t * cib_client, - gboolean force_synchronous, gboolean privileged); extern GHashTable *client_list; int next_client_id = 0; extern const char *cib_our_uname; extern unsigned long cib_num_ops, cib_num_local, cib_num_updates, cib_num_fail; extern unsigned long cib_bad_connects, cib_num_timeouts; extern longclock_t cib_call_time; extern enum cib_errors cib_status; -int send_via_callback_channel(xmlNode * msg, const char *token); - enum cib_errors cib_process_command(xmlNode * request, xmlNode ** reply, xmlNode ** cib_diff, gboolean privileged); -gboolean cib_common_callback(IPC_Channel * channel, cib_client_t * cib_client, - gboolean force_synchronous, gboolean privileged); +gboolean cib_common_callback(qb_ipcs_connection_t *c, void *data, size_t size, gboolean privileged); -gboolean cib_process_disconnect(IPC_Channel * channel, cib_client_t * cib_client); -int num_clients = 0; - -static void -cib_ipc_connection_destroy(gpointer user_data) +static int32_t +cib_ipc_accept(qb_ipcs_connection_t *c, uid_t uid, gid_t gid) { - cib_client_t *cib_client = user_data; - - /* cib_process_disconnect */ - - if (cib_client == NULL) { - crm_trace("Destroying %p", user_data); - return; - } - - if (cib_client->source != NULL) { - /* Should this even be necessary? */ - crm_trace("Deleting %s (%p) from mainloop", cib_client->name, cib_client->source); - G_main_del_IPC_Channel(cib_client->source); - cib_client->source = NULL; + crm_trace("Connecting %p for uid=%d gid=%d pid=%d", c, uid, gid, crm_ipcs_client_pid(c)); + if (cib_shutdown_flag) { + crm_info("Ignoring new client [%d] during shutdown", crm_ipcs_client_pid(c)); + return -EPERM; } - - crm_trace("Destroying %s (%p)", cib_client->name, user_data); - num_clients--; - crm_trace("Num unfree'd clients: %d", num_clients); - crm_free(cib_client->name); - crm_free(cib_client->callback_id); - crm_free(cib_client->id); - crm_free(cib_client->user); - crm_free(cib_client); - crm_trace("Freed the cib client"); - - return; + return 0; } -gboolean -cib_client_connect(IPC_Channel * channel, gpointer user_data) +static void +cib_ipc_created(qb_ipcs_connection_t *c) { cl_uuid_t client_id; - xmlNode *reg_msg = NULL; cib_client_t *new_client = NULL; char uuid_str[UU_UNPARSE_SIZEOF]; - const char *channel_name = user_data; - - gboolean(*callback) (IPC_Channel * channel, gpointer user_data); - - crm_trace("Connecting channel"); - - if (channel == NULL) { - crm_err("Channel was NULL"); - cib_bad_connects++; - return FALSE; - - } else if (channel->ch_status != IPC_CONNECT) { - crm_err("Channel was disconnected"); - cib_bad_connects++; - return FALSE; - - } else if (channel_name == NULL) { - crm_err("user_data must contain channel name"); - cib_bad_connects++; - return FALSE; - - } else if (cib_shutdown_flag) { - crm_info("Ignoring new client [%d] during shutdown", channel->farside_pid); - return FALSE; - } - - callback = cib_ro_callback; - if (safe_str_eq(channel_name, cib_channel_rw)) { - callback = cib_rw_callback; - } crm_malloc0(new_client, sizeof(cib_client_t)); - num_clients++; - new_client->channel = channel; - new_client->channel_name = channel_name; - - crm_trace("Created channel %p for channel %s", new_client, new_client->channel_name); - - channel->ops->set_recv_qlen(channel, 1024); - channel->ops->set_send_qlen(channel, 1024); + new_client->ipc = c; - new_client->source = G_main_add_IPC_Channel(G_PRIORITY_DEFAULT, channel, FALSE, callback, - new_client, cib_ipc_connection_destroy); - - crm_trace("Channel %s connected for client %s", new_client->channel_name, new_client->id); + crm_trace("%p connected for client %s", c, new_client->id); cl_uuid_generate(&client_id); cl_uuid_unparse(&client_id, uuid_str); CRM_CHECK(new_client->id == NULL, crm_free(new_client->id)); new_client->id = crm_strdup(uuid_str); /* make sure we can find ourselves later for sync calls * redirected to the master instance */ g_hash_table_insert(client_list, new_client->id, new_client); - reg_msg = create_xml_node(NULL, "callback"); - crm_xml_add(reg_msg, F_CIB_OPERATION, CRM_OP_REGISTER); - crm_xml_add(reg_msg, F_CIB_CLIENTID, new_client->id); + qb_ipcs_context_set(c, new_client); +} - send_ipc_message(channel, reg_msg); - free_xml(reg_msg); +static int32_t +cib_ipc_dispatch_rw(qb_ipcs_connection_t *c, void *data, size_t size) +{ + cib_client_t *cib_client = qb_ipcs_context_get(c); + crm_trace("%p message from %s", c, cib_client->id); + return cib_common_callback(c, data, size, TRUE); +} - return TRUE; +static int32_t +cib_ipc_dispatch_ro(qb_ipcs_connection_t *c, void *data, size_t size) +{ + cib_client_t *cib_client = qb_ipcs_context_get(c); + crm_trace("%p message from %s", c, cib_client->id); + return cib_common_callback(c, data, size, FALSE); } -gboolean -cib_rw_callback(IPC_Channel * channel, gpointer user_data) +/* Error code means? */ +static int32_t +cib_ipc_closed(qb_ipcs_connection_t *c) { - gboolean result = FALSE; + cib_client_t *cib_client = qb_ipcs_context_get(c); + crm_trace("Connection %p closed", c); + + CRM_ASSERT(cib_client != NULL); + CRM_ASSERT(cib_client->id != NULL); - result = cib_common_callback(channel, user_data, FALSE, TRUE); - return result; + if (!g_hash_table_remove(client_list, cib_client->id)) { + crm_err("Client %s not found in the hashtable", cib_client->name); + } + + return 0; } -gboolean -cib_ro_callback(IPC_Channel * channel, gpointer user_data) +static void +cib_ipc_destroy(qb_ipcs_connection_t *c) { - gboolean result = FALSE; + cib_client_t *cib_client = qb_ipcs_context_get(c); + + CRM_ASSERT(cib_client != NULL); + CRM_ASSERT(cib_client->id != NULL); + + /* In case we arrive here without a call to cib_ipc_close() */ + g_hash_table_remove(client_list, cib_client->id); - result = cib_common_callback(channel, user_data, FALSE, FALSE); - return result; + crm_trace("Destroying %s (%p)", cib_client->name, c); + crm_free(cib_client->name); + crm_free(cib_client->callback_id); + crm_free(cib_client->id); + crm_free(cib_client->user); + crm_free(cib_client); + crm_trace("Freed the cib client"); + + if (cib_shutdown_flag) { + cib_shutdown(0); + } } +struct qb_ipcs_service_handlers ipc_ro_callbacks = +{ + .connection_accept = cib_ipc_accept, + .connection_created = cib_ipc_created, + .msg_process = cib_ipc_dispatch_ro, + .connection_closed = cib_ipc_closed, + .connection_destroyed = cib_ipc_destroy +}; + +struct qb_ipcs_service_handlers ipc_rw_callbacks = +{ + .connection_accept = cib_ipc_accept, + .connection_created = cib_ipc_created, + .msg_process = cib_ipc_dispatch_rw, + .connection_closed = cib_ipc_closed, + .connection_destroyed = cib_ipc_destroy +}; + void -cib_common_callback_worker(xmlNode * op_request, cib_client_t * cib_client, - gboolean force_synchronous, gboolean privileged) +cib_common_callback_worker(xmlNode * op_request, cib_client_t * cib_client, gboolean privileged) { longclock_t call_stop = 0; longclock_t call_start = 0; const char *op = crm_element_value(op_request, F_CIB_OPERATION); if (crm_str_eq(op, CRM_OP_REGISTER, TRUE)) { + xmlNode *ack = create_xml_node(NULL, __FUNCTION__); + + crm_xml_add(ack, F_CIB_OPERATION, CRM_OP_REGISTER); + crm_xml_add(ack, F_CIB_CLIENTID, cib_client->id); + crm_ipcs_send(cib_client->ipc, ack, FALSE); + free_xml(ack); return; } else if (crm_str_eq(op, T_CIB_NOTIFY, TRUE)) { /* Update the notify filters for this client */ int on_off = 0; - const char *type = crm_element_value(op_request, F_CIB_NOTIFY_TYPE);; + int rc = cib_ok; + const char *type = crm_element_value(op_request, F_CIB_NOTIFY_TYPE); crm_element_value_int(op_request, F_CIB_NOTIFY_ACTIVATE, &on_off); crm_debug("Setting %s callbacks for %s (%s): %s", type, cib_client->name, cib_client->id, on_off ? "on" : "off"); if (safe_str_eq(type, T_CIB_POST_NOTIFY)) { cib_client->post_notify = on_off; } else if (safe_str_eq(type, T_CIB_PRE_NOTIFY)) { cib_client->pre_notify = on_off; } else if (safe_str_eq(type, T_CIB_UPDATE_CONFIRM)) { cib_client->confirmations = on_off; } else if (safe_str_eq(type, T_CIB_DIFF_NOTIFY)) { cib_client->diffs = on_off; } else if (safe_str_eq(type, T_CIB_REPLACE_NOTIFY)) { cib_client->replace = on_off; + } else { + rc = cib_NOTEXISTS; } + /* Already ack'd */ return; } cib_client->num_calls++; call_start = time_longclock(); - cib_process_request(op_request, force_synchronous, privileged, FALSE, cib_client); + cib_process_request(op_request, FALSE, privileged, FALSE, cib_client); call_stop = time_longclock(); cib_call_time += (call_stop - call_start); } -gboolean -cib_common_callback(IPC_Channel * channel, cib_client_t * cib_client, - gboolean force_synchronous, gboolean privileged) +int32_t +cib_common_callback(qb_ipcs_connection_t *c, void *data, size_t size, gboolean privileged) { - int lpc = 0; - const char *value = NULL; - xmlNode *op_request = NULL; - gboolean keep_channel = TRUE; + int call_options = 0; + xmlNode *op_request = crm_ipcs_recv(c, data, size); + cib_client_t *cib_client = qb_ipcs_context_get(c); - CRM_CHECK(cib_client != NULL, crm_err("Invalid client"); return FALSE); - CRM_CHECK(cib_client->id != NULL, crm_err("Invalid client: %p", cib_client); return FALSE); + if(op_request) { + crm_element_value_int(op_request, F_CIB_CALLOPTS, &call_options); + } + + if (op_request == NULL || cib_client == NULL) { + xmlNode *ack = create_xml_node(NULL, "nack"); - /* - * Do enough work to make entering worthwhile - * But don't allow a single client to monopolize the CIB - */ - while (lpc < 5 && IPC_ISRCONN(channel) - && channel->ops->is_message_pending(channel)) { + crm_trace("Sending nack to %p", cib_client); + crm_ipcs_send(c, ack, FALSE); + free_xml(ack); + return 0; - lpc++; - op_request = xmlfromIPC(channel, MAX_IPC_DELAY); - if (op_request == NULL) { - break; - } + } else if((call_options & cib_sync_call) == 0) { + xmlNode *ack = create_xml_node(NULL, "ack"); - if (cib_client->name == NULL) { - value = crm_element_value(op_request, F_CIB_CLIENTNAME); - if (value == NULL) { - cib_client->name = crm_itoa(channel->farside_pid); - } else { - cib_client->name = crm_strdup(value); - } + crm_trace("Sending a-sync ack"); + crm_ipcs_send(c, ack, FALSE); + free_xml(ack); + } + + if (cib_client->name == NULL) { + const char *value = crm_element_value(op_request, F_CIB_CLIENTNAME); + if (value == NULL) { + cib_client->name = crm_itoa(crm_ipcs_client_pid(c)); + } else { + cib_client->name = crm_strdup(value); } + } - crm_xml_add(op_request, F_CIB_CLIENTID, cib_client->id); - crm_xml_add(op_request, F_CIB_CLIENTNAME, cib_client->name); + if (cib_client->callback_id == NULL) { + const char *value = crm_element_value(op_request, F_CIB_CALLBACK_TOKEN); + if (value != NULL) { + cib_client->callback_id = crm_strdup(value); + + } else { + cib_client->callback_id = crm_strdup(cib_client->id); + } + } + + crm_xml_add(op_request, F_CIB_CLIENTID, cib_client->id); + crm_xml_add(op_request, F_CIB_CLIENTNAME, cib_client->name); #if ENABLE_ACL - determine_request_user(&cib_client->user, channel, op_request, F_CIB_USER); + determine_request_user(&cib_client->user, channel, op_request, F_CIB_USER); #endif - /* crm_log_xml_trace(op_request, "Client[inbound]"); */ - - if (cib_client->callback_id == NULL) { - value = crm_element_value(op_request, F_CIB_CALLBACK_TOKEN); - if (value != NULL) { - cib_client->callback_id = crm_strdup(value); - - } else { - cib_client->callback_id = crm_strdup(cib_client->id); - } - } + crm_log_xml_trace(op_request, "Client[inbound]"); - cib_common_callback_worker(op_request, cib_client, force_synchronous, privileged); - - free_xml(op_request); - } - - if (channel->ch_status != IPC_CONNECT) { - crm_trace("Client disconnected"); - keep_channel = cib_process_disconnect(channel, cib_client); - } - - return keep_channel; + cib_common_callback_worker(op_request, cib_client, privileged); + + return 0; } static void do_local_notify(xmlNode * notify_src, const char *client_id, gboolean sync_reply, gboolean from_peer) { /* send callback to originating child */ cib_client_t *client_obj = NULL; enum cib_errors local_rc = cib_ok; if (client_id != NULL) { client_obj = g_hash_table_lookup(client_list, client_id); } else { crm_trace("No client to sent the response to. F_CIB_CLIENTID not set."); } if (client_obj == NULL) { - local_rc = cib_reply_failed; + local_rc = cib_client_gone; } else { - const char *client_id = client_obj->callback_id; - crm_trace("Sending %ssync response to %s %s", sync_reply ? "" : "an a-", client_obj->name, from_peer ? "(originator of delegated request)" : ""); - if (sync_reply) { - client_id = client_obj->id; + if (client_obj->ipc && crm_ipcs_send(client_obj->ipc, notify_src, !sync_reply) < 0) { + local_rc = cib_reply_failed; + +#ifdef HAVE_GNUTLS_GNUTLS_H + } else if (client_obj->session) { + cib_send_remote_msg(client_obj->session, notify_src, client_obj->encrypted); +#endif + } else if(client_obj->ipc == NULL) { + crm_err("Unknown transport for %s", client_obj->name); } - local_rc = send_via_callback_channel(notify_src, client_id); } if (local_rc != cib_ok && client_obj != NULL) { crm_warn("%sSync reply to %s failed: %s", sync_reply ? "" : "A-", client_obj ? client_obj->name : "", cib_error2string(local_rc)); } } static void parse_local_options(cib_client_t * cib_client, int call_type, int call_options, const char *host, const char *op, gboolean * local_notify, gboolean * needs_reply, gboolean * process, gboolean * needs_forward) { if (cib_op_modifies(call_type) && !(call_options & cib_inhibit_bcast)) { /* we need to send an update anyway */ *needs_reply = TRUE; } else { *needs_reply = FALSE; } if (host == NULL && (call_options & cib_scope_local)) { crm_trace("Processing locally scoped %s op from %s", op, cib_client->name); *local_notify = TRUE; } else if (host == NULL && cib_is_master) { crm_trace("Processing master %s op locally from %s", op, cib_client->name); *local_notify = TRUE; } else if (safe_str_eq(host, cib_our_uname)) { crm_trace("Processing locally addressed %s op from %s", op, cib_client->name); *local_notify = TRUE; } else if (stand_alone) { *needs_forward = FALSE; *local_notify = TRUE; *process = TRUE; } else { crm_trace("%s op from %s needs to be forwarded to %s", op, cib_client->name, host ? host : "the master instance"); *needs_forward = TRUE; *process = FALSE; } } static gboolean parse_peer_options(int call_type, xmlNode * request, gboolean * local_notify, gboolean * needs_reply, gboolean * process, gboolean * needs_forward) { const char *op = NULL; const char *host = NULL; const char *delegated = NULL; const char *originator = crm_element_value(request, F_ORIG); const char *reply_to = crm_element_value(request, F_CIB_ISREPLY); const char *update = crm_element_value(request, F_CIB_GLOBAL_UPDATE); gboolean is_reply = safe_str_eq(reply_to, cib_our_uname); if (crm_is_true(update)) { *needs_reply = FALSE; if (is_reply) { *local_notify = TRUE; crm_trace("Processing global/peer update from %s" " that originated from us", originator); } else { crm_trace("Processing global/peer update from %s", originator); } return TRUE; } host = crm_element_value(request, F_CIB_HOST); if (host != NULL && safe_str_eq(host, cib_our_uname)) { crm_trace("Processing request sent to us from %s", originator); return TRUE; } else if (host == NULL && cib_is_master == TRUE) { crm_trace("Processing request sent to master instance from %s", originator); return TRUE; } op = crm_element_value(request, F_CIB_OPERATION); if(safe_str_eq(op, "cib_shutdown_req")) { /* Always process these */ *local_notify = FALSE; if(reply_to == NULL || is_reply) { *process = TRUE; } if(is_reply) { *needs_reply = FALSE; } return *process; } if (is_reply) { crm_trace("Forward reply sent from %s to local clients", originator); *process = FALSE; *needs_reply = FALSE; *local_notify = TRUE; return TRUE; } delegated = crm_element_value(request, F_CIB_DELEGATED); if (delegated != NULL) { crm_trace("Ignoring msg for master instance"); } else if (host != NULL) { /* this is for a specific instance and we're not it */ crm_trace("Ignoring msg for instance on %s", crm_str(host)); } else if (reply_to == NULL && cib_is_master == FALSE) { /* this is for the master instance and we're not it */ crm_trace("Ignoring reply to %s", crm_str(reply_to)); } else if (safe_str_eq(op, "cib_shutdown_req")) { if (reply_to != NULL) { crm_debug("Processing %s from %s", op, host); *needs_reply = FALSE; } else { crm_debug("Processing %s reply from %s", op, host); } return TRUE; } else { crm_err("Nothing for us to do?"); crm_log_xml_err(request, "Peer[inbound]"); } return FALSE; } static void forward_request(xmlNode * request, cib_client_t * cib_client, int call_options) { const char *op = crm_element_value(request, F_CIB_OPERATION); const char *host = crm_element_value(request, F_CIB_HOST); crm_xml_add(request, F_CIB_DELEGATED, cib_our_uname); if (host != NULL) { crm_trace("Forwarding %s op to %s", op, host); send_cluster_message(host, crm_msg_cib, request, FALSE); } else { crm_trace("Forwarding %s op to master instance", op); send_cluster_message(NULL, crm_msg_cib, request, FALSE); } /* Return the request to its original state */ xml_remove_prop(request, F_CIB_DELEGATED); if (call_options & cib_discard_reply) { crm_trace("Client not interested in reply"); } } static void send_peer_reply(xmlNode * msg, xmlNode * result_diff, const char *originator, gboolean broadcast) { CRM_ASSERT(msg != NULL); if (broadcast) { /* this (successful) call modified the CIB _and_ the * change needs to be broadcast... * send via HA to other nodes */ int diff_add_updates = 0; int diff_add_epoch = 0; int diff_add_admin_epoch = 0; int diff_del_updates = 0; int diff_del_epoch = 0; int diff_del_admin_epoch = 0; char *digest = NULL; cib_diff_version_details(result_diff, &diff_add_admin_epoch, &diff_add_epoch, &diff_add_updates, &diff_del_admin_epoch, &diff_del_epoch, &diff_del_updates); crm_trace("Sending update diff %d.%d.%d -> %d.%d.%d", diff_del_admin_epoch, diff_del_epoch, diff_del_updates, diff_add_admin_epoch, diff_add_epoch, diff_add_updates); crm_xml_add(msg, F_CIB_ISREPLY, originator); crm_xml_add(msg, F_CIB_GLOBAL_UPDATE, XML_BOOLEAN_TRUE); crm_xml_add(msg, F_CIB_OPERATION, CIB_OP_APPLY_DIFF); /* Its safe to always use the latest version since the election * ensures the software on this node is the oldest node in the cluster */ digest = calculate_xml_versioned_digest(the_cib, FALSE, TRUE, CRM_FEATURE_SET); crm_xml_add(result_diff, XML_ATTR_DIGEST, digest); crm_log_xml_trace(the_cib, digest); crm_free(digest); add_message_xml(msg, F_CIB_UPDATE_DIFF, result_diff); crm_log_xml_trace(msg, "copy"); send_cluster_message(NULL, crm_msg_cib, msg, TRUE); } else if (originator != NULL) { /* send reply via HA to originating node */ crm_trace("Sending request result to originator only"); crm_xml_add(msg, F_CIB_ISREPLY, originator); send_cluster_message(originator, crm_msg_cib, msg, FALSE); } } void cib_process_request(xmlNode * request, gboolean force_synchronous, gboolean privileged, gboolean from_peer, cib_client_t * cib_client) { int call_type = 0; int call_options = 0; gboolean process = TRUE; gboolean is_update = TRUE; gboolean needs_reply = TRUE; gboolean local_notify = FALSE; gboolean needs_forward = FALSE; gboolean global_update = crm_is_true(crm_element_value(request, F_CIB_GLOBAL_UPDATE)); xmlNode *op_reply = NULL; xmlNode *result_diff = NULL; enum cib_errors rc = cib_ok; const char *op = crm_element_value(request, F_CIB_OPERATION); const char *originator = crm_element_value(request, F_ORIG); const char *host = crm_element_value(request, F_CIB_HOST); crm_trace("%s Processing msg %s", cib_our_uname, crm_element_value(request, F_SEQ)); cib_num_ops++; if (cib_num_ops == 0) { cib_num_fail = 0; cib_num_local = 0; cib_num_updates = 0; crm_info("Stats wrapped around"); } if (host != NULL && strlen(host) == 0) { host = NULL; } crm_element_value_int(request, F_CIB_CALLOPTS, &call_options); if (force_synchronous) { call_options |= cib_sync_call; } crm_trace("Processing %s message (%s) for %s...", from_peer ? "peer" : "local", from_peer ? originator : cib_our_uname, host ? host : "master"); rc = cib_get_operation_id(op, &call_type); if (rc != cib_ok) { /* TODO: construct error reply? */ crm_err("Pre-processing of command failed: %s", cib_error2string(rc)); return; } is_update = cib_op_modifies(call_type); if (is_update) { cib_num_updates++; } if (from_peer == FALSE) { parse_local_options(cib_client, call_type, call_options, host, op, &local_notify, &needs_reply, &process, &needs_forward); } else if (parse_peer_options(call_type, request, &local_notify, &needs_reply, &process, &needs_forward) == FALSE) { return; } crm_trace("Finished determining processing actions"); if (call_options & cib_discard_reply) { needs_reply = is_update; local_notify = FALSE; } if (needs_forward) { forward_request(request, cib_client, call_options); return; } if (cib_status != cib_ok) { rc = cib_status; crm_err("Operation ignored, cluster configuration is invalid." " Please repair and restart: %s", cib_error2string(cib_status)); op_reply = cib_construct_reply(request, the_cib, cib_status); } else if (process) { int level = LOG_INFO; const char *section = crm_element_value(request, F_CIB_SECTION); cib_num_local++; rc = cib_process_command(request, &op_reply, &result_diff, privileged); if (global_update) { switch (rc) { case cib_ok: case cib_old_data: case cib_diff_resync: case cib_diff_failed: level = LOG_DEBUG_2; break; default: level = LOG_ERR; } } else if (safe_str_eq(op, CIB_OP_QUERY)) { level = LOG_DEBUG_2; } else if (rc != cib_ok) { cib_num_fail++; level = LOG_WARNING; } else if (safe_str_eq(op, CIB_OP_SLAVE)) { level = LOG_DEBUG_2; } else if (safe_str_eq(section, XML_CIB_TAG_STATUS)) { level = LOG_DEBUG_2; } if (get_crm_log_level() >= level) { /* Avoid all the xml lookups if we're not going to print the results */ do_crm_log(level, "Operation complete: op %s for section %s (origin=%s/%s/%s, version=%s.%s.%s): %s (rc=%d)", op, section ? section : "'all'", originator ? originator : "local", crm_element_value(request, F_CIB_CLIENTNAME), crm_element_value(request, F_CIB_CALLID), the_cib ? crm_element_value(the_cib, XML_ATTR_GENERATION_ADMIN) : "0", the_cib ? crm_element_value(the_cib, XML_ATTR_GENERATION) : "0", the_cib ? crm_element_value(the_cib, XML_ATTR_NUMUPDATES) : "0", cib_error2string(rc), rc); } if (op_reply == NULL && (needs_reply || local_notify)) { crm_err("Unexpected NULL reply to message"); crm_log_xml_err(request, "null reply"); needs_reply = FALSE; local_notify = FALSE; } } - crm_trace("processing response cases"); + crm_trace("processing response cases %.16x %.16x", call_options, cib_sync_call); if (local_notify) { const char *client_id = crm_element_value(request, F_CIB_CLIENTID); if (client_id && process == FALSE) { do_local_notify(request, client_id, call_options & cib_sync_call, from_peer); } else if (client_id) { do_local_notify(op_reply, client_id, call_options & cib_sync_call, from_peer); } } /* from now on we are the server */ if (needs_reply == FALSE || stand_alone) { /* nothing more to do... * this was a non-originating slave update */ crm_trace("Completed slave update"); } else if (rc == cib_ok && result_diff != NULL && !(call_options & cib_inhibit_bcast)) { send_peer_reply(request, result_diff, originator, TRUE); } else if (call_options & cib_discard_reply) { crm_trace("Caller isn't interested in reply"); } else if (from_peer) { if (is_update == FALSE || result_diff == NULL) { crm_trace("Request not broadcast: R/O call"); } else if (call_options & cib_inhibit_bcast) { crm_trace("Request not broadcast: inhibited"); } else if (rc != cib_ok) { crm_trace("Request not broadcast: call failed: %s", cib_error2string(rc)); } else { crm_trace("Directing reply to %s", originator); } send_peer_reply(op_reply, result_diff, originator, FALSE); } free_xml(op_reply); free_xml(result_diff); return; } xmlNode * cib_construct_reply(xmlNode * request, xmlNode * output, int rc) { int lpc = 0; xmlNode *reply = NULL; const char *name = NULL; const char *value = NULL; const char *names[] = { F_CIB_OPERATION, F_CIB_CALLID, F_CIB_CLIENTID, F_CIB_CALLOPTS }; static int max = DIMOF(names); crm_trace("Creating a basic reply"); reply = create_xml_node(NULL, "cib-reply"); crm_xml_add(reply, F_TYPE, T_CIB); for (lpc = 0; lpc < max; lpc++) { name = names[lpc]; value = crm_element_value(request, name); crm_xml_add(reply, name, value); } crm_xml_add_int(reply, F_CIB_RC, rc); if (output != NULL) { crm_trace("Attaching reply output"); add_message_xml(reply, F_CIB_CALLDATA, output); } return reply; } enum cib_errors cib_process_command(xmlNode * request, xmlNode ** reply, xmlNode ** cib_diff, gboolean privileged) { xmlNode *input = NULL; xmlNode *output = NULL; xmlNode *result_cib = NULL; xmlNode *current_cib = NULL; #if ENABLE_ACL xmlNode *filtered_current_cib = NULL; #endif int call_type = 0; int call_options = 0; int log_level = LOG_DEBUG_4; const char *op = NULL; const char *section = NULL; enum cib_errors rc = cib_ok; enum cib_errors rc2 = cib_ok; gboolean send_r_notify = FALSE; gboolean global_update = FALSE; gboolean config_changed = FALSE; gboolean manage_counters = TRUE; CRM_ASSERT(cib_status == cib_ok); *reply = NULL; *cib_diff = NULL; current_cib = the_cib; /* Start processing the request... */ op = crm_element_value(request, F_CIB_OPERATION); crm_element_value_int(request, F_CIB_CALLOPTS, &call_options); rc = cib_get_operation_id(op, &call_type); if (rc == cib_ok && privileged == FALSE) { rc = cib_op_can_run(call_type, call_options, privileged, global_update); } rc2 = cib_op_prepare(call_type, request, &input, §ion); if (rc == cib_ok) { rc = rc2; } if (rc != cib_ok) { crm_trace("Call setup failed: %s", cib_error2string(rc)); goto done; } else if (cib_op_modifies(call_type) == FALSE) { #if ENABLE_ACL if (acl_enabled(config_hash) == FALSE || acl_filter_cib(request, current_cib, current_cib, &filtered_current_cib) == FALSE) { rc = cib_perform_op(op, call_options, cib_op_func(call_type), TRUE, section, request, input, FALSE, &config_changed, current_cib, &result_cib, NULL, &output); } else if (filtered_current_cib == NULL) { crm_debug("Pre-filtered the entire cib"); rc = cib_permission_denied; } else { crm_debug("Pre-filtered the queried cib according to the ACLs"); rc = cib_perform_op(op, call_options, cib_op_func(call_type), TRUE, section, request, input, FALSE, &config_changed, filtered_current_cib, &result_cib, NULL, &output); } #else rc = cib_perform_op(op, call_options, cib_op_func(call_type), TRUE, section, request, input, FALSE, &config_changed, current_cib, &result_cib, NULL, &output); #endif CRM_CHECK(result_cib == NULL, free_xml(result_cib)); goto done; } /* Handle a valid write action */ global_update = crm_is_true(crm_element_value(request, F_CIB_GLOBAL_UPDATE)); if (global_update) { manage_counters = FALSE; call_options |= cib_force_diff; CRM_CHECK(call_type == 3 || call_type == 4, crm_err("Call type: %d", call_type); crm_log_xml_err(request, "bad op")); } #ifdef SUPPORT_PRENOTIFY if ((call_options & cib_inhibit_notify) == 0) { cib_pre_notify(call_options, op, the_cib, input); } #endif if (rc == cib_ok) { if (call_options & cib_inhibit_bcast) { /* skip */ crm_trace("Skipping update: inhibit broadcast"); manage_counters = FALSE; } rc = cib_perform_op(op, call_options, cib_op_func(call_type), FALSE, section, request, input, manage_counters, &config_changed, current_cib, &result_cib, cib_diff, &output); #if ENABLE_ACL if (acl_enabled(config_hash) == TRUE && acl_check_diff(request, current_cib, result_cib, *cib_diff) == FALSE) { rc = cib_permission_denied; } #endif if (rc == cib_ok && config_changed) { time_t now; char *now_str = NULL; const char *validation = crm_element_value(result_cib, XML_ATTR_VALIDATION); if (validation) { int current_version = get_schema_version(validation); int support_version = get_schema_version("pacemaker-1.1"); /* Once the later schemas support the "update-*" attributes, change "==" to ">=" -- Changed */ if (current_version >= support_version) { const char *origin = crm_element_value(request, F_ORIG); crm_xml_replace(result_cib, XML_ATTR_UPDATE_ORIG, origin ? origin : cib_our_uname); crm_xml_replace(result_cib, XML_ATTR_UPDATE_CLIENT, crm_element_value(request, F_CIB_CLIENTNAME)); #if ENABLE_ACL crm_xml_replace(result_cib, XML_ATTR_UPDATE_USER, crm_element_value(request, F_CIB_USER)); #endif } } now = time(NULL); now_str = ctime(&now); now_str[24] = EOS; /* replace the newline */ crm_xml_replace(result_cib, XML_CIB_ATTR_WRITTEN, now_str); } if (manage_counters == FALSE) { config_changed = cib_config_changed(current_cib, result_cib, cib_diff); } /* Always write to disk for replace ops, * this negates the need to detect ordering changes */ if (config_changed == FALSE && crm_str_eq(CIB_OP_REPLACE, op, TRUE)) { config_changed = TRUE; } } if (rc == cib_ok && (call_options & cib_dryrun) == 0) { rc = activateCibXml(result_cib, config_changed, op); if (rc == cib_ok && cib_internal_config_changed(*cib_diff)) { cib_read_config(config_hash, result_cib); } if (crm_str_eq(CIB_OP_REPLACE, op, TRUE)) { if (section == NULL) { send_r_notify = TRUE; } else if (safe_str_eq(section, XML_TAG_CIB)) { send_r_notify = TRUE; } else if (safe_str_eq(section, XML_CIB_TAG_NODES)) { send_r_notify = TRUE; } else if (safe_str_eq(section, XML_CIB_TAG_STATUS)) { send_r_notify = TRUE; } } else if (crm_str_eq(CIB_OP_ERASE, op, TRUE)) { send_r_notify = TRUE; } } else if (rc == cib_dtd_validation) { if (output != NULL) { crm_log_xml_info(output, "cib:output"); free_xml(output); } #if ENABLE_ACL { xmlNode *filtered_result_cib = NULL; if (acl_enabled(config_hash) == FALSE || acl_filter_cib(request, current_cib, result_cib, &filtered_result_cib) == FALSE) { output = result_cib; } else { crm_debug("Filtered the result cib for output according to the ACLs"); output = filtered_result_cib; if (result_cib != NULL) { free_xml(result_cib); } } } #else output = result_cib; #endif } else { free_xml(result_cib); } if ((call_options & cib_inhibit_notify) == 0) { const char *call_id = crm_element_value(request, F_CIB_CALLID); const char *client = crm_element_value(request, F_CIB_CLIENTNAME); #ifdef SUPPORT_POSTNOTIFY cib_post_notify(call_options, op, input, rc, the_cib); #endif cib_diff_notify(call_options, client, call_id, op, input, rc, *cib_diff); } if (send_r_notify) { const char *origin = crm_element_value(request, F_ORIG); cib_replace_notify(origin, the_cib, rc, *cib_diff); } if (rc != cib_ok) { log_level = LOG_DEBUG_4; if (rc == cib_dtd_validation && global_update) { log_level = LOG_WARNING; crm_log_xml_info(input, "cib:global_update"); } } else if (config_changed) { log_level = LOG_DEBUG_3; if (cib_is_master) { log_level = LOG_INFO; } } else if (cib_is_master) { log_level = LOG_DEBUG_2; } log_xml_diff(log_level, *cib_diff, "cib:diff"); done: if ((call_options & cib_discard_reply) == 0) { *reply = cib_construct_reply(request, output, rc); crm_log_xml_trace(*reply, "cib:reply"); } #if ENABLE_ACL if (filtered_current_cib != NULL) { free_xml(filtered_current_cib); } #endif if (call_type >= 0) { cib_op_cleanup(call_type, call_options, &input, &output); } return rc; } -int -send_via_callback_channel(xmlNode * msg, const char *token) -{ - cib_client_t *hash_client = NULL; - enum cib_errors rc = cib_ok; - - crm_trace("Delivering msg %p to client %s", msg, token); - - if (token == NULL) { - crm_err("No client id token, cant send message"); - if (rc == cib_ok) { - rc = cib_missing; - } - - } else if (msg == NULL) { - crm_err("No message to send"); - rc = cib_reply_failed; - - } else { - /* A client that left before we could reply is not really - * _our_ error. Warn instead. - */ - hash_client = g_hash_table_lookup(client_list, token); - if (hash_client == NULL) { - crm_warn("Cannot find client for token %s", token); - rc = cib_client_gone; - - } else if (crm_str_eq(hash_client->channel_name, "remote", FALSE)) { - /* just hope it's alive */ - - } else if (hash_client->channel == NULL) { - crm_err("Cannot find channel for client %s", token); - rc = cib_client_corrupt; - } - } - - if (rc == cib_ok) { - crm_trace("Delivering reply to client %s (%s)", token, hash_client->channel_name); - if (crm_str_eq(hash_client->channel_name, "remote", FALSE)) { - cib_send_remote_msg(hash_client->channel, msg, hash_client->encrypted); - - } else if (send_ipc_message(hash_client->channel, msg) == FALSE) { - crm_warn("Delivery of reply to client %s/%s failed", hash_client->name, token); - rc = cib_reply_failed; - } - } - - return rc; -} - gint cib_GCompareFunc(gconstpointer a, gconstpointer b) { const xmlNode *a_msg = a; const xmlNode *b_msg = b; int msg_a_id = 0; int msg_b_id = 0; const char *value = NULL; value = crm_element_value_const(a_msg, F_CIB_CALLID); msg_a_id = crm_parse_int(value, NULL); value = crm_element_value_const(b_msg, F_CIB_CALLID); msg_b_id = crm_parse_int(value, NULL); if (msg_a_id == msg_b_id) { return 0; } else if (msg_a_id < msg_b_id) { return -1; } return 1; } -gboolean -cib_process_disconnect(IPC_Channel * channel, cib_client_t * cib_client) -{ - /* TODO: Move all this into cib_ipc_connection_destroy() and re-use with cib_remote_connection_destroy() */ - if (channel == NULL) { - CRM_LOG_ASSERT(cib_client == NULL); - - } else if (cib_client == NULL) { - crm_err("No client"); - - } else { - CRM_LOG_ASSERT(channel->ch_status != IPC_CONNECT); - crm_trace("Cleaning up after client disconnect: %s/%s/%s", - crm_str(cib_client->name), cib_client->channel_name, cib_client->id); - - if (cib_client->id != NULL) { - if (!g_hash_table_remove(client_list, cib_client->id)) { - crm_err("Client %s not found in the hashtable", cib_client->name); - } - } - } - - if (cib_shutdown_flag && g_hash_table_size(client_list) == 0) { - crm_info("All clients disconnected..."); - initiate_exit(); - } - - return FALSE; -} - void cib_ha_peer_callback(HA_Message * msg, void *private_data) { xmlNode *xml = convert_ha_message(NULL, msg, __FUNCTION__); cib_peer_callback(xml, private_data); free_xml(xml); } void cib_peer_callback(xmlNode * msg, void *private_data) { const char *reason = NULL; const char *originator = crm_element_value(msg, F_ORIG); if (originator == NULL || crm_str_eq(originator, cib_our_uname, TRUE)) { /* message is from ourselves */ return; } else if (crm_peer_cache == NULL) { reason = "membership not established"; goto bail; } if (crm_element_value(msg, F_CIB_CLIENTNAME) == NULL) { crm_xml_add(msg, F_CIB_CLIENTNAME, originator); } /* crm_log_xml_trace("Peer[inbound]", msg); */ cib_process_request(msg, FALSE, TRUE, TRUE, NULL); return; bail: if (reason) { const char *seq = crm_element_value(msg, F_SEQ); const char *op = crm_element_value(msg, F_CIB_OPERATION); crm_warn("Discarding %s message (%s) from %s: %s", op, seq, originator, reason); } } void cib_client_status_callback(const char *node, const char *client, const char *status, void *private) { crm_node_t *peer = NULL; if (safe_str_eq(client, CRM_SYSTEM_CIB)) { crm_info("Status update: Client %s/%s now has status [%s]", node, client, status); if (safe_str_eq(status, JOINSTATUS)) { status = ONLINESTATUS; } else if (safe_str_eq(status, LEAVESTATUS)) { status = OFFLINESTATUS; } peer = crm_get_peer(0, node); crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cib, status); } return; } #if SUPPORT_HEARTBEAT extern oc_ev_t *cib_ev_token; static void *ccm_library = NULL; int (*ccm_api_callback_done) (void *cookie) = NULL; int (*ccm_api_handle_event) (const oc_ev_t * token) = NULL; gboolean cib_ccm_dispatch(int fd, gpointer user_data) { int rc = 0; oc_ev_t *ccm_token = (oc_ev_t *) user_data; crm_trace("received callback"); if (ccm_api_handle_event == NULL) { ccm_api_handle_event = find_library_function(&ccm_library, CCM_LIBRARY, "oc_ev_handle_event"); } rc = (*ccm_api_handle_event) (ccm_token); if (0 == rc) { return TRUE; } crm_err("CCM connection appears to have failed: rc=%d.", rc); /* eventually it might be nice to recover and reconnect... but until then... */ crm_err("Exiting to recover from CCM connection failure"); exit(2); return FALSE; } int current_instance = 0; void cib_ccm_msg_callback(oc_ed_t event, void *cookie, size_t size, const void *data) { gboolean update_id = FALSE; const oc_ev_membership_t *membership = data; CRM_ASSERT(membership != NULL); crm_info("Processing CCM event=%s (id=%d)", ccm_event_name(event), membership->m_instance); if (current_instance > membership->m_instance) { crm_err("Membership instance ID went backwards! %d->%d", current_instance, membership->m_instance); CRM_ASSERT(current_instance <= membership->m_instance); } switch (event) { case OC_EV_MS_NEW_MEMBERSHIP: case OC_EV_MS_INVALID: update_id = TRUE; break; case OC_EV_MS_PRIMARY_RESTORED: update_id = TRUE; break; case OC_EV_MS_NOT_PRIMARY: crm_trace("Ignoring transitional CCM event: %s", ccm_event_name(event)); break; case OC_EV_MS_EVICTED: crm_err("Evicted from CCM: %s", ccm_event_name(event)); break; default: crm_err("Unknown CCM event: %d", event); } if (update_id) { unsigned int lpc = 0; CRM_CHECK(membership != NULL, return); current_instance = membership->m_instance; for (lpc = 0; lpc < membership->m_n_out; lpc++) { crm_update_ccm_node(membership, lpc + membership->m_out_idx, CRM_NODE_LOST, current_instance); } for (lpc = 0; lpc < membership->m_n_member; lpc++) { crm_update_ccm_node(membership, lpc + membership->m_memb_idx, CRM_NODE_ACTIVE, current_instance); } } if (ccm_api_callback_done == NULL) { ccm_api_callback_done = find_library_function(&ccm_library, CCM_LIBRARY, "oc_ev_callback_done"); } (*ccm_api_callback_done) (cookie); return; } #endif gboolean can_write(int flags) { return TRUE; } static gboolean cib_force_exit(gpointer data) { crm_notice("Forcing exit!"); terminate_cib(__FUNCTION__, TRUE); return FALSE; } +static void +disconnect_remote_client(gpointer key, gpointer value, gpointer user_data) +{ + cib_client_t *a_client = value; + crm_err("Disconnecting %s... Not implemented", crm_str(a_client->name)); +} + +void +cib_shutdown(int nsig) +{ + struct qb_ipcs_stats srv_stats; + if (cib_shutdown_flag == FALSE) { + int disconnects = 0; + qb_ipcs_connection_t *c = NULL; + + cib_shutdown_flag = TRUE; + + for(c = qb_ipcs_connection_first_get(ipcs_rw); c != NULL; c = qb_ipcs_connection_next_get(ipcs_rw, c)) { + crm_debug("Disconnecting r/w client %p...", c); + qb_ipcs_disconnect(c); + disconnects++; + } + + for(c = qb_ipcs_connection_first_get(ipcs_ro); c != NULL; c = qb_ipcs_connection_next_get(ipcs_ro, c)) { + crm_debug("Disconnecting r/o client %p...", c); + qb_ipcs_disconnect(c); + disconnects++; + } + + disconnects += g_hash_table_size(client_list); + + crm_debug("Disconnecting %d remote clients", g_hash_table_size(client_list)); + g_hash_table_foreach(client_list, disconnect_remote_client, NULL); + crm_info("Disconnected %d clients", disconnects); + } + + qb_ipcs_stats_get(ipcs_rw, &srv_stats, QB_FALSE); + + if(g_hash_table_size(client_list) == 0) { + crm_info("All clients disconnected (%d)", srv_stats.active_connections); + initiate_exit(); + + } else { + crm_info("Waiting on %d clients to disconnect (%d)", g_hash_table_size(client_list), srv_stats.active_connections); + } +} + void initiate_exit(void) { int active = 0; xmlNode *leaving = NULL; active = crm_active_peers(); if (active < 2) { terminate_cib(__FUNCTION__, FALSE); return; } crm_info("Sending disconnect notification to %d peers...", active); leaving = create_xml_node(NULL, "exit-notification"); crm_xml_add(leaving, F_TYPE, "cib"); crm_xml_add(leaving, F_CIB_OPERATION, "cib_shutdown_req"); send_cluster_message(NULL, crm_msg_cib, leaving, TRUE); free_xml(leaving); g_timeout_add(crm_get_msec("5s"), cib_force_exit, NULL); } extern int remote_fd; extern int remote_tls_fd; extern void terminate_ais_connection(void); void terminate_cib(const char *caller, gboolean fast) { if (remote_fd > 0) { close(remote_fd); } if (remote_tls_fd > 0) { close(remote_tls_fd); } if(!fast) { if(is_heartbeat_cluster()) { #if SUPPORT_HEARTBEAT if (hb_conn != NULL) { crm_info("%s: Disconnecting heartbeat", caller); hb_conn->llc_ops->signoff(hb_conn, FALSE); } else { crm_err("%s: No heartbeat connection", caller); } #endif } else { #if SUPPORT_COROSYNC crm_info("%s: Disconnecting corosync", caller); terminate_ais_connection(); #endif } } uninitializeCib(); crm_info("%s: Exiting...", caller); if (fast) { exit(LSB_EXIT_GENERIC); } else if(mainloop != NULL && g_main_is_running(mainloop)) { g_main_quit(mainloop); } else { exit(LSB_EXIT_OK); } } diff --git a/cib/callbacks.h b/cib/callbacks.h index deca4e5458..33fdf9328b 100644 --- a/cib/callbacks.h +++ b/cib/callbacks.h @@ -1,80 +1,95 @@ /* * Copyright (C) 2004 Andrew Beekhof * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #include #include #include #include #include #include #include +#ifdef HAVE_GNUTLS_GNUTLS_H +# undef KEYFILE +# include +#endif + + extern gboolean cib_is_master; extern GHashTable *client_list; extern GHashTable *peer_hash; extern GHashTable *config_hash; typedef struct cib_client_s { char *id; char *name; char *callback_id; char *user; - const char *channel_name; + qb_ipcs_connection_t *ipc; - IPC_Channel *channel; - GCHSource *source; +#ifdef HAVE_GNUTLS_GNUTLS_H + gnutls_session *session; +#else + void *session; +#endif gboolean encrypted; + GFDSource *remote; + unsigned long num_calls; int pre_notify; int post_notify; int confirmations; int replace; int diffs; GList *delegated_calls; } cib_client_t; typedef struct cib_operation_s { const char *operation; gboolean modifies_cib; gboolean needs_privileges; gboolean needs_quorum; enum cib_errors (*prepare) (xmlNode *, xmlNode **, const char **); enum cib_errors (*cleanup) (int, xmlNode **, xmlNode **); enum cib_errors (*fn) (const char *, int, const char *, xmlNode *, xmlNode *, xmlNode *, xmlNode **, xmlNode **); } cib_operation_t; -extern gboolean cib_client_connect(IPC_Channel * channel, gpointer user_data); -extern gboolean cib_null_callback(IPC_Channel * channel, gpointer user_data); -extern gboolean cib_rw_callback(IPC_Channel * channel, gpointer user_data); -extern gboolean cib_ro_callback(IPC_Channel * channel, gpointer user_data); +extern struct qb_ipcs_service_handlers ipc_ro_callbacks; +extern struct qb_ipcs_service_handlers ipc_rw_callbacks; +extern qb_ipcs_service_t *ipcs_ro; +extern qb_ipcs_service_t *ipcs_rw; extern void cib_ha_peer_callback(HA_Message * msg, void *private_data); extern void cib_peer_callback(xmlNode * msg, void *private_data); extern void cib_client_status_callback(const char *node, const char *client, const char *status, void *private); +extern void cib_common_callback_worker(xmlNode * op_request, cib_client_t * cib_client, gboolean privileged); + +void cib_shutdown(int nsig); +void initiate_exit(void); #if SUPPORT_HEARTBEAT extern gboolean cib_ccm_dispatch(int fd, gpointer user_data); extern void cib_ccm_msg_callback(oc_ed_t event, void *cookie, size_t size, const void *data); #endif diff --git a/cib/main.c b/cib/main.c index d4bcd34ddf..5ba19fd290 100644 --- a/cib/main.c +++ b/cib/main.c @@ -1,708 +1,652 @@ /* * Copyright (C) 2004 Andrew Beekhof * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #if HAVE_LIBXML2 # include #endif #ifdef HAVE_GETOPT_H # include #endif #if HAVE_BZLIB_H # include #endif extern int init_remote_listener(int port, gboolean encrypted); extern gboolean stand_alone; gboolean cib_shutdown_flag = FALSE; enum cib_errors cib_status = cib_ok; #if SUPPORT_HEARTBEAT oc_ev_t *cib_ev_token; ll_cluster_t *hb_conn = NULL; extern void oc_ev_special(const oc_ev_t *, oc_ev_class_t, int); gboolean cib_register_ha(ll_cluster_t * hb_cluster, const char *client_name); #endif extern void terminate_cib(const char *caller, gboolean fast); GMainLoop *mainloop = NULL; const char *cib_root = CRM_CONFIG_DIR; char *cib_our_uname = NULL; gboolean preserve_status = FALSE; gboolean cib_writes_enabled = TRUE; int remote_fd = 0; int remote_tls_fd = 0; void usage(const char *cmd, int exit_status); int cib_init(void); void cib_shutdown(int nsig); -void cib_ha_connection_destroy(gpointer user_data); gboolean startCib(const char *filename); extern int write_cib_contents(gpointer p); GTRIGSource *cib_writer = NULL; GHashTable *client_list = NULL; GHashTable *config_hash = NULL; char *channel1 = NULL; char *channel2 = NULL; char *channel3 = NULL; char *channel4 = NULL; char *channel5 = NULL; #define OPTARGS "maswr:V?" void cib_cleanup(void); static void cib_enable_writes(int nsig) { crm_info("(Re)enabling disk writes"); cib_writes_enabled = TRUE; } static void cib_diskwrite_complete(gpointer userdata, int status, int signo, int exitcode) { if (exitcode != LSB_EXIT_OK || signo != 0 || status != 0) { crm_err("Disk write failed: status=%d, signo=%d, exitcode=%d", status, signo, exitcode); if (cib_writes_enabled) { crm_err("Disabling disk writes after write failure"); cib_writes_enabled = FALSE; } } else { crm_trace("Disk write passed"); } } static void log_cib_client(gpointer key, gpointer value, gpointer user_data) { cib_client_t *a_client = value; - crm_info("Client %s/%s", crm_str(a_client->name), - crm_str(a_client->channel_name)); + crm_info("Client %s", crm_str(a_client->name)); } int main(int argc, char **argv) { int flag; int rc = 0; int argerr = 0; #ifdef HAVE_GETOPT_H int option_index = 0; /* *INDENT-OFF* */ static struct option long_options[] = { {"per-action-cib", 0, 0, 'a'}, {"stand-alone", 0, 0, 's'}, {"disk-writes", 0, 0, 'w'}, {"cib-root", 1, 0, 'r'}, {"verbose", 0, 0, 'V'}, {"help", 0, 0, '?'}, {"metadata", 0, 0, 'm'}, {0, 0, 0, 0} }; /* *INDENT-ON* */ #endif struct passwd *pwentry = NULL; crm_log_init("cib", LOG_INFO, TRUE, FALSE, 0, NULL); mainloop_add_signal(SIGTERM, cib_shutdown); mainloop_add_signal(SIGPIPE, cib_enable_writes); cib_writer = G_main_add_tempproc_trigger(G_PRIORITY_LOW, write_cib_contents, "write_cib_contents", NULL, NULL, NULL, cib_diskwrite_complete); /* EnableProcLogging(); */ set_sigchld_proctrack(G_PRIORITY_HIGH, DEFAULT_MAXDISPATCHTIME); crm_peer_init(); client_list = g_hash_table_new(crm_str_hash, g_str_equal); while (1) { #ifdef HAVE_GETOPT_H flag = getopt_long(argc, argv, OPTARGS, long_options, &option_index); #else flag = getopt(argc, argv, OPTARGS); #endif if (flag == -1) break; switch (flag) { case 'V': crm_bump_log_level(); break; case 's': stand_alone = TRUE; preserve_status = TRUE; cib_writes_enabled = FALSE; pwentry = getpwnam(CRM_DAEMON_USER); CRM_CHECK(pwentry != NULL, crm_perror(LOG_ERR, "Invalid uid (%s) specified", CRM_DAEMON_USER); return 100); rc = setgid(pwentry->pw_gid); if (rc < 0) { crm_perror(LOG_ERR, "Could not set group to %d", pwentry->pw_gid); return 100; } rc = setuid(pwentry->pw_uid); if (rc < 0) { crm_perror(LOG_ERR, "Could not set user to %d", pwentry->pw_uid); return 100; } break; case '?': /* Help message */ usage(crm_system_name, LSB_EXIT_OK); break; case 'w': cib_writes_enabled = TRUE; break; case 'r': cib_root = optarg; break; case 'm': cib_metadata(); return 0; default: ++argerr; break; } } if (argc - optind == 1 && safe_str_eq("metadata", argv[optind])) { cib_metadata(); return 0; } if (optind > argc) { ++argerr; } if (argerr) { usage(crm_system_name, LSB_EXIT_GENERIC); } if (crm_is_writable(cib_root, NULL, CRM_DAEMON_USER, CRM_DAEMON_GROUP, FALSE) == FALSE) { crm_err("Bad permissions on %s. Terminating", cib_root); fprintf(stderr, "ERROR: Bad permissions on %s. See logs for details\n", cib_root); fflush(stderr); return 100; } /* read local config file */ rc = cib_init(); CRM_CHECK(g_hash_table_size(client_list) == 0, crm_warn("Not all clients gone at exit")); g_hash_table_foreach(client_list, log_cib_client, NULL); cib_cleanup(); #if SUPPORT_HEARTBEAT if (hb_conn) { hb_conn->llc_ops->delete(hb_conn); } #endif crm_info("Done"); return rc; } void cib_cleanup(void) { crm_peer_destroy(); g_hash_table_destroy(config_hash); g_hash_table_destroy(client_list); crm_free(cib_our_uname); #if HAVE_LIBXML2 crm_xml_cleanup(); #endif crm_free(channel1); crm_free(channel2); crm_free(channel3); crm_free(channel4); crm_free(channel5); } unsigned long cib_num_ops = 0; const char *cib_stat_interval = "10min"; unsigned long cib_num_local = 0, cib_num_updates = 0, cib_num_fail = 0; unsigned long cib_bad_connects = 0, cib_num_timeouts = 0; longclock_t cib_call_time = 0; gboolean cib_stats(gpointer data); gboolean cib_stats(gpointer data) { int local_log_level = LOG_DEBUG; static unsigned long last_stat = 0; unsigned int cib_calls_ms = 0; static unsigned long cib_stat_interval_ms = 0; if (cib_stat_interval_ms == 0) { cib_stat_interval_ms = crm_get_msec(cib_stat_interval); } cib_calls_ms = longclockto_ms(cib_call_time); if ((cib_num_ops - last_stat) > 0) { unsigned long calls_diff = cib_num_ops - last_stat; double stat_1 = (1000 * cib_calls_ms) / calls_diff; local_log_level = LOG_INFO; do_crm_log(local_log_level, "Processed %lu operations" " (%.2fus average, %lu%% utilization) in the last %s", calls_diff, stat_1, (100 * cib_calls_ms) / cib_stat_interval_ms, cib_stat_interval); } crm_trace( "\tDetail: %lu operations (%ums total)" " (%lu local, %lu updates, %lu failures," " %lu timeouts, %lu bad connects)", cib_num_ops, cib_calls_ms, cib_num_local, cib_num_updates, cib_num_fail, cib_bad_connects, cib_num_timeouts); last_stat = cib_num_ops; cib_call_time = 0; return TRUE; } #if SUPPORT_HEARTBEAT gboolean ccm_connect(void); static void ccm_connection_destroy(gpointer user_data) { crm_err("CCM connection failed... blocking while we reconnect"); CRM_ASSERT(ccm_connect()); return; } static void *ccm_library = NULL; gboolean ccm_connect(void) { gboolean did_fail = TRUE; int num_ccm_fails = 0; int max_ccm_fails = 30; int ret; int cib_ev_fd; int (*ccm_api_register) (oc_ev_t ** token) = find_library_function(&ccm_library, CCM_LIBRARY, "oc_ev_register"); int (*ccm_api_set_callback) (const oc_ev_t * token, oc_ev_class_t class, oc_ev_callback_t * fn, oc_ev_callback_t ** prev_fn) = find_library_function(&ccm_library, CCM_LIBRARY, "oc_ev_set_callback"); void (*ccm_api_special) (const oc_ev_t *, oc_ev_class_t, int) = find_library_function(&ccm_library, CCM_LIBRARY, "oc_ev_special"); int (*ccm_api_activate) (const oc_ev_t * token, int *fd) = find_library_function(&ccm_library, CCM_LIBRARY, "oc_ev_activate"); int (*ccm_api_unregister) (oc_ev_t * token) = find_library_function(&ccm_library, CCM_LIBRARY, "oc_ev_unregister"); while (did_fail) { did_fail = FALSE; crm_info("Registering with CCM..."); ret = (*ccm_api_register) (&cib_ev_token); if (ret != 0) { did_fail = TRUE; } if (did_fail == FALSE) { crm_trace("Setting up CCM callbacks"); ret = (*ccm_api_set_callback) (cib_ev_token, OC_EV_MEMB_CLASS, cib_ccm_msg_callback, NULL); if (ret != 0) { crm_warn("CCM callback not set"); did_fail = TRUE; } } if (did_fail == FALSE) { (*ccm_api_special) (cib_ev_token, OC_EV_MEMB_CLASS, 0); crm_trace("Activating CCM token"); ret = (*ccm_api_activate) (cib_ev_token, &cib_ev_fd); if (ret != 0) { crm_warn("CCM Activation failed"); did_fail = TRUE; } } if (did_fail) { num_ccm_fails++; (*ccm_api_unregister) (cib_ev_token); if (num_ccm_fails < max_ccm_fails) { crm_warn("CCM Connection failed %d times (%d max)", num_ccm_fails, max_ccm_fails); sleep(3); } else { crm_err("CCM Activation failed %d (max) times", num_ccm_fails); return FALSE; } } } crm_debug("CCM Activation passed... all set to go!"); G_main_add_fd(G_PRIORITY_HIGH, cib_ev_fd, FALSE, cib_ccm_dispatch, cib_ev_token, ccm_connection_destroy); return TRUE; } #endif #if SUPPORT_COROSYNC static gboolean cib_ais_dispatch(AIS_Message * wrapper, char *data, int sender) { xmlNode *xml = NULL; if (wrapper->header.id == crm_class_cluster) { xml = string2xml(data); if (xml == NULL) { goto bail; } crm_xml_add(xml, F_ORIG, wrapper->sender.uname); crm_xml_add_int(xml, F_SEQ, wrapper->id); cib_peer_callback(xml, NULL); } free_xml(xml); return TRUE; bail: crm_err("Invalid XML: '%.120s'", data); return TRUE; } static void cib_ais_destroy(gpointer user_data) { if (cib_shutdown_flag) { crm_info("Corosync disconnection complete"); } else { crm_err("Corosync connection lost! Exiting."); terminate_cib(__FUNCTION__, TRUE); } } #endif static void cib_peer_update_callback(enum crm_status_type type, crm_node_t * node, const void *data) { #if 0 /* crm_active_peers(crm_proc_cib) appears to give the wrong answer * sometimes, this might help figure out why */ if(type == crm_status_nstate) { crm_info("status: %s is now %s (was %s)", node->uname, node->state, (const char *)data); if (safe_str_eq(CRMD_STATE_ACTIVE, node->state)) { return; } } else if(type == crm_status_processes) { uint32_t old = 0; if (data) { old = *(const uint32_t *)data; } if ((node->processes ^ old) & crm_proc_cib) { crm_info("status: cib process on %s is now %sactive", node->uname, is_set(node->processes, crm_proc_cib)?"":"in"); } else { return; } } else { return; } #endif if(cib_shutdown_flag && crm_active_peers() < 2 && g_hash_table_size(client_list) == 0) { crm_info("No more peers"); terminate_cib(__FUNCTION__, FALSE); } } +static void +cib_ha_connection_destroy(gpointer user_data) +{ + if (cib_shutdown_flag) { + crm_info("Heartbeat disconnection complete... exiting"); + terminate_cib(__FUNCTION__, FALSE); + } else { + crm_err("Heartbeat connection lost! Exiting."); + terminate_cib(__FUNCTION__, TRUE); + } +} + int cib_init(void) { gboolean was_error = FALSE; config_hash = g_hash_table_new_full(crm_str_hash, g_str_equal, g_hash_destroy_str, g_hash_destroy_str); if (startCib("cib.xml") == FALSE) { crm_crit("Cannot start CIB... terminating"); exit(1); } if (stand_alone == FALSE) { void *dispatch = cib_ha_peer_callback; void *destroy = cib_ha_connection_destroy; if (is_openais_cluster()) { #if SUPPORT_COROSYNC destroy = cib_ais_destroy; dispatch = cib_ais_dispatch; #endif } if (crm_cluster_connect(&cib_our_uname, NULL, dispatch, destroy, #if SUPPORT_HEARTBEAT &hb_conn #else NULL #endif ) == FALSE) { crm_crit("Cannot sign in to the cluster... terminating"); exit(100); } if (is_openais_cluster()) { crm_set_status_callback(&cib_peer_update_callback); } #if SUPPORT_HEARTBEAT if (is_heartbeat_cluster()) { if (was_error == FALSE) { if (HA_OK != hb_conn->llc_ops->set_cstatus_callback(hb_conn, cib_client_status_callback, hb_conn)) { crm_err("Cannot set cstatus callback: %s", hb_conn->llc_ops->errmsg(hb_conn)); was_error = TRUE; } } if (was_error == FALSE) { was_error = (ccm_connect() == FALSE); } if (was_error == FALSE) { /* Async get client status information in the cluster */ crm_info("Requesting the list of configured nodes"); hb_conn->llc_ops->client_status(hb_conn, NULL, CRM_SYSTEM_CIB, -1); } } #endif } else { cib_our_uname = crm_strdup("localhost"); } - channel1 = crm_strdup(cib_channel_callback); - was_error = init_server_ipc_comms(channel1, cib_client_connect, default_ipc_connection_destroy); - - channel2 = crm_strdup(cib_channel_ro); - was_error = was_error || init_server_ipc_comms(channel2, cib_client_connect, - default_ipc_connection_destroy); - - channel3 = crm_strdup(cib_channel_rw); - was_error = was_error || init_server_ipc_comms(channel3, cib_client_connect, - default_ipc_connection_destroy); + ipcs_ro = mainloop_add_ipc_server(cib_channel_ro, QB_IPC_SOCKET, &ipc_ro_callbacks); + ipcs_rw = mainloop_add_ipc_server(cib_channel_rw, QB_IPC_SOCKET, &ipc_rw_callbacks); if (stand_alone) { if (was_error) { crm_err("Couldnt start"); return 1; } cib_is_master = TRUE; /* Create the mainloop and run it... */ mainloop = g_main_new(FALSE); crm_info("Starting %s mainloop", crm_system_name); g_main_run(mainloop); return 0; } if (was_error == FALSE) { /* Create the mainloop and run it... */ mainloop = g_main_new(FALSE); crm_info("Starting %s mainloop", crm_system_name); g_timeout_add(crm_get_msec(cib_stat_interval), cib_stats, NULL); g_main_run(mainloop); } else { crm_err("Couldnt start all communication channels, exiting."); } return 0; } void usage(const char *cmd, int exit_status) { FILE *stream; stream = exit_status ? stderr : stdout; fprintf(stream, "usage: %s [-%s]\n", cmd, OPTARGS); fprintf(stream, "\t--%s (-%c)\t\tTurn on debug info." " Additional instances increase verbosity\n", "verbose", 'V'); fprintf(stream, "\t--%s (-%c)\t\tThis help message\n", "help", '?'); fprintf(stream, "\t--%s (-%c)\t\tShow configurable cib options\n", "metadata", 'm'); fprintf(stream, "\t--%s (-%c)\tAdvanced use only\n", "per-action-cib", 'a'); fprintf(stream, "\t--%s (-%c)\tAdvanced use only\n", "stand-alone", 's'); fprintf(stream, "\t--%s (-%c)\tAdvanced use only\n", "disk-writes", 'w'); fprintf(stream, "\t--%s (-%c)\t\tAdvanced use only\n", "cib-root", 'r'); fflush(stream); exit(exit_status); } -void -cib_ha_connection_destroy(gpointer user_data) -{ - if (cib_shutdown_flag) { - crm_info("Heartbeat disconnection complete... exiting"); - terminate_cib(__FUNCTION__, FALSE); - } else { - crm_err("Heartbeat connection lost! Exiting."); - terminate_cib(__FUNCTION__, TRUE); - } -} - -static void -disconnect_cib_client(gpointer key, gpointer value, gpointer user_data) -{ - cib_client_t *a_client = value; - - crm_trace("Processing client %s/%s... send=%d, recv=%d", - crm_str(a_client->name), crm_str(a_client->channel_name), - (int)a_client->channel->send_queue->current_qlen, - (int)a_client->channel->recv_queue->current_qlen); - - if (a_client->channel->ch_status == IPC_CONNECT) { - a_client->channel->ops->resume_io(a_client->channel); - if (a_client->channel->send_queue->current_qlen != 0 - || a_client->channel->recv_queue->current_qlen != 0) { - crm_info("Flushed messages to/from %s/%s... send=%d, recv=%d", - crm_str(a_client->name), - crm_str(a_client->channel_name), - (int)a_client->channel->send_queue->current_qlen, - (int)a_client->channel->recv_queue->current_qlen); - } - } - - if (a_client->channel->ch_status == IPC_CONNECT) { - crm_warn("Disconnecting %s/%s...", - crm_str(a_client->name), crm_str(a_client->channel_name)); - a_client->channel->ops->disconnect(a_client->channel); - } -} - -extern gboolean cib_process_disconnect(IPC_Channel * channel, cib_client_t * cib_client); - -void -cib_shutdown(int nsig) -{ - if (cib_shutdown_flag == FALSE) { - cib_shutdown_flag = TRUE; - crm_debug("Disconnecting %d clients", g_hash_table_size(client_list)); - g_hash_table_foreach(client_list, disconnect_cib_client, NULL); - crm_info("Disconnected %d clients", g_hash_table_size(client_list)); - cib_process_disconnect(NULL, NULL); - - } else { - crm_info("Waiting for %d clients to disconnect...", g_hash_table_size(client_list)); - } -} - gboolean startCib(const char *filename) { gboolean active = FALSE; xmlNode *cib = readCibXmlFile(cib_root, filename, !preserve_status); CRM_ASSERT(cib != NULL); if (activateCibXml(cib, TRUE, "start") == 0) { int port = 0; const char *port_s = NULL; active = TRUE; cib_read_config(config_hash, cib); port_s = crm_element_value(cib, "remote-tls-port"); if (port_s) { port = crm_parse_int(port_s, "0"); remote_tls_fd = init_remote_listener(port, TRUE); } port_s = crm_element_value(cib, "remote-clear-port"); if (port_s) { port = crm_parse_int(port_s, "0"); remote_fd = init_remote_listener(port, FALSE); } crm_info("CIB Initialization completed successfully"); } return active; } diff --git a/cib/notify.c b/cib/notify.c index 0bd15f6e8f..546ce5d748 100644 --- a/cib/notify.c +++ b/cib/notify.c @@ -1,370 +1,345 @@ /* * Copyright (C) 2004 Andrew Beekhof * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include int pending_updates = 0; extern GHashTable *client_list; gboolean cib_notify_client(gpointer key, gpointer value, gpointer user_data); void attach_cib_generation(xmlNode * msg, const char *field, xmlNode * a_cib); void do_cib_notify(int options, const char *op, xmlNode * update, enum cib_errors result, xmlNode * result_data, const char *msg_type); static void need_pre_notify(gpointer key, gpointer value, gpointer user_data) { cib_client_t *client = value; if (client->pre_notify) { gboolean *needed = user_data; *needed = TRUE; } } static void need_post_notify(gpointer key, gpointer value, gpointer user_data) { cib_client_t *client = value; if (client->post_notify) { gboolean *needed = user_data; *needed = TRUE; } } gboolean cib_notify_client(gpointer key, gpointer value, gpointer user_data) { - int qlen = 0; - int max_qlen = 500; const char *type = NULL; gboolean do_send = FALSE; - gboolean do_remote = FALSE; - IPC_Channel *ipc_client = NULL; cib_client_t *client = value; xmlNode *update_msg = user_data; CRM_CHECK(client != NULL, return TRUE); CRM_CHECK(update_msg != NULL, return TRUE); if (client == NULL) { crm_warn("Skipping NULL client"); return TRUE; - } else if (client->channel == NULL) { + } else if (client->ipc == NULL) { crm_warn("Skipping client with NULL channel"); return FALSE; - - } else if (client->name == NULL) { - crm_trace("Skipping unnammed client / comamnd channel"); - return FALSE; } type = crm_element_value(update_msg, F_SUBTYPE); - ipc_client = client->channel; - do_remote = crm_str_eq(client->channel_name, "remote", FALSE); - - if (do_remote == FALSE) { - qlen = ipc_client->send_queue->current_qlen; - max_qlen = ipc_client->send_queue->max_qlen; - } - CRM_LOG_ASSERT(type != NULL); if (client->diffs && safe_str_eq(type, T_CIB_DIFF_NOTIFY)) { do_send = TRUE; } else if (client->replace && safe_str_eq(type, T_CIB_REPLACE_NOTIFY)) { do_send = TRUE; } else if (client->confirmations && safe_str_eq(type, T_CIB_UPDATE_CONFIRM)) { do_send = TRUE; } else if (client->pre_notify && safe_str_eq(type, T_CIB_PRE_NOTIFY)) { - if (qlen < (int)(0.4 * max_qlen)) { - do_send = TRUE; - } else { - crm_warn("Throttling pre-notifications due to" - " high load: queue=%d (max=%d)", qlen, max_qlen); - } + do_send = TRUE; } else if (client->post_notify && safe_str_eq(type, T_CIB_POST_NOTIFY)) { - if (qlen < (int)(0.7 * max_qlen)) { - do_send = TRUE; - } else { - crm_warn("Throttling post-notifications due to" - " extreme load: queue=%d (max=%d)", qlen, max_qlen); - } + do_send = TRUE; } if (do_send) { - if (do_remote) { - crm_debug("Sent %s notification to client %s/%s", type, client->name, client->id); - cib_send_remote_msg(client->channel, update_msg, client->encrypted); + if (client->ipc) { + if(crm_ipcs_send(client->ipc, update_msg, TRUE) == FALSE) { + crm_warn("Notification of client %s/%s failed", client->name, client->id); + } - } else if (ipc_client->send_queue->current_qlen >= ipc_client->send_queue->max_qlen) { - /* We never want the CIB to exit because our client is slow */ - crm_crit("%s-notification of client %s/%s failed - queue saturated", - type, client->name, client->id); +#ifdef HAVE_GNUTLS_GNUTLS_H + } else if (client->session) { + crm_debug("Sent %s notification to client %s/%s", type, client->name, client->id); + cib_send_remote_msg(client->session, update_msg, client->encrypted); - } else if (send_ipc_message(ipc_client, update_msg) == FALSE) { - crm_warn("Notification of client %s/%s failed", client->name, client->id); - return FALSE; +#endif + } else { + crm_err("Unknown transport for %s", client->name); } } return FALSE; } void cib_pre_notify(int options, const char *op, xmlNode * existing, xmlNode * update) { xmlNode *update_msg = NULL; const char *type = NULL; const char *id = NULL; gboolean needed = FALSE; g_hash_table_foreach(client_list, need_pre_notify, &needed); if (needed == FALSE) { return; } /* TODO: consider pre-notification for removal */ update_msg = create_xml_node(NULL, "pre-notify"); if (update != NULL) { id = crm_element_value(update, XML_ATTR_ID); } crm_xml_add(update_msg, F_TYPE, T_CIB_NOTIFY); crm_xml_add(update_msg, F_SUBTYPE, T_CIB_PRE_NOTIFY); crm_xml_add(update_msg, F_CIB_OPERATION, op); if (id != NULL) { crm_xml_add(update_msg, F_CIB_OBJID, id); } if (update != NULL) { crm_xml_add(update_msg, F_CIB_OBJTYPE, crm_element_name(update)); } else if (existing != NULL) { crm_xml_add(update_msg, F_CIB_OBJTYPE, crm_element_name(existing)); } type = crm_element_value(update_msg, F_CIB_OBJTYPE); attach_cib_generation(update_msg, "cib_generation", the_cib); if (existing != NULL) { add_message_xml(update_msg, F_CIB_EXISTING, existing); } if (update != NULL) { add_message_xml(update_msg, F_CIB_UPDATE, update); } g_hash_table_foreach_remove(client_list, cib_notify_client, update_msg); if (update == NULL) { crm_trace("Performing operation %s (on section=%s)", op, type); } else { crm_trace("Performing %s on <%s%s%s>", op, type, id ? " id=" : "", id ? id : ""); } free_xml(update_msg); } void cib_post_notify(int options, const char *op, xmlNode * update, enum cib_errors result, xmlNode * new_obj) { gboolean needed = FALSE; g_hash_table_foreach(client_list, need_post_notify, &needed); if (needed == FALSE) { return; } do_cib_notify(options, op, update, result, new_obj, T_CIB_UPDATE_CONFIRM); } void cib_diff_notify(int options, const char *client, const char *call_id, const char *op, xmlNode * update, enum cib_errors result, xmlNode * diff) { int add_updates = 0; int add_epoch = 0; int add_admin_epoch = 0; int del_updates = 0; int del_epoch = 0; int del_admin_epoch = 0; int log_level = LOG_DEBUG_2; if (diff == NULL) { return; } if (result != cib_ok) { log_level = LOG_WARNING; } cib_diff_version_details(diff, &add_admin_epoch, &add_epoch, &add_updates, &del_admin_epoch, &del_epoch, &del_updates); if (add_updates != del_updates) { do_crm_log(log_level, "Update (client: %s%s%s): %d.%d.%d -> %d.%d.%d (%s)", client, call_id ? ", call:" : "", call_id ? call_id : "", del_admin_epoch, del_epoch, del_updates, add_admin_epoch, add_epoch, add_updates, cib_error2string(result)); } else if (diff != NULL) { do_crm_log(log_level, "Local-only Change (client:%s%s%s): %d.%d.%d (%s)", client, call_id ? ", call: " : "", call_id ? call_id : "", add_admin_epoch, add_epoch, add_updates, cib_error2string(result)); } do_cib_notify(options, op, update, result, diff, T_CIB_DIFF_NOTIFY); } void do_cib_notify(int options, const char *op, xmlNode * update, enum cib_errors result, xmlNode * result_data, const char *msg_type) { xmlNode *update_msg = NULL; const char *id = NULL; update_msg = create_xml_node(NULL, "notify"); if (result_data != NULL) { id = crm_element_value(result_data, XML_ATTR_ID); } crm_xml_add(update_msg, F_TYPE, T_CIB_NOTIFY); crm_xml_add(update_msg, F_SUBTYPE, msg_type); crm_xml_add(update_msg, F_CIB_OPERATION, op); crm_xml_add_int(update_msg, F_CIB_RC, result); if (id != NULL) { crm_xml_add(update_msg, F_CIB_OBJID, id); } if (update != NULL) { crm_trace("Setting type to update->name: %s", crm_element_name(update)); crm_xml_add(update_msg, F_CIB_OBJTYPE, crm_element_name(update)); } else if (result_data != NULL) { crm_trace("Setting type to new_obj->name: %s", crm_element_name(result_data)); crm_xml_add(update_msg, F_CIB_OBJTYPE, crm_element_name(result_data)); } else { crm_trace("Not Setting type"); } attach_cib_generation(update_msg, "cib_generation", the_cib); if (update != NULL) { add_message_xml(update_msg, F_CIB_UPDATE, update); } if (result_data != NULL) { add_message_xml(update_msg, F_CIB_UPDATE_RESULT, result_data); } crm_trace("Notifying clients"); g_hash_table_foreach_remove(client_list, cib_notify_client, update_msg); free_xml(update_msg); crm_trace("Notify complete"); } void attach_cib_generation(xmlNode * msg, const char *field, xmlNode * a_cib) { xmlNode *generation = create_xml_node(NULL, XML_CIB_TAG_GENERATION_TUPPLE); if (a_cib != NULL) { copy_in_properties(generation, a_cib); } add_message_xml(msg, field, generation); free_xml(generation); } void cib_replace_notify(const char *origin, xmlNode * update, enum cib_errors result, xmlNode * diff) { xmlNode *replace_msg = NULL; int add_updates = 0; int add_epoch = 0; int add_admin_epoch = 0; int del_updates = 0; int del_epoch = 0; int del_admin_epoch = 0; if (diff == NULL) { return; } cib_diff_version_details(diff, &add_admin_epoch, &add_epoch, &add_updates, &del_admin_epoch, &del_epoch, &del_updates); if (add_updates != del_updates) { crm_info("Replaced: %d.%d.%d -> %d.%d.%d from %s", del_admin_epoch, del_epoch, del_updates, add_admin_epoch, add_epoch, add_updates, crm_str(origin)); } else if (diff != NULL) { crm_info("Local-only Replace: %d.%d.%d from %s", add_admin_epoch, add_epoch, add_updates, crm_str(origin)); } replace_msg = create_xml_node(NULL, "notify-replace"); crm_xml_add(replace_msg, F_TYPE, T_CIB_NOTIFY); crm_xml_add(replace_msg, F_SUBTYPE, T_CIB_REPLACE_NOTIFY); crm_xml_add(replace_msg, F_CIB_OPERATION, CIB_OP_REPLACE); crm_xml_add_int(replace_msg, F_CIB_RC, result); attach_cib_generation(replace_msg, "cib-replace-generation", update); crm_log_xml_trace(replace_msg, "CIB Replaced"); g_hash_table_foreach_remove(client_list, cib_notify_client, replace_msg); free_xml(replace_msg); } diff --git a/cib/remote.c b/cib/remote.c index d0fc1c6e0a..af65cf96de 100644 --- a/cib/remote.c +++ b/cib/remote.c @@ -1,611 +1,610 @@ /* * Copyright (C) 2004 Andrew Beekhof * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "callbacks.h" /* #undef HAVE_PAM_PAM_APPL_H */ /* #undef HAVE_GNUTLS_GNUTLS_H */ #ifdef HAVE_GNUTLS_GNUTLS_H # undef KEYFILE # include #endif #include #include #if HAVE_SECURITY_PAM_APPL_H # include # define HAVE_PAM 1 #else # if HAVE_PAM_PAM_APPL_H # include # define HAVE_PAM 1 # endif #endif #ifdef HAVE_DECL_NANOSLEEP # include #endif extern int remote_tls_fd; extern gboolean cib_shutdown_flag; -extern void initiate_exit(void); int init_remote_listener(int port, gboolean encrypted); void cib_remote_connection_destroy(gpointer user_data); #ifdef HAVE_GNUTLS_GNUTLS_H # define DH_BITS 1024 gnutls_dh_params dh_params; extern gnutls_anon_server_credentials anon_cred_s; static void debug_log(int level, const char *str) { fputs(str, stderr); } extern gnutls_session *create_tls_session(int csock, int type); #endif -extern int num_clients; +int num_clients; int authenticate_user(const char *user, const char *passwd); gboolean cib_remote_listen(int ssock, gpointer data); gboolean cib_remote_msg(int csock, gpointer data); -extern void cib_common_callback_worker(xmlNode * op_request, cib_client_t * cib_client, - gboolean force_synchronous, gboolean privileged); +static void +remote_connection_destroy(gpointer user_data) +{ + return; +} #define ERROR_SUFFIX " Shutting down remote listener" int init_remote_listener(int port, gboolean encrypted) { int ssock; struct sockaddr_in saddr; int optval; if (port <= 0) { /* dont start it */ return 0; } if (encrypted) { #ifndef HAVE_GNUTLS_GNUTLS_H crm_warn("TLS support is not available"); return 0; #else crm_notice("Starting a tls listener on port %d.", port); gnutls_global_init(); /* gnutls_global_set_log_level (10); */ gnutls_global_set_log_function(debug_log); gnutls_dh_params_init(&dh_params); gnutls_dh_params_generate2(dh_params, DH_BITS); gnutls_anon_allocate_server_credentials(&anon_cred_s); gnutls_anon_set_server_dh_params(anon_cred_s, dh_params); #endif } else { crm_warn("Starting a plain_text listener on port %d.", port); } #ifndef HAVE_PAM crm_warn("PAM is _not_ enabled!"); #endif /* create server socket */ ssock = socket(AF_INET, SOCK_STREAM, 0); if (ssock == -1) { crm_perror(LOG_ERR, "Can not create server socket." ERROR_SUFFIX); return -1; } /* reuse address */ optval = 1; setsockopt(ssock, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)); /* bind server socket */ memset(&saddr, '\0', sizeof(saddr)); saddr.sin_family = AF_INET; saddr.sin_addr.s_addr = INADDR_ANY; saddr.sin_port = htons(port); if (bind(ssock, (struct sockaddr *)&saddr, sizeof(saddr)) == -1) { crm_perror(LOG_ERR, "Can not bind server socket." ERROR_SUFFIX); close(ssock); return -2; } if (listen(ssock, 10) == -1) { crm_perror(LOG_ERR, "Can not start listen." ERROR_SUFFIX); close(ssock); return -3; } G_main_add_fd(G_PRIORITY_HIGH, ssock, FALSE, - cib_remote_listen, NULL, default_ipc_connection_destroy); + cib_remote_listen, NULL, remote_connection_destroy); return ssock; } static int check_group_membership(const char *usr, const char *grp) { int index = 0; struct passwd *pwd = NULL; struct group *group = NULL; CRM_CHECK(usr != NULL, return FALSE); CRM_CHECK(grp != NULL, return FALSE); pwd = getpwnam(usr); if (pwd == NULL) { crm_err("No user named '%s' exists!", usr); return FALSE; } group = getgrgid(pwd->pw_gid); if (group != NULL && crm_str_eq(grp, group->gr_name, TRUE)) { return TRUE; } group = getgrnam(grp); if (group == NULL) { crm_err("No group named '%s' exists!", grp); return FALSE; } while (TRUE) { char *member = group->gr_mem[index++]; if (member == NULL) { break; } else if (crm_str_eq(usr, member, TRUE)) { return TRUE; } }; return FALSE; } gboolean cib_remote_listen(int ssock, gpointer data) { int lpc = 0; int csock = 0; unsigned laddr; time_t now = 0; time_t start = time(NULL); struct sockaddr_in addr; #ifdef HAVE_GNUTLS_GNUTLS_H gnutls_session *session = NULL; #endif cib_client_t *new_client = NULL; xmlNode *login = NULL; const char *user = NULL; const char *pass = NULL; const char *tmp = NULL; cl_uuid_t client_id; char uuid_str[UU_UNPARSE_SIZEOF]; #ifdef HAVE_DECL_NANOSLEEP const struct timespec sleepfast = { 0, 10000000 }; /* 10 millisec */ #endif /* accept the connection */ laddr = sizeof(addr); csock = accept(ssock, (struct sockaddr *)&addr, &laddr); crm_debug("New %s connection from %s", ssock == remote_tls_fd ? "secure" : "clear-text", inet_ntoa(addr.sin_addr)); if (csock == -1) { crm_err("accept socket failed"); return TRUE; } if (ssock == remote_tls_fd) { #ifdef HAVE_GNUTLS_GNUTLS_H /* create gnutls session for the server socket */ session = create_tls_session(csock, GNUTLS_SERVER); if (session == NULL) { crm_err("TLS session creation failed"); close(csock); return TRUE; } #endif } do { crm_trace("Iter: %d", lpc++); if (ssock == remote_tls_fd) { #ifdef HAVE_GNUTLS_GNUTLS_H login = cib_recv_remote_msg(session, TRUE); #endif } else { login = cib_recv_remote_msg(GINT_TO_POINTER(csock), FALSE); } if (login != NULL) { break; } #ifdef HAVE_DECL_NANOSLEEP nanosleep(&sleepfast, NULL); #else sleep(1); #endif now = time(NULL); /* Peers have 3s to connect */ } while (login == NULL && (start - now) < 4); crm_log_xml_info(login, "Login: "); if (login == NULL) { goto bail; } tmp = crm_element_name(login); if (safe_str_neq(tmp, "cib_command")) { crm_err("Wrong tag: %s", tmp); goto bail; } tmp = crm_element_value(login, "op"); if (safe_str_neq(tmp, "authenticate")) { crm_err("Wrong operation: %s", tmp); goto bail; } user = crm_element_value(login, "user"); pass = crm_element_value(login, "password"); /* Non-root daemons can only validate the password of the * user they're running as */ if (check_group_membership(user, CRM_DAEMON_GROUP) == FALSE) { crm_err("User is not a member of the required group"); goto bail; } else if (authenticate_user(user, pass) == FALSE) { crm_err("PAM auth failed"); goto bail; } /* send ACK */ - crm_malloc0(new_client, sizeof(cib_client_t)); num_clients++; - new_client->channel_name = "remote"; + crm_malloc0(new_client, sizeof(cib_client_t)); new_client->name = crm_element_value_copy(login, "name"); cl_uuid_generate(&client_id); cl_uuid_unparse(&client_id, uuid_str); CRM_CHECK(new_client->id == NULL, crm_free(new_client->id)); new_client->id = crm_strdup(uuid_str); #if ENABLE_ACL new_client->user = crm_strdup(user); #endif new_client->callback_id = NULL; if (ssock == remote_tls_fd) { #ifdef HAVE_GNUTLS_GNUTLS_H new_client->encrypted = TRUE; - new_client->channel = (void *)session; + new_client->session = session; #endif } else { - new_client->channel = GINT_TO_POINTER(csock); + new_client->session = GINT_TO_POINTER(csock); } free_xml(login); login = create_xml_node(NULL, "cib_result"); crm_xml_add(login, F_CIB_OPERATION, CRM_OP_REGISTER); crm_xml_add(login, F_CIB_CLIENTID, new_client->id); - cib_send_remote_msg(new_client->channel, login, new_client->encrypted); + cib_send_remote_msg(new_client->session, login, new_client->encrypted); free_xml(login); - new_client->source = - (void *)G_main_add_fd(G_PRIORITY_DEFAULT, csock, FALSE, cib_remote_msg, new_client, - cib_remote_connection_destroy); + new_client->remote = + G_main_add_fd(G_PRIORITY_DEFAULT, csock, FALSE, cib_remote_msg, new_client, + cib_remote_connection_destroy); g_hash_table_insert(client_list, new_client->id, new_client); return TRUE; bail: if (ssock == remote_tls_fd) { #ifdef HAVE_GNUTLS_GNUTLS_H gnutls_bye(*session, GNUTLS_SHUT_RDWR); gnutls_deinit(*session); gnutls_free(session); #endif } close(csock); free_xml(login); return TRUE; } void cib_remote_connection_destroy(gpointer user_data) { cib_client_t *client = user_data; if (client == NULL) { return; } - crm_trace("Cleaning up after client disconnect: %s/%s/%s", - crm_str(client->name), client->channel_name, client->id); + crm_trace("Cleaning up after client disconnect: %s/%s", + crm_str(client->name), client->id); if (client->id != NULL) { if (!g_hash_table_remove(client_list, client->id)) { crm_err("Client %s not found in the hashtable", client->name); } } - if (client->source != NULL) { + if (client->remote != NULL) { /* Should this even be necessary? */ - crm_trace("Deleting %s (%p) from mainloop", client->name, client->source); - G_main_del_fd((GFDSource *) client->source); - client->source = NULL; + crm_trace("Deleting %s (%p) from mainloop", client->name, client->remote); + G_main_del_fd(client->remote); + client->remote = NULL; } crm_trace("Destroying %s (%p)", client->name, user_data); num_clients--; crm_trace("Num unfree'd clients: %d", num_clients); crm_free(client->name); crm_free(client->callback_id); crm_free(client->id); crm_free(client->user); crm_free(client); crm_trace("Freed the cib client"); - if (cib_shutdown_flag && g_hash_table_size(client_list) == 0) { - crm_info("All clients disconnected..."); - initiate_exit(); + if (cib_shutdown_flag) { + cib_shutdown(0); } - return; } gboolean cib_remote_msg(int csock, gpointer data) { const char *value = NULL; xmlNode *command = NULL; cib_client_t *client = data; crm_trace("%s callback", client->encrypted ? "secure" : "clear-text"); - command = cib_recv_remote_msg(client->channel, client->encrypted); + command = cib_recv_remote_msg(client->session, client->encrypted); if (command == NULL) { return FALSE; } value = crm_element_name(command); if (safe_str_neq(value, "cib_command")) { crm_log_xml_trace(command, "Bad command: "); goto bail; } if (client->name == NULL) { value = crm_element_value(command, F_CLIENTNAME); if (value == NULL) { client->name = crm_strdup(client->id); } else { client->name = crm_strdup(value); } } if (client->callback_id == NULL) { value = crm_element_value(command, F_CIB_CALLBACK_TOKEN); if (value != NULL) { client->callback_id = crm_strdup(value); crm_trace("Callback channel for %s is %s", client->id, client->callback_id); } else { client->callback_id = crm_strdup(client->id); } } /* unset dangerous options */ xml_remove_prop(command, F_ORIG); xml_remove_prop(command, F_CIB_HOST); xml_remove_prop(command, F_CIB_GLOBAL_UPDATE); crm_xml_add(command, F_TYPE, T_CIB); crm_xml_add(command, F_CIB_CLIENTID, client->id); crm_xml_add(command, F_CIB_CLIENTNAME, client->name); #if ENABLE_ACL crm_xml_add(command, F_CIB_USER, client->user); #endif if (crm_element_value(command, F_CIB_CALLID) == NULL) { cl_uuid_t call_id; char call_uuid[UU_UNPARSE_SIZEOF]; /* fix the command */ cl_uuid_generate(&call_id); cl_uuid_unparse(&call_id, call_uuid); crm_xml_add(command, F_CIB_CALLID, call_uuid); } if (crm_element_value(command, F_CIB_CALLOPTS) == NULL) { crm_xml_add_int(command, F_CIB_CALLOPTS, 0); } crm_log_xml_trace(command, "Remote command: "); - cib_common_callback_worker(command, client, FALSE, TRUE); + cib_common_callback_worker(command, client, TRUE); bail: free_xml(command); command = NULL; return TRUE; } #ifdef HAVE_PAM /* * Useful Examples: * http://www.kernel.org/pub/linux/libs/pam/Linux-PAM-html * http://developer.apple.com/samplecode/CryptNoMore/index.html */ static int construct_pam_passwd(int num_msg, const struct pam_message **msg, struct pam_response **response, void *data) { int count = 0; struct pam_response *reply; char *string = (char *)data; CRM_CHECK(data, return PAM_CONV_ERR); CRM_CHECK(num_msg == 1, return PAM_CONV_ERR); /* We only want to handle one message */ reply = calloc(1, sizeof(struct pam_response)); CRM_ASSERT(reply != NULL); for (count = 0; count < num_msg; ++count) { switch (msg[count]->msg_style) { case PAM_TEXT_INFO: crm_info("PAM: %s\n", msg[count]->msg); break; case PAM_PROMPT_ECHO_OFF: case PAM_PROMPT_ECHO_ON: reply[count].resp_retcode = 0; reply[count].resp = string; /* We already made a copy */ case PAM_ERROR_MSG: /* In theory we'd want to print this, but then * we see the password prompt in the logs */ /* crm_err("PAM error: %s\n", msg[count]->msg); */ break; default: crm_err("Unhandled conversation type: %d", msg[count]->msg_style); goto bail; } } *response = reply; reply = NULL; return PAM_SUCCESS; bail: for (count = 0; count < num_msg; ++count) { if (reply[count].resp != NULL) { switch (msg[count]->msg_style) { case PAM_PROMPT_ECHO_ON: case PAM_PROMPT_ECHO_OFF: /* Erase the data - it contained a password */ while (*(reply[count].resp)) { *(reply[count].resp)++ = '\0'; } free(reply[count].resp); break; } reply[count].resp = NULL; } } free(reply); reply = NULL; return PAM_CONV_ERR; } #endif int authenticate_user(const char *user, const char *passwd) { #ifndef HAVE_PAM gboolean pass = TRUE; #else int rc = 0; gboolean pass = FALSE; const void *p_user = NULL; struct pam_conv p_conv; struct pam_handle *pam_h = NULL; static const char *pam_name = NULL; if (pam_name == NULL) { pam_name = getenv("CIB_pam_service"); } if (pam_name == NULL) { pam_name = "login"; } p_conv.conv = construct_pam_passwd; p_conv.appdata_ptr = strdup(passwd); rc = pam_start(pam_name, user, &p_conv, &pam_h); if (rc != PAM_SUCCESS) { crm_err("Could not initialize PAM: %s (%d)", pam_strerror(pam_h, rc), rc); goto bail; } rc = pam_authenticate(pam_h, 0); if (rc != PAM_SUCCESS) { crm_err("Authentication failed for %s: %s (%d)", user, pam_strerror(pam_h, rc), rc); goto bail; } /* Make sure we authenticated the user we wanted to authenticate. * Since we also run as non-root, it might be worth pre-checking * the user has the same EID as us, since that the only user we * can authenticate. */ rc = pam_get_item(pam_h, PAM_USER, &p_user); if (rc != PAM_SUCCESS) { crm_err("Internal PAM error: %s (%d)", pam_strerror(pam_h, rc), rc); goto bail; } else if (p_user == NULL) { crm_err("Unknown user authenticated."); goto bail; } else if (safe_str_neq(p_user, user)) { crm_err("User mismatch: %s vs. %s.", (const char *)p_user, (const char *)user); goto bail; } rc = pam_acct_mgmt(pam_h, 0); if (rc != PAM_SUCCESS) { crm_err("Access denied: %s (%d)", pam_strerror(pam_h, rc), rc); goto bail; } pass = TRUE; bail: rc = pam_end(pam_h, rc); #endif return pass; } diff --git a/include/crm/cib.h b/include/crm/cib.h index c0722a5149..541e57b857 100644 --- a/include/crm/cib.h +++ b/include/crm/cib.h @@ -1,312 +1,312 @@ /* * Copyright (C) 2004 Andrew Beekhof * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #ifndef CIB__H # define CIB__H # include # include # define CIB_FEATURE_SET "2.0" # define USE_PESKY_FRAGMENTS 1 /* use compare_version() for doing comparisons */ enum cib_variant { cib_undefined, cib_native, cib_file, cib_remote, cib_database, cib_edir }; enum cib_state { cib_connected_command, cib_connected_query, cib_disconnected }; enum cib_conn_type { cib_command, cib_query, cib_no_connection }; /* *INDENT-OFF* */ enum cib_call_options { cib_none = 0x00000000, cib_verbose = 0x00000001, cib_xpath = 0x00000002, cib_multiple = 0x00000004, cib_can_create = 0x00000008, cib_discard_reply = 0x00000010, cib_no_children = 0x00000020, cib_scope_local = 0x00000100, cib_dryrun = 0x00000200, cib_sync_call = 0x00001000, cib_inhibit_notify = 0x00010000, cib_quorum_override = 0x00100000, cib_inhibit_bcast = 0x01000000, cib_force_diff = 0x10000000 }; #define cib_default_options = cib_none enum cib_errors { cib_ok = 0, cib_operation = -1, cib_create_msg = -2, cib_not_connected = -3, cib_not_authorized = -4, cib_send_failed = -5, cib_reply_failed = -6, cib_return_code = -7, cib_output_ptr = -8, cib_output_data = -9, cib_connection = -10, cib_authentication = -11, cib_missing = -12, cib_variant = -28, CIBRES_MISSING_ID = -13, CIBRES_MISSING_TYPE = -14, CIBRES_MISSING_FIELD = -15, CIBRES_OBJTYPE_MISMATCH = -16, CIBRES_CORRUPT = -17, CIBRES_OTHER = -18, cib_unknown = -19, cib_STALE = -20, cib_EXISTS = -21, cib_NOTEXISTS = -22, cib_ACTIVATION = -23, cib_NOSECTION = -24, cib_NOOBJECT = -25, cib_NOPARENT = -26, cib_NODECOPY = -27, cib_NOTSUPPORTED = -29, cib_registration_msg = -30, cib_callback_token = -31, cib_callback_register = -32, cib_msg_field_add = -33, cib_client_gone = -34, cib_not_master = -35, cib_client_corrupt = -36, cib_master_timeout = -37, cib_revision_unsupported= -38, cib_revision_unknown = -39, cib_missing_data = -40, cib_remote_timeout = -41, cib_no_quorum = -42, cib_diff_failed = -43, cib_diff_resync = -44, cib_old_data = -45, cib_id_check = -46, cib_dtd_validation = -47, cib_bad_section = -48, cib_bad_digest = -49, cib_bad_permissions = -50, cib_bad_config = -51, cib_invalid_argument = -52, cib_transform_failed = -53, cib_permission_denied = -54, }; /* *INDENT-ON* */ enum cib_update_op { CIB_UPDATE_OP_NONE = 0, CIB_UPDATE_OP_ADD, CIB_UPDATE_OP_MODIFY, CIB_UPDATE_OP_DELETE, CIB_UPDATE_OP_MAX }; enum cib_section { cib_section_none, cib_section_all, cib_section_nodes, cib_section_constraints, cib_section_resources, cib_section_crmconfig, cib_section_status }; # define CIB_OP_SLAVE "cib_slave" # define CIB_OP_SLAVEALL "cib_slave_all" # define CIB_OP_MASTER "cib_master" # define CIB_OP_SYNC "cib_sync" # define CIB_OP_SYNC_ONE "cib_sync_one" # define CIB_OP_ISMASTER "cib_ismaster" # define CIB_OP_BUMP "cib_bump" # define CIB_OP_QUERY "cib_query" # define CIB_OP_CREATE "cib_create" # define CIB_OP_UPDATE "cib_update" # define CIB_OP_MODIFY "cib_modify" # define CIB_OP_DELETE "cib_delete" # define CIB_OP_ERASE "cib_erase" # define CIB_OP_REPLACE "cib_replace" # define CIB_OP_NOTIFY "cib_notify" # define CIB_OP_APPLY_DIFF "cib_apply_diff" # define CIB_OP_UPGRADE "cib_upgrade" # define CIB_OP_DELETE_ALT "cib_delete_alt" # define F_CIB_CLIENTID "cib_clientid" # define F_CIB_CALLOPTS "cib_callopt" # define F_CIB_CALLID "cib_callid" # define F_CIB_CALLDATA "cib_calldata" # define F_CIB_OPERATION "cib_op" # define F_CIB_ISREPLY "cib_isreplyto" # define F_CIB_SECTION "cib_section" # define F_CIB_HOST "cib_host" # define F_CIB_RC "cib_rc" # define F_CIB_DELEGATED "cib_delegated_from" # define F_CIB_OBJID "cib_object" # define F_CIB_OBJTYPE "cib_object_type" # define F_CIB_EXISTING "cib_existing_object" # define F_CIB_SEENCOUNT "cib_seen" # define F_CIB_TIMEOUT "cib_timeout" # define F_CIB_UPDATE "cib_update" # define F_CIB_CALLBACK_TOKEN "cib_async_id" # define F_CIB_GLOBAL_UPDATE "cib_update" # define F_CIB_UPDATE_RESULT "cib_update_result" # define F_CIB_CLIENTNAME "cib_clientname" # define F_CIB_NOTIFY_TYPE "cib_notify_type" # define F_CIB_NOTIFY_ACTIVATE "cib_notify_activate" # define F_CIB_UPDATE_DIFF "cib_update_diff" # define F_CIB_USER "cib_user" # define T_CIB "cib" # define T_CIB_NOTIFY "cib_notify" /* notify sub-types */ # define T_CIB_PRE_NOTIFY "cib_pre_notify" # define T_CIB_POST_NOTIFY "cib_post_notify" # define T_CIB_UPDATE_CONFIRM "cib_update_confirmation" # define T_CIB_DIFF_NOTIFY "cib_diff_notify" # define T_CIB_REPLACE_NOTIFY "cib_refresh_notify" # define cib_channel_ro "cib_ro" # define cib_channel_rw "cib_rw" # define cib_channel_callback "cib_callback" # define cib_channel_ro_synchronous "cib_ro_syncronous" # define cib_channel_rw_synchronous "cib_rw_syncronous" typedef struct cib_s cib_t; typedef struct cib_api_operations_s { int (*variant_op) (cib_t * cib, const char *op, const char *host, const char *section, xmlNode * data, xmlNode ** output_data, int call_options); int (*signon) (cib_t * cib, const char *name, enum cib_conn_type type); int (*signon_raw) (cib_t * cib, const char *name, enum cib_conn_type type, int *async_fd, - int *sync_fd); + int *unused); int (*signoff) (cib_t * cib); int (*free) (cib_t * cib); int (*set_op_callback) (cib_t * cib, void (*callback) (const xmlNode * msg, int callid, int rc, xmlNode * output)); int (*add_notify_callback) (cib_t * cib, const char *event, void (*callback) (const char *event, xmlNode * msg)); int (*del_notify_callback) (cib_t * cib, const char *event, void (*callback) (const char *event, xmlNode * msg)); int (*set_connection_dnotify) (cib_t * cib, void (*dnotify) (gpointer user_data)); int (*inputfd) (cib_t * cib); int (*noop) (cib_t * cib, int call_options); int (*ping) (cib_t * cib, xmlNode ** output_data, int call_options); int (*query) (cib_t * cib, const char *section, xmlNode ** output_data, int call_options); int (*query_from) (cib_t * cib, const char *host, const char *section, xmlNode ** output_data, int call_options); int (*is_master) (cib_t * cib); int (*set_master) (cib_t * cib, int call_options); int (*set_slave) (cib_t * cib, int call_options); int (*set_slave_all) (cib_t * cib, int call_options); int (*sync) (cib_t * cib, const char *section, int call_options); int (*sync_from) (cib_t * cib, const char *host, const char *section, int call_options); int (*upgrade) (cib_t * cib, int call_options); int (*bump_epoch) (cib_t * cib, int call_options); int (*create) (cib_t * cib, const char *section, xmlNode * data, int call_options); int (*modify) (cib_t * cib, const char *section, xmlNode * data, int call_options); int (*update) (cib_t * cib, const char *section, xmlNode * data, int call_options); int (*replace) (cib_t * cib, const char *section, xmlNode * data, int call_options); int (*delete) (cib_t * cib, const char *section, xmlNode * data, int call_options); int (*erase) (cib_t * cib, xmlNode ** output_data, int call_options); int (*delete_absolute) (cib_t * cib, const char *section, xmlNode * data, int call_options); int (*quit) (cib_t * cib, int call_options); int (*register_notification) (cib_t * cib, const char *callback, int enabled); gboolean(*register_callback) (cib_t * cib, int call_id, int timeout, gboolean only_success, void *user_data, const char *callback_name, void (*callback) (xmlNode *, int, int, xmlNode *, void *)); int (*delegated_variant_op) (cib_t * cib, const char *op, const char *host, const char *section, xmlNode * data, xmlNode ** output_data, int call_options, const char *user_name); } cib_api_operations_t; struct cib_s { enum cib_state state; enum cib_conn_type type; enum cib_variant variant; int call_id; int call_timeout; void *variant_opaque; GList *notify_list; void (*op_callback) (const xmlNode * msg, int call_id, int rc, xmlNode * output); cib_api_operations_t *cmds; }; /* Core functions */ extern cib_t *cib_new(void); extern cib_t *cib_native_new(void); extern cib_t *cib_file_new(const char *filename); extern cib_t *cib_remote_new(const char *server, const char *user, const char *passwd, int port, gboolean encrypted); extern cib_t *cib_new_no_shadow(void); extern char *get_shadow_file(const char *name); extern cib_t *cib_shadow_new(const char *name); extern void cib_delete(cib_t * cib); extern void cib_dump_pending_callbacks(void); extern int num_cib_op_callbacks(void); extern void remove_cib_op_callback(int call_id, gboolean all_callbacks); # define add_cib_op_callback(cib, id, flag, data, fn) cib->cmds->register_callback(cib, id, 120, flag, data, #fn, fn) # include # include # define CIB_LIBRARY "libcib.so.1" #endif diff --git a/lib/cib/cib_client.c b/lib/cib/cib_client.c index efd0e4340c..7f46db406f 100644 --- a/lib/cib/cib_client.c +++ b/lib/cib/cib_client.c @@ -1,623 +1,624 @@ /* * Copyright (c) 2004 Andrew Beekhof * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA * */ #include #include #include #include #include #include #include #include #include #include #include #include #include GHashTable *cib_op_callback_table = NULL; int cib_client_set_op_callback(cib_t * cib, void (*callback) (const xmlNode * msg, int call_id, int rc, xmlNode * output)); int cib_client_add_notify_callback(cib_t * cib, const char *event, void (*callback) (const char *event, xmlNode * msg)); int cib_client_del_notify_callback(cib_t * cib, const char *event, void (*callback) (const char *event, xmlNode * msg)); gint ciblib_GCompareFunc(gconstpointer a, gconstpointer b); #define op_common(cib) do { \ if(cib == NULL) { \ return cib_missing; \ } else if(cib->cmds->variant_op == NULL) { \ return cib_variant; \ } \ } while(0) static int cib_client_noop(cib_t * cib, int call_options) { op_common(cib); return cib->cmds->variant_op(cib, CRM_OP_NOOP, NULL, NULL, NULL, NULL, call_options); } static int cib_client_ping(cib_t * cib, xmlNode ** output_data, int call_options) { op_common(cib); return cib->cmds->variant_op(cib, CRM_OP_PING, NULL, NULL, NULL, output_data, call_options); } static int cib_client_query(cib_t * cib, const char *section, xmlNode ** output_data, int call_options) { return cib->cmds->query_from(cib, NULL, section, output_data, call_options); } static int cib_client_query_from(cib_t * cib, const char *host, const char *section, xmlNode ** output_data, int call_options) { op_common(cib); return cib->cmds->variant_op(cib, CIB_OP_QUERY, host, section, NULL, output_data, call_options); } static int cib_client_is_master(cib_t * cib) { op_common(cib); return cib->cmds->variant_op(cib, CIB_OP_ISMASTER, NULL, NULL, NULL, NULL, cib_scope_local | cib_sync_call); } static int cib_client_set_slave(cib_t * cib, int call_options) { op_common(cib); return cib->cmds->variant_op(cib, CIB_OP_SLAVE, NULL, NULL, NULL, NULL, call_options); } static int cib_client_set_slave_all(cib_t * cib, int call_options) { return cib_NOTSUPPORTED; } static int cib_client_set_master(cib_t * cib, int call_options) { op_common(cib); crm_trace("Adding cib_scope_local to options"); return cib->cmds->variant_op(cib, CIB_OP_MASTER, NULL, NULL, NULL, NULL, call_options | cib_scope_local); } static int cib_client_bump_epoch(cib_t * cib, int call_options) { op_common(cib); return cib->cmds->variant_op(cib, CIB_OP_BUMP, NULL, NULL, NULL, NULL, call_options); } static int cib_client_upgrade(cib_t * cib, int call_options) { op_common(cib); return cib->cmds->variant_op(cib, CIB_OP_UPGRADE, NULL, NULL, NULL, NULL, call_options); } static int cib_client_sync(cib_t * cib, const char *section, int call_options) { return cib->cmds->sync_from(cib, NULL, section, call_options); } static int cib_client_sync_from(cib_t * cib, const char *host, const char *section, int call_options) { op_common(cib); return cib->cmds->variant_op(cib, CIB_OP_SYNC, host, section, NULL, NULL, call_options); } static int cib_client_create(cib_t * cib, const char *section, xmlNode * data, int call_options) { op_common(cib); return cib->cmds->variant_op(cib, CIB_OP_CREATE, NULL, section, data, NULL, call_options); } static int cib_client_modify(cib_t * cib, const char *section, xmlNode * data, int call_options) { op_common(cib); return cib->cmds->variant_op(cib, CIB_OP_MODIFY, NULL, section, data, NULL, call_options); } static int cib_client_update(cib_t * cib, const char *section, xmlNode * data, int call_options) { op_common(cib); return cib->cmds->variant_op(cib, CIB_OP_MODIFY, NULL, section, data, NULL, call_options); } static int cib_client_replace(cib_t * cib, const char *section, xmlNode * data, int call_options) { op_common(cib); return cib->cmds->variant_op(cib, CIB_OP_REPLACE, NULL, section, data, NULL, call_options); } static int cib_client_delete(cib_t * cib, const char *section, xmlNode * data, int call_options) { op_common(cib); return cib->cmds->variant_op(cib, CIB_OP_DELETE, NULL, section, data, NULL, call_options); } static int cib_client_delete_absolute(cib_t * cib, const char *section, xmlNode * data, int call_options) { op_common(cib); return cib->cmds->variant_op(cib, CIB_OP_DELETE_ALT, NULL, section, data, NULL, call_options); } static int cib_client_erase(cib_t * cib, xmlNode ** output_data, int call_options) { op_common(cib); return cib->cmds->variant_op(cib, CIB_OP_ERASE, NULL, NULL, NULL, output_data, call_options); } static int cib_client_quit(cib_t * cib, int call_options) { op_common(cib); return cib->cmds->variant_op(cib, CRM_OP_QUIT, NULL, NULL, NULL, NULL, call_options); } static void cib_destroy_op_callback(gpointer data) { cib_callback_client_t *blob = data; if (blob->timer && blob->timer->ref > 0) { g_source_remove(blob->timer->ref); } crm_free(blob->timer); crm_free(blob); } char * get_shadow_file(const char *suffix) { char *cib_home = NULL; char *fullname = NULL; char *name = crm_concat("shadow", suffix, '.'); const char *dir = getenv("CIB_shadow_dir"); if (dir == NULL) { uid_t uid = geteuid(); struct passwd *pwent = getpwuid(uid); const char *user = NULL; if (pwent) { user = pwent->pw_name; } else { crm_perror(LOG_ERR, "Cannot get password entry for uid: %d", uid); user = getenv("USER"); } if (safe_str_eq(user, "root") || safe_str_eq(user, CRM_DAEMON_USER)) { dir = CRM_CONFIG_DIR; } else { const char *home = NULL; if ((home = getenv("HOME")) == NULL) { if (pwent) { home = pwent->pw_dir; } } if ((dir = getenv("TMPDIR")) == NULL) { dir = "/tmp"; } if (home && home[0] == '/') { int rc = 0; cib_home = crm_concat(home, ".cib", '/'); rc = mkdir(cib_home, 0700); if (rc < 0 && errno != EEXIST) { crm_perror(LOG_ERR, "Couldn't create user-specific shadow directory: %s", cib_home); errno = 0; } else { dir = cib_home; } } } } fullname = crm_concat(dir, name, '/'); crm_free(cib_home); crm_free(name); return fullname; } cib_t * cib_shadow_new(const char *shadow) { cib_t *new_cib = NULL; char *shadow_file = NULL; CRM_CHECK(shadow != NULL, return NULL); shadow_file = get_shadow_file(shadow); new_cib = cib_file_new(shadow_file); crm_free(shadow_file); return new_cib; } cib_t * cib_new_no_shadow(void) { unsetenv("CIB_shadow"); return cib_new(); } cib_t * cib_new(void) { const char *value = getenv("CIB_shadow"); if (value) { return cib_shadow_new(value); } value = getenv("CIB_file"); if (value) { return cib_file_new(value); } value = getenv("CIB_port"); if (value) { gboolean encrypted = TRUE; int port = crm_parse_int(value, NULL); const char *server = getenv("CIB_server"); const char *user = getenv("CIB_user"); const char *pass = getenv("CIB_passwd"); value = getenv("CIB_encrypted"); if (value && crm_is_true(value) == FALSE) { crm_info("Disabling TLS"); encrypted = FALSE; } if (user == NULL) { user = CRM_DAEMON_USER; crm_info("Defaulting to user: %s", user); } if (server == NULL) { server = "localhost"; crm_info("Defaulting to localhost"); } return cib_remote_new(server, user, pass, port, encrypted); } return cib_native_new(); } /* this is backwards... cib_*_new should call this not the other way around */ cib_t * cib_new_variant(void) { cib_t *new_cib = NULL; crm_malloc0(new_cib, sizeof(cib_t)); if (cib_op_callback_table != NULL) { g_hash_table_destroy(cib_op_callback_table); cib_op_callback_table = NULL; } if (cib_op_callback_table == NULL) { cib_op_callback_table = g_hash_table_new_full(g_direct_hash, g_direct_equal, NULL, cib_destroy_op_callback); } new_cib->call_id = 1; new_cib->variant = cib_undefined; new_cib->type = cib_none; new_cib->state = cib_disconnected; new_cib->op_callback = NULL; new_cib->variant_opaque = NULL; new_cib->notify_list = NULL; /* the rest will get filled in by the variant constructor */ crm_malloc0(new_cib->cmds, sizeof(cib_api_operations_t)); new_cib->cmds->set_op_callback = cib_client_set_op_callback; new_cib->cmds->add_notify_callback = cib_client_add_notify_callback; new_cib->cmds->del_notify_callback = cib_client_del_notify_callback; new_cib->cmds->register_callback = cib_client_register_callback; new_cib->cmds->noop = cib_client_noop; new_cib->cmds->ping = cib_client_ping; new_cib->cmds->query = cib_client_query; new_cib->cmds->sync = cib_client_sync; new_cib->cmds->query_from = cib_client_query_from; new_cib->cmds->sync_from = cib_client_sync_from; new_cib->cmds->is_master = cib_client_is_master; new_cib->cmds->set_master = cib_client_set_master; new_cib->cmds->set_slave = cib_client_set_slave; new_cib->cmds->set_slave_all = cib_client_set_slave_all; new_cib->cmds->upgrade = cib_client_upgrade; new_cib->cmds->bump_epoch = cib_client_bump_epoch; new_cib->cmds->create = cib_client_create; new_cib->cmds->modify = cib_client_modify; new_cib->cmds->update = cib_client_update; new_cib->cmds->replace = cib_client_replace; new_cib->cmds->delete = cib_client_delete; new_cib->cmds->erase = cib_client_erase; new_cib->cmds->quit = cib_client_quit; new_cib->cmds->delete_absolute = cib_client_delete_absolute; return new_cib; } void cib_delete(cib_t * cib) { GList *list = cib->notify_list; while (list != NULL) { cib_notify_client_t *client = g_list_nth_data(list, 0); list = g_list_remove(list, client); crm_free(client); } g_hash_table_destroy(cib_op_callback_table); cib_op_callback_table = NULL; cib->cmds->free(cib); cib = NULL; } int cib_client_set_op_callback(cib_t * cib, void (*callback) (const xmlNode * msg, int call_id, int rc, xmlNode * output)) { if (callback == NULL) { crm_info("Un-Setting operation callback"); } else { crm_trace("Setting operation callback"); } cib->op_callback = callback; return cib_ok; } int cib_client_add_notify_callback(cib_t * cib, const char *event, void (*callback) (const char *event, xmlNode * msg)) { GList *list_item = NULL; cib_notify_client_t *new_client = NULL; if (cib->variant != cib_native && cib->variant != cib_remote) { return cib_NOTSUPPORTED; } crm_trace("Adding callback for %s events (%d)", event, g_list_length(cib->notify_list)); crm_malloc0(new_client, sizeof(cib_notify_client_t)); new_client->event = event; new_client->callback = callback; list_item = g_list_find_custom(cib->notify_list, new_client, ciblib_GCompareFunc); if (list_item != NULL) { crm_warn("Callback already present"); crm_free(new_client); return cib_EXISTS; } else { cib->notify_list = g_list_append(cib->notify_list, new_client); cib->cmds->register_notification(cib, event, 1); crm_trace("Callback added (%d)", g_list_length(cib->notify_list)); } return cib_ok; } int cib_client_del_notify_callback(cib_t * cib, const char *event, void (*callback) (const char *event, xmlNode * msg)) { GList *list_item = NULL; cib_notify_client_t *new_client = NULL; if (cib->variant != cib_native && cib->variant != cib_remote) { return cib_NOTSUPPORTED; } crm_debug("Removing callback for %s events", event); crm_malloc0(new_client, sizeof(cib_notify_client_t)); new_client->event = event; new_client->callback = callback; list_item = g_list_find_custom(cib->notify_list, new_client, ciblib_GCompareFunc); cib->cmds->register_notification(cib, event, 0); if (list_item != NULL) { cib_notify_client_t *list_client = list_item->data; cib->notify_list = g_list_remove(cib->notify_list, list_client); crm_free(list_client); crm_trace("Removed callback"); } else { crm_trace("Callback not present"); } crm_free(new_client); return cib_ok; } gint ciblib_GCompareFunc(gconstpointer a, gconstpointer b) { int rc = 0; const cib_notify_client_t *a_client = a; const cib_notify_client_t *b_client = b; CRM_CHECK(a_client->event != NULL && b_client->event != NULL, return 0); rc = strcmp(a_client->event, b_client->event); if (rc == 0) { if (a_client->callback == b_client->callback) { return 0; } else if (((long)a_client->callback) < ((long)b_client->callback)) { crm_trace("callbacks for %s are not equal: %p < %p", a_client->event, a_client->callback, b_client->callback); return -1; } crm_trace("callbacks for %s are not equal: %p > %p", a_client->event, a_client->callback, b_client->callback); return 1; } return rc; } static gboolean cib_async_timeout_handler(gpointer data) { struct timer_rec_s *timer = data; crm_debug("Async call %d timed out after %ds", timer->call_id, timer->timeout); cib_native_callback(timer->cib, NULL, timer->call_id, cib_remote_timeout); /* Always return TRUE, never remove the handler * We do that in remove_cib_op_callback() */ return TRUE; } gboolean cib_client_register_callback(cib_t * cib, int call_id, int timeout, gboolean only_success, void *user_data, const char *callback_name, void (*callback) (xmlNode *, int, int, xmlNode *, void *)) { cib_callback_client_t *blob = NULL; if (call_id < 0) { if (only_success == FALSE) { callback(NULL, call_id, call_id, NULL, user_data); } else { crm_warn("CIB call failed: %s", cib_error2string(call_id)); } return FALSE; } crm_malloc0(blob, sizeof(cib_callback_client_t)); blob->id = callback_name; blob->only_success = only_success; blob->user_data = user_data; blob->callback = callback; if (timeout > 0) { struct timer_rec_s *async_timer = NULL; crm_malloc0(async_timer, sizeof(struct timer_rec_s)); blob->timer = async_timer; async_timer->cib = cib; async_timer->call_id = call_id; async_timer->timeout = timeout * 1000; async_timer->ref = g_timeout_add(async_timer->timeout, cib_async_timeout_handler, async_timer); } + crm_trace("Adding callback %s for call %d", callback_name, call_id); g_hash_table_insert(cib_op_callback_table, GINT_TO_POINTER(call_id), blob); return TRUE; } void remove_cib_op_callback(int call_id, gboolean all_callbacks) { if (all_callbacks) { if (cib_op_callback_table != NULL) { g_hash_table_destroy(cib_op_callback_table); } cib_op_callback_table = g_hash_table_new_full(g_direct_hash, g_direct_equal, NULL, cib_destroy_op_callback); } else { g_hash_table_remove(cib_op_callback_table, GINT_TO_POINTER(call_id)); } } int num_cib_op_callbacks(void) { if (cib_op_callback_table == NULL) { return 0; } return g_hash_table_size(cib_op_callback_table); } static void cib_dump_pending_op(gpointer key, gpointer value, gpointer user_data) { int call = GPOINTER_TO_INT(key); cib_callback_client_t *blob = value; crm_debug("Call %d (%s): pending", call, crm_str(blob->id)); } void cib_dump_pending_callbacks(void) { if (cib_op_callback_table == NULL) { return; } return g_hash_table_foreach(cib_op_callback_table, cib_dump_pending_op, NULL); } diff --git a/lib/cib/cib_native.c b/lib/cib/cib_native.c index 60a7fc55c4..53d4f0ab89 100644 --- a/lib/cib/cib_native.c +++ b/lib/cib/cib_native.c @@ -1,678 +1,494 @@ /* * Copyright (c) 2004 International Business Machines * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA * */ #include #include #include #include #include #include #include #include #include #include -#include +#include #include typedef struct cib_native_opaque_s { - IPC_Channel *command_channel; - IPC_Channel *callback_channel; - GCHSource *callback_source; char *token; + crm_ipc_t *ipc; + void (*dnotify_fn) (gpointer user_data); + mainloop_ipc_t *source; } cib_native_opaque_t; int cib_native_perform_op(cib_t * cib, const char *op, const char *host, const char *section, xmlNode * data, xmlNode ** output_data, int call_options); int cib_native_perform_op_delegate(cib_t * cib, const char *op, const char *host, const char *section, xmlNode * data, xmlNode ** output_data, int call_options, const char *user_name); int cib_native_free(cib_t * cib); int cib_native_signoff(cib_t * cib); int cib_native_signon(cib_t * cib, const char *name, enum cib_conn_type type); -int cib_native_signon_raw(cib_t * cib, const char *name, enum cib_conn_type type, int *async_fd, - int *sync_fd); +int cib_native_signon_raw(cib_t * cib, const char *name, enum cib_conn_type type, int *async_fd, int *unused); -IPC_Channel *cib_native_channel(cib_t * cib); -gboolean cib_native_msgready(cib_t * cib); -gboolean cib_native_dispatch(IPC_Channel * channel, gpointer user_data); +bool cib_native_dispatch(cib_t * cib); -int cib_native_inputfd(cib_t * cib); -int cib_native_rcvmsg(cib_t * cib, int blocking); int cib_native_set_connection_dnotify(cib_t * cib, void (*dnotify) (gpointer user_data)); cib_t * cib_native_new(void) { cib_native_opaque_t *native = NULL; cib_t *cib = cib_new_variant(); crm_malloc0(native, sizeof(cib_native_opaque_t)); cib->variant = cib_native; cib->variant_opaque = native; - native->command_channel = NULL; - native->callback_channel = NULL; + native->ipc = NULL; + native->source = NULL; + native->dnotify_fn = NULL; /* assign variant specific ops */ cib->cmds->variant_op = cib_native_perform_op; cib->cmds->delegated_variant_op = cib_native_perform_op_delegate; cib->cmds->signon = cib_native_signon; cib->cmds->signon_raw = cib_native_signon_raw; cib->cmds->signoff = cib_native_signoff; cib->cmds->free = cib_native_free; - cib->cmds->inputfd = cib_native_inputfd; cib->cmds->register_notification = cib_native_register_notification; cib->cmds->set_connection_dnotify = cib_native_set_connection_dnotify; return cib; } int cib_native_signon(cib_t * cib, const char *name, enum cib_conn_type type) { return cib_native_signon_raw(cib, name, type, NULL, NULL); } +static int +cib_native_dispatch_internal(const char *buffer, ssize_t length, gpointer userdata) +{ + const char *type = NULL; + xmlNode *msg = NULL; + + cib_t * cib = userdata; + cib_native_opaque_t *native; + + crm_trace("dispatching %p", userdata); + + if (cib == NULL) { + crm_err("No CIB!"); + return 0; + } + + native = cib->variant_opaque; + msg = string2xml(buffer); + + if (msg == NULL) { + crm_warn("Received a NULL msg from CIB service."); + return 0; + } + + /* do callbacks */ + type = crm_element_value(msg, F_TYPE); + crm_trace("Activating %s callbacks...", type); + crm_log_xml_trace(msg, "cib-reply"); + + if (safe_str_eq(type, T_CIB)) { + cib_native_callback(cib, msg, 0, 0); + + } else if (safe_str_eq(type, T_CIB_NOTIFY)) { + g_list_foreach(cib->notify_list, cib_native_notify, msg); + + } else { + crm_err("Unknown message type: %s", type); + } + + free_xml(msg); + return 0; +} + +bool +cib_native_dispatch(cib_t * cib) +{ + gboolean stay_connected = TRUE; + cib_native_opaque_t *native; + + if (cib == NULL) { + crm_err("No CIB!"); + return FALSE; + } + + crm_trace("dispatching %p", cib); + native = cib->variant_opaque; + while(crm_ipc_ready(native->ipc)) { + + if(crm_ipc_read(native->ipc) > 0) { + const char *msg = crm_ipc_buffer(native->ipc); + cib_native_dispatch_internal(msg, strlen(msg), cib); + } + + if(crm_ipc_connected(native->ipc) == FALSE) { + crm_err("Connection closed"); + stay_connected = FALSE; + } + } + + return stay_connected; +} + +static void +cib_native_destroy(void *userdata) +{ + cib_t *cib = userdata; + cib_native_opaque_t *native = cib->variant_opaque; + + crm_trace("destroying %p", userdata); + cib->state = cib_disconnected; + + if(native->dnotify_fn) { + native->dnotify_fn(userdata); + } +} + int -cib_native_signon_raw(cib_t * cib, const char *name, enum cib_conn_type type, int *async_fd, - int *sync_fd) +cib_native_signon_raw(cib_t * cib, const char *name, enum cib_conn_type type, int *async_fd, int *unused) { int rc = cib_ok; - xmlNode *hello = NULL; - char *uuid_ticket = NULL; + const char *channel = NULL; cib_native_opaque_t *native = cib->variant_opaque; + static struct ipc_client_callbacks ipc_callbacks = + { + .dispatch = cib_native_dispatch_internal, + .destroy = cib_native_destroy + }; + crm_trace("Connecting command channel"); + cib->call_timeout = MAX_IPC_DELAY; if (type == cib_command) { cib->state = cib_connected_command; - native->command_channel = init_client_ipc_comms_nodispatch(cib_channel_rw); + channel = cib_channel_rw; } else if (type == cib_query) { cib->state = cib_connected_query; - native->command_channel = init_client_ipc_comms_nodispatch(cib_channel_ro); + channel = cib_channel_ro; } else { return cib_not_connected; } - if (native->command_channel == NULL) { - crm_debug("Connection to command channel failed"); - rc = cib_connection; + if (async_fd != NULL) { + native->ipc = crm_ipc_new(channel, 0); - } else if (native->command_channel->ch_status != IPC_CONNECT) { - crm_err("Connection may have succeeded," " but authentication to command channel failed"); - rc = cib_authentication; - } + if(native->ipc && crm_ipc_connect(native->ipc)) { + *async_fd = crm_ipc_get_fd(native->ipc); - if (rc == cib_ok) { - rc = get_channel_token(native->command_channel, &uuid_ticket); - if (rc == cib_ok) { - native->token = uuid_ticket; - uuid_ticket = NULL; + } else if(native->ipc) { + rc = cib_connection; } + + } else { + native->source = mainloop_add_ipc_client(channel, 0, cib, &ipc_callbacks); + native->ipc = mainloop_get_ipc_client(native->source); } - native->callback_channel = init_client_ipc_comms_nodispatch(cib_channel_callback); - if (native->callback_channel == NULL) { - crm_debug("Connection to callback channel failed"); + if (rc != cib_ok || native->ipc == NULL || crm_ipc_connected(native->ipc) == FALSE) { + crm_debug("Connection unsuccessful (%d %p)", rc, native->ipc); rc = cib_connection; - - } else if (native->callback_channel->ch_status != IPC_CONNECT) { - crm_err("Connection may have succeeded," " but authentication to command channel failed"); - rc = cib_authentication; } if (rc == cib_ok) { - native->callback_channel->send_queue->max_qlen = 500; - rc = get_channel_token(native->callback_channel, &uuid_ticket); - if (rc == cib_ok) { - crm_free(native->token); - native->token = uuid_ticket; - } - } + xmlNode *reply = NULL; + xmlNode *hello = create_xml_node(NULL, "stonith_command"); - if (rc == cib_ok) { - CRM_CHECK(native->token != NULL,;); - hello = cib_create_op(0, native->token, CRM_OP_REGISTER, NULL, NULL, NULL, 0, NULL); + crm_xml_add(hello, F_TYPE, T_CIB); + crm_xml_add(hello, F_CIB_OPERATION, CRM_OP_REGISTER); crm_xml_add(hello, F_CIB_CLIENTNAME, name); + crm_xml_add_int(hello, F_CIB_CALLOPTS, cib_sync_call); - if (send_ipc_message(native->command_channel, hello) == FALSE) { - rc = cib_callback_register; - } + if (crm_ipc_send(native->ipc, hello, &reply, -1) > 0) { + const char *msg_type = crm_element_value(reply, F_CIB_OPERATION); - free_xml(hello); - } + rc = cib_ok; + crm_log_xml_trace(reply, "reg-reply"); - if (rc == cib_ok) { - gboolean do_mainloop = TRUE; + if (safe_str_neq(msg_type, CRM_OP_REGISTER)) { + crm_err("Invalid registration message: %s", msg_type); + rc = cib_registration_msg; - if (async_fd != NULL) { - do_mainloop = FALSE; - *async_fd = native->callback_channel->ops->get_recv_select_fd(native->callback_channel); - } + } else { + native->token = crm_element_value_copy(reply, F_CIB_CLIENTID); + if (native->token == NULL) { + rc = cib_callback_token; + } + } - if (sync_fd != NULL) { - do_mainloop = FALSE; - *sync_fd = native->callback_channel->ops->get_send_select_fd(native->callback_channel); + } else { + rc = cib_callback_register; } - if (do_mainloop) { - crm_trace("Connecting callback channel"); - native->callback_source = - G_main_add_IPC_Channel(G_PRIORITY_HIGH, native->callback_channel, FALSE, - cib_native_dispatch, cib, default_ipc_connection_destroy); - - if (native->callback_source == NULL) { - crm_err("Callback source not recorded"); - rc = cib_connection; - } - } + free_xml(hello); } if (rc == cib_ok) { -#if HAVE_MSGFROMIPC_TIMEOUT - cib->call_timeout = MAX_IPC_DELAY; -#endif crm_debug("Connection to CIB successful"); return cib_ok; } crm_debug("Connection to CIB failed: %s", cib_error2string(rc)); cib_native_signoff(cib); return rc; } int cib_native_signoff(cib_t * cib) { cib_native_opaque_t *native = cib->variant_opaque; crm_debug("Signing out of the CIB Service"); - /* close channels */ - if (native->command_channel != NULL) { - native->command_channel->ops->destroy(native->command_channel); - native->command_channel = NULL; - } - - if (native->callback_source != NULL) { - G_main_del_IPC_Channel(native->callback_source); - native->callback_source = NULL; - } + if(native->source) { + mainloop_del_ipc_client(native->source); + native->source = NULL; + native->ipc = NULL; - if (native->callback_channel != NULL) { -#ifdef BUG - native->callback_channel->ops->destroy(native->callback_channel); -#endif - native->callback_channel = NULL; + } else if (native->ipc != NULL) { + crm_ipc_close(native->ipc); + crm_ipc_destroy(native->ipc); + native->ipc = NULL; } cib->state = cib_disconnected; cib->type = cib_none; return cib_ok; } int cib_native_free(cib_t * cib) { int rc = cib_ok; if (cib->state != cib_disconnected) { rc = cib_native_signoff(cib); } if (cib->state == cib_disconnected) { cib_native_opaque_t *native = cib->variant_opaque; crm_free(native->token); crm_free(cib->variant_opaque); crm_free(cib->cmds); crm_free(cib); } return rc; } -IPC_Channel * -cib_native_channel(cib_t * cib) -{ - cib_native_opaque_t *native = NULL; - - if (cib == NULL) { - crm_err("Missing cib object"); - return NULL; - } - - native = cib->variant_opaque; - - if (native != NULL) { - return native->callback_channel; - } - - crm_err("couldnt find variant specific data in %p", cib); - return NULL; -} - -int -cib_native_inputfd(cib_t * cib) -{ - IPC_Channel *ch = cib_native_channel(cib); - - return ch->ops->get_recv_select_fd(ch); -} - -static gboolean timer_expired = FALSE; - -#ifndef HAVE_MSGFROMIPC_TIMEOUT -static struct timer_rec_s sync_timer; -static gboolean -cib_timeout_handler(gpointer data) -{ - struct timer_rec_s *timer = data; - - timer_expired = TRUE; - crm_err("Call %d timed out after %ds", timer->call_id, timer->timeout); - - /* Always return TRUE, never remove the handler - * We do that after the while-loop in cib_native_perform_op() - */ - return TRUE; -} -#endif - int cib_native_perform_op(cib_t * cib, const char *op, const char *host, const char *section, xmlNode * data, xmlNode ** output_data, int call_options) { return cib_native_perform_op_delegate(cib, op, host, section, data, output_data, call_options, NULL); } int cib_native_perform_op_delegate(cib_t * cib, const char *op, const char *host, const char *section, xmlNode * data, xmlNode ** output_data, int call_options, const char *user_name) { - int rc = HA_OK; + int rc = cib_ok; + int reply_id = 0; xmlNode *op_msg = NULL; xmlNode *op_reply = NULL; cib_native_opaque_t *native = cib->variant_opaque; if (cib->state == cib_disconnected) { return cib_not_connected; } if (output_data != NULL) { *output_data = NULL; } if (op == NULL) { crm_err("No operation specified"); return cib_operation; } cib->call_id++; /* prevent call_id from being negative (or zero) and conflicting * with the cib_errors enum * use 2 because we use it as (cib->call_id - 1) below */ if (cib->call_id < 1) { cib->call_id = 1; } CRM_CHECK(native->token != NULL,;); op_msg = cib_create_op(cib->call_id, native->token, op, host, section, data, call_options, user_name); if (op_msg == NULL) { return cib_create_msg; } - crm_trace("Sending %s message to CIB service", op); - if (send_ipc_message(native->command_channel, op_msg) == FALSE) { - crm_err("Sending message to CIB service FAILED"); - free_xml(op_msg); - return cib_send_failed; - - } else { - crm_trace("Message sent"); - } - + crm_trace("Sending %s message to CIB service (timeout=%ds)", op, cib->call_timeout); + rc = crm_ipc_send(native->ipc, op_msg, &op_reply, cib->call_timeout * 1000); free_xml(op_msg); - if ((call_options & cib_discard_reply)) { - crm_trace("Discarding reply"); - return cib_ok; + if(rc < 0) { + crm_perror(LOG_ERR, "Couldn't perform %s operation (timeout=%ds): %d", op, cib->call_timeout, rc); + rc = cib_send_failed; + goto done; + } - } else if (!(call_options & cib_sync_call)) { - crm_trace("Async call, returning"); + crm_log_xml_trace(op_reply, "Reply"); + + if (!(call_options & cib_sync_call)) { + crm_trace("Async call, returning %d", cib->call_id); CRM_CHECK(cib->call_id != 0, return cib_reply_failed); - + free_xml(op_reply); return cib->call_id; } - rc = IPC_OK; - crm_trace("Waiting for a syncronous reply"); - -#ifndef HAVE_MSGFROMIPC_TIMEOUT - sync_timer.ref = 0; - if (cib->call_timeout > 0) { - timer_expired = FALSE; - sync_timer.call_id = cib->call_id; - sync_timer.timeout = cib->call_timeout * 1000; - sync_timer.ref = g_timeout_add(sync_timer.timeout, cib_timeout_handler, &sync_timer); - } -#endif rc = cib_ok; - while (timer_expired == FALSE && IPC_ISRCONN(native->command_channel)) { - int reply_id = -1; - int msg_id = cib->call_id; + crm_element_value_int(op_reply, F_CIB_CALLID, &reply_id); + if (reply_id == cib->call_id) { + xmlNode *tmp = get_message_xml(op_reply, F_CIB_CALLDATA); - op_reply = xmlfromIPC(native->command_channel, cib->call_timeout); - if (op_reply == NULL) { - rc = cib_remote_timeout; - break; + crm_trace("Syncronous reply %d received", reply_id); + if (crm_element_value_int(op_reply, F_CIB_RC, &rc) != 0) { + rc = cib_return_code; } - crm_element_value_int(op_reply, F_CIB_CALLID, &reply_id); - if (reply_id <= 0) { - rc = cib_reply_failed; - break; - - } else if (reply_id == msg_id) { - crm_trace("Syncronous reply received"); - if (crm_element_value_int(op_reply, F_CIB_RC, &rc) != 0) { - rc = cib_return_code; - } - - if (output_data != NULL && is_not_set(call_options, cib_discard_reply)) { - xmlNode *tmp = get_message_xml(op_reply, F_CIB_CALLDATA); - - if (tmp != NULL) { - *output_data = copy_xml(tmp); - } - } - - break; - - } else if (reply_id < msg_id) { - crm_debug("Received old reply: %d (wanted %d)", reply_id, msg_id); - crm_log_xml_trace(op_reply, "Old reply"); + if (output_data == NULL || (call_options & cib_discard_reply)) { + crm_trace("Discarding reply"); - } else if ((reply_id - 10000) > msg_id) { - /* wrap-around case */ - crm_debug("Received old reply: %d (wanted %d)", reply_id, msg_id); - crm_log_xml_trace(op_reply, "Old reply"); - - } else { - crm_err("Received a __future__ reply:" " %d (wanted %d)", reply_id, msg_id); + } else if (tmp != NULL) { + *output_data = copy_xml(tmp); } - free_xml(op_reply); - op_reply = NULL; - } - if (IPC_ISRCONN(native->command_channel) == FALSE) { - crm_err("CIB disconnected: %d", native->command_channel->ch_status); - cib->state = cib_disconnected; + } else if (reply_id <= 0) { + crm_err("Recieved bad reply: No id set"); + crm_log_xml_err(op_reply, "Bad reply"); + rc = cib_reply_failed; + goto done; + + } else { + crm_err("Recieved bad reply: %d (wanted %d)", reply_id, cib->call_id); + crm_log_xml_err(op_reply, "Old reply"); + rc = cib_reply_failed; + goto done; } - + if (op_reply == NULL && cib->state == cib_disconnected) { rc = cib_not_connected; } else if (rc == cib_ok && op_reply == NULL) { rc = cib_remote_timeout; } switch (rc) { case cib_ok: case cib_not_master: break; /* This is an internal value that clients do not and should not care about */ case cib_diff_resync: rc = cib_ok; break; /* These indicate internal problems */ case cib_return_code: case cib_reply_failed: case cib_master_timeout: crm_err("Call failed: %s", cib_error2string(rc)); if (op_reply) { crm_log_xml_err(op_reply, "Invalid reply"); } break; default: if (safe_str_neq(op, CIB_OP_QUERY)) { crm_warn("Call failed: %s", cib_error2string(rc)); } } -#ifndef HAVE_MSGFROMIPC_TIMEOUT - if (sync_timer.ref > 0) { - g_source_remove(sync_timer.ref); - sync_timer.ref = 0; + done: + if (crm_ipc_connected(native->ipc) == FALSE) { + crm_err("CIB disconnected"); + cib->state = cib_disconnected; } -#endif free_xml(op_reply); return rc; } -gboolean -cib_native_msgready(cib_t * cib) -{ - cib_native_opaque_t *native = NULL; - - if (cib == NULL) { - crm_err("No CIB!"); - return FALSE; - } - - native = cib->variant_opaque; - - if (native->command_channel != NULL) { - /* drain the channel */ - IPC_Channel *cmd_ch = native->command_channel; - xmlNode *cmd_msg = NULL; - - while (cmd_ch->ch_status != IPC_DISCONNECT && cmd_ch->ops->is_message_pending(cmd_ch)) { - /* this will happen when the CIB exited from beneath us */ - cmd_msg = xmlfromIPC(cmd_ch, MAX_IPC_DELAY); - free_xml(cmd_msg); - } - - } else { - crm_err("No command channel"); - } - - if (native->callback_channel == NULL) { - crm_err("No callback channel"); - return FALSE; - - } else if (native->callback_channel->ch_status == IPC_DISCONNECT) { - crm_info("Lost connection to the CIB service [%d].", native->callback_channel->farside_pid); - return FALSE; - - } else if (native->callback_channel->ops->is_message_pending(native->callback_channel)) { - crm_trace("Message pending on command channel [%d]", native->callback_channel->farside_pid); - return TRUE; - } - - crm_trace("No message pending"); - return FALSE; -} - -int -cib_native_rcvmsg(cib_t * cib, int blocking) -{ - const char *type = NULL; - xmlNode *msg = NULL; - cib_native_opaque_t *native = NULL; - - if (cib == NULL) { - crm_err("No CIB!"); - return FALSE; - } - - native = cib->variant_opaque; - - /* if it is not blocking mode and no message in the channel, return */ - if (blocking == 0 && cib_native_msgready(cib) == FALSE) { - crm_trace("No message ready and non-blocking..."); - return 0; - - } else if (cib_native_msgready(cib) == FALSE) { - crm_debug("Waiting for message from CIB service..."); - if (native->callback_channel == NULL) { - return -1; - - } else if (native->callback_channel->ch_status != IPC_CONNECT) { - return -2; - - } else if (native->command_channel && native->command_channel->ch_status != IPC_CONNECT) { - return -3; - } - native->callback_channel->ops->waitin(native->callback_channel); - } - - /* IPC_INTR is not a factor here */ - msg = xmlfromIPC(native->callback_channel, MAX_IPC_DELAY); - if (msg == NULL) { - crm_warn("Received a NULL msg from CIB service."); - return 0; - } - - /* do callbacks */ - type = crm_element_value(msg, F_TYPE); - crm_trace("Activating %s callbacks...", type); - - if (safe_str_eq(type, T_CIB)) { - cib_native_callback(cib, msg, 0, 0); - - } else if (safe_str_eq(type, T_CIB_NOTIFY)) { - g_list_foreach(cib->notify_list, cib_native_notify, msg); - - } else { - crm_err("Unknown message type: %s", type); - } - - free_xml(msg); - - return 1; -} - -gboolean -cib_native_dispatch(IPC_Channel * channel, gpointer user_data) -{ - cib_t *cib = user_data; - cib_native_opaque_t *native = NULL; - gboolean stay_connected = TRUE; - - CRM_CHECK(cib != NULL, return FALSE); - - native = cib->variant_opaque; - CRM_CHECK(native->callback_channel == channel, return FALSE); - - while (cib_native_msgready(cib)) { - /* invoke the callbacks but dont block */ - int rc = cib_native_rcvmsg(cib, 0); - - if (rc < 0) { - crm_err("Message acquisition failed: %d", rc); - break; - - } else if (rc == 0) { - break; - } - } - - if (native->callback_channel && native->callback_channel->ch_status != IPC_CONNECT) { - crm_crit("Lost connection to the CIB service [%d/callback].", channel->farside_pid); - native->callback_source = NULL; - stay_connected = FALSE; - } - - if (native->command_channel && native->command_channel->ch_status != IPC_CONNECT) { - crm_crit("Lost connection to the CIB service [%d/command].", channel->farside_pid); - native->callback_source = NULL; - stay_connected = FALSE; - } - - return stay_connected; -} - -static void -default_cib_connection_destroy(gpointer user_data) -{ - cib_t *cib = user_data; - - cib->state = cib_disconnected; -} - int cib_native_set_connection_dnotify(cib_t * cib, void (*dnotify) (gpointer user_data)) { cib_native_opaque_t *native = NULL; if (cib == NULL) { crm_err("No CIB!"); return FALSE; } native = cib->variant_opaque; + native->dnotify_fn = dnotify; - if (dnotify == NULL) { - crm_warn("Setting dnotify back to default value"); - set_IPC_Channel_dnotify(native->callback_source, default_cib_connection_destroy); - - } else { - crm_trace("Setting dnotify"); - set_IPC_Channel_dnotify(native->callback_source, dnotify); - } return cib_ok; } int cib_native_register_notification(cib_t * cib, const char *callback, int enabled) { + int rc = cib_ok; xmlNode *notify_msg = create_xml_node(NULL, "cib-callback"); cib_native_opaque_t *native = cib->variant_opaque; if (cib->state != cib_disconnected) { crm_xml_add(notify_msg, F_CIB_OPERATION, T_CIB_NOTIFY); crm_xml_add(notify_msg, F_CIB_NOTIFY_TYPE, callback); crm_xml_add_int(notify_msg, F_CIB_NOTIFY_ACTIVATE, enabled); - send_ipc_message(native->callback_channel, notify_msg); + rc = crm_ipc_send(native->ipc, notify_msg, NULL, 1000 * cib->call_timeout); + if(rc <= 0) { + crm_trace("Notification not registered: %d", rc); + rc = cib_send_failed; + } } free_xml(notify_msg); - return cib_ok; + return rc; } diff --git a/lib/cib/cib_utils.c b/lib/cib/cib_utils.c index 1c16e57c42..ffcc3361e6 100644 --- a/lib/cib/cib_utils.c +++ b/lib/cib/cib_utils.c @@ -1,1034 +1,991 @@ /* * Copyright (c) 2004 International Business Machines * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA * */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include struct config_root_s { const char *name; const char *parent; const char *path; }; /* * "//crm_config" will also work in place of "/cib/configuration/crm_config" * The / prefix means find starting from the root, whereas the // prefix means * find anywhere and risks multiple matches */ /* *INDENT-OFF* */ struct config_root_s known_paths[] = { { NULL, NULL, "//cib" }, { XML_TAG_CIB, NULL, "//cib" }, { XML_CIB_TAG_STATUS, "/cib", "//cib/status" }, { XML_CIB_TAG_CONFIGURATION,"/cib", "//cib/configuration" }, { XML_CIB_TAG_CRMCONFIG, "/cib/configuration", "//cib/configuration/crm_config" }, { XML_CIB_TAG_NODES, "/cib/configuration", "//cib/configuration/nodes" }, { XML_CIB_TAG_DOMAINS, "/cib/configuration", "//cib/configuration/domains" }, { XML_CIB_TAG_RESOURCES, "/cib/configuration", "//cib/configuration/resources" }, { XML_CIB_TAG_CONSTRAINTS, "/cib/configuration", "//cib/configuration/constraints" }, { XML_CIB_TAG_OPCONFIG, "/cib/configuration", "//cib/configuration/op_defaults" }, { XML_CIB_TAG_RSCCONFIG, "/cib/configuration", "//cib/configuration/rsc_defaults" }, { XML_CIB_TAG_ACLS, "/cib/configuration", "//cib/configuration/acls" }, { XML_TAG_FENCING_TOPOLOGY, "/cib/configuration", "//cib/configuration/fencing-topology" }, { XML_CIB_TAG_SECTION_ALL, NULL, "//cib" }, }; /* *INDENT-ON* */ const char * cib_error2string(enum cib_errors return_code) { const char *error_msg = NULL; switch (return_code) { case cib_bad_permissions: error_msg = "bad permissions for the on-disk configuration. shutdown heartbeat and repair."; break; case cib_bad_digest: error_msg = "the on-disk configuration was manually altered. shutdown heartbeat and repair."; break; case cib_bad_config: error_msg = "the on-disk configuration is not valid"; break; case cib_msg_field_add: error_msg = "failed adding field to cib message"; break; case cib_id_check: error_msg = "missing id or id-collision detected"; break; case cib_operation: error_msg = "invalid operation"; break; case cib_create_msg: error_msg = "couldnt create cib message"; break; case cib_client_gone: error_msg = "client left before we could send reply"; break; case cib_not_connected: error_msg = "not connected"; break; case cib_not_authorized: error_msg = "not authorized"; break; case cib_send_failed: error_msg = "send failed"; break; case cib_reply_failed: error_msg = "reply failed"; break; case cib_return_code: error_msg = "no return code"; break; case cib_output_ptr: error_msg = "nowhere to store output"; break; case cib_output_data: error_msg = "corrupt output data"; break; case cib_connection: error_msg = "connection failed"; break; case cib_callback_register: error_msg = "couldnt register callback channel"; break; case cib_authentication: error_msg = ""; break; case cib_registration_msg: error_msg = "invalid registration msg"; break; case cib_callback_token: error_msg = "callback token not found"; break; case cib_missing: error_msg = "cib object missing"; break; case cib_variant: error_msg = "unknown/corrupt cib variant"; break; case CIBRES_MISSING_ID: error_msg = "The id field is missing"; break; case CIBRES_MISSING_TYPE: error_msg = "The type field is missing"; break; case CIBRES_MISSING_FIELD: error_msg = "A required field is missing"; break; case CIBRES_OBJTYPE_MISMATCH: error_msg = "CIBRES_OBJTYPE_MISMATCH"; break; case cib_EXISTS: error_msg = "The object already exists"; break; case cib_NOTEXISTS: error_msg = "The object/attribute does not exist"; break; case CIBRES_CORRUPT: error_msg = "The CIB is corrupt"; break; case cib_NOOBJECT: error_msg = "The update was empty"; break; case cib_NOPARENT: error_msg = "The parent object does not exist"; break; case cib_NODECOPY: error_msg = "Failed while copying update"; break; case CIBRES_OTHER: error_msg = "CIBRES_OTHER"; break; case cib_ok: error_msg = "ok"; break; case cib_unknown: error_msg = "Unknown error"; break; case cib_STALE: error_msg = "Discarded old update"; break; case cib_ACTIVATION: error_msg = "Activation Failed"; break; case cib_NOSECTION: error_msg = "Required section was missing"; break; case cib_NOTSUPPORTED: error_msg = "The action/feature is not supported"; break; case cib_not_master: error_msg = "Local service is not the master instance"; break; case cib_client_corrupt: error_msg = "Service client not valid"; break; case cib_remote_timeout: error_msg = "Remote node did not respond"; break; case cib_master_timeout: error_msg = "No master service is currently active"; break; case cib_revision_unsupported: error_msg = "The required CIB revision number is not supported"; break; case cib_revision_unknown: error_msg = "The CIB revision number could not be determined"; break; case cib_missing_data: error_msg = "Required data for this CIB API call not found"; break; case cib_no_quorum: error_msg = "Write requires quorum"; break; case cib_diff_failed: error_msg = "Application of an update diff failed"; break; case cib_diff_resync: error_msg = "Application of an update diff failed, requesting a full refresh"; break; case cib_bad_section: error_msg = "Invalid CIB section specified"; break; case cib_old_data: error_msg = "Update was older than existing configuration"; break; case cib_dtd_validation: error_msg = "Update does not conform to the configured schema/DTD"; break; case cib_invalid_argument: error_msg = "Invalid argument"; break; case cib_transform_failed: error_msg = "Schema transform failed"; break; case cib_permission_denied: error_msg = "Permission Denied"; break; } if (error_msg == NULL) { crm_err("Unknown CIB Error Code: %d", return_code); error_msg = ""; } return error_msg; } int cib_section2enum(const char *a_section) { if (a_section == NULL || strcasecmp(a_section, "all") == 0) { return cib_section_all; } else if (strcasecmp(a_section, XML_CIB_TAG_NODES) == 0) { return cib_section_nodes; } else if (strcasecmp(a_section, XML_CIB_TAG_STATUS) == 0) { return cib_section_status; } else if (strcasecmp(a_section, XML_CIB_TAG_CONSTRAINTS) == 0) { return cib_section_constraints; } else if (strcasecmp(a_section, XML_CIB_TAG_RESOURCES) == 0) { return cib_section_resources; } else if (strcasecmp(a_section, XML_CIB_TAG_CRMCONFIG) == 0) { return cib_section_crmconfig; } crm_err("Unknown CIB section: %s", a_section); return cib_section_none; } int cib_compare_generation(xmlNode * left, xmlNode * right) { int lpc = 0; const char *attributes[] = { XML_ATTR_GENERATION_ADMIN, XML_ATTR_GENERATION, XML_ATTR_NUMUPDATES, }; crm_log_xml_trace(left, "left"); crm_log_xml_trace(right, "right"); for (lpc = 0; lpc < DIMOF(attributes); lpc++) { int int_elem_l = -1; int int_elem_r = -1; const char *elem_r = NULL; const char *elem_l = crm_element_value(left, attributes[lpc]); if (right != NULL) { elem_r = crm_element_value(right, attributes[lpc]); } if (elem_l != NULL) { int_elem_l = crm_parse_int(elem_l, NULL); } if (elem_r != NULL) { int_elem_r = crm_parse_int(elem_r, NULL); } if (int_elem_l < int_elem_r) { crm_trace("%s (%s < %s)", attributes[lpc], crm_str(elem_l), crm_str(elem_r)); return -1; } else if (int_elem_l > int_elem_r) { crm_trace("%s (%s > %s)", attributes[lpc], crm_str(elem_l), crm_str(elem_r)); return 1; } } return 0; } xmlNode * get_cib_copy(cib_t * cib) { xmlNode *xml_cib; int options = cib_scope_local | cib_sync_call; if (cib->cmds->query(cib, NULL, &xml_cib, options) != cib_ok) { crm_err("Couldnt retrieve the CIB"); return NULL; } else if (xml_cib == NULL) { crm_err("The CIB result was empty"); return NULL; } if (safe_str_eq(crm_element_name(xml_cib), XML_TAG_CIB)) { return xml_cib; } free_xml(xml_cib); return NULL; } xmlNode * cib_get_generation(cib_t * cib) { xmlNode *the_cib = get_cib_copy(cib); xmlNode *generation = create_xml_node(NULL, XML_CIB_TAG_GENERATION_TUPPLE); if (the_cib != NULL) { copy_in_properties(generation, the_cib); free_xml(the_cib); } return generation; } void log_cib_diff(int log_level, xmlNode * diff, const char *function) { int add_updates = 0; int add_epoch = 0; int add_admin_epoch = 0; int del_updates = 0; int del_epoch = 0; int del_admin_epoch = 0; if (diff == NULL) { return; } cib_diff_version_details(diff, &add_admin_epoch, &add_epoch, &add_updates, &del_admin_epoch, &del_epoch, &del_updates); if (add_updates != del_updates) { do_crm_log(log_level, "%s: Diff: --- %d.%d.%d", function, del_admin_epoch, del_epoch, del_updates); do_crm_log(log_level, "%s: Diff: +++ %d.%d.%d", function, add_admin_epoch, add_epoch, add_updates); } else if (diff != NULL) { do_crm_log(log_level, "%s: Local-only Change: %d.%d.%d", function, add_admin_epoch, add_epoch, add_updates); } log_xml_diff(log_level, diff, function); } gboolean cib_version_details(xmlNode * cib, int *admin_epoch, int *epoch, int *updates) { *epoch = -1; *updates = -1; *admin_epoch = -1; if (cib == NULL) { return FALSE; } else { crm_element_value_int(cib, XML_ATTR_GENERATION, epoch); crm_element_value_int(cib, XML_ATTR_NUMUPDATES, updates); crm_element_value_int(cib, XML_ATTR_GENERATION_ADMIN, admin_epoch); } return TRUE; } gboolean cib_diff_version_details(xmlNode * diff, int *admin_epoch, int *epoch, int *updates, int *_admin_epoch, int *_epoch, int *_updates) { xmlNode *tmp = NULL; tmp = find_xml_node(diff, "diff-added", FALSE); tmp = find_xml_node(tmp, XML_TAG_CIB, FALSE); cib_version_details(tmp, admin_epoch, epoch, updates); tmp = find_xml_node(diff, "diff-removed", FALSE); cib_version_details(tmp, _admin_epoch, _epoch, _updates); return TRUE; } /* * The caller should never free the return value */ const char * get_object_path(const char *object_type) { int lpc = 0; int max = DIMOF(known_paths); for (; lpc < max; lpc++) { if ((object_type == NULL && known_paths[lpc].name == NULL) || safe_str_eq(object_type, known_paths[lpc].name)) { return known_paths[lpc].path; } } return NULL; } const char * get_object_parent(const char *object_type) { int lpc = 0; int max = DIMOF(known_paths); for (; lpc < max; lpc++) { if (safe_str_eq(object_type, known_paths[lpc].name)) { return known_paths[lpc].parent; } } return NULL; } xmlNode * get_object_root(const char *object_type, xmlNode * the_root) { const char *xpath = get_object_path(object_type); if (xpath == NULL) { return the_root; /* or return NULL? */ } return get_xpath_object(xpath, the_root, LOG_DEBUG_4); } xmlNode * create_cib_fragment_adv(xmlNode * update, const char *update_section, const char *source) { xmlNode *cib = NULL; gboolean whole_cib = FALSE; xmlNode *object_root = NULL; char *local_section = NULL; /* crm_debug("Creating a blank fragment: %s", update_section); */ if (update == NULL && update_section == NULL) { crm_trace("Creating a blank fragment"); update = createEmptyCib(); crm_xml_add(cib, XML_ATTR_ORIGIN, source); return update; } else if (update == NULL) { crm_err("No update to create a fragment for"); return NULL; } CRM_CHECK(update_section != NULL, return NULL); if (safe_str_eq(crm_element_name(update), XML_TAG_CIB)) { whole_cib = TRUE; } if (whole_cib == FALSE) { cib = createEmptyCib(); crm_xml_add(cib, XML_ATTR_ORIGIN, source); object_root = get_object_root(update_section, cib); add_node_copy(object_root, update); } else { cib = copy_xml(update); crm_xml_add(cib, XML_ATTR_ORIGIN, source); } crm_free(local_section); crm_trace("Verifying created fragment"); return cib; } /* * It is the callers responsibility to free both the new CIB (output) * and the new CIB (input) */ xmlNode * createEmptyCib(void) { xmlNode *cib_root = NULL, *config = NULL; cib_root = create_xml_node(NULL, XML_TAG_CIB); config = create_xml_node(cib_root, XML_CIB_TAG_CONFIGURATION); create_xml_node(cib_root, XML_CIB_TAG_STATUS); /* crm_xml_add(cib_root, "version", "1"); */ create_xml_node(config, XML_CIB_TAG_CRMCONFIG); create_xml_node(config, XML_CIB_TAG_NODES); create_xml_node(config, XML_CIB_TAG_RESOURCES); create_xml_node(config, XML_CIB_TAG_CONSTRAINTS); return cib_root; } static unsigned int dtd_throttle = 0; void fix_cib_diff(xmlNode * last, xmlNode * next, xmlNode * local_diff, gboolean changed) { xmlNode *cib = NULL; xmlNode *diff_child = NULL; const char *tag = NULL; const char *value = NULL; tag = "diff-removed"; diff_child = find_xml_node(local_diff, tag, FALSE); if (diff_child == NULL) { diff_child = create_xml_node(local_diff, tag); } tag = XML_TAG_CIB; cib = find_xml_node(diff_child, tag, FALSE); if (cib == NULL) { cib = create_xml_node(diff_child, tag); } tag = XML_ATTR_GENERATION_ADMIN; value = crm_element_value(last, tag); crm_xml_add(diff_child, tag, value); if (changed) { crm_xml_add(cib, tag, value); } tag = XML_ATTR_GENERATION; value = crm_element_value(last, tag); crm_xml_add(diff_child, tag, value); if (changed) { crm_xml_add(cib, tag, value); } tag = XML_ATTR_NUMUPDATES; value = crm_element_value(last, tag); crm_xml_add(cib, tag, value); crm_xml_add(diff_child, tag, value); tag = "diff-added"; diff_child = find_xml_node(local_diff, tag, FALSE); if (diff_child == NULL) { diff_child = create_xml_node(local_diff, tag); } tag = XML_TAG_CIB; cib = find_xml_node(diff_child, tag, FALSE); if (cib == NULL) { cib = create_xml_node(diff_child, tag); } if (next) { xmlAttrPtr xIter = NULL; for (xIter = next->properties; xIter; xIter = xIter->next) { const char *p_name = (const char *)xIter->name; const char *p_value = crm_element_value(next, p_name); xmlSetProp(cib, (const xmlChar *)p_name, (const xmlChar *)p_value); } } crm_log_xml_trace(local_diff, "Repaired-diff"); } enum cib_errors cib_perform_op(const char *op, int call_options, cib_op_t * fn, gboolean is_query, const char *section, xmlNode * req, xmlNode * input, gboolean manage_counters, gboolean * config_changed, xmlNode * current_cib, xmlNode ** result_cib, xmlNode ** diff, xmlNode ** output) { int rc = cib_ok; gboolean check_dtd = TRUE; xmlNode *scratch = NULL; xmlNode *local_diff = NULL; const char *current_dtd = "unknown"; CRM_CHECK(output != NULL, return cib_output_data); CRM_CHECK(result_cib != NULL, return cib_output_data); CRM_CHECK(config_changed != NULL, return cib_output_data); *output = NULL; *result_cib = NULL; *config_changed = FALSE; if (fn == NULL) { return cib_operation; } if (is_query) { rc = (*fn) (op, call_options, section, req, input, current_cib, result_cib, output); return rc; } scratch = copy_xml(current_cib); rc = (*fn) (op, call_options, section, req, input, current_cib, &scratch, output); CRM_CHECK(current_cib != scratch, return cib_unknown); if (rc == cib_ok && scratch == NULL) { rc = cib_unknown; } if (rc == cib_ok && scratch) { const char *new_version = crm_element_value(scratch, XML_ATTR_CRM_VERSION); if (new_version && compare_version(new_version, CRM_FEATURE_SET) > 0) { crm_err("Discarding update with feature set '%s' greater than our own '%s'", new_version, CRM_FEATURE_SET); rc = cib_NOTSUPPORTED; } } if (rc == cib_ok && current_cib) { int old = 0; int new = 0; crm_element_value_int(scratch, XML_ATTR_GENERATION_ADMIN, &new); crm_element_value_int(current_cib, XML_ATTR_GENERATION_ADMIN, &old); if (old > new) { crm_err("%s went backwards: %d -> %d (Opts: 0x%x)", XML_ATTR_GENERATION_ADMIN, old, new, call_options); crm_log_xml_warn(req, "Bad Op"); crm_log_xml_warn(input, "Bad Data"); rc = cib_old_data; } else if (old == new) { crm_element_value_int(scratch, XML_ATTR_GENERATION, &new); crm_element_value_int(current_cib, XML_ATTR_GENERATION, &old); if (old > new) { crm_err("%s went backwards: %d -> %d (Opts: 0x%x)", XML_ATTR_GENERATION, old, new, call_options); crm_log_xml_warn(req, "Bad Op"); crm_log_xml_warn(input, "Bad Data"); rc = cib_old_data; } } } if (rc == cib_ok) { fix_plus_plus_recursive(scratch); current_dtd = crm_element_value(scratch, XML_ATTR_VALIDATION); if (manage_counters) { if (is_set(call_options, cib_inhibit_bcast) && safe_str_eq(section, XML_CIB_TAG_STATUS)) { /* Fast-track changes connections which wont be broadcasting anywhere */ cib_update_counter(scratch, XML_ATTR_NUMUPDATES, FALSE); goto done; } /* The diff calculation in cib_config_changed() accounts for 25% of the * CIB's total CPU usage on the DC * * RNG validation on the otherhand, accounts for only 9%... */ *config_changed = cib_config_changed(current_cib, scratch, &local_diff); if (*config_changed) { cib_update_counter(scratch, XML_ATTR_NUMUPDATES, TRUE); cib_update_counter(scratch, XML_ATTR_GENERATION, FALSE); } else { /* Previously we only did this if the diff detected a change * * But we replies are still sent, even if nothing changes so we * don't save any network traffic and means we need to jump * through expensive hoops to detect ordering changes - see below */ cib_update_counter(scratch, XML_ATTR_NUMUPDATES, FALSE); if (local_diff == NULL) { /* Nothing to check */ check_dtd = FALSE; /* Create a fake diff so that notifications, which include a _digest_, * will be sent to our peers * * This is the cheapest way to detect changes to group/set ordering * * Previously we compared the old and new digest in cib_config_changed(), * but that accounted for 15% of the CIB's total CPU usage on the DC */ local_diff = create_xml_node(NULL, "diff"); crm_xml_add(local_diff, XML_ATTR_CRM_VERSION, CRM_FEATURE_SET); create_xml_node(local_diff, "diff-removed"); create_xml_node(local_diff, "diff-added"); /* Usually these are attrd re-updates */ crm_log_xml_trace(req, "Non-change"); } else if (dtd_throttle++ % 20) { /* Throttle the amount of costly validation we perform due to status updates * a) we don't really care whats in the status section * b) we don't validate any of it's contents at the moment anyway */ check_dtd = FALSE; } } } } if (diff != NULL && local_diff != NULL) { /* Only fix the diff if we'll return it... */ fix_cib_diff(current_cib, scratch, local_diff, *config_changed); *diff = local_diff; local_diff = NULL; } done: if (rc == cib_ok && check_dtd && validate_xml(scratch, NULL, TRUE) == FALSE) { crm_warn("Updated CIB does not validate against %s schema/dtd", crm_str(current_dtd)); rc = cib_dtd_validation; } *result_cib = scratch; free_xml(local_diff); return rc; } -int -get_channel_token(IPC_Channel * ch, char **token) -{ - int rc = cib_ok; - xmlNode *reg_msg = NULL; - const char *msg_type = NULL; - const char *tmp_ticket = NULL; - - CRM_CHECK(ch != NULL, return cib_missing); - CRM_CHECK(token != NULL, return cib_output_ptr); - - crm_trace("Waiting for msg on command channel"); - - reg_msg = xmlfromIPC(ch, MAX_IPC_DELAY); - - if (ch->ops->get_chan_status(ch) != IPC_CONNECT) { - crm_err("No reply message - disconnected"); - free_xml(reg_msg); - return cib_not_connected; - - } else if (reg_msg == NULL) { - crm_err("No reply message - empty"); - return cib_reply_failed; - } - - msg_type = crm_element_value(reg_msg, F_CIB_OPERATION); - tmp_ticket = crm_element_value(reg_msg, F_CIB_CLIENTID); - - if (safe_str_neq(msg_type, CRM_OP_REGISTER)) { - crm_err("Invalid registration message: %s", msg_type); - rc = cib_registration_msg; - - } else if (tmp_ticket == NULL) { - rc = cib_callback_token; - - } else { - *token = crm_strdup(tmp_ticket); - } - - free_xml(reg_msg); - return rc; -} - xmlNode * cib_create_op(int call_id, const char *token, const char *op, const char *host, const char *section, xmlNode * data, int call_options, const char *user_name) { int rc = HA_OK; xmlNode *op_msg = create_xml_node(NULL, "cib_command"); CRM_CHECK(op_msg != NULL, return NULL); CRM_CHECK(token != NULL, return NULL); crm_xml_add(op_msg, F_XML_TAGNAME, "cib_command"); crm_xml_add(op_msg, F_TYPE, T_CIB); crm_xml_add(op_msg, F_CIB_CALLBACK_TOKEN, token); crm_xml_add(op_msg, F_CIB_OPERATION, op); crm_xml_add(op_msg, F_CIB_HOST, host); crm_xml_add(op_msg, F_CIB_SECTION, section); crm_xml_add_int(op_msg, F_CIB_CALLID, call_id); #if ENABLE_ACL if (user_name) { crm_xml_add(op_msg, F_CIB_USER, user_name); } #endif crm_trace("Sending call options: %.8lx, %d", (long)call_options, call_options); crm_xml_add_int(op_msg, F_CIB_CALLOPTS, call_options); if (data != NULL) { add_message_xml(op_msg, F_CIB_CALLDATA, data); } if (rc != HA_OK) { crm_err("Failed to create CIB operation message"); crm_log_xml_err(op_msg, "op"); free_xml(op_msg); return NULL; } if (call_options & cib_inhibit_bcast) { CRM_CHECK((call_options & cib_scope_local), return NULL); } return op_msg; } void cib_native_callback(cib_t * cib, xmlNode * msg, int call_id, int rc) { xmlNode *output = NULL; cib_callback_client_t *blob = NULL; cib_callback_client_t local_blob; local_blob.id = NULL; local_blob.callback = NULL; local_blob.user_data = NULL; local_blob.only_success = FALSE; if (msg != NULL) { crm_element_value_int(msg, F_CIB_RC, &rc); crm_element_value_int(msg, F_CIB_CALLID, &call_id); output = get_message_xml(msg, F_CIB_CALLDATA); } blob = g_hash_table_lookup(cib_op_callback_table, GINT_TO_POINTER(call_id)); if (blob != NULL) { local_blob = *blob; blob = NULL; remove_cib_op_callback(call_id, FALSE); } else { crm_trace("No callback found for call %d", call_id); local_blob.callback = NULL; } if (cib == NULL) { crm_debug("No cib object supplied"); } if (rc == cib_diff_resync) { /* This is an internal value that clients do not and should not care about */ rc = cib_ok; } if (local_blob.callback != NULL && (rc == cib_ok || local_blob.only_success == FALSE)) { crm_trace("Invoking callback %s for call %d", crm_str(local_blob.id), call_id); local_blob.callback(msg, call_id, rc, output, local_blob.user_data); } else if (cib && cib->op_callback == NULL && rc != cib_ok) { crm_warn("CIB command failed: %s", cib_error2string(rc)); crm_log_xml_debug(msg, "Failed CIB Update"); } if (cib && cib->op_callback != NULL) { crm_trace("Invoking global callback for call %d", call_id); cib->op_callback(msg, call_id, rc, output); } crm_trace("OP callback activated."); } void cib_native_notify(gpointer data, gpointer user_data) { xmlNode *msg = user_data; cib_notify_client_t *entry = data; const char *event = NULL; if (msg == NULL) { crm_warn("Skipping callback - NULL message"); return; } event = crm_element_value(msg, F_SUBTYPE); if (entry == NULL) { crm_warn("Skipping callback - NULL callback client"); return; } else if (entry->callback == NULL) { crm_warn("Skipping callback - NULL callback"); return; } else if (safe_str_neq(entry->event, event)) { crm_trace("Skipping callback - event mismatch %p/%s vs. %s", entry, entry->event, event); return; } crm_trace("Invoking callback for %p/%s event...", entry, event); entry->callback(event, msg); crm_trace("Callback invoked..."); } gboolean determine_host(cib_t * cib_conn, char **node_uname, char **node_uuid) { CRM_CHECK(node_uname != NULL, return FALSE); if (*node_uname == NULL) { struct utsname name; if (uname(&name) < 0) { crm_perror(LOG_ERR, "uname(2) call failed"); return FALSE; } *node_uname = crm_strdup(name.nodename); crm_info("Detected uname: %s", *node_uname); } if (cib_conn && *node_uname != NULL && node_uuid != NULL && *node_uuid == NULL) { int rc = query_node_uuid(cib_conn, *node_uname, node_uuid); if (rc != cib_ok) { fprintf(stderr, "Could not map uname=%s to a UUID: %s\n", *node_uname, cib_error2string(rc)); return FALSE; } crm_info("Mapped %s to %s", *node_uname, crm_str(*node_uuid)); } return TRUE; } pe_cluster_option cib_opts[] = { /* name, old-name, validate, default, description */ {"enable-acl", NULL, "boolean", NULL, "false", &check_boolean, "Enable CIB ACL", NULL} , }; void cib_metadata(void) { config_metadata("Cluster Information Base", "1.0", "Cluster Information Base Options", "This is a fake resource that details the options that can be configured for the Cluster Information Base.", cib_opts, DIMOF(cib_opts)); } void verify_cib_options(GHashTable * options) { verify_all_options(options, cib_opts, DIMOF(cib_opts)); } const char * cib_pref(GHashTable * options, const char *name) { return get_cluster_pref(options, cib_opts, DIMOF(cib_opts), name); } gboolean cib_read_config(GHashTable * options, xmlNode * current_cib) { xmlNode *config = NULL; ha_time_t *now = NULL; if (options == NULL || current_cib == NULL) { return FALSE; } now = new_ha_date(TRUE); g_hash_table_remove_all(options); config = get_object_root(XML_CIB_TAG_CRMCONFIG, current_cib); if (config) { unpack_instance_attributes(current_cib, config, XML_CIB_TAG_PROPSET, NULL, options, CIB_OPTIONS_FIRST, FALSE, now); } verify_cib_options(options); free_ha_date(now); return TRUE; } gboolean cib_internal_config_changed(xmlNode * diff) { gboolean changed = FALSE; const char *config_xpath = "//" XML_TAG_CIB "/" XML_CIB_TAG_CONFIGURATION "/" XML_CIB_TAG_CRMCONFIG; xmlXPathObject *xpathObj = NULL; if (diff == NULL) { return FALSE; } xpathObj = xpath_search(diff, config_xpath); if (xpathObj && xpathObj->nodesetval->nodeNr > 0) { changed = TRUE; } if (xpathObj) { xmlXPathFreeObject(xpathObj); } return changed; }