diff --git a/daemons/pacemakerd/pcmkd_corosync.c b/daemons/pacemakerd/pcmkd_corosync.c index c73a1f56cd..65595facd9 100644 --- a/daemons/pacemakerd/pcmkd_corosync.c +++ b/daemons/pacemakerd/pcmkd_corosync.c @@ -1,236 +1,287 @@ /* - * Copyright 2010-2018 Andrew Beekhof + * Copyright 2010-2019 the Pacemaker project contributors + * + * The version control history for this file may have further details. * * This source code is licensed under the GNU General Public License version 2 * or later (GPLv2+) WITHOUT ANY WARRANTY. */ #include #include "pacemakerd.h" #include #include /* for calls to stat() */ #include /* For basename() and dirname() */ #include #include /* For getpwname() */ #include #include #include #include #include +#include /* for crm_ipc_is_authentic_process */ #include +#include /* PCMK__SPECIAL_PID* */ + enum cluster_type_e stack = pcmk_cluster_unknown; static corosync_cfg_handle_t cfg_handle; /* =::=::=::= CFG - Shutdown stuff =::=::=::= */ static void cfg_shutdown_callback(corosync_cfg_handle_t h, corosync_cfg_shutdown_flags_t flags) { crm_info("Corosync wants to shut down: %s", (flags == COROSYNC_CFG_SHUTDOWN_FLAG_IMMEDIATE) ? "immediate" : (flags == COROSYNC_CFG_SHUTDOWN_FLAG_REGARDLESS) ? "forced" : "optional"); /* Never allow corosync to shut down while we're running */ corosync_cfg_replyto_shutdown(h, COROSYNC_CFG_SHUTDOWN_FLAG_NO); } static corosync_cfg_callbacks_t cfg_callbacks = { .corosync_cfg_shutdown_callback = cfg_shutdown_callback, }; static int pcmk_cfg_dispatch(gpointer user_data) { corosync_cfg_handle_t *handle = (corosync_cfg_handle_t *) user_data; cs_error_t rc = corosync_cfg_dispatch(*handle, CS_DISPATCH_ALL); if (rc != CS_OK) { return -1; } return 0; } static void cfg_connection_destroy(gpointer user_data) { crm_err("Connection destroyed"); cfg_handle = 0; pcmk_shutdown(SIGTERM); } gboolean cluster_disconnect_cfg(void) { if (cfg_handle) { corosync_cfg_finalize(cfg_handle); cfg_handle = 0; } pcmk_shutdown(SIGTERM); return TRUE; } #define cs_repeat(counter, max, code) do { \ code; \ if(rc == CS_ERR_TRY_AGAIN || rc == CS_ERR_QUEUE_FULL) { \ counter++; \ crm_debug("Retrying operation after %ds", counter); \ sleep(counter); \ } else { \ break; \ } \ } while(counter < max) gboolean cluster_connect_cfg(uint32_t * nodeid) { cs_error_t rc; - int fd = 0, retries = 0; + int fd = -1, retries = 0, rv; + uid_t found_uid = 0; + gid_t found_gid = 0; + pid_t found_pid = 0; static struct mainloop_fd_callbacks cfg_fd_callbacks = { .dispatch = pcmk_cfg_dispatch, .destroy = cfg_connection_destroy, }; cs_repeat(retries, 30, rc = corosync_cfg_initialize(&cfg_handle, &cfg_callbacks)); if (rc != CS_OK) { - crm_err("corosync cfg init error %d", rc); + crm_err("corosync cfg init: %s (%d)", cs_strerror(rc), rc); return FALSE; } rc = corosync_cfg_fd_get(cfg_handle, &fd); if (rc != CS_OK) { - crm_err("corosync cfg fd_get error %d", rc); + crm_err("corosync cfg fd_get: %s (%d)", cs_strerror(rc), rc); + goto bail; + } + + /* CFG provider run as root (in given user namespace, anyway)? */ + if (!(rv = crm_ipc_is_authentic_process(fd, (uid_t) 0,(gid_t) 0, &found_pid, + &found_uid, &found_gid))) { + crm_err("CFG provider is not authentic:" + " process %lld (uid: %lld, gid: %lld)", + (long long) PCMK__SPECIAL_PID_AS_0(found_pid), + (long long) found_uid, (long long) found_gid); + goto bail; + } else if (rv < 0) { + crm_err("Could not verify authenticity of CFG provider: %s (%d)", + strerror(-rv), -rv); goto bail; } retries = 0; cs_repeat(retries, 30, rc = corosync_cfg_local_get(cfg_handle, nodeid)); if (rc != CS_OK) { crm_err("corosync cfg local_get error %d", rc); goto bail; } crm_debug("Our nodeid: %d", *nodeid); mainloop_add_fd("corosync-cfg", G_PRIORITY_DEFAULT, fd, &cfg_handle, &cfg_fd_callbacks); return TRUE; bail: corosync_cfg_finalize(cfg_handle); return FALSE; } /* =::=::=::= Configuration =::=::=::= */ static int get_config_opt(uint64_t unused, cmap_handle_t object_handle, const char *key, char **value, const char *fallback) { int rc = 0, retries = 0; cs_repeat(retries, 5, rc = cmap_get_string(object_handle, key, value)); if (rc != CS_OK) { crm_trace("Search for %s failed %d, defaulting to %s", key, rc, fallback); if (fallback) { *value = strdup(fallback); } else { *value = NULL; } } crm_trace("%s: %s", key, *value); return rc; } gboolean mcp_read_config(void) { - int rc = CS_OK; + cs_error_t rc = CS_OK; int retries = 0; cmap_handle_t local_handle; uint64_t config = 0; + int fd = -1; + uid_t found_uid = 0; + gid_t found_gid = 0; + pid_t found_pid = 0; + int rv; // There can be only one possibility do { rc = cmap_initialize(&local_handle); if (rc != CS_OK) { retries++; printf("cmap connection setup failed: %s. Retrying in %ds\n", cs_strerror(rc), retries); crm_info("cmap connection setup failed: %s. Retrying in %ds", cs_strerror(rc), retries); sleep(retries); } else { break; } } while (retries < 5); if (rc != CS_OK) { printf("Could not connect to Cluster Configuration Database API, error %d\n", rc); crm_warn("Could not connect to Cluster Configuration Database API, error %d", rc); return FALSE; } + rc = cmap_fd_get(local_handle, &fd); + if (rc != CS_OK) { + crm_err("Could not obtain the CMAP API connection: %s (%d)", + cs_strerror(rc), rc); + cmap_finalize(local_handle); + return FALSE; + } + + /* CMAP provider run as root (in given user namespace, anyway)? */ + if (!(rv = crm_ipc_is_authentic_process(fd, (uid_t) 0,(gid_t) 0, &found_pid, + &found_uid, &found_gid))) { + crm_err("CMAP provider is not authentic:" + " process %lld (uid: %lld, gid: %lld)", + (long long) PCMK__SPECIAL_PID_AS_0(found_pid), + (long long) found_uid, (long long) found_gid); + cmap_finalize(local_handle); + return FALSE; + } else if (rv < 0) { + crm_err("Could not verify authenticity of CMAP provider: %s (%d)", + strerror(-rv), -rv); + cmap_finalize(local_handle); + return FALSE; + } + stack = get_cluster_type(); crm_info("Reading configure for stack: %s", name_for_cluster_type(stack)); /* =::=::= Should we be here =::=::= */ if (stack == pcmk_cluster_corosync) { set_daemon_option("cluster_type", "corosync"); set_daemon_option("quorum_type", "corosync"); } else { crm_err("Unsupported stack type: %s", name_for_cluster_type(stack)); return FALSE; } /* =::=::= Logging =::=::= */ if (daemon_option("debug")) { /* Syslog logging is already setup by crm_log_init() */ } else { /* Check corosync */ char *debug_enabled = NULL; get_config_opt(config, local_handle, "logging.debug", &debug_enabled, "off"); if (crm_is_true(debug_enabled)) { set_daemon_option("debug", "1"); if (get_crm_log_level() < LOG_DEBUG) { set_crm_log_level(LOG_DEBUG); } } else { set_daemon_option("debug", "0"); } free(debug_enabled); } if(local_handle){ gid_t gid = 0; if (crm_user_lookup(CRM_DAEMON_USER, NULL, &gid) < 0) { crm_warn("Could not authorize group with corosync " CRM_XS " No group found for user %s", CRM_DAEMON_USER); } else { char key[PATH_MAX]; snprintf(key, PATH_MAX, "uidgid.gid.%u", gid); rc = cmap_set_uint8(local_handle, key, 1); if (rc != CS_OK) { crm_warn("Could not authorize group with corosync "CRM_XS " group=%u rc=%d (%s)", gid, rc, ais_error2text(rc)); } } } cmap_finalize(local_handle); return TRUE; } diff --git a/lib/cluster/corosync.c b/lib/cluster/corosync.c index fcdfc0e412..648c0d534f 100644 --- a/lib/cluster/corosync.c +++ b/lib/cluster/corosync.c @@ -1,645 +1,767 @@ /* - * Copyright (C) 2004 Andrew Beekhof + * Copyright 2004-2019 the Pacemaker project contributors * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License as published by the Free Software Foundation; either - * version 2.1 of the License, or (at your option) any later version. + * The version control history for this file may have further details. * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this library; if not, write to the Free Software - * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + * This source code is licensed under the GNU Lesser General Public License + * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY. */ #include #include #include #include #include #include /* U64T ~ PRIu64 */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include +#include /* PCMK__SPECIAL_PID* */ + quorum_handle_t pcmk_quorum_handle = 0; gboolean(*quorum_app_callback) (unsigned long long seq, gboolean quorate) = NULL; char * get_corosync_uuid(crm_node_t *node) { if (node && is_corosync_cluster()) { if (node->id > 0) { return crm_strdup_printf("%u", node->id); } else { crm_info("Node %s is not yet known by corosync", node->uname); } } return NULL; } /* * CFG functionality stolen from node_name() in corosync-quorumtool.c * This resolves the first address assigned to a node and returns the name or IP address. */ char * corosync_node_name(uint64_t /*cmap_handle_t */ cmap_handle, uint32_t nodeid) { int lpc = 0; - int rc = CS_OK; + cs_error_t rc = CS_OK; int retries = 0; char *name = NULL; cmap_handle_t local_handle = 0; + int fd = -1; + uid_t found_uid = 0; + gid_t found_gid = 0; + pid_t found_pid = 0; + int rv; if (nodeid == 0) { nodeid = get_local_nodeid(0); } if (cmap_handle == 0 && local_handle == 0) { retries = 0; crm_trace("Initializing CMAP connection"); do { rc = cmap_initialize(&local_handle); if (rc != CS_OK) { retries++; crm_debug("API connection setup failed: %s. Retrying in %ds", cs_strerror(rc), retries); sleep(retries); } } while (retries < 5 && rc != CS_OK); if (rc != CS_OK) { crm_warn("Could not connect to Cluster Configuration Database API, error %s", cs_strerror(rc)); local_handle = 0; } } if (cmap_handle == 0) { cmap_handle = local_handle; + + rc = cmap_fd_get(cmap_handle, &fd); + if (rc != CS_OK) { + crm_err("Could not obtain the CMAP API connection: %s (%d)", + cs_strerror(rc), rc); + goto bail; + } + + /* CMAP provider run as root (in given user namespace, anyway)? */ + if (!(rv = crm_ipc_is_authentic_process(fd, (uid_t) 0,(gid_t) 0, &found_pid, + &found_uid, &found_gid))) { + crm_err("CMAP provider is not authentic:" + " process %lld (uid: %lld, gid: %lld)", + (long long) PCMK__SPECIAL_PID_AS_0(found_pid), + (long long) found_uid, (long long) found_gid); + goto bail; + } else if (rv < 0) { + crm_err("Could not verify authenticity of CMAP provider: %s (%d)", + strerror(-rv), -rv); + goto bail; + } } while (name == NULL && cmap_handle != 0) { uint32_t id = 0; char *key = NULL; key = crm_strdup_printf("nodelist.node.%d.nodeid", lpc); rc = cmap_get_uint32(cmap_handle, key, &id); crm_trace("Checking %u vs %u from %s", nodeid, id, key); free(key); if (rc != CS_OK) { break; } if (nodeid == id) { crm_trace("Searching for node name for %u in nodelist.node.%d %s", nodeid, lpc, name); if (name == NULL) { key = crm_strdup_printf("nodelist.node.%d.name", lpc); cmap_get_string(cmap_handle, key, &name); crm_trace("%s = %s", key, name); free(key); } if (name == NULL) { key = crm_strdup_printf("nodelist.node.%d.ring0_addr", lpc); cmap_get_string(cmap_handle, key, &name); crm_trace("%s = %s", key, name); if (node_name_is_valid(key, name) == FALSE) { free(name); name = NULL; } free(key); } break; } lpc++; } +bail: if(local_handle) { cmap_finalize(local_handle); } if (name == NULL) { crm_info("Unable to get node name for nodeid %u", nodeid); } return name; } void terminate_cs_connection(crm_cluster_t *cluster) { cluster_disconnect_cpg(cluster); if (pcmk_quorum_handle) { quorum_finalize(pcmk_quorum_handle); pcmk_quorum_handle = 0; } crm_notice("Disconnected from Corosync"); } static int pcmk_quorum_dispatch(gpointer user_data) { int rc = 0; rc = quorum_dispatch(pcmk_quorum_handle, CS_DISPATCH_ALL); if (rc < 0) { crm_err("Connection to the Quorum API failed: %d", rc); pcmk_quorum_handle = 0; return -1; } return 0; } static void pcmk_quorum_notification(quorum_handle_t handle, uint32_t quorate, uint64_t ring_id, uint32_t view_list_entries, uint32_t * view_list) { int i; GHashTableIter iter; crm_node_t *node = NULL; static gboolean init_phase = TRUE; if (quorate != crm_have_quorum) { if (quorate) { crm_notice("Quorum acquired " CRM_XS " membership=%" U64T " members=%lu", ring_id, (long unsigned int)view_list_entries); } else { crm_warn("Quorum lost " CRM_XS " membership=%" U64T " members=%lu", ring_id, (long unsigned int)view_list_entries); } crm_have_quorum = quorate; } else { crm_info("Quorum %s " CRM_XS " membership=%" U64T " members=%lu", (quorate? "retained" : "still lost"), ring_id, (long unsigned int)view_list_entries); } if (view_list_entries == 0 && init_phase) { crm_info("Corosync membership is still forming, ignoring"); return; } init_phase = FALSE; /* Reset last_seen for all cached nodes so we can tell which ones aren't * in the view list */ g_hash_table_iter_init(&iter, crm_peer_cache); while (g_hash_table_iter_next(&iter, NULL, (gpointer *) &node)) { node->last_seen = 0; } /* Update the peer cache for each node in view list */ for (i = 0; i < view_list_entries; i++) { uint32_t id = view_list[i]; crm_debug("Member[%d] %u ", i, id); /* Get this node's peer cache entry (adding one if not already there) */ node = crm_get_peer(id, NULL); if (node->uname == NULL) { char *name = corosync_node_name(0, id); crm_info("Obtaining name for new node %u", id); node = crm_get_peer(id, name); free(name); } /* Update the node state (including updating last_seen to ring_id) */ crm_update_peer_state(__FUNCTION__, node, CRM_NODE_MEMBER, ring_id); } /* Remove any peer cache entries we didn't update */ crm_reap_unseen_nodes(ring_id); if (quorum_app_callback) { quorum_app_callback(ring_id, quorate); } } quorum_callbacks_t quorum_callbacks = { .quorum_notify_fn = pcmk_quorum_notification, }; gboolean cluster_connect_quorum(gboolean(*dispatch) (unsigned long long, gboolean), void (*destroy) (gpointer)) { - int rc = -1; + cs_error_t rc; int fd = 0; int quorate = 0; uint32_t quorum_type = 0; struct mainloop_fd_callbacks quorum_fd_callbacks; + uid_t found_uid = 0; + gid_t found_gid = 0; + pid_t found_pid = 0; + int rv; quorum_fd_callbacks.dispatch = pcmk_quorum_dispatch; quorum_fd_callbacks.destroy = destroy; crm_debug("Configuring Pacemaker to obtain quorum from Corosync"); rc = quorum_initialize(&pcmk_quorum_handle, &quorum_callbacks, &quorum_type); if (rc != CS_OK) { - crm_err("Could not connect to the Quorum API: %d", rc); + crm_err("Could not connect to the Quorum API: %s (%d)", + cs_strerror(rc), rc); goto bail; } else if (quorum_type != QUORUM_SET) { crm_err("Corosync quorum is not configured"); goto bail; } + rc = quorum_fd_get(pcmk_quorum_handle, &fd); + if (rc != CS_OK) { + crm_err("Could not obtain the Quorum API connection: %s (%d)", + strerror(rc), rc); + goto bail; + } + + /* Quorum provider run as root (in given user namespace, anyway)? */ + if (!(rv = crm_ipc_is_authentic_process(fd, (uid_t) 0,(gid_t) 0, &found_pid, + &found_uid, &found_gid))) { + crm_err("Quorum provider is not authentic:" + " process %lld (uid: %lld, gid: %lld)", + (long long) PCMK__SPECIAL_PID_AS_0(found_pid), + (long long) found_uid, (long long) found_gid); + rc = CS_ERR_ACCESS; + goto bail; + } else if (rv < 0) { + crm_err("Could not verify authenticity of Quorum provider: %s (%d)", + strerror(-rv), -rv); + rc = CS_ERR_ACCESS; + goto bail; + } + rc = quorum_getquorate(pcmk_quorum_handle, &quorate); if (rc != CS_OK) { crm_err("Could not obtain the current Quorum API state: %d", rc); goto bail; } if (quorate) { crm_notice("Quorum acquired"); } else { crm_warn("Quorum lost"); } quorum_app_callback = dispatch; crm_have_quorum = quorate; rc = quorum_trackstart(pcmk_quorum_handle, CS_TRACK_CHANGES | CS_TRACK_CURRENT); if (rc != CS_OK) { crm_err("Could not setup Quorum API notifications: %d", rc); goto bail; } - rc = quorum_fd_get(pcmk_quorum_handle, &fd); - if (rc != CS_OK) { - crm_err("Could not obtain the Quorum API connection: %d", rc); - goto bail; - } - mainloop_add_fd("quorum", G_PRIORITY_HIGH, fd, dispatch, &quorum_fd_callbacks); corosync_initialize_nodelist(NULL, FALSE, NULL); bail: if (rc != CS_OK) { quorum_finalize(pcmk_quorum_handle); return FALSE; } return TRUE; } gboolean init_cs_connection(crm_cluster_t * cluster) { int retries = 0; while (retries < 5) { int rc = init_cs_connection_once(cluster); retries++; switch (rc) { case CS_OK: return TRUE; break; case CS_ERR_TRY_AGAIN: case CS_ERR_QUEUE_FULL: sleep(retries); break; default: return FALSE; } } crm_err("Could not connect to corosync after %d retries", retries); return FALSE; } gboolean init_cs_connection_once(crm_cluster_t * cluster) { crm_node_t *peer = NULL; enum cluster_type_e stack = get_cluster_type(); crm_peer_init(); /* Here we just initialize comms */ if (stack != pcmk_cluster_corosync) { crm_err("Invalid cluster type: %s (%d)", name_for_cluster_type(stack), stack); return FALSE; } if (cluster_connect_cpg(cluster) == FALSE) { return FALSE; } crm_info("Connection to '%s': established", name_for_cluster_type(stack)); cluster->nodeid = get_local_nodeid(0); if(cluster->nodeid == 0) { crm_err("Could not establish local nodeid"); return FALSE; } cluster->uname = get_node_name(0); if(cluster->uname == NULL) { crm_err("Could not establish local node name"); return FALSE; } /* Ensure the local node always exists */ peer = crm_get_peer(cluster->nodeid, cluster->uname); cluster->uuid = get_corosync_uuid(peer); return TRUE; } gboolean check_message_sanity(const AIS_Message * msg, const char *data) { gboolean sane = TRUE; int dest = msg->host.type; int tmp_size = msg->header.size - sizeof(AIS_Message); if (sane && msg->header.size == 0) { crm_warn("Message with no size"); sane = FALSE; } if (sane && msg->header.error != CS_OK) { crm_warn("Message header contains an error: %d", msg->header.error); sane = FALSE; } if (sane && ais_data_len(msg) != tmp_size) { crm_warn("Message payload size is incorrect: expected %d, got %d", ais_data_len(msg), tmp_size); sane = TRUE; } if (sane && ais_data_len(msg) == 0) { crm_warn("Message with no payload"); sane = FALSE; } if (sane && data && msg->is_compressed == FALSE) { int str_size = strlen(data) + 1; if (ais_data_len(msg) != str_size) { int lpc = 0; crm_warn("Message payload is corrupted: expected %d bytes, got %d", ais_data_len(msg), str_size); sane = FALSE; for (lpc = (str_size - 10); lpc < msg->size; lpc++) { if (lpc < 0) { lpc = 0; } crm_debug("bad_data[%d]: %d / '%c'", lpc, data[lpc], data[lpc]); } } } if (sane == FALSE) { crm_err("Invalid message %d: (dest=%s:%s, from=%s:%s.%u, compressed=%d, size=%d, total=%d)", msg->id, ais_dest(&(msg->host)), msg_type2text(dest), ais_dest(&(msg->sender)), msg_type2text(msg->sender.type), msg->sender.pid, msg->is_compressed, ais_data_len(msg), msg->header.size); } else { crm_trace ("Verified message %d: (dest=%s:%s, from=%s:%s.%u, compressed=%d, size=%d, total=%d)", msg->id, ais_dest(&(msg->host)), msg_type2text(dest), ais_dest(&(msg->sender)), msg_type2text(msg->sender.type), msg->sender.pid, msg->is_compressed, ais_data_len(msg), msg->header.size); } return sane; } enum cluster_type_e find_corosync_variant(void) { int rc = CS_OK; cmap_handle_t handle; rc = cmap_initialize(&handle); switch(rc) { case CS_OK: break; case CS_ERR_SECURITY: crm_debug("Failed to initialize the cmap API: Permission denied (%d)", rc); /* It's there, we just can't talk to it. * Good enough for us to identify as 'corosync' */ return pcmk_cluster_corosync; default: crm_info("Failed to initialize the cmap API: %s (%d)", ais_error2text(rc), rc); return pcmk_cluster_unknown; } cmap_finalize(handle); return pcmk_cluster_corosync; } gboolean crm_is_corosync_peer_active(const crm_node_t * node) { if (node == NULL) { crm_trace("NULL"); return FALSE; } else if (safe_str_neq(node->state, CRM_NODE_MEMBER)) { crm_trace("%s: state=%s", node->uname, node->state); return FALSE; } else if ((node->processes & crm_proc_cpg) == 0) { crm_trace("%s: processes=%.16x", node->uname, node->processes); return FALSE; } return TRUE; } gboolean corosync_initialize_nodelist(void *cluster, gboolean force_member, xmlNode * xml_parent) { int lpc = 0; - int rc = CS_OK; + cs_error_t rc = CS_OK; int retries = 0; gboolean any = FALSE; cmap_handle_t cmap_handle; + int fd = -1; + uid_t found_uid = 0; + gid_t found_gid = 0; + pid_t found_pid = 0; + int rv; do { rc = cmap_initialize(&cmap_handle); if (rc != CS_OK) { retries++; crm_debug("API connection setup failed: %s. Retrying in %ds", cs_strerror(rc), retries); sleep(retries); } } while (retries < 5 && rc != CS_OK); if (rc != CS_OK) { crm_warn("Could not connect to Cluster Configuration Database API, error %d", rc); return FALSE; } + rc = cmap_fd_get(cmap_handle, &fd); + if (rc != CS_OK) { + crm_err("Could not obtain the CMAP API connection: %s (%d)", + cs_strerror(rc), rc); + goto bail; + } + + /* CMAP provider run as root (in given user namespace, anyway)? */ + if (!(rv = crm_ipc_is_authentic_process(fd, (uid_t) 0,(gid_t) 0, &found_pid, + &found_uid, &found_gid))) { + crm_err("CMAP provider is not authentic:" + " process %lld (uid: %lld, gid: %lld)", + (long long) PCMK__SPECIAL_PID_AS_0(found_pid), + (long long) found_uid, (long long) found_gid); + goto bail; + } else if (rv < 0) { + crm_err("Could not verify authenticity of CMAP provider: %s (%d)", + strerror(-rv), -rv); + goto bail; + } + crm_peer_init(); crm_trace("Initializing corosync nodelist"); for (lpc = 0; TRUE; lpc++) { uint32_t nodeid = 0; char *name = NULL; char *key = NULL; key = crm_strdup_printf("nodelist.node.%d.nodeid", lpc); rc = cmap_get_uint32(cmap_handle, key, &nodeid); free(key); if (rc != CS_OK) { break; } name = corosync_node_name(cmap_handle, nodeid); if (name != NULL) { GHashTableIter iter; crm_node_t *node = NULL; g_hash_table_iter_init(&iter, crm_peer_cache); while (g_hash_table_iter_next(&iter, NULL, (gpointer *) &node)) { if(node && node->uname && strcasecmp(node->uname, name) == 0) { if (node->id && node->id != nodeid) { crm_crit("Nodes %u and %u share the same name '%s': shutting down", node->id, nodeid, name); crm_exit(CRM_EX_FATAL); } } } } if (nodeid > 0 || name != NULL) { crm_trace("Initializing node[%d] %u = %s", lpc, nodeid, name); crm_get_peer(nodeid, name); } if (nodeid > 0 && name != NULL) { any = TRUE; if (xml_parent) { xmlNode *node = create_xml_node(xml_parent, XML_CIB_TAG_NODE); crm_xml_set_id(node, "%u", nodeid); crm_xml_add(node, XML_ATTR_UNAME, name); if (force_member) { crm_xml_add(node, XML_ATTR_TYPE, CRM_NODE_MEMBER); } } } free(name); } +bail: cmap_finalize(cmap_handle); return any; } char * corosync_cluster_name(void) { cmap_handle_t handle; char *cluster_name = NULL; - int rc = CS_OK; + cs_error_t rc = CS_OK; + int fd = -1; + uid_t found_uid = 0; + gid_t found_gid = 0; + pid_t found_pid = 0; + int rv; rc = cmap_initialize(&handle); if (rc != CS_OK) { - crm_info("Failed to initialize the cmap API: %s (%d)", ais_error2text(rc), rc); + crm_info("Failed to initialize the cmap API: %s (%d)", + cs_strerror(rc), rc); return NULL; } + rc = cmap_fd_get(handle, &fd); + if (rc != CS_OK) { + crm_err("Could not obtain the CMAP API connection: %s (%d)", + cs_strerror(rc), rc); + goto bail; + } + + /* CMAP provider run as root (in given user namespace, anyway)? */ + if (!(rv = crm_ipc_is_authentic_process(fd, (uid_t) 0,(gid_t) 0, &found_pid, + &found_uid, &found_gid))) { + crm_err("CMAP provider is not authentic:" + " process %lld (uid: %lld, gid: %lld)", + (long long) PCMK__SPECIAL_PID_AS_0(found_pid), + (long long) found_uid, (long long) found_gid); + goto bail; + } else if (rv < 0) { + crm_err("Could not verify authenticity of CMAP provider: %s (%d)", + strerror(-rv), -rv); + goto bail; + } + rc = cmap_get_string(handle, "totem.cluster_name", &cluster_name); if (rc != CS_OK) { - crm_info("Cannot get totem.cluster_name: %s (%d)", ais_error2text(rc), rc); + crm_info("Cannot get totem.cluster_name: %s (%d)", cs_strerror(rc), rc); } else { crm_debug("cmap totem.cluster_name = '%s'", cluster_name); } +bail: cmap_finalize(handle); - return cluster_name; } int corosync_cmap_has_config(const char *prefix) { - int rc = CS_OK; + cs_error_t rc = CS_OK; int retries = 0; static int found = -1; cmap_handle_t cmap_handle; cmap_iter_handle_t iter_handle; char key_name[CMAP_KEYNAME_MAXLEN + 1]; + int fd = -1; + uid_t found_uid = 0; + gid_t found_gid = 0; + pid_t found_pid = 0; + int rv; if(found != -1) { return found; } do { rc = cmap_initialize(&cmap_handle); if (rc != CS_OK) { retries++; crm_debug("API connection setup failed: %s. Retrying in %ds", cs_strerror(rc), retries); sleep(retries); } } while (retries < 5 && rc != CS_OK); if (rc != CS_OK) { crm_warn("Could not connect to Cluster Configuration Database API: %s (rc=%d)", cs_strerror(rc), rc); return -1; } + rc = cmap_fd_get(cmap_handle, &fd); + if (rc != CS_OK) { + crm_err("Could not obtain the CMAP API connection: %s (%d)", + cs_strerror(rc), rc); + goto bail; + } + + /* CMAP provider run as root (in given user namespace, anyway)? */ + if (!(rv = crm_ipc_is_authentic_process(fd, (uid_t) 0,(gid_t) 0, &found_pid, + &found_uid, &found_gid))) { + crm_err("CMAP provider is not authentic:" + " process %lld (uid: %lld, gid: %lld)", + (long long) PCMK__SPECIAL_PID_AS_0(found_pid), + (long long) found_uid, (long long) found_gid); + goto bail; + } else if (rv < 0) { + crm_err("Could not verify authenticity of CMAP provider: %s (%d)", + strerror(-rv), -rv); + goto bail; + } + rc = cmap_iter_init(cmap_handle, prefix, &iter_handle); if (rc != CS_OK) { crm_warn("Failed to initialize iteration for corosync cmap '%s': %s (rc=%d)", prefix, cs_strerror(rc), rc); goto bail; } found = 0; while ((rc = cmap_iter_next(cmap_handle, iter_handle, key_name, NULL, NULL)) == CS_OK) { crm_trace("'%s' is configured in corosync cmap: %s", prefix, key_name); found++; break; } cmap_iter_finalize(cmap_handle, iter_handle); bail: cmap_finalize(cmap_handle); return found; } diff --git a/lib/cluster/cpg.c b/lib/cluster/cpg.c index 75af131353..430195cada 100644 --- a/lib/cluster/cpg.c +++ b/lib/cluster/cpg.c @@ -1,654 +1,713 @@ /* - * Copyright 2004-2018 Andrew Beekhof + * Copyright 2004-2019 the Pacemaker project contributors + * + * The version control history for this file may have further details. * * This source code is licensed under the GNU Lesser General Public License * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY. */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include +#include /* PCMK__SPECIAL_PID* */ + cpg_handle_t pcmk_cpg_handle = 0; /* TODO: Remove, use cluster.cpg_handle */ static bool cpg_evicted = FALSE; gboolean(*pcmk_cpg_dispatch_fn) (int kind, const char *from, const char *data) = NULL; #define cs_repeat(counter, max, code) do { \ code; \ if(rc == CS_ERR_TRY_AGAIN || rc == CS_ERR_QUEUE_FULL) { \ counter++; \ crm_debug("Retrying operation after %ds", counter); \ sleep(counter); \ } else { \ break; \ } \ } while(counter < max) void cluster_disconnect_cpg(crm_cluster_t *cluster) { pcmk_cpg_handle = 0; if (cluster->cpg_handle) { crm_trace("Disconnecting CPG"); cpg_leave(cluster->cpg_handle, &cluster->group); cpg_finalize(cluster->cpg_handle); cluster->cpg_handle = 0; } else { crm_info("No CPG connection"); } } uint32_t get_local_nodeid(cpg_handle_t handle) { - int rc = CS_OK; + cs_error_t rc = CS_OK; int retries = 0; static uint32_t local_nodeid = 0; cpg_handle_t local_handle = handle; cpg_callbacks_t cb = { }; + int fd = -1; + uid_t found_uid = 0; + gid_t found_gid = 0; + pid_t found_pid = 0; + int rv; if(local_nodeid != 0) { return local_nodeid; } if(handle == 0) { crm_trace("Creating connection"); cs_repeat(retries, 5, rc = cpg_initialize(&local_handle, &cb)); + if (rc != CS_OK) { + crm_err("Could not connect to the CPG API: %s (%d)", + cs_strerror(rc), rc); + return 0; + } + + rc = cpg_fd_get(local_handle, &fd); + if (rc != CS_OK) { + crm_err("Could not obtain the CPG API connection: %s (%d)", + cs_strerror(rc), rc); + goto bail; + } + + /* CPG provider run as root (in given user namespace, anyway)? */ + if (!(rv = crm_ipc_is_authentic_process(fd, (uid_t) 0,(gid_t) 0, &found_pid, + &found_uid, &found_gid))) { + crm_err("CPG provider is not authentic:" + " process %lld (uid: %lld, gid: %lld)", + (long long) PCMK__SPECIAL_PID_AS_0(found_pid), + (long long) found_uid, (long long) found_gid); + goto bail; + } else if (rv < 0) { + crm_err("Could not verify authenticity of CPG provider: %s (%d)", + strerror(-rv), -rv); + goto bail; + } } if (rc == CS_OK) { retries = 0; crm_trace("Performing lookup"); cs_repeat(retries, 5, rc = cpg_local_get(local_handle, &local_nodeid)); } if (rc != CS_OK) { crm_err("Could not get local node id from the CPG API: %s (%d)", ais_error2text(rc), rc); } + +bail: if(handle == 0) { crm_trace("Closing connection"); cpg_finalize(local_handle); } crm_debug("Local nodeid is %u", local_nodeid); return local_nodeid; } GListPtr cs_message_queue = NULL; int cs_message_timer = 0; static ssize_t crm_cs_flush(gpointer data); static gboolean crm_cs_flush_cb(gpointer data) { cs_message_timer = 0; crm_cs_flush(data); return FALSE; } #define CS_SEND_MAX 200 static ssize_t crm_cs_flush(gpointer data) { int sent = 0; ssize_t rc = 0; int queue_len = 0; static unsigned int last_sent = 0; cpg_handle_t *handle = (cpg_handle_t *)data; if (*handle == 0) { crm_trace("Connection is dead"); return pcmk_ok; } queue_len = g_list_length(cs_message_queue); if ((queue_len % 1000) == 0 && queue_len > 1) { crm_err("CPG queue has grown to %d", queue_len); } else if (queue_len == CS_SEND_MAX) { crm_warn("CPG queue has grown to %d", queue_len); } if (cs_message_timer) { /* There is already a timer, wait until it goes off */ crm_trace("Timer active %d", cs_message_timer); return pcmk_ok; } while (cs_message_queue && sent < CS_SEND_MAX) { struct iovec *iov = cs_message_queue->data; errno = 0; rc = cpg_mcast_joined(*handle, CPG_TYPE_AGREED, iov, 1); if (rc != CS_OK) { break; } sent++; last_sent++; crm_trace("CPG message sent, size=%llu", (unsigned long long) iov->iov_len); cs_message_queue = g_list_remove(cs_message_queue, iov); free(iov->iov_base); free(iov); } queue_len -= sent; if (sent > 1 || cs_message_queue) { crm_info("Sent %d CPG messages (%d remaining, last=%u): %s (%lld)", sent, queue_len, last_sent, ais_error2text(rc), (long long) rc); } else { crm_trace("Sent %d CPG messages (%d remaining, last=%u): %s (%lld)", sent, queue_len, last_sent, ais_error2text(rc), (long long) rc); } if (cs_message_queue) { uint32_t delay_ms = 100; if(rc != CS_OK) { /* Proportionally more if sending failed but cap at 1s */ delay_ms = QB_MIN(1000, CS_SEND_MAX + (10 * queue_len)); } cs_message_timer = g_timeout_add(delay_ms, crm_cs_flush_cb, data); } return rc; } gboolean send_cpg_iov(struct iovec * iov) { static unsigned int queued = 0; queued++; crm_trace("Queueing CPG message %u (%llu bytes)", queued, (unsigned long long) iov->iov_len); cs_message_queue = g_list_append(cs_message_queue, iov); crm_cs_flush(&pcmk_cpg_handle); return TRUE; } static int pcmk_cpg_dispatch(gpointer user_data) { int rc = 0; crm_cluster_t *cluster = (crm_cluster_t*) user_data; rc = cpg_dispatch(cluster->cpg_handle, CS_DISPATCH_ONE); if (rc != CS_OK) { crm_err("Connection to the CPG API failed: %s (%d)", ais_error2text(rc), rc); cluster->cpg_handle = 0; return -1; } else if(cpg_evicted) { crm_err("Evicted from CPG membership"); return -1; } return 0; } char * pcmk_message_common_cs(cpg_handle_t handle, uint32_t nodeid, uint32_t pid, void *content, uint32_t *kind, const char **from) { char *data = NULL; AIS_Message *msg = (AIS_Message *) content; if(handle) { // Do filtering and field massaging uint32_t local_nodeid = get_local_nodeid(handle); const char *local_name = get_local_node_name(); if (msg->sender.id > 0 && msg->sender.id != nodeid) { crm_err("Nodeid mismatch from %d.%d: claimed nodeid=%u", nodeid, pid, msg->sender.id); return NULL; } else if (msg->host.id != 0 && (local_nodeid != msg->host.id)) { /* Not for us */ crm_trace("Not for us: %u != %u", msg->host.id, local_nodeid); return NULL; } else if (msg->host.size != 0 && safe_str_neq(msg->host.uname, local_name)) { /* Not for us */ crm_trace("Not for us: %s != %s", msg->host.uname, local_name); return NULL; } msg->sender.id = nodeid; if (msg->sender.size == 0) { crm_node_t *peer = crm_get_peer(nodeid, NULL); if (peer == NULL) { crm_err("Peer with nodeid=%u is unknown", nodeid); } else if (peer->uname == NULL) { crm_err("No uname for peer with nodeid=%u", nodeid); } else { crm_notice("Fixing uname for peer with nodeid=%u", nodeid); msg->sender.size = strlen(peer->uname); memset(msg->sender.uname, 0, MAX_NAME); memcpy(msg->sender.uname, peer->uname, msg->sender.size); } } } crm_trace("Got new%s message (size=%d, %d, %d)", msg->is_compressed ? " compressed" : "", ais_data_len(msg), msg->size, msg->compressed_size); if (kind != NULL) { *kind = msg->header.id; } if (from != NULL) { *from = msg->sender.uname; } if (msg->is_compressed && msg->size > 0) { int rc = BZ_OK; char *uncompressed = NULL; unsigned int new_size = msg->size + 1; if (check_message_sanity(msg, NULL) == FALSE) { goto badmsg; } crm_trace("Decompressing message data"); uncompressed = calloc(1, new_size); rc = BZ2_bzBuffToBuffDecompress(uncompressed, &new_size, msg->data, msg->compressed_size, 1, 0); if (rc != BZ_OK) { crm_err("Decompression failed: %s " CRM_XS " bzerror=%d", bz2_strerror(rc), rc); free(uncompressed); goto badmsg; } CRM_ASSERT(rc == BZ_OK); CRM_ASSERT(new_size == msg->size); data = uncompressed; } else if (check_message_sanity(msg, data) == FALSE) { goto badmsg; } else if (safe_str_eq("identify", data)) { char *pid_s = crm_getpid_s(); send_cluster_text(crm_class_cluster, pid_s, TRUE, NULL, crm_msg_ais); free(pid_s); return NULL; } else { data = strdup(msg->data); } // Is this necessary? crm_get_peer(msg->sender.id, msg->sender.uname); crm_trace("Payload: %.200s", data); return data; badmsg: crm_err("Invalid message (id=%d, dest=%s:%s, from=%s:%s.%d):" " min=%d, total=%d, size=%d, bz2_size=%d", msg->id, ais_dest(&(msg->host)), msg_type2text(msg->host.type), ais_dest(&(msg->sender)), msg_type2text(msg->sender.type), msg->sender.pid, (int)sizeof(AIS_Message), msg->header.size, msg->size, msg->compressed_size); free(data); return NULL; } #define PEER_NAME(peer) ((peer)? ((peer)->uname? (peer)->uname : "") : "") void pcmk_cpg_membership(cpg_handle_t handle, const struct cpg_name *groupName, const struct cpg_address *member_list, size_t member_list_entries, const struct cpg_address *left_list, size_t left_list_entries, const struct cpg_address *joined_list, size_t joined_list_entries) { int i; gboolean found = FALSE; static int counter = 0; uint32_t local_nodeid = get_local_nodeid(handle); for (i = 0; i < left_list_entries; i++) { crm_node_t *peer = crm_find_peer(left_list[i].nodeid, NULL); crm_info("Group event %s.%d: node %u (%s) left", groupName->value, counter, left_list[i].nodeid, PEER_NAME(peer)); if (peer) { crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cpg, OFFLINESTATUS); } } for (i = 0; i < joined_list_entries; i++) { crm_info("Group event %s.%d: node %u joined", groupName->value, counter, joined_list[i].nodeid); } for (i = 0; i < member_list_entries; i++) { crm_node_t *peer = crm_get_peer(member_list[i].nodeid, NULL); crm_info("Group event %s.%d: node %u (%s) is member", groupName->value, counter, member_list[i].nodeid, PEER_NAME(peer)); /* If the caller left auto-reaping enabled, this will also update the * state to member. */ peer = crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cpg, ONLINESTATUS); if (peer && peer->state && strcmp(peer->state, CRM_NODE_MEMBER)) { /* The node is a CPG member, but we currently think it's not a * cluster member. This is possible only if auto-reaping was * disabled. The node may be joining, and we happened to get the CPG * notification before the quorum notification; or the node may have * just died, and we are processing its final messages; or a bug * has affected the peer cache. */ time_t now = time(NULL); if (peer->when_lost == 0) { // Track when we first got into this contradictory state peer->when_lost = now; } else if (now > (peer->when_lost + 60)) { // If it persists for more than a minute, update the state crm_warn("Node %u member of group %s but believed offline", member_list[i].nodeid, groupName->value); crm_update_peer_state(__FUNCTION__, peer, CRM_NODE_MEMBER, 0); } } if (local_nodeid == member_list[i].nodeid) { found = TRUE; } } if (!found) { crm_err("We're not part of CPG group '%s' anymore!", groupName->value); cpg_evicted = TRUE; } counter++; } gboolean cluster_connect_cpg(crm_cluster_t *cluster) { - int rc = -1; - int fd = 0; + cs_error_t rc; + int fd = -1; int retries = 0; uint32_t id = 0; crm_node_t *peer = NULL; cpg_handle_t handle = 0; const char *message_name = pcmk_message_name(crm_system_name); + uid_t found_uid = 0; + gid_t found_gid = 0; + pid_t found_pid = 0; + int rv; struct mainloop_fd_callbacks cpg_fd_callbacks = { .dispatch = pcmk_cpg_dispatch, .destroy = cluster->destroy, }; cpg_callbacks_t cpg_callbacks = { .cpg_deliver_fn = cluster->cpg.cpg_deliver_fn, .cpg_confchg_fn = cluster->cpg.cpg_confchg_fn, /* .cpg_deliver_fn = pcmk_cpg_deliver, */ /* .cpg_confchg_fn = pcmk_cpg_membership, */ }; cpg_evicted = FALSE; cluster->group.length = 0; cluster->group.value[0] = 0; /* group.value is char[128] */ strncpy(cluster->group.value, message_name, 127); cluster->group.value[127] = 0; cluster->group.length = 1 + QB_MIN(127, strlen(cluster->group.value)); cs_repeat(retries, 30, rc = cpg_initialize(&handle, &cpg_callbacks)); if (rc != CS_OK) { - crm_err("Could not connect to the Cluster Process Group API: %d", rc); + crm_err("Could not connect to the CPG API: %s (%d)", + cs_strerror(rc), rc); + goto bail; + } + + rc = cpg_fd_get(handle, &fd); + if (rc != CS_OK) { + crm_err("Could not obtain the CPG API connection: %s (%d)", + cs_strerror(rc), rc); + goto bail; + } + + /* CPG provider run as root (in given user namespace, anyway)? */ + if (!(rv = crm_ipc_is_authentic_process(fd, (uid_t) 0,(gid_t) 0, &found_pid, + &found_uid, &found_gid))) { + crm_err("CPG provider is not authentic:" + " process %lld (uid: %lld, gid: %lld)", + (long long) PCMK__SPECIAL_PID_AS_0(found_pid), + (long long) found_uid, (long long) found_gid); + rc = CS_ERR_ACCESS; + goto bail; + } else if (rv < 0) { + crm_err("Could not verify authenticity of CPG provider: %s (%d)", + strerror(-rv), -rv); + rc = CS_ERR_ACCESS; goto bail; } id = get_local_nodeid(handle); if (id == 0) { crm_err("Could not get local node id from the CPG API"); goto bail; } cluster->nodeid = id; retries = 0; cs_repeat(retries, 30, rc = cpg_join(handle, &cluster->group)); if (rc != CS_OK) { crm_err("Could not join the CPG group '%s': %d", message_name, rc); goto bail; } - rc = cpg_fd_get(handle, &fd); - if (rc != CS_OK) { - crm_err("Could not obtain the CPG API connection: %d", rc); - goto bail; - } - pcmk_cpg_handle = handle; cluster->cpg_handle = handle; mainloop_add_fd("corosync-cpg", G_PRIORITY_MEDIUM, fd, cluster, &cpg_fd_callbacks); bail: if (rc != CS_OK) { cpg_finalize(handle); return FALSE; } peer = crm_get_peer(id, NULL); crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cpg, ONLINESTATUS); return TRUE; } gboolean send_cluster_message_cs(xmlNode * msg, gboolean local, crm_node_t * node, enum crm_ais_msg_types dest) { gboolean rc = TRUE; char *data = NULL; data = dump_xml_unformatted(msg); rc = send_cluster_text(crm_class_cluster, data, local, node, dest); free(data); return rc; } gboolean send_cluster_text(enum crm_ais_msg_class msg_class, const char *data, gboolean local, crm_node_t *node, enum crm_ais_msg_types dest) { static int msg_id = 0; static int local_pid = 0; static int local_name_len = 0; static const char *local_name = NULL; char *target = NULL; struct iovec *iov; AIS_Message *msg = NULL; enum crm_ais_msg_types sender = text2msg_type(crm_system_name); switch (msg_class) { case crm_class_cluster: break; default: crm_err("Invalid message class: %d", msg_class); return FALSE; } CRM_CHECK(dest != crm_msg_ais, return FALSE); if(local_name == NULL) { local_name = get_local_node_name(); } if(local_name_len == 0 && local_name) { local_name_len = strlen(local_name); } if (data == NULL) { data = ""; } if (local_pid == 0) { local_pid = getpid(); } if (sender == crm_msg_none) { sender = local_pid; } msg = calloc(1, sizeof(AIS_Message)); msg_id++; msg->id = msg_id; msg->header.id = msg_class; msg->header.error = CS_OK; msg->host.type = dest; msg->host.local = local; if (node) { if (node->uname) { target = strdup(node->uname); msg->host.size = strlen(node->uname); memset(msg->host.uname, 0, MAX_NAME); memcpy(msg->host.uname, node->uname, msg->host.size); } else { target = crm_strdup_printf("%u", node->id); } msg->host.id = node->id; } else { target = strdup("all"); } msg->sender.id = 0; msg->sender.type = sender; msg->sender.pid = local_pid; msg->sender.size = local_name_len; memset(msg->sender.uname, 0, MAX_NAME); if(local_name && msg->sender.size) { memcpy(msg->sender.uname, local_name, msg->sender.size); } msg->size = 1 + strlen(data); msg->header.size = sizeof(AIS_Message) + msg->size; if (msg->size < CRM_BZ2_THRESHOLD) { msg = realloc_safe(msg, msg->header.size); memcpy(msg->data, data, msg->size); } else { char *compressed = NULL; unsigned int new_size = 0; char *uncompressed = strdup(data); if (crm_compress_string(uncompressed, msg->size, 0, &compressed, &new_size)) { msg->header.size = sizeof(AIS_Message) + new_size; msg = realloc_safe(msg, msg->header.size); memcpy(msg->data, compressed, new_size); msg->is_compressed = TRUE; msg->compressed_size = new_size; } else { msg = realloc_safe(msg, msg->header.size); memcpy(msg->data, data, msg->size); } free(uncompressed); free(compressed); } iov = calloc(1, sizeof(struct iovec)); iov->iov_base = msg; iov->iov_len = msg->header.size; if (msg->compressed_size) { crm_trace("Queueing CPG message %u to %s (%llu bytes, %d bytes compressed payload): %.200s", msg->id, target, (unsigned long long) iov->iov_len, msg->compressed_size, data); } else { crm_trace("Queueing CPG message %u to %s (%llu bytes, %d bytes payload): %.200s", msg->id, target, (unsigned long long) iov->iov_len, msg->size, data); } free(target); send_cpg_iov(iov); return TRUE; } enum crm_ais_msg_types text2msg_type(const char *text) { int type = crm_msg_none; CRM_CHECK(text != NULL, return type); text = pcmk_message_name(text); if (safe_str_eq(text, "ais")) { type = crm_msg_ais; } else if (safe_str_eq(text, CRM_SYSTEM_CIB)) { type = crm_msg_cib; } else if (safe_str_eq(text, CRM_SYSTEM_CRMD) || safe_str_eq(text, CRM_SYSTEM_DC)) { type = crm_msg_crmd; } else if (safe_str_eq(text, CRM_SYSTEM_TENGINE)) { type = crm_msg_te; } else if (safe_str_eq(text, CRM_SYSTEM_PENGINE)) { type = crm_msg_pe; } else if (safe_str_eq(text, CRM_SYSTEM_LRMD)) { type = crm_msg_lrmd; } else if (safe_str_eq(text, CRM_SYSTEM_STONITHD)) { type = crm_msg_stonithd; } else if (safe_str_eq(text, "stonith-ng")) { type = crm_msg_stonith_ng; } else if (safe_str_eq(text, "attrd")) { type = crm_msg_attrd; } else { /* This will normally be a transient client rather than * a cluster daemon. Set the type to the pid of the client */ int scan_rc = sscanf(text, "%d", &type); if (scan_rc != 1 || type <= crm_msg_stonith_ng) { /* Ensure it's sane */ type = crm_msg_none; } } return type; } diff --git a/lib/common/ipc.c b/lib/common/ipc.c index aa055d366e..84d7cb93fa 100644 --- a/lib/common/ipc.c +++ b/lib/common/ipc.c @@ -1,1566 +1,1605 @@ /* * Copyright 2004-2019 the Pacemaker project contributors * * The version control history for this file may have further details. * * This source code is licensed under the GNU Lesser General Public License * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY. */ #include #if defined(US_AUTH_PEERCRED_UCRED) || defined(US_AUTH_PEERCRED_SOCKPEERCRED) # ifdef US_AUTH_PEERCRED_UCRED # ifndef _GNU_SOURCE # define _GNU_SOURCE # endif # endif # include #elif defined(US_AUTH_GETPEERUCRED) # include #endif #include #include #include #include #include #include #include #include #include #include /* indirectly: pcmk_err_generic */ #include #include #include #include /* PCMK__SPECIAL_PID* */ #define PCMK_IPC_VERSION 1 /* Evict clients whose event queue grows this large (by default) */ #define PCMK_IPC_DEFAULT_QUEUE_MAX 500 struct crm_ipc_response_header { struct qb_ipc_response_header qb; uint32_t size_uncompressed; uint32_t size_compressed; uint32_t flags; uint8_t version; /* Protect against version changes for anyone that might bother to statically link us */ }; static int hdr_offset = 0; static unsigned int ipc_buffer_max = 0; static unsigned int pick_ipc_buffer(unsigned int max); static inline void crm_ipc_init(void) { if (hdr_offset == 0) { hdr_offset = sizeof(struct crm_ipc_response_header); } if (ipc_buffer_max == 0) { ipc_buffer_max = pick_ipc_buffer(0); } } unsigned int crm_ipc_default_buffer_size(void) { return pick_ipc_buffer(0); } static char * generateReference(const char *custom1, const char *custom2) { static uint ref_counter = 0; return crm_strdup_printf("%s-%s-%lu-%u", (custom1? custom1 : "_empty_"), (custom2? custom2 : "_empty_"), (unsigned long)time(NULL), ref_counter++); } xmlNode * create_request_adv(const char *task, xmlNode * msg_data, const char *host_to, const char *sys_to, const char *sys_from, const char *uuid_from, const char *origin) { char *true_from = NULL; xmlNode *request = NULL; char *reference = generateReference(task, sys_from); if (uuid_from != NULL) { true_from = generate_hash_key(sys_from, uuid_from); } else if (sys_from != NULL) { true_from = strdup(sys_from); } else { crm_err("No sys from specified"); } // host_from will get set for us if necessary by the controller when routed request = create_xml_node(NULL, __FUNCTION__); crm_xml_add(request, F_CRM_ORIGIN, origin); crm_xml_add(request, F_TYPE, T_CRM); crm_xml_add(request, F_CRM_VERSION, CRM_FEATURE_SET); crm_xml_add(request, F_CRM_MSG_TYPE, XML_ATTR_REQUEST); crm_xml_add(request, F_CRM_REFERENCE, reference); crm_xml_add(request, F_CRM_TASK, task); crm_xml_add(request, F_CRM_SYS_TO, sys_to); crm_xml_add(request, F_CRM_SYS_FROM, true_from); /* HOSTTO will be ignored if it is to the DC anyway. */ if (host_to != NULL && strlen(host_to) > 0) { crm_xml_add(request, F_CRM_HOST_TO, host_to); } if (msg_data != NULL) { add_message_xml(request, F_CRM_DATA, msg_data); } free(reference); free(true_from); return request; } /* * This method adds a copy of xml_response_data */ xmlNode * create_reply_adv(xmlNode * original_request, xmlNode * xml_response_data, const char *origin) { xmlNode *reply = NULL; const char *host_from = crm_element_value(original_request, F_CRM_HOST_FROM); const char *sys_from = crm_element_value(original_request, F_CRM_SYS_FROM); const char *sys_to = crm_element_value(original_request, F_CRM_SYS_TO); const char *type = crm_element_value(original_request, F_CRM_MSG_TYPE); const char *operation = crm_element_value(original_request, F_CRM_TASK); const char *crm_msg_reference = crm_element_value(original_request, F_CRM_REFERENCE); if (type == NULL) { crm_err("Cannot create new_message, no message type in original message"); CRM_ASSERT(type != NULL); return NULL; #if 0 } else if (strcasecmp(XML_ATTR_REQUEST, type) != 0) { crm_err("Cannot create new_message, original message was not a request"); return NULL; #endif } reply = create_xml_node(NULL, __FUNCTION__); if (reply == NULL) { crm_err("Cannot create new_message, malloc failed"); return NULL; } crm_xml_add(reply, F_CRM_ORIGIN, origin); crm_xml_add(reply, F_TYPE, T_CRM); crm_xml_add(reply, F_CRM_VERSION, CRM_FEATURE_SET); crm_xml_add(reply, F_CRM_MSG_TYPE, XML_ATTR_RESPONSE); crm_xml_add(reply, F_CRM_REFERENCE, crm_msg_reference); crm_xml_add(reply, F_CRM_TASK, operation); /* since this is a reply, we reverse the from and to */ crm_xml_add(reply, F_CRM_SYS_TO, sys_from); crm_xml_add(reply, F_CRM_SYS_FROM, sys_to); /* HOSTTO will be ignored if it is to the DC anyway. */ if (host_from != NULL && strlen(host_from) > 0) { crm_xml_add(reply, F_CRM_HOST_TO, host_from); } if (xml_response_data != NULL) { add_message_xml(reply, F_CRM_DATA, xml_response_data); } return reply; } /* Libqb based IPC */ /* Server... */ GHashTable *client_connections = NULL; crm_client_t * crm_client_get(qb_ipcs_connection_t * c) { if (client_connections) { return g_hash_table_lookup(client_connections, c); } crm_trace("No client found for %p", c); return NULL; } crm_client_t * crm_client_get_by_id(const char *id) { gpointer key; crm_client_t *client; GHashTableIter iter; if (client_connections && id) { g_hash_table_iter_init(&iter, client_connections); while (g_hash_table_iter_next(&iter, &key, (gpointer *) & client)) { if (strcmp(client->id, id) == 0) { return client; } } } crm_trace("No client found with id=%s", id); return NULL; } const char * crm_client_name(crm_client_t * c) { if (c == NULL) { return "null"; } else if (c->name == NULL && c->id == NULL) { return "unknown"; } else if (c->name == NULL) { return c->id; } else { return c->name; } } const char * crm_client_type_text(enum client_type client_type) { switch (client_type) { case CRM_CLIENT_IPC: return "IPC"; case CRM_CLIENT_TCP: return "TCP"; #ifdef HAVE_GNUTLS_GNUTLS_H case CRM_CLIENT_TLS: return "TLS"; #endif default: return "unknown"; } } void crm_client_init(void) { if (client_connections == NULL) { crm_trace("Creating client hash table"); client_connections = g_hash_table_new(g_direct_hash, g_direct_equal); } } void crm_client_cleanup(void) { if (client_connections != NULL) { int active = g_hash_table_size(client_connections); if (active) { crm_err("Exiting with %d active connections", active); } g_hash_table_destroy(client_connections); client_connections = NULL; } } void crm_client_disconnect_all(qb_ipcs_service_t *service) { qb_ipcs_connection_t *c = NULL; if (service == NULL) { return; } c = qb_ipcs_connection_first_get(service); while (c != NULL) { qb_ipcs_connection_t *last = c; c = qb_ipcs_connection_next_get(service, last); /* There really shouldn't be anyone connected at this point */ crm_notice("Disconnecting client %p, pid=%d...", last, crm_ipcs_client_pid(last)); qb_ipcs_disconnect(last); qb_ipcs_connection_unref(last); } } /*! * \internal * \brief Allocate a new crm_client_t object based on an IPC connection * * \param[in] c IPC connection (or NULL to allocate generic client) * \param[in] key Connection table key (or NULL to use sane default) * \param[in] uid_client UID corresponding to c (ignored if c is NULL) * * \return Pointer to new crm_client_t (or NULL on error) */ static crm_client_t * client_from_connection(qb_ipcs_connection_t *c, void *key, uid_t uid_client) { crm_client_t *client = calloc(1, sizeof(crm_client_t)); if (client == NULL) { crm_perror(LOG_ERR, "Allocating client"); return NULL; } if (c) { #if ENABLE_ACL client->user = uid2username(uid_client); if (client->user == NULL) { client->user = strdup("#unprivileged"); CRM_CHECK(client->user != NULL, free(client); return NULL); crm_err("Unable to enforce ACLs for user ID %d, assuming unprivileged", uid_client); } #endif client->ipcs = c; client->kind = CRM_CLIENT_IPC; client->pid = crm_ipcs_client_pid(c); if (key == NULL) { key = c; } } client->id = crm_generate_uuid(); if (client->id == NULL) { crm_err("Could not generate UUID for client"); free(client->user); free(client); return NULL; } if (key == NULL) { key = client->id; } g_hash_table_insert(client_connections, key, client); return client; } /*! * \brief Allocate a new crm_client_t object and generate its ID * * \param[in] key What to use as connections hash table key (NULL to use ID) * * \return Pointer to new crm_client_t (asserts on failure) */ crm_client_t * crm_client_alloc(void *key) { crm_client_t *client = client_from_connection(NULL, key, 0); CRM_ASSERT(client != NULL); return client; } crm_client_t * crm_client_new(qb_ipcs_connection_t * c, uid_t uid_client, gid_t gid_client) { static gid_t uid_cluster = 0; static gid_t gid_cluster = 0; crm_client_t *client = NULL; CRM_CHECK(c != NULL, return NULL); if (uid_cluster == 0) { if (crm_user_lookup(CRM_DAEMON_USER, &uid_cluster, &gid_cluster) < 0) { static bool need_log = TRUE; if (need_log) { crm_warn("Could not find user and group IDs for user %s", CRM_DAEMON_USER); need_log = FALSE; } } } if (uid_client != 0) { crm_trace("Giving access to group %u", gid_cluster); /* Passing -1 to chown(2) means don't change */ qb_ipcs_connection_auth_set(c, -1, gid_cluster, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP); } crm_client_init(); /* TODO: Do our own auth checking, return NULL if unauthorized */ client = client_from_connection(c, NULL, uid_client); if (client == NULL) { return NULL; } if ((uid_client == 0) || (uid_client == uid_cluster)) { /* Remember when a connection came from root or hacluster */ set_bit(client->flags, crm_client_flag_ipc_privileged); } crm_debug("Connecting %p for uid=%d gid=%d pid=%u id=%s", c, uid_client, gid_client, client->pid, client->id); return client; } static struct iovec * pcmk__new_ipc_event() { struct iovec *iov = calloc(2, sizeof(struct iovec)); CRM_ASSERT(iov != NULL); return iov; } /*! * \brief Free an I/O vector created by crm_ipc_prepare() * * \param[in] event I/O vector to free */ void pcmk_free_ipc_event(struct iovec *event) { if (event != NULL) { free(event[0].iov_base); free(event[1].iov_base); free(event); } } static void free_event(gpointer data) { pcmk_free_ipc_event((struct iovec *) data); } static void add_event(crm_client_t *c, struct iovec *iov) { if (c->event_queue == NULL) { c->event_queue = g_queue_new(); } g_queue_push_tail(c->event_queue, iov); } void crm_client_destroy(crm_client_t * c) { if (c == NULL) { return; } if (client_connections) { if (c->ipcs) { crm_trace("Destroying %p/%p (%d remaining)", c, c->ipcs, crm_hash_table_size(client_connections) - 1); g_hash_table_remove(client_connections, c->ipcs); } else { crm_trace("Destroying remote connection %p (%d remaining)", c, crm_hash_table_size(client_connections) - 1); g_hash_table_remove(client_connections, c->id); } } if (c->event_timer) { g_source_remove(c->event_timer); } if (c->event_queue) { crm_debug("Destroying %d events", g_queue_get_length(c->event_queue)); g_queue_free_full(c->event_queue, free_event); } free(c->id); free(c->name); free(c->user); if (c->remote) { if (c->remote->auth_timeout) { g_source_remove(c->remote->auth_timeout); } free(c->remote->buffer); free(c->remote); } free(c); } /*! * \brief Raise IPC eviction threshold for a client, if allowed * * \param[in,out] client Client to modify * \param[in] queue_max New threshold (as string) * * \return TRUE if change was allowed, FALSE otherwise */ bool crm_set_client_queue_max(crm_client_t *client, const char *qmax) { if (is_set(client->flags, crm_client_flag_ipc_privileged)) { int qmax_int = crm_int_helper(qmax, NULL); if ((errno == 0) && (qmax_int > 0)) { client->queue_max = qmax_int; return TRUE; } } return FALSE; } int crm_ipcs_client_pid(qb_ipcs_connection_t * c) { struct qb_ipcs_connection_stats stats; stats.client_pid = 0; qb_ipcs_connection_stats_get(c, &stats, 0); return stats.client_pid; } xmlNode * crm_ipcs_recv(crm_client_t * c, void *data, size_t size, uint32_t * id, uint32_t * flags) { xmlNode *xml = NULL; char *uncompressed = NULL; char *text = ((char *)data) + sizeof(struct crm_ipc_response_header); struct crm_ipc_response_header *header = data; if (id) { *id = ((struct qb_ipc_response_header *)data)->id; } if (flags) { *flags = header->flags; } if (is_set(header->flags, crm_ipc_proxied)) { /* Mark this client as being the endpoint of a proxy connection. * Proxy connections responses are sent on the event channel, to avoid * blocking the controller serving as proxy. */ c->flags |= crm_client_flag_ipc_proxied; } if(header->version > PCMK_IPC_VERSION) { crm_err("Filtering incompatible v%d IPC message, we only support versions <= %d", header->version, PCMK_IPC_VERSION); return NULL; } if (header->size_compressed) { int rc = 0; unsigned int size_u = 1 + header->size_uncompressed; uncompressed = calloc(1, size_u); crm_trace("Decompressing message data %u bytes into %u bytes", header->size_compressed, size_u); rc = BZ2_bzBuffToBuffDecompress(uncompressed, &size_u, text, header->size_compressed, 1, 0); text = uncompressed; if (rc != BZ_OK) { crm_err("Decompression failed: %s " CRM_XS " bzerror=%d", bz2_strerror(rc), rc); free(uncompressed); return NULL; } } CRM_ASSERT(text[header->size_uncompressed - 1] == 0); crm_trace("Received %.200s", text); xml = string2xml(text); free(uncompressed); return xml; } ssize_t crm_ipcs_flush_events(crm_client_t * c); static gboolean crm_ipcs_flush_events_cb(gpointer data) { crm_client_t *c = data; c->event_timer = 0; crm_ipcs_flush_events(c); return FALSE; } /*! * \internal * \brief Add progressive delay before next event queue flush * * \param[in,out] c Client connection to add delay to * \param[in] queue_len Current event queue length */ static inline void delay_next_flush(crm_client_t *c, unsigned int queue_len) { /* Delay a maximum of 1.5 seconds */ guint delay = (queue_len < 5)? (1000 + 100 * queue_len) : 1500; c->event_timer = g_timeout_add(delay, crm_ipcs_flush_events_cb, c); } ssize_t crm_ipcs_flush_events(crm_client_t * c) { ssize_t rc = 0; unsigned int sent = 0; unsigned int queue_len = 0; if (c == NULL) { return pcmk_ok; } else if (c->event_timer) { /* There is already a timer, wait until it goes off */ crm_trace("Timer active for %p - %d", c->ipcs, c->event_timer); return pcmk_ok; } if (c->event_queue) { queue_len = g_queue_get_length(c->event_queue); } while (sent < 100) { struct crm_ipc_response_header *header = NULL; struct iovec *event = NULL; if (c->event_queue) { // We don't pop unless send is successful event = g_queue_peek_head(c->event_queue); } if (event == NULL) { // Queue is empty break; } rc = qb_ipcs_event_sendv(c->ipcs, event, 2); if (rc < 0) { break; } event = g_queue_pop_head(c->event_queue); sent++; header = event[0].iov_base; if (header->size_compressed) { crm_trace("Event %d to %p[%d] (%lld compressed bytes) sent", header->qb.id, c->ipcs, c->pid, (long long) rc); } else { crm_trace("Event %d to %p[%d] (%lld bytes) sent: %.120s", header->qb.id, c->ipcs, c->pid, (long long) rc, (char *) (event[1].iov_base)); } pcmk_free_ipc_event(event); } queue_len -= sent; if (sent > 0 || queue_len) { crm_trace("Sent %d events (%d remaining) for %p[%d]: %s (%lld)", sent, queue_len, c->ipcs, c->pid, pcmk_strerror(rc < 0 ? rc : 0), (long long) rc); } if (queue_len) { /* Allow clients to briefly fall behind on processing incoming messages, * but drop completely unresponsive clients so the connection doesn't * consume resources indefinitely. */ if (queue_len > QB_MAX(c->queue_max, PCMK_IPC_DEFAULT_QUEUE_MAX)) { if ((c->queue_backlog <= 1) || (queue_len < c->queue_backlog)) { /* Don't evict for a new or shrinking backlog */ crm_warn("Client with process ID %u has a backlog of %u messages " CRM_XS " %p", c->pid, queue_len, c->ipcs); } else { crm_err("Evicting client with process ID %u due to backlog of %u messages " CRM_XS " %p", c->pid, queue_len, c->ipcs); c->queue_backlog = 0; qb_ipcs_disconnect(c->ipcs); return rc; } } c->queue_backlog = queue_len; delay_next_flush(c, queue_len); } else { /* Event queue is empty, there is no backlog */ c->queue_backlog = 0; } return rc; } ssize_t crm_ipc_prepare(uint32_t request, xmlNode * message, struct iovec ** result, uint32_t max_send_size) { static unsigned int biggest = 0; struct iovec *iov; unsigned int total = 0; char *compressed = NULL; char *buffer = dump_xml_unformatted(message); struct crm_ipc_response_header *header = calloc(1, sizeof(struct crm_ipc_response_header)); CRM_ASSERT(result != NULL); crm_ipc_init(); if (max_send_size == 0) { max_send_size = ipc_buffer_max; } CRM_LOG_ASSERT(max_send_size != 0); *result = NULL; iov = pcmk__new_ipc_event(); iov[0].iov_len = hdr_offset; iov[0].iov_base = header; header->version = PCMK_IPC_VERSION; header->size_uncompressed = 1 + strlen(buffer); total = iov[0].iov_len + header->size_uncompressed; if (total < max_send_size) { iov[1].iov_base = buffer; iov[1].iov_len = header->size_uncompressed; } else { unsigned int new_size = 0; if (crm_compress_string (buffer, header->size_uncompressed, max_send_size, &compressed, &new_size)) { header->flags |= crm_ipc_compressed; header->size_compressed = new_size; iov[1].iov_len = header->size_compressed; iov[1].iov_base = compressed; free(buffer); biggest = QB_MAX(header->size_compressed, biggest); } else { ssize_t rc = -EMSGSIZE; crm_log_xml_trace(message, "EMSGSIZE"); biggest = QB_MAX(header->size_uncompressed, biggest); crm_err ("Could not compress the message (%u bytes) into less than the configured ipc limit (%u bytes). " "Set PCMK_ipc_buffer to a higher value (%u bytes suggested)", header->size_uncompressed, max_send_size, 4 * biggest); free(compressed); pcmk_free_ipc_event(iov); return rc; } } header->qb.size = iov[0].iov_len + iov[1].iov_len; header->qb.id = (int32_t)request; /* Replying to a specific request */ *result = iov; CRM_ASSERT(header->qb.size > 0); return header->qb.size; } ssize_t crm_ipcs_sendv(crm_client_t * c, struct iovec * iov, enum crm_ipc_flags flags) { ssize_t rc; static uint32_t id = 1; struct crm_ipc_response_header *header = iov[0].iov_base; if (c->flags & crm_client_flag_ipc_proxied) { /* _ALL_ replies to proxied connections need to be sent as events */ if (is_not_set(flags, crm_ipc_server_event)) { flags |= crm_ipc_server_event; /* this flag lets us know this was originally meant to be a response. * even though we're sending it over the event channel. */ flags |= crm_ipc_proxied_relay_response; } } header->flags |= flags; if (flags & crm_ipc_server_event) { header->qb.id = id++; /* We don't really use it, but doesn't hurt to set one */ if (flags & crm_ipc_server_free) { crm_trace("Sending the original to %p[%d]", c->ipcs, c->pid); add_event(c, iov); } else { struct iovec *iov_copy = pcmk__new_ipc_event(); crm_trace("Sending a copy to %p[%d]", c->ipcs, c->pid); iov_copy[0].iov_len = iov[0].iov_len; iov_copy[0].iov_base = malloc(iov[0].iov_len); memcpy(iov_copy[0].iov_base, iov[0].iov_base, iov[0].iov_len); iov_copy[1].iov_len = iov[1].iov_len; iov_copy[1].iov_base = malloc(iov[1].iov_len); memcpy(iov_copy[1].iov_base, iov[1].iov_base, iov[1].iov_len); add_event(c, iov_copy); } } else { CRM_LOG_ASSERT(header->qb.id != 0); /* Replying to a specific request */ rc = qb_ipcs_response_sendv(c->ipcs, iov, 2); if (rc < header->qb.size) { crm_notice("Response %d to pid %d failed: %s " CRM_XS " bytes=%u rc=%lld ipcs=%p", header->qb.id, c->pid, pcmk_strerror(rc), header->qb.size, (long long) rc, c->ipcs); } else { crm_trace("Response %d sent, %lld bytes to %p[%d]", header->qb.id, (long long) rc, c->ipcs, c->pid); } if (flags & crm_ipc_server_free) { pcmk_free_ipc_event(iov); } } if (flags & crm_ipc_server_event) { rc = crm_ipcs_flush_events(c); } else { crm_ipcs_flush_events(c); } if (rc == -EPIPE || rc == -ENOTCONN) { crm_trace("Client %p disconnected", c->ipcs); } return rc; } ssize_t crm_ipcs_send(crm_client_t * c, uint32_t request, xmlNode * message, enum crm_ipc_flags flags) { struct iovec *iov = NULL; ssize_t rc = 0; if(c == NULL) { return -EDESTADDRREQ; } crm_ipc_init(); rc = crm_ipc_prepare(request, message, &iov, ipc_buffer_max); if (rc > 0) { rc = crm_ipcs_sendv(c, iov, flags | crm_ipc_server_free); } else { pcmk_free_ipc_event(iov); crm_notice("Message to pid %d failed: %s " CRM_XS " rc=%lld ipcs=%p", c->pid, pcmk_strerror(rc), (long long) rc, c->ipcs); } return rc; } void crm_ipcs_send_ack(crm_client_t * c, uint32_t request, uint32_t flags, const char *tag, const char *function, int line) { if (flags & crm_ipc_client_response) { xmlNode *ack = create_xml_node(NULL, tag); crm_trace("Ack'ing msg from %s (%p)", crm_client_name(c), c); c->request_id = 0; crm_xml_add(ack, "function", function); crm_xml_add_int(ack, "line", line); crm_ipcs_send(c, request, ack, flags); free_xml(ack); } } /* Client... */ #define MIN_MSG_SIZE 12336 /* sizeof(struct qb_ipc_connection_response) */ #define MAX_MSG_SIZE 128*1024 /* 128k default */ struct crm_ipc_s { struct pollfd pfd; /* the max size we can send/receive over ipc */ unsigned int max_buf_size; /* Size of the allocated 'buffer' */ unsigned int buf_size; int msg_size; int need_reply; char *buffer; char *name; qb_ipcc_connection_t *ipc; }; static unsigned int pick_ipc_buffer(unsigned int max) { static unsigned int global_max = 0; if (global_max == 0) { const char *env = getenv("PCMK_ipc_buffer"); if (env) { int env_max = crm_parse_int(env, "0"); global_max = (env_max > 0)? QB_MAX(MIN_MSG_SIZE, env_max) : MAX_MSG_SIZE; } else { global_max = MAX_MSG_SIZE; } } return QB_MAX(max, global_max); } crm_ipc_t * crm_ipc_new(const char *name, size_t max_size) { crm_ipc_t *client = NULL; client = calloc(1, sizeof(crm_ipc_t)); client->name = strdup(name); client->buf_size = pick_ipc_buffer(max_size); client->buffer = malloc(client->buf_size); /* Clients initiating connection pick the max buf size */ client->max_buf_size = client->buf_size; client->pfd.fd = -1; client->pfd.events = POLLIN; client->pfd.revents = 0; return client; } /*! * \brief Establish an IPC connection to a Pacemaker component * * \param[in] client Connection instance obtained from crm_ipc_new() * - * \return TRUE on success, FALSE otherwise (in which case errno will be set) + * \return TRUE on success, FALSE otherwise (in which case errno will be set; + * specifically, in case of discovering the remote side is not + * authentic, its value is set to ECONNABORTED). */ bool crm_ipc_connect(crm_ipc_t * client) { + static uid_t cl_uid = 0; + static gid_t cl_gid = 0; + pid_t found_pid = 0; uid_t found_uid = 0; gid_t found_gid = 0; + int rv; + client->need_reply = FALSE; client->ipc = qb_ipcc_connect(client->name, client->buf_size); if (client->ipc == NULL) { crm_debug("Could not establish %s connection: %s (%d)", client->name, pcmk_strerror(errno), errno); return FALSE; } client->pfd.fd = crm_ipc_get_fd(client); if (client->pfd.fd < 0) { - crm_debug("Could not obtain file descriptor for %s connection: %s (%d)", client->name, pcmk_strerror(errno), errno); + rv = errno; + /* message already omitted */ + crm_ipc_close(client); + errno = rv; + return FALSE; + } + + if (!cl_uid && !cl_gid + && (rv = crm_user_lookup(CRM_DAEMON_USER, &cl_uid, &cl_gid)) < 0) { + errno = -rv; + /* message already omitted */ + crm_ipc_close(client); + errno = -rv; + return FALSE; + } + + if (!(rv = crm_ipc_is_authentic_process(client->pfd.fd, cl_uid, cl_gid, + &found_pid, &found_uid, + &found_gid))) { + crm_err("Daemon (IPC %s) is not authentic:" + " process %lld (uid: %lld, gid: %lld)", + client->name, (long long) PCMK__SPECIAL_PID_AS_0(found_pid), + (long long) found_uid, (long long) found_gid); + crm_ipc_close(client); + errno = ECONNABORTED; + return FALSE; + + } else if (rv < 0) { + errno = -rv; + crm_perror(LOG_ERR, "Could not verify authenticity of daemon (IPC %s)", + client->name); + crm_ipc_close(client); + errno = -rv; return FALSE; } qb_ipcc_context_set(client->ipc, client); #ifdef HAVE_IPCS_GET_BUFFER_SIZE client->max_buf_size = qb_ipcc_get_buffer_size(client->ipc); if (client->max_buf_size > client->buf_size) { free(client->buffer); client->buffer = calloc(1, client->max_buf_size); client->buf_size = client->max_buf_size; } #endif return TRUE; } void crm_ipc_close(crm_ipc_t * client) { if (client) { crm_trace("Disconnecting %s IPC connection %p (%p)", client->name, client, client->ipc); if (client->ipc) { qb_ipcc_connection_t *ipc = client->ipc; client->ipc = NULL; qb_ipcc_disconnect(ipc); } } } void crm_ipc_destroy(crm_ipc_t * client) { if (client) { if (client->ipc && qb_ipcc_is_connected(client->ipc)) { crm_notice("Destroying an active IPC connection to %s", client->name); /* The next line is basically unsafe * * If this connection was attached to mainloop and mainloop is active, * the 'disconnected' callback will end up back here and we'll end * up free'ing the memory twice - something that can still happen * even without this if we destroy a connection and it closes before * we call exit */ /* crm_ipc_close(client); */ } crm_trace("Destroying IPC connection to %s: %p", client->name, client); free(client->buffer); free(client->name); free(client); } } int crm_ipc_get_fd(crm_ipc_t * client) { int fd = 0; if (client && client->ipc && (qb_ipcc_fd_get(client->ipc, &fd) == 0)) { return fd; } errno = EINVAL; crm_perror(LOG_ERR, "Could not obtain file IPC descriptor for %s", (client? client->name : "unspecified client")); return -errno; } bool crm_ipc_connected(crm_ipc_t * client) { bool rc = FALSE; if (client == NULL) { crm_trace("No client"); return FALSE; } else if (client->ipc == NULL) { crm_trace("No connection"); return FALSE; } else if (client->pfd.fd < 0) { crm_trace("Bad descriptor"); return FALSE; } rc = qb_ipcc_is_connected(client->ipc); if (rc == FALSE) { client->pfd.fd = -EINVAL; } return rc; } /*! * \brief Check whether an IPC connection is ready to be read * * \param[in] client Connection to check * * \return Positive value if ready to be read, 0 if not ready, -errno on error */ int crm_ipc_ready(crm_ipc_t *client) { int rc; CRM_ASSERT(client != NULL); if (crm_ipc_connected(client) == FALSE) { return -ENOTCONN; } client->pfd.revents = 0; rc = poll(&(client->pfd), 1, 0); return (rc < 0)? -errno : rc; } static int crm_ipc_decompress(crm_ipc_t * client) { struct crm_ipc_response_header *header = (struct crm_ipc_response_header *)(void*)client->buffer; if (header->size_compressed) { int rc = 0; unsigned int size_u = 1 + header->size_uncompressed; /* never let buf size fall below our max size required for ipc reads. */ unsigned int new_buf_size = QB_MAX((hdr_offset + size_u), client->max_buf_size); char *uncompressed = calloc(1, new_buf_size); crm_trace("Decompressing message data %u bytes into %u bytes", header->size_compressed, size_u); rc = BZ2_bzBuffToBuffDecompress(uncompressed + hdr_offset, &size_u, client->buffer + hdr_offset, header->size_compressed, 1, 0); if (rc != BZ_OK) { crm_err("Decompression failed: %s " CRM_XS " bzerror=%d", bz2_strerror(rc), rc); free(uncompressed); return -EILSEQ; } /* * This assert no longer holds true. For an identical msg, some clients may * require compression, and others may not. If that same msg (event) is sent * to multiple clients, it could result in some clients receiving a compressed * msg even though compression was not explicitly required for them. * * CRM_ASSERT((header->size_uncompressed + hdr_offset) >= ipc_buffer_max); */ CRM_ASSERT(size_u == header->size_uncompressed); memcpy(uncompressed, client->buffer, hdr_offset); /* Preserve the header */ header = (struct crm_ipc_response_header *)(void*)uncompressed; free(client->buffer); client->buf_size = new_buf_size; client->buffer = uncompressed; } CRM_ASSERT(client->buffer[hdr_offset + header->size_uncompressed - 1] == 0); return pcmk_ok; } long crm_ipc_read(crm_ipc_t * client) { struct crm_ipc_response_header *header = NULL; CRM_ASSERT(client != NULL); CRM_ASSERT(client->ipc != NULL); CRM_ASSERT(client->buffer != NULL); crm_ipc_init(); client->buffer[0] = 0; client->msg_size = qb_ipcc_event_recv(client->ipc, client->buffer, client->buf_size, 0); if (client->msg_size >= 0) { int rc = crm_ipc_decompress(client); if (rc != pcmk_ok) { return rc; } header = (struct crm_ipc_response_header *)(void*)client->buffer; if(header->version > PCMK_IPC_VERSION) { crm_err("Filtering incompatible v%d IPC message, we only support versions <= %d", header->version, PCMK_IPC_VERSION); return -EBADMSG; } crm_trace("Received %s event %d, size=%u, rc=%d, text: %.100s", client->name, header->qb.id, header->qb.size, client->msg_size, client->buffer + hdr_offset); } else { crm_trace("No message from %s received: %s", client->name, pcmk_strerror(client->msg_size)); } if (crm_ipc_connected(client) == FALSE || client->msg_size == -ENOTCONN) { crm_err("Connection to %s failed", client->name); } if (header) { /* Data excluding the header */ return header->size_uncompressed; } return -ENOMSG; } const char * crm_ipc_buffer(crm_ipc_t * client) { CRM_ASSERT(client != NULL); return client->buffer + sizeof(struct crm_ipc_response_header); } uint32_t crm_ipc_buffer_flags(crm_ipc_t * client) { struct crm_ipc_response_header *header = NULL; CRM_ASSERT(client != NULL); if (client->buffer == NULL) { return 0; } header = (struct crm_ipc_response_header *)(void*)client->buffer; return header->flags; } const char * crm_ipc_name(crm_ipc_t * client) { CRM_ASSERT(client != NULL); return client->name; } static int internal_ipc_send_recv(crm_ipc_t * client, const void *iov) { int rc = 0; do { rc = qb_ipcc_sendv_recv(client->ipc, iov, 2, client->buffer, client->buf_size, -1); } while (rc == -EAGAIN && crm_ipc_connected(client)); return rc; } static int internal_ipc_send_request(crm_ipc_t * client, const void *iov, int ms_timeout) { int rc = 0; time_t timeout = time(NULL) + 1 + (ms_timeout / 1000); do { rc = qb_ipcc_sendv(client->ipc, iov, 2); } while (rc == -EAGAIN && time(NULL) < timeout && crm_ipc_connected(client)); return rc; } static int internal_ipc_get_reply(crm_ipc_t * client, int request_id, int ms_timeout) { time_t timeout = time(NULL) + 1 + (ms_timeout / 1000); int rc = 0; crm_ipc_init(); /* get the reply */ crm_trace("client %s waiting on reply to msg id %d", client->name, request_id); do { rc = qb_ipcc_recv(client->ipc, client->buffer, client->buf_size, 1000); if (rc > 0) { struct crm_ipc_response_header *hdr = NULL; int rc = crm_ipc_decompress(client); if (rc != pcmk_ok) { return rc; } hdr = (struct crm_ipc_response_header *)(void*)client->buffer; if (hdr->qb.id == request_id) { /* Got it */ break; } else if (hdr->qb.id < request_id) { xmlNode *bad = string2xml(crm_ipc_buffer(client)); crm_err("Discarding old reply %d (need %d)", hdr->qb.id, request_id); crm_log_xml_notice(bad, "OldIpcReply"); } else { xmlNode *bad = string2xml(crm_ipc_buffer(client)); crm_err("Discarding newer reply %d (need %d)", hdr->qb.id, request_id); crm_log_xml_notice(bad, "ImpossibleReply"); CRM_ASSERT(hdr->qb.id <= request_id); } } else if (crm_ipc_connected(client) == FALSE) { crm_err("Server disconnected client %s while waiting for msg id %d", client->name, request_id); break; } } while (time(NULL) < timeout); return rc; } int crm_ipc_send(crm_ipc_t * client, xmlNode * message, enum crm_ipc_flags flags, int32_t ms_timeout, xmlNode ** reply) { long rc = 0; struct iovec *iov; static uint32_t id = 0; static int factor = 8; struct crm_ipc_response_header *header; crm_ipc_init(); if (client == NULL) { crm_notice("Invalid connection"); return -ENOTCONN; } else if (crm_ipc_connected(client) == FALSE) { /* Don't even bother */ crm_notice("Connection to %s closed", client->name); return -ENOTCONN; } if (ms_timeout == 0) { ms_timeout = 5000; } if (client->need_reply) { crm_trace("Trying again to obtain pending reply from %s", client->name); rc = qb_ipcc_recv(client->ipc, client->buffer, client->buf_size, ms_timeout); if (rc < 0) { crm_warn("Sending to %s (%p) is disabled until pending reply is received", client->name, client->ipc); return -EALREADY; } else { crm_notice("Lost reply from %s (%p) finally arrived, sending re-enabled", client->name, client->ipc); client->need_reply = FALSE; } } id++; CRM_LOG_ASSERT(id != 0); /* Crude wrap-around detection */ rc = crm_ipc_prepare(id, message, &iov, client->max_buf_size); if(rc < 0) { return rc; } header = iov[0].iov_base; header->flags |= flags; if(is_set(flags, crm_ipc_proxied)) { /* Don't look for a synchronous response */ clear_bit(flags, crm_ipc_client_response); } if(header->size_compressed) { if(factor < 10 && (client->max_buf_size / 10) < (rc / factor)) { crm_notice("Compressed message exceeds %d0%% of the configured ipc limit (%u bytes), " "consider setting PCMK_ipc_buffer to %u or higher", factor, client->max_buf_size, 2 * client->max_buf_size); factor++; } } crm_trace("Sending from client: %s request id: %d bytes: %u timeout:%d msg...", client->name, header->qb.id, header->qb.size, ms_timeout); if (ms_timeout > 0 || is_not_set(flags, crm_ipc_client_response)) { rc = internal_ipc_send_request(client, iov, ms_timeout); if (rc <= 0) { crm_trace("Failed to send from client %s request %d with %u bytes...", client->name, header->qb.id, header->qb.size); goto send_cleanup; } else if (is_not_set(flags, crm_ipc_client_response)) { crm_trace("Message sent, not waiting for reply to %d from %s to %u bytes...", header->qb.id, client->name, header->qb.size); goto send_cleanup; } rc = internal_ipc_get_reply(client, header->qb.id, ms_timeout); if (rc < 0) { /* No reply, for now, disable sending * * The alternative is to close the connection since we don't know * how to detect and discard out-of-sequence replies * * TODO - implement the above */ client->need_reply = TRUE; } } else { rc = internal_ipc_send_recv(client, iov); } if (rc > 0) { struct crm_ipc_response_header *hdr = (struct crm_ipc_response_header *)(void*)client->buffer; crm_trace("Received response %d, size=%u, rc=%ld, text: %.200s", hdr->qb.id, hdr->qb.size, rc, crm_ipc_buffer(client)); if (reply) { *reply = string2xml(crm_ipc_buffer(client)); } } else { crm_trace("Response not received: rc=%ld, errno=%d", rc, errno); } send_cleanup: if (crm_ipc_connected(client) == FALSE) { crm_notice("Connection to %s closed: %s (%ld)", client->name, pcmk_strerror(rc), rc); } else if (rc == -ETIMEDOUT) { crm_warn("Request %d to %s (%p) failed: %s (%ld) after %dms", header->qb.id, client->name, client->ipc, pcmk_strerror(rc), rc, ms_timeout); crm_write_blackbox(0, NULL); } else if (rc <= 0) { crm_warn("Request %d to %s (%p) failed: %s (%ld)", header->qb.id, client->name, client->ipc, pcmk_strerror(rc), rc); } pcmk_free_ipc_event(iov); return rc; } int crm_ipc_is_authentic_process(int sock, uid_t refuid, gid_t refgid, pid_t *gotpid, uid_t *gotuid, gid_t *gotgid) { int ret = 0; pid_t found_pid = 0; uid_t found_uid = 0; gid_t found_gid = 0; #if defined(US_AUTH_PEERCRED_UCRED) struct ucred ucred; socklen_t ucred_len = sizeof(ucred); if (!getsockopt(sock, SOL_SOCKET, SO_PEERCRED, &ucred, &ucred_len) && ucred_len == sizeof(ucred)) { found_pid = ucred.pid; found_uid = ucred.uid; found_gid = ucred.gid; #elif defined(US_AUTH_PEERCRED_SOCKPEERCRED) struct sockpeercred sockpeercred; socklen_t sockpeercred_len = sizeof(sockpeercred); if (!getsockopt(sock, SOL_SOCKET, SO_PEERCRED, &sockpeercred, &sockpeercred_len) && sockpeercred_len == sizeof(sockpeercred_len)) { found_pid = sockpeercred.pid; found_uid = sockpeercred.uid; found_gid = sockpeercred.gid; #elif defined(US_AUTH_GETPEEREID) if (!getpeereid(sock, &found_uid, &found_gid)) { found_pid = PCMK__SPECIAL_PID; /* cannot obtain PID (FreeBSD) */ #elif defined(US_AUTH_GETPEERUCRED) ucred_t *ucred; if (!getpeerucred(sock, &ucred)) { errno = 0; found_pid = ucred_getpid(ucred); found_uid = ucred_geteuid(ucred); found_gid = ucred_getegid(ucred); ret = -errno; ucred_free(ucred); if (ret) { return (ret < 0) ? ret : -pcmk_err_generic; } #else # error "No way to authenticate a Unix socket peer" errno = 0; if (0) { #endif if (gotpid != NULL) { *gotpid = found_pid; } if (gotuid != NULL) { *gotuid = found_uid; } if (gotgid != NULL) { *gotgid = found_gid; } ret = (found_uid == 0 || found_uid == refuid || found_gid == refgid); } else { ret = (errno > 0) ? -errno : -pcmk_err_generic; } return ret; } int pcmk__ipc_is_authentic_process_active(const char *name, uid_t refuid, gid_t refgid, pid_t *gotpid) { static char last_asked_name[PATH_MAX / 2] = ""; /* log spam prevention */ int fd, ret = 0; pid_t found_pid = 0; uid_t found_uid = 0; gid_t found_gid = 0; qb_ipcc_connection_t *c; if ((c = qb_ipcc_connect(name, 0)) == NULL) { crm_info("Could not connect to %s IPC: %s", name, strerror(errno)); } else if ((ret = qb_ipcc_fd_get(c, &fd))) { crm_err("Could not get fd from %s IPC: %s (%d)", name, strerror(-ret), -ret); ret = -1; } else if ((ret = crm_ipc_is_authentic_process(fd, refuid, refgid, &found_pid, &found_uid, &found_gid)) < 0) { if (ret == -pcmk_err_generic) { crm_err("Could not get peer credentials from %s IPC", name); } else { crm_err("Could not get peer credentials from %s IPC: %s (%d)", name, strerror(-ret), -ret); } ret = -1; } else { if (gotpid != NULL) { *gotpid = found_pid; } if (!ret) { crm_err("Daemon (IPC %s) effectively blocked with unauthorized" " process %lld (uid: %lld, gid: %lld)", name, (long long) PCMK__SPECIAL_PID_AS_0(found_pid), (long long) found_uid, (long long) found_gid); ret = -2; } else if ((found_uid != refuid || found_gid != refgid) && strncmp(last_asked_name, name, sizeof(last_asked_name))) { if (!found_uid && refuid) { crm_warn("Daemon (IPC %s) runs as root, whereas the expected" " credentials are %lld:%lld, hazard of violating" " the least privilege principle", name, (long long) refuid, (long long) refgid); } else { crm_notice("Daemon (IPC %s) runs as %lld:%lld, whereas the" " expected credentials are %lld:%lld, which may" " mean a different set of privileges than expected", name, (long long) found_uid, (long long) found_gid, (long long) refuid, (long long) refgid); } memccpy(last_asked_name, name, '\0', sizeof(last_asked_name)); } } if (ret) { /* here, !ret only when we could not initially connect */ qb_ipcc_disconnect(c); } return ret; } /* Utils */ xmlNode * create_hello_message(const char *uuid, const char *client_name, const char *major_version, const char *minor_version) { xmlNode *hello_node = NULL; xmlNode *hello = NULL; if (uuid == NULL || strlen(uuid) == 0 || client_name == NULL || strlen(client_name) == 0 || major_version == NULL || strlen(major_version) == 0 || minor_version == NULL || strlen(minor_version) == 0) { crm_err("Missing fields, Hello message will not be valid."); return NULL; } hello_node = create_xml_node(NULL, XML_TAG_OPTIONS); crm_xml_add(hello_node, "major_version", major_version); crm_xml_add(hello_node, "minor_version", minor_version); crm_xml_add(hello_node, "client_name", client_name); crm_xml_add(hello_node, "client_uuid", uuid); crm_trace("creating hello message"); hello = create_request(CRM_OP_HELLO, hello_node, NULL, NULL, client_name, uuid); free_xml(hello_node); return hello; }