diff --git a/cib/callbacks.c b/cib/callbacks.c index c28bd9fadd..daa736a927 100644 --- a/cib/callbacks.c +++ b/cib/callbacks.c @@ -1,1421 +1,1419 @@ -/* +/* * Copyright (C) 2004 Andrew Beekhof - * + * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. - * + * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. - * + * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #include #include #include #include #include #include #include #include #include #include #include -#include -#include #include #include #include #include #include #include #include "common.h" extern GMainLoop *mainloop; extern gboolean cib_shutdown_flag; extern gboolean stand_alone; extern const char *cib_root; static unsigned long cib_local_bcast_num = 0; typedef struct cib_local_notify_s { xmlNode *notify_src; char *client_id; gboolean from_peer; gboolean sync_reply; } cib_local_notify_t; qb_ipcs_service_t *ipcs_ro = NULL; qb_ipcs_service_t *ipcs_rw = NULL; qb_ipcs_service_t *ipcs_shm = NULL; extern crm_cluster_t crm_cluster; extern int cib_update_counter(xmlNode * xml_obj, const char *field, gboolean reset); extern void GHFunc_count_peers(gpointer key, gpointer value, gpointer user_data); gint cib_GCompareFunc(gconstpointer a, gconstpointer b); gboolean can_write(int flags); void send_cib_replace(const xmlNode * sync_request, const char *host); void cib_process_request(xmlNode * request, gboolean privileged, gboolean force_synchronous, gboolean from_peer, crm_client_t * cib_client); extern GHashTable *local_notify_queue; int next_client_id = 0; extern const char *cib_our_uname; extern unsigned long cib_num_ops, cib_num_local, cib_num_updates, cib_num_fail; extern unsigned long cib_bad_connects, cib_num_timeouts; extern int cib_status; int cib_process_command(xmlNode * request, xmlNode ** reply, xmlNode ** cib_diff, gboolean privileged); gboolean cib_common_callback(qb_ipcs_connection_t *c, void *data, size_t size, gboolean privileged); static int32_t cib_ipc_accept(qb_ipcs_connection_t *c, uid_t uid, gid_t gid) { crm_trace("Connection %p", c); if (cib_shutdown_flag) { crm_info("Ignoring new client [%d] during shutdown", crm_ipcs_client_pid(c)); return -EPERM; } if(crm_client_new(c, uid, gid) == NULL) { return -EIO; } return 0; } static void cib_ipc_created(qb_ipcs_connection_t *c) { crm_trace("Connection %p", c); } static int32_t cib_ipc_dispatch_rw(qb_ipcs_connection_t *c, void *data, size_t size) { crm_client_t *client = crm_client_get(c); crm_trace("%p message from %s", c, client->id); return cib_common_callback(c, data, size, TRUE); } static int32_t cib_ipc_dispatch_ro(qb_ipcs_connection_t *c, void *data, size_t size) { crm_client_t *client = crm_client_get(c); crm_trace("%p message from %s", c, client->id); return cib_common_callback(c, data, size, FALSE); } /* Error code means? */ static int32_t -cib_ipc_closed(qb_ipcs_connection_t *c) +cib_ipc_closed(qb_ipcs_connection_t *c) { crm_client_t *client = crm_client_get(c); crm_trace("Connection %p", c); crm_client_destroy(client); return 0; } static void -cib_ipc_destroy(qb_ipcs_connection_t *c) +cib_ipc_destroy(qb_ipcs_connection_t *c) { crm_trace("Connection %p", c); if (cib_shutdown_flag) { cib_shutdown(0); } } -struct qb_ipcs_service_handlers ipc_ro_callbacks = +struct qb_ipcs_service_handlers ipc_ro_callbacks = { .connection_accept = cib_ipc_accept, .connection_created = cib_ipc_created, .msg_process = cib_ipc_dispatch_ro, .connection_closed = cib_ipc_closed, .connection_destroyed = cib_ipc_destroy }; -struct qb_ipcs_service_handlers ipc_rw_callbacks = +struct qb_ipcs_service_handlers ipc_rw_callbacks = { .connection_accept = cib_ipc_accept, .connection_created = cib_ipc_created, .msg_process = cib_ipc_dispatch_rw, .connection_closed = cib_ipc_closed, .connection_destroyed = cib_ipc_destroy }; void cib_common_callback_worker(uint32_t id, uint32_t flags, xmlNode * op_request, crm_client_t * cib_client, gboolean privileged) { const char *op = crm_element_value(op_request, F_CIB_OPERATION); if (crm_str_eq(op, CRM_OP_REGISTER, TRUE)) { if(flags & crm_ipc_client_response) { xmlNode *ack = create_xml_node(NULL, __FUNCTION__); crm_xml_add(ack, F_CIB_OPERATION, CRM_OP_REGISTER); crm_xml_add(ack, F_CIB_CLIENTID, cib_client->id); crm_ipcs_send(cib_client, id, ack, FALSE); cib_client->request_id = 0; free_xml(ack); } return; } else if (crm_str_eq(op, T_CIB_NOTIFY, TRUE)) { /* Update the notify filters for this client */ int on_off = 0; long long bit = 0; const char *type = crm_element_value(op_request, F_CIB_NOTIFY_TYPE); crm_element_value_int(op_request, F_CIB_NOTIFY_ACTIVATE, &on_off); crm_debug("Setting %s callbacks for %s (%s): %s", type, cib_client->name, cib_client->id, on_off ? "on" : "off"); if (safe_str_eq(type, T_CIB_POST_NOTIFY)) { bit = cib_notify_post; } else if (safe_str_eq(type, T_CIB_PRE_NOTIFY)) { bit = cib_notify_pre; } else if (safe_str_eq(type, T_CIB_UPDATE_CONFIRM)) { bit = cib_notify_confirm; } else if (safe_str_eq(type, T_CIB_DIFF_NOTIFY)) { bit = cib_notify_diff; } else if (safe_str_eq(type, T_CIB_REPLACE_NOTIFY)) { bit = cib_notify_replace; } if(on_off) { set_bit(cib_client->options, bit); } else { clear_bit(cib_client->options, bit); } if(flags & crm_ipc_client_response) { /* TODO - include rc */ crm_ipcs_send_ack(cib_client, id, "ack", __FUNCTION__, __LINE__); cib_client->request_id = 0; } return; } cib_process_request(op_request, FALSE, privileged, FALSE, cib_client); } int32_t cib_common_callback(qb_ipcs_connection_t *c, void *data, size_t size, gboolean privileged) { uint32_t id = 0; uint32_t flags = 0; int call_options = 0; crm_client_t *cib_client = crm_client_get(c); xmlNode *op_request = crm_ipcs_recv(cib_client, data, size, &id, &flags); if(op_request) { crm_element_value_int(op_request, F_CIB_CALLOPTS, &call_options); } crm_trace("Inbound: %.200s", data); if (op_request == NULL || cib_client == NULL) { crm_ipcs_send_ack(cib_client, id, "nack", __FUNCTION__, __LINE__); return 0; } if(is_set(call_options, cib_sync_call)) { CRM_ASSERT(flags & crm_ipc_client_response); } if(flags & crm_ipc_client_response) { CRM_LOG_ASSERT(cib_client->request_id == 0); /* This means the client has two synchronous events in-flight */ cib_client->request_id = id; /* Reply only to the last one */ } if (cib_client->name == NULL) { const char *value = crm_element_value(op_request, F_CIB_CLIENTNAME); if (value == NULL) { cib_client->name = crm_itoa(cib_client->pid); } else { cib_client->name = strdup(value); } } crm_xml_add(op_request, F_CIB_CLIENTID, cib_client->id); crm_xml_add(op_request, F_CIB_CLIENTNAME, cib_client->name); #if ENABLE_ACL determine_request_user(cib_client->user, op_request, F_CIB_USER); #endif crm_log_xml_trace(op_request, "Client[inbound]"); cib_common_callback_worker(id, flags, op_request, cib_client, privileged); free_xml(op_request); return 0; } static void do_local_notify(xmlNode * notify_src, const char *client_id, gboolean sync_reply, gboolean from_peer) { /* send callback to originating child */ crm_client_t *client_obj = NULL; int local_rc = pcmk_ok; if (client_id != NULL) { client_obj = crm_client_get_by_id(client_id); } if (client_obj == NULL) { local_rc = -ECONNRESET; crm_trace("No client to sent the response to. F_CIB_CLIENTID not set."); } else { int rid = 0; if(sync_reply) { if (client_obj->ipcs) { CRM_LOG_ASSERT(client_obj->request_id); rid = client_obj->request_id; client_obj->request_id = 0; crm_trace("Sending response %d to %s %s", rid, client_obj->name, from_peer?"(originator of delegated request)":""); } else { crm_trace("Sending response to %s %s", client_obj->name, from_peer?"(originator of delegated request)":""); } } else { crm_trace("Sending an event to %s %s", client_obj->name, from_peer?"(originator of delegated request)":""); } switch(client_obj->kind) { case CRM_CLIENT_IPC: if (crm_ipcs_send(client_obj, rid, notify_src, !sync_reply) < 0) { local_rc = -ENOMSG; } break; # ifdef HAVE_GNUTLS_GNUTLS_H case CRM_CLIENT_TLS: # endif case CRM_CLIENT_TCP: crm_remote_send(client_obj->remote, notify_src); break; default: crm_err("Unknown transport %d for %s", client_obj->kind, client_obj->name); } } if (local_rc != pcmk_ok && client_obj != NULL) { crm_warn("%sSync reply to %s failed: %s", sync_reply ? "" : "A-", client_obj ? client_obj->name : "", pcmk_strerror(local_rc)); } } static void local_notify_destroy_callback(gpointer data) { cib_local_notify_t *notify = data; free_xml(notify->notify_src); free(notify->client_id); free(notify); } static void check_local_notify(int bcast_id) { cib_local_notify_t *notify = NULL; if (!local_notify_queue) { return; } notify = g_hash_table_lookup(local_notify_queue, GINT_TO_POINTER(bcast_id)); if (notify) { do_local_notify(notify->notify_src, notify->client_id, notify->sync_reply, notify->from_peer); g_hash_table_remove(local_notify_queue, GINT_TO_POINTER(bcast_id)); } } static void queue_local_notify(xmlNode * notify_src, const char *client_id, gboolean sync_reply, gboolean from_peer) { cib_local_notify_t *notify = calloc(1, sizeof(cib_local_notify_t)); notify->notify_src = notify_src; notify->client_id = strdup(client_id); notify->sync_reply = sync_reply; notify->from_peer = from_peer; if (!local_notify_queue) { local_notify_queue = g_hash_table_new_full(g_direct_hash, g_direct_equal, NULL, local_notify_destroy_callback); } g_hash_table_insert(local_notify_queue, GINT_TO_POINTER(cib_local_bcast_num), notify); } static void parse_local_options(crm_client_t * cib_client, int call_type, int call_options, const char *host, const char *op, gboolean * local_notify, gboolean * needs_reply, gboolean * process, gboolean * needs_forward) { if (cib_op_modifies(call_type) && !(call_options & cib_inhibit_bcast)) { /* we need to send an update anyway */ *needs_reply = TRUE; } else { *needs_reply = FALSE; } if (host == NULL && (call_options & cib_scope_local)) { crm_trace("Processing locally scoped %s op from %s", op, cib_client->name); *local_notify = TRUE; } else if (host == NULL && cib_is_master) { crm_trace("Processing master %s op locally from %s", op, cib_client->name); *local_notify = TRUE; } else if (safe_str_eq(host, cib_our_uname)) { crm_trace("Processing locally addressed %s op from %s", op, cib_client->name); *local_notify = TRUE; } else if (stand_alone) { *needs_forward = FALSE; *local_notify = TRUE; *process = TRUE; } else { crm_trace("%s op from %s needs to be forwarded to %s", op, cib_client->name, host ? host : "the master instance"); *needs_forward = TRUE; *process = FALSE; } } static gboolean parse_peer_options(int call_type, xmlNode * request, gboolean * local_notify, gboolean * needs_reply, gboolean * process, gboolean * needs_forward) { const char *op = NULL; const char *host = NULL; const char *delegated = NULL; const char *originator = crm_element_value(request, F_ORIG); const char *reply_to = crm_element_value(request, F_CIB_ISREPLY); const char *update = crm_element_value(request, F_CIB_GLOBAL_UPDATE); gboolean is_reply = safe_str_eq(reply_to, cib_our_uname); if (crm_is_true(update)) { *needs_reply = FALSE; if (is_reply) { *local_notify = TRUE; crm_trace("Processing global/peer update from %s" " that originated from us", originator); } else { crm_trace("Processing global/peer update from %s", originator); } return TRUE; } host = crm_element_value(request, F_CIB_HOST); if (host != NULL && safe_str_eq(host, cib_our_uname)) { crm_trace("Processing request sent to us from %s", originator); return TRUE; } else if (host == NULL && cib_is_master == TRUE) { crm_trace("Processing request sent to master instance from %s", originator); return TRUE; } op = crm_element_value(request, F_CIB_OPERATION); if(safe_str_eq(op, "cib_shutdown_req")) { /* Always process these */ *local_notify = FALSE; if(reply_to == NULL || is_reply) { *process = TRUE; } if(is_reply) { *needs_reply = FALSE; } return *process; } if (is_reply) { crm_trace("Forward reply sent from %s to local clients", originator); *process = FALSE; *needs_reply = FALSE; *local_notify = TRUE; return TRUE; } delegated = crm_element_value(request, F_CIB_DELEGATED); if (delegated != NULL) { crm_trace("Ignoring msg for master instance"); } else if (host != NULL) { /* this is for a specific instance and we're not it */ crm_trace("Ignoring msg for instance on %s", crm_str(host)); } else if (reply_to == NULL && cib_is_master == FALSE) { /* this is for the master instance and we're not it */ crm_trace("Ignoring reply to %s", crm_str(reply_to)); } else if (safe_str_eq(op, "cib_shutdown_req")) { if (reply_to != NULL) { crm_debug("Processing %s from %s", op, host); *needs_reply = FALSE; } else { crm_debug("Processing %s reply from %s", op, host); } return TRUE; } else { crm_err("Nothing for us to do?"); crm_log_xml_err(request, "Peer[inbound]"); } return FALSE; } static void forward_request(xmlNode * request, crm_client_t * cib_client, int call_options) { const char *op = crm_element_value(request, F_CIB_OPERATION); const char *host = crm_element_value(request, F_CIB_HOST); crm_xml_add(request, F_CIB_DELEGATED, cib_our_uname); if (host != NULL) { crm_trace("Forwarding %s op to %s", op, host); send_cluster_message(crm_get_peer(0, host), crm_msg_cib, request, FALSE); } else { crm_trace("Forwarding %s op to master instance", op); send_cluster_message(NULL, crm_msg_cib, request, FALSE); } /* Return the request to its original state */ xml_remove_prop(request, F_CIB_DELEGATED); if (call_options & cib_discard_reply) { crm_trace("Client not interested in reply"); } } static gboolean send_peer_reply(xmlNode * msg, xmlNode * result_diff, const char *originator, gboolean broadcast) { CRM_ASSERT(msg != NULL); if (broadcast) { /* this (successful) call modified the CIB _and_ the * change needs to be broadcast... * send via HA to other nodes */ int diff_add_updates = 0; int diff_add_epoch = 0; int diff_add_admin_epoch = 0; int diff_del_updates = 0; int diff_del_epoch = 0; int diff_del_admin_epoch = 0; const char *digest = NULL; CRM_LOG_ASSERT(result_diff != NULL); digest = crm_element_value(result_diff, XML_ATTR_DIGEST); cib_diff_version_details(result_diff, &diff_add_admin_epoch, &diff_add_epoch, &diff_add_updates, &diff_del_admin_epoch, &diff_del_epoch, &diff_del_updates); crm_trace("Sending update diff %d.%d.%d -> %d.%d.%d %s", diff_del_admin_epoch, diff_del_epoch, diff_del_updates, diff_add_admin_epoch, diff_add_epoch, diff_add_updates, digest); crm_xml_add(msg, F_CIB_ISREPLY, originator); crm_xml_add(msg, F_CIB_GLOBAL_UPDATE, XML_BOOLEAN_TRUE); crm_xml_add(msg, F_CIB_OPERATION, CIB_OP_APPLY_DIFF); CRM_ASSERT(digest != NULL); add_message_xml(msg, F_CIB_UPDATE_DIFF, result_diff); crm_log_xml_explicit(msg, "copy"); return send_cluster_message(NULL, crm_msg_cib, msg, TRUE); } else if (originator != NULL) { /* send reply via HA to originating node */ crm_trace("Sending request result to originator only"); crm_xml_add(msg, F_CIB_ISREPLY, originator); return send_cluster_message(crm_get_peer(0, originator), crm_msg_cib, msg, FALSE); } return FALSE; } void cib_process_request(xmlNode * request, gboolean force_synchronous, gboolean privileged, gboolean unused, crm_client_t * cib_client) { int call_type = 0; int call_options = 0; gboolean process = TRUE; gboolean is_update = TRUE; gboolean from_peer = TRUE; gboolean needs_reply = TRUE; gboolean local_notify = FALSE; gboolean needs_forward = FALSE; gboolean global_update = crm_is_true(crm_element_value(request, F_CIB_GLOBAL_UPDATE)); xmlNode *op_reply = NULL; xmlNode *result_diff = NULL; int rc = pcmk_ok; const char *op = crm_element_value(request, F_CIB_OPERATION); const char *originator = crm_element_value(request, F_ORIG); const char *host = crm_element_value(request, F_CIB_HOST); const char *target = NULL; const char *client_id = crm_element_value(request, F_CIB_CLIENTID); if(cib_client) { from_peer = FALSE; } cib_num_ops++; if (cib_num_ops == 0) { cib_num_fail = 0; cib_num_local = 0; cib_num_updates = 0; crm_info("Stats wrapped around"); } crm_element_value_int(request, F_CIB_CALLOPTS, &call_options); if (force_synchronous) { call_options |= cib_sync_call; } if (host != NULL && strlen(host) == 0) { host = NULL; } if(host) { target = host; } else if(call_options & cib_scope_local) { target = "local host"; } else { target = "master"; } if(from_peer) { crm_trace("Processing peer %s operation from %s on %s intended for %s", op, client_id, originator, target); } else { crm_xml_add(request, F_ORIG, cib_our_uname); crm_trace("Processing local %s operation from %s intended for %s", op, client_id, target); } rc = cib_get_operation_id(op, &call_type); if (rc != pcmk_ok) { /* TODO: construct error reply? */ crm_err("Pre-processing of command failed: %s", pcmk_strerror(rc)); return; } if (from_peer == FALSE) { parse_local_options(cib_client, call_type, call_options, host, op, &local_notify, &needs_reply, &process, &needs_forward); } else if (parse_peer_options(call_type, request, &local_notify, &needs_reply, &process, &needs_forward) == FALSE) { return; } is_update = cib_op_modifies(call_type); if (is_update) { cib_num_updates++; } if (call_options & cib_discard_reply) { needs_reply = is_update; local_notify = FALSE; } if (needs_forward) { const char *host = crm_element_value(request, F_CIB_HOST); const char *section = crm_element_value(request, F_CIB_SECTION); crm_info("Forwarding %s operation for section %s to %s (origin=%s/%s/%s)", op, section ? section : "'all'", host ? host : "master", originator ? originator : "local", crm_element_value(request, F_CIB_CLIENTNAME), crm_element_value(request, F_CIB_CALLID)); forward_request(request, cib_client, call_options); return; } if (cib_status != pcmk_ok) { rc = cib_status; crm_err("Operation ignored, cluster configuration is invalid." " Please repair and restart: %s", pcmk_strerror(cib_status)); op_reply = cib_construct_reply(request, the_cib, cib_status); } else if (process) { int now = time(NULL); int level = LOG_INFO; const char *section = crm_element_value(request, F_CIB_SECTION); cib_num_local++; rc = cib_process_command(request, &op_reply, &result_diff, privileged); if (global_update) { switch (rc) { case pcmk_ok: level = LOG_INFO; break; case -pcmk_err_old_data: case -pcmk_err_diff_resync: case -pcmk_err_diff_failed: level = LOG_TRACE; break; default: level = LOG_ERR; } } else if (rc != pcmk_ok && is_update) { cib_num_fail++; level = LOG_WARNING; /* } else if (safe_str_eq(op, CIB_OP_QUERY)) { level = LOG_TRACE; } else if (safe_str_eq(op, CIB_OP_SLAVE)) { level = LOG_TRACE; } else if (safe_str_eq(section, XML_CIB_TAG_STATUS)) { level = LOG_TRACE; */ } do_crm_log(level, "Completed %s operation for section %s: %s (rc=%d, origin=%s/%s/%s, version=%s.%s.%s)", op, section ? section : "'all'", pcmk_strerror(rc), rc, originator ? originator : "local", crm_element_value(request, F_CIB_CLIENTNAME), crm_element_value(request, F_CIB_CALLID), the_cib ? crm_element_value(the_cib, XML_ATTR_GENERATION_ADMIN) : "0", the_cib ? crm_element_value(the_cib, XML_ATTR_GENERATION) : "0", the_cib ? crm_element_value(the_cib, XML_ATTR_NUMUPDATES) : "0" ); if((now + 1) < time(NULL)) { crm_write_blackbox(0, NULL); } if (op_reply == NULL && (needs_reply || local_notify)) { crm_err("Unexpected NULL reply to message"); crm_log_xml_err(request, "null reply"); needs_reply = FALSE; local_notify = FALSE; } } /* from now on we are the server */ if (needs_reply == FALSE || stand_alone) { /* nothing more to do... * this was a non-originating slave update */ crm_trace("Completed slave update"); } else if (rc == pcmk_ok && result_diff != NULL && !(call_options & cib_inhibit_bcast)) { gboolean broadcast = FALSE; cib_local_bcast_num++; crm_xml_add_int(request, F_CIB_LOCAL_NOTIFY_ID, cib_local_bcast_num); broadcast = send_peer_reply(request, result_diff, originator, TRUE); if (broadcast && client_id && local_notify && op_reply) { /* If we have been asked to sync the reply, * and a bcast msg has gone out, we queue the local notify * until we know the bcast message has been received */ local_notify = FALSE; crm_trace("Queuing local %ssync notification for %s", (call_options & cib_sync_call)?"":"a-", client_id); queue_local_notify(op_reply, client_id, (call_options & cib_sync_call), from_peer); op_reply = NULL; /* the reply is queued, so don't free here */ } } else if (call_options & cib_discard_reply) { crm_trace("Caller isn't interested in reply"); } else if (from_peer) { if (is_update == FALSE || result_diff == NULL) { crm_trace("Request not broadcast: R/O call"); } else if (call_options & cib_inhibit_bcast) { crm_trace("Request not broadcast: inhibited"); } else if (rc != pcmk_ok) { crm_trace("Request not broadcast: call failed: %s", pcmk_strerror(rc)); } else { crm_trace("Directing reply to %s", originator); } send_peer_reply(op_reply, result_diff, originator, FALSE); } if (local_notify && client_id) { crm_trace("Performing local %ssync notification for %s", (call_options & cib_sync_call)?"":"a-", client_id); if (process == FALSE) { do_local_notify(request, client_id, call_options & cib_sync_call, from_peer); } else { do_local_notify(op_reply, client_id, call_options & cib_sync_call, from_peer); } } free_xml(op_reply); free_xml(result_diff); return; } xmlNode * cib_construct_reply(xmlNode * request, xmlNode * output, int rc) { int lpc = 0; xmlNode *reply = NULL; const char *name = NULL; const char *value = NULL; const char *names[] = { F_CIB_OPERATION, F_CIB_CALLID, F_CIB_CLIENTID, F_CIB_CALLOPTS }; static int max = DIMOF(names); crm_trace("Creating a basic reply"); reply = create_xml_node(NULL, "cib-reply"); crm_xml_add(reply, F_TYPE, T_CIB); for (lpc = 0; lpc < max; lpc++) { name = names[lpc]; value = crm_element_value(request, name); crm_xml_add(reply, name, value); } crm_xml_add_int(reply, F_CIB_RC, rc); if (output != NULL) { crm_trace("Attaching reply output"); add_message_xml(reply, F_CIB_CALLDATA, output); } return reply; } int cib_process_command(xmlNode * request, xmlNode ** reply, xmlNode ** cib_diff, gboolean privileged) { xmlNode *input = NULL; xmlNode *output = NULL; xmlNode *result_cib = NULL; xmlNode *current_cib = NULL; #if ENABLE_ACL xmlNode *filtered_current_cib = NULL; #endif int call_type = 0; int call_options = 0; int log_level = LOG_TRACE; const char *op = NULL; const char *section = NULL; int rc = pcmk_ok; int rc2 = pcmk_ok; gboolean send_r_notify = FALSE; gboolean global_update = FALSE; gboolean config_changed = FALSE; gboolean manage_counters = TRUE; CRM_ASSERT(cib_status == pcmk_ok); *reply = NULL; *cib_diff = NULL; current_cib = the_cib; /* Start processing the request... */ op = crm_element_value(request, F_CIB_OPERATION); crm_element_value_int(request, F_CIB_CALLOPTS, &call_options); rc = cib_get_operation_id(op, &call_type); if (rc == pcmk_ok && privileged == FALSE) { rc = cib_op_can_run(call_type, call_options, privileged, global_update); } rc2 = cib_op_prepare(call_type, request, &input, §ion); if (rc == pcmk_ok) { rc = rc2; } if (rc != pcmk_ok) { crm_trace("Call setup failed: %s", pcmk_strerror(rc)); goto done; } else if (cib_op_modifies(call_type) == FALSE) { #if ENABLE_ACL if (acl_enabled(config_hash) == FALSE || acl_filter_cib(request, current_cib, current_cib, &filtered_current_cib) == FALSE) { rc = cib_perform_op(op, call_options, cib_op_func(call_type), TRUE, section, request, input, FALSE, &config_changed, current_cib, &result_cib, NULL, &output); } else if (filtered_current_cib == NULL) { crm_debug("Pre-filtered the entire cib"); rc = -EACCES; } else { crm_debug("Pre-filtered the queried cib according to the ACLs"); rc = cib_perform_op(op, call_options, cib_op_func(call_type), TRUE, section, request, input, FALSE, &config_changed, filtered_current_cib, &result_cib, NULL, &output); } #else rc = cib_perform_op(op, call_options, cib_op_func(call_type), TRUE, section, request, input, FALSE, &config_changed, current_cib, &result_cib, NULL, &output); #endif CRM_CHECK(result_cib == NULL, free_xml(result_cib)); goto done; } /* Handle a valid write action */ global_update = crm_is_true(crm_element_value(request, F_CIB_GLOBAL_UPDATE)); if (global_update) { manage_counters = FALSE; call_options |= cib_force_diff; CRM_CHECK(call_type == 3 || call_type == 4, crm_err("Call type: %d", call_type); crm_log_xml_err(request, "bad op")); } #ifdef SUPPORT_PRENOTIFY if ((call_options & cib_inhibit_notify) == 0) { cib_pre_notify(call_options, op, the_cib, input); } #endif if (rc == pcmk_ok) { if (call_options & cib_inhibit_bcast) { /* skip */ crm_trace("Skipping update: inhibit broadcast"); manage_counters = FALSE; } /* result_cib must not be modified after cib_perform_op() returns */ rc = cib_perform_op(op, call_options, cib_op_func(call_type), FALSE, section, request, input, manage_counters, &config_changed, current_cib, &result_cib, cib_diff, &output); #if ENABLE_ACL if (acl_enabled(config_hash) == TRUE && acl_check_diff(request, current_cib, result_cib, *cib_diff) == FALSE) { rc = -EACCES; } #endif if (manage_counters == FALSE) { /* If the diff is NULL at this point, its because nothing changed */ config_changed = cib_config_changed(NULL, NULL, cib_diff); } /* Always write to disk for replace ops, * this also negates the need to detect ordering changes */ if (crm_str_eq(CIB_OP_REPLACE, op, TRUE)) { config_changed = TRUE; } } if (rc == pcmk_ok && (call_options & cib_dryrun) == 0) { rc = activateCibXml(result_cib, config_changed, op); if (rc == pcmk_ok && cib_internal_config_changed(*cib_diff)) { cib_read_config(config_hash, result_cib); } if (crm_str_eq(CIB_OP_REPLACE, op, TRUE)) { if (section == NULL) { send_r_notify = TRUE; } else if (safe_str_eq(section, XML_TAG_CIB)) { send_r_notify = TRUE; } else if (safe_str_eq(section, XML_CIB_TAG_NODES)) { send_r_notify = TRUE; } else if (safe_str_eq(section, XML_CIB_TAG_STATUS)) { send_r_notify = TRUE; } } else if (crm_str_eq(CIB_OP_ERASE, op, TRUE)) { send_r_notify = TRUE; } } else if (rc == -pcmk_err_dtd_validation) { if (output != NULL) { crm_log_xml_info(output, "cib:output"); free_xml(output); } #if ENABLE_ACL { xmlNode *filtered_result_cib = NULL; if (acl_enabled(config_hash) == FALSE || acl_filter_cib(request, current_cib, result_cib, &filtered_result_cib) == FALSE) { output = result_cib; } else { crm_debug("Filtered the result cib for output according to the ACLs"); output = filtered_result_cib; if (result_cib != NULL) { free_xml(result_cib); } } } #else output = result_cib; #endif } else { free_xml(result_cib); } if ((call_options & cib_inhibit_notify) == 0) { const char *call_id = crm_element_value(request, F_CIB_CALLID); const char *client = crm_element_value(request, F_CIB_CLIENTNAME); #ifdef SUPPORT_POSTNOTIFY cib_post_notify(call_options, op, input, rc, the_cib); #endif cib_diff_notify(call_options, client, call_id, op, input, rc, *cib_diff); } if (send_r_notify) { const char *origin = crm_element_value(request, F_ORIG); cib_replace_notify(origin, the_cib, rc, *cib_diff); } if (rc != pcmk_ok) { log_level = LOG_TRACE; if (rc == -pcmk_err_dtd_validation && global_update) { log_level = LOG_WARNING; crm_log_xml_info(input, "cib:global_update"); } } else if (config_changed) { log_level = LOG_TRACE; if (cib_is_master) { log_level = LOG_NOTICE; } } else if (cib_is_master) { log_level = LOG_TRACE; } log_cib_diff(log_level, *cib_diff, "cib:diff"); done: if ((call_options & cib_discard_reply) == 0) { *reply = cib_construct_reply(request, output, rc); crm_log_xml_explicit(*reply, "cib:reply"); } #if ENABLE_ACL if (filtered_current_cib != NULL) { free_xml(filtered_current_cib); } #endif if (call_type >= 0) { cib_op_cleanup(call_type, call_options, &input, &output); } return rc; } gint cib_GCompareFunc(gconstpointer a, gconstpointer b) { const xmlNode *a_msg = a; const xmlNode *b_msg = b; int msg_a_id = 0; int msg_b_id = 0; const char *value = NULL; value = crm_element_value_const(a_msg, F_CIB_CALLID); msg_a_id = crm_parse_int(value, NULL); value = crm_element_value_const(b_msg, F_CIB_CALLID); msg_b_id = crm_parse_int(value, NULL); if (msg_a_id == msg_b_id) { return 0; } else if (msg_a_id < msg_b_id) { return -1; } return 1; } #if SUPPORT_HEARTBEAT void cib_ha_peer_callback(HA_Message * msg, void *private_data) { xmlNode *xml = convert_ha_message(NULL, msg, __FUNCTION__); cib_peer_callback(xml, private_data); free_xml(xml); } #endif void cib_peer_callback(xmlNode * msg, void *private_data) { const char *reason = NULL; const char *originator = crm_element_value(msg, F_ORIG); if (originator == NULL || crm_str_eq(originator, cib_our_uname, TRUE)) { /* message is from ourselves */ int bcast_id = 0; if (!(crm_element_value_int(msg, F_CIB_LOCAL_NOTIFY_ID, &bcast_id))) { check_local_notify(bcast_id); } return; } else if (crm_peer_cache == NULL) { reason = "membership not established"; goto bail; } if (crm_element_value(msg, F_CIB_CLIENTNAME) == NULL) { crm_xml_add(msg, F_CIB_CLIENTNAME, originator); } /* crm_log_xml_trace("Peer[inbound]", msg); */ cib_process_request(msg, FALSE, TRUE, TRUE, NULL); return; bail: if (reason) { const char *seq = crm_element_value(msg, F_SEQ); const char *op = crm_element_value(msg, F_CIB_OPERATION); crm_warn("Discarding %s message (%s) from %s: %s", op, seq, originator, reason); } } #if SUPPORT_HEARTBEAT extern oc_ev_t *cib_ev_token; static void *ccm_library = NULL; int (*ccm_api_callback_done) (void *cookie) = NULL; int (*ccm_api_handle_event) (const oc_ev_t * token) = NULL; void cib_client_status_callback(const char *node, const char *client, const char *status, void *private) { crm_node_t *peer = NULL; if (safe_str_eq(client, CRM_SYSTEM_CIB)) { crm_info("Status update: Client %s/%s now has status [%s]", node, client, status); if (safe_str_eq(status, JOINSTATUS)) { status = ONLINESTATUS; } else if (safe_str_eq(status, LEAVESTATUS)) { status = OFFLINESTATUS; } peer = crm_get_peer(0, node); crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cib, status); } return; } int cib_ccm_dispatch(gpointer user_data) { int rc = 0; oc_ev_t *ccm_token = (oc_ev_t *) user_data; crm_trace("received callback"); if (ccm_api_handle_event == NULL) { ccm_api_handle_event = find_library_function(&ccm_library, CCM_LIBRARY, "oc_ev_handle_event", 1); } rc = (*ccm_api_handle_event) (ccm_token); if (0 == rc) { return 0; } crm_err("CCM connection appears to have failed: rc=%d.", rc); /* eventually it might be nice to recover and reconnect... but until then... */ crm_err("Exiting to recover from CCM connection failure"); crm_exit(2); return -1; } int current_instance = 0; void cib_ccm_msg_callback(oc_ed_t event, void *cookie, size_t size, const void *data) { gboolean update_id = FALSE; const oc_ev_membership_t *membership = data; CRM_ASSERT(membership != NULL); crm_info("Processing CCM event=%s (id=%d)", ccm_event_name(event), membership->m_instance); if (current_instance > membership->m_instance) { crm_err("Membership instance ID went backwards! %d->%d", current_instance, membership->m_instance); CRM_ASSERT(current_instance <= membership->m_instance); } switch (event) { case OC_EV_MS_NEW_MEMBERSHIP: case OC_EV_MS_INVALID: update_id = TRUE; break; case OC_EV_MS_PRIMARY_RESTORED: update_id = TRUE; break; case OC_EV_MS_NOT_PRIMARY: crm_trace("Ignoring transitional CCM event: %s", ccm_event_name(event)); break; case OC_EV_MS_EVICTED: crm_err("Evicted from CCM: %s", ccm_event_name(event)); break; default: crm_err("Unknown CCM event: %d", event); } if (update_id) { unsigned int lpc = 0; CRM_CHECK(membership != NULL, return); current_instance = membership->m_instance; for (lpc = 0; lpc < membership->m_n_out; lpc++) { crm_update_ccm_node(membership, lpc + membership->m_out_idx, CRM_NODE_LOST, current_instance); } for (lpc = 0; lpc < membership->m_n_member; lpc++) { crm_update_ccm_node(membership, lpc + membership->m_memb_idx, CRM_NODE_ACTIVE, current_instance); } } if (ccm_api_callback_done == NULL) { ccm_api_callback_done = find_library_function(&ccm_library, CCM_LIBRARY, "oc_ev_callback_done", 1); } (*ccm_api_callback_done) (cookie); return; } #endif gboolean can_write(int flags) { return TRUE; } static gboolean cib_force_exit(gpointer data) { crm_notice("Forcing exit!"); terminate_cib(__FUNCTION__, TRUE); return FALSE; } static void disconnect_remote_client(gpointer key, gpointer value, gpointer user_data) { crm_client_t *a_client = value; crm_err("Disconnecting %s... Not implemented", crm_str(a_client->name)); } void cib_shutdown(int nsig) { struct qb_ipcs_stats srv_stats; if (cib_shutdown_flag == FALSE) { int disconnects = 0; qb_ipcs_connection_t *c = NULL; cib_shutdown_flag = TRUE; c = qb_ipcs_connection_first_get(ipcs_rw); while(c != NULL) { qb_ipcs_connection_t *last = c; c = qb_ipcs_connection_next_get(ipcs_rw, last); crm_debug("Disconnecting r/w client %p...", last); qb_ipcs_disconnect(last); qb_ipcs_connection_unref(last); disconnects++; } c = qb_ipcs_connection_first_get(ipcs_ro); while(c != NULL) { qb_ipcs_connection_t *last = c; c = qb_ipcs_connection_next_get(ipcs_ro, last); crm_debug("Disconnecting r/o client %p...", last); qb_ipcs_disconnect(last); qb_ipcs_connection_unref(last); disconnects++; } c = qb_ipcs_connection_first_get(ipcs_shm); while(c != NULL) { qb_ipcs_connection_t *last = c; c = qb_ipcs_connection_next_get(ipcs_shm, last); crm_debug("Disconnecting non-blocking r/w client %p...", last); qb_ipcs_disconnect(last); qb_ipcs_connection_unref(last); disconnects++; } disconnects += crm_hash_table_size(client_connections); crm_debug("Disconnecting %d remote clients", crm_hash_table_size(client_connections)); g_hash_table_foreach(client_connections, disconnect_remote_client, NULL); crm_info("Disconnected %d clients", disconnects); } qb_ipcs_stats_get(ipcs_rw, &srv_stats, QB_FALSE); - + if(crm_hash_table_size(client_connections) == 0) { crm_info("All clients disconnected (%d)", srv_stats.active_connections); initiate_exit(); - + } else { crm_info("Waiting on %d clients to disconnect (%d)", crm_hash_table_size(client_connections), srv_stats.active_connections); } } void initiate_exit(void) { int active = 0; xmlNode *leaving = NULL; active = crm_active_peers(); if (active < 2) { terminate_cib(__FUNCTION__, FALSE); return; } crm_info("Sending disconnect notification to %d peers...", active); leaving = create_xml_node(NULL, "exit-notification"); crm_xml_add(leaving, F_TYPE, "cib"); crm_xml_add(leaving, F_CIB_OPERATION, "cib_shutdown_req"); send_cluster_message(NULL, crm_msg_cib, leaving, TRUE); free_xml(leaving); g_timeout_add(crm_get_msec("5s"), cib_force_exit, NULL); } extern int remote_fd; extern int remote_tls_fd; extern void terminate_cs_connection(void); void terminate_cib(const char *caller, gboolean fast) { if (remote_fd > 0) { close(remote_fd); remote_fd = 0; } if (remote_tls_fd > 0) { close(remote_tls_fd); remote_tls_fd = 0; } - + if(!fast) { crm_info("%s: Disconnecting from cluster infrastructure", caller); crm_cluster_disconnect(&crm_cluster); } uninitializeCib(); crm_info("%s: Exiting%s...", caller, fast?" fast":mainloop?" from mainloop":""); if(fast == FALSE && mainloop != NULL && g_main_is_running(mainloop)) { g_main_quit(mainloop); } else { qb_ipcs_destroy(ipcs_ro); qb_ipcs_destroy(ipcs_rw); qb_ipcs_destroy(ipcs_shm); if (fast) { crm_exit(EX_USAGE); } else { crm_exit(EX_OK); } } } diff --git a/cib/callbacks.h b/cib/callbacks.h index f2f95cc37b..8a58213aa1 100644 --- a/cib/callbacks.h +++ b/cib/callbacks.h @@ -1,77 +1,78 @@ -/* +/* * Copyright (C) 2004 Andrew Beekhof - * + * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. - * + * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. - * + * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #include #include #include #include #include #include #include +#include #include #ifdef HAVE_GNUTLS_GNUTLS_H # undef KEYFILE # include #endif extern gboolean cib_is_master; extern GHashTable *peer_hash; extern GHashTable *config_hash; -enum cib_notifications +enum cib_notifications { cib_notify_pre = 0x0001, cib_notify_post = 0x0002, cib_notify_replace = 0x0004, cib_notify_confirm = 0x0008, cib_notify_diff = 0x0010, }; typedef struct cib_operation_s { const char *operation; gboolean modifies_cib; gboolean needs_privileges; gboolean needs_quorum; int (*prepare) (xmlNode *, xmlNode **, const char **); int (*cleanup) (int, xmlNode **, xmlNode **); int (*fn) (const char *, int, const char *, xmlNode *, xmlNode *, xmlNode *, xmlNode **, xmlNode **); } cib_operation_t; extern struct qb_ipcs_service_handlers ipc_ro_callbacks; extern struct qb_ipcs_service_handlers ipc_rw_callbacks; extern qb_ipcs_service_t *ipcs_ro; extern qb_ipcs_service_t *ipcs_rw; extern qb_ipcs_service_t *ipcs_shm; extern void cib_peer_callback(xmlNode * msg, void *private_data); extern void cib_client_status_callback(const char *node, const char *client, const char *status, void *private); extern void cib_common_callback_worker(uint32_t id, uint32_t flags, xmlNode * op_request, crm_client_t * cib_client, gboolean privileged); void cib_shutdown(int nsig); void initiate_exit(void); void terminate_cib(const char *caller, gboolean fast); #if SUPPORT_HEARTBEAT extern void cib_ha_peer_callback(HA_Message * msg, void *private_data); extern int cib_ccm_dispatch(gpointer user_data); extern void cib_ccm_msg_callback(oc_ed_t event, void *cookie, size_t size, const void *data); #endif diff --git a/cib/main.c b/cib/main.c index 22053e97fa..d0bee819cd 100644 --- a/cib/main.c +++ b/cib/main.c @@ -1,584 +1,583 @@ -/* +/* * Copyright (C) 2004 Andrew Beekhof - * + * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. - * + * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. - * + * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #include #include #include #include #include #include #include #include #include #include #include #include #include -#include #include #include #include #include #include #include #if HAVE_LIBXML2 # include #endif #ifdef HAVE_GETOPT_H # include #endif #if HAVE_BZLIB_H # include #endif extern int init_remote_listener(int port, gboolean encrypted); extern gboolean stand_alone; gboolean cib_shutdown_flag = FALSE; int cib_status = pcmk_ok; crm_cluster_t crm_cluster; #if SUPPORT_HEARTBEAT oc_ev_t *cib_ev_token; ll_cluster_t *hb_conn = NULL; extern void oc_ev_special(const oc_ev_t *, oc_ev_class_t, int); gboolean cib_register_ha(ll_cluster_t * hb_cluster, const char *client_name); #else void *hb_conn = NULL; #endif extern void terminate_cib(const char *caller, gboolean fast); GMainLoop *mainloop = NULL; const char *cib_root = NULL; char *cib_our_uname = NULL; gboolean preserve_status = FALSE; gboolean cib_writes_enabled = TRUE; int remote_fd = 0; int remote_tls_fd = 0; int cib_init(void); void cib_shutdown(int nsig); gboolean startCib(const char *filename); extern int write_cib_contents(gpointer p); GHashTable *config_hash = NULL; GHashTable *local_notify_queue = NULL; char *channel1 = NULL; char *channel2 = NULL; char *channel3 = NULL; char *channel4 = NULL; char *channel5 = NULL; #define OPTARGS "maswr:V?" void cib_cleanup(void); static void cib_enable_writes(int nsig) { crm_info("(Re)enabling disk writes"); cib_writes_enabled = TRUE; } static void log_cib_client(gpointer key, gpointer value, gpointer user_data) { crm_info("Client %s", crm_client_name(value)); } /* *INDENT-OFF* */ static struct crm_option long_options[] = { /* Top-level Options */ {"help", 0, 0, '?', "\tThis text"}, {"verbose", 0, 0, 'V', "\tIncrease debug output"}, {"per-action-cib", 0, 0, 'a', "\tAdvanced use only"}, {"stand-alone", 0, 0, 's', "\tAdvanced use only"}, {"disk-writes", 0, 0, 'w', "\tAdvanced use only"}, {"cib-root", 1, 0, 'r', "\tAdvanced use only"}, {0, 0, 0, 0} }; /* *INDENT-ON* */ int main(int argc, char **argv) { int flag; int rc = 0; int index = 0; int argerr = 0; struct passwd *pwentry = NULL; crm_log_init(NULL, LOG_INFO, TRUE, FALSE, argc, argv, FALSE); crm_set_options(NULL, "[options]", long_options, "Daemon for storing and replicating the cluster configuration"); - + mainloop_add_signal(SIGTERM, cib_shutdown); mainloop_add_signal(SIGPIPE, cib_enable_writes); cib_writer = mainloop_add_trigger(G_PRIORITY_LOW, write_cib_contents, NULL); crm_peer_init(); while (1) { flag = crm_get_option(argc, argv, &index); if (flag == -1) break; switch (flag) { case 'V': crm_bump_log_level(argc, argv); break; case 's': stand_alone = TRUE; preserve_status = TRUE; cib_writes_enabled = FALSE; pwentry = getpwnam(CRM_DAEMON_USER); CRM_CHECK(pwentry != NULL, crm_perror(LOG_ERR, "Invalid uid (%s) specified", CRM_DAEMON_USER); return 100); rc = setgid(pwentry->pw_gid); if (rc < 0) { crm_perror(LOG_ERR, "Could not set group to %d", pwentry->pw_gid); return 100; } rc = setuid(pwentry->pw_uid); if (rc < 0) { crm_perror(LOG_ERR, "Could not set user to %d", pwentry->pw_uid); return 100; } break; case '?': /* Help message */ crm_help(flag, EX_OK); break; case 'w': cib_writes_enabled = TRUE; break; case 'r': cib_root = optarg; break; case 'm': cib_metadata(); return 0; default: ++argerr; break; } } if (argc - optind == 1 && safe_str_eq("metadata", argv[optind])) { cib_metadata(); return 0; } if (optind > argc) { ++argerr; } if (argerr) { crm_help('?', EX_USAGE); } if(cib_root == NULL) { char *path = g_strdup_printf("%s/cib.xml", CRM_CONFIG_DIR); char *legacy = g_strdup_printf("%s/cib.xml", CRM_LEGACY_CONFIG_DIR); if(g_file_test(path, G_FILE_TEST_EXISTS)) { cib_root = CRM_CONFIG_DIR; } else if(g_file_test(legacy, G_FILE_TEST_EXISTS)) { cib_root = CRM_LEGACY_CONFIG_DIR; crm_notice("Using legacy config location: %s", cib_root); } else { cib_root = CRM_CONFIG_DIR; crm_notice("Using new config location: %s", cib_root); } g_free(legacy); g_free(path); } else { crm_notice("Using custom config location: %s", cib_root); } if (crm_is_writable(cib_root, NULL, CRM_DAEMON_USER, CRM_DAEMON_GROUP, FALSE) == FALSE) { crm_err("Bad permissions on %s. Terminating", cib_root); fprintf(stderr, "ERROR: Bad permissions on %s. See logs for details\n", cib_root); fflush(stderr); return 100; } /* read local config file */ rc = cib_init(); CRM_CHECK(crm_hash_table_size(client_connections) == 0, crm_warn("Not all clients gone at exit")); g_hash_table_foreach(client_connections, log_cib_client, NULL); cib_cleanup(); #if SUPPORT_HEARTBEAT if (hb_conn) { hb_conn->llc_ops->delete(hb_conn); } #endif crm_info("Done"); return rc; } void cib_cleanup(void) { crm_peer_destroy(); if (local_notify_queue) { g_hash_table_destroy(local_notify_queue); } crm_client_cleanup(); g_hash_table_destroy(config_hash); free(cib_our_uname); free(channel1); free(channel2); free(channel3); free(channel4); free(channel5); } unsigned long cib_num_ops = 0; const char *cib_stat_interval = "10min"; unsigned long cib_num_local = 0, cib_num_updates = 0, cib_num_fail = 0; unsigned long cib_bad_connects = 0, cib_num_timeouts = 0; #if SUPPORT_HEARTBEAT gboolean ccm_connect(void); static void ccm_connection_destroy(gpointer user_data) { crm_err("CCM connection failed... blocking while we reconnect"); CRM_ASSERT(ccm_connect()); return; } static void *ccm_library = NULL; gboolean ccm_connect(void) { gboolean did_fail = TRUE; int num_ccm_fails = 0; int max_ccm_fails = 30; int ret; int cib_ev_fd; int (*ccm_api_register) (oc_ev_t ** token) = find_library_function(&ccm_library, CCM_LIBRARY, "oc_ev_register", 1); int (*ccm_api_set_callback) (const oc_ev_t * token, oc_ev_class_t class, oc_ev_callback_t * fn, oc_ev_callback_t ** prev_fn) = find_library_function(&ccm_library, CCM_LIBRARY, "oc_ev_set_callback", 1); void (*ccm_api_special) (const oc_ev_t *, oc_ev_class_t, int) = find_library_function(&ccm_library, CCM_LIBRARY, "oc_ev_special", 1); int (*ccm_api_activate) (const oc_ev_t * token, int *fd) = find_library_function(&ccm_library, CCM_LIBRARY, "oc_ev_activate", 1); int (*ccm_api_unregister) (oc_ev_t * token) = find_library_function(&ccm_library, CCM_LIBRARY, "oc_ev_unregister", 1); - static struct mainloop_fd_callbacks ccm_fd_callbacks = + static struct mainloop_fd_callbacks ccm_fd_callbacks = { .dispatch = cib_ccm_dispatch, .destroy = ccm_connection_destroy, }; - + while (did_fail) { did_fail = FALSE; crm_info("Registering with CCM..."); ret = (*ccm_api_register) (&cib_ev_token); if (ret != 0) { did_fail = TRUE; } if (did_fail == FALSE) { crm_trace("Setting up CCM callbacks"); ret = (*ccm_api_set_callback) (cib_ev_token, OC_EV_MEMB_CLASS, cib_ccm_msg_callback, NULL); if (ret != 0) { crm_warn("CCM callback not set"); did_fail = TRUE; } } if (did_fail == FALSE) { (*ccm_api_special) (cib_ev_token, OC_EV_MEMB_CLASS, 0); crm_trace("Activating CCM token"); ret = (*ccm_api_activate) (cib_ev_token, &cib_ev_fd); if (ret != 0) { crm_warn("CCM Activation failed"); did_fail = TRUE; } } if (did_fail) { num_ccm_fails++; (*ccm_api_unregister) (cib_ev_token); if (num_ccm_fails < max_ccm_fails) { crm_warn("CCM Connection failed %d times (%d max)", num_ccm_fails, max_ccm_fails); sleep(3); } else { crm_err("CCM Activation failed %d (max) times", num_ccm_fails); return FALSE; } } } crm_debug("CCM Activation passed... all set to go!"); mainloop_add_fd("heartbeat-ccm", G_PRIORITY_MEDIUM, cib_ev_fd, cib_ev_token, &ccm_fd_callbacks); return TRUE; } #endif #if SUPPORT_COROSYNC static gboolean cib_ais_dispatch(int kind, const char *from, const char *data) { xmlNode *xml = NULL; if (kind == crm_class_cluster) { xml = string2xml(data); if (xml == NULL) { goto bail; } crm_xml_add(xml, F_ORIG, from); /* crm_xml_add_int(xml, F_SEQ, wrapper->id); */ cib_peer_callback(xml, NULL); } free_xml(xml); return TRUE; bail: crm_err("Invalid XML: '%.120s'", data); return TRUE; } static void cib_ais_destroy(gpointer user_data) { if (cib_shutdown_flag) { crm_info("Corosync disconnection complete"); } else { crm_err("Corosync connection lost! Exiting."); terminate_cib(__FUNCTION__, TRUE); } } #endif static void cib_peer_update_callback(enum crm_status_type type, crm_node_t * node, const void *data) { #if 0 /* crm_active_peers(crm_proc_cib) appears to give the wrong answer * sometimes, this might help figure out why */ if(type == crm_status_nstate) { crm_info("status: %s is now %s (was %s)", node->uname, node->state, (const char *)data); if (safe_str_eq(CRMD_JOINSTATE_MEMBER, node->state)) { return; } } else if(type == crm_status_processes) { uint32_t old = 0; if (data) { old = *(const uint32_t *)data; } - + if ((node->processes ^ old) & crm_proc_cib) { crm_info("status: cib process on %s is now %sactive", node->uname, is_set(node->processes, crm_proc_cib)?"":"in"); } else { return; } } else { return; } #endif if(cib_shutdown_flag && crm_active_peers() < 2 && crm_hash_table_size(client_connections) == 0) { crm_info("No more peers"); terminate_cib(__FUNCTION__, FALSE); } } #if SUPPORT_HEARTBEAT static void cib_ha_connection_destroy(gpointer user_data) { if (cib_shutdown_flag) { crm_info("Heartbeat disconnection complete... exiting"); terminate_cib(__FUNCTION__, FALSE); } else { crm_err("Heartbeat connection lost! Exiting."); terminate_cib(__FUNCTION__, TRUE); } } #endif int cib_init(void) { if (is_openais_cluster()) { #if SUPPORT_COROSYNC crm_cluster.destroy = cib_ais_destroy; crm_cluster.cs_dispatch = cib_ais_dispatch; #endif } else if(is_heartbeat_cluster()) { #if SUPPORT_HEARTBEAT crm_cluster.hb_dispatch = cib_ha_peer_callback; crm_cluster.destroy = cib_ha_connection_destroy; #endif } config_hash = g_hash_table_new_full(crm_str_hash, g_str_equal, g_hash_destroy_str, g_hash_destroy_str); if (startCib("cib.xml") == FALSE) { crm_crit("Cannot start CIB... terminating"); crm_exit(1); } if (stand_alone == FALSE) { if (crm_cluster_connect(&crm_cluster) == FALSE) { crm_crit("Cannot sign in to the cluster... terminating"); crm_exit(100); } cib_our_uname = crm_cluster.uname; if (is_openais_cluster()) { crm_set_status_callback(&cib_peer_update_callback); } #if SUPPORT_HEARTBEAT if (is_heartbeat_cluster()) { gboolean was_error = FALSE; hb_conn = crm_cluster.hb_conn; if (was_error == FALSE) { if (HA_OK != hb_conn->llc_ops->set_cstatus_callback(hb_conn, cib_client_status_callback, hb_conn)) { crm_err("Cannot set cstatus callback: %s", hb_conn->llc_ops->errmsg(hb_conn)); was_error = TRUE; } } if (was_error == FALSE) { was_error = (ccm_connect() == FALSE); } if (was_error == FALSE) { /* Async get client status information in the cluster */ crm_info("Requesting the list of configured nodes"); hb_conn->llc_ops->client_status(hb_conn, NULL, CRM_SYSTEM_CIB, -1); } } #endif } else { cib_our_uname = strdup("localhost"); } ipcs_ro = mainloop_add_ipc_server(cib_channel_ro, QB_IPC_NATIVE, &ipc_ro_callbacks); ipcs_rw = mainloop_add_ipc_server(cib_channel_rw, QB_IPC_NATIVE, &ipc_rw_callbacks); ipcs_shm = mainloop_add_ipc_server(cib_channel_shm, QB_IPC_SHM, &ipc_rw_callbacks); if (stand_alone) { cib_is_master = TRUE; } if (ipcs_ro != NULL && ipcs_rw != NULL && ipcs_shm != NULL) { /* Create the mainloop and run it... */ mainloop = g_main_new(FALSE); crm_info("Starting %s mainloop", crm_system_name); g_main_run(mainloop); } else { crm_err("Failed to create IPC servers: shutting down and inhibiting respawn"); crm_exit(100); } qb_ipcs_destroy(ipcs_ro); qb_ipcs_destroy(ipcs_rw); qb_ipcs_destroy(ipcs_shm); return crm_exit(0); } gboolean startCib(const char *filename) { gboolean active = FALSE; xmlNode *cib = readCibXmlFile(cib_root, filename, !preserve_status); CRM_ASSERT(cib != NULL); if (activateCibXml(cib, TRUE, "start") == 0) { int port = 0; const char *port_s = NULL; active = TRUE; cib_read_config(config_hash, cib); port_s = crm_element_value(cib, "remote-tls-port"); if (port_s) { port = crm_parse_int(port_s, "0"); remote_tls_fd = init_remote_listener(port, TRUE); } port_s = crm_element_value(cib, "remote-clear-port"); if (port_s) { port = crm_parse_int(port_s, "0"); remote_fd = init_remote_listener(port, FALSE); } crm_info("CIB Initialization completed successfully"); } return active; } diff --git a/cib/messages.c b/cib/messages.c index 29ad5d7eac..e4afac65d9 100644 --- a/cib/messages.c +++ b/cib/messages.c @@ -1,466 +1,467 @@ -/* +/* * Copyright (C) 2004 Andrew Beekhof - * + * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. - * + * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. - * + * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #include #include #include #include #include #include #include #include #include #include #include #include #include +#include #include #include #include #include #define MAX_DIFF_RETRY 5 #ifdef CIBPIPE gboolean cib_is_master = TRUE; #else gboolean cib_is_master = FALSE; #endif xmlNode *the_cib = NULL; gboolean syncd_once = FALSE; extern const char *cib_our_uname; int revision_check(xmlNode * cib_update, xmlNode * cib_copy, int flags); int get_revision(xmlNode * xml_obj, int cur_revision); int updateList(xmlNode * local_cib, xmlNode * update_command, xmlNode * failed, int operation, const char *section); gboolean check_generation(xmlNode * newCib, xmlNode * oldCib); gboolean update_results(xmlNode * failed, xmlNode * target, const char *operation, int return_code); int cib_update_counter(xmlNode * xml_obj, const char *field, gboolean reset); int sync_our_cib(xmlNode * request, gboolean all); extern xmlNode *cib_msg_copy(const xmlNode * msg, gboolean with_data); extern gboolean cib_shutdown_flag; int cib_process_shutdown_req(const char *op, int options, const char *section, xmlNode * req, xmlNode * input, xmlNode * existing_cib, xmlNode ** result_cib, xmlNode ** answer) { #ifdef CIBPIPE return -EINVAL; #else int result = pcmk_ok; const char *host = crm_element_value(req, F_ORIG); *answer = NULL; if (crm_element_value(req, F_CIB_ISREPLY) == NULL) { crm_info("Shutdown REQ from %s", host); return pcmk_ok; } else if (cib_shutdown_flag) { crm_info("Shutdown ACK from %s", host); terminate_cib(__FUNCTION__, FALSE); return pcmk_ok; } else { crm_err("Shutdown ACK from %s - not shutting down", host); result = -EINVAL; } return result; #endif } int cib_process_default(const char *op, int options, const char *section, xmlNode * req, xmlNode * input, xmlNode * existing_cib, xmlNode ** result_cib, xmlNode ** answer) { int result = pcmk_ok; crm_trace("Processing \"%s\" event", op); *answer = NULL; if (op == NULL) { result = -EINVAL; crm_err("No operation specified"); } else if (strcasecmp(CRM_OP_NOOP, op) == 0) { ; } else { result = -EPROTONOSUPPORT; crm_err("Action [%s] is not supported by the CIB", op); } return result; } int cib_process_quit(const char *op, int options, const char *section, xmlNode * req, xmlNode * input, xmlNode * existing_cib, xmlNode ** result_cib, xmlNode ** answer) { int result = pcmk_ok; crm_trace("Processing \"%s\" event", op); crm_warn("The CRMd has asked us to exit... complying"); crm_exit(0); return result; } int cib_process_readwrite(const char *op, int options, const char *section, xmlNode * req, xmlNode * input, xmlNode * existing_cib, xmlNode ** result_cib, xmlNode ** answer) { #ifdef CIBPIPE return -EINVAL; #else int result = pcmk_ok; crm_trace("Processing \"%s\" event", op); if (safe_str_eq(op, CIB_OP_ISMASTER)) { if (cib_is_master == TRUE) { result = pcmk_ok; } else { result = -EPERM; } return result; } if (safe_str_eq(op, CIB_OP_MASTER)) { if (cib_is_master == FALSE) { crm_info("We are now in R/W mode"); cib_is_master = TRUE; syncd_once = TRUE; } else { crm_debug("We are still in R/W mode"); } } else if (cib_is_master) { crm_info("We are now in R/O mode"); cib_is_master = FALSE; } return result; #endif } int cib_process_ping(const char *op, int options, const char *section, xmlNode * req, xmlNode * input, xmlNode * existing_cib, xmlNode ** result_cib, xmlNode ** answer) { #ifdef CIBPIPE return -EINVAL; #else int result = pcmk_ok; crm_trace("Processing \"%s\" event", op); *answer = create_xml_node(NULL, XML_CRM_TAG_PING); crm_xml_add(*answer, XML_PING_ATTR_STATUS, "ok"); crm_xml_add(*answer, XML_PING_ATTR_SYSFROM, CRM_SYSTEM_CIB); return result; #endif } int cib_process_sync(const char *op, int options, const char *section, xmlNode * req, xmlNode * input, xmlNode * existing_cib, xmlNode ** result_cib, xmlNode ** answer) { #ifdef CIBPIPE return -EINVAL; #else return sync_our_cib(req, TRUE); #endif } int cib_process_sync_one(const char *op, int options, const char *section, xmlNode * req, xmlNode * input, xmlNode * existing_cib, xmlNode ** result_cib, xmlNode ** answer) { #ifdef CIBPIPE return -EINVAL; #else return sync_our_cib(req, FALSE); #endif } int sync_in_progress = 0; int cib_server_process_diff(const char *op, int options, const char *section, xmlNode * req, xmlNode * input, xmlNode * existing_cib, xmlNode ** result_cib, xmlNode ** answer) { int rc = pcmk_ok; if (cib_is_master) { /* the master is never waiting for a resync */ sync_in_progress = 0; } if (sync_in_progress > MAX_DIFF_RETRY) { /* request another full-sync, * the last request may have been lost */ sync_in_progress = 0; } if (sync_in_progress) { int diff_add_updates = 0; int diff_add_epoch = 0; int diff_add_admin_epoch = 0; int diff_del_updates = 0; int diff_del_epoch = 0; int diff_del_admin_epoch = 0; cib_diff_version_details(input, &diff_add_admin_epoch, &diff_add_epoch, &diff_add_updates, &diff_del_admin_epoch, &diff_del_epoch, &diff_del_updates); sync_in_progress++; crm_notice("Not applying diff %d.%d.%d -> %d.%d.%d (sync in progress)", diff_del_admin_epoch, diff_del_epoch, diff_del_updates, diff_add_admin_epoch, diff_add_epoch, diff_add_updates); return -pcmk_err_diff_resync; } rc = cib_process_diff(op, options, section, req, input, existing_cib, result_cib, answer); if (rc == -pcmk_err_diff_resync && cib_is_master == FALSE) { xmlNode *sync_me = create_xml_node(NULL, "sync-me"); free_xml(*result_cib); *result_cib = NULL; crm_info("Requesting re-sync from peer"); sync_in_progress++; crm_xml_add(sync_me, F_TYPE, "cib"); crm_xml_add(sync_me, F_CIB_OPERATION, CIB_OP_SYNC_ONE); crm_xml_add(sync_me, F_CIB_DELEGATED, cib_our_uname); if (send_cluster_message(NULL, crm_msg_cib, sync_me, FALSE) == FALSE) { rc = -ENOTCONN; } free_xml(sync_me); } else if (rc == -pcmk_err_diff_resync) { rc = -pcmk_err_diff_failed; if (options & cib_force_diff) { crm_warn("Not requesting full refresh in R/W mode"); } } return rc; } int cib_process_replace_svr(const char *op, int options, const char *section, xmlNode * req, xmlNode * input, xmlNode * existing_cib, xmlNode ** result_cib, xmlNode ** answer) { const char *tag = crm_element_name(input); int rc = cib_process_replace(op, options, section, req, input, existing_cib, result_cib, answer); if (rc == pcmk_ok && safe_str_eq(tag, XML_TAG_CIB)) { sync_in_progress = 0; } return rc; } static int delete_cib_object(xmlNode * parent, xmlNode * delete_spec) { const char *object_name = NULL; const char *object_id = NULL; xmlNode *equiv_node = NULL; int result = pcmk_ok; if (delete_spec != NULL) { object_name = crm_element_name(delete_spec); } object_id = crm_element_value(delete_spec, XML_ATTR_ID); crm_trace("Processing: <%s id=%s>", crm_str(object_name), crm_str(object_id)); if (delete_spec == NULL) { result = -EINVAL; } else if (parent == NULL) { result = -EINVAL; } else if (object_id == NULL) { /* placeholder object */ equiv_node = find_xml_node(parent, object_name, FALSE); } else { equiv_node = find_entity(parent, object_name, object_id); } if (result != pcmk_ok) { ; /* nothing */ } else if (equiv_node == NULL) { result = pcmk_ok; } else if (xml_has_children(delete_spec) == FALSE) { /* only leaves are deleted */ crm_debug("Removing leaf: <%s id=%s>", crm_str(object_name), crm_str(object_id)); free_xml(equiv_node); equiv_node = NULL; } else { xmlNode *child = NULL; for (child = __xml_first_child(delete_spec); child != NULL; child = __xml_next(child)) { int tmp_result = delete_cib_object(equiv_node, child); /* only the first error is likely to be interesting */ if (tmp_result != pcmk_ok && result == pcmk_ok) { result = tmp_result; } } } return result; } int cib_process_delete_absolute(const char *op, int options, const char *section, xmlNode * req, xmlNode * input, xmlNode * existing_cib, xmlNode ** result_cib, xmlNode ** answer) { xmlNode *failed = NULL; int result = pcmk_ok; xmlNode *update_section = NULL; crm_trace("Processing \"%s\" event for section=%s", op, crm_str(section)); if (safe_str_eq(XML_CIB_TAG_SECTION_ALL, section)) { section = NULL; } else if (safe_str_eq(XML_TAG_CIB, section)) { section = NULL; } else if (safe_str_eq(crm_element_name(input), XML_TAG_CIB)) { section = NULL; } CRM_CHECK(strcasecmp(CIB_OP_DELETE, op) == 0, return -EINVAL); if (input == NULL) { crm_err("Cannot perform modification with no data"); return -EINVAL; } failed = create_xml_node(NULL, XML_TAG_FAILED); update_section = get_object_root(section, *result_cib); result = delete_cib_object(update_section, input); update_results(failed, input, op, result); if (xml_has_children(failed)) { CRM_CHECK(result != pcmk_ok, result = -EINVAL); } if (result != pcmk_ok) { crm_log_xml_err(failed, "CIB Update failures"); *answer = failed; } else { free_xml(failed); } return result; } gboolean check_generation(xmlNode * newCib, xmlNode * oldCib) { if (cib_compare_generation(newCib, oldCib) >= 0) { return TRUE; } crm_warn("Generation from update is older than the existing one"); return FALSE; } #ifndef CIBPIPE int sync_our_cib(xmlNode * request, gboolean all) { int result = pcmk_ok; char *digest = NULL; const char *host = crm_element_value(request, F_ORIG); const char *op = crm_element_value(request, F_CIB_OPERATION); xmlNode *replace_request = cib_msg_copy(request, FALSE); CRM_CHECK(the_cib != NULL,;); CRM_CHECK(replace_request != NULL,;); crm_debug("Syncing CIB to %s", all ? "all peers" : host); if (all == FALSE && host == NULL) { crm_log_xml_err(request, "bad sync"); } /* remove the "all == FALSE" condition * * sync_from was failing, the local client wasnt being notified * because it didnt know it was a reply * setting this does not prevent the other nodes from applying it * if all == TRUE */ if (host != NULL) { crm_xml_add(replace_request, F_CIB_ISREPLY, host); } crm_xml_add(replace_request, F_CIB_OPERATION, CIB_OP_REPLACE); crm_xml_add(replace_request, "original_" F_CIB_OPERATION, op); crm_xml_add(replace_request, F_CIB_GLOBAL_UPDATE, XML_BOOLEAN_TRUE); crm_xml_add(replace_request, XML_ATTR_CRM_VERSION, CRM_FEATURE_SET); digest = calculate_xml_versioned_digest(the_cib, FALSE, TRUE, CRM_FEATURE_SET); crm_xml_add(replace_request, XML_ATTR_DIGEST, digest); add_message_xml(replace_request, F_CIB_CALLDATA, the_cib); if (send_cluster_message(all ? NULL : crm_get_peer(0, host), crm_msg_cib, replace_request, FALSE) == FALSE) { result = -ENOTCONN; } free_xml(replace_request); free(digest); return result; } #endif diff --git a/cib/notify.c b/cib/notify.c index bc01086640..272971948e 100644 --- a/cib/notify.c +++ b/cib/notify.c @@ -1,378 +1,377 @@ -/* +/* * Copyright (C) 2004 Andrew Beekhof - * + * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. - * + * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. - * + * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include -#include int pending_updates = 0; -struct cib_notification_s +struct cib_notification_s { xmlNode *msg; struct iovec *iov; }; void attach_cib_generation(xmlNode * msg, const char *field, xmlNode * a_cib); void do_cib_notify(int options, const char *op, xmlNode * update, int result, xmlNode * result_data, const char *msg_type); static void need_pre_notify(gpointer key, gpointer value, gpointer user_data) { crm_client_t *client = value; if (is_set(client->options, cib_notify_pre)) { gboolean *needed = user_data; *needed = TRUE; } } static void need_post_notify(gpointer key, gpointer value, gpointer user_data) { crm_client_t *client = value; if (is_set(client->options, cib_notify_post)) { gboolean *needed = user_data; *needed = TRUE; } } static gboolean cib_notify_send_one(gpointer key, gpointer value, gpointer user_data) { const char *type = NULL; gboolean do_send = FALSE; crm_client_t *client = value; struct cib_notification_s *update = user_data; CRM_CHECK(client != NULL, return TRUE); CRM_CHECK(update != NULL, return TRUE); if (client->ipcs == NULL && client->remote == NULL) { crm_warn("Skipping client with NULL channel"); return FALSE; } type = crm_element_value(update->msg, F_SUBTYPE); CRM_LOG_ASSERT(type != NULL); if (is_set(client->options, cib_notify_diff) && safe_str_eq(type, T_CIB_DIFF_NOTIFY)) { do_send = TRUE; } else if (is_set(client->options, cib_notify_replace) && safe_str_eq(type, T_CIB_REPLACE_NOTIFY)) { do_send = TRUE; } else if (is_set(client->options, cib_notify_confirm) && safe_str_eq(type, T_CIB_UPDATE_CONFIRM)) { do_send = TRUE; } else if (is_set(client->options, cib_notify_pre) && safe_str_eq(type, T_CIB_PRE_NOTIFY)) { do_send = TRUE; } else if (is_set(client->options, cib_notify_post) && safe_str_eq(type, T_CIB_POST_NOTIFY)) { do_send = TRUE; } if (do_send) { switch(client->kind) { case CRM_CLIENT_IPC: if(crm_ipcs_sendv(client, update->iov, crm_ipc_server_event) < 0) { crm_warn("Notification of client %s/%s failed", client->name, client->id); } break; # ifdef HAVE_GNUTLS_GNUTLS_H case CRM_CLIENT_TLS: # endif case CRM_CLIENT_TCP: crm_debug("Sent %s notification to client %s/%s", type, client->name, client->id); crm_remote_send(client->remote, update->msg); break; default: crm_err("Unknown transport %d for %s", client->kind, client->name); } } return FALSE; } static void cib_notify_send(xmlNode *xml) { struct iovec *iov; struct cib_notification_s update; ssize_t rc = crm_ipcs_prepare(0, xml, &iov); crm_trace("Notifying clients"); if(rc > 0) { update.msg = xml; update.iov = iov; g_hash_table_foreach_remove(client_connections, cib_notify_send_one, &update); } else { crm_notice("Notification failed: %s (%d)", pcmk_strerror(rc), rc); } - + if(iov) { free(iov[0].iov_base); free(iov[1].iov_base); free(iov); } crm_trace("Notify complete"); } void cib_pre_notify(int options, const char *op, xmlNode * existing, xmlNode * update) { xmlNode *update_msg = NULL; const char *type = NULL; const char *id = NULL; gboolean needed = FALSE; g_hash_table_foreach(client_connections, need_pre_notify, &needed); if (needed == FALSE) { return; } /* TODO: consider pre-notification for removal */ update_msg = create_xml_node(NULL, "pre-notify"); if (update != NULL) { id = crm_element_value(update, XML_ATTR_ID); } crm_xml_add(update_msg, F_TYPE, T_CIB_NOTIFY); crm_xml_add(update_msg, F_SUBTYPE, T_CIB_PRE_NOTIFY); crm_xml_add(update_msg, F_CIB_OPERATION, op); if (id != NULL) { crm_xml_add(update_msg, F_CIB_OBJID, id); } if (update != NULL) { crm_xml_add(update_msg, F_CIB_OBJTYPE, crm_element_name(update)); } else if (existing != NULL) { crm_xml_add(update_msg, F_CIB_OBJTYPE, crm_element_name(existing)); } type = crm_element_value(update_msg, F_CIB_OBJTYPE); attach_cib_generation(update_msg, "cib_generation", the_cib); if (existing != NULL) { add_message_xml(update_msg, F_CIB_EXISTING, existing); } if (update != NULL) { add_message_xml(update_msg, F_CIB_UPDATE, update); } cib_notify_send(update_msg); if (update == NULL) { crm_trace("Performing operation %s (on section=%s)", op, type); } else { crm_trace("Performing %s on <%s%s%s>", op, type, id ? " id=" : "", id ? id : ""); } free_xml(update_msg); } void cib_post_notify(int options, const char *op, xmlNode * update, int result, xmlNode * new_obj) { gboolean needed = FALSE; g_hash_table_foreach(client_connections, need_post_notify, &needed); if (needed == FALSE) { return; } do_cib_notify(options, op, update, result, new_obj, T_CIB_UPDATE_CONFIRM); } void cib_diff_notify(int options, const char *client, const char *call_id, const char *op, xmlNode * update, int result, xmlNode * diff) { int add_updates = 0; int add_epoch = 0; int add_admin_epoch = 0; int del_updates = 0; int del_epoch = 0; int del_admin_epoch = 0; int log_level = LOG_DEBUG_2; if (diff == NULL) { return; } if (result != pcmk_ok) { log_level = LOG_WARNING; } cib_diff_version_details(diff, &add_admin_epoch, &add_epoch, &add_updates, &del_admin_epoch, &del_epoch, &del_updates); if (add_updates != del_updates) { do_crm_log(log_level, "Update (client: %s%s%s): %d.%d.%d -> %d.%d.%d (%s)", client, call_id ? ", call:" : "", call_id ? call_id : "", del_admin_epoch, del_epoch, del_updates, add_admin_epoch, add_epoch, add_updates, pcmk_strerror(result)); } else if (diff != NULL) { do_crm_log(log_level, "Local-only Change (client:%s%s%s): %d.%d.%d (%s)", client, call_id ? ", call: " : "", call_id ? call_id : "", add_admin_epoch, add_epoch, add_updates, pcmk_strerror(result)); } do_cib_notify(options, op, update, result, diff, T_CIB_DIFF_NOTIFY); } void do_cib_notify(int options, const char *op, xmlNode * update, int result, xmlNode * result_data, const char *msg_type) { xmlNode *update_msg = NULL; const char *id = NULL; update_msg = create_xml_node(NULL, "notify"); if (result_data != NULL) { id = crm_element_value(result_data, XML_ATTR_ID); } crm_xml_add(update_msg, F_TYPE, T_CIB_NOTIFY); crm_xml_add(update_msg, F_SUBTYPE, msg_type); crm_xml_add(update_msg, F_CIB_OPERATION, op); crm_xml_add_int(update_msg, F_CIB_RC, result); if (id != NULL) { crm_xml_add(update_msg, F_CIB_OBJID, id); } if (update != NULL) { crm_trace("Setting type to update->name: %s", crm_element_name(update)); crm_xml_add(update_msg, F_CIB_OBJTYPE, crm_element_name(update)); } else if (result_data != NULL) { crm_trace("Setting type to new_obj->name: %s", crm_element_name(result_data)); crm_xml_add(update_msg, F_CIB_OBJTYPE, crm_element_name(result_data)); } else { crm_trace("Not Setting type"); } attach_cib_generation(update_msg, "cib_generation", the_cib); if (update != NULL) { add_message_xml(update_msg, F_CIB_UPDATE, update); } if (result_data != NULL) { add_message_xml(update_msg, F_CIB_UPDATE_RESULT, result_data); } cib_notify_send(update_msg); free_xml(update_msg); } void attach_cib_generation(xmlNode * msg, const char *field, xmlNode * a_cib) { xmlNode *generation = create_xml_node(NULL, XML_CIB_TAG_GENERATION_TUPPLE); if (a_cib != NULL) { copy_in_properties(generation, a_cib); } add_message_xml(msg, field, generation); free_xml(generation); } void cib_replace_notify(const char *origin, xmlNode * update, int result, xmlNode * diff) { xmlNode *replace_msg = NULL; int add_updates = 0; int add_epoch = 0; int add_admin_epoch = 0; int del_updates = 0; int del_epoch = 0; int del_admin_epoch = 0; if (diff == NULL) { return; } cib_diff_version_details(diff, &add_admin_epoch, &add_epoch, &add_updates, &del_admin_epoch, &del_epoch, &del_updates); if(del_updates < 0) { crm_log_xml_debug(diff, "Bad replace diff"); } if (add_updates != del_updates) { crm_info("Replaced: %d.%d.%d -> %d.%d.%d from %s", del_admin_epoch, del_epoch, del_updates, add_admin_epoch, add_epoch, add_updates, crm_str(origin)); } else if (diff != NULL) { crm_info("Local-only Replace: %d.%d.%d from %s", add_admin_epoch, add_epoch, add_updates, crm_str(origin)); } replace_msg = create_xml_node(NULL, "notify-replace"); crm_xml_add(replace_msg, F_TYPE, T_CIB_NOTIFY); crm_xml_add(replace_msg, F_SUBTYPE, T_CIB_REPLACE_NOTIFY); crm_xml_add(replace_msg, F_CIB_OPERATION, CIB_OP_REPLACE); crm_xml_add_int(replace_msg, F_CIB_RC, result); attach_cib_generation(replace_msg, "cib-replace-generation", update); crm_log_xml_trace(replace_msg, "CIB Replaced"); cib_notify_send(replace_msg); free_xml(replace_msg); } diff --git a/crmd/crmd_fsa.h b/crmd/crmd_fsa.h index 37de62a7f7..2550a0d8c2 100644 --- a/crmd/crmd_fsa.h +++ b/crmd/crmd_fsa.h @@ -1,134 +1,134 @@ -/* +/* * Copyright (C) 2004 Andrew Beekhof - * + * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. - * + * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. - * + * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #ifndef CRMD_FSA__H # define CRMD_FSA__H # include # include # include # include # include # include -# include +# include # if SUPPORT_HEARTBEAT extern ll_cluster_t *fsa_cluster_conn; # endif /* copy from struct client_child in heartbeat.h * * Plus a couple of other things */ struct crm_subsystem_s { pid_t pid; /* Process id of child process */ const char *name; /* executable name */ const char *path; /* Command location */ const char *command; /* Command with path */ const char *args; /* Command arguments */ crm_client_t *client; /* Client connection object */ gboolean sent_kill; mainloop_io_t *source; /* How can we communicate with it */ long long flag_connected; /* */ long long flag_required; /* */ }; typedef struct fsa_timer_s fsa_timer_t; struct fsa_timer_s { guint source_id; /* timer source id */ int period_ms; /* timer period */ enum crmd_fsa_input fsa_input; gboolean(*callback) (gpointer data); gboolean repeat; int counter; }; enum fsa_data_type { fsa_dt_none, fsa_dt_ha_msg, fsa_dt_xml, fsa_dt_lrm, }; typedef struct fsa_data_s fsa_data_t; struct fsa_data_s { int id; enum crmd_fsa_input fsa_input; enum crmd_fsa_cause fsa_cause; long long actions; const char *origin; void *data; enum fsa_data_type data_type; }; extern enum crmd_fsa_state s_crmd_fsa(enum crmd_fsa_cause cause); /* Global FSA stuff */ extern volatile gboolean do_fsa_stall; extern volatile enum crmd_fsa_state fsa_state; extern volatile long long fsa_input_register; extern volatile long long fsa_actions; extern cib_t *fsa_cib_conn; extern char *fsa_our_uname; extern char *fsa_our_uuid; extern char *fsa_pe_ref; /* the last invocation of the PE */ extern char *fsa_our_dc; extern char *fsa_our_dc_version; extern GListPtr fsa_message_queue; extern fsa_timer_t *election_trigger; /* */ extern fsa_timer_t *election_timeout; /* */ extern fsa_timer_t *shutdown_escalation_timer; /* */ extern fsa_timer_t *transition_timer; extern fsa_timer_t *integration_timer; extern fsa_timer_t *finalization_timer; extern fsa_timer_t *wait_timer; extern fsa_timer_t *recheck_timer; extern crm_trigger_t *fsa_source; extern crm_trigger_t *config_read; extern struct crm_subsystem_s *cib_subsystem; extern struct crm_subsystem_s *te_subsystem; extern struct crm_subsystem_s *pe_subsystem; extern GHashTable *welcomed_nodes; extern GHashTable *integrated_nodes; extern GHashTable *finalized_nodes; extern GHashTable *confirmed_nodes; extern GHashTable *crmd_peer_state; /* these two should be moved elsewhere... */ extern void do_update_cib_nodes(gboolean overwrite, const char *caller); extern gboolean do_dc_heartbeat(gpointer data); # define AM_I_DC is_set(fsa_input_register, R_THE_DC) # define AM_I_OPERATIONAL (is_set(fsa_input_register, R_STARTING)==FALSE) extern unsigned long long saved_ccm_membership_id; extern gboolean ever_had_quorum; # include # include #define trigger_fsa(source) crm_trace("Triggering FSA: %s", __FUNCTION__); \ mainloop_set_trigger(source); #endif diff --git a/crmd/crmd_messages.h b/crmd/crmd_messages.h index a4837bbd53..50a56cd35d 100644 --- a/crmd/crmd_messages.h +++ b/crmd/crmd_messages.h @@ -1,111 +1,111 @@ -/* +/* * Copyright (C) 2004 Andrew Beekhof - * + * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. - * + * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. - * + * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #ifndef XML_CRM_MESSAGES__H # define XML_CRM_MESSAGES__H # include -# include +# include # include # include # include typedef struct ha_msg_input_s { xmlNode *msg; xmlNode *xml; } ha_msg_input_t; extern ha_msg_input_t *new_ha_msg_input(xmlNode * orig); extern void delete_ha_msg_input(ha_msg_input_t * orig); extern void *fsa_typed_data_adv(fsa_data_t * fsa_data, enum fsa_data_type a_type, const char *caller); # define fsa_typed_data(x) fsa_typed_data_adv(msg_data, x, __FUNCTION__) extern void register_fsa_error_adv(enum crmd_fsa_cause cause, enum crmd_fsa_input input, fsa_data_t * cur_data, void *new_data, const char *raised_from); # define register_fsa_error(cause, input, new_data) register_fsa_error_adv(cause, input, msg_data, new_data, __FUNCTION__) extern int register_fsa_input_adv(enum crmd_fsa_cause cause, enum crmd_fsa_input input, void *data, long long with_actions, gboolean prepend, const char *raised_from); extern void fsa_dump_queue(int log_level); extern void route_message(enum crmd_fsa_cause cause, xmlNode * input); # define crmd_fsa_stall(suppress) do { \ if(suppress == FALSE && msg_data != NULL) { \ register_fsa_input_adv( \ ((fsa_data_t*)msg_data)->fsa_cause, I_WAIT_FOR_EVENT, \ ((fsa_data_t*)msg_data)->data, action, TRUE, __FUNCTION__); \ } else { \ register_fsa_input_adv( \ C_FSA_INTERNAL, I_WAIT_FOR_EVENT, \ NULL, action, TRUE, __FUNCTION__); \ } \ } while(0) # define register_fsa_input(cause, input, data) register_fsa_input_adv(cause, input, data, A_NOTHING, FALSE, __FUNCTION__) # define register_fsa_action(action) { \ fsa_actions |= action; \ if(fsa_source) { \ mainloop_set_trigger(fsa_source); \ } \ crm_debug("%s added action %s to the FSA", \ __FUNCTION__, fsa_action2string(action)); \ } # define register_fsa_input_before(cause, input, data) register_fsa_input_adv(cause, input, data, A_NOTHING, TRUE, __FUNCTION__) # define register_fsa_input_later(cause, input, data) register_fsa_input_adv(cause, input, data, A_NOTHING, FALSE, __FUNCTION__) void delete_fsa_input(fsa_data_t * fsa_data); GListPtr put_message(fsa_data_t * new_message); fsa_data_t *get_message(void); gboolean is_message(void); gboolean have_wait_message(void); extern gboolean relay_message(xmlNode * relay_message, gboolean originated_locally); extern void process_message(xmlNode * msg, gboolean originated_locally, const char *src_node_name); extern gboolean crm_dc_process_message(xmlNode * whole_message, xmlNode * action, const char *host_from, const char *sys_from, const char *sys_to, const char *op, gboolean dc_mode); extern gboolean send_msg_via_ipc(xmlNode * msg, const char *sys); extern gboolean add_pending_outgoing_reply(const char *originating_node_name, const char *crm_msg_reference, const char *sys_to, const char *sys_from); extern gboolean crmd_authorize_message(xmlNode * client_msg, crm_client_t * curr_client); extern gboolean send_request(xmlNode * msg, char **msg_reference); extern enum crmd_fsa_input handle_message(xmlNode * stored_msg); extern ha_msg_input_t *copy_ha_msg_input(ha_msg_input_t * orig); #endif diff --git a/include/crm/common/mainloop.h b/include/crm/common/mainloop.h index 144e028ef3..d08fb0761b 100644 --- a/include/crm/common/mainloop.h +++ b/include/crm/common/mainloop.h @@ -1,106 +1,106 @@ /* * Copyright (C) 2009 Andrew Beekhof * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #ifndef CRM_COMMON_MAINLOOP__H # define CRM_COMMON_MAINLOOP__H /** * \file * \brief Wrappers for and extensions to glib mainloop * \ingroup core */ # include typedef struct trigger_s crm_trigger_t; typedef struct mainloop_io_s mainloop_io_t; typedef struct mainloop_child_s mainloop_child_t; crm_trigger_t *mainloop_add_trigger(int priority, int(*dispatch) (gpointer user_data), gpointer userdata); void mainloop_set_trigger(crm_trigger_t * source); void mainloop_trigger_complete(crm_trigger_t *trig); gboolean mainloop_destroy_trigger(crm_trigger_t * source); gboolean crm_signal(int sig, void (*dispatch) (int sig)); gboolean mainloop_add_signal(int sig, void (*dispatch) (int sig)); gboolean mainloop_destroy_signal(int sig); #include -#include +#include struct ipc_client_callbacks { int (*dispatch)(const char *buffer, ssize_t length, gpointer userdata); void (*destroy) (gpointer); }; qb_ipcs_service_t *mainloop_add_ipc_server( const char *name, enum qb_ipc_type type, struct qb_ipcs_service_handlers *callbacks); void mainloop_del_ipc_server(qb_ipcs_service_t *server); mainloop_io_t *mainloop_add_ipc_client( const char *name, int priority, size_t max_size, void *userdata, struct ipc_client_callbacks *callbacks); void mainloop_del_ipc_client(mainloop_io_t *client); crm_ipc_t *mainloop_get_ipc_client(mainloop_io_t *client); struct mainloop_fd_callbacks { int (*dispatch)(gpointer userdata); void (*destroy)(gpointer userdata); }; mainloop_io_t *mainloop_add_fd( const char *name, int priority, int fd, void *userdata, struct mainloop_fd_callbacks *callbacks); void mainloop_del_fd(mainloop_io_t *client); /* * Create a new tracked process * To track a process group, use -pid */ void mainloop_add_child(pid_t pid, int timeout, const char *desc, void *userdata, void (*callback)(mainloop_child_t* p, int status, int signo, int exitcode)); void * mainloop_get_child_userdata(mainloop_child_t *child); int mainloop_get_child_timeout(mainloop_child_t *child); pid_t mainloop_get_child_pid(mainloop_child_t *child); void mainloop_clear_child_userdata(mainloop_child_t *child); #define G_PRIORITY_MEDIUM (G_PRIORITY_HIGH/2) #endif diff --git a/lib/cib/cib_remote.c b/lib/cib/cib_remote.c index a19da594b0..0c19724e36 100644 --- a/lib/cib/cib_remote.c +++ b/lib/cib/cib_remote.c @@ -1,619 +1,619 @@ /* * Copyright (c) 2008 Andrew Beekhof * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. - * + * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. - * + * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA * */ #include #include #include #include #include #include #include #include #include #include #include #include #include -#include +#include #include #ifdef HAVE_GNUTLS_GNUTLS_H # undef KEYFILE # include gnutls_anon_client_credentials anon_cred_c; #define DEFAULT_CLIENT_HANDSHAKE_TIMEOUT 5000 /* 5 seconds */ const int kx_prio[] = { GNUTLS_KX_ANON_DH, 0 }; static gboolean remote_gnutls_credentials_init = FALSE; #else typedef void gnutls_session; #endif #include #include #define DH_BITS 1024 typedef struct cib_remote_opaque_s { int flags; int socket; int port; char *server; char *user; char *passwd; gboolean encrypted; crm_remote_t command; crm_remote_t callback; } cib_remote_opaque_t; void cib_remote_connection_destroy(gpointer user_data); int cib_remote_callback_dispatch(gpointer user_data); int cib_remote_command_dispatch(gpointer user_data); int cib_remote_signon(cib_t * cib, const char *name, enum cib_conn_type type); int cib_remote_signoff(cib_t * cib); int cib_remote_free(cib_t * cib); int cib_remote_perform_op(cib_t * cib, const char *op, const char *host, const char *section, xmlNode * data, xmlNode ** output_data, int call_options, const char *name); static int cib_remote_inputfd(cib_t * cib) { cib_remote_opaque_t *private = cib->variant_opaque; return private->callback.tcp_socket; } static int cib_remote_set_connection_dnotify(cib_t * cib, void (*dnotify) (gpointer user_data)) { return -EPROTONOSUPPORT; } static int cib_remote_register_notification(cib_t * cib, const char *callback, int enabled) { xmlNode *notify_msg = create_xml_node(NULL, "cib_command"); cib_remote_opaque_t *private = cib->variant_opaque; crm_xml_add(notify_msg, F_CIB_OPERATION, T_CIB_NOTIFY); crm_xml_add(notify_msg, F_CIB_NOTIFY_TYPE, callback); crm_xml_add_int(notify_msg, F_CIB_NOTIFY_ACTIVATE, enabled); crm_remote_send(&private->callback, notify_msg); free_xml(notify_msg); return pcmk_ok; } cib_t * cib_remote_new(const char *server, const char *user, const char *passwd, int port, gboolean encrypted) { cib_remote_opaque_t *private = NULL; cib_t *cib = cib_new_variant(); private = calloc(1, sizeof(cib_remote_opaque_t)); cib->variant = cib_remote; cib->variant_opaque = private; if (server) { private->server = strdup(server); } if (user) { private->user = strdup(user); } if (passwd) { private->passwd = strdup(passwd); } private->port = port; private->encrypted = encrypted; /* assign variant specific ops */ cib->delegate_fn = cib_remote_perform_op; cib->cmds->signon = cib_remote_signon; cib->cmds->signoff = cib_remote_signoff; cib->cmds->free = cib_remote_free; cib->cmds->inputfd = cib_remote_inputfd; cib->cmds->register_notification = cib_remote_register_notification; cib->cmds->set_connection_dnotify = cib_remote_set_connection_dnotify; return cib; } static int cib_tls_close(cib_t * cib) { cib_remote_opaque_t *private = cib->variant_opaque; #ifdef HAVE_GNUTLS_GNUTLS_H if (private->encrypted) { if (private->command.tls_session) { gnutls_bye(*(private->command.tls_session), GNUTLS_SHUT_RDWR); gnutls_deinit(*(private->command.tls_session)); gnutls_free(private->command.tls_session); } if (private->callback.tls_session) { gnutls_bye(*(private->callback.tls_session), GNUTLS_SHUT_RDWR); gnutls_deinit(*(private->callback.tls_session)); gnutls_free(private->callback.tls_session); } private->command.tls_session = NULL; private->callback.tls_session = NULL; if (remote_gnutls_credentials_init) { gnutls_anon_free_client_credentials(anon_cred_c); gnutls_global_deinit(); remote_gnutls_credentials_init = FALSE; } } #endif if (private->command.tcp_socket) { shutdown(private->command.tcp_socket, SHUT_RDWR); /* no more receptions */ close(private->command.tcp_socket); } if (private->callback.tcp_socket) { shutdown(private->callback.tcp_socket, SHUT_RDWR); /* no more receptions */ close(private->callback.tcp_socket); } private->command.tcp_socket = 0; private->callback.tcp_socket = 0; free(private->command.buffer); free(private->callback.buffer); private->command.buffer = NULL; private->callback.buffer = NULL; return 0; } static int cib_tls_signon(cib_t * cib, crm_remote_t *connection, gboolean event_channel) { int sock; cib_remote_opaque_t *private = cib->variant_opaque; int rc = 0; int disconnected = 0; xmlNode *answer = NULL; xmlNode *login = NULL; static struct mainloop_fd_callbacks cib_fd_callbacks = { 0, }; cib_fd_callbacks.dispatch = event_channel ? cib_remote_callback_dispatch : cib_remote_command_dispatch; cib_fd_callbacks.destroy = cib_remote_connection_destroy; connection->tcp_socket = 0; #ifdef HAVE_GNUTLS_GNUTLS_H connection->tls_session = NULL; #endif sock = crm_remote_tcp_connect(private->server, private->port); if (sock <= 0) { crm_perror(LOG_ERR, "remote tcp connection to %s:%d failed", private->server, private->port); } connection->tcp_socket = sock; if (private->encrypted) { /* initialize GnuTls lib */ #ifdef HAVE_GNUTLS_GNUTLS_H if (remote_gnutls_credentials_init == FALSE) { gnutls_global_init(); gnutls_anon_allocate_client_credentials(&anon_cred_c); remote_gnutls_credentials_init = TRUE; } /* bind the socket to GnuTls lib */ connection->tls_session = crm_create_anon_tls_session(sock, GNUTLS_CLIENT, anon_cred_c); if (crm_initiate_client_tls_handshake(connection, DEFAULT_CLIENT_HANDSHAKE_TIMEOUT) != 0) { crm_err("Session creation for %s:%d failed", private->server, private->port); gnutls_deinit(*connection->tls_session); gnutls_free(connection->tls_session); connection->tls_session = NULL; cib_tls_close(cib); return -1; } #else return -EPROTONOSUPPORT; #endif } /* login to server */ login = create_xml_node(NULL, "cib_command"); crm_xml_add(login, "op", "authenticate"); crm_xml_add(login, "user", private->user); crm_xml_add(login, "password", private->passwd); crm_xml_add(login, "hidden", "password"); crm_remote_send(connection, login); free_xml(login); crm_remote_recv(connection, -1, &disconnected); if (disconnected) { rc = -ENOTCONN; } answer = crm_remote_parse_buffer(connection); crm_log_xml_trace(answer, "Reply"); if (answer == NULL) { rc = -EPROTO; } else { /* grab the token */ const char *msg_type = crm_element_value(answer, F_CIB_OPERATION); const char *tmp_ticket = crm_element_value(answer, F_CIB_CLIENTID); if (safe_str_neq(msg_type, CRM_OP_REGISTER)) { crm_err("Invalid registration message: %s", msg_type); rc = -EPROTO; } else if (tmp_ticket == NULL) { rc = -EPROTO; } else { connection->token = strdup(tmp_ticket); } } free_xml(answer); answer = NULL; if (rc != 0) { cib_tls_close(cib); return rc; } crm_trace("remote client connection established"); connection->source = mainloop_add_fd("cib-remote", G_PRIORITY_HIGH, connection->tcp_socket, cib, &cib_fd_callbacks); return rc; } void cib_remote_connection_destroy(gpointer user_data) { crm_err("Connection destroyed"); #ifdef HAVE_GNUTLS_GNUTLS_H cib_tls_close(user_data); #endif return; } int cib_remote_command_dispatch(gpointer user_data) { int disconnected = 0; cib_t *cib = user_data; cib_remote_opaque_t *private = cib->variant_opaque; crm_remote_recv(&private->command, -1, &disconnected); free(private->command.buffer); private->command.buffer = NULL; crm_err("received late reply for remote cib connection, discarding"); if (disconnected) { return -1; } return 0; } int cib_remote_callback_dispatch(gpointer user_data) { cib_t *cib = user_data; cib_remote_opaque_t *private = cib->variant_opaque; xmlNode *msg = NULL; int disconnected = 0; crm_info("Message on callback channel"); crm_remote_recv(&private->callback, -1, &disconnected); msg = crm_remote_parse_buffer(&private->callback); while (msg) { const char *type = crm_element_value(msg, F_TYPE); crm_trace("Activating %s callbacks...", type); if (safe_str_eq(type, T_CIB)) { cib_native_callback(cib, msg, 0, 0); } else if (safe_str_eq(type, T_CIB_NOTIFY)) { g_list_foreach(cib->notify_list, cib_native_notify, msg); } else { crm_err("Unknown message type: %s", type); } free_xml(msg); msg = crm_remote_parse_buffer(&private->callback); } if (disconnected) { return -1; } return 0; } int cib_remote_signon(cib_t * cib, const char *name, enum cib_conn_type type) { int rc = pcmk_ok; cib_remote_opaque_t *private = cib->variant_opaque; if (private->passwd == NULL) { struct termios settings; int rc; rc = tcgetattr(0, &settings); settings.c_lflag &= ~ECHO; rc = tcsetattr(0, TCSANOW, &settings); fprintf(stderr, "Password: "); private->passwd = calloc(1, 1024); rc = scanf("%s", private->passwd); fprintf(stdout, "\n"); /* fprintf(stderr, "entered: '%s'\n", buffer); */ if (rc < 1) { private->passwd = NULL; } settings.c_lflag |= ECHO; rc = tcsetattr(0, TCSANOW, &settings); } if (private->server == NULL || private->user == NULL) { rc = -EINVAL; } if (rc == pcmk_ok) { rc = cib_tls_signon(cib, &(private->command), FALSE); } if (rc == pcmk_ok) { rc = cib_tls_signon(cib, &(private->callback), TRUE); } if (rc == pcmk_ok) { xmlNode *hello = cib_create_op(0, private->callback.token, CRM_OP_REGISTER, NULL, NULL, NULL, 0, NULL); crm_xml_add(hello, F_CIB_CLIENTNAME, name); crm_remote_send(&private->command, hello); free_xml(hello); } if (rc == pcmk_ok) { crm_notice("%s: Opened connection to %s:%d\n", name, private->server, private->port); cib->state = cib_connected_command; cib->type = cib_command; } else { fprintf(stderr, "%s: Connection to %s:%d failed: %s\n", name, private->server, private->port, pcmk_strerror(rc)); } return rc; } int cib_remote_signoff(cib_t * cib) { int rc = pcmk_ok; /* cib_remote_opaque_t *private = cib->variant_opaque; */ crm_debug("Signing out of the CIB Service"); #ifdef HAVE_GNUTLS_GNUTLS_H cib_tls_close(cib); #endif cib->state = cib_disconnected; cib->type = cib_none; return rc; } int cib_remote_free(cib_t * cib) { int rc = pcmk_ok; crm_warn("Freeing CIB"); if (cib->state != cib_disconnected) { rc = cib_remote_signoff(cib); if (rc == pcmk_ok) { cib_remote_opaque_t *private = cib->variant_opaque; free(private->server); free(private->user); free(private->passwd); free(cib->cmds); free(private); free(cib); } } return rc; } int cib_remote_perform_op(cib_t * cib, const char *op, const char *host, const char *section, xmlNode * data, xmlNode ** output_data, int call_options, const char *name) { int rc = pcmk_ok; int disconnected = 0; int remaining_time = 0; time_t start_time; xmlNode *op_msg = NULL; xmlNode *op_reply = NULL; cib_remote_opaque_t *private = cib->variant_opaque; if (cib->state == cib_disconnected) { return -ENOTCONN; } if (output_data != NULL) { *output_data = NULL; } if (op == NULL) { crm_err("No operation specified"); return -EINVAL; } cib->call_id++; /* prevent call_id from being negative (or zero) and conflicting * with the cib_errors enum * use 2 because we use it as (cib->call_id - 1) below */ if (cib->call_id < 1) { cib->call_id = 1; } op_msg = cib_create_op(cib->call_id, private->callback.token, op, host, section, data, call_options, NULL); if (op_msg == NULL) { return -EPROTO; } crm_trace("Sending %s message to CIB service", op); if (!(call_options & cib_sync_call)) { crm_remote_send(&private->callback, op_msg); } else { crm_remote_send(&private->command, op_msg); } free_xml(op_msg); if ((call_options & cib_discard_reply)) { crm_trace("Discarding reply"); return pcmk_ok; } else if (!(call_options & cib_sync_call)) { return cib->call_id; } crm_trace("Waiting for a syncronous reply"); start_time = time(NULL); remaining_time = cib->call_timeout ? cib->call_timeout : 60; while (remaining_time > 0 && !disconnected) { int reply_id = -1; int msg_id = cib->call_id; crm_remote_recv(&private->command, remaining_time * 1000, &disconnected); op_reply = crm_remote_parse_buffer(&private->command); if (!op_reply) { break; } crm_element_value_int(op_reply, F_CIB_CALLID, &reply_id); if (reply_id == msg_id) { break; } else if (reply_id < msg_id) { crm_debug("Received old reply: %d (wanted %d)", reply_id, msg_id); crm_log_xml_trace(op_reply, "Old reply"); } else if ((reply_id - 10000) > msg_id) { /* wrap-around case */ crm_debug("Received old reply: %d (wanted %d)", reply_id, msg_id); crm_log_xml_trace(op_reply, "Old reply"); } else { crm_err("Received a __future__ reply:" " %d (wanted %d)", reply_id, msg_id); } free_xml(op_reply); op_reply = NULL; /* wasn't the right reply, try and read some more */ remaining_time = time(NULL) - start_time; } /* if(IPC_ISRCONN(native->command_channel) == FALSE) { */ /* crm_err("CIB disconnected: %d", */ /* native->command_channel->ch_status); */ /* cib->state = cib_disconnected; */ /* } */ if (disconnected) { crm_err("Disconnected while waiting for reply."); return -ENOTCONN; } else if (op_reply == NULL) { crm_err("No reply message - empty"); return -ENOMSG; } crm_trace("Syncronous reply received"); /* Start processing the reply... */ if (crm_element_value_int(op_reply, F_CIB_RC, &rc) != 0) { rc = -EPROTO; } if (rc == -pcmk_err_diff_resync) { /* This is an internal value that clients do not and should not care about */ rc = pcmk_ok; } if (rc == pcmk_ok || rc == -EPERM) { crm_log_xml_debug(op_reply, "passed"); } else { /* } else if(rc == -ETIME) { */ crm_err("Call failed: %s", pcmk_strerror(rc)); crm_log_xml_warn(op_reply, "failed"); } if (output_data == NULL) { /* do nothing more */ } else if (!(call_options & cib_discard_reply)) { xmlNode *tmp = get_message_xml(op_reply, F_CIB_CALLDATA); if (tmp == NULL) { crm_trace("No output in reply to \"%s\" command %d", op, cib->call_id - 1); } else { *output_data = copy_xml(tmp); } } free_xml(op_reply); return rc; } diff --git a/lib/common/mainloop.c b/lib/common/mainloop.c index 887beb9355..1fa58e5d21 100644 --- a/lib/common/mainloop.c +++ b/lib/common/mainloop.c @@ -1,879 +1,879 @@ -/* +/* * Copyright (C) 2004 Andrew Beekhof - * + * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. - * + * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. - * + * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #include #ifndef _GNU_SOURCE # define _GNU_SOURCE #endif #include #include #include #include #include #include #include -#include +#include struct mainloop_child_s { pid_t pid; char *desc; unsigned timerid; unsigned watchid; gboolean timeout; void *privatedata; /* Called when a process dies */ void (*callback)(mainloop_child_t* p, int status, int signo, int exitcode); }; struct trigger_s { GSource source; gboolean running; gboolean trigger; void *user_data; guint id; }; static gboolean crm_trigger_prepare(GSource * source, gint * timeout) { crm_trigger_t *trig = (crm_trigger_t *) source; /* cluster-glue's FD and IPC related sources make use of * g_source_add_poll() but do not set a timeout in their prepare * functions * * This means mainloop's poll() will block until an event for one * of these sources occurs - any /other/ type of source, such as * this one or g_idle_*, that doesn't use g_source_add_poll() is * S-O-L and wont be processed until there is something fd-based * happens. * * Luckily the timeout we can set here affects all sources and * puts an upper limit on how long poll() can take. * * So unconditionally set a small-ish timeout, not too small that * we're in constant motion, which will act as an upper bound on * how long the signal handling might be delayed for. */ *timeout = 500; /* Timeout in ms */ return trig->trigger; } static gboolean crm_trigger_check(GSource * source) { crm_trigger_t *trig = (crm_trigger_t *) source; return trig->trigger; } static gboolean crm_trigger_dispatch(GSource * source, GSourceFunc callback, gpointer userdata) { int rc = TRUE; crm_trigger_t *trig = (crm_trigger_t *) source; if(trig->running) { /* Wait until the existing job is complete before starting the next one */ return TRUE; } trig->trigger = FALSE; if (callback) { rc = callback(trig->user_data); if(rc < 0) { crm_trace("Trigger handler %p not yet complete", trig); trig->running = TRUE; rc = TRUE; } } return rc; } static GSourceFuncs crm_trigger_funcs = { crm_trigger_prepare, crm_trigger_check, crm_trigger_dispatch, NULL }; static crm_trigger_t * mainloop_setup_trigger(GSource * source, int priority, int(*dispatch) (gpointer user_data), gpointer userdata) { crm_trigger_t *trigger = NULL; trigger = (crm_trigger_t *) source; trigger->id = 0; trigger->trigger = FALSE; trigger->user_data = userdata; if (dispatch) { g_source_set_callback(source, dispatch, trigger, NULL); } g_source_set_priority(source, priority); g_source_set_can_recurse(source, FALSE); trigger->id = g_source_attach(source, NULL); return trigger; } void -mainloop_trigger_complete(crm_trigger_t *trig) +mainloop_trigger_complete(crm_trigger_t *trig) { crm_trace("Trigger handler %p complete", trig); trig->running = FALSE; } /* If dispatch returns: * -1: Job running but not complete * 0: Remove the trigger from mainloop * 1: Leave the trigger in mainloop */ crm_trigger_t * mainloop_add_trigger(int priority, int(*dispatch) (gpointer user_data), gpointer userdata) { GSource *source = NULL; CRM_ASSERT(sizeof(crm_trigger_t) > sizeof(GSource)); source = g_source_new(&crm_trigger_funcs, sizeof(crm_trigger_t)); CRM_ASSERT(source != NULL); return mainloop_setup_trigger(source, priority, dispatch, userdata); } void mainloop_set_trigger(crm_trigger_t * source) { source->trigger = TRUE; } gboolean mainloop_destroy_trigger(crm_trigger_t * source) { source->trigger = FALSE; if (source->id > 0) { g_source_remove(source->id); source->id = 0; } return TRUE; } typedef struct signal_s { crm_trigger_t trigger; /* must be first */ void (*handler) (int sig); int signal; } crm_signal_t; static crm_signal_t *crm_signals[NSIG]; static gboolean crm_signal_dispatch(GSource * source, GSourceFunc callback, gpointer userdata) { crm_signal_t *sig = (crm_signal_t *) source; crm_info("Invoking handler for signal %d: %s", sig->signal, strsignal(sig->signal)); sig->trigger.trigger = FALSE; if (sig->handler) { sig->handler(sig->signal); } return TRUE; } static void mainloop_signal_handler(int sig) { if (sig > 0 && sig < NSIG && crm_signals[sig] != NULL) { mainloop_set_trigger((crm_trigger_t *) crm_signals[sig]); } } static GSourceFuncs crm_signal_funcs = { crm_trigger_prepare, crm_trigger_check, crm_signal_dispatch, NULL }; gboolean crm_signal(int sig, void (*dispatch) (int sig)) { sigset_t mask; struct sigaction sa; struct sigaction old; if (sigemptyset(&mask) < 0) { crm_perror(LOG_ERR, "Call to sigemptyset failed"); return FALSE; } memset(&sa, 0, sizeof(struct sigaction)); sa.sa_handler = dispatch; sa.sa_flags = SA_RESTART; sa.sa_mask = mask; if (sigaction(sig, &sa, &old) < 0) { crm_perror(LOG_ERR, "Could not install signal handler for signal %d", sig); return FALSE; } return TRUE; } gboolean mainloop_add_signal(int sig, void (*dispatch) (int sig)) { GSource *source = NULL; int priority = G_PRIORITY_HIGH - 1; if (sig == SIGTERM) { /* TERM is higher priority than other signals, * signals are higher priority than other ipc. * Yes, minus: smaller is "higher" */ priority--; } if (sig >= NSIG || sig < 0) { crm_err("Signal %d is out of range", sig); return FALSE; } else if (crm_signals[sig] != NULL && crm_signals[sig]->handler == dispatch) { crm_trace("Signal handler for %d is already installed", sig); return TRUE; } else if (crm_signals[sig] != NULL) { crm_err("Different signal handler for %d is already installed", sig); return FALSE; } CRM_ASSERT(sizeof(crm_signal_t) > sizeof(GSource)); source = g_source_new(&crm_signal_funcs, sizeof(crm_signal_t)); crm_signals[sig] = (crm_signal_t *) mainloop_setup_trigger(source, priority, NULL, NULL); CRM_ASSERT(crm_signals[sig] != NULL); crm_signals[sig]->handler = dispatch; crm_signals[sig]->signal = sig; if (crm_signal(sig, mainloop_signal_handler) == FALSE) { crm_signal_t *tmp = crm_signals[sig]; crm_signals[sig] = NULL; mainloop_destroy_trigger((crm_trigger_t *) tmp); return FALSE; } #if 0 /* If we want signals to interrupt mainloop's poll(), instead of waiting for * the timeout, then we should call siginterrupt() below * * For now, just enforce a low timeout */ if (siginterrupt(sig, 1) < 0) { crm_perror(LOG_INFO, "Could not enable system call interruptions for signal %d", sig); } #endif return TRUE; } gboolean mainloop_destroy_signal(int sig) { crm_signal_t *tmp = NULL; if (sig >= NSIG || sig < 0) { crm_err("Signal %d is out of range", sig); return FALSE; } else if (crm_signal(sig, NULL) == FALSE) { crm_perror(LOG_ERR, "Could not uninstall signal handler for signal %d", sig); return FALSE; } else if (crm_signals[sig] == NULL) { return TRUE; } tmp = crm_signals[sig]; crm_signals[sig] = NULL; mainloop_destroy_trigger((crm_trigger_t *) tmp); return TRUE; } static qb_array_t *gio_map = NULL; /* * libqb... */ struct gio_to_qb_poll { int32_t is_used; GIOChannel *channel; guint source; int32_t events; void * data; qb_ipcs_dispatch_fn_t fn; enum qb_loop_priority p; }; static int gio_adapter_refcount(struct gio_to_qb_poll *adaptor) { /* This is evil * Looking at the giochannel header file, ref_count is the first member of channel * So cheat... */ if(adaptor && adaptor->channel) { int *ref = (void*)adaptor->channel; return *ref; } return 0; } static gboolean gio_read_socket (GIOChannel *gio, GIOCondition condition, gpointer data) { struct gio_to_qb_poll *adaptor = (struct gio_to_qb_poll *)data; gint fd = g_io_channel_unix_get_fd(gio); crm_trace("%p.%d %d (ref=%d)", data, fd, condition, gio_adapter_refcount(adaptor)); if(condition & G_IO_NVAL) { crm_trace("Marking failed adaptor %p unused", adaptor); adaptor->is_used = QB_FALSE; } return (adaptor->fn(fd, condition, adaptor->data) == 0); } static void -gio_poll_destroy(gpointer data) +gio_poll_destroy(gpointer data) { /* adaptor->source is valid but about to be destroyed (ref_count == 0) in gmain.c * adaptor->channel will still have ref_count > 0... should be == 1 */ struct gio_to_qb_poll *adaptor = (struct gio_to_qb_poll *)data; crm_trace("Destroying adaptor %p channel %p (ref=%d)", adaptor, adaptor->channel, gio_adapter_refcount(adaptor)); adaptor->is_used = QB_FALSE; adaptor->channel = NULL; adaptor->source = 0; } static int32_t gio_poll_dispatch_add(enum qb_loop_priority p, int32_t fd, int32_t evts, void *data, qb_ipcs_dispatch_fn_t fn) { struct gio_to_qb_poll *adaptor; GIOChannel *channel; int32_t res = 0; res = qb_array_index(gio_map, fd, (void**)&adaptor); if (res < 0) { crm_err("Array lookup failed for fd=%d: %d", fd, res); return res; } crm_trace("Adding fd=%d to mainloop as adapater %p", fd, adaptor); if (adaptor->is_used) { crm_err("Adapter for descriptor %d is still in-use", fd); return -EEXIST; } /* channel is created with ref_count = 1 */ channel = g_io_channel_unix_new(fd); if (!channel) { crm_err("No memory left to add fd=%d", fd); return -ENOMEM; } /* Because unlike the poll() API, glib doesn't tell us about HUPs by default */ evts |= (G_IO_HUP|G_IO_NVAL|G_IO_ERR); adaptor->channel = channel; adaptor->fn = fn; adaptor->events = evts; adaptor->data = data; adaptor->p = p; adaptor->is_used = QB_TRUE; adaptor->source = g_io_add_watch_full(channel, G_PRIORITY_DEFAULT, evts, gio_read_socket, adaptor, gio_poll_destroy); /* Now that mainloop now holds a reference to adaptor->channel, * thanks to g_io_add_watch_full(), drop ours from g_io_channel_unix_new(). * * This means that adaptor->channel will be free'd by: * g_main_context_dispatch() * -> g_source_destroy_internal() * -> g_source_callback_unref() * shortly after gio_poll_destroy() completes */ - g_io_channel_unref(adaptor->channel); + g_io_channel_unref(adaptor->channel); crm_trace("Added to mainloop with gsource id=%d, ref=%d", adaptor->source, gio_adapter_refcount(adaptor)); if(adaptor->source > 0) { return 0; } - + return -EINVAL; } static int32_t gio_poll_dispatch_mod(enum qb_loop_priority p, int32_t fd, int32_t evts, void *data, qb_ipcs_dispatch_fn_t fn) { return 0; } static int32_t gio_poll_dispatch_del(int32_t fd) { struct gio_to_qb_poll *adaptor; crm_trace("Looking for fd=%d", fd); if (qb_array_index(gio_map, fd, (void**)&adaptor) == 0) { crm_trace("Marking adaptor %p unused (ref=%d)", adaptor, gio_adapter_refcount(adaptor)); adaptor->is_used = QB_FALSE; } return 0; } struct qb_ipcs_poll_handlers gio_poll_funcs = { .job_add = NULL, .dispatch_add = gio_poll_dispatch_add, .dispatch_mod = gio_poll_dispatch_mod, .dispatch_del = gio_poll_dispatch_del, }; static enum qb_ipc_type pick_ipc_type(enum qb_ipc_type requested) { const char *env = getenv("PCMK_ipc_type"); if(env && strcmp("shared-mem", env) == 0) { return QB_IPC_SHM; } else if(env && strcmp("socket", env) == 0) { return QB_IPC_SOCKET; } else if(env && strcmp("posix", env) == 0) { return QB_IPC_POSIX_MQ; } else if(env && strcmp("sysv", env) == 0) { return QB_IPC_SYSV_MQ; } else if(requested == QB_IPC_NATIVE) { /* We prefer shared memory because the server never blocks on * send. If part of a message fits into the socket, libqb * needs to block until the remainder can be sent also. * Otherwise the client will wait forever for the remaining * bytes. */ return QB_IPC_SHM; } return requested; } qb_ipcs_service_t *mainloop_add_ipc_server( - const char *name, enum qb_ipc_type type, struct qb_ipcs_service_handlers *callbacks) + const char *name, enum qb_ipc_type type, struct qb_ipcs_service_handlers *callbacks) { int rc = 0; qb_ipcs_service_t* server = NULL; if(gio_map == NULL) { gio_map = qb_array_create_2(64, sizeof(struct gio_to_qb_poll), 1); } crm_client_init(); server = qb_ipcs_create(name, 0, pick_ipc_type(type), callbacks); qb_ipcs_poll_handlers_set(server, &gio_poll_funcs); rc = qb_ipcs_run(server); if (rc < 0) { crm_err("Could not start %s IPC server: %s (%d)", name, pcmk_strerror(rc), rc); return NULL; } return server; } -void mainloop_del_ipc_server(qb_ipcs_service_t *server) +void mainloop_del_ipc_server(qb_ipcs_service_t *server) { if(server) { qb_ipcs_destroy(server); } } struct mainloop_io_s { char *name; void *userdata; guint source; crm_ipc_t *ipc; GIOChannel *channel; int (*dispatch_fn_ipc)(const char *buffer, ssize_t length, gpointer userdata); int (*dispatch_fn_io) (gpointer userdata); void (*destroy_fn) (gpointer userdata); }; static int -mainloop_gio_refcount(mainloop_io_t *client) +mainloop_gio_refcount(mainloop_io_t *client) { /* This is evil * Looking at the giochannel header file, ref_count is the first member of channel * So cheat... */ if(client && client->channel) { int *ref = (void*)client->channel; return *ref; } return 0; } static gboolean mainloop_gio_callback(GIOChannel *gio, GIOCondition condition, gpointer data) { gboolean keep = TRUE; mainloop_io_t *client = data; if(condition & G_IO_IN) { if(client->ipc) { long rc = 0; int max = 10; do { rc = crm_ipc_read(client->ipc); if(rc <= 0) { crm_trace("Message acquisition from %s[%p] failed: %s (%ld)", client->name, client, pcmk_strerror(rc), rc); } else if(client->dispatch_fn_ipc) { const char *buffer = crm_ipc_buffer(client->ipc); crm_trace("New message from %s[%p] = %d", client->name, client, rc, condition); if(client->dispatch_fn_ipc(buffer, rc, client->userdata) < 0) { crm_trace("Connection to %s no longer required", client->name); keep = FALSE; } } } while(keep && rc > 0 && --max > 0); } else { crm_trace("New message from %s[%p]", client->name, client); if(client->dispatch_fn_io) { if(client->dispatch_fn_io(client->userdata) < 0) { crm_trace("Connection to %s no longer required", client->name); keep = FALSE; } } } } if(client->ipc && crm_ipc_connected(client->ipc) == FALSE) { crm_err("Connection to %s[%p] closed (I/O condition=%d)", client->name, client, condition); keep = FALSE; } else if(condition & (G_IO_HUP|G_IO_NVAL|G_IO_ERR)) { crm_trace("The connection %s[%p] has been closed (I/O condition=%d, refcount=%d)", client->name, client, condition, mainloop_gio_refcount(client)); keep = FALSE; } else if((condition & G_IO_IN) == 0) { /* #define GLIB_SYSDEF_POLLIN =1 #define GLIB_SYSDEF_POLLPRI =2 #define GLIB_SYSDEF_POLLOUT =4 #define GLIB_SYSDEF_POLLERR =8 #define GLIB_SYSDEF_POLLHUP =16 #define GLIB_SYSDEF_POLLNVAL =32 typedef enum { G_IO_IN GLIB_SYSDEF_POLLIN, G_IO_OUT GLIB_SYSDEF_POLLOUT, G_IO_PRI GLIB_SYSDEF_POLLPRI, G_IO_ERR GLIB_SYSDEF_POLLERR, G_IO_HUP GLIB_SYSDEF_POLLHUP, G_IO_NVAL GLIB_SYSDEF_POLLNVAL } GIOCondition; A bitwise combination representing a condition to watch for on an event source. G_IO_IN There is data to read. G_IO_OUT Data can be written (without blocking). G_IO_PRI There is urgent data to read. G_IO_ERR Error condition. G_IO_HUP Hung up (the connection has been broken, usually for pipes and sockets). - G_IO_NVAL Invalid request. The file descriptor is not open. + G_IO_NVAL Invalid request. The file descriptor is not open. */ crm_err("Strange condition: %d", condition); } /* keep == FALSE results in mainloop_gio_destroy() being called * just before the source is removed from mainloop */ return keep; } static void mainloop_gio_destroy(gpointer c) { mainloop_io_t *client = c; /* client->source is valid but about to be destroyed (ref_count == 0) in gmain.c * client->channel will still have ref_count > 0... should be == 1 */ crm_trace("Destroying client %s[%p] %d", client->name, c, mainloop_gio_refcount(client)); if(client->ipc) { crm_ipc_close(client->ipc); } if(client->destroy_fn) { client->destroy_fn(client->userdata); } - + if(client->ipc) { crm_ipc_destroy(client->ipc); } crm_trace("Destroyed client %s[%p] %d", client->name, c, mainloop_gio_refcount(client)); free(client->name); memset(client, 0, sizeof(mainloop_io_t)); /* A bit of pointless paranoia */ free(client); } mainloop_io_t * mainloop_add_ipc_client( - const char *name, int priority, size_t max_size, void *userdata, struct ipc_client_callbacks *callbacks) + const char *name, int priority, size_t max_size, void *userdata, struct ipc_client_callbacks *callbacks) { mainloop_io_t *client = NULL; crm_ipc_t *conn = crm_ipc_new(name, max_size); if(conn && crm_ipc_connect(conn)) { int32_t fd = crm_ipc_get_fd(conn); client = mainloop_add_fd(name, priority, fd, userdata, NULL); client->ipc = conn; client->destroy_fn = callbacks->destroy; client->dispatch_fn_ipc = callbacks->dispatch; } if(conn && client == NULL) { crm_trace("Connection to %s failed", name); crm_ipc_close(conn); crm_ipc_destroy(conn); } - + return client; } void mainloop_del_ipc_client(mainloop_io_t *client) { mainloop_del_fd(client); } crm_ipc_t * mainloop_get_ipc_client(mainloop_io_t *client) { if(client) { return client->ipc; } return NULL; } mainloop_io_t * mainloop_add_fd( - const char *name, int priority, int fd, void *userdata, struct mainloop_fd_callbacks *callbacks) + const char *name, int priority, int fd, void *userdata, struct mainloop_fd_callbacks *callbacks) { mainloop_io_t *client = NULL; if(fd > 0) { - client = calloc(1, sizeof(mainloop_io_t)); + client = calloc(1, sizeof(mainloop_io_t)); client->name = strdup(name); client->userdata = userdata; if(callbacks) { client->destroy_fn = callbacks->destroy; client->dispatch_fn_io = callbacks->dispatch; } client->channel = g_io_channel_unix_new(fd); client->source = g_io_add_watch_full( client->channel, priority, (G_IO_IN|G_IO_HUP|G_IO_NVAL|G_IO_ERR), mainloop_gio_callback, client, mainloop_gio_destroy); /* Now that mainloop now holds a reference to adaptor->channel, * thanks to g_io_add_watch_full(), drop ours from g_io_channel_unix_new(). * * This means that adaptor->channel will be free'd by: * g_main_context_dispatch() or g_source_remove() * -> g_source_destroy_internal() * -> g_source_callback_unref() * shortly after mainloop_gio_destroy() completes */ g_io_channel_unref(client->channel); crm_trace("Added connection %d for %s[%p].%d %d", client->source, client->name, client, fd, mainloop_gio_refcount(client)); } return client; } void mainloop_del_fd(mainloop_io_t *client) { if(client != NULL) { crm_trace("Removing client %s[%p] %d", client->name, client, mainloop_gio_refcount(client)); if (client->source) { /* Results in mainloop_gio_destroy() being called just * before the source is removed from mainloop */ g_source_remove(client->source); } } } pid_t mainloop_get_child_pid(mainloop_child_t *child) { return child->pid; } int mainloop_get_child_timeout(mainloop_child_t *child) { return child->timeout; } void * mainloop_get_child_userdata(mainloop_child_t *child) { return child->privatedata; } void mainloop_clear_child_userdata(mainloop_child_t *child) { child->privatedata = NULL; } static gboolean child_timeout_callback(gpointer p) { mainloop_child_t *child = p; child->timerid = 0; if (child->timeout) { crm_crit("%s process (PID %d) will not die!", child->desc, (int)child->pid); return FALSE; } child->timeout = TRUE; crm_warn("%s process (PID %d) timed out", child->desc, (int)child->pid); if (kill(child->pid, SIGKILL) < 0) { if (errno == ESRCH) { /* Nothing left to do */ return FALSE; } crm_perror(LOG_ERR, "kill(%d, KILL) failed", child->pid); } child->timerid = g_timeout_add(5000, child_timeout_callback, child); return FALSE; } static void mainloop_child_destroy(mainloop_child_t *child) { if (child->timerid != 0) { crm_trace("Removing timer %d", child->timerid); g_source_remove(child->timerid); child->timerid = 0; } free(child->desc); g_free(child); } static void child_death_dispatch(GPid pid, gint status, gpointer user_data) { int signo = 0; int exitcode = 0; mainloop_child_t *child = user_data; crm_trace("Managed process %d exited: %p", pid, child); if (WIFEXITED(status)) { exitcode = WEXITSTATUS(status); crm_trace("Managed process %d (%s) exited with rc=%d", pid, child->desc, exitcode); } else if (WIFSIGNALED(status)) { signo = WTERMSIG(status); crm_trace("Managed process %d (%s) exited with signal=%d", pid, child->desc, signo); } #ifdef WCOREDUMP if (WCOREDUMP(status)) { crm_err("Managed process %d (%s) dumped core", pid, child->desc); } #endif if (child->callback) { child->callback(child, status, signo, exitcode); } crm_trace("Removed process entry for %d", pid); mainloop_child_destroy(child); return; } /* Create/Log a new tracked process * To track a process group, use -pid */ void mainloop_add_child(pid_t pid, int timeout, const char *desc, void * privatedata, void (*callback)(mainloop_child_t *p, int status, int signo, int exitcode)) { mainloop_child_t *child = g_new(mainloop_child_t, 1); child->pid = pid; child->timerid = 0; child->timeout = FALSE; child->desc = strdup(desc); child->privatedata = privatedata; child->callback = callback; if (timeout) { child->timerid = g_timeout_add( timeout, child_timeout_callback, child); } child->watchid = g_child_watch_add(pid, child_death_dispatch, child); } diff --git a/lib/lrmd/lrmd_client.c b/lib/lrmd/lrmd_client.c index 1f739d418b..6417875e9f 100644 --- a/lib/lrmd/lrmd_client.c +++ b/lib/lrmd/lrmd_client.c @@ -1,1898 +1,1899 @@ /* * Copyright (c) 2012 David Vossel * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. - * + * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. - * + * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA * */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include +#include #include #include #ifdef HAVE_GNUTLS_GNUTLS_H # undef KEYFILE # include #endif #include #include #include #include CRM_TRACE_INIT_DATA(lrmd); static stonith_t *stonith_api = NULL; static int lrmd_api_disconnect(lrmd_t *lrmd); static int lrmd_api_is_connected(lrmd_t *lrmd); #ifdef HAVE_GNUTLS_GNUTLS_H #define LRMD_CLIENT_HANDSHAKE_TIMEOUT 5000 /* 5 seconds */ gnutls_psk_client_credentials_t psk_cred_s; int lrmd_tls_set_key(gnutls_datum_t *key, const char *location); static void lrmd_tls_disconnect(lrmd_t *lrmd); static int global_remote_msg_id = 0; int lrmd_tls_send_msg(crm_remote_t *session, xmlNode *msg, uint32_t id, const char *msg_type); static void lrmd_tls_connection_destroy(gpointer userdata); #endif typedef struct lrmd_private_s { enum client_type type; char *token; mainloop_io_t *source; /* IPC parameters */ crm_ipc_t *ipc; crm_remote_t *remote; /* Extra TLS parameters */ char *remote_nodename; #ifdef HAVE_GNUTLS_GNUTLS_H char *server; int port; gnutls_psk_client_credentials_t psk_cred_c; int sock; GList *pending_notify; crm_trigger_t *process_notify; #endif lrmd_event_callback callback; } lrmd_private_t; static lrmd_list_t * lrmd_list_add(lrmd_list_t * head, const char *value) { lrmd_list_t *p, *end; p = calloc(1, sizeof(lrmd_list_t)); p->val = strdup(value); end = head; while (end && end->next) { end = end->next; } if (end) { end->next = p; } else { head = p; } return head; } void lrmd_list_freeall(lrmd_list_t * head) { lrmd_list_t *p; while (head) { char *val = (char *)head->val; p = head->next; free(val); free(head); head = p; } } lrmd_key_value_t * lrmd_key_value_add(lrmd_key_value_t * head, const char *key, const char *value) { lrmd_key_value_t *p, *end; p = calloc(1, sizeof(lrmd_key_value_t)); p->key = strdup(key); p->value = strdup(value); end = head; while (end && end->next) { end = end->next; } if (end) { end->next = p; } else { head = p; } return head; } void lrmd_key_value_freeall(lrmd_key_value_t * head) { lrmd_key_value_t *p; while (head) { p = head->next; free(head->key); free(head->value); free(head); head = p; } } static void dup_attr(gpointer key, gpointer value, gpointer user_data) { g_hash_table_replace(user_data, strdup(key), strdup(value)); } lrmd_event_data_t * lrmd_copy_event(lrmd_event_data_t * event) { lrmd_event_data_t *copy = NULL; copy = calloc(1, sizeof(lrmd_event_data_t)); /* This will get all the int values. * we just have to be careful not to leave any * dangling pointers to strings. */ memcpy(copy, event, sizeof(lrmd_event_data_t)); copy->rsc_id = event->rsc_id ? strdup(event->rsc_id) : NULL; copy->op_type = event->op_type ? strdup(event->op_type) : NULL; copy->user_data = event->user_data ? strdup(event->user_data) : NULL; copy->output = event->output ? strdup(event->output) : NULL; copy->remote_nodename = event->remote_nodename ? strdup(event->remote_nodename) : NULL; if (event->params) { copy->params = g_hash_table_new_full(crm_str_hash, g_str_equal, g_hash_destroy_str, g_hash_destroy_str); if (copy->params != NULL) { g_hash_table_foreach(event->params, dup_attr, copy->params); } } return copy; } void lrmd_free_event(lrmd_event_data_t * event) { if (!event) { return; } /* free gives me grief if i try to cast */ free((char *)event->rsc_id); free((char *)event->op_type); free((char *)event->user_data); free((char *)event->output); free((char *)event->remote_nodename); if (event->params) { g_hash_table_destroy(event->params); } free(event); } static int lrmd_dispatch_internal(lrmd_t *lrmd, xmlNode *msg) { const char *type; lrmd_private_t *native = lrmd->private; lrmd_event_data_t event = { 0, }; if (!native->callback) { /* no callback set */ crm_trace("notify event received but client has not set callback"); return 1; } event.remote_nodename = native->remote_nodename; type = crm_element_value(msg, F_LRMD_OPERATION); crm_element_value_int(msg, F_LRMD_CALLID, &event.call_id); event.rsc_id = crm_element_value(msg, F_LRMD_RSC_ID); if (crm_str_eq(type, LRMD_OP_RSC_REG, TRUE)) { event.type = lrmd_event_register; } else if (crm_str_eq(type, LRMD_OP_RSC_UNREG, TRUE)) { event.type = lrmd_event_unregister; } else if (crm_str_eq(type, LRMD_OP_RSC_EXEC, TRUE)) { crm_element_value_int(msg, F_LRMD_TIMEOUT, &event.timeout); crm_element_value_int(msg, F_LRMD_RSC_INTERVAL, &event.interval); crm_element_value_int(msg, F_LRMD_RSC_START_DELAY, &event.start_delay); crm_element_value_int(msg, F_LRMD_EXEC_RC, (int *)&event.rc); crm_element_value_int(msg, F_LRMD_OP_STATUS, &event.op_status); crm_element_value_int(msg, F_LRMD_RSC_DELETED, &event.rsc_deleted); crm_element_value_int(msg, F_LRMD_RSC_RUN_TIME, (int *)&event.t_run); crm_element_value_int(msg, F_LRMD_RSC_RCCHANGE_TIME, (int *)&event.t_rcchange); crm_element_value_int(msg, F_LRMD_RSC_EXEC_TIME, (int *)&event.exec_time); crm_element_value_int(msg, F_LRMD_RSC_QUEUE_TIME, (int *)&event.queue_time); event.op_type = crm_element_value(msg, F_LRMD_RSC_ACTION); event.user_data = crm_element_value(msg, F_LRMD_RSC_USERDATA_STR); event.output = crm_element_value(msg, F_LRMD_RSC_OUTPUT); event.type = lrmd_event_exec_complete; event.params = xml2list(msg); } else if (crm_str_eq(type, LRMD_OP_POKE, TRUE)) { event.type = lrmd_event_poke; } else { return 1; } crm_trace("op %s notify event received", type); native->callback(&event); if (event.params) { g_hash_table_destroy(event.params); } return 1; } static int lrmd_ipc_dispatch(const char *buffer, ssize_t length, gpointer userdata) { lrmd_t *lrmd = userdata; lrmd_private_t *native = lrmd->private; xmlNode *msg; int rc; if (!native->callback) { /* no callback set */ return 1; } msg = string2xml(buffer); rc = lrmd_dispatch_internal(lrmd, msg); free_xml(msg); return rc; } #ifdef HAVE_GNUTLS_GNUTLS_H static void lrmd_free_xml(gpointer userdata) { free_xml((xmlNode *) userdata); } static int lrmd_tls_connected(lrmd_t *lrmd) { lrmd_private_t *native = lrmd->private; if (native->remote->tls_session) { return TRUE; } return FALSE; } static int lrmd_tls_dispatch(gpointer userdata) { lrmd_t *lrmd = userdata; lrmd_private_t *native = lrmd->private; xmlNode *xml = NULL; int rc = 0; int disconnected = 0; if (lrmd_tls_connected(lrmd) == FALSE) { crm_trace("tls dispatch triggered after disconnect"); return 0; } crm_trace("tls_dispatch triggered"); /* First check if there are any pending notifies to process that came * while we were waiting for replies earlier. */ if (native->pending_notify) { GList *iter = NULL; crm_trace("Processing pending notifies"); for (iter = native->pending_notify; iter; iter = iter->next) { lrmd_dispatch_internal(lrmd, iter->data); } g_list_free_full(native->pending_notify, lrmd_free_xml); native->pending_notify = NULL; } /* Next read the current buffer and see if there are any messages to handle. */ rc = crm_remote_ready(native->remote, 0); if (rc == 0) { /* nothing to read, see if any full messages are already in buffer. */ xml = crm_remote_parse_buffer(native->remote); } else if (rc < 0) { disconnected = 1; } else { crm_remote_recv(native->remote, -1, &disconnected); xml = crm_remote_parse_buffer(native->remote); } while (xml) { lrmd_dispatch_internal(lrmd, xml); free_xml(xml); xml = crm_remote_parse_buffer(native->remote); } if (disconnected) { crm_info("Server disconnected while reading remote server msg."); lrmd_tls_disconnect(lrmd); return 0; } return 1; } #endif /* Not used with mainloop */ int lrmd_poll(lrmd_t *lrmd, int timeout) { lrmd_private_t *native = lrmd->private; switch (native->type) { case CRM_CLIENT_IPC: return crm_ipc_ready(native->ipc); #ifdef HAVE_GNUTLS_GNUTLS_H case CRM_CLIENT_TLS: if (native->pending_notify) { return 1; } else if (native->remote->buffer && strstr(native->remote->buffer, REMOTE_MSG_TERMINATOR)) { return 1; } return crm_remote_ready(native->remote, 0); #endif default: crm_err("Unsupported connection type: %d", native->type); } return 0; } /* Not used with mainloop */ bool lrmd_dispatch(lrmd_t * lrmd) { lrmd_private_t *private = NULL; CRM_ASSERT(lrmd != NULL); private = lrmd->private; switch (private->type) { case CRM_CLIENT_IPC: while (crm_ipc_ready(private->ipc)) { if (crm_ipc_read(private->ipc) > 0) { const char *msg = crm_ipc_buffer(private->ipc); lrmd_ipc_dispatch(msg, strlen(msg), lrmd); } } break; #ifdef HAVE_GNUTLS_GNUTLS_H case CRM_CLIENT_TLS: lrmd_tls_dispatch(lrmd); break; #endif default: crm_err("Unsupported connection type: %d", private->type); } if (lrmd_api_is_connected(lrmd) == FALSE) { crm_err("Connection closed"); return FALSE; } return TRUE; } static xmlNode * lrmd_create_op(const char *token, const char *op, xmlNode * data, enum lrmd_call_options options) { xmlNode *op_msg = create_xml_node(NULL, "lrmd_command"); CRM_CHECK(op_msg != NULL, return NULL); CRM_CHECK(token != NULL, return NULL); crm_xml_add(op_msg, F_XML_TAGNAME, "lrmd_command"); crm_xml_add(op_msg, F_TYPE, T_LRMD); crm_xml_add(op_msg, F_LRMD_CALLBACK_TOKEN, token); crm_xml_add(op_msg, F_LRMD_OPERATION, op); crm_trace("Sending call options: %.8lx, %d", (long)options, options); crm_xml_add_int(op_msg, F_LRMD_CALLOPTS, options); if (data != NULL) { add_message_xml(op_msg, F_LRMD_CALLDATA, data); } return op_msg; } static void lrmd_ipc_connection_destroy(gpointer userdata) { lrmd_t *lrmd = userdata; lrmd_private_t *native = lrmd->private; crm_info("IPC connection destroyed"); /* Prevent these from being cleaned up in lrmd_api_disconnect() */ native->ipc = NULL; native->source = NULL; if (native->callback) { lrmd_event_data_t event = { 0, }; event.type = lrmd_event_disconnect; event.remote_nodename = native->remote_nodename; native->callback(&event); } } #ifdef HAVE_GNUTLS_GNUTLS_H static void lrmd_tls_connection_destroy(gpointer userdata) { lrmd_t *lrmd = userdata; lrmd_private_t *native = lrmd->private; crm_info("TLS connection destroyed"); if (native->remote->tls_session) { gnutls_bye(*native->remote->tls_session, GNUTLS_SHUT_RDWR); gnutls_deinit(*native->remote->tls_session); gnutls_free(native->remote->tls_session); } if (native->psk_cred_c) { gnutls_psk_free_client_credentials(native->psk_cred_c); } if (native->sock) { close(native->sock); } if (native->process_notify) { mainloop_destroy_trigger(native->process_notify); native->process_notify = NULL; } if (native->pending_notify) { g_list_free_full(native->pending_notify, lrmd_free_xml); native->pending_notify = NULL; } free(native->remote->buffer); native->remote->buffer = NULL; native->source = 0; native->sock = 0; native->psk_cred_c = NULL; native->remote->tls_session = NULL; native->sock = 0; if (native->callback) { lrmd_event_data_t event = { 0, }; event.remote_nodename = native->remote_nodename; event.type = lrmd_event_disconnect; native->callback(&event); } return; } int lrmd_tls_send_msg(crm_remote_t *session, xmlNode *msg, uint32_t id, const char *msg_type) { int rc = -1; crm_xml_add_int(msg, F_LRMD_REMOTE_MSG_ID, id); crm_xml_add(msg, F_LRMD_REMOTE_MSG_TYPE, msg_type); rc = crm_remote_send(session, msg); if (rc < 0) { crm_err("Failed to send remote lrmd tls msg, rc = %d" , rc); return rc; } return rc; } static xmlNode * lrmd_tls_recv_reply(lrmd_t *lrmd, int total_timeout, int expected_reply_id, int *disconnected) { lrmd_private_t *native = lrmd->private; xmlNode *xml = NULL; time_t start = time(NULL); const char *msg_type = NULL; int reply_id = 0; int remaining_timeout = 0; /* A timeout of 0 here makes no sense. We have to wait a period of time * for the response to come back. If -1 or 0, default to 10 seconds. */ if (total_timeout <= 0) { total_timeout = 10000; } while (!xml) { xml = crm_remote_parse_buffer(native->remote); if (!xml) { /* read some more off the tls buffer if we still have time left. */ if (remaining_timeout) { remaining_timeout = remaining_timeout - ((time(NULL) - start) * 1000); } else { remaining_timeout = total_timeout; } if (remaining_timeout <= 0) { return NULL; } crm_remote_recv(native->remote, remaining_timeout, disconnected); xml = crm_remote_parse_buffer(native->remote); if (!xml || *disconnected) { return NULL; } } CRM_ASSERT(xml != NULL); crm_element_value_int(xml, F_LRMD_REMOTE_MSG_ID, &reply_id); msg_type = crm_element_value(xml, F_LRMD_REMOTE_MSG_TYPE); if (!msg_type) { crm_err("Empty msg type received while waiting for reply"); free_xml(xml); xml = NULL; } else if (safe_str_eq(msg_type, "notify")) { /* got a notify while waiting for reply, trigger the notify to be processed later */ crm_info("queueing notify"); native->pending_notify = g_list_append(native->pending_notify, xml); if (native->process_notify) { crm_info("notify trigger set."); mainloop_set_trigger(native->process_notify); } xml = NULL; } else if (safe_str_neq(msg_type, "reply")) { /* msg isn't a reply, make some noise */ crm_err("Expected a reply, got %s", msg_type); free_xml(xml); xml = NULL; } else if (reply_id != expected_reply_id) { crm_err("Got outdated reply, expected id %d got id %d", expected_reply_id, reply_id); free_xml(xml); xml = NULL; } } if (native->remote->buffer && native->process_notify) { mainloop_set_trigger(native->process_notify); } return xml; } static int lrmd_tls_send(lrmd_t *lrmd, xmlNode *msg) { int rc = 0; lrmd_private_t *native = lrmd->private; global_remote_msg_id++; if (global_remote_msg_id <= 0) { global_remote_msg_id = 1; } rc = lrmd_tls_send_msg(native->remote, msg, global_remote_msg_id, "request"); if (rc <= 0) { crm_err("Remote lrmd send failed, disconnecting"); lrmd_tls_disconnect(lrmd); return -ENOTCONN; } return pcmk_ok; } static int lrmd_tls_send_recv(lrmd_t *lrmd, xmlNode *msg, int timeout, xmlNode **reply) { int rc = 0; int disconnected = 0; xmlNode *xml = NULL; if (lrmd_tls_connected(lrmd) == FALSE) { return -1; } rc = lrmd_tls_send(lrmd, msg); if (rc < 0) { return rc; } xml = lrmd_tls_recv_reply(lrmd, timeout, global_remote_msg_id, &disconnected); if (disconnected) { crm_err("Remote lrmd server disconnected while waiting for reply with id %d. ", global_remote_msg_id); lrmd_tls_disconnect(lrmd); rc = -ENOTCONN; } else if (!xml) { crm_err("Remote lrmd never received reply for request id %d. timeout: %dms ", global_remote_msg_id, timeout); rc = -ECOMM; } if (reply) { *reply = xml; } else { free_xml(xml); } return rc; } #endif static int lrmd_send_xml(lrmd_t *lrmd, xmlNode *msg, int timeout, xmlNode **reply) { int rc = -1; lrmd_private_t *native = lrmd->private; switch (native->type) { case CRM_CLIENT_IPC: rc = crm_ipc_send(native->ipc, msg, crm_ipc_client_response, timeout, reply); break; #ifdef HAVE_GNUTLS_GNUTLS_H case CRM_CLIENT_TLS: rc = lrmd_tls_send_recv(lrmd, msg, timeout, reply); break; #endif default: crm_err("Unsupported connection type: %d", native->type); } return rc; } static int lrmd_send_xml_no_reply(lrmd_t *lrmd, xmlNode *msg) { int rc = -1; lrmd_private_t *native = lrmd->private; switch (native->type) { case CRM_CLIENT_IPC: rc = crm_ipc_send(native->ipc, msg, crm_ipc_client_none, 0, NULL); break; #ifdef HAVE_GNUTLS_GNUTLS_H case CRM_CLIENT_TLS: rc = lrmd_tls_send(lrmd, msg); break; #endif default: crm_err("Unsupported connection type: %d", native->type); } return rc; } static int lrmd_api_is_connected(lrmd_t *lrmd) { lrmd_private_t *native = lrmd->private; switch (native->type) { case CRM_CLIENT_IPC: return crm_ipc_connected(native->ipc); break; #ifdef HAVE_GNUTLS_GNUTLS_H case CRM_CLIENT_TLS: return lrmd_tls_connected(lrmd); break; #endif default: crm_err("Unsupported connection type: %d", native->type); } return 0; } static int lrmd_send_command(lrmd_t * lrmd, const char *op, xmlNode * data, xmlNode ** output_data, int timeout, /* ms. defaults to 1000 if set to 0 */ enum lrmd_call_options options, gboolean expect_reply) /* TODO we need to reduce usage of this boolean */ { int rc = pcmk_ok; int reply_id = -1; lrmd_private_t *native = lrmd->private; xmlNode *op_msg = NULL; xmlNode *op_reply = NULL; if (!lrmd_api_is_connected(lrmd)) { return -ENOTCONN; } if (op == NULL) { crm_err("No operation specified"); return -EINVAL; } CRM_CHECK(native->token != NULL,;); crm_trace("sending %s op to lrmd", op); op_msg = lrmd_create_op(native->token, op, data, options); if (op_msg == NULL) { return -EINVAL; } crm_xml_add_int(op_msg, F_LRMD_TIMEOUT, timeout); if (expect_reply) { rc = lrmd_send_xml(lrmd, op_msg, timeout, &op_reply); } else { rc = lrmd_send_xml_no_reply(lrmd, op_msg); goto done; } if (rc < 0) { crm_perror(LOG_ERR, "Couldn't perform %s operation (timeout=%d): %d", op, timeout, rc); rc = -ECOMM; goto done; } rc = pcmk_ok; crm_element_value_int(op_reply, F_LRMD_CALLID, &reply_id); crm_trace("%s op reply received", op); if (crm_element_value_int(op_reply, F_LRMD_RC, &rc) != 0) { rc = -ENOMSG; goto done; } crm_log_xml_trace(op_reply, "Reply"); if (output_data) { *output_data = op_reply; op_reply = NULL; /* Prevent subsequent free */ } done: if (lrmd_api_is_connected(lrmd) == FALSE) { crm_err("LRMD disconnected"); } free_xml(op_msg); free_xml(op_reply); return rc; } static int lrmd_api_poke_connection(lrmd_t *lrmd) { int rc; xmlNode *data = create_xml_node(NULL, F_LRMD_RSC); crm_xml_add(data, F_LRMD_ORIGIN, __FUNCTION__); rc = lrmd_send_command(lrmd, LRMD_OP_POKE, data, NULL, 0, 0, FALSE); free_xml(data); return rc; } static int lrmd_handshake(lrmd_t *lrmd, const char *name) { int rc = pcmk_ok; lrmd_private_t *native = lrmd->private; xmlNode *reply = NULL; xmlNode *hello = create_xml_node(NULL, "lrmd_command"); crm_xml_add(hello, F_TYPE, T_LRMD); crm_xml_add(hello, F_LRMD_OPERATION, CRM_OP_REGISTER); crm_xml_add(hello, F_LRMD_CLIENTNAME, name); rc = lrmd_send_xml(lrmd, hello, -1, &reply); if (rc < 0) { crm_perror(LOG_DEBUG, "Couldn't complete registration with the lrmd API: %d", rc); rc = -ECOMM; } else if (reply == NULL) { crm_err("Did not receive registration reply"); rc = -EPROTO; } else { const char *msg_type = crm_element_value(reply, F_LRMD_OPERATION); const char *tmp_ticket = crm_element_value(reply, F_LRMD_CLIENTID); if (safe_str_neq(msg_type, CRM_OP_REGISTER)) { crm_err("Invalid registration message: %s", msg_type); crm_log_xml_err(reply, "Bad reply"); rc = -EPROTO; } else if (tmp_ticket == NULL) { crm_err("No registration token provided"); crm_log_xml_err(reply, "Bad reply"); rc = -EPROTO; } else { crm_trace("Obtained registration token: %s", tmp_ticket); native->token = strdup(tmp_ticket); rc = pcmk_ok; } } free_xml(reply); free_xml(hello); if (rc != pcmk_ok) { lrmd_api_disconnect(lrmd); } return rc; } static int lrmd_ipc_connect(lrmd_t * lrmd, int *fd) { int rc = pcmk_ok; lrmd_private_t *native = lrmd->private; static struct ipc_client_callbacks lrmd_callbacks = { .dispatch = lrmd_ipc_dispatch, .destroy = lrmd_ipc_connection_destroy }; crm_info("Connecting to lrmd"); if (fd) { /* No mainloop */ native->ipc = crm_ipc_new("lrmd", 0); if (native->ipc && crm_ipc_connect(native->ipc)) { *fd = crm_ipc_get_fd(native->ipc); } else if (native->ipc) { rc = -ENOTCONN; } } else { native->source = mainloop_add_ipc_client("lrmd", G_PRIORITY_HIGH, 0, lrmd, &lrmd_callbacks); native->ipc = mainloop_get_ipc_client(native->source); } if (native->ipc == NULL) { crm_debug("Could not connect to the LRMD API"); rc = -ENOTCONN; } return rc; } #ifdef HAVE_GNUTLS_GNUTLS_H int lrmd_tls_set_key(gnutls_datum_t *key, const char *location) { FILE *stream; int read_len = 256; int cur_len = 0; int buf_len = read_len; static char *key_cache = NULL; static size_t key_cache_len = 0; static time_t key_cache_updated; if (key_cache) { time_t now = time(NULL); if ((now - key_cache_updated) < 60) { key->data = gnutls_malloc(key_cache_len + 1); key->size = key_cache_len; memcpy(key->data, key_cache, key_cache_len); crm_debug("using cached LRMD key"); return 0; } else { key_cache_len = 0; key_cache_updated = 0; free(key_cache); key_cache = NULL; crm_debug("clearing lrmd key cache"); } } stream = fopen(location, "r"); if (!stream) { return -1; } key->data = gnutls_malloc(read_len); while (!feof(stream)) { char next; if (cur_len == buf_len) { buf_len = cur_len + read_len; key->data = gnutls_realloc(key->data, buf_len); } next = fgetc(stream); if (next == EOF && feof(stream)) { break; } key->data[cur_len] = next; cur_len++; } fclose(stream); key->size = cur_len; if (!cur_len) { gnutls_free(key->data); key->data = 0; return -1; } if (!key_cache) { key_cache = calloc(1, key->size+1); memcpy(key_cache, key->data, key->size); key_cache_len = key->size; key_cache_updated = time(NULL); } return 0; } static int lrmd_tls_key_cb(gnutls_session_t session, char **username, gnutls_datum_t *key) { int rc = 0; if (lrmd_tls_set_key(key, DEFAULT_REMOTE_KEY_LOCATION)) { rc = lrmd_tls_set_key(key, ALT_REMOTE_KEY_LOCATION); } if (rc) { crm_err("No lrmd remote key found"); return -1; } *username = gnutls_malloc(strlen(DEFAULT_REMOTE_USERNAME) + 1); strcpy(*username, DEFAULT_REMOTE_USERNAME); return rc; } static void lrmd_gnutls_global_init(void) { static int gnutls_init = 0; if (!gnutls_init) { gnutls_global_init(); } gnutls_init = 1; } #endif static void report_async_connection_result(lrmd_t *lrmd, int rc) { lrmd_private_t *native = lrmd->private; if (native->callback) { lrmd_event_data_t event = { 0, }; event.type = lrmd_event_connect; event.remote_nodename = native->remote_nodename; event.connection_rc = rc; native->callback(&event); } } #ifdef HAVE_GNUTLS_GNUTLS_H static void lrmd_tcp_connect_cb(void *userdata, int sock) { lrmd_t *lrmd = userdata; lrmd_private_t *native = lrmd->private; char name[256] = { 0, }; static struct mainloop_fd_callbacks lrmd_tls_callbacks = { .dispatch = lrmd_tls_dispatch, .destroy = lrmd_tls_connection_destroy, }; int rc = sock; if (rc < 0) { lrmd_tls_connection_destroy(lrmd); crm_info("remote lrmd connect to %s at port %d failed", native->server, native->port); report_async_connection_result(lrmd, rc); return; } /* TODO continue with tls stuff now that tcp connect passed. make this async as well soon * to avoid all blocking code in the client. */ native->sock = sock; gnutls_psk_allocate_client_credentials(&native->psk_cred_c); gnutls_psk_set_client_credentials_function(native->psk_cred_c, lrmd_tls_key_cb); native->remote->tls_session = create_psk_tls_session(sock, GNUTLS_CLIENT, native->psk_cred_c); if (crm_initiate_client_tls_handshake(native->remote, LRMD_CLIENT_HANDSHAKE_TIMEOUT) != 0) { crm_warn("Client tls handshake failed for server %s:%d. Disconnecting", native->server, native->port); gnutls_deinit(*native->remote->tls_session); gnutls_free(native->remote->tls_session); native->remote->tls_session = NULL; lrmd_tls_connection_destroy(lrmd); report_async_connection_result(lrmd, -1); return; } crm_info("Remote lrmd client TLS connection established with server %s:%d", native->server, native->port); snprintf(name, 128, "remote-lrmd-%s:%d", native->server, native->port); native->process_notify = mainloop_add_trigger(G_PRIORITY_HIGH, lrmd_tls_dispatch, lrmd); native->source = mainloop_add_fd(name, G_PRIORITY_HIGH, native->sock, lrmd, &lrmd_tls_callbacks); rc = lrmd_handshake(lrmd, name); report_async_connection_result(lrmd, rc); return; } static int lrmd_tls_connect_async(lrmd_t *lrmd, int timeout /*ms*/) { int rc = 0; lrmd_private_t *native = lrmd->private; lrmd_gnutls_global_init(); rc = crm_remote_tcp_connect_async(native->server, native->port, timeout, lrmd, lrmd_tcp_connect_cb); return rc; } static int lrmd_tls_connect(lrmd_t *lrmd, int *fd) { static struct mainloop_fd_callbacks lrmd_tls_callbacks = { .dispatch = lrmd_tls_dispatch, .destroy = lrmd_tls_connection_destroy, }; lrmd_private_t *native = lrmd->private; int sock; lrmd_gnutls_global_init(); sock = crm_remote_tcp_connect(native->server, native->port); if (sock <= 0) { crm_warn("Could not establish remote lrmd connection to %s", native->server); lrmd_tls_connection_destroy(lrmd); return -ENOTCONN; } native->sock = sock; gnutls_psk_allocate_client_credentials(&native->psk_cred_c); gnutls_psk_set_client_credentials_function(native->psk_cred_c, lrmd_tls_key_cb); native->remote->tls_session = create_psk_tls_session(sock, GNUTLS_CLIENT, native->psk_cred_c); if (crm_initiate_client_tls_handshake(native->remote, LRMD_CLIENT_HANDSHAKE_TIMEOUT) != 0) { crm_err("Session creation for %s:%d failed", native->server, native->port); gnutls_deinit(*native->remote->tls_session); gnutls_free(native->remote->tls_session); native->remote->tls_session = NULL; lrmd_tls_connection_destroy(lrmd); return -1; } crm_info("Remote lrmd client TLS connection established with server %s:%d", native->server, native->port); if (fd) { *fd = sock; } else { char name[256] = { 0, }; snprintf(name, 128, "remote-lrmd-%s:%d", native->server, native->port); native->process_notify = mainloop_add_trigger(G_PRIORITY_HIGH, lrmd_tls_dispatch, lrmd); native->source = mainloop_add_fd(name, G_PRIORITY_HIGH, native->sock, lrmd, &lrmd_tls_callbacks); } return pcmk_ok; } #endif static int lrmd_api_connect(lrmd_t * lrmd, const char *name, int *fd) { int rc = -ENOTCONN; lrmd_private_t *native = lrmd->private; switch (native->type) { case CRM_CLIENT_IPC: rc = lrmd_ipc_connect(lrmd, fd); break; #ifdef HAVE_GNUTLS_GNUTLS_H case CRM_CLIENT_TLS: rc = lrmd_tls_connect(lrmd, fd); break; #endif default: crm_err("Unsupported connection type: %d", native->type); } if (rc == pcmk_ok) { rc = lrmd_handshake(lrmd, name); } return rc; } static int lrmd_api_connect_async(lrmd_t * lrmd, const char *name, int timeout) { int rc = 0; lrmd_private_t *native = lrmd->private; if (!native->callback) { crm_err("Async connect not possible, no lrmd client callback set."); return -1; } switch (native->type) { case CRM_CLIENT_IPC: /* fake async connection with ipc. it should be fast * enough that we gain very little from async */ rc = lrmd_api_connect(lrmd, name, NULL); if (!rc) { report_async_connection_result(lrmd, rc); } break; #ifdef HAVE_GNUTLS_GNUTLS_H case CRM_CLIENT_TLS: rc = lrmd_tls_connect_async(lrmd, timeout); if (rc) { /* connection failed, report rc now */ report_async_connection_result(lrmd, rc); } break; #endif default: crm_err("Unsupported connection type: %d", native->type); } return rc; } static void lrmd_ipc_disconnect(lrmd_t *lrmd) { lrmd_private_t *native = lrmd->private; if (native->source != NULL) { /* Attached to mainloop */ mainloop_del_ipc_client(native->source); native->source = NULL; native->ipc = NULL; } else if(native->ipc) { /* Not attached to mainloop */ crm_ipc_t *ipc = native->ipc; native->ipc = NULL; crm_ipc_close(ipc); crm_ipc_destroy(ipc); } } #ifdef HAVE_GNUTLS_GNUTLS_H static void lrmd_tls_disconnect(lrmd_t *lrmd) { lrmd_private_t *native = lrmd->private; if (native->remote->tls_session) { gnutls_bye(*native->remote->tls_session, GNUTLS_SHUT_RDWR); gnutls_deinit(*native->remote->tls_session); gnutls_free(native->remote->tls_session); native->remote->tls_session = 0; } if (native->source != NULL) { /* Attached to mainloop */ mainloop_del_ipc_client(native->source); native->source = NULL; } else if(native->sock) { close(native->sock); } if (native->pending_notify) { g_list_free_full(native->pending_notify, lrmd_free_xml); native->pending_notify = NULL; } } #endif static int lrmd_api_disconnect(lrmd_t *lrmd) { lrmd_private_t *native = lrmd->private; crm_info("Disconnecting from lrmd service"); switch (native->type) { case CRM_CLIENT_IPC: lrmd_ipc_disconnect(lrmd); break; #ifdef HAVE_GNUTLS_GNUTLS_H case CRM_CLIENT_TLS: lrmd_tls_disconnect(lrmd); break; #endif default: crm_err("Unsupported connection type: %d", native->type); } free(native->token); native->token = NULL; return 0; } static int lrmd_api_register_rsc(lrmd_t * lrmd, const char *rsc_id, const char *class, const char *provider, const char *type, enum lrmd_call_options options) { int rc = pcmk_ok; xmlNode *data = NULL; if (!class || !type || !rsc_id) { return -EINVAL; } if (safe_str_eq(class, "ocf") && !provider) { return -EINVAL; } data = create_xml_node(NULL, F_LRMD_RSC); crm_xml_add(data, F_LRMD_ORIGIN, __FUNCTION__); crm_xml_add(data, F_LRMD_RSC_ID, rsc_id); crm_xml_add(data, F_LRMD_CLASS, class); crm_xml_add(data, F_LRMD_PROVIDER, provider); crm_xml_add(data, F_LRMD_TYPE, type); rc = lrmd_send_command(lrmd, LRMD_OP_RSC_REG, data, NULL, 0, options, TRUE); free_xml(data); return rc; } static int lrmd_api_unregister_rsc(lrmd_t * lrmd, const char *rsc_id, enum lrmd_call_options options) { int rc = pcmk_ok; xmlNode *data = create_xml_node(NULL, F_LRMD_RSC); crm_xml_add(data, F_LRMD_ORIGIN, __FUNCTION__); crm_xml_add(data, F_LRMD_RSC_ID, rsc_id); rc = lrmd_send_command(lrmd, LRMD_OP_RSC_UNREG, data, NULL, 0, options, TRUE); free_xml(data); return rc; } lrmd_rsc_info_t * lrmd_copy_rsc_info(lrmd_rsc_info_t * rsc_info) { lrmd_rsc_info_t *copy = NULL; copy = calloc(1, sizeof(lrmd_rsc_info_t)); copy->id = strdup(rsc_info->id); copy->type = strdup(rsc_info->type); copy->class = strdup(rsc_info->class); if (rsc_info->provider) { copy->provider = strdup(rsc_info->provider); } return copy; } void lrmd_free_rsc_info(lrmd_rsc_info_t * rsc_info) { if (!rsc_info) { return; } free(rsc_info->id); free(rsc_info->type); free(rsc_info->class); free(rsc_info->provider); free(rsc_info); } static lrmd_rsc_info_t * lrmd_api_get_rsc_info(lrmd_t * lrmd, const char *rsc_id, enum lrmd_call_options options) { lrmd_rsc_info_t *rsc_info = NULL; xmlNode *data = create_xml_node(NULL, F_LRMD_RSC); xmlNode *output = NULL; const char *class = NULL; const char *provider = NULL; const char *type = NULL; crm_xml_add(data, F_LRMD_ORIGIN, __FUNCTION__); crm_xml_add(data, F_LRMD_RSC_ID, rsc_id); lrmd_send_command(lrmd, LRMD_OP_RSC_INFO, data, &output, 0, options, TRUE); free_xml(data); if (!output) { return NULL; } class = crm_element_value(output, F_LRMD_CLASS); provider = crm_element_value(output, F_LRMD_PROVIDER); type = crm_element_value(output, F_LRMD_TYPE); if (!class || !type) { free_xml(output); return NULL; } else if (safe_str_eq(class, "ocf") && !provider) { free_xml(output); return NULL; } rsc_info = calloc(1, sizeof(lrmd_rsc_info_t)); rsc_info->id = strdup(rsc_id); rsc_info->class = strdup(class); if (provider) { rsc_info->provider = strdup(provider); } rsc_info->type = strdup(type); free_xml(output); return rsc_info; } static void lrmd_api_set_callback(lrmd_t * lrmd, lrmd_event_callback callback) { lrmd_private_t *native = lrmd->private; native->callback = callback; } static int stonith_get_metadata(const char *provider, const char *type, char **output) { int rc = pcmk_ok; stonith_api->cmds->metadata(stonith_api, st_opt_sync_call, type, provider, output, 0); if (*output == NULL) { rc = -EIO; } return rc; } static int lsb_get_metadata(const char *type, char **output) { #define lsb_metadata_template \ "\n"\ "\n"\ "\n"\ " 1.0\n"\ " \n"\ " %s"\ " \n"\ " %s\n"\ " \n"\ " \n"\ " \n"\ " \n"\ " \n"\ " \n"\ " \n"\ " \n"\ " \n"\ " \n"\ " \n"\ " \n"\ " %s\n"\ " %s\n"\ " %s\n"\ " %s\n"\ " %s\n"\ " %s\n"\ " %s\n"\ " \n"\ "\n" #define LSB_INITSCRIPT_INFOBEGIN_TAG "### BEGIN INIT INFO" #define LSB_INITSCRIPT_INFOEND_TAG "### END INIT INFO" #define PROVIDES "# Provides:" #define REQ_START "# Required-Start:" #define REQ_STOP "# Required-Stop:" #define SHLD_START "# Should-Start:" #define SHLD_STOP "# Should-Stop:" #define DFLT_START "# Default-Start:" #define DFLT_STOP "# Default-Stop:" #define SHORT_DSCR "# Short-Description:" #define DESCRIPTION "# Description:" #define lsb_meta_helper_free_value(m) \ if ((m) != NULL) { \ xmlFree(m); \ (m) = NULL; \ } #define lsb_meta_helper_get_value(buffer, ptr, keyword) \ if (!ptr && !strncasecmp(buffer, keyword, strlen(keyword))) { \ (ptr) = (char *)xmlEncodeEntitiesReentrant(NULL, BAD_CAST buffer+strlen(keyword)); \ continue; \ } char ra_pathname[PATH_MAX] = { 0, }; FILE *fp; GString *meta_data = NULL; char buffer[1024]; char *provides = NULL; char *req_start = NULL; char *req_stop = NULL; char *shld_start = NULL; char *shld_stop = NULL; char *dflt_start = NULL; char *dflt_stop = NULL; char *s_dscrpt = NULL; char *xml_l_dscrpt = NULL; GString *l_dscrpt = NULL; snprintf(ra_pathname, sizeof(ra_pathname), "%s%s%s", type[0] == '/' ? "" : LSB_ROOT_DIR, type[0] == '/' ? "" : "/", type); if (!(fp = fopen(ra_pathname, "r"))) { return -EIO; } /* Enter into the lsb-compliant comment block */ while (fgets(buffer, sizeof(buffer), fp)) { /* Now suppose each of the following eight arguments contain only one line */ lsb_meta_helper_get_value(buffer, provides, PROVIDES) lsb_meta_helper_get_value(buffer, req_start, REQ_START) lsb_meta_helper_get_value(buffer, req_stop, REQ_STOP) lsb_meta_helper_get_value(buffer, shld_start, SHLD_START) lsb_meta_helper_get_value(buffer, shld_stop, SHLD_STOP) lsb_meta_helper_get_value(buffer, dflt_start, DFLT_START) lsb_meta_helper_get_value(buffer, dflt_stop, DFLT_STOP) lsb_meta_helper_get_value(buffer, s_dscrpt, SHORT_DSCR) /* Long description may cross multiple lines */ if ((l_dscrpt == NULL) && (0 == strncasecmp(buffer, DESCRIPTION, strlen(DESCRIPTION)))) { l_dscrpt = g_string_new(buffer + strlen(DESCRIPTION)); /* Between # and keyword, more than one space, or a tab character, * indicates the continuation line. Extracted from LSB init script standard */ while (fgets(buffer, sizeof(buffer), fp)) { if (!strncmp(buffer, "# ", 3) || !strncmp(buffer, "#\t", 2)) { buffer[0] = ' '; l_dscrpt = g_string_append(l_dscrpt, buffer); } else { fputs(buffer, fp); break; /* Long description ends */ } } continue; } if (l_dscrpt) { xml_l_dscrpt = (char *)xmlEncodeEntitiesReentrant(NULL, BAD_CAST(l_dscrpt->str)); } if (!strncasecmp(buffer, LSB_INITSCRIPT_INFOEND_TAG, strlen(LSB_INITSCRIPT_INFOEND_TAG))) { /* Get to the out border of LSB comment block */ break; } if (buffer[0] != '#') { break; /* Out of comment block in the beginning */ } } fclose(fp); meta_data = g_string_new(""); g_string_sprintf(meta_data, lsb_metadata_template, type, (xml_l_dscrpt == NULL) ? type : xml_l_dscrpt, (s_dscrpt == NULL) ? type : s_dscrpt, (provides == NULL) ? "" : provides, (req_start == NULL) ? "" : req_start, (req_stop == NULL) ? "" : req_stop, (shld_start == NULL) ? "" : shld_start, (shld_stop == NULL) ? "" : shld_stop, (dflt_start == NULL) ? "" : dflt_start, (dflt_stop == NULL) ? "" : dflt_stop); lsb_meta_helper_free_value(xml_l_dscrpt); lsb_meta_helper_free_value(s_dscrpt); lsb_meta_helper_free_value(provides); lsb_meta_helper_free_value(req_start); lsb_meta_helper_free_value(req_stop); lsb_meta_helper_free_value(shld_start); lsb_meta_helper_free_value(shld_stop); lsb_meta_helper_free_value(dflt_start); lsb_meta_helper_free_value(dflt_stop); if (l_dscrpt) { g_string_free(l_dscrpt, TRUE); } *output = strdup(meta_data->str); g_string_free(meta_data, TRUE); return pcmk_ok; } #if SUPPORT_NAGIOS static int nagios_get_metadata(const char *type, char **output) { int rc = pcmk_ok; FILE *file_strm = NULL; int start = 0, length = 0, read_len = 0; char *metadata_file = NULL; int len = 36; len += strlen(NAGIOS_METADATA_DIR); len += strlen(type); metadata_file = calloc(1, len); CRM_CHECK(metadata_file != NULL, return -ENOMEM); sprintf(metadata_file, "%s/%s.xml", NAGIOS_METADATA_DIR, type); file_strm = fopen(metadata_file, "r"); if (file_strm == NULL) { crm_err("Metadata file %s does not exist", metadata_file); free(metadata_file); return -EIO; } /* see how big the file is */ start = ftell(file_strm); fseek(file_strm, 0L, SEEK_END); length = ftell(file_strm); fseek(file_strm, 0L, start); CRM_ASSERT(length >= 0); CRM_ASSERT(start == ftell(file_strm)); if (length <= 0) { crm_info("%s was not valid", metadata_file); free(*output); *output = NULL; rc = -EIO; } else { crm_trace("Reading %d bytes from file", length); *output = calloc(1, (length + 1)); read_len = fread(*output, 1, length, file_strm); if (read_len != length) { crm_err("Calculated and read bytes differ: %d vs. %d", length, read_len); free(*output); *output = NULL; rc = -EIO; } } fclose(file_strm); free(metadata_file); return rc; } #endif static int generic_get_metadata(const char *standard, const char *provider, const char *type, char **output) { svc_action_t *action = resources_action_create(type, standard, provider, type, "meta-data", 0, 5000, NULL); if (!(services_action_sync(action))) { crm_err("Failed to retrieve meta-data for %s:%s:%s", standard, provider, type); services_action_free(action); return -EIO; } if (!action->stdout_data) { crm_err("Failed to retrieve meta-data for %s:%s:%s", standard, provider, type); services_action_free(action); return -EIO; } *output = strdup(action->stdout_data); services_action_free(action); return pcmk_ok; } static int lrmd_api_get_metadata(lrmd_t * lrmd, const char *class, const char *provider, const char *type, char **output, enum lrmd_call_options options) { if (!class || !type) { return -EINVAL; } if (safe_str_eq(class, "stonith")) { return stonith_get_metadata(provider, type, output); } else if (safe_str_eq(class, "lsb")) { return lsb_get_metadata(type, output); #if SUPPORT_NAGIOS } else if (safe_str_eq(class, "nagios")) { return nagios_get_metadata(type, output); #endif } return generic_get_metadata(class, provider, type, output); } static int lrmd_api_exec(lrmd_t * lrmd, const char *rsc_id, const char *action, const char *userdata, int interval, /* ms */ int timeout, /* ms */ int start_delay, /* ms */ enum lrmd_call_options options, lrmd_key_value_t * params) { int rc = pcmk_ok; xmlNode *data = create_xml_node(NULL, F_LRMD_RSC); xmlNode *args = create_xml_node(data, XML_TAG_ATTRS); lrmd_key_value_t *tmp = NULL; crm_xml_add(data, F_LRMD_ORIGIN, __FUNCTION__); crm_xml_add(data, F_LRMD_RSC_ID, rsc_id); crm_xml_add(data, F_LRMD_RSC_ACTION, action); crm_xml_add(data, F_LRMD_RSC_USERDATA_STR, userdata); crm_xml_add_int(data, F_LRMD_RSC_INTERVAL, interval); crm_xml_add_int(data, F_LRMD_TIMEOUT, timeout); crm_xml_add_int(data, F_LRMD_RSC_START_DELAY, start_delay); for (tmp = params; tmp; tmp = tmp->next) { hash2field((gpointer) tmp->key, (gpointer) tmp->value, args); } rc = lrmd_send_command(lrmd, LRMD_OP_RSC_EXEC, data, NULL, timeout, options, TRUE); free_xml(data); lrmd_key_value_freeall(params); return rc; } static int lrmd_api_cancel(lrmd_t * lrmd, const char *rsc_id, const char *action, int interval) { int rc = pcmk_ok; xmlNode *data = create_xml_node(NULL, F_LRMD_RSC); crm_xml_add(data, F_LRMD_ORIGIN, __FUNCTION__); crm_xml_add(data, F_LRMD_RSC_ACTION, action); crm_xml_add(data, F_LRMD_RSC_ID, rsc_id); crm_xml_add_int(data, F_LRMD_RSC_INTERVAL, interval); rc = lrmd_send_command(lrmd, LRMD_OP_RSC_CANCEL, data, NULL, 0, 0, TRUE); free_xml(data); return rc; } static int list_stonith_agents(lrmd_list_t ** resources) { int rc = 0; stonith_key_value_t *stonith_resources = NULL; stonith_key_value_t *dIter = NULL; stonith_api->cmds->list_agents(stonith_api, st_opt_sync_call, NULL, &stonith_resources, 0); for (dIter = stonith_resources; dIter; dIter = dIter->next) { rc++; if(resources) { *resources = lrmd_list_add(*resources, dIter->value); } } stonith_key_value_freeall(stonith_resources, 1, 0); return rc; } static int lrmd_api_list_agents(lrmd_t * lrmd, lrmd_list_t ** resources, const char *class, const char *provider) { int rc = 0; if (safe_str_eq(class, "stonith")) { rc += list_stonith_agents(resources); } else { GListPtr gIter = NULL; GList *agents = resources_list_agents(class, provider); for (gIter = agents; gIter != NULL; gIter = gIter->next) { *resources = lrmd_list_add(*resources, (const char *)gIter->data); rc++; } g_list_free_full(agents, free); if (!class) { rc += list_stonith_agents(resources); } } if(rc == 0) { crm_notice("No agents found for class %s", class); rc = -EPROTONOSUPPORT; } return rc; } static int does_provider_have_agent(const char *agent, const char *provider, const char *class) { int found = 0; GList *agents = NULL; GListPtr gIter2 = NULL; agents = resources_list_agents(class, provider); for (gIter2 = agents; gIter2 != NULL; gIter2 = gIter2->next) { if (safe_str_eq(agent, gIter2->data)) { found = 1; } } g_list_free_full(agents, free); return found; } static int lrmd_api_list_ocf_providers(lrmd_t * lrmd, const char *agent, lrmd_list_t ** providers) { int rc = pcmk_ok; char *provider = NULL; GList *ocf_providers = NULL; GListPtr gIter = NULL; ocf_providers = resources_list_providers("ocf"); for (gIter = ocf_providers; gIter != NULL; gIter = gIter->next) { provider = gIter->data; if (!agent || does_provider_have_agent(agent, provider, "ocf")) { *providers = lrmd_list_add(*providers, (const char *)gIter->data); rc++; } } g_list_free_full(ocf_providers, free); return rc; } static int lrmd_api_list_standards(lrmd_t * lrmd, lrmd_list_t ** supported) { int rc = 0; GList *standards = NULL; GListPtr gIter = NULL; standards = resources_list_standards(); for (gIter = standards; gIter != NULL; gIter = gIter->next) { *supported = lrmd_list_add(*supported, (const char *)gIter->data); rc++; } if(list_stonith_agents(NULL) > 0) { *supported = lrmd_list_add(*supported, "stonith"); rc++; } g_list_free_full(standards, free); return rc; } lrmd_t * lrmd_api_new(void) { lrmd_t *new_lrmd = NULL; lrmd_private_t *pvt = NULL; new_lrmd = calloc(1, sizeof(lrmd_t)); pvt = calloc(1, sizeof(lrmd_private_t)); pvt->remote = calloc(1, sizeof(crm_remote_t)); new_lrmd->cmds = calloc(1, sizeof(lrmd_api_operations_t)); pvt->type = CRM_CLIENT_IPC; new_lrmd->private = pvt; new_lrmd->cmds->connect = lrmd_api_connect; new_lrmd->cmds->connect_async = lrmd_api_connect_async; new_lrmd->cmds->is_connected = lrmd_api_is_connected; new_lrmd->cmds->poke_connection = lrmd_api_poke_connection; new_lrmd->cmds->disconnect = lrmd_api_disconnect; new_lrmd->cmds->register_rsc = lrmd_api_register_rsc; new_lrmd->cmds->unregister_rsc = lrmd_api_unregister_rsc; new_lrmd->cmds->get_rsc_info = lrmd_api_get_rsc_info; new_lrmd->cmds->set_callback = lrmd_api_set_callback; new_lrmd->cmds->get_metadata = lrmd_api_get_metadata; new_lrmd->cmds->exec = lrmd_api_exec; new_lrmd->cmds->cancel = lrmd_api_cancel; new_lrmd->cmds->list_agents = lrmd_api_list_agents; new_lrmd->cmds->list_ocf_providers = lrmd_api_list_ocf_providers; new_lrmd->cmds->list_standards = lrmd_api_list_standards; if (!stonith_api) { stonith_api = stonith_api_new(); } return new_lrmd; } lrmd_t * lrmd_remote_api_new(const char *nodename, const char *server, int port) { #ifdef HAVE_GNUTLS_GNUTLS_H lrmd_t *new_lrmd = lrmd_api_new(); lrmd_private_t *native = new_lrmd->private; if (!nodename && !server) { return NULL; } native->type = CRM_CLIENT_TLS; native->remote_nodename = nodename ? strdup(nodename) : strdup(server); native->server = server ? strdup(server) : strdup(nodename); native->port = port ? port : DEFAULT_REMOTE_PORT; return new_lrmd; #else crm_err("GNUTLS is not enabled for this build, remote LRMD client can not be created"); return NULL; #endif } void lrmd_api_delete(lrmd_t * lrmd) { if (!lrmd) { return; } lrmd->cmds->disconnect(lrmd); /* no-op if already disconnected */ free(lrmd->cmds); if (lrmd->private) { lrmd_private_t *native = lrmd->private; #ifdef HAVE_GNUTLS_GNUTLS_H free(native->server); #endif free(native->remote_nodename); free(native->remote); } free(lrmd->private); free(lrmd); } diff --git a/lrmd/lrmd_private.h b/lrmd/lrmd_private.h index 2642387ae4..1f94d47fa4 100644 --- a/lrmd/lrmd_private.h +++ b/lrmd/lrmd_private.h @@ -1,94 +1,94 @@ /* * Copyright (c) 2012 David Vossel * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. - * + * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. - * + * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA * */ #ifndef LRMD_PVT__H # define LRMD_PVT__H # include -# include +# include # include # include #ifdef HAVE_GNUTLS_GNUTLS_H # undef KEYFILE # include #endif GHashTable *rsc_list; typedef struct lrmd_rsc_s { char *rsc_id; char *class; char *provider; char *type; int call_opts; /* NEVER dereference this pointer, * It simply exists as a switch to let us know * when the currently active operation has completed */ void *active; /* Operations in this list * have not been executed yet. */ GList *pending_ops; /* Operations in this list are recurring operations * that have been handed off from the pending ops list. */ GList *recurring_ops; int stonith_started; crm_trigger_t *work; } lrmd_rsc_t; #ifdef HAVE_GNUTLS_GNUTLS_H /* in remote_tls.c */ int lrmd_init_remote_tls_server(int port); void lrmd_tls_server_destroy(void); /* Hidden in lrmd client lib */ extern int lrmd_tls_send_msg(crm_remote_t *session, xmlNode *msg, uint32_t id, const char *msg_type); extern int lrmd_tls_set_key(gnutls_datum_t *key, const char *location); #endif int lrmd_server_send_reply(crm_client_t *client, uint32_t id, xmlNode *reply); int lrmd_server_send_notify(crm_client_t *client, xmlNode *msg); void process_lrmd_message(crm_client_t * client, uint32_t id, xmlNode * request); void free_rsc(gpointer data); void lrmd_shutdown(int nsig); void client_disconnect_cleanup(const char *client_id); /*! - * \brief Don't worry about freeing this connection. It is + * \brief Don't worry about freeing this connection. It is * taken care of after mainloop exits by the main() function. */ stonith_t *get_stonith_connection(void); /*! * \brief This is a callback that tells the lrmd * the current stonith connection has gone away. This allows * us to timeout any pending stonith commands */ void stonith_connection_failed(void); #endif diff --git a/mcp/pacemaker.c b/mcp/pacemaker.c index 1a1465a3e1..975321a54e 100644 --- a/mcp/pacemaker.c +++ b/mcp/pacemaker.c @@ -1,1005 +1,1005 @@ -/* +/* * Copyright (C) 2010 Andrew Beekhof - * + * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. - * + * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. - * + * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #include #include #include #include #include #include #include #include -#include +#include #include #include #include #include gboolean fatal_error = FALSE; GMainLoop *mainloop = NULL; GHashTable *peers = NULL; #define PCMK_PROCESS_CHECK_INTERVAL 5 char *local_name = NULL; uint32_t local_nodeid = 0; crm_trigger_t *shutdown_trigger = NULL; const char *pid_file = "/var/run/pacemaker.pid"; /* *INDENT-OFF* */ enum crm_proc_flag { crm_proc_none = 0x00000001, crm_proc_plugin = 0x00000002, crm_proc_lrmd = 0x00000010, crm_proc_cib = 0x00000100, crm_proc_crmd = 0x00000200, crm_proc_attrd = 0x00001000, crm_proc_stonithd = 0x00002000, crm_proc_pe = 0x00010000, crm_proc_te = 0x00020000, crm_proc_mgmtd = 0x00040000, crm_proc_stonith_ng = 0x00100000, }; /* *INDENT-ON* */ typedef struct pcmk_child_s { int pid; long flag; int start_seq; int respawn_count; gboolean respawn; const char *name; const char *uid; const char *command; gboolean active_before_startup; } pcmk_child_t; /* Index into the array below */ #define pcmk_child_crmd 4 #define pcmk_child_mgmtd 8 /* *INDENT-OFF* */ static pcmk_child_t pcmk_children[] = { { 0, crm_proc_none, 0, 0, FALSE, "none", NULL, NULL }, { 0, crm_proc_plugin, 0, 0, FALSE, "ais", NULL, NULL }, { 0, crm_proc_lrmd, 3, 0, TRUE, "lrmd", NULL, CRM_DAEMON_DIR"/lrmd" }, { 0, crm_proc_cib, 1, 0, TRUE, "cib", CRM_DAEMON_USER, CRM_DAEMON_DIR"/cib" }, { 0, crm_proc_crmd, 6, 0, TRUE, "crmd", CRM_DAEMON_USER, CRM_DAEMON_DIR"/crmd" }, { 0, crm_proc_attrd, 4, 0, TRUE, "attrd", CRM_DAEMON_USER, CRM_DAEMON_DIR"/attrd" }, { 0, crm_proc_stonithd, 0, 0, TRUE, "stonithd", NULL, NULL }, { 0, crm_proc_pe, 5, 0, TRUE, "pengine", CRM_DAEMON_USER, CRM_DAEMON_DIR"/pengine" }, { 0, crm_proc_mgmtd, 0, 0, TRUE, "mgmtd", NULL, HB_DAEMON_DIR"/mgmtd" }, { 0, crm_proc_stonith_ng, 2, 0, TRUE, "stonith-ng", NULL, CRM_DAEMON_DIR"/stonithd" }, }; /* *INDENT-ON* */ static gboolean start_child(pcmk_child_t * child); static gboolean check_active_before_startup_processes(gpointer user_data); void enable_crmd_as_root(gboolean enable) { if (enable) { pcmk_children[pcmk_child_crmd].uid = NULL; } else { pcmk_children[pcmk_child_crmd].uid = CRM_DAEMON_USER; } } void enable_mgmtd(gboolean enable) { if (enable) { pcmk_children[pcmk_child_mgmtd].start_seq = 7; } else { pcmk_children[pcmk_child_mgmtd].start_seq = 0; } } static uint32_t get_process_list(void) { int lpc = 0; uint32_t procs = crm_proc_plugin; for (lpc = 0; lpc < SIZEOF(pcmk_children); lpc++) { if (pcmk_children[lpc].pid != 0) { procs |= pcmk_children[lpc].flag; } } return procs; } static void pcmk_process_exit(pcmk_child_t *child) { child->pid = 0; child->active_before_startup = FALSE; /* Broadcast the fact that one of our processes died ASAP - * + * * Try to get some logging of the cause out first though * because we're probably about to get fenced * * Potentially do this only if respawn_count > N * to allow for local recovery */ update_node_processes(local_nodeid, NULL, get_process_list()); child->respawn_count += 1; if (child->respawn_count > MAX_RESPAWN) { crm_err("Child respawn count exceeded by %s", child->name); child->respawn = FALSE; } if (shutdown_trigger) { mainloop_set_trigger(shutdown_trigger); update_node_processes(local_nodeid, NULL, get_process_list()); } else if (child->respawn) { crm_notice("Respawning failed child process: %s", child->name); start_child(child); } } -static void pcmk_child_exit(GPid pid, gint status, gpointer user_data) +static void pcmk_child_exit(GPid pid, gint status, gpointer user_data) { int exitcode = 0; pcmk_child_t *child = user_data; if(WIFSIGNALED(status)) { int signo = WTERMSIG(status); int core = WCOREDUMP(status); crm_notice("Child process %s terminated with signal %d (pid=%d, core=%d)", child->name, signo, child->pid, core); } else if(WIFEXITED(status)) { exitcode = WEXITSTATUS(status); do_crm_log(exitcode == 0 ? LOG_INFO : LOG_ERR, "Child process %s exited (pid=%d, rc=%d)", child->name, child->pid, exitcode); } if (exitcode == 100) { crm_warn("Pacemaker child process %s no longer wishes to be respawned. " "Shutting ourselves down.", child->name); child->respawn = FALSE; fatal_error = TRUE; pcmk_shutdown(15); } pcmk_process_exit(child); } static gboolean stop_child(pcmk_child_t * child, int signal) { if (signal == 0) { signal = SIGTERM; } if (child->command == NULL) { crm_debug("Nothing to do for child \"%s\"", child->name); return TRUE; } if (child->pid <= 0) { crm_trace("Client %s not running", child->name); return TRUE; } errno = 0; if (kill(child->pid, signal) == 0) { crm_notice("Stopping %s: Sent -%d to process %d", child->name, signal, child->pid); } else { crm_perror(LOG_ERR, "Stopping %s: Could not send -%d to process %d failed", child->name, signal, child->pid); } return TRUE; } static char *opts_default[] = { NULL, NULL }; static char *opts_vgrind[] = { NULL, NULL, NULL, NULL, NULL }; static gboolean start_child(pcmk_child_t * child) { int lpc = 0; uid_t uid = 0; struct rlimit oflimits; gboolean use_valgrind = FALSE; gboolean use_callgrind = FALSE; const char *devnull = "/dev/null"; const char *env_valgrind = getenv("PCMK_valgrind_enabled"); const char *env_callgrind = getenv("PCMK_callgrind_enabled"); child->active_before_startup = FALSE; if (child->command == NULL) { crm_info("Nothing to do for child \"%s\"", child->name); return TRUE; } if (env_callgrind != NULL && crm_is_true(env_callgrind)) { use_callgrind = TRUE; use_valgrind = TRUE; } else if (env_callgrind != NULL && strstr(env_callgrind, child->name)) { use_callgrind = TRUE; use_valgrind = TRUE; } else if (env_valgrind != NULL && crm_is_true(env_valgrind)) { use_valgrind = TRUE; } else if (env_valgrind != NULL && strstr(env_valgrind, child->name)) { use_valgrind = TRUE; } if (use_valgrind && strlen(VALGRIND_BIN) == 0) { crm_warn("Cannot enable valgrind for %s:" " The location of the valgrind binary is unknown", child->name); use_valgrind = FALSE; } child->pid = fork(); CRM_ASSERT(child->pid != -1); if (child->pid > 0) { /* parent */ g_child_watch_add(child->pid, pcmk_child_exit, child); crm_info("Forked child %d for process %s%s", child->pid, child->name, use_valgrind ? " (valgrind enabled: " VALGRIND_BIN ")" : ""); update_node_processes(local_nodeid, NULL, get_process_list()); return TRUE; } else { /* Start a new session */ (void)setsid(); /* Setup the two alternate arg arrarys */ opts_vgrind[0] = strdup(VALGRIND_BIN); if (use_callgrind) { opts_vgrind[1] = strdup("--tool=callgrind"); opts_vgrind[2] = strdup("--callgrind-out-file=" CRM_STATE_DIR "/callgrind.out.%p"); opts_vgrind[3] = strdup(child->command); opts_vgrind[4] = NULL; } else { opts_vgrind[1] = strdup(child->command); opts_vgrind[2] = NULL; opts_vgrind[3] = NULL; opts_vgrind[4] = NULL; } opts_default[0] = strdup(child->command);; #if 0 /* Dont set the group for now - it prevents connection to the cluster */ if (gid && setgid(gid) < 0) { crm_perror("Could not set group to %d", gid); } #endif if (child->uid) { if (crm_user_lookup(child->uid, &uid, NULL) < 0) { crm_err("Invalid uid (%s) specified for %s", child->uid, child->name); return TRUE; } } if (uid && setuid(uid) < 0) { crm_perror(LOG_ERR, "Could not set user to %d (%s)", uid, child->uid); } /* Close all open file descriptors */ getrlimit(RLIMIT_NOFILE, &oflimits); for (lpc = 0; lpc < oflimits.rlim_cur; lpc++) { close(lpc); } (void)open(devnull, O_RDONLY); /* Stdin: fd 0 */ (void)open(devnull, O_WRONLY); /* Stdout: fd 1 */ (void)open(devnull, O_WRONLY); /* Stderr: fd 2 */ if (use_valgrind) { (void)execvp(VALGRIND_BIN, opts_vgrind); } else { (void)execvp(child->command, opts_default); } crm_perror(LOG_ERR, "FATAL: Cannot exec %s", child->command); crm_exit(100); } return TRUE; /* never reached */ } static gboolean escalate_shutdown(gpointer data) { pcmk_child_t *child = data; if (child->pid) { /* Use SIGSEGV instead of SIGKILL to create a core so we can see what it was up to */ crm_err("Child %s not terminating in a timely manner, forcing", child->name); stop_child(child, SIGSEGV); } return FALSE; } static gboolean pcmk_shutdown_worker(gpointer user_data) { static int phase = 0; static time_t next_log = 0; static int max = SIZEOF(pcmk_children); int lpc = 0; if (phase == 0) { crm_notice("Shuting down Pacemaker"); phase = max; /* Add a second, more frequent, check to speed up shutdown */ g_timeout_add_seconds(5, check_active_before_startup_processes, NULL); } for (; phase > 0; phase--) { /* dont stop anything with start_seq < 1 */ for (lpc = max - 1; lpc >= 0; lpc--) { pcmk_child_t *child = &(pcmk_children[lpc]); if (phase != child->start_seq) { continue; } if (child->pid) { time_t now = time(NULL); if (child->respawn) { next_log = now + 30; child->respawn = FALSE; stop_child(child, SIGTERM); if (phase < pcmk_children[pcmk_child_crmd].start_seq) { g_timeout_add(180000 /* 3m */ , escalate_shutdown, child); } } else if (now >= next_log) { next_log = now + 30; crm_notice("Still waiting for %s (pid=%d, seq=%d) to terminate...", child->name, child->pid, child->start_seq); } return TRUE; } /* cleanup */ crm_debug("%s confirmed stopped", child->name); child->pid = 0; } } /* send_cluster_id(); */ crm_notice("Shutdown complete"); g_main_loop_quit(mainloop); if(fatal_error) { crm_notice("Attempting to inhibit respawning after fatal error"); crm_exit(100); } - + return TRUE; } void pcmk_shutdown(int nsig) { if (shutdown_trigger == NULL) { shutdown_trigger = mainloop_add_trigger(G_PRIORITY_HIGH, pcmk_shutdown_worker, NULL); } mainloop_set_trigger(shutdown_trigger); } static void build_path(const char *path_c, mode_t mode) { int offset = 1, len = 0; char *path = strdup(path_c); CRM_CHECK(path != NULL, return); for (len = strlen(path); offset < len; offset++) { if (path[offset] == '/') { path[offset] = 0; if (mkdir(path, mode) < 0 && errno != EEXIST) { crm_perror(LOG_ERR, "Could not create directory '%s'", path); break; } path[offset] = '/'; } } if (mkdir(path, mode) < 0 && errno != EEXIST) { crm_perror(LOG_ERR, "Could not create directory '%s'", path); } free(path); } static int32_t pcmk_ipc_accept(qb_ipcs_connection_t *c, uid_t uid, gid_t gid) { crm_trace("Connection %p", c); if(crm_client_new(c, uid, gid) == NULL) { return -EIO; } return 0; } static void pcmk_ipc_created(qb_ipcs_connection_t *c) { crm_trace("Connection %p", c); } /* Exit code means? */ static int32_t pcmk_ipc_dispatch(qb_ipcs_connection_t *qbc, void *data, size_t size) { uint32_t id = 0; uint32_t flags = 0; const char *task = NULL; crm_client_t *c = crm_client_get(qbc); xmlNode *msg = crm_ipcs_recv(c, data, size, &id, &flags); if(flags & crm_ipc_client_response) { crm_ipcs_send_ack(c, id, "ack", __FUNCTION__, __LINE__); } if (msg == NULL) { return 0; } task = crm_element_value(msg, F_CRM_TASK); if(crm_str_eq(task, CRM_OP_QUIT, TRUE)) { /* Time to quit */ crm_notice("Shutting down in responce to ticket %s (%s)", crm_element_value(msg, F_CRM_REFERENCE), crm_element_value(msg, F_CRM_ORIGIN)); pcmk_shutdown(15); } else { /* Just send to everyone */ update_process_clients(); } - + free_xml(msg); return 0; } /* Error code means? */ static int32_t -pcmk_ipc_closed(qb_ipcs_connection_t *c) +pcmk_ipc_closed(qb_ipcs_connection_t *c) { crm_client_t *client = crm_client_get(c); crm_trace("Connection %p", c); crm_client_destroy(client); return 0; } static void -pcmk_ipc_destroy(qb_ipcs_connection_t *c) +pcmk_ipc_destroy(qb_ipcs_connection_t *c) { crm_trace("Connection %p", c); } -struct qb_ipcs_service_handlers ipc_callbacks = +struct qb_ipcs_service_handlers ipc_callbacks = { .connection_accept = pcmk_ipc_accept, .connection_created = pcmk_ipc_created, .msg_process = pcmk_ipc_dispatch, .connection_closed = pcmk_ipc_closed, .connection_destroyed = pcmk_ipc_destroy }; static void ghash_send_proc_details(gpointer key, gpointer value, gpointer data) { crm_ipcs_send(value, 0, data, TRUE); } static void peer_loop_fn(gpointer key, gpointer value, gpointer user_data) { pcmk_peer_t *node = value; xmlNode *update = user_data; xmlNode *xml = create_xml_node(update, "node"); crm_xml_add_int(xml, "id", node->id); crm_xml_add(xml, "uname", node->uname); crm_xml_add_int(xml, "processes", node->processes); } void update_process_clients(void) { xmlNode *update = create_xml_node(NULL, "nodes"); crm_trace("Sending process list to %d children", crm_hash_table_size(client_connections)); g_hash_table_foreach(peers, peer_loop_fn, update); g_hash_table_foreach(client_connections, ghash_send_proc_details, update); free_xml(update); } void update_process_peers(void) { char buffer[1024]; struct iovec iov; int rc = 0; memset(buffer, 0, SIZEOF(buffer)); if (local_name) { rc = snprintf(buffer, SIZEOF(buffer) - 1, "", local_name, get_process_list()); } else { rc = snprintf(buffer, SIZEOF(buffer) - 1, "", get_process_list()); } iov.iov_base = buffer; iov.iov_len = rc + 1; crm_trace("Sending %s", buffer); send_cpg_message(&iov); } gboolean update_node_processes(uint32_t id, const char *uname, uint32_t procs) { gboolean changed = FALSE; pcmk_peer_t *node = g_hash_table_lookup(peers, GUINT_TO_POINTER(id)); if (node == NULL) { changed = TRUE; node = calloc(1, sizeof(pcmk_peer_t)); node->id = id; g_hash_table_insert(peers, GUINT_TO_POINTER(id), node); node = g_hash_table_lookup(peers, GUINT_TO_POINTER(id)); CRM_ASSERT(node != NULL); } if (uname != NULL) { if (node->uname == NULL || safe_str_eq(node->uname, uname) == FALSE) { int lpc, len = strlen(uname); crm_notice("%p Node %u now known as %s%s%s", node, id, uname, node->uname?node->uname:", was: ", node->uname?node->uname:""); free(node->uname); node->uname = strdup(uname); changed = TRUE; for(lpc = 0; lpc < len; lpc++) { if(uname[lpc] >= 'A' && uname[lpc] <= 'Z') { crm_warn("Node names with capitals are discouraged, consider changing '%s' to something else", uname); break; } } } } else { crm_trace("Empty uname for node %u", id); } if (procs != 0) { if(procs != node->processes) { crm_debug("Node %s now has process list: %.32x (was %.32x)", node->uname, procs, node->processes); node->processes = procs; changed = TRUE; } else { crm_trace("Node %s still has process list: %.32x", node->uname, procs); } } if (changed && id == local_nodeid) { update_process_clients(); update_process_peers(); } return changed; } /* *INDENT-OFF* */ static struct crm_option long_options[] = { /* Top-level Options */ {"help", 0, 0, '?', "\tThis text"}, {"version", 0, 0, '$', "\tVersion information" }, {"verbose", 0, 0, 'V', "\tIncrease debug output"}, {"shutdown", 0, 0, 'S', "\tInstruct Pacemaker to shutdown on this machine"}, {"features", 0, 0, 'F', "\tDisplay the full version and list of features Pacemaker was built with"}, {"-spacer-", 1, 0, '-', "\nAdditional Options:"}, {"foreground", 0, 0, 'f', "\tRun in the foreground instead of as a daemon"}, {"pid-file", 1, 0, 'p', "\t(Advanced) Daemon pid file location"}, {NULL, 0, 0, 0} }; /* *INDENT-ON* */ static void mcp_chown(const char *path, uid_t uid, gid_t gid) { int rc = chown(path, uid, gid); if(rc < 0) { crm_warn("Cannot change the ownership of %s to user %s and gid %d: %s", path, CRM_DAEMON_USER, gid, pcmk_strerror(errno)); } } static gboolean check_active_before_startup_processes(gpointer user_data) { int start_seq = 1, lpc = 0; static int max = SIZEOF(pcmk_children); gboolean keep_tracking = FALSE; for (start_seq = 1; start_seq < max; start_seq++) { for (lpc = 0; lpc < max; lpc++) { if (pcmk_children[lpc].active_before_startup == FALSE) { /* we are already tracking it as a child process. */ continue; } else if (start_seq != pcmk_children[lpc].start_seq) { continue; } else if (crm_pid_active(pcmk_children[lpc].pid) != 1) { crm_notice("Process %s terminated (pid=%d)", pcmk_children[lpc].name, pcmk_children[lpc].pid); pcmk_process_exit(&(pcmk_children[lpc])); continue; } /* at least one of the processes found at startup * is still going, so keep this recurring timer around */ keep_tracking = TRUE; } } return keep_tracking; } static void find_and_track_existing_processes(void) { DIR *dp; struct dirent *entry; struct stat statbuf; int start_tracker = 0; dp = opendir("/proc"); if (!dp) { /* no proc directory to search through */ crm_notice("Can not read /proc directory to track existing components"); return; } while ((entry = readdir(dp)) != NULL) { char procpath[128]; char value[64]; char key[16]; FILE *file; int pid; int max = SIZEOF(pcmk_children); int i; strcpy(procpath, "/proc/"); /* strlen("/proc/") + strlen("/status") + 1 = 14 * 128 - 14 = 114 */ strncat(procpath, entry->d_name, 114); if (lstat(procpath, &statbuf)) { continue; } if (!S_ISDIR(statbuf.st_mode) || !isdigit(entry->d_name[0])) { continue; } strcat(procpath, "/status"); file = fopen(procpath, "r"); if (!file) { continue; } if (fscanf(file, "%15s%63s", key, value) != 2) { fclose(file); continue; } fclose(file); pid = atoi(entry->d_name); if (pid <= 0) { continue; } for (i = 0; i < max; i++) { const char *name = pcmk_children[i].name; if (pcmk_children[i].start_seq == 0) { continue; } if (pcmk_children[i].flag == crm_proc_stonith_ng) { name = "stonithd"; } if (safe_str_eq(name, value)) { if (crm_pid_active(pid) != 1) { continue; } crm_notice("Tracking existing %s process (pid=%d)", value, pid); pcmk_children[i].pid = pid; pcmk_children[i].active_before_startup = TRUE; start_tracker = 1; } } } if (start_tracker) { g_timeout_add_seconds(PCMK_PROCESS_CHECK_INTERVAL, check_active_before_startup_processes, NULL); } closedir(dp); } static void init_children_processes(void) { int start_seq = 1, lpc = 0; static int max = SIZEOF(pcmk_children); /* start any children that have not been detected */ for (start_seq = 1; start_seq < max; start_seq++) { /* dont start anything with start_seq < 1 */ for (lpc = 0; lpc < max; lpc++) { if (pcmk_children[lpc].pid) { /* we are already tracking it */ continue; } if (start_seq == pcmk_children[lpc].start_seq) { start_child(&(pcmk_children[lpc])); } } } } int main(int argc, char **argv) { int rc; int flag; int argerr = 0; int option_index = 0; gboolean shutdown = FALSE; uid_t pcmk_uid = 0; gid_t pcmk_gid = 0; struct rlimit cores; crm_ipc_t *old_instance = NULL; qb_ipcs_service_t *ipcs = NULL; const char *facility = daemon_option("logfacility"); setenv("LC_ALL", "C", 1); setenv("HA_LOGFACILITY", facility, 1); setenv("HA_LOGD", "no", 1); set_daemon_option("mcp", "true"); set_daemon_option("use_logd", "off"); crm_log_init(NULL, LOG_INFO, TRUE, FALSE, argc, argv, FALSE); crm_set_options(NULL, "mode [options]", long_options, "Start/Stop Pacemaker\n"); /* Restore the original facility so that read_config() does the right thing */ set_daemon_option("logfacility", facility); while (1) { flag = crm_get_option(argc, argv, &option_index); if (flag == -1) break; switch (flag) { case 'V': crm_bump_log_level(argc, argv); break; case 'f': /* Legacy */ break; case 'p': pid_file = optarg; break; case '$': case '?': crm_help(flag, EX_OK); break; case 'S': shutdown = TRUE; break; case 'F': printf("Pacemaker %s (Build: %s)\n Supporting: %s\n", VERSION, BUILD_VERSION, CRM_FEATURES); crm_exit(0); default: printf("Argument code 0%o (%c) is not (?yet?) supported\n", flag, flag); ++argerr; break; } } if (optind < argc) { printf("non-option ARGV-elements: "); while (optind < argc) printf("%s ", argv[optind++]); printf("\n"); } if (argerr) { crm_help('?', EX_USAGE); } crm_debug("Checking for old instances of %s", CRM_SYSTEM_MCP); old_instance = crm_ipc_new(CRM_SYSTEM_MCP, 0); crm_ipc_connect(old_instance); - + if(shutdown) { crm_debug("Terminating previous instance"); while (crm_ipc_connected(old_instance)) { xmlNode *cmd = create_request(CRM_OP_QUIT, NULL, NULL, CRM_SYSTEM_MCP, CRM_SYSTEM_MCP, NULL); crm_debug("."); crm_ipc_send(old_instance, cmd, 0, 0, NULL); free_xml(cmd); sleep(2); } crm_ipc_close(old_instance); crm_ipc_destroy(old_instance); crm_exit(0); } else if(crm_ipc_connected(old_instance)) { crm_ipc_close(old_instance); crm_ipc_destroy(old_instance); crm_err("Pacemaker is already active, aborting startup"); crm_exit(100); } crm_ipc_close(old_instance); crm_ipc_destroy(old_instance); if (read_config() == FALSE) { crm_notice("Could not obtain corosync config data, exiting"); crm_exit(1); } crm_notice("Starting Pacemaker %s (Build: %s): %s", VERSION, BUILD_VERSION, CRM_FEATURES); mainloop = g_main_new(FALSE); rc = getrlimit(RLIMIT_CORE, &cores); if (rc < 0) { crm_perror(LOG_ERR, "Cannot determine current maximum core size."); } else { if (cores.rlim_max == 0 && geteuid() == 0) { cores.rlim_max = RLIM_INFINITY; } else { crm_info("Maximum core file size is: %lu", (unsigned long)cores.rlim_max); } cores.rlim_cur = cores.rlim_max; rc = setrlimit(RLIMIT_CORE, &cores); if (rc < 0) { crm_perror(LOG_ERR, "Core file generation will remain disabled." " Core files are an important diagnositic tool," " please consider enabling them by default."); } #if 0 /* system() is not thread-safe, can't call from here * Actually, its a pretty hacky way to try and achieve this anyway */ if (system("echo 1 > /proc/sys/kernel/core_uses_pid") != 0) { crm_perror(LOG_ERR, "Could not enable /proc/sys/kernel/core_uses_pid"); } #endif } if (crm_user_lookup(CRM_DAEMON_USER, &pcmk_uid, &pcmk_gid) < 0) { crm_err("Cluster user %s does not exist, aborting Pacemaker startup", CRM_DAEMON_USER); crm_exit(1); } mkdir(CRM_STATE_DIR, 0750); mcp_chown(CRM_STATE_DIR, pcmk_uid, pcmk_gid); /* Used by stonithd */ build_path(HA_STATE_DIR "/heartbeat", 0755); mcp_chown(HA_STATE_DIR "/heartbeat", pcmk_uid, pcmk_gid); /* Used by RAs - Leave owned by root */ build_path(CRM_RSCTMP_DIR, 0755); /* Used to store core files in */ build_path(CRM_CORE_DIR, 0755); mcp_chown(CRM_CORE_DIR, pcmk_uid, pcmk_gid); /* Used to store blackbox dumps in */ build_path(CRM_BLACKBOX_DIR, 0755); mcp_chown(CRM_BLACKBOX_DIR, pcmk_uid, pcmk_gid); /* Used to store policy engine inputs in */ build_path(PE_STATE_DIR, 0755); mcp_chown(PE_STATE_DIR, pcmk_uid, pcmk_gid); /* Used to store the cluster configuration */ build_path(CRM_CONFIG_DIR, 0755); mcp_chown(CRM_CONFIG_DIR, pcmk_uid, pcmk_gid); peers = g_hash_table_new(g_direct_hash, g_direct_equal); ipcs = mainloop_add_ipc_server(CRM_SYSTEM_MCP, QB_IPC_NATIVE, &ipc_callbacks); if (ipcs == NULL) { crm_err("Couldn't start IPC server"); crm_exit(1); } if (cluster_connect_cfg(&local_nodeid) == FALSE) { crm_err("Couldn't connect to Corosync's CFG service"); crm_exit(1); } if (cluster_connect_cpg() == FALSE) { crm_err("Couldn't connect to Corosync's CPG service"); crm_exit(1); } local_name = get_local_node_name(); update_node_processes(local_nodeid, local_name, get_process_list()); mainloop_add_signal(SIGTERM, pcmk_shutdown); mainloop_add_signal(SIGINT, pcmk_shutdown); find_and_track_existing_processes(); init_children_processes(); crm_info("Starting mainloop"); g_main_run(mainloop); if(ipcs) { crm_trace("Closing IPC server"); mainloop_del_ipc_server(ipcs); ipcs = NULL; } g_main_destroy(mainloop); cluster_disconnect_cpg(); cluster_disconnect_cfg(); crm_info("Exiting %s", crm_system_name); crm_exit(0); } diff --git a/pengine/main.c b/pengine/main.c index 2f5f7cdc4e..2178b92119 100644 --- a/pengine/main.c +++ b/pengine/main.c @@ -1,193 +1,193 @@ -/* +/* * Copyright (C) 2004 Andrew Beekhof - * + * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. - * + * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. - * + * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #include #include #include #include #include #include #include #include -#include +#include #include #include #include #if HAVE_LIBXML2 # include #endif #define OPTARGS "hVc" GMainLoop *mainloop = NULL; qb_ipcs_service_t *ipcs = NULL; void pengine_shutdown(int nsig); static int32_t pe_ipc_accept(qb_ipcs_connection_t *c, uid_t uid, gid_t gid) { crm_trace("Connection %p", c); if(crm_client_new(c, uid, gid) == NULL) { return -EIO; } return 0; } static void pe_ipc_created(qb_ipcs_connection_t *c) { crm_trace("Connection %p", c); } gboolean process_pe_message(xmlNode * msg, xmlNode * xml_data, crm_client_t* sender); static int32_t pe_ipc_dispatch(qb_ipcs_connection_t *qbc, void *data, size_t size) { uint32_t id = 0; uint32_t flags = 0; crm_client_t *c = crm_client_get(qbc); xmlNode *msg = crm_ipcs_recv(c, data, size, &id, &flags); if(flags & crm_ipc_client_response) { crm_ipcs_send_ack(c, id, "ack", __FUNCTION__, __LINE__); } if (msg != NULL) { xmlNode *data = get_message_xml(msg, F_CRM_DATA); - + process_pe_message(msg, data, c); free_xml(msg); } return 0; } /* Error code means? */ static int32_t -pe_ipc_closed(qb_ipcs_connection_t *c) +pe_ipc_closed(qb_ipcs_connection_t *c) { crm_client_t *client = crm_client_get(c); crm_trace("Connection %p", c); crm_client_destroy(client); return 0; } static void -pe_ipc_destroy(qb_ipcs_connection_t *c) +pe_ipc_destroy(qb_ipcs_connection_t *c) { crm_trace("Connection %p", c); } -struct qb_ipcs_service_handlers ipc_callbacks = +struct qb_ipcs_service_handlers ipc_callbacks = { .connection_accept = pe_ipc_accept, .connection_created = pe_ipc_created, .msg_process = pe_ipc_dispatch, .connection_closed = pe_ipc_closed, .connection_destroyed = pe_ipc_destroy }; /* *INDENT-OFF* */ static struct crm_option long_options[] = { /* Top-level Options */ {"help", 0, 0, '?', "\tThis text"}, {"verbose", 0, 0, 'V', "\tIncrease debug output"}, {0, 0, 0, 0} }; /* *INDENT-ON* */ int main(int argc, char **argv) { int flag; int index = 0; int argerr = 0; crm_log_init(NULL, LOG_INFO, TRUE, FALSE, argc, argv, FALSE); crm_set_options(NULL, "[options]", long_options, "Daemon for calculating the cluster's response to events"); mainloop_add_signal(SIGTERM, pengine_shutdown); while (1) { flag = crm_get_option(argc, argv, &index); if (flag == -1) break; switch (flag) { case 'V': crm_bump_log_level(argc, argv); break; case 'h': /* Help message */ crm_help('?', EX_OK); break; default: ++argerr; break; } } if (argc - optind == 1 && safe_str_eq("metadata", argv[optind])) { pe_metadata(); return 0; } if (optind > argc) { ++argerr; } if (argerr) { crm_help('?', EX_USAGE); } if (crm_is_writable(PE_STATE_DIR, NULL, CRM_DAEMON_USER, CRM_DAEMON_GROUP, FALSE) == FALSE) { crm_err("Bad permissions on " PE_STATE_DIR ". Terminating"); fprintf(stderr, "ERROR: Bad permissions on " PE_STATE_DIR ". See logs for details\n"); fflush(stderr); return 100; } crm_debug("Init server comms"); ipcs = mainloop_add_ipc_server(CRM_SYSTEM_PENGINE, QB_IPC_SHM, &ipc_callbacks); if (ipcs == NULL) { crm_err("Failed to create IPC server: shutting down and inhibiting respawn"); crm_exit(100); } /* Create the mainloop and run it... */ crm_info("Starting %s", crm_system_name); mainloop = g_main_new(FALSE); g_main_run(mainloop); crm_info("Exiting %s", crm_system_name); return crm_exit(0); } void pengine_shutdown(int nsig) { mainloop_del_ipc_server(ipcs); crm_exit(EX_OK); }