Page Menu
Home
ClusterLabs Projects
Search
Configure Global Search
Log In
Files
F1842119
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Flag For Later
Award Token
Size
60 KB
Referenced Files
None
Subscribers
None
View Options
diff --git a/lib/ais/plugin.c b/lib/ais/plugin.c
index 7bb9f7339b..08d711f9f3 100644
--- a/lib/ais/plugin.c
+++ b/lib/ais/plugin.c
@@ -1,1328 +1,1328 @@
/*
* Copyright (C) 2004 Andrew Beekhof <andrew@beekhof.net>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include <crm_internal.h>
#include <sys/types.h>
#include <sys/uio.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <signal.h>
#include <string.h>
#include <config.h>
#include <crm/ais_common.h>
#include "plugin.h"
#include "utils.h"
#ifdef AIS_COROSYNC
# include <corosync/totem/totempg.h>
#endif
#include <glib/ghash.h>
#include <sys/utsname.h>
#include <sys/socket.h>
#include <pthread.h>
#include <sys/wait.h>
#include <bzlib.h>
/* Handle to the hosting executive's API; assigned in crm_exec_init_fn(). */
plugin_init_type *crm_api = NULL;
int use_mgmtd = 0;
/* Log verbosity; process_ais_conf() drops it to LOG_INFO when debug is off. */
int plugin_log_level = LOG_DEBUG;
/* Local node name (from uname()) and its length; set in crm_plugin_init(). */
char *local_uname = NULL;
int local_uname_len = 0;
uint32_t local_nodeid = 0;
char *ipc_channel_name = NULL;
/* Sequence number of the most recent ring handled by global_confchg_fn(). */
uint64_t membership_seq = 0;
/* Thread running crm_wait_dispatch(); that loop runs while wait_active is TRUE. */
pthread_t crm_wait_thread;
gboolean wait_active = TRUE;
/* Set by global_confchg_fn() once a membership with more than one member is seen. */
gboolean have_reliable_membership_id = FALSE;
/* Known nodes, looked up by node id; values are released with destroy_ais_node. */
GHashTable *membership_list = NULL;
/* Async connections that should receive membership updates. */
GHashTable *membership_notify_list = NULL;
/* Upper bound on the respawn_count of a child before respawning is disabled. */
#define MAX_RESPAWN 100
/* Values used for the third initializer of each crm_children[] entry. */
#define crm_flag_none 0x00000000
#define crm_flag_members 0x00000001
/*
 * Node identification message exchanged between plugin instances.
 * Built by send_cluster_id() and consumed by ais_cluster_id_callback();
 * ais_cluster_id_swab() converts id, pid, votes and processes between
 * byte orders.
 */
struct crm_identify_msg_s
{
mar_req_header_t header __attribute__((aligned(8)));
uint32_t id;
uint32_t pid;
int32_t votes;
uint32_t processes;
char uname[256];
char version[256];
uint64_t born_on;
} __attribute__((packed));
/*
 * Table of the processes managed by this plugin.
 *
 * ais_ipc_message_callback() and route_ais_message() index this array by
 * message type, so the entry order has to stay in step with the
 * crm_ais_msg_types enum.  crm_exec_init_fn() spawns only entries whose
 * start_seq is at least 1; process_ais_conf() zeroes the start_seq of the
 * mgmtd entry when mgmtd is not enabled.
 */
static crm_child_t crm_children[] = {
{ 0, crm_proc_none, crm_flag_none, 0, 0, FALSE, "none", NULL, NULL, NULL, NULL },
{ 0, crm_proc_ais, crm_flag_none, 0, 0, FALSE, "ais", NULL, NULL, NULL, NULL },
{ 0, crm_proc_lrmd, crm_flag_none, 3, 0, TRUE, "lrmd", NULL, HA_LIBHBDIR"/lrmd", NULL, NULL },
{ 0, crm_proc_cib, crm_flag_members, 2, 0, TRUE, "cib", HA_CCMUSER, HA_LIBHBDIR"/cib", NULL, NULL },
{ 0, crm_proc_crmd, crm_flag_members, 6, 0, TRUE, "crmd", HA_CCMUSER, HA_LIBHBDIR"/crmd", NULL, NULL },
{ 0, crm_proc_attrd, crm_flag_none, 4, 0, TRUE, "attrd", HA_CCMUSER, HA_LIBHBDIR"/attrd", NULL, NULL },
{ 0, crm_proc_stonithd, crm_flag_none, 1, 0, TRUE, "stonithd", NULL, HA_LIBHBDIR"/stonithd", NULL, NULL },
{ 0, crm_proc_pe, crm_flag_none, 5, 0, TRUE, "pengine", HA_CCMUSER, HA_LIBHBDIR"/pengine", NULL, NULL },
{ 0, crm_proc_mgmtd, crm_flag_none, 7, 0, TRUE, "mgmtd", NULL, HA_LIBHBDIR"/mgmtd", NULL, NULL },
};
/* Forward declarations for functions used before their definition. */
void send_cluster_id(void);
int send_cluster_msg_raw(AIS_Message *ais_msg);
char *ais_generate_membership_data(void);
/* Group handle passed to totempg_groups_mcast_joined() by send_cluster_msg_raw(). */
extern totempg_groups_handle openais_group_handle;
/* Membership-change callback; installed as .confchg_fn of crm_service_handler. */
void global_confchg_fn (
enum totem_configuration_type configuration_type,
unsigned int *member_list, int member_list_entries,
unsigned int *left_list, int left_list_entries,
unsigned int *joined_list, int joined_list_entries,
struct memb_ring_id *ring_id);
/* Lifecycle hooks; their argument types depend on which stack we build for. */
#ifdef AIS_WHITETANK
int crm_exec_init_fn (struct objdb_iface_ver0 *objdb);
int crm_exec_exit_fn (struct objdb_iface_ver0 *objdb);
int crm_config_init_fn(struct objdb_iface_ver0 *objdb);
#endif
#ifdef AIS_COROSYNC
int crm_exec_init_fn (struct corosync_api_v1 *corosync_api);
int crm_exec_exit_fn (void);
int crm_config_init_fn(struct corosync_api_v1 *corosync_api);
#endif
/* IPC connection callbacks (.lib_init_fn / .lib_exit_fn of crm_service_handler). */
int ais_ipc_client_connect_callback (void *conn);
int ais_ipc_client_exit_callback (void *conn);
/* Message handlers referenced from crm_exec_service[] and crm_lib_service[]. */
void ais_cluster_message_swab(void *msg);
void ais_cluster_message_callback(void *message, unsigned int nodeid);
void ais_ipc_message_callback(void *conn, void *msg);
void ais_our_nodeid(void *conn, void *msg);
void ais_quorum_query(void *conn, void *msg);
void ais_node_list_query(void *conn, void *msg);
void ais_manage_notification(void *conn, void *msg);
void ais_plugin_remove_member(void *conn, void *msg);
void ais_cluster_id_swab(void *msg);
void ais_cluster_id_callback(void *message, unsigned int nodeid);
/*
 * Handlers for requests arriving from local IPC clients.
 * send_ipc_ack() and ais_our_nodeid() index this table to obtain the
 * response size and id for a given entry, so indices must stay stable.
 */
static plugin_lib_handler crm_lib_service[] =
{
{ /* 0: generic message from a local client */
.lib_handler_fn = ais_ipc_message_callback,
.response_size = sizeof (mar_res_header_t),
.response_id = CRM_MESSAGE_IPC_ACK,
.flow_control = COROSYNC_LIB_FLOW_CONTROL_NOT_REQUIRED
},
{ /* 1: request for the current node list */
.lib_handler_fn = ais_node_list_query,
.response_size = sizeof (mar_res_header_t),
.response_id = CRM_MESSAGE_IPC_ACK,
.flow_control = COROSYNC_LIB_FLOW_CONTROL_NOT_REQUIRED
},
{ /* 2: enable/disable membership notifications */
.lib_handler_fn = ais_manage_notification,
.response_size = sizeof (mar_res_header_t),
.response_id = CRM_MESSAGE_IPC_ACK,
.flow_control = COROSYNC_LIB_FLOW_CONTROL_NOT_REQUIRED
},
{ /* 3: query for the local node id (ais_our_nodeid() indexes this entry as crm_class_nodeid) */
.lib_handler_fn = ais_our_nodeid,
.response_size = sizeof (struct crm_ais_nodeid_resp_s),
.response_id = crm_class_nodeid,
.flow_control = COROSYNC_LIB_FLOW_CONTROL_NOT_REQUIRED
},
{ /* 4: request to remove a peer */
.lib_handler_fn = ais_plugin_remove_member,
.response_size = sizeof (mar_res_header_t),
.response_id = CRM_MESSAGE_IPC_ACK,
.flow_control = COROSYNC_LIB_FLOW_CONTROL_NOT_REQUIRED
},
};
/*
 * Handlers for messages arriving from the cluster.  The index is the second
 * argument given to SERVICE_ID_MAKE() by the senders in this file:
 * send_cluster_msg_raw() uses 0 and send_cluster_id() uses 1.
 */
static plugin_exec_handler crm_exec_service[] =
{
{ /* 0: routed AIS_Message */
.exec_handler_fn = ais_cluster_message_callback,
.exec_endian_convert_fn = ais_cluster_message_swab
},
{ /* 1: node identification (struct crm_identify_msg_s) */
.exec_handler_fn = ais_cluster_id_callback,
.exec_endian_convert_fn = ais_cluster_id_swab
}
};
/* Dump hook installed as .exec_dump_fn; it only logs that it was invoked. */
static void
crm_exec_dump_fn(void)
{
    ais_err("Called after SIG_USR2");
}
/*
 * Exports the interface for the service.
 *
 * Ties together the lifecycle hooks, the IPC (lib) and cluster (exec)
 * handler tables and the membership callback.  The handler-table members
 * are named differently on the two supported stacks, hence the #ifdef
 * sections.
 */
plugin_service_handler crm_service_handler = {
.name = (unsigned char *)"Pacemaker Cluster Manager",
.id = CRM_SERVICE,
.private_data_size = 0,
.flow_control = COROSYNC_LIB_FLOW_CONTROL_NOT_REQUIRED,
.lib_init_fn = ais_ipc_client_connect_callback,
.lib_exit_fn = ais_ipc_client_exit_callback,
.exec_init_fn = crm_exec_init_fn,
.exec_exit_fn = crm_exec_exit_fn,
.config_init_fn = crm_config_init_fn,
#ifdef AIS_WHITETANK
.lib_service = crm_lib_service,
.lib_service_count = sizeof (crm_lib_service) / sizeof (plugin_lib_handler),
.exec_service = crm_exec_service,
.exec_service_count = sizeof (crm_exec_service) / sizeof (plugin_exec_handler),
#endif
#ifdef AIS_COROSYNC
.lib_engine = crm_lib_service,
.lib_engine_count = sizeof (crm_lib_service) / sizeof (plugin_lib_handler),
.exec_engine = crm_exec_service,
.exec_engine_count = sizeof (crm_exec_service) / sizeof (plugin_exec_handler),
#endif
.confchg_fn = global_confchg_fn,
.exec_dump_fn = crm_exec_dump_fn,
/* The optional sync_* hooks below are not provided by this plugin: */
/* void (*sync_init) (void); */
/* int (*sync_process) (void); */
/* void (*sync_activate) (void); */
/* void (*sync_abort) (void); */
};
/*
 * Dynamic Loader definition
 *
 * crm_service_handler_iface exposes crm_get_handler_ver0() through the
 * member name expected by whichever stack the plugin is built for; it is
 * attached to the loader interface in register_this_component().
 */
