diff --git a/cib/callbacks.c b/cib/callbacks.c index 2585d47284..0a44c92ed2 100644 --- a/cib/callbacks.c +++ b/cib/callbacks.c @@ -1,1420 +1,1419 @@ /* * Copyright (C) 2004 Andrew Beekhof * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #include #include #include #include #include -#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "common.h" extern GMainLoop *mainloop; extern gboolean cib_shutdown_flag; extern gboolean stand_alone; extern const char *cib_root; static unsigned long cib_local_bcast_num = 0; typedef struct cib_local_notify_s { xmlNode *notify_src; char *client_id; gboolean from_peer; gboolean sync_reply; } cib_local_notify_t; qb_ipcs_service_t *ipcs_ro = NULL; qb_ipcs_service_t *ipcs_rw = NULL; qb_ipcs_service_t *ipcs_shm = NULL; extern crm_cluster_t crm_cluster; extern int cib_update_counter(xmlNode * xml_obj, const char *field, gboolean reset); extern void GHFunc_count_peers(gpointer key, gpointer value, gpointer user_data); gint cib_GCompareFunc(gconstpointer a, gconstpointer b); gboolean can_write(int flags); void send_cib_replace(const xmlNode * sync_request, const char *host); void cib_process_request(xmlNode * request, gboolean privileged, gboolean force_synchronous, gboolean from_peer, crm_client_t * cib_client); extern GHashTable *local_notify_queue; int next_client_id = 0; extern const char *cib_our_uname; extern unsigned long cib_num_ops, cib_num_local, cib_num_updates, cib_num_fail; extern unsigned long cib_bad_connects, cib_num_timeouts; extern int cib_status; int cib_process_command(xmlNode * request, xmlNode ** reply, xmlNode ** cib_diff, gboolean privileged); gboolean cib_common_callback(qb_ipcs_connection_t *c, void *data, size_t size, gboolean privileged); static int32_t cib_ipc_accept(qb_ipcs_connection_t *c, uid_t uid, gid_t gid) { crm_trace("Connection %p", c); if (cib_shutdown_flag) { crm_info("Ignoring new client [%d] during shutdown", crm_ipcs_client_pid(c)); return -EPERM; } if(crm_client_new(c, uid, gid) == NULL) { return -EIO; } return 0; } static void cib_ipc_created(qb_ipcs_connection_t *c) { crm_trace("Connection %p", c); } static int32_t cib_ipc_dispatch_rw(qb_ipcs_connection_t *c, void *data, size_t size) { crm_client_t *client = crm_client_get(c); crm_trace("%p message from %s", c, client->id); return cib_common_callback(c, data, size, TRUE); } static int32_t cib_ipc_dispatch_ro(qb_ipcs_connection_t *c, void *data, size_t size) { crm_client_t *client = crm_client_get(c); crm_trace("%p message from %s", c, client->id); return cib_common_callback(c, data, size, FALSE); } /* Error code means? */ static int32_t cib_ipc_closed(qb_ipcs_connection_t *c) { crm_client_t *client = crm_client_get(c); crm_trace("Connection %p", c); crm_client_destroy(client); return 0; } static void cib_ipc_destroy(qb_ipcs_connection_t *c) { crm_trace("Connection %p", c); if (cib_shutdown_flag) { cib_shutdown(0); } } struct qb_ipcs_service_handlers ipc_ro_callbacks = { .connection_accept = cib_ipc_accept, .connection_created = cib_ipc_created, .msg_process = cib_ipc_dispatch_ro, .connection_closed = cib_ipc_closed, .connection_destroyed = cib_ipc_destroy }; struct qb_ipcs_service_handlers ipc_rw_callbacks = { .connection_accept = cib_ipc_accept, .connection_created = cib_ipc_created, .msg_process = cib_ipc_dispatch_rw, .connection_closed = cib_ipc_closed, .connection_destroyed = cib_ipc_destroy }; void cib_common_callback_worker(uint32_t id, uint32_t flags, xmlNode * op_request, crm_client_t * cib_client, gboolean privileged) { const char *op = crm_element_value(op_request, F_CIB_OPERATION); if (crm_str_eq(op, CRM_OP_REGISTER, TRUE)) { if(flags & crm_ipc_client_response) { xmlNode *ack = create_xml_node(NULL, __FUNCTION__); crm_xml_add(ack, F_CIB_OPERATION, CRM_OP_REGISTER); crm_xml_add(ack, F_CIB_CLIENTID, cib_client->id); crm_ipcs_send(cib_client, id, ack, FALSE); cib_client->request_id = 0; free_xml(ack); } return; } else if (crm_str_eq(op, T_CIB_NOTIFY, TRUE)) { /* Update the notify filters for this client */ int on_off = 0; long long bit = 0; const char *type = crm_element_value(op_request, F_CIB_NOTIFY_TYPE); crm_element_value_int(op_request, F_CIB_NOTIFY_ACTIVATE, &on_off); crm_debug("Setting %s callbacks for %s (%s): %s", type, cib_client->name, cib_client->id, on_off ? "on" : "off"); if (safe_str_eq(type, T_CIB_POST_NOTIFY)) { bit = cib_notify_post; } else if (safe_str_eq(type, T_CIB_PRE_NOTIFY)) { bit = cib_notify_pre; } else if (safe_str_eq(type, T_CIB_UPDATE_CONFIRM)) { bit = cib_notify_confirm; } else if (safe_str_eq(type, T_CIB_DIFF_NOTIFY)) { bit = cib_notify_diff; } else if (safe_str_eq(type, T_CIB_REPLACE_NOTIFY)) { bit = cib_notify_replace; } if(on_off) { set_bit(cib_client->options, bit); } else { clear_bit(cib_client->options, bit); } if(flags & crm_ipc_client_response) { /* TODO - include rc */ crm_ipcs_send_ack(cib_client, id, "ack", __FUNCTION__, __LINE__); cib_client->request_id = 0; } return; } cib_process_request(op_request, FALSE, privileged, FALSE, cib_client); } int32_t cib_common_callback(qb_ipcs_connection_t *c, void *data, size_t size, gboolean privileged) { uint32_t id = 0; uint32_t flags = 0; int call_options = 0; crm_client_t *cib_client = crm_client_get(c); xmlNode *op_request = crm_ipcs_recv(cib_client, data, size, &id, &flags); if(op_request) { crm_element_value_int(op_request, F_CIB_CALLOPTS, &call_options); } crm_trace("Inbound: %.200s", data); if (op_request == NULL || cib_client == NULL) { crm_ipcs_send_ack(cib_client, id, "nack", __FUNCTION__, __LINE__); return 0; } if(is_set(call_options, cib_sync_call)) { CRM_ASSERT(flags & crm_ipc_client_response); } if(flags & crm_ipc_client_response) { CRM_LOG_ASSERT(cib_client->request_id == 0); /* This means the client has two synchronous events in-flight */ cib_client->request_id = id; /* Reply only to the last one */ } if (cib_client->name == NULL) { const char *value = crm_element_value(op_request, F_CIB_CLIENTNAME); if (value == NULL) { cib_client->name = crm_itoa(cib_client->pid); } else { cib_client->name = strdup(value); } } crm_xml_add(op_request, F_CIB_CLIENTID, cib_client->id); crm_xml_add(op_request, F_CIB_CLIENTNAME, cib_client->name); #if ENABLE_ACL determine_request_user(cib_client->user, op_request, F_CIB_USER); #endif crm_log_xml_trace(op_request, "Client[inbound]"); cib_common_callback_worker(id, flags, op_request, cib_client, privileged); free_xml(op_request); return 0; } static void do_local_notify(xmlNode * notify_src, const char *client_id, gboolean sync_reply, gboolean from_peer) { /* send callback to originating child */ crm_client_t *client_obj = NULL; int local_rc = pcmk_ok; if (client_id != NULL) { client_obj = crm_client_get_by_id(client_id); } if (client_obj == NULL) { local_rc = -ECONNRESET; crm_trace("No client to sent the response to. F_CIB_CLIENTID not set."); } else { int rid = 0; if(sync_reply) { if (client_obj->ipcs) { CRM_LOG_ASSERT(client_obj->request_id); rid = client_obj->request_id; client_obj->request_id = 0; crm_trace("Sending response %d to %s %s", rid, client_obj->name, from_peer?"(originator of delegated request)":""); } else { crm_trace("Sending response to %s %s", client_obj->name, from_peer?"(originator of delegated request)":""); } } else { crm_trace("Sending an event to %s %s", client_obj->name, from_peer?"(originator of delegated request)":""); } switch(client_obj->kind) { case CRM_CLIENT_IPC: if (crm_ipcs_send(client_obj, rid, notify_src, !sync_reply) < 0) { local_rc = -ENOMSG; } break; case CRM_CLIENT_TLS: case CRM_CLIENT_TCP: crm_remote_send(client_obj->remote, notify_src); break; default: crm_err("Unknown transport %d for %s", client_obj->kind, client_obj->name); } } if (local_rc != pcmk_ok && client_obj != NULL) { crm_warn("%sSync reply to %s failed: %s", sync_reply ? "" : "A-", client_obj ? client_obj->name : "", pcmk_strerror(local_rc)); } } static void local_notify_destroy_callback(gpointer data) { cib_local_notify_t *notify = data; free_xml(notify->notify_src); free(notify->client_id); free(notify); } static void check_local_notify(int bcast_id) { cib_local_notify_t *notify = NULL; if (!local_notify_queue) { return; } notify = g_hash_table_lookup(local_notify_queue, GINT_TO_POINTER(bcast_id)); if (notify) { do_local_notify(notify->notify_src, notify->client_id, notify->sync_reply, notify->from_peer); g_hash_table_remove(local_notify_queue, GINT_TO_POINTER(bcast_id)); } } static void queue_local_notify(xmlNode * notify_src, const char *client_id, gboolean sync_reply, gboolean from_peer) { cib_local_notify_t *notify = calloc(1, sizeof(cib_local_notify_t)); notify->notify_src = notify_src; notify->client_id = strdup(client_id); notify->sync_reply = sync_reply; notify->from_peer = from_peer; if (!local_notify_queue) { local_notify_queue = g_hash_table_new_full(g_direct_hash, g_direct_equal, NULL, local_notify_destroy_callback); } g_hash_table_insert(local_notify_queue, GINT_TO_POINTER(cib_local_bcast_num), notify); } static void parse_local_options(crm_client_t * cib_client, int call_type, int call_options, const char *host, const char *op, gboolean * local_notify, gboolean * needs_reply, gboolean * process, gboolean * needs_forward) { if (cib_op_modifies(call_type) && !(call_options & cib_inhibit_bcast)) { /* we need to send an update anyway */ *needs_reply = TRUE; } else { *needs_reply = FALSE; } if (host == NULL && (call_options & cib_scope_local)) { crm_trace("Processing locally scoped %s op from %s", op, cib_client->name); *local_notify = TRUE; } else if (host == NULL && cib_is_master) { crm_trace("Processing master %s op locally from %s", op, cib_client->name); *local_notify = TRUE; } else if (safe_str_eq(host, cib_our_uname)) { crm_trace("Processing locally addressed %s op from %s", op, cib_client->name); *local_notify = TRUE; } else if (stand_alone) { *needs_forward = FALSE; *local_notify = TRUE; *process = TRUE; } else { crm_trace("%s op from %s needs to be forwarded to %s", op, cib_client->name, host ? host : "the master instance"); *needs_forward = TRUE; *process = FALSE; } } static gboolean parse_peer_options(int call_type, xmlNode * request, gboolean * local_notify, gboolean * needs_reply, gboolean * process, gboolean * needs_forward) { const char *op = NULL; const char *host = NULL; const char *delegated = NULL; const char *originator = crm_element_value(request, F_ORIG); const char *reply_to = crm_element_value(request, F_CIB_ISREPLY); const char *update = crm_element_value(request, F_CIB_GLOBAL_UPDATE); gboolean is_reply = safe_str_eq(reply_to, cib_our_uname); if (crm_is_true(update)) { *needs_reply = FALSE; if (is_reply) { *local_notify = TRUE; crm_trace("Processing global/peer update from %s" " that originated from us", originator); } else { crm_trace("Processing global/peer update from %s", originator); } return TRUE; } host = crm_element_value(request, F_CIB_HOST); if (host != NULL && safe_str_eq(host, cib_our_uname)) { crm_trace("Processing request sent to us from %s", originator); return TRUE; } else if (host == NULL && cib_is_master == TRUE) { crm_trace("Processing request sent to master instance from %s", originator); return TRUE; } op = crm_element_value(request, F_CIB_OPERATION); if(safe_str_eq(op, "cib_shutdown_req")) { /* Always process these */ *local_notify = FALSE; if(reply_to == NULL || is_reply) { *process = TRUE; } if(is_reply) { *needs_reply = FALSE; } return *process; } if (is_reply) { crm_trace("Forward reply sent from %s to local clients", originator); *process = FALSE; *needs_reply = FALSE; *local_notify = TRUE; return TRUE; } delegated = crm_element_value(request, F_CIB_DELEGATED); if (delegated != NULL) { crm_trace("Ignoring msg for master instance"); } else if (host != NULL) { /* this is for a specific instance and we're not it */ crm_trace("Ignoring msg for instance on %s", crm_str(host)); } else if (reply_to == NULL && cib_is_master == FALSE) { /* this is for the master instance and we're not it */ crm_trace("Ignoring reply to %s", crm_str(reply_to)); } else if (safe_str_eq(op, "cib_shutdown_req")) { if (reply_to != NULL) { crm_debug("Processing %s from %s", op, host); *needs_reply = FALSE; } else { crm_debug("Processing %s reply from %s", op, host); } return TRUE; } else { crm_err("Nothing for us to do?"); crm_log_xml_err(request, "Peer[inbound]"); } return FALSE; } static void forward_request(xmlNode * request, crm_client_t * cib_client, int call_options) { const char *op = crm_element_value(request, F_CIB_OPERATION); const char *host = crm_element_value(request, F_CIB_HOST); crm_xml_add(request, F_CIB_DELEGATED, cib_our_uname); if (host != NULL) { crm_trace("Forwarding %s op to %s", op, host); send_cluster_message(crm_get_peer(0, host), crm_msg_cib, request, FALSE); } else { crm_trace("Forwarding %s op to master instance", op); send_cluster_message(NULL, crm_msg_cib, request, FALSE); } /* Return the request to its original state */ xml_remove_prop(request, F_CIB_DELEGATED); if (call_options & cib_discard_reply) { crm_trace("Client not interested in reply"); } } static gboolean send_peer_reply(xmlNode * msg, xmlNode * result_diff, const char *originator, gboolean broadcast) { CRM_ASSERT(msg != NULL); if (broadcast) { /* this (successful) call modified the CIB _and_ the * change needs to be broadcast... * send via HA to other nodes */ int diff_add_updates = 0; int diff_add_epoch = 0; int diff_add_admin_epoch = 0; int diff_del_updates = 0; int diff_del_epoch = 0; int diff_del_admin_epoch = 0; const char *digest = NULL; CRM_LOG_ASSERT(result_diff != NULL); digest = crm_element_value(result_diff, XML_ATTR_DIGEST); cib_diff_version_details(result_diff, &diff_add_admin_epoch, &diff_add_epoch, &diff_add_updates, &diff_del_admin_epoch, &diff_del_epoch, &diff_del_updates); crm_trace("Sending update diff %d.%d.%d -> %d.%d.%d %s", diff_del_admin_epoch, diff_del_epoch, diff_del_updates, diff_add_admin_epoch, diff_add_epoch, diff_add_updates, digest); crm_xml_add(msg, F_CIB_ISREPLY, originator); crm_xml_add(msg, F_CIB_GLOBAL_UPDATE, XML_BOOLEAN_TRUE); crm_xml_add(msg, F_CIB_OPERATION, CIB_OP_APPLY_DIFF); CRM_ASSERT(digest != NULL); add_message_xml(msg, F_CIB_UPDATE_DIFF, result_diff); crm_log_xml_explicit(msg, "copy"); return send_cluster_message(NULL, crm_msg_cib, msg, TRUE); } else if (originator != NULL) { /* send reply via HA to originating node */ crm_trace("Sending request result to originator only"); crm_xml_add(msg, F_CIB_ISREPLY, originator); return send_cluster_message(crm_get_peer(0, originator), crm_msg_cib, msg, FALSE); } return FALSE; } void cib_process_request(xmlNode * request, gboolean force_synchronous, gboolean privileged, gboolean unused, crm_client_t * cib_client) { int call_type = 0; int call_options = 0; gboolean process = TRUE; gboolean is_update = TRUE; gboolean from_peer = TRUE; gboolean needs_reply = TRUE; gboolean local_notify = FALSE; gboolean needs_forward = FALSE; gboolean global_update = crm_is_true(crm_element_value(request, F_CIB_GLOBAL_UPDATE)); xmlNode *op_reply = NULL; xmlNode *result_diff = NULL; int rc = pcmk_ok; const char *op = crm_element_value(request, F_CIB_OPERATION); const char *originator = crm_element_value(request, F_ORIG); const char *host = crm_element_value(request, F_CIB_HOST); const char *target = NULL; const char *client_id = crm_element_value(request, F_CIB_CLIENTID); if(cib_client) { from_peer = FALSE; } cib_num_ops++; if (cib_num_ops == 0) { cib_num_fail = 0; cib_num_local = 0; cib_num_updates = 0; crm_info("Stats wrapped around"); } crm_element_value_int(request, F_CIB_CALLOPTS, &call_options); if (force_synchronous) { call_options |= cib_sync_call; } if (host != NULL && strlen(host) == 0) { host = NULL; } if(host) { target = host; } else if(call_options & cib_scope_local) { target = "local host"; } else { target = "master"; } if(from_peer) { crm_trace("Processing peer %s operation from %s on %s intended for %s", op, client_id, originator, target); } else { crm_xml_add(request, F_ORIG, cib_our_uname); crm_trace("Processing local %s operation from %s intended for %s", op, client_id, target); } rc = cib_get_operation_id(op, &call_type); if (rc != pcmk_ok) { /* TODO: construct error reply? */ crm_err("Pre-processing of command failed: %s", pcmk_strerror(rc)); return; } if (from_peer == FALSE) { parse_local_options(cib_client, call_type, call_options, host, op, &local_notify, &needs_reply, &process, &needs_forward); } else if (parse_peer_options(call_type, request, &local_notify, &needs_reply, &process, &needs_forward) == FALSE) { return; } is_update = cib_op_modifies(call_type); if (is_update) { cib_num_updates++; } if (call_options & cib_discard_reply) { needs_reply = is_update; local_notify = FALSE; } if (needs_forward) { const char *host = crm_element_value(request, F_CIB_HOST); const char *section = crm_element_value(request, F_CIB_SECTION); crm_info("Forwarding %s operation for section %s to %s (origin=%s/%s/%s)", op, section ? section : "'all'", host ? host : "master", originator ? originator : "local", crm_element_value(request, F_CIB_CLIENTNAME), crm_element_value(request, F_CIB_CALLID)); forward_request(request, cib_client, call_options); return; } if (cib_status != pcmk_ok) { rc = cib_status; crm_err("Operation ignored, cluster configuration is invalid." " Please repair and restart: %s", pcmk_strerror(cib_status)); op_reply = cib_construct_reply(request, the_cib, cib_status); } else if (process) { int now = time(NULL); int level = LOG_INFO; const char *section = crm_element_value(request, F_CIB_SECTION); cib_num_local++; rc = cib_process_command(request, &op_reply, &result_diff, privileged); if (global_update) { switch (rc) { case pcmk_ok: level = LOG_INFO; break; case -pcmk_err_old_data: case -pcmk_err_diff_resync: case -pcmk_err_diff_failed: level = LOG_TRACE; break; default: level = LOG_ERR; } } else if (rc != pcmk_ok && is_update) { cib_num_fail++; level = LOG_WARNING; /* } else if (safe_str_eq(op, CIB_OP_QUERY)) { level = LOG_TRACE; } else if (safe_str_eq(op, CIB_OP_SLAVE)) { level = LOG_TRACE; } else if (safe_str_eq(section, XML_CIB_TAG_STATUS)) { level = LOG_TRACE; */ } do_crm_log(level, "Completed %s operation for section %s: %s (rc=%d, origin=%s/%s/%s, version=%s.%s.%s)", op, section ? section : "'all'", pcmk_strerror(rc), rc, originator ? originator : "local", crm_element_value(request, F_CIB_CLIENTNAME), crm_element_value(request, F_CIB_CALLID), the_cib ? crm_element_value(the_cib, XML_ATTR_GENERATION_ADMIN) : "0", the_cib ? crm_element_value(the_cib, XML_ATTR_GENERATION) : "0", the_cib ? crm_element_value(the_cib, XML_ATTR_NUMUPDATES) : "0" ); if((now + 1) < time(NULL)) { crm_write_blackbox(0, NULL); } if (op_reply == NULL && (needs_reply || local_notify)) { crm_err("Unexpected NULL reply to message"); crm_log_xml_err(request, "null reply"); needs_reply = FALSE; local_notify = FALSE; } } /* from now on we are the server */ if (needs_reply == FALSE || stand_alone) { /* nothing more to do... * this was a non-originating slave update */ crm_trace("Completed slave update"); } else if (rc == pcmk_ok && result_diff != NULL && !(call_options & cib_inhibit_bcast)) { gboolean broadcast = FALSE; cib_local_bcast_num++; crm_xml_add_int(request, F_CIB_LOCAL_NOTIFY_ID, cib_local_bcast_num); broadcast = send_peer_reply(request, result_diff, originator, TRUE); if (broadcast && client_id && local_notify && op_reply) { /* If we have been asked to sync the reply, * and a bcast msg has gone out, we queue the local notify * until we know the bcast message has been received */ local_notify = FALSE; crm_trace("Queuing local %ssync notification for %s", (call_options & cib_sync_call)?"":"a-", client_id); queue_local_notify(op_reply, client_id, (call_options & cib_sync_call), from_peer); op_reply = NULL; /* the reply is queued, so don't free here */ } } else if (call_options & cib_discard_reply) { crm_trace("Caller isn't interested in reply"); } else if (from_peer) { if (is_update == FALSE || result_diff == NULL) { crm_trace("Request not broadcast: R/O call"); } else if (call_options & cib_inhibit_bcast) { crm_trace("Request not broadcast: inhibited"); } else if (rc != pcmk_ok) { crm_trace("Request not broadcast: call failed: %s", pcmk_strerror(rc)); } else { crm_trace("Directing reply to %s", originator); } send_peer_reply(op_reply, result_diff, originator, FALSE); } if (local_notify && client_id) { crm_trace("Performing local %ssync notification for %s", (call_options & cib_sync_call)?"":"a-", client_id); if (process == FALSE) { do_local_notify(request, client_id, call_options & cib_sync_call, from_peer); } else { do_local_notify(op_reply, client_id, call_options & cib_sync_call, from_peer); } } free_xml(op_reply); free_xml(result_diff); return; } xmlNode * cib_construct_reply(xmlNode * request, xmlNode * output, int rc) { int lpc = 0; xmlNode *reply = NULL; const char *name = NULL; const char *value = NULL; const char *names[] = { F_CIB_OPERATION, F_CIB_CALLID, F_CIB_CLIENTID, F_CIB_CALLOPTS }; static int max = DIMOF(names); crm_trace("Creating a basic reply"); reply = create_xml_node(NULL, "cib-reply"); crm_xml_add(reply, F_TYPE, T_CIB); for (lpc = 0; lpc < max; lpc++) { name = names[lpc]; value = crm_element_value(request, name); crm_xml_add(reply, name, value); } crm_xml_add_int(reply, F_CIB_RC, rc); if (output != NULL) { crm_trace("Attaching reply output"); add_message_xml(reply, F_CIB_CALLDATA, output); } return reply; } int cib_process_command(xmlNode * request, xmlNode ** reply, xmlNode ** cib_diff, gboolean privileged) { xmlNode *input = NULL; xmlNode *output = NULL; xmlNode *result_cib = NULL; xmlNode *current_cib = NULL; #if ENABLE_ACL xmlNode *filtered_current_cib = NULL; #endif int call_type = 0; int call_options = 0; int log_level = LOG_TRACE; const char *op = NULL; const char *section = NULL; int rc = pcmk_ok; int rc2 = pcmk_ok; gboolean send_r_notify = FALSE; gboolean global_update = FALSE; gboolean config_changed = FALSE; gboolean manage_counters = TRUE; CRM_ASSERT(cib_status == pcmk_ok); *reply = NULL; *cib_diff = NULL; current_cib = the_cib; /* Start processing the request... */ op = crm_element_value(request, F_CIB_OPERATION); crm_element_value_int(request, F_CIB_CALLOPTS, &call_options); rc = cib_get_operation_id(op, &call_type); if (rc == pcmk_ok && privileged == FALSE) { rc = cib_op_can_run(call_type, call_options, privileged, global_update); } rc2 = cib_op_prepare(call_type, request, &input, §ion); if (rc == pcmk_ok) { rc = rc2; } if (rc != pcmk_ok) { crm_trace("Call setup failed: %s", pcmk_strerror(rc)); goto done; } else if (cib_op_modifies(call_type) == FALSE) { #if ENABLE_ACL if (acl_enabled(config_hash) == FALSE || acl_filter_cib(request, current_cib, current_cib, &filtered_current_cib) == FALSE) { rc = cib_perform_op(op, call_options, cib_op_func(call_type), TRUE, section, request, input, FALSE, &config_changed, current_cib, &result_cib, NULL, &output); } else if (filtered_current_cib == NULL) { crm_debug("Pre-filtered the entire cib"); rc = -EACCES; } else { crm_debug("Pre-filtered the queried cib according to the ACLs"); rc = cib_perform_op(op, call_options, cib_op_func(call_type), TRUE, section, request, input, FALSE, &config_changed, filtered_current_cib, &result_cib, NULL, &output); } #else rc = cib_perform_op(op, call_options, cib_op_func(call_type), TRUE, section, request, input, FALSE, &config_changed, current_cib, &result_cib, NULL, &output); #endif CRM_CHECK(result_cib == NULL, free_xml(result_cib)); goto done; } /* Handle a valid write action */ global_update = crm_is_true(crm_element_value(request, F_CIB_GLOBAL_UPDATE)); if (global_update) { manage_counters = FALSE; call_options |= cib_force_diff; CRM_CHECK(call_type == 3 || call_type == 4, crm_err("Call type: %d", call_type); crm_log_xml_err(request, "bad op")); } #ifdef SUPPORT_PRENOTIFY if ((call_options & cib_inhibit_notify) == 0) { cib_pre_notify(call_options, op, the_cib, input); } #endif if (rc == pcmk_ok) { if (call_options & cib_inhibit_bcast) { /* skip */ crm_trace("Skipping update: inhibit broadcast"); manage_counters = FALSE; } /* result_cib must not be modified after cib_perform_op() returns */ rc = cib_perform_op(op, call_options, cib_op_func(call_type), FALSE, section, request, input, manage_counters, &config_changed, current_cib, &result_cib, cib_diff, &output); #if ENABLE_ACL if (acl_enabled(config_hash) == TRUE && acl_check_diff(request, current_cib, result_cib, *cib_diff) == FALSE) { rc = -EACCES; } #endif if (manage_counters == FALSE) { /* If the diff is NULL at this point, its because nothing changed */ config_changed = cib_config_changed(NULL, NULL, cib_diff); } /* Always write to disk for replace ops, * this also negates the need to detect ordering changes */ if (crm_str_eq(CIB_OP_REPLACE, op, TRUE)) { config_changed = TRUE; } } if (rc == pcmk_ok && (call_options & cib_dryrun) == 0) { rc = activateCibXml(result_cib, config_changed, op); if (rc == pcmk_ok && cib_internal_config_changed(*cib_diff)) { cib_read_config(config_hash, result_cib); } if (crm_str_eq(CIB_OP_REPLACE, op, TRUE)) { if (section == NULL) { send_r_notify = TRUE; } else if (safe_str_eq(section, XML_TAG_CIB)) { send_r_notify = TRUE; } else if (safe_str_eq(section, XML_CIB_TAG_NODES)) { send_r_notify = TRUE; } else if (safe_str_eq(section, XML_CIB_TAG_STATUS)) { send_r_notify = TRUE; } } else if (crm_str_eq(CIB_OP_ERASE, op, TRUE)) { send_r_notify = TRUE; } } else if (rc == -pcmk_err_dtd_validation) { if (output != NULL) { crm_log_xml_info(output, "cib:output"); free_xml(output); } #if ENABLE_ACL { xmlNode *filtered_result_cib = NULL; if (acl_enabled(config_hash) == FALSE || acl_filter_cib(request, current_cib, result_cib, &filtered_result_cib) == FALSE) { output = result_cib; } else { crm_debug("Filtered the result cib for output according to the ACLs"); output = filtered_result_cib; if (result_cib != NULL) { free_xml(result_cib); } } } #else output = result_cib; #endif } else { free_xml(result_cib); } if ((call_options & cib_inhibit_notify) == 0) { const char *call_id = crm_element_value(request, F_CIB_CALLID); const char *client = crm_element_value(request, F_CIB_CLIENTNAME); #ifdef SUPPORT_POSTNOTIFY cib_post_notify(call_options, op, input, rc, the_cib); #endif cib_diff_notify(call_options, client, call_id, op, input, rc, *cib_diff); } if (send_r_notify) { const char *origin = crm_element_value(request, F_ORIG); cib_replace_notify(origin, the_cib, rc, *cib_diff); } if (rc != pcmk_ok) { log_level = LOG_TRACE; if (rc == -pcmk_err_dtd_validation && global_update) { log_level = LOG_WARNING; crm_log_xml_info(input, "cib:global_update"); } } else if (config_changed) { log_level = LOG_TRACE; if (cib_is_master) { log_level = LOG_NOTICE; } } else if (cib_is_master) { log_level = LOG_TRACE; } log_cib_diff(log_level, *cib_diff, "cib:diff"); done: if ((call_options & cib_discard_reply) == 0) { *reply = cib_construct_reply(request, output, rc); crm_log_xml_explicit(*reply, "cib:reply"); } #if ENABLE_ACL if (filtered_current_cib != NULL) { free_xml(filtered_current_cib); } #endif if (call_type >= 0) { cib_op_cleanup(call_type, call_options, &input, &output); } return rc; } gint cib_GCompareFunc(gconstpointer a, gconstpointer b) { const xmlNode *a_msg = a; const xmlNode *b_msg = b; int msg_a_id = 0; int msg_b_id = 0; const char *value = NULL; value = crm_element_value_const(a_msg, F_CIB_CALLID); msg_a_id = crm_parse_int(value, NULL); value = crm_element_value_const(b_msg, F_CIB_CALLID); msg_b_id = crm_parse_int(value, NULL); if (msg_a_id == msg_b_id) { return 0; } else if (msg_a_id < msg_b_id) { return -1; } return 1; } #if SUPPORT_HEARTBEAT void cib_ha_peer_callback(HA_Message * msg, void *private_data) { xmlNode *xml = convert_ha_message(NULL, msg, __FUNCTION__); cib_peer_callback(xml, private_data); free_xml(xml); } #endif void cib_peer_callback(xmlNode * msg, void *private_data) { const char *reason = NULL; const char *originator = crm_element_value(msg, F_ORIG); if (originator == NULL || crm_str_eq(originator, cib_our_uname, TRUE)) { /* message is from ourselves */ int bcast_id = 0; if (!(crm_element_value_int(msg, F_CIB_LOCAL_NOTIFY_ID, &bcast_id))) { check_local_notify(bcast_id); } return; } else if (crm_peer_cache == NULL) { reason = "membership not established"; goto bail; } if (crm_element_value(msg, F_CIB_CLIENTNAME) == NULL) { crm_xml_add(msg, F_CIB_CLIENTNAME, originator); } /* crm_log_xml_trace("Peer[inbound]", msg); */ cib_process_request(msg, FALSE, TRUE, TRUE, NULL); return; bail: if (reason) { const char *seq = crm_element_value(msg, F_SEQ); const char *op = crm_element_value(msg, F_CIB_OPERATION); crm_warn("Discarding %s message (%s) from %s: %s", op, seq, originator, reason); } } #if SUPPORT_HEARTBEAT extern oc_ev_t *cib_ev_token; static void *ccm_library = NULL; int (*ccm_api_callback_done) (void *cookie) = NULL; int (*ccm_api_handle_event) (const oc_ev_t * token) = NULL; void cib_client_status_callback(const char *node, const char *client, const char *status, void *private) { crm_node_t *peer = NULL; if (safe_str_eq(client, CRM_SYSTEM_CIB)) { crm_info("Status update: Client %s/%s now has status [%s]", node, client, status); if (safe_str_eq(status, JOINSTATUS)) { status = ONLINESTATUS; } else if (safe_str_eq(status, LEAVESTATUS)) { status = OFFLINESTATUS; } peer = crm_get_peer(0, node); crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cib, status); } return; } int cib_ccm_dispatch(gpointer user_data) { int rc = 0; oc_ev_t *ccm_token = (oc_ev_t *) user_data; crm_trace("received callback"); if (ccm_api_handle_event == NULL) { ccm_api_handle_event = find_library_function(&ccm_library, CCM_LIBRARY, "oc_ev_handle_event", 1); } rc = (*ccm_api_handle_event) (ccm_token); if (0 == rc) { return 0; } crm_err("CCM connection appears to have failed: rc=%d.", rc); /* eventually it might be nice to recover and reconnect... but until then... */ crm_err("Exiting to recover from CCM connection failure"); crm_exit(2); return -1; } int current_instance = 0; void cib_ccm_msg_callback(oc_ed_t event, void *cookie, size_t size, const void *data) { gboolean update_id = FALSE; const oc_ev_membership_t *membership = data; CRM_ASSERT(membership != NULL); crm_info("Processing CCM event=%s (id=%d)", ccm_event_name(event), membership->m_instance); if (current_instance > membership->m_instance) { crm_err("Membership instance ID went backwards! %d->%d", current_instance, membership->m_instance); CRM_ASSERT(current_instance <= membership->m_instance); } switch (event) { case OC_EV_MS_NEW_MEMBERSHIP: case OC_EV_MS_INVALID: update_id = TRUE; break; case OC_EV_MS_PRIMARY_RESTORED: update_id = TRUE; break; case OC_EV_MS_NOT_PRIMARY: crm_trace("Ignoring transitional CCM event: %s", ccm_event_name(event)); break; case OC_EV_MS_EVICTED: crm_err("Evicted from CCM: %s", ccm_event_name(event)); break; default: crm_err("Unknown CCM event: %d", event); } if (update_id) { unsigned int lpc = 0; CRM_CHECK(membership != NULL, return); current_instance = membership->m_instance; for (lpc = 0; lpc < membership->m_n_out; lpc++) { crm_update_ccm_node(membership, lpc + membership->m_out_idx, CRM_NODE_LOST, current_instance); } for (lpc = 0; lpc < membership->m_n_member; lpc++) { crm_update_ccm_node(membership, lpc + membership->m_memb_idx, CRM_NODE_ACTIVE, current_instance); } } if (ccm_api_callback_done == NULL) { ccm_api_callback_done = find_library_function(&ccm_library, CCM_LIBRARY, "oc_ev_callback_done", 1); } (*ccm_api_callback_done) (cookie); return; } #endif gboolean can_write(int flags) { return TRUE; } static gboolean cib_force_exit(gpointer data) { crm_notice("Forcing exit!"); terminate_cib(__FUNCTION__, TRUE); return FALSE; } static void disconnect_remote_client(gpointer key, gpointer value, gpointer user_data) { crm_client_t *a_client = value; crm_err("Disconnecting %s... Not implemented", crm_str(a_client->name)); } void cib_shutdown(int nsig) { struct qb_ipcs_stats srv_stats; if (cib_shutdown_flag == FALSE) { int disconnects = 0; qb_ipcs_connection_t *c = NULL; cib_shutdown_flag = TRUE; c = qb_ipcs_connection_first_get(ipcs_rw); while(c != NULL) { qb_ipcs_connection_t *last = c; c = qb_ipcs_connection_next_get(ipcs_rw, last); crm_debug("Disconnecting r/w client %p...", last); qb_ipcs_disconnect(last); qb_ipcs_connection_unref(last); disconnects++; } c = qb_ipcs_connection_first_get(ipcs_ro); while(c != NULL) { qb_ipcs_connection_t *last = c; c = qb_ipcs_connection_next_get(ipcs_ro, last); crm_debug("Disconnecting r/o client %p...", last); qb_ipcs_disconnect(last); qb_ipcs_connection_unref(last); disconnects++; } c = qb_ipcs_connection_first_get(ipcs_shm); while(c != NULL) { qb_ipcs_connection_t *last = c; c = qb_ipcs_connection_next_get(ipcs_shm, last); crm_debug("Disconnecting non-blocking r/w client %p...", last); qb_ipcs_disconnect(last); qb_ipcs_connection_unref(last); disconnects++; } disconnects += crm_hash_table_size(client_connections); crm_debug("Disconnecting %d remote clients", crm_hash_table_size(client_connections)); g_hash_table_foreach(client_connections, disconnect_remote_client, NULL); crm_info("Disconnected %d clients", disconnects); } qb_ipcs_stats_get(ipcs_rw, &srv_stats, QB_FALSE); if(crm_hash_table_size(client_connections) == 0) { crm_info("All clients disconnected (%d)", srv_stats.active_connections); initiate_exit(); } else { crm_info("Waiting on %d clients to disconnect (%d)", crm_hash_table_size(client_connections), srv_stats.active_connections); } } void initiate_exit(void) { int active = 0; xmlNode *leaving = NULL; active = crm_active_peers(); if (active < 2) { terminate_cib(__FUNCTION__, FALSE); return; } crm_info("Sending disconnect notification to %d peers...", active); leaving = create_xml_node(NULL, "exit-notification"); crm_xml_add(leaving, F_TYPE, "cib"); crm_xml_add(leaving, F_CIB_OPERATION, "cib_shutdown_req"); send_cluster_message(NULL, crm_msg_cib, leaving, TRUE); free_xml(leaving); g_timeout_add(crm_get_msec("5s"), cib_force_exit, NULL); } extern int remote_fd; extern int remote_tls_fd; extern void terminate_cs_connection(void); void terminate_cib(const char *caller, gboolean fast) { if (remote_fd > 0) { close(remote_fd); remote_fd = 0; } if (remote_tls_fd > 0) { close(remote_tls_fd); remote_tls_fd = 0; } if(!fast) { crm_info("%s: Disconnecting from cluster infrastructure", caller); crm_cluster_disconnect(&crm_cluster); } uninitializeCib(); crm_info("%s: Exiting%s...", caller, fast?" fast":mainloop?" from mainloop":""); if(fast == FALSE && mainloop != NULL && g_main_is_running(mainloop)) { g_main_quit(mainloop); } else { qb_ipcs_destroy(ipcs_ro); qb_ipcs_destroy(ipcs_rw); qb_ipcs_destroy(ipcs_shm); if (fast) { crm_exit(EX_USAGE); } else { crm_exit(EX_OK); } } } diff --git a/cib/remote.c b/cib/remote.c index e91b5d8b5a..f5e3744d8c 100644 --- a/cib/remote.c +++ b/cib/remote.c @@ -1,695 +1,691 @@ /* * Copyright (C) 2004 Andrew Beekhof * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "callbacks.h" /* #undef HAVE_PAM_PAM_APPL_H */ /* #undef HAVE_GNUTLS_GNUTLS_H */ #ifdef HAVE_GNUTLS_GNUTLS_H # undef KEYFILE # include #endif #include #include #if HAVE_SECURITY_PAM_APPL_H # include # define HAVE_PAM 1 #else # if HAVE_PAM_PAM_APPL_H # include # define HAVE_PAM 1 # endif #endif extern int remote_tls_fd; extern gboolean cib_shutdown_flag; int init_remote_listener(int port, gboolean encrypted); void cib_remote_connection_destroy(gpointer user_data); #ifdef HAVE_GNUTLS_GNUTLS_H # define DH_BITS 1024 gnutls_dh_params dh_params; gnutls_anon_server_credentials anon_cred_s; static void debug_log(int level, const char *str) { fputs(str, stderr); } #endif #define REMOTE_AUTH_TIMEOUT 10000 int num_clients; int authenticate_user(const char *user, const char *passwd); int cib_remote_listen(gpointer data); int cib_remote_msg(gpointer data); static void remote_connection_destroy(gpointer user_data) { return; } #define ERROR_SUFFIX " Shutting down remote listener" int init_remote_listener(int port, gboolean encrypted) { int rc; int *ssock = NULL; struct sockaddr_in saddr; int optval; static struct mainloop_fd_callbacks remote_listen_fd_callbacks = { .dispatch = cib_remote_listen, .destroy = remote_connection_destroy, }; if (port <= 0) { /* dont start it */ return 0; } if (encrypted) { #ifndef HAVE_GNUTLS_GNUTLS_H crm_warn("TLS support is not available"); return 0; #else crm_notice("Starting a tls listener on port %d.", port); gnutls_global_init(); /* gnutls_global_set_log_level (10); */ gnutls_global_set_log_function(debug_log); gnutls_dh_params_init(&dh_params); gnutls_dh_params_generate2(dh_params, DH_BITS); gnutls_anon_allocate_server_credentials(&anon_cred_s); gnutls_anon_set_server_dh_params(anon_cred_s, dh_params); #endif } else { crm_warn("Starting a plain_text listener on port %d.", port); } #ifndef HAVE_PAM crm_warn("PAM is _not_ enabled!"); #endif /* create server socket */ ssock = malloc(sizeof(int)); *ssock = socket(AF_INET, SOCK_STREAM, 0); if (*ssock == -1) { crm_perror(LOG_ERR, "Can not create server socket." ERROR_SUFFIX); free(ssock); return -1; } /* reuse address */ optval = 1; rc = setsockopt(*ssock, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)); if(rc < 0) { crm_perror(LOG_INFO, "Couldn't allow the reuse of local addresses by our remote listener"); } /* bind server socket */ memset(&saddr, '\0', sizeof(saddr)); saddr.sin_family = AF_INET; saddr.sin_addr.s_addr = INADDR_ANY; saddr.sin_port = htons(port); if (bind(*ssock, (struct sockaddr *)&saddr, sizeof(saddr)) == -1) { crm_perror(LOG_ERR, "Can not bind server socket." ERROR_SUFFIX); close(*ssock); free(ssock); return -2; } if (listen(*ssock, 10) == -1) { crm_perror(LOG_ERR, "Can not start listen." ERROR_SUFFIX); close(*ssock); free(ssock); return -3; } mainloop_add_fd("cib-remote", G_PRIORITY_DEFAULT, *ssock, ssock, &remote_listen_fd_callbacks); return *ssock; } static int check_group_membership(const char *usr, const char *grp) { int index = 0; struct passwd *pwd = NULL; struct group *group = NULL; CRM_CHECK(usr != NULL, return FALSE); CRM_CHECK(grp != NULL, return FALSE); pwd = getpwnam(usr); if (pwd == NULL) { crm_err("No user named '%s' exists!", usr); return FALSE; } group = getgrgid(pwd->pw_gid); if (group != NULL && crm_str_eq(grp, group->gr_name, TRUE)) { return TRUE; } group = getgrnam(grp); if (group == NULL) { crm_err("No group named '%s' exists!", grp); return FALSE; } while (TRUE) { char *member = group->gr_mem[index++]; if (member == NULL) { break; } else if (crm_str_eq(usr, member, TRUE)) { return TRUE; } }; return FALSE; } static gboolean cib_remote_auth(xmlNode *login) { const char *user = NULL; const char *pass = NULL; const char *tmp = NULL; crm_log_xml_info(login, "Login: "); if (login == NULL) { return FALSE; } tmp = crm_element_name(login); if (safe_str_neq(tmp, "cib_command")) { crm_err("Wrong tag: %s", tmp); return FALSE; } tmp = crm_element_value(login, "op"); if (safe_str_neq(tmp, "authenticate")) { crm_err("Wrong operation: %s", tmp); return FALSE; } user = crm_element_value(login, "user"); pass = crm_element_value(login, "password"); if (!user || !pass) { crm_err("missing auth credentials"); return FALSE; } /* Non-root daemons can only validate the password of the * user they're running as */ if (check_group_membership(user, CRM_DAEMON_GROUP) == FALSE) { crm_err("User is not a member of the required group"); return FALSE; } else if (authenticate_user(user, pass) == FALSE) { crm_err("PAM auth failed"); return FALSE; } return TRUE; } static gboolean remote_auth_timeout_cb(gpointer data) { crm_client_t *client = data; client->remote->auth_timeout = 0; if (client->remote->authenticated == TRUE) { return FALSE; } mainloop_del_fd(client->remote->source); crm_err("Remote client authentication timed out"); return FALSE; } int cib_remote_listen(gpointer data) { int csock = 0; unsigned laddr; struct sockaddr_in addr; int ssock = *(int *)data; int flag; crm_client_t *new_client = NULL; static struct mainloop_fd_callbacks remote_client_fd_callbacks = { .dispatch = cib_remote_msg, .destroy = cib_remote_connection_destroy, }; /* accept the connection */ laddr = sizeof(addr); csock = accept(ssock, (struct sockaddr *)&addr, &laddr); crm_debug("New %s connection from %s", ssock == remote_tls_fd ? "secure" : "clear-text", inet_ntoa(addr.sin_addr)); if (csock == -1) { crm_err("accept socket failed"); return TRUE; } if ((flag = fcntl(csock, F_GETFL)) >= 0) { if (fcntl(csock, F_SETFL, flag | O_NONBLOCK) < 0) { crm_err( "fcntl() write failed"); close(csock); return TRUE; } } else { crm_err( "fcntl() read failed"); close(csock); return TRUE; } if (ssock == remote_tls_fd) { #ifdef HAVE_GNUTLS_GNUTLS_H /* create gnutls session for the server socket */ new_client->remote->tls_session = crm_create_anon_tls_session( csock, GNUTLS_SERVER, anon_cred_s); if (new_client->remote->tls_session == NULL) { crm_err("TLS session creation failed"); close(csock); return TRUE; } #endif } num_clients++; crm_client_init(); new_client = calloc(1, sizeof(crm_client_t)); new_client->remote = calloc(1, sizeof(crm_remote_t)); new_client->id = crm_generate_uuid(); g_hash_table_insert(client_connections, new_client->id/* Should work */, new_client); -#if ENABLE_ACL - new_client->user = strdup(user); -#endif - /* clients have a few seconds to perform handshake. */ new_client->remote->auth_timeout = g_timeout_add( REMOTE_AUTH_TIMEOUT, remote_auth_timeout_cb, new_client); if (ssock == remote_tls_fd) { #ifdef HAVE_GNUTLS_GNUTLS_H new_client->kind = CRM_CLIENT_TLS; #endif } else { new_client->kind = CRM_CLIENT_TCP; new_client->remote->tcp_socket = csock; } new_client->remote->source = mainloop_add_fd( "cib-remote-client", G_PRIORITY_DEFAULT, csock, new_client, &remote_client_fd_callbacks); return TRUE; } void cib_remote_connection_destroy(gpointer user_data) { crm_client_t *client = user_data; int csock = 0; if (client == NULL) { return; } crm_trace("Cleaning up after client disconnect: %s/%s", crm_str(client->name), client->id); num_clients--; crm_trace("Num unfree'd clients: %d", num_clients); if (client->kind == CRM_CLIENT_TLS) { #ifdef HAVE_GNUTLS_GNUTLS_H if (client->remote->tls_session) { void *sock_ptr = gnutls_transport_get_ptr(*client->remote->tls_session); csock = GPOINTER_TO_INT(sock_ptr); if (client->remote->tls_handshake_complete) { gnutls_bye(*client->remote->tls_session, GNUTLS_SHUT_WR); } gnutls_deinit(*client->remote->tls_session); gnutls_free(client->remote->tls_session); } #endif } else { csock = GPOINTER_TO_INT(client->remote->tls_session); } client->remote->tls_session = NULL; if (csock > 0) { close(csock); } crm_client_destroy(client); crm_trace("Freed the cib client"); if (cib_shutdown_flag) { cib_shutdown(0); } return; } static void cib_handle_remote_msg(crm_client_t *client, xmlNode *command) { const char *value = NULL; value = crm_element_name(command); if (safe_str_neq(value, "cib_command")) { crm_log_xml_trace(command, "Bad command: "); return; } if (client->name == NULL) { value = crm_element_value(command, F_CLIENTNAME); if (value == NULL) { client->name = strdup(client->id); } else { client->name = strdup(value); } } if (client->userdata == NULL) { value = crm_element_value(command, F_CIB_CALLBACK_TOKEN); if (value != NULL) { client->userdata = strdup(value); crm_trace("Callback channel for %s is %s", client->id, client->userdata); } else { client->userdata = strdup(client->id); } } /* unset dangerous options */ xml_remove_prop(command, F_ORIG); xml_remove_prop(command, F_CIB_HOST); xml_remove_prop(command, F_CIB_GLOBAL_UPDATE); crm_xml_add(command, F_TYPE, T_CIB); crm_xml_add(command, F_CIB_CLIENTID, client->id); crm_xml_add(command, F_CIB_CLIENTNAME, client->name); #if ENABLE_ACL crm_xml_add(command, F_CIB_USER, client->user); #endif if (crm_element_value(command, F_CIB_CALLID) == NULL) { char *call_uuid = crm_generate_uuid(); /* fix the command */ crm_xml_add(command, F_CIB_CALLID, call_uuid); free(call_uuid); } if (crm_element_value(command, F_CIB_CALLOPTS) == NULL) { crm_xml_add_int(command, F_CIB_CALLOPTS, 0); } crm_log_xml_trace(command, "Remote command: "); cib_common_callback_worker(0, 0, command, client, TRUE); } int cib_remote_msg(gpointer data) { xmlNode *command = NULL; crm_client_t *client = data; int disconnected = 0; int timeout = client->remote->authenticated ? -1 : 1000; crm_trace("%s callback", client->kind == CRM_CLIENT_TLS ? "secure" : "clear-text"); #ifdef HAVE_GNUTLS_GNUTLS_H if (client->kind == CRM_CLIENT_TLS && (client->remote->tls_handshake_complete == FALSE)) { int rc = 0; /* Muliple calls to handshake will be required, this callback * will be invoked once the client sends more handshake data. */ do { rc = gnutls_handshake(*client->remote->tls_session); if (rc < 0 && rc != GNUTLS_E_AGAIN) { crm_err("Remote cib tls handshake failed"); return -1; } } while (rc == GNUTLS_E_INTERRUPTED); if (rc == 0) { crm_debug("Remote cib tls handshake completed"); client->remote->tls_handshake_complete = TRUE; if (client->remote->auth_timeout) { g_source_remove(client->remote->auth_timeout); } /* after handshake, clients must send auth in a few seconds */ client->remote->auth_timeout = g_timeout_add(REMOTE_AUTH_TIMEOUT, remote_auth_timeout_cb, client); } return 0; } #endif crm_remote_recv(client->remote, timeout, &disconnected); /* must pass auth before we will process anything else */ if (client->remote->authenticated == FALSE) { xmlNode *reg; #if ENABLE_ACL const char *user = NULL; #endif command = crm_remote_parse_buffer(client->remote); if (cib_remote_auth(command) == FALSE) { free_xml(command); return -1; } crm_debug("remote connection authenticated successfully"); client->remote->authenticated = TRUE; g_source_remove(client->remote->auth_timeout); client->remote->auth_timeout = 0; client->name = crm_element_value_copy(command, "name"); #if ENABLE_ACL user = crm_element_value(command, "user"); if (user) { - new_client->user = strdup(user); + client->user = strdup(user); } #endif /* send ACK */ reg = create_xml_node(NULL, "cib_result"); crm_xml_add(reg, F_CIB_OPERATION, CRM_OP_REGISTER); crm_xml_add(reg, F_CIB_CLIENTID, client->id); crm_remote_send(client->remote, reg); free_xml(reg); free_xml(command); } command = crm_remote_parse_buffer(client->remote); while (command) { crm_trace("command received"); cib_handle_remote_msg(client, command); free_xml(command); command = crm_remote_parse_buffer(client->remote); } if (disconnected) { crm_trace("disconnected while receiving remote cib msg."); return -1; } return 0; } #ifdef HAVE_PAM /* * Useful Examples: * http://www.kernel.org/pub/linux/libs/pam/Linux-PAM-html * http://developer.apple.com/samplecode/CryptNoMore/index.html */ static int construct_pam_passwd(int num_msg, const struct pam_message **msg, struct pam_response **response, void *data) { int count = 0; struct pam_response *reply; char *string = (char *)data; CRM_CHECK(data, return PAM_CONV_ERR); CRM_CHECK(num_msg == 1, return PAM_CONV_ERR); /* We only want to handle one message */ reply = calloc(1, sizeof(struct pam_response)); CRM_ASSERT(reply != NULL); for (count = 0; count < num_msg; ++count) { switch (msg[count]->msg_style) { case PAM_TEXT_INFO: crm_info("PAM: %s\n", msg[count]->msg); break; case PAM_PROMPT_ECHO_OFF: case PAM_PROMPT_ECHO_ON: reply[count].resp_retcode = 0; reply[count].resp = string; /* We already made a copy */ case PAM_ERROR_MSG: /* In theory we'd want to print this, but then * we see the password prompt in the logs */ /* crm_err("PAM error: %s\n", msg[count]->msg); */ break; default: crm_err("Unhandled conversation type: %d", msg[count]->msg_style); goto bail; } } *response = reply; reply = NULL; return PAM_SUCCESS; bail: for (count = 0; count < num_msg; ++count) { if (reply[count].resp != NULL) { switch (msg[count]->msg_style) { case PAM_PROMPT_ECHO_ON: case PAM_PROMPT_ECHO_OFF: /* Erase the data - it contained a password */ while (*(reply[count].resp)) { *(reply[count].resp)++ = '\0'; } free(reply[count].resp); break; } reply[count].resp = NULL; } } free(reply); reply = NULL; return PAM_CONV_ERR; } #endif int authenticate_user(const char *user, const char *passwd) { #ifndef HAVE_PAM gboolean pass = TRUE; #else int rc = 0; gboolean pass = FALSE; const void *p_user = NULL; struct pam_conv p_conv; struct pam_handle *pam_h = NULL; static const char *pam_name = NULL; if (pam_name == NULL) { pam_name = getenv("CIB_pam_service"); } if (pam_name == NULL) { pam_name = "login"; } p_conv.conv = construct_pam_passwd; p_conv.appdata_ptr = strdup(passwd); rc = pam_start(pam_name, user, &p_conv, &pam_h); if (rc != PAM_SUCCESS) { crm_err("Could not initialize PAM: %s (%d)", pam_strerror(pam_h, rc), rc); goto bail; } rc = pam_authenticate(pam_h, 0); if (rc != PAM_SUCCESS) { crm_err("Authentication failed for %s: %s (%d)", user, pam_strerror(pam_h, rc), rc); goto bail; } /* Make sure we authenticated the user we wanted to authenticate. * Since we also run as non-root, it might be worth pre-checking * the user has the same EID as us, since that the only user we * can authenticate. */ rc = pam_get_item(pam_h, PAM_USER, &p_user); if (rc != PAM_SUCCESS) { crm_err("Internal PAM error: %s (%d)", pam_strerror(pam_h, rc), rc); goto bail; } else if (p_user == NULL) { crm_err("Unknown user authenticated."); goto bail; } else if (safe_str_neq(p_user, user)) { crm_err("User mismatch: %s vs. %s.", (const char *)p_user, (const char *)user); goto bail; } rc = pam_acct_mgmt(pam_h, 0); if (rc != PAM_SUCCESS) { crm_err("Access denied: %s (%d)", pam_strerror(pam_h, rc), rc); goto bail; } pass = TRUE; bail: rc = pam_end(pam_h, rc); #endif return pass; } diff --git a/crmd/control.c b/crmd/control.c index 6312e53c53..0f3c647039 100644 --- a/crmd/control.c +++ b/crmd/control.c @@ -1,885 +1,884 @@ /* * Copyright (C) 2004 Andrew Beekhof * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include -#include qb_ipcs_service_t *ipcs = NULL; extern gboolean crm_connect_corosync(crm_cluster_t *cluster); extern void crmd_ha_connection_destroy(gpointer user_data); void crm_shutdown(int nsig); gboolean crm_read_options(gpointer user_data); gboolean fsa_has_quorum = FALSE; crm_trigger_t *fsa_source = NULL; crm_trigger_t *config_read = NULL; /* A_HA_CONNECT */ void do_ha_control(long long action, enum crmd_fsa_cause cause, enum crmd_fsa_state cur_state, enum crmd_fsa_input current_input, fsa_data_t * msg_data) { gboolean registered = FALSE; static crm_cluster_t *cluster = NULL; if(cluster == NULL) { cluster = calloc(1, sizeof(crm_cluster_t)); } if (action & A_HA_DISCONNECT) { crm_cluster_disconnect(cluster); crm_info("Disconnected from the cluster"); #if SUPPORT_HEARTBEAT set_bit(fsa_input_register, R_HA_DISCONNECTED); #endif } if (action & A_HA_CONNECT) { crm_set_status_callback(&peer_update_callback); if (is_openais_cluster()) { #if SUPPORT_COROSYNC registered = crm_connect_corosync(cluster); #endif } else if (is_heartbeat_cluster()) { #if SUPPORT_HEARTBEAT cluster->destroy = crmd_ha_connection_destroy; cluster->hb_dispatch = crmd_ha_msg_callback; registered = crm_cluster_connect(cluster); fsa_cluster_conn = cluster->hb_conn; crm_trace("Be informed of Node Status changes"); if (registered && fsa_cluster_conn->llc_ops->set_nstatus_callback(fsa_cluster_conn, crmd_ha_status_callback, fsa_cluster_conn) != HA_OK) { crm_err("Cannot set nstatus callback: %s", fsa_cluster_conn->llc_ops->errmsg(fsa_cluster_conn)); registered = FALSE; } crm_trace("Be informed of CRM Client Status changes"); if (registered && fsa_cluster_conn->llc_ops->set_cstatus_callback(fsa_cluster_conn, crmd_client_status_callback, fsa_cluster_conn) != HA_OK) { crm_err("Cannot set cstatus callback: %s", fsa_cluster_conn->llc_ops->errmsg(fsa_cluster_conn)); registered = FALSE; } if (registered) { crm_trace("Requesting an initial dump of CRMD client_status"); fsa_cluster_conn->llc_ops->client_status(fsa_cluster_conn, NULL, CRM_SYSTEM_CRMD, -1); } #endif } fsa_our_uname = cluster->uname; fsa_our_uuid = cluster->uuid; if (registered == FALSE) { set_bit(fsa_input_register, R_HA_DISCONNECTED); register_fsa_error(C_FSA_INTERNAL, I_ERROR, NULL); return; } populate_cib_nodes(node_update_none, __FUNCTION__); clear_bit(fsa_input_register, R_HA_DISCONNECTED); crm_info("Connected to the cluster"); } if (action & ~(A_HA_CONNECT | A_HA_DISCONNECT)) { crm_err("Unexpected action %s in %s", fsa_action2string(action), __FUNCTION__); } } /* A_SHUTDOWN */ void do_shutdown(long long action, enum crmd_fsa_cause cause, enum crmd_fsa_state cur_state, enum crmd_fsa_input current_input, fsa_data_t * msg_data) { /* just in case */ set_bit(fsa_input_register, R_SHUTDOWN); if (is_heartbeat_cluster()) { if (is_set(fsa_input_register, pe_subsystem->flag_connected)) { crm_info("Terminating the %s", pe_subsystem->name); if (stop_subsystem(pe_subsystem, TRUE) == FALSE) { /* its gone... */ crm_err("Faking %s exit", pe_subsystem->name); clear_bit(fsa_input_register, pe_subsystem->flag_connected); } else { crm_info("Waiting for subsystems to exit"); crmd_fsa_stall(FALSE); } } crm_info("All subsystems stopped, continuing"); } if (stonith_api) { /* Prevent it from comming up again */ clear_bit(fsa_input_register, R_ST_REQUIRED); crm_info("Disconnecting STONITH..."); stonith_api->cmds->disconnect(stonith_api); } } /* A_SHUTDOWN_REQ */ void do_shutdown_req(long long action, enum crmd_fsa_cause cause, enum crmd_fsa_state cur_state, enum crmd_fsa_input current_input, fsa_data_t * msg_data) { xmlNode *msg = NULL; crm_info("Sending shutdown request to %s", crm_str(fsa_our_dc)); msg = create_request(CRM_OP_SHUTDOWN_REQ, NULL, NULL, CRM_SYSTEM_DC, CRM_SYSTEM_CRMD, NULL); /* set_bit(fsa_input_register, R_STAYDOWN); */ if (send_cluster_message(NULL, crm_msg_crmd, msg, TRUE) == FALSE) { register_fsa_error(C_FSA_INTERNAL, I_ERROR, NULL); } free_xml(msg); } extern crm_ipc_t *attrd_ipc; extern char *max_generation_from; extern xmlNode *max_generation_xml; extern GHashTable *resource_history; extern GHashTable *voted; extern GHashTable *reload_hash; void log_connected_client(gpointer key, gpointer value, gpointer user_data); void log_connected_client(gpointer key, gpointer value, gpointer user_data) { crm_client_t *client = value; crm_err("%s is still connected at exit", crm_client_name(client)); } int crmd_exit(int rc) { GListPtr gIter = NULL; if(attrd_ipc) { crm_ipc_close(attrd_ipc); crm_ipc_destroy(attrd_ipc); } if(crmd_mainloop) { g_main_loop_quit(crmd_mainloop); g_main_loop_unref(crmd_mainloop); } #if SUPPORT_HEARTBEAT if (fsa_cluster_conn) { fsa_cluster_conn->llc_ops->delete(fsa_cluster_conn); fsa_cluster_conn = NULL; } #endif for(gIter = fsa_message_queue; gIter != NULL; gIter = gIter->next) { fsa_data_t *fsa_data = gIter->data; crm_info("Dropping %s: [ state=%s cause=%s origin=%s ]", fsa_input2string(fsa_data->fsa_input), fsa_state2string(fsa_state), fsa_cause2string(fsa_data->fsa_cause), fsa_data->origin); delete_fsa_input(fsa_data); } g_list_free(fsa_message_queue); fsa_message_queue = NULL; crm_client_cleanup(); empty_uuid_cache(); crm_peer_destroy(); clear_bit(fsa_input_register, R_MEMBERSHIP); if (te_subsystem->client && te_subsystem->client->ipcs) { crm_debug("Full destroy: TE"); qb_ipcs_disconnect(te_subsystem->client->ipcs); } free(te_subsystem); if (pe_subsystem->client && pe_subsystem->client->ipcs) { crm_debug("Full destroy: PE"); qb_ipcs_disconnect(pe_subsystem->client->ipcs); } free(pe_subsystem); free(cib_subsystem); if (integrated_nodes) { g_hash_table_destroy(integrated_nodes); } if (finalized_nodes) { g_hash_table_destroy(finalized_nodes); } if (confirmed_nodes) { g_hash_table_destroy(confirmed_nodes); } if (reload_hash) { g_hash_table_destroy(reload_hash); } if (voted) { g_hash_table_destroy(voted); } cib_delete(fsa_cib_conn); fsa_cib_conn = NULL; lrm_state_destroy_all(); free(transition_timer); free(integration_timer); free(finalization_timer); free(election_trigger); free(election_timeout); free(shutdown_escalation_timer); free(wait_timer); free(recheck_timer); free(fsa_our_dc_version); free(fsa_our_uname); free(fsa_our_uuid); free(fsa_our_dc); free(max_generation_from); free_xml(max_generation_xml); return crm_exit(rc); } /* A_EXIT_0, A_EXIT_1 */ void do_exit(long long action, enum crmd_fsa_cause cause, enum crmd_fsa_state cur_state, enum crmd_fsa_input current_input, fsa_data_t * msg_data) { int exit_code = 0; int log_level = LOG_INFO; const char *exit_type = "gracefully"; if (action & A_EXIT_1) { exit_code = 1; log_level = LOG_ERR; exit_type = "forcefully"; } verify_stopped(cur_state, LOG_ERR); do_crm_log(log_level, "Performing %s - %s exiting the CRMd", fsa_action2string(action), exit_type); if (is_set(fsa_input_register, R_IN_RECOVERY)) { crm_err("Could not recover from internal error"); exit_code = 2; } if (is_set(fsa_input_register, R_STAYDOWN)) { crm_warn("Inhibiting respawn by Heartbeat"); exit_code = 100; } crm_info("[%s] stopped (%d)", crm_system_name, exit_code); delete_fsa_input(msg_data); crmd_exit(exit_code); } /* A_STARTUP */ void do_startup(long long action, enum crmd_fsa_cause cause, enum crmd_fsa_state cur_state, enum crmd_fsa_input current_input, fsa_data_t * msg_data) { int was_error = 0; int interval = 1; /* seconds between DC heartbeats */ crm_debug("Registering Signal Handlers"); mainloop_add_signal(SIGTERM, crm_shutdown); fsa_source = mainloop_add_trigger(G_PRIORITY_HIGH, crm_fsa_trigger, NULL); config_read = mainloop_add_trigger(G_PRIORITY_HIGH, crm_read_options, NULL); crm_debug("Creating CIB and LRM objects"); fsa_cib_conn = cib_new(); lrm_state_init_local(); /* set up the timers */ transition_timer = calloc(1, sizeof(fsa_timer_t)); integration_timer = calloc(1, sizeof(fsa_timer_t)); finalization_timer = calloc(1, sizeof(fsa_timer_t)); election_trigger = calloc(1, sizeof(fsa_timer_t)); election_timeout = calloc(1, sizeof(fsa_timer_t)); shutdown_escalation_timer = calloc(1, sizeof(fsa_timer_t)); wait_timer = calloc(1, sizeof(fsa_timer_t)); recheck_timer = calloc(1, sizeof(fsa_timer_t)); interval = interval * 1000; if (election_trigger != NULL) { election_trigger->source_id = 0; election_trigger->period_ms = -1; election_trigger->fsa_input = I_DC_TIMEOUT; election_trigger->callback = crm_timer_popped; election_trigger->repeat = FALSE; } else { was_error = TRUE; } if (election_timeout != NULL) { election_timeout->source_id = 0; election_timeout->period_ms = -1; election_timeout->fsa_input = I_ELECTION_DC; election_timeout->callback = crm_timer_popped; election_timeout->repeat = FALSE; } else { was_error = TRUE; } if (transition_timer != NULL) { transition_timer->source_id = 0; transition_timer->period_ms = -1; transition_timer->fsa_input = I_PE_CALC; transition_timer->callback = crm_timer_popped; transition_timer->repeat = FALSE; } else { was_error = TRUE; } if (integration_timer != NULL) { integration_timer->source_id = 0; integration_timer->period_ms = -1; integration_timer->fsa_input = I_INTEGRATED; integration_timer->callback = crm_timer_popped; integration_timer->repeat = FALSE; } else { was_error = TRUE; } if (finalization_timer != NULL) { finalization_timer->source_id = 0; finalization_timer->period_ms = -1; finalization_timer->fsa_input = I_FINALIZED; finalization_timer->callback = crm_timer_popped; finalization_timer->repeat = FALSE; /* for possible enabling... a bug in the join protocol left * a slave in S_PENDING while we think its in S_NOT_DC * * raising I_FINALIZED put us into a transition loop which is * never resolved. * in this loop we continually send probes which the node * NACK's because its in S_PENDING * * if we have nodes where heartbeat is active but the * CRM is not... then this will be handled in the * integration phase */ finalization_timer->fsa_input = I_ELECTION; } else { was_error = TRUE; } if (shutdown_escalation_timer != NULL) { shutdown_escalation_timer->source_id = 0; shutdown_escalation_timer->period_ms = -1; shutdown_escalation_timer->fsa_input = I_STOP; shutdown_escalation_timer->callback = crm_timer_popped; shutdown_escalation_timer->repeat = FALSE; } else { was_error = TRUE; } if (wait_timer != NULL) { wait_timer->source_id = 0; wait_timer->period_ms = 2000; wait_timer->fsa_input = I_NULL; wait_timer->callback = crm_timer_popped; wait_timer->repeat = FALSE; } else { was_error = TRUE; } if (recheck_timer != NULL) { recheck_timer->source_id = 0; recheck_timer->period_ms = -1; recheck_timer->fsa_input = I_PE_CALC; recheck_timer->callback = crm_timer_popped; recheck_timer->repeat = FALSE; } else { was_error = TRUE; } /* set up the sub systems */ cib_subsystem = calloc(1, sizeof(struct crm_subsystem_s)); te_subsystem = calloc(1, sizeof(struct crm_subsystem_s)); pe_subsystem = calloc(1, sizeof(struct crm_subsystem_s)); if (cib_subsystem != NULL) { cib_subsystem->pid = -1; cib_subsystem->name = CRM_SYSTEM_CIB; cib_subsystem->flag_connected = R_CIB_CONNECTED; cib_subsystem->flag_required = R_CIB_REQUIRED; } else { was_error = TRUE; } if (te_subsystem != NULL) { te_subsystem->pid = -1; te_subsystem->name = CRM_SYSTEM_TENGINE; te_subsystem->flag_connected = R_TE_CONNECTED; te_subsystem->flag_required = R_TE_REQUIRED; } else { was_error = TRUE; } if (pe_subsystem != NULL) { pe_subsystem->pid = -1; pe_subsystem->path = CRM_DAEMON_DIR; pe_subsystem->name = CRM_SYSTEM_PENGINE; pe_subsystem->command = CRM_DAEMON_DIR "/" CRM_SYSTEM_PENGINE; pe_subsystem->args = NULL; pe_subsystem->flag_connected = R_PE_CONNECTED; pe_subsystem->flag_required = R_PE_REQUIRED; } else { was_error = TRUE; } if (was_error == FALSE && is_heartbeat_cluster()) { if(start_subsystem(pe_subsystem) == FALSE) { was_error = TRUE; } } if (was_error) { register_fsa_error(C_FSA_INTERNAL, I_ERROR, NULL); } welcomed_nodes = g_hash_table_new_full(crm_str_hash, g_str_equal, g_hash_destroy_str, g_hash_destroy_str); integrated_nodes = g_hash_table_new_full(crm_str_hash, g_str_equal, g_hash_destroy_str, g_hash_destroy_str); finalized_nodes = g_hash_table_new_full(crm_str_hash, g_str_equal, g_hash_destroy_str, g_hash_destroy_str); confirmed_nodes = g_hash_table_new_full(crm_str_hash, g_str_equal, g_hash_destroy_str, g_hash_destroy_str); } static int32_t crmd_ipc_accept(qb_ipcs_connection_t *c, uid_t uid, gid_t gid) { crm_trace("Connection %p", c); if(crm_client_new(c, uid, gid) == NULL) { return -EIO; } return 0; } static void crmd_ipc_created(qb_ipcs_connection_t *c) { crm_trace("Connection %p", c); } static int32_t crmd_ipc_dispatch(qb_ipcs_connection_t *c, void *data, size_t size) { uint32_t id = 0; uint32_t flags = 0; crm_client_t *client = crm_client_get(c); xmlNode *msg = crm_ipcs_recv(client, data, size, &id, &flags); crm_trace("Invoked: %s", crm_client_name(client)); if(flags & crm_ipc_client_response) { crm_ipcs_send_ack(client, id, "ack", __FUNCTION__, __LINE__); } if (msg == NULL) { return 0; } #if ENABLE_ACL determine_request_user(client->user, msg, F_CRM_USER); #endif crm_trace("Processing msg from %s", crm_client_name(client)); crm_log_xml_trace(msg, "CRMd[inbound]"); crm_xml_add(msg, F_CRM_SYS_FROM, client->id); if (crmd_authorize_message(msg, client)) { route_message(C_IPC_MESSAGE, msg); } trigger_fsa(fsa_source); free_xml(msg); return 0; } static int32_t crmd_ipc_closed(qb_ipcs_connection_t *c) { crm_client_t *client = crm_client_get(c); struct crm_subsystem_s *the_subsystem = NULL; crm_trace("Connection %p", c); if (client->userdata == NULL) { crm_trace("Client hadn't registered with us yet"); } else if (strcasecmp(CRM_SYSTEM_PENGINE, client->userdata) == 0) { the_subsystem = pe_subsystem; } else if (strcasecmp(CRM_SYSTEM_TENGINE, client->userdata) == 0) { the_subsystem = te_subsystem; } else if (strcasecmp(CRM_SYSTEM_CIB, client->userdata) == 0) { the_subsystem = cib_subsystem; } if (the_subsystem != NULL) { the_subsystem->source = NULL; the_subsystem->client = NULL; crm_info("Received HUP from %s:[%d]", the_subsystem->name, the_subsystem->pid); } else { /* else that was a transient client */ crm_trace("Received HUP from transient client"); } crm_trace("Disconnecting client %s (%p)", crm_client_name(client), client); free(client->userdata); crm_client_destroy(client); trigger_fsa(fsa_source); return 0; } static void crmd_ipc_destroy(qb_ipcs_connection_t *c) { crm_trace("Connection %p", c); } /* A_STOP */ void do_stop(long long action, enum crmd_fsa_cause cause, enum crmd_fsa_state cur_state, enum crmd_fsa_input current_input, fsa_data_t * msg_data) { if (is_heartbeat_cluster()) { stop_subsystem(pe_subsystem, FALSE); } mainloop_del_ipc_server(ipcs); register_fsa_input(C_FSA_INTERNAL, I_TERMINATE, NULL); } /* A_STARTED */ void do_started(long long action, enum crmd_fsa_cause cause, enum crmd_fsa_state cur_state, enum crmd_fsa_input current_input, fsa_data_t * msg_data) { static struct qb_ipcs_service_handlers crmd_callbacks = { .connection_accept = crmd_ipc_accept, .connection_created = crmd_ipc_created, .msg_process = crmd_ipc_dispatch, .connection_closed = crmd_ipc_closed, .connection_destroyed = crmd_ipc_destroy }; if (cur_state != S_STARTING) { crm_err("Start cancelled... %s", fsa_state2string(cur_state)); return; } else if (is_set(fsa_input_register, R_MEMBERSHIP) == FALSE) { crm_info("Delaying start, no membership data (%.16llx)", R_MEMBERSHIP); crmd_fsa_stall(TRUE); return; } else if (is_set(fsa_input_register, R_LRM_CONNECTED) == FALSE) { crm_info("Delaying start, LRM not connected (%.16llx)", R_LRM_CONNECTED); crmd_fsa_stall(TRUE); return; } else if (is_set(fsa_input_register, R_CIB_CONNECTED) == FALSE) { crm_info("Delaying start, CIB not connected (%.16llx)", R_CIB_CONNECTED); crmd_fsa_stall(TRUE); return; } else if (is_set(fsa_input_register, R_READ_CONFIG) == FALSE) { crm_info("Delaying start, Config not read (%.16llx)", R_READ_CONFIG); crmd_fsa_stall(TRUE); return; } else if (is_set(fsa_input_register, R_PEER_DATA) == FALSE) { /* try reading from HA */ crm_info("Delaying start, No peer data (%.16llx)", R_PEER_DATA); #if SUPPORT_HEARTBEAT if (is_heartbeat_cluster()) { HA_Message *msg = NULL; crm_trace("Looking for a HA message"); msg = fsa_cluster_conn->llc_ops->readmsg(fsa_cluster_conn, 0); if (msg != NULL) { crm_trace("There was a HA message"); ha_msg_del(msg); } } #endif crmd_fsa_stall(TRUE); return; } crm_debug("Init server comms"); ipcs = mainloop_add_ipc_server(CRM_SYSTEM_CRMD, QB_IPC_NATIVE, &crmd_callbacks); if (ipcs == NULL) { crm_err("Failed to create IPC server: shutting down and inhibiting respawn"); register_fsa_error(C_FSA_INTERNAL, I_ERROR, NULL); } if (stonith_reconnect == NULL) { int dummy; stonith_reconnect = mainloop_add_trigger(G_PRIORITY_LOW, te_connect_stonith, &dummy); } set_bit(fsa_input_register, R_ST_REQUIRED); mainloop_set_trigger(stonith_reconnect); crm_notice("The local CRM is operational"); clear_bit(fsa_input_register, R_STARTING); register_fsa_input(msg_data->fsa_cause, I_PENDING, NULL); } /* A_RECOVER */ void do_recover(long long action, enum crmd_fsa_cause cause, enum crmd_fsa_state cur_state, enum crmd_fsa_input current_input, fsa_data_t * msg_data) { set_bit(fsa_input_register, R_IN_RECOVERY); crm_err("Action %s (%.16llx) not supported", fsa_action2string(action), action); register_fsa_input(C_FSA_INTERNAL, I_TERMINATE, NULL); } /* *INDENT-OFF* */ pe_cluster_option crmd_opts[] = { /* name, old-name, validate, default, description */ { "dc-version", NULL, "string", NULL, "none", NULL, "Version of Pacemaker on the cluster's DC.", "Includes the hash which identifies the exact Mercurial changeset it was built from. Used for diagnostic purposes." }, { "cluster-infrastructure", NULL, "string", NULL, "heartbeat", NULL, "The messaging stack on which Pacemaker is currently running.", "Used for informational and diagnostic purposes." }, { XML_CONFIG_ATTR_DC_DEADTIME, "dc_deadtime", "time", NULL, "20s", &check_time, "How long to wait for a response from other nodes during startup.", "The \"correct\" value will depend on the speed/load of your network and the type of switches used." }, { XML_CONFIG_ATTR_RECHECK, "cluster_recheck_interval", "time", "Zero disables polling. Positive values are an interval in seconds (unless other SI units are specified. eg. 5min)", "15min", &check_timer, "Polling interval for time based changes to options, resource parameters and constraints.", "The Cluster is primarily event driven, however the configuration can have elements that change based on time." " To ensure these changes take effect, we can optionally poll the cluster's status for changes." }, { XML_CONFIG_ATTR_ELECTION_FAIL, "election_timeout", "time", NULL, "2min", &check_timer, "*** Advanced Use Only ***.", "If need to adjust this value, it probably indicates the presence of a bug." }, { XML_CONFIG_ATTR_FORCE_QUIT, "shutdown_escalation", "time", NULL, "20min", &check_timer, "*** Advanced Use Only ***.", "If need to adjust this value, it probably indicates the presence of a bug." }, { "crmd-integration-timeout", NULL, "time", NULL, "3min", &check_timer, "*** Advanced Use Only ***.", "If need to adjust this value, it probably indicates the presence of a bug." }, { "crmd-finalization-timeout", NULL, "time", NULL, "30min", &check_timer, "*** Advanced Use Only ***.", "If you need to adjust this value, it probably indicates the presence of a bug." }, { "crmd-transition-delay", NULL, "time", NULL, "0s", &check_timer, "*** Advanced Use Only ***\nEnabling this option will slow down cluster recovery under all conditions", "Delay cluster recovery for the configured interval to allow for additional/related events to occur.\nUseful if your configuration is sensitive to the order in which ping updates arrive." }, { XML_ATTR_EXPECTED_VOTES, NULL, "integer", NULL, "2", &check_number, "The number of nodes expected to be in the cluster", "Used to calculate quorum in openais based clusters." }, }; /* *INDENT-ON* */ void crmd_metadata(void) { config_metadata("CRM Daemon", "1.0", "CRM Daemon Options", "This is a fake resource that details the options that can be configured for the CRM Daemon.", crmd_opts, DIMOF(crmd_opts)); } static void verify_crmd_options(GHashTable * options) { verify_all_options(options, crmd_opts, DIMOF(crmd_opts)); } static const char * crmd_pref(GHashTable * options, const char *name) { return get_cluster_pref(options, crmd_opts, DIMOF(crmd_opts), name); } static void config_query_callback(xmlNode * msg, int call_id, int rc, xmlNode * output, void *user_data) { const char *value = NULL; GHashTable *config_hash = NULL; crm_time_t *now = crm_time_new(NULL); if (rc != pcmk_ok) { fsa_data_t *msg_data = NULL; crm_err("Local CIB query resulted in an error: %s", pcmk_strerror(rc)); register_fsa_error(C_FSA_INTERNAL, I_ERROR, NULL); if (rc == -EACCES || rc == -pcmk_err_dtd_validation) { crm_err("The cluster is mis-configured - shutting down and staying down"); set_bit(fsa_input_register, R_STAYDOWN); } goto bail; } crm_debug("Call %d : Parsing CIB options", call_id); config_hash = g_hash_table_new_full(crm_str_hash, g_str_equal, g_hash_destroy_str, g_hash_destroy_str); unpack_instance_attributes(output, output, XML_CIB_TAG_PROPSET, NULL, config_hash, CIB_OPTIONS_FIRST, FALSE, now); verify_crmd_options(config_hash); value = crmd_pref(config_hash, XML_CONFIG_ATTR_DC_DEADTIME); election_trigger->period_ms = crm_get_msec(value); value = crmd_pref(config_hash, XML_CONFIG_ATTR_FORCE_QUIT); shutdown_escalation_timer->period_ms = crm_get_msec(value); crm_debug("Shutdown escalation occurs after: %dms", shutdown_escalation_timer->period_ms); value = crmd_pref(config_hash, XML_CONFIG_ATTR_ELECTION_FAIL); election_timeout->period_ms = crm_get_msec(value); value = crmd_pref(config_hash, XML_CONFIG_ATTR_RECHECK); recheck_timer->period_ms = crm_get_msec(value); crm_debug("Checking for expired actions every %dms", recheck_timer->period_ms); value = crmd_pref(config_hash, "crmd-transition-delay"); transition_timer->period_ms = crm_get_msec(value); value = crmd_pref(config_hash, "crmd-integration-timeout"); integration_timer->period_ms = crm_get_msec(value); value = crmd_pref(config_hash, "crmd-finalization-timeout"); finalization_timer->period_ms = crm_get_msec(value); #if SUPPORT_COROSYNC if (is_classic_ais_cluster()) { value = crmd_pref(config_hash, XML_ATTR_EXPECTED_VOTES); crm_debug("Sending expected-votes=%s to corosync", value); send_ais_text(crm_class_quorum, value, TRUE, NULL, crm_msg_ais); } #endif set_bit(fsa_input_register, R_READ_CONFIG); crm_trace("Triggering FSA: %s", __FUNCTION__); mainloop_set_trigger(fsa_source); g_hash_table_destroy(config_hash); bail: crm_time_free(now); } gboolean crm_read_options(gpointer user_data) { int call_id = fsa_cib_conn->cmds->query(fsa_cib_conn, XML_CIB_TAG_CRMCONFIG, NULL, cib_scope_local); fsa_register_cib_callback(call_id, FALSE, NULL, config_query_callback); crm_trace("Querying the CIB... call %d", call_id); return TRUE; } /* A_READCONFIG */ void do_read_config(long long action, enum crmd_fsa_cause cause, enum crmd_fsa_state cur_state, enum crmd_fsa_input current_input, fsa_data_t * msg_data) { mainloop_set_trigger(config_read); } void crm_shutdown(int nsig) { if (crmd_mainloop != NULL && g_main_is_running(crmd_mainloop)) { if (is_set(fsa_input_register, R_SHUTDOWN)) { crm_err("Escalating the shutdown"); register_fsa_input_before(C_SHUTDOWN, I_ERROR, NULL); } else { set_bit(fsa_input_register, R_SHUTDOWN); register_fsa_input(C_SHUTDOWN, I_SHUTDOWN, NULL); if (shutdown_escalation_timer->period_ms < 1) { const char *value = crmd_pref(NULL, XML_CONFIG_ATTR_FORCE_QUIT); int msec = crm_get_msec(value); crm_debug("Using default shutdown escalation: %dms", msec); shutdown_escalation_timer->period_ms = msec; } /* cant rely on this... */ crm_notice("Requesting shutdown, upper limit is %dms", shutdown_escalation_timer->period_ms); crm_timer_start(shutdown_escalation_timer); } } else { crm_info("exit from shutdown"); crmd_exit(EX_OK); } } diff --git a/lib/common/ipc.c b/lib/common/ipc.c index d43365d70c..09088f0e7b 100644 --- a/lib/common/ipc.c +++ b/lib/common/ipc.c @@ -1,1081 +1,1082 @@ /* * Copyright (C) 2004 Andrew Beekhof * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #include #include #include #include #include #include +#include #include #include #include #include #include #include #include struct crm_ipc_request_header { struct qb_ipc_request_header qb; uint32_t flags; }; struct crm_ipc_response_header { struct qb_ipc_response_header qb; uint32_t size_uncompressed; uint32_t size_compressed; uint32_t flags; }; static int hdr_offset = 0; static int ipc_buffer_max = 0; static int pick_ipc_buffer(int max); static inline void crm_ipc_init(void) { if(hdr_offset == 0) { hdr_offset = sizeof(struct crm_ipc_response_header); } if(ipc_buffer_max == 0) { ipc_buffer_max = pick_ipc_buffer(0); } } static char * generateReference(const char *custom1, const char *custom2) { static uint ref_counter = 0; const char *local_cust1 = custom1; const char *local_cust2 = custom2; int reference_len = 4; char *since_epoch = NULL; reference_len += 20; /* too big */ reference_len += 40; /* too big */ if (local_cust1 == NULL) { local_cust1 = "_empty_"; } reference_len += strlen(local_cust1); if (local_cust2 == NULL) { local_cust2 = "_empty_"; } reference_len += strlen(local_cust2); since_epoch = calloc(1, reference_len); if (since_epoch != NULL) { sprintf(since_epoch, "%s-%s-%ld-%u", local_cust1, local_cust2, (unsigned long)time(NULL), ref_counter++); } return since_epoch; } xmlNode * create_request_adv(const char *task, xmlNode * msg_data, const char *host_to, const char *sys_to, const char *sys_from, const char *uuid_from, const char *origin) { char *true_from = NULL; xmlNode *request = NULL; char *reference = generateReference(task, sys_from); if (uuid_from != NULL) { true_from = generate_hash_key(sys_from, uuid_from); } else if (sys_from != NULL) { true_from = strdup(sys_from); } else { crm_err("No sys from specified"); } /* host_from will get set for us if necessary by CRMd when routed */ request = create_xml_node(NULL, __FUNCTION__); crm_xml_add(request, F_CRM_ORIGIN, origin); crm_xml_add(request, F_TYPE, T_CRM); crm_xml_add(request, F_CRM_VERSION, CRM_FEATURE_SET); crm_xml_add(request, F_CRM_MSG_TYPE, XML_ATTR_REQUEST); crm_xml_add(request, F_CRM_REFERENCE, reference); crm_xml_add(request, F_CRM_TASK, task); crm_xml_add(request, F_CRM_SYS_TO, sys_to); crm_xml_add(request, F_CRM_SYS_FROM, true_from); /* HOSTTO will be ignored if it is to the DC anyway. */ if (host_to != NULL && strlen(host_to) > 0) { crm_xml_add(request, F_CRM_HOST_TO, host_to); } if (msg_data != NULL) { add_message_xml(request, F_CRM_DATA, msg_data); } free(reference); free(true_from); return request; } /* * This method adds a copy of xml_response_data */ xmlNode * create_reply_adv(xmlNode * original_request, xmlNode * xml_response_data, const char *origin) { xmlNode *reply = NULL; const char *host_from = crm_element_value(original_request, F_CRM_HOST_FROM); const char *sys_from = crm_element_value(original_request, F_CRM_SYS_FROM); const char *sys_to = crm_element_value(original_request, F_CRM_SYS_TO); const char *type = crm_element_value(original_request, F_CRM_MSG_TYPE); const char *operation = crm_element_value(original_request, F_CRM_TASK); const char *crm_msg_reference = crm_element_value(original_request, F_CRM_REFERENCE); if (type == NULL) { crm_err("Cannot create new_message," " no message type in original message"); CRM_ASSERT(type != NULL); return NULL; #if 0 } else if (strcasecmp(XML_ATTR_REQUEST, type) != 0) { crm_err("Cannot create new_message," " original message was not a request"); return NULL; #endif } reply = create_xml_node(NULL, __FUNCTION__); crm_xml_add(reply, F_CRM_ORIGIN, origin); crm_xml_add(reply, F_TYPE, T_CRM); crm_xml_add(reply, F_CRM_VERSION, CRM_FEATURE_SET); crm_xml_add(reply, F_CRM_MSG_TYPE, XML_ATTR_RESPONSE); crm_xml_add(reply, F_CRM_REFERENCE, crm_msg_reference); crm_xml_add(reply, F_CRM_TASK, operation); /* since this is a reply, we reverse the from and to */ crm_xml_add(reply, F_CRM_SYS_TO, sys_from); crm_xml_add(reply, F_CRM_SYS_FROM, sys_to); /* HOSTTO will be ignored if it is to the DC anyway. */ if (host_from != NULL && strlen(host_from) > 0) { crm_xml_add(reply, F_CRM_HOST_TO, host_from); } if (xml_response_data != NULL) { add_message_xml(reply, F_CRM_DATA, xml_response_data); } return reply; } /* Libqb based IPC */ /* Server... */ GHashTable *client_connections = NULL; crm_client_t * crm_client_get(qb_ipcs_connection_t *c) { if(client_connections) { return g_hash_table_lookup(client_connections, c); } crm_trace("No client found for %p", c); return NULL; } crm_client_t * crm_client_get_by_id(const char *id) { gpointer key; crm_client_t *client; GHashTableIter iter; if(client_connections) { g_hash_table_iter_init(&iter, client_connections); while (g_hash_table_iter_next(&iter, &key, (gpointer *) & client)) { if(strcmp(client->id, id) == 0) { return client; } } } crm_trace("No client found with id=%s", id); return NULL; } const char * crm_client_name(crm_client_t *c) { if(c == NULL){ return "null"; } else if(c->name == NULL && c->id == NULL) { return "unknown"; } else if(c->name == NULL) { return c->id; } else { return c->name; } } void crm_client_init(void) { if(client_connections == NULL) { crm_trace("Creating client hash table"); client_connections = g_hash_table_new(g_direct_hash, g_direct_equal); } } void crm_client_cleanup(void) { if(client_connections == NULL) { int active = g_hash_table_size(client_connections); if(active) { crm_err("Exiting with %d active connections", active); } g_hash_table_destroy(client_connections); } } crm_client_t * crm_client_new(qb_ipcs_connection_t *c, uid_t uid, gid_t gid) { crm_client_t *client = NULL; CRM_LOG_ASSERT(c); if(c == NULL) { return NULL; } crm_client_init(); client = calloc(1, sizeof(crm_client_t)); client->ipcs = c; client->kind = CRM_CLIENT_IPC; client->pid = crm_ipcs_client_pid(c); client->id = crm_generate_uuid(); crm_info("Connecting %p for uid=%d gid=%d pid=%u id=%s", c, uid, gid, client->pid, client->id); #if ENABLE_ACL { struct group *crm_grp = NULL; crm_grp = getgrnam(CRM_DAEMON_GROUP); if (crm_grp) { qb_ipcs_connection_auth_set(c, -1, crm_grp->gr_gid, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP); } client->user = uid2username(uid); } #endif g_hash_table_insert(client_connections, c, client); return client; } void crm_client_destroy(crm_client_t *c) { if(c == NULL) { return; } if(client_connections) { if(c->ipcs) { crm_trace("Destroying %p/%p (%d remaining)", c, c->ipcs, crm_hash_table_size(client_connections) - 1); g_hash_table_remove(client_connections, c->ipcs); } else { crm_trace("Destroying remote connection %p (%d remaining)", c, crm_hash_table_size(client_connections) - 1); g_hash_table_remove(client_connections, c->id); } } if(c->event_timer) { g_source_remove(c->event_timer); } crm_info("Destroying %d events", g_list_length(c->event_queue)); while(c->event_queue) { struct iovec *event = c->event_queue->data; c->event_queue = g_list_remove(c->event_queue, event); free(event[0].iov_base); free(event[1].iov_base); free(event); } free(c->id); free(c->name); free(c->user); if(c->remote) { if (c->remote->auth_timeout) { g_source_remove(c->remote->auth_timeout); } free(c->remote->buffer); free(c->remote); } free(c); } int crm_ipcs_client_pid(qb_ipcs_connection_t *c) { struct qb_ipcs_connection_stats stats; stats.client_pid = 0; qb_ipcs_connection_stats_get(c, &stats, 0); return stats.client_pid; } xmlNode * crm_ipcs_recv(crm_client_t *c, void *data, size_t size, uint32_t *id, uint32_t *flags) { char *text = ((char*)data) + sizeof(struct crm_ipc_request_header); crm_trace("Received %.200s", text); if(id) { *id = ((struct qb_ipc_request_header*)data)->id; } if(flags) { *flags = ((struct crm_ipc_request_header*)data)->flags; } return string2xml(text); } ssize_t crm_ipcs_flush_events(crm_client_t *c); static gboolean crm_ipcs_flush_events_cb(gpointer data) { crm_client_t *c = data; c->event_timer = 0; crm_ipcs_flush_events(c); return FALSE; } ssize_t crm_ipcs_flush_events(crm_client_t *c) { int sent = 0; ssize_t rc = 0; int queue_len = 0; if(c == NULL) { return pcmk_ok; } else if(c->event_timer) { /* There is already a timer, wait until it goes off */ crm_trace("Timer active for %p - %d", c->ipcs, c->event_timer); return pcmk_ok; } queue_len = g_list_length(c->event_queue); while(c->event_queue && sent < 100) { struct crm_ipc_response_header *header = NULL; struct iovec *event = c->event_queue->data; rc = qb_ipcs_event_sendv(c->ipcs, event, 2); if(rc < 0) { break; } sent++; header = event[0].iov_base; if(header->flags & crm_ipc_compressed) { crm_trace("Event %d to %p[%d] (%d compressed bytes) sent", header->qb.id, c->ipcs, c->pid, rc); } else { crm_trace("Event %d to %p[%d] (%d bytes) sent: %.120s", header->qb.id, c->ipcs, c->pid, rc, event[1].iov_base); } c->event_queue = g_list_remove(c->event_queue, event); free(event[0].iov_base); free(event[1].iov_base); free(event); } queue_len -= sent; if(sent > 0 || c->event_queue) { crm_trace("Sent %d events (%d remaining) for %p[%d]: %s", sent, queue_len, c->ipcs, c->pid, pcmk_strerror(rc<0?rc:0)); } if(c->event_queue) { if(queue_len % 100 == 0 && queue_len > 99) { crm_warn("Event queue for %p[%d] has grown to %d", c->ipcs, c->pid, queue_len); } else if(queue_len > 500) { crm_err("Evicting slow client %p[%d]: event queue reached %d entries", c->ipcs, c->pid, queue_len); qb_ipcs_disconnect(c->ipcs); return rc; } c->event_timer = g_timeout_add(1000 + 100 * queue_len, crm_ipcs_flush_events_cb, c); } return rc; } ssize_t crm_ipcs_prepare(uint32_t request, xmlNode *message, struct iovec **result) { static int biggest = 0; struct iovec *iov; unsigned int total = 0; char *compressed = NULL; char *buffer = dump_xml_unformatted(message); struct crm_ipc_response_header *header = calloc(1, sizeof(struct crm_ipc_response_header)); CRM_ASSERT(result != NULL); iov = calloc(2, sizeof(struct iovec)); crm_ipc_init(); iov[0].iov_len = hdr_offset; iov[0].iov_base = header; header->size_uncompressed = 1 + strlen(buffer); total = hdr_offset + header->size_uncompressed; if(total < ipc_buffer_max) { iov[1].iov_base = buffer; iov[1].iov_len = header->size_uncompressed; } else { unsigned int new_size = 0; if(total > biggest) { biggest = 2*QB_MAX(total, biggest); crm_notice("Message exceeds the configured ipc limit (%d bytes), " "consider configuring PCMK_ipc_buffer to %d or higher " "to avoid compression overheads", ipc_buffer_max, biggest); } if(crm_compress_string( buffer, header->size_uncompressed, ipc_buffer_max, &compressed, &new_size)) { header->flags |= crm_ipc_compressed; header->size_compressed = new_size; iov[1].iov_len = header->size_compressed; iov[1].iov_base = compressed; free(buffer); } else { ssize_t rc = -EMSGSIZE; crm_log_xml_trace(message, "EMSGSIZE"); crm_err("Could not compress the message into less than the configured ipc limit (%d bytes)." "Set PCMK_ipc_buffer to a higher value (%d bytes suggested)", ipc_buffer_max, biggest); free(compressed); free(buffer); free(header); free(iov); return rc; } } header->qb.size = iov[0].iov_len + iov[1].iov_len; header->qb.id = request; /* Replying to a specific request */ *result = iov; return header->qb.size; } ssize_t crm_ipcs_sendv(crm_client_t *c, struct iovec *iov, enum crm_ipc_server_flags flags) { ssize_t rc; static uint32_t id = 1; struct crm_ipc_response_header *header = iov[0].iov_base; if(flags & crm_ipc_server_event) { header->qb.id = id++; /* We don't really use it, but doesn't hurt to set one */ if(flags & crm_ipc_server_free) { crm_trace("Sending the original to %p[%d]", c->ipcs, c->pid); c->event_queue = g_list_append(c->event_queue, iov); } else { struct iovec *iov_copy = calloc(2, sizeof(struct iovec)); crm_trace("Sending a copy to %p[%d]", c->ipcs, c->pid); iov_copy[0].iov_len = iov[0].iov_len; iov_copy[0].iov_base = malloc(iov[0].iov_len); memcpy(iov_copy[0].iov_base,iov[0].iov_base, iov[0].iov_len); iov_copy[1].iov_len = iov[1].iov_len; iov_copy[1].iov_base = malloc(iov[1].iov_len); memcpy(iov_copy[1].iov_base,iov[1].iov_base, iov[1].iov_len); c->event_queue = g_list_append(c->event_queue, iov_copy); } } else { CRM_LOG_ASSERT (header->qb.id != 0); /* Replying to a specific request */ rc = qb_ipcs_response_sendv(c->ipcs, iov, 2); if(rc < header->qb.size) { crm_notice("Response %d to %p[%d] (%d bytes) failed: %s (%d)", header->qb.id, c->ipcs, c->pid, header->qb.size, pcmk_strerror(rc), rc); } else { crm_trace("Response %d sent, %d bytes to %p[%d]", header->qb.id, rc, c->ipcs, c->pid); } if(header->flags & crm_ipc_server_free) { free(iov[0].iov_base); free(iov[1].iov_base); free(iov); } } if(flags & crm_ipc_server_event) { rc = crm_ipcs_flush_events(c); } else { crm_ipcs_flush_events(c); } if(rc == -EPIPE || rc == -ENOTCONN) { crm_trace("Client %p disconnected", c->ipcs); } return rc; } ssize_t crm_ipcs_send(crm_client_t *c, uint32_t request, xmlNode *message, enum crm_ipc_server_flags flags) { struct iovec *iov = NULL; ssize_t rc = crm_ipcs_prepare(request, message, &iov); if(rc > 0) { rc = crm_ipcs_sendv(c, iov, flags|crm_ipc_server_free); } else { crm_notice("Message to %p[%d] failed: %s (%d)", c->ipcs, c->pid, pcmk_strerror(rc), rc); } return rc; } void crm_ipcs_send_ack( crm_client_t *c, uint32_t request, const char *tag, const char *function, int line) { xmlNode *ack = create_xml_node(NULL, tag); crm_xml_add(ack, "function", function); crm_xml_add_int(ack, "line", line); crm_ipcs_send(c, request, ack, 0); free_xml(ack); } /* Client... */ #define MIN_MSG_SIZE 12336 /* sizeof(struct qb_ipc_connection_response) */ #define MAX_MSG_SIZE 20*1024 /* 20k default */ struct crm_ipc_s { struct pollfd pfd; int buf_size; int msg_size; int need_reply; char *buffer; char *name; qb_ipcc_connection_t *ipc; }; static int pick_ipc_buffer(int max) { const char *env = getenv("PCMK_ipc_buffer"); if(env) { max = crm_parse_int(env, "0"); } if(max <= 0) { max = MAX_MSG_SIZE; } if(max < MIN_MSG_SIZE) { max = MIN_MSG_SIZE; } crm_trace("Using max message size of %d", max); return max; } crm_ipc_t * crm_ipc_new(const char *name, size_t max_size) { crm_ipc_t *client = NULL; client = calloc(1, sizeof(crm_ipc_t)); client->name = strdup(name); client->buf_size = pick_ipc_buffer(max_size); client->buffer = malloc(client->buf_size); client->pfd.fd = -1; client->pfd.events = POLLIN; client->pfd.revents = 0; return client; } bool crm_ipc_connect(crm_ipc_t *client) { client->need_reply = FALSE; client->ipc = qb_ipcc_connect(client->name, client->buf_size); if (client->ipc == NULL) { crm_perror(LOG_INFO, "Could not establish %s connection", client->name); return FALSE; } client->pfd.fd = crm_ipc_get_fd(client); if(client->pfd.fd < 0) { crm_perror(LOG_INFO, "Could not obtain file descriptor for %s connection", client->name); return FALSE; } qb_ipcc_context_set(client->ipc, client); return TRUE; } void crm_ipc_close(crm_ipc_t *client) { if(client) { crm_trace("Disconnecting %s IPC connection %p (%p.%p)", client->name, client, client->ipc); if(client->ipc) { qb_ipcc_connection_t *ipc = client->ipc; client->ipc = NULL; qb_ipcc_disconnect(ipc); } } } void crm_ipc_destroy(crm_ipc_t *client) { if(client) { if(client->ipc && qb_ipcc_is_connected(client->ipc)) { crm_notice("Destroying an active IPC connection to %s", client->name); /* The next line is basically unsafe * * If this connection was attached to mainloop and mainloop is active, * the 'disconnected' callback will end up back here and we'll end * up free'ing the memory twice - something that can still happen * even without this if we destroy a connection and it closes before * we call exit */ /* crm_ipc_close(client); */ } crm_trace("Destroying IPC connection to %s: %p", client->name, client); free(client->buffer); free(client->name); free(client); } } int crm_ipc_get_fd(crm_ipc_t *client) { int fd = 0; CRM_ASSERT(client != NULL); if(client->ipc && qb_ipcc_fd_get(client->ipc, &fd) == 0) { return fd; } crm_perror(LOG_ERR, "Could not obtain file IPC descriptor for %s", client->name); return -EINVAL; } bool crm_ipc_connected(crm_ipc_t *client) { bool rc = FALSE; if(client == NULL) { crm_trace("No client"); return FALSE; } else if(client->ipc == NULL) { crm_trace("No connection"); return FALSE; } else if(client->pfd.fd < 0) { crm_trace("Bad descriptor"); return FALSE; } rc = qb_ipcc_is_connected(client->ipc); if(rc == FALSE) { client->pfd.fd = -EINVAL; } return rc; } int crm_ipc_ready(crm_ipc_t *client) { CRM_ASSERT(client != NULL); if(crm_ipc_connected(client) == FALSE) { return -ENOTCONN; } client->pfd.revents = 0; return poll(&(client->pfd), 1, 0); } static int crm_ipc_decompress(crm_ipc_t *client) { struct crm_ipc_response_header * header = (struct crm_ipc_response_header *)client->buffer; if(header->flags & crm_ipc_compressed) { int rc = 0; unsigned int size_u = 1 + header->size_uncompressed; char *uncompressed = calloc(1, hdr_offset + size_u); crm_info("Decompressing message data %d bytes into %d bytes", header->size_compressed, size_u); rc = BZ2_bzBuffToBuffDecompress( uncompressed + hdr_offset, &size_u, client->buffer + hdr_offset, header->size_compressed, 1, 0); if(rc != BZ_OK) { crm_err("Decompression failed: %s (%d)", bz2_strerror(rc), rc); free(uncompressed); return -EREMOTEIO; } CRM_ASSERT(header->size_uncompressed > ipc_buffer_max); CRM_ASSERT(size_u == header->size_uncompressed); memcpy(uncompressed, client->buffer, hdr_offset); /* Preserve the header */ header = (struct crm_ipc_response_header *)uncompressed; free(client->buffer); client->buf_size = hdr_offset + size_u; client->buffer = uncompressed; } CRM_ASSERT(client->buffer[hdr_offset + header->size_uncompressed - 1] == 0); return pcmk_ok; } long crm_ipc_read(crm_ipc_t *client) { struct crm_ipc_response_header *header = NULL; CRM_ASSERT(client != NULL); CRM_ASSERT(client->ipc != NULL); CRM_ASSERT(client->buffer != NULL); crm_ipc_init(); client->buffer[0] = 0; client->msg_size = qb_ipcc_event_recv(client->ipc, client->buffer, client->buf_size-1, 0); if(client->msg_size >= 0) { int rc = crm_ipc_decompress(client); if(rc != pcmk_ok) { return rc; } header = (struct crm_ipc_response_header *)client->buffer; crm_trace("Recieved %s event %d, size=%d, rc=%d, text: %.100s", client->name, header->qb.id, header->qb.size, client->msg_size, client->buffer+hdr_offset); } else { crm_trace("No message from %s recieved: %s", client->name, pcmk_strerror(client->msg_size)); } if(crm_ipc_connected(client) == FALSE || client->msg_size == -ENOTCONN) { crm_err("Connection to %s failed", client->name); } if(header) { /* Data excluding the header */ return header->size_uncompressed; } return -ENOMSG; } const char * crm_ipc_buffer(crm_ipc_t *client) { CRM_ASSERT(client != NULL); return client->buffer + sizeof(struct crm_ipc_response_header); } const char *crm_ipc_name(crm_ipc_t *client) { CRM_ASSERT(client != NULL); return client->name; } static int internal_ipc_send_recv(crm_ipc_t *client, const void *iov) { int rc = 0; do { rc = qb_ipcc_sendv_recv(client->ipc, iov, 2, client->buffer, client->buf_size, -1); } while(rc == -EAGAIN && crm_ipc_connected(client)); return rc; } static int internal_ipc_send_request(crm_ipc_t *client, const void *iov, int ms_timeout) { int rc = 0; time_t timeout = time(NULL) + 1 + (ms_timeout / 1000); do { rc = qb_ipcc_sendv(client->ipc, iov, 2); } while(rc == -EAGAIN && time(NULL) < timeout && crm_ipc_connected(client)); return rc; } static int internal_ipc_get_reply(crm_ipc_t *client, int request_id, int ms_timeout) { time_t timeout = time(NULL) + 1 + (ms_timeout / 1000); int rc = 0; crm_ipc_init(); /* get the reply */ crm_trace("client %s waiting on reply to msg id %d", client->name, request_id); do { rc = qb_ipcc_recv(client->ipc, client->buffer, client->buf_size, 1000); if(rc > 0) { struct crm_ipc_response_header *hdr = NULL; int rc = crm_ipc_decompress(client); if(rc != pcmk_ok) { return rc; } hdr = (struct crm_ipc_response_header *)client->buffer; if(hdr->qb.id == request_id) { /* Got it */ break; } else if(hdr->qb.id < request_id){ xmlNode *bad = string2xml(crm_ipc_buffer(client)); crm_err("Discarding old reply %d (need %d)", hdr->qb.id, request_id); crm_log_xml_notice(bad, "OldIpcReply"); } else { xmlNode *bad = string2xml(crm_ipc_buffer(client)); crm_err("Discarding newer reply %d (need %d)", hdr->qb.id, request_id); crm_log_xml_notice(bad, "ImpossibleReply"); CRM_ASSERT(hdr->qb.id <= request_id); } } else if (crm_ipc_connected(client) == FALSE) { crm_err("Server disconnected client %s while waiting for msg id %d", client->name, request_id); break; } } while(time(NULL) < timeout); return rc; } int crm_ipc_send(crm_ipc_t *client, xmlNode *message, enum crm_ipc_flags flags, int32_t ms_timeout, xmlNode **reply) { long rc = 0; struct iovec iov[2]; static uint32_t id = 0; struct crm_ipc_request_header header; char *buffer = NULL; crm_ipc_init(); if(client == NULL) { crm_notice("Invalid connection"); return -ENOTCONN; } else if(crm_ipc_connected(client) == FALSE) { /* Don't even bother */ crm_notice("Connection to %s closed", client->name); return -ENOTCONN; } if(client->need_reply) { crm_trace("Trying again to obtain pending reply from %s", client->name); rc = qb_ipcc_recv(client->ipc, client->buffer, client->buf_size, 300); if(rc < 0) { crm_warn("Sending to %s (%p) is disabled until pending reply is recieved", client->name, client->ipc); free(buffer); return -EREMOTEIO; } else { crm_notice("Lost reply from %s (%p) finally arrived, sending re-enabled", client->name, client->ipc); client->need_reply = FALSE; } } buffer = dump_xml_unformatted(message); iov[0].iov_len = sizeof(struct crm_ipc_request_header); iov[0].iov_base = &header; iov[1].iov_len = 1 + strlen(buffer); iov[1].iov_base = buffer; header.qb.id = ++id; header.qb.size = iov[0].iov_len + iov[1].iov_len; header.flags = flags; if(ms_timeout == 0) { ms_timeout = 5000; } crm_trace("Sending from client: %s request id: %d bytes: %u timeout:%d msg: %.200s...", client->name, header.qb.id, header.qb.size, ms_timeout, buffer); if(ms_timeout > 0) { rc = internal_ipc_send_request(client, iov, ms_timeout); if (rc <= 0) { crm_trace("Failed to send from client %s request %d with %u bytes: %.200s...", client->name, header.qb.id, header.qb.size, buffer); goto send_cleanup; } else if(is_not_set(flags, crm_ipc_client_response)) { crm_trace("Message sent, not waiting for reply to %d from %s to %u bytes: %.200s...", header.qb.id, client->name, header.qb.size, buffer); goto send_cleanup; } rc = internal_ipc_get_reply(client, header.qb.id, ms_timeout); if(rc < 0) { /* No reply, for now, disable sending * * The alternative is to close the connection since we don't know * how to detect and discard out-of-sequence replies * * TODO - implement the above */ client->need_reply = TRUE; } } else { rc = internal_ipc_send_recv(client, iov); } if(rc > 0) { struct crm_ipc_response_header *hdr = (struct crm_ipc_response_header *)client->buffer; crm_trace("Recieved response %d, size=%d, rc=%ld, text: %.200s", hdr->qb.id, hdr->qb.size, rc, crm_ipc_buffer(client)); if(reply) { *reply = string2xml(crm_ipc_buffer(client)); } } else { crm_trace("Response not recieved: rc=%ld, errno=%d", rc, errno); } send_cleanup: if(crm_ipc_connected(client) == FALSE) { crm_notice("Connection to %s closed: %s (%ld)", client->name, pcmk_strerror(rc), rc); } else if(rc == -ETIMEDOUT) { crm_warn("Request %d to %s (%p) failed: %s (%ld) after %dms", header.qb.id, client->name, client->ipc, pcmk_strerror(rc), rc, ms_timeout); crm_info("Request was %.120s", buffer); crm_write_blackbox(0, NULL); } else if(rc <= 0) { crm_warn("Request %d to %s (%p) failed: %s (%ld)", header.qb.id, client->name, client->ipc, pcmk_strerror(rc), rc); crm_info("Request was %.120s", buffer); } free(buffer); return rc; } /* Utils */ xmlNode * create_hello_message(const char *uuid, const char *client_name, const char *major_version, const char *minor_version) { xmlNode *hello_node = NULL; xmlNode *hello = NULL; if (uuid == NULL || strlen(uuid) == 0 || client_name == NULL || strlen(client_name) == 0 || major_version == NULL || strlen(major_version) == 0 || minor_version == NULL || strlen(minor_version) == 0) { crm_err("Missing fields, Hello message will not be valid."); return NULL; } hello_node = create_xml_node(NULL, XML_TAG_OPTIONS); crm_xml_add(hello_node, "major_version", major_version); crm_xml_add(hello_node, "minor_version", minor_version); crm_xml_add(hello_node, "client_name", client_name); crm_xml_add(hello_node, "client_uuid", uuid); crm_trace("creating hello message"); hello = create_request(CRM_OP_HELLO, hello_node, NULL, NULL, client_name, uuid); free_xml(hello_node); return hello; } diff --git a/tools/attrd.c b/tools/attrd.c index 7fb70aaf95..1338547e64 100644 --- a/tools/attrd.c +++ b/tools/attrd.c @@ -1,886 +1,885 @@ /* * Copyright (C) 2004 Andrew Beekhof * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #include #include #include #include #include #include #include #include #include -#include #include #include #include #include #include #include #include #include #include #define OPTARGS "hV" #if SUPPORT_HEARTBEAT ll_cluster_t *attrd_cluster_conn; #endif GMainLoop *mainloop = NULL; char *attrd_uname = NULL; char *attrd_uuid = NULL; gboolean need_shutdown = FALSE; GHashTable *attr_hash = NULL; cib_t *cib_conn = NULL; typedef struct attr_hash_entry_s { char *uuid; char *id; char *set; char *section; char *value; char *stored_value; int timeout; char *dampen; guint timer_id; char *user; } attr_hash_entry_t; void attrd_local_callback(xmlNode * msg); gboolean attrd_timer_callback(void *user_data); gboolean attrd_trigger_update(attr_hash_entry_t * hash_entry); void attrd_perform_update(attr_hash_entry_t * hash_entry); static void free_hash_entry(gpointer data) { attr_hash_entry_t *entry = data; if (entry == NULL) { return; } free(entry->id); free(entry->set); free(entry->dampen); free(entry->section); free(entry->uuid); free(entry->value); free(entry->stored_value); free(entry->user); free(entry); } static int32_t attrd_ipc_accept(qb_ipcs_connection_t *c, uid_t uid, gid_t gid) { crm_trace("Connection %p", c); if (need_shutdown) { crm_info("Ignoring new client [%d] during shutdown", crm_ipcs_client_pid(c)); return -EPERM; } if(crm_client_new(c, uid, gid) == NULL) { return -EIO; } return 0; } static void attrd_ipc_created(qb_ipcs_connection_t *c) { crm_trace("Connection %p", c); } /* Exit code means? */ static int32_t attrd_ipc_dispatch(qb_ipcs_connection_t *c, void *data, size_t size) { uint32_t id = 0; uint32_t flags = 0; crm_client_t *client = crm_client_get(c); xmlNode *msg = crm_ipcs_recv(client, data, size, &id, &flags); if(flags & crm_ipc_client_response) { crm_trace("Ack'ing msg from %d (%p)", crm_ipcs_client_pid(c), c); crm_ipcs_send_ack(client, id, "ack", __FUNCTION__, __LINE__); } if (msg == NULL) { crm_debug("No msg from %d (%p)", crm_ipcs_client_pid(c), c); return 0; } #if ENABLE_ACL determine_request_user(client->user, msg, F_ATTRD_USER); #endif crm_trace("Processing msg from %d (%p)", crm_ipcs_client_pid(c), c); crm_log_xml_trace(msg, __PRETTY_FUNCTION__); attrd_local_callback(msg); free_xml(msg); return 0; } /* Error code means? */ static int32_t attrd_ipc_closed(qb_ipcs_connection_t *c) { crm_client_t *client = crm_client_get(c); crm_trace("Connection %p", c); crm_client_destroy(client); return 0; } static void attrd_ipc_destroy(qb_ipcs_connection_t *c) { crm_trace("Connection %p", c); } struct qb_ipcs_service_handlers ipc_callbacks = { .connection_accept = attrd_ipc_accept, .connection_created = attrd_ipc_created, .msg_process = attrd_ipc_dispatch, .connection_closed = attrd_ipc_closed, .connection_destroyed = attrd_ipc_destroy }; static void attrd_shutdown(int nsig) { need_shutdown = TRUE; crm_info("Exiting"); if (mainloop != NULL && g_main_is_running(mainloop)) { g_main_quit(mainloop); } else { crm_exit(0); } } static void usage(const char *cmd, int exit_status) { FILE *stream; stream = exit_status ? stderr : stdout; fprintf(stream, "usage: %s [-srkh] [-c configure file]\n", cmd); /* fprintf(stream, "\t-d\tsets debug level\n"); */ /* fprintf(stream, "\t-s\tgets daemon status\n"); */ /* fprintf(stream, "\t-r\trestarts daemon\n"); */ /* fprintf(stream, "\t-k\tstops daemon\n"); */ /* fprintf(stream, "\t-h\thelp message\n"); */ fflush(stream); crm_exit(exit_status); } static void stop_attrd_timer(attr_hash_entry_t * hash_entry) { if (hash_entry != NULL && hash_entry->timer_id != 0) { crm_trace("Stopping %s timer", hash_entry->id); g_source_remove(hash_entry->timer_id); hash_entry->timer_id = 0; } } static void log_hash_entry(int level, attr_hash_entry_t * entry, const char *text) { do_crm_log(level, "%s: Set: %s, Name: %s, Value: %s, Timeout: %s", text, entry->section, entry->id, entry->value, entry->dampen); } static attr_hash_entry_t * find_hash_entry(xmlNode * msg) { const char *value = NULL; const char *attr = crm_element_value(msg, F_ATTRD_ATTRIBUTE); attr_hash_entry_t *hash_entry = NULL; if (attr == NULL) { crm_info("Ignoring message with no attribute name"); return NULL; } hash_entry = g_hash_table_lookup(attr_hash, attr); if (hash_entry == NULL) { /* create one and add it */ crm_info("Creating hash entry for %s", attr); hash_entry = calloc(1, sizeof(attr_hash_entry_t)); hash_entry->id = strdup(attr); g_hash_table_insert(attr_hash, hash_entry->id, hash_entry); hash_entry = g_hash_table_lookup(attr_hash, attr); CRM_CHECK(hash_entry != NULL, return NULL); } value = crm_element_value(msg, F_ATTRD_SET); if (value != NULL) { free(hash_entry->set); hash_entry->set = strdup(value); crm_debug("\t%s->set: %s", attr, value); } value = crm_element_value(msg, F_ATTRD_SECTION); if (value == NULL) { value = XML_CIB_TAG_STATUS; } free(hash_entry->section); hash_entry->section = strdup(value); crm_trace("\t%s->section: %s", attr, value); value = crm_element_value(msg, F_ATTRD_DAMPEN); if (value != NULL) { free(hash_entry->dampen); hash_entry->dampen = strdup(value); hash_entry->timeout = crm_get_msec(value); crm_trace("\t%s->timeout: %s", attr, value); } #if ENABLE_ACL free(hash_entry->user); value = crm_element_value(msg, F_ATTRD_USER); if (value != NULL) { hash_entry->user = strdup(value); crm_trace("\t%s->user: %s", attr, value); } #endif log_hash_entry(LOG_DEBUG_2, hash_entry, "Found (and updated) entry:"); return hash_entry; } #if SUPPORT_HEARTBEAT static void attrd_ha_connection_destroy(gpointer user_data) { crm_trace("Invoked"); if (need_shutdown) { /* we signed out, so this is expected */ crm_info("Heartbeat disconnection complete"); return; } crm_crit("Lost connection to heartbeat service!"); if (mainloop != NULL && g_main_is_running(mainloop)) { g_main_quit(mainloop); return; } crm_exit(EX_OK); } static void attrd_ha_callback(HA_Message * msg, void *private_data) { attr_hash_entry_t *hash_entry = NULL; xmlNode *xml = convert_ha_message(NULL, msg, __FUNCTION__); const char *from = crm_element_value(xml, F_ORIG); const char *op = crm_element_value(xml, F_ATTRD_TASK); const char *host = crm_element_value(xml, F_ATTRD_HOST); const char *ignore = crm_element_value(xml, F_ATTRD_IGNORE_LOCALLY); if (host != NULL && safe_str_eq(host, attrd_uname)) { crm_info("Update relayed from %s", from); attrd_local_callback(xml); } else if (ignore == NULL || safe_str_neq(from, attrd_uname)) { crm_info("%s message from %s", op, from); hash_entry = find_hash_entry(xml); stop_attrd_timer(hash_entry); attrd_perform_update(hash_entry); } free_xml(xml); } #endif #if SUPPORT_COROSYNC static gboolean attrd_ais_dispatch(int kind, const char *from, const char *data) { xmlNode *xml = NULL; if (kind == crm_class_cluster) { xml = string2xml(data); if (xml == NULL) { crm_err("Bad message received: '%.120s'", data); } } if (xml != NULL) { attr_hash_entry_t *hash_entry = NULL; const char *op = crm_element_value(xml, F_ATTRD_TASK); const char *host = crm_element_value(xml, F_ATTRD_HOST); const char *ignore = crm_element_value(xml, F_ATTRD_IGNORE_LOCALLY); /* crm_xml_add_int(xml, F_SEQ, wrapper->id); */ crm_xml_add(xml, F_ORIG, from); if (host != NULL && safe_str_eq(host, attrd_uname)) { crm_notice("Update relayed from %s", from); attrd_local_callback(xml); } else if (ignore == NULL || safe_str_neq(from, attrd_uname)) { crm_trace("%s message from %s", op, from); hash_entry = find_hash_entry(xml); stop_attrd_timer(hash_entry); attrd_perform_update(hash_entry); } free_xml(xml); } return TRUE; } static void attrd_ais_destroy(gpointer unused) { if (need_shutdown) { /* we signed out, so this is expected */ crm_info("Corosync disconnection complete"); return; } crm_crit("Lost connection to Corosync service!"); if (mainloop != NULL && g_main_is_running(mainloop)) { g_main_quit(mainloop); return; } crm_exit(EX_USAGE); } #endif static void attrd_cib_connection_destroy(gpointer user_data) { cib_t *conn = user_data; conn->cmds->signoff(conn); /* Ensure IPC is cleaned up */ if (need_shutdown) { crm_info("Connection to the CIB terminated..."); } else { /* eventually this will trigger a reconnect, not a shutdown */ crm_err("Connection to the CIB terminated..."); crm_exit(1); } return; } static void update_for_hash_entry(gpointer key, gpointer value, gpointer user_data) { attr_hash_entry_t *entry = value; if (entry->value != NULL) { attrd_timer_callback(value); } } static void local_update_for_hash_entry(gpointer key, gpointer value, gpointer user_data) { attr_hash_entry_t *entry = value; if (entry->timer_id == 0) { crm_trace("Performing local-only update after replace for %s", entry->id); attrd_perform_update(entry); /* } else { * just let the timer expire and attrd_timer_callback() will do the right thing */ } } static void do_cib_replaced(const char *event, xmlNode * msg) { crm_info("Updating all attributes after %s event", event); g_hash_table_foreach(attr_hash, local_update_for_hash_entry, NULL); } static gboolean cib_connect(void *user_data) { static int attempts = 1; static int max_retry = 20; gboolean was_err = FALSE; static cib_t *local_conn = NULL; if (local_conn == NULL) { local_conn = cib_new(); } if (was_err == FALSE) { int rc = -ENOTCONN; if (attempts < max_retry) { crm_debug("CIB signon attempt %d", attempts); rc = local_conn->cmds->signon(local_conn, T_ATTRD, cib_command); } if (rc != pcmk_ok && attempts > max_retry) { crm_err("Signon to CIB failed: %s", pcmk_strerror(rc)); was_err = TRUE; } else if (rc != pcmk_ok) { attempts++; return TRUE; } } crm_info("Connected to the CIB after %d signon attempts", attempts); if (was_err == FALSE) { int rc = local_conn->cmds->set_connection_dnotify(local_conn, attrd_cib_connection_destroy); if (rc != pcmk_ok) { crm_err("Could not set dnotify callback"); was_err = TRUE; } } if (was_err == FALSE) { if (pcmk_ok != local_conn->cmds->add_notify_callback(local_conn, T_CIB_REPLACE_NOTIFY, do_cib_replaced)) { crm_err("Could not set CIB notification callback"); was_err = TRUE; } } if (was_err) { crm_err("Aborting startup"); crm_exit(100); } cib_conn = local_conn; crm_info("Sending full refresh now that we're connected to the cib"); g_hash_table_foreach(attr_hash, local_update_for_hash_entry, NULL); return FALSE; } int main(int argc, char **argv) { int flag = 0; int argerr = 0; crm_cluster_t cluster; gboolean was_err = FALSE; qb_ipcs_connection_t *c = NULL; qb_ipcs_service_t *ipcs = NULL; crm_log_init(T_ATTRD, LOG_NOTICE, TRUE, FALSE, argc, argv, FALSE); mainloop_add_signal(SIGTERM, attrd_shutdown); while ((flag = getopt(argc, argv, OPTARGS)) != EOF) { switch (flag) { case 'V': crm_bump_log_level(argc, argv); break; case 'h': /* Help message */ usage(T_ATTRD, EX_OK); break; default: ++argerr; break; } } if (optind > argc) { ++argerr; } if (argerr) { usage(T_ATTRD, EX_USAGE); } attr_hash = g_hash_table_new_full(crm_str_hash, g_str_equal, NULL, free_hash_entry); crm_info("Starting up"); if (was_err == FALSE) { #if SUPPORT_COROSYNC if (is_openais_cluster()) { cluster.destroy = attrd_ais_destroy; cluster.cs_dispatch = attrd_ais_dispatch; } #endif #if SUPPORT_HEARTBEAT if (is_heartbeat_cluster()) { cluster.hb_conn = NULL; cluster.hb_dispatch = attrd_ha_callback; cluster.destroy = attrd_ha_connection_destroy; } #endif if (FALSE == crm_cluster_connect(&cluster)) { crm_err("HA Signon failed"); was_err = TRUE; } attrd_uname = cluster.uname; attrd_uuid = cluster.uuid; #if SUPPORT_HEARTBEAT attrd_cluster_conn = cluster.hb_conn; #endif } crm_info("Cluster connection active"); if (was_err == FALSE) { ipcs = mainloop_add_ipc_server(T_ATTRD, QB_IPC_NATIVE, &ipc_callbacks); if (ipcs == NULL) { crm_err("Failed to create IPC server: shutting down and inhibiting respawn"); crm_exit(100); } } crm_info("Accepting attribute updates"); mainloop = g_main_new(FALSE); if (0 == g_timeout_add_full(G_PRIORITY_LOW + 1, 5000, cib_connect, NULL, NULL)) { crm_info("Adding timer failed"); was_err = TRUE; } if (was_err) { crm_err("Aborting startup"); return 100; } crm_notice("Starting mainloop..."); g_main_run(mainloop); crm_notice("Exiting..."); #if SUPPORT_HEARTBEAT if (is_heartbeat_cluster()) { attrd_cluster_conn->llc_ops->signoff(attrd_cluster_conn, TRUE); attrd_cluster_conn->llc_ops->delete(attrd_cluster_conn); } #endif c = qb_ipcs_connection_first_get(ipcs); while(c != NULL) { qb_ipcs_connection_t *last = c; c = qb_ipcs_connection_next_get(ipcs, last); /* There really shouldn't be anyone connected at this point */ crm_notice("Disconnecting client %p, pid=%d...", last, crm_ipcs_client_pid(last)); qb_ipcs_disconnect(last); qb_ipcs_connection_unref(last); } qb_ipcs_destroy(ipcs); if (cib_conn) { cib_conn->cmds->signoff(cib_conn); cib_delete(cib_conn); } g_hash_table_destroy(attr_hash); free(attrd_uuid); empty_uuid_cache(); return crm_exit(0); } struct attrd_callback_s { char *attr; char *value; }; static void attrd_cib_callback(xmlNode * msg, int call_id, int rc, xmlNode * output, void *user_data) { attr_hash_entry_t *hash_entry = NULL; struct attrd_callback_s *data = user_data; if (data->value == NULL && rc == -ENXIO) { rc = pcmk_ok; } else if(call_id < 0) { crm_warn("Update %s=%s failed: %s", data->attr, data->value, pcmk_strerror(call_id)); goto cleanup; } switch (rc) { case pcmk_ok: crm_debug("Update %d for %s=%s passed", call_id, data->attr, data->value); hash_entry = g_hash_table_lookup(attr_hash, data->attr); if (hash_entry) { free(hash_entry->stored_value); hash_entry->stored_value = NULL; if (data->value != NULL) { hash_entry->stored_value = strdup(data->value); } } break; case -pcmk_err_diff_failed: /* When an attr changes while the CIB is syncing */ case -ETIME: /* When an attr changes while there is a DC election */ case -ENXIO: /* When an attr changes while the CIB is syncing a * newer config from a node that just came up */ crm_warn("Update %d for %s=%s failed: %s", call_id, data->attr, data->value, pcmk_strerror(rc)); break; default: crm_err("Update %d for %s=%s failed: %s", call_id, data->attr, data->value, pcmk_strerror(rc)); } cleanup: free(data->value); free(data->attr); free(data); } void attrd_perform_update(attr_hash_entry_t * hash_entry) { int rc = pcmk_ok; struct attrd_callback_s *data = NULL; const char *user_name = NULL; if (hash_entry == NULL) { return; } else if (cib_conn == NULL) { crm_info("Delaying operation %s=%s: cib not connected", hash_entry->id, crm_str(hash_entry->value)); return; } #if ENABLE_ACL if (hash_entry->user) { user_name = hash_entry->user; crm_trace("Performing request from user '%s'", hash_entry->user); } #endif if (hash_entry->value == NULL) { /* delete the attr */ rc = delete_attr_delegate(cib_conn, cib_none, hash_entry->section, attrd_uuid, NULL, hash_entry->set, hash_entry->uuid, hash_entry->id, NULL, FALSE, user_name); if (rc >= 0 && hash_entry->stored_value) { crm_notice("Sent delete %d: node=%s, attr=%s, id=%s, set=%s, section=%s", rc, attrd_uuid, hash_entry->id, hash_entry->uuid ? hash_entry->uuid : "", hash_entry->set, hash_entry->section); } else if (rc < 0 && rc != -ENXIO) { crm_notice ("Delete operation failed: node=%s, attr=%s, id=%s, set=%s, section=%s: %s (%d)", attrd_uuid, hash_entry->id, hash_entry->uuid ? hash_entry->uuid : "", hash_entry->set, hash_entry->section, pcmk_strerror(rc), rc); } else { crm_trace("Sent delete %d: node=%s, attr=%s, id=%s, set=%s, section=%s", rc, attrd_uuid, hash_entry->id, hash_entry->uuid ? hash_entry->uuid : "", hash_entry->set, hash_entry->section); } } else { /* send update */ rc = update_attr_delegate(cib_conn, cib_none, hash_entry->section, attrd_uuid, NULL, hash_entry->set, hash_entry->uuid, hash_entry->id, hash_entry->value, FALSE, user_name); if (rc < 0) { crm_notice("Sent update %s=%s failed: %s", hash_entry->id, hash_entry->value, pcmk_strerror(rc)); } if (safe_str_neq(hash_entry->value, hash_entry->stored_value) || rc < 0) { crm_notice("Sent update %d: %s=%s", rc, hash_entry->id, hash_entry->value); } else { crm_trace("Sent update %d: %s=%s", rc, hash_entry->id, hash_entry->value); } } data = calloc(1, sizeof(struct attrd_callback_s)); data->attr = strdup(hash_entry->id); if (hash_entry->value != NULL) { data->value = strdup(hash_entry->value); } cib_conn->cmds->register_callback( cib_conn, rc, 120, FALSE, data, "attrd_cib_callback", attrd_cib_callback); return; } void attrd_local_callback(xmlNode * msg) { static int plus_plus_len = 5; attr_hash_entry_t *hash_entry = NULL; const char *from = crm_element_value(msg, F_ORIG); const char *op = crm_element_value(msg, F_ATTRD_TASK); const char *attr = crm_element_value(msg, F_ATTRD_ATTRIBUTE); const char *value = crm_element_value(msg, F_ATTRD_VALUE); const char *host = crm_element_value(msg, F_ATTRD_HOST); if (safe_str_eq(op, "refresh")) { crm_notice("Sending full refresh (origin=%s)", from); g_hash_table_foreach(attr_hash, update_for_hash_entry, NULL); return; } if (host != NULL && safe_str_neq(host, attrd_uname)) { send_cluster_message(crm_get_peer(0, host), crm_msg_attrd, msg, FALSE); return; } crm_debug("%s message from %s: %s=%s", op, from, attr, crm_str(value)); hash_entry = find_hash_entry(msg); if (hash_entry == NULL) { return; } if (hash_entry->uuid == NULL) { const char *key = crm_element_value(msg, F_ATTRD_KEY); if (key) { hash_entry->uuid = strdup(key); } } crm_debug("Supplied: %s, Current: %s, Stored: %s", value, hash_entry->value, hash_entry->stored_value); if (safe_str_eq(value, hash_entry->value) && safe_str_eq(value, hash_entry->stored_value)) { crm_trace("Ignoring non-change"); return; } else if (value) { int offset = 1; int int_value = 0; int value_len = strlen(value); if (value_len < (plus_plus_len + 2) || value[plus_plus_len] != '+' || (value[plus_plus_len + 1] != '+' && value[plus_plus_len + 1] != '=')) { goto set_unexpanded; } int_value = char2score(hash_entry->value); if (value[plus_plus_len + 1] != '+') { const char *offset_s = value + (plus_plus_len + 2); offset = char2score(offset_s); } int_value += offset; if (int_value > INFINITY) { int_value = INFINITY; } crm_info("Expanded %s=%s to %d", attr, value, int_value); crm_xml_add_int(msg, F_ATTRD_VALUE, int_value); value = crm_element_value(msg, F_ATTRD_VALUE); } set_unexpanded: if (safe_str_eq(value, hash_entry->value) && hash_entry->timer_id) { /* We're already waiting to set this value */ return; } free(hash_entry->value); hash_entry->value = NULL; if (value != NULL) { hash_entry->value = strdup(value); crm_debug("New value of %s is %s", attr, value); } stop_attrd_timer(hash_entry); if (hash_entry->timeout > 0) { hash_entry->timer_id = g_timeout_add(hash_entry->timeout, attrd_timer_callback, hash_entry); } else { attrd_trigger_update(hash_entry); } return; } gboolean attrd_timer_callback(void *user_data) { stop_attrd_timer(user_data); attrd_trigger_update(user_data); return TRUE; /* Always return true, removed cleanly by stop_attrd_timer() */ } gboolean attrd_trigger_update(attr_hash_entry_t * hash_entry) { xmlNode *msg = NULL; /* send HA message to everyone */ crm_notice("Sending flush op to all hosts for: %s (%s)", hash_entry->id, crm_str(hash_entry->value)); log_hash_entry(LOG_DEBUG_2, hash_entry, "Sending flush op to all hosts for:"); msg = create_xml_node(NULL, __FUNCTION__); crm_xml_add(msg, F_TYPE, T_ATTRD); crm_xml_add(msg, F_ORIG, attrd_uname); crm_xml_add(msg, F_ATTRD_TASK, "flush"); crm_xml_add(msg, F_ATTRD_ATTRIBUTE, hash_entry->id); crm_xml_add(msg, F_ATTRD_SET, hash_entry->set); crm_xml_add(msg, F_ATTRD_SECTION, hash_entry->section); crm_xml_add(msg, F_ATTRD_DAMPEN, hash_entry->dampen); crm_xml_add(msg, F_ATTRD_VALUE, hash_entry->value); #if ENABLE_ACL if (hash_entry->user) { crm_xml_add(msg, F_ATTRD_USER, hash_entry->user); } #endif if (hash_entry->timeout <= 0) { crm_xml_add(msg, F_ATTRD_IGNORE_LOCALLY, hash_entry->value); attrd_perform_update(hash_entry); } send_cluster_message(NULL, crm_msg_attrd, msg, FALSE); free_xml(msg); return TRUE; }