diff --git a/cib/callbacks.c b/cib/callbacks.c index 86cfd92261..aa5f6a8272 100644 --- a/cib/callbacks.c +++ b/cib/callbacks.c @@ -1,1453 +1,1458 @@ /* * Copyright (C) 2004 Andrew Beekhof * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "common.h" extern GMainLoop *mainloop; extern gboolean cib_shutdown_flag; extern gboolean stand_alone; extern const char *cib_root; static unsigned long cib_local_bcast_num = 0; typedef struct cib_local_notify_s { xmlNode *notify_src; char *client_id; gboolean from_peer; gboolean sync_reply; } cib_local_notify_t; qb_ipcs_service_t *ipcs_ro = NULL; qb_ipcs_service_t *ipcs_rw = NULL; qb_ipcs_service_t *ipcs_shm = NULL; extern crm_cluster_t crm_cluster; extern int cib_update_counter(xmlNode * xml_obj, const char *field, gboolean reset); extern void GHFunc_count_peers(gpointer key, gpointer value, gpointer user_data); gint cib_GCompareFunc(gconstpointer a, gconstpointer b); gboolean can_write(int flags); void send_cib_replace(const xmlNode * sync_request, const char *host); void cib_process_request(xmlNode * request, gboolean privileged, gboolean force_synchronous, gboolean from_peer, cib_client_t * cib_client); extern GHashTable *client_list; extern GHashTable *local_notify_queue; int next_client_id = 0; extern const char *cib_our_uname; extern unsigned long cib_num_ops, cib_num_local, cib_num_updates, cib_num_fail; extern unsigned long cib_bad_connects, cib_num_timeouts; extern int cib_status; int cib_process_command(xmlNode * request, xmlNode ** reply, xmlNode ** cib_diff, gboolean privileged); gboolean cib_common_callback(qb_ipcs_connection_t *c, void *data, size_t size, gboolean privileged); static int32_t cib_ipc_accept(qb_ipcs_connection_t *c, uid_t uid, gid_t gid) { cib_client_t *new_client = NULL; #if ENABLE_ACL struct group *crm_grp = NULL; #endif crm_trace("Connecting %p for uid=%d gid=%d pid=%d", c, uid, gid, crm_ipcs_client_pid(c)); if (cib_shutdown_flag) { crm_info("Ignoring new client [%d] during shutdown", crm_ipcs_client_pid(c)); return -EPERM; } new_client = calloc(1, sizeof(cib_client_t)); new_client->ipc = c; CRM_CHECK(new_client->id == NULL, free(new_client->id)); new_client->id = crm_generate_uuid(); #if ENABLE_ACL crm_grp = getgrnam(CRM_DAEMON_GROUP); if (crm_grp) { qb_ipcs_connection_auth_set(c, -1, crm_grp->gr_gid, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP); } new_client->user = uid2username(uid); #endif /* make sure we can find ourselves later for sync calls * redirected to the master instance */ g_hash_table_insert(client_list, new_client->id, new_client); qb_ipcs_context_set(c, new_client); return 0; } static void cib_ipc_created(qb_ipcs_connection_t *c) { cib_client_t *cib_client = qb_ipcs_context_get(c); crm_trace("%p connected for client %s", c, cib_client->id); } static int32_t cib_ipc_dispatch_rw(qb_ipcs_connection_t *c, void *data, size_t size) { cib_client_t *cib_client = qb_ipcs_context_get(c); crm_trace("%p message from %s", c, cib_client->id); return cib_common_callback(c, data, size, TRUE); } static int32_t cib_ipc_dispatch_ro(qb_ipcs_connection_t *c, void *data, size_t size) { cib_client_t *cib_client = qb_ipcs_context_get(c); crm_trace("%p message from %s", c, cib_client->id); return cib_common_callback(c, data, size, FALSE); } /* Error code means? */ static int32_t cib_ipc_closed(qb_ipcs_connection_t *c) { cib_client_t *cib_client = qb_ipcs_context_get(c); crm_trace("Connection %p closed", c); CRM_ASSERT(cib_client != NULL); CRM_ASSERT(cib_client->id != NULL); if (!g_hash_table_remove(client_list, cib_client->id)) { crm_err("Client %s not found in the hashtable", cib_client->name); } return 0; } static void cib_ipc_destroy(qb_ipcs_connection_t *c) { cib_client_t *cib_client = qb_ipcs_context_get(c); CRM_ASSERT(cib_client != NULL); CRM_ASSERT(cib_client->id != NULL); /* In case we arrive here without a call to cib_ipc_close() */ g_hash_table_remove(client_list, cib_client->id); crm_trace("Destroying %s (%p)", cib_client->name, c); free(cib_client->name); free(cib_client->callback_id); free(cib_client->id); free(cib_client->user); free(cib_client); crm_trace("Freed the cib client"); if (cib_shutdown_flag) { cib_shutdown(0); } } struct qb_ipcs_service_handlers ipc_ro_callbacks = { .connection_accept = cib_ipc_accept, .connection_created = cib_ipc_created, .msg_process = cib_ipc_dispatch_ro, .connection_closed = cib_ipc_closed, .connection_destroyed = cib_ipc_destroy }; struct qb_ipcs_service_handlers ipc_rw_callbacks = { .connection_accept = cib_ipc_accept, .connection_created = cib_ipc_created, .msg_process = cib_ipc_dispatch_rw, .connection_closed = cib_ipc_closed, .connection_destroyed = cib_ipc_destroy }; void cib_common_callback_worker(uint32_t id, uint32_t flags, xmlNode * op_request, cib_client_t * cib_client, gboolean privileged) { const char *op = crm_element_value(op_request, F_CIB_OPERATION); if (crm_str_eq(op, CRM_OP_REGISTER, TRUE)) { if(flags & crm_ipc_client_response) { xmlNode *ack = create_xml_node(NULL, __FUNCTION__); crm_xml_add(ack, F_CIB_OPERATION, CRM_OP_REGISTER); crm_xml_add(ack, F_CIB_CLIENTID, cib_client->id); crm_ipcs_send(cib_client->ipc, id, ack, FALSE); cib_client->request_id = 0; free_xml(ack); } return; } else if (crm_str_eq(op, T_CIB_NOTIFY, TRUE)) { /* Update the notify filters for this client */ int on_off = 0; const char *type = crm_element_value(op_request, F_CIB_NOTIFY_TYPE); crm_element_value_int(op_request, F_CIB_NOTIFY_ACTIVATE, &on_off); crm_debug("Setting %s callbacks for %s (%s): %s", type, cib_client->name, cib_client->id, on_off ? "on" : "off"); if (safe_str_eq(type, T_CIB_POST_NOTIFY)) { cib_client->post_notify = on_off; } else if (safe_str_eq(type, T_CIB_PRE_NOTIFY)) { cib_client->pre_notify = on_off; } else if (safe_str_eq(type, T_CIB_UPDATE_CONFIRM)) { cib_client->confirmations = on_off; } else if (safe_str_eq(type, T_CIB_DIFF_NOTIFY)) { cib_client->diffs = on_off; } else if (safe_str_eq(type, T_CIB_REPLACE_NOTIFY)) { cib_client->replace = on_off; } if(flags & crm_ipc_client_response) { /* TODO - include rc */ crm_ipcs_send_ack(cib_client->ipc, id, "ack", __FUNCTION__, __LINE__); cib_client->request_id = 0; } return; } cib_client->num_calls++; cib_process_request(op_request, FALSE, privileged, FALSE, cib_client); } int32_t cib_common_callback(qb_ipcs_connection_t *c, void *data, size_t size, gboolean privileged) { uint32_t id = 0; uint32_t flags = 0; int call_options = 0; xmlNode *op_request = crm_ipcs_recv(c, data, size, &id, &flags); cib_client_t *cib_client = qb_ipcs_context_get(c); if(op_request) { crm_element_value_int(op_request, F_CIB_CALLOPTS, &call_options); } crm_trace("Inbound: %.200s", data); if (op_request == NULL || cib_client == NULL) { crm_ipcs_send_ack(c, id, "nack", __FUNCTION__, __LINE__); return 0; } if(is_set(call_options, cib_sync_call)) { CRM_ASSERT(flags & crm_ipc_client_response); } if(flags & crm_ipc_client_response) { CRM_LOG_ASSERT(cib_client->request_id == 0); /* This means the client has two synchronous events in-flight */ cib_client->request_id = id; /* Reply only to the last one */ } if (cib_client->name == NULL) { const char *value = crm_element_value(op_request, F_CIB_CLIENTNAME); if (value == NULL) { cib_client->name = crm_itoa(crm_ipcs_client_pid(c)); } else { cib_client->name = strdup(value); } } if (cib_client->callback_id == NULL) { const char *value = crm_element_value(op_request, F_CIB_CALLBACK_TOKEN); if (value != NULL) { cib_client->callback_id = strdup(value); } else { cib_client->callback_id = strdup(cib_client->id); } } crm_xml_add(op_request, F_CIB_CLIENTID, cib_client->id); crm_xml_add(op_request, F_CIB_CLIENTNAME, cib_client->name); #if ENABLE_ACL determine_request_user(cib_client->user, op_request, F_CIB_USER); #endif crm_log_xml_trace(op_request, "Client[inbound]"); cib_common_callback_worker(id, flags, op_request, cib_client, privileged); free_xml(op_request); return 0; } static void do_local_notify(xmlNode * notify_src, const char *client_id, gboolean sync_reply, gboolean from_peer) { /* send callback to originating child */ cib_client_t *client_obj = NULL; int local_rc = pcmk_ok; if (client_id != NULL) { client_obj = g_hash_table_lookup(client_list, client_id); } else { crm_trace("No client to sent the response to. F_CIB_CLIENTID not set."); } if (client_obj == NULL) { local_rc = -ECONNRESET; } else { int rid = 0; if(sync_reply) { - CRM_LOG_ASSERT(client_obj->request_id); + if (client_obj->ipc) { + CRM_LOG_ASSERT(client_obj->request_id); - rid = client_obj->request_id; - client_obj->request_id = 0; + rid = client_obj->request_id; + client_obj->request_id = 0; - crm_trace("Sending response %d to %s %s", + crm_trace("Sending response %d to %s %s", rid, client_obj->name, from_peer?"(originator of delegated request)":""); + } else { + crm_trace("Sending response to %s %s", + client_obj->name, from_peer?"(originator of delegated request)":""); + } } else { crm_trace("Sending an event to %s %s", client_obj->name, from_peer?"(originator of delegated request)":""); } if (client_obj->ipc && crm_ipcs_send(client_obj->ipc, rid, notify_src, !sync_reply) < 0) { local_rc = -ENOMSG; #ifdef HAVE_GNUTLS_GNUTLS_H } else if (client_obj->session) { crm_send_remote_msg(client_obj->session, notify_src, client_obj->encrypted); #endif } else if(client_obj->ipc == NULL) { crm_err("Unknown transport for %s", client_obj->name); } } if (local_rc != pcmk_ok && client_obj != NULL) { crm_warn("%sSync reply to %s failed: %s", sync_reply ? "" : "A-", client_obj ? client_obj->name : "", pcmk_strerror(local_rc)); } } static void local_notify_destroy_callback(gpointer data) { cib_local_notify_t *notify = data; free_xml(notify->notify_src); free(notify->client_id); free(notify); } static void check_local_notify(int bcast_id) { cib_local_notify_t *notify = NULL; if (!local_notify_queue) { return; } notify = g_hash_table_lookup(local_notify_queue, GINT_TO_POINTER(bcast_id)); if (notify) { do_local_notify(notify->notify_src, notify->client_id, notify->sync_reply, notify->from_peer); g_hash_table_remove(local_notify_queue, GINT_TO_POINTER(bcast_id)); } } static void queue_local_notify(xmlNode * notify_src, const char *client_id, gboolean sync_reply, gboolean from_peer) { cib_local_notify_t *notify = calloc(1, sizeof(cib_local_notify_t)); notify->notify_src = notify_src; notify->client_id = strdup(client_id); notify->sync_reply = sync_reply; notify->from_peer = from_peer; if (!local_notify_queue) { local_notify_queue = g_hash_table_new_full(g_direct_hash, g_direct_equal, NULL, local_notify_destroy_callback); } g_hash_table_insert(local_notify_queue, GINT_TO_POINTER(cib_local_bcast_num), notify); } static void parse_local_options(cib_client_t * cib_client, int call_type, int call_options, const char *host, const char *op, gboolean * local_notify, gboolean * needs_reply, gboolean * process, gboolean * needs_forward) { if (cib_op_modifies(call_type) && !(call_options & cib_inhibit_bcast)) { /* we need to send an update anyway */ *needs_reply = TRUE; } else { *needs_reply = FALSE; } if (host == NULL && (call_options & cib_scope_local)) { crm_trace("Processing locally scoped %s op from %s", op, cib_client->name); *local_notify = TRUE; } else if (host == NULL && cib_is_master) { crm_trace("Processing master %s op locally from %s", op, cib_client->name); *local_notify = TRUE; } else if (safe_str_eq(host, cib_our_uname)) { crm_trace("Processing locally addressed %s op from %s", op, cib_client->name); *local_notify = TRUE; } else if (stand_alone) { *needs_forward = FALSE; *local_notify = TRUE; *process = TRUE; } else { crm_trace("%s op from %s needs to be forwarded to %s", op, cib_client->name, host ? host : "the master instance"); *needs_forward = TRUE; *process = FALSE; } } static gboolean parse_peer_options(int call_type, xmlNode * request, gboolean * local_notify, gboolean * needs_reply, gboolean * process, gboolean * needs_forward) { const char *op = NULL; const char *host = NULL; const char *delegated = NULL; const char *originator = crm_element_value(request, F_ORIG); const char *reply_to = crm_element_value(request, F_CIB_ISREPLY); const char *update = crm_element_value(request, F_CIB_GLOBAL_UPDATE); gboolean is_reply = safe_str_eq(reply_to, cib_our_uname); if (crm_is_true(update)) { *needs_reply = FALSE; if (is_reply) { *local_notify = TRUE; crm_trace("Processing global/peer update from %s" " that originated from us", originator); } else { crm_trace("Processing global/peer update from %s", originator); } return TRUE; } host = crm_element_value(request, F_CIB_HOST); if (host != NULL && safe_str_eq(host, cib_our_uname)) { crm_trace("Processing request sent to us from %s", originator); return TRUE; } else if (host == NULL && cib_is_master == TRUE) { crm_trace("Processing request sent to master instance from %s", originator); return TRUE; } op = crm_element_value(request, F_CIB_OPERATION); if(safe_str_eq(op, "cib_shutdown_req")) { /* Always process these */ *local_notify = FALSE; if(reply_to == NULL || is_reply) { *process = TRUE; } if(is_reply) { *needs_reply = FALSE; } return *process; } if (is_reply) { crm_trace("Forward reply sent from %s to local clients", originator); *process = FALSE; *needs_reply = FALSE; *local_notify = TRUE; return TRUE; } delegated = crm_element_value(request, F_CIB_DELEGATED); if (delegated != NULL) { crm_trace("Ignoring msg for master instance"); } else if (host != NULL) { /* this is for a specific instance and we're not it */ crm_trace("Ignoring msg for instance on %s", crm_str(host)); } else if (reply_to == NULL && cib_is_master == FALSE) { /* this is for the master instance and we're not it */ crm_trace("Ignoring reply to %s", crm_str(reply_to)); } else if (safe_str_eq(op, "cib_shutdown_req")) { if (reply_to != NULL) { crm_debug("Processing %s from %s", op, host); *needs_reply = FALSE; } else { crm_debug("Processing %s reply from %s", op, host); } return TRUE; } else { crm_err("Nothing for us to do?"); crm_log_xml_err(request, "Peer[inbound]"); } return FALSE; } static void forward_request(xmlNode * request, cib_client_t * cib_client, int call_options) { const char *op = crm_element_value(request, F_CIB_OPERATION); const char *host = crm_element_value(request, F_CIB_HOST); crm_xml_add(request, F_CIB_DELEGATED, cib_our_uname); if (host != NULL) { crm_trace("Forwarding %s op to %s", op, host); send_cluster_message(crm_get_peer(0, host), crm_msg_cib, request, FALSE); } else { crm_trace("Forwarding %s op to master instance", op); send_cluster_message(NULL, crm_msg_cib, request, FALSE); } /* Return the request to its original state */ xml_remove_prop(request, F_CIB_DELEGATED); if (call_options & cib_discard_reply) { crm_trace("Client not interested in reply"); } } static gboolean send_peer_reply(xmlNode * msg, xmlNode * result_diff, const char *originator, gboolean broadcast) { CRM_ASSERT(msg != NULL); if (broadcast) { /* this (successful) call modified the CIB _and_ the * change needs to be broadcast... * send via HA to other nodes */ int diff_add_updates = 0; int diff_add_epoch = 0; int diff_add_admin_epoch = 0; int diff_del_updates = 0; int diff_del_epoch = 0; int diff_del_admin_epoch = 0; const char *digest = NULL; digest = crm_element_value(result_diff, XML_ATTR_DIGEST); cib_diff_version_details(result_diff, &diff_add_admin_epoch, &diff_add_epoch, &diff_add_updates, &diff_del_admin_epoch, &diff_del_epoch, &diff_del_updates); crm_trace("Sending update diff %d.%d.%d -> %d.%d.%d %s", diff_del_admin_epoch, diff_del_epoch, diff_del_updates, diff_add_admin_epoch, diff_add_epoch, diff_add_updates, digest); crm_xml_add(msg, F_CIB_ISREPLY, originator); crm_xml_add(msg, F_CIB_GLOBAL_UPDATE, XML_BOOLEAN_TRUE); crm_xml_add(msg, F_CIB_OPERATION, CIB_OP_APPLY_DIFF); CRM_ASSERT(digest != NULL); add_message_xml(msg, F_CIB_UPDATE_DIFF, result_diff); crm_log_xml_trace(msg, "copy"); return send_cluster_message(NULL, crm_msg_cib, msg, TRUE); } else if (originator != NULL) { /* send reply via HA to originating node */ crm_trace("Sending request result to originator only"); crm_xml_add(msg, F_CIB_ISREPLY, originator); return send_cluster_message(crm_get_peer(0, originator), crm_msg_cib, msg, FALSE); } return FALSE; } void cib_process_request(xmlNode * request, gboolean force_synchronous, gboolean privileged, gboolean from_peer, cib_client_t * cib_client) { int call_type = 0; int call_options = 0; gboolean process = TRUE; gboolean is_update = TRUE; gboolean needs_reply = TRUE; gboolean local_notify = FALSE; gboolean needs_forward = FALSE; gboolean global_update = crm_is_true(crm_element_value(request, F_CIB_GLOBAL_UPDATE)); xmlNode *op_reply = NULL; xmlNode *result_diff = NULL; int rc = pcmk_ok; const char *op = crm_element_value(request, F_CIB_OPERATION); const char *originator = crm_element_value(request, F_ORIG); const char *host = crm_element_value(request, F_CIB_HOST); const char *client_id = crm_element_value(request, F_CIB_CLIENTID); crm_trace("%s Processing msg %s", cib_our_uname, crm_element_value(request, F_SEQ)); cib_num_ops++; if (cib_num_ops == 0) { cib_num_fail = 0; cib_num_local = 0; cib_num_updates = 0; crm_info("Stats wrapped around"); } if (host != NULL && strlen(host) == 0) { host = NULL; } crm_element_value_int(request, F_CIB_CALLOPTS, &call_options); if (force_synchronous) { call_options |= cib_sync_call; } crm_trace("Processing %s message (%s) for %s...", from_peer ? "peer" : "local", from_peer ? originator : cib_our_uname, host ? host : "master"); rc = cib_get_operation_id(op, &call_type); if (rc != pcmk_ok) { /* TODO: construct error reply? */ crm_err("Pre-processing of command failed: %s", pcmk_strerror(rc)); return; } is_update = cib_op_modifies(call_type); if (is_update) { cib_num_updates++; } if (from_peer == FALSE) { parse_local_options(cib_client, call_type, call_options, host, op, &local_notify, &needs_reply, &process, &needs_forward); } else if (parse_peer_options(call_type, request, &local_notify, &needs_reply, &process, &needs_forward) == FALSE) { return; } crm_trace("Finished determining processing actions"); if (call_options & cib_discard_reply) { needs_reply = is_update; local_notify = FALSE; } if (needs_forward) { forward_request(request, cib_client, call_options); return; } if (cib_status != pcmk_ok) { rc = cib_status; crm_err("Operation ignored, cluster configuration is invalid." " Please repair and restart: %s", pcmk_strerror(cib_status)); op_reply = cib_construct_reply(request, the_cib, cib_status); } else if (process) { int level = LOG_INFO; const char *section = crm_element_value(request, F_CIB_SECTION); cib_num_local++; rc = cib_process_command(request, &op_reply, &result_diff, privileged); if (global_update) { switch (rc) { case pcmk_ok: case -pcmk_err_old_data: case -pcmk_err_diff_resync: case -pcmk_err_diff_failed: level = LOG_DEBUG_2; break; default: level = LOG_ERR; } } else if (safe_str_eq(op, CIB_OP_QUERY)) { level = LOG_DEBUG_2; } else if (rc != pcmk_ok) { cib_num_fail++; level = LOG_WARNING; } else if (safe_str_eq(op, CIB_OP_SLAVE)) { level = LOG_DEBUG_2; } else if (safe_str_eq(section, XML_CIB_TAG_STATUS)) { level = LOG_DEBUG_2; } do_crm_log_unlikely(level, "Operation complete: op %s for section %s (origin=%s/%s/%s, version=%s.%s.%s): %s (rc=%d)", op, section ? section : "'all'", originator ? originator : "local", crm_element_value(request, F_CIB_CLIENTNAME), crm_element_value(request, F_CIB_CALLID), the_cib ? crm_element_value(the_cib, XML_ATTR_GENERATION_ADMIN) : "0", the_cib ? crm_element_value(the_cib, XML_ATTR_GENERATION) : "0", the_cib ? crm_element_value(the_cib, XML_ATTR_NUMUPDATES) : "0", pcmk_strerror(rc), rc); if (op_reply == NULL && (needs_reply || local_notify)) { crm_err("Unexpected NULL reply to message"); crm_log_xml_err(request, "null reply"); needs_reply = FALSE; local_notify = FALSE; } } crm_trace("processing response cases %.16x %.16x", call_options, cib_sync_call); /* from now on we are the server */ if (needs_reply == FALSE || stand_alone) { /* nothing more to do... * this was a non-originating slave update */ crm_trace("Completed slave update"); } else if (rc == pcmk_ok && result_diff != NULL && !(call_options & cib_inhibit_bcast)) { gboolean broadcast = FALSE; cib_local_bcast_num++; crm_xml_add_int(request, F_CIB_LOCAL_NOTIFY_ID, cib_local_bcast_num); broadcast = send_peer_reply(request, result_diff, originator, TRUE); if (broadcast && client_id && local_notify && op_reply) { /* If we have been asked to sync the reply, * and a bcast msg has gone out, we queue the local notify * until we know the bcast message has been received */ local_notify = FALSE; queue_local_notify(op_reply, client_id, (call_options & cib_sync_call), from_peer); op_reply = NULL; /* the reply is queued, so don't free here */ } } else if (call_options & cib_discard_reply) { crm_trace("Caller isn't interested in reply"); } else if (from_peer) { if (is_update == FALSE || result_diff == NULL) { crm_trace("Request not broadcast: R/O call"); } else if (call_options & cib_inhibit_bcast) { crm_trace("Request not broadcast: inhibited"); } else if (rc != pcmk_ok) { crm_trace("Request not broadcast: call failed: %s", pcmk_strerror(rc)); } else { crm_trace("Directing reply to %s", originator); } send_peer_reply(op_reply, result_diff, originator, FALSE); } if (local_notify && client_id) { if (process == FALSE) { do_local_notify(request, client_id, call_options & cib_sync_call, from_peer); } else { do_local_notify(op_reply, client_id, call_options & cib_sync_call, from_peer); } } free_xml(op_reply); free_xml(result_diff); return; } xmlNode * cib_construct_reply(xmlNode * request, xmlNode * output, int rc) { int lpc = 0; xmlNode *reply = NULL; const char *name = NULL; const char *value = NULL; const char *names[] = { F_CIB_OPERATION, F_CIB_CALLID, F_CIB_CLIENTID, F_CIB_CALLOPTS }; static int max = DIMOF(names); crm_trace("Creating a basic reply"); reply = create_xml_node(NULL, "cib-reply"); crm_xml_add(reply, F_TYPE, T_CIB); for (lpc = 0; lpc < max; lpc++) { name = names[lpc]; value = crm_element_value(request, name); crm_xml_add(reply, name, value); } crm_xml_add_int(reply, F_CIB_RC, rc); if (output != NULL) { crm_trace("Attaching reply output"); add_message_xml(reply, F_CIB_CALLDATA, output); } return reply; } int cib_process_command(xmlNode * request, xmlNode ** reply, xmlNode ** cib_diff, gboolean privileged) { xmlNode *input = NULL; xmlNode *output = NULL; xmlNode *result_cib = NULL; xmlNode *current_cib = NULL; #if ENABLE_ACL xmlNode *filtered_current_cib = NULL; #endif int call_type = 0; int call_options = 0; int log_level = LOG_DEBUG_4; const char *op = NULL; const char *section = NULL; int rc = pcmk_ok; int rc2 = pcmk_ok; gboolean send_r_notify = FALSE; gboolean global_update = FALSE; gboolean config_changed = FALSE; gboolean manage_counters = TRUE; CRM_ASSERT(cib_status == pcmk_ok); *reply = NULL; *cib_diff = NULL; current_cib = the_cib; /* Start processing the request... */ op = crm_element_value(request, F_CIB_OPERATION); crm_element_value_int(request, F_CIB_CALLOPTS, &call_options); rc = cib_get_operation_id(op, &call_type); if (rc == pcmk_ok && privileged == FALSE) { rc = cib_op_can_run(call_type, call_options, privileged, global_update); } rc2 = cib_op_prepare(call_type, request, &input, §ion); if (rc == pcmk_ok) { rc = rc2; } if (rc != pcmk_ok) { crm_trace("Call setup failed: %s", pcmk_strerror(rc)); goto done; } else if (cib_op_modifies(call_type) == FALSE) { #if ENABLE_ACL if (acl_enabled(config_hash) == FALSE || acl_filter_cib(request, current_cib, current_cib, &filtered_current_cib) == FALSE) { rc = cib_perform_op(op, call_options, cib_op_func(call_type), TRUE, section, request, input, FALSE, &config_changed, current_cib, &result_cib, NULL, &output); } else if (filtered_current_cib == NULL) { crm_debug("Pre-filtered the entire cib"); rc = -EACCES; } else { crm_debug("Pre-filtered the queried cib according to the ACLs"); rc = cib_perform_op(op, call_options, cib_op_func(call_type), TRUE, section, request, input, FALSE, &config_changed, filtered_current_cib, &result_cib, NULL, &output); } #else rc = cib_perform_op(op, call_options, cib_op_func(call_type), TRUE, section, request, input, FALSE, &config_changed, current_cib, &result_cib, NULL, &output); #endif CRM_CHECK(result_cib == NULL, free_xml(result_cib)); goto done; } /* Handle a valid write action */ global_update = crm_is_true(crm_element_value(request, F_CIB_GLOBAL_UPDATE)); if (global_update) { manage_counters = FALSE; call_options |= cib_force_diff; CRM_CHECK(call_type == 3 || call_type == 4, crm_err("Call type: %d", call_type); crm_log_xml_err(request, "bad op")); } #ifdef SUPPORT_PRENOTIFY if ((call_options & cib_inhibit_notify) == 0) { cib_pre_notify(call_options, op, the_cib, input); } #endif if (rc == pcmk_ok) { if (call_options & cib_inhibit_bcast) { /* skip */ crm_trace("Skipping update: inhibit broadcast"); manage_counters = FALSE; } rc = cib_perform_op(op, call_options, cib_op_func(call_type), FALSE, section, request, input, manage_counters, &config_changed, current_cib, &result_cib, cib_diff, &output); #if ENABLE_ACL if (acl_enabled(config_hash) == TRUE && acl_check_diff(request, current_cib, result_cib, *cib_diff) == FALSE) { rc = -EACCES; } #endif if (rc == pcmk_ok && config_changed) { time_t now; char *now_str = NULL; const char *validation = crm_element_value(result_cib, XML_ATTR_VALIDATION); if (validation) { int current_version = get_schema_version(validation); int support_version = get_schema_version("pacemaker-1.1"); /* Once the later schemas support the "update-*" attributes, change "==" to ">=" -- Changed */ if (current_version >= support_version) { const char *origin = crm_element_value(request, F_ORIG); crm_xml_replace(result_cib, XML_ATTR_UPDATE_ORIG, origin ? origin : cib_our_uname); crm_xml_replace(result_cib, XML_ATTR_UPDATE_CLIENT, crm_element_value(request, F_CIB_CLIENTNAME)); #if ENABLE_ACL crm_xml_replace(result_cib, XML_ATTR_UPDATE_USER, crm_element_value(request, F_CIB_USER)); #endif } } now = time(NULL); now_str = ctime(&now); now_str[24] = EOS; /* replace the newline */ crm_xml_replace(result_cib, XML_CIB_ATTR_WRITTEN, now_str); } if (manage_counters == FALSE) { config_changed = cib_config_changed(current_cib, result_cib, cib_diff); } /* Always write to disk for replace ops, * this negates the need to detect ordering changes */ if (config_changed == FALSE && crm_str_eq(CIB_OP_REPLACE, op, TRUE)) { config_changed = TRUE; } } cib_add_digest(result_cib, *cib_diff); if (rc == pcmk_ok && (call_options & cib_dryrun) == 0) { rc = activateCibXml(result_cib, config_changed, op); if (rc == pcmk_ok && cib_internal_config_changed(*cib_diff)) { cib_read_config(config_hash, result_cib); } if (crm_str_eq(CIB_OP_REPLACE, op, TRUE)) { if (section == NULL) { send_r_notify = TRUE; } else if (safe_str_eq(section, XML_TAG_CIB)) { send_r_notify = TRUE; } else if (safe_str_eq(section, XML_CIB_TAG_NODES)) { send_r_notify = TRUE; } else if (safe_str_eq(section, XML_CIB_TAG_STATUS)) { send_r_notify = TRUE; } } else if (crm_str_eq(CIB_OP_ERASE, op, TRUE)) { send_r_notify = TRUE; } } else if (rc == -pcmk_err_dtd_validation) { if (output != NULL) { crm_log_xml_info(output, "cib:output"); free_xml(output); } #if ENABLE_ACL { xmlNode *filtered_result_cib = NULL; if (acl_enabled(config_hash) == FALSE || acl_filter_cib(request, current_cib, result_cib, &filtered_result_cib) == FALSE) { output = result_cib; } else { crm_debug("Filtered the result cib for output according to the ACLs"); output = filtered_result_cib; if (result_cib != NULL) { free_xml(result_cib); } } } #else output = result_cib; #endif } else { free_xml(result_cib); } if ((call_options & cib_inhibit_notify) == 0) { const char *call_id = crm_element_value(request, F_CIB_CALLID); const char *client = crm_element_value(request, F_CIB_CLIENTNAME); #ifdef SUPPORT_POSTNOTIFY cib_post_notify(call_options, op, input, rc, the_cib); #endif cib_diff_notify(call_options, client, call_id, op, input, rc, *cib_diff); } if (send_r_notify) { const char *origin = crm_element_value(request, F_ORIG); cib_replace_notify(origin, the_cib, rc, *cib_diff); } if (rc != pcmk_ok) { log_level = LOG_DEBUG_4; if (rc == -pcmk_err_dtd_validation && global_update) { log_level = LOG_WARNING; crm_log_xml_info(input, "cib:global_update"); } } else if (config_changed) { log_level = LOG_DEBUG_3; if (cib_is_master) { log_level = LOG_NOTICE; } } else if (cib_is_master) { log_level = LOG_DEBUG_2; } log_cib_diff(log_level, *cib_diff, "cib:diff"); done: if ((call_options & cib_discard_reply) == 0) { *reply = cib_construct_reply(request, output, rc); crm_log_xml_trace(*reply, "cib:reply"); } #if ENABLE_ACL if (filtered_current_cib != NULL) { free_xml(filtered_current_cib); } #endif if (call_type >= 0) { cib_op_cleanup(call_type, call_options, &input, &output); } return rc; } gint cib_GCompareFunc(gconstpointer a, gconstpointer b) { const xmlNode *a_msg = a; const xmlNode *b_msg = b; int msg_a_id = 0; int msg_b_id = 0; const char *value = NULL; value = crm_element_value_const(a_msg, F_CIB_CALLID); msg_a_id = crm_parse_int(value, NULL); value = crm_element_value_const(b_msg, F_CIB_CALLID); msg_b_id = crm_parse_int(value, NULL); if (msg_a_id == msg_b_id) { return 0; } else if (msg_a_id < msg_b_id) { return -1; } return 1; } #if SUPPORT_HEARTBEAT void cib_ha_peer_callback(HA_Message * msg, void *private_data) { xmlNode *xml = convert_ha_message(NULL, msg, __FUNCTION__); cib_peer_callback(xml, private_data); free_xml(xml); } #endif void cib_peer_callback(xmlNode * msg, void *private_data) { const char *reason = NULL; const char *originator = crm_element_value(msg, F_ORIG); if (originator == NULL || crm_str_eq(originator, cib_our_uname, TRUE)) { /* message is from ourselves */ int bcast_id = 0; if (!(crm_element_value_int(msg, F_CIB_LOCAL_NOTIFY_ID, &bcast_id))) { check_local_notify(bcast_id); } return; } else if (crm_peer_cache == NULL) { reason = "membership not established"; goto bail; } if (crm_element_value(msg, F_CIB_CLIENTNAME) == NULL) { crm_xml_add(msg, F_CIB_CLIENTNAME, originator); } /* crm_log_xml_trace("Peer[inbound]", msg); */ cib_process_request(msg, FALSE, TRUE, TRUE, NULL); return; bail: if (reason) { const char *seq = crm_element_value(msg, F_SEQ); const char *op = crm_element_value(msg, F_CIB_OPERATION); crm_warn("Discarding %s message (%s) from %s: %s", op, seq, originator, reason); } } #if SUPPORT_HEARTBEAT extern oc_ev_t *cib_ev_token; static void *ccm_library = NULL; int (*ccm_api_callback_done) (void *cookie) = NULL; int (*ccm_api_handle_event) (const oc_ev_t * token) = NULL; void cib_client_status_callback(const char *node, const char *client, const char *status, void *private) { crm_node_t *peer = NULL; if (safe_str_eq(client, CRM_SYSTEM_CIB)) { crm_info("Status update: Client %s/%s now has status [%s]", node, client, status); if (safe_str_eq(status, JOINSTATUS)) { status = ONLINESTATUS; } else if (safe_str_eq(status, LEAVESTATUS)) { status = OFFLINESTATUS; } peer = crm_get_peer(0, node); crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cib, status); } return; } int cib_ccm_dispatch(gpointer user_data) { int rc = 0; oc_ev_t *ccm_token = (oc_ev_t *) user_data; crm_trace("received callback"); if (ccm_api_handle_event == NULL) { ccm_api_handle_event = find_library_function(&ccm_library, CCM_LIBRARY, "oc_ev_handle_event", 1); } rc = (*ccm_api_handle_event) (ccm_token); if (0 == rc) { return 0; } crm_err("CCM connection appears to have failed: rc=%d.", rc); /* eventually it might be nice to recover and reconnect... but until then... */ crm_err("Exiting to recover from CCM connection failure"); crm_exit(2); return -1; } int current_instance = 0; void cib_ccm_msg_callback(oc_ed_t event, void *cookie, size_t size, const void *data) { gboolean update_id = FALSE; const oc_ev_membership_t *membership = data; CRM_ASSERT(membership != NULL); crm_info("Processing CCM event=%s (id=%d)", ccm_event_name(event), membership->m_instance); if (current_instance > membership->m_instance) { crm_err("Membership instance ID went backwards! %d->%d", current_instance, membership->m_instance); CRM_ASSERT(current_instance <= membership->m_instance); } switch (event) { case OC_EV_MS_NEW_MEMBERSHIP: case OC_EV_MS_INVALID: update_id = TRUE; break; case OC_EV_MS_PRIMARY_RESTORED: update_id = TRUE; break; case OC_EV_MS_NOT_PRIMARY: crm_trace("Ignoring transitional CCM event: %s", ccm_event_name(event)); break; case OC_EV_MS_EVICTED: crm_err("Evicted from CCM: %s", ccm_event_name(event)); break; default: crm_err("Unknown CCM event: %d", event); } if (update_id) { unsigned int lpc = 0; CRM_CHECK(membership != NULL, return); current_instance = membership->m_instance; for (lpc = 0; lpc < membership->m_n_out; lpc++) { crm_update_ccm_node(membership, lpc + membership->m_out_idx, CRM_NODE_LOST, current_instance); } for (lpc = 0; lpc < membership->m_n_member; lpc++) { crm_update_ccm_node(membership, lpc + membership->m_memb_idx, CRM_NODE_ACTIVE, current_instance); } } if (ccm_api_callback_done == NULL) { ccm_api_callback_done = find_library_function(&ccm_library, CCM_LIBRARY, "oc_ev_callback_done", 1); } (*ccm_api_callback_done) (cookie); return; } #endif gboolean can_write(int flags) { return TRUE; } static gboolean cib_force_exit(gpointer data) { crm_notice("Forcing exit!"); terminate_cib(__FUNCTION__, TRUE); return FALSE; } static void disconnect_remote_client(gpointer key, gpointer value, gpointer user_data) { cib_client_t *a_client = value; crm_err("Disconnecting %s... Not implemented", crm_str(a_client->name)); } void cib_shutdown(int nsig) { struct qb_ipcs_stats srv_stats; if (cib_shutdown_flag == FALSE) { int disconnects = 0; qb_ipcs_connection_t *c = NULL; cib_shutdown_flag = TRUE; c = qb_ipcs_connection_first_get(ipcs_rw); while(c != NULL) { qb_ipcs_connection_t *last = c; c = qb_ipcs_connection_next_get(ipcs_rw, last); crm_debug("Disconnecting r/w client %p...", last); qb_ipcs_disconnect(last); qb_ipcs_connection_unref(last); disconnects++; } c = qb_ipcs_connection_first_get(ipcs_ro); while(c != NULL) { qb_ipcs_connection_t *last = c; c = qb_ipcs_connection_next_get(ipcs_ro, last); crm_debug("Disconnecting r/o client %p...", last); qb_ipcs_disconnect(last); qb_ipcs_connection_unref(last); disconnects++; } c = qb_ipcs_connection_first_get(ipcs_shm); while(c != NULL) { qb_ipcs_connection_t *last = c; c = qb_ipcs_connection_next_get(ipcs_shm, last); crm_debug("Disconnecting non-blocking r/w client %p...", last); qb_ipcs_disconnect(last); qb_ipcs_connection_unref(last); disconnects++; } disconnects += g_hash_table_size(client_list); crm_debug("Disconnecting %d remote clients", g_hash_table_size(client_list)); g_hash_table_foreach(client_list, disconnect_remote_client, NULL); crm_info("Disconnected %d clients", disconnects); } qb_ipcs_stats_get(ipcs_rw, &srv_stats, QB_FALSE); if(g_hash_table_size(client_list) == 0) { crm_info("All clients disconnected (%d)", srv_stats.active_connections); initiate_exit(); } else { crm_info("Waiting on %d clients to disconnect (%d)", g_hash_table_size(client_list), srv_stats.active_connections); } } void initiate_exit(void) { int active = 0; xmlNode *leaving = NULL; active = crm_active_peers(); if (active < 2) { terminate_cib(__FUNCTION__, FALSE); return; } crm_info("Sending disconnect notification to %d peers...", active); leaving = create_xml_node(NULL, "exit-notification"); crm_xml_add(leaving, F_TYPE, "cib"); crm_xml_add(leaving, F_CIB_OPERATION, "cib_shutdown_req"); send_cluster_message(NULL, crm_msg_cib, leaving, TRUE); free_xml(leaving); g_timeout_add(crm_get_msec("5s"), cib_force_exit, NULL); } extern int remote_fd; extern int remote_tls_fd; extern void terminate_cs_connection(void); void terminate_cib(const char *caller, gboolean fast) { if (remote_fd > 0) { close(remote_fd); remote_fd = 0; } if (remote_tls_fd > 0) { close(remote_tls_fd); remote_tls_fd = 0; } if(!fast) { crm_info("%s: Disconnecting from cluster infrastructure", caller); crm_cluster_disconnect(&crm_cluster); } uninitializeCib(); crm_info("%s: Exiting%s...", caller, fast?" fast":mainloop?" from mainloop":""); if(fast == FALSE && mainloop != NULL && g_main_is_running(mainloop)) { g_main_quit(mainloop); } else { qb_ipcs_destroy(ipcs_ro); qb_ipcs_destroy(ipcs_rw); qb_ipcs_destroy(ipcs_shm); if (fast) { crm_exit(EX_USAGE); } else { crm_exit(EX_OK); } } } diff --git a/cib/callbacks.h b/cib/callbacks.h index 99a5065bf5..b8af997b63 100644 --- a/cib/callbacks.h +++ b/cib/callbacks.h @@ -1,97 +1,101 @@ /* * Copyright (C) 2004 Andrew Beekhof * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #include #include #include #include #include #include #include #include #ifdef HAVE_GNUTLS_GNUTLS_H # undef KEYFILE # include #endif extern gboolean cib_is_master; extern GHashTable *client_list; extern GHashTable *peer_hash; extern GHashTable *config_hash; typedef struct cib_client_s { char *id; char *name; char *callback_id; char *user; + char *recv_buf; int request_id; qb_ipcs_connection_t *ipc; #ifdef HAVE_GNUTLS_GNUTLS_H gnutls_session *session; + gboolean handshake_complete; #else void *session; #endif gboolean encrypted; + gboolean remote_auth; mainloop_io_t *remote; - + unsigned long num_calls; int pre_notify; int post_notify; int confirmations; int replace; int diffs; + int remote_auth_timeout; GList *delegated_calls; } cib_client_t; typedef struct cib_operation_s { const char *operation; gboolean modifies_cib; gboolean needs_privileges; gboolean needs_quorum; int (*prepare) (xmlNode *, xmlNode **, const char **); int (*cleanup) (int, xmlNode **, xmlNode **); int (*fn) (const char *, int, const char *, xmlNode *, xmlNode *, xmlNode *, xmlNode **, xmlNode **); } cib_operation_t; extern struct qb_ipcs_service_handlers ipc_ro_callbacks; extern struct qb_ipcs_service_handlers ipc_rw_callbacks; extern qb_ipcs_service_t *ipcs_ro; extern qb_ipcs_service_t *ipcs_rw; extern qb_ipcs_service_t *ipcs_shm; extern void cib_peer_callback(xmlNode * msg, void *private_data); extern void cib_client_status_callback(const char *node, const char *client, const char *status, void *private); extern void cib_common_callback_worker(uint32_t id, uint32_t flags, xmlNode * op_request, cib_client_t * cib_client, gboolean privileged); void cib_shutdown(int nsig); void initiate_exit(void); void terminate_cib(const char *caller, gboolean fast); #if SUPPORT_HEARTBEAT extern void cib_ha_peer_callback(HA_Message * msg, void *private_data); extern int cib_ccm_dispatch(gpointer user_data); extern void cib_ccm_msg_callback(oc_ed_t event, void *cookie, size_t size, const void *data); #endif diff --git a/cib/notify.c b/cib/notify.c index 9dcb6997af..1dcda8fcdf 100644 --- a/cib/notify.c +++ b/cib/notify.c @@ -1,345 +1,345 @@ /* * Copyright (C) 2004 Andrew Beekhof * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include int pending_updates = 0; extern GHashTable *client_list; gboolean cib_notify_client(gpointer key, gpointer value, gpointer user_data); void attach_cib_generation(xmlNode * msg, const char *field, xmlNode * a_cib); void do_cib_notify(int options, const char *op, xmlNode * update, int result, xmlNode * result_data, const char *msg_type); static void need_pre_notify(gpointer key, gpointer value, gpointer user_data) { cib_client_t *client = value; if (client->pre_notify) { gboolean *needed = user_data; *needed = TRUE; } } static void need_post_notify(gpointer key, gpointer value, gpointer user_data) { cib_client_t *client = value; if (client->post_notify) { gboolean *needed = user_data; *needed = TRUE; } } gboolean cib_notify_client(gpointer key, gpointer value, gpointer user_data) { const char *type = NULL; gboolean do_send = FALSE; cib_client_t *client = value; xmlNode *update_msg = user_data; CRM_CHECK(client != NULL, return TRUE); CRM_CHECK(update_msg != NULL, return TRUE); - if (client->ipc == NULL) { + if (client->ipc == NULL && client->session == NULL) { crm_warn("Skipping client with NULL channel"); return FALSE; } type = crm_element_value(update_msg, F_SUBTYPE); CRM_LOG_ASSERT(type != NULL); if (client->diffs && safe_str_eq(type, T_CIB_DIFF_NOTIFY)) { do_send = TRUE; } else if (client->replace && safe_str_eq(type, T_CIB_REPLACE_NOTIFY)) { do_send = TRUE; } else if (client->confirmations && safe_str_eq(type, T_CIB_UPDATE_CONFIRM)) { do_send = TRUE; } else if (client->pre_notify && safe_str_eq(type, T_CIB_PRE_NOTIFY)) { do_send = TRUE; } else if (client->post_notify && safe_str_eq(type, T_CIB_POST_NOTIFY)) { do_send = TRUE; } if (do_send) { if (client->ipc) { if(crm_ipcs_send(client->ipc, 0, update_msg, TRUE) == FALSE) { crm_warn("Notification of client %s/%s failed", client->name, client->id); } #ifdef HAVE_GNUTLS_GNUTLS_H } else if (client->session) { crm_debug("Sent %s notification to client %s/%s", type, client->name, client->id); crm_send_remote_msg(client->session, update_msg, client->encrypted); #endif } else { crm_err("Unknown transport for %s", client->name); } } return FALSE; } void cib_pre_notify(int options, const char *op, xmlNode * existing, xmlNode * update) { xmlNode *update_msg = NULL; const char *type = NULL; const char *id = NULL; gboolean needed = FALSE; g_hash_table_foreach(client_list, need_pre_notify, &needed); if (needed == FALSE) { return; } /* TODO: consider pre-notification for removal */ update_msg = create_xml_node(NULL, "pre-notify"); if (update != NULL) { id = crm_element_value(update, XML_ATTR_ID); } crm_xml_add(update_msg, F_TYPE, T_CIB_NOTIFY); crm_xml_add(update_msg, F_SUBTYPE, T_CIB_PRE_NOTIFY); crm_xml_add(update_msg, F_CIB_OPERATION, op); if (id != NULL) { crm_xml_add(update_msg, F_CIB_OBJID, id); } if (update != NULL) { crm_xml_add(update_msg, F_CIB_OBJTYPE, crm_element_name(update)); } else if (existing != NULL) { crm_xml_add(update_msg, F_CIB_OBJTYPE, crm_element_name(existing)); } type = crm_element_value(update_msg, F_CIB_OBJTYPE); attach_cib_generation(update_msg, "cib_generation", the_cib); if (existing != NULL) { add_message_xml(update_msg, F_CIB_EXISTING, existing); } if (update != NULL) { add_message_xml(update_msg, F_CIB_UPDATE, update); } g_hash_table_foreach_remove(client_list, cib_notify_client, update_msg); if (update == NULL) { crm_trace("Performing operation %s (on section=%s)", op, type); } else { crm_trace("Performing %s on <%s%s%s>", op, type, id ? " id=" : "", id ? id : ""); } free_xml(update_msg); } void cib_post_notify(int options, const char *op, xmlNode * update, int result, xmlNode * new_obj) { gboolean needed = FALSE; g_hash_table_foreach(client_list, need_post_notify, &needed); if (needed == FALSE) { return; } do_cib_notify(options, op, update, result, new_obj, T_CIB_UPDATE_CONFIRM); } void cib_diff_notify(int options, const char *client, const char *call_id, const char *op, xmlNode * update, int result, xmlNode * diff) { int add_updates = 0; int add_epoch = 0; int add_admin_epoch = 0; int del_updates = 0; int del_epoch = 0; int del_admin_epoch = 0; int log_level = LOG_DEBUG_2; if (diff == NULL) { return; } if (result != pcmk_ok) { log_level = LOG_WARNING; } cib_diff_version_details(diff, &add_admin_epoch, &add_epoch, &add_updates, &del_admin_epoch, &del_epoch, &del_updates); if (add_updates != del_updates) { do_crm_log(log_level, "Update (client: %s%s%s): %d.%d.%d -> %d.%d.%d (%s)", client, call_id ? ", call:" : "", call_id ? call_id : "", del_admin_epoch, del_epoch, del_updates, add_admin_epoch, add_epoch, add_updates, pcmk_strerror(result)); } else if (diff != NULL) { do_crm_log(log_level, "Local-only Change (client:%s%s%s): %d.%d.%d (%s)", client, call_id ? ", call: " : "", call_id ? call_id : "", add_admin_epoch, add_epoch, add_updates, pcmk_strerror(result)); } do_cib_notify(options, op, update, result, diff, T_CIB_DIFF_NOTIFY); } void do_cib_notify(int options, const char *op, xmlNode * update, int result, xmlNode * result_data, const char *msg_type) { xmlNode *update_msg = NULL; const char *id = NULL; update_msg = create_xml_node(NULL, "notify"); if (result_data != NULL) { id = crm_element_value(result_data, XML_ATTR_ID); } crm_xml_add(update_msg, F_TYPE, T_CIB_NOTIFY); crm_xml_add(update_msg, F_SUBTYPE, msg_type); crm_xml_add(update_msg, F_CIB_OPERATION, op); crm_xml_add_int(update_msg, F_CIB_RC, result); if (id != NULL) { crm_xml_add(update_msg, F_CIB_OBJID, id); } if (update != NULL) { crm_trace("Setting type to update->name: %s", crm_element_name(update)); crm_xml_add(update_msg, F_CIB_OBJTYPE, crm_element_name(update)); } else if (result_data != NULL) { crm_trace("Setting type to new_obj->name: %s", crm_element_name(result_data)); crm_xml_add(update_msg, F_CIB_OBJTYPE, crm_element_name(result_data)); } else { crm_trace("Not Setting type"); } attach_cib_generation(update_msg, "cib_generation", the_cib); if (update != NULL) { add_message_xml(update_msg, F_CIB_UPDATE, update); } if (result_data != NULL) { add_message_xml(update_msg, F_CIB_UPDATE_RESULT, result_data); } crm_trace("Notifying clients"); g_hash_table_foreach_remove(client_list, cib_notify_client, update_msg); free_xml(update_msg); crm_trace("Notify complete"); } void attach_cib_generation(xmlNode * msg, const char *field, xmlNode * a_cib) { xmlNode *generation = create_xml_node(NULL, XML_CIB_TAG_GENERATION_TUPPLE); if (a_cib != NULL) { copy_in_properties(generation, a_cib); } add_message_xml(msg, field, generation); free_xml(generation); } void cib_replace_notify(const char *origin, xmlNode * update, int result, xmlNode * diff) { xmlNode *replace_msg = NULL; int add_updates = 0; int add_epoch = 0; int add_admin_epoch = 0; int del_updates = 0; int del_epoch = 0; int del_admin_epoch = 0; if (diff == NULL) { return; } cib_diff_version_details(diff, &add_admin_epoch, &add_epoch, &add_updates, &del_admin_epoch, &del_epoch, &del_updates); if(del_updates < 0) { crm_log_xml_debug(diff, "Bad replace diff"); } if (add_updates != del_updates) { crm_info("Replaced: %d.%d.%d -> %d.%d.%d from %s", del_admin_epoch, del_epoch, del_updates, add_admin_epoch, add_epoch, add_updates, crm_str(origin)); } else if (diff != NULL) { crm_info("Local-only Replace: %d.%d.%d from %s", add_admin_epoch, add_epoch, add_updates, crm_str(origin)); } replace_msg = create_xml_node(NULL, "notify-replace"); crm_xml_add(replace_msg, F_TYPE, T_CIB_NOTIFY); crm_xml_add(replace_msg, F_SUBTYPE, T_CIB_REPLACE_NOTIFY); crm_xml_add(replace_msg, F_CIB_OPERATION, CIB_OP_REPLACE); crm_xml_add_int(replace_msg, F_CIB_RC, result); attach_cib_generation(replace_msg, "cib-replace-generation", update); crm_log_xml_trace(replace_msg, "CIB Replaced"); g_hash_table_foreach_remove(client_list, cib_notify_client, replace_msg); free_xml(replace_msg); } diff --git a/cib/remote.c b/cib/remote.c index 7ad7132b2c..f9b90a074c 100644 --- a/cib/remote.c +++ b/cib/remote.c @@ -1,616 +1,700 @@ /* * Copyright (C) 2004 Andrew Beekhof * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "callbacks.h" /* #undef HAVE_PAM_PAM_APPL_H */ /* #undef HAVE_GNUTLS_GNUTLS_H */ #ifdef HAVE_GNUTLS_GNUTLS_H # undef KEYFILE # include #endif #include #include #if HAVE_SECURITY_PAM_APPL_H # include # define HAVE_PAM 1 #else # if HAVE_PAM_PAM_APPL_H # include # define HAVE_PAM 1 # endif #endif -#ifdef HAVE_DECL_NANOSLEEP -# include -#endif - extern int remote_tls_fd; extern gboolean cib_shutdown_flag; int init_remote_listener(int port, gboolean encrypted); void cib_remote_connection_destroy(gpointer user_data); #ifdef HAVE_GNUTLS_GNUTLS_H # define DH_BITS 1024 gnutls_dh_params dh_params; -extern gnutls_anon_server_credentials anon_cred_s; +gnutls_anon_server_credentials anon_cred_s; static void debug_log(int level, const char *str) { fputs(str, stderr); } - -extern gnutls_session *create_tls_session(int csock, int type); - #endif +#define REMOTE_AUTH_TIMEOUT 10000 + int num_clients; int authenticate_user(const char *user, const char *passwd); int cib_remote_listen(gpointer data); int cib_remote_msg(gpointer data); static void remote_connection_destroy(gpointer user_data) { return; } #define ERROR_SUFFIX " Shutting down remote listener" int init_remote_listener(int port, gboolean encrypted) { int rc; int *ssock = NULL; struct sockaddr_in saddr; int optval; static struct mainloop_fd_callbacks remote_listen_fd_callbacks = { .dispatch = cib_remote_listen, .destroy = remote_connection_destroy, }; if (port <= 0) { /* dont start it */ return 0; } if (encrypted) { #ifndef HAVE_GNUTLS_GNUTLS_H crm_warn("TLS support is not available"); return 0; #else crm_notice("Starting a tls listener on port %d.", port); gnutls_global_init(); -/* gnutls_global_set_log_level (10); */ + /* gnutls_global_set_log_level (10); */ gnutls_global_set_log_function(debug_log); gnutls_dh_params_init(&dh_params); gnutls_dh_params_generate2(dh_params, DH_BITS); gnutls_anon_allocate_server_credentials(&anon_cred_s); gnutls_anon_set_server_dh_params(anon_cred_s, dh_params); #endif } else { crm_warn("Starting a plain_text listener on port %d.", port); } #ifndef HAVE_PAM crm_warn("PAM is _not_ enabled!"); #endif /* create server socket */ ssock = malloc(sizeof(int)); *ssock = socket(AF_INET, SOCK_STREAM, 0); if (*ssock == -1) { crm_perror(LOG_ERR, "Can not create server socket." ERROR_SUFFIX); free(ssock); return -1; } /* reuse address */ optval = 1; rc = setsockopt(*ssock, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)); if(rc < 0) { crm_perror(LOG_INFO, "Couldn't allow the reuse of local addresses by our remote listener"); } /* bind server socket */ memset(&saddr, '\0', sizeof(saddr)); saddr.sin_family = AF_INET; saddr.sin_addr.s_addr = INADDR_ANY; saddr.sin_port = htons(port); if (bind(*ssock, (struct sockaddr *)&saddr, sizeof(saddr)) == -1) { crm_perror(LOG_ERR, "Can not bind server socket." ERROR_SUFFIX); close(*ssock); free(ssock); return -2; } if (listen(*ssock, 10) == -1) { crm_perror(LOG_ERR, "Can not start listen." ERROR_SUFFIX); close(*ssock); free(ssock); return -3; } mainloop_add_fd("cib-remote", G_PRIORITY_DEFAULT, *ssock, ssock, &remote_listen_fd_callbacks); return *ssock; } static int check_group_membership(const char *usr, const char *grp) { int index = 0; struct passwd *pwd = NULL; struct group *group = NULL; CRM_CHECK(usr != NULL, return FALSE); CRM_CHECK(grp != NULL, return FALSE); pwd = getpwnam(usr); if (pwd == NULL) { crm_err("No user named '%s' exists!", usr); return FALSE; } group = getgrgid(pwd->pw_gid); if (group != NULL && crm_str_eq(grp, group->gr_name, TRUE)) { return TRUE; } group = getgrnam(grp); if (group == NULL) { crm_err("No group named '%s' exists!", grp); return FALSE; } while (TRUE) { char *member = group->gr_mem[index++]; if (member == NULL) { break; } else if (crm_str_eq(usr, member, TRUE)) { return TRUE; } }; return FALSE; } +static gboolean +cib_remote_auth(xmlNode *login) +{ + const char *user = NULL; + const char *pass = NULL; + const char *tmp = NULL; + + crm_log_xml_info(login, "Login: "); + if (login == NULL) { + return FALSE; + } + + tmp = crm_element_name(login); + if (safe_str_neq(tmp, "cib_command")) { + crm_err("Wrong tag: %s", tmp); + return FALSE; + } + + tmp = crm_element_value(login, "op"); + if (safe_str_neq(tmp, "authenticate")) { + crm_err("Wrong operation: %s", tmp); + return FALSE; + } + + user = crm_element_value(login, "user"); + pass = crm_element_value(login, "password"); + + if (!user || !pass) { + crm_err("missing auth credentials"); + return FALSE; + } + + /* Non-root daemons can only validate the password of the + * user they're running as + */ + if (check_group_membership(user, CRM_DAEMON_GROUP) == FALSE) { + crm_err("User is not a member of the required group"); + return FALSE; + + } else if (authenticate_user(user, pass) == FALSE) { + crm_err("PAM auth failed"); + return FALSE; + } + + return TRUE; +} + +static gboolean +remote_auth_timeout_cb(gpointer data) +{ + cib_client_t *client = data; + + client->remote_auth_timeout = 0; + + if (client->remote_auth == TRUE) { + return FALSE; + } + + mainloop_del_fd(client->remote); + crm_err("Remote client authentication timed out"); + + return FALSE; +} int cib_remote_listen(gpointer data) { - int lpc = 0; int csock = 0; unsigned laddr; - time_t now = 0; - time_t start = time(NULL); struct sockaddr_in addr; int ssock = *(int *)data; + int flag; #ifdef HAVE_GNUTLS_GNUTLS_H gnutls_session *session = NULL; #endif cib_client_t *new_client = NULL; - xmlNode *login = NULL; - const char *user = NULL; - const char *pass = NULL; - const char *tmp = NULL; - -#ifdef HAVE_DECL_NANOSLEEP - const struct timespec sleepfast = { 0, 10000000 }; /* 10 millisec */ -#endif - static struct mainloop_fd_callbacks remote_client_fd_callbacks = { .dispatch = cib_remote_msg, .destroy = cib_remote_connection_destroy, - }; - + }; + /* accept the connection */ laddr = sizeof(addr); csock = accept(ssock, (struct sockaddr *)&addr, &laddr); crm_debug("New %s connection from %s", ssock == remote_tls_fd ? "secure" : "clear-text", inet_ntoa(addr.sin_addr)); if (csock == -1) { crm_err("accept socket failed"); return TRUE; } + if ((flag = fcntl(csock, F_GETFL)) >= 0) { + if (fcntl(csock, F_SETFL, flag | O_NONBLOCK) < 0) { + crm_err( "fcntl() write failed"); + close(csock); + return TRUE; + } + } else { + crm_err( "fcntl() read failed"); + close(csock); + return TRUE; + } + if (ssock == remote_tls_fd) { #ifdef HAVE_GNUTLS_GNUTLS_H /* create gnutls session for the server socket */ - session = create_tls_session(csock, GNUTLS_SERVER); + session = crm_create_anon_tls_session(csock, GNUTLS_SERVER, anon_cred_s); if (session == NULL) { crm_err("TLS session creation failed"); close(csock); return TRUE; } #endif } - do { - crm_trace("Iter: %d", lpc++); - if (ssock == remote_tls_fd) { -#ifdef HAVE_GNUTLS_GNUTLS_H - login = crm_recv_remote_msg(session, TRUE); -#endif - } else { - login = crm_recv_remote_msg(GINT_TO_POINTER(csock), FALSE); - } - if (login != NULL) { - break; - } -#ifdef HAVE_DECL_NANOSLEEP - nanosleep(&sleepfast, NULL); -#else - sleep(1); -#endif - now = time(NULL); - - /* Peers have 3s to connect */ - } while (login == NULL && (start - now) < 4); - - crm_log_xml_info(login, "Login: "); - if (login == NULL) { - goto bail; - } - - tmp = crm_element_name(login); - if (safe_str_neq(tmp, "cib_command")) { - crm_err("Wrong tag: %s", tmp); - goto bail; - } - - tmp = crm_element_value(login, "op"); - if (safe_str_neq(tmp, "authenticate")) { - crm_err("Wrong operation: %s", tmp); - goto bail; - } - - user = crm_element_value(login, "user"); - pass = crm_element_value(login, "password"); - - /* Non-root daemons can only validate the password of the - * user they're running as - */ - if (check_group_membership(user, CRM_DAEMON_GROUP) == FALSE) { - crm_err("User is not a member of the required group"); - goto bail; - - } else if (authenticate_user(user, pass) == FALSE) { - crm_err("PAM auth failed"); - goto bail; - } - - /* send ACK */ num_clients++; new_client = calloc(1, sizeof(cib_client_t)); - new_client->name = crm_element_value_copy(login, "name"); - - CRM_CHECK(new_client->id == NULL, free(new_client->id)); new_client->id = crm_generate_uuid(); - -#if ENABLE_ACL - new_client->user = strdup(user); -#endif - new_client->callback_id = NULL; + /* clients have a few seconds to perform handshake. */ + new_client->remote_auth_timeout = g_timeout_add(REMOTE_AUTH_TIMEOUT, remote_auth_timeout_cb, new_client); + if (ssock == remote_tls_fd) { #ifdef HAVE_GNUTLS_GNUTLS_H new_client->encrypted = TRUE; new_client->session = session; #endif } else { new_client->session = GINT_TO_POINTER(csock); } - free_xml(login); - login = create_xml_node(NULL, "cib_result"); - crm_xml_add(login, F_CIB_OPERATION, CRM_OP_REGISTER); - crm_xml_add(login, F_CIB_CLIENTID, new_client->id); - crm_send_remote_msg(new_client->session, login, new_client->encrypted); - free_xml(login); - new_client->remote = mainloop_add_fd( "cib-remote-client", G_PRIORITY_DEFAULT, csock, new_client, &remote_client_fd_callbacks); g_hash_table_insert(client_list, new_client->id, new_client); return TRUE; - - bail: - if (ssock == remote_tls_fd) { -#ifdef HAVE_GNUTLS_GNUTLS_H - gnutls_bye(*session, GNUTLS_SHUT_RDWR); - gnutls_deinit(*session); - gnutls_free(session); -#endif - } - close(csock); - free_xml(login); - return TRUE; } void cib_remote_connection_destroy(gpointer user_data) { cib_client_t *client = user_data; + int csock = 0; if (client == NULL) { return; } crm_trace("Cleaning up after client disconnect: %s/%s", crm_str(client->name), client->id); if (client->id != NULL) { if (!g_hash_table_remove(client_list, client->id)) { crm_err("Client %s not found in the hashtable", client->name); } } crm_trace("Destroying %s (%p)", client->name, user_data); num_clients--; crm_trace("Num unfree'd clients: %d", num_clients); + if (client->remote_auth_timeout) { + g_source_remove(client->remote_auth_timeout); + } + + if (client->encrypted) { +#ifdef HAVE_GNUTLS_GNUTLS_H + if (client->session) { + void *sock_ptr = gnutls_transport_get_ptr(*client->session); + csock = GPOINTER_TO_INT(sock_ptr); + if (client->handshake_complete) { + gnutls_bye(*client->session, GNUTLS_SHUT_WR); + } + gnutls_deinit(*client->session); + gnutls_free(client->session); + } +#endif + } else { + csock = GPOINTER_TO_INT(client->session); + } + client->session = NULL; + + if (csock > 0) { + close(csock); + } + free(client->name); free(client->callback_id); free(client->id); free(client->user); + free(client->recv_buf); free(client); crm_trace("Freed the cib client"); if (cib_shutdown_flag) { cib_shutdown(0); } return; } -int -cib_remote_msg(gpointer data) +static void +cib_handle_remote_msg(cib_client_t *client, xmlNode *command) { const char *value = NULL; - xmlNode *command = NULL; - cib_client_t *client = data; - - crm_trace("%s callback", client->encrypted ? "secure" : "clear-text"); - - command = crm_recv_remote_msg(client->session, client->encrypted); - if (command == NULL) { - return -1; - } value = crm_element_name(command); if (safe_str_neq(value, "cib_command")) { crm_log_xml_trace(command, "Bad command: "); - goto bail; + return; } if (client->name == NULL) { value = crm_element_value(command, F_CLIENTNAME); if (value == NULL) { client->name = strdup(client->id); } else { client->name = strdup(value); } } if (client->callback_id == NULL) { value = crm_element_value(command, F_CIB_CALLBACK_TOKEN); if (value != NULL) { client->callback_id = strdup(value); crm_trace("Callback channel for %s is %s", client->id, client->callback_id); } else { client->callback_id = strdup(client->id); } } /* unset dangerous options */ xml_remove_prop(command, F_ORIG); xml_remove_prop(command, F_CIB_HOST); xml_remove_prop(command, F_CIB_GLOBAL_UPDATE); crm_xml_add(command, F_TYPE, T_CIB); crm_xml_add(command, F_CIB_CLIENTID, client->id); crm_xml_add(command, F_CIB_CLIENTNAME, client->name); #if ENABLE_ACL crm_xml_add(command, F_CIB_USER, client->user); #endif if (crm_element_value(command, F_CIB_CALLID) == NULL) { char *call_uuid = crm_generate_uuid(); /* fix the command */ crm_xml_add(command, F_CIB_CALLID, call_uuid); free(call_uuid); } if (crm_element_value(command, F_CIB_CALLOPTS) == NULL) { crm_xml_add_int(command, F_CIB_CALLOPTS, 0); } crm_log_xml_trace(command, "Remote command: "); cib_common_callback_worker(0, 0, command, client, TRUE); - bail: - free_xml(command); - command = NULL; +} + +int +cib_remote_msg(gpointer data) +{ + xmlNode *command = NULL; + cib_client_t *client = data; + int disconnected = 0; + int timeout = client->remote_auth ? -1 : 1000; + + crm_trace("%s callback", client->encrypted ? "secure" : "clear-text"); + +#ifdef HAVE_GNUTLS_GNUTLS_H + if (client->encrypted && (client->handshake_complete == FALSE)) { + int rc = 0; + + /* Muliple calls to handshake will be required, this callback + * will be invoked once the client sends more handshake data. */ + do { + rc = gnutls_handshake(*client->session); + + if (rc < 0 && rc != GNUTLS_E_AGAIN) { + crm_err("Remote cib tls handshake failed"); + return -1; + } + } while (rc == GNUTLS_E_INTERRUPTED); + + if (rc == 0) { + crm_debug("Remote cib tls handshake completed"); + client->handshake_complete = TRUE; + if (client->remote_auth_timeout) { + g_source_remove(client->remote_auth_timeout); + } + /* after handshake, clients must send auth in a few seconds */ + client->remote_auth_timeout = g_timeout_add(REMOTE_AUTH_TIMEOUT, remote_auth_timeout_cb, client); + } + return 0; + } +#endif + + crm_recv_remote_msg(client->session, &client->recv_buf, client->encrypted, timeout, &disconnected); + + /* must pass auth before we will process anything else */ + if (client->remote_auth == FALSE) { + xmlNode *reg; +#if ENABLE_ACL + const char *user = NULL; +#endif + command = crm_parse_remote_buffer(&client->recv_buf); + if (cib_remote_auth(command) == FALSE) { + free_xml(command); + return -1; + } + + crm_debug("remote connection authenticated successfully"); + client->remote_auth = TRUE; + g_source_remove(client->remote_auth_timeout); + client->remote_auth_timeout = 0; + client->name = crm_element_value_copy(command, "name"); + +#if ENABLE_ACL + user = crm_element_value(command, "user"); + if (user) { + new_client->user = strdup(user); + } +#endif + + /* send ACK */ + reg = create_xml_node(NULL, "cib_result"); + crm_xml_add(reg, F_CIB_OPERATION, CRM_OP_REGISTER); + crm_xml_add(reg, F_CIB_CLIENTID, client->id); + crm_send_remote_msg(client->session, reg, client->encrypted); + free_xml(reg); + free_xml(command); + } + + command = crm_parse_remote_buffer(&client->recv_buf); + while (command) { + crm_trace("command received"); + cib_handle_remote_msg(client, command); + free_xml(command); + command = crm_parse_remote_buffer(&client->recv_buf); + } + + if (disconnected) { + crm_trace("disconnected while receiving remote cib msg."); + return -1; + } + return 0; } #ifdef HAVE_PAM /* * Useful Examples: * http://www.kernel.org/pub/linux/libs/pam/Linux-PAM-html * http://developer.apple.com/samplecode/CryptNoMore/index.html */ static int construct_pam_passwd(int num_msg, const struct pam_message **msg, struct pam_response **response, void *data) { int count = 0; struct pam_response *reply; char *string = (char *)data; CRM_CHECK(data, return PAM_CONV_ERR); CRM_CHECK(num_msg == 1, return PAM_CONV_ERR); /* We only want to handle one message */ reply = calloc(1, sizeof(struct pam_response)); CRM_ASSERT(reply != NULL); for (count = 0; count < num_msg; ++count) { switch (msg[count]->msg_style) { case PAM_TEXT_INFO: crm_info("PAM: %s\n", msg[count]->msg); break; case PAM_PROMPT_ECHO_OFF: case PAM_PROMPT_ECHO_ON: reply[count].resp_retcode = 0; reply[count].resp = string; /* We already made a copy */ case PAM_ERROR_MSG: /* In theory we'd want to print this, but then * we see the password prompt in the logs */ /* crm_err("PAM error: %s\n", msg[count]->msg); */ break; default: crm_err("Unhandled conversation type: %d", msg[count]->msg_style); goto bail; } } *response = reply; reply = NULL; return PAM_SUCCESS; bail: for (count = 0; count < num_msg; ++count) { if (reply[count].resp != NULL) { switch (msg[count]->msg_style) { case PAM_PROMPT_ECHO_ON: case PAM_PROMPT_ECHO_OFF: /* Erase the data - it contained a password */ while (*(reply[count].resp)) { *(reply[count].resp)++ = '\0'; } free(reply[count].resp); break; } reply[count].resp = NULL; } } free(reply); reply = NULL; return PAM_CONV_ERR; } #endif int authenticate_user(const char *user, const char *passwd) { #ifndef HAVE_PAM gboolean pass = TRUE; #else int rc = 0; gboolean pass = FALSE; const void *p_user = NULL; struct pam_conv p_conv; struct pam_handle *pam_h = NULL; static const char *pam_name = NULL; if (pam_name == NULL) { pam_name = getenv("CIB_pam_service"); } if (pam_name == NULL) { pam_name = "login"; } p_conv.conv = construct_pam_passwd; p_conv.appdata_ptr = strdup(passwd); rc = pam_start(pam_name, user, &p_conv, &pam_h); if (rc != PAM_SUCCESS) { crm_err("Could not initialize PAM: %s (%d)", pam_strerror(pam_h, rc), rc); goto bail; } rc = pam_authenticate(pam_h, 0); if (rc != PAM_SUCCESS) { crm_err("Authentication failed for %s: %s (%d)", user, pam_strerror(pam_h, rc), rc); goto bail; } /* Make sure we authenticated the user we wanted to authenticate. * Since we also run as non-root, it might be worth pre-checking * the user has the same EID as us, since that the only user we * can authenticate. */ rc = pam_get_item(pam_h, PAM_USER, &p_user); if (rc != PAM_SUCCESS) { crm_err("Internal PAM error: %s (%d)", pam_strerror(pam_h, rc), rc); goto bail; } else if (p_user == NULL) { crm_err("Unknown user authenticated."); goto bail; } else if (safe_str_neq(p_user, user)) { crm_err("User mismatch: %s vs. %s.", (const char *)p_user, (const char *)user); goto bail; } rc = pam_acct_mgmt(pam_h, 0); if (rc != PAM_SUCCESS) { crm_err("Access denied: %s (%d)", pam_strerror(pam_h, rc), rc); goto bail; } pass = TRUE; bail: rc = pam_end(pam_h, rc); #endif return pass; } diff --git a/include/crm_internal.h b/include/crm_internal.h index cf6d95d373..388af5931f 100644 --- a/include/crm_internal.h +++ b/include/crm_internal.h @@ -1,258 +1,290 @@ /* crm_internal.h */ /* * Copyright (C) 2006 - 2008 * Andrew Beekhof * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #ifndef CRM_INTERNAL__H # define CRM_INTERNAL__H # include # include # include # include # include # include # include /* Dynamic loading of libraries */ void *find_library_function(void **handle, const char *lib, const char *fn, int fatal); void *convert_const_pointer(const void *ptr); /* For ACLs */ char *uid2username(uid_t uid); void determine_request_user(char *user, xmlNode * request, const char *field); # if ENABLE_ACL # include static inline gboolean is_privileged(const char *user) { if (user == NULL) { return FALSE; } else if (strcmp(user, CRM_DAEMON_USER) == 0) { return TRUE; } else if (strcmp(user, "root") == 0) { return TRUE; } return FALSE; } # endif /* CLI option processing*/ # ifdef HAVE_GETOPT_H # include # else # define no_argument 0 # define required_argument 1 # endif # define pcmk_option_default 0x00000 # define pcmk_option_hidden 0x00001 # define pcmk_option_paragraph 0x00002 # define pcmk_option_example 0x00004 struct crm_option { /* Fields from 'struct option' in getopt.h */ /* name of long option */ const char *name; /* * one of no_argument, required_argument, and optional_argument: * whether option takes an argument */ int has_arg; /* if not NULL, set *flag to val when option found */ int *flag; /* if flag not NULL, value to set *flag to; else return value */ int val; /* Custom fields */ const char *desc; long flags; }; void crm_set_options(const char *short_options, const char *usage, struct crm_option *long_options, const char *app_desc); int crm_get_option(int argc, char **argv, int *index); int crm_get_option_long(int argc, char **argv, int *index, const char **longname); void crm_help(char cmd, int exit_code); /* Cluster Option Processing */ typedef struct pe_cluster_option_s { const char *name; const char *alt_name; const char *type; const char *values; const char *default_value; gboolean(*is_valid) (const char *); const char *description_short; const char *description_long; } pe_cluster_option; const char *cluster_option(GHashTable * options, gboolean(*validate) (const char *), const char *name, const char *old_name, const char *def_value); const char *get_cluster_pref(GHashTable * options, pe_cluster_option * option_list, int len, const char *name); void config_metadata(const char *name, const char *version, const char *desc_short, const char *desc_long, pe_cluster_option * option_list, int len); void verify_all_options(GHashTable * options, pe_cluster_option * option_list, int len); gboolean check_time(const char *value); gboolean check_timer(const char *value); gboolean check_boolean(const char *value); gboolean check_number(const char *value); /* Shared PE/crmd functionality */ void filter_action_parameters(xmlNode * param_set, const char *version); void filter_reload_parameters(xmlNode * param_set, const char *restart_string); /* Resource operation updates */ xmlNode *create_operation_update(xmlNode * parent, lrmd_event_data_t *event, const char *caller_version, int target_rc, const char *origin, int level); /* char2score */ extern int node_score_red; extern int node_score_green; extern int node_score_yellow; extern int node_score_infinity; /* Assorted convenience functions */ static inline int crm_strlen_zero(const char *s) { return !s || *s == '\0'; } char *add_list_element(char *list, const char *value); char *generate_series_filename(const char *directory, const char *series, int sequence, gboolean bzip); int get_last_sequence(const char *directory, const char *series); void write_last_sequence(const char *directory, const char *series, int sequence, int max); int crm_pid_active(long pid); void crm_make_daemon(const char *name, gboolean daemonize, const char *pidfile); gboolean crm_is_writable(const char *dir, const char *file, const char *user, const char *group, gboolean need_both); char *generate_op_key(const char *rsc_id, const char *op_type, int interval); char *generate_notify_key(const char *rsc_id, const char *notify_type, const char *op_type); char *generate_transition_magic_v202(const char *transition_key, int op_status); char *generate_transition_magic(const char *transition_key, int op_status, int op_rc); char *generate_transition_key(int action, int transition_id, int target_rc, const char *node); static inline long long crm_clear_bit(const char *function, const char *target, long long word, long long bit) { long long rc = (word & ~bit); if(rc == word) { /* Unchanged */ } else if (target) { crm_trace("Bit 0x%.8llx for %s cleared by %s", bit, target, function); } else { crm_trace("Bit 0x%.8llx cleared by %s", bit, function); } return rc; } static inline long long crm_set_bit(const char *function, const char *target, long long word, long long bit) { long long rc = (word|bit); if(rc == word) { /* Unchanged */ } else if (target) { crm_trace("Bit 0x%.8llx for %s set by %s", bit, target, function); } else { crm_trace("Bit 0x%.8llx set by %s", bit, function); } return rc; } # define set_bit(word, bit) word = crm_set_bit(__PRETTY_FUNCTION__, NULL, word, bit) # define clear_bit(word, bit) word = crm_clear_bit(__PRETTY_FUNCTION__, NULL, word, bit) void g_hash_destroy_str(gpointer data); long long crm_int_helper(const char *text, char **end_text); char *crm_concat(const char *prefix, const char *suffix, char join); char *generate_hash_key(const char *crm_msg_reference, const char *sys); -xmlNode *crm_recv_remote_msg(void *session, gboolean encrypted); -void crm_send_remote_msg(void *session, xmlNode * msg, gboolean encrypted); + + +/*! remote tcp/tls helper functions */ +gboolean crm_recv_remote_msg(void *session, char **recv_buf, gboolean encrypted, int total_timeout_ms, int *disconnected); +char *crm_recv_remote_raw(void *data, gboolean encrypted, size_t max_recv, size_t *recv_len, int *disconnected); +int crm_send_remote_msg(void *session, xmlNode * msg, gboolean encrypted); +int crm_recv_remote_ready(void *session, gboolean encrypted, int timeout_ms); +xmlNode *crm_parse_remote_buffer(char **msg_buf); +int crm_remote_tcp_connect(const char *host, int port); + +#ifdef HAVE_GNUTLS_GNUTLS_H +/*! + * \internal + * \brief Initiate the client handshake after establishing the tcp socket. + * \note This is a blocking function, it will block until the entire handshake + * is complete or until the timeout period is reached. + * \retval 0 success + * \retval negative, failure + */ +int crm_initiate_client_tls_handshake(void *session_data, int timeout_ms); +/*! + * \internal + * \brief Create client or server session for anon DH encryption credentials + * \param sock, the socket the session will use for transport + * \param type, GNUTLS_SERVER or GNUTLS_CLIENT + * \param credentials, gnutls_anon_server_credentials_t or gnutls_anon_client_credentials_t + * + * \retval gnutls_session * on success + * \retval NULL on failure + */ +void *crm_create_anon_tls_session(int sock, int type, void *credentials); +#endif + +#define REMOTE_MSG_TERMINATOR "\r\n\r\n" const char *daemon_option(const char *option); void set_daemon_option(const char *option, const char *value); gboolean daemon_option_enabled(const char *daemon, const char *option); void strip_text_nodes(xmlNode *xml); # define crm_config_err(fmt...) { crm_config_error = TRUE; crm_err(fmt); } # define crm_config_warn(fmt...) { crm_config_warning = TRUE; crm_warn(fmt); } # define attrd_channel T_ATTRD # define F_ATTRD_KEY "attr_key" # define F_ATTRD_ATTRIBUTE "attr_name" # define F_ATTRD_TASK "task" # define F_ATTRD_VALUE "attr_value" # define F_ATTRD_SET "attr_set" # define F_ATTRD_SECTION "attr_section" # define F_ATTRD_DAMPEN "attr_dampening" # define F_ATTRD_IGNORE_LOCALLY "attr_ignore_locally" # define F_ATTRD_HOST "attr_host" # define F_ATTRD_USER "attr_user" # if SUPPORT_COROSYNC # if CS_USES_LIBQB # include # include typedef struct qb_ipc_request_header cs_ipc_header_request_t; typedef struct qb_ipc_response_header cs_ipc_header_response_t; # else # include # include # include static inline int qb_to_cs_error(int a) { return a; } typedef coroipc_request_header_t cs_ipc_header_request_t; typedef coroipc_response_header_t cs_ipc_header_response_t; # endif # else typedef struct { int size __attribute__ ((aligned(8))); int id __attribute__ ((aligned(8))); } __attribute__ ((aligned(8))) cs_ipc_header_request_t; typedef struct { int size __attribute__ ((aligned(8))); int id __attribute__ ((aligned(8))); int error __attribute__ ((aligned(8))); } __attribute__ ((aligned(8))) cs_ipc_header_response_t; # endif #endif /* CRM_INTERNAL__H */ diff --git a/lib/cib/cib_remote.c b/lib/cib/cib_remote.c index 91bca96eb3..ca80c0e077 100644 --- a/lib/cib/cib_remote.c +++ b/lib/cib/cib_remote.c @@ -1,641 +1,629 @@ /* * Copyright (c) 2008 Andrew Beekhof * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA * */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #ifdef HAVE_GNUTLS_GNUTLS_H # undef KEYFILE # include -extern gnutls_anon_client_credentials anon_cred_c; -extern gnutls_session *create_tls_session(int csock, int type); +gnutls_anon_client_credentials anon_cred_c; +#define DEFAULT_CLIENT_HANDSHAKE_TIMEOUT 5000 /* 5 seconds */ const int kx_prio[] = { GNUTLS_KX_ANON_DH, 0 }; +static gboolean remote_gnutls_credentials_init = FALSE; #else typedef void gnutls_session; #endif #include #include #define DH_BITS 1024 struct remote_connection_s { int socket; gboolean encrypted; gnutls_session *session; mainloop_io_t *source; char *token; + char *recv_buf; }; typedef struct cib_remote_opaque_s { int flags; int socket; int port; char *server; char *user; char *passwd; struct remote_connection_s command; struct remote_connection_s callback; } cib_remote_opaque_t; void cib_remote_connection_destroy(gpointer user_data); -int cib_remote_dispatch(gpointer user_data); +int cib_remote_callback_dispatch(gpointer user_data); +int cib_remote_command_dispatch(gpointer user_data); int cib_remote_signon(cib_t * cib, const char *name, enum cib_conn_type type); int cib_remote_signoff(cib_t * cib); int cib_remote_free(cib_t * cib); int cib_remote_perform_op(cib_t * cib, const char *op, const char *host, const char *section, xmlNode * data, xmlNode ** output_data, int call_options, const char *name); static int cib_remote_inputfd(cib_t * cib) { cib_remote_opaque_t *private = cib->variant_opaque; return private->callback.socket; } static int cib_remote_set_connection_dnotify(cib_t * cib, void (*dnotify) (gpointer user_data)) { return -EPROTONOSUPPORT; } static int cib_remote_register_notification(cib_t * cib, const char *callback, int enabled) { xmlNode *notify_msg = create_xml_node(NULL, "cib_command"); cib_remote_opaque_t *private = cib->variant_opaque; crm_xml_add(notify_msg, F_CIB_OPERATION, T_CIB_NOTIFY); crm_xml_add(notify_msg, F_CIB_NOTIFY_TYPE, callback); crm_xml_add_int(notify_msg, F_CIB_NOTIFY_ACTIVATE, enabled); crm_send_remote_msg(private->callback.session, notify_msg, private->callback.encrypted); free_xml(notify_msg); return pcmk_ok; } cib_t * cib_remote_new(const char *server, const char *user, const char *passwd, int port, gboolean encrypted) { cib_remote_opaque_t *private = NULL; cib_t *cib = cib_new_variant(); private = calloc(1, sizeof(cib_remote_opaque_t)); cib->variant = cib_remote; cib->variant_opaque = private; if (server) { private->server = strdup(server); } if (user) { private->user = strdup(user); } if (passwd) { private->passwd = strdup(passwd); } private->port = port; private->command.encrypted = encrypted; private->callback.encrypted = encrypted; /* assign variant specific ops */ cib->delegate_fn = cib_remote_perform_op; cib->cmds->signon = cib_remote_signon; cib->cmds->signoff = cib_remote_signoff; cib->cmds->free = cib_remote_free; cib->cmds->inputfd = cib_remote_inputfd; cib->cmds->register_notification = cib_remote_register_notification; cib->cmds->set_connection_dnotify = cib_remote_set_connection_dnotify; return cib; } static int cib_tls_close(cib_t * cib) { cib_remote_opaque_t *private = cib->variant_opaque; - shutdown(private->command.socket, SHUT_RDWR); /* no more receptions */ - shutdown(private->callback.socket, SHUT_RDWR); /* no more receptions */ - close(private->command.socket); - close(private->callback.socket); - #ifdef HAVE_GNUTLS_GNUTLS_H if (private->command.encrypted) { - gnutls_bye(*(private->command.session), GNUTLS_SHUT_RDWR); - gnutls_deinit(*(private->command.session)); - gnutls_free(private->command.session); - - gnutls_bye(*(private->callback.session), GNUTLS_SHUT_RDWR); - gnutls_deinit(*(private->callback.session)); - gnutls_free(private->callback.session); + if (private->command.session) { + gnutls_bye(*(private->command.session), GNUTLS_SHUT_RDWR); + gnutls_deinit(*(private->command.session)); + gnutls_free(private->command.session); + } - gnutls_anon_free_client_credentials(anon_cred_c); - gnutls_global_deinit(); + if (private->callback.session) { + gnutls_bye(*(private->callback.session), GNUTLS_SHUT_RDWR); + gnutls_deinit(*(private->callback.session)); + gnutls_free(private->callback.session); + } + private->command.session = NULL; + private->callback.session = NULL; + if (remote_gnutls_credentials_init) { + gnutls_anon_free_client_credentials(anon_cred_c); + gnutls_global_deinit(); + remote_gnutls_credentials_init = FALSE; + } } #endif + + if (private->command.socket) { + shutdown(private->command.socket, SHUT_RDWR); /* no more receptions */ + close(private->command.socket); + } + if (private->callback.socket) { + shutdown(private->callback.socket, SHUT_RDWR); /* no more receptions */ + close(private->callback.socket); + } + private->command.socket = 0; + private->callback.socket = 0; + + free(private->command.recv_buf); + free(private->callback.recv_buf); + private->command.recv_buf = NULL; + private->callback.recv_buf = NULL; + return 0; } static int -cib_tls_signon(cib_t * cib, struct remote_connection_s *connection) +cib_tls_signon(cib_t * cib, struct remote_connection_s *connection, gboolean event_channel) { int sock; cib_remote_opaque_t *private = cib->variant_opaque; - struct sockaddr_in addr; int rc = 0; - char *server = private->server; - - int ret_ga; - struct addrinfo *res; - struct addrinfo hints; + int disconnected = 0; xmlNode *answer = NULL; xmlNode *login = NULL; - static struct mainloop_fd_callbacks cib_fd_callbacks = - { - .dispatch = cib_remote_dispatch, - .destroy = cib_remote_connection_destroy, - }; + static struct mainloop_fd_callbacks cib_fd_callbacks = { 0, }; + + cib_fd_callbacks.dispatch = event_channel ? cib_remote_callback_dispatch : cib_remote_command_dispatch; + cib_fd_callbacks.destroy = cib_remote_connection_destroy; connection->socket = 0; connection->session = NULL; - /* create socket */ - sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP); - if (sock == -1) { - crm_perror(LOG_ERR, "Socket creation failed"); - return -1; - } - - /* getaddrinfo */ - bzero(&hints, sizeof(struct addrinfo)); - hints.ai_flags = AI_CANONNAME; - hints.ai_family = AF_INET; - hints.ai_socktype = SOCK_RAW; - - if (hints.ai_family == AF_INET6) { - hints.ai_protocol = IPPROTO_ICMPV6; - } else { - hints.ai_protocol = IPPROTO_ICMP; - } - - crm_debug("Looking up %s", server); - ret_ga = getaddrinfo(server, NULL, &hints, &res); - if (ret_ga) { - crm_err("getaddrinfo: %s", gai_strerror(ret_ga)); - close(sock); - return -1; - } - - if (res->ai_canonname) { - server = res->ai_canonname; - } - - crm_debug("Got address %s for %s", server, private->server); - - if (!res->ai_addr) { - fprintf(stderr, "getaddrinfo failed"); - crm_exit(1); - } -#if 1 - memcpy(&addr, res->ai_addr, res->ai_addrlen); -#else - /* connect to server */ - memset(&addr, 0, sizeof(addr)); - addr.sin_family = AF_INET; - addr.sin_addr.s_addr = inet_addr(server); -#endif - addr.sin_port = htons(private->port); - - if (connect(sock, (struct sockaddr *)&addr, sizeof(addr)) == -1) { - crm_perror(LOG_ERR, "Connection to %s:%d failed", server, private->port); - close(sock); - return -1; + sock = crm_remote_tcp_connect(private->server, private->port); + if (sock <= 0) { + crm_perror(LOG_ERR, "remote tcp connection to %s:%d failed", private->server, private->port); } + connection->socket = sock; if (connection->encrypted) { /* initialize GnuTls lib */ #ifdef HAVE_GNUTLS_GNUTLS_H - gnutls_global_init(); - gnutls_anon_allocate_client_credentials(&anon_cred_c); + if (remote_gnutls_credentials_init == FALSE) { + gnutls_global_init(); + gnutls_anon_allocate_client_credentials(&anon_cred_c); + remote_gnutls_credentials_init = TRUE; + } /* bind the socket to GnuTls lib */ - connection->session = create_tls_session(sock, GNUTLS_CLIENT); - if (connection->session == NULL) { - crm_perror(LOG_ERR, "Session creation for %s:%d failed", server, private->port); - close(sock); + connection->session = crm_create_anon_tls_session(sock, GNUTLS_CLIENT, anon_cred_c); + + if (crm_initiate_client_tls_handshake(connection->session, DEFAULT_CLIENT_HANDSHAKE_TIMEOUT) != 0) { + crm_err("Session creation for %s:%d failed", private->server, private->port); + + gnutls_deinit(*connection->session); + gnutls_free(connection->session); + connection->session = NULL; cib_tls_close(cib); return -1; } #else return -EPROTONOSUPPORT; #endif } else { connection->session = GUINT_TO_POINTER(sock); } /* login to server */ login = create_xml_node(NULL, "cib_command"); crm_xml_add(login, "op", "authenticate"); crm_xml_add(login, "user", private->user); crm_xml_add(login, "password", private->passwd); crm_xml_add(login, "hidden", "password"); crm_send_remote_msg(connection->session, login, connection->encrypted); free_xml(login); - answer = crm_recv_remote_msg(connection->session, connection->encrypted); + crm_recv_remote_msg(connection->session, &connection->recv_buf, connection->encrypted, -1, &disconnected); + + if (disconnected) { + rc = -ENOTCONN; + } + + answer = crm_parse_remote_buffer(&connection->recv_buf); + crm_log_xml_trace(answer, "Reply"); if (answer == NULL) { rc = -EPROTO; } else { /* grab the token */ const char *msg_type = crm_element_value(answer, F_CIB_OPERATION); const char *tmp_ticket = crm_element_value(answer, F_CIB_CLIENTID); if (safe_str_neq(msg_type, CRM_OP_REGISTER)) { crm_err("Invalid registration message: %s", msg_type); rc = -EPROTO; } else if (tmp_ticket == NULL) { rc = -EPROTO; } else { connection->token = strdup(tmp_ticket); } } + free_xml(answer); + answer = NULL; if (rc != 0) { cib_tls_close(cib); + return rc; } - connection->socket = sock; + crm_trace("remote client connection established"); connection->source = mainloop_add_fd("cib-remote", G_PRIORITY_HIGH, connection->socket, cib, &cib_fd_callbacks); return rc; } void cib_remote_connection_destroy(gpointer user_data) { crm_err("Connection destroyed"); #ifdef HAVE_GNUTLS_GNUTLS_H cib_tls_close(user_data); #endif return; } int -cib_remote_dispatch(gpointer user_data) +cib_remote_command_dispatch(gpointer user_data) +{ + int disconnected = 0; + cib_t *cib = user_data; + cib_remote_opaque_t *private = cib->variant_opaque; + + crm_recv_remote_msg(private->command.session, &private->command.recv_buf, private->command.encrypted, -1, &disconnected); + + free(private->command.recv_buf); + private->command.recv_buf = NULL; + crm_err("received late reply for remote cib connection, discarding"); + + if (disconnected) { + return -1; + } + return 0; +} + +int +cib_remote_callback_dispatch(gpointer user_data) { cib_t *cib = user_data; cib_remote_opaque_t *private = cib->variant_opaque; xmlNode *msg = NULL; - const char *type = NULL; + int disconnected = 0; crm_info("Message on callback channel"); - msg = crm_recv_remote_msg(private->callback.session, private->callback.encrypted); - type = crm_element_value(msg, F_TYPE); - crm_trace("Activating %s callbacks...", type); + crm_recv_remote_msg(private->callback.session, &private->callback.recv_buf, private->callback.encrypted, -1, &disconnected); - if (safe_str_eq(type, T_CIB)) { - cib_native_callback(cib, msg, 0, 0); + msg = crm_parse_remote_buffer(&private->callback.recv_buf); + while (msg) { + const char *type = crm_element_value(msg, F_TYPE); + crm_trace("Activating %s callbacks...", type); - } else if (safe_str_eq(type, T_CIB_NOTIFY)) { - g_list_foreach(cib->notify_list, cib_native_notify, msg); + if (safe_str_eq(type, T_CIB)) { + cib_native_callback(cib, msg, 0, 0); - } else { - crm_err("Unknown message type: %s", type); - } + } else if (safe_str_eq(type, T_CIB_NOTIFY)) { + g_list_foreach(cib->notify_list, cib_native_notify, msg); + + } else { + crm_err("Unknown message type: %s", type); + } - if (msg != NULL) { free_xml(msg); - return 0; + msg = crm_parse_remote_buffer(&private->callback.recv_buf); + } + + if (disconnected) { + return -1; } - return -1; + + return 0; } int cib_remote_signon(cib_t * cib, const char *name, enum cib_conn_type type) { int rc = pcmk_ok; cib_remote_opaque_t *private = cib->variant_opaque; if (private->passwd == NULL) { struct termios settings; int rc; rc = tcgetattr(0, &settings); settings.c_lflag &= ~ECHO; rc = tcsetattr(0, TCSANOW, &settings); fprintf(stderr, "Password: "); private->passwd = calloc(1, 1024); rc = scanf("%s", private->passwd); fprintf(stdout, "\n"); /* fprintf(stderr, "entered: '%s'\n", buffer); */ if (rc < 1) { private->passwd = NULL; } settings.c_lflag |= ECHO; rc = tcsetattr(0, TCSANOW, &settings); } if (private->server == NULL || private->user == NULL) { rc = -EINVAL; } if (rc == pcmk_ok) { - rc = cib_tls_signon(cib, &(private->command)); + rc = cib_tls_signon(cib, &(private->command), FALSE); } if (rc == pcmk_ok) { - rc = cib_tls_signon(cib, &(private->callback)); + rc = cib_tls_signon(cib, &(private->callback), TRUE); } if (rc == pcmk_ok) { xmlNode *hello = cib_create_op(0, private->callback.token, CRM_OP_REGISTER, NULL, NULL, NULL, 0, NULL); crm_xml_add(hello, F_CIB_CLIENTNAME, name); crm_send_remote_msg(private->command.session, hello, private->command.encrypted); free_xml(hello); } if (rc == pcmk_ok) { fprintf(stderr, "%s: Opened connection to %s:%d\n", name, private->server, private->port); cib->state = cib_connected_command; cib->type = cib_command; } else { fprintf(stderr, "%s: Connection to %s:%d failed: %s\n", name, private->server, private->port, pcmk_strerror(rc)); } return rc; } int cib_remote_signoff(cib_t * cib) { int rc = pcmk_ok; /* cib_remote_opaque_t *private = cib->variant_opaque; */ crm_debug("Signing out of the CIB Service"); #ifdef HAVE_GNUTLS_GNUTLS_H cib_tls_close(cib); #endif cib->state = cib_disconnected; cib->type = cib_none; return rc; } int cib_remote_free(cib_t * cib) { int rc = pcmk_ok; crm_warn("Freeing CIB"); if (cib->state != cib_disconnected) { rc = cib_remote_signoff(cib); if (rc == pcmk_ok) { cib_remote_opaque_t *private = cib->variant_opaque; free(private->server); free(private->user); free(private->passwd); free(cib->cmds); free(private); free(cib); } } return rc; } -static gboolean timer_expired = FALSE; -static struct timer_rec_s *sync_timer = NULL; -static gboolean -cib_timeout_handler(gpointer data) -{ - struct timer_rec_s *timer = data; - - timer_expired = TRUE; - crm_err("Call %d timed out after %ds", timer->call_id, timer->timeout); - - /* Always return TRUE, never remove the handler - * We do that after the while-loop in cib_native_perform_op() - */ - return TRUE; -} - int cib_remote_perform_op(cib_t * cib, const char *op, const char *host, const char *section, xmlNode * data, xmlNode ** output_data, int call_options, const char *name) { int rc = pcmk_ok; + int disconnected = 0; + int remaining_time = 0; + time_t start_time; xmlNode *op_msg = NULL; xmlNode *op_reply = NULL; cib_remote_opaque_t *private = cib->variant_opaque; - if (sync_timer == NULL) { - sync_timer = calloc(1, sizeof(struct timer_rec_s)); - } - if (cib->state == cib_disconnected) { return -ENOTCONN; } if (output_data != NULL) { *output_data = NULL; } if (op == NULL) { crm_err("No operation specified"); return -EINVAL; } cib->call_id++; /* prevent call_id from being negative (or zero) and conflicting * with the cib_errors enum * use 2 because we use it as (cib->call_id - 1) below */ if (cib->call_id < 1) { cib->call_id = 1; } op_msg = cib_create_op(cib->call_id, private->callback.token, op, host, section, data, call_options, NULL); if (op_msg == NULL) { return -EPROTO; } crm_trace("Sending %s message to CIB service", op); - crm_send_remote_msg(private->command.session, op_msg, private->command.encrypted); + if (!(call_options & cib_sync_call)) { + crm_send_remote_msg(private->callback.session, op_msg, private->command.encrypted); + } else { + crm_send_remote_msg(private->command.session, op_msg, private->command.encrypted); + } free_xml(op_msg); if ((call_options & cib_discard_reply)) { crm_trace("Discarding reply"); return pcmk_ok; } else if (!(call_options & cib_sync_call)) { return cib->call_id; } crm_trace("Waiting for a syncronous reply"); - if (cib->call_timeout > 0) { - /* We need this, even with msgfromIPC_timeout(), because we might - * get other/older replies that don't match the active request - */ - timer_expired = FALSE; - sync_timer->call_id = cib->call_id; - sync_timer->timeout = cib->call_timeout * 1000; - sync_timer->ref = g_timeout_add(sync_timer->timeout, cib_timeout_handler, sync_timer); - } + start_time = time(NULL); + remaining_time = cib->call_timeout ? cib->call_timeout : 60; - while (timer_expired == FALSE) { + while (remaining_time > 0 && !disconnected) { int reply_id = -1; int msg_id = cib->call_id; - op_reply = crm_recv_remote_msg(private->command.session, private->command.encrypted); - if (op_reply == NULL) { + crm_recv_remote_msg(private->command.session, &private->command.recv_buf, private->command.encrypted, remaining_time * 1000, &disconnected); + op_reply = crm_parse_remote_buffer(&private->command.recv_buf); + + if (!op_reply) { break; } crm_element_value_int(op_reply, F_CIB_CALLID, &reply_id); - CRM_CHECK(reply_id > 0, free_xml(op_reply); - if (sync_timer->ref > 0) { - g_source_remove(sync_timer->ref); sync_timer->ref = 0;} - return -ENOMSG) ; if (reply_id == msg_id) { break; } else if (reply_id < msg_id) { crm_debug("Received old reply: %d (wanted %d)", reply_id, msg_id); crm_log_xml_trace(op_reply, "Old reply"); } else if ((reply_id - 10000) > msg_id) { /* wrap-around case */ crm_debug("Received old reply: %d (wanted %d)", reply_id, msg_id); crm_log_xml_trace(op_reply, "Old reply"); } else { crm_err("Received a __future__ reply:" " %d (wanted %d)", reply_id, msg_id); } free_xml(op_reply); op_reply = NULL; - } - - if (sync_timer->ref > 0) { - g_source_remove(sync_timer->ref); - sync_timer->ref = 0; - } - if (timer_expired) { - return -ETIME; + /* wasn't the right reply, try and read some more */ + remaining_time = time(NULL) - start_time; } /* if(IPC_ISRCONN(native->command_channel) == FALSE) { */ /* crm_err("CIB disconnected: %d", */ /* native->command_channel->ch_status); */ /* cib->state = cib_disconnected; */ /* } */ - if (op_reply == NULL) { + if (disconnected) { + crm_err("Disconnected while waiting for reply."); + return -ENOTCONN; + } else if (op_reply == NULL) { crm_err("No reply message - empty"); return -ENOMSG; } crm_trace("Syncronous reply received"); /* Start processing the reply... */ if (crm_element_value_int(op_reply, F_CIB_RC, &rc) != 0) { rc = -EPROTO; } if (rc == -pcmk_err_diff_resync) { /* This is an internal value that clients do not and should not care about */ rc = pcmk_ok; } if (rc == pcmk_ok || rc == -EPERM) { crm_log_xml_debug(op_reply, "passed"); } else { /* } else if(rc == -ETIME) { */ crm_err("Call failed: %s", pcmk_strerror(rc)); crm_log_xml_warn(op_reply, "failed"); } if (output_data == NULL) { /* do nothing more */ } else if (!(call_options & cib_discard_reply)) { xmlNode *tmp = get_message_xml(op_reply, F_CIB_CALLDATA); if (tmp == NULL) { crm_trace("No output in reply to \"%s\" command %d", op, cib->call_id - 1); } else { *output_data = copy_xml(tmp); } } free_xml(op_reply); return rc; } diff --git a/lib/common/mainloop.c b/lib/common/mainloop.c index 09cf6e92b0..1e413b60f3 100644 --- a/lib/common/mainloop.c +++ b/lib/common/mainloop.c @@ -1,872 +1,873 @@ /* * Copyright (C) 2004 Andrew Beekhof * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #include #ifndef _GNU_SOURCE # define _GNU_SOURCE #endif #include #include #include #include #include #include #include #include struct mainloop_child_s { pid_t pid; char *desc; unsigned timerid; unsigned watchid; gboolean timeout; void *privatedata; /* Called when a process dies */ void (*callback)(mainloop_child_t* p, int status, int signo, int exitcode); }; struct trigger_s { GSource source; gboolean running; gboolean trigger; void *user_data; guint id; }; static gboolean crm_trigger_prepare(GSource * source, gint * timeout) { crm_trigger_t *trig = (crm_trigger_t *) source; /* cluster-glue's FD and IPC related sources make use of * g_source_add_poll() but do not set a timeout in their prepare * functions * * This means mainloop's poll() will block until an event for one * of these sources occurs - any /other/ type of source, such as * this one or g_idle_*, that doesn't use g_source_add_poll() is * S-O-L and wont be processed until there is something fd-based * happens. * * Luckily the timeout we can set here affects all sources and * puts an upper limit on how long poll() can take. * * So unconditionally set a small-ish timeout, not too small that * we're in constant motion, which will act as an upper bound on * how long the signal handling might be delayed for. */ *timeout = 500; /* Timeout in ms */ return trig->trigger; } static gboolean crm_trigger_check(GSource * source) { crm_trigger_t *trig = (crm_trigger_t *) source; return trig->trigger; } static gboolean crm_trigger_dispatch(GSource * source, GSourceFunc callback, gpointer userdata) { int rc = TRUE; crm_trigger_t *trig = (crm_trigger_t *) source; if(trig->running) { /* Wait until the existing job is complete before starting the next one */ return TRUE; } trig->trigger = FALSE; if (callback) { rc = callback(trig->user_data); if(rc < 0) { crm_trace("Trigger handler %p not yet complete", trig); trig->running = TRUE; rc = TRUE; } } return rc; } static GSourceFuncs crm_trigger_funcs = { crm_trigger_prepare, crm_trigger_check, crm_trigger_dispatch, NULL }; static crm_trigger_t * mainloop_setup_trigger(GSource * source, int priority, int(*dispatch) (gpointer user_data), gpointer userdata) { crm_trigger_t *trigger = NULL; trigger = (crm_trigger_t *) source; trigger->id = 0; trigger->trigger = FALSE; trigger->user_data = userdata; if (dispatch) { g_source_set_callback(source, dispatch, trigger, NULL); } g_source_set_priority(source, priority); g_source_set_can_recurse(source, FALSE); trigger->id = g_source_attach(source, NULL); return trigger; } void mainloop_trigger_complete(crm_trigger_t *trig) { crm_trace("Trigger handler %p complete", trig); trig->running = FALSE; } /* If dispatch returns: * -1: Job running but not complete * 0: Remove the trigger from mainloop * 1: Leave the trigger in mainloop */ crm_trigger_t * mainloop_add_trigger(int priority, int(*dispatch) (gpointer user_data), gpointer userdata) { GSource *source = NULL; CRM_ASSERT(sizeof(crm_trigger_t) > sizeof(GSource)); source = g_source_new(&crm_trigger_funcs, sizeof(crm_trigger_t)); CRM_ASSERT(source != NULL); return mainloop_setup_trigger(source, priority, dispatch, userdata); } void mainloop_set_trigger(crm_trigger_t * source) { source->trigger = TRUE; } gboolean mainloop_destroy_trigger(crm_trigger_t * source) { source->trigger = FALSE; if (source->id > 0) { g_source_remove(source->id); + source->id = 0; } return TRUE; } typedef struct signal_s { crm_trigger_t trigger; /* must be first */ void (*handler) (int sig); int signal; } crm_signal_t; static crm_signal_t *crm_signals[NSIG]; static gboolean crm_signal_dispatch(GSource * source, GSourceFunc callback, gpointer userdata) { crm_signal_t *sig = (crm_signal_t *) source; crm_info("Invoking handler for signal %d: %s", sig->signal, strsignal(sig->signal)); sig->trigger.trigger = FALSE; if (sig->handler) { sig->handler(sig->signal); } return TRUE; } static void mainloop_signal_handler(int sig) { if (sig > 0 && sig < NSIG && crm_signals[sig] != NULL) { mainloop_set_trigger((crm_trigger_t *) crm_signals[sig]); } } static GSourceFuncs crm_signal_funcs = { crm_trigger_prepare, crm_trigger_check, crm_signal_dispatch, NULL }; gboolean crm_signal(int sig, void (*dispatch) (int sig)) { sigset_t mask; struct sigaction sa; struct sigaction old; if (sigemptyset(&mask) < 0) { crm_perror(LOG_ERR, "Call to sigemptyset failed"); return FALSE; } memset(&sa, 0, sizeof(struct sigaction)); sa.sa_handler = dispatch; sa.sa_flags = SA_RESTART; sa.sa_mask = mask; if (sigaction(sig, &sa, &old) < 0) { crm_perror(LOG_ERR, "Could not install signal handler for signal %d", sig); return FALSE; } return TRUE; } gboolean mainloop_add_signal(int sig, void (*dispatch) (int sig)) { GSource *source = NULL; int priority = G_PRIORITY_HIGH - 1; if (sig == SIGTERM) { /* TERM is higher priority than other signals, * signals are higher priority than other ipc. * Yes, minus: smaller is "higher" */ priority--; } if (sig >= NSIG || sig < 0) { crm_err("Signal %d is out of range", sig); return FALSE; } else if (crm_signals[sig] != NULL && crm_signals[sig]->handler == dispatch) { crm_trace("Signal handler for %d is already installed", sig); return TRUE; } else if (crm_signals[sig] != NULL) { crm_err("Different signal handler for %d is already installed", sig); return FALSE; } CRM_ASSERT(sizeof(crm_signal_t) > sizeof(GSource)); source = g_source_new(&crm_signal_funcs, sizeof(crm_signal_t)); crm_signals[sig] = (crm_signal_t *) mainloop_setup_trigger(source, priority, NULL, NULL); CRM_ASSERT(crm_signals[sig] != NULL); crm_signals[sig]->handler = dispatch; crm_signals[sig]->signal = sig; if (crm_signal(sig, mainloop_signal_handler) == FALSE) { crm_signal_t *tmp = crm_signals[sig]; crm_signals[sig] = NULL; mainloop_destroy_trigger((crm_trigger_t *) tmp); return FALSE; } #if 0 /* If we want signals to interrupt mainloop's poll(), instead of waiting for * the timeout, then we should call siginterrupt() below * * For now, just enforce a low timeout */ if (siginterrupt(sig, 1) < 0) { crm_perror(LOG_INFO, "Could not enable system call interruptions for signal %d", sig); } #endif return TRUE; } gboolean mainloop_destroy_signal(int sig) { crm_signal_t *tmp = NULL; if (sig >= NSIG || sig < 0) { crm_err("Signal %d is out of range", sig); return FALSE; } else if (crm_signal(sig, NULL) == FALSE) { crm_perror(LOG_ERR, "Could not uninstall signal handler for signal %d", sig); return FALSE; } else if (crm_signals[sig] == NULL) { return TRUE; } tmp = crm_signals[sig]; crm_signals[sig] = NULL; mainloop_destroy_trigger((crm_trigger_t *) tmp); return TRUE; } static qb_array_t *gio_map = NULL; /* * libqb... */ struct gio_to_qb_poll { int32_t is_used; GIOChannel *channel; guint source; int32_t events; void * data; qb_ipcs_dispatch_fn_t fn; enum qb_loop_priority p; }; static int gio_adapter_refcount(struct gio_to_qb_poll *adaptor) { /* This is evil * Looking at the giochannel header file, ref_count is the first member of channel * So cheat... */ if(adaptor && adaptor->channel) { int *ref = (void*)adaptor->channel; return *ref; } return 0; } static gboolean gio_read_socket (GIOChannel *gio, GIOCondition condition, gpointer data) { struct gio_to_qb_poll *adaptor = (struct gio_to_qb_poll *)data; gint fd = g_io_channel_unix_get_fd(gio); crm_trace("%p.%d %d (ref=%d)", data, fd, condition, gio_adapter_refcount(adaptor)); if(condition & G_IO_NVAL) { crm_trace("Marking failed adaptor %p unused", adaptor); adaptor->is_used = QB_FALSE; } return (adaptor->fn(fd, condition, adaptor->data) == 0); } static void gio_poll_destroy(gpointer data) { /* adaptor->source is valid but about to be destroyed (ref_count == 0) in gmain.c * adaptor->channel will still have ref_count > 0... should be == 1 */ struct gio_to_qb_poll *adaptor = (struct gio_to_qb_poll *)data; crm_trace("Destroying adaptor %p channel %p (ref=%d)", adaptor, adaptor->channel, gio_adapter_refcount(adaptor)); adaptor->is_used = QB_FALSE; adaptor->channel = NULL; adaptor->source = 0; } static int32_t gio_poll_dispatch_add(enum qb_loop_priority p, int32_t fd, int32_t evts, void *data, qb_ipcs_dispatch_fn_t fn) { struct gio_to_qb_poll *adaptor; GIOChannel *channel; int32_t res = 0; res = qb_array_index(gio_map, fd, (void**)&adaptor); if (res < 0) { crm_err("Array lookup failed for fd=%d: %d", fd, res); return res; } crm_trace("Adding fd=%d to mainloop as adapater %p", fd, adaptor); if (adaptor->is_used) { crm_err("Adapter for descriptor %d is still in-use", fd); return -EEXIST; } /* channel is created with ref_count = 1 */ channel = g_io_channel_unix_new(fd); if (!channel) { crm_err("No memory left to add fd=%d", fd); return -ENOMEM; } /* Because unlike the poll() API, glib doesn't tell us about HUPs by default */ evts |= (G_IO_HUP|G_IO_NVAL|G_IO_ERR); adaptor->channel = channel; adaptor->fn = fn; adaptor->events = evts; adaptor->data = data; adaptor->p = p; adaptor->is_used = QB_TRUE; adaptor->source = g_io_add_watch_full(channel, G_PRIORITY_DEFAULT, evts, gio_read_socket, adaptor, gio_poll_destroy); /* Now that mainloop now holds a reference to adaptor->channel, * thanks to g_io_add_watch_full(), drop ours from g_io_channel_unix_new(). * * This means that adaptor->channel will be free'd by: * g_main_context_dispatch() * -> g_source_destroy_internal() * -> g_source_callback_unref() * shortly after gio_poll_destroy() completes */ g_io_channel_unref(adaptor->channel); crm_trace("Added to mainloop with gsource id=%d, ref=%d", adaptor->source, gio_adapter_refcount(adaptor)); if(adaptor->source > 0) { return 0; } return -EINVAL; } static int32_t gio_poll_dispatch_mod(enum qb_loop_priority p, int32_t fd, int32_t evts, void *data, qb_ipcs_dispatch_fn_t fn) { return 0; } static int32_t gio_poll_dispatch_del(int32_t fd) { struct gio_to_qb_poll *adaptor; crm_trace("Looking for fd=%d", fd); if (qb_array_index(gio_map, fd, (void**)&adaptor) == 0) { crm_trace("Marking adaptor %p unused (ref=%d)", adaptor, gio_adapter_refcount(adaptor)); adaptor->is_used = QB_FALSE; } return 0; } struct qb_ipcs_poll_handlers gio_poll_funcs = { .job_add = NULL, .dispatch_add = gio_poll_dispatch_add, .dispatch_mod = gio_poll_dispatch_mod, .dispatch_del = gio_poll_dispatch_del, }; static enum qb_ipc_type pick_ipc_type(enum qb_ipc_type requested) { const char *env = getenv("PCMK_ipc_type"); if(env && strcmp("shared-mem", env) == 0) { return QB_IPC_SHM; } else if(env && strcmp("socket", env) == 0) { return QB_IPC_SOCKET; } else if(env && strcmp("posix", env) == 0) { return QB_IPC_POSIX_MQ; } else if(env && strcmp("sysv", env) == 0) { return QB_IPC_SYSV_MQ; } else if(requested == QB_IPC_NATIVE) { /* We prefer sockets actually */ return QB_IPC_SOCKET; } return requested; } qb_ipcs_service_t *mainloop_add_ipc_server( const char *name, enum qb_ipc_type type, struct qb_ipcs_service_handlers *callbacks) { int rc = 0; qb_ipcs_service_t* server = NULL; if(gio_map == NULL) { gio_map = qb_array_create_2(64, sizeof(struct gio_to_qb_poll), 1); } server = qb_ipcs_create(name, 0, pick_ipc_type(type), callbacks); qb_ipcs_poll_handlers_set(server, &gio_poll_funcs); rc = qb_ipcs_run(server); if (rc < 0) { crm_err("Could not start %s IPC server: %s (%d)", name, pcmk_strerror(rc), rc); return NULL; } return server; } void mainloop_del_ipc_server(qb_ipcs_service_t *server) { if(server) { qb_ipcs_destroy(server); } } struct mainloop_io_s { char *name; void *userdata; guint source; crm_ipc_t *ipc; GIOChannel *channel; int (*dispatch_fn_ipc)(const char *buffer, ssize_t length, gpointer userdata); int (*dispatch_fn_io) (gpointer userdata); void (*destroy_fn) (gpointer userdata); }; static int mainloop_gio_refcount(mainloop_io_t *client) { /* This is evil * Looking at the giochannel header file, ref_count is the first member of channel * So cheat... */ if(client && client->channel) { int *ref = (void*)client->channel; return *ref; } return 0; } static gboolean mainloop_gio_callback(GIOChannel *gio, GIOCondition condition, gpointer data) { gboolean keep = TRUE; mainloop_io_t *client = data; if(condition & G_IO_IN) { if(client->ipc) { long rc = 0; int max = 10; do { rc = crm_ipc_read(client->ipc); if(rc <= 0) { crm_trace("Message acquisition from %s[%p] failed: %s (%ld)", client->name, client, pcmk_strerror(rc), rc); } else if(client->dispatch_fn_ipc) { const char *buffer = crm_ipc_buffer(client->ipc); crm_trace("New message from %s[%p] = %d", client->name, client, rc, condition); if(client->dispatch_fn_ipc(buffer, rc, client->userdata) < 0) { crm_trace("Connection to %s no longer required", client->name); keep = FALSE; } } } while(keep && rc > 0 && --max > 0); } else { crm_trace("New message from %s[%p]", client->name, client); if(client->dispatch_fn_io) { if(client->dispatch_fn_io(client->userdata) < 0) { crm_trace("Connection to %s no longer required", client->name); keep = FALSE; } } } } if(client->ipc && crm_ipc_connected(client->ipc) == FALSE) { crm_err("Connection to %s[%p] closed (I/O condition=%d)", client->name, client, condition); keep = FALSE; } else if(condition & (G_IO_HUP|G_IO_NVAL|G_IO_ERR)) { crm_trace("The connection %s[%p] has been closed (I/O condition=%d, refcount=%d)", client->name, client, condition, mainloop_gio_refcount(client)); keep = FALSE; } else if((condition & G_IO_IN) == 0) { /* #define GLIB_SYSDEF_POLLIN =1 #define GLIB_SYSDEF_POLLPRI =2 #define GLIB_SYSDEF_POLLOUT =4 #define GLIB_SYSDEF_POLLERR =8 #define GLIB_SYSDEF_POLLHUP =16 #define GLIB_SYSDEF_POLLNVAL =32 typedef enum { G_IO_IN GLIB_SYSDEF_POLLIN, G_IO_OUT GLIB_SYSDEF_POLLOUT, G_IO_PRI GLIB_SYSDEF_POLLPRI, G_IO_ERR GLIB_SYSDEF_POLLERR, G_IO_HUP GLIB_SYSDEF_POLLHUP, G_IO_NVAL GLIB_SYSDEF_POLLNVAL } GIOCondition; A bitwise combination representing a condition to watch for on an event source. G_IO_IN There is data to read. G_IO_OUT Data can be written (without blocking). G_IO_PRI There is urgent data to read. G_IO_ERR Error condition. G_IO_HUP Hung up (the connection has been broken, usually for pipes and sockets). G_IO_NVAL Invalid request. The file descriptor is not open. */ crm_err("Strange condition: %d", condition); } /* keep == FALSE results in mainloop_gio_destroy() being called * just before the source is removed from mainloop */ return keep; } static void mainloop_gio_destroy(gpointer c) { mainloop_io_t *client = c; /* client->source is valid but about to be destroyed (ref_count == 0) in gmain.c * client->channel will still have ref_count > 0... should be == 1 */ crm_trace("Destroying client %s[%p] %d", client->name, c, mainloop_gio_refcount(client)); if(client->ipc) { crm_ipc_close(client->ipc); } if(client->destroy_fn) { client->destroy_fn(client->userdata); } if(client->ipc) { crm_ipc_destroy(client->ipc); } crm_trace("Destroyed client %s[%p] %d", client->name, c, mainloop_gio_refcount(client)); free(client->name); memset(client, 0, sizeof(mainloop_io_t)); /* A bit of pointless paranoia */ free(client); } mainloop_io_t * mainloop_add_ipc_client( const char *name, int priority, size_t max_size, void *userdata, struct ipc_client_callbacks *callbacks) { mainloop_io_t *client = NULL; crm_ipc_t *conn = crm_ipc_new(name, max_size); if(conn && crm_ipc_connect(conn)) { int32_t fd = crm_ipc_get_fd(conn); client = mainloop_add_fd(name, priority, fd, userdata, NULL); client->ipc = conn; client->destroy_fn = callbacks->destroy; client->dispatch_fn_ipc = callbacks->dispatch; } if(conn && client == NULL) { crm_trace("Connection to %s failed", name); crm_ipc_close(conn); crm_ipc_destroy(conn); } return client; } void mainloop_del_ipc_client(mainloop_io_t *client) { mainloop_del_fd(client); } crm_ipc_t * mainloop_get_ipc_client(mainloop_io_t *client) { if(client) { return client->ipc; } return NULL; } mainloop_io_t * mainloop_add_fd( const char *name, int priority, int fd, void *userdata, struct mainloop_fd_callbacks *callbacks) { mainloop_io_t *client = NULL; if(fd > 0) { client = calloc(1, sizeof(mainloop_io_t)); client->name = strdup(name); client->userdata = userdata; if(callbacks) { client->destroy_fn = callbacks->destroy; client->dispatch_fn_io = callbacks->dispatch; } client->channel = g_io_channel_unix_new(fd); client->source = g_io_add_watch_full( client->channel, priority, (G_IO_IN|G_IO_HUP|G_IO_NVAL|G_IO_ERR), mainloop_gio_callback, client, mainloop_gio_destroy); /* Now that mainloop now holds a reference to adaptor->channel, * thanks to g_io_add_watch_full(), drop ours from g_io_channel_unix_new(). * * This means that adaptor->channel will be free'd by: * g_main_context_dispatch() or g_source_remove() * -> g_source_destroy_internal() * -> g_source_callback_unref() * shortly after mainloop_gio_destroy() completes */ g_io_channel_unref(client->channel); crm_trace("Added connection %d for %s[%p].%d %d", client->source, client->name, client, fd, mainloop_gio_refcount(client)); } return client; } void mainloop_del_fd(mainloop_io_t *client) { if(client != NULL) { crm_trace("Removing client %s[%p] %d", client->name, client, mainloop_gio_refcount(client)); if (client->source) { /* Results in mainloop_gio_destroy() being called just * before the source is removed from mainloop */ g_source_remove(client->source); } } } pid_t mainloop_get_child_pid(mainloop_child_t *child) { return child->pid; } int mainloop_get_child_timeout(mainloop_child_t *child) { return child->timeout; } void * mainloop_get_child_userdata(mainloop_child_t *child) { return child->privatedata; } void mainloop_clear_child_userdata(mainloop_child_t *child) { child->privatedata = NULL; } static gboolean child_timeout_callback(gpointer p) { mainloop_child_t *child = p; child->timerid = 0; if (child->timeout) { crm_crit("%s process (PID %d) will not die!", child->desc, (int)child->pid); return FALSE; } child->timeout = TRUE; crm_warn("%s process (PID %d) timed out", child->desc, (int)child->pid); if (kill(child->pid, SIGKILL) < 0) { if (errno == ESRCH) { /* Nothing left to do */ return FALSE; } crm_perror(LOG_ERR, "kill(%d, KILL) failed", child->pid); } child->timerid = g_timeout_add(5000, child_timeout_callback, child); return FALSE; } static void mainloop_child_destroy(mainloop_child_t *child) { if (child->timerid != 0) { crm_trace("Removing timer %d", child->timerid); g_source_remove(child->timerid); child->timerid = 0; } free(child->desc); g_free(child); } static void child_death_dispatch(GPid pid, gint status, gpointer user_data) { int signo = 0; int exitcode = 0; mainloop_child_t *child = user_data; crm_trace("Managed process %d exited: %p", pid, child); if (WIFEXITED(status)) { exitcode = WEXITSTATUS(status); crm_trace("Managed process %d (%s) exited with rc=%d", pid, child->desc, exitcode); } else if (WIFSIGNALED(status)) { signo = WTERMSIG(status); crm_trace("Managed process %d (%s) exited with signal=%d", pid, child->desc, signo); } #ifdef WCOREDUMP if (WCOREDUMP(status)) { crm_err("Managed process %d (%s) dumped core", pid, child->desc); } #endif if (child->callback) { child->callback(child, status, signo, exitcode); } crm_trace("Removed process entry for %d", pid); mainloop_child_destroy(child); return; } /* Create/Log a new tracked process * To track a process group, use -pid */ void mainloop_add_child(pid_t pid, int timeout, const char *desc, void * privatedata, void (*callback)(mainloop_child_t *p, int status, int signo, int exitcode)) { mainloop_child_t *child = g_new(mainloop_child_t, 1); child->pid = pid; child->timerid = 0; child->timeout = FALSE; child->desc = strdup(desc); child->privatedata = privatedata; child->callback = callback; if (timeout) { child->timerid = g_timeout_add( timeout, child_timeout_callback, child); } child->watchid = g_child_watch_add(pid, child_death_dispatch, child); } diff --git a/lib/common/remote.c b/lib/common/remote.c index 7f04097c31..ae61481bc5 100644 --- a/lib/common/remote.c +++ b/lib/common/remote.c @@ -1,340 +1,705 @@ /* * Copyright (c) 2008 Andrew Beekhof * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA * */ #include #include #include #include #include #include #include #include - +#include #include +#include + #include #include #include #include #include #include #ifdef HAVE_GNUTLS_GNUTLS_H # undef KEYFILE # include #endif #ifdef HAVE_GNUTLS_GNUTLS_H -const int tls_kx_order[] = { +const int anon_tls_kx_order[] = { GNUTLS_KX_ANON_DH, GNUTLS_KX_DHE_RSA, GNUTLS_KX_DHE_DSS, GNUTLS_KX_RSA, 0 }; -gnutls_anon_client_credentials anon_cred_c; -gnutls_anon_server_credentials anon_cred_s; -static char *cib_send_tls(gnutls_session * session, xmlNode * msg); -static char *cib_recv_tls(gnutls_session * session); -#endif +int +crm_initiate_client_tls_handshake(void *session_data, int timeout_ms) +{ + int rc = 0; + int pollrc = 0; + time_t start = time(NULL); + gnutls_session *session = session_data; -char *cib_recv_plaintext(int sock); -char *cib_send_plaintext(int sock, xmlNode * msg); + do { + rc = gnutls_handshake(*session); + if (rc == GNUTLS_E_INTERRUPTED || rc == GNUTLS_E_AGAIN) { + pollrc = crm_recv_remote_ready(session, TRUE, 1000); + if (pollrc < 0) { + /* poll returned error, there is no hope */ + rc = -1; + } + } + } while (((time(NULL) - start) < (timeout_ms/1000)) && + (rc == GNUTLS_E_INTERRUPTED || rc == GNUTLS_E_AGAIN)); -#ifdef HAVE_GNUTLS_GNUTLS_H -gnutls_session *create_tls_session(int csock, int type); + return rc; +} -gnutls_session * -create_tls_session(int csock, int type /* GNUTLS_SERVER, GNUTLS_CLIENT */ ) +void * +crm_create_anon_tls_session(int csock, int type /* GNUTLS_SERVER, GNUTLS_CLIENT */, void *credentials) { - int rc = 0; gnutls_session *session = gnutls_malloc(sizeof(gnutls_session)); gnutls_init(session, type); # ifdef HAVE_GNUTLS_PRIORITY_SET_DIRECT /* http://www.manpagez.com/info/gnutls/gnutls-2.10.4/gnutls_81.php#Echo-Server-with-anonymous-authentication */ gnutls_priority_set_direct(*session, "NORMAL:+ANON-DH", NULL); /* gnutls_priority_set_direct (*session, "NONE:+VERS-TLS-ALL:+CIPHER-ALL:+MAC-ALL:+SIGN-ALL:+COMP-ALL:+ANON-DH", NULL); */ # else gnutls_set_default_priority(*session); - gnutls_kx_set_priority(*session, tls_kx_order); + gnutls_kx_set_priority(*session, anon_tls_kx_order); # endif gnutls_transport_set_ptr(*session, (gnutls_transport_ptr) GINT_TO_POINTER(csock)); switch (type) { - case GNUTLS_SERVER: - gnutls_credentials_set(*session, GNUTLS_CRD_ANON, anon_cred_s); - break; - case GNUTLS_CLIENT: - gnutls_credentials_set(*session, GNUTLS_CRD_ANON, anon_cred_c); - break; + case GNUTLS_SERVER: + gnutls_credentials_set(*session, GNUTLS_CRD_ANON, (gnutls_anon_server_credentials_t) credentials); + break; + case GNUTLS_CLIENT: + gnutls_credentials_set(*session, GNUTLS_CRD_ANON, (gnutls_anon_client_credentials_t) credentials); + break; } - do { - rc = gnutls_handshake(*session); - } while (rc == GNUTLS_E_INTERRUPTED || rc == GNUTLS_E_AGAIN); - - if (rc < 0) { - crm_err("Handshake failed: %s", gnutls_strerror(rc)); - gnutls_deinit(*session); - gnutls_free(session); - return NULL; - } return session; } -static char * -cib_send_tls(gnutls_session * session, xmlNode * msg) +static int +crm_send_tls(gnutls_session * session, const char *buf, size_t len) { - char *xml_text = NULL; - -# if 0 - const char *name = crm_element_name(msg); + const char *unsent = buf; + int rc = 0; + int total_send; - if (safe_str_neq(name, "cib_command")) { - xmlNodeSetName(msg, "cib_result"); + if (buf == NULL) { + return -1; } -# endif - xml_text = dump_xml_unformatted(msg); - if (xml_text != NULL) { - char *unsent = xml_text; - int len = strlen(xml_text); - int rc = 0; - len++; /* null char */ - crm_trace("Message size: %d", len); + total_send = len; + crm_trace("Message size: %d", len); - while (TRUE) { - rc = gnutls_record_send(*session, unsent, len); - crm_debug("Sent %d bytes", rc); + while (TRUE) { + rc = gnutls_record_send(*session, unsent, len); - if (rc == GNUTLS_E_INTERRUPTED || rc == GNUTLS_E_AGAIN) { - crm_debug("Retry"); + if (rc == GNUTLS_E_INTERRUPTED || rc == GNUTLS_E_AGAIN) { + crm_debug("Retry"); - } else if (rc < 0) { - crm_debug("Connection terminated"); - break; + } else if (rc < 0) { + crm_err("Connection terminated rc = %d", rc); + break; - } else if (rc < len) { - crm_debug("Only sent %d of %d bytes", rc, len); - len -= rc; - unsent += rc; - } else { - break; - } + } else if (rc < len) { + crm_debug("Only sent %d of %d bytes", rc, len); + len -= rc; + unsent += rc; + } else { + crm_debug("Sent %d bytes", rc); + break; } - } - free(xml_text); - return NULL; + return rc < 0 ? rc : total_send; } + +/*! + * \internal + * \brief Read bytes off non blocking tls session. + * + * \param session - tls session to read + * \param max_size - max bytes allowed to read for buffer. 0 assumes no limit + * + * \note only use with NON-Blocking sockets. Should only be used after polling socket. + * This function will return once max_size is met, the socket read buffer + * is empty, or an error is encountered. + * + * \retval '\0' terminated buffer on success + */ static char * -cib_recv_tls(gnutls_session * session) +crm_recv_tls(gnutls_session * session, size_t max_size, size_t *recv_len, int *disconnected) { char *buf = NULL; - int rc = 0; - int len = 0; - int chunk_size = 1024; + size_t len = 0; + size_t chunk_size = max_size ? max_size : 1024; + size_t buf_size = 0; + size_t read_size = 0; if (session == NULL) { - return NULL; + if (disconnected) { + *disconnected = 1; + } + goto done; } - buf = calloc(1, chunk_size); + buf = calloc(1, chunk_size + 1); + buf_size = chunk_size; while (TRUE) { - errno = 0; - rc = gnutls_record_recv(*session, buf + len, chunk_size); - crm_trace("Got %d more bytes. errno=%d", rc, errno); + read_size = buf_size - len; - if (rc == GNUTLS_E_INTERRUPTED || rc == GNUTLS_E_AGAIN) { - crm_trace("Retry"); + /* automatically grow the buffer when needed if max_size is not set.*/ + if (!max_size && (read_size < (chunk_size / 2))) { + buf_size += chunk_size; + crm_trace("Grow buffer by %d more bytes. buf is now %d bytes", (int)chunk_size, buf_size); + buf = realloc(buf, buf_size + 1); + CRM_ASSERT(buf != NULL); - } else if (rc == GNUTLS_E_UNEXPECTED_PACKET_LENGTH) { - crm_trace("Session disconnected"); - goto bail; + read_size = buf_size - len; + } - } else if (rc < 0) { - crm_err("Error receiving message: %s (%d)", gnutls_strerror(rc), rc); - goto bail; + rc = gnutls_record_recv(*session, buf + len, read_size); - } else if (rc == chunk_size) { + if (rc > 0) { + crm_trace("Got %d more bytes.", rc); len += rc; - chunk_size *= 2; - buf = realloc(buf, len + chunk_size); - crm_trace("Retry with %d more bytes", (int)chunk_size); - CRM_ASSERT(buf != NULL); - - } else if (buf[len + rc - 1] != 0) { - crm_trace("Last char is %d '%c'", buf[len + rc - 1], buf[len + rc - 1]); - crm_trace("Retry with %d more bytes", (int)chunk_size); - len += rc; - buf = realloc(buf, len + chunk_size); - CRM_ASSERT(buf != NULL); + /* always null terminate buffer, the +1 to alloc always allows for this.*/ + buf[len] = '\0'; + } + if (max_size && (max_size == read_size)) { + crm_trace("Buffer max read size %d met" , max_size); + goto done; + } - } else { - crm_trace("Got %d more bytes", (int)rc); - return buf; + /* process any errors. */ + if (rc == GNUTLS_E_INTERRUPTED) { + crm_trace("EINTR encoutered, retry tls read"); + } else if (rc == GNUTLS_E_AGAIN) { + crm_trace("non-blocking, exiting read on rc = %d", rc); + goto done; + } else if (rc <= 0) { + if (rc == 0) { + crm_debug("EOF encoutered during TLS read"); + } else { + crm_debug("Error receiving message: %s (%d)", gnutls_strerror(rc), rc); + } + if (disconnected) { + *disconnected = 1; + } + goto done; } } - bail: - free(buf); - return NULL; + +done: + if (recv_len) { + *recv_len = len; + } + if (!len) { + free(buf); + buf = NULL; + } + return buf; } #endif -char * -cib_send_plaintext(int sock, xmlNode * msg) +static int +crm_send_plaintext(int sock, const char *buf, size_t len) { - char *xml_text = dump_xml_unformatted(msg); - if (xml_text != NULL) { - int rc = 0; - char *unsent = xml_text; - int len = strlen(xml_text); + int rc = 0; + const char *unsent = buf; + int total_send; - len++; /* null char */ - crm_trace("Message on socket %d: size=%d", sock, len); - retry: - rc = write(sock, unsent, len); - if (rc < 0) { - switch (errno) { - case EINTR: - case EAGAIN: - crm_trace("Retry"); - goto retry; - default: - crm_perror(LOG_ERR, "Could only write %d of the remaining %d bytes", rc, len); - break; - } + if (buf == NULL) { + return -1; + } + total_send = len; - } else if (rc < len) { - crm_trace("Only sent %d of %d remaining bytes", rc, len); - len -= rc; - unsent += rc; + crm_trace("Message on socket %d: size=%d", sock, len); + retry: + rc = write(sock, unsent, len); + if (rc < 0) { + switch (errno) { + case EINTR: + case EAGAIN: + crm_trace("Retry"); goto retry; - - } else { - crm_trace("Sent %d bytes: %.100s", rc, xml_text); + default: + crm_perror(LOG_ERR, "Could only write %d of the remaining %d bytes", rc, (int) len); + break; } + + } else if (rc < len) { + crm_trace("Only sent %d of %d remaining bytes", rc, len); + len -= rc; + unsent += rc; + goto retry; + + } else { + crm_trace("Sent %d bytes: %.100s", rc, buf); } - free(xml_text); - return NULL; + + return rc < 0 ? rc : total_send; } -char * -cib_recv_plaintext(int sock) +/*! + * \internal + * \brief Read bytes off non blocking socket. + * + * \param session - tls session to read + * \param max_size - max bytes allowed to read for buffer. 0 assumes no limit + * + * \note only use with NON-Blocking sockets. Should only be used after polling socket. + * This function will return once max_size is met, the socket read buffer + * is empty, or an error is encountered. + * + * \retval '\0' terminated buffer on success + */ +static char * +crm_recv_plaintext(int sock, size_t max_size, size_t *recv_len, int *disconnected) { char *buf = NULL; - ssize_t rc = 0; ssize_t len = 0; - ssize_t chunk_size = 512; + ssize_t chunk_size = max_size ? max_size : 1024; + size_t buf_size = 0; + size_t read_size = 0; - buf = calloc(1, chunk_size); + if (sock <= 0) { + if (disconnected) { + *disconnected = 1; + } + goto done; + } - while (1) { - errno = 0; - rc = read(sock, buf + len, chunk_size); - crm_trace("Got %d more bytes. errno=%d", (int)rc, errno); - - if (errno == EINTR || errno == EAGAIN) { - crm_trace("Retry: %d", (int)rc); - if (rc > 0) { - len += rc; - buf = realloc(buf, len + chunk_size); - CRM_ASSERT(buf != NULL); - } + buf = calloc(1, chunk_size + 1); + buf_size = chunk_size; - } else if (rc < 0) { - crm_perror(LOG_ERR, "Error receiving message: %d", (int)rc); - goto bail; + while (TRUE) { + errno = 0; + read_size = buf_size - len; - } else if (rc == chunk_size) { - len += rc; - chunk_size *= 2; - buf = realloc(buf, len + chunk_size); - crm_trace("Retry with %d more bytes", (int)chunk_size); + /* automatically grow the buffer when needed if max_size is not set.*/ + if (!max_size && (read_size < (chunk_size / 2))) { + buf_size += chunk_size; + crm_trace("Grow buffer by %d more bytes. buf is now %d bytes", (int)chunk_size, buf_size); + buf = realloc(buf, buf_size + 1); CRM_ASSERT(buf != NULL); - } else if (buf[len + rc - 1] != 0) { - crm_trace("Last char is %d '%c'", buf[len + rc - 1], buf[len + rc - 1]); - crm_trace("Retry with %d more bytes", (int)chunk_size); + read_size = buf_size - len; + } + + rc = read(sock, buf + len, chunk_size); + + if (rc > 0) { + crm_trace("Got %d more bytes. errno=%d", (int)rc, errno); len += rc; - buf = realloc(buf, len + chunk_size); - CRM_ASSERT(buf != NULL); + /* always null terminate buffer, the +1 to alloc always allows for this.*/ + buf[len] = '\0'; + } + if (max_size && (max_size == read_size)) { + crm_trace("Buffer max read size %d met" , max_size); + goto done; + } - } else { - return buf; + if (rc > 0) { + continue; + } else if (rc == 0) { + if (disconnected) { + *disconnected = 1; + } + crm_trace("EOF encoutered during read"); + goto done; + } + + /* process errors */ + if (errno == EINTR) { + crm_trace("EINTER encoutered, retry socket read."); + } else if (errno == EAGAIN) { + crm_trace("non-blocking, exiting read on rc = %d", rc); + goto done; + } else if (errno <= 0) { + if (disconnected) { + *disconnected = 1; + } + crm_debug("Error receiving message: %d", (int)rc); + goto done; } } - bail: - free(buf); - return NULL; +done: + if (recv_len) { + *recv_len = len; + } + if (!len) { + free(buf); + buf = NULL; + } + return buf; } -void -crm_send_remote_msg(void *session, xmlNode * msg, gboolean encrypted) +static int +crm_send_remote_msg_raw(void *session, const char *buf, size_t len, gboolean encrypted) { + int rc = -1; if (encrypted) { #ifdef HAVE_GNUTLS_GNUTLS_H - cib_send_tls(session, msg); + rc = crm_send_tls(session, buf, len); #else CRM_ASSERT(encrypted == FALSE); #endif } else { - cib_send_plaintext(GPOINTER_TO_INT(session), msg); + rc = crm_send_plaintext(GPOINTER_TO_INT(session), buf, len); } + return rc; } +int +crm_send_remote_msg(void *session, xmlNode * msg, gboolean encrypted) +{ + int rc = -1; + char *xml_text = NULL; + int len = 0; + + xml_text = dump_xml_unformatted(msg); + if (xml_text) { + len = strlen(xml_text); + } else { + crm_err("Invalid XML, can not send msg"); + return -1; + } + + rc = crm_send_remote_msg_raw(session, xml_text, len, encrypted); + if (rc < 0) { + goto done; + } + rc = crm_send_remote_msg_raw(session, REMOTE_MSG_TERMINATOR, strlen(REMOTE_MSG_TERMINATOR), encrypted); + +done: + if (rc < 0) { + crm_err("Failed to send remote msg, rc = %d", rc); + } + + free(xml_text); + return rc; +} + +/*! + * \internal + * \brief handles the recv buffer and parsing out msgs. + * \note new_data is owned by this function once it is passed in. + */ xmlNode * -crm_recv_remote_msg(void *session, gboolean encrypted) +crm_parse_remote_buffer(char **msg_buf) { - char *reply = NULL; + char *buf = NULL; + char *start = NULL; + char *end = NULL; xmlNode *xml = NULL; + if (*msg_buf == NULL) { + return NULL; + } + + /* take ownership of the buffer */ + buf = *msg_buf; + *msg_buf = NULL; + + /* MSGS are separated by a '\r\n\r\n'. Split a message off the buffer and return it. */ + start = buf; + end = strstr(start, REMOTE_MSG_TERMINATOR); + + while (!xml && end) { + + /* grab the message */ + end[0] = '\0'; + end += strlen(REMOTE_MSG_TERMINATOR); + + xml = string2xml(start); + if (xml == NULL) { + crm_err("Couldn't parse: '%.120s'", start); + } + start = end; + end = strstr(start, REMOTE_MSG_TERMINATOR); + } + + if (xml && start) { + /* we have msgs left over, save it until next time */ + *msg_buf = strdup(start); + free(buf); + } else if (!xml) { + /* no msg present */ + *msg_buf = buf; + } + + return xml; +} + +/*! + * \internal + * \brief Determine if a remote session has data to read + * + * \retval 0, timeout occured. + * \retval positive, data is ready to be read + * \retval negative, session has ended + */ +int +crm_recv_remote_ready(void *session, gboolean encrypted, int timeout /* ms */) +{ + struct pollfd fds = { 0, }; + int sock = 0; + void *sock_ptr = NULL; + int rc = 0; + time_t start; + + if (encrypted) { +#ifdef HAVE_GNUTLS_GNUTLS_H + gnutls_session *tls_session = session; + sock_ptr = gnutls_transport_get_ptr(*tls_session); +#else + CRM_ASSERT(encrypted == FALSE); +#endif + } else { + sock_ptr = session; + } + + sock = GPOINTER_TO_INT(sock_ptr); + if (sock <= 0) { + return -ENOTCONN; + } + + start = time(NULL); + errno = 0; + do { + fds.fd = sock; + fds.events = POLLIN; + + /* If we got an EINTR while polling, and we have a + * specific timeout we are trying to honor, attempt + * to adjust the timeout to the closest second. */ + if (errno == EINTR && (timeout > 0)) { + timeout = timeout - ((time(NULL) - start) * 1000); + if (timeout < 1000) { + timeout = 1000; + } + } + + rc = poll(&fds, 1, timeout); + } while (rc < 0 && errno == EINTR); + + return rc; +} + +char * +crm_recv_remote_raw(void *session, gboolean encrypted, size_t max_recv, size_t *recv_len, int *disconnected) +{ + char *reply = NULL; + if (recv_len) { + *recv_len = 0; + } + + if (disconnected) { + *disconnected = 0; + } + if (encrypted) { #ifdef HAVE_GNUTLS_GNUTLS_H - reply = cib_recv_tls(session); + reply = crm_recv_tls(session, max_recv, recv_len, disconnected); #else CRM_ASSERT(encrypted == FALSE); #endif } else { - reply = cib_recv_plaintext(GPOINTER_TO_INT(session)); + reply = crm_recv_plaintext(GPOINTER_TO_INT(session), max_recv, recv_len, disconnected); } if (reply == NULL || strlen(reply) == 0) { crm_trace("Empty reply"); + } - } else { - xml = string2xml(reply); - if (xml == NULL) { - crm_err("Couldn't parse: '%.120s'", reply); + return reply; +} + +/*! + * \internal + * \brief Read data off the socket until at least one full message is present or timeout occures. + * \retval TRUE message read + * \retval FALSE full message not read + */ + +gboolean +crm_recv_remote_msg(void *session, char **recv_buf, gboolean encrypted, int total_timeout /*ms */, int *disconnected) +{ + int ret; + size_t request_len = 0; + time_t start = time(NULL); + char *raw_request = NULL; + int remaining_timeout = 0; + + if (total_timeout == 0) { + total_timeout = 10000; + } else if (total_timeout < 0) { + total_timeout = 60000; + } + *disconnected = 0; + + remaining_timeout = total_timeout; + while ((remaining_timeout > 0) && !(*disconnected)) { + + /* read some more off the tls buffer if we still have time left. */ + crm_trace("waiting to receive remote msg, starting timeout %d, remaining_timeout %d", total_timeout, remaining_timeout); + ret = crm_recv_remote_ready(session, encrypted, remaining_timeout); + raw_request = NULL; + + if (ret == 0) { + crm_err("poll timed out (%d ms) while waiting to receive msg", remaining_timeout); + return FALSE; + + } else if (ret < 0) { + if (errno != EINTR) { + crm_debug("poll returned error while waiting for msg, rc: %d, errno: %d", ret, errno); + *disconnected = 1; + return FALSE; + } + crm_debug("poll EINTR encountered during poll, retrying"); + } else { + raw_request = crm_recv_remote_raw(session, encrypted, 0, &request_len, disconnected); + } + + remaining_timeout = remaining_timeout - ((time(NULL) - start) * 1000); + + if (!raw_request) { + crm_debug("Empty msg received after poll"); + continue; + } + + if (*recv_buf) { + int old_len = strlen(*recv_buf); + + crm_trace("Expanding recv buffer from %d to %d", old_len, old_len+request_len); + + *recv_buf = realloc(*recv_buf, old_len + request_len + 1); + memcpy(*recv_buf + old_len, raw_request, request_len); + *(*recv_buf+old_len+request_len) = '\0'; + free(raw_request); + } else { + *recv_buf = raw_request; + } + + if (strstr(*recv_buf, REMOTE_MSG_TERMINATOR)) { + return TRUE; } } - free(reply); - return xml; + return FALSE; } + +/*! + * \internal + * \brief tcp connection to server at specified port + * \retval positive, socket fd. + * \retval negative, failed to connect. + */ +int +crm_remote_tcp_connect(const char *host, int port) +{ + struct addrinfo *res; + struct addrinfo *rp; + struct addrinfo hints; + const char *server = host; + int ret_ga; + int sock; + + /* getaddrinfo */ + memset(&hints, 0, sizeof(struct addrinfo)); + hints.ai_family = AF_UNSPEC; /* Allow IPv4 or IPv6 */ + hints.ai_socktype = SOCK_STREAM; + hints.ai_flags = AI_CANONNAME; + + crm_debug("Looking up %s", server); + ret_ga = getaddrinfo(server, NULL, &hints, &res); + if (ret_ga) { + crm_err("getaddrinfo: %s", gai_strerror(ret_ga)); + return -1; + } + + if (!res || !res->ai_addr) { + crm_err("getaddrinfo failed"); + return -1; + } + + for (rp = res; rp != NULL; rp = rp->ai_next) { + struct sockaddr *addr = rp->ai_addr; + int flag = 0; + if (!addr) { + continue; + } + + if (rp->ai_canonname) { + server = res->ai_canonname; + } + crm_debug("Got address %s for %s", server, host); + + /* create socket */ + sock = socket(rp->ai_family, SOCK_STREAM, IPPROTO_TCP); + if (sock == -1) { + crm_err("Socket creation failed for remote client connection."); + continue; + } + if (addr->sa_family == AF_INET6) { + struct sockaddr_in6 *addr_in = (struct sockaddr_in6 *) addr; + addr_in->sin6_port = htons(port); + } else { + struct sockaddr_in *addr_in = (struct sockaddr_in *) addr; + addr_in->sin_port = htons(port); + crm_info("Attempting to connect to remote server at %s:%d", inet_ntoa(addr_in->sin_addr), port); + } + + if (connect(sock, rp->ai_addr, rp->ai_addrlen) == 0) { + if ((flag = fcntl(sock, F_GETFL)) >= 0) { + if (fcntl(sock, F_SETFL, flag | O_NONBLOCK) < 0) { + crm_err( "fcntl() write failed"); + close(sock); + sock = -1; + continue; + } + } + break; /* Success */ + } + + close(sock); + sock = -1; + } + freeaddrinfo(res); + + return sock; +} + diff --git a/tools/crm_mon.c b/tools/crm_mon.c index 5c2e68722a..fe592649fc 100644 --- a/tools/crm_mon.c +++ b/tools/crm_mon.c @@ -1,2292 +1,2292 @@ /* * Copyright (C) 2004 Andrew Beekhof * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include <../lib/pengine/unpack.h> #include <../pengine/pengine.h> #include /* GMainLoop *mainloop = NULL; */ void wait_for_refresh(int offset, const char *prefix, int msec); void clean_up(int rc); void crm_diff_update(const char *event, xmlNode * msg); gboolean mon_refresh_display(gpointer user_data); int cib_connect(gboolean full); void mon_st_callback(stonith_t *st, stonith_event_t *e); char *xml_file = NULL; char *as_html_file = NULL; int as_xml = 0; char *pid_file = NULL; char *snmp_target = NULL; char *snmp_community = NULL; gboolean as_console = TRUE;; gboolean simple_status = FALSE; gboolean group_by_node = FALSE; gboolean inactive_resources = FALSE; gboolean web_cgi = FALSE; int reconnect_msec = 5000; gboolean daemonize = FALSE; GMainLoop *mainloop = NULL; guint timer_id = 0; GList *attr_list = NULL; const char *crm_mail_host = NULL; const char *crm_mail_prefix = NULL; const char *crm_mail_from = NULL; const char *crm_mail_to = NULL; const char *external_agent = NULL; const char *external_recipient = NULL; cib_t *cib = NULL; stonith_t *st = NULL; xmlNode *current_cib = NULL; gboolean one_shot = FALSE; gboolean has_warnings = FALSE; gboolean print_failcount = FALSE; gboolean print_operations = FALSE; gboolean print_timing = FALSE; gboolean print_nodes_attr = FALSE; gboolean print_last_updated = TRUE; gboolean print_last_change = TRUE; gboolean print_tickets = FALSE; gboolean watch_fencing = FALSE; #define FILTER_STR {"shutdown", "terminate", "standby", "fail-count", \ "last-failure", "probe_complete", "#id", "#uname", \ "#is_dc", NULL} gboolean log_diffs = FALSE; gboolean log_updates = FALSE; long last_refresh = 0; crm_trigger_t *refresh_trigger = NULL; /* * 1.3.6.1.4.1.32723 has been assigned to the project by IANA * http://www.iana.org/assignments/enterprise-numbers */ #define PACEMAKER_PREFIX "1.3.6.1.4.1.32723" #define PACEMAKER_TRAP_PREFIX PACEMAKER_PREFIX ".1" #define snmp_crm_trap_oid PACEMAKER_TRAP_PREFIX #define snmp_crm_oid_node PACEMAKER_TRAP_PREFIX ".1" #define snmp_crm_oid_rsc PACEMAKER_TRAP_PREFIX ".2" #define snmp_crm_oid_task PACEMAKER_TRAP_PREFIX ".3" #define snmp_crm_oid_desc PACEMAKER_TRAP_PREFIX ".4" #define snmp_crm_oid_status PACEMAKER_TRAP_PREFIX ".5" #define snmp_crm_oid_rc PACEMAKER_TRAP_PREFIX ".6" #define snmp_crm_oid_trc PACEMAKER_TRAP_PREFIX ".7" #if CURSES_ENABLED # define print_dot() if(as_console) { \ printw("."); \ clrtoeol(); \ refresh(); \ } else { \ fprintf(stdout, "."); \ } #else # define print_dot() fprintf(stdout, "."); #endif #if CURSES_ENABLED # define print_as(fmt, args...) if(as_console) { \ printw(fmt, ##args); \ clrtoeol(); \ refresh(); \ } else { \ fprintf(stdout, fmt, ##args); \ } #else # define print_as(fmt, args...) fprintf(stdout, fmt, ##args); #endif static void blank_screen(void) { #if CURSES_ENABLED int lpc = 0; for (lpc = 0; lpc < LINES; lpc++) { move(lpc, 0); clrtoeol(); } move(0, 0); refresh(); #endif } static gboolean mon_timer_popped(gpointer data) { int rc = pcmk_ok; if (timer_id > 0) { g_source_remove(timer_id); } rc = cib_connect(TRUE); if (rc != pcmk_ok) { print_dot(); timer_id = g_timeout_add(reconnect_msec, mon_timer_popped, NULL); } return FALSE; } static void mon_cib_connection_destroy(gpointer user_data) { print_as("Connection to the CIB terminated\n"); if (cib) { print_as("Reconnecting..."); cib->cmds->signoff(cib); timer_id = g_timeout_add(reconnect_msec, mon_timer_popped, NULL); } return; } /* * Mainloop signal handler. */ static void mon_shutdown(int nsig) { clean_up(EX_OK); } #if ON_DARWIN # define sighandler_t sig_t #endif #if CURSES_ENABLED #ifndef HAVE_SIGHANDLER_T typedef void (*sighandler_t)(int); #endif static sighandler_t ncurses_winch_handler; static void mon_winresize(int nsig) { static int not_done; int lines = 0, cols = 0; if (!not_done++) { if (ncurses_winch_handler) /* the original ncurses WINCH signal handler does the * magic of retrieving the new window size; * otherwise, we'd have to use ioctl or tgetent */ (*ncurses_winch_handler) (SIGWINCH); getmaxyx(stdscr, lines, cols); resizeterm(lines, cols); mainloop_set_trigger(refresh_trigger); } not_done--; } #endif int cib_connect(gboolean full) { int rc = pcmk_ok; static gboolean need_pass = TRUE; CRM_CHECK(cib != NULL, return -EINVAL); if (getenv("CIB_passwd") != NULL) { need_pass = FALSE; } if(watch_fencing && st == NULL) { st = stonith_api_new(); } if(watch_fencing && st->state == stonith_disconnected) { crm_trace("Connecting to stonith"); rc = st->cmds->connect(st, crm_system_name, NULL); if(rc == pcmk_ok) { crm_trace("Setting up stonith callbacks"); st->cmds->register_notification(st, T_STONITH_NOTIFY_FENCE, mon_st_callback); } } if (cib->state != cib_connected_query && cib->state != cib_connected_command) { crm_trace("Connecting to the CIB"); if (as_console && need_pass && cib->variant == cib_remote) { need_pass = FALSE; print_as("Password:"); } rc = cib->cmds->signon(cib, crm_system_name, cib_query); if (rc != pcmk_ok) { return rc; } current_cib = get_cib_copy(cib); mon_refresh_display(NULL); if (full) { if (rc == pcmk_ok) { rc = cib->cmds->set_connection_dnotify(cib, mon_cib_connection_destroy); if (rc == -EPROTONOSUPPORT) { - print_as("Notification setup failed, won't be able to reconnect after failure"); + print_as("Notification setup not supported, won't be able to reconnect after failure"); if (as_console) { sleep(2); } rc = pcmk_ok; } } if (rc == pcmk_ok) { cib->cmds->del_notify_callback(cib, T_CIB_DIFF_NOTIFY, crm_diff_update); rc = cib->cmds->add_notify_callback(cib, T_CIB_DIFF_NOTIFY, crm_diff_update); } if (rc != pcmk_ok) { print_as("Notification setup failed, could not monitor CIB actions"); if (as_console) { sleep(2); } clean_up(-rc); } } } return rc; } /* *INDENT-OFF* */ static struct crm_option long_options[] = { /* Top-level Options */ {"help", 0, 0, '?', "\tThis text"}, {"version", 0, 0, '$', "\tVersion information" }, {"verbose", 0, 0, 'V', "\tIncrease debug output"}, {"quiet", 0, 0, 'Q', "\tDisplay only essential output" }, {"-spacer-", 1, 0, '-', "\nModes:"}, {"as-html", 1, 0, 'h', "Write cluster status to the named html file"}, {"as-xml", 0, 0, 'X', "\tWrite cluster status as xml to stdout. This will enable one-shot mode."}, {"web-cgi", 0, 0, 'w', "\tWeb mode with output suitable for cgi"}, {"simple-status", 0, 0, 's', "Display the cluster status once as a simple one line output (suitable for nagios)"}, {"snmp-traps", 1, 0, 'S', "Send SNMP traps to this station", !ENABLE_SNMP}, {"snmp-community", 1, 0, 'C', "Specify community for SNMP traps(default is NULL)", !ENABLE_SNMP}, {"mail-to", 1, 0, 'T', "Send Mail alerts to this user. See also --mail-from, --mail-host, --mail-prefix", !ENABLE_ESMTP}, {"-spacer-", 1, 0, '-', "\nDisplay Options:"}, {"group-by-node", 0, 0, 'n', "\tGroup resources by node" }, {"inactive", 0, 0, 'r', "\tDisplay inactive resources" }, {"failcounts", 0, 0, 'f', "\tDisplay resource fail counts"}, {"operations", 0, 0, 'o', "\tDisplay resource operation history" }, {"timing-details", 0, 0, 't', "\tDisplay resource operation history with timing details" }, {"tickets", 0, 0, 'c', "\t\tDisplay cluster tickets"}, {"watch-fencing", 0, 0, 'W', "\t\tListen for fencing events. For use with --external-agent, --mail-to and/or --snmp-traps where supported"}, {"show-node-attributes", 0, 0, 'A', "Display node attributes" }, {"-spacer-", 1, 0, '-', "\nAdditional Options:"}, {"interval", 1, 0, 'i', "\tUpdate frequency in seconds" }, {"one-shot", 0, 0, '1', "\tDisplay the cluster status once on the console and exit"}, {"disable-ncurses",0, 0, 'N', "\tDisable the use of ncurses", !CURSES_ENABLED}, {"daemonize", 0, 0, 'd', "\tRun in the background as a daemon"}, {"pid-file", 1, 0, 'p', "\t(Advanced) Daemon pid file location"}, {"mail-from", 1, 0, 'F', "\tMail alerts should come from the named user", !ENABLE_ESMTP}, {"mail-host", 1, 0, 'H', "\tMail alerts should be sent via the named host", !ENABLE_ESMTP}, {"mail-prefix", 1, 0, 'P', "Subjects for mail alerts should start with this string", !ENABLE_ESMTP}, {"external-agent", 1, 0, 'E', "A program to run when resource operations take place."}, {"external-recipient",1, 0, 'e', "A recipient for your program (assuming you want the program to send something to someone)."}, {"xml-file", 1, 0, 'x', NULL, 1}, {"-spacer-", 1, 0, '-', "\nExamples:", pcmk_option_paragraph}, {"-spacer-", 1, 0, '-', "Display the cluster status on the console with updates as they occur:", pcmk_option_paragraph}, {"-spacer-", 1, 0, '-', " crm_mon", pcmk_option_example}, {"-spacer-", 1, 0, '-', "Display the cluster status on the console just once then exit:", pcmk_option_paragraph}, {"-spacer-", 1, 0, '-', " crm_mon -1", pcmk_option_example}, {"-spacer-", 1, 0, '-', "Display your cluster status, group resources by node, and include inactive resources in the list:", pcmk_option_paragraph}, {"-spacer-", 1, 0, '-', " crm_mon --group-by-node --inactive", pcmk_option_example}, {"-spacer-", 1, 0, '-', "Start crm_mon as a background daemon and have it write the cluster status to an HTML file:", pcmk_option_paragraph}, {"-spacer-", 1, 0, '-', " crm_mon --daemonize --as-html /path/to/docroot/filename.html", pcmk_option_example}, {"-spacer-", 1, 0, '-', "Start crm_mon and export the current cluster status as xml to stdout, then exit.:", pcmk_option_paragraph}, {"-spacer-", 1, 0, '-', " crm_mon --as-xml", pcmk_option_example}, {"-spacer-", 1, 0, '-', "Start crm_mon as a background daemon and have it send email alerts:", pcmk_option_paragraph|!ENABLE_ESMTP}, {"-spacer-", 1, 0, '-', " crm_mon --daemonize --mail-to user@example.com --mail-host mail.example.com", pcmk_option_example|!ENABLE_ESMTP}, {"-spacer-", 1, 0, '-', "Start crm_mon as a background daemon and have it send SNMP alerts:", pcmk_option_paragraph|!ENABLE_SNMP}, {"-spacer-", 1, 0, '-', " crm_mon --daemonize --snmp-traps snmptrapd.example.com", pcmk_option_example|!ENABLE_SNMP}, {NULL, 0, 0, 0} }; /* *INDENT-ON* */ int main(int argc, char **argv) { int flag; int argerr = 0; int exit_code = 0; int option_index = 0; pid_file = strdup("/tmp/ClusterMon.pid"); crm_log_cli_init("crm_mon"); crm_set_options(NULL, "mode [options]", long_options, "Provides a summary of cluster's current state." "\n\nOutputs varying levels of detail in a number of different formats.\n"); #ifndef ON_DARWIN /* prevent zombies */ signal(SIGCLD, SIG_IGN); #endif if (strcmp(crm_system_name, "crm_mon.cgi") == 0) { web_cgi = TRUE; one_shot = TRUE; } while (1) { flag = crm_get_option(argc, argv, &option_index); if (flag == -1) break; switch (flag) { case 'V': crm_bump_log_level(argc, argv); break; case 'Q': print_last_updated = FALSE; print_last_change = FALSE; break; case 'i': reconnect_msec = crm_get_msec(optarg); break; case 'n': group_by_node = TRUE; break; case 'r': inactive_resources = TRUE; break; case 'W': watch_fencing = TRUE; break; case 'd': daemonize = TRUE; break; case 't': print_timing = TRUE; print_operations = TRUE; break; case 'o': print_operations = TRUE; break; case 'f': print_failcount = TRUE; break; case 'A': print_nodes_attr = TRUE; break; case 'c': print_tickets = TRUE; break; case 'p': free(pid_file); pid_file = strdup(optarg); break; case 'x': xml_file = strdup(optarg); one_shot = TRUE; break; case 'h': as_html_file = strdup(optarg); break; case 'X': as_xml = TRUE; one_shot = TRUE; break; case 'w': web_cgi = TRUE; one_shot = TRUE; break; case 's': simple_status = TRUE; one_shot = TRUE; break; case 'S': snmp_target = optarg; break; case 'T': crm_mail_to = optarg; break; case 'F': crm_mail_from = optarg; break; case 'H': crm_mail_host = optarg; break; case 'P': crm_mail_prefix = optarg; break; case 'E': external_agent = optarg; break; case 'e': external_recipient = optarg; break; case '1': one_shot = TRUE; break; case 'N': as_console = FALSE; break; case 'C': snmp_community = optarg; break; case '$': case '?': crm_help(flag, EX_OK); break; default: printf("Argument code 0%o (%c) is not (?yet?) supported\n", flag, flag); ++argerr; break; } } if (optind < argc) { printf("non-option ARGV-elements: "); while (optind < argc) printf("%s ", argv[optind++]); printf("\n"); } if (argerr) { crm_help('?', EX_USAGE); } if (one_shot) { as_console = FALSE; } else if (daemonize) { as_console = FALSE; crm_enable_stderr(FALSE); if (!as_html_file && !snmp_target && !crm_mail_to && !external_agent && !as_xml) { printf ("Looks like you forgot to specify one or more of: --as-html, --as-xml, --mail-to, --snmp-target, --external-agent\n"); crm_help('?', EX_USAGE); } crm_make_daemon(crm_system_name, TRUE, pid_file); } else if (as_console) { #if CURSES_ENABLED initscr(); cbreak(); noecho(); crm_enable_stderr(FALSE); #else one_shot = TRUE; as_console = FALSE; printf("Defaulting to one-shot mode\n"); printf("You need to have curses available at compile time to enable console mode\n"); #endif } crm_info("Starting %s", crm_system_name); if (xml_file != NULL) { current_cib = filename2xml(xml_file); mon_refresh_display(NULL); return exit_code; } if (current_cib == NULL) { cib = cib_new(); if (!one_shot) { print_as("Attempting connection to the cluster..."); } do { exit_code = cib_connect(!one_shot); if (one_shot) { break; } else if (exit_code != pcmk_ok) { print_dot(); sleep(reconnect_msec / 1000); } } while (exit_code == -ENOTCONN); if (exit_code != pcmk_ok) { print_as("\nConnection to cluster failed: %s\n", pcmk_strerror(exit_code)); if (as_console) { sleep(2); } clean_up(-exit_code); } } if (one_shot) { return exit_code; } mainloop = g_main_new(FALSE); mainloop_add_signal(SIGTERM, mon_shutdown); mainloop_add_signal(SIGINT, mon_shutdown); #if CURSES_ENABLED if (as_console) { ncurses_winch_handler = signal(SIGWINCH, mon_winresize); if (ncurses_winch_handler == SIG_DFL || ncurses_winch_handler == SIG_IGN || ncurses_winch_handler == SIG_ERR) ncurses_winch_handler = NULL; } #endif refresh_trigger = mainloop_add_trigger(G_PRIORITY_LOW, mon_refresh_display, NULL); g_main_run(mainloop); g_main_destroy(mainloop); crm_info("Exiting %s", crm_system_name); clean_up(0); return 0; /* never reached */ } void wait_for_refresh(int offset, const char *prefix, int msec) { int lpc = msec / 1000; struct timespec sleept = { 1, 0 }; if (as_console == FALSE) { timer_id = g_timeout_add(msec, mon_timer_popped, NULL); return; } crm_notice("%sRefresh in %ds...", prefix ? prefix : "", lpc); while (lpc > 0) { #if CURSES_ENABLED move(offset, 0); /* printw("%sRefresh in \033[01;32m%ds\033[00m...", prefix?prefix:"", lpc); */ printw("%sRefresh in %ds...\n", prefix ? prefix : "", lpc); clrtoeol(); refresh(); #endif lpc--; if (lpc == 0) { timer_id = g_timeout_add(1000, mon_timer_popped, NULL); } else { if (nanosleep(&sleept, NULL) != 0) { return; } } } } #define mon_warn(fmt...) do { \ if (!has_warnings) { \ print_as("Warning:"); \ } else { \ print_as(","); \ } \ print_as(fmt); \ has_warnings = TRUE; \ } while(0) static int count_resources(pe_working_set_t * data_set, resource_t * rsc) { int count = 0; GListPtr gIter = NULL; if (rsc == NULL) { gIter = data_set->resources; } else if (rsc->children) { gIter = rsc->children; } else { return is_not_set(rsc->flags, pe_rsc_orphan); } for (; gIter != NULL; gIter = gIter->next) { count += count_resources(data_set, gIter->data); } return count; } static int print_simple_status(pe_working_set_t * data_set) { node_t *dc = NULL; GListPtr gIter = NULL; int nodes_online = 0; int nodes_standby = 0; dc = data_set->dc_node; if (dc == NULL) { mon_warn("No DC "); } for (gIter = data_set->nodes; gIter != NULL; gIter = gIter->next) { node_t *node = (node_t *) gIter->data; if (node->details->standby && node->details->online) { nodes_standby++; } else if (node->details->online) { nodes_online++; } else { mon_warn("offline node: %s", node->details->uname); } } if (!has_warnings) { print_as("Ok: %d nodes online", nodes_online); if (nodes_standby > 0) { print_as(", %d standby nodes", nodes_standby); } print_as(", %d resources configured", count_resources(data_set, NULL)); } print_as("\n"); return 0; } extern int get_failcount(node_t * node, resource_t * rsc, int *last_failure, pe_working_set_t * data_set); static void print_date(time_t time) { int lpc = 0; char date_str[26]; asctime_r(localtime(&time), date_str); for (; lpc < 26; lpc++) { if (date_str[lpc] == '\n') { date_str[lpc] = 0; } } print_as("'%s'", date_str); } static void print_rsc_summary(pe_working_set_t * data_set, node_t * node, resource_t * rsc, gboolean all) { gboolean printed = FALSE; time_t last_failure = 0; char *fail_attr = crm_concat("fail-count", rsc->id, '-'); const char *value = g_hash_table_lookup(node->details->attrs, fail_attr); int failcount = char2score(value); /* Get the true value, not the effective one from get_failcount() */ get_failcount(node, rsc, (int *)&last_failure, data_set); free(fail_attr); if (all || failcount || last_failure > 0) { printed = TRUE; print_as(" %s: migration-threshold=%d", rsc->id, rsc->migration_threshold); } if (failcount > 0) { printed = TRUE; print_as(" fail-count=%d", failcount); } if (last_failure > 0) { printed = TRUE; print_as(" last-failure="); print_date(last_failure); } if (printed) { print_as("\n"); } } static void print_rsc_history(pe_working_set_t * data_set, node_t * node, xmlNode * rsc_entry) { GListPtr gIter = NULL; GListPtr op_list = NULL; gboolean print_name = TRUE; GListPtr sorted_op_list = NULL; const char *rsc_id = crm_element_value(rsc_entry, XML_ATTR_ID); resource_t *rsc = pe_find_resource(data_set->resources, rsc_id); xmlNode *rsc_op = NULL; for (rsc_op = __xml_first_child(rsc_entry); rsc_op != NULL; rsc_op = __xml_next(rsc_op)) { if (crm_str_eq((const char *)rsc_op->name, XML_LRM_TAG_RSC_OP, TRUE)) { op_list = g_list_append(op_list, rsc_op); } } sorted_op_list = g_list_sort(op_list, sort_op_by_callid); for (gIter = sorted_op_list; gIter != NULL; gIter = gIter->next) { xmlNode *xml_op = (xmlNode *) gIter->data; const char *value = NULL; const char *call = crm_element_value(xml_op, XML_LRM_ATTR_CALLID); const char *task = crm_element_value(xml_op, XML_LRM_ATTR_TASK); const char *op_rc = crm_element_value(xml_op, XML_LRM_ATTR_RC); const char *interval = crm_element_value(xml_op, XML_LRM_ATTR_INTERVAL); int rc = crm_parse_int(op_rc, "0"); if (safe_str_eq(task, CRMD_ACTION_STATUS) && safe_str_eq(interval, "0")) { task = "probe"; } if (rc == 7 && safe_str_eq(task, "probe")) { continue; } else if (safe_str_eq(task, CRMD_ACTION_NOTIFY)) { continue; } if (print_name) { print_name = FALSE; if (rsc == NULL) { print_as("Orphan resource: %s", rsc_id); } else { print_rsc_summary(data_set, node, rsc, TRUE); } } print_as(" + (%s) %s:", call, task); if (safe_str_neq(interval, "0")) { print_as(" interval=%sms", interval); } if (print_timing) { int int_value; const char *attr = "last-rc-change"; value = crm_element_value(xml_op, attr); if (value) { int_value = crm_parse_int(value, NULL); print_as(" %s=", attr); print_date(int_value); } attr = "last-run"; value = crm_element_value(xml_op, attr); if (value) { int_value = crm_parse_int(value, NULL); print_as(" %s=", attr); print_date(int_value); } attr = "exec-time"; value = crm_element_value(xml_op, attr); if (value) { int_value = crm_parse_int(value, NULL); print_as(" %s=%dms", attr, int_value); } attr = "queue-time"; value = crm_element_value(xml_op, attr); if (value) { int_value = crm_parse_int(value, NULL); print_as(" %s=%dms", attr, int_value); } } print_as(" rc=%s (%s)\n", op_rc, lrmd_event_rc2str(rc)); } /* no need to free the contents */ g_list_free(sorted_op_list); } static void print_attr_msg(node_t * node, GListPtr rsc_list, const char *attrname, const char *attrvalue) { GListPtr gIter = NULL; for (gIter = rsc_list; gIter != NULL; gIter = gIter->next) { resource_t *rsc = (resource_t *) gIter->data; const char *type = g_hash_table_lookup(rsc->meta, "type"); if (rsc->children != NULL) { print_attr_msg(node, rsc->children, attrname, attrvalue); } if (safe_str_eq(type, "ping") || safe_str_eq(type, "pingd")) { const char *name = "pingd"; const char *multiplier = NULL; char **host_list = NULL; int host_list_num = 0; int expected_score = 0; if (g_hash_table_lookup(rsc->meta, "name") != NULL) { name = g_hash_table_lookup(rsc->meta, "name"); } /* To identify the resource with the attribute name. */ if (safe_str_eq(name, attrname)) { int value = crm_parse_int(attrvalue, "0"); multiplier = g_hash_table_lookup(rsc->meta, "multiplier"); host_list = g_strsplit(g_hash_table_lookup(rsc->meta, "host_list"), " ", 0); host_list_num = g_strv_length(host_list); g_strfreev(host_list); /* pingd multiplier is the same as the default value. */ expected_score = host_list_num * crm_parse_int(multiplier, "1"); /* pingd is abnormal score. */ if (value <= 0) { print_as("\t: Connectivity is lost"); } else if (value < expected_score) { print_as("\t: Connectivity is degraded (Expected=%d)", expected_score); } } } } } static int compare_attribute(gconstpointer a, gconstpointer b) { int rc; rc = strcmp((const char *)a, (const char *)b); return rc; } static void create_attr_list(gpointer name, gpointer value, gpointer data) { int i; const char *filt_str[] = FILTER_STR; CRM_CHECK(name != NULL, return); /* filtering automatic attributes */ for (i = 0; filt_str[i] != NULL; i++) { if (g_str_has_prefix(name, filt_str[i])) { return; } } attr_list = g_list_insert_sorted(attr_list, name, compare_attribute); } static void print_node_attribute(gpointer name, gpointer node_data) { const char *value = NULL; node_t *node = (node_t *) node_data; value = g_hash_table_lookup(node->details->attrs, name); print_as(" + %-32s\t: %-10s", (char *)name, value); print_attr_msg(node, node->details->running_rsc, name, value); print_as("\n"); } static void print_node_summary(pe_working_set_t * data_set, gboolean operations) { xmlNode *lrm_rsc = NULL; xmlNode *rsc_entry = NULL; xmlNode *node_state = NULL; xmlNode *cib_status = get_object_root(XML_CIB_TAG_STATUS, data_set->input); if (operations) { print_as("\nOperations:\n"); } else { print_as("\nMigration summary:\n"); } for (node_state = __xml_first_child(cib_status); node_state != NULL; node_state = __xml_next(node_state)) { if (crm_str_eq((const char *)node_state->name, XML_CIB_TAG_STATE, TRUE)) { node_t *node = pe_find_node_id(data_set->nodes, ID(node_state)); if (node == NULL || node->details->online == FALSE) { continue; } print_as("* Node %s: ", crm_element_value(node_state, XML_ATTR_UNAME)); print_as("\n"); lrm_rsc = find_xml_node(node_state, XML_CIB_TAG_LRM, FALSE); lrm_rsc = find_xml_node(lrm_rsc, XML_LRM_TAG_RESOURCES, FALSE); for (rsc_entry = __xml_first_child(lrm_rsc); rsc_entry != NULL; rsc_entry = __xml_next(rsc_entry)) { if (crm_str_eq((const char *)rsc_entry->name, XML_LRM_TAG_RESOURCE, TRUE)) { if (operations) { print_rsc_history(data_set, node, rsc_entry); } else { const char *rsc_id = crm_element_value(rsc_entry, XML_ATTR_ID); resource_t *rsc = pe_find_resource(data_set->resources, rsc_id); if (rsc) { print_rsc_summary(data_set, node, rsc, FALSE); } else { print_as(" %s: orphan\n", rsc_id); } } } } } } } static void print_ticket(gpointer name, gpointer value, gpointer data) { ticket_t *ticket = (ticket_t *) value; print_as(" %s\t%s%10s", ticket->id, ticket->granted ? "granted":"revoked", ticket->standby ? " [standby]":""); if (ticket->last_granted > -1) { print_as(" last-granted="); print_date(ticket->last_granted); } print_as("\n"); return; } static void print_cluster_tickets(pe_working_set_t * data_set) { xmlNode *cib_constraints = get_object_root(XML_CIB_TAG_CONSTRAINTS, data_set->input); /* For recording the tickets that are referenced in rsc_ticket constraints * but have never been granted yet. */ unpack_constraints(cib_constraints, data_set); print_as("\nTickets:\n"); g_hash_table_foreach(data_set->tickets, print_ticket, NULL); return; } static int print_status(pe_working_set_t * data_set) { static int updates = 0; GListPtr gIter = NULL; node_t *dc = NULL; char *since_epoch = NULL; char *online_nodes = NULL; char *offline_nodes = NULL; xmlNode *dc_version = NULL; xmlNode *quorum_node = NULL; xmlNode *stack = NULL; time_t a_time = time(NULL); int print_opts = pe_print_ncurses; const char *quorum_votes = "unknown"; if (as_console) { blank_screen(); } else { print_opts = pe_print_printf; } updates++; dc = data_set->dc_node; if (a_time == (time_t) - 1) { crm_perror(LOG_ERR, "set_node_tstamp(): Invalid time returned"); return 1; } since_epoch = ctime(&a_time); if (since_epoch != NULL && print_last_updated) { print_as("Last updated: %s", since_epoch); } if (print_last_change) { const char *last_written = crm_element_value(data_set->input, XML_CIB_ATTR_WRITTEN); const char *user = crm_element_value(data_set->input, XML_ATTR_UPDATE_USER); const char *client = crm_element_value(data_set->input, XML_ATTR_UPDATE_CLIENT); const char *origin = crm_element_value(data_set->input, XML_ATTR_UPDATE_ORIG); print_as("Last change: %s", last_written ? last_written : ""); if (user) { print_as(" by %s", user); } if (client) { print_as(" via %s", client); } if (origin) { print_as(" on %s", origin); } print_as("\n"); } stack = get_xpath_object("//nvpair[@name='cluster-infrastructure']", data_set->input, LOG_DEBUG); if (stack) { print_as("Stack: %s\n", crm_element_value(stack, XML_NVPAIR_ATTR_VALUE)); } dc_version = get_xpath_object("//nvpair[@name='dc-version']", data_set->input, LOG_DEBUG); if (dc == NULL) { print_as("Current DC: NONE\n"); } else { const char *quorum = crm_element_value(data_set->input, XML_ATTR_HAVE_QUORUM); if (safe_str_neq(dc->details->uname, dc->details->id)) { print_as("Current DC: %s (%s)", dc->details->uname, dc->details->id); } else { print_as("Current DC: %s", dc->details->uname); } print_as(" - partition %s quorum\n", crm_is_true(quorum) ? "with" : "WITHOUT"); if (dc_version) { print_as("Version: %s\n", crm_element_value(dc_version, XML_NVPAIR_ATTR_VALUE)); } } quorum_node = get_xpath_object("//nvpair[@name='" XML_ATTR_EXPECTED_VOTES "']", data_set->input, LOG_DEBUG); if (quorum_node) { quorum_votes = crm_element_value(quorum_node, XML_NVPAIR_ATTR_VALUE); } print_as("%d Nodes configured, %s expected votes\n", g_list_length(data_set->nodes), quorum_votes); print_as("%d Resources configured.\n", count_resources(data_set, NULL)); print_as("\n\n"); for (gIter = data_set->nodes; gIter != NULL; gIter = gIter->next) { node_t *node = (node_t *) gIter->data; const char *node_mode = NULL; if (node->details->unclean) { if (node->details->online && node->details->unclean) { node_mode = "UNCLEAN (online)"; } else if (node->details->pending) { node_mode = "UNCLEAN (pending)"; } else { node_mode = "UNCLEAN (offline)"; } } else if (node->details->pending) { node_mode = "pending"; } else if (node->details->standby_onfail && node->details->online) { node_mode = "standby (on-fail)"; } else if (node->details->standby) { if (node->details->online) { node_mode = "standby"; } else { node_mode = "OFFLINE (standby)"; } } else if (node->details->online) { node_mode = "online"; if (group_by_node == FALSE) { online_nodes = add_list_element(online_nodes, node->details->uname); continue; } } else { node_mode = "OFFLINE"; if (group_by_node == FALSE) { offline_nodes = add_list_element(offline_nodes, node->details->uname); continue; } } if (safe_str_eq(node->details->uname, node->details->id)) { print_as("Node %s: %s\n", node->details->uname, node_mode); } else { print_as("Node %s (%s): %s\n", node->details->uname, node->details->id, node_mode); } if (group_by_node) { GListPtr gIter2 = NULL; for (gIter2 = node->details->running_rsc; gIter2 != NULL; gIter2 = gIter2->next) { resource_t *rsc = (resource_t *) gIter2->data; rsc->fns->print(rsc, "\t", print_opts | pe_print_rsconly, stdout); } } } if (online_nodes) { print_as("Online: [%s ]\n", online_nodes); free(online_nodes); } if (offline_nodes) { print_as("OFFLINE: [%s ]\n", offline_nodes); free(offline_nodes); } if (group_by_node == FALSE && inactive_resources) { print_as("\nFull list of resources:\n"); } else if (inactive_resources) { print_as("\nInactive resources:\n"); } if (group_by_node == FALSE || inactive_resources) { print_as("\n"); for (gIter = data_set->resources; gIter != NULL; gIter = gIter->next) { resource_t *rsc = (resource_t *) gIter->data; gboolean is_active = rsc->fns->active(rsc, TRUE); gboolean partially_active = rsc->fns->active(rsc, FALSE); if (is_set(rsc->flags, pe_rsc_orphan) && is_active == FALSE) { continue; } else if (group_by_node == FALSE) { if (partially_active || inactive_resources) { rsc->fns->print(rsc, NULL, print_opts, stdout); } } else if (is_active == FALSE && inactive_resources) { rsc->fns->print(rsc, NULL, print_opts, stdout); } } } if (print_nodes_attr) { print_as("\nNode Attributes:\n"); for (gIter = data_set->nodes; gIter != NULL; gIter = gIter->next) { node_t *node = (node_t *) gIter->data; if (node == NULL || node->details->online == FALSE) { continue; } attr_list = NULL; print_as("* Node %s:\n", node->details->uname); g_hash_table_foreach(node->details->attrs, create_attr_list, NULL); g_list_foreach(attr_list, print_node_attribute, node); } } if (print_operations || print_failcount) { print_node_summary(data_set, print_operations); } if (xml_has_children(data_set->failed)) { xmlNode *xml_op = NULL; print_as("\nFailed actions:\n"); for (xml_op = __xml_first_child(data_set->failed); xml_op != NULL; xml_op = __xml_next(xml_op)) { int val = 0; const char *id = ID(xml_op); const char *op_key = crm_element_value(xml_op, XML_LRM_ATTR_TASK_KEY); const char *last = crm_element_value(xml_op, "last_run"); const char *node = crm_element_value(xml_op, XML_ATTR_UNAME); const char *call = crm_element_value(xml_op, XML_LRM_ATTR_CALLID); const char *rc = crm_element_value(xml_op, XML_LRM_ATTR_RC); const char *status = crm_element_value(xml_op, XML_LRM_ATTR_OPSTATUS); val = crm_parse_int(status, "0"); print_as(" %s (node=%s, call=%s, rc=%s, status=%s", op_key ? op_key : id, node, call, rc, services_lrm_status_str(val)); if (last) { time_t run_at = crm_parse_int(last, "0"); print_as(", last-run=%s, queued=%sms, exec=%sms\n", ctime(&run_at), crm_element_value(xml_op, "exec_time"), crm_element_value(xml_op, "queue_time")); } val = crm_parse_int(rc, "0"); print_as("): %s\n", lrmd_event_rc2str(val)); } } if (print_tickets) { print_cluster_tickets(data_set); } #if CURSES_ENABLED if (as_console) { refresh(); } #endif return 0; } static int print_xml_status(pe_working_set_t * data_set) { FILE *stream = stdout; GListPtr gIter = NULL; node_t *dc = NULL; xmlNode *stack = NULL; xmlNode *quorum_node = NULL; const char *quorum_votes = "unknown"; dc = data_set->dc_node; fprintf(stream, "\n"); fprintf(stream, "\n", VERSION); /*** SUMMARY ***/ fprintf(stream, " \n"); if (print_last_updated) { time_t now = time(NULL); char *now_str = ctime(&now); now_str[24] = EOS; /* replace the newline */ fprintf(stream, " \n", now_str); } if (print_last_change) { const char *last_written = crm_element_value(data_set->input, XML_CIB_ATTR_WRITTEN); const char *user = crm_element_value(data_set->input, XML_ATTR_UPDATE_USER); const char *client = crm_element_value(data_set->input, XML_ATTR_UPDATE_CLIENT); const char *origin = crm_element_value(data_set->input, XML_ATTR_UPDATE_ORIG); fprintf(stream, " \n", last_written ? last_written : "", user ? user : "", client ? client : "", origin ? origin : ""); } stack = get_xpath_object("//nvpair[@name='cluster-infrastructure']", data_set->input, LOG_DEBUG); if (stack) { fprintf(stream, " \n", crm_element_value(stack, XML_NVPAIR_ATTR_VALUE)); } if (!dc) { fprintf(stream, " \n"); } else { const char *quorum = crm_element_value(data_set->input, XML_ATTR_HAVE_QUORUM); const char *uname = dc->details->uname; const char *id = dc->details->id; xmlNode *dc_version = get_xpath_object("//nvpair[@name='dc-version']", data_set->input, LOG_DEBUG); fprintf(stream, " \n", dc_version ? crm_element_value(dc_version, XML_NVPAIR_ATTR_VALUE) : "", uname, id, quorum ? (crm_is_true(quorum) ? "true" : "false") : "false"); } quorum_node = get_xpath_object("//nvpair[@name='" XML_ATTR_EXPECTED_VOTES "']", data_set->input, LOG_DEBUG); if (quorum_node) { quorum_votes = crm_element_value(quorum_node, XML_NVPAIR_ATTR_VALUE); } fprintf(stream, " \n", g_list_length(data_set->nodes), quorum_votes); fprintf(stream, " \n", count_resources(data_set, NULL)); fprintf(stream, " \n"); /*** NODES ***/ fprintf(stream, " \n"); for (gIter = data_set->nodes; gIter != NULL; gIter = gIter->next) { node_t *node = (node_t *) gIter->data; const char *node_type = "unknown"; switch (node->details->type) { case node_member: node_type = "member"; break; case node_ping: node_type = "ping"; break; } fprintf(stream, " details->uname); fprintf(stream, "id=\"%s\" ", node->details->id); fprintf(stream, "online=\"%s\" ", node->details->online ? "true" : "false"); fprintf(stream, "standby=\"%s\" ", node->details->standby ? "true" : "false"); fprintf(stream, "standby_onfail=\"%s\" ", node->details->standby_onfail ? "true" : "false"); fprintf(stream, "pending=\"%s\" ", node->details->pending ? "true" : "false"); fprintf(stream, "unclean=\"%s\" ", node->details->unclean ? "true" : "false"); fprintf(stream, "shutdown=\"%s\" ", node->details->shutdown ? "true" : "false"); fprintf(stream, "expected_up=\"%s\" ", node->details->expected_up ? "true" : "false"); fprintf(stream, "is_dc=\"%s\" ", node->details->is_dc ? "true" : "false"); fprintf(stream, "resources_running=\"%d\" ", g_list_length(node->details->running_rsc)); fprintf(stream, "type=\"%s\" ", node_type); if (group_by_node) { GListPtr lpc2 = NULL; fprintf(stream, ">\n"); for (lpc2 = node->details->running_rsc; lpc2 != NULL; lpc2 = lpc2->next) { resource_t *rsc = (resource_t *) lpc2->data; rsc->fns->print(rsc, " ", pe_print_xml | pe_print_rsconly, stream); } fprintf(stream, " \n"); } else { fprintf(stream, "/>\n"); } } fprintf(stream, " \n"); /*** RESOURCES ***/ if (group_by_node == FALSE || inactive_resources) { fprintf(stream, " \n"); for (gIter = data_set->resources; gIter != NULL; gIter = gIter->next) { resource_t *rsc = (resource_t *) gIter->data; gboolean is_active = rsc->fns->active(rsc, TRUE); gboolean partially_active = rsc->fns->active(rsc, FALSE); if (is_set(rsc->flags, pe_rsc_orphan) && is_active == FALSE) { continue; } else if (group_by_node == FALSE) { if (partially_active || inactive_resources) { rsc->fns->print(rsc, " ", pe_print_xml, stream); } } else if (is_active == FALSE && inactive_resources) { rsc->fns->print(rsc, " ", pe_print_xml, stream); } } fprintf(stream, " \n"); } fprintf(stream, "\n"); fflush(stream); fclose(stream); return 0; } static int print_html_status(pe_working_set_t * data_set, const char *filename, gboolean web_cgi) { FILE *stream; GListPtr gIter = NULL; node_t *dc = NULL; static int updates = 0; char *filename_tmp = NULL; if (web_cgi) { stream = stdout; fprintf(stream, "Content-type: text/html\n\n"); } else { filename_tmp = crm_concat(filename, "tmp", '.'); stream = fopen(filename_tmp, "w"); if (stream == NULL) { crm_perror(LOG_ERR, "Cannot open %s for writing", filename_tmp); free(filename_tmp); return -1; } } updates++; dc = data_set->dc_node; fprintf(stream, ""); fprintf(stream, ""); fprintf(stream, "Cluster status"); /* content="%d;url=http://webdesign.about.com" */ fprintf(stream, "", reconnect_msec / 1000); fprintf(stream, ""); /*** SUMMARY ***/ fprintf(stream, "

Cluster summary

"); { char *now_str = NULL; time_t now = time(NULL); now_str = ctime(&now); now_str[24] = EOS; /* replace the newline */ fprintf(stream, "Last updated: %s
\n", now_str); } if (dc == NULL) { fprintf(stream, "Current DC: NONE
"); } else { fprintf(stream, "Current DC: %s (%s)
", dc->details->uname, dc->details->id); } fprintf(stream, "%d Nodes configured.
", g_list_length(data_set->nodes)); fprintf(stream, "%d Resources configured.
", count_resources(data_set, NULL)); /*** CONFIG ***/ fprintf(stream, "

Config Options

\n"); fprintf(stream, "\n"); fprintf(stream, "\n", is_set(data_set->flags, pe_flag_stonith_enabled) ? "enabled" : "disabled"); fprintf(stream, "\n", is_set(data_set->flags, pe_flag_symmetric_cluster) ? "" : "a-"); fprintf(stream, "\n
STONITH of failed nodes:%s
Cluster is:%ssymmetric
No Quorum Policy:"); switch (data_set->no_quorum_policy) { case no_quorum_freeze: fprintf(stream, "Freeze resources"); break; case no_quorum_stop: fprintf(stream, "Stop ALL resources"); break; case no_quorum_ignore: fprintf(stream, "Ignore"); break; case no_quorum_suicide: fprintf(stream, "Suicide"); break; } fprintf(stream, "\n
\n"); /*** NODE LIST ***/ fprintf(stream, "

Node List

\n"); fprintf(stream, "
    \n"); for (gIter = data_set->nodes; gIter != NULL; gIter = gIter->next) { node_t *node = (node_t *) gIter->data; fprintf(stream, "
  • "); if (node->details->standby_onfail && node->details->online) { fprintf(stream, "Node: %s (%s): %s", node->details->uname, node->details->id, "standby (on-fail)\n"); } else if (node->details->standby && node->details->online) { fprintf(stream, "Node: %s (%s): %s", node->details->uname, node->details->id, "standby\n"); } else if (node->details->standby) { fprintf(stream, "Node: %s (%s): %s", node->details->uname, node->details->id, "OFFLINE (standby)\n"); } else if (node->details->online) { fprintf(stream, "Node: %s (%s): %s", node->details->uname, node->details->id, "online\n"); } else { fprintf(stream, "Node: %s (%s): %s", node->details->uname, node->details->id, "OFFLINE\n"); } if (group_by_node) { GListPtr lpc2 = NULL; fprintf(stream, "
      \n"); for (lpc2 = node->details->running_rsc; lpc2 != NULL; lpc2 = lpc2->next) { resource_t *rsc = (resource_t *) lpc2->data; fprintf(stream, "
    • "); rsc->fns->print(rsc, NULL, pe_print_html | pe_print_rsconly, stream); fprintf(stream, "
    • \n"); } fprintf(stream, "
    \n"); } fprintf(stream, "
  • \n"); } fprintf(stream, "
\n"); if (group_by_node && inactive_resources) { fprintf(stream, "

Inactive Resources

\n"); } else if (group_by_node == FALSE) { fprintf(stream, "

Resource List

\n"); } if (group_by_node == FALSE || inactive_resources) { for (gIter = data_set->resources; gIter != NULL; gIter = gIter->next) { resource_t *rsc = (resource_t *) gIter->data; gboolean is_active = rsc->fns->active(rsc, TRUE); gboolean partially_active = rsc->fns->active(rsc, FALSE); if (is_set(rsc->flags, pe_rsc_orphan) && is_active == FALSE) { continue; } else if (group_by_node == FALSE) { if (partially_active || inactive_resources) { rsc->fns->print(rsc, NULL, pe_print_html, stream); } } else if (is_active == FALSE && inactive_resources) { rsc->fns->print(rsc, NULL, pe_print_html, stream); } } } fprintf(stream, ""); fflush(stream); fclose(stream); if (!web_cgi) { if (rename(filename_tmp, filename) != 0) { crm_perror(LOG_ERR, "Unable to rename %s->%s", filename_tmp, filename); } free(filename_tmp); } return 0; } #if ENABLE_SNMP # include # include # include # include # include # include # define add_snmp_field(list, oid_string, value) do { \ oid name[MAX_OID_LEN]; \ size_t name_length = MAX_OID_LEN; \ if (snmp_parse_oid(oid_string, name, &name_length)) { \ int s_rc = snmp_add_var(list, name, name_length, 's', (value)); \ if(s_rc != 0) { \ crm_err("Could not add %s=%s rc=%d", oid_string, value, s_rc); \ } else { \ crm_trace("Added %s=%s", oid_string, value); \ } \ } else { \ crm_err("Could not parse OID: %s", oid_string); \ } \ } while(0) \ # define add_snmp_field_int(list, oid_string, value) do { \ oid name[MAX_OID_LEN]; \ size_t name_length = MAX_OID_LEN; \ if (snmp_parse_oid(oid_string, name, &name_length)) { \ if(NULL == snmp_pdu_add_variable( \ list, name, name_length, ASN_INTEGER, \ (u_char *) & value, sizeof(value))) { \ crm_err("Could not add %s=%d", oid_string, value); \ } else { \ crm_trace("Added %s=%d", oid_string, value); \ } \ } else { \ crm_err("Could not parse OID: %s", oid_string); \ } \ } while(0) \ static int snmp_input(int operation, netsnmp_session * session, int reqid, netsnmp_pdu * pdu, void *magic) { return 1; } static netsnmp_session * crm_snmp_init(const char *target, char *community) { static netsnmp_session *session = NULL; # ifdef NETSNMPV53 char target53[128]; snprintf(target53, sizeof(target53), "%s:162", target); # endif if (session) { return session; } if (target == NULL) { return NULL; } if (get_crm_log_level() > LOG_INFO) { char *debug_tokens = strdup("run:shell,snmptrap,tdomain"); debug_register_tokens(debug_tokens); snmp_set_do_debugging(1); } session = calloc(1, sizeof(netsnmp_session)); snmp_sess_init(session); session->version = SNMP_VERSION_2c; session->callback = snmp_input; session->callback_magic = NULL; if (community) { session->community_len = strlen(community); session->community = (unsigned char *)community; } session = snmp_add(session, # ifdef NETSNMPV53 netsnmp_tdomain_transport(target53, 0, "udp"), # else netsnmp_transport_open_client("snmptrap", target), # endif NULL, NULL); if (session == NULL) { snmp_sess_perror("Could not create snmp transport", session); } return session; } #endif static int send_snmp_trap(const char *node, const char *rsc, const char *task, int target_rc, int rc, int status, const char *desc) { int ret = 1; #if ENABLE_SNMP static oid snmptrap_oid[] = { 1, 3, 6, 1, 6, 3, 1, 1, 4, 1, 0 }; static oid sysuptime_oid[] = { 1, 3, 6, 1, 2, 1, 1, 3, 0 }; netsnmp_pdu *trap_pdu; netsnmp_session *session = crm_snmp_init(snmp_target, snmp_community); trap_pdu = snmp_pdu_create(SNMP_MSG_TRAP2); if (!trap_pdu) { crm_err("Failed to create SNMP notification"); return SNMPERR_GENERR; } if (1) { /* send uptime */ char csysuptime[20]; time_t now = time(NULL); sprintf(csysuptime, "%ld", now); snmp_add_var(trap_pdu, sysuptime_oid, sizeof(sysuptime_oid) / sizeof(oid), 't', csysuptime); } /* Indicate what the trap is by setting snmpTrapOid.0 */ ret = snmp_add_var(trap_pdu, snmptrap_oid, sizeof(snmptrap_oid) / sizeof(oid), 'o', snmp_crm_trap_oid); if (ret != 0) { crm_err("Failed set snmpTrapOid.0=%s", snmp_crm_trap_oid); return ret; } /* Add extries to the trap */ if (rsc) { add_snmp_field(trap_pdu, snmp_crm_oid_rsc, rsc); } add_snmp_field(trap_pdu, snmp_crm_oid_node, node); add_snmp_field(trap_pdu, snmp_crm_oid_task, task); add_snmp_field(trap_pdu, snmp_crm_oid_desc, desc); add_snmp_field_int(trap_pdu, snmp_crm_oid_rc, rc); add_snmp_field_int(trap_pdu, snmp_crm_oid_trc, target_rc); add_snmp_field_int(trap_pdu, snmp_crm_oid_status, status); /* Send and cleanup */ ret = snmp_send(session, trap_pdu); if (ret == 0) { /* error */ snmp_sess_perror("Could not send SNMP trap", session); snmp_free_pdu(trap_pdu); ret = SNMPERR_GENERR; } else { ret = SNMPERR_SUCCESS; } #else crm_err("Sending SNMP traps is not supported by this installation"); #endif return ret; } #if ENABLE_ESMTP # include # include static void print_recipient_status(smtp_recipient_t recipient, const char *mailbox, void *arg) { const smtp_status_t *status; status = smtp_recipient_status(recipient); printf("%s: %d %s", mailbox, status->code, status->text); } static void event_cb(smtp_session_t session, int event_no, void *arg, ...) { int *ok; va_list alist; va_start(alist, arg); switch (event_no) { case SMTP_EV_CONNECT: case SMTP_EV_MAILSTATUS: case SMTP_EV_RCPTSTATUS: case SMTP_EV_MESSAGEDATA: case SMTP_EV_MESSAGESENT: case SMTP_EV_DISCONNECT: break; case SMTP_EV_WEAK_CIPHER:{ int bits = va_arg(alist, long); ok = va_arg(alist, int *); crm_debug("SMTP_EV_WEAK_CIPHER, bits=%d - accepted.", bits); *ok = 1; break; } case SMTP_EV_STARTTLS_OK: crm_debug("SMTP_EV_STARTTLS_OK - TLS started here."); break; case SMTP_EV_INVALID_PEER_CERTIFICATE:{ long vfy_result = va_arg(alist, long); ok = va_arg(alist, int *); /* There is a table in handle_invalid_peer_certificate() of mail-file.c */ crm_err("SMTP_EV_INVALID_PEER_CERTIFICATE: %ld", vfy_result); *ok = 1; break; } case SMTP_EV_NO_PEER_CERTIFICATE: ok = va_arg(alist, int *); crm_debug("SMTP_EV_NO_PEER_CERTIFICATE - accepted."); *ok = 1; break; case SMTP_EV_WRONG_PEER_CERTIFICATE: ok = va_arg(alist, int *); crm_debug("SMTP_EV_WRONG_PEER_CERTIFICATE - accepted."); *ok = 1; break; case SMTP_EV_NO_CLIENT_CERTIFICATE: ok = va_arg(alist, int *); crm_debug("SMTP_EV_NO_CLIENT_CERTIFICATE - accepted."); *ok = 1; break; default: crm_debug("Got event: %d - ignored.\n", event_no); } va_end(alist); } #endif #define BODY_MAX 2048 #if ENABLE_ESMTP static void crm_smtp_debug(const char *buf, int buflen, int writing, void *arg) { char type = 0; int lpc = 0, last = 0, level = *(int *)arg; if (writing == SMTP_CB_HEADERS) { type = 'H'; } else if (writing) { type = 'C'; } else { type = 'S'; } for (; lpc < buflen; lpc++) { switch (buf[lpc]) { case 0: case '\n': if (last > 0) { do_crm_log(level, " %.*s", lpc - last, buf + last); } else { do_crm_log(level, "%c: %.*s", type, lpc - last, buf + last); } last = lpc + 1; break; } } } #endif static int send_custom_trap(const char *node, const char *rsc, const char *task, int target_rc, int rc, int status, const char *desc) { pid_t pid; /*setenv needs chars, these are ints */ char *rc_s = crm_itoa(rc); char *status_s = crm_itoa(status); char *target_rc_s = crm_itoa(target_rc); crm_debug("Sending external notification to '%s' via '%s'", external_recipient, external_agent); setenv("CRM_notify_recipient", external_recipient, 1); setenv("CRM_notify_node", node, 1); setenv("CRM_notify_rsc", rsc, 1); setenv("CRM_notify_task", task, 1); setenv("CRM_notify_desc", desc, 1); setenv("CRM_notify_rc", rc_s, 1); setenv("CRM_notify_target_rc", target_rc_s, 1); setenv("CRM_notify_status", status_s, 1); pid = fork(); if (pid == -1) { crm_perror(LOG_ERR, "notification fork() failed."); } if (pid == 0) { /* crm_debug("notification: I am the child. Executing the nofitication program."); */ execl(external_agent, external_agent, NULL); } crm_trace("Finished running custom notification program '%s'.", external_agent); free(target_rc_s); free(status_s); free(rc_s); return 0; } static int send_smtp_trap(const char *node, const char *rsc, const char *task, int target_rc, int rc, int status, const char *desc) { #if ENABLE_ESMTP smtp_session_t session; smtp_message_t message; auth_context_t authctx; struct sigaction sa; int len = 20; int noauth = 1; int smtp_debug = LOG_DEBUG; char crm_mail_body[BODY_MAX]; char *crm_mail_subject = NULL; memset(&sa, 0, sizeof(struct sigaction)); if (node == NULL) { node = "-"; } if (rsc == NULL) { rsc = "-"; } if (desc == NULL) { desc = "-"; } if (crm_mail_to == NULL) { return 1; } if (crm_mail_host == NULL) { crm_mail_host = "localhost:25"; } if (crm_mail_prefix == NULL) { crm_mail_prefix = "Cluster notification"; } crm_debug("Sending '%s' mail to %s via %s", crm_mail_prefix, crm_mail_to, crm_mail_host); len += strlen(crm_mail_prefix); len += strlen(task); len += strlen(rsc); len += strlen(node); len += strlen(desc); len++; crm_mail_subject = calloc(1, len); snprintf(crm_mail_subject, len, "%s - %s event for %s on %s: %s\r\n", crm_mail_prefix, task, rsc, node, desc); len = 0; len += snprintf(crm_mail_body + len, BODY_MAX - len, "\r\n%s\r\n", crm_mail_prefix); len += snprintf(crm_mail_body + len, BODY_MAX - len, "====\r\n\r\n"); if (rc == target_rc) { len += snprintf(crm_mail_body + len, BODY_MAX - len, "Completed operation %s for resource %s on %s\r\n", task, rsc, node); } else { len += snprintf(crm_mail_body + len, BODY_MAX - len, "Operation %s for resource %s on %s failed: %s\r\n", task, rsc, node, desc); } len += snprintf(crm_mail_body + len, BODY_MAX - len, "\r\nDetails:\r\n"); len += snprintf(crm_mail_body + len, BODY_MAX - len, "\toperation status: (%d) %s\r\n", status, services_lrm_status_str(status)); if (status == PCMK_LRM_OP_DONE) { len += snprintf(crm_mail_body + len, BODY_MAX - len, "\tscript returned: (%d) %s\r\n", rc, lrmd_event_rc2str(rc)); len += snprintf(crm_mail_body + len, BODY_MAX - len, "\texpected return value: (%d) %s\r\n", target_rc, lrmd_event_rc2str(target_rc)); } auth_client_init(); session = smtp_create_session(); message = smtp_add_message(session); smtp_starttls_enable(session, Starttls_ENABLED); sa.sa_handler = SIG_IGN; sigemptyset(&sa.sa_mask); sa.sa_flags = 0; sigaction(SIGPIPE, &sa, NULL); smtp_set_server(session, crm_mail_host); authctx = auth_create_context(); auth_set_mechanism_flags(authctx, AUTH_PLUGIN_PLAIN, 0); smtp_set_eventcb(session, event_cb, NULL); /* Now tell libESMTP it can use the SMTP AUTH extension. */ if (!noauth) { crm_debug("Adding authentication context"); smtp_auth_set_context(session, authctx); } if (crm_mail_from == NULL) { struct utsname us; char auto_from[BODY_MAX]; CRM_ASSERT(uname(&us) == 0); snprintf(auto_from, BODY_MAX, "crm_mon@%s", us.nodename); smtp_set_reverse_path(message, auto_from); } else { /* NULL is ok */ smtp_set_reverse_path(message, crm_mail_from); } smtp_set_header(message, "To", NULL /*phrase */ , NULL /*addr */ ); /* "Phrase" */ smtp_add_recipient(message, crm_mail_to); /* Set the Subject: header and override any subject line in the message headers. */ smtp_set_header(message, "Subject", crm_mail_subject); smtp_set_header_option(message, "Subject", Hdr_OVERRIDE, 1); smtp_set_message_str(message, crm_mail_body); smtp_set_monitorcb(session, crm_smtp_debug, &smtp_debug, 1); if (smtp_start_session(session)) { char buf[128]; int rc = smtp_errno(); crm_err("SMTP server problem: %s (%d)", smtp_strerror(rc, buf, sizeof buf), rc); } else { char buf[128]; int rc = smtp_errno(); const smtp_status_t *smtp_status = smtp_message_transfer_status(message); if (rc != 0) { crm_err("SMTP server problem: %s (%d)", smtp_strerror(rc, buf, sizeof buf), rc); } crm_info("Send status: %d %s", smtp_status->code, crm_str(smtp_status->text)); smtp_enumerate_recipients(message, print_recipient_status, NULL); } smtp_destroy_session(session); auth_destroy_context(authctx); auth_client_exit(); #endif return 0; } static void handle_rsc_op(xmlNode * rsc_op) { int rc = -1; int status = -1; int action = -1; int interval = 0; int target_rc = -1; int transition_num = -1; gboolean notify = TRUE; char *rsc = NULL; char *task = NULL; const char *desc = NULL; const char *node = NULL; const char *magic = NULL; const char *id = crm_element_value(rsc_op, XML_LRM_ATTR_TASK_KEY); char *update_te_uuid = NULL; xmlNode *n = rsc_op; if (id == NULL) { /* Compatability with <= 1.1.5 */ id = ID(rsc_op); } magic = crm_element_value(rsc_op, XML_ATTR_TRANSITION_MAGIC); if (magic == NULL) { /* non-change */ return; } if (FALSE == decode_transition_magic(magic, &update_te_uuid, &transition_num, &action, &status, &rc, &target_rc)) { crm_err("Invalid event %s detected for %s", magic, id); return; } if (parse_op_key(id, &rsc, &task, &interval) == FALSE) { crm_err("Invalid event detected for %s", id); goto bail; } while (n != NULL && safe_str_neq(XML_CIB_TAG_STATE, TYPE(n))) { n = n->parent; } node = crm_element_value(n, XML_ATTR_UNAME); if (node == NULL) { node = ID(n); } if (node == NULL) { crm_err("No node detected for event %s (%s)", magic, id); goto bail; } /* look up where we expected it to be? */ desc = pcmk_strerror(pcmk_ok); if (status == PCMK_LRM_OP_DONE && target_rc == rc) { crm_notice("%s of %s on %s completed: %s", task, rsc, node, desc); if (rc == PCMK_EXECRA_NOT_RUNNING) { notify = FALSE; } } else if (status == PCMK_LRM_OP_DONE) { desc = lrmd_event_rc2str(rc); crm_warn("%s of %s on %s failed: %s", task, rsc, node, desc); } else { desc = services_lrm_status_str(status); crm_warn("%s of %s on %s failed: %s", task, rsc, node, desc); } if (notify && snmp_target) { send_snmp_trap(node, rsc, task, target_rc, rc, status, desc); } if (notify && crm_mail_to) { send_smtp_trap(node, rsc, task, target_rc, rc, status, desc); } if (notify && external_agent) { send_custom_trap(node, rsc, task, target_rc, rc, status, desc); } bail: free(update_te_uuid); free(rsc); free(task); } void crm_diff_update(const char *event, xmlNode * msg) { int rc = -1; long now = time(NULL); const char *op = NULL; print_dot(); if (current_cib != NULL) { xmlNode *cib_last = current_cib; current_cib = NULL; rc = cib_apply_patch_event(msg, cib_last, ¤t_cib, LOG_DEBUG); free_xml(cib_last); switch(rc) { case pcmk_err_diff_resync: case pcmk_err_diff_failed: crm_warn("[%s] %s Patch aborted: %s (%d)", event, op, pcmk_strerror(rc), rc); case pcmk_ok: break; default: crm_warn("[%s] %s ABORTED: %s (%d)", event, op, pcmk_strerror(rc), rc); return; } } if (current_cib == NULL) { current_cib = get_cib_copy(cib); } if (crm_mail_to || snmp_target || external_agent) { /* Process operation updates */ xmlXPathObject *xpathObj = xpath_search(msg, "//" F_CIB_UPDATE_RESULT "//" XML_TAG_DIFF_ADDED "//" XML_LRM_TAG_RSC_OP); if (xpathObj && xpathObj->nodesetval->nodeNr > 0) { int lpc = 0, max = xpathObj->nodesetval->nodeNr; for (lpc = 0; lpc < max; lpc++) { xmlNode *rsc_op = getXpathResult(xpathObj, lpc); handle_rsc_op(rsc_op); } } if (xpathObj) { xmlXPathFreeObject(xpathObj); } } if ((now - last_refresh) > (reconnect_msec / 1000)) { /* Force a refresh */ mon_refresh_display(NULL); } else { mainloop_set_trigger(refresh_trigger); } } gboolean mon_refresh_display(gpointer user_data) { xmlNode *cib_copy = copy_xml(current_cib); pe_working_set_t data_set; last_refresh = time(NULL); if (cli_config_update(&cib_copy, NULL, FALSE) == FALSE) { if (cib) { cib->cmds->signoff(cib); } print_as("Upgrade failed: %s", pcmk_strerror(-pcmk_err_dtd_validation)); if (as_console) { sleep(2); } clean_up(EX_USAGE); return FALSE; } set_working_set_defaults(&data_set); data_set.input = cib_copy; cluster_status(&data_set); if (as_html_file || web_cgi) { if (print_html_status(&data_set, as_html_file, web_cgi) != 0) { fprintf(stderr, "Critical: Unable to output html file\n"); clean_up(EX_USAGE); } } else if (as_xml) { if (print_xml_status(&data_set) != 0) { fprintf(stderr, "Critical: Unable to output xml file\n"); clean_up(EX_USAGE); } } else if (daemonize) { /* do nothing */ } else if (simple_status) { print_simple_status(&data_set); if (has_warnings) { clean_up(EX_USAGE); } } else { print_status(&data_set); } cleanup_calculations(&data_set); return TRUE; } void mon_st_callback(stonith_t *st, stonith_event_t *e) { char *desc = g_strdup_printf( "Operation %s requested by %s for peer %s: %s (ref=%s)", e->operation, e->origin, e->target, pcmk_strerror(e->result), e->id); if (snmp_target) { send_snmp_trap(e->target, NULL, e->operation, pcmk_ok, e->result, 0, desc); } if (crm_mail_to) { send_smtp_trap(e->target, NULL, e->operation, pcmk_ok, e->result, 0, desc); } if (external_agent) { send_custom_trap(e->target, NULL, e->operation, pcmk_ok, e->result, 0, desc); } g_free(desc); } /* * De-init ncurses, signoff from the CIB and deallocate memory. */ void clean_up(int rc) { #if ENABLE_SNMP netsnmp_session *session = crm_snmp_init(NULL, NULL); if (session) { snmp_close(session); snmp_shutdown("snmpapp"); } #endif #if CURSES_ENABLED if (as_console) { as_console = FALSE; echo(); nocbreak(); endwin(); } #endif if (cib != NULL) { cib->cmds->signoff(cib); cib_delete(cib); cib = NULL; } free(as_html_file); free(xml_file); free(pid_file); if (rc >= 0) { crm_exit(rc); } return; }