plugin_service_handler *crm_get_handler_ver0 (void);
#ifdef AIS_WHITETANK
struct openais_service_handler_iface_ver0 crm_service_handler_iface = {
.openais_get_service_handler_ver0 = crm_get_handler_ver0
};
#endif
#ifdef AIS_COROSYNC
struct corosync_service_engine_iface_ver0 crm_service_handler_iface = {
.corosync_get_service_engine_ver0 = crm_get_handler_ver0
};
#endif
/*
 * Loader interface description for this plugin: a single interface named
 * "pacemaker", version 0.  .interfaces starts out NULL here;
 * register_this_component() passes this entry to lcr_interfaces_set().
 */
static struct lcr_iface openais_crm_ver0[1] = {
{
.name = "pacemaker",
.version = 0,
.versions_replace = 0,
.versions_replace_count = 0,
.dependencies = 0,
.dependency_count = 0,
.constructor = NULL,
.destructor = NULL,
.interfaces = NULL
}
};
/* Component record handed to lcr_component_register(); wraps the single interface above. */
static struct lcr_comp crm_comp_ver0 = {
.iface_count = 1,
.ifaces = openais_crm_ver0
};
/* Accessor published through crm_service_handler_iface: returns this plugin's handler table. */
plugin_service_handler *
crm_get_handler_ver0(void)
{
    plugin_service_handler *handler = &crm_service_handler;

    return handler;
}
/*
 * Marked as a constructor, so it runs when the shared object is loaded:
 * attaches the handler interface to the "pacemaker" loader entry and then
 * registers the component.
 */
__attribute__ ((constructor)) static void
register_this_component(void)
{
    lcr_interfaces_set(&openais_crm_ver0[0], &crm_service_handler_iface);

    lcr_component_register(&crm_comp_ver0);
}
#ifdef AIS_COROSYNC
#include <corosync/engine/config.h>
#endif
/*
 * Read the executive's configuration and publish it for the plugin.
 *
 * Looks at the first "logging" section and at the "service" section whose
 * name is "pacemaker".  Options are translated into HA_* environment
 * variables (presumably so that spawned children inherit them - confirm
 * against spawn_child()).  Also sets plugin_log_level and, unless use_mgmtd
 * is enabled, zeroes the start_seq of the mgmtd entry in crm_children[].
 */
static void process_ais_conf(void)
{
char *value = NULL;
unsigned int top_handle = 0;
unsigned int local_handle = 0;
ais_info("Reading configure");
/* --- logging section --- */
top_handle = config_find_init(crm_api, "logging");
local_handle = config_find_next(crm_api, "logging", top_handle);
/* debug defaults to "on" */
get_config_opt(crm_api, local_handle, "debug", &value, "on");
if(ais_get_boolean(value)) {
plugin_log_level = LOG_DEBUG;
setenv("HA_debug", "1", 1);
} else {
plugin_log_level = LOG_INFO;
setenv("HA_debug", "0", 1);
}
/* syslog defaults to "on" with facility "daemon"; "none" is exported when it is off */
get_config_opt(crm_api, local_handle, "to_syslog", &value, "on");
if(ais_get_boolean(value)) {
get_config_opt(crm_api, local_handle, "syslog_facility", &value, "daemon");
setenv("HA_logfacility", value, 1);
} else {
setenv("HA_logfacility", "none", 1);
}
/* file logging defaults to "off" and needs an explicit logfile */
get_config_opt(crm_api, local_handle, "to_file", &value, "off");
if(ais_get_boolean(value)) {
get_config_opt(crm_api, local_handle, "logfile", &value, NULL);
if(value == NULL) {
ais_err("Logging to a file requested but no log file specified");
} else {
setenv("HA_logfile", value, 1);
}
}
config_find_done(crm_api, local_handle);
/* --- service section: walk the "service" objects until name == "pacemaker" --- */
top_handle = config_find_init(crm_api, "service");
local_handle = config_find_next(crm_api, "service", top_handle);
while(local_handle) {
value = NULL;
crm_api->object_key_get(local_handle, "name", strlen("name"), (void**)&value, NULL);
if(ais_str_eq("pacemaker", value)) {
break;
}
local_handle = config_find_next(crm_api, "service", top_handle);
}
/* NOTE(review): local_handle is 0 here when no "pacemaker" service was found;
 * the lookups below then rely on get_config_opt() coping with that - confirm. */
get_config_opt(crm_api, local_handle, "expected_nodes", &value, "2");
setenv("HA_expected_nodes", value, 1);
get_config_opt(crm_api, local_handle, "expected_votes", &value, "2");
setenv("HA_expected_votes", value, 1);
get_config_opt(crm_api, local_handle, "quorum_votes", &value, "1");
setenv("HA_votes", value, 1);
get_config_opt(crm_api, local_handle, "use_logd", &value, "no");
setenv("HA_use_logd", value, 1);
get_config_opt(crm_api, local_handle, "use_mgmtd", &value, "no");
if(ais_get_boolean(value) == FALSE) {
int lpc = 0;
for (; lpc < SIZEOF(crm_children); lpc++) {
if(crm_proc_mgmtd & crm_children[lpc].flag) {
/* Disable mgmtd startup */
crm_children[lpc].start_seq = 0;
break;
}
}
}
config_find_done(crm_api, local_handle);
}
/*
 * One-time plugin initialisation, called from crm_exec_init_fn().
 *
 * Reads the configuration, creates the membership tables, exports a few
 * fixed HA_* environment variables, records the local node name and id and
 * finally registers the local node as a member.
 */
static void crm_plugin_init(void)
{
int rc = 0;
struct utsname us;
#ifdef AIS_WHITETANK
log_init ("crm");
#endif
process_ais_conf();
/* Node entries are destroyed with destroy_ais_node when removed from the table */
membership_list = g_hash_table_new_full(
g_direct_hash, g_direct_equal, NULL, destroy_ais_node);
membership_notify_list = g_hash_table_new(g_direct_hash, g_direct_equal);
setenv("HA_COMPRESSION", "bz2", 1);
setenv("HA_cluster_type", "openais", 1);
/* Best effort: a failure is only logged */
if(system("echo 1 > /proc/sys/kernel/core_uses_pid") != 0) {
ais_perror("Could not enable /proc/sys/kernel/core_uses_pid");
}
ais_info("CRM: Initialized");
log_printf(LOG_INFO, "Logging: Initialized %s\n", __PRETTY_FUNCTION__);
/* The local node name is taken from uname() */
rc = uname(&us);
AIS_ASSERT(rc == 0);
local_uname = ais_strdup(us.nodename);
local_uname_len = strlen(local_uname);
/* NOTE(review): these two tests use #if while the rest of the file uses #ifdef */
#if AIS_WHITETANK
local_nodeid = totempg_my_nodeid_get();
#endif
#if AIS_COROSYNC
local_nodeid = crm_api->totem_nodeid_get();
#endif
ais_info("Service: %d", CRM_SERVICE);
ais_info("Local node id: %u", local_nodeid);
ais_info("Local hostname: %s", local_uname);
/* Record ourselves in the membership table */
update_member(local_nodeid, 0, 0, 1, 0, local_uname, CRM_NODE_MEMBER, NULL);
}
/* Configuration-stage hook (.config_init_fn); there is nothing to do, so it reports success. */
int
crm_config_init_fn(plugin_init_type *unused)
{
    (void)unused;
    return 0;
}
/*
 * Body of the child-reaping thread started by crm_exec_init_fn().
 *
 * While wait_active is TRUE, every crm_children[] entry that has a pid is
 * polled with a non-blocking wait4().  When a child has gone away its pid
 * and connection pointers are cleared and the exit is logged.  The child is
 * then respawned, unless it exited with rc 100 or exceeded MAX_RESPAWN, in
 * which case send_cluster_id() is called instead.
 *
 * arg is unused.  Always returns 0 (NULL).
 */
static void *crm_wait_dispatch (void *arg)
{
    /* Pause between polling passes.  tv_nsec is in nanoseconds, so 100 msec
     * is 100,000,000; the previous value of 100000 was only 100 usec. */
    struct timespec waitsleep = {
        .tv_sec = 0,
        .tv_nsec = 100000000 /* 100 msec */
    };

    while(wait_active) {
        int lpc = 0;

        for (; lpc < SIZEOF(crm_children); lpc++) {
            if(crm_children[lpc].pid > 0) {
                int status;
                pid_t pid = wait4(
                    crm_children[lpc].pid, &status, WNOHANG, NULL);

                if(pid == 0) {
                    /* still running */
                    continue;

                } else if(pid < 0) {
                    ais_perror("crm_wait_dispatch: Call to wait4(%s) failed",
                               crm_children[lpc].name);
                    continue;
                }

                /* cleanup */
                crm_children[lpc].pid = 0;
                crm_children[lpc].conn = NULL;
                crm_children[lpc].async_conn = NULL;

                if(WIFSIGNALED(status)) {
                    int sig = WTERMSIG(status);
                    ais_err("Child process %s terminated with signal %d"
                            " (pid=%d, core=%s)",
                            crm_children[lpc].name, sig, pid,
                            WCOREDUMP(status)?"true":"false");

                } else if (WIFEXITED(status)) {
                    int rc = WEXITSTATUS(status);
                    do_ais_log(rc==0?LOG_NOTICE:LOG_ERR, "Child process %s exited (pid=%d, rc=%d)",
                               crm_children[lpc].name, pid, rc);

                    /* rc 100 is the child's way of opting out of respawning */
                    if(rc == 100) {
                        ais_notice("Child process %s no longer wishes"
                                   " to be respawned", crm_children[lpc].name);
                        crm_children[lpc].respawn = FALSE;
                    }
                }

                crm_children[lpc].respawn_count += 1;
                if(crm_children[lpc].respawn_count > MAX_RESPAWN) {
                    ais_err("Child respawn count exceeded by %s",
                            crm_children[lpc].name);
                    crm_children[lpc].respawn = FALSE;
                }

                if(crm_children[lpc].respawn) {
                    ais_notice("Respawning failed child process: %s",
                               crm_children[lpc].name);
                    spawn_child(&(crm_children[lpc]));
                } else {
                    send_cluster_id();
                }
            }
        }
        sched_yield ();
        nanosleep (&waitsleep, 0);
    }
    return 0;
}
#include <sys/stat.h>
#include <pwd.h>
/*
 * Executive start-up hook (.exec_init_fn).
 *
 * Stores init_with in crm_api.  On the first call only it additionally:
 *  - runs crm_plugin_init(),
 *  - starts the crm_wait_dispatch() thread,
 *  - creates the runtime directories and hands the cluster ones to
 *    HA_CCMUSER,
 *  - spawns every crm_children[] entry in ascending start_seq order.
 *
 * Returns 0 on success; returns TRUE (non-zero) from inside the AIS_CHECK
 * when the HA_CCMUSER account does not exist.
 */
