Page MenuHomeClusterLabs Projects

No OneTemporary

diff --git a/lib/cluster/corosync.c b/lib/cluster/corosync.c
index 8797911c1a..5f0991a30e 100644
--- a/lib/cluster/corosync.c
+++ b/lib/cluster/corosync.c
@@ -1,1086 +1,1092 @@
/*
* Copyright (C) 2004 Andrew Beekhof <andrew@beekhof.net>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*/
#include <crm_internal.h>
#include <bzlib.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <crm/common/ipc.h>
#include <crm/cluster/internal.h>
#include <crm/common/mainloop.h>
#include <sys/utsname.h>
#include <qb/qbipcc.h>
#include <qb/qbutil.h>
#include <corosync/corodefs.h>
#include <corosync/corotypes.h>
#include <corosync/hdb.h>
#include <corosync/cpg.h>
#include <corosync/cfg.h>
#include <corosync/cmap.h>
#include <corosync/quorum.h>
#include <crm/msg_xml.h>
cpg_handle_t pcmk_cpg_handle = 0;
struct cpg_name pcmk_cpg_group = {
.length = 0,
.value[0] = 0,
};
quorum_handle_t pcmk_quorum_handle = 0;
gboolean(*quorum_app_callback) (unsigned long long seq, gboolean quorate) = NULL;
static char *pcmk_uname = NULL;
static int pcmk_uname_len = 0;
static uint32_t pcmk_nodeid = 0;
#define cs_repeat(counter, max, code) do { \
code; \
if(rc == CS_ERR_TRY_AGAIN || rc == CS_ERR_QUEUE_FULL) { \
counter++; \
crm_debug("Retrying operation after %ds", counter); \
sleep(counter); \
} else { \
break; \
} \
} while(counter < max)
#ifndef INTERFACE_MAX
# define INTERFACE_MAX 2 /* from the private coroapi.h header */
#endif
static gboolean
corosync_name_is_valid(const char *key, const char *name)
{
int octet;
if(name == NULL) {
crm_trace("%s is empty", key);
return FALSE;
} else if(sscanf(name, "%d.%d.%d.%d", &octet, &octet, &octet, &octet) == 4) {
crm_trace("%s contains an ipv4 address, ignoring: %s", key, name);
return FALSE;
} else if(strstr(name, ":") != NULL) {
crm_trace("%s contains an ipv4 address, ignoring: %s", key, name);
return FALSE;
}
+ crm_trace("%s is valid", key);
return TRUE;
}
/*
* CFG functionality stolen from node_name() in corosync-quorumtool.c
* This resolves the first address assigned to a node and returns the name or IP address.
*/
static char *corosync_node_name(cmap_handle_t cmap_handle, uint32_t nodeid)
{
int lpc = 0;
int rc = CS_OK;
int retries = 0;
char *name = NULL;
cmap_handle_t local_handle = 0;
corosync_cfg_handle_t cfg_handle = 0;
static corosync_cfg_callbacks_t cfg_callbacks = {};
if(cmap_handle == 0 && local_handle == 0) {
retries = 0;
crm_trace("Initializing CMAP connection");
do {
rc = cmap_initialize(&local_handle);
if(rc != CS_OK) {
retries++;
crm_debug("API connection setup failed: %s. Retrying in %ds", cs_strerror(rc), retries);
sleep(retries);
}
} while(retries < 5 && rc != CS_OK);
if (rc != CS_OK) {
crm_warn("Could not connect to Cluster Configuration Database API, error %d", cs_strerror(rc));
local_handle = 0;
}
}
if(cmap_handle == 0) {
cmap_handle = local_handle;
}
while(name == NULL && cmap_handle != 0) {
uint32_t id = 0;
- char *name = NULL;
char *key = NULL;
key = g_strdup_printf("nodelist.node.%d.nodeid", lpc);
rc = cmap_get_uint32(cmap_handle, key, &id);
+ crm_trace("Checking %u vs %u from %s", nodeid, id, key);
g_free(key);
if(rc != CS_OK) {
break;
}
if(nodeid == id) {
+ crm_trace("Searching for node name for %u in nodelist.node.%d %s", nodeid, lpc, name);
if(name == NULL) {
key = g_strdup_printf("nodelist.node.%d.ring0_addr", lpc);
rc = cmap_get_string(cmap_handle, key, &name);
+ crm_trace("%s = %s", key, name);
if(corosync_name_is_valid(key, name) == FALSE) {
free(name); name = NULL;
}
g_free(key);
}
if(name == NULL) {
key = g_strdup_printf("nodelist.node.%d.name", lpc);
rc = cmap_get_string(cmap_handle, key, &name);
+ crm_trace("%s = %s %d", key, name, rc);
if(corosync_name_is_valid(key, name) == FALSE) {
free(name); name = NULL;
}
g_free(key);
}
break;
}
lpc++;
}
- if(local_handle) {
- cmap_finalize(local_handle);
- }
-
if(name == NULL) {
retries = 0;
crm_trace("Initializing CFG connection");
do {
rc = corosync_cfg_initialize(&cfg_handle, &cfg_callbacks);
if(rc != CS_OK) {
retries++;
crm_debug("API connection setup failed: %s. Retrying in %ds", cs_strerror(rc), retries);
sleep(retries);
}
} while(retries < 5 && rc != CS_OK);
if (rc != CS_OK) {
crm_warn("Could not connect to the Corosync CFG API, error %d", cs_strerror(rc));
cfg_handle = 0;
}
}
if(name == NULL && cfg_handle != 0) {
int numaddrs;
char buf[INET6_ADDRSTRLEN];
socklen_t addrlen;
struct sockaddr_storage *ss;
corosync_cfg_node_address_t addrs[INTERFACE_MAX];
rc = corosync_cfg_get_node_addrs(cfg_handle, nodeid, INTERFACE_MAX, &numaddrs, addrs);
if (rc == CS_OK) {
ss = (struct sockaddr_storage *)addrs[0].address;
if (ss->ss_family == AF_INET6) {
addrlen = sizeof(struct sockaddr_in6);
} else {
addrlen = sizeof(struct sockaddr_in);
}
if (getnameinfo((struct sockaddr *)addrs[0].address, addrlen, buf, sizeof(buf), NULL, 0, 0) == 0) {
crm_notice("Inferred node name '%s' for nodeid %u from DNS", buf, nodeid);
if(corosync_name_is_valid("DNS", buf)) {
name = strdup(buf);
}
}
+ } else {
+ crm_debug("Unable to get node address for nodeid %u: %s", nodeid, cs_strerror(rc));
}
cmap_finalize(cfg_handle);
}
+ if(local_handle) {
+ cmap_finalize(local_handle);
+ }
+
if(name == NULL) {
- crm_err("Unable to get node address for nodeid %u: %s", nodeid, cs_strerror(rc));
+ crm_err("Unable to get node name for nodeid %u", nodeid);
}
return name;
}
enum crm_ais_msg_types
text2msg_type(const char *text)
{
int type = crm_msg_none;
CRM_CHECK(text != NULL, return type);
if (safe_str_eq(text, "ais")) {
type = crm_msg_ais;
} else if (safe_str_eq(text, "crm_plugin")) {
type = crm_msg_ais;
} else if (safe_str_eq(text, CRM_SYSTEM_CIB)) {
type = crm_msg_cib;
} else if (safe_str_eq(text, CRM_SYSTEM_CRMD)) {
type = crm_msg_crmd;
} else if (safe_str_eq(text, CRM_SYSTEM_DC)) {
type = crm_msg_crmd;
} else if (safe_str_eq(text, CRM_SYSTEM_TENGINE)) {
type = crm_msg_te;
} else if (safe_str_eq(text, CRM_SYSTEM_PENGINE)) {
type = crm_msg_pe;
} else if (safe_str_eq(text, CRM_SYSTEM_LRMD)) {
type = crm_msg_lrmd;
} else if (safe_str_eq(text, CRM_SYSTEM_STONITHD)) {
type = crm_msg_stonithd;
} else if (safe_str_eq(text, "stonith-ng")) {
type = crm_msg_stonith_ng;
} else if (safe_str_eq(text, "attrd")) {
type = crm_msg_attrd;
} else {
/* This will normally be a transient client rather than
* a cluster daemon. Set the type to the pid of the client
*/
int scan_rc = sscanf(text, "%d", &type);
if (scan_rc != 1) {
/* Ensure its sane */
type = crm_msg_none;
}
}
return type;
}
static char *ais_cluster_name = NULL;
gboolean
crm_get_cluster_name(char **cname)
{
CRM_CHECK(cname != NULL, return FALSE);
if (ais_cluster_name) {
*cname = strdup(ais_cluster_name);
return TRUE;
}
return FALSE;
}
gboolean
send_ais_text(int class, const char *data,
gboolean local, const char *node, enum crm_ais_msg_types dest)
{
static int msg_id = 0;
static int local_pid = 0;
int retries = 0;
int rc = CS_OK;
int buf_len = sizeof(cs_ipc_header_response_t);
char *buf = NULL;
struct iovec iov;
const char *transport = "pcmk";
AIS_Message *ais_msg = NULL;
enum crm_ais_msg_types sender = text2msg_type(crm_system_name);
/* There are only 6 handlers registered to crm_lib_service in plugin.c */
CRM_CHECK(class < 6, crm_err("Invalid message class: %d", class); return FALSE);
if (data == NULL) {
data = "";
}
if (local_pid == 0) {
local_pid = getpid();
}
if (sender == crm_msg_none) {
sender = local_pid;
}
ais_msg = calloc(1, sizeof(AIS_Message));
ais_msg->id = msg_id++;
ais_msg->header.id = class;
ais_msg->header.error = CS_OK;
ais_msg->host.type = dest;
ais_msg->host.local = local;
if (node) {
ais_msg->host.size = strlen(node);
memset(ais_msg->host.uname, 0, MAX_NAME);
memcpy(ais_msg->host.uname, node, ais_msg->host.size);
ais_msg->host.id = 0;
} else {
ais_msg->host.size = 0;
memset(ais_msg->host.uname, 0, MAX_NAME);
ais_msg->host.id = 0;
}
ais_msg->sender.id = 0;
ais_msg->sender.type = sender;
ais_msg->sender.pid = local_pid;
ais_msg->sender.size = pcmk_uname_len;
memset(ais_msg->sender.uname, 0, MAX_NAME);
memcpy(ais_msg->sender.uname, pcmk_uname, ais_msg->sender.size);
ais_msg->size = 1 + strlen(data);
if (ais_msg->size < CRM_BZ2_THRESHOLD) {
failback:
ais_msg = realloc(ais_msg, sizeof(AIS_Message) + ais_msg->size);
memcpy(ais_msg->data, data, ais_msg->size);
} else {
char *compressed = NULL;
char *uncompressed = strdup(data);
unsigned int len = (ais_msg->size * 1.1) + 600; /* recomended size */
crm_trace("Compressing message payload");
compressed = malloc( len);
rc = BZ2_bzBuffToBuffCompress(compressed, &len, uncompressed, ais_msg->size, CRM_BZ2_BLOCKS,
0, CRM_BZ2_WORK);
free(uncompressed);
if (rc != BZ_OK) {
crm_err("Compression failed: %d", rc);
free(compressed);
goto failback;
}
ais_msg = realloc(ais_msg, sizeof(AIS_Message) + len + 1);
memcpy(ais_msg->data, compressed, len);
ais_msg->data[len] = 0;
free(compressed);
ais_msg->is_compressed = TRUE;
ais_msg->compressed_size = len;
crm_trace("Compression details: %d -> %d", ais_msg->size, ais_data_len(ais_msg));
}
ais_msg->header.size = sizeof(AIS_Message) + ais_data_len(ais_msg);
crm_trace("Sending%s message %d to %s.%s (data=%d, total=%d)",
ais_msg->is_compressed ? " compressed" : "",
ais_msg->id, ais_dest(&(ais_msg->host)), msg_type2text(dest),
ais_data_len(ais_msg), ais_msg->header.size);
iov.iov_base = ais_msg;
iov.iov_len = ais_msg->header.size;
buf = realloc(buf, buf_len);
do {
if (rc == CS_ERR_TRY_AGAIN || rc == CS_ERR_QUEUE_FULL) {
retries++;
crm_info("Peer overloaded or membership in flux:"
" Re-sending message (Attempt %d of 20)", retries);
sleep(retries); /* Proportional back off */
}
errno = 0;
transport = "cpg";
CRM_CHECK(dest != crm_msg_ais, rc = CS_ERR_MESSAGE_ERROR; goto bail);
rc = cpg_mcast_joined(pcmk_cpg_handle, CPG_TYPE_AGREED, &iov, 1);
if (rc == CS_ERR_TRY_AGAIN || rc == CS_ERR_QUEUE_FULL) {
cpg_flow_control_state_t fc_state = CPG_FLOW_CONTROL_DISABLED;
int rc2 = cpg_flow_control_state_get(pcmk_cpg_handle, &fc_state);
if (rc2 == CS_OK && fc_state == CPG_FLOW_CONTROL_ENABLED) {
crm_warn("Connection overloaded, cannot send messages");
goto bail;
} else if (rc2 != CS_OK) {
crm_warn("Could not determin the connection state: %s (%d)",
ais_error2text(rc2), rc2);
goto bail;
}
}
} while ((rc == CS_ERR_TRY_AGAIN || rc == CS_ERR_QUEUE_FULL) && retries < 20);
bail:
if (rc != CS_OK) {
crm_perror(LOG_ERR, "Sending message %d via %s: FAILED (rc=%d): %s",
ais_msg->id, transport, rc, ais_error2text(rc));
} else {
crm_trace("Message %d: sent", ais_msg->id);
}
free(buf);
free(ais_msg);
return (rc == CS_OK);
}
gboolean
send_ais_message(xmlNode * msg, gboolean local, const char *node, enum crm_ais_msg_types dest)
{
gboolean rc = TRUE;
char *data = dump_xml_unformatted(msg);
rc = send_ais_text(crm_class_cluster, data, local, node, dest);
free(data);
return rc;
}
void
terminate_ais_connection(void)
{
crm_notice("Disconnecting from Corosync");
if(pcmk_cpg_handle) {
crm_trace("Disconnecting CPG");
cpg_leave(pcmk_cpg_handle, &pcmk_cpg_group);
cpg_finalize(pcmk_cpg_handle);
pcmk_cpg_handle = 0;
} else {
crm_info("No CPG connection");
}
if(pcmk_quorum_handle) {
crm_trace("Disconnecting quorum");
quorum_finalize(pcmk_quorum_handle);
pcmk_quorum_handle = 0;
} else {
crm_info("No Quorum connection");
}
}
int ais_membership_timer = 0;
gboolean ais_membership_force = FALSE;
static gboolean
ais_dispatch_message(AIS_Message * msg, gboolean(*dispatch) (AIS_Message *, char *, int))
{
char *data = NULL;
char *uncompressed = NULL;
xmlNode *xml = NULL;
CRM_ASSERT(msg != NULL);
crm_trace("Got new%s message (size=%d, %d, %d)",
msg->is_compressed ? " compressed" : "",
ais_data_len(msg), msg->size, msg->compressed_size);
data = msg->data;
if (msg->is_compressed && msg->size > 0) {
int rc = BZ_OK;
unsigned int new_size = msg->size + 1;
if (check_message_sanity(msg, NULL) == FALSE) {
goto badmsg;
}
crm_trace("Decompressing message data");
uncompressed = calloc(1, new_size);
rc = BZ2_bzBuffToBuffDecompress(uncompressed, &new_size, data, msg->compressed_size, 1, 0);
if (rc != BZ_OK) {
crm_err("Decompression failed: %d", rc);
goto badmsg;
}
CRM_ASSERT(rc == BZ_OK);
CRM_ASSERT(new_size == msg->size);
data = uncompressed;
} else if (check_message_sanity(msg, data) == FALSE) {
goto badmsg;
} else if (safe_str_eq("identify", data)) {
int pid = getpid();
char *pid_s = crm_itoa(pid);
send_ais_text(crm_class_cluster, pid_s, TRUE, NULL, crm_msg_ais);
free(pid_s);
goto done;
}
if (msg->header.id != crm_class_members) {
crm_update_peer(__FUNCTION__, msg->sender.id, 0, 0, 0, 0, msg->sender.uname, msg->sender.uname, NULL,
NULL);
}
if (msg->header.id == crm_class_rmpeer) {
uint32_t id = crm_int_helper(data, NULL);
crm_info("Removing peer %s/%u", data, id);
reap_crm_member(id);
goto done;
}
crm_trace("Payload: %s", data);
if (dispatch != NULL) {
dispatch(msg, data, 0);
}
done:
free(uncompressed);
free_xml(xml);
return TRUE;
badmsg:
crm_err("Invalid message (id=%d, dest=%s:%s, from=%s:%s.%d):"
" min=%d, total=%d, size=%d, bz2_size=%d",
msg->id, ais_dest(&(msg->host)), msg_type2text(msg->host.type),
ais_dest(&(msg->sender)), msg_type2text(msg->sender.type),
msg->sender.pid, (int)sizeof(AIS_Message),
msg->header.size, msg->size, msg->compressed_size);
goto done;
}
gboolean(*pcmk_cpg_dispatch_fn) (AIS_Message *, char *, int) = NULL;
static int
pcmk_cpg_dispatch(gpointer user_data)
{
int rc = 0;
pcmk_cpg_dispatch_fn = user_data;
rc = cpg_dispatch(pcmk_cpg_handle, CS_DISPATCH_ALL);
if (rc != CS_OK) {
crm_err("Connection to the CPG API failed: %d", rc);
return -1;
}
return 0;
}
static void
pcmk_cpg_deliver(cpg_handle_t handle,
const struct cpg_name *groupName,
uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len)
{
AIS_Message *ais_msg = (AIS_Message *) msg;
if (ais_msg->sender.id > 0 && ais_msg->sender.id != nodeid) {
crm_err("Nodeid mismatch from %d.%d: claimed nodeid=%u", nodeid, pid, ais_msg->sender.id);
return;
} else if (ais_msg->host.size != 0 && safe_str_neq(ais_msg->host.uname, pcmk_uname)) {
/* Not for us */
return;
}
ais_msg->sender.id = nodeid;
if (ais_msg->sender.size == 0) {
crm_node_t *peer = crm_get_peer(nodeid, NULL);
if (peer == NULL) {
crm_err("Peer with nodeid=%u is unknown", nodeid);
} else if (peer->uname == NULL) {
crm_err("No uname for peer with nodeid=%u", nodeid);
} else {
crm_notice("Fixing uname for peer with nodeid=%u", nodeid);
ais_msg->sender.size = strlen(peer->uname);
memset(ais_msg->sender.uname, 0, MAX_NAME);
memcpy(ais_msg->sender.uname, peer->uname, ais_msg->sender.size);
}
}
ais_dispatch_message(ais_msg, pcmk_cpg_dispatch_fn);
}
static void
pcmk_cpg_membership(cpg_handle_t handle,
const struct cpg_name *groupName,
const struct cpg_address *member_list, size_t member_list_entries,
const struct cpg_address *left_list, size_t left_list_entries,
const struct cpg_address *joined_list, size_t joined_list_entries)
{
int i;
gboolean found = FALSE;
static int counter = 0;
for (i = 0; i < left_list_entries; i++) {
crm_node_t *peer = crm_get_peer(left_list[i].nodeid, NULL);
crm_info("Left[%d.%d] %s.%d ", counter, i, groupName->value, left_list[i].nodeid);
crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cpg, OFFLINESTATUS);
}
for (i = 0; i < joined_list_entries; i++) {
crm_info("Joined[%d.%d] %s.%d ", counter, i, groupName->value, joined_list[i].nodeid);
}
for (i = 0; i < member_list_entries; i++) {
crm_node_t *peer = crm_get_peer(member_list[i].nodeid, NULL);
crm_info("Member[%d.%d] %s.%d ", counter, i, groupName->value, member_list[i].nodeid);
crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cpg, ONLINESTATUS);
if(pcmk_nodeid == member_list[i].nodeid) {
found = TRUE;
}
}
if(!found) {
crm_err("We're not part of CPG group %s anymore!", groupName->value);
/* Possibly re-call cpg_join() */
}
counter++;
}
cpg_callbacks_t cpg_callbacks = {
.cpg_deliver_fn = pcmk_cpg_deliver,
.cpg_confchg_fn = pcmk_cpg_membership,
};
static gboolean
init_cpg_connection(gboolean(*dispatch) (AIS_Message *, char *, int), void (*destroy) (gpointer),
uint32_t * nodeid)
{
int rc = -1;
int fd = 0;
int retries = 0;
crm_node_t *peer = NULL;
struct mainloop_fd_callbacks cpg_fd_callbacks = {
.dispatch = pcmk_cpg_dispatch,
.destroy = destroy,
};
strcpy(pcmk_cpg_group.value, crm_system_name);
pcmk_cpg_group.length = strlen(crm_system_name) + 1;
cs_repeat(retries, 30, rc = cpg_initialize(&pcmk_cpg_handle, &cpg_callbacks));
if (rc != CS_OK) {
crm_err("Could not connect to the Cluster Process Group API: %d\n", rc);
goto bail;
}
retries = 0;
cs_repeat(retries, 30, rc = cpg_local_get(pcmk_cpg_handle, (unsigned int *)nodeid));
if (rc != CS_OK) {
crm_err("Could not get local node id from the CPG API");
goto bail;
}
retries = 0;
cs_repeat(retries, 30, rc = cpg_join(pcmk_cpg_handle, &pcmk_cpg_group));
if (rc != CS_OK) {
crm_err("Could not join the CPG group '%s': %d", crm_system_name, rc);
goto bail;
}
rc = cpg_fd_get(pcmk_cpg_handle, &fd);
if (rc != CS_OK) {
crm_err("Could not obtain the CPG API connection: %d\n", rc);
goto bail;
}
mainloop_add_fd("corosync-cpg", fd, dispatch, &cpg_fd_callbacks);
bail:
if (rc != CS_OK) {
cpg_finalize(pcmk_cpg_handle);
return FALSE;
}
peer = crm_get_peer(pcmk_nodeid, pcmk_uname);
crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cpg, ONLINESTATUS);
return TRUE;
}
static int
pcmk_quorum_dispatch(gpointer user_data)
{
int rc = 0;
rc = quorum_dispatch(pcmk_quorum_handle, CS_DISPATCH_ALL);
if (rc < 0) {
crm_err("Connection to the Quorum API failed: %d", rc);
return -1;
}
return 0;
}
static void
corosync_mark_unseen_peer_dead(gpointer key, gpointer value, gpointer user_data)
{
int *seq = user_data;
crm_node_t *node = value;
if (node->last_seen != *seq && crm_str_eq(CRM_NODE_LOST, node->state, TRUE) == FALSE) {
crm_notice("Node %d/%s was not seen in the previous transition", node->id, node->uname);
crm_update_peer(__FUNCTION__, node->id, 0, 0, 0, 0, NULL, NULL, NULL, CRM_NODE_LOST);
}
}
static void
corosync_mark_node_unseen(gpointer key, gpointer value, gpointer user_data)
{
crm_node_t *node = value;
node->last_seen = 0;
}
static void
pcmk_quorum_notification(quorum_handle_t handle,
uint32_t quorate,
uint64_t ring_id, uint32_t view_list_entries, uint32_t * view_list)
{
int i;
static gboolean init_phase = TRUE;
if (quorate != crm_have_quorum) {
crm_notice("Membership " U64T ": quorum %s (%lu)", ring_id,
quorate ? "acquired" : "lost", (long unsigned int)view_list_entries);
crm_have_quorum = quorate;
} else {
crm_info("Membership " U64T ": quorum %s (%lu)", ring_id,
quorate ? "retained" : "still lost", (long unsigned int)view_list_entries);
}
if(view_list_entries == 0 && init_phase) {
crm_info("Corosync membership is still forming, ignoring");
return;
}
init_phase = FALSE;
g_hash_table_foreach(crm_peer_cache, corosync_mark_node_unseen, NULL);
for (i = 0; i < view_list_entries; i++) {
uint32_t id = view_list[i];
char *name = NULL;
crm_node_t *node = NULL;
char *uuid = get_corosync_uuid(id, NULL);
crm_debug("Member[%d] %d ", i, id);
node = crm_get_peer(id, NULL);
if(node->uname == NULL) {
crm_info("Obtaining name for new node %u", id);
name = corosync_node_name(0, id);
}
crm_update_peer(__FUNCTION__, id, 0, ring_id, 0, 0, uuid, name, NULL, CRM_NODE_MEMBER);
free(name);
}
crm_trace("Reaping unseen nodes...");
g_hash_table_foreach(crm_peer_cache, corosync_mark_unseen_peer_dead, &ring_id);
if (quorum_app_callback) {
quorum_app_callback(ring_id, quorate);
}
}
quorum_callbacks_t quorum_callbacks = {
.quorum_notify_fn = pcmk_quorum_notification,
};
gboolean
init_quorum_connection(gboolean(*dispatch) (unsigned long long, gboolean),
void (*destroy) (gpointer))
{
int rc = -1;
int fd = 0;
int quorate = 0;
uint32_t quorum_type = 0;
struct mainloop_fd_callbacks quorum_fd_callbacks;
quorum_fd_callbacks.dispatch = pcmk_quorum_dispatch;
quorum_fd_callbacks.destroy = destroy;
crm_debug("Configuring Pacemaker to obtain quorum from Corosync");
rc = quorum_initialize(&pcmk_quorum_handle, &quorum_callbacks, &quorum_type);
if (rc != CS_OK) {
crm_err("Could not connect to the Quorum API: %d\n", rc);
goto bail;
} else if (quorum_type != QUORUM_SET) {
crm_err("Corosync quorum is not configured\n");
goto bail;
}
rc = quorum_getquorate(pcmk_quorum_handle, &quorate);
if (rc != CS_OK) {
crm_err("Could not obtain the current Quorum API state: %d\n", rc);
goto bail;
}
crm_notice("Quorum %s", quorate ? "acquired" : "lost");
quorum_app_callback = dispatch;
crm_have_quorum = quorate;
rc = quorum_trackstart(pcmk_quorum_handle, CS_TRACK_CHANGES | CS_TRACK_CURRENT);
if (rc != CS_OK) {
crm_err("Could not setup Quorum API notifications: %d\n", rc);
goto bail;
}
rc = quorum_fd_get(pcmk_quorum_handle, &fd);
if (rc != CS_OK) {
crm_err("Could not obtain the Quorum API connection: %d\n", rc);
goto bail;
}
mainloop_add_fd("quorum", fd, dispatch, &quorum_fd_callbacks);
corosync_initialize_nodelist(NULL, FALSE, NULL);
bail:
if (rc != CS_OK) {
quorum_finalize(pcmk_quorum_handle);
return FALSE;
}
return TRUE;
}
gboolean
init_ais_connection(gboolean(*dispatch) (AIS_Message *, char *, int), void (*destroy) (gpointer),
char **our_uuid, char **our_uname, int *nodeid)
{
int retries = 0;
while (retries++ < 30) {
int rc = init_ais_connection_once(dispatch, destroy, our_uuid, our_uname, nodeid);
switch (rc) {
case CS_OK:
return TRUE;
break;
case CS_ERR_TRY_AGAIN:
case CS_ERR_QUEUE_FULL:
break;
default:
return FALSE;
}
}
crm_err("Retry count exceeded: %d", retries);
return FALSE;
}
gboolean
init_ais_connection_once(gboolean(*dispatch) (AIS_Message *, char *, int),
void (*destroy) (gpointer), char **our_uuid, char **our_uname, int *nodeid)
{
struct utsname res;
enum cluster_type_e stack = get_cluster_type();
crm_peer_init();
/* Here we just initialize comms */
if(stack != pcmk_cluster_corosync) {
crm_err("Invalid cluster type: %s (%d)", name_for_cluster_type(stack), stack);
return FALSE;
}
if (init_cpg_connection(dispatch, destroy, &pcmk_nodeid) == FALSE) {
return FALSE;
} else if (uname(&res) < 0) {
crm_perror(LOG_ERR, "Could not determin the current host");
exit(100);
} else {
pcmk_uname = strdup(res.nodename);
}
crm_info("Connection to '%s': established", name_for_cluster_type(stack));
CRM_ASSERT(pcmk_uname != NULL);
pcmk_uname_len = strlen(pcmk_uname);
if (pcmk_nodeid != 0) {
/* Ensure the local node always exists */
crm_update_peer(__FUNCTION__, pcmk_nodeid, 0, 0, 0, 0, pcmk_uname, pcmk_uname, NULL, NULL);
}
if (our_uuid != NULL) {
*our_uuid = get_corosync_uuid(pcmk_nodeid, pcmk_uname);
}
if (our_uname != NULL) {
*our_uname = strdup(pcmk_uname);
}
if (nodeid != NULL) {
*nodeid = pcmk_nodeid;
}
return TRUE;
}
gboolean
check_message_sanity(const AIS_Message * msg, const char *data)
{
gboolean sane = TRUE;
int dest = msg->host.type;
int tmp_size = msg->header.size - sizeof(AIS_Message);
if (sane && msg->header.size == 0) {
crm_warn("Message with no size");
sane = FALSE;
}
if (sane && msg->header.error != CS_OK) {
crm_warn("Message header contains an error: %d", msg->header.error);
sane = FALSE;
}
if (sane && ais_data_len(msg) != tmp_size) {
crm_warn("Message payload size is incorrect: expected %d, got %d", ais_data_len(msg),
tmp_size);
sane = TRUE;
}
if (sane && ais_data_len(msg) == 0) {
crm_warn("Message with no payload");
sane = FALSE;
}
if (sane && data && msg->is_compressed == FALSE) {
int str_size = strlen(data) + 1;
if (ais_data_len(msg) != str_size) {
int lpc = 0;
crm_warn("Message payload is corrupted: expected %d bytes, got %d",
ais_data_len(msg), str_size);
sane = FALSE;
for (lpc = (str_size - 10); lpc < msg->size; lpc++) {
if (lpc < 0) {
lpc = 0;
}
crm_debug("bad_data[%d]: %d / '%c'", lpc, data[lpc], data[lpc]);
}
}
}
if (sane == FALSE) {
crm_err("Invalid message %d: (dest=%s:%s, from=%s:%s.%d, compressed=%d, size=%d, total=%d)",
msg->id, ais_dest(&(msg->host)), msg_type2text(dest),
ais_dest(&(msg->sender)), msg_type2text(msg->sender.type),
msg->sender.pid, msg->is_compressed, ais_data_len(msg), msg->header.size);
} else {
crm_trace
("Verfied message %d: (dest=%s:%s, from=%s:%s.%d, compressed=%d, size=%d, total=%d)",
msg->id, ais_dest(&(msg->host)), msg_type2text(dest), ais_dest(&(msg->sender)),
msg_type2text(msg->sender.type), msg->sender.pid, msg->is_compressed,
ais_data_len(msg), msg->header.size);
}
return sane;
}
enum cluster_type_e
find_corosync_variant(void)
{
int rc = CS_OK;
cmap_handle_t handle;
/* There can be only one (possibility if confdb isn't around) */
rc = cmap_initialize(&handle);
if (rc != CS_OK) {
crm_info("Failed to initialize the cmap API. Error %d", rc);
return pcmk_cluster_unknown;
}
cmap_finalize(handle);
return pcmk_cluster_corosync;
}
gboolean
crm_is_corosync_peer_active(const crm_node_t * node)
{
if (node == NULL) {
crm_trace("NULL");
return FALSE;
} else if(safe_str_neq(node->state, CRM_NODE_MEMBER)) {
crm_trace("%s: state=%s", node->uname, node->state);
return FALSE;
} else if((node->processes & crm_proc_cpg) == 0) {
crm_trace("%s: processes=%.16x", node->uname, node->processes);
return FALSE;
}
return TRUE;
}
gboolean
corosync_initialize_nodelist(void *cluster, gboolean force_member, xmlNode *xml_parent)
{
int lpc = 0;
int rc = CS_OK;
int retries = 0;
gboolean any = FALSE;
cmap_handle_t cmap_handle;
do {
rc = cmap_initialize(&cmap_handle);
if(rc != CS_OK) {
retries++;
crm_debug("API connection setup failed: %s. Retrying in %ds", cs_strerror(rc), retries);
sleep(retries);
}
} while(retries < 5 && rc != CS_OK);
if (rc != CS_OK) {
crm_warn("Could not connect to Cluster Configuration Database API, error %d", rc);
return FALSE;
}
crm_trace("Initializing corosync nodelist");
for(lpc = 0; ; lpc++) {
uint32_t nodeid = 0;
char *name = NULL;
char *key = NULL;
key = g_strdup_printf("nodelist.node.%d.nodeid", lpc);
rc = cmap_get_uint32(cmap_handle, key, &nodeid);
g_free(key);
if(rc != CS_OK) {
break;
}
name = corosync_node_name(cmap_handle, nodeid);
if(nodeid > 0 || name != NULL) {
crm_trace("Initializing node[%d] %u = %s", lpc, nodeid, name);
crm_get_peer(nodeid, name);
}
if(nodeid > 0 && name != NULL) {
any = TRUE;
if(xml_parent) {
xmlNode *node = create_xml_node(xml_parent, XML_CIB_TAG_NODE);
crm_xml_add_int(node, XML_ATTR_ID, nodeid);
crm_xml_add(node, XML_ATTR_UNAME, name);
if(force_member) {
crm_xml_add(node, XML_ATTR_TYPE, CRM_NODE_MEMBER);
}
}
}
free(name);
}
cmap_finalize(cmap_handle);
return any;
}

File Metadata

Mime Type
text/x-diff
Expires
Wed, Jun 25, 4:13 AM (1 d, 10 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1952109
Default Alt Text
(33 KB)

Event Timeline