diff --git a/cib/callbacks.c b/cib/callbacks.c index 39c99ba302..7fa7c2d759 100644 --- a/cib/callbacks.c +++ b/cib/callbacks.c @@ -1,1477 +1,1477 @@ /* * Copyright (C) 2004 Andrew Beekhof * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "common.h" extern GMainLoop *mainloop; extern gboolean cib_shutdown_flag; extern gboolean stand_alone; extern const char *cib_root; static unsigned long cib_local_bcast_num = 0; typedef struct cib_local_notify_s { xmlNode *notify_src; char *client_id; gboolean from_peer; gboolean sync_reply; } cib_local_notify_t; qb_ipcs_service_t *ipcs_ro = NULL; qb_ipcs_service_t *ipcs_rw = NULL; qb_ipcs_service_t *ipcs_shm = NULL; #if SUPPORT_HEARTBEAT extern ll_cluster_t *hb_conn; #endif extern int cib_update_counter(xmlNode * xml_obj, const char *field, gboolean reset); extern void GHFunc_count_peers(gpointer key, gpointer value, gpointer user_data); gint cib_GCompareFunc(gconstpointer a, gconstpointer b); gboolean can_write(int flags); void send_cib_replace(const xmlNode * sync_request, const char *host); void cib_process_request(xmlNode * request, gboolean privileged, gboolean force_synchronous, gboolean from_peer, cib_client_t * cib_client); extern GHashTable *client_list; extern GHashTable *local_notify_queue; int next_client_id = 0; extern const char *cib_our_uname; extern unsigned long cib_num_ops, cib_num_local, cib_num_updates, cib_num_fail; extern unsigned long cib_bad_connects, cib_num_timeouts; extern int cib_status; int cib_process_command(xmlNode * request, xmlNode ** reply, xmlNode ** cib_diff, gboolean privileged); gboolean cib_common_callback(qb_ipcs_connection_t *c, void *data, size_t size, gboolean privileged); static int32_t cib_ipc_accept(qb_ipcs_connection_t *c, uid_t uid, gid_t gid) { cib_client_t *new_client = NULL; #if ENABLE_ACL struct group *crm_grp = NULL; #endif crm_trace("Connecting %p for uid=%d gid=%d pid=%d", c, uid, gid, crm_ipcs_client_pid(c)); if (cib_shutdown_flag) { crm_info("Ignoring new client [%d] during shutdown", crm_ipcs_client_pid(c)); return -EPERM; } new_client = calloc(1, sizeof(cib_client_t)); new_client->ipc = c; CRM_CHECK(new_client->id == NULL, free(new_client->id)); new_client->id = crm_generate_uuid(); #if ENABLE_ACL crm_grp = getgrnam(CRM_DAEMON_GROUP); if (crm_grp) { qb_ipcs_connection_auth_set(c, -1, crm_grp->gr_gid, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP); } new_client->user = uid2username(uid); #endif /* make sure we can find ourselves later for sync calls * redirected to the master instance */ g_hash_table_insert(client_list, new_client->id, new_client); qb_ipcs_context_set(c, new_client); return 0; } static void cib_ipc_created(qb_ipcs_connection_t *c) { cib_client_t *cib_client = qb_ipcs_context_get(c); crm_trace("%p connected for client %s", c, cib_client->id); } static int32_t cib_ipc_dispatch_rw(qb_ipcs_connection_t *c, void *data, size_t size) { cib_client_t *cib_client = qb_ipcs_context_get(c); crm_trace("%p message from %s", c, cib_client->id); return cib_common_callback(c, data, size, TRUE); } static int32_t cib_ipc_dispatch_ro(qb_ipcs_connection_t *c, void *data, size_t size) { cib_client_t *cib_client = qb_ipcs_context_get(c); crm_trace("%p message from %s", c, cib_client->id); return cib_common_callback(c, data, size, FALSE); } /* Error code means? */ static int32_t cib_ipc_closed(qb_ipcs_connection_t *c) { cib_client_t *cib_client = qb_ipcs_context_get(c); crm_trace("Connection %p closed", c); CRM_ASSERT(cib_client != NULL); CRM_ASSERT(cib_client->id != NULL); if (!g_hash_table_remove(client_list, cib_client->id)) { crm_err("Client %s not found in the hashtable", cib_client->name); } return 0; } static void cib_ipc_destroy(qb_ipcs_connection_t *c) { cib_client_t *cib_client = qb_ipcs_context_get(c); CRM_ASSERT(cib_client != NULL); CRM_ASSERT(cib_client->id != NULL); /* In case we arrive here without a call to cib_ipc_close() */ g_hash_table_remove(client_list, cib_client->id); crm_trace("Destroying %s (%p)", cib_client->name, c); free(cib_client->name); free(cib_client->callback_id); free(cib_client->id); free(cib_client->user); free(cib_client); crm_trace("Freed the cib client"); if (cib_shutdown_flag) { cib_shutdown(0); } } struct qb_ipcs_service_handlers ipc_ro_callbacks = { .connection_accept = cib_ipc_accept, .connection_created = cib_ipc_created, .msg_process = cib_ipc_dispatch_ro, .connection_closed = cib_ipc_closed, .connection_destroyed = cib_ipc_destroy }; struct qb_ipcs_service_handlers ipc_rw_callbacks = { .connection_accept = cib_ipc_accept, .connection_created = cib_ipc_created, .msg_process = cib_ipc_dispatch_rw, .connection_closed = cib_ipc_closed, .connection_destroyed = cib_ipc_destroy }; void cib_common_callback_worker(uint32_t id, uint32_t flags, xmlNode * op_request, cib_client_t * cib_client, gboolean privileged) { const char *op = crm_element_value(op_request, F_CIB_OPERATION); if (crm_str_eq(op, CRM_OP_REGISTER, TRUE)) { if(flags & crm_ipc_client_response) { xmlNode *ack = create_xml_node(NULL, __FUNCTION__); crm_xml_add(ack, F_CIB_OPERATION, CRM_OP_REGISTER); crm_xml_add(ack, F_CIB_CLIENTID, cib_client->id); crm_ipcs_send(cib_client->ipc, id, ack, FALSE); cib_client->request_id = 0; free_xml(ack); } return; } else if (crm_str_eq(op, T_CIB_NOTIFY, TRUE)) { /* Update the notify filters for this client */ int on_off = 0; int rc = pcmk_ok; const char *type = crm_element_value(op_request, F_CIB_NOTIFY_TYPE); crm_element_value_int(op_request, F_CIB_NOTIFY_ACTIVATE, &on_off); crm_debug("Setting %s callbacks for %s (%s): %s", type, cib_client->name, cib_client->id, on_off ? "on" : "off"); if (safe_str_eq(type, T_CIB_POST_NOTIFY)) { cib_client->post_notify = on_off; } else if (safe_str_eq(type, T_CIB_PRE_NOTIFY)) { cib_client->pre_notify = on_off; } else if (safe_str_eq(type, T_CIB_UPDATE_CONFIRM)) { cib_client->confirmations = on_off; } else if (safe_str_eq(type, T_CIB_DIFF_NOTIFY)) { cib_client->diffs = on_off; } else if (safe_str_eq(type, T_CIB_REPLACE_NOTIFY)) { cib_client->replace = on_off; } else { rc = -ENXIO; } if(flags & crm_ipc_client_response) { /* TODO - include rc */ crm_ipcs_send_ack(cib_client->ipc, id, "ack", __FUNCTION__, __LINE__); cib_client->request_id = 0; } return; } cib_client->num_calls++; cib_process_request(op_request, FALSE, privileged, FALSE, cib_client); } int32_t cib_common_callback(qb_ipcs_connection_t *c, void *data, size_t size, gboolean privileged) { uint32_t id = 0; uint32_t flags = 0; int call_options = 0; xmlNode *op_request = crm_ipcs_recv(c, data, size, &id, &flags); cib_client_t *cib_client = qb_ipcs_context_get(c); if(op_request) { crm_element_value_int(op_request, F_CIB_CALLOPTS, &call_options); } crm_trace("Inbound: %.200s", data); if (op_request == NULL || cib_client == NULL) { crm_ipcs_send_ack(c, id, "nack", __FUNCTION__, __LINE__); return 0; } if(is_set(call_options, cib_sync_call)) { CRM_ASSERT(flags & crm_ipc_client_response); } if(flags & crm_ipc_client_response) { CRM_LOG_ASSERT(cib_client->request_id == 0); /* This means the client has two synchronous events in-flight */ cib_client->request_id = id; /* Reply only to the last one */ } if (cib_client->name == NULL) { const char *value = crm_element_value(op_request, F_CIB_CLIENTNAME); if (value == NULL) { cib_client->name = crm_itoa(crm_ipcs_client_pid(c)); } else { cib_client->name = strdup(value); } } if (cib_client->callback_id == NULL) { const char *value = crm_element_value(op_request, F_CIB_CALLBACK_TOKEN); if (value != NULL) { cib_client->callback_id = strdup(value); } else { cib_client->callback_id = strdup(cib_client->id); } } crm_xml_add(op_request, F_CIB_CLIENTID, cib_client->id); crm_xml_add(op_request, F_CIB_CLIENTNAME, cib_client->name); #if ENABLE_ACL determine_request_user(cib_client->user, op_request, F_CIB_USER); #endif crm_log_xml_trace(op_request, "Client[inbound]"); cib_common_callback_worker(id, flags, op_request, cib_client, privileged); free_xml(op_request); return 0; } static void do_local_notify(xmlNode * notify_src, const char *client_id, gboolean sync_reply, gboolean from_peer) { /* send callback to originating child */ cib_client_t *client_obj = NULL; int local_rc = pcmk_ok; if (client_id != NULL) { client_obj = g_hash_table_lookup(client_list, client_id); } else { crm_trace("No client to sent the response to. F_CIB_CLIENTID not set."); } if (client_obj == NULL) { local_rc = -ECONNRESET; } else { int rid = 0; if(sync_reply) { CRM_LOG_ASSERT(client_obj->request_id); rid = client_obj->request_id; client_obj->request_id = 0; crm_trace("Sending response %d to %s %s", rid, client_obj->name, from_peer?"(originator of delegated request)":""); } else { crm_trace("Sending an event to %s %s", client_obj->name, from_peer?"(originator of delegated request)":""); } if (client_obj->ipc && crm_ipcs_send(client_obj->ipc, rid, notify_src, !sync_reply) < 0) { local_rc = -ENOMSG; #ifdef HAVE_GNUTLS_GNUTLS_H } else if (client_obj->session) { crm_send_remote_msg(client_obj->session, notify_src, client_obj->encrypted); #endif } else if(client_obj->ipc == NULL) { crm_err("Unknown transport for %s", client_obj->name); } } if (local_rc != pcmk_ok && client_obj != NULL) { crm_warn("%sSync reply to %s failed: %s", sync_reply ? "" : "A-", client_obj ? client_obj->name : "", pcmk_strerror(local_rc)); } } static void local_notify_destroy_callback(gpointer data) { cib_local_notify_t *notify = data; free_xml(notify->notify_src); free(notify->client_id); free(notify); } static void check_local_notify(int bcast_id) { cib_local_notify_t *notify = NULL; if (!local_notify_queue) { return; } notify = g_hash_table_lookup(local_notify_queue, GINT_TO_POINTER(bcast_id)); if (notify) { do_local_notify(notify->notify_src, notify->client_id, notify->sync_reply, notify->from_peer); g_hash_table_remove(local_notify_queue, GINT_TO_POINTER(bcast_id)); } } static void queue_local_notify(xmlNode * notify_src, const char *client_id, gboolean sync_reply, gboolean from_peer) { cib_local_notify_t *notify = calloc(1, sizeof(cib_local_notify_t)); notify->notify_src = notify_src; notify->client_id = strdup(client_id); notify->sync_reply = sync_reply; notify->from_peer = from_peer; if (!local_notify_queue) { local_notify_queue = g_hash_table_new_full(g_direct_hash, g_direct_equal, NULL, local_notify_destroy_callback); } g_hash_table_insert(local_notify_queue, GINT_TO_POINTER(cib_local_bcast_num), notify); } static void parse_local_options(cib_client_t * cib_client, int call_type, int call_options, const char *host, const char *op, gboolean * local_notify, gboolean * needs_reply, gboolean * process, gboolean * needs_forward) { if (cib_op_modifies(call_type) && !(call_options & cib_inhibit_bcast)) { /* we need to send an update anyway */ *needs_reply = TRUE; } else { *needs_reply = FALSE; } if (host == NULL && (call_options & cib_scope_local)) { crm_trace("Processing locally scoped %s op from %s", op, cib_client->name); *local_notify = TRUE; } else if (host == NULL && cib_is_master) { crm_trace("Processing master %s op locally from %s", op, cib_client->name); *local_notify = TRUE; } else if (safe_str_eq(host, cib_our_uname)) { crm_trace("Processing locally addressed %s op from %s", op, cib_client->name); *local_notify = TRUE; } else if (stand_alone) { *needs_forward = FALSE; *local_notify = TRUE; *process = TRUE; } else { crm_trace("%s op from %s needs to be forwarded to %s", op, cib_client->name, host ? host : "the master instance"); *needs_forward = TRUE; *process = FALSE; } } static gboolean parse_peer_options(int call_type, xmlNode * request, gboolean * local_notify, gboolean * needs_reply, gboolean * process, gboolean * needs_forward) { const char *op = NULL; const char *host = NULL; const char *delegated = NULL; const char *originator = crm_element_value(request, F_ORIG); const char *reply_to = crm_element_value(request, F_CIB_ISREPLY); const char *update = crm_element_value(request, F_CIB_GLOBAL_UPDATE); gboolean is_reply = safe_str_eq(reply_to, cib_our_uname); if (crm_is_true(update)) { *needs_reply = FALSE; if (is_reply) { *local_notify = TRUE; crm_trace("Processing global/peer update from %s" " that originated from us", originator); } else { crm_trace("Processing global/peer update from %s", originator); } return TRUE; } host = crm_element_value(request, F_CIB_HOST); if (host != NULL && safe_str_eq(host, cib_our_uname)) { crm_trace("Processing request sent to us from %s", originator); return TRUE; } else if (host == NULL && cib_is_master == TRUE) { crm_trace("Processing request sent to master instance from %s", originator); return TRUE; } op = crm_element_value(request, F_CIB_OPERATION); if(safe_str_eq(op, "cib_shutdown_req")) { /* Always process these */ *local_notify = FALSE; if(reply_to == NULL || is_reply) { *process = TRUE; } if(is_reply) { *needs_reply = FALSE; } return *process; } if (is_reply) { crm_trace("Forward reply sent from %s to local clients", originator); *process = FALSE; *needs_reply = FALSE; *local_notify = TRUE; return TRUE; } delegated = crm_element_value(request, F_CIB_DELEGATED); if (delegated != NULL) { crm_trace("Ignoring msg for master instance"); } else if (host != NULL) { /* this is for a specific instance and we're not it */ crm_trace("Ignoring msg for instance on %s", crm_str(host)); } else if (reply_to == NULL && cib_is_master == FALSE) { /* this is for the master instance and we're not it */ crm_trace("Ignoring reply to %s", crm_str(reply_to)); } else if (safe_str_eq(op, "cib_shutdown_req")) { if (reply_to != NULL) { crm_debug("Processing %s from %s", op, host); *needs_reply = FALSE; } else { crm_debug("Processing %s reply from %s", op, host); } return TRUE; } else { crm_err("Nothing for us to do?"); crm_log_xml_err(request, "Peer[inbound]"); } return FALSE; } static void forward_request(xmlNode * request, cib_client_t * cib_client, int call_options) { const char *op = crm_element_value(request, F_CIB_OPERATION); const char *host = crm_element_value(request, F_CIB_HOST); crm_xml_add(request, F_CIB_DELEGATED, cib_our_uname); if (host != NULL) { crm_trace("Forwarding %s op to %s", op, host); send_cluster_message(host, crm_msg_cib, request, FALSE); } else { crm_trace("Forwarding %s op to master instance", op); send_cluster_message(NULL, crm_msg_cib, request, FALSE); } /* Return the request to its original state */ xml_remove_prop(request, F_CIB_DELEGATED); if (call_options & cib_discard_reply) { crm_trace("Client not interested in reply"); } } static gboolean send_peer_reply(xmlNode * msg, xmlNode * result_diff, const char *originator, gboolean broadcast) { CRM_ASSERT(msg != NULL); if (broadcast) { /* this (successful) call modified the CIB _and_ the * change needs to be broadcast... * send via HA to other nodes */ int diff_add_updates = 0; int diff_add_epoch = 0; int diff_add_admin_epoch = 0; int diff_del_updates = 0; int diff_del_epoch = 0; int diff_del_admin_epoch = 0; char *digest = NULL; cib_diff_version_details(result_diff, &diff_add_admin_epoch, &diff_add_epoch, &diff_add_updates, &diff_del_admin_epoch, &diff_del_epoch, &diff_del_updates); crm_trace("Sending update diff %d.%d.%d -> %d.%d.%d", diff_del_admin_epoch, diff_del_epoch, diff_del_updates, diff_add_admin_epoch, diff_add_epoch, diff_add_updates); crm_xml_add(msg, F_CIB_ISREPLY, originator); crm_xml_add(msg, F_CIB_GLOBAL_UPDATE, XML_BOOLEAN_TRUE); crm_xml_add(msg, F_CIB_OPERATION, CIB_OP_APPLY_DIFF); /* Its safe to always use the latest version since the election * ensures the software on this node is the oldest node in the cluster */ digest = calculate_xml_versioned_digest(the_cib, FALSE, TRUE, CRM_FEATURE_SET); crm_xml_add(result_diff, XML_ATTR_DIGEST, digest); crm_log_xml_trace(the_cib, digest); free(digest); add_message_xml(msg, F_CIB_UPDATE_DIFF, result_diff); crm_log_xml_trace(msg, "copy"); return send_cluster_message(NULL, crm_msg_cib, msg, TRUE); } else if (originator != NULL) { /* send reply via HA to originating node */ crm_trace("Sending request result to originator only"); crm_xml_add(msg, F_CIB_ISREPLY, originator); return send_cluster_message(originator, crm_msg_cib, msg, FALSE); } return FALSE; } void cib_process_request(xmlNode * request, gboolean force_synchronous, gboolean privileged, gboolean from_peer, cib_client_t * cib_client) { int call_type = 0; int call_options = 0; gboolean process = TRUE; gboolean is_update = TRUE; gboolean needs_reply = TRUE; gboolean local_notify = FALSE; gboolean needs_forward = FALSE; gboolean global_update = crm_is_true(crm_element_value(request, F_CIB_GLOBAL_UPDATE)); xmlNode *op_reply = NULL; xmlNode *result_diff = NULL; int rc = pcmk_ok; const char *op = crm_element_value(request, F_CIB_OPERATION); const char *originator = crm_element_value(request, F_ORIG); const char *host = crm_element_value(request, F_CIB_HOST); const char *client_id = crm_element_value(request, F_CIB_CLIENTID); crm_trace("%s Processing msg %s", cib_our_uname, crm_element_value(request, F_SEQ)); cib_num_ops++; if (cib_num_ops == 0) { cib_num_fail = 0; cib_num_local = 0; cib_num_updates = 0; crm_info("Stats wrapped around"); } if (host != NULL && strlen(host) == 0) { host = NULL; } crm_element_value_int(request, F_CIB_CALLOPTS, &call_options); if (force_synchronous) { call_options |= cib_sync_call; } crm_trace("Processing %s message (%s) for %s...", from_peer ? "peer" : "local", from_peer ? originator : cib_our_uname, host ? host : "master"); rc = cib_get_operation_id(op, &call_type); if (rc != pcmk_ok) { /* TODO: construct error reply? */ crm_err("Pre-processing of command failed: %s", pcmk_strerror(rc)); return; } is_update = cib_op_modifies(call_type); if (is_update) { cib_num_updates++; } if (from_peer == FALSE) { parse_local_options(cib_client, call_type, call_options, host, op, &local_notify, &needs_reply, &process, &needs_forward); } else if (parse_peer_options(call_type, request, &local_notify, &needs_reply, &process, &needs_forward) == FALSE) { return; } crm_trace("Finished determining processing actions"); if (call_options & cib_discard_reply) { needs_reply = is_update; local_notify = FALSE; } if (needs_forward) { forward_request(request, cib_client, call_options); return; } if (cib_status != pcmk_ok) { rc = cib_status; crm_err("Operation ignored, cluster configuration is invalid." " Please repair and restart: %s", pcmk_strerror(cib_status)); op_reply = cib_construct_reply(request, the_cib, cib_status); } else if (process) { int level = LOG_INFO; const char *section = crm_element_value(request, F_CIB_SECTION); cib_num_local++; rc = cib_process_command(request, &op_reply, &result_diff, privileged); if (global_update) { switch (rc) { case pcmk_ok: case -pcmk_err_old_data: case -pcmk_err_diff_resync: case -pcmk_err_diff_failed: level = LOG_DEBUG_2; break; default: level = LOG_ERR; } } else if (safe_str_eq(op, CIB_OP_QUERY)) { level = LOG_DEBUG_2; } else if (rc != pcmk_ok) { cib_num_fail++; level = LOG_WARNING; } else if (safe_str_eq(op, CIB_OP_SLAVE)) { level = LOG_DEBUG_2; } else if (safe_str_eq(section, XML_CIB_TAG_STATUS)) { level = LOG_DEBUG_2; } do_crm_log_unlikely(level, "Operation complete: op %s for section %s (origin=%s/%s/%s, version=%s.%s.%s): %s (rc=%d)", op, section ? section : "'all'", originator ? originator : "local", crm_element_value(request, F_CIB_CLIENTNAME), crm_element_value(request, F_CIB_CALLID), the_cib ? crm_element_value(the_cib, XML_ATTR_GENERATION_ADMIN) : "0", the_cib ? crm_element_value(the_cib, XML_ATTR_GENERATION) : "0", the_cib ? crm_element_value(the_cib, XML_ATTR_NUMUPDATES) : "0", pcmk_strerror(rc), rc); if (op_reply == NULL && (needs_reply || local_notify)) { crm_err("Unexpected NULL reply to message"); crm_log_xml_err(request, "null reply"); needs_reply = FALSE; local_notify = FALSE; } } crm_trace("processing response cases %.16x %.16x", call_options, cib_sync_call); /* from now on we are the server */ if (needs_reply == FALSE || stand_alone) { /* nothing more to do... * this was a non-originating slave update */ crm_trace("Completed slave update"); } else if (rc == pcmk_ok && result_diff != NULL && !(call_options & cib_inhibit_bcast)) { gboolean broadcast = FALSE; cib_local_bcast_num++; crm_xml_add_int(request, F_CIB_LOCAL_NOTIFY_ID, cib_local_bcast_num); broadcast = send_peer_reply(request, result_diff, originator, TRUE); if (broadcast && client_id && local_notify && op_reply) { /* If we have been asked to sync the reply, * and a bcast msg has gone out, we queue the local notify * until we know the bcast message has been received */ local_notify = FALSE; queue_local_notify(op_reply, client_id, (call_options & cib_sync_call), from_peer); op_reply = NULL; /* the reply is queued, so don't free here */ } } else if (call_options & cib_discard_reply) { crm_trace("Caller isn't interested in reply"); } else if (from_peer) { if (is_update == FALSE || result_diff == NULL) { crm_trace("Request not broadcast: R/O call"); } else if (call_options & cib_inhibit_bcast) { crm_trace("Request not broadcast: inhibited"); } else if (rc != pcmk_ok) { crm_trace("Request not broadcast: call failed: %s", pcmk_strerror(rc)); } else { crm_trace("Directing reply to %s", originator); } send_peer_reply(op_reply, result_diff, originator, FALSE); } if (local_notify && client_id) { if (process == FALSE) { do_local_notify(request, client_id, call_options & cib_sync_call, from_peer); } else { do_local_notify(op_reply, client_id, call_options & cib_sync_call, from_peer); } } free_xml(op_reply); free_xml(result_diff); return; } xmlNode * cib_construct_reply(xmlNode * request, xmlNode * output, int rc) { int lpc = 0; xmlNode *reply = NULL; const char *name = NULL; const char *value = NULL; const char *names[] = { F_CIB_OPERATION, F_CIB_CALLID, F_CIB_CLIENTID, F_CIB_CALLOPTS }; static int max = DIMOF(names); crm_trace("Creating a basic reply"); reply = create_xml_node(NULL, "cib-reply"); crm_xml_add(reply, F_TYPE, T_CIB); for (lpc = 0; lpc < max; lpc++) { name = names[lpc]; value = crm_element_value(request, name); crm_xml_add(reply, name, value); } crm_xml_add_int(reply, F_CIB_RC, rc); if (output != NULL) { crm_trace("Attaching reply output"); add_message_xml(reply, F_CIB_CALLDATA, output); } return reply; } int cib_process_command(xmlNode * request, xmlNode ** reply, xmlNode ** cib_diff, gboolean privileged) { xmlNode *input = NULL; xmlNode *output = NULL; xmlNode *result_cib = NULL; xmlNode *current_cib = NULL; #if ENABLE_ACL xmlNode *filtered_current_cib = NULL; #endif int call_type = 0; int call_options = 0; int log_level = LOG_DEBUG_4; const char *op = NULL; const char *section = NULL; int rc = pcmk_ok; int rc2 = pcmk_ok; gboolean send_r_notify = FALSE; gboolean global_update = FALSE; gboolean config_changed = FALSE; gboolean manage_counters = TRUE; CRM_ASSERT(cib_status == pcmk_ok); *reply = NULL; *cib_diff = NULL; current_cib = the_cib; /* Start processing the request... */ op = crm_element_value(request, F_CIB_OPERATION); crm_element_value_int(request, F_CIB_CALLOPTS, &call_options); rc = cib_get_operation_id(op, &call_type); if (rc == pcmk_ok && privileged == FALSE) { rc = cib_op_can_run(call_type, call_options, privileged, global_update); } rc2 = cib_op_prepare(call_type, request, &input, §ion); if (rc == pcmk_ok) { rc = rc2; } if (rc != pcmk_ok) { crm_trace("Call setup failed: %s", pcmk_strerror(rc)); goto done; } else if (cib_op_modifies(call_type) == FALSE) { #if ENABLE_ACL if (acl_enabled(config_hash) == FALSE || acl_filter_cib(request, current_cib, current_cib, &filtered_current_cib) == FALSE) { rc = cib_perform_op(op, call_options, cib_op_func(call_type), TRUE, section, request, input, FALSE, &config_changed, current_cib, &result_cib, NULL, &output); } else if (filtered_current_cib == NULL) { crm_debug("Pre-filtered the entire cib"); rc = -EACCES; } else { crm_debug("Pre-filtered the queried cib according to the ACLs"); rc = cib_perform_op(op, call_options, cib_op_func(call_type), TRUE, section, request, input, FALSE, &config_changed, filtered_current_cib, &result_cib, NULL, &output); } #else rc = cib_perform_op(op, call_options, cib_op_func(call_type), TRUE, section, request, input, FALSE, &config_changed, current_cib, &result_cib, NULL, &output); #endif CRM_CHECK(result_cib == NULL, free_xml(result_cib)); goto done; } /* Handle a valid write action */ global_update = crm_is_true(crm_element_value(request, F_CIB_GLOBAL_UPDATE)); if (global_update) { manage_counters = FALSE; call_options |= cib_force_diff; CRM_CHECK(call_type == 3 || call_type == 4, crm_err("Call type: %d", call_type); crm_log_xml_err(request, "bad op")); } #ifdef SUPPORT_PRENOTIFY if ((call_options & cib_inhibit_notify) == 0) { cib_pre_notify(call_options, op, the_cib, input); } #endif if (rc == pcmk_ok) { if (call_options & cib_inhibit_bcast) { /* skip */ crm_trace("Skipping update: inhibit broadcast"); manage_counters = FALSE; } rc = cib_perform_op(op, call_options, cib_op_func(call_type), FALSE, section, request, input, manage_counters, &config_changed, current_cib, &result_cib, cib_diff, &output); #if ENABLE_ACL if (acl_enabled(config_hash) == TRUE && acl_check_diff(request, current_cib, result_cib, *cib_diff) == FALSE) { rc = -EACCES; } #endif if (rc == pcmk_ok && config_changed) { time_t now; char *now_str = NULL; const char *validation = crm_element_value(result_cib, XML_ATTR_VALIDATION); if (validation) { int current_version = get_schema_version(validation); int support_version = get_schema_version("pacemaker-1.1"); /* Once the later schemas support the "update-*" attributes, change "==" to ">=" -- Changed */ if (current_version >= support_version) { const char *origin = crm_element_value(request, F_ORIG); crm_xml_replace(result_cib, XML_ATTR_UPDATE_ORIG, origin ? origin : cib_our_uname); crm_xml_replace(result_cib, XML_ATTR_UPDATE_CLIENT, crm_element_value(request, F_CIB_CLIENTNAME)); #if ENABLE_ACL crm_xml_replace(result_cib, XML_ATTR_UPDATE_USER, crm_element_value(request, F_CIB_USER)); #endif } } now = time(NULL); now_str = ctime(&now); now_str[24] = EOS; /* replace the newline */ crm_xml_replace(result_cib, XML_CIB_ATTR_WRITTEN, now_str); } if (manage_counters == FALSE) { config_changed = cib_config_changed(current_cib, result_cib, cib_diff); } /* Always write to disk for replace ops, * this negates the need to detect ordering changes */ if (config_changed == FALSE && crm_str_eq(CIB_OP_REPLACE, op, TRUE)) { config_changed = TRUE; } } if (rc == pcmk_ok && (call_options & cib_dryrun) == 0) { rc = activateCibXml(result_cib, config_changed, op); if (rc == pcmk_ok && cib_internal_config_changed(*cib_diff)) { cib_read_config(config_hash, result_cib); } if (crm_str_eq(CIB_OP_REPLACE, op, TRUE)) { if (section == NULL) { send_r_notify = TRUE; } else if (safe_str_eq(section, XML_TAG_CIB)) { send_r_notify = TRUE; } else if (safe_str_eq(section, XML_CIB_TAG_NODES)) { send_r_notify = TRUE; } else if (safe_str_eq(section, XML_CIB_TAG_STATUS)) { send_r_notify = TRUE; } } else if (crm_str_eq(CIB_OP_ERASE, op, TRUE)) { send_r_notify = TRUE; } } else if (rc == -pcmk_err_dtd_validation) { if (output != NULL) { crm_log_xml_info(output, "cib:output"); free_xml(output); } #if ENABLE_ACL { xmlNode *filtered_result_cib = NULL; if (acl_enabled(config_hash) == FALSE || acl_filter_cib(request, current_cib, result_cib, &filtered_result_cib) == FALSE) { output = result_cib; } else { crm_debug("Filtered the result cib for output according to the ACLs"); output = filtered_result_cib; if (result_cib != NULL) { free_xml(result_cib); } } } #else output = result_cib; #endif } else { free_xml(result_cib); } if ((call_options & cib_inhibit_notify) == 0) { const char *call_id = crm_element_value(request, F_CIB_CALLID); const char *client = crm_element_value(request, F_CIB_CLIENTNAME); #ifdef SUPPORT_POSTNOTIFY cib_post_notify(call_options, op, input, rc, the_cib); #endif cib_diff_notify(call_options, client, call_id, op, input, rc, *cib_diff); } if (send_r_notify) { const char *origin = crm_element_value(request, F_ORIG); cib_replace_notify(origin, the_cib, rc, *cib_diff); } if (rc != pcmk_ok) { log_level = LOG_DEBUG_4; if (rc == -pcmk_err_dtd_validation && global_update) { log_level = LOG_WARNING; crm_log_xml_info(input, "cib:global_update"); } } else if (config_changed) { log_level = LOG_DEBUG_3; if (cib_is_master) { log_level = LOG_NOTICE; } } else if (cib_is_master) { log_level = LOG_DEBUG_2; } log_cib_diff(log_level, *cib_diff, "cib:diff"); done: if ((call_options & cib_discard_reply) == 0) { *reply = cib_construct_reply(request, output, rc); crm_log_xml_trace(*reply, "cib:reply"); } #if ENABLE_ACL if (filtered_current_cib != NULL) { free_xml(filtered_current_cib); } #endif if (call_type >= 0) { cib_op_cleanup(call_type, call_options, &input, &output); } return rc; } gint cib_GCompareFunc(gconstpointer a, gconstpointer b) { const xmlNode *a_msg = a; const xmlNode *b_msg = b; int msg_a_id = 0; int msg_b_id = 0; const char *value = NULL; value = crm_element_value_const(a_msg, F_CIB_CALLID); msg_a_id = crm_parse_int(value, NULL); value = crm_element_value_const(b_msg, F_CIB_CALLID); msg_b_id = crm_parse_int(value, NULL); if (msg_a_id == msg_b_id) { return 0; } else if (msg_a_id < msg_b_id) { return -1; } return 1; } #if SUPPORT_HEARTBEAT void cib_ha_peer_callback(HA_Message * msg, void *private_data) { xmlNode *xml = convert_ha_message(NULL, msg, __FUNCTION__); cib_peer_callback(xml, private_data); free_xml(xml); } #endif void cib_peer_callback(xmlNode * msg, void *private_data) { const char *reason = NULL; const char *originator = crm_element_value(msg, F_ORIG); if (originator == NULL || crm_str_eq(originator, cib_our_uname, TRUE)) { /* message is from ourselves */ int bcast_id = 0; if (!(crm_element_value_int(msg, F_CIB_LOCAL_NOTIFY_ID, &bcast_id))) { check_local_notify(bcast_id); } return; } else if (crm_peer_cache == NULL) { reason = "membership not established"; goto bail; } if (crm_element_value(msg, F_CIB_CLIENTNAME) == NULL) { crm_xml_add(msg, F_CIB_CLIENTNAME, originator); } /* crm_log_xml_trace("Peer[inbound]", msg); */ cib_process_request(msg, FALSE, TRUE, TRUE, NULL); return; bail: if (reason) { const char *seq = crm_element_value(msg, F_SEQ); const char *op = crm_element_value(msg, F_CIB_OPERATION); crm_warn("Discarding %s message (%s) from %s: %s", op, seq, originator, reason); } } #if SUPPORT_HEARTBEAT extern oc_ev_t *cib_ev_token; static void *ccm_library = NULL; int (*ccm_api_callback_done) (void *cookie) = NULL; int (*ccm_api_handle_event) (const oc_ev_t * token) = NULL; void cib_client_status_callback(const char *node, const char *client, const char *status, void *private) { crm_node_t *peer = NULL; if (safe_str_eq(client, CRM_SYSTEM_CIB)) { crm_info("Status update: Client %s/%s now has status [%s]", node, client, status); if (safe_str_eq(status, JOINSTATUS)) { status = ONLINESTATUS; } else if (safe_str_eq(status, LEAVESTATUS)) { status = OFFLINESTATUS; } peer = crm_get_peer(0, node); crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cib, status); } return; } int cib_ccm_dispatch(gpointer user_data) { int rc = 0; oc_ev_t *ccm_token = (oc_ev_t *) user_data; crm_trace("received callback"); if (ccm_api_handle_event == NULL) { ccm_api_handle_event = find_library_function(&ccm_library, CCM_LIBRARY, "oc_ev_handle_event", 1); } rc = (*ccm_api_handle_event) (ccm_token); if (0 == rc) { return 0; } crm_err("CCM connection appears to have failed: rc=%d.", rc); /* eventually it might be nice to recover and reconnect... but until then... */ crm_err("Exiting to recover from CCM connection failure"); exit(2); return -1; } int current_instance = 0; void cib_ccm_msg_callback(oc_ed_t event, void *cookie, size_t size, const void *data) { gboolean update_id = FALSE; const oc_ev_membership_t *membership = data; CRM_ASSERT(membership != NULL); crm_info("Processing CCM event=%s (id=%d)", ccm_event_name(event), membership->m_instance); if (current_instance > membership->m_instance) { crm_err("Membership instance ID went backwards! %d->%d", current_instance, membership->m_instance); CRM_ASSERT(current_instance <= membership->m_instance); } switch (event) { case OC_EV_MS_NEW_MEMBERSHIP: case OC_EV_MS_INVALID: update_id = TRUE; break; case OC_EV_MS_PRIMARY_RESTORED: update_id = TRUE; break; case OC_EV_MS_NOT_PRIMARY: crm_trace("Ignoring transitional CCM event: %s", ccm_event_name(event)); break; case OC_EV_MS_EVICTED: crm_err("Evicted from CCM: %s", ccm_event_name(event)); break; default: crm_err("Unknown CCM event: %d", event); } if (update_id) { unsigned int lpc = 0; CRM_CHECK(membership != NULL, return); current_instance = membership->m_instance; for (lpc = 0; lpc < membership->m_n_out; lpc++) { crm_update_ccm_node(membership, lpc + membership->m_out_idx, CRM_NODE_LOST, current_instance); } for (lpc = 0; lpc < membership->m_n_member; lpc++) { crm_update_ccm_node(membership, lpc + membership->m_memb_idx, CRM_NODE_ACTIVE, current_instance); } } if (ccm_api_callback_done == NULL) { ccm_api_callback_done = find_library_function(&ccm_library, CCM_LIBRARY, "oc_ev_callback_done", 1); } (*ccm_api_callback_done) (cookie); return; } #endif gboolean can_write(int flags) { return TRUE; } static gboolean cib_force_exit(gpointer data) { crm_notice("Forcing exit!"); terminate_cib(__FUNCTION__, TRUE); return FALSE; } static void disconnect_remote_client(gpointer key, gpointer value, gpointer user_data) { cib_client_t *a_client = value; crm_err("Disconnecting %s... Not implemented", crm_str(a_client->name)); } void cib_shutdown(int nsig) { struct qb_ipcs_stats srv_stats; if (cib_shutdown_flag == FALSE) { int disconnects = 0; qb_ipcs_connection_t *c = NULL; cib_shutdown_flag = TRUE; c = qb_ipcs_connection_first_get(ipcs_rw); while(c != NULL) { qb_ipcs_connection_t *last = c; c = qb_ipcs_connection_next_get(ipcs_rw, last); crm_debug("Disconnecting r/w client %p...", last); qb_ipcs_disconnect(last); qb_ipcs_connection_unref(last); disconnects++; } c = qb_ipcs_connection_first_get(ipcs_ro); while(c != NULL) { qb_ipcs_connection_t *last = c; c = qb_ipcs_connection_next_get(ipcs_ro, last); crm_debug("Disconnecting r/o client %p...", last); qb_ipcs_disconnect(last); qb_ipcs_connection_unref(last); disconnects++; } c = qb_ipcs_connection_first_get(ipcs_shm); while(c != NULL) { qb_ipcs_connection_t *last = c; c = qb_ipcs_connection_next_get(ipcs_shm, last); crm_debug("Disconnecting non-blocking r/w client %p...", last); qb_ipcs_disconnect(last); qb_ipcs_connection_unref(last); disconnects++; } disconnects += g_hash_table_size(client_list); crm_debug("Disconnecting %d remote clients", g_hash_table_size(client_list)); g_hash_table_foreach(client_list, disconnect_remote_client, NULL); crm_info("Disconnected %d clients", disconnects); } qb_ipcs_stats_get(ipcs_rw, &srv_stats, QB_FALSE); if(g_hash_table_size(client_list) == 0) { crm_info("All clients disconnected (%d)", srv_stats.active_connections); initiate_exit(); } else { crm_info("Waiting on %d clients to disconnect (%d)", g_hash_table_size(client_list), srv_stats.active_connections); } } void initiate_exit(void) { int active = 0; xmlNode *leaving = NULL; active = crm_active_peers(); if (active < 2) { terminate_cib(__FUNCTION__, FALSE); return; } crm_info("Sending disconnect notification to %d peers...", active); leaving = create_xml_node(NULL, "exit-notification"); crm_xml_add(leaving, F_TYPE, "cib"); crm_xml_add(leaving, F_CIB_OPERATION, "cib_shutdown_req"); send_cluster_message(NULL, crm_msg_cib, leaving, TRUE); free_xml(leaving); g_timeout_add(crm_get_msec("5s"), cib_force_exit, NULL); } extern int remote_fd; extern int remote_tls_fd; -extern void terminate_ais_connection(void); +extern void terminate_cs_connection(void); void terminate_cib(const char *caller, gboolean fast) { if (remote_fd > 0) { close(remote_fd); remote_fd = 0; } if (remote_tls_fd > 0) { close(remote_tls_fd); remote_tls_fd = 0; } if(!fast) { if(is_heartbeat_cluster()) { #if SUPPORT_HEARTBEAT if (hb_conn != NULL) { crm_info("%s: Disconnecting heartbeat", caller); hb_conn->llc_ops->signoff(hb_conn, FALSE); hb_conn = NULL; } else { crm_err("%s: No heartbeat connection", caller); } #endif } else { #if SUPPORT_COROSYNC crm_info("%s: Disconnecting corosync", caller); - terminate_ais_connection(); + terminate_cs_connection(); #endif } } uninitializeCib(); crm_info("%s: Exiting%s...", caller, fast?" fast":mainloop?" from mainloop":""); if(fast == FALSE && mainloop != NULL && g_main_is_running(mainloop)) { g_main_quit(mainloop); } else { qb_ipcs_destroy(ipcs_ro); qb_ipcs_destroy(ipcs_rw); qb_ipcs_destroy(ipcs_shm); qb_log_fini(); if (fast) { exit(EX_USAGE); } else { exit(EX_OK); } } } diff --git a/cib/main.c b/cib/main.c index a8dc103f74..82105a3d65 100644 --- a/cib/main.c +++ b/cib/main.c @@ -1,622 +1,619 @@ /* * Copyright (C) 2004 Andrew Beekhof * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #if HAVE_LIBXML2 # include #endif #ifdef HAVE_GETOPT_H # include #endif #if HAVE_BZLIB_H # include #endif extern int init_remote_listener(int port, gboolean encrypted); extern gboolean stand_alone; gboolean cib_shutdown_flag = FALSE; int cib_status = pcmk_ok; #if SUPPORT_HEARTBEAT oc_ev_t *cib_ev_token; ll_cluster_t *hb_conn = NULL; extern void oc_ev_special(const oc_ev_t *, oc_ev_class_t, int); gboolean cib_register_ha(ll_cluster_t * hb_cluster, const char *client_name); +#else +void *hb_conn = NULL; #endif extern void terminate_cib(const char *caller, gboolean fast); GMainLoop *mainloop = NULL; const char *cib_root = NULL; char *cib_our_uname = NULL; gboolean preserve_status = FALSE; gboolean cib_writes_enabled = TRUE; int remote_fd = 0; int remote_tls_fd = 0; void usage(const char *cmd, int exit_status); int cib_init(void); void cib_shutdown(int nsig); gboolean startCib(const char *filename); extern int write_cib_contents(gpointer p); GHashTable *client_list = NULL; GHashTable *config_hash = NULL; GHashTable *local_notify_queue = NULL; char *channel1 = NULL; char *channel2 = NULL; char *channel3 = NULL; char *channel4 = NULL; char *channel5 = NULL; #define OPTARGS "maswr:V?" void cib_cleanup(void); static void cib_enable_writes(int nsig) { crm_info("(Re)enabling disk writes"); cib_writes_enabled = TRUE; } static void log_cib_client(gpointer key, gpointer value, gpointer user_data) { cib_client_t *a_client = value; crm_info("Client %s", crm_str(a_client->name)); } int main(int argc, char **argv) { int flag; int rc = 0; int argerr = 0; #ifdef HAVE_GETOPT_H int option_index = 0; /* *INDENT-OFF* */ static struct option long_options[] = { {"per-action-cib", 0, 0, 'a'}, {"stand-alone", 0, 0, 's'}, {"disk-writes", 0, 0, 'w'}, {"cib-root", 1, 0, 'r'}, {"verbose", 0, 0, 'V'}, {"help", 0, 0, '?'}, {"metadata", 0, 0, 'm'}, {0, 0, 0, 0} }; /* *INDENT-ON* */ #endif struct passwd *pwentry = NULL; crm_log_init(NULL, LOG_INFO, TRUE, FALSE, argc, argv, FALSE); mainloop_add_signal(SIGTERM, cib_shutdown); mainloop_add_signal(SIGPIPE, cib_enable_writes); cib_writer = mainloop_add_trigger(G_PRIORITY_LOW, write_cib_contents, NULL); crm_peer_init(); client_list = g_hash_table_new(crm_str_hash, g_str_equal); while (1) { #ifdef HAVE_GETOPT_H flag = getopt_long(argc, argv, OPTARGS, long_options, &option_index); #else flag = getopt(argc, argv, OPTARGS); #endif if (flag == -1) break; switch (flag) { case 'V': crm_bump_log_level(argc, argv); break; case 's': stand_alone = TRUE; preserve_status = TRUE; cib_writes_enabled = FALSE; pwentry = getpwnam(CRM_DAEMON_USER); CRM_CHECK(pwentry != NULL, crm_perror(LOG_ERR, "Invalid uid (%s) specified", CRM_DAEMON_USER); return 100); rc = setgid(pwentry->pw_gid); if (rc < 0) { crm_perror(LOG_ERR, "Could not set group to %d", pwentry->pw_gid); return 100; } rc = setuid(pwentry->pw_uid); if (rc < 0) { crm_perror(LOG_ERR, "Could not set user to %d", pwentry->pw_uid); return 100; } break; case '?': /* Help message */ usage(crm_system_name, EX_OK); break; case 'w': cib_writes_enabled = TRUE; break; case 'r': cib_root = optarg; break; case 'm': cib_metadata(); return 0; default: ++argerr; break; } } if (argc - optind == 1 && safe_str_eq("metadata", argv[optind])) { cib_metadata(); return 0; } if (optind > argc) { ++argerr; } if (argerr) { usage(crm_system_name, EX_USAGE); } if(cib_root == NULL) { char *path = g_strdup_printf("%s/cib.xml", CRM_CONFIG_DIR); char *legacy = g_strdup_printf("%s/cib.xml", CRM_LEGACY_CONFIG_DIR); if(g_file_test(path, G_FILE_TEST_EXISTS)) { cib_root = CRM_CONFIG_DIR; } else if(g_file_test(legacy, G_FILE_TEST_EXISTS)) { cib_root = CRM_LEGACY_CONFIG_DIR; crm_notice("Using legacy config location: %s", cib_root); } else { cib_root = CRM_CONFIG_DIR; crm_notice("Using new config location: %s", cib_root); } g_free(legacy); g_free(path); } else { crm_notice("Using custom config location: %s", cib_root); } if (crm_is_writable(cib_root, NULL, CRM_DAEMON_USER, CRM_DAEMON_GROUP, FALSE) == FALSE) { crm_err("Bad permissions on %s. Terminating", cib_root); fprintf(stderr, "ERROR: Bad permissions on %s. See logs for details\n", cib_root); fflush(stderr); return 100; } /* read local config file */ rc = cib_init(); CRM_CHECK(g_hash_table_size(client_list) == 0, crm_warn("Not all clients gone at exit")); g_hash_table_foreach(client_list, log_cib_client, NULL); cib_cleanup(); #if SUPPORT_HEARTBEAT if (hb_conn) { hb_conn->llc_ops->delete(hb_conn); } #endif crm_info("Done"); return rc; } void cib_cleanup(void) { crm_peer_destroy(); if (local_notify_queue) { g_hash_table_destroy(local_notify_queue); } g_hash_table_destroy(config_hash); g_hash_table_destroy(client_list); free(cib_our_uname); #if HAVE_LIBXML2 crm_xml_cleanup(); #endif free(channel1); free(channel2); free(channel3); free(channel4); free(channel5); } unsigned long cib_num_ops = 0; const char *cib_stat_interval = "10min"; unsigned long cib_num_local = 0, cib_num_updates = 0, cib_num_fail = 0; unsigned long cib_bad_connects = 0, cib_num_timeouts = 0; #if SUPPORT_HEARTBEAT gboolean ccm_connect(void); static void ccm_connection_destroy(gpointer user_data) { crm_err("CCM connection failed... blocking while we reconnect"); CRM_ASSERT(ccm_connect()); return; } static void *ccm_library = NULL; gboolean ccm_connect(void) { gboolean did_fail = TRUE; int num_ccm_fails = 0; int max_ccm_fails = 30; int ret; int cib_ev_fd; int (*ccm_api_register) (oc_ev_t ** token) = find_library_function(&ccm_library, CCM_LIBRARY, "oc_ev_register", 1); int (*ccm_api_set_callback) (const oc_ev_t * token, oc_ev_class_t class, oc_ev_callback_t * fn, oc_ev_callback_t ** prev_fn) = find_library_function(&ccm_library, CCM_LIBRARY, "oc_ev_set_callback", 1); void (*ccm_api_special) (const oc_ev_t *, oc_ev_class_t, int) = find_library_function(&ccm_library, CCM_LIBRARY, "oc_ev_special", 1); int (*ccm_api_activate) (const oc_ev_t * token, int *fd) = find_library_function(&ccm_library, CCM_LIBRARY, "oc_ev_activate", 1); int (*ccm_api_unregister) (oc_ev_t * token) = find_library_function(&ccm_library, CCM_LIBRARY, "oc_ev_unregister", 1); static struct mainloop_fd_callbacks ccm_fd_callbacks = { .dispatch = cib_ccm_dispatch, .destroy = ccm_connection_destroy, }; while (did_fail) { did_fail = FALSE; crm_info("Registering with CCM..."); ret = (*ccm_api_register) (&cib_ev_token); if (ret != 0) { did_fail = TRUE; } if (did_fail == FALSE) { crm_trace("Setting up CCM callbacks"); ret = (*ccm_api_set_callback) (cib_ev_token, OC_EV_MEMB_CLASS, cib_ccm_msg_callback, NULL); if (ret != 0) { crm_warn("CCM callback not set"); did_fail = TRUE; } } if (did_fail == FALSE) { (*ccm_api_special) (cib_ev_token, OC_EV_MEMB_CLASS, 0); crm_trace("Activating CCM token"); ret = (*ccm_api_activate) (cib_ev_token, &cib_ev_fd); if (ret != 0) { crm_warn("CCM Activation failed"); did_fail = TRUE; } } if (did_fail) { num_ccm_fails++; (*ccm_api_unregister) (cib_ev_token); if (num_ccm_fails < max_ccm_fails) { crm_warn("CCM Connection failed %d times (%d max)", num_ccm_fails, max_ccm_fails); sleep(3); } else { crm_err("CCM Activation failed %d (max) times", num_ccm_fails); return FALSE; } } } crm_debug("CCM Activation passed... all set to go!"); mainloop_add_fd("heartbeat-ccm", G_PRIORITY_MEDIUM, cib_ev_fd, cib_ev_token, &ccm_fd_callbacks); return TRUE; } #endif #if SUPPORT_COROSYNC static gboolean -cib_ais_dispatch(AIS_Message * wrapper, char *data, int sender) +cib_ais_dispatch(int kind, const char *from, const char *data) { xmlNode *xml = NULL; - if (wrapper->header.id == crm_class_cluster) { + if (kind == crm_class_cluster) { xml = string2xml(data); if (xml == NULL) { goto bail; } - crm_xml_add(xml, F_ORIG, wrapper->sender.uname); - crm_xml_add_int(xml, F_SEQ, wrapper->id); + crm_xml_add(xml, F_ORIG, from); + /* crm_xml_add_int(xml, F_SEQ, wrapper->id); */ cib_peer_callback(xml, NULL); } free_xml(xml); return TRUE; bail: crm_err("Invalid XML: '%.120s'", data); return TRUE; } static void cib_ais_destroy(gpointer user_data) { if (cib_shutdown_flag) { crm_info("Corosync disconnection complete"); } else { crm_err("Corosync connection lost! Exiting."); terminate_cib(__FUNCTION__, TRUE); } } #endif static void cib_peer_update_callback(enum crm_status_type type, crm_node_t * node, const void *data) { #if 0 /* crm_active_peers(crm_proc_cib) appears to give the wrong answer * sometimes, this might help figure out why */ if(type == crm_status_nstate) { crm_info("status: %s is now %s (was %s)", node->uname, node->state, (const char *)data); if (safe_str_eq(CRMD_JOINSTATE_MEMBER, node->state)) { return; } } else if(type == crm_status_processes) { uint32_t old = 0; if (data) { old = *(const uint32_t *)data; } if ((node->processes ^ old) & crm_proc_cib) { crm_info("status: cib process on %s is now %sactive", node->uname, is_set(node->processes, crm_proc_cib)?"":"in"); } else { return; } } else { return; } #endif if(cib_shutdown_flag && crm_active_peers() < 2 && g_hash_table_size(client_list) == 0) { crm_info("No more peers"); terminate_cib(__FUNCTION__, FALSE); } } #if SUPPORT_HEARTBEAT static void cib_ha_connection_destroy(gpointer user_data) { if (cib_shutdown_flag) { crm_info("Heartbeat disconnection complete... exiting"); terminate_cib(__FUNCTION__, FALSE); } else { crm_err("Heartbeat connection lost! Exiting."); terminate_cib(__FUNCTION__, TRUE); } } #endif int cib_init(void) { + static crm_cluster_t cluster; + + if (is_openais_cluster()) { +#if SUPPORT_COROSYNC + cluster.destroy = cib_ais_destroy; + cluster.cs_dispatch = cib_ais_dispatch; +#endif + } else if(is_heartbeat_cluster()) { +#if SUPPORT_HEARTBEAT + cluster.hb_dispatch = cib_ha_peer_callback; + cluster.destroy = cib_ha_connection_destroy; +#endif + } + config_hash = g_hash_table_new_full(crm_str_hash, g_str_equal, g_hash_destroy_str, g_hash_destroy_str); if (startCib("cib.xml") == FALSE) { crm_crit("Cannot start CIB... terminating"); exit(1); } if (stand_alone == FALSE) { - void *dispatch = NULL; - void *destroy = NULL; - - if (is_openais_cluster()) { -#if SUPPORT_COROSYNC - destroy = cib_ais_destroy; - dispatch = cib_ais_dispatch; -#endif - } else if(is_heartbeat_cluster()) { -#if SUPPORT_HEARTBEAT - dispatch = cib_ha_peer_callback; - destroy = cib_ha_connection_destroy; -#endif - } - - if (crm_cluster_connect(&cib_our_uname, NULL, dispatch, destroy, -#if SUPPORT_HEARTBEAT - &hb_conn -#else - NULL -#endif - ) == FALSE) { + if (crm_cluster_connect(&cluster) == FALSE) { crm_crit("Cannot sign in to the cluster... terminating"); exit(100); } + cib_our_uname = cluster.uname; if (is_openais_cluster()) { crm_set_status_callback(&cib_peer_update_callback); } #if SUPPORT_HEARTBEAT if (is_heartbeat_cluster()) { gboolean was_error = FALSE; + hb_conn = cluster.hb_conn; if (was_error == FALSE) { if (HA_OK != hb_conn->llc_ops->set_cstatus_callback(hb_conn, cib_client_status_callback, hb_conn)) { crm_err("Cannot set cstatus callback: %s", hb_conn->llc_ops->errmsg(hb_conn)); was_error = TRUE; } } if (was_error == FALSE) { was_error = (ccm_connect() == FALSE); } if (was_error == FALSE) { /* Async get client status information in the cluster */ crm_info("Requesting the list of configured nodes"); hb_conn->llc_ops->client_status(hb_conn, NULL, CRM_SYSTEM_CIB, -1); } } #endif } else { cib_our_uname = strdup("localhost"); } ipcs_ro = mainloop_add_ipc_server(cib_channel_ro, QB_IPC_NATIVE, &ipc_ro_callbacks); ipcs_rw = mainloop_add_ipc_server(cib_channel_rw, QB_IPC_NATIVE, &ipc_rw_callbacks); ipcs_shm = mainloop_add_ipc_server(cib_channel_shm, QB_IPC_SHM, &ipc_rw_callbacks); if (stand_alone) { cib_is_master = TRUE; } if (ipcs_ro != NULL && ipcs_rw != NULL && ipcs_shm != NULL) { /* Create the mainloop and run it... */ mainloop = g_main_new(FALSE); crm_info("Starting %s mainloop", crm_system_name); g_main_run(mainloop); } else { crm_err("Couldnt start all IPC channels, exiting."); return -1; } qb_ipcs_destroy(ipcs_ro); qb_ipcs_destroy(ipcs_rw); qb_ipcs_destroy(ipcs_shm); qb_log_fini(); return 0; } void usage(const char *cmd, int exit_status) { FILE *stream; stream = exit_status ? stderr : stdout; fprintf(stream, "usage: %s [-%s]\n", cmd, OPTARGS); fprintf(stream, "\t--%s (-%c)\t\tTurn on debug info." " Additional instances increase verbosity\n", "verbose", 'V'); fprintf(stream, "\t--%s (-%c)\t\tThis help message\n", "help", '?'); fprintf(stream, "\t--%s (-%c)\t\tShow configurable cib options\n", "metadata", 'm'); fprintf(stream, "\t--%s (-%c)\tAdvanced use only\n", "per-action-cib", 'a'); fprintf(stream, "\t--%s (-%c)\tAdvanced use only\n", "stand-alone", 's'); fprintf(stream, "\t--%s (-%c)\tAdvanced use only\n", "disk-writes", 'w'); fprintf(stream, "\t--%s (-%c)\t\tAdvanced use only\n", "cib-root", 'r'); fflush(stream); exit(exit_status); } gboolean startCib(const char *filename) { gboolean active = FALSE; xmlNode *cib = readCibXmlFile(cib_root, filename, !preserve_status); CRM_ASSERT(cib != NULL); if (activateCibXml(cib, TRUE, "start") == 0) { int port = 0; const char *port_s = NULL; active = TRUE; cib_read_config(config_hash, cib); port_s = crm_element_value(cib, "remote-tls-port"); if (port_s) { port = crm_parse_int(port_s, "0"); remote_tls_fd = init_remote_listener(port, TRUE); } port_s = crm_element_value(cib, "remote-clear-port"); if (port_s) { port = crm_parse_int(port_s, "0"); remote_fd = init_remote_listener(port, FALSE); } crm_info("CIB Initialization completed successfully"); } return active; } diff --git a/crmd/control.c b/crmd/control.c index 230d45f584..9ee4b4b33a 100644 --- a/crmd/control.c +++ b/crmd/control.c @@ -1,919 +1,921 @@ /* * Copyright (C) 2004 Andrew Beekhof * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include qb_ipcs_service_t *ipcs = NULL; -extern gboolean crm_connect_corosync(void); +extern gboolean crm_connect_corosync(crm_cluster_t *cluster); extern void crmd_ha_connection_destroy(gpointer user_data); void crm_shutdown(int nsig); gboolean crm_read_options(gpointer user_data); gboolean fsa_has_quorum = FALSE; GHashTable *ipc_clients = NULL; crm_trigger_t *fsa_source = NULL; crm_trigger_t *config_read = NULL; /* A_HA_CONNECT */ void do_ha_control(long long action, enum crmd_fsa_cause cause, enum crmd_fsa_state cur_state, enum crmd_fsa_input current_input, fsa_data_t * msg_data) { gboolean registered = FALSE; + static crm_cluster_t cluster; if (action & A_HA_DISCONNECT) { if (is_openais_cluster()) { crm_peer_destroy(); #if SUPPORT_COROSYNC - terminate_ais_connection(); + terminate_cs_connection(); #endif crm_info("Disconnected from OpenAIS"); #if SUPPORT_HEARTBEAT } else if (fsa_cluster_conn != NULL) { set_bit(fsa_input_register, R_HA_DISCONNECTED); fsa_cluster_conn->llc_ops->signoff(fsa_cluster_conn, FALSE); crm_info("Disconnected from Heartbeat"); #endif } } if (action & A_HA_CONNECT) { crm_set_status_callback(&peer_update_callback); if (is_openais_cluster()) { #if SUPPORT_COROSYNC - registered = crm_connect_corosync(); + registered = crm_connect_corosync(&cluster); #endif } else if (is_heartbeat_cluster()) { #if SUPPORT_HEARTBEAT - registered = - crm_cluster_connect(&fsa_our_uname, &fsa_our_uuid, crmd_ha_msg_callback, - crmd_ha_connection_destroy, &fsa_cluster_conn); -#endif - } -#if SUPPORT_HEARTBEAT - if (is_heartbeat_cluster()) { + cluster.destroy = crmd_ha_connection_destroy; + cluster.hb_dispatch = crmd_ha_msg_callback; + + registered = crm_cluster_connect(&cluster); + fsa_cluster_conn = cluster.hb_conn; + crm_trace("Be informed of Node Status changes"); if (registered && fsa_cluster_conn->llc_ops->set_nstatus_callback(fsa_cluster_conn, crmd_ha_status_callback, fsa_cluster_conn) != HA_OK) { crm_err("Cannot set nstatus callback: %s", fsa_cluster_conn->llc_ops->errmsg(fsa_cluster_conn)); registered = FALSE; } crm_trace("Be informed of CRM Client Status changes"); if (registered && fsa_cluster_conn->llc_ops->set_cstatus_callback(fsa_cluster_conn, crmd_client_status_callback, fsa_cluster_conn) != HA_OK) { crm_err("Cannot set cstatus callback: %s", fsa_cluster_conn->llc_ops->errmsg(fsa_cluster_conn)); registered = FALSE; } if (registered) { crm_trace("Requesting an initial dump of CRMD client_status"); fsa_cluster_conn->llc_ops->client_status(fsa_cluster_conn, NULL, CRM_SYSTEM_CRMD, -1); } - } #endif + } + fsa_our_uname = cluster.uname; + fsa_our_uuid = cluster.uuid; if (registered == FALSE) { set_bit(fsa_input_register, R_HA_DISCONNECTED); register_fsa_error(C_FSA_INTERNAL, I_ERROR, NULL); return; } populate_cib_nodes(node_update_none, __FUNCTION__); clear_bit(fsa_input_register, R_HA_DISCONNECTED); crm_info("Connected to the cluster"); } if (action & ~(A_HA_CONNECT | A_HA_DISCONNECT)) { crm_err("Unexpected action %s in %s", fsa_action2string(action), __FUNCTION__); } } /* A_SHUTDOWN */ void do_shutdown(long long action, enum crmd_fsa_cause cause, enum crmd_fsa_state cur_state, enum crmd_fsa_input current_input, fsa_data_t * msg_data) { /* just in case */ set_bit(fsa_input_register, R_SHUTDOWN); if (is_heartbeat_cluster()) { if (is_set(fsa_input_register, pe_subsystem->flag_connected)) { crm_info("Terminating the %s", pe_subsystem->name); if (stop_subsystem(pe_subsystem, TRUE) == FALSE) { /* its gone... */ crm_err("Faking %s exit", pe_subsystem->name); clear_bit(fsa_input_register, pe_subsystem->flag_connected); } else { crm_info("Waiting for subsystems to exit"); crmd_fsa_stall(NULL); } } crm_info("All subsystems stopped, continuing"); } if (stonith_api) { /* Prevent it from comming up again */ clear_bit(fsa_input_register, R_ST_REQUIRED); crm_info("Disconnecting STONITH..."); stonith_api->cmds->disconnect(stonith_api); } } /* A_SHUTDOWN_REQ */ void do_shutdown_req(long long action, enum crmd_fsa_cause cause, enum crmd_fsa_state cur_state, enum crmd_fsa_input current_input, fsa_data_t * msg_data) { xmlNode *msg = NULL; crm_info("Sending shutdown request to %s", crm_str(fsa_our_dc)); msg = create_request(CRM_OP_SHUTDOWN_REQ, NULL, NULL, CRM_SYSTEM_DC, CRM_SYSTEM_CRMD, NULL); /* set_bit(fsa_input_register, R_STAYDOWN); */ if (send_cluster_message(NULL, crm_msg_crmd, msg, TRUE) == FALSE) { register_fsa_error(C_FSA_INTERNAL, I_ERROR, NULL); } free_xml(msg); } extern crm_ipc_t *attrd_ipc; extern char *max_generation_from; extern xmlNode *max_generation_xml; extern GHashTable *resource_history; extern GHashTable *voted; extern GHashTable *reload_hash; void log_connected_client(gpointer key, gpointer value, gpointer user_data); void log_connected_client(gpointer key, gpointer value, gpointer user_data) { crmd_client_t *client = value; crm_err("%s is still connected at exit", client->table_key); } static void free_mem(fsa_data_t * msg_data) { GListPtr gIter = NULL; if(attrd_ipc) { crm_ipc_close(attrd_ipc); crm_ipc_destroy(attrd_ipc); } if(crmd_mainloop) { g_main_loop_quit(crmd_mainloop); g_main_loop_unref(crmd_mainloop); } #if SUPPORT_HEARTBEAT if (fsa_cluster_conn) { fsa_cluster_conn->llc_ops->delete(fsa_cluster_conn); fsa_cluster_conn = NULL; } #endif for(gIter = fsa_message_queue; gIter != NULL; gIter = gIter->next) { fsa_data_t *fsa_data = gIter->data; crm_info("Dropping %s: [ state=%s cause=%s origin=%s ]", fsa_input2string(fsa_data->fsa_input), fsa_state2string(fsa_state), fsa_cause2string(fsa_data->fsa_cause), fsa_data->origin); delete_fsa_input(fsa_data); } g_list_free(fsa_message_queue); delete_fsa_input(msg_data); if (ipc_clients) { crm_debug("Number of connected clients: %d", g_hash_table_size(ipc_clients)); /* g_hash_table_foreach(ipc_clients, log_connected_client, NULL); */ g_hash_table_destroy(ipc_clients); } empty_uuid_cache(); crm_peer_destroy(); clear_bit(fsa_input_register, R_MEMBERSHIP); if (te_subsystem->client && te_subsystem->client->ipc) { crm_debug("Full destroy: TE"); qb_ipcs_disconnect(te_subsystem->client->ipc); } free(te_subsystem); if (pe_subsystem->client && pe_subsystem->client->ipc) { crm_debug("Full destroy: PE"); qb_ipcs_disconnect(pe_subsystem->client->ipc); } free(pe_subsystem); free(cib_subsystem); if (integrated_nodes) { g_hash_table_destroy(integrated_nodes); } if (finalized_nodes) { g_hash_table_destroy(finalized_nodes); } if (confirmed_nodes) { g_hash_table_destroy(confirmed_nodes); } if (reload_hash) { g_hash_table_destroy(reload_hash); } if (resource_history) { g_hash_table_destroy(resource_history); } if (voted) { g_hash_table_destroy(voted); } cib_delete(fsa_cib_conn); fsa_cib_conn = NULL; if (fsa_lrm_conn) { lrmd_api_delete(fsa_lrm_conn); fsa_lrm_conn = NULL; } free(transition_timer); free(integration_timer); free(finalization_timer); free(election_trigger); free(election_timeout); free(shutdown_escalation_timer); free(wait_timer); free(recheck_timer); free(fsa_our_dc_version); free(fsa_our_uname); free(fsa_our_uuid); free(fsa_our_dc); free(max_generation_from); free_xml(max_generation_xml); crm_xml_cleanup(); } /* A_EXIT_0, A_EXIT_1 */ void do_exit(long long action, enum crmd_fsa_cause cause, enum crmd_fsa_state cur_state, enum crmd_fsa_input current_input, fsa_data_t * msg_data) { int exit_code = 0; int log_level = LOG_INFO; const char *exit_type = "gracefully"; if (action & A_EXIT_1) { exit_code = 1; log_level = LOG_ERR; exit_type = "forcefully"; } verify_stopped(cur_state, LOG_ERR); do_crm_log(log_level, "Performing %s - %s exiting the CRMd", fsa_action2string(action), exit_type); if (is_set(fsa_input_register, R_IN_RECOVERY)) { crm_err("Could not recover from internal error"); exit_code = 2; } if (is_set(fsa_input_register, R_STAYDOWN)) { crm_warn("Inhibiting respawn by Heartbeat"); exit_code = 100; } crm_info("[%s] stopped (%d)", crm_system_name, exit_code); free_mem(msg_data); exit(exit_code); } /* A_STARTUP */ void do_startup(long long action, enum crmd_fsa_cause cause, enum crmd_fsa_state cur_state, enum crmd_fsa_input current_input, fsa_data_t * msg_data) { int was_error = 0; int interval = 1; /* seconds between DC heartbeats */ crm_debug("Registering Signal Handlers"); mainloop_add_signal(SIGTERM, crm_shutdown); fsa_source = mainloop_add_trigger(G_PRIORITY_HIGH, crm_fsa_trigger, NULL); config_read = mainloop_add_trigger(G_PRIORITY_HIGH, crm_read_options, NULL); ipc_clients = g_hash_table_new(crm_str_hash, g_str_equal); crm_debug("Creating CIB and LRM objects"); fsa_cib_conn = cib_new(); fsa_lrm_conn = lrmd_api_new(); /* set up the timers */ transition_timer = calloc(1, sizeof(fsa_timer_t)); integration_timer = calloc(1, sizeof(fsa_timer_t)); finalization_timer = calloc(1, sizeof(fsa_timer_t)); election_trigger = calloc(1, sizeof(fsa_timer_t)); election_timeout = calloc(1, sizeof(fsa_timer_t)); shutdown_escalation_timer = calloc(1, sizeof(fsa_timer_t)); wait_timer = calloc(1, sizeof(fsa_timer_t)); recheck_timer = calloc(1, sizeof(fsa_timer_t)); interval = interval * 1000; if (election_trigger != NULL) { election_trigger->source_id = 0; election_trigger->period_ms = -1; election_trigger->fsa_input = I_DC_TIMEOUT; election_trigger->callback = crm_timer_popped; election_trigger->repeat = FALSE; } else { was_error = TRUE; } if (election_timeout != NULL) { election_timeout->source_id = 0; election_timeout->period_ms = -1; election_timeout->fsa_input = I_ELECTION_DC; election_timeout->callback = crm_timer_popped; election_timeout->repeat = FALSE; } else { was_error = TRUE; } if (transition_timer != NULL) { transition_timer->source_id = 0; transition_timer->period_ms = -1; transition_timer->fsa_input = I_PE_CALC; transition_timer->callback = crm_timer_popped; transition_timer->repeat = FALSE; } else { was_error = TRUE; } if (integration_timer != NULL) { integration_timer->source_id = 0; integration_timer->period_ms = -1; integration_timer->fsa_input = I_INTEGRATED; integration_timer->callback = crm_timer_popped; integration_timer->repeat = FALSE; } else { was_error = TRUE; } if (finalization_timer != NULL) { finalization_timer->source_id = 0; finalization_timer->period_ms = -1; finalization_timer->fsa_input = I_FINALIZED; finalization_timer->callback = crm_timer_popped; finalization_timer->repeat = FALSE; /* for possible enabling... a bug in the join protocol left * a slave in S_PENDING while we think its in S_NOT_DC * * raising I_FINALIZED put us into a transition loop which is * never resolved. * in this loop we continually send probes which the node * NACK's because its in S_PENDING * * if we have nodes where heartbeat is active but the * CRM is not... then this will be handled in the * integration phase */ finalization_timer->fsa_input = I_ELECTION; } else { was_error = TRUE; } if (shutdown_escalation_timer != NULL) { shutdown_escalation_timer->source_id = 0; shutdown_escalation_timer->period_ms = -1; shutdown_escalation_timer->fsa_input = I_STOP; shutdown_escalation_timer->callback = crm_timer_popped; shutdown_escalation_timer->repeat = FALSE; } else { was_error = TRUE; } if (wait_timer != NULL) { wait_timer->source_id = 0; wait_timer->period_ms = 2000; wait_timer->fsa_input = I_NULL; wait_timer->callback = crm_timer_popped; wait_timer->repeat = FALSE; } else { was_error = TRUE; } if (recheck_timer != NULL) { recheck_timer->source_id = 0; recheck_timer->period_ms = -1; recheck_timer->fsa_input = I_PE_CALC; recheck_timer->callback = crm_timer_popped; recheck_timer->repeat = FALSE; } else { was_error = TRUE; } /* set up the sub systems */ cib_subsystem = calloc(1, sizeof(struct crm_subsystem_s)); te_subsystem = calloc(1, sizeof(struct crm_subsystem_s)); pe_subsystem = calloc(1, sizeof(struct crm_subsystem_s)); if (cib_subsystem != NULL) { cib_subsystem->pid = -1; cib_subsystem->name = CRM_SYSTEM_CIB; cib_subsystem->flag_connected = R_CIB_CONNECTED; cib_subsystem->flag_required = R_CIB_REQUIRED; } else { was_error = TRUE; } if (te_subsystem != NULL) { te_subsystem->pid = -1; te_subsystem->name = CRM_SYSTEM_TENGINE; te_subsystem->flag_connected = R_TE_CONNECTED; te_subsystem->flag_required = R_TE_REQUIRED; } else { was_error = TRUE; } if (pe_subsystem != NULL) { pe_subsystem->pid = -1; pe_subsystem->path = CRM_DAEMON_DIR; pe_subsystem->name = CRM_SYSTEM_PENGINE; pe_subsystem->command = CRM_DAEMON_DIR "/" CRM_SYSTEM_PENGINE; pe_subsystem->args = NULL; pe_subsystem->flag_connected = R_PE_CONNECTED; pe_subsystem->flag_required = R_PE_REQUIRED; } else { was_error = TRUE; } if (was_error == FALSE && is_heartbeat_cluster()) { if(start_subsystem(pe_subsystem) == FALSE) { was_error = TRUE; } } if (was_error) { register_fsa_error(C_FSA_INTERNAL, I_ERROR, NULL); } welcomed_nodes = g_hash_table_new_full(crm_str_hash, g_str_equal, g_hash_destroy_str, g_hash_destroy_str); integrated_nodes = g_hash_table_new_full(crm_str_hash, g_str_equal, g_hash_destroy_str, g_hash_destroy_str); finalized_nodes = g_hash_table_new_full(crm_str_hash, g_str_equal, g_hash_destroy_str, g_hash_destroy_str); confirmed_nodes = g_hash_table_new_full(crm_str_hash, g_str_equal, g_hash_destroy_str, g_hash_destroy_str); } static int32_t crmd_ipc_accept(qb_ipcs_connection_t *c, uid_t uid, gid_t gid) { crmd_client_t *blank_client = NULL; #if ENABLE_ACL struct group *crm_grp = NULL; #endif crm_trace("Connecting %p for uid=%d gid=%d", c, uid, gid); blank_client = calloc(1, sizeof(crmd_client_t)); CRM_ASSERT(blank_client != NULL); crm_trace("Created client: %p", blank_client); blank_client->ipc = c; blank_client->sub_sys = NULL; blank_client->uuid = NULL; blank_client->table_key = NULL; #if ENABLE_ACL crm_grp = getgrnam(CRM_DAEMON_GROUP); if (crm_grp) { qb_ipcs_connection_auth_set(c, -1, crm_grp->gr_gid, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP); } blank_client->user = uid2username(uid); #endif qb_ipcs_context_set(c, blank_client); return 0; } static void crmd_ipc_created(qb_ipcs_connection_t *c) { crm_trace("Client %p connected", c); } static int32_t crmd_ipc_dispatch(qb_ipcs_connection_t *c, void *data, size_t size) { uint32_t id = 0; uint32_t flags = 0; crmd_client_t *client = qb_ipcs_context_get(c); xmlNode *msg = crm_ipcs_recv(c, data, size, &id, &flags); crm_trace("Invoked: %s", client->table_key); if(flags & crm_ipc_client_response) { crm_ipcs_send_ack(c, id, "ack", __FUNCTION__, __LINE__); } if (msg == NULL) { return 0; } #if ENABLE_ACL determine_request_user(client->user, msg, F_CRM_USER); #endif crm_trace("Processing msg from %s", client->table_key); crm_log_xml_trace(msg, "CRMd[inbound]"); if (crmd_authorize_message(msg, client)) { route_message(C_IPC_MESSAGE, msg); } trigger_fsa(fsa_source); free_xml(msg); return 0; } static int32_t crmd_ipc_closed(qb_ipcs_connection_t *c) { return 0; } static void crmd_ipc_destroy(qb_ipcs_connection_t *c) { crmd_client_t *client = qb_ipcs_context_get(c); if (client == NULL) { crm_trace("No client to delete"); return; } process_client_disconnect(client); crm_trace("Disconnecting client %s (%p)", client->table_key, client); free(client->table_key); free(client->sub_sys); free(client->uuid); free(client->user); free(client); trigger_fsa(fsa_source); } /* A_STOP */ void do_stop(long long action, enum crmd_fsa_cause cause, enum crmd_fsa_state cur_state, enum crmd_fsa_input current_input, fsa_data_t * msg_data) { if (is_heartbeat_cluster()) { stop_subsystem(pe_subsystem, FALSE); } mainloop_del_ipc_server(ipcs); register_fsa_input(C_FSA_INTERNAL, I_TERMINATE, NULL); } /* A_STARTED */ void do_started(long long action, enum crmd_fsa_cause cause, enum crmd_fsa_state cur_state, enum crmd_fsa_input current_input, fsa_data_t * msg_data) { static struct qb_ipcs_service_handlers crmd_callbacks = { .connection_accept = crmd_ipc_accept, .connection_created = crmd_ipc_created, .msg_process = crmd_ipc_dispatch, .connection_closed = crmd_ipc_closed, .connection_destroyed = crmd_ipc_destroy }; if (cur_state != S_STARTING) { crm_err("Start cancelled... %s", fsa_state2string(cur_state)); return; } else if (is_set(fsa_input_register, R_MEMBERSHIP) == FALSE) { crm_info("Delaying start, no membership data (%.16llx)", R_MEMBERSHIP); crmd_fsa_stall(NULL); return; } else if (is_set(fsa_input_register, R_LRM_CONNECTED) == FALSE) { crm_info("Delaying start, LRM not connected (%.16llx)", R_LRM_CONNECTED); crmd_fsa_stall(NULL); return; } else if (is_set(fsa_input_register, R_CIB_CONNECTED) == FALSE) { crm_info("Delaying start, CIB not connected (%.16llx)", R_CIB_CONNECTED); crmd_fsa_stall(NULL); return; } else if (is_set(fsa_input_register, R_READ_CONFIG) == FALSE) { crm_info("Delaying start, Config not read (%.16llx)", R_READ_CONFIG); crmd_fsa_stall(NULL); return; } else if (is_set(fsa_input_register, R_PEER_DATA) == FALSE) { /* try reading from HA */ crm_info("Delaying start, No peer data (%.16llx)", R_PEER_DATA); #if SUPPORT_HEARTBEAT if (is_heartbeat_cluster()) { HA_Message *msg = NULL; crm_trace("Looking for a HA message"); msg = fsa_cluster_conn->llc_ops->readmsg(fsa_cluster_conn, 0); if (msg != NULL) { crm_trace("There was a HA message"); ha_msg_del(msg); } } #endif crmd_fsa_stall(NULL); return; } crm_debug("Init server comms"); ipcs = mainloop_add_ipc_server(CRM_SYSTEM_CRMD, QB_IPC_NATIVE, &crmd_callbacks); if (ipcs == NULL) { crm_err("Couldn't start IPC server"); register_fsa_error(C_FSA_INTERNAL, I_ERROR, NULL); } if (stonith_reconnect == NULL) { int dummy; stonith_reconnect = mainloop_add_trigger(G_PRIORITY_LOW, te_connect_stonith, &dummy); } set_bit(fsa_input_register, R_ST_REQUIRED); mainloop_set_trigger(stonith_reconnect); crm_notice("The local CRM is operational"); clear_bit(fsa_input_register, R_STARTING); register_fsa_input(msg_data->fsa_cause, I_PENDING, NULL); } /* A_RECOVER */ void do_recover(long long action, enum crmd_fsa_cause cause, enum crmd_fsa_state cur_state, enum crmd_fsa_input current_input, fsa_data_t * msg_data) { set_bit(fsa_input_register, R_IN_RECOVERY); crm_err("Action %s (%.16llx) not supported", fsa_action2string(action), action); register_fsa_input(C_FSA_INTERNAL, I_TERMINATE, NULL); } /* *INDENT-OFF* */ pe_cluster_option crmd_opts[] = { /* name, old-name, validate, default, description */ { "dc-version", NULL, "string", NULL, "none", NULL, "Version of Pacemaker on the cluster's DC.", "Includes the hash which identifies the exact Mercurial changeset it was built from. Used for diagnostic purposes." }, { "cluster-infrastructure", NULL, "string", NULL, "heartbeat", NULL, "The messaging stack on which Pacemaker is currently running.", "Used for informational and diagnostic purposes." }, { XML_CONFIG_ATTR_DC_DEADTIME, "dc_deadtime", "time", NULL, "20s", &check_time, "How long to wait for a response from other nodes during startup.", "The \"correct\" value will depend on the speed/load of your network and the type of switches used." }, { XML_CONFIG_ATTR_RECHECK, "cluster_recheck_interval", "time", "Zero disables polling. Positive values are an interval in seconds (unless other SI units are specified. eg. 5min)", "15min", &check_timer, "Polling interval for time based changes to options, resource parameters and constraints.", "The Cluster is primarily event driven, however the configuration can have elements that change based on time." " To ensure these changes take effect, we can optionally poll the cluster's status for changes." }, { XML_CONFIG_ATTR_ELECTION_FAIL, "election_timeout", "time", NULL, "2min", &check_timer, "*** Advanced Use Only ***.", "If need to adjust this value, it probably indicates the presence of a bug." }, { XML_CONFIG_ATTR_FORCE_QUIT, "shutdown_escalation", "time", NULL, "20min", &check_timer, "*** Advanced Use Only ***.", "If need to adjust this value, it probably indicates the presence of a bug." }, { "crmd-integration-timeout", NULL, "time", NULL, "3min", &check_timer, "*** Advanced Use Only ***.", "If need to adjust this value, it probably indicates the presence of a bug." }, { "crmd-finalization-timeout", NULL, "time", NULL, "30min", &check_timer, "*** Advanced Use Only ***.", "If you need to adjust this value, it probably indicates the presence of a bug." }, { "crmd-transition-delay", NULL, "time", NULL, "0s", &check_timer, "*** Advanced Use Only ***\nEnabling this option will slow down cluster recovery under all conditions", "Delay cluster recovery for the configured interval to allow for additional/related events to occur.\nUseful if your configuration is sensitive to the order in which ping updates arrive." }, { XML_ATTR_EXPECTED_VOTES, NULL, "integer", NULL, "2", &check_number, "The number of nodes expected to be in the cluster", "Used to calculate quorum in openais based clusters." }, }; /* *INDENT-ON* */ void crmd_metadata(void) { config_metadata("CRM Daemon", "1.0", "CRM Daemon Options", "This is a fake resource that details the options that can be configured for the CRM Daemon.", crmd_opts, DIMOF(crmd_opts)); } static void verify_crmd_options(GHashTable * options) { verify_all_options(options, crmd_opts, DIMOF(crmd_opts)); } static const char * crmd_pref(GHashTable * options, const char *name) { return get_cluster_pref(options, crmd_opts, DIMOF(crmd_opts), name); } static void config_query_callback(xmlNode * msg, int call_id, int rc, xmlNode * output, void *user_data) { const char *value = NULL; GHashTable *config_hash = NULL; ha_time_t *now = new_ha_date(TRUE); if (rc != pcmk_ok) { fsa_data_t *msg_data = NULL; crm_err("Local CIB query resulted in an error: %s", pcmk_strerror(rc)); register_fsa_error(C_FSA_INTERNAL, I_ERROR, NULL); if (rc == -EACCES || rc == -pcmk_err_dtd_validation) { crm_err("The cluster is mis-configured - shutting down and staying down"); set_bit(fsa_input_register, R_STAYDOWN); } goto bail; } crm_debug("Call %d : Parsing CIB options", call_id); config_hash = g_hash_table_new_full(crm_str_hash, g_str_equal, g_hash_destroy_str, g_hash_destroy_str); unpack_instance_attributes(output, output, XML_CIB_TAG_PROPSET, NULL, config_hash, CIB_OPTIONS_FIRST, FALSE, now); verify_crmd_options(config_hash); value = crmd_pref(config_hash, XML_CONFIG_ATTR_DC_DEADTIME); election_trigger->period_ms = crm_get_msec(value); value = crmd_pref(config_hash, XML_CONFIG_ATTR_FORCE_QUIT); shutdown_escalation_timer->period_ms = crm_get_msec(value); crm_debug("Shutdown escalation occurs after: %dms", shutdown_escalation_timer->period_ms); value = crmd_pref(config_hash, XML_CONFIG_ATTR_ELECTION_FAIL); election_timeout->period_ms = crm_get_msec(value); value = crmd_pref(config_hash, XML_CONFIG_ATTR_RECHECK); recheck_timer->period_ms = crm_get_msec(value); crm_debug("Checking for expired actions every %dms", recheck_timer->period_ms); value = crmd_pref(config_hash, "crmd-transition-delay"); transition_timer->period_ms = crm_get_msec(value); value = crmd_pref(config_hash, "crmd-integration-timeout"); integration_timer->period_ms = crm_get_msec(value); value = crmd_pref(config_hash, "crmd-finalization-timeout"); finalization_timer->period_ms = crm_get_msec(value); #if SUPPORT_COROSYNC if (is_classic_ais_cluster()) { value = crmd_pref(config_hash, XML_ATTR_EXPECTED_VOTES); crm_debug("Sending expected-votes=%s to corosync", value); send_ais_text(crm_class_quorum, value, TRUE, NULL, crm_msg_ais); } #endif set_bit(fsa_input_register, R_READ_CONFIG); crm_trace("Triggering FSA: %s", __FUNCTION__); mainloop_set_trigger(fsa_source); g_hash_table_destroy(config_hash); bail: free_ha_date(now); } gboolean crm_read_options(gpointer user_data) { int call_id = fsa_cib_conn->cmds->query(fsa_cib_conn, XML_CIB_TAG_CRMCONFIG, NULL, cib_scope_local); add_cib_op_callback(fsa_cib_conn, call_id, FALSE, NULL, config_query_callback); crm_trace("Querying the CIB... call %d", call_id); return TRUE; } /* A_READCONFIG */ void do_read_config(long long action, enum crmd_fsa_cause cause, enum crmd_fsa_state cur_state, enum crmd_fsa_input current_input, fsa_data_t * msg_data) { mainloop_set_trigger(config_read); } void crm_shutdown(int nsig) { if (crmd_mainloop != NULL && g_main_is_running(crmd_mainloop)) { if (is_set(fsa_input_register, R_SHUTDOWN)) { crm_err("Escalating the shutdown"); register_fsa_input_before(C_SHUTDOWN, I_ERROR, NULL); } else { set_bit(fsa_input_register, R_SHUTDOWN); register_fsa_input(C_SHUTDOWN, I_SHUTDOWN, NULL); if (shutdown_escalation_timer->period_ms < 1) { const char *value = crmd_pref(NULL, XML_CONFIG_ATTR_FORCE_QUIT); int msec = crm_get_msec(value); crm_debug("Using default shutdown escalation: %dms", msec); shutdown_escalation_timer->period_ms = msec; } /* cant rely on this... */ crm_notice("Requesting shutdown, upper limit is %dms", shutdown_escalation_timer->period_ms); crm_timer_start(shutdown_escalation_timer); } } else { crm_info("exit from shutdown"); exit(EX_OK); } } void default_cib_update_callback(xmlNode * msg, int call_id, int rc, xmlNode * output, void *user_data) { if (rc != pcmk_ok) { fsa_data_t *msg_data = NULL; crm_err("CIB Update failed: %s", pcmk_strerror(rc)); crm_log_xml_warn(output, "update:failed"); register_fsa_error(C_FSA_INTERNAL, I_ERROR, NULL); } } diff --git a/crmd/corosync.c b/crmd/corosync.c index 8397c89a0f..39fb5e7cbb 100644 --- a/crmd/corosync.c +++ b/crmd/corosync.c @@ -1,200 +1,202 @@ /* * Copyright (C) 2004 Andrew Beekhof * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include extern void post_cache_update(int seq); extern void crmd_ha_connection_destroy(gpointer user_data); /* A_HA_CONNECT */ #if SUPPORT_COROSYNC static gboolean -crmd_ais_dispatch(AIS_Message * wrapper, char *data, int sender) +crmd_ais_dispatch(int kind, const char *from, const char *data) { int seq = 0; xmlNode *xml = NULL; const char *seq_s = NULL; crm_node_t *peer = NULL; enum crm_proc_flag flag = crm_proc_cpg; xml = string2xml(data); if (xml == NULL) { - crm_err("Could not parse message content (%d): %.100s", wrapper->header.id, data); + crm_err("Could not parse message content (%d): %.100s", kind, data); return TRUE; } - switch (wrapper->header.id) { + switch (kind) { case crm_class_members: seq_s = crm_element_value(xml, "id"); seq = crm_int_helper(seq_s, NULL); set_bit(fsa_input_register, R_PEER_DATA); post_cache_update(seq); /* fall through */ case crm_class_quorum: crm_update_quorum(crm_have_quorum, FALSE); if (AM_I_DC) { const char *votes = crm_element_value(xml, "expected"); if (votes == NULL || check_number(votes) == FALSE) { crm_log_xml_err(xml, "Invalid quorum/membership update"); } else { int rc = update_attr_delegate( fsa_cib_conn, cib_quorum_override | cib_scope_local | cib_inhibit_notify, XML_CIB_TAG_CRMCONFIG, NULL, NULL, NULL, NULL, XML_ATTR_EXPECTED_VOTES, votes, FALSE, NULL); crm_info("Setting expected votes to %s", votes); if (pcmk_ok > rc) { crm_err("Quorum update failed: %s", pcmk_strerror(rc)); } } } break; case crm_class_cluster: - crm_xml_add(xml, F_ORIG, wrapper->sender.uname); - crm_xml_add_int(xml, F_SEQ, wrapper->id); + crm_xml_add(xml, F_ORIG, from); + /* crm_xml_add_int(xml, F_SEQ, wrapper->id); Fake? */ if(is_heartbeat_cluster()) { flag = crm_proc_heartbeat; } else if(is_classic_ais_cluster()) { flag = crm_proc_plugin; } - peer = crm_get_peer(0, wrapper->sender.uname); + peer = crm_get_peer(0, from); if(is_not_set(peer->processes, flag)) { /* If we can still talk to our peer process on that node, * then its also part of the corosync membership */ crm_err("Recieving messages from a node we think is dead: %s[%d]", peer->uname, peer->id); crm_update_peer_proc(__FUNCTION__, peer, flag, ONLINESTATUS); } crmd_ha_msg_filter(xml); break; case crm_class_rmpeer: /* Ignore */ break; case crm_class_notify: case crm_class_nodeid: - crm_err("Unexpected message class (%d): %.100s", wrapper->header.id, data); + crm_err("Unexpected message class (%d): %.100s", kind, data); break; default: - crm_err("Invalid message class (%d): %.100s", wrapper->header.id, data); + crm_err("Invalid message class (%d): %.100s", kind, data); } free_xml(xml); return TRUE; } static gboolean crmd_cman_dispatch(unsigned long long seq, gboolean quorate) { crm_update_quorum(quorate, FALSE); post_cache_update(seq); return TRUE; } static void crmd_quorum_destroy(gpointer user_data) { if (is_set(fsa_input_register, R_HA_DISCONNECTED)) { crm_err("connection terminated"); exit(1); } else { crm_info("connection closed"); } } static void crmd_ais_destroy(gpointer user_data) { if (is_set(fsa_input_register, R_HA_DISCONNECTED)) { crm_err("connection terminated"); exit(1); } else { crm_info("connection closed"); } } #if SUPPORT_CMAN static void crmd_cman_destroy(gpointer user_data) { if (is_set(fsa_input_register, R_HA_DISCONNECTED)) { crm_err("connection terminated"); exit(1); } else { crm_info("connection closed"); } } #endif -extern gboolean crm_connect_corosync(void); +extern gboolean crm_connect_corosync(crm_cluster_t *cluster); gboolean -crm_connect_corosync(void) +crm_connect_corosync(crm_cluster_t *cluster) { gboolean rc = FALSE; if (is_openais_cluster()) { crm_set_status_callback(&peer_update_callback); - rc = crm_cluster_connect(&fsa_our_uname, &fsa_our_uuid, crmd_ais_dispatch, crmd_ais_destroy, - NULL); + cluster->cs_dispatch = crmd_ais_dispatch; + cluster->destroy = crmd_ais_destroy; + + rc = crm_cluster_connect(cluster); } if (rc && is_corosync_cluster()) { init_quorum_connection(crmd_cman_dispatch, crmd_quorum_destroy); } #if SUPPORT_CMAN if (rc && is_cman_cluster()) { init_cman_connection(crmd_cman_dispatch, crmd_cman_destroy); set_bit(fsa_input_register, R_MEMBERSHIP); } #endif return rc; } #endif diff --git a/fencing/main.c b/fencing/main.c index 1988d57052..52853219f2 100644 --- a/fencing/main.c +++ b/fencing/main.c @@ -1,891 +1,878 @@ /* * Copyright (C) 2009 Andrew Beekhof * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include char *stonith_our_uname = NULL; GMainLoop *mainloop = NULL; GHashTable *client_list = NULL; gboolean stand_alone = FALSE; gboolean no_cib_connect = FALSE; gboolean stonith_shutdown_flag = FALSE; qb_ipcs_service_t *ipcs = NULL; -#if SUPPORT_HEARTBEAT -ll_cluster_t *hb_conn = NULL; -#endif - static void stonith_shutdown(int nsig); static void stonith_cleanup(void); static int32_t st_ipc_accept(qb_ipcs_connection_t *c, uid_t uid, gid_t gid) { crm_trace("Connecting %p for uid=%d gid=%d", c, uid, gid); if(stonith_shutdown_flag) { crm_info("Ignoring new client [%d] during shutdown", crm_ipcs_client_pid(c)); return -EPERM; } return 0; } static void st_ipc_created(qb_ipcs_connection_t *c) { stonith_client_t *new_client = NULL; #if 0 struct qb_ipcs_stats srv_stats; qb_ipcs_stats_get(s1, &srv_stats, QB_FALSE); qb_log(LOG_INFO, "Connection created (active:%d, closed:%d)", srv_stats.active_connections, srv_stats.closed_connections); #endif new_client = calloc(1, sizeof(stonith_client_t)); new_client->channel = c; new_client->channel_name = strdup("ipc"); CRM_CHECK(new_client->id == NULL, free(new_client->id)); new_client->id = crm_generate_uuid(); crm_trace("Created channel %p for client %s", c, new_client->id); /* make sure we can find ourselves later for sync calls * redirected to the master instance */ g_hash_table_insert(client_list, new_client->id, new_client); qb_ipcs_context_set(c, new_client); CRM_ASSERT(qb_ipcs_context_get(c) != NULL); } /* Exit code means? */ static int32_t st_ipc_dispatch(qb_ipcs_connection_t *c, void *data, size_t size) { uint32_t id = 0; uint32_t flags = 0; xmlNode *request = NULL; stonith_client_t *client = (stonith_client_t*)qb_ipcs_context_get(c); request = crm_ipcs_recv(c, data, size, &id, &flags); if (request == NULL) { crm_ipcs_send_ack(c, id, "nack", __FUNCTION__, __LINE__); return 0; } CRM_CHECK(client != NULL, goto cleanup); if(client->name == NULL) { const char *value = crm_element_value(request, F_STONITH_CLIENTNAME); if(value == NULL) { client->name = crm_itoa(crm_ipcs_client_pid(c)); } else { client->name = strdup(value); } } CRM_CHECK(client->id != NULL, crm_err("Invalid client: %p/%s", client, client->name); goto cleanup); if(flags & crm_ipc_client_response) { CRM_LOG_ASSERT(client->request_id == 0); /* This means the client has two synchronous events in-flight */ client->request_id = id; /* Reply only to the last one */ } crm_xml_add(request, F_STONITH_CLIENTID, client->id); crm_xml_add(request, F_STONITH_CLIENTNAME, client->name); crm_log_xml_trace(request, "Client[inbound]"); stonith_command(client, id, flags, request, NULL); cleanup: if(client == NULL || client->id == NULL) { crm_log_xml_notice(request, "Invalid client"); } free_xml(request); return 0; } /* Error code means? */ static int32_t st_ipc_closed(qb_ipcs_connection_t *c) { stonith_client_t *client = (stonith_client_t*)qb_ipcs_context_get(c); #if 0 qb_ipcs_stats_get(s1, &srv_stats, QB_FALSE); qb_ipcs_connection_stats_get(c, &stats, QB_FALSE); qb_log(LOG_INFO, "Connection to pid:%d destroyed (active:%d, closed:%d)", stats.client_pid, srv_stats.active_connections, srv_stats.closed_connections); qb_log(LOG_DEBUG, " Requests %"PRIu64"", stats.requests); qb_log(LOG_DEBUG, " Responses %"PRIu64"", stats.responses); qb_log(LOG_DEBUG, " Events %"PRIu64"", stats.events); qb_log(LOG_DEBUG, " Send retries %"PRIu64"", stats.send_retries); qb_log(LOG_DEBUG, " Recv retries %"PRIu64"", stats.recv_retries); qb_log(LOG_DEBUG, " FC state %d", stats.flow_control_state); qb_log(LOG_DEBUG, " FC count %"PRIu64"", stats.flow_control_count); #endif if (client == NULL) { crm_err("No client"); return 0; } crm_trace("Cleaning up after client disconnect: %p/%s/%s", client, crm_str(client->name), client->id); if(client->id != NULL) { g_hash_table_remove(client_list, client->id); } /* 0 means: yes, go ahead and destroy the connection */ return 0; } static void st_ipc_destroy(qb_ipcs_connection_t *c) { stonith_client_t *client = (stonith_client_t*)qb_ipcs_context_get(c); /* Make sure the connection is fully cleaned up */ st_ipc_closed(c); if(client == NULL) { crm_trace("Nothing to destroy"); return; } crm_trace("Destroying %s (%p)", client->name, client); free(client->name); free(client->id); free(client); crm_trace("Done"); return; } static void stonith_peer_callback(xmlNode * msg, void* private_data) { const char *remote = crm_element_value(msg, F_ORIG); crm_log_xml_trace(msg, "Peer[inbound]"); stonith_command(NULL, 0, 0, msg, remote); } #if SUPPORT_HEARTBEAT static void stonith_peer_hb_callback(HA_Message * msg, void* private_data) { xmlNode *xml = convert_ha_message(NULL, msg, __FUNCTION__); stonith_peer_callback(xml, private_data); free_xml(xml); } static void stonith_peer_hb_destroy(gpointer user_data) { if(stonith_shutdown_flag) { crm_info("Heartbeat disconnection complete... exiting"); } else { crm_err("Heartbeat connection lost! Exiting."); } stonith_shutdown(0); } #endif #if SUPPORT_COROSYNC -static gboolean stonith_peer_ais_callback( - AIS_Message *wrapper, char *data, int sender) +static gboolean stonith_peer_ais_callback(int kind, const char *from, const char *data) { xmlNode *xml = NULL; - if(wrapper->header.id == crm_class_cluster) { + if(kind == crm_class_cluster) { xml = string2xml(data); if(xml == NULL) { goto bail; } - crm_xml_add(xml, F_ORIG, wrapper->sender.uname); - crm_xml_add_int(xml, F_SEQ, wrapper->id); + crm_xml_add(xml, F_ORIG, from); + /* crm_xml_add_int(xml, F_SEQ, wrapper->id); */ stonith_peer_callback(xml, NULL); } free_xml(xml); return TRUE; bail: crm_err("Invalid XML: '%.120s'", data); return TRUE; } static void stonith_peer_ais_destroy(gpointer user_data) { crm_err("AIS connection terminated"); stonith_shutdown(0); } #endif void do_local_reply(xmlNode *notify_src, const char *client_id, gboolean sync_reply, gboolean from_peer) { /* send callback to originating child */ stonith_client_t *client_obj = NULL; int local_rc = pcmk_ok; crm_trace("Sending response"); if(client_id != NULL) { client_obj = g_hash_table_lookup(client_list, client_id); } else { crm_trace("No client to sent the response to." " F_STONITH_CLIENTID not set."); } crm_trace("Sending callback to request originator"); if(client_obj == NULL) { local_rc = -1; } else { int rid = 0; if(sync_reply) { CRM_LOG_ASSERT(client_obj->request_id); rid = client_obj->request_id; client_obj->request_id = 0; crm_trace("Sending response %d to %s %s", rid, client_obj->name, from_peer?"(originator of delegated request)":""); } else { crm_trace("Sending an event to %s %s", client_obj->name, from_peer?"(originator of delegated request)":""); } local_rc = crm_ipcs_send(client_obj->channel, rid, notify_src, !sync_reply); } if(local_rc < pcmk_ok && client_obj != NULL) { crm_warn("%sSync reply to %s failed: %s", sync_reply?"":"A-", client_obj?client_obj->name:"", pcmk_strerror(local_rc)); } } long long get_stonith_flag(const char *name) { if(safe_str_eq(name, T_STONITH_NOTIFY_FENCE)) { return 0x01; } else if(safe_str_eq(name, STONITH_OP_DEVICE_ADD)) { return 0x04; } else if(safe_str_eq(name, STONITH_OP_DEVICE_DEL)) { return 0x10; } return 0; } static void stonith_notify_client(gpointer key, gpointer value, gpointer user_data) { xmlNode *update_msg = user_data; stonith_client_t *client = value; const char *type = NULL; CRM_CHECK(client != NULL, return); CRM_CHECK(update_msg != NULL, return); type = crm_element_value(update_msg, F_SUBTYPE); CRM_CHECK(type != NULL, crm_log_xml_err(update_msg, "notify"); return); if(client->channel == NULL) { crm_trace("Skipping client with NULL channel"); return; } else if(client->name == NULL) { crm_trace("Skipping unnammed client / comamnd channel"); return; } if(client->flags & get_stonith_flag(type)) { crm_trace("Sending %s-notification to client %s/%s", type, client->name, client->id); if(crm_ipcs_send(client->channel, 0, update_msg, crm_ipc_server_event|crm_ipc_server_error) <= 0) { crm_warn("%s-Notification of client %s/%s failed", type, client->name, client->id); } } } void do_stonith_notify( int options, const char *type, int result, xmlNode *data, const char *remote) { /* TODO: Standardize the contents of data */ xmlNode *update_msg = create_xml_node(NULL, "notify"); CRM_CHECK(type != NULL, ;); crm_xml_add(update_msg, F_TYPE, T_STONITH_NOTIFY); crm_xml_add(update_msg, F_SUBTYPE, type); crm_xml_add(update_msg, F_STONITH_OPERATION, type); crm_xml_add_int(update_msg, F_STONITH_RC, result); if(data != NULL) { add_message_xml(update_msg, F_STONITH_CALLDATA, data); } crm_trace("Notifying clients"); g_hash_table_foreach(client_list, stonith_notify_client, update_msg); free_xml(update_msg); crm_trace("Notify complete"); } static stonith_key_value_t *parse_device_list(const char *devices) { int lpc = 0; int max = 0; int last = 0; stonith_key_value_t *output = NULL; if(devices == NULL) { return output; } max = strlen(devices); for(lpc = 0; lpc <= max; lpc++) { if(devices[lpc] == ',' || devices[lpc] == 0) { char *line = NULL; line = calloc(1, 2 + lpc - last); snprintf(line, 1 + lpc - last, "%s", devices+last); output = stonith_key_value_add(output, NULL, line); free(line); last = lpc + 1; } } return output; } static void topology_remove_helper(const char *node, int level) { int rc; char *desc = NULL; xmlNode *data = create_xml_node(NULL, F_STONITH_LEVEL); xmlNode *notify_data = create_xml_node(NULL, STONITH_OP_LEVEL_DEL); crm_xml_add(data, "origin", __FUNCTION__); crm_xml_add_int(data, XML_ATTR_ID, level); crm_xml_add(data, F_STONITH_TARGET, node); rc = stonith_level_remove(data, &desc); crm_xml_add(notify_data, F_STONITH_DEVICE, desc); crm_xml_add_int(notify_data, F_STONITH_ACTIVE, g_hash_table_size(topology)); do_stonith_notify(0, STONITH_OP_LEVEL_DEL, rc, notify_data, NULL); free_xml(notify_data); free_xml(data); free(desc); } static void topology_register_helper(const char *node, int level, stonith_key_value_t *device_list) { int rc; char *desc = NULL; xmlNode *notify_data = create_xml_node(NULL, STONITH_OP_LEVEL_ADD); xmlNode *data = create_level_registration_xml(node, level, device_list); rc = stonith_level_register(data, &desc); crm_xml_add(notify_data, F_STONITH_DEVICE, desc); crm_xml_add_int(notify_data, F_STONITH_ACTIVE, g_hash_table_size(topology)); do_stonith_notify(0, STONITH_OP_LEVEL_ADD, rc, notify_data, NULL); free_xml(notify_data); free_xml(data); free(desc); } static void remove_fencing_topology(xmlXPathObjectPtr xpathObj) { int max = 0, lpc = 0; if(xpathObj && xpathObj->nodesetval) { max = xpathObj->nodesetval->nodeNr; } for(lpc = 0; lpc < max; lpc++) { xmlNode *match = getXpathResult(xpathObj, lpc); CRM_CHECK(match != NULL, continue); if(crm_element_value(match, XML_DIFF_MARKER)) { /* Deletion */ int index = 0; const char *target = crm_element_value(match, XML_ATTR_STONITH_TARGET); crm_element_value_int(match, XML_ATTR_STONITH_INDEX, &index); if(target == NULL) { crm_err("Invalid fencing target in element %s", ID(match)); } else if(index <= 0) { crm_err("Invalid level for %s in element %s", target, ID(match)); } else { topology_remove_helper(target, index); } /* } else { Deal with modifications during the 'addition' stage */ } } } static void register_fencing_topology(xmlXPathObjectPtr xpathObj, gboolean force) { int max = 0, lpc = 0; if(xpathObj && xpathObj->nodesetval) { max = xpathObj->nodesetval->nodeNr; } for(lpc = 0; lpc < max; lpc++) { int index = 0; const char *target; const char *dev_list; stonith_key_value_t *devices = NULL; xmlNode *match = getXpathResult(xpathObj, lpc); CRM_CHECK(match != NULL, continue); crm_element_value_int(match, XML_ATTR_STONITH_INDEX, &index); target = crm_element_value(match, XML_ATTR_STONITH_TARGET); dev_list = crm_element_value(match, XML_ATTR_STONITH_DEVICES); devices = parse_device_list(dev_list); crm_trace("Updating %s[%d] (%s) to %s", target, index, ID(match), dev_list); if(target == NULL) { crm_err("Invalid fencing target in element %s", ID(match)); } else if(index <= 0) { crm_err("Invalid level for %s in element %s", target, ID(match)); } else if(force == FALSE && crm_element_value(match, XML_DIFF_MARKER)) { /* Addition */ topology_register_helper(target, index, devices); } else { /* Modification */ /* Remove then re-add */ topology_remove_helper(target, index); topology_register_helper(target, index, devices); } stonith_key_value_freeall(devices, 1, 1); } } /* Fencing */ static void fencing_topology_callback(xmlNode * msg, int call_id, int rc, xmlNode * output, void *user_data) { xmlXPathObjectPtr xpathObj = NULL; const char *xpath = "//" XML_TAG_FENCING_LEVEL; crm_trace("Pushing in stonith topology"); /* Grab everything */ xpathObj = xpath_search(msg, xpath); register_fencing_topology(xpathObj, TRUE); if(xpathObj) { xmlXPathFreeObject(xpathObj); } } static void update_fencing_topology(const char *event, xmlNode * msg) { const char *xpath; xmlXPathObjectPtr xpathObj = NULL; /* Process deletions (only) */ xpath = "//" F_CIB_UPDATE_RESULT "//" XML_TAG_DIFF_REMOVED "//" XML_TAG_FENCING_LEVEL; xpathObj = xpath_search(msg, xpath); remove_fencing_topology(xpathObj); if(xpathObj) { xmlXPathFreeObject(xpathObj); } /* Process additions and changes */ xpath = "//" F_CIB_UPDATE_RESULT "//" XML_TAG_DIFF_ADDED "//" XML_TAG_FENCING_LEVEL; xpathObj = xpath_search(msg, xpath); register_fencing_topology(xpathObj, FALSE); if(xpathObj) { xmlXPathFreeObject(xpathObj); } } static void stonith_shutdown(int nsig) { stonith_shutdown_flag = TRUE; crm_info("Terminating with %d clients", g_hash_table_size(client_list)); if(mainloop != NULL && g_main_is_running(mainloop)) { g_main_quit(mainloop); } else { stonith_cleanup(); exit(EX_OK); } } cib_t *cib = NULL; static void stonith_cleanup(void) { if(cib) { cib->cmds->signoff(cib); } qb_ipcs_destroy(ipcs); crm_peer_destroy(); g_hash_table_destroy(client_list); free(stonith_our_uname); #if HAVE_LIBXML2 crm_xml_cleanup(); #endif } /* *INDENT-OFF* */ static struct crm_option long_options[] = { {"stand-alone", 0, 0, 's'}, {"stand-alone-w-cpg", 0, 0, 'c'}, {"verbose", 0, 0, 'V'}, {"version", 0, 0, '$'}, {"help", 0, 0, '?'}, {0, 0, 0, 0} }; /* *INDENT-ON* */ static void setup_cib(void) { static void *cib_library = NULL; static cib_t *(*cib_new_fn)(void) = NULL; static const char *(*cib_err_fn)(int) = NULL; int rc, retries = 0; if(cib_library == NULL) { cib_library = dlopen(CIB_LIBRARY, RTLD_LAZY); } if(cib_library && cib_new_fn == NULL) { cib_new_fn = dlsym(cib_library, "cib_new"); } if(cib_library && cib_err_fn == NULL) { cib_err_fn = dlsym(cib_library, "pcmk_strerror"); } if(cib_new_fn != NULL) { cib = (*cib_new_fn)(); } if(cib == NULL) { crm_err("No connection to the CIB"); return; } do { sleep(retries); rc = cib->cmds->signon(cib, CRM_SYSTEM_CRMD, cib_command); } while(rc == -ENOTCONN && ++retries < 5); if (rc != pcmk_ok) { crm_err("Could not connect to the CIB service: %s", (*cib_err_fn)(rc)); } else if (pcmk_ok != cib->cmds->add_notify_callback( cib, T_CIB_DIFF_NOTIFY, update_fencing_topology)) { crm_err("Could not set CIB notification callback"); } else { rc = cib->cmds->query(cib, NULL, NULL, cib_scope_local); add_cib_op_callback(cib, rc, FALSE, NULL, fencing_topology_callback); crm_notice("Watching for stonith topology changes"); } } struct qb_ipcs_service_handlers ipc_callbacks = { .connection_accept = st_ipc_accept, .connection_created = st_ipc_created, .msg_process = st_ipc_dispatch, .connection_closed = st_ipc_closed, .connection_destroyed = st_ipc_destroy }; int main(int argc, char ** argv) { int flag; int rc = 0; int lpc = 0; int argerr = 0; int option_index = 0; + crm_cluster_t cluster; const char *actions[] = { "reboot", "off", "list", "monitor", "status" }; crm_log_init("stonith-ng", LOG_INFO, TRUE, FALSE, argc, argv, FALSE); crm_set_options(NULL, "mode [options]", long_options, "Provides a summary of cluster's current state." "\n\nOutputs varying levels of detail in a number of different formats.\n"); while (1) { flag = crm_get_option(argc, argv, &option_index); if (flag == -1) { break; } switch(flag) { case 'V': crm_bump_log_level(argc, argv); break; case 's': stand_alone = TRUE; break; case 'c': stand_alone = FALSE; no_cib_connect = TRUE; break; case '$': case '?': crm_help(flag, EX_OK); break; default: ++argerr; break; } } if(argc - optind == 1 && safe_str_eq("metadata", argv[optind])) { printf("\n"); printf("\n"); printf(" 1.0\n"); printf(" This is a fake resource that details the instance attributes handled by stonithd.\n"); printf(" Options available for all stonith resources\n"); printf(" \n"); printf(" \n"); printf(" How long to wait for the STONITH action to complete.\n"); printf(" Overrides the stonith-timeout cluster property\n"); printf(" \n"); printf(" \n"); printf(" \n"); printf(" The priority of the stonith resource. The lower the number, the higher the priority.\n"); printf(" \n"); printf(" \n"); printf(" \n", STONITH_ATTR_HOSTARG); printf(" Advanced use only: An alternate parameter to supply instead of 'port'\n"); printf(" Some devices do not support the standard 'port' parameter or may provide additional ones.\n" "Use this to specify an alternate, device-specific, parameter that should indicate the machine to be fenced.\n" "A value of 'none' can be used to tell the cluster not to supply any additional parameters.\n" " \n"); printf(" \n"); printf(" \n"); printf(" \n", STONITH_ATTR_HOSTMAP); printf(" A mapping of host names to ports numbers for devices that do not support host names.\n"); printf(" Eg. node1:1;node2:2,3 would tell the cluster to use port 1 for node1 and ports 2 and 3 for node2\n"); printf(" \n"); printf(" \n"); printf(" \n", STONITH_ATTR_HOSTLIST); printf(" A list of machines controlled by this device (Optional unless %s=static-list).\n", STONITH_ATTR_HOSTCHECK); printf(" \n"); printf(" \n"); printf(" \n", STONITH_ATTR_HOSTCHECK); printf(" How to determin which machines are controlled by the device.\n"); printf(" Allowed values: dynamic-list (query the device), static-list (check the %s attribute), none (assume every device can fence every machine)\n", STONITH_ATTR_HOSTLIST); printf(" \n"); printf(" \n"); for(lpc = 0; lpc < DIMOF(actions); lpc++) { printf(" \n", actions[lpc]); printf(" Advanced use only: An alternate command to run instead of '%s'\n", actions[lpc]); printf(" Some devices do not support the standard commands or may provide additional ones.\n" "Use this to specify an alternate, device-specific, command that implements the '%s' action.\n", actions[lpc]); printf(" \n", actions[lpc]); printf(" \n"); } printf(" \n"); printf("\n"); return 0; } if (optind != argc) { ++argerr; } if (argerr) { crm_help('?', EX_USAGE); } mainloop_add_signal(SIGTERM, stonith_shutdown); crm_peer_init(); client_list = g_hash_table_new(crm_str_hash, g_str_equal); if(stand_alone == FALSE) { - void *dispatch = NULL; - void *destroy = NULL; - #if SUPPORT_HEARTBEAT - dispatch = stonith_peer_hb_callback; - destroy = stonith_peer_hb_destroy; + cluster.hb_dispatch = stonith_peer_hb_callback; + cluster.destroy = stonith_peer_hb_destroy; #endif if(is_openais_cluster()) { #if SUPPORT_COROSYNC - destroy = stonith_peer_ais_destroy; - dispatch = stonith_peer_ais_callback; + cluster.destroy = stonith_peer_ais_destroy; + cluster.cs_dispatch = stonith_peer_ais_callback; #endif } - if(crm_cluster_connect(&stonith_our_uname, NULL, dispatch, destroy, -#if SUPPORT_HEARTBEAT - &hb_conn -#else - NULL -#endif - ) == FALSE) { + if(crm_cluster_connect(&cluster) == FALSE) { crm_crit("Cannot sign in to the cluster... terminating"); exit(100); } if (no_cib_connect == FALSE) { setup_cib(); } } else { stonith_our_uname = strdup("localhost"); } device_list = g_hash_table_new_full( crm_str_hash, g_str_equal, NULL, free_device); topology = g_hash_table_new_full( crm_str_hash, g_str_equal, NULL, free_topology_entry); ipcs = mainloop_add_ipc_server("stonith-ng", QB_IPC_NATIVE, &ipc_callbacks); #if SUPPORT_STONITH_CONFIG if (((stand_alone == TRUE)) && !(standalone_cfg_read_file(STONITH_NG_CONF_FILE))) { standalone_cfg_commit(); } #endif if(ipcs != NULL) { /* Create the mainloop and run it... */ mainloop = g_main_new(FALSE); crm_info("Starting %s mainloop", crm_system_name); g_main_run(mainloop); } else { crm_err("Couldnt start all communication channels, exiting."); } stonith_cleanup(); #if SUPPORT_HEARTBEAT - if(hb_conn) { - hb_conn->llc_ops->delete(hb_conn); + if(cluster.hb_conn) { + cluster.hb_conn->llc_ops->delete(cluster.hb_conn); } #endif crm_info("Done"); qb_log_fini(); return rc; } diff --git a/include/crm/cluster.h b/include/crm/cluster.h index 830333bf37..81fc3d0e11 100644 --- a/include/crm/cluster.h +++ b/include/crm/cluster.h @@ -1,219 +1,228 @@ /* * Copyright (C) 2004 Andrew Beekhof * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #ifndef CRM_COMMON_CLUSTER__H # define CRM_COMMON_CLUSTER__H # include # include # if SUPPORT_HEARTBEAT # include # include # endif extern gboolean crm_have_quorum; extern GHashTable *crm_peer_cache; extern GHashTable *crm_peer_id_cache; extern unsigned long long crm_peer_seq; # ifndef CRM_SERVICE # define CRM_SERVICE PCMK_SERVICE_ID # endif /* *INDENT-OFF* */ #define CRM_NODE_LOST "lost" #define CRM_NODE_MEMBER "member" #define CRM_NODE_ACTIVE CRM_NODE_MEMBER #define CRM_NODE_EVICTED "evicted" enum crm_proc_flag { crm_proc_none = 0x00000001, /* These values are sent over the network by the legacy plugin * Therefor changing any of these values is going to break compatability * * So don't */ /* 3 messaging types */ crm_proc_heartbeat = 0x01000000, crm_proc_plugin = 0x00000002, crm_proc_cpg = 0x04000000, crm_proc_lrmd = 0x00000010, crm_proc_cib = 0x00000100, crm_proc_crmd = 0x00000200, crm_proc_attrd = 0x00001000, crm_proc_stonithd = 0x00002000, crm_proc_stonith_ng= 0x00100000, crm_proc_pe = 0x00010000, crm_proc_te = 0x00020000, crm_proc_mgmtd = 0x00040000, }; /* *INDENT-ON* */ typedef struct crm_peer_node_s { uint32_t id; /* Only used by corosync derivatives */ uint64_t born; /* Only used by heartbeat and the legacy plugin */ uint64_t last_seen; int32_t votes; /* Only used by the legacy plugin */ uint32_t processes; char *uname; char *uuid; char *state; char *expected; char *addr; /* Only used by the legacy plugin */ char *version;/* Unused */ } crm_node_t; static inline const char * peer2text(enum crm_proc_flag proc) { const char *text = "unknown"; if( proc == (crm_proc_cpg|crm_proc_crmd) ) { return "peer"; } switch (proc) { case crm_proc_none: text = "none"; break; case crm_proc_plugin: text = "ais"; break; case crm_proc_heartbeat: text = "heartbeat"; break; case crm_proc_cib: text = "cib"; break; case crm_proc_crmd: text = "crmd"; break; case crm_proc_pe: text = "pengine"; break; case crm_proc_te: text = "tengine"; break; case crm_proc_lrmd: text = "lrmd"; break; case crm_proc_attrd: text = "attrd"; break; case crm_proc_stonithd: text = "stonithd"; break; case crm_proc_stonith_ng: text = "stonith-ng"; break; case crm_proc_mgmtd: text = "mgmtd"; break; case crm_proc_cpg: text = "corosync-cpg"; break; } return text; } void crm_peer_init(void); void crm_peer_destroy(void); char *get_corosync_uuid(uint32_t id, const char *uname); const char *get_node_uuid(uint32_t id, const char *uname); int get_corosync_id(int id, const char *uuid); -gboolean crm_cluster_connect(char **our_uname, char **our_uuid, void *dispatch, - void *destroy, -# if SUPPORT_HEARTBEAT - ll_cluster_t ** hb_conn -# else - void **unused -# endif - ); +typedef struct crm_cluster_s +{ + char *uuid; + char *uname; + uint32_t nodeid; + +#if SUPPORT_HEARTBEAT + ll_cluster_t *hb_conn; + void (*hb_dispatch)(HA_Message *msg, void *private); +#endif + + gboolean (*cs_dispatch) (int kind, const char *from, const char *data); + void (*destroy) (gpointer); + +} crm_cluster_t; + +gboolean crm_cluster_connect(crm_cluster_t *cluster); enum crm_ais_msg_types; gboolean send_cluster_message(const char *node, enum crm_ais_msg_types service, xmlNode * data, gboolean ordered); void destroy_crm_node(gpointer/* crm_node_t* */ data); crm_node_t *crm_get_peer(unsigned int id, const char *uname); guint crm_active_peers(void); gboolean crm_is_peer_active(const crm_node_t * node); guint reap_crm_member(uint32_t id); int crm_terminate_member(int nodeid, const char *uname, void* unused); int crm_terminate_member_no_mainloop(int nodeid, const char *uname, int *connection); gboolean crm_get_cluster_name(char **cname); # if SUPPORT_HEARTBEAT gboolean crm_is_heartbeat_peer_active(const crm_node_t * node); # endif # if SUPPORT_COROSYNC extern int ais_fd_sync; gboolean crm_is_corosync_peer_active(const crm_node_t * node); gboolean send_ais_text(int class, const char *data, gboolean local, const char *node, enum crm_ais_msg_types dest); gboolean get_ais_nodeid(uint32_t * id, char **uname); # endif void empty_uuid_cache(void); const char *get_uuid(const char *uname); const char *get_uname(const char *uuid); void set_uuid(xmlNode * node, const char *attr, const char *uname); void unget_uuid(const char *uname); enum crm_status_type { crm_status_uname, crm_status_nstate, crm_status_processes, }; enum crm_ais_msg_types text2msg_type(const char *text); void crm_set_status_callback(void (*dispatch) (enum crm_status_type, crm_node_t *, const void *)); /* *INDENT-OFF* */ enum cluster_type_e { pcmk_cluster_unknown = 0x0001, pcmk_cluster_invalid = 0x0002, pcmk_cluster_heartbeat = 0x0004, pcmk_cluster_classic_ais = 0x0010, pcmk_cluster_corosync = 0x0020, pcmk_cluster_cman = 0x0040, }; /* *INDENT-ON* */ enum cluster_type_e get_cluster_type(void); const char *name_for_cluster_type(enum cluster_type_e type); gboolean is_corosync_cluster(void); gboolean is_cman_cluster(void); gboolean is_openais_cluster(void); gboolean is_classic_ais_cluster(void); gboolean is_heartbeat_cluster(void); #endif diff --git a/include/crm/cluster/internal.h b/include/crm/cluster/internal.h index 0779feada2..0044d97c89 100644 --- a/include/crm/cluster/internal.h +++ b/include/crm/cluster/internal.h @@ -1,335 +1,328 @@ /* * Copyright (C) 2004 Andrew Beekhof * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #ifndef CRM_CLUSTER_INTERNAL__H # define CRM_CLUSTER_INTERNAL__H # include # define AIS_IPC_NAME "ais-crm-ipc" # define AIS_IPC_MESSAGE_SIZE 8192*128 # define CRM_MESSAGE_IPC_ACK 0 typedef struct crm_ais_host_s AIS_Host; typedef struct crm_ais_msg_s AIS_Message; enum crm_ais_msg_class { crm_class_cluster = 0, crm_class_members = 1, crm_class_notify = 2, crm_class_nodeid = 3, crm_class_rmpeer = 4, crm_class_quorum = 5, }; /* order here matters - its used to index into the crm_children array */ enum crm_ais_msg_types { crm_msg_none = 0, crm_msg_ais = 1, crm_msg_lrmd = 2, crm_msg_cib = 3, crm_msg_crmd = 4, crm_msg_attrd = 5, crm_msg_stonithd = 6, crm_msg_te = 7, crm_msg_pe = 8, crm_msg_stonith_ng = 9, }; struct crm_ais_host_s { uint32_t id; uint32_t pid; gboolean local; enum crm_ais_msg_types type; uint32_t size; char uname[MAX_NAME]; } __attribute__ ((packed)); struct crm_ais_msg_s { cs_ipc_header_response_t header __attribute__ ((aligned(8))); uint32_t id; gboolean is_compressed; AIS_Host host; AIS_Host sender; uint32_t size; uint32_t compressed_size; /* 584 bytes */ char data[0]; } __attribute__ ((packed)); struct crm_ais_nodeid_resp_s { cs_ipc_header_response_t header __attribute__ ((aligned(8))); uint32_t id; uint32_t counter; char uname[MAX_NAME]; char cname[MAX_NAME]; } __attribute__ ((packed)); struct crm_ais_quorum_resp_s { cs_ipc_header_response_t header __attribute__ ((aligned(8))); uint64_t id; uint32_t votes; uint32_t expected_votes; uint32_t quorate; } __attribute__ ((packed)); static inline enum crm_proc_flag text2proc(const char *proc) { /* We only care about these two so far */ if(proc && strcmp(proc, "cib") == 0) { return crm_proc_cib; } else if(proc && strcmp(proc, "crmd") == 0) { return crm_proc_crmd; } return crm_proc_none; } static inline const char * ais_dest(const struct crm_ais_host_s *host) { if (host->local) { return "local"; } else if (host->size > 0) { return host->uname; } else { return ""; } } # define ais_data_len(msg) (msg->is_compressed?msg->compressed_size:msg->size) static inline AIS_Message * ais_msg_copy(const AIS_Message * source) { AIS_Message *target = malloc(sizeof(AIS_Message) + ais_data_len(source)); memcpy(target, source, sizeof(AIS_Message)); memcpy(target->data, source->data, ais_data_len(target)); return target; } static inline const char * ais_error2text(int error) { const char *text = "unknown"; # if SUPPORT_COROSYNC switch (error) { case CS_OK: text = "None"; break; case CS_ERR_LIBRARY: text = "Library error"; break; case CS_ERR_VERSION: text = "Version error"; break; case CS_ERR_INIT: text = "Initialization error"; break; case CS_ERR_TIMEOUT: text = "Timeout"; break; case CS_ERR_TRY_AGAIN: text = "Try again"; break; case CS_ERR_INVALID_PARAM: text = "Invalid parameter"; break; case CS_ERR_NO_MEMORY: text = "No memory"; break; case CS_ERR_BAD_HANDLE: text = "Bad handle"; break; case CS_ERR_BUSY: text = "Busy"; break; case CS_ERR_ACCESS: text = "Access error"; break; case CS_ERR_NOT_EXIST: text = "Doesn't exist"; break; case CS_ERR_NAME_TOO_LONG: text = "Name too long"; break; case CS_ERR_EXIST: text = "Exists"; break; case CS_ERR_NO_SPACE: text = "No space"; break; case CS_ERR_INTERRUPT: text = "Interrupt"; break; case CS_ERR_NAME_NOT_FOUND: text = "Name not found"; break; case CS_ERR_NO_RESOURCES: text = "No resources"; break; case CS_ERR_NOT_SUPPORTED: text = "Not supported"; break; case CS_ERR_BAD_OPERATION: text = "Bad operation"; break; case CS_ERR_FAILED_OPERATION: text = "Failed operation"; break; case CS_ERR_MESSAGE_ERROR: text = "Message error"; break; case CS_ERR_QUEUE_FULL: text = "Queue full"; break; case CS_ERR_QUEUE_NOT_AVAILABLE: text = "Queue not available"; break; case CS_ERR_BAD_FLAGS: text = "Bad flags"; break; case CS_ERR_TOO_BIG: text = "To big"; break; case CS_ERR_NO_SECTIONS: text = "No sections"; break; } # endif return text; } static inline const char * msg_type2text(enum crm_ais_msg_types type) { const char *text = "unknown"; switch (type) { case crm_msg_none: text = "unknown"; break; case crm_msg_ais: text = "ais"; break; case crm_msg_cib: text = "cib"; break; case crm_msg_crmd: text = "crmd"; break; case crm_msg_pe: text = "pengine"; break; case crm_msg_te: text = "tengine"; break; case crm_msg_lrmd: text = "lrmd"; break; case crm_msg_attrd: text = "attrd"; break; case crm_msg_stonithd: text = "stonithd"; break; case crm_msg_stonith_ng: text = "stonith-ng"; break; } return text; } enum crm_ais_msg_types text2msg_type(const char *text); char *get_ais_data(const AIS_Message * msg); gboolean check_message_sanity(const AIS_Message * msg, const char *data); # if SUPPORT_HEARTBEAT extern ll_cluster_t *heartbeat_cluster; gboolean send_ha_message(ll_cluster_t * hb_conn, xmlNode * msg, const char *node, gboolean force_ordered); gboolean ha_msg_dispatch(ll_cluster_t * cluster_conn, gpointer user_data); -gboolean register_heartbeat_conn(ll_cluster_t * hb_cluster, char **uuid, char **uname, - void (*hb_message) (HA_Message * msg, void *private_data), - void (*hb_destroy) (gpointer user_data)); +gboolean register_heartbeat_conn(crm_cluster_t *cluster); xmlNode *convert_ha_message(xmlNode * parent, HA_Message *msg, const char *field); gboolean ccm_have_quorum(oc_ed_t event); const char *ccm_event_name(oc_ed_t event); crm_node_t *crm_update_ccm_node(const oc_ev_membership_t * oc, int offset, const char *state, uint64_t seq); gboolean heartbeat_initialize_nodelist(void *cluster, gboolean force_member, xmlNode *xml_parent); # endif # if SUPPORT_COROSYNC gboolean corosync_initialize_nodelist(void *cluster, gboolean force_member, xmlNode *xml_parent); gboolean send_ais_message(xmlNode * msg, gboolean local, const char *node, enum crm_ais_msg_types dest); enum cluster_type_e find_corosync_variant(void); -void terminate_ais_connection(void); -gboolean init_ais_connection(gboolean(*dispatch) (AIS_Message *, char *, int), - void (*destroy) (gpointer), char **our_uuid, char **our_uname, - int *nodeid); -gboolean init_ais_connection_once(gboolean(*dispatch) (AIS_Message *, char *, int), - void (*destroy) (gpointer), char **our_uuid, - char **our_uname, int *nodeid); - +void terminate_cs_connection(void); +gboolean init_cs_connection(crm_cluster_t *cluster); +gboolean init_cs_connection_once(crm_cluster_t *cluster); # endif enum crm_quorum_source { crm_quorum_cman, crm_quorum_corosync, crm_quorum_pacemaker, }; enum crm_quorum_source get_quorum_source(void); void crm_update_peer_proc(const char *source, crm_node_t *peer, uint32_t flag, const char *status); crm_node_t *crm_update_peer( const char *source, unsigned int id, uint64_t born, uint64_t seen, int32_t votes, uint32_t children, const char *uuid, const char *uname, const char *addr, const char *state); void crm_update_peer_expected(const char *source, crm_node_t *node, const char *expected); void crm_update_peer_state(const char *source, crm_node_t *node, const char *state, int membership); gboolean init_cman_connection( gboolean(*dispatch) (unsigned long long, gboolean), void (*destroy) (gpointer)); gboolean init_quorum_connection( gboolean(*dispatch) (unsigned long long, gboolean), void (*destroy) (gpointer)); void set_node_uuid(const char *uname, const char *uuid); #endif diff --git a/lib/cluster/cluster.c b/lib/cluster/cluster.c index 4ace407f92..be322ac865 100644 --- a/lib/cluster/cluster.c +++ b/lib/cluster/cluster.c @@ -1,531 +1,519 @@ /* * Copyright (C) 2004 Andrew Beekhof * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #include #include #include #include #include #include #include #include #include #include #include #include #include CRM_TRACE_INIT_DATA(cluster); #if SUPPORT_HEARTBEAT void *hb_library = NULL; #endif static GHashTable *crm_uuid_cache = NULL; static GHashTable *crm_uname_cache = NULL; static char * get_heartbeat_uuid(uint32_t unused, const char *uname) { char *uuid_calc = NULL; #if SUPPORT_HEARTBEAT cl_uuid_t uuid_raw; const char *unknown = "00000000-0000-0000-0000-000000000000"; if (heartbeat_cluster == NULL) { crm_warn("No connection to heartbeat, using uuid=uname"); return NULL; } if (heartbeat_cluster->llc_ops->get_uuid_by_name(heartbeat_cluster, uname, &uuid_raw) == HA_FAIL) { crm_err("get_uuid_by_name() call failed for host %s", uname); free(uuid_calc); return NULL; } uuid_calc = calloc(1, 50); cl_uuid_unparse(&uuid_raw, uuid_calc); if (safe_str_eq(uuid_calc, unknown)) { crm_warn("Could not calculate UUID for %s", uname); free(uuid_calc); return NULL; } #endif return uuid_calc; } static gboolean uname_is_uuid(void) { static const char *uuid_pref = NULL; if (uuid_pref == NULL) { uuid_pref = getenv("PCMK_uname_is_uuid"); } if (uuid_pref == NULL) { /* true is legacy mode */ uuid_pref = "false"; } return crm_is_true(uuid_pref); } int get_corosync_id(int id, const char *uuid) { if (id == 0 && !uname_is_uuid() && is_corosync_cluster()) { id = crm_atoi(uuid, "0"); } return id; } char * get_corosync_uuid(uint32_t id, const char *uname) { if (!uname_is_uuid() && is_corosync_cluster()) { if (id <= 0) { /* Try the membership cache... */ crm_node_t *node = g_hash_table_lookup(crm_peer_cache, uname); if (node != NULL) { id = node->id; } } if (id > 0) { return crm_itoa(id); } else { crm_warn("Node %s is not yet known by corosync", uname); } } else if (uname != NULL) { return strdup(uname); } return NULL; } void set_node_uuid(const char *uname, const char *uuid) { CRM_CHECK(uuid != NULL, return); CRM_CHECK(uname != NULL, return); if (crm_uuid_cache == NULL) { crm_uuid_cache = g_hash_table_new_full(crm_str_hash, g_str_equal, g_hash_destroy_str, g_hash_destroy_str); } g_hash_table_insert(crm_uuid_cache, strdup(uname), strdup(uuid)); } const char * get_node_uuid(uint32_t id, const char *uname) { char *uuid = NULL; enum cluster_type_e type = get_cluster_type(); if (crm_uuid_cache == NULL) { crm_uuid_cache = g_hash_table_new_full(crm_str_hash, g_str_equal, g_hash_destroy_str, g_hash_destroy_str); } /* avoid blocking heartbeat calls where possible */ if (uname) { uuid = g_hash_table_lookup(crm_uuid_cache, uname); } if (uuid != NULL) { return uuid; } switch (type) { case pcmk_cluster_corosync: uuid = get_corosync_uuid(id, uname); break; case pcmk_cluster_cman: case pcmk_cluster_classic_ais: if (uname) { uuid = strdup(uname); } break; case pcmk_cluster_heartbeat: uuid = get_heartbeat_uuid(id, uname); break; case pcmk_cluster_unknown: case pcmk_cluster_invalid: crm_err("Unsupported cluster type"); break; } if (uuid == NULL) { return NULL; } if (uname) { g_hash_table_insert(crm_uuid_cache, strdup(uname), uuid); return g_hash_table_lookup(crm_uuid_cache, uname); } /* Memory leak! */ CRM_LOG_ASSERT(uuid != NULL); return uuid; } gboolean -crm_cluster_connect(char **our_uname, char **our_uuid, void *dispatch, void *destroy, -#if SUPPORT_HEARTBEAT - ll_cluster_t ** hb_conn -#else - void **hb_conn -#endif - ) +crm_cluster_connect(crm_cluster_t *cluster) { enum cluster_type_e type = get_cluster_type(); crm_notice("Connecting to cluster infrastructure: %s", name_for_cluster_type(type)); - if (hb_conn != NULL) { - *hb_conn = NULL; - } #if SUPPORT_COROSYNC if (is_openais_cluster()) { crm_peer_init(); - return init_ais_connection(dispatch, destroy, our_uuid, our_uname, NULL); + return init_cs_connection(cluster); } #endif #if SUPPORT_HEARTBEAT if (is_heartbeat_cluster()) { int rv; - CRM_ASSERT(hb_conn != NULL); + CRM_ASSERT(cluster->hb_conn != NULL); /* coverity[var_deref_op] False positive */ - if (*hb_conn == NULL) { + if (cluster->hb_conn == NULL) { /* No object passed in, create a new one. */ ll_cluster_t *(*new_cluster) (const char *llctype) = find_library_function(&hb_library, HEARTBEAT_LIBRARY, "ll_cluster_new", 1); - *hb_conn = (*new_cluster) ("heartbeat"); + cluster->hb_conn = (*new_cluster) ("heartbeat"); /* dlclose(handle); */ } else { /* Object passed in. Disconnect first, then reconnect below. */ - ll_cluster_t *conn = *hb_conn; - - conn->llc_ops->signoff(conn, FALSE); + cluster->hb_conn->llc_ops->signoff(cluster->hb_conn, FALSE); } /* make sure we are disconnected first with the old object, if any. */ - if (heartbeat_cluster && heartbeat_cluster != *hb_conn) { + if (heartbeat_cluster && heartbeat_cluster != cluster->hb_conn) { heartbeat_cluster->llc_ops->signoff(heartbeat_cluster, FALSE); } - CRM_ASSERT(*hb_conn != NULL); - heartbeat_cluster = *hb_conn; - - rv = register_heartbeat_conn(heartbeat_cluster, our_uuid, our_uname, dispatch, destroy); + CRM_ASSERT(cluster->hb_conn != NULL); + heartbeat_cluster = cluster->hb_conn; + rv = register_heartbeat_conn(cluster); if (rv) { /* we'll benefit from a bigger queue length on heartbeat side. * Otherwise, if peers send messages faster than we can consume * them right now, heartbeat messaging layer will kick us out once * it's (small) default queue fills up :( * If we fail to adjust the sendq length, that's not yet fatal, though. */ - if (HA_OK != (*hb_conn)->llc_ops->set_sendq_len(*hb_conn, 1024)) { - crm_warn("Cannot set sendq length: %s", (*hb_conn)->llc_ops->errmsg(*hb_conn)); + if (HA_OK != heartbeat_cluster->llc_ops->set_sendq_len(heartbeat_cluster, 1024)) { + crm_warn("Cannot set sendq length: %s", heartbeat_cluster->llc_ops->errmsg(heartbeat_cluster)); } } return rv; } #endif crm_info("Unsupported cluster stack: %s", getenv("HA_cluster_type")); return FALSE; } gboolean send_cluster_message(const char *node, enum crm_ais_msg_types service, xmlNode * data, gboolean ordered) { #if SUPPORT_COROSYNC if (is_openais_cluster()) { return send_ais_message(data, FALSE, node, service); } #endif #if SUPPORT_HEARTBEAT if (is_heartbeat_cluster()) { return send_ha_message(heartbeat_cluster, data, node, ordered); } #endif return FALSE; } void empty_uuid_cache(void) { if (crm_uuid_cache != NULL) { g_hash_table_destroy(crm_uuid_cache); crm_uuid_cache = NULL; } } void unget_uuid(const char *uname) { if (crm_uuid_cache == NULL) { return; } g_hash_table_remove(crm_uuid_cache, uname); } const char * get_uuid(const char *uname) { return get_node_uuid(0, uname); } const char * get_uname(const char *uuid) { const char *uname = NULL; if (crm_uname_cache == NULL) { crm_uname_cache = g_hash_table_new_full(crm_str_hash, g_str_equal, g_hash_destroy_str, g_hash_destroy_str); } CRM_CHECK(uuid != NULL, return NULL); /* avoid blocking calls where possible */ uname = g_hash_table_lookup(crm_uname_cache, uuid); if (uname != NULL) { crm_trace("%s = %s (cached)", uuid, uname); return uname; } #if SUPPORT_COROSYNC if (is_openais_cluster()) { if (!uname_is_uuid() && is_corosync_cluster()) { uint32_t id = crm_int_helper(uuid, NULL); crm_node_t *node = g_hash_table_lookup(crm_peer_id_cache, GUINT_TO_POINTER(id)); uname = node ? node->uname : NULL; } else { uname = uuid; } if (uname) { crm_trace("Storing %s = %s", uuid, uname); g_hash_table_insert(crm_uname_cache, strdup(uuid), strdup(uname)); } } #endif #if SUPPORT_HEARTBEAT if (is_heartbeat_cluster()) { if (heartbeat_cluster != NULL && uuid != NULL) { cl_uuid_t uuid_raw; char *hb_uname = NULL; char *uuid_copy = strdup(uuid); cl_uuid_parse(uuid_copy, &uuid_raw); hb_uname = malloc( MAX_NAME); if (heartbeat_cluster->llc_ops->get_name_by_uuid(heartbeat_cluster, &uuid_raw, hb_uname, MAX_NAME) == HA_FAIL) { crm_err("Could not calculate uname for %s", uuid); free(uuid_copy); free(hb_uname); } else { crm_trace("Storing %s = %s", uuid, uname); g_hash_table_insert(crm_uname_cache, uuid_copy, hb_uname); } } } #endif return g_hash_table_lookup(crm_uname_cache, uuid); } void set_uuid(xmlNode * node, const char *attr, const char *uname) { const char *uuid_calc = get_uuid(uname); crm_xml_add(node, attr, uuid_calc); return; } const char * name_for_cluster_type(enum cluster_type_e type) { switch (type) { case pcmk_cluster_classic_ais: return "classic openais (with plugin)"; case pcmk_cluster_cman: return "cman"; case pcmk_cluster_corosync: return "corosync"; case pcmk_cluster_heartbeat: return "heartbeat"; case pcmk_cluster_unknown: return "unknown"; case pcmk_cluster_invalid: return "invalid"; } crm_err("Invalid cluster type: %d", type); return "invalid"; } /* Do not expose these two */ int set_cluster_type(enum cluster_type_e type); static enum cluster_type_e cluster_type = pcmk_cluster_unknown; int set_cluster_type(enum cluster_type_e type) { if (cluster_type == pcmk_cluster_unknown) { crm_info("Cluster type set to: %s", name_for_cluster_type(type)); cluster_type = type; return 0; } else if (cluster_type == type) { return 0; } else if (pcmk_cluster_unknown == type) { cluster_type = type; return 0; } crm_err("Cluster type already set to %s, ignoring %s", name_for_cluster_type(cluster_type), name_for_cluster_type(type)); return -1; } enum cluster_type_e get_cluster_type(void) { if (cluster_type == pcmk_cluster_unknown) { const char *cluster = getenv("HA_cluster_type"); cluster_type = pcmk_cluster_invalid; if (cluster) { crm_info("Cluster type is: '%s'", cluster); } else { #if SUPPORT_COROSYNC cluster_type = find_corosync_variant(); if (cluster_type == pcmk_cluster_unknown) { cluster = "heartbeat"; crm_info("Assuming a 'heartbeat' based cluster"); } else { cluster = name_for_cluster_type(cluster_type); crm_info("Detected an active '%s' cluster", cluster); } #else cluster = "heartbeat"; #endif } if (safe_str_eq(cluster, "heartbeat")) { #if SUPPORT_HEARTBEAT cluster_type = pcmk_cluster_heartbeat; #else cluster_type = pcmk_cluster_invalid; #endif } else if (safe_str_eq(cluster, "openais") || safe_str_eq(cluster, "classic openais (with plugin)")) { #if SUPPORT_COROSYNC cluster_type = pcmk_cluster_classic_ais; #else cluster_type = pcmk_cluster_invalid; #endif } else if (safe_str_eq(cluster, "corosync")) { #if SUPPORT_COROSYNC cluster_type = pcmk_cluster_corosync; #else cluster_type = pcmk_cluster_invalid; #endif } else if (safe_str_eq(cluster, "cman")) { #if SUPPORT_CMAN cluster_type = pcmk_cluster_cman; #else cluster_type = pcmk_cluster_invalid; #endif } else { cluster_type = pcmk_cluster_invalid; } if (cluster_type == pcmk_cluster_invalid) { crm_notice ("This installation of Pacemaker does not support the '%s' cluster infrastructure. Terminating.", cluster); exit(100); } } return cluster_type; } gboolean is_cman_cluster(void) { return get_cluster_type() == pcmk_cluster_cman; } gboolean is_corosync_cluster(void) { return get_cluster_type() == pcmk_cluster_corosync; } gboolean is_classic_ais_cluster(void) { return get_cluster_type() == pcmk_cluster_classic_ais; } gboolean is_openais_cluster(void) { enum cluster_type_e type = get_cluster_type(); if (type == pcmk_cluster_classic_ais) { return TRUE; } else if (type == pcmk_cluster_corosync) { return TRUE; } else if (type == pcmk_cluster_cman) { return TRUE; } return FALSE; } gboolean is_heartbeat_cluster(void) { return get_cluster_type() == pcmk_cluster_heartbeat; } diff --git a/lib/cluster/corosync.c b/lib/cluster/corosync.c index 10e43d3073..7a29d56a38 100644 --- a/lib/cluster/corosync.c +++ b/lib/cluster/corosync.c @@ -1,1096 +1,1086 @@ /* * Copyright (C) 2004 Andrew Beekhof * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include cpg_handle_t pcmk_cpg_handle = 0; struct cpg_name pcmk_cpg_group = { .length = 0, .value[0] = 0, }; quorum_handle_t pcmk_quorum_handle = 0; gboolean(*quorum_app_callback) (unsigned long long seq, gboolean quorate) = NULL; static char *pcmk_uname = NULL; static int pcmk_uname_len = 0; static uint32_t pcmk_nodeid = 0; #define cs_repeat(counter, max, code) do { \ code; \ if(rc == CS_ERR_TRY_AGAIN || rc == CS_ERR_QUEUE_FULL) { \ counter++; \ crm_debug("Retrying operation after %ds", counter); \ sleep(counter); \ } else { \ break; \ } \ } while(counter < max) #ifndef INTERFACE_MAX # define INTERFACE_MAX 2 /* from the private coroapi.h header */ #endif static gboolean corosync_name_is_valid(const char *key, const char *name) { int octet; if(name == NULL) { crm_trace("%s is empty", key); return FALSE; } else if(sscanf(name, "%d.%d.%d.%d", &octet, &octet, &octet, &octet) == 4) { crm_trace("%s contains an ipv4 address, ignoring: %s", key, name); return FALSE; } else if(strstr(name, ":") != NULL) { crm_trace("%s contains an ipv4 address, ignoring: %s", key, name); return FALSE; } crm_trace("%s is valid", key); return TRUE; } /* * CFG functionality stolen from node_name() in corosync-quorumtool.c * This resolves the first address assigned to a node and returns the name or IP address. */ static char *corosync_node_name(cmap_handle_t cmap_handle, uint32_t nodeid) { int lpc = 0; int rc = CS_OK; int retries = 0; char *name = NULL; cmap_handle_t local_handle = 0; corosync_cfg_handle_t cfg_handle = 0; static corosync_cfg_callbacks_t cfg_callbacks = {}; if(cmap_handle == 0 && local_handle == 0) { retries = 0; crm_trace("Initializing CMAP connection"); do { rc = cmap_initialize(&local_handle); if(rc != CS_OK) { retries++; crm_debug("API connection setup failed: %s. Retrying in %ds", cs_strerror(rc), retries); sleep(retries); } } while(retries < 5 && rc != CS_OK); if (rc != CS_OK) { crm_warn("Could not connect to Cluster Configuration Database API, error %d", cs_strerror(rc)); local_handle = 0; } } if(cmap_handle == 0) { cmap_handle = local_handle; } while(name == NULL && cmap_handle != 0) { uint32_t id = 0; char *key = NULL; key = g_strdup_printf("nodelist.node.%d.nodeid", lpc); rc = cmap_get_uint32(cmap_handle, key, &id); crm_trace("Checking %u vs %u from %s", nodeid, id, key); g_free(key); if(rc != CS_OK) { break; } if(nodeid == id) { crm_trace("Searching for node name for %u in nodelist.node.%d %s", nodeid, lpc, name); if(name == NULL) { key = g_strdup_printf("nodelist.node.%d.ring0_addr", lpc); rc = cmap_get_string(cmap_handle, key, &name); crm_trace("%s = %s", key, name); if(corosync_name_is_valid(key, name) == FALSE) { free(name); name = NULL; } g_free(key); } if(name == NULL) { key = g_strdup_printf("nodelist.node.%d.name", lpc); rc = cmap_get_string(cmap_handle, key, &name); crm_trace("%s = %s %d", key, name, rc); if(corosync_name_is_valid(key, name) == FALSE) { free(name); name = NULL; } g_free(key); } break; } lpc++; } if(name == NULL) { retries = 0; crm_trace("Initializing CFG connection"); do { rc = corosync_cfg_initialize(&cfg_handle, &cfg_callbacks); if(rc != CS_OK) { retries++; crm_debug("API connection setup failed: %s. Retrying in %ds", cs_strerror(rc), retries); sleep(retries); } } while(retries < 5 && rc != CS_OK); if (rc != CS_OK) { crm_warn("Could not connect to the Corosync CFG API, error %d", cs_strerror(rc)); cfg_handle = 0; } } if(name == NULL && cfg_handle != 0) { int numaddrs; char buf[INET6_ADDRSTRLEN]; socklen_t addrlen; struct sockaddr_storage *ss; corosync_cfg_node_address_t addrs[INTERFACE_MAX]; rc = corosync_cfg_get_node_addrs(cfg_handle, nodeid, INTERFACE_MAX, &numaddrs, addrs); if (rc == CS_OK) { ss = (struct sockaddr_storage *)addrs[0].address; if (ss->ss_family == AF_INET6) { addrlen = sizeof(struct sockaddr_in6); } else { addrlen = sizeof(struct sockaddr_in); } if (getnameinfo((struct sockaddr *)addrs[0].address, addrlen, buf, sizeof(buf), NULL, 0, 0) == 0) { crm_notice("Inferred node name '%s' for nodeid %u from DNS", buf, nodeid); if(corosync_name_is_valid("DNS", buf)) { name = strdup(buf); } } } else { crm_debug("Unable to get node address for nodeid %u: %s", nodeid, cs_strerror(rc)); } cmap_finalize(cfg_handle); } if(local_handle) { cmap_finalize(local_handle); } if(name == NULL) { crm_err("Unable to get node name for nodeid %u", nodeid); } return name; } enum crm_ais_msg_types text2msg_type(const char *text) { int type = crm_msg_none; CRM_CHECK(text != NULL, return type); if (safe_str_eq(text, "ais")) { type = crm_msg_ais; } else if (safe_str_eq(text, "crm_plugin")) { type = crm_msg_ais; } else if (safe_str_eq(text, CRM_SYSTEM_CIB)) { type = crm_msg_cib; } else if (safe_str_eq(text, CRM_SYSTEM_CRMD)) { type = crm_msg_crmd; } else if (safe_str_eq(text, CRM_SYSTEM_DC)) { type = crm_msg_crmd; } else if (safe_str_eq(text, CRM_SYSTEM_TENGINE)) { type = crm_msg_te; } else if (safe_str_eq(text, CRM_SYSTEM_PENGINE)) { type = crm_msg_pe; } else if (safe_str_eq(text, CRM_SYSTEM_LRMD)) { type = crm_msg_lrmd; } else if (safe_str_eq(text, CRM_SYSTEM_STONITHD)) { type = crm_msg_stonithd; } else if (safe_str_eq(text, "stonith-ng")) { type = crm_msg_stonith_ng; } else if (safe_str_eq(text, "attrd")) { type = crm_msg_attrd; } else { /* This will normally be a transient client rather than * a cluster daemon. Set the type to the pid of the client */ int scan_rc = sscanf(text, "%d", &type); if (scan_rc != 1) { /* Ensure its sane */ type = crm_msg_none; } } return type; } static char *ais_cluster_name = NULL; gboolean crm_get_cluster_name(char **cname) { CRM_CHECK(cname != NULL, return FALSE); if (ais_cluster_name) { *cname = strdup(ais_cluster_name); return TRUE; } return FALSE; } gboolean send_ais_text(int class, const char *data, gboolean local, const char *node, enum crm_ais_msg_types dest) { static int msg_id = 0; static int local_pid = 0; int retries = 0; int rc = CS_OK; int buf_len = sizeof(cs_ipc_header_response_t); char *buf = NULL; struct iovec iov; const char *transport = "pcmk"; AIS_Message *ais_msg = NULL; enum crm_ais_msg_types sender = text2msg_type(crm_system_name); /* There are only 6 handlers registered to crm_lib_service in plugin.c */ CRM_CHECK(class < 6, crm_err("Invalid message class: %d", class); return FALSE); if (data == NULL) { data = ""; } if (local_pid == 0) { local_pid = getpid(); } if (sender == crm_msg_none) { sender = local_pid; } ais_msg = calloc(1, sizeof(AIS_Message)); ais_msg->id = msg_id++; ais_msg->header.id = class; ais_msg->header.error = CS_OK; ais_msg->host.type = dest; ais_msg->host.local = local; if (node) { ais_msg->host.size = strlen(node); memset(ais_msg->host.uname, 0, MAX_NAME); memcpy(ais_msg->host.uname, node, ais_msg->host.size); ais_msg->host.id = 0; } else { ais_msg->host.size = 0; memset(ais_msg->host.uname, 0, MAX_NAME); ais_msg->host.id = 0; } ais_msg->sender.id = 0; ais_msg->sender.type = sender; ais_msg->sender.pid = local_pid; ais_msg->sender.size = pcmk_uname_len; memset(ais_msg->sender.uname, 0, MAX_NAME); memcpy(ais_msg->sender.uname, pcmk_uname, ais_msg->sender.size); ais_msg->size = 1 + strlen(data); if (ais_msg->size < CRM_BZ2_THRESHOLD) { failback: ais_msg = realloc(ais_msg, sizeof(AIS_Message) + ais_msg->size); memcpy(ais_msg->data, data, ais_msg->size); } else { char *compressed = NULL; char *uncompressed = strdup(data); unsigned int len = (ais_msg->size * 1.1) + 600; /* recomended size */ crm_trace("Compressing message payload"); /* coverity[returned_null] Ignore */ compressed = malloc( len); rc = BZ2_bzBuffToBuffCompress(compressed, &len, uncompressed, ais_msg->size, CRM_BZ2_BLOCKS, 0, CRM_BZ2_WORK); free(uncompressed); if (rc != BZ_OK) { crm_err("Compression failed: %d", rc); free(compressed); goto failback; } ais_msg = realloc(ais_msg, sizeof(AIS_Message) + len + 1); memcpy(ais_msg->data, compressed, len); ais_msg->data[len] = 0; free(compressed); ais_msg->is_compressed = TRUE; ais_msg->compressed_size = len; crm_trace("Compression details: %d -> %d", ais_msg->size, ais_data_len(ais_msg)); } ais_msg->header.size = sizeof(AIS_Message) + ais_data_len(ais_msg); crm_trace("Sending%s message %d to %s.%s (data=%d, total=%d)", ais_msg->is_compressed ? " compressed" : "", ais_msg->id, ais_dest(&(ais_msg->host)), msg_type2text(dest), ais_data_len(ais_msg), ais_msg->header.size); iov.iov_base = ais_msg; iov.iov_len = ais_msg->header.size; buf = realloc(buf, buf_len); do { if (rc == CS_ERR_TRY_AGAIN || rc == CS_ERR_QUEUE_FULL) { retries++; crm_info("Peer overloaded or membership in flux:" " Re-sending message (Attempt %d of 20)", retries); sleep(retries); /* Proportional back off */ } errno = 0; transport = "cpg"; CRM_CHECK(dest != crm_msg_ais, rc = CS_ERR_MESSAGE_ERROR; goto bail); rc = cpg_mcast_joined(pcmk_cpg_handle, CPG_TYPE_AGREED, &iov, 1); if (rc == CS_ERR_TRY_AGAIN || rc == CS_ERR_QUEUE_FULL) { cpg_flow_control_state_t fc_state = CPG_FLOW_CONTROL_DISABLED; int rc2 = cpg_flow_control_state_get(pcmk_cpg_handle, &fc_state); if (rc2 == CS_OK && fc_state == CPG_FLOW_CONTROL_ENABLED) { crm_warn("Connection overloaded, cannot send messages"); goto bail; } else if (rc2 != CS_OK) { crm_warn("Could not determin the connection state: %s (%d)", ais_error2text(rc2), rc2); goto bail; } } } while ((rc == CS_ERR_TRY_AGAIN || rc == CS_ERR_QUEUE_FULL) && retries < 20); bail: if (rc != CS_OK) { crm_perror(LOG_ERR, "Sending message %d via %s: FAILED (rc=%d): %s", ais_msg->id, transport, rc, ais_error2text(rc)); } else { crm_trace("Message %d: sent", ais_msg->id); } free(buf); free(ais_msg); return (rc == CS_OK); } gboolean send_ais_message(xmlNode * msg, gboolean local, const char *node, enum crm_ais_msg_types dest) { gboolean rc = TRUE; char *data = dump_xml_unformatted(msg); rc = send_ais_text(crm_class_cluster, data, local, node, dest); free(data); return rc; } void -terminate_ais_connection(void) +terminate_cs_connection(void) { crm_notice("Disconnecting from Corosync"); if(pcmk_cpg_handle) { crm_trace("Disconnecting CPG"); cpg_leave(pcmk_cpg_handle, &pcmk_cpg_group); cpg_finalize(pcmk_cpg_handle); pcmk_cpg_handle = 0; } else { crm_info("No CPG connection"); } if(pcmk_quorum_handle) { crm_trace("Disconnecting quorum"); quorum_finalize(pcmk_quorum_handle); pcmk_quorum_handle = 0; } else { crm_info("No Quorum connection"); } } int ais_membership_timer = 0; gboolean ais_membership_force = FALSE; static gboolean -ais_dispatch_message(AIS_Message * msg, gboolean(*dispatch) (AIS_Message *, char *, int)) +ais_dispatch_message(AIS_Message * msg, gboolean(*dispatch) (int kind, const char *from, const char *data)) { char *data = NULL; char *uncompressed = NULL; xmlNode *xml = NULL; CRM_ASSERT(msg != NULL); crm_trace("Got new%s message (size=%d, %d, %d)", msg->is_compressed ? " compressed" : "", ais_data_len(msg), msg->size, msg->compressed_size); data = msg->data; if (msg->is_compressed && msg->size > 0) { int rc = BZ_OK; unsigned int new_size = msg->size + 1; if (check_message_sanity(msg, NULL) == FALSE) { goto badmsg; } crm_trace("Decompressing message data"); uncompressed = calloc(1, new_size); rc = BZ2_bzBuffToBuffDecompress(uncompressed, &new_size, data, msg->compressed_size, 1, 0); if (rc != BZ_OK) { crm_err("Decompression failed: %d", rc); goto badmsg; } CRM_ASSERT(rc == BZ_OK); CRM_ASSERT(new_size == msg->size); data = uncompressed; } else if (check_message_sanity(msg, data) == FALSE) { goto badmsg; } else if (safe_str_eq("identify", data)) { int pid = getpid(); char *pid_s = crm_itoa(pid); send_ais_text(crm_class_cluster, pid_s, TRUE, NULL, crm_msg_ais); free(pid_s); goto done; } if (msg->header.id != crm_class_members) { /* Is this even needed anymore? */ crm_get_peer(msg->sender.id, msg->sender.uname); } if (msg->header.id == crm_class_rmpeer) { uint32_t id = crm_int_helper(data, NULL); crm_info("Removing peer %s/%u", data, id); reap_crm_member(id); goto done; } crm_trace("Payload: %s", data); if (dispatch != NULL) { - dispatch(msg, data, 0); + dispatch(msg->header.id, msg->sender.uname, data); } done: free(uncompressed); free_xml(xml); return TRUE; badmsg: crm_err("Invalid message (id=%d, dest=%s:%s, from=%s:%s.%d):" " min=%d, total=%d, size=%d, bz2_size=%d", msg->id, ais_dest(&(msg->host)), msg_type2text(msg->host.type), ais_dest(&(msg->sender)), msg_type2text(msg->sender.type), msg->sender.pid, (int)sizeof(AIS_Message), msg->header.size, msg->size, msg->compressed_size); goto done; } -gboolean(*pcmk_cpg_dispatch_fn) (AIS_Message *, char *, int) = NULL; +gboolean(*pcmk_cpg_dispatch_fn) (int kind, const char *from, const char *data) = NULL; static int pcmk_cpg_dispatch(gpointer user_data) { int rc = 0; pcmk_cpg_dispatch_fn = user_data; rc = cpg_dispatch(pcmk_cpg_handle, CS_DISPATCH_ALL); if (rc != CS_OK) { crm_err("Connection to the CPG API failed: %d", rc); return -1; } return 0; } static void pcmk_cpg_deliver(cpg_handle_t handle, const struct cpg_name *groupName, uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len) { AIS_Message *ais_msg = (AIS_Message *) msg; if (ais_msg->sender.id > 0 && ais_msg->sender.id != nodeid) { crm_err("Nodeid mismatch from %d.%d: claimed nodeid=%u", nodeid, pid, ais_msg->sender.id); return; } else if (ais_msg->host.size != 0 && safe_str_neq(ais_msg->host.uname, pcmk_uname)) { /* Not for us */ return; } ais_msg->sender.id = nodeid; if (ais_msg->sender.size == 0) { crm_node_t *peer = crm_get_peer(nodeid, NULL); if (peer == NULL) { crm_err("Peer with nodeid=%u is unknown", nodeid); } else if (peer->uname == NULL) { crm_err("No uname for peer with nodeid=%u", nodeid); } else { crm_notice("Fixing uname for peer with nodeid=%u", nodeid); ais_msg->sender.size = strlen(peer->uname); memset(ais_msg->sender.uname, 0, MAX_NAME); memcpy(ais_msg->sender.uname, peer->uname, ais_msg->sender.size); } } ais_dispatch_message(ais_msg, pcmk_cpg_dispatch_fn); } static void pcmk_cpg_membership(cpg_handle_t handle, const struct cpg_name *groupName, const struct cpg_address *member_list, size_t member_list_entries, const struct cpg_address *left_list, size_t left_list_entries, const struct cpg_address *joined_list, size_t joined_list_entries) { int i; gboolean found = FALSE; static int counter = 0; for (i = 0; i < left_list_entries; i++) { crm_node_t *peer = crm_get_peer(left_list[i].nodeid, NULL); crm_info("Left[%d.%d] %s.%d ", counter, i, groupName->value, left_list[i].nodeid); crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cpg, OFFLINESTATUS); } for (i = 0; i < joined_list_entries; i++) { crm_info("Joined[%d.%d] %s.%d ", counter, i, groupName->value, joined_list[i].nodeid); } for (i = 0; i < member_list_entries; i++) { crm_node_t *peer = crm_get_peer(member_list[i].nodeid, NULL); crm_info("Member[%d.%d] %s.%d ", counter, i, groupName->value, member_list[i].nodeid); crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cpg, ONLINESTATUS); if(pcmk_nodeid == member_list[i].nodeid) { found = TRUE; } } if(!found) { crm_err("We're not part of CPG group %s anymore!", groupName->value); /* Possibly re-call cpg_join() */ } counter++; } cpg_callbacks_t cpg_callbacks = { .cpg_deliver_fn = pcmk_cpg_deliver, .cpg_confchg_fn = pcmk_cpg_membership, }; static gboolean -init_cpg_connection(gboolean(*dispatch) (AIS_Message *, char *, int), void (*destroy) (gpointer), +init_cpg_connection(gboolean(*dispatch) (int kind, const char *from, const char *data), void (*destroy) (gpointer), uint32_t * nodeid) { int rc = -1; int fd = 0; int retries = 0; crm_node_t *peer = NULL; struct mainloop_fd_callbacks cpg_fd_callbacks = { .dispatch = pcmk_cpg_dispatch, .destroy = destroy, }; strcpy(pcmk_cpg_group.value, crm_system_name); pcmk_cpg_group.length = strlen(crm_system_name) + 1; cs_repeat(retries, 30, rc = cpg_initialize(&pcmk_cpg_handle, &cpg_callbacks)); if (rc != CS_OK) { crm_err("Could not connect to the Cluster Process Group API: %d\n", rc); goto bail; } retries = 0; cs_repeat(retries, 30, rc = cpg_local_get(pcmk_cpg_handle, (unsigned int *)nodeid)); if (rc != CS_OK) { crm_err("Could not get local node id from the CPG API"); goto bail; } retries = 0; cs_repeat(retries, 30, rc = cpg_join(pcmk_cpg_handle, &pcmk_cpg_group)); if (rc != CS_OK) { crm_err("Could not join the CPG group '%s': %d", crm_system_name, rc); goto bail; } rc = cpg_fd_get(pcmk_cpg_handle, &fd); if (rc != CS_OK) { crm_err("Could not obtain the CPG API connection: %d\n", rc); goto bail; } mainloop_add_fd("corosync-cpg", G_PRIORITY_MEDIUM, fd, dispatch, &cpg_fd_callbacks); bail: if (rc != CS_OK) { cpg_finalize(pcmk_cpg_handle); return FALSE; } peer = crm_get_peer(pcmk_nodeid, pcmk_uname); crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cpg, ONLINESTATUS); return TRUE; } static int pcmk_quorum_dispatch(gpointer user_data) { int rc = 0; rc = quorum_dispatch(pcmk_quorum_handle, CS_DISPATCH_ALL); if (rc < 0) { crm_err("Connection to the Quorum API failed: %d", rc); return -1; } return 0; } static void corosync_mark_unseen_peer_dead(gpointer key, gpointer value, gpointer user_data) { int *seq = user_data; crm_node_t *node = value; if (node->last_seen != *seq && node->state && crm_str_eq(CRM_NODE_LOST, node->state, TRUE) == FALSE) { crm_notice("Node %d/%s was not seen in the previous transition", node->id, node->uname); crm_update_peer_state(__FUNCTION__, node, CRM_NODE_LOST, 0); } } static void corosync_mark_node_unseen(gpointer key, gpointer value, gpointer user_data) { crm_node_t *node = value; node->last_seen = 0; } static void pcmk_quorum_notification(quorum_handle_t handle, uint32_t quorate, uint64_t ring_id, uint32_t view_list_entries, uint32_t * view_list) { int i; static gboolean init_phase = TRUE; if (quorate != crm_have_quorum) { crm_notice("Membership " U64T ": quorum %s (%lu)", ring_id, quorate ? "acquired" : "lost", (long unsigned int)view_list_entries); crm_have_quorum = quorate; } else { crm_info("Membership " U64T ": quorum %s (%lu)", ring_id, quorate ? "retained" : "still lost", (long unsigned int)view_list_entries); } if(view_list_entries == 0 && init_phase) { crm_info("Corosync membership is still forming, ignoring"); return; } init_phase = FALSE; g_hash_table_foreach(crm_peer_cache, corosync_mark_node_unseen, NULL); for (i = 0; i < view_list_entries; i++) { uint32_t id = view_list[i]; char *name = NULL; crm_node_t *node = NULL; crm_debug("Member[%d] %d ", i, id); node = crm_get_peer(id, NULL); if(node->uname == NULL) { crm_info("Obtaining name for new node %u", id); name = corosync_node_name(0, id); node = crm_get_peer(id, name); } crm_update_peer_state(__FUNCTION__, node, CRM_NODE_MEMBER, ring_id); free(name); } crm_trace("Reaping unseen nodes..."); g_hash_table_foreach(crm_peer_cache, corosync_mark_unseen_peer_dead, &ring_id); if (quorum_app_callback) { quorum_app_callback(ring_id, quorate); } } quorum_callbacks_t quorum_callbacks = { .quorum_notify_fn = pcmk_quorum_notification, }; gboolean init_quorum_connection(gboolean(*dispatch) (unsigned long long, gboolean), void (*destroy) (gpointer)) { int rc = -1; int fd = 0; int quorate = 0; uint32_t quorum_type = 0; struct mainloop_fd_callbacks quorum_fd_callbacks; quorum_fd_callbacks.dispatch = pcmk_quorum_dispatch; quorum_fd_callbacks.destroy = destroy; crm_debug("Configuring Pacemaker to obtain quorum from Corosync"); rc = quorum_initialize(&pcmk_quorum_handle, &quorum_callbacks, &quorum_type); if (rc != CS_OK) { crm_err("Could not connect to the Quorum API: %d\n", rc); goto bail; } else if (quorum_type != QUORUM_SET) { crm_err("Corosync quorum is not configured\n"); goto bail; } rc = quorum_getquorate(pcmk_quorum_handle, &quorate); if (rc != CS_OK) { crm_err("Could not obtain the current Quorum API state: %d\n", rc); goto bail; } crm_notice("Quorum %s", quorate ? "acquired" : "lost"); quorum_app_callback = dispatch; crm_have_quorum = quorate; rc = quorum_trackstart(pcmk_quorum_handle, CS_TRACK_CHANGES | CS_TRACK_CURRENT); if (rc != CS_OK) { crm_err("Could not setup Quorum API notifications: %d\n", rc); goto bail; } rc = quorum_fd_get(pcmk_quorum_handle, &fd); if (rc != CS_OK) { crm_err("Could not obtain the Quorum API connection: %d\n", rc); goto bail; } mainloop_add_fd("quorum", G_PRIORITY_HIGH, fd, dispatch, &quorum_fd_callbacks); corosync_initialize_nodelist(NULL, FALSE, NULL); bail: if (rc != CS_OK) { quorum_finalize(pcmk_quorum_handle); return FALSE; } return TRUE; } gboolean -init_ais_connection(gboolean(*dispatch) (AIS_Message *, char *, int), void (*destroy) (gpointer), - char **our_uuid, char **our_uname, int *nodeid) +init_cs_connection(crm_cluster_t *cluster) { int retries = 0; while (retries < 5) { - int rc = init_ais_connection_once(dispatch, destroy, our_uuid, our_uname, nodeid); + int rc = init_cs_connection_once(cluster); retries++; switch (rc) { case CS_OK: return TRUE; break; case CS_ERR_TRY_AGAIN: case CS_ERR_QUEUE_FULL: sleep(retries); break; default: return FALSE; } } crm_err("Could not connect to corosync after %d retries", retries); return FALSE; } gboolean -init_ais_connection_once(gboolean(*dispatch) (AIS_Message *, char *, int), - void (*destroy) (gpointer), char **our_uuid, char **our_uname, int *nodeid) +init_cs_connection_once(crm_cluster_t *cluster) { struct utsname res; enum cluster_type_e stack = get_cluster_type(); crm_peer_init(); /* Here we just initialize comms */ if(stack != pcmk_cluster_corosync) { crm_err("Invalid cluster type: %s (%d)", name_for_cluster_type(stack), stack); return FALSE; } - if (init_cpg_connection(dispatch, destroy, &pcmk_nodeid) == FALSE) { + if (init_cpg_connection(cluster->cs_dispatch, cluster->destroy, &pcmk_nodeid) == FALSE) { return FALSE; } else if (uname(&res) < 0) { crm_perror(LOG_ERR, "Could not determin the current host"); exit(100); } else { pcmk_uname = strdup(res.nodename); } crm_info("Connection to '%s': established", name_for_cluster_type(stack)); CRM_ASSERT(pcmk_uname != NULL); pcmk_uname_len = strlen(pcmk_uname); if (pcmk_nodeid != 0) { /* Ensure the local node always exists */ crm_get_peer(pcmk_nodeid, pcmk_uname); } - if (our_uuid != NULL) { - *our_uuid = get_corosync_uuid(pcmk_nodeid, pcmk_uname); - } - - if (our_uname != NULL) { - *our_uname = strdup(pcmk_uname); - } - - if (nodeid != NULL) { - *nodeid = pcmk_nodeid; - } + cluster->uuid = get_corosync_uuid(pcmk_nodeid, pcmk_uname); + cluster->uname = strdup(pcmk_uname); + cluster->nodeid = pcmk_nodeid; return TRUE; } gboolean check_message_sanity(const AIS_Message * msg, const char *data) { gboolean sane = TRUE; int dest = msg->host.type; int tmp_size = msg->header.size - sizeof(AIS_Message); if (sane && msg->header.size == 0) { crm_warn("Message with no size"); sane = FALSE; } if (sane && msg->header.error != CS_OK) { crm_warn("Message header contains an error: %d", msg->header.error); sane = FALSE; } if (sane && ais_data_len(msg) != tmp_size) { crm_warn("Message payload size is incorrect: expected %d, got %d", ais_data_len(msg), tmp_size); sane = TRUE; } if (sane && ais_data_len(msg) == 0) { crm_warn("Message with no payload"); sane = FALSE; } if (sane && data && msg->is_compressed == FALSE) { int str_size = strlen(data) + 1; if (ais_data_len(msg) != str_size) { int lpc = 0; crm_warn("Message payload is corrupted: expected %d bytes, got %d", ais_data_len(msg), str_size); sane = FALSE; for (lpc = (str_size - 10); lpc < msg->size; lpc++) { if (lpc < 0) { lpc = 0; } crm_debug("bad_data[%d]: %d / '%c'", lpc, data[lpc], data[lpc]); } } } if (sane == FALSE) { crm_err("Invalid message %d: (dest=%s:%s, from=%s:%s.%d, compressed=%d, size=%d, total=%d)", msg->id, ais_dest(&(msg->host)), msg_type2text(dest), ais_dest(&(msg->sender)), msg_type2text(msg->sender.type), msg->sender.pid, msg->is_compressed, ais_data_len(msg), msg->header.size); } else { crm_trace ("Verfied message %d: (dest=%s:%s, from=%s:%s.%d, compressed=%d, size=%d, total=%d)", msg->id, ais_dest(&(msg->host)), msg_type2text(dest), ais_dest(&(msg->sender)), msg_type2text(msg->sender.type), msg->sender.pid, msg->is_compressed, ais_data_len(msg), msg->header.size); } return sane; } enum cluster_type_e find_corosync_variant(void) { int rc = CS_OK; cmap_handle_t handle; /* There can be only one (possibility if confdb isn't around) */ rc = cmap_initialize(&handle); if (rc != CS_OK) { crm_info("Failed to initialize the cmap API. Error %d", rc); return pcmk_cluster_unknown; } cmap_finalize(handle); return pcmk_cluster_corosync; } gboolean crm_is_corosync_peer_active(const crm_node_t * node) { if (node == NULL) { crm_trace("NULL"); return FALSE; } else if(safe_str_neq(node->state, CRM_NODE_MEMBER)) { crm_trace("%s: state=%s", node->uname, node->state); return FALSE; } else if((node->processes & crm_proc_cpg) == 0) { crm_trace("%s: processes=%.16x", node->uname, node->processes); return FALSE; } return TRUE; } gboolean corosync_initialize_nodelist(void *cluster, gboolean force_member, xmlNode *xml_parent) { int lpc = 0; int rc = CS_OK; int retries = 0; gboolean any = FALSE; cmap_handle_t cmap_handle; do { rc = cmap_initialize(&cmap_handle); if(rc != CS_OK) { retries++; crm_debug("API connection setup failed: %s. Retrying in %ds", cs_strerror(rc), retries); sleep(retries); } } while(retries < 5 && rc != CS_OK); if (rc != CS_OK) { crm_warn("Could not connect to Cluster Configuration Database API, error %d", rc); return FALSE; } crm_trace("Initializing corosync nodelist"); for(lpc = 0; ; lpc++) { uint32_t nodeid = 0; char *name = NULL; char *key = NULL; key = g_strdup_printf("nodelist.node.%d.nodeid", lpc); rc = cmap_get_uint32(cmap_handle, key, &nodeid); g_free(key); if(rc != CS_OK) { break; } name = corosync_node_name(cmap_handle, nodeid); if(nodeid > 0 || name != NULL) { crm_trace("Initializing node[%d] %u = %s", lpc, nodeid, name); crm_get_peer(nodeid, name); } if(nodeid > 0 && name != NULL) { any = TRUE; if(xml_parent) { xmlNode *node = create_xml_node(xml_parent, XML_CIB_TAG_NODE); crm_xml_add_int(node, XML_ATTR_ID, nodeid); crm_xml_add(node, XML_ATTR_UNAME, name); if(force_member) { crm_xml_add(node, XML_ATTR_TYPE, CRM_NODE_MEMBER); } } } free(name); } cmap_finalize(cmap_handle); return any; } diff --git a/lib/cluster/heartbeat.c b/lib/cluster/heartbeat.c index 631d793a6c..87f994e679 100644 --- a/lib/cluster/heartbeat.c +++ b/lib/cluster/heartbeat.c @@ -1,613 +1,607 @@ /* * Copyright (C) 2004 Andrew Beekhof * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #include #include #include #include #include #include #include #include #include #include #include #include #include #if HAVE_BZLIB_H # include #endif #if SUPPORT_HEARTBEAT ll_cluster_t *heartbeat_cluster = NULL; static void convert_ha_field(xmlNode *parent, void *msg_v, int lpc) { int type = 0; const char *name = NULL; const char *value = NULL; xmlNode *xml = NULL; HA_Message *msg = msg_v; int rc = BZ_OK; size_t orig_len = 0; unsigned int used = 0; char *uncompressed = NULL; char *compressed = NULL; int size = orig_len * 10; CRM_CHECK(parent != NULL, return); CRM_CHECK(msg != NULL, return); name = msg->names[lpc]; type = cl_get_type(msg, name); switch(type) { case FT_STRUCT: convert_ha_message(parent, msg->values[lpc], name); break; case FT_COMPRESS: case FT_UNCOMPRESS: convert_ha_message(parent, cl_get_struct(msg, name), name); break; case FT_STRING: value = msg->values[lpc]; CRM_CHECK(value != NULL, return); crm_trace("Converting %s/%d/%s", name, type, value[0] == '<' ? "xml":"field"); if( value[0] != '<' ) { crm_xml_add(parent, name, value); break; } /* unpack xml string */ xml = string2xml(value); if(xml == NULL) { crm_err("Conversion of field '%s' failed", name); return; } add_node_nocopy(parent, NULL, xml); break; case FT_BINARY: value = cl_get_binary(msg, name, &orig_len); size = orig_len * 10 + 1; /* +1 because an exact 10x compression factor happens occasionally */ if(orig_len < 3 || value[0] != 'B' || value[1] != 'Z' || value[2] != 'h') { if(strstr(name, "uuid") == NULL) { crm_err("Skipping non-bzip binary field: %s", name); } return; } compressed = calloc(1, orig_len); memcpy(compressed, value, orig_len); crm_trace("Trying to decompress %d bytes", (int)orig_len); retry: uncompressed = realloc(uncompressed, size); memset(uncompressed, 0, size); used = size - 1; /* always leave room for a trailing '\0' * BZ2_bzBuffToBuffDecompress wont say anything if * the uncompressed data is exactly 'size' bytes */ rc = BZ2_bzBuffToBuffDecompress( uncompressed, &used, compressed, orig_len, 1, 0); if(rc == BZ_OUTBUFF_FULL) { size = size * 2; /* dont try to allocate more memory than we have */ if(size > 0) { goto retry; } } if(rc != BZ_OK) { crm_err("Decompression of %s (%d bytes) into %d failed: %d", name, (int)orig_len, size, rc); } else if(used >= size) { CRM_ASSERT(used < size); } else { CRM_LOG_ASSERT(uncompressed[used] == 0); uncompressed[used] = 0; xml = string2xml(uncompressed); } if(xml != NULL) { add_node_copy(parent, xml); free_xml(xml); } free(uncompressed); free(compressed); break; } } xmlNode * convert_ha_message(xmlNode *parent, HA_Message *msg, const char *field) { int lpc = 0; xmlNode *child = NULL; const char *tag = NULL; CRM_CHECK(msg != NULL, crm_err("Empty message for %s", field); return parent); tag = cl_get_string(msg, F_XML_TAGNAME); if(tag == NULL) { tag = field; } else if(parent && safe_str_neq(field, tag)) { /* For compatability with 0.6.x */ crm_debug("Creating intermediate parent %s between %s and %s", field, crm_element_name(parent), tag); parent = create_xml_node(parent, field); } if(parent == NULL) { parent = create_xml_node(NULL, tag); child = parent; } else { child = create_xml_node(parent, tag); } for (lpc = 0; lpc < msg->nfields; lpc++) { convert_ha_field(child, msg, lpc); } return parent; } static void add_ha_nocopy(HA_Message *parent, HA_Message *child, const char *field) { int next = parent->nfields; if (parent->nfields >= parent->nalloc && ha_msg_expand(parent) != HA_OK ) { crm_err("Parent expansion failed"); return; } parent->names[next] = strdup(field); parent->nlens[next] = strlen(field); parent->values[next] = child; parent->vlens[next] = sizeof(HA_Message); parent->types[next] = FT_UNCOMPRESS; parent->nfields++; } static HA_Message* convert_xml_message_struct(HA_Message *parent, xmlNode *src_node, const char *field) { xmlNode *child = NULL; xmlNode *__crm_xml_iter = src_node->children; xmlAttrPtr prop_iter = src_node->properties; const char *name = NULL; const char *value = NULL; HA_Message *result = ha_msg_new(3); ha_msg_add(result, F_XML_TAGNAME, (const char *)src_node->name); while(prop_iter != NULL) { name = (const char *)prop_iter->name; value = (const char *)xmlGetProp(src_node, prop_iter->name); prop_iter = prop_iter->next; ha_msg_add(result, name, value); } while(__crm_xml_iter != NULL) { child = __crm_xml_iter; __crm_xml_iter = __crm_xml_iter->next; convert_xml_message_struct(result, child, NULL); } if(parent == NULL) { return result; } if(field) { HA_Message *holder = ha_msg_new(3); CRM_ASSERT(holder != NULL); ha_msg_add(holder, F_XML_TAGNAME, field); add_ha_nocopy(holder, result, (const char*)src_node->name); ha_msg_addstruct_compress(parent, field, holder); ha_msg_del(holder); } else { add_ha_nocopy(parent, result, (const char*)src_node->name); } return result; } static void convert_xml_child(HA_Message *msg, xmlNode *xml) { int orig = 0; int rc = BZ_OK; unsigned int len = 0; char *buffer = NULL; char *compressed = NULL; const char *name = NULL; name = (const char *)xml->name; buffer = dump_xml_unformatted(xml); orig = strlen(buffer); if(orig < CRM_BZ2_THRESHOLD) { ha_msg_add(msg, name, buffer); goto done; } len = (orig * 1.1) + 600; /* recomended size */ compressed = malloc( len); rc = BZ2_bzBuffToBuffCompress(compressed, &len, buffer, orig, CRM_BZ2_BLOCKS, 0, CRM_BZ2_WORK); if(rc != BZ_OK) { crm_err("Compression failed: %d", rc); free(compressed); convert_xml_message_struct(msg, xml, name); goto done; } free(buffer); buffer = compressed; crm_trace("Compression details: %d -> %d", orig, len); ha_msg_addbin(msg, name, buffer, len); done: free(buffer); # if 0 { unsigned int used = orig; char *uncompressed = NULL; crm_debug("Trying to decompress %d bytes", len); uncompressed = calloc(1, orig); rc = BZ2_bzBuffToBuffDecompress( uncompressed, &used, compressed, len, 1, 0); CRM_CHECK(rc == BZ_OK, ;); CRM_CHECK(used == orig, ;); crm_debug("rc=%d, used=%d", rc, used); if(rc != BZ_OK) { exit(100); } crm_debug("Original %s, decompressed %s", buffer, uncompressed); free(uncompressed); } # endif } static HA_Message* convert_xml_message(xmlNode *xml) { xmlNode *child = NULL; xmlAttrPtr pIter = NULL; HA_Message *result = NULL; result = ha_msg_new(3); ha_msg_add(result, F_XML_TAGNAME, (const char *)xml->name); for(pIter = xml->properties; pIter != NULL; pIter = pIter->next) { const char *p_name = (const char *)pIter->name; if(pIter->children) { const char *p_value = (const char *)pIter->children->content; ha_msg_add(result, p_name, p_value); } } for(child = __xml_first_child(xml); child != NULL; child = __xml_next(child)) { convert_xml_child(result, child); } return result; } gboolean crm_is_heartbeat_peer_active(const crm_node_t * node) { enum crm_proc_flag proc = text2proc(crm_system_name); if (node == NULL) { crm_trace("NULL"); return FALSE; } else if(safe_str_neq(node->state, CRM_NODE_MEMBER)) { crm_trace("%s: state=%s", node->uname, node->state); return FALSE; } else if((node->processes & crm_proc_heartbeat) == 0) { crm_trace("%s: processes=%.16x", node->uname, node->processes); return FALSE; } else if(proc == crm_proc_none) { return TRUE; } else if((node->processes & proc) == 0) { crm_trace("%s: proc %.16x not in %.16x", node->uname, proc, node->processes); return FALSE; } return TRUE; } crm_node_t * crm_update_ccm_node(const oc_ev_membership_t * oc, int offset, const char *state, uint64_t seq) { crm_node_t *peer = NULL; const char *uuid = NULL; CRM_CHECK(oc->m_array[offset].node_uname != NULL, return NULL); uuid = get_uuid(oc->m_array[offset].node_uname); peer = crm_update_peer(__FUNCTION__, oc->m_array[offset].node_id, oc->m_array[offset].node_born_on, seq, -1, 0, uuid, oc->m_array[offset].node_uname, NULL, state); if (safe_str_eq(CRM_NODE_ACTIVE, state)) { /* Heartbeat doesn't send status notifications for nodes that were already part of the cluster */ crm_update_peer_proc(__FUNCTION__, peer, crm_proc_heartbeat, ONLINESTATUS); /* Nor does it send status notifications for processes that were already active */ crm_update_peer_proc(__FUNCTION__, peer, crm_proc_crmd, ONLINESTATUS); } return peer; } gboolean send_ha_message(ll_cluster_t * hb_conn, xmlNode * xml, const char *node, gboolean force_ordered) { gboolean all_is_good = TRUE; HA_Message *msg = convert_xml_message(xml); if (msg == NULL) { crm_err("cant send NULL message"); all_is_good = FALSE; } else if (hb_conn == NULL) { crm_err("No heartbeat connection specified"); all_is_good = FALSE; } else if (hb_conn->llc_ops->chan_is_connected(hb_conn) == FALSE) { crm_err("Not connected to Heartbeat"); all_is_good = FALSE; } else if (node != NULL) { if (hb_conn->llc_ops->send_ordered_nodemsg(hb_conn, msg, node) != HA_OK) { all_is_good = FALSE; crm_err("Send failed"); } } else if (force_ordered) { if (hb_conn->llc_ops->send_ordered_clustermsg(hb_conn, msg) != HA_OK) { all_is_good = FALSE; crm_err("Broadcast Send failed"); } } else { if (hb_conn->llc_ops->sendclustermsg(hb_conn, msg) != HA_OK) { all_is_good = FALSE; crm_err("Broadcast Send failed"); } } if (all_is_good == FALSE && hb_conn != NULL) { IPC_Channel *ipc = NULL; IPC_Queue *send_q = NULL; if (hb_conn->llc_ops->chan_is_connected(hb_conn) != HA_OK) { ipc = hb_conn->llc_ops->ipcchan(hb_conn); } if (ipc != NULL) { /* ipc->ops->resume_io(ipc); */ send_q = ipc->send_queue; } if (send_q != NULL) { CRM_CHECK(send_q->current_qlen < send_q->max_qlen,;); } } if (all_is_good) { crm_log_xml_trace(xml, "outbound"); } else { crm_log_xml_warn(xml, "outbound"); } if(msg != NULL) { ha_msg_del(msg); } return all_is_good; } gboolean ha_msg_dispatch(ll_cluster_t * cluster_conn, gpointer user_data) { IPC_Channel *channel = NULL; crm_trace("Invoked"); if (cluster_conn != NULL) { channel = cluster_conn->llc_ops->ipcchan(cluster_conn); } CRM_CHECK(cluster_conn != NULL, return FALSE); CRM_CHECK(channel != NULL, return FALSE); if (channel != NULL && IPC_ISRCONN(channel)) { if (cluster_conn->llc_ops->msgready(cluster_conn) == 0) { crm_trace("no message ready yet"); } /* invoke the callbacks but dont block */ cluster_conn->llc_ops->rcvmsg(cluster_conn, 0); } if (channel == NULL || channel->ch_status != IPC_CONNECT) { crm_info("Lost connection to heartbeat service."); return FALSE; } return TRUE; } gboolean -register_heartbeat_conn(ll_cluster_t * hb_cluster, char **uuid, char **uname, - void (*hb_message) (HA_Message * msg, void *private_data), - void (*hb_destroy) (gpointer user_data)) +register_heartbeat_conn(crm_cluster_t *cluster) { const char *const_uuid = NULL; const char *const_uname = NULL; crm_debug("Signing in with Heartbeat"); - if (hb_cluster->llc_ops->signon(hb_cluster, crm_system_name) != HA_OK) { - crm_err("Cannot sign on with heartbeat: %s", hb_cluster->llc_ops->errmsg(hb_cluster)); + if (cluster->hb_conn->llc_ops->signon(cluster->hb_conn, crm_system_name) != HA_OK) { + crm_err("Cannot sign on with heartbeat: %s", cluster->hb_conn->llc_ops->errmsg(cluster->hb_conn)); return FALSE; } if (HA_OK != - hb_cluster->llc_ops->set_msg_callback(hb_cluster, crm_system_name, hb_message, - hb_cluster)) { + cluster->hb_conn->llc_ops->set_msg_callback( + cluster->hb_conn, crm_system_name, cluster->hb_dispatch, cluster->hb_conn)) { - crm_err("Cannot set msg callback: %s", hb_cluster->llc_ops->errmsg(hb_cluster)); + crm_err("Cannot set msg callback: %s", cluster->hb_conn->llc_ops->errmsg(cluster->hb_conn)); return FALSE; - } - { + + } else { void *handle = NULL; GLLclusterSource *(*g_main_add_cluster) (int priority, ll_cluster_t * api, gboolean can_recurse, gboolean(*dispatch) (ll_cluster_t * source_data, gpointer user_data), gpointer userdata, GDestroyNotify notify) = find_library_function(&handle, HEARTBEAT_LIBRARY, "G_main_add_ll_cluster", 1); - (*g_main_add_cluster) (G_PRIORITY_HIGH, hb_cluster, - FALSE, ha_msg_dispatch, hb_cluster, hb_destroy); + (*g_main_add_cluster) (G_PRIORITY_HIGH, cluster->hb_conn, + FALSE, ha_msg_dispatch, cluster->hb_conn, cluster->destroy); dlclose(handle); } - const_uname = hb_cluster->llc_ops->get_mynodeid(hb_cluster); + const_uname = cluster->hb_conn->llc_ops->get_mynodeid(cluster->hb_conn); CRM_CHECK(const_uname != NULL, return FALSE); const_uuid = get_uuid(const_uname); CRM_CHECK(const_uuid != NULL, return FALSE); crm_info("Hostname: %s", const_uname); crm_info("UUID: %s", const_uuid); - if (uname) { - *uname = strdup(const_uname); - } - if (uuid) { - *uuid = strdup(const_uuid); - } + cluster->uname = strdup(const_uname); + cluster->uuid = strdup(const_uuid); return TRUE; } gboolean ccm_have_quorum(oc_ed_t event) { if (event == OC_EV_MS_NEW_MEMBERSHIP || event == OC_EV_MS_PRIMARY_RESTORED) { return TRUE; } return FALSE; } const char * ccm_event_name(oc_ed_t event) { if (event == OC_EV_MS_NEW_MEMBERSHIP) { return "NEW MEMBERSHIP"; } else if (event == OC_EV_MS_NOT_PRIMARY) { return "NOT PRIMARY"; } else if (event == OC_EV_MS_PRIMARY_RESTORED) { return "PRIMARY RESTORED"; } else if (event == OC_EV_MS_EVICTED) { return "EVICTED"; } else if (event == OC_EV_MS_INVALID) { return "INVALID"; } return "NO QUORUM MEMBERSHIP"; } gboolean heartbeat_initialize_nodelist(void *cluster, gboolean force_member, xmlNode *xml_parent) { const char *ha_node = NULL; ll_cluster_t *conn = cluster; if (conn == NULL) { crm_debug("Not connected"); return FALSE; } /* Async get client status information in the cluster */ crm_info("Requesting the list of configured nodes"); conn->llc_ops->init_nodewalk(conn); do { xmlNode *node = NULL; const char *ha_node_type = NULL; const char *ha_node_uuid = NULL; ha_node = conn->llc_ops->nextnode(conn); if (ha_node == NULL) { continue; } ha_node_type = conn->llc_ops->node_type(conn, ha_node); if (safe_str_neq(NORMALNODE, ha_node_type)) { crm_debug("Node %s: skipping '%s'", ha_node, ha_node_type); continue; } ha_node_uuid = get_uuid(ha_node); if (ha_node_uuid == NULL) { crm_warn("Node %s: no uuid found", ha_node); continue; } crm_debug("Node: %s (uuid: %s)", ha_node, ha_node_uuid); node = create_xml_node(xml_parent, XML_CIB_TAG_NODE); crm_xml_add(node, XML_ATTR_ID, ha_node_uuid); crm_xml_add(node, XML_ATTR_UNAME, ha_node); crm_xml_add(node, XML_ATTR_TYPE, ha_node_type); } while (ha_node != NULL); conn->llc_ops->end_nodewalk(conn); return TRUE; } #endif diff --git a/lib/cluster/legacy.c b/lib/cluster/legacy.c index 88590df24b..9531b33759 100644 --- a/lib/cluster/legacy.c +++ b/lib/cluster/legacy.c @@ -1,1393 +1,1379 @@ /* * Copyright (C) 2004 Andrew Beekhof * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #include #include #include #include #include #include #include #if SUPPORT_COROSYNC # include # include # include cpg_handle_t pcmk_cpg_handle = 0; struct cpg_name pcmk_cpg_group = { .length = 0, .value[0] = 0, }; #endif #if HAVE_CMAP # include #endif #if SUPPORT_CMAN # include cman_handle_t pcmk_cman_handle = NULL; #endif static char *pcmk_uname = NULL; static int pcmk_uname_len = 0; static uint32_t pcmk_nodeid = 0; int ais_membership_timer = 0; gboolean ais_membership_force = FALSE; int ais_dispatch(gpointer user_data); #define cs_repeat(counter, max, code) do { \ code; \ if(rc == CS_ERR_TRY_AGAIN || rc == CS_ERR_QUEUE_FULL) { \ counter++; \ crm_debug("Retrying operation after %ds", counter); \ sleep(counter); \ } else { \ break; \ } \ } while(counter < max) enum crm_ais_msg_types text2msg_type(const char *text) { int type = crm_msg_none; CRM_CHECK(text != NULL, return type); if (safe_str_eq(text, "ais")) { type = crm_msg_ais; } else if (safe_str_eq(text, "crm_plugin")) { type = crm_msg_ais; } else if (safe_str_eq(text, CRM_SYSTEM_CIB)) { type = crm_msg_cib; } else if (safe_str_eq(text, CRM_SYSTEM_CRMD)) { type = crm_msg_crmd; } else if (safe_str_eq(text, CRM_SYSTEM_DC)) { type = crm_msg_crmd; } else if (safe_str_eq(text, CRM_SYSTEM_TENGINE)) { type = crm_msg_te; } else if (safe_str_eq(text, CRM_SYSTEM_PENGINE)) { type = crm_msg_pe; } else if (safe_str_eq(text, CRM_SYSTEM_LRMD)) { type = crm_msg_lrmd; } else if (safe_str_eq(text, CRM_SYSTEM_STONITHD)) { type = crm_msg_stonithd; } else if (safe_str_eq(text, "stonith-ng")) { type = crm_msg_stonith_ng; } else if (safe_str_eq(text, "attrd")) { type = crm_msg_attrd; } else { /* This will normally be a transient client rather than * a cluster daemon. Set the type to the pid of the client */ int scan_rc = sscanf(text, "%d", &type); if (scan_rc != 1) { /* Ensure its sane */ type = crm_msg_none; } } return type; } char * get_ais_data(const AIS_Message * msg) { int rc = BZ_OK; char *uncompressed = NULL; unsigned int new_size = msg->size + 1; if (msg->is_compressed == FALSE) { crm_trace("Returning uncompressed message data"); uncompressed = strdup(msg->data); } else { crm_trace("Decompressing message data"); uncompressed = calloc(1, new_size); rc = BZ2_bzBuffToBuffDecompress(uncompressed, &new_size, (char *)msg->data, msg->compressed_size, 1, 0); CRM_ASSERT(rc == BZ_OK); CRM_ASSERT(new_size == msg->size); } return uncompressed; } #if SUPPORT_COROSYNC int ais_fd_sync = -1; int ais_fd_async = -1; /* never send messages via this channel */ void *ais_ipc_ctx = NULL; hdb_handle_t ais_ipc_handle = 0; static char *ais_cluster_name = NULL; gboolean get_ais_nodeid(uint32_t * id, char **uname) { struct iovec iov; int retries = 0; int rc = CS_OK; cs_ipc_header_response_t header; struct crm_ais_nodeid_resp_s answer; header.error = CS_OK; header.id = crm_class_nodeid; header.size = sizeof(cs_ipc_header_response_t); CRM_CHECK(id != NULL, return FALSE); CRM_CHECK(uname != NULL, return FALSE); iov.iov_base = &header; iov.iov_len = header.size; retry: errno = 0; rc = coroipcc_msg_send_reply_receive(ais_ipc_handle, &iov, 1, &answer, sizeof(answer)); if (rc == CS_OK) { CRM_CHECK(answer.header.size == sizeof(struct crm_ais_nodeid_resp_s), crm_err("Odd message: id=%d, size=%d, error=%d", answer.header.id, answer.header.size, answer.header.error)); CRM_CHECK(answer.header.id == crm_class_nodeid, crm_err("Bad response id: %d", answer.header.id)); } if ((rc == CS_ERR_TRY_AGAIN || rc == CS_ERR_QUEUE_FULL) && retries < 20) { retries++; crm_info("Peer overloaded: Re-sending message (Attempt %d of 20)", retries); sleep(retries); /* Proportional back off */ goto retry; } if (rc != CS_OK) { crm_err("Sending nodeid request: FAILED (rc=%d): %s", rc, ais_error2text(rc)); return FALSE; } else if (answer.header.error != CS_OK) { crm_err("Bad response from peer: (rc=%d): %s", rc, ais_error2text(rc)); return FALSE; } crm_info("Server details: id=%u uname=%s cname=%s", answer.id, answer.uname, answer.cname); *id = answer.id; *uname = strdup(answer.uname); ais_cluster_name = strdup(answer.cname); return TRUE; } gboolean crm_get_cluster_name(char **cname) { CRM_CHECK(cname != NULL, return FALSE); if (ais_cluster_name) { *cname = strdup(ais_cluster_name); return TRUE; } return FALSE; } gboolean send_ais_text(int class, const char *data, gboolean local, const char *node, enum crm_ais_msg_types dest) { static int msg_id = 0; static int local_pid = 0; enum cluster_type_e cluster_type = get_cluster_type(); int retries = 0; int rc = CS_OK; int buf_len = sizeof(cs_ipc_header_response_t); char *buf = NULL; struct iovec iov; const char *transport = "pcmk"; cs_ipc_header_response_t *header = NULL; AIS_Message *ais_msg = NULL; enum crm_ais_msg_types sender = text2msg_type(crm_system_name); /* There are only 6 handlers registered to crm_lib_service in plugin.c */ CRM_CHECK(class < 6, crm_err("Invalid message class: %d", class); return FALSE); if (data == NULL) { data = ""; } if (local_pid == 0) { local_pid = getpid(); } if (sender == crm_msg_none) { sender = local_pid; } ais_msg = calloc(1, sizeof(AIS_Message)); ais_msg->id = msg_id++; ais_msg->header.id = class; ais_msg->header.error = CS_OK; ais_msg->host.type = dest; ais_msg->host.local = local; if (node) { ais_msg->host.size = strlen(node); memset(ais_msg->host.uname, 0, MAX_NAME); memcpy(ais_msg->host.uname, node, ais_msg->host.size); ais_msg->host.id = 0; } else { ais_msg->host.size = 0; memset(ais_msg->host.uname, 0, MAX_NAME); ais_msg->host.id = 0; } ais_msg->sender.id = 0; ais_msg->sender.type = sender; ais_msg->sender.pid = local_pid; ais_msg->sender.size = pcmk_uname_len; memset(ais_msg->sender.uname, 0, MAX_NAME); memcpy(ais_msg->sender.uname, pcmk_uname, ais_msg->sender.size); ais_msg->size = 1 + strlen(data); if (ais_msg->size < CRM_BZ2_THRESHOLD) { failback: ais_msg = realloc(ais_msg, sizeof(AIS_Message) + ais_msg->size); memcpy(ais_msg->data, data, ais_msg->size); } else { char *compressed = NULL; char *uncompressed = strdup(data); unsigned int len = (ais_msg->size * 1.1) + 600; /* recomended size */ crm_trace("Compressing message payload"); compressed = malloc( len); rc = BZ2_bzBuffToBuffCompress(compressed, &len, uncompressed, ais_msg->size, CRM_BZ2_BLOCKS, 0, CRM_BZ2_WORK); free(uncompressed); if (rc != BZ_OK) { crm_err("Compression failed: %d", rc); free(compressed); goto failback; } ais_msg = realloc(ais_msg, sizeof(AIS_Message) + len + 1); memcpy(ais_msg->data, compressed, len); ais_msg->data[len] = 0; free(compressed); ais_msg->is_compressed = TRUE; ais_msg->compressed_size = len; crm_trace("Compression details: %d -> %d", ais_msg->size, ais_data_len(ais_msg)); } ais_msg->header.size = sizeof(AIS_Message) + ais_data_len(ais_msg); crm_trace("Sending%s message %d to %s.%s (data=%d, total=%d)", ais_msg->is_compressed ? " compressed" : "", ais_msg->id, ais_dest(&(ais_msg->host)), msg_type2text(dest), ais_data_len(ais_msg), ais_msg->header.size); iov.iov_base = ais_msg; iov.iov_len = ais_msg->header.size; buf = realloc(buf, buf_len); do { if (rc == CS_ERR_TRY_AGAIN || rc == CS_ERR_QUEUE_FULL) { retries++; crm_info("Peer overloaded or membership in flux:" " Re-sending message (Attempt %d of 20)", retries); sleep(retries); /* Proportional back off */ } errno = 0; switch (cluster_type) { case pcmk_cluster_corosync: CRM_ASSERT(FALSE/*Not supported here*/); break; case pcmk_cluster_classic_ais: rc = coroipcc_msg_send_reply_receive(ais_ipc_handle, &iov, 1, buf, buf_len); header = (cs_ipc_header_response_t *) buf; if (rc == CS_OK) { CRM_CHECK(header->size == sizeof(cs_ipc_header_response_t), crm_err("Odd message: id=%d, size=%d, class=%d, error=%d", header->id, header->size, class, header->error)); CRM_ASSERT(buf_len >= header->size); CRM_CHECK(header->id == CRM_MESSAGE_IPC_ACK, crm_err("Bad response id (%d) for request (%d)", header->id, ais_msg->header.id)); CRM_CHECK(header->error == CS_OK, rc = header->error); } break; case pcmk_cluster_cman: transport = "cpg"; CRM_CHECK(dest != crm_msg_ais, rc = CS_ERR_MESSAGE_ERROR; goto bail); rc = cpg_mcast_joined(pcmk_cpg_handle, CPG_TYPE_AGREED, &iov, 1); if (rc == CS_ERR_TRY_AGAIN || rc == CS_ERR_QUEUE_FULL) { cpg_flow_control_state_t fc_state = CPG_FLOW_CONTROL_DISABLED; int rc2 = cpg_flow_control_state_get(pcmk_cpg_handle, &fc_state); if (rc2 == CS_OK && fc_state == CPG_FLOW_CONTROL_ENABLED) { crm_warn("Connection overloaded, cannot send messages"); goto bail; } else if (rc2 != CS_OK) { crm_warn("Could not determin the connection state: %s (%d)", ais_error2text(rc2), rc2); goto bail; } } break; case pcmk_cluster_unknown: case pcmk_cluster_invalid: case pcmk_cluster_heartbeat: CRM_ASSERT(is_openais_cluster()); break; } } while ((rc == CS_ERR_TRY_AGAIN || rc == CS_ERR_QUEUE_FULL) && retries < 20); bail: if (rc != CS_OK) { crm_perror(LOG_ERR, "Sending message %d via %s: FAILED (rc=%d): %s", ais_msg->id, transport, rc, ais_error2text(rc)); } else { crm_trace("Message %d: sent", ais_msg->id); } free(buf); free(ais_msg); return (rc == CS_OK); } gboolean send_ais_message(xmlNode * msg, gboolean local, const char *node, enum crm_ais_msg_types dest) { gboolean rc = TRUE; char *data = NULL; if (is_classic_ais_cluster()) { if (ais_fd_async < 0) { crm_err("Not connected to AIS: %d", ais_fd_async); return FALSE; } } data = dump_xml_unformatted(msg); rc = send_ais_text(crm_class_cluster, data, local, node, dest); free(data); return rc; } void -terminate_ais_connection(void) +terminate_cs_connection(void) { crm_notice("Disconnecting from Corosync"); if (is_classic_ais_cluster()) { if(ais_ipc_handle) { crm_trace("Disconnecting plugin"); coroipcc_service_disconnect(ais_ipc_handle); ais_ipc_handle = 0; } else { crm_info("No plugin connection"); } } else { if(pcmk_cpg_handle) { crm_trace("Disconnecting CPG"); cpg_leave(pcmk_cpg_handle, &pcmk_cpg_group); cpg_finalize(pcmk_cpg_handle); pcmk_cpg_handle = 0; } else { crm_info("No CPG connection"); } } # if SUPPORT_CMAN if (is_cman_cluster()) { if(pcmk_cman_handle) { crm_trace("Disconnecting cman"); cman_stop_notification(pcmk_cman_handle); cman_finish(pcmk_cman_handle); } else { crm_info("No cman connection"); } } # endif ais_fd_async = -1; ais_fd_sync = -1; } static crm_node_t * crm_update_ais_node(xmlNode * member, long long seq) { const char *id_s = crm_element_value(member, "id"); const char *addr = crm_element_value(member, "addr"); const char *uname = crm_element_value(member, "uname"); const char *state = crm_element_value(member, "state"); const char *born_s = crm_element_value(member, "born"); const char *seen_s = crm_element_value(member, "seen"); const char *votes_s = crm_element_value(member, "votes"); const char *procs_s = crm_element_value(member, "processes"); int votes = crm_int_helper(votes_s, NULL); unsigned int id = crm_int_helper(id_s, NULL); unsigned int procs = crm_int_helper(procs_s, NULL); /* TODO: These values will contain garbage if version < 0.7.1 */ uint64_t born = crm_int_helper(born_s, NULL); uint64_t seen = crm_int_helper(seen_s, NULL); return crm_update_peer(__FUNCTION__, id, born, seen, votes, procs, uname, uname, addr, state); } static gboolean -ais_dispatch_message(AIS_Message * msg, gboolean(*dispatch) (AIS_Message *, char *, int)) +ais_dispatch_message(AIS_Message * msg, gboolean(*dispatch) (int kind, const char *from, const char *data)) { char *data = NULL; char *uncompressed = NULL; xmlNode *xml = NULL; CRM_ASSERT(msg != NULL); crm_trace("Got new%s message (size=%d, %d, %d)", msg->is_compressed ? " compressed" : "", ais_data_len(msg), msg->size, msg->compressed_size); data = msg->data; if (msg->is_compressed && msg->size > 0) { int rc = BZ_OK; unsigned int new_size = msg->size + 1; if (check_message_sanity(msg, NULL) == FALSE) { goto badmsg; } crm_trace("Decompressing message data"); uncompressed = calloc(1, new_size); rc = BZ2_bzBuffToBuffDecompress(uncompressed, &new_size, data, msg->compressed_size, 1, 0); if (rc != BZ_OK) { crm_err("Decompression failed: %d", rc); goto badmsg; } CRM_ASSERT(rc == BZ_OK); CRM_ASSERT(new_size == msg->size); data = uncompressed; } else if (check_message_sanity(msg, data) == FALSE) { goto badmsg; } else if (safe_str_eq("identify", data)) { int pid = getpid(); char *pid_s = crm_itoa(pid); send_ais_text(crm_class_cluster, pid_s, TRUE, NULL, crm_msg_ais); free(pid_s); goto done; } if (msg->header.id != crm_class_members) { crm_get_peer(msg->sender.id, msg->sender.uname); } if (msg->header.id == crm_class_rmpeer) { uint32_t id = crm_int_helper(data, NULL); crm_info("Removing peer %s/%u", data, id); reap_crm_member(id); goto done; } else if (is_classic_ais_cluster()) { if (msg->header.id == crm_class_members || msg->header.id == crm_class_quorum) { xmlNode *node = NULL; const char *value = NULL; gboolean quorate = FALSE; xml = string2xml(data); if (xml == NULL) { crm_err("Invalid membership update: %s", data); goto badmsg; } value = crm_element_value(xml, "quorate"); CRM_CHECK(value != NULL, crm_log_xml_err(xml, "No quorum value:"); goto badmsg); if (crm_is_true(value)) { quorate = TRUE; } value = crm_element_value(xml, "id"); CRM_CHECK(value != NULL, crm_log_xml_err(xml, "No membership id"); goto badmsg); crm_peer_seq = crm_int_helper(value, NULL); if (quorate != crm_have_quorum) { crm_notice("Membership %s: quorum %s", value, quorate ? "acquired" : "lost"); crm_have_quorum = quorate; } else { crm_info("Membership %s: quorum %s", value, quorate ? "retained" : "still lost"); } for (node = __xml_first_child(xml); node != NULL; node = __xml_next(node)) { crm_update_ais_node(node, crm_peer_seq); } } } crm_trace("Payload: %s", data); if (dispatch != NULL) { - dispatch(msg, data, 0); + dispatch(msg->header.id, msg->sender.uname, data); } done: free(uncompressed); free_xml(xml); return TRUE; badmsg: crm_err("Invalid message (id=%d, dest=%s:%s, from=%s:%s.%d):" " min=%d, total=%d, size=%d, bz2_size=%d", msg->id, ais_dest(&(msg->host)), msg_type2text(msg->host.type), ais_dest(&(msg->sender)), msg_type2text(msg->sender.type), msg->sender.pid, (int)sizeof(AIS_Message), msg->header.size, msg->size, msg->compressed_size); goto done; } int ais_dispatch(gpointer user_data) { int rc = CS_OK; gboolean good = TRUE; - gboolean(*dispatch) (AIS_Message *, char *, int) = user_data; + gboolean(*dispatch) (int kind, const char *from, const char *data) = user_data; do { char *buffer = NULL; rc = coroipcc_dispatch_get(ais_ipc_handle, (void **)&buffer, 0); if (rc == CS_ERR_TRY_AGAIN || rc == CS_ERR_QUEUE_FULL) { return 0; } if (rc != CS_OK) { crm_perror(LOG_ERR, "Receiving message body failed: (%d) %s", rc, ais_error2text(rc)); return -1; } if (buffer == NULL) { /* NULL is a legal "no message afterall" value */ return 0; } good = ais_dispatch_message((AIS_Message *) buffer, dispatch); coroipcc_dispatch_put(ais_ipc_handle); } while (good && ais_ipc_handle); if(good) { return 0; } return -1; } static void ais_destroy(gpointer user_data) { crm_err("AIS connection terminated"); ais_fd_sync = -1; exit(1); } # if SUPPORT_CMAN static int pcmk_cman_dispatch(gpointer user_data) { int rc = cman_dispatch(pcmk_cman_handle, CMAN_DISPATCH_ALL); if (rc < 0) { crm_err("Connection to cman failed: %d", rc); return FALSE; } return TRUE; } # define MAX_NODES 256 static void cman_event_callback(cman_handle_t handle, void *privdata, int reason, int arg) { int rc = 0, lpc = 0, node_count = 0; cman_cluster_t cluster; static cman_node_t cman_nodes[MAX_NODES]; gboolean(*dispatch) (unsigned long long, gboolean) = privdata; switch (reason) { case CMAN_REASON_STATECHANGE: memset(&cluster, 0, sizeof(cluster)); rc = cman_get_cluster(pcmk_cman_handle, &cluster); if (rc < 0) { crm_err("Couldn't query cman cluster details: %d %d", rc, errno); return; } crm_peer_seq = cluster.ci_generation; if (arg != crm_have_quorum) { crm_notice("Membership %llu: quorum %s", crm_peer_seq, arg ? "acquired" : "lost"); crm_have_quorum = arg; } else { crm_info("Membership %llu: quorum %s", crm_peer_seq, arg ? "retained" : "still lost"); } rc = cman_get_nodes(pcmk_cman_handle, MAX_NODES, &node_count, cman_nodes); if (rc < 0) { crm_err("Couldn't query cman node list: %d %d", rc, errno); return; } for (lpc = 0; lpc < node_count; lpc++) { if (cman_nodes[lpc].cn_nodeid == 0) { /* Never allow node ID 0 to be considered a member #315711 */ cman_nodes[lpc].cn_member = 0; } crm_update_peer(__FUNCTION__, cman_nodes[lpc].cn_nodeid, cman_nodes[lpc].cn_incarnation, cman_nodes[lpc].cn_member ? crm_peer_seq : 0, 0, 0, cman_nodes[lpc].cn_name, cman_nodes[lpc].cn_name, NULL, cman_nodes[lpc].cn_member ? CRM_NODE_MEMBER : CRM_NODE_LOST); } if (dispatch) { dispatch(crm_peer_seq, crm_have_quorum); } break; case CMAN_REASON_TRY_SHUTDOWN: /* Always reply with a negative - pacemaker needs to be stopped first */ crm_info("CMAN wants to shut down: %s", arg ? "forced" : "optional"); cman_replyto_shutdown(pcmk_cman_handle, 0); break; case CMAN_REASON_CONFIG_UPDATE: /* Ignore */ break; } } # endif gboolean init_cman_connection(gboolean(*dispatch) (unsigned long long, gboolean), void (*destroy) (gpointer)) { # if SUPPORT_CMAN int rc = -1, fd = -1; cman_cluster_t cluster; struct mainloop_fd_callbacks cman_fd_callbacks = { .dispatch = pcmk_cman_dispatch, .destroy = destroy, }; crm_info("Configuring Pacemaker to obtain quorum from cman"); memset(&cluster, 0, sizeof(cluster)); pcmk_cman_handle = cman_init(dispatch); if (pcmk_cman_handle == NULL || cman_is_active(pcmk_cman_handle) == FALSE) { crm_err("Couldn't connect to cman"); goto cman_bail; } rc = cman_get_cluster(pcmk_cman_handle, &cluster); if (rc < 0) { crm_err("Couldn't query cman cluster details: %d %d", rc, errno); goto cman_bail; } ais_cluster_name = strdup(cluster.ci_name); rc = cman_start_notification(pcmk_cman_handle, cman_event_callback); if (rc < 0) { crm_err("Couldn't register for cman notifications: %d %d", rc, errno); goto cman_bail; } /* Get the current membership state */ cman_event_callback(pcmk_cman_handle, dispatch, CMAN_REASON_STATECHANGE, cman_is_quorate(pcmk_cman_handle)); fd = cman_get_fd(pcmk_cman_handle); mainloop_add_fd("cman", G_PRIORITY_MEDIUM, fd, dispatch, &cman_fd_callbacks); cman_bail: if (rc < 0) { cman_finish(pcmk_cman_handle); return FALSE; } # else crm_err("cman qorum is not supported in this build"); exit(100); # endif return TRUE; } # ifdef SUPPORT_COROSYNC -gboolean(*pcmk_cpg_dispatch_fn) (AIS_Message *, char *, int) = NULL; +gboolean(*pcmk_cpg_dispatch_fn) (int kind, const char *from, const char *data) = NULL; static int pcmk_cpg_dispatch(gpointer user_data) { int rc = 0; pcmk_cpg_dispatch_fn = user_data; rc = cpg_dispatch(pcmk_cpg_handle, CS_DISPATCH_ALL); if (rc != CS_OK) { crm_err("Connection to the CPG API failed: %d", rc); return -1; } return 0; } static void pcmk_cpg_deliver(cpg_handle_t handle, const struct cpg_name *groupName, uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len) { AIS_Message *ais_msg = (AIS_Message *) msg; if (ais_msg->sender.id > 0 && ais_msg->sender.id != nodeid) { crm_err("Nodeid mismatch from %d.%d: claimed nodeid=%u", nodeid, pid, ais_msg->sender.id); return; } else if (ais_msg->host.size != 0 && safe_str_neq(ais_msg->host.uname, pcmk_uname)) { /* Not for us */ return; } ais_msg->sender.id = nodeid; if (ais_msg->sender.size == 0) { crm_node_t *peer = crm_get_peer(nodeid, NULL); if (peer == NULL) { crm_err("Peer with nodeid=%u is unknown", nodeid); } else if (peer->uname == NULL) { crm_err("No uname for peer with nodeid=%u", nodeid); } else { crm_notice("Fixing uname for peer with nodeid=%u", nodeid); ais_msg->sender.size = strlen(peer->uname); memset(ais_msg->sender.uname, 0, MAX_NAME); memcpy(ais_msg->sender.uname, peer->uname, ais_msg->sender.size); } } ais_dispatch_message(ais_msg, pcmk_cpg_dispatch_fn); } static void pcmk_cpg_membership(cpg_handle_t handle, const struct cpg_name *groupName, const struct cpg_address *member_list, size_t member_list_entries, const struct cpg_address *left_list, size_t left_list_entries, const struct cpg_address *joined_list, size_t joined_list_entries) { int i; for (i = 0; i < member_list_entries; i++) { crm_node_t *peer = crm_get_peer(member_list[i].nodeid, NULL); crm_debug("Member[%d] %d ", i, member_list[i].nodeid); crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cpg, ONLINESTATUS); } for (i = 0; i < left_list_entries; i++) { crm_node_t *peer = crm_get_peer(left_list[i].nodeid, NULL); crm_debug("Left[%d] %d ", i, left_list[i].nodeid); crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cpg, OFFLINESTATUS); } } cpg_callbacks_t cpg_callbacks = { .cpg_deliver_fn = pcmk_cpg_deliver, .cpg_confchg_fn = pcmk_cpg_membership, }; # endif static gboolean -init_cpg_connection(gboolean(*dispatch) (AIS_Message *, char *, int), void (*destroy) (gpointer), - uint32_t * nodeid) +init_cpg_connection(crm_cluster_t *cluster) { # ifdef SUPPORT_COROSYNC int rc = -1; int fd = 0; int retries = 0; crm_node_t *peer = NULL; struct mainloop_fd_callbacks cpg_fd_callbacks = { .dispatch = pcmk_cpg_dispatch, - .destroy = destroy, + .destroy = cluster->destroy, }; strcpy(pcmk_cpg_group.value, crm_system_name); pcmk_cpg_group.length = strlen(crm_system_name) + 1; cs_repeat(retries, 30, rc = cpg_initialize(&pcmk_cpg_handle, &cpg_callbacks)); if (rc != CS_OK) { crm_err("Could not connect to the Cluster Process Group API: %d\n", rc); goto bail; } retries = 0; - cs_repeat(retries, 30, rc = cpg_local_get(pcmk_cpg_handle, (unsigned int *)nodeid)); + cs_repeat(retries, 30, rc = cpg_local_get(pcmk_cpg_handle, (unsigned int *)&cluster->nodeid)); if (rc != CS_OK) { crm_err("Could not get local node id from the CPG API"); goto bail; } retries = 0; cs_repeat(retries, 30, rc = cpg_join(pcmk_cpg_handle, &pcmk_cpg_group)); if (rc != CS_OK) { crm_err("Could not join the CPG group '%s': %d", crm_system_name, rc); goto bail; } rc = cpg_fd_get(pcmk_cpg_handle, &fd); if (rc != CS_OK) { crm_err("Could not obtain the CPG API connection: %d\n", rc); goto bail; } - mainloop_add_fd("corosync-cpg", G_PRIORITY_MEDIUM, fd, dispatch, &cpg_fd_callbacks); + mainloop_add_fd("corosync-cpg", G_PRIORITY_MEDIUM, fd, cluster->cs_dispatch, &cpg_fd_callbacks); bail: if (rc != CS_OK) { cpg_finalize(pcmk_cpg_handle); return FALSE; } - peer = crm_get_peer(pcmk_nodeid, pcmk_uname); + peer = crm_get_peer(cluster->nodeid, pcmk_uname); crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cpg, ONLINESTATUS); # else crm_err("The Corosync CPG API is not supported in this build"); exit(100); # endif return TRUE; } gboolean init_quorum_connection(gboolean(*dispatch) (unsigned long long, gboolean), void (*destroy) (gpointer)) { crm_err("The Corosync quorum API is not supported in this build"); exit(100); return TRUE; } static gboolean -init_ais_connection_classic(gboolean(*dispatch) (AIS_Message *, char *, int), - void (*destroy) (gpointer), char **our_uuid, char **our_uname, - int *nodeid) +init_cs_connection_classic(crm_cluster_t *cluster) { int rc; int pid = 0; char *pid_s = NULL; struct utsname name; struct mainloop_fd_callbacks ais_fd_callbacks = { .dispatch = ais_dispatch, - .destroy = destroy, + .destroy = cluster->destroy, }; crm_info("Creating connection to our Corosync plugin"); rc = coroipcc_service_connect(COROSYNC_SOCKET_NAME, PCMK_SERVICE_ID, AIS_IPC_MESSAGE_SIZE, AIS_IPC_MESSAGE_SIZE, AIS_IPC_MESSAGE_SIZE, &ais_ipc_handle); if (ais_ipc_handle) { coroipcc_fd_get(ais_ipc_handle, &ais_fd_async); } else { crm_info("Connection to our AIS plugin (%d) failed: %s (%d)", PCMK_SERVICE_ID, strerror(errno), errno); return FALSE; } if (ais_fd_async <= 0 && rc == CS_OK) { crm_err("No context created, but connection reported 'ok'"); rc = CS_ERR_LIBRARY; } if (rc != CS_OK) { crm_info("Connection to our AIS plugin (%d) failed: %s (%d)", PCMK_SERVICE_ID, ais_error2text(rc), rc); } if (rc != CS_OK) { return FALSE; } - if (destroy == NULL) { - destroy = ais_destroy; + if (ais_fd_callbacks.destroy == NULL) { + ais_fd_callbacks.destroy = ais_destroy; } - mainloop_add_fd("corosync-plugin", G_PRIORITY_MEDIUM, ais_fd_async, dispatch, &ais_fd_callbacks); + mainloop_add_fd("corosync-plugin", G_PRIORITY_MEDIUM, ais_fd_async, cluster->cs_dispatch, &ais_fd_callbacks); crm_info("AIS connection established"); pid = getpid(); pid_s = crm_itoa(pid); send_ais_text(crm_class_cluster, pid_s, TRUE, NULL, crm_msg_ais); free(pid_s); if (uname(&name) < 0) { crm_perror(LOG_ERR, "Could not determin the current host"); exit(100); } get_ais_nodeid(&pcmk_nodeid, &pcmk_uname); if (safe_str_neq(name.nodename, pcmk_uname)) { crm_crit("Node name mismatch! OpenAIS supplied %s, our lookup returned %s", pcmk_uname, name.nodename); crm_notice ("Node name mismatches usually occur when assigned automatically by DHCP servers"); crm_notice("If this node was part of the cluster with a different name," " you will need to remove the old entry with crm_node --remove"); } return TRUE; } static int pcmk_mcp_dispatch(const char *buffer, ssize_t length, gpointer userdata) { xmlNode *msg = string2xml(buffer); if (msg && is_classic_ais_cluster()) { xmlNode *node = NULL; for (node = __xml_first_child(msg); node != NULL; node = __xml_next(node)) { int id = 0; int children = 0; const char *uname = crm_element_value(node, "uname"); crm_element_value_int(node, "id", &id); crm_element_value_int(node, "processes", &children); if (id == 0) { crm_log_xml_err(msg, "Bad Update"); } else { crm_node_t *peer = crm_get_peer(id, uname); crm_update_peer_proc(__FUNCTION__, peer, children, NULL); } } } free_xml(msg); return 0; } static void pcmk_mcp_destroy(gpointer user_data) { void (*callback)(gpointer data) = user_data; if(callback) { callback(NULL); } } gboolean -init_ais_connection(gboolean(*dispatch) (AIS_Message *, char *, int), void (*destroy) (gpointer), - char **our_uuid, char **our_uname, int *nodeid) +init_cs_connection(crm_cluster_t *cluster) { int retries = 0; static struct ipc_client_callbacks mcp_callbacks = { .dispatch = pcmk_mcp_dispatch, .destroy = pcmk_mcp_destroy }; while (retries < 5) { - int rc = init_ais_connection_once(dispatch, destroy, our_uuid, our_uname, nodeid); + int rc = init_cs_connection_once(cluster); retries++; switch (rc) { case CS_OK: if (getenv("HA_mcp")) { xmlNode *poke = create_xml_node(NULL, "poke"); - mainloop_io_t *ipc = mainloop_add_ipc_client(CRM_SYSTEM_MCP, G_PRIORITY_MEDIUM, 0, destroy, &mcp_callbacks); + mainloop_io_t *ipc = mainloop_add_ipc_client(CRM_SYSTEM_MCP, G_PRIORITY_MEDIUM, 0, cluster->destroy, &mcp_callbacks); crm_ipc_send(mainloop_get_ipc_client(ipc), poke, 0, 0, NULL); free_xml(poke); } return TRUE; break; case CS_ERR_TRY_AGAIN: case CS_ERR_QUEUE_FULL: sleep(retries); break; default: return FALSE; } } crm_err("Retry count exceeded: %d", retries); return FALSE; } static char * get_local_node_name(void) { char *name = NULL; struct utsname res; if (is_cman_cluster()) { # if SUPPORT_CMAN cman_node_t us; cman_handle_t cman; cman = cman_init(NULL); if (cman != NULL && cman_is_active(cman)) { us.cn_name[0] = 0; cman_get_node(cman, CMAN_NODEID_US, &us); name = strdup(us.cn_name); crm_info("Using CMAN node name: %s", name); } else { crm_err("Couldn't determin node name from CMAN"); } cman_finish(cman); # endif } else if (uname(&res) < 0) { crm_perror(LOG_ERR, "Could not determin the current host"); exit(100); } else { name = strdup(res.nodename); } return name; } extern int set_cluster_type(enum cluster_type_e type); gboolean -init_ais_connection_once(gboolean(*dispatch) (AIS_Message *, char *, int), - void (*destroy) (gpointer), char **our_uuid, char **our_uname, int *nodeid) +init_cs_connection_once(crm_cluster_t *cluster) { enum cluster_type_e stack = get_cluster_type(); crm_peer_init(); /* Here we just initialize comms */ switch (stack) { case pcmk_cluster_classic_ais: - if (init_ais_connection_classic(dispatch, destroy, our_uuid, &pcmk_uname, nodeid) == - FALSE) { + if (init_cs_connection_classic(cluster) == FALSE) { return FALSE; } break; case pcmk_cluster_cman: - if (init_cpg_connection(dispatch, destroy, &pcmk_nodeid) == FALSE) { + if (init_cpg_connection(cluster) == FALSE) { return FALSE; } pcmk_uname = get_local_node_name(); break; case pcmk_cluster_heartbeat: crm_info("Could not find an active corosync based cluster"); return FALSE; break; default: crm_err("Invalid cluster type: %s (%d)", name_for_cluster_type(stack), stack); return FALSE; break; } crm_info("Connection to '%s': established", name_for_cluster_type(stack)); CRM_ASSERT(pcmk_uname != NULL); pcmk_uname_len = strlen(pcmk_uname); + pcmk_nodeid = cluster->nodeid; if (pcmk_nodeid != 0) { /* Ensure the local node always exists */ crm_get_peer(pcmk_nodeid, pcmk_uname); } - if (our_uuid != NULL) { - *our_uuid = get_corosync_uuid(pcmk_nodeid, pcmk_uname); - } - - if (our_uname != NULL) { - *our_uname = strdup(pcmk_uname); - } - - if (nodeid != NULL) { - *nodeid = pcmk_nodeid; - } + cluster->uuid = get_corosync_uuid(pcmk_nodeid, pcmk_uname); + cluster->uname = strdup(pcmk_uname); return TRUE; } gboolean check_message_sanity(const AIS_Message * msg, const char *data) { gboolean sane = TRUE; gboolean repaired = FALSE; int dest = msg->host.type; int tmp_size = msg->header.size - sizeof(AIS_Message); if (sane && msg->header.size == 0) { crm_warn("Message with no size"); sane = FALSE; } if (sane && msg->header.error != CS_OK) { crm_warn("Message header contains an error: %d", msg->header.error); sane = FALSE; } if (sane && ais_data_len(msg) != tmp_size) { crm_warn("Message payload size is incorrect: expected %d, got %d", ais_data_len(msg), tmp_size); sane = TRUE; } if (sane && ais_data_len(msg) == 0) { crm_warn("Message with no payload"); sane = FALSE; } if (sane && data && msg->is_compressed == FALSE) { int str_size = strlen(data) + 1; if (ais_data_len(msg) != str_size) { int lpc = 0; crm_warn("Message payload is corrupted: expected %d bytes, got %d", ais_data_len(msg), str_size); sane = FALSE; for (lpc = (str_size - 10); lpc < msg->size; lpc++) { if (lpc < 0) { lpc = 0; } crm_debug("bad_data[%d]: %d / '%c'", lpc, data[lpc], data[lpc]); } } } if (sane == FALSE) { crm_err("Invalid message %d: (dest=%s:%s, from=%s:%s.%d, compressed=%d, size=%d, total=%d)", msg->id, ais_dest(&(msg->host)), msg_type2text(dest), ais_dest(&(msg->sender)), msg_type2text(msg->sender.type), msg->sender.pid, msg->is_compressed, ais_data_len(msg), msg->header.size); } else if (repaired) { crm_err ("Repaired message %d: (dest=%s:%s, from=%s:%s.%d, compressed=%d, size=%d, total=%d)", msg->id, ais_dest(&(msg->host)), msg_type2text(dest), ais_dest(&(msg->sender)), msg_type2text(msg->sender.type), msg->sender.pid, msg->is_compressed, ais_data_len(msg), msg->header.size); } else { crm_trace ("Verfied message %d: (dest=%s:%s, from=%s:%s.%d, compressed=%d, size=%d, total=%d)", msg->id, ais_dest(&(msg->host)), msg_type2text(dest), ais_dest(&(msg->sender)), msg_type2text(msg->sender.type), msg->sender.pid, msg->is_compressed, ais_data_len(msg), msg->header.size); } return sane; } #endif static int get_config_opt(confdb_handle_t config, hdb_handle_t object_handle, const char *key, char **value, const char *fallback) { size_t len = 0; char *env_key = NULL; const char *env_value = NULL; char buffer[256]; if (*value) { free(*value); *value = NULL; } if (object_handle > 0) { if (CS_OK == confdb_key_get(config, object_handle, key, strlen(key), &buffer, &len)) { *value = strdup(buffer); } } if (*value) { crm_info("Found '%s' for option: %s", *value, key); return 0; } env_key = crm_concat("HA", key, '_'); env_value = getenv(env_key); free(env_key); if (*value) { crm_info("Found '%s' in ENV for option: %s", *value, key); *value = strdup(env_value); return 0; } if (fallback) { crm_info("Defaulting to '%s' for option: %s", fallback, key); *value = strdup(fallback); } else { crm_info("No default for option: %s", key); } return -1; } static confdb_handle_t config_find_init(confdb_handle_t config) { cs_error_t rc = CS_OK; confdb_handle_t local_handle = OBJECT_PARENT_HANDLE; rc = confdb_object_find_start(config, local_handle); if (rc == CS_OK) { return local_handle; } else { crm_err("Couldn't create search context: %d", rc); } return 0; } static hdb_handle_t config_find_next(confdb_handle_t config, const char *name, confdb_handle_t top_handle) { cs_error_t rc = CS_OK; hdb_handle_t local_handle = 0; if (top_handle == 0) { crm_err("Couldn't search for %s: no valid context", name); return 0; } crm_trace("Searching for %s in " HDB_X_FORMAT, name, top_handle); rc = confdb_object_find(config, top_handle, name, strlen(name), &local_handle); if (rc != CS_OK) { crm_info("No additional configuration supplied for: %s", name); local_handle = 0; } else { crm_info("Processing additional %s options...", name); } return local_handle; } enum cluster_type_e find_corosync_variant(void) { confdb_handle_t config; enum cluster_type_e found = pcmk_cluster_unknown; int rc; char *value = NULL; confdb_handle_t top_handle = 0; hdb_handle_t local_handle = 0; static confdb_callbacks_t callbacks = { }; rc = confdb_initialize(&config, &callbacks); if (rc != CS_OK) { crm_debug("Could not initialize Cluster Configuration Database API instance error %d", rc); return found; } top_handle = config_find_init(config); local_handle = config_find_next(config, "service", top_handle); while (local_handle) { get_config_opt(config, local_handle, "name", &value, NULL); if (safe_str_eq("pacemaker", value)) { found = pcmk_cluster_classic_ais; get_config_opt(config, local_handle, "ver", &value, "0"); crm_trace("Found Pacemaker plugin version: %s", value); break; } local_handle = config_find_next(config, "service", top_handle); } if (found == pcmk_cluster_unknown) { top_handle = config_find_init(config); local_handle = config_find_next(config, "quorum", top_handle); get_config_opt(config, local_handle, "provider", &value, NULL); if (safe_str_eq("quorum_cman", value)) { crm_trace("Found CMAN quorum provider"); found = pcmk_cluster_cman; } } free(value); confdb_finalize(config); return found; } gboolean crm_is_corosync_peer_active(const crm_node_t * node) { enum crm_proc_flag proc = crm_proc_none; if (node == NULL) { crm_trace("NULL"); return FALSE; } else if(safe_str_neq(node->state, CRM_NODE_MEMBER)) { crm_trace("%s: state=%s", node->uname, node->state); return FALSE; } else if(is_cman_cluster() && (node->processes & crm_proc_cpg)) { /* If we can still talk to our peer process on that node, * then its also part of the corosync membership */ crm_trace("%s: processes=%.16x", node->uname, node->processes); return TRUE; } else if(is_classic_ais_cluster() && (node->processes & crm_proc_plugin) == 0) { crm_trace("%s: processes=%.16x", node->uname, node->processes); return FALSE; } proc = text2proc(crm_system_name); if(proc != crm_proc_none && (node->processes & proc) == 0) { crm_trace("%s: proc %.16x not in %.16x", node->uname, proc, node->processes); return FALSE; } return TRUE; } diff --git a/tools/attrd.c b/tools/attrd.c index 2ca52b3860..4c1c6da6a7 100644 --- a/tools/attrd.c +++ b/tools/attrd.c @@ -1,913 +1,916 @@ /* * Copyright (C) 2004 Andrew Beekhof * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #define OPTARGS "hV" #if SUPPORT_HEARTBEAT ll_cluster_t *attrd_cluster_conn; #endif GMainLoop *mainloop = NULL; char *attrd_uname = NULL; char *attrd_uuid = NULL; gboolean need_shutdown = FALSE; GHashTable *attr_hash = NULL; cib_t *cib_conn = NULL; typedef struct attrd_client_s { char *user; } attrd_client_t; typedef struct attr_hash_entry_s { char *uuid; char *id; char *set; char *section; char *value; char *stored_value; int timeout; char *dampen; guint timer_id; char *user; } attr_hash_entry_t; void attrd_local_callback(xmlNode * msg); gboolean attrd_timer_callback(void *user_data); gboolean attrd_trigger_update(attr_hash_entry_t * hash_entry); void attrd_perform_update(attr_hash_entry_t * hash_entry); static void free_hash_entry(gpointer data) { attr_hash_entry_t *entry = data; if (entry == NULL) { return; } free(entry->id); free(entry->set); free(entry->dampen); free(entry->section); free(entry->uuid); free(entry->value); free(entry->stored_value); free(entry->user); free(entry); } static int32_t attrd_ipc_accept(qb_ipcs_connection_t *c, uid_t uid, gid_t gid) { attrd_client_t *new_client = NULL; #if ENABLE_ACL struct group *crm_grp = NULL; #endif crm_trace("Connecting %p for connection from %d by uid=%d gid=%d", c, crm_ipcs_client_pid(c), uid, gid); if (need_shutdown) { crm_info("Ignoring connection request during shutdown"); return FALSE; } new_client = calloc(1, sizeof(attrd_client_t)); #if ENABLE_ACL crm_grp = getgrnam(CRM_DAEMON_GROUP); if (crm_grp) { qb_ipcs_connection_auth_set(c, -1, crm_grp->gr_gid, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP); } new_client->user = uid2username(uid); #endif qb_ipcs_context_set(c, new_client); return 0; } static void attrd_ipc_created(qb_ipcs_connection_t *c) { crm_trace("Client %p connected from %d", c, crm_ipcs_client_pid(c)); } /* Exit code means? */ static int32_t attrd_ipc_dispatch(qb_ipcs_connection_t *c, void *data, size_t size) { uint32_t id = 0; uint32_t flags = 0; #if ENABLE_ACL attrd_client_t *client = qb_ipcs_context_get(c); #endif xmlNode *msg = crm_ipcs_recv(c, data, size, &id, &flags); if(flags & crm_ipc_client_response) { crm_trace("Ack'ing msg from %d (%p)", crm_ipcs_client_pid(c), c); crm_ipcs_send_ack(c, id, "ack", __FUNCTION__, __LINE__); } if (msg == NULL) { crm_debug("No msg from %d (%p)", crm_ipcs_client_pid(c), c); return 0; } #if ENABLE_ACL determine_request_user(client->user, msg, F_ATTRD_USER); #endif crm_trace("Processing msg from %d (%p)", crm_ipcs_client_pid(c), c); crm_log_xml_trace(msg, __PRETTY_FUNCTION__); attrd_local_callback(msg); free_xml(msg); return 0; } /* Error code means? */ static int32_t attrd_ipc_closed(qb_ipcs_connection_t *c) { crm_trace("Connection %p from %d closed", c, crm_ipcs_client_pid(c)); return 0; } static void attrd_ipc_destroy(qb_ipcs_connection_t *c) { attrd_client_t *client = qb_ipcs_context_get(c); if (client == NULL) { return; } crm_trace("Destroying %p", c); free(client->user); free(client); crm_trace("Free'd the attrd client"); return; } struct qb_ipcs_service_handlers ipc_callbacks = { .connection_accept = attrd_ipc_accept, .connection_created = attrd_ipc_created, .msg_process = attrd_ipc_dispatch, .connection_closed = attrd_ipc_closed, .connection_destroyed = attrd_ipc_destroy }; static void attrd_shutdown(int nsig) { need_shutdown = TRUE; crm_info("Exiting"); if (mainloop != NULL && g_main_is_running(mainloop)) { g_main_quit(mainloop); } else { exit(0); } } static void usage(const char *cmd, int exit_status) { FILE *stream; stream = exit_status ? stderr : stdout; fprintf(stream, "usage: %s [-srkh] [-c configure file]\n", cmd); /* fprintf(stream, "\t-d\tsets debug level\n"); */ /* fprintf(stream, "\t-s\tgets daemon status\n"); */ /* fprintf(stream, "\t-r\trestarts daemon\n"); */ /* fprintf(stream, "\t-k\tstops daemon\n"); */ /* fprintf(stream, "\t-h\thelp message\n"); */ fflush(stream); exit(exit_status); } static void stop_attrd_timer(attr_hash_entry_t * hash_entry) { if (hash_entry != NULL && hash_entry->timer_id != 0) { crm_trace("Stopping %s timer", hash_entry->id); g_source_remove(hash_entry->timer_id); hash_entry->timer_id = 0; } } static void log_hash_entry(int level, attr_hash_entry_t * entry, const char *text) { do_crm_log(level, "%s: Set: %s, Name: %s, Value: %s, Timeout: %s", text, entry->section, entry->id, entry->value, entry->dampen); } static attr_hash_entry_t * find_hash_entry(xmlNode * msg) { const char *value = NULL; const char *attr = crm_element_value(msg, F_ATTRD_ATTRIBUTE); attr_hash_entry_t *hash_entry = NULL; if (attr == NULL) { crm_info("Ignoring message with no attribute name"); return NULL; } hash_entry = g_hash_table_lookup(attr_hash, attr); if (hash_entry == NULL) { /* create one and add it */ crm_info("Creating hash entry for %s", attr); hash_entry = calloc(1, sizeof(attr_hash_entry_t)); hash_entry->id = strdup(attr); g_hash_table_insert(attr_hash, hash_entry->id, hash_entry); hash_entry = g_hash_table_lookup(attr_hash, attr); CRM_CHECK(hash_entry != NULL, return NULL); } value = crm_element_value(msg, F_ATTRD_SET); if (value != NULL) { free(hash_entry->set); hash_entry->set = strdup(value); crm_debug("\t%s->set: %s", attr, value); } value = crm_element_value(msg, F_ATTRD_SECTION); if (value == NULL) { value = XML_CIB_TAG_STATUS; } free(hash_entry->section); hash_entry->section = strdup(value); crm_trace("\t%s->section: %s", attr, value); value = crm_element_value(msg, F_ATTRD_DAMPEN); if (value != NULL) { free(hash_entry->dampen); hash_entry->dampen = strdup(value); hash_entry->timeout = crm_get_msec(value); crm_trace("\t%s->timeout: %s", attr, value); } #if ENABLE_ACL free(hash_entry->user); value = crm_element_value(msg, F_ATTRD_USER); if (value != NULL) { hash_entry->user = strdup(value); crm_trace("\t%s->user: %s", attr, value); } #endif log_hash_entry(LOG_DEBUG_2, hash_entry, "Found (and updated) entry:"); return hash_entry; } #if SUPPORT_HEARTBEAT static void attrd_ha_connection_destroy(gpointer user_data) { crm_trace("Invoked"); if (need_shutdown) { /* we signed out, so this is expected */ crm_info("Heartbeat disconnection complete"); return; } crm_crit("Lost connection to heartbeat service!"); if (mainloop != NULL && g_main_is_running(mainloop)) { g_main_quit(mainloop); return; } exit(EX_OK); } static void attrd_ha_callback(HA_Message * msg, void *private_data) { attr_hash_entry_t *hash_entry = NULL; xmlNode *xml = convert_ha_message(NULL, msg, __FUNCTION__); const char *from = crm_element_value(xml, F_ORIG); const char *op = crm_element_value(xml, F_ATTRD_TASK); const char *host = crm_element_value(xml, F_ATTRD_HOST); const char *ignore = crm_element_value(xml, F_ATTRD_IGNORE_LOCALLY); if (host != NULL && safe_str_eq(host, attrd_uname)) { crm_info("Update relayed from %s", from); attrd_local_callback(xml); } else if (ignore == NULL || safe_str_neq(from, attrd_uname)) { crm_info("%s message from %s", op, from); hash_entry = find_hash_entry(xml); stop_attrd_timer(hash_entry); attrd_perform_update(hash_entry); } free_xml(xml); } #endif #if SUPPORT_COROSYNC static gboolean -attrd_ais_dispatch(AIS_Message * wrapper, char *data, int sender) +attrd_ais_dispatch(int kind, const char *from, const char *data) { xmlNode *xml = NULL; - if (wrapper->header.id == crm_class_cluster) { + if (kind == crm_class_cluster) { xml = string2xml(data); if (xml == NULL) { - crm_err("Bad message received: %d:'%.120s'", wrapper->id, data); + crm_err("Bad message received: '%.120s'", data); } } if (xml != NULL) { attr_hash_entry_t *hash_entry = NULL; const char *op = crm_element_value(xml, F_ATTRD_TASK); const char *host = crm_element_value(xml, F_ATTRD_HOST); const char *ignore = crm_element_value(xml, F_ATTRD_IGNORE_LOCALLY); - crm_xml_add_int(xml, F_SEQ, wrapper->id); - crm_xml_add(xml, F_ORIG, wrapper->sender.uname); + /* crm_xml_add_int(xml, F_SEQ, wrapper->id); */ + crm_xml_add(xml, F_ORIG, from); if (host != NULL && safe_str_eq(host, attrd_uname)) { - crm_notice("Update relayed from %s", wrapper->sender.uname); + crm_notice("Update relayed from %s", from); attrd_local_callback(xml); - } else if (ignore == NULL || safe_str_neq(wrapper->sender.uname, attrd_uname)) { - crm_trace("%s message from %s", op, wrapper->sender.uname); + } else if (ignore == NULL || safe_str_neq(from, attrd_uname)) { + crm_trace("%s message from %s", op, from); hash_entry = find_hash_entry(xml); stop_attrd_timer(hash_entry); attrd_perform_update(hash_entry); } free_xml(xml); } return TRUE; } static void attrd_ais_destroy(gpointer unused) { if (need_shutdown) { /* we signed out, so this is expected */ crm_info("OpenAIS disconnection complete"); return; } crm_crit("Lost connection to OpenAIS service!"); if (mainloop != NULL && g_main_is_running(mainloop)) { g_main_quit(mainloop); return; } exit(EX_USAGE); } #endif static void attrd_cib_connection_destroy(gpointer user_data) { cib_t *conn = user_data; conn->cmds->signoff(conn); /* Ensure IPC is cleaned up */ if (need_shutdown) { crm_info("Connection to the CIB terminated..."); } else { /* eventually this will trigger a reconnect, not a shutdown */ crm_err("Connection to the CIB terminated..."); exit(1); } return; } static void update_for_hash_entry(gpointer key, gpointer value, gpointer user_data) { attr_hash_entry_t *entry = value; if (entry->value != NULL) { attrd_timer_callback(value); } } static void local_update_for_hash_entry(gpointer key, gpointer value, gpointer user_data) { attr_hash_entry_t *entry = value; if (entry->timer_id == 0) { crm_trace("Performing local-only update after replace for %s", entry->id); attrd_perform_update(entry); /* } else { * just let the timer expire and attrd_timer_callback() will do the right thing */ } } static void do_cib_replaced(const char *event, xmlNode * msg) { crm_info("Updating all attributes after %s event", event); g_hash_table_foreach(attr_hash, local_update_for_hash_entry, NULL); } static gboolean cib_connect(void *user_data) { static int attempts = 1; static int max_retry = 20; gboolean was_err = FALSE; static cib_t *local_conn = NULL; if (local_conn == NULL) { local_conn = cib_new(); } if (was_err == FALSE) { int rc = -ENOTCONN; if (attempts < max_retry) { crm_debug("CIB signon attempt %d", attempts); rc = local_conn->cmds->signon(local_conn, T_ATTRD, cib_command); } if (rc != pcmk_ok && attempts > max_retry) { crm_err("Signon to CIB failed: %s", pcmk_strerror(rc)); was_err = TRUE; } else if (rc != pcmk_ok) { attempts++; return TRUE; } } crm_info("Connected to the CIB after %d signon attempts", attempts); if (was_err == FALSE) { int rc = local_conn->cmds->set_connection_dnotify(local_conn, attrd_cib_connection_destroy); if (rc != pcmk_ok) { crm_err("Could not set dnotify callback"); was_err = TRUE; } } if (was_err == FALSE) { if (pcmk_ok != local_conn->cmds->add_notify_callback(local_conn, T_CIB_REPLACE_NOTIFY, do_cib_replaced)) { crm_err("Could not set CIB notification callback"); was_err = TRUE; } } if (was_err) { crm_err("Aborting startup"); exit(100); } cib_conn = local_conn; crm_info("Sending full refresh now that we're connected to the cib"); g_hash_table_foreach(attr_hash, local_update_for_hash_entry, NULL); return FALSE; } int main(int argc, char **argv) { int flag = 0; int argerr = 0; + crm_cluster_t cluster; gboolean was_err = FALSE; qb_ipcs_connection_t *c = NULL; qb_ipcs_service_t *ipcs = NULL; crm_log_init(T_ATTRD, LOG_NOTICE, TRUE, FALSE, argc, argv, FALSE); mainloop_add_signal(SIGTERM, attrd_shutdown); while ((flag = getopt(argc, argv, OPTARGS)) != EOF) { switch (flag) { case 'V': crm_bump_log_level(argc, argv); break; case 'h': /* Help message */ usage(T_ATTRD, EX_OK); break; default: ++argerr; break; } } if (optind > argc) { ++argerr; } if (argerr) { usage(T_ATTRD, EX_USAGE); } attr_hash = g_hash_table_new_full(crm_str_hash, g_str_equal, NULL, free_hash_entry); crm_info("Starting up"); if (was_err == FALSE) { - void *destroy = NULL; - void *dispatch = NULL; - void *data = NULL; #if SUPPORT_COROSYNC if (is_openais_cluster()) { - destroy = attrd_ais_destroy; - dispatch = attrd_ais_dispatch; + cluster.destroy = attrd_ais_destroy; + cluster.cs_dispatch = attrd_ais_dispatch; } #endif #if SUPPORT_HEARTBEAT if (is_heartbeat_cluster()) { - data = &attrd_cluster_conn; - dispatch = attrd_ha_callback; - destroy = attrd_ha_connection_destroy; + cluster.hb_dispatch = attrd_ha_callback; + cluster.destroy = attrd_ha_connection_destroy; } #endif - if (FALSE == crm_cluster_connect(&attrd_uname, &attrd_uuid, dispatch, destroy, data)) { + if (FALSE == crm_cluster_connect(&cluster)) { crm_err("HA Signon failed"); was_err = TRUE; } + + attrd_uname = cluster.uname; + attrd_uuid = cluster.uuid; +#if SUPPORT_HEARTBEAT + attrd_cluster_conn = cluster.hb_conn; +#endif } crm_info("Cluster connection active"); if (was_err == FALSE) { ipcs = mainloop_add_ipc_server(T_ATTRD, QB_IPC_NATIVE, &ipc_callbacks); if (ipcs == NULL) { crm_err("Could not start IPC server"); was_err = TRUE; } } crm_info("Accepting attribute updates"); mainloop = g_main_new(FALSE); if (0 == g_timeout_add_full(G_PRIORITY_LOW + 1, 5000, cib_connect, NULL, NULL)) { crm_info("Adding timer failed"); was_err = TRUE; } if (was_err) { crm_err("Aborting startup"); return 100; } crm_notice("Starting mainloop..."); g_main_run(mainloop); crm_notice("Exiting..."); #if SUPPORT_HEARTBEAT if (is_heartbeat_cluster()) { attrd_cluster_conn->llc_ops->signoff(attrd_cluster_conn, TRUE); attrd_cluster_conn->llc_ops->delete(attrd_cluster_conn); } #endif c = qb_ipcs_connection_first_get(ipcs); while(c != NULL) { qb_ipcs_connection_t *last = c; c = qb_ipcs_connection_next_get(ipcs, last); /* There really shouldn't be anyone connected at this point */ crm_notice("Disconnecting client %p, pid=%d...", last, crm_ipcs_client_pid(last)); qb_ipcs_disconnect(last); qb_ipcs_connection_unref(last); } qb_ipcs_destroy(ipcs); if (cib_conn) { cib_conn->cmds->signoff(cib_conn); cib_delete(cib_conn); } g_hash_table_destroy(attr_hash); free(attrd_uuid); empty_uuid_cache(); qb_log_fini(); return 0; } struct attrd_callback_s { char *attr; char *value; }; static void attrd_cib_callback(xmlNode * msg, int call_id, int rc, xmlNode * output, void *user_data) { attr_hash_entry_t *hash_entry = NULL; struct attrd_callback_s *data = user_data; if(call_id < 0) { crm_warn("Update %s=%s failed: %s", data->attr, data->value, pcmk_strerror(call_id)); goto cleanup; } else if (data->value == NULL && rc == -ENXIO) { rc = pcmk_ok; } switch (rc) { case pcmk_ok: crm_debug("Update %d for %s=%s passed", call_id, data->attr, data->value); hash_entry = g_hash_table_lookup(attr_hash, data->attr); if (hash_entry) { free(hash_entry->stored_value); hash_entry->stored_value = NULL; if (data->value != NULL) { hash_entry->stored_value = strdup(data->value); } } break; case -pcmk_err_diff_failed: /* When an attr changes while the CIB is syncing */ case -ETIME: /* When an attr changes while there is a DC election */ case -ENXIO: /* When an attr changes while the CIB is syncing a * newer config from a node that just came up */ crm_warn("Update %d for %s=%s failed: %s", call_id, data->attr, data->value, pcmk_strerror(rc)); break; default: crm_err("Update %d for %s=%s failed: %s", call_id, data->attr, data->value, pcmk_strerror(rc)); } cleanup: free(data->value); free(data->attr); free(data); } void attrd_perform_update(attr_hash_entry_t * hash_entry) { int rc = pcmk_ok; struct attrd_callback_s *data = NULL; const char *user_name = NULL; if (hash_entry == NULL) { return; } else if (cib_conn == NULL) { crm_info("Delaying operation %s=%s: cib not connected", hash_entry->id, crm_str(hash_entry->value)); return; } #if ENABLE_ACL if (hash_entry->user) { user_name = hash_entry->user; crm_trace("Performing request from user '%s'", hash_entry->user); } #endif if (hash_entry->value == NULL) { /* delete the attr */ rc = delete_attr_delegate(cib_conn, cib_none, hash_entry->section, attrd_uuid, NULL, hash_entry->set, hash_entry->uuid, hash_entry->id, NULL, FALSE, user_name); if (rc >= 0 && hash_entry->stored_value) { crm_notice("Sent delete %d: node=%s, attr=%s, id=%s, set=%s, section=%s", rc, attrd_uuid, hash_entry->id, hash_entry->uuid ? hash_entry->uuid : "", hash_entry->set, hash_entry->section); } else if (rc < 0 && rc != -ENXIO) { crm_notice ("Delete operation failed: node=%s, attr=%s, id=%s, set=%s, section=%s: %s (%d)", attrd_uuid, hash_entry->id, hash_entry->uuid ? hash_entry->uuid : "", hash_entry->set, hash_entry->section, pcmk_strerror(rc), rc); } else { crm_trace("Sent delete %d: node=%s, attr=%s, id=%s, set=%s, section=%s", rc, attrd_uuid, hash_entry->id, hash_entry->uuid ? hash_entry->uuid : "", hash_entry->set, hash_entry->section); } } else { /* send update */ rc = update_attr_delegate(cib_conn, cib_none, hash_entry->section, attrd_uuid, NULL, hash_entry->set, hash_entry->uuid, hash_entry->id, hash_entry->value, FALSE, user_name); if (rc < 0) { crm_notice("Sent update %s=%s failed: %s", hash_entry->id, hash_entry->value, pcmk_strerror(rc)); } if (safe_str_neq(hash_entry->value, hash_entry->stored_value) || rc < 0) { crm_notice("Sent update %d: %s=%s", rc, hash_entry->id, hash_entry->value); } else { crm_trace("Sent update %d: %s=%s", rc, hash_entry->id, hash_entry->value); } } data = calloc(1, sizeof(struct attrd_callback_s)); data->attr = strdup(hash_entry->id); if (hash_entry->value != NULL) { data->value = strdup(hash_entry->value); } add_cib_op_callback(cib_conn, rc, FALSE, data, attrd_cib_callback); return; } void attrd_local_callback(xmlNode * msg) { static int plus_plus_len = 5; attr_hash_entry_t *hash_entry = NULL; const char *from = crm_element_value(msg, F_ORIG); const char *op = crm_element_value(msg, F_ATTRD_TASK); const char *attr = crm_element_value(msg, F_ATTRD_ATTRIBUTE); const char *value = crm_element_value(msg, F_ATTRD_VALUE); const char *host = crm_element_value(msg, F_ATTRD_HOST); if (safe_str_eq(op, "refresh")) { crm_notice("Sending full refresh (origin=%s)", from); g_hash_table_foreach(attr_hash, update_for_hash_entry, NULL); return; } if (host != NULL && safe_str_neq(host, attrd_uname)) { send_cluster_message(host, crm_msg_attrd, msg, FALSE); return; } crm_debug("%s message from %s: %s=%s", op, from, attr, crm_str(value)); hash_entry = find_hash_entry(msg); if (hash_entry == NULL) { return; } if (hash_entry->uuid == NULL) { const char *key = crm_element_value(msg, F_ATTRD_KEY); if (key) { hash_entry->uuid = strdup(key); } } crm_debug("Supplied: %s, Current: %s, Stored: %s", value, hash_entry->value, hash_entry->stored_value); if (safe_str_eq(value, hash_entry->value) && safe_str_eq(value, hash_entry->stored_value)) { crm_trace("Ignoring non-change"); return; } else if (value) { int offset = 1; int int_value = 0; int value_len = strlen(value); if (value_len < (plus_plus_len + 2) || value[plus_plus_len] != '+' || (value[plus_plus_len + 1] != '+' && value[plus_plus_len + 1] != '=')) { goto set_unexpanded; } int_value = char2score(hash_entry->value); if (value[plus_plus_len + 1] != '+') { const char *offset_s = value + (plus_plus_len + 2); offset = char2score(offset_s); } int_value += offset; if (int_value > INFINITY) { int_value = INFINITY; } crm_info("Expanded %s=%s to %d", attr, value, int_value); crm_xml_add_int(msg, F_ATTRD_VALUE, int_value); value = crm_element_value(msg, F_ATTRD_VALUE); } set_unexpanded: if (safe_str_eq(value, hash_entry->value) && hash_entry->timer_id) { /* We're already waiting to set this value */ return; } free(hash_entry->value); hash_entry->value = NULL; if (value != NULL) { hash_entry->value = strdup(value); crm_debug("New value of %s is %s", attr, value); } stop_attrd_timer(hash_entry); if (hash_entry->timeout > 0) { hash_entry->timer_id = g_timeout_add(hash_entry->timeout, attrd_timer_callback, hash_entry); } else { attrd_trigger_update(hash_entry); } return; } gboolean attrd_timer_callback(void *user_data) { stop_attrd_timer(user_data); attrd_trigger_update(user_data); return TRUE; /* Always return true, removed cleanly by stop_attrd_timer() */ } gboolean attrd_trigger_update(attr_hash_entry_t * hash_entry) { xmlNode *msg = NULL; /* send HA message to everyone */ crm_notice("Sending flush op to all hosts for: %s (%s)", hash_entry->id, crm_str(hash_entry->value)); log_hash_entry(LOG_DEBUG_2, hash_entry, "Sending flush op to all hosts for:"); msg = create_xml_node(NULL, __FUNCTION__); crm_xml_add(msg, F_TYPE, T_ATTRD); crm_xml_add(msg, F_ORIG, attrd_uname); crm_xml_add(msg, F_ATTRD_TASK, "flush"); crm_xml_add(msg, F_ATTRD_ATTRIBUTE, hash_entry->id); crm_xml_add(msg, F_ATTRD_SET, hash_entry->set); crm_xml_add(msg, F_ATTRD_SECTION, hash_entry->section); crm_xml_add(msg, F_ATTRD_DAMPEN, hash_entry->dampen); crm_xml_add(msg, F_ATTRD_VALUE, hash_entry->value); #if ENABLE_ACL if (hash_entry->user) { crm_xml_add(msg, F_ATTRD_USER, hash_entry->user); } #endif if (hash_entry->timeout <= 0) { crm_xml_add(msg, F_ATTRD_IGNORE_LOCALLY, hash_entry->value); attrd_perform_update(hash_entry); } send_cluster_message(NULL, crm_msg_attrd, msg, FALSE); free_xml(msg); return TRUE; } diff --git a/tools/crm_node.c b/tools/crm_node.c index 564f0bb0ff..b968abd845 100644 --- a/tools/crm_node.c +++ b/tools/crm_node.c @@ -1,781 +1,781 @@ /* * Copyright (C) 2004 Andrew Beekhof * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #include #include #include #include #include #include #include #include #include /* for basename() */ #include #include #include #include #include int command = 0; int ccm_fd = 0; gboolean do_quiet = FALSE; char *target_uuid = NULL; char *target_uname = NULL; const char *standby_value = NULL; const char *standby_scope = NULL; /* *INDENT-OFF* */ static struct crm_option long_options[] = { /* Top-level Options */ {"help", 0, 0, '?', "\tThis text"}, {"version", 0, 0, '$', "\tVersion information" }, {"verbose", 0, 0, 'V', "\tIncrease debug output"}, {"quiet", 0, 0, 'Q', "\tEssential output only"}, {"-spacer-", 1, 0, '-', "\nStack:"}, #if SUPPORT_CMAN {"cman", 0, 0, 'c', "\tOnly try connecting to a cman-based cluster"}, #endif #if SUPPORT_COROSYNC {"openais", 0, 0, 'A', "\tOnly try connecting to an OpenAIS-based cluster"}, #endif #ifdef SUPPORT_HEARTBEAT {"heartbeat", 0, 0, 'H', "Only try connecting to a Heartbeat-based cluster"}, #endif {"-spacer-", 1, 0, '-', "\nCommands:"}, {"epoch", 0, 0, 'e', "\tDisplay the epoch during which this node joined the cluster"}, {"quorum", 0, 0, 'q', "\tDisplay a 1 if our partition has quorum, 0 if not"}, {"list", 0, 0, 'l', "\tDisplay all known members (past and present) of this cluster (Not available for heartbeat clusters)"}, {"partition", 0, 0, 'p', "Display the members of this partition"}, {"cluster-id", 0, 0, 'i', "Display this node's cluster id"}, {"remove", 1, 0, 'R', "(Advanced, AIS-Only) Remove the (stopped) node with the specified nodeid from the cluster"}, {"-spacer-", 1, 0, '-', "\nAdditional Options:"}, {"force", 0, 0, 'f'}, {0, 0, 0, 0} }; /* *INDENT-ON* */ -int local_id = 0; - #if SUPPORT_HEARTBEAT # include # include # include # define UUID_LEN 16 oc_ev_t *ccm_token = NULL; static void *ccm_library = NULL; void oc_ev_special(const oc_ev_t *, oc_ev_class_t, int); static gboolean read_local_hb_uuid(void) { cl_uuid_t uuid; char *buffer = NULL; long start = 0, read_len = 0; FILE *input = fopen(UUID_FILE, "r"); if (input == NULL) { crm_info("Could not open UUID file %s\n", UUID_FILE); return FALSE; } /* see how big the file is */ start = ftell(input); fseek(input, 0L, SEEK_END); if (UUID_LEN != ftell(input)) { fprintf(stderr, "%s must contain exactly %d bytes\n", UUID_FILE, UUID_LEN); abort(); } fseek(input, 0L, start); if (start != ftell(input)) { fprintf(stderr, "fseek not behaving: %ld vs. %ld\n", start, ftell(input)); exit(2); } buffer = malloc(50); read_len = fread(uuid.uuid, 1, UUID_LEN, input); fclose(input); if (read_len != UUID_LEN) { fprintf(stderr, "Expected and read bytes differ: %d vs. %ld\n", UUID_LEN, read_len); exit(3); } else if (buffer != NULL) { cl_uuid_unparse(&uuid, buffer); fprintf(stdout, "%s\n", buffer); return TRUE; } else { fprintf(stderr, "No buffer to unparse\n"); exit(4); } free(buffer); return FALSE; } static void ccm_age_callback(oc_ed_t event, void *cookie, size_t size, const void *data) { int lpc; int node_list_size; const oc_ev_membership_t *oc = (const oc_ev_membership_t *)data; int (*ccm_api_callback_done) (void *cookie) = find_library_function(&ccm_library, CCM_LIBRARY, "oc_ev_callback_done", 1); node_list_size = oc->m_n_member; if (command == 'q') { crm_debug("Processing \"%s\" event.", event == OC_EV_MS_NEW_MEMBERSHIP ? "NEW MEMBERSHIP" : event == OC_EV_MS_NOT_PRIMARY ? "NOT PRIMARY" : event == OC_EV_MS_PRIMARY_RESTORED ? "PRIMARY RESTORED" : event == OC_EV_MS_EVICTED ? "EVICTED" : "NO QUORUM MEMBERSHIP"); if (ccm_have_quorum(event)) { fprintf(stdout, "1\n"); } else { fprintf(stdout, "0\n"); } } else if (command == 'e') { crm_debug("Searching %d members for our birth", oc->m_n_member); } for (lpc = 0; lpc < node_list_size; lpc++) { if (command == 'p') { fprintf(stdout, "%s ", oc->m_array[oc->m_memb_idx + lpc].node_uname); } else if (command == 'e') { int (*ccm_api_is_my_nodeid) (const oc_ev_t * token, const oc_node_t * node) = find_library_function(&ccm_library, CCM_LIBRARY, "oc_ev_is_my_nodeid", 1); if ((*ccm_api_is_my_nodeid) (ccm_token, &(oc->m_array[lpc]))) { crm_debug("MATCH: nodeid=%d, uname=%s, born=%d", oc->m_array[oc->m_memb_idx + lpc].node_id, oc->m_array[oc->m_memb_idx + lpc].node_uname, oc->m_array[oc->m_memb_idx + lpc].node_born_on); fprintf(stdout, "%d\n", oc->m_array[oc->m_memb_idx + lpc].node_born_on); } } } (*ccm_api_callback_done) (cookie); if (command == 'p') { fprintf(stdout, "\n"); } fflush(stdout); exit(0); } static gboolean ccm_age_connect(int *ccm_fd) { gboolean did_fail = FALSE; int ret = 0; int (*ccm_api_register) (oc_ev_t ** token) = find_library_function(&ccm_library, CCM_LIBRARY, "oc_ev_register", 1); int (*ccm_api_set_callback) (const oc_ev_t * token, oc_ev_class_t class, oc_ev_callback_t * fn, oc_ev_callback_t ** prev_fn) = find_library_function(&ccm_library, CCM_LIBRARY, "oc_ev_set_callback", 1); void (*ccm_api_special) (const oc_ev_t *, oc_ev_class_t, int) = find_library_function(&ccm_library, CCM_LIBRARY, "oc_ev_special", 1); int (*ccm_api_activate) (const oc_ev_t * token, int *fd) = find_library_function(&ccm_library, CCM_LIBRARY, "oc_ev_activate", 1); crm_debug("Registering with CCM"); ret = (*ccm_api_register) (&ccm_token); if (ret != 0) { crm_info("CCM registration failed: %d", ret); did_fail = TRUE; } if (did_fail == FALSE) { crm_debug("Setting up CCM callbacks"); ret = (*ccm_api_set_callback) (ccm_token, OC_EV_MEMB_CLASS, ccm_age_callback, NULL); if (ret != 0) { crm_warn("CCM callback not set: %d", ret); did_fail = TRUE; } } if (did_fail == FALSE) { (*ccm_api_special) (ccm_token, OC_EV_MEMB_CLASS, 0 /*don't care */ ); crm_debug("Activating CCM token"); ret = (*ccm_api_activate) (ccm_token, ccm_fd); if (ret != 0) { crm_warn("CCM Activation failed: %d", ret); did_fail = TRUE; } } return !did_fail; } static gboolean try_heartbeat(int command, enum cluster_type_e stack) { crm_debug("Attempting to process %c command", command); if (command == 'i') { if (read_local_hb_uuid()) { exit(0); } } else if (ccm_age_connect(&ccm_fd)) { int rc = 0; fd_set rset; int (*ccm_api_handle_event) (const oc_ev_t * token) = find_library_function(&ccm_library, CCM_LIBRARY, "oc_ev_handle_event", 1); while (1) { sleep(1); FD_ZERO(&rset); FD_SET(ccm_fd, &rset); errno = 0; rc = select(ccm_fd + 1, &rset, NULL, NULL, NULL); if (rc > 0 && (*ccm_api_handle_event) (ccm_token) != 0) { crm_err("oc_ev_handle_event failed"); return FALSE; } else if (rc < 0 && errno != EINTR) { crm_perror(LOG_ERR, "select failed: %d", rc); return FALSE; } } } return FALSE; } #endif #if SUPPORT_CMAN # include # define MAX_NODES 256 static gboolean try_cman(int command, enum cluster_type_e stack) { int rc = -1, lpc = 0, node_count = 0; cman_node_t node; cman_cluster_t cluster; cman_handle_t cman_handle = NULL; cman_node_t cman_nodes[MAX_NODES]; memset(&cluster, 0, sizeof(cluster)); cman_handle = cman_init(NULL); if (cman_handle == NULL || cman_is_active(cman_handle) == FALSE) { crm_info("Couldn't connect to cman"); return FALSE; } switch (command) { case 'R': fprintf(stderr, "Node removal not supported for cman based clusters\n"); exit(-EPROTONOSUPPORT); break; case 'e': /* Age makes no sense (yet?) in a cman cluster */ fprintf(stdout, "1\n"); break; case 'q': fprintf(stdout, "%d\n", cman_is_quorate(cman_handle)); break; case 'l': case 'p': rc = cman_get_nodes(cman_handle, MAX_NODES, &node_count, cman_nodes); if (rc != 0) { fprintf(stderr, "Couldn't query cman node list: %d %d", rc, errno); goto cman_bail; } for (lpc = 0; lpc < node_count; lpc++) { if (command == 'l') { printf("%s ", cman_nodes[lpc].cn_name); } else if (cman_nodes[lpc].cn_nodeid != 0 && cman_nodes[lpc].cn_member) { /* Never allow node ID 0 to be considered a member #315711 */ printf("%s ", cman_nodes[lpc].cn_name); } } printf("\n"); break; case 'i': rc = cman_get_node(cman_handle, CMAN_NODEID_US, &node); if (rc != 0) { fprintf(stderr, "Couldn't query cman node id: %d %d", rc, errno); goto cman_bail; } fprintf(stdout, "%u\n", node.cn_nodeid); break; default: fprintf(stderr, "Unknown option '%c'\n", command); crm_help('?', EX_USAGE); } cman_finish(cman_handle); exit(0); cman_bail: cman_finish(cman_handle); exit(EX_USAGE); } #endif #if HAVE_CONFDB static void ais_membership_destroy(gpointer user_data) { crm_err("AIS connection terminated"); ais_fd_sync = -1; exit(1); } static gint member_sort(gconstpointer a, gconstpointer b) { const crm_node_t *node_a = a; const crm_node_t *node_b = b; return strcmp(node_a->uname, node_b->uname); } static void crm_add_member(gpointer key, gpointer value, gpointer user_data) { GList **list = user_data; crm_node_t *node = value; if (node->uname != NULL) { *list = g_list_insert_sorted(*list, node, member_sort); } } static gboolean -ais_membership_dispatch(AIS_Message * wrapper, char *data, int sender) +ais_membership_dispatch(int kind, const char *from, const char *data) { - switch (wrapper->header.id) { + switch (kind) { case crm_class_members: case crm_class_notify: case crm_class_quorum: break; default: return TRUE; break; } if (command == 'q') { if (crm_have_quorum) { fprintf(stdout, "1\n"); } else { fprintf(stdout, "0\n"); } } else if (command == 'l') { GList *nodes = NULL; GListPtr lpc = NULL; g_hash_table_foreach(crm_peer_cache, crm_add_member, &nodes); for (lpc = nodes; lpc != NULL; lpc = lpc->next) { crm_node_t *node = (crm_node_t *) lpc->data; fprintf(stdout, "%u %s %s\n", node->id, node->uname, node->state); } fprintf(stdout, "\n"); } else if (command == 'p') { GList *nodes = NULL; GListPtr lpc = NULL; g_hash_table_foreach(crm_peer_cache, crm_add_member, &nodes); for (lpc = nodes; lpc != NULL; lpc = lpc->next) { crm_node_t *node = (crm_node_t *) lpc->data; if (node->uname && safe_str_eq(node->state, CRM_NODE_MEMBER)) { fprintf(stdout, "%s ", node->uname); } } fprintf(stdout, "\n"); } exit(0); return TRUE; } #endif #ifdef SUPPORT_CS_QUORUM # include # include static int node_mcp_dispatch(const char *buffer, ssize_t length, gpointer userdata) { xmlNode *msg = string2xml(buffer); if (msg) { xmlNode *node = NULL; crm_log_xml_trace(msg, "message"); for (node = __xml_first_child(msg); node != NULL; node = __xml_next(node)) { const char *uname = crm_element_value(node, "uname"); if (command == 'l') { int id = 0; crm_element_value_int(node, "id", &id); fprintf(stdout, "%u %s\n", id, uname); } else if (command == 'p') { fprintf(stdout, "%s ", uname); } } free_xml(msg); if (command == 'p') { fprintf(stdout, "\n"); } exit(0); } return 0; } static void node_mcp_destroy(gpointer user_data) { exit(1); } static int crmd_remove_node_cache(int id) { int rc = -1; char *admin_uuid = NULL; crm_ipc_t *conn = crm_ipc_new(CRM_SYSTEM_CRMD, 0); xmlNode *cmd = NULL; xmlNode *hello = NULL; xmlNode *msg_data = NULL; if (!conn) { goto rm_node_cleanup; } if (!crm_ipc_connect(conn)) { goto rm_node_cleanup; } admin_uuid = calloc(1, 11); snprintf(admin_uuid, 10, "%d", getpid()); admin_uuid[10] = '\0'; hello = create_hello_message(admin_uuid, "crm_node", "0", "1"); rc = crm_ipc_send(conn, hello, 0, 0, NULL); if (rc < 0) { goto rm_node_cleanup; } msg_data = create_xml_node(NULL, XML_TAG_OPTIONS); crm_xml_add_int(msg_data, XML_ATTR_ID, id); cmd = create_request(CRM_OP_RM_NODE_CACHE, msg_data, NULL, CRM_SYSTEM_CRMD, "crm_node", admin_uuid); rc = crm_ipc_send(conn, cmd, 0, 0, NULL); rm_node_cleanup: if (conn) { crm_ipc_close(conn); crm_ipc_destroy(conn); } free_xml(cmd); free_xml(hello); free(admin_uuid); return rc > 0 ? 0 : rc; } static gboolean try_corosync(int command, enum cluster_type_e stack) { int rc = 0; int quorate = 0; uint32_t quorum_type = 0; unsigned int nodeid = 0; cpg_handle_t c_handle = 0; quorum_handle_t q_handle = 0; mainloop_io_t *ipc = NULL; GMainLoop *amainloop = NULL; struct ipc_client_callbacks node_callbacks = { .dispatch = node_mcp_dispatch, .destroy = node_mcp_destroy }; switch (command) { case 'R': if (crmd_remove_node_cache(atoi(target_uname))) { crm_err("Failed to connect to crmd to remove node id %s", target_uname); } break; case 'e': /* Age makes no sense (yet) in an AIS cluster */ fprintf(stdout, "1\n"); exit(0); case 'q': /* Go direct to the Quorum API */ rc = quorum_initialize(&q_handle, NULL, &quorum_type); if (rc != CS_OK) { crm_err("Could not connect to the Quorum API: %d\n", rc); return FALSE; } rc = quorum_getquorate(q_handle, &quorate); if (rc != CS_OK) { crm_err("Could not obtain the current Quorum API state: %d\n", rc); return FALSE; } if (quorate) { fprintf(stdout, "1\n"); } else { fprintf(stdout, "0\n"); } quorum_finalize(q_handle); exit(0); case 'i': /* Go direct to the CPG API */ rc = cpg_initialize(&c_handle, NULL); if (rc != CS_OK) { crm_err("Could not connect to the Cluster Process Group API: %d\n", rc); return FALSE; } rc = cpg_local_get(c_handle, &nodeid); if (rc != CS_OK) { crm_err("Could not get local node id from the CPG API"); return FALSE; } fprintf(stdout, "%u\n", nodeid); cpg_finalize(c_handle); exit(0); case 'l': case 'p': /* Go to pacemakerd */ amainloop = g_main_new(FALSE); ipc = mainloop_add_ipc_client(CRM_SYSTEM_MCP, G_PRIORITY_DEFAULT, 0, NULL, &node_callbacks); if(ipc != NULL) { xmlNode *poke = create_xml_node(NULL, "poke"); crm_ipc_send(mainloop_get_ipc_client(ipc), poke, 0, 0, NULL); free_xml(poke); g_main_run(amainloop); } break; } return FALSE; } #endif #if HAVE_CONFDB static gboolean try_openais(int command, enum cluster_type_e stack) { - if (init_ais_connection_once - (ais_membership_dispatch, ais_membership_destroy, NULL, NULL, &local_id)) { + static crm_cluster_t cluster; + cluster.destroy = ais_membership_destroy; + cluster.cs_dispatch = ais_membership_dispatch; + + if (init_cs_connection_once(&cluster)) { GMainLoop *amainloop = NULL; - switch (command) { case 'R': send_ais_text(crm_class_rmpeer, target_uname, TRUE, NULL, crm_msg_ais); exit(0); case 'e': /* Age makes no sense (yet) in an AIS cluster */ fprintf(stdout, "1\n"); exit(0); case 'q': send_ais_text(crm_class_quorum, NULL, TRUE, NULL, crm_msg_ais); break; case 'l': case 'p': crm_info("Requesting the list of configured nodes"); send_ais_text(crm_class_members, __FUNCTION__, TRUE, NULL, crm_msg_ais); break; case 'i': - printf("%u\n", local_id); + printf("%u\n", cluster.nodeid); exit(0); default: fprintf(stderr, "Unknown option '%c'\n", command); crm_help('?', EX_USAGE); } amainloop = g_main_new(FALSE); g_main_run(amainloop); } return FALSE; } #endif int set_cluster_type(enum cluster_type_e type); int main(int argc, char **argv) { int flag = 0; int argerr = 0; gboolean force_flag = FALSE; gboolean dangerous_cmd = FALSE; enum cluster_type_e try_stack = pcmk_cluster_unknown; int option_index = 0; crm_peer_init(); crm_log_cli_init("crm_node"); crm_set_options(NULL, "command [options]", long_options, "Tool for displaying low-level node information"); while (flag >= 0) { flag = crm_get_option(argc, argv, &option_index); switch (flag) { case -1: break; case 'V': crm_bump_log_level(argc, argv); break; case '$': case '?': crm_help(flag, EX_OK); break; case 'Q': do_quiet = TRUE; break; case 'H': set_cluster_type(pcmk_cluster_heartbeat); break; case 'A': set_cluster_type(pcmk_cluster_classic_ais); break; case 'C': set_cluster_type(pcmk_cluster_corosync); break; case 'c': set_cluster_type(pcmk_cluster_cman); break; case 'f': force_flag = TRUE; break; case 'R': dangerous_cmd = TRUE; command = flag; target_uname = optarg; break; case 'p': case 'e': case 'q': case 'i': case 'l': command = flag; break; default: ++argerr; break; } } if (optind > argc) { ++argerr; } if (argerr) { crm_help('?', EX_USAGE); } if (dangerous_cmd && force_flag == FALSE) { fprintf(stderr, "The supplied command is considered dangerous." " To prevent accidental destruction of the cluster," " the --force flag is required in order to proceed.\n"); fflush(stderr); exit(EX_USAGE); } try_stack = get_cluster_type(); crm_debug("Attempting to process -%c command for cluster type: %s", command, name_for_cluster_type(try_stack)); #if SUPPORT_CMAN if (try_stack == pcmk_cluster_cman) { try_cman(command, try_stack); } #endif #ifdef SUPPORT_CS_QUORUM if (try_stack == pcmk_cluster_corosync) { try_corosync(command, try_stack); } #endif #if HAVE_CONFDB /* Only an option if we're using the plugins */ if (try_stack == pcmk_cluster_classic_ais) { try_openais(command, try_stack); } #endif #if SUPPORT_HEARTBEAT if (try_stack == pcmk_cluster_heartbeat) { try_heartbeat(command, try_stack); } #endif return (1); }