int crm_exec_init_fn(plugin_init_type *init_with)
{
    int lpc = 0;
    int start_seq = 1;
    static gboolean need_init = TRUE;
    static int max = SIZEOF(crm_children);

    crm_api = init_with;

    if(need_init) {
        struct passwd *pwentry = NULL;

        need_init = FALSE;
        crm_plugin_init();

        pthread_create (&crm_wait_thread, NULL, crm_wait_dispatch, NULL);

        pwentry = getpwnam(HA_CCMUSER);
        AIS_CHECK(pwentry != NULL,
                  ais_err("Cluster user %s does not exist", HA_CCMUSER);
                  return TRUE);

        /* Permission bits must be octal literals: a decimal 750 is 01356
         * in octal, which is not rwxr-x---. */
        mkdir(HA_VARRUNDIR, 0750);
        mkdir(HA_VARRUNDIR"/crm", 0750);
        mkdir(HA_VARRUNHBDIR"/rsctmp", 0755); /* Used by RAs - Leave owned by root */
        chown(HA_VARRUNDIR"/crm", pwentry->pw_uid, pwentry->pw_gid);
        chown(HA_VARRUNDIR, pwentry->pw_uid, pwentry->pw_gid);

        for (start_seq = 1; start_seq < max; start_seq++) {
            /* dont start anything with start_seq < 1 */
            for (lpc = 0; lpc < max; lpc++) {
                if(start_seq == crm_children[lpc].start_seq) {
                    spawn_child(&(crm_children[lpc]));
                }
            }
        }
    }

    ais_info("CRM: Initialized");

    return 0;
}
/*
static void ais_print_node(const char *prefix, struct totem_ip_address *host)
{
int len = 0;
char *buffer = NULL;
ais_malloc0(buffer, INET6_ADDRSTRLEN+1);
inet_ntop(host->family, host->addr, buffer, INET6_ADDRSTRLEN);
len = strlen(buffer);
ais_info("%s: %.*s", prefix, len, buffer);
ais_free(buffer);
}
*/
#if 0
/* copied here for reference from exec/totempg.c */
char *totempg_ifaces_print (unsigned int nodeid)
{
static char iface_string[256 * INTERFACE_MAX];
char one_iface[64];
struct totem_ip_address interfaces[INTERFACE_MAX];
char **status;
unsigned int iface_count;
unsigned int i;
int res;
iface_string[0] = '\0';
res = totempg_ifaces_get (nodeid, interfaces, &status, &iface_count);
if (res == -1) {
return ("no interface found for nodeid");
}
for (i = 0; i < iface_count; i++) {
sprintf (one_iface, "r(%d) ip(%s), ",
i, totemip_print (&interfaces[i]));
strcat (iface_string, one_iface);
}
return (iface_string);
}
#endif
/*
 * g_hash_table_foreach() callback over membership_list.
 *
 * A node whose last_seen differs from the current membership_seq, and which
 * is not already in the CRM_NODE_LOST state, is moved to CRM_NODE_LOST.
 * user_data points at an int that accumulates update_member()'s return
 * values.  key is unused.
 */
static void ais_mark_unseen_peer_dead(
    gpointer key, gpointer value, gpointer user_data)
{
    crm_node_t *peer = value;
    int *num_changed = user_data;

    if(peer->last_seen == membership_seq) {
        /* seen in the current transition */
        return;
    }

    if(ais_str_eq(CRM_NODE_LOST, peer->state) != FALSE) {
        /* already known to be gone */
        return;
    }

    ais_info("Node %s was not seen in the previous transition", peer->uname);
    *num_changed += update_member(peer->id, 0, membership_seq, peer->votes,
                                  peer->processes, peer->uname, CRM_NODE_LOST, NULL);
}
/*
 * Membership-change callback (.confchg_fn).
 *
 * configuration_type - only TOTEM_CONFIGURATION_REGULAR events update the
 *                      membership table; transitional events are logged.
 * member_list / left_list / joined_list (+ *_entries) - node ids that are
 *                      current members, have left, or have joined.
 * ring_id            - must not be NULL; its seq becomes membership_seq.
 *
 * For a regular event the member table is updated, nodes that were not seen
 * in this ring are marked lost, local clients are notified if anything
 * changed, and our identity is re-broadcast with send_cluster_id().
 */
void global_confchg_fn (
    enum totem_configuration_type configuration_type,
    unsigned int *member_list, int member_list_entries,
    unsigned int *left_list, int left_list_entries,
    unsigned int *joined_list, int joined_list_entries,
    struct memb_ring_id *ring_id)
{
    int lpc = 0;
    int changed = 0;
    int do_update = 0;

    AIS_ASSERT(ring_id != NULL);
    switch(configuration_type) {
        case TOTEM_CONFIGURATION_REGULAR:
            do_update = 1;
            break;
        case TOTEM_CONFIGURATION_TRANSITIONAL:
            break;
    }

    membership_seq = ring_id->seq;
    ais_notice("%s membership event on ring %lld: memb=%d, new=%d, lost=%d",
               do_update?"Stable":"Transitional", ring_id->seq, member_list_entries,
               joined_list_entries, left_list_entries);

    if(do_update == 0) {
        /* Transitional event: log only (lower-case prefixes) */
        for(lpc = 0; lpc < joined_list_entries; lpc++) {
            const char *prefix = "new: ";
            uint32_t nodeid = joined_list[lpc];
            ais_info("%s %s %u", prefix, member_uname(nodeid), nodeid);
        }
        for(lpc = 0; lpc < member_list_entries; lpc++) {
            const char *prefix = "memb:";
            uint32_t nodeid = member_list[lpc];
            ais_info("%s %s %u", prefix, member_uname(nodeid), nodeid);
        }
        for(lpc = 0; lpc < left_list_entries; lpc++) {
            const char *prefix = "lost:";
            uint32_t nodeid = left_list[lpc];
            ais_info("%s %s %u", prefix, member_uname(nodeid), nodeid);
        }
        return;
    }

    for(lpc = 0; lpc < joined_list_entries; lpc++) {
        const char *prefix = "NEW: ";
        uint32_t nodeid = joined_list[lpc];
        crm_node_t *node = NULL;

        changed += update_member(
            nodeid, 0, membership_seq, -1, 0, NULL, CRM_NODE_MEMBER, NULL);
        ais_info("%s %s %u", prefix, member_uname(nodeid), nodeid);

        node = g_hash_table_lookup(membership_list, GUINT_TO_POINTER(nodeid));
        if(node == NULL) {
            /* Do not dereference a failed lookup */
            ais_err("Node %u is missing from the membership list", nodeid);
            continue;
        }
        if(node->addr == NULL) {
            const char *addr = totempg_ifaces_print(nodeid);
            node->addr = ais_strdup(addr);
            ais_debug("Node %u has address %s", nodeid, node->addr);
        }
    }

    for(lpc = 0; lpc < member_list_entries; lpc++) {
        const char *prefix = "MEMB:";
        uint32_t nodeid = member_list[lpc];

        changed += update_member(
            nodeid, 0, membership_seq, -1, 0, NULL, CRM_NODE_MEMBER, NULL);
        ais_info("%s %s %u", prefix, member_uname(nodeid), nodeid);
    }

    for(lpc = 0; lpc < left_list_entries; lpc++) {
        const char *prefix = "LOST:";
        uint32_t nodeid = left_list[lpc];

        changed += update_member(
            nodeid, 0, membership_seq, -1, 0, NULL, CRM_NODE_LOST, NULL);
        ais_info("%s %s %u", prefix, member_uname(nodeid), nodeid);
    }

    if(changed && joined_list_entries == 0 && left_list_entries == 0) {
        ais_err("Something strange happened: %d", changed);
        changed = 0;
    }

    ais_debug_2("Reaping unseen nodes...");
    g_hash_table_foreach(membership_list, ais_mark_unseen_peer_dead, &changed);

    if(member_list_entries > 1) {
        /* Used to set born-on in send_cluster_id())
         * We need to wait until we have at least one peer since first
         * membership id is based on the one before we stopped and isn't reliable
         */
        have_reliable_membership_id = TRUE;
    }

    if(changed) {
        ais_debug("%d nodes changed", changed);
        send_member_notification();
    }

    send_cluster_id();
}
/*
 * IPC disconnect hook (.lib_exit_fn).
 *
 * If conn belongs to one of our children, the child's connection pointers
 * are cleared (and, during shutdown, its pid too).  The partner connection
 * is always dropped from membership_notify_list.  Returns 0.
 */
int ais_ipc_client_exit_callback (void *conn)
{
    const char *client = NULL;
    void *async_conn = openais_conn_partner_get(conn);
    int idx;

    for (idx = 0; idx < SIZEOF(crm_children); idx++) {
        if(crm_children[idx].conn != conn) {
            continue;
        }

        if(wait_active == FALSE) {
            /* Make sure the shutdown loop exits */
            crm_children[idx].pid = 0;
        }
        crm_children[idx].conn = NULL;
        crm_children[idx].async_conn = NULL;
        client = crm_children[idx].name;
        break;
    }

    g_hash_table_remove(membership_notify_list, async_conn);

    ais_info("Client %s (conn=%p, async-conn=%p) left",
             client?client:"unknown-transient", conn, async_conn);

    return (0);
}
/*
 * IPC connect hook (.lib_init_fn).  Deliberately does nothing:
 * OpenAIS hasn't finished setting up the connection at this point, and
 * sending messages now messes up the protocol.  Returns 0.
 */
int
ais_ipc_client_connect_callback(void *conn)
{
    (void)conn;
    return 0;
}
/*
* Executive message handlers
*/
/*
 * Endian-conversion hook for AIS_Message (exec handler 0).
 * Byte-swaps, in place, the 32-bit fields of the message itself and of its
 * host and sender descriptors.
 */
void ais_cluster_message_swab(void *msg)
{
    AIS_Message *m = msg;

    ais_debug_3("Performing endian conversion...");

    /* message-level fields */
    m->id              = swab32 (m->id);
    m->size            = swab32 (m->size);
    m->is_compressed   = swab32 (m->is_compressed);
    m->compressed_size = swab32 (m->compressed_size);

    /* destination descriptor */
    m->host.id    = swab32 (m->host.id);
    m->host.pid   = swab32 (m->host.pid);
    m->host.type  = swab32 (m->host.type);
    m->host.size  = swab32 (m->host.size);
    m->host.local = swab32 (m->host.local);

    /* origin descriptor */
    m->sender.id    = swab32 (m->sender.id);
    m->sender.pid   = swab32 (m->sender.pid);
    m->sender.type  = swab32 (m->sender.type);
    m->sender.size  = swab32 (m->sender.size);
    m->sender.local = swab32 (m->sender.local);
}
/*
 * Handler for AIS_Message traffic received from the cluster (exec handler 0).
 *
 * A message is routed when it names no destination host (host.size == 0)
 * or names this node; anything else is discarded with a trace message.
 */
void ais_cluster_message_callback (
    void *message, unsigned int nodeid)
{
    AIS_Message *ais_msg = message;

    ais_debug_2("Message from node %u (%s)",
                nodeid, nodeid==local_nodeid?"local":"remote");

    if(ais_msg->host.size != 0
       && ais_str_eq(ais_msg->host.uname, local_uname) == FALSE) {
        /* addressed to some other node */
        ais_debug_3("Discarding Msg[%d] (dest=%s:%s, from=%s:%s)",
                    ais_msg->id, ais_dest(&(ais_msg->host)),
                    msg_type2text(ais_msg->host.type),
                    ais_dest(&(ais_msg->sender)),
                    msg_type2text(ais_msg->sender.type));
        return;
    }

    route_ais_message(ais_msg, FALSE);
}
/*
 * Endian-conversion hook for struct crm_identify_msg_s (exec handler 1).
 * Byte-swaps id, pid, votes and processes in place.
 *
 * NOTE(review): the 64-bit born_on member of the struct is not converted
 * here - confirm whether that is intentional.
 */
void ais_cluster_id_swab(void *msg)
{
    struct crm_identify_msg_s *ident = msg;

    ais_debug_3("Performing endian conversion...");

    ident->id        = swab32 (ident->id);
    ident->pid       = swab32 (ident->pid);
    ident->votes     = swab32 (ident->votes);
    ident->processes = swab32 (ident->processes);
}
/*
 * Handler for node identification messages (exec handler 1).
 *
 * message - a struct crm_identify_msg_s
 * nodeid  - id of the node the message was received from
 *
 * Messages whose embedded id does not match nodeid are rejected.  Otherwise
 * the sender's entry is refreshed via update_member() and, if that reports
 * a change, local clients receive a membership notification.
 */
void ais_cluster_id_callback (void *message, unsigned int nodeid)
{
    int changed = 0;
    struct crm_identify_msg_s *msg = message;

    if(nodeid != msg->id) {
        /* msg->id is a uint32_t, so it must be printed unsigned */
        ais_err("Invalid message: Node %u claimed to be node %u",
                nodeid, msg->id);
        return;
    }

    ais_debug("Node update: %s (%s)", msg->uname, msg->version);
    changed = update_member(
        nodeid, msg->born_on, membership_seq, msg->votes, msg->processes, msg->uname, NULL, msg->version);

    if(changed) {
        send_member_notification();
    }
}
/*
 * Response buffer used by send_ipc_ack(): a response header followed by
 * scratch space.  The single global instance is allocated on first use in
 * send_ipc_ack() and then reused for every acknowledgement.
 */
struct res_overlay {
mar_res_header_t header __attribute((aligned(8)));
char buf[4096];
};
struct res_overlay *res_overlay = NULL;
/*
 * Acknowledge an IPC request.
 *
 * conn  - connection the request arrived on
 * class - index into crm_lib_service[]; selects the response size and id.
 *         The index is not range-checked here.
 */
static void send_ipc_ack(void *conn, int class)
{
    /* the shared response buffer is created on first use */
    if(res_overlay == NULL) {
        ais_malloc0(res_overlay, sizeof(struct res_overlay));
    }

    res_overlay->header.error = SA_AIS_OK;
    res_overlay->header.id    = crm_lib_service[class].response_id;
    res_overlay->header.size  = crm_lib_service[class].response_size;

#ifdef AIS_WHITETANK
    openais_response_send (conn, res_overlay, res_overlay->header.size);
#endif

#ifdef AIS_COROSYNC
    crm_api->ipc_conn_send_response (conn, res_overlay, res_overlay->header.size);
#endif
}
/* local callbacks */
/*
 * Handler for generic messages from local IPC clients (lib handler 0).
 *
 * conn - connection the message arrived on
 * msg  - an AIS_Message
 *
 * The request is acknowledged first.  If the sender type identifies one of
 * our children, its pid is verified and - for the child's first local
 * message addressed to the plugin - the connection is recorded; children
 * flagged with crm_flag_members are also sent the current membership.
 * Finally the sender fields are stamped with the local node's identity and
 * the message is routed.
 */
void ais_ipc_message_callback(void *conn, void *msg)
{
    gboolean transient = TRUE;
    AIS_Message *ais_msg = msg;
    int type = ais_msg->sender.type;
    void *async_conn = openais_conn_partner_get(conn);

    ais_debug_2("Message from client %p", conn);
    send_ipc_ack(conn, 0);

    /* Validate 'type' BEFORE it is used as an index into crm_children[] */
    if(type > crm_msg_none && type < SIZEOF(crm_children)) {
        /* known child process */
        transient = FALSE;
    }

    /* Per-child details are only meaningful (and safe to read) for known children */
    ais_debug_3("type: %d local: %d conn: %p host type: %d ais: %d sender pid: %d child pid: %d size: %d",
                type, ais_msg->host.local,
                transient ? NULL : crm_children[type].conn,
                ais_msg->host.type, crm_msg_ais,
                ais_msg->sender.pid,
                transient ? 0 : crm_children[type].pid,
                ((int)SIZEOF(crm_children)));

    /* If this check fails, the order of crm_children probably
     * doesn't match that of the crm_ais_msg_types enum
     */
    AIS_CHECK(transient || ais_msg->sender.pid == crm_children[type].pid,
              ais_err("Sender: %d, child[%d]: %d", ais_msg->sender.pid, type, crm_children[type].pid);
              return);

    if(transient == FALSE
       && type > crm_msg_none
       && ais_msg->host.local
       && crm_children[type].conn == NULL
       && ais_msg->host.type == crm_msg_ais) {

        ais_info("Recorded connection %p for %s/%d",
                 conn, crm_children[type].name, crm_children[type].pid);
        crm_children[type].conn = conn;
        crm_children[type].async_conn = async_conn;

        /* Make sure they have the latest membership */
        if(crm_children[type].flags & crm_flag_members) {
            char *update = ais_generate_membership_data();

            g_hash_table_replace(membership_notify_list, async_conn, async_conn);
            ais_info("Sending membership update "U64T" to %s",
                     membership_seq, crm_children[type].name);
            send_client_msg(async_conn, crm_class_members, crm_msg_none,update);

            /* The other senders of membership data in this file free the
             * string after send_client_msg(); do the same here so it is
             * not leaked. */
            ais_free(update);
        }
    }

    /* Stamp the message with our own identity before routing it */
    ais_msg->sender.id = local_nodeid;
    ais_msg->sender.size = local_uname_len;
    memset(ais_msg->sender.uname, 0, MAX_NAME);
    memcpy(ais_msg->sender.uname, local_uname, ais_msg->sender.size);

    route_ais_message(msg, TRUE);
}
/*
 * Executive shutdown hook (.exec_exit_fn).
 *
 * Stops the wait thread's loop, then terminates the children in reverse
 * start order (highest start_seq first), waiting for each to exit before
 * moving on.  Finishes by re-broadcasting our identity.  Returns 0.
 *
 * The parameter list depends on which stack the plugin is built for.
 */
int crm_exec_exit_fn (
#ifdef AIS_WHITETANK
struct objdb_iface_ver0 *objdb
#endif
#ifdef AIS_COROSYNC
void
#endif
)
{
int lpc = 0;
int start_seq = 1;
static int max = SIZEOF(crm_children);
/* Pause between polls of a child that has not exited yet: 1 second */
struct timespec waitsleep = {
.tv_sec = 1,
.tv_nsec = 0
};
ais_notice("Begining shutdown");
in_shutdown = TRUE;
wait_active = FALSE; /* stop the wait loop */
for (start_seq = max; start_seq > 0; start_seq--) {
/* dont stop anything with start_seq < 1 */
for (lpc = max - 1; lpc >= 0; lpc--) {
int orig_pid = 0, iter = 0;
if(start_seq != crm_children[lpc].start_seq) {
continue;
}
orig_pid = crm_children[lpc].pid;
/* make sure nothing restarts the child we are about to stop */
crm_children[lpc].respawn = FALSE;
stop_child(&(crm_children[lpc]), SIGTERM);
/* The pid can also be cleared by ais_ipc_client_exit_callback(), which
 * does so when wait_active is FALSE; that ends this loop as well. */
while(crm_children[lpc].command && crm_children[lpc].pid) {
int status;
pid_t pid = 0;
pid = wait4(
crm_children[lpc].pid, &status, WNOHANG, NULL);
if(pid == 0) {
/* still running: log every 30th poll */
if((++iter % 30) == 0) {
ais_notice("Still waiting for %s (pid=%d) to terminate...",
crm_children[lpc].name, orig_pid);
}
sched_yield ();
nanosleep (&waitsleep, 0);
continue;
} else if(pid < 0) {
ais_perror("crm_exec_exit_fn: Call to wait4(%s) failed",
crm_children[lpc].name);
}
/* cleanup */
crm_children[lpc].pid = 0;
crm_children[lpc].conn = NULL;
crm_children[lpc].async_conn = NULL;
break;
}
ais_notice("%s (pid=%d) confirmed dead",
crm_children[lpc].name, orig_pid);
}
}
send_cluster_id();
ais_notice("Shutdown complete");
#ifndef AIS_WHITETANK
logsys_flush ();
#endif
return 0;
}
/* Accumulator passed to member_loop_fn(): the membership string built so far. */
struct member_loop_data
{
char *string;
};
/*
 * g_hash_table_foreach() callback over membership_list: appends one node to
 * the string held in the struct member_loop_data passed as user_data.
 * key is unused.
 */
void member_loop_fn(gpointer key, gpointer value, gpointer user_data)
{
    struct member_loop_data *acc = user_data;
    crm_node_t *member = value;

    ais_debug_2("Dumping node %u", member->id);
    acc->string = append_member(acc->string, member);
}
/*
 * Build the membership document sent to clients:
 *   <nodes id="SEQ"> ...one fragment per known node... </nodes>
 *
 * Returns a heap-allocated string; the callers in this file release it with
 * ais_free() after sending it.
 *
 * NOTE(review): the realloc() result is used without a NULL check.
 */
char *ais_generate_membership_data(void)
{
    static const char closing_tag[] = "</nodes>"; /* 9 bytes including the nul */
    struct member_loop_data data;
    int used = 0;
    int initial = 14 + 32; /* <nodes id=""> + int */

    ais_malloc0(data.string, initial);
    sprintf(data.string, "<nodes id=\""U64T"\">", membership_seq);

    /* let every node append itself */
    g_hash_table_foreach(membership_list, member_loop_fn, &data);

    used = strlen(data.string);
    data.string = realloc(data.string, used + 9) ;/* 9 = </nodes> + nul */
    memcpy(data.string + used, closing_tag, sizeof(closing_tag));

    return data.string;
}
void ais_node_list_query(void *conn, void *msg)
{
char *data = ais_generate_membership_data();
void *async_conn = openais_conn_partner_get(conn);
/* send the ACK before we send any other messages */
send_ipc_ack(conn, 1);
if(async_conn) {
send_client_msg(async_conn, crm_class_members, crm_msg_none, data);
}
ais_free(data);
}
/*
 * Handler for peer-removal requests (lib handler 4).
 *
 * conn - connection the request arrived on
 * msg  - an AIS_Message whose payload names the peer
 *
 * When the payload can be extracted, "remove-peer:<payload>" is sent to the
 * cluster addressed to the plugin (crm_msg_ais).  The request is
 * acknowledged in either case.
 */
void ais_plugin_remove_member(void *conn, void *msg)
{
    AIS_Message *ais_msg = msg;
    char *data = get_ais_data(ais_msg);

    if(data != NULL) {
        char *bcast = ais_concat("remove-peer", data, ':');
        send_cluster_msg(crm_msg_ais, NULL, bcast);
        ais_info("Sent: %s", bcast);
        ais_free(bcast);
    }

    /* This handler is entry 4 of crm_lib_service[]; the ACK used to be
     * built from entry 2, which only worked because both entries currently
     * carry the same response size and id. */
    send_ipc_ack(conn, 4);
    ais_free(data);
}
/*
 * Handler for notification subscription requests (lib handler 2).
 *
 * A payload of "true" adds the connection's partner to
 * membership_notify_list; anything else removes it.  The request is then
 * acknowledged.
 */
void ais_manage_notification(void *conn, void *msg)
{
    AIS_Message *ais_msg = msg;
    char *data = get_ais_data(ais_msg);
    void *async_conn = openais_conn_partner_get(conn);
    int enable = ais_str_eq("true", data) ? 1 : 0;

    ais_info("%s node notifications for child %d (%p)",
             enable?"Enabling":"Disabling", ais_msg->sender.pid, async_conn);

    if(!enable) {
        g_hash_table_remove(membership_notify_list, async_conn);
    } else {
        g_hash_table_replace(membership_notify_list, async_conn, async_conn);
    }

    send_ipc_ack(conn, 2);
    ais_free(data);
}
/*
 * Handler for local node id queries (lib handler 3).
 *
 * Replies on conn with a struct crm_ais_nodeid_resp_s carrying the local
 * node id, the local node name and a counter that increases by one with
 * every reply.  msg is unused.
 */
void ais_our_nodeid(void *conn, void *msg)
{
    static int counter = 0;
    struct crm_ais_nodeid_resp_s resp;

    /* local_nodeid is a uint32_t: print it unsigned, as crm_plugin_init() does */
    ais_info("Sending local nodeid: %u to %p[%d]", local_nodeid, conn, counter);

    resp.header.size = crm_lib_service[crm_class_nodeid].response_size;
    resp.header.id = crm_lib_service[crm_class_nodeid].response_id;
    resp.header.error = SA_AIS_OK;
    resp.id = local_nodeid;
    resp.counter = counter++;

    /* zero-fill first so the copied name is always nul-terminated/padded */
    memset(resp.uname, 0, 256);
    memcpy(resp.uname, local_uname, local_uname_len);

#ifdef AIS_WHITETANK
    openais_response_send (conn, &resp, resp.header.size);
#endif

#ifdef AIS_COROSYNC
    crm_api->ipc_conn_send_response (conn, &resp, resp.header.size);
#endif
}
/*
 * g_hash_table_foreach_remove() callback: sends the membership document
 * (data) to the connection stored as value.  Returning TRUE removes the
 * entry, which happens when the send reports a non-zero result.
 */
static gboolean
ghash_send_update(gpointer key, gpointer value, gpointer data)
{
    int rc = send_client_msg(value, crm_class_members, crm_msg_none, data);

    /* TRUE == remove it */
    return (rc != 0) ? TRUE : FALSE;
}
/*
 * Send the current membership document to every connection in
 * membership_notify_list; connections for which the send fails are dropped
 * from the list.
 */
void send_member_notification(void)
{
    char *membership = ais_generate_membership_data();

    ais_info("Sending membership update "U64T" to %d children",
             membership_seq,
             g_hash_table_size(membership_notify_list));

    g_hash_table_foreach_remove(
        membership_notify_list, ghash_send_update, membership);

    ais_free(membership);
}
/*
 * Validate, and where possible repair, the size bookkeeping of a message.
 *
 * msg  - message to check.  When its payload length disagrees with the
 *        length implied by header.size, its size (or compressed_size, for
 *        compressed messages) field is overwritten.
 * data - payload used for the string-length cross-check of uncompressed
 *        messages; the check is skipped when it is NULL.
 *
 * Returns TRUE when the message may be delivered and FALSE when it must be
 * dropped.  The verdict is always logged.
 */
static gboolean check_message_sanity(AIS_Message *msg, char *data)
{
    gboolean sane = TRUE;
    gboolean repaired = FALSE;
    int dest = msg->host.type;
    /* payload length implied by the transport header */
    int tmp_size = msg->header.size - sizeof(AIS_Message);

    if(sane && msg->header.size == 0) {
        ais_warn("Message with no size");
        sane = FALSE;
    }

    if(sane && msg->header.error != SA_AIS_OK) {
        ais_warn("Message header contains an error: %d", msg->header.error);
        sane = FALSE;
    }

    if(sane && ais_data_len(msg) != tmp_size) {
        /* trust the header and fix up the payload length */
        int cur_size = ais_data_len(msg);

        repaired = TRUE;
        if(msg->is_compressed) {
            msg->compressed_size = tmp_size;
        } else {
            msg->size = tmp_size;
        }

        ais_warn("Repaired message payload size %d -> %d", cur_size, tmp_size);
    }

    if(sane && ais_data_len(msg) == 0) {
        ais_warn("Message with no payload");
        sane = FALSE;
    }

    if(sane && data && msg->is_compressed == FALSE) {
        /* an uncompressed payload is a nul-terminated string */
        int str_size = strlen(data) + 1;

        if(ais_data_len(msg) != str_size) {
            int lpc = 0;

            ais_warn("Message payload is corrupted: expected %d bytes, got %d",
                     ais_data_len(msg), str_size);
            sane = FALSE;

            /* trace the bytes around the unexpected terminator */
            for(lpc = (str_size - 10); lpc < msg->size; lpc++) {
                if(lpc < 0) {
                    lpc = 0;
                }
                ais_debug_2("bad_data[%d]: %d / '%c'", lpc, data[lpc], data[lpc]);
            }
        }
    }

    if(sane == FALSE) {
        ais_err("Invalid message %d: (dest=%s:%s, from=%s:%s.%d, compressed=%d, size=%d, total=%d)",
                msg->id, ais_dest(&(msg->host)), msg_type2text(dest),
                ais_dest(&(msg->sender)), msg_type2text(msg->sender.type),
                msg->sender.pid, msg->is_compressed, ais_data_len(msg),
                msg->header.size);

    } else if(repaired) {
        ais_err("Repaired message %d: (dest=%s:%s, from=%s:%s.%d, compressed=%d, size=%d, total=%d)",
                msg->id, ais_dest(&(msg->host)), msg_type2text(dest),
                ais_dest(&(msg->sender)), msg_type2text(msg->sender.type),
                msg->sender.pid, msg->is_compressed, ais_data_len(msg),
                msg->header.size);

    } else {
        ais_debug_3("Verified message %d: (dest=%s:%s, from=%s:%s.%d, compressed=%d, size=%d, total=%d)",
                    msg->id, ais_dest(&(msg->host)), msg_type2text(dest),
                    ais_dest(&(msg->sender)), msg_type2text(msg->sender.type),
                    msg->sender.pid, msg->is_compressed, ais_data_len(msg),
                    msg->header.size);
    }

    return sane;
}
/*
 * Deliver a message to its destination.
 *
 * msg          - message to route
 * local_origin - TRUE when the message came from a local IPC client, FALSE
 *                when it arrived from the cluster
 *
 * Messages for the local node go either to the plugin itself (crm_msg_ais)
 * or to the matching child's async connection; locally originated messages
 * for other nodes are multicast to the cluster; everything else is ignored.
 *
 * Returns TRUE on success (including the "ignored" case) and FALSE when the
 * message fails the sanity check, has an invalid destination, or could not
 * be sent.
 */
gboolean route_ais_message(AIS_Message *msg, gboolean local_origin)
{
int rc = 0;
int dest = msg->host.type;
const char *reason = "unknown";
ais_debug_3("Msg[%d] (dest=%s:%s, from=%s:%s.%d, remote=%s, size=%d)",
msg->id, ais_dest(&(msg->host)), msg_type2text(dest),
ais_dest(&(msg->sender)), msg_type2text(msg->sender.type),
msg->sender.pid, local_origin?"false":"true", ais_data_len(msg));
/* A cluster message with no host, or with our name as host, is for us */
if(local_origin == FALSE) {
if(msg->host.size == 0
|| ais_str_eq(local_uname, msg->host.uname)) {
msg->host.local = TRUE;
}
}
if(check_message_sanity(msg, msg->data) == FALSE) {
/* Dont send this message to anyone */
return FALSE;
}
if(msg->host.local) {
void *conn = NULL;
const char *lookup = NULL;
if(dest == crm_msg_ais) {
process_ais_message(msg);
return TRUE;
} else if(dest == crm_msg_lrmd) {
/* lrmd messages are routed via the crm */
dest = crm_msg_crmd;
} else if(dest == crm_msg_te) {
/* te messages are routed via the crm */
dest = crm_msg_crmd;
}
/* dest is used as an index into crm_children[] below */
AIS_CHECK(dest > 0 && dest < SIZEOF(crm_children),
ais_err("Invalid destination: %d", dest);
log_ais_message(LOG_ERR, msg);
return FALSE;
);
lookup = msg_type2text(dest);
conn = crm_children[dest].async_conn;
/* the cluster fails in weird and wonderfully obscure ways when this is not true */
AIS_ASSERT(ais_str_eq(lookup, crm_children[dest].name));
/* note: 'reason' stays "unknown" if this local delivery fails */
rc = send_client_ipc(conn, msg);
} else if(local_origin) {
/* forward to other hosts */
ais_debug_3("Forwarding to cluster");
reason = "cluster delivery failed";
rc = send_cluster_msg_raw(msg);
} else {
/* remote message addressed to another node */
ais_debug_3("Ignoring...");
}
if(rc != 0) {
ais_warn("Sending message to %s.%s failed: %s (rc=%d)",
ais_dest(&(msg->host)), msg_type2text(dest), reason, rc);
log_ais_message(LOG_DEBUG, msg);
return FALSE;
}
return TRUE;
}
/*
 * Multicast a message to the cluster via totempg.
 *
 * Before sending, the header size is corrected if it disagrees with the
 * payload length, a message id is assigned if none is set, and the sender
 * fields are stamped with this node's id and name.
 *
 * Returns the result of totempg_groups_mcast_joined() (0 on success).
 */
int send_cluster_msg_raw(AIS_Message *ais_msg)
{
    int rc = 0;
    int name_len = 0;
    struct iovec iovec;
    static uint32_t msg_id = 0;

    AIS_ASSERT(local_nodeid != 0);

    if(ais_msg->header.size != (sizeof(AIS_Message) + ais_data_len(ais_msg))) {
        ais_err("Repairing size mismatch: %u + %d = %d",
                (unsigned int)sizeof(AIS_Message),
                ais_data_len(ais_msg), ais_msg->header.size);
        ais_msg->header.size = sizeof(AIS_Message) + ais_data_len(ais_msg);
    }

    if(ais_msg->id == 0) {
        msg_id++;
        /* 0 means "no id assigned", so skip over it after a wrap-around */
        AIS_CHECK(msg_id != 0 /* detect wrap-around */,
                  msg_id++; ais_err("Message ID wrapped around"));
        ais_msg->id = msg_id;
    }

    ais_msg->header.id = SERVICE_ID_MAKE(CRM_SERVICE, 0);

    /* Cap the name at MAX_NAME-1 bytes, as send_cluster_id() does, so the
     * copy cannot run past the MAX_NAME bytes cleared below and the name
     * always stays NUL-terminated.
     */
    name_len = local_uname_len;
    if(name_len > MAX_NAME-1) {
        name_len = MAX_NAME-1;
    }

    ais_msg->sender.id = local_nodeid;
    ais_msg->sender.size = name_len;
    memset(ais_msg->sender.uname, 0, MAX_NAME);
    memcpy(ais_msg->sender.uname, local_uname, name_len);

    iovec.iov_base = (char *)ais_msg;
    iovec.iov_len = ais_msg->header.size;

    ais_debug_3("Sending message (size=%u)", (unsigned int)iovec.iov_len);
    rc = totempg_groups_mcast_joined (
        openais_group_handle, &iovec, 1, TOTEMPG_SAFE);

    if(rc == 0 && ais_msg->is_compressed == FALSE) {
        ais_debug_2("Message sent: %.80s", ais_msg->data);
    }

    AIS_CHECK(rc == 0, ais_err("Message not sent (%d): %.120s", rc, ais_msg->data));

    return rc;
}
/* Smaller of x and y.  The whole expansion is parenthesized so the macro can
 * be used inside larger expressions; each argument may be evaluated twice,
 * so do not pass expressions with side effects.
 */
#define min(x,y) ((x)<(y)?(x):(y))
/*
 * Announce this node to the cluster.
 *
 * Builds a crm_identify_msg_s carrying our node id, name, version, pid,
 * vote count, process flags and "born on" membership sequence, records the
 * same details locally with update_member(), then multicasts the message.
 */
void send_cluster_id(void)
{
    int rc = 0;
    int idx = 0;
    int copy_len = 0;
    struct iovec iovec;
    struct crm_identify_msg_s *msg = NULL;
    static uint64_t local_born_on = 0;

    AIS_ASSERT(local_nodeid != 0);

    /* Latch the first reliable membership sequence as our birth point */
    if(local_born_on == 0 && have_reliable_membership_id) {
        local_born_on = membership_seq;
    }

    ais_malloc0(msg, sizeof(struct crm_identify_msg_s));

    msg->header.size = sizeof(struct crm_identify_msg_s);
    msg->header.id = SERVICE_ID_MAKE(CRM_SERVICE, 1);
    msg->id = local_nodeid;

    /* Name and version are truncated to leave room for a terminating NUL */
    copy_len = min(local_uname_len, MAX_NAME-1);
    memset(msg->uname, 0, MAX_NAME);
    memcpy(msg->uname, local_uname, copy_len);

    copy_len = min(strlen(VERSION), MAX_NAME-1);
    memset(msg->version, 0, MAX_NAME);
    memcpy(msg->version, VERSION, copy_len);

    msg->votes = 1;
    msg->pid = getpid();
    msg->born_on = local_born_on;

    /* Start with the plugin's own flag and add every child that has a pid */
    msg->processes = crm_proc_ais;
    for (idx = 0; idx < SIZEOF(crm_children); idx++) {
        if(crm_children[idx].pid != 0) {
            msg->processes |= crm_children[idx].flag;
        }
    }

    ais_debug("Local update: id=%u, born="U64T", seq="U64T"",
              local_nodeid, local_born_on, membership_seq);
    update_member(
        local_nodeid, local_born_on, membership_seq, msg->votes, msg->processes, NULL, NULL, VERSION);

    iovec.iov_base = (char *)msg;
    iovec.iov_len = msg->header.size;

    rc = totempg_groups_mcast_joined (
        openais_group_handle, &iovec, 1, TOTEMPG_SAFE);

    AIS_CHECK(rc == 0, ais_err("Message not sent (%d)", rc));

    ais_free(msg);
}
diff --git a/lib/common/ais.c b/lib/common/ais.c
index 281672810c..7bc3a0dd07 100644
--- a/lib/common/ais.c
+++ b/lib/common/ais.c
@@ -1,750 +1,750 @@
/*
* Copyright (C) 2004 Andrew Beekhof <andrew@beekhof.net>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include <crm_internal.h>
#include <bzlib.h>
#include <crm/ais.h>
#include <crm/common/cluster.h>
#include <sys/uio.h>
#include <sys/utsname.h>
#include "stack.h"
#include <clplumbing/timers.h>
#include <clplumbing/Gmain_timeout.h>
#ifdef AIS_WHITETANK
extern int openais_fd_get(void *ipc_context);
extern SaAisErrorT openais_service_connect(enum service_types service, void **ipc_context);
extern int openais_dispatch_recv(void *ipc_context, void *buf, int timeout);
extern SaAisErrorT openais_msg_send_reply_receive(
void *ipc_context, struct iovec *iov, int iov_len,
void *res_msg, int res_len);
#endif
/*
 * Translate a subsystem name into its message type.
 *
 * Returns crm_msg_none when text is NULL or matches no known name.  Several
 * names share a type: "ais" and "crm_plugin" both map to crm_msg_ais, and
 * CRM_SYSTEM_CRMD and CRM_SYSTEM_DC both map to crm_msg_crmd.
 */
enum crm_ais_msg_types text2msg_type(const char *text)
{
    /* Checked in order; the first matching name wins */
    const struct {
        const char *name;
        int type;
    } known[] = {
        { "ais",               crm_msg_ais      },
        { "crm_plugin",        crm_msg_ais      },
        { CRM_SYSTEM_CIB,      crm_msg_cib      },
        { CRM_SYSTEM_CRMD,     crm_msg_crmd     },
        { CRM_SYSTEM_DC,       crm_msg_crmd     },
        { CRM_SYSTEM_TENGINE,  crm_msg_te       },
        { CRM_SYSTEM_PENGINE,  crm_msg_pe       },
        { CRM_SYSTEM_LRMD,     crm_msg_lrmd     },
        { CRM_SYSTEM_STONITHD, crm_msg_stonithd },
        { "attrd",             crm_msg_attrd    },
    };
    int type = crm_msg_none;
    unsigned int lpc = 0;

    CRM_CHECK(text != NULL, return type);

    for(lpc = 0; lpc < sizeof(known) / sizeof(known[0]); lpc++) {
        if(safe_str_eq(text, known[lpc].name)) {
            return known[lpc].type;
        }
    }

    crm_debug_2("Unknown message type: %s", text);
    return type;
}
/*
 * Return a newly allocated copy of a message's payload.
 *
 * Uncompressed payloads are duplicated as a string; compressed payloads are
 * inflated with bzip2 into a buffer of msg->size bytes.  The function
 * allocates the result and does not free it, so releasing it falls to the
 * caller.
 */
char *get_ais_data(AIS_Message *msg)
{
    int rc = BZ_OK;
    char *uncompressed = NULL;
    unsigned int new_size = msg->size;

    if(msg->is_compressed == FALSE) {
        crm_debug_2("Returning uncompressed message data");
        uncompressed = strdup(msg->data);

    } else {
        crm_debug_2("Decompressing message data");
        crm_malloc0(uncompressed, new_size);

        rc = BZ2_bzBuffToBuffDecompress(
            uncompressed, &new_size, msg->data, msg->compressed_size, 1, 0);

        /* Compare, do not assign: "rc = BZ_OK" stores 0 in rc, so the
         * assertion always failed and the real return code was lost.
         */
        CRM_ASSERT(rc == BZ_OK);
        CRM_ASSERT(new_size == msg->size);
    }

    return uncompressed;
}
#if SUPPORT_AIS
/* Descriptor passed to saSendReceiveReply() for request/reply exchanges when
 * TRADITIONAL_AIS_IPC is set; -1 until a connection is made.
 */
int ais_fd_sync = -1;
/* Descriptor watched by the mainloop for incoming messages; -1 when not
 * connected, and reset to -1 by send_ais_text() when a send fails.
 */
int ais_fd_async = -1; /* never send messages via this channel */
/* IPC context filled in by openais_service_connect() when
 * TRADITIONAL_AIS_IPC is not set.
 */
void *ais_ipc_ctx = NULL;
/* Mainloop source returned by G_main_add_fd() for ais_fd_async */
GFDSource *ais_source = NULL;
/* NOTE(review): never assigned in the code visible here - presumably
 * reserved for a source on the synchronous descriptor; confirm.
 */
GFDSource *ais_source_sync = NULL;
/*
 * Ask the AIS plugin for this node's id and name.
 *
 * id:    receives the node id; must not be NULL
 * uname: receives a newly allocated copy of the node name; must not be NULL
 *
 * The request is re-sent up to 20 times, with a growing delay, while the
 * plugin answers SA_AIS_ERR_TRY_AGAIN.
 *
 * Returns TRUE on success.  Returns FALSE, leaving *id and *uname untouched,
 * when an argument is NULL, the exchange fails, or the reply carries an
 * error.
 */
gboolean get_ais_nodeid(uint32_t *id, char **uname)
{
    struct iovec iov;
    int retries = 0;
    int rc = SA_AIS_OK;
    mar_res_header_t header;
    struct crm_ais_nodeid_resp_s answer;

    header.id = crm_class_nodeid;
    header.size = sizeof(mar_res_header_t);

    CRM_CHECK(id != NULL, return FALSE);
    CRM_CHECK(uname != NULL, return FALSE);

    iov.iov_base = &header;
    iov.iov_len = header.size;

  retry:
    errno = 0;
#if TRADITIONAL_AIS_IPC
    rc = saSendReceiveReply(ais_fd_sync, &header, header.size, &answer, sizeof (struct crm_ais_nodeid_resp_s));
#else
    rc = openais_msg_send_reply_receive(
        ais_ipc_ctx, &iov, 1, &answer, sizeof (answer));
#endif
    if(rc == SA_AIS_OK) {
        /* Log-only checks: an odd reply is reported but still used below */
        CRM_CHECK(answer.header.size == sizeof (struct crm_ais_nodeid_resp_s),
                  crm_err("Odd message: id=%d, size=%d, error=%d",
                          answer.header.id, answer.header.size, answer.header.error));
        CRM_CHECK(answer.header.id == crm_class_nodeid, crm_err("Bad response id: %d", answer.header.id));
    }

    if(rc == SA_AIS_ERR_TRY_AGAIN && retries < 20) {
        retries++;
        crm_info("Peer overloaded: Re-sending message (Attempt %d of 20)", retries);
        mssleep(retries * 100); /* Proportional back off */
        goto retry;
    }

    if(rc != SA_AIS_OK) {
        crm_err("Sending nodeid request: FAILED (rc=%d): %s", rc, ais_error2text(rc));
        return FALSE;

    } else if(answer.header.error != SA_AIS_OK) {
        /* rc is SA_AIS_OK here; the failure code is the one in the reply */
        crm_err("Bad response from peer: (rc=%d): %s",
                answer.header.error, ais_error2text(answer.header.error));
        return FALSE;
    }

    crm_info("Server details: id=%u uname=%s", answer.id, answer.uname);

    *id = answer.id;
    *uname = crm_strdup(answer.uname);

    return TRUE;
}
/*
 * Send a text payload to the AIS plugin and wait for its acknowledgement.
 *
 * class: value stored in the message header id
 * data:  NUL-terminated payload; must not be NULL
 * local: stored in host.local of the message
 * node:  destination node name, or NULL for no specific node; must be
 *        shorter than MAX_NAME bytes
 * dest:  destination subsystem on the receiving side
 *
 * Payloads of CRM_BZ2_THRESHOLD bytes or more are bzip2-compressed; if
 * compression fails the payload is sent uncompressed instead.  The send is
 * retried up to 20 times while the plugin reports SA_AIS_ERR_TRY_AGAIN.
 *
 * Returns TRUE when the message was sent and acknowledged, otherwise FALSE.
 * A failed send also resets ais_fd_async to -1.
 */
gboolean
send_ais_text(int class, const char *data,
              gboolean local, const char *node, enum crm_ais_msg_types dest)
{
    static int msg_id = 0;
    static int local_pid = 0;

    int retries = 0;
    int rc = SA_AIS_OK;
    int buf_len = sizeof(mar_res_header_t);

    char *buf = NULL;
    struct iovec iov;
    mar_res_header_t *header;
    AIS_Message *ais_msg = NULL;
    enum crm_ais_msg_types sender = text2msg_type(crm_system_name);

    if(local_pid == 0) {
        local_pid = getpid();
    }

    CRM_CHECK(data != NULL, return FALSE);

    /* host.uname is cleared as MAX_NAME bytes below, so a longer name would
     * be copied past the end of the field.  Such a name cannot be carried in
     * the message; refuse it before anything is allocated.
     */
    if(node != NULL && strlen(node) >= (size_t)MAX_NAME) {
        crm_err("Node name is too long (max %d bytes): %.64s",
                (int)(MAX_NAME-1), node);
        return FALSE;
    }

    crm_malloc0(ais_msg, sizeof(AIS_Message));

    ais_msg->id = msg_id++;
    ais_msg->header.id = class;
    ais_msg->header.error = SA_AIS_OK;

    ais_msg->host.type = dest;
    ais_msg->host.local = local;

    if(node) {
        ais_msg->host.size = strlen(node);
        memset(ais_msg->host.uname, 0, MAX_NAME);
        memcpy(ais_msg->host.uname, node, ais_msg->host.size);
        ais_msg->host.id = 0;

    } else {
        ais_msg->host.size = 0;
        memset(ais_msg->host.uname, 0, MAX_NAME);
        ais_msg->host.id = 0;
    }

    /* The sender's node id and name are left empty here */
    ais_msg->sender.type = sender;
    ais_msg->sender.pid = local_pid;
    ais_msg->sender.size = 0;
    memset(ais_msg->sender.uname, 0, MAX_NAME);
    ais_msg->sender.id = 0;

    ais_msg->size = 1 + strlen(data);

    if(ais_msg->size < CRM_BZ2_THRESHOLD) {
        /* Also reached via goto when compression fails below */
      failback:
        crm_realloc(ais_msg, sizeof(AIS_Message) + ais_msg->size);
        memcpy(ais_msg->data, data, ais_msg->size);

    } else {
        char *compressed = NULL;
        char *uncompressed = crm_strdup(data);
        unsigned int len = (ais_msg->size * 1.1) + 600; /* recomended size */

        crm_debug_5("Compressing message payload");
        crm_malloc(compressed, len);

        rc = BZ2_bzBuffToBuffCompress(
            compressed, &len, uncompressed, ais_msg->size, CRM_BZ2_BLOCKS, 0, CRM_BZ2_WORK);

        crm_free(uncompressed);

        if(rc != BZ_OK) {
            crm_err("Compression failed: %d", rc);
            crm_free(compressed);
            goto failback;
        }

        crm_realloc(ais_msg, sizeof(AIS_Message) + len + 1);
        memcpy(ais_msg->data, compressed, len);
        ais_msg->data[len] = 0;
        crm_free(compressed);

        ais_msg->is_compressed = TRUE;
        ais_msg->compressed_size = len;

        crm_debug_2("Compression details: %d -> %d",
                    ais_msg->size, ais_data_len(ais_msg));
    }

    ais_msg->header.size = sizeof(AIS_Message) + ais_data_len(ais_msg);

    crm_debug_3("Sending%s message %d to %s.%s (data=%d, total=%d)",
                ais_msg->is_compressed?" compressed":"",
                ais_msg->id, ais_dest(&(ais_msg->host)), msg_type2text(dest),
                ais_data_len(ais_msg), ais_msg->header.size);

    iov.iov_base = ais_msg;
    iov.iov_len = ais_msg->header.size;

  retry:
    errno = 0;
    /* (Re)size the reply buffer; buf_len grows if the reply was too large */
    crm_realloc(buf, buf_len);

#if TRADITIONAL_AIS_IPC
    rc = saSendReceiveReply(ais_fd_sync, ais_msg, ais_msg->header.size, buf, buf_len);
#else
    rc = openais_msg_send_reply_receive(ais_ipc_ctx, &iov, 1, buf, buf_len);
#endif
    header = (mar_res_header_t *)buf;

    if(rc == SA_AIS_ERR_TRY_AGAIN && retries < 20) {
        retries++;
        crm_info("Peer overloaded: Re-sending message (Attempt %d of 20)", retries);
        mssleep(retries * 100); /* Proportional back off */
        goto retry;

    } else if(rc == SA_AIS_OK) {
        CRM_CHECK_AND_STORE(header->size == sizeof (mar_res_header_t),
                            crm_err("Odd message: id=%d, size=%d, class=%d, error=%d",
                                    header->id, header->size, class, header->error));

        if(buf_len < header->size) {
            crm_err("Increasing buffer length to %d and retrying", header->size);
            buf_len = header->size + 1;
            goto retry;

        } else if(header->id == crm_class_nodeid && header->size == sizeof (struct crm_ais_nodeid_resp_s)){
            struct crm_ais_nodeid_resp_s *answer = (struct crm_ais_nodeid_resp_s *)header;
            crm_err("Server details: id=%u uname=%s counter=%u", answer->id, answer->uname, answer->counter);

        } else {
            CRM_CHECK_AND_STORE(header->id == CRM_MESSAGE_IPC_ACK,
                                crm_err("Bad response id (%d) for request (%d)", header->id, ais_msg->header.id));
            /* An error reported in the reply becomes our own result */
            CRM_CHECK(header->error == SA_AIS_OK, rc = header->error);
        }
    }

    if(rc != SA_AIS_OK) {
        crm_perror(LOG_ERR,"Sending message %d: FAILED (rc=%d): %s",
                   ais_msg->id, rc, ais_error2text(rc));
        ais_fd_async = -1;
    } else {
        crm_debug_4("Message %d: sent", ais_msg->id);
    }

    crm_free(buf);
    crm_free(ais_msg);
    return (rc == SA_AIS_OK);
}
/*
 * Serialize an XML message and send it through the AIS connection.
 *
 * Returns FALSE without sending anything when there is no AIS connection;
 * otherwise returns the result of send_ais_text().
 */
gboolean
send_ais_message(xmlNode *msg,
                 gboolean local, const char *node, enum crm_ais_msg_types dest)
{
    gboolean sent = TRUE;
    char *xml_text = NULL;

    if(ais_fd_async < 0 || ais_source == NULL) {
        crm_err("Not connected to AIS");
        return FALSE;
    }

    /* The text form is only needed for the duration of the send */
    xml_text = dump_xml_unformatted(msg);
    sent = send_ais_text(0, xml_text, local, node, dest);
    crm_free(xml_text);

    return sent;
}
/*
 * Close the descriptors used to talk to the AIS plugin.
 *
 * Each descriptor is reset to -1 once closed, so a later call does not close
 * it a second time and send_ais_message() sees that there is no connection.
 */
void terminate_ais_connection(void)
{
    if(ais_fd_sync > 0) {
        close(ais_fd_sync);
        ais_fd_sync = -1;
    }
    if(ais_fd_async > 0) {
        close(ais_fd_async);
        ais_fd_async = -1;
    }
    crm_notice("Disconnected from AIS");
/*     G_main_del_fd(ais_source); */
/*     G_main_del_fd(ais_source_sync);     */
}
/* Id returned by Gmain_timeout_add() for the pending membership
 * stabilization delay; 0 when no delay is pending.
 */
int ais_membership_timer = 0;
/* Set when the stabilization delay expires so that the next membership
 * update is processed unconditionally; cleared once that update is handled.
 */
gboolean ais_membership_force = FALSE;
/*
 * Timer callback run when the membership stabilization delay expires.
 *
 * Requests a fresh membership list from the plugin and flags the next
 * membership update to be processed unconditionally.  Always returns FALSE
 * so that the timer is not re-armed.
 */
static gboolean ais_membership_dampen(gpointer data)
{
    crm_debug_2("Requesting cluster membership after stabilization delay");
    send_ais_text(crm_class_members, __FUNCTION__, TRUE, NULL, crm_msg_ais);

    /* The delay has run out: process the reply and mark the timer as idle */
    ais_membership_force = TRUE;
    ais_membership_timer = 0;

    return FALSE; /* never repeat automatically */
}
/*
 * Receive one message from the AIS plugin and act on it.
 *
 * sender:    descriptor read with saRecvRetry() when TRADITIONAL_AIS_IPC is
 *            set; always passed on to the dispatch callback
 * user_data: callback of type gboolean (*)(AIS_Message*, char*, int), or
 *            NULL; when set, it is called with the message, its (possibly
 *            decompressed) payload and sender
 *
 * Peer-removal and membership messages are handled here first.  The callback
 * is skipped for "identify" requests, peer removals, invalid messages and
 * membership updates that are being held back.
 *
 * Returns TRUE once the message has been handled or discarded, FALSE when it
 * could not be received.
 */
gboolean ais_dispatch(int sender, gpointer user_data)
{
char *data = NULL;
char *uncompressed = NULL;
int rc = SA_AIS_OK;
AIS_Message *msg = NULL;
gboolean (*dispatch)(AIS_Message*,char*,int) = user_data;
#if TRADITIONAL_AIS_IPC
/* Read the fixed-size header first to learn the total message size */
mar_res_header_t *header = NULL;
static int header_len = sizeof(mar_res_header_t);
crm_malloc0(header, header_len);
errno = 0;
rc = saRecvRetry(sender, header, header_len);
if (rc != SA_AIS_OK) {
crm_perror(LOG_ERR, "Receiving message header failed: (%d/%d) %s", rc, errno, ais_error2text(rc));
goto bail;
} else if(header->size == header_len) {
crm_err("Empty message: id=%d, size=%d, error=%d, header_len=%d",
header->id, header->size, header->error, header_len);
/* NOTE(review): msg is still NULL here, so "done" does not free header - looks like a leak; confirm */
goto done;
} else if(header->size == 0 || header->size < header_len) {
crm_err("Mangled header: size=%d, header=%d, error=%d",
header->size, header_len, header->error);
goto done;
} else if(header->error != 0) {
crm_err("Header contined error: %d", header->error);
}
crm_debug_2("Looking for %d (%d - %d) more bytes",
header->size - header_len, header->size, header_len);
/* Grow the buffer to hold the whole message, then read the remainder */
crm_realloc(header, header->size);
/* Use a char* so we can store the remainder into an offset */
data = (char*)header;
errno = 0;
rc = saRecvRetry(sender, data+header_len, header->size - header_len);
#else
/* Fixed 1000000-byte receive buffer for the whole message */
crm_malloc0(data, 1000000);
rc = openais_dispatch_recv (ais_ipc_ctx, data, 0);
#endif
msg = (AIS_Message*)data;
if (rc != SA_AIS_OK) {
crm_perror(LOG_ERR,"Receiving message body failed: (%d) %s", rc, ais_error2text(rc));
goto bail;
}
crm_debug_3("Got new%s message (size=%d, %d, %d)",
msg->is_compressed?" compressed":"",
ais_data_len(msg), msg->size, msg->compressed_size);
/* From here on, data points at the payload rather than the whole message */
data = msg->data;
if(msg->is_compressed && msg->size > 0) {
int rc = BZ_OK;
unsigned int new_size = msg->size;
/* The payload is compressed, so only the sizes can be checked */
if(check_message_sanity(msg, NULL) == FALSE) {
goto badmsg;
}
crm_debug_5("Decompressing message data");
crm_malloc0(uncompressed, new_size);
rc = BZ2_bzBuffToBuffDecompress(
uncompressed, &new_size, data, msg->compressed_size, 1, 0);
if(rc != BZ_OK) {
crm_err("Decompression failed: %d", rc);
goto badmsg;
}
CRM_ASSERT(rc == BZ_OK);
CRM_ASSERT(new_size == msg->size);
data = uncompressed;
} else if(check_message_sanity(msg, data) == FALSE) {
goto badmsg;
} else if(safe_str_eq("identify", data)) {
/* Answer an "identify" request by sending our pid back */
int pid = getpid();
char *pid_s = crm_itoa(pid);
send_ais_text(0, pid_s, TRUE, NULL, crm_msg_ais);
crm_free(pid_s);
goto done;
}
if(msg->header.id == crm_class_rmpeer) {
/* Peer removal: the payload is the node id as text */
uint32_t id = crm_int_helper(data, NULL);
crm_info("Removing peer %s/%u", data, id);
reap_crm_member(id);
crm_calculate_quorum();
goto done;
}
if(msg->header.id == crm_class_members) {
/* Membership update: the payload is XML with one child per node */
xmlNode *xml = string2xml(data);
if(xml != NULL) {
gboolean do_ask = FALSE;
gboolean do_process = TRUE;
int new_size = 0;
unsigned long long seq = 0;
int current_size = crm_active_members();
const char *reason = "unknown";
const char *value = crm_element_value(xml, "id");
seq = crm_int_helper(value, NULL);
crm_debug_2("Received membership %llu", seq);
/* Count the nodes the update lists as members */
xml_child_iter(xml, node,
const char *state = crm_element_value(node, "state");
if(safe_str_eq(state, CRM_NODE_MEMBER)) {
new_size++;
}
);
if(ais_membership_force) {
/* always process */
crm_debug_2("Processing delayed membership change");
#if 0
} else if(current_size == 0 && new_size == 1) {
do_ask = TRUE;
do_process = FALSE;
reason = "We've come up alone";
#endif
} else if(new_size < (current_size/2)) {
/* Hold back an update that loses more than half the peers; start the
 * stabilization delay only if one is not already pending
 */
do_process = FALSE;
reason = "We've lost more than half our peers";
if(ais_membership_timer == 0) {
reason = "We've lost more than half our peers";
crm_log_xml_debug(xml, __PRETTY_FUNCTION__);
do_ask = TRUE;
}
}
if(do_process) {
/* NOTE(review): last is signed while seq is unsigned - confirm the comparison below is intended */
static long long last = 0;
/* if there is a timer running - let it run
* there is no harm in getting an extra membership message
*/
crm_peer_seq = seq;
/* Skip resends */
if(last < seq) {
crm_info("Processing membership %llu", seq);
}
/* crm_log_xml_debug(xml, __PRETTY_FUNCTION__); */
if(ais_membership_force) {
ais_membership_force = FALSE;
}
xml_child_iter(xml, node, crm_update_ais_node(node, seq));
crm_calculate_quorum();
last = seq;
} else if(do_ask) {
/* Clearing dispatch keeps the callback from seeing this update */
dispatch = NULL;
crm_warn("Pausing to allow membership stability (size %d -> %d): %s",
current_size, new_size, reason);
ais_membership_timer = Gmain_timeout_add(4*1000, ais_membership_dampen, NULL);
/* process node additions */
xml_child_iter(xml, node,
const char *state = crm_element_value(node, "state");
if(crm_str_eq(state, CRM_NODE_MEMBER, FALSE)) {
crm_update_ais_node(node, seq);
}
);
} else {
/* A delay is already pending: drop this update without processing it */
dispatch = NULL;
crm_warn("Membership is still unstable (size %d -> %d): %s",
current_size, new_size, reason);
}
} else {
crm_warn("Invalid peer update: %s", data);
}
free_xml(xml);
} else {
/* Any other message: record its sender as a peer, using the node name as the uuid too */
const char *uuid = msg->sender.uname;
crm_update_peer(msg->sender.id, 0,0,0,0, uuid, msg->sender.uname, NULL, NULL);
}
if(dispatch != NULL) {
dispatch(msg, data, sender);
}
done:
/* msg is the start of the receive buffer, so freeing it releases the whole message */
crm_free(uncompressed);
crm_free(msg);
return TRUE;
badmsg:
crm_err("Invalid message (id=%d, dest=%s:%s, from=%s:%s.%d):"
" min=%d, total=%d, size=%d, bz2_size=%d",
msg->id, ais_dest(&(msg->host)), msg_type2text(msg->host.type),
ais_dest(&(msg->sender)), msg_type2text(msg->sender.type),
msg->sender.pid, (int)sizeof(AIS_Message),
msg->header.size, msg->size, msg->compressed_size);
goto done;
bail:
/* NOTE(review): the receive buffer is not freed on this path - looks like a leak; confirm */
crm_err("AIS connection failed");
return FALSE;
}
/*
 * Default handler for a lost AIS connection, installed by
 * init_ais_connection() when the caller supplies none.
 *
 * Logs the loss, marks the synchronous descriptor as unusable and ends the
 * process with exit status 1.
 */
static void
ais_destroy(gpointer user_data)
{
    crm_err("AIS connection terminated");

    ais_fd_sync = -1;

    exit(1);
}
/*
 * Connect to the AIS plugin and register this process with it.
 *
 * dispatch:  callback for incoming messages; when NULL, no mainloop source
 *            is created
 * destroy:   handler for connection loss; ais_destroy() is used when NULL
 * our_uuid:  if non-NULL, receives a newly allocated copy of the node name
 * our_uname: if non-NULL, receives the node name reported by the plugin
 * nodeid:    if non-NULL, receives the node id reported by the plugin
 *
 * The connection is retried once a second, up to 30 times, while the plugin
 * answers SA_AIS_ERR_TRY_AGAIN.
 *
 * Returns TRUE once connected, FALSE if the connection could not be made.
 */
gboolean init_ais_connection(
    gboolean (*dispatch)(AIS_Message*,char*,int),
    void (*destroy)(gpointer), char **our_uuid, char **our_uname, int *nodeid)
{
    int my_pid = 0;
    int attempts = 0;
    int rc = SA_AIS_OK;
    char *pid_text = NULL;
    struct utsname host_info;
    uint32_t local_nodeid = 0;
    char *local_uname = NULL;

  retry:
    crm_info("Creating connection to our AIS plugin");
#if TRADITIONAL_AIS_IPC
    rc = saServiceConnect (&ais_fd_sync, &ais_fd_async, CRM_SERVICE);
#else
    rc = openais_service_connect(CRM_SERVICE, &ais_ipc_ctx);
    if(ais_ipc_ctx) {
        ais_fd_async = openais_fd_get(ais_ipc_ctx);
    } else if(rc == SA_AIS_OK) {
        crm_err("No context created, but connection reported 'ok'");
        rc = SA_AIS_ERR_LIBRARY;
    }
#endif
    if (rc != SA_AIS_OK) {
        crm_info("Connection to our AIS plugin (%d) failed: %s (%d)", CRM_SERVICE, ais_error2text(rc), rc);
    }

    if(rc == SA_AIS_ERR_TRY_AGAIN) {
        /* The plugin is busy: wait a second and try again, within limits */
        if(attempts < 30) {
            sleep(1);
            attempts++;
            goto retry;
        }
        crm_err("Retry count exceeded");
        return FALSE;

    } else if(rc != SA_AIS_OK) {
        return FALSE;
    }

    if(destroy == NULL) {
        crm_debug("Using the default destroy handler");
        destroy = ais_destroy;
    }

    crm_info("AIS connection established");

    /* Introduce ourselves to the plugin by sending it our pid */
    my_pid = getpid();
    pid_text = crm_itoa(my_pid);
    send_ais_text(0, pid_text, TRUE, NULL, crm_msg_ais);
    crm_free(pid_text);

    crm_peer_init();
    get_ais_nodeid(&local_nodeid, &local_uname);

    if(uname(&host_info) < 0) {
        crm_perror(LOG_ERR,"uname(2) call failed");
        exit(100);
    }

    /* Warn when the plugin's idea of our name differs from the system's */
    if(safe_str_neq(host_info.nodename, local_uname)) {
        crm_crit("Node name mismatch!  OpenAIS supplied %s, our lookup returned %s", local_uname, host_info.nodename);
        crm_notice("Node name mismatches usually occur when assigned automatically by DHCP servers");
        crm_notice("If this node was part of the cluster with a different name,"
                   " you will need to remove the old entry with crm_node --remove");
    }

    if(our_uuid != NULL) {
        *our_uuid = crm_strdup(local_uname);
    }

    if(our_uname != NULL) {
        *our_uname = local_uname;
    }

    if(nodeid != NULL) {
        *nodeid = local_nodeid;
    }

    if(local_nodeid != 0) {
        /* Ensure the local node always exists */
        crm_update_peer(local_nodeid, 0, 0, 0, 0, local_uname, local_uname, NULL, NULL);
    }

    if(dispatch) {
        ais_source = G_main_add_fd(
            G_PRIORITY_HIGH, ais_fd_async, FALSE, ais_dispatch, dispatch, destroy);
    }

    return TRUE;
}
/*
 * Check that a message's header and payload sizes are consistent.
 *
 * msg:  the message to check; its size (or compressed_size) is overwritten
 *       when it disagrees with the size recorded in the header
 * data: the payload as text, or NULL to skip the string-length check; only
 *       used when the message is not compressed
 *
 * Returns TRUE when the message is usable (possibly after a size repair),
 * FALSE when it has no size, carries a header error, has no payload, or its
 * uncompressed payload length does not match the recorded length.
 */
gboolean check_message_sanity(AIS_Message *msg, char *data)
{
gboolean sane = TRUE;
gboolean repaired = FALSE;
int dest = msg->host.type;
/* Payload length implied by the header: total size minus the fixed part */
int tmp_size = msg->header.size - sizeof(AIS_Message);
if(sane && msg->header.size == 0) {
crm_warn("Message with no size");
sane = FALSE;
}
/* The next two lines are a diff hunk: the old ('-') and new ('+') form of the same check */
- if(sane && msg->header.error != 0) {
+ if(sane && msg->header.error != SA_AIS_OK) {
crm_warn("Message header contains an error: %d", msg->header.error);
sane = FALSE;
}
if(sane && ais_data_len(msg) != tmp_size) {
/* Trust the header: overwrite whichever length field is in use */
int cur_size = ais_data_len(msg);
repaired = TRUE;
if(msg->is_compressed) {
msg->compressed_size = tmp_size;
} else {
msg->size = tmp_size;
}
crm_warn("Repaired message payload size %d -> %d", cur_size, tmp_size);
}
if(sane && ais_data_len(msg) == 0) {
crm_warn("Message with no payload");
sane = FALSE;
}
if(sane && data && msg->is_compressed == FALSE) {
/* An uncompressed payload must be a string filling the recorded length exactly */
int str_size = strlen(data) + 1;
if(ais_data_len(msg) != str_size) {
int lpc = 0;
crm_warn("Message payload is corrupted: expected %d bytes, got %d",
ais_data_len(msg), str_size);
sane = FALSE;
/* Dump the bytes from 10 before the unexpected terminator up to the recorded size */
for(lpc = (str_size - 10); lpc < msg->size; lpc++) {
if(lpc < 0) {
lpc = 0;
}
crm_debug("bad_data[%d]: %d / '%c'", lpc, data[lpc], data[lpc]);
}
}
}
if(sane == FALSE) {
crm_err("Invalid message %d: (dest=%s:%s, from=%s:%s.%d, compressed=%d, size=%d, total=%d)",
msg->id, ais_dest(&(msg->host)), msg_type2text(dest),
ais_dest(&(msg->sender)), msg_type2text(msg->sender.type),
msg->sender.pid, msg->is_compressed, ais_data_len(msg),
msg->header.size);
} else if(repaired) {
crm_err("Repaired message %d: (dest=%s:%s, from=%s:%s.%d, compressed=%d, size=%d, total=%d)",
msg->id, ais_dest(&(msg->host)), msg_type2text(dest),
ais_dest(&(msg->sender)), msg_type2text(msg->sender.type),
msg->sender.pid, msg->is_compressed, ais_data_len(msg),
msg->header.size);
} else {
crm_debug_3("Verfied message %d: (dest=%s:%s, from=%s:%s.%d, compressed=%d, size=%d, total=%d)",
msg->id, ais_dest(&(msg->host)), msg_type2text(dest),
ais_dest(&(msg->sender)), msg_type2text(msg->sender.type),
msg->sender.pid, msg->is_compressed, ais_data_len(msg),
msg->header.size);
}
return sane;
}
#endif
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Sat, Nov 23, 12:26 PM (1 d, 3 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1018713
Default Alt Text
(60 KB)
Attached To
Mode
rP Pacemaker
Attached
Detach File
Event Timeline
Log In to Comment