diff --git a/lib/ais/plugin.c b/lib/ais/plugin.c
index 9ec2ac2715..2cc4d9956e 100644
--- a/lib/ais/plugin.c
+++ b/lib/ais/plugin.c
@@ -1,1755 +1,1756 @@
 /*
  * Copyright (C) 2004 Andrew Beekhof <andrew@beekhof.net>
  * 
  * This library is free software; you can redistribute it and/or
  * modify it under the terms of the GNU Lesser General Public
  * License as published by the Free Software Foundation; either
  * version 2.1 of the License, or (at your option) any later version.
  * 
  * This library is distributed in the hope that it will be useful,
  * but WITHOUT ANY WARRANTY; without even the implied warranty of
  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  * Lesser General Public License for more details.
  * 
  * You should have received a copy of the GNU Lesser General Public
  * License along with this library; if not, write to the Free Software
  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USAA
  */
 
 #include <crm_internal.h>
+#include <crm/cluster/internal.h>
 #include <sys/types.h>
 #include <sys/uio.h>
 #include <sys/socket.h>
 #include <sys/un.h>
 #include <netinet/in.h>
 #include <arpa/inet.h>
 #include <unistd.h>
 #include <fcntl.h>
 #include <stdlib.h>
 #include <stdio.h>
 #include <errno.h>
 #include <signal.h>
 #include <string.h>
 
 #include <corosync/totem/totempg.h>
 #include <corosync/engine/objdb.h>
 #include <corosync/engine/config.h>
 
 #include <config.h>
 #include "plugin.h"
 #include "utils.h"
 
 #include <glib.h>
 
 #include <sys/resource.h>
 #include <sys/utsname.h>
 #include <sys/socket.h>
 #include <sys/wait.h>
 #include <sys/stat.h>
 #include <pthread.h>
 #include <bzlib.h>
 #include <pwd.h>
 
 struct corosync_api_v1 *pcmk_api = NULL;
 
 uint32_t plugin_has_votes = 0;
 uint32_t plugin_expected_votes = 2;
 
 int use_mgmtd = 0;
 int plugin_log_level = LOG_DEBUG;
 char *local_uname = NULL;
 int local_uname_len = 0;
 char *local_cname = NULL;
 int local_cname_len = 0;
 uint32_t local_nodeid = 0;
 char *ipc_channel_name = NULL;
 static uint64_t local_born_on = 0;
 
 uint64_t membership_seq = 0;
 pthread_t pcmk_wait_thread;
 
 gboolean use_mcp = FALSE;
 gboolean wait_active = TRUE;
 gboolean have_reliable_membership_id = FALSE;
 GHashTable *ipc_client_list = NULL;
 GHashTable *membership_list = NULL;
 GHashTable *membership_notify_list = NULL;
 
 #define MAX_RESPAWN		100
 #define LOOPBACK_ID		16777343
 #define crm_flag_none		0x00000000
 #define crm_flag_members	0x00000001
 
 struct crm_identify_msg_s {
     cs_ipc_header_request_t header __attribute__ ((aligned(8)));
     uint32_t id;
     uint32_t pid;
     int32_t votes;
     uint32_t processes;
     char uname[256];
     char version[256];
     uint64_t born_on;
 } __attribute__ ((packed));
 
 /* *INDENT-OFF* */
 static crm_child_t pcmk_children[] = {
     { 0, crm_proc_none,     crm_flag_none,    0, 0, FALSE, "none",     NULL,		NULL,			   NULL, NULL },
     { 0, crm_proc_plugin,      crm_flag_none,    0, 0, FALSE, "ais",      NULL,		NULL,			   NULL, NULL },
     { 0, crm_proc_lrmd,     crm_flag_none,    3, 0, TRUE,  "lrmd",     NULL,		CRM_DAEMON_DIR"/lrmd",     NULL, NULL },
     { 0, crm_proc_cib,      crm_flag_members, 1, 0, TRUE,  "cib",      CRM_DAEMON_USER, CRM_DAEMON_DIR"/cib",      NULL, NULL },
     { 0, crm_proc_crmd,     crm_flag_members, 6, 0, TRUE,  "crmd",     CRM_DAEMON_USER, CRM_DAEMON_DIR"/crmd",     NULL, NULL },
     { 0, crm_proc_attrd,    crm_flag_none,    4, 0, TRUE,  "attrd",    CRM_DAEMON_USER, CRM_DAEMON_DIR"/attrd",    NULL, NULL },
     { 0, crm_proc_stonithd, crm_flag_none,    0, 0, TRUE,  "stonithd", NULL,		"/bin/false",		   NULL, NULL },
     { 0, crm_proc_pe,       crm_flag_none,    5, 0, TRUE,  "pengine",  CRM_DAEMON_USER, CRM_DAEMON_DIR"/pengine",  NULL, NULL },
     { 0, crm_proc_mgmtd,    crm_flag_none,    7, 0, TRUE,  "mgmtd",    NULL,		HB_DAEMON_DIR"/mgmtd",     NULL, NULL },
     { 0, crm_proc_stonith_ng, crm_flag_none,  2, 0, TRUE,  "stonith-ng", NULL,		CRM_DAEMON_DIR"/stonithd", NULL, NULL },
 };
 /* *INDENT-ON* */
 
 void send_cluster_id(void);
 int send_cluster_msg_raw(const AIS_Message * ais_msg);
 char *pcmk_generate_membership_data(void);
 gboolean check_message_sanity(const AIS_Message * msg, const char *data);
 
 typedef const void ais_void_ptr;
 int pcmk_shutdown(void);
 void pcmk_peer_update(enum totem_configuration_type configuration_type,
                       const unsigned int *member_list, size_t member_list_entries,
                       const unsigned int *left_list, size_t left_list_entries,
                       const unsigned int *joined_list, size_t joined_list_entries,
                       const struct memb_ring_id *ring_id);
 
 int pcmk_startup(struct corosync_api_v1 *corosync_api);
 int pcmk_config_init(struct corosync_api_v1 *corosync_api);
 
 int pcmk_ipc_exit(void *conn);
 int pcmk_ipc_connect(void *conn);
 void pcmk_ipc(void *conn, ais_void_ptr * msg);
 
 void pcmk_exec_dump(void);
 void pcmk_cluster_swab(void *msg);
 void pcmk_cluster_callback(ais_void_ptr * message, unsigned int nodeid);
 
 void pcmk_nodeid(void *conn, ais_void_ptr * msg);
 void pcmk_nodes(void *conn, ais_void_ptr * msg);
 void pcmk_notify(void *conn, ais_void_ptr * msg);
 void pcmk_remove_member(void *conn, ais_void_ptr * msg);
 void pcmk_quorum(void *conn, ais_void_ptr * msg);
 
 void pcmk_cluster_id_swab(void *msg);
 void pcmk_cluster_id_callback(ais_void_ptr * message, unsigned int nodeid);
 void ais_remove_peer(char *node_id);
 
 static uint32_t
 get_process_list(void)
 {
     int lpc = 0;
     uint32_t procs = crm_proc_plugin;
 
     if (use_mcp) {
         return 0;
     }
 
     for (lpc = 0; lpc < SIZEOF(pcmk_children); lpc++) {
         if (pcmk_children[lpc].pid != 0) {
             procs |= pcmk_children[lpc].flag;
         }
     }
     return procs;
 }
 
 static struct corosync_lib_handler pcmk_lib_service[] = {
     {                           /* 0 */
      .lib_handler_fn = pcmk_ipc,
      .flow_control = COROSYNC_LIB_FLOW_CONTROL_NOT_REQUIRED,
      },
     {                           /* 1 */
      .lib_handler_fn = pcmk_nodes,
      .flow_control = COROSYNC_LIB_FLOW_CONTROL_NOT_REQUIRED,
      },
     {                           /* 2 */
      .lib_handler_fn = pcmk_notify,
      .flow_control = COROSYNC_LIB_FLOW_CONTROL_NOT_REQUIRED,
      },
     {                           /* 3 */
      .lib_handler_fn = pcmk_nodeid,
      .flow_control = COROSYNC_LIB_FLOW_CONTROL_NOT_REQUIRED,
      },
     {                           /* 4 */
      .lib_handler_fn = pcmk_remove_member,
      .flow_control = COROSYNC_LIB_FLOW_CONTROL_NOT_REQUIRED,
      },
     {                           /* 5 */
      .lib_handler_fn = pcmk_quorum,
      .flow_control = COROSYNC_LIB_FLOW_CONTROL_NOT_REQUIRED,
      },
 };
 
 static struct corosync_exec_handler pcmk_exec_service[] = {
     {                           /* 0 */
      .exec_handler_fn = pcmk_cluster_callback,
      .exec_endian_convert_fn = pcmk_cluster_swab},
     {                           /* 1 */
      .exec_handler_fn = pcmk_cluster_id_callback,
      .exec_endian_convert_fn = pcmk_cluster_id_swab}
 };
 
 /*
  * Exports the interface for the service
  */
 /* *INDENT-OFF* */
 struct corosync_service_engine pcmk_service_handler = {
     .name			= (char *)"Pacemaker Cluster Manager "PACKAGE_VERSION,
     .id				= PCMK_SERVICE_ID,
     .private_data_size		= 0,
     .flow_control		= COROSYNC_LIB_FLOW_CONTROL_NOT_REQUIRED, 
     .allow_inquorate		= CS_LIB_ALLOW_INQUORATE,
     .lib_init_fn		= pcmk_ipc_connect,
     .lib_exit_fn		= pcmk_ipc_exit,
     .exec_init_fn		= pcmk_startup,
     .exec_exit_fn		= pcmk_shutdown,
     .config_init_fn		= pcmk_config_init,
     .priority			= 50,    
     .lib_engine			= pcmk_lib_service,
     .lib_engine_count		= sizeof (pcmk_lib_service) / sizeof (struct corosync_lib_handler),
     .exec_engine		= pcmk_exec_service,
     .exec_engine_count		= sizeof (pcmk_exec_service) / sizeof (struct corosync_exec_handler),
     .confchg_fn			= pcmk_peer_update,
     .exec_dump_fn		= pcmk_exec_dump,
 /* 	void (*sync_init) (void); */
 /* 	int (*sync_process) (void); */
 /* 	void (*sync_activate) (void); */
 /* 	void (*sync_abort) (void); */
 };
 
 
 /*
  * Dynamic Loader definition
  */
 struct corosync_service_engine *pcmk_get_handler_ver0 (void);
 
 struct corosync_service_engine_iface_ver0 pcmk_service_handler_iface = {
     .corosync_get_service_engine_ver0 = pcmk_get_handler_ver0
 };
 
 static struct lcr_iface openais_pcmk_ver0[2] = {
     {
 	.name				= "pacemaker",
 	.version			= 0,
 	.versions_replace		= 0,
 	.versions_replace_count		= 0,
 	.dependencies			= 0,
 	.dependency_count		= 0,
 	.constructor			= NULL,
 	.destructor			= NULL,
 	.interfaces			= NULL
     },
     {
 	.name				= "pacemaker",
 	.version			= 1,
 	.versions_replace		= 0,
 	.versions_replace_count		= 0,
 	.dependencies			= 0,
 	.dependency_count		= 0,
 	.constructor			= NULL,
 	.destructor			= NULL,
 	.interfaces			= NULL
     }
 };
 
 static struct lcr_comp pcmk_comp_ver0 = {
     .iface_count			= 2,
     .ifaces				= openais_pcmk_ver0
 };
 /* *INDENT-ON* */
 
 struct corosync_service_engine *
 pcmk_get_handler_ver0(void)
 {
     return (&pcmk_service_handler);
 }
 
 __attribute__ ((constructor))
 static void
 register_this_component(void)
 {
     lcr_interfaces_set(&openais_pcmk_ver0[0], &pcmk_service_handler_iface);
     lcr_interfaces_set(&openais_pcmk_ver0[1], &pcmk_service_handler_iface);
 
     lcr_component_register(&pcmk_comp_ver0);
 }
 
 static int
 plugin_has_quorum(void)
 {
     if ((plugin_expected_votes >> 1) < plugin_has_votes) {
         return 1;
     }
     return 0;
 }
 
 static void
 update_expected_votes(int value)
 {
     if (value < plugin_has_votes) {
         /* Never drop below the number of connected nodes */
         ais_info("Cannot update expected quorum votes %d -> %d:"
                  " value cannot be less that the current number of votes",
                  plugin_expected_votes, value);
 
     } else if (plugin_expected_votes != value) {
         ais_info("Expected quorum votes %d -> %d", plugin_expected_votes, value);
         plugin_expected_votes = value;
     }
 }
 
 /* Create our own local copy of the config so we can navigate it */
 static void
 process_ais_conf(void)
 {
     char *value = NULL;
     gboolean any_log = FALSE;
     hdb_handle_t top_handle = 0;
     hdb_handle_t local_handle = 0;
 
     ais_info("Reading configure");
     top_handle = config_find_init(pcmk_api, "logging");
     local_handle = config_find_next(pcmk_api, "logging", top_handle);
 
     get_config_opt(pcmk_api, local_handle, "debug", &value, "on");
     if (ais_get_boolean(value)) {
         plugin_log_level = LOG_DEBUG;
         pcmk_env.debug = "1";
 
     } else {
         plugin_log_level = LOG_INFO;
         pcmk_env.debug = "0";
     }
 
     get_config_opt(pcmk_api, local_handle, "to_logfile", &value, "off");
     if (ais_get_boolean(value)) {
         get_config_opt(pcmk_api, local_handle, "logfile", &value, NULL);
 
         if (value == NULL) {
             ais_err("Logging to a file requested but no log file specified");
 
         } else {
             uid_t pcmk_uid = geteuid();
             uid_t pcmk_gid = getegid();
 
             FILE *logfile = fopen(value, "a");
 
             if (logfile) {
                 int ignore = 0;
                 int logfd = fileno(logfile);
 
                 pcmk_env.logfile = value;
 
                 /* Ensure the file has the correct permissions */
                 ignore = fchown(logfd, pcmk_uid, pcmk_gid);
                 ignore = fchmod(logfd, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
 
                 fprintf(logfile, "Set r/w permissions for uid=%d, gid=%d on %s\n",
                         pcmk_uid, pcmk_gid, value);
                 fflush(logfile);
                 fsync(logfd);
                 fclose(logfile);
                 any_log = TRUE;
 
             } else {
                 ais_err("Couldn't create logfile: %s", value);
             }
         }
     }
 
     get_config_opt(pcmk_api, local_handle, "to_syslog", &value, "on");
     if (any_log && ais_get_boolean(value) == FALSE) {
         ais_info("User configured file based logging and explicitly disabled syslog.");
         value = "none";
 
     } else {
         if (ais_get_boolean(value) == FALSE) {
             ais_err
                 ("Please enable some sort of logging, either 'to_file: on' or  'to_syslog: on'.");
             ais_err("If you use file logging, be sure to also define a value for 'logfile'");
         }
         get_config_opt(pcmk_api, local_handle, "syslog_facility", &value, "daemon");
     }
     pcmk_env.syslog = value;
 
     config_find_done(pcmk_api, local_handle);
 
     top_handle = config_find_init(pcmk_api, "quorum");
     local_handle = config_find_next(pcmk_api, "quorum", top_handle);
     get_config_opt(pcmk_api, local_handle, "provider", &value, NULL);
     if (value && ais_str_eq("quorum_cman", value)) {
         pcmk_env.quorum = "cman";
     } else {
         pcmk_env.quorum = "pcmk";
     }
 
     top_handle = config_find_init(pcmk_api, "service");
     local_handle = config_find_next(pcmk_api, "service", top_handle);
     while (local_handle) {
         value = NULL;
         pcmk_api->object_key_get(local_handle, "name", strlen("name"), (void **)&value, NULL);
         if (ais_str_eq("pacemaker", value)) {
             break;
         }
         local_handle = config_find_next(pcmk_api, "service", top_handle);
     }
 
     get_config_opt(pcmk_api, local_handle, "ver", &value, "0");
     if (ais_str_eq(value, "1")) {
         ais_info("Enabling MCP mode: Use the Pacemaker init script to complete Pacemaker startup");
         use_mcp = TRUE;
     }
 
     get_config_opt(pcmk_api, local_handle, "clustername", &local_cname, "pcmk");
     local_cname_len = strlen(local_cname);
 
     get_config_opt(pcmk_api, local_handle, "use_logd", &value, "no");
     pcmk_env.use_logd = value;
 
     get_config_opt(pcmk_api, local_handle, "use_mgmtd", &value, "no");
     if (ais_get_boolean(value) == FALSE) {
         int lpc = 0;
 
         for (; lpc < SIZEOF(pcmk_children); lpc++) {
             if (crm_proc_mgmtd & pcmk_children[lpc].flag) {
                 /* Disable mgmtd startup */
                 pcmk_children[lpc].start_seq = 0;
                 break;
             }
         }
     }
 
     config_find_done(pcmk_api, local_handle);
 }
 
 int
 pcmk_config_init(struct corosync_api_v1 *unused)
 {
     return 0;
 }
 
 static void *
 pcmk_wait_dispatch(void *arg)
 {
     struct timespec waitsleep = {
         .tv_sec = 1,
         .tv_nsec = 0
     };
 
     while (wait_active) {
         int lpc = 0;
 
         for (; lpc < SIZEOF(pcmk_children); lpc++) {
             if (pcmk_children[lpc].pid > 0) {
                 int status;
                 pid_t pid = wait4(pcmk_children[lpc].pid, &status, WNOHANG, NULL);
 
                 if (pid == 0) {
                     continue;
 
                 } else if (pid < 0) {
                     ais_perror("Call to wait4(%s) failed", pcmk_children[lpc].name);
                     continue;
                 }
 
                 /* cleanup */
                 pcmk_children[lpc].pid = 0;
                 pcmk_children[lpc].conn = NULL;
                 pcmk_children[lpc].async_conn = NULL;
 
                 if (WIFSIGNALED(status)) {
                     int sig = WTERMSIG(status);
 
                     ais_err("Child process %s terminated with signal %d"
                             " (pid=%d, core=%s)",
                             pcmk_children[lpc].name, sig, pid,
                             WCOREDUMP(status) ? "true" : "false");
 
                 } else if (WIFEXITED(status)) {
                     int rc = WEXITSTATUS(status);
 
                     do_ais_log(rc == 0 ? LOG_NOTICE : LOG_ERR,
                                "Child process %s exited (pid=%d, rc=%d)", pcmk_children[lpc].name,
                                pid, rc);
 
                     if (rc == 100) {
                         ais_notice("Child process %s no longer wishes"
                                    " to be respawned", pcmk_children[lpc].name);
                         pcmk_children[lpc].respawn = FALSE;
                     }
                 }
 
                 /* Broadcast the fact that one of our processes died
                  * 
                  * Try to get some logging of the cause out first though
                  * because we're probably about to get fenced
                  *
                  * Potentially do this only if respawn_count > N
                  * to allow for local recovery
                  */
                 send_cluster_id();
 
                 pcmk_children[lpc].respawn_count += 1;
                 if (pcmk_children[lpc].respawn_count > MAX_RESPAWN) {
                     ais_err("Child respawn count exceeded by %s", pcmk_children[lpc].name);
                     pcmk_children[lpc].respawn = FALSE;
                 }
                 if (pcmk_children[lpc].respawn) {
                     ais_notice("Respawning failed child process: %s", pcmk_children[lpc].name);
                     spawn_child(&(pcmk_children[lpc]));
                 }
                 send_cluster_id();
             }
         }
         sched_yield();
         nanosleep(&waitsleep, 0);
     }
     return 0;
 }
 
 static uint32_t
 pcmk_update_nodeid(void)
 {
     int last = local_nodeid;
 
     local_nodeid = pcmk_api->totem_nodeid_get();
 
     if (last != local_nodeid) {
         if (last == 0) {
             ais_info("Local node id: %u", local_nodeid);
 
         } else {
             char *last_s = NULL;
 
             ais_malloc0(last_s, 32);
             ais_warn("Detected local node id change: %u -> %u", last, local_nodeid);
             snprintf(last_s, 31, "%u", last);
             ais_remove_peer(last_s);
             ais_free(last_s);
         }
         update_member(local_nodeid, 0, 0, 1, 0, local_uname, CRM_NODE_MEMBER, NULL);
     }
 
     return local_nodeid;
 }
 
 static void
 build_path(const char *path_c, mode_t mode)
 {
     int offset = 1, len = 0;
     char *path = ais_strdup(path_c);
 
     AIS_CHECK(path != NULL, return);
     for (len = strlen(path); offset < len; offset++) {
         if (path[offset] == '/') {
             path[offset] = 0;
             if (mkdir(path, mode) < 0 && errno != EEXIST) {
                 ais_perror("Could not create directory '%s'", path);
                 break;
             }
             path[offset] = '/';
         }
     }
     if (mkdir(path, mode) < 0 && errno != EEXIST) {
         ais_perror("Could not create directory '%s'", path);
     }
     ais_free(path);
 }
 
 int
 pcmk_startup(struct corosync_api_v1 *init_with)
 {
     int rc = 0;
     int lpc = 0;
     int start_seq = 1;
     struct utsname us;
     struct rlimit cores;
     static int max = SIZEOF(pcmk_children);
 
     uid_t pcmk_uid = 0;
     gid_t pcmk_gid = 0;
 
     uid_t root_uid = -1;
     uid_t cs_uid = geteuid();
 
     pcmk_user_lookup("root", &root_uid, NULL);
 
     pcmk_api = init_with;
 
     pcmk_env.debug = "0";
     pcmk_env.logfile = NULL;
     pcmk_env.use_logd = "false";
     pcmk_env.syslog = "daemon";
 
     if (cs_uid != root_uid) {
         ais_err("Corosync must be configured to start as 'root',"
                 " otherwise Pacemaker cannot manage services."
                 "  Expected %d got %d", root_uid, cs_uid);
         return -1;
     }
 
     process_ais_conf();
 
     membership_list = g_hash_table_new_full(g_direct_hash, g_direct_equal, NULL, destroy_ais_node);
     membership_notify_list = g_hash_table_new(g_direct_hash, g_direct_equal);
     ipc_client_list = g_hash_table_new(g_direct_hash, g_direct_equal);
 
     ais_info("CRM: Initialized");
     log_printf(LOG_INFO, "Logging: Initialized %s\n", __PRETTY_FUNCTION__);
 
     rc = getrlimit(RLIMIT_CORE, &cores);
     if (rc < 0) {
         ais_perror("Cannot determine current maximum core size.");
     } else {
         if (cores.rlim_max == 0 && geteuid() == 0) {
             cores.rlim_max = RLIM_INFINITY;
         } else {
             ais_info("Maximum core file size is: %lu", cores.rlim_max);
         }
         cores.rlim_cur = cores.rlim_max;
 
         rc = setrlimit(RLIMIT_CORE, &cores);
         if (rc < 0) {
             ais_perror("Core file generation will remain disabled."
                        " Core files are an important diagnositic tool,"
                        " please consider enabling them by default.");
         }
 #if 0
         /* system() is not thread-safe, can't call from here
          * Actually, its a pretty hacky way to try and achieve this anyway
          */
         if (system("echo 1 > /proc/sys/kernel/core_uses_pid") != 0) {
             ais_perror("Could not enable /proc/sys/kernel/core_uses_pid");
         }
 #endif
     }
 
     if (pcmk_user_lookup(CRM_DAEMON_USER, &pcmk_uid, &pcmk_gid) < 0) {
         ais_err("Cluster user %s does not exist, aborting Pacemaker startup", CRM_DAEMON_USER);
         return TRUE;
     }
 
     rc = mkdir(CRM_STATE_DIR, 0750);
     rc = chown(CRM_STATE_DIR, pcmk_uid, pcmk_gid);
 
     /* Used by stonithd */
     build_path(HA_STATE_DIR "/heartbeat", 0755);
 
     /* Used by RAs - Leave owned by root */
     build_path(CRM_RSCTMP_DIR, 0755);
 
     rc = uname(&us);
     AIS_ASSERT(rc == 0);
     local_uname = ais_strdup(us.nodename);
     local_uname_len = strlen(local_uname);
 
     ais_info("Service: %d", PCMK_SERVICE_ID);
     ais_info("Local hostname: %s", local_uname);
     pcmk_update_nodeid();
 
     if (use_mcp == FALSE) {
         pthread_create(&pcmk_wait_thread, NULL, pcmk_wait_dispatch, NULL);
         for (start_seq = 1; start_seq < max; start_seq++) {
             /* dont start anything with start_seq < 1 */
             for (lpc = 0; lpc < max; lpc++) {
                 if (start_seq == pcmk_children[lpc].start_seq) {
                     spawn_child(&(pcmk_children[lpc]));
                 }
             }
         }
     }
     return 0;
 }
 
 /*
   static void ais_print_node(const char *prefix, struct totem_ip_address *host) 
   {
   int len = 0;
   char *buffer = NULL;
 
   ais_malloc0(buffer, INET6_ADDRSTRLEN+1);
 	
   inet_ntop(host->family, host->addr, buffer, INET6_ADDRSTRLEN);
 
   len = strlen(buffer);
   ais_info("%s: %.*s", prefix, len, buffer);
   ais_free(buffer);
   }
 */
 
 #if 0
 /* copied here for reference from exec/totempg.c */
 char *
 totempg_ifaces_print(unsigned int nodeid)
 {
     static char iface_string[256 * INTERFACE_MAX];
     char one_iface[64];
     struct totem_ip_address interfaces[INTERFACE_MAX];
     char **status;
     unsigned int iface_count;
     unsigned int i;
     int res;
 
     iface_string[0] = '\0';
 
     res = totempg_ifaces_get(nodeid, interfaces, &status, &iface_count);
     if (res == -1) {
         return ("no interface found for nodeid");
     }
 
     for (i = 0; i < iface_count; i++) {
         sprintf(one_iface, "r(%d) ip(%s), ", i, totemip_print(&interfaces[i]));
         strcat(iface_string, one_iface);
     }
     return (iface_string);
 }
 #endif
 
 static void
 ais_mark_unseen_peer_dead(gpointer key, gpointer value, gpointer user_data)
 {
     int *changed = user_data;
     crm_node_t *node = value;
 
     if (node->last_seen != membership_seq && ais_str_eq(CRM_NODE_LOST, node->state) == FALSE) {
         ais_info("Node %s was not seen in the previous transition", node->uname);
         *changed += update_member(node->id, 0, membership_seq, node->votes,
                                   node->processes, node->uname, CRM_NODE_LOST, NULL);
     }
 }
 
 void
 pcmk_peer_update(enum totem_configuration_type configuration_type,
                  const unsigned int *member_list, size_t member_list_entries,
                  const unsigned int *left_list, size_t left_list_entries,
                  const unsigned int *joined_list, size_t joined_list_entries,
                  const struct memb_ring_id *ring_id)
 {
     int lpc = 0;
     int changed = 0;
     int do_update = 0;
 
     AIS_ASSERT(ring_id != NULL);
     switch (configuration_type) {
         case TOTEM_CONFIGURATION_REGULAR:
             do_update = 1;
             break;
         case TOTEM_CONFIGURATION_TRANSITIONAL:
             break;
     }
 
     membership_seq = ring_id->seq;
     ais_notice("%s membership event on ring %lld: memb=%ld, new=%ld, lost=%ld",
                do_update ? "Stable" : "Transitional", ring_id->seq,
                (long)member_list_entries, (long)joined_list_entries, (long)left_list_entries);
 
     if (do_update == 0) {
         for (lpc = 0; lpc < joined_list_entries; lpc++) {
             const char *prefix = "new: ";
             uint32_t nodeid = joined_list[lpc];
 
             ais_info("%s %s %u", prefix, member_uname(nodeid), nodeid);
         }
         for (lpc = 0; lpc < member_list_entries; lpc++) {
             const char *prefix = "memb:";
             uint32_t nodeid = member_list[lpc];
 
             ais_info("%s %s %u", prefix, member_uname(nodeid), nodeid);
         }
         for (lpc = 0; lpc < left_list_entries; lpc++) {
             const char *prefix = "lost:";
             uint32_t nodeid = left_list[lpc];
 
             ais_info("%s %s %u", prefix, member_uname(nodeid), nodeid);
         }
         return;
     }
 
     for (lpc = 0; lpc < joined_list_entries; lpc++) {
         const char *prefix = "NEW: ";
         uint32_t nodeid = joined_list[lpc];
         crm_node_t *node = NULL;
 
         changed += update_member(nodeid, 0, membership_seq, -1, 0, NULL, CRM_NODE_MEMBER, NULL);
 
         ais_info("%s %s %u", prefix, member_uname(nodeid), nodeid);
 
         node = g_hash_table_lookup(membership_list, GUINT_TO_POINTER(nodeid));
         if (node->addr == NULL) {
             const char *addr = totempg_ifaces_print(nodeid);
 
             node->addr = ais_strdup(addr);
             ais_debug("Node %u has address %s", nodeid, node->addr);
         }
     }
 
     for (lpc = 0; lpc < member_list_entries; lpc++) {
         const char *prefix = "MEMB:";
         uint32_t nodeid = member_list[lpc];
 
         changed += update_member(nodeid, 0, membership_seq, -1, 0, NULL, CRM_NODE_MEMBER, NULL);
 
         ais_info("%s %s %u", prefix, member_uname(nodeid), nodeid);
     }
 
     for (lpc = 0; lpc < left_list_entries; lpc++) {
         const char *prefix = "LOST:";
         uint32_t nodeid = left_list[lpc];
 
         changed += update_member(nodeid, 0, membership_seq, -1, 0, NULL, CRM_NODE_LOST, NULL);
         ais_info("%s %s %u", prefix, member_uname(nodeid), nodeid);
     }
 
     if (changed && joined_list_entries == 0 && left_list_entries == 0) {
         ais_err("Something strange happened: %d", changed);
         changed = 0;
     }
 
     ais_trace("Reaping unseen nodes...");
     g_hash_table_foreach(membership_list, ais_mark_unseen_peer_dead, &changed);
 
     if (member_list_entries > 1) {
         /* Used to set born-on in send_cluster_id())
          * We need to wait until we have at least one peer since first
          * membership id is based on the one before we stopped and isn't reliable
          */
         have_reliable_membership_id = TRUE;
     }
 
     if (changed) {
         ais_debug("%d nodes changed", changed);
         pcmk_update_nodeid();
         send_member_notification();
     }
 
     send_cluster_id();
 }
 
 int
 pcmk_ipc_exit(void *conn)
 {
     int lpc = 0;
     const char *client = NULL;
     void *async_conn = conn;
 
     for (; lpc < SIZEOF(pcmk_children); lpc++) {
         if (pcmk_children[lpc].conn == conn) {
             if (wait_active == FALSE) {
                 /* Make sure the shutdown loop exits */
                 pcmk_children[lpc].pid = 0;
             }
             pcmk_children[lpc].conn = NULL;
             pcmk_children[lpc].async_conn = NULL;
             client = pcmk_children[lpc].name;
             break;
         }
     }
 
     g_hash_table_remove(membership_notify_list, async_conn);
     g_hash_table_remove(ipc_client_list, async_conn);
 
     if (client) {
         do_ais_log(LOG_INFO, "Client %s (conn=%p, async-conn=%p) left", client, conn, async_conn);
     } else {
         do_ais_log((LOG_DEBUG + 1), "Client %s (conn=%p, async-conn=%p) left",
                    "unknown-transient", conn, async_conn);
     }
 
     return (0);
 }
 
 int
 pcmk_ipc_connect(void *conn)
 {
     /* OpenAIS hasn't finished setting up the connection at this point
      * Sending messages now messes up the protocol!
      */
     return (0);
 }
 
 /*
  * Executive message handlers
  */
 void
 pcmk_cluster_swab(void *msg)
 {
     AIS_Message *ais_msg = msg;
 
     ais_trace("Performing endian conversion...");
     ais_msg->id = swab32(ais_msg->id);
     ais_msg->size = swab32(ais_msg->size);
     ais_msg->is_compressed = swab32(ais_msg->is_compressed);
     ais_msg->compressed_size = swab32(ais_msg->compressed_size);
 
     ais_msg->host.id = swab32(ais_msg->host.id);
     ais_msg->host.pid = swab32(ais_msg->host.pid);
     ais_msg->host.type = swab32(ais_msg->host.type);
     ais_msg->host.size = swab32(ais_msg->host.size);
     ais_msg->host.local = swab32(ais_msg->host.local);
 
     ais_msg->sender.id = swab32(ais_msg->sender.id);
     ais_msg->sender.pid = swab32(ais_msg->sender.pid);
     ais_msg->sender.type = swab32(ais_msg->sender.type);
     ais_msg->sender.size = swab32(ais_msg->sender.size);
     ais_msg->sender.local = swab32(ais_msg->sender.local);
 
     ais_msg->header.size = swab32(ais_msg->header.size);
     ais_msg->header.id = swab32(ais_msg->header.id);
     ais_msg->header.error = swab32(ais_msg->header.error);
 }
 
 void
 pcmk_cluster_callback(ais_void_ptr * message, unsigned int nodeid)
 {
     const AIS_Message *ais_msg = message;
 
     ais_trace("Message from node %u (%s)", nodeid, nodeid == local_nodeid ? "local" : "remote");
 /*  Shouldn't be required...
     update_member(
  	ais_msg->sender.id, membership_seq, -1, 0, ais_msg->sender.uname, NULL);
 */
 
     if (ais_msg->host.size == 0 || ais_str_eq(ais_msg->host.uname, local_uname)) {
         route_ais_message(ais_msg, FALSE);
 
     } else {
         ais_trace("Discarding Msg[%d] (dest=%s:%s, from=%s:%s)",
                     ais_msg->id, ais_dest(&(ais_msg->host)),
                     msg_type2text(ais_msg->host.type),
                     ais_dest(&(ais_msg->sender)), msg_type2text(ais_msg->sender.type));
     }
 }
 
 void
 pcmk_cluster_id_swab(void *msg)
 {
     struct crm_identify_msg_s *ais_msg = msg;
 
     ais_trace("Performing endian conversion...");
     ais_msg->id = swab32(ais_msg->id);
     ais_msg->pid = swab32(ais_msg->pid);
     ais_msg->votes = swab32(ais_msg->votes);
     ais_msg->processes = swab32(ais_msg->processes);
     ais_msg->born_on = swab64(ais_msg->born_on);
 
     ais_msg->header.size = swab32(ais_msg->header.size);
     ais_msg->header.id = swab32(ais_msg->header.id);
 }
 
 void
 pcmk_cluster_id_callback(ais_void_ptr * message, unsigned int nodeid)
 {
     int changed = 0;
     const struct crm_identify_msg_s *msg = message;
 
     if (nodeid != msg->id) {
         ais_err("Invalid message: Node %u claimed to be node %d", nodeid, msg->id);
         return;
     }
     ais_debug("Node update: %s (%s)", msg->uname, msg->version);
     changed =
         update_member(nodeid, msg->born_on, membership_seq, msg->votes, msg->processes, msg->uname,
                       NULL, msg->version);
 
     if (changed) {
         send_member_notification();
     }
 }
 
 struct res_overlay {
     cs_ipc_header_response_t header __attribute((aligned(8)));
     char buf[4096];
 };
 
 struct res_overlay *res_overlay = NULL;
 
 static void
 send_ipc_ack(void *conn)
 {
     if (res_overlay == NULL) {
         ais_malloc0(res_overlay, sizeof(struct res_overlay));
     }
 
     res_overlay->header.id = CRM_MESSAGE_IPC_ACK;
     res_overlay->header.size = sizeof(cs_ipc_header_response_t);
     res_overlay->header.error = CS_OK;
     pcmk_api->ipc_response_send(conn, res_overlay, res_overlay->header.size);
 }
 
 /* local callbacks */
 void
 pcmk_ipc(void *conn, ais_void_ptr * msg)
 {
     AIS_Message *mutable;
     int type = 0, size = 0;
     gboolean transient = TRUE;
     const AIS_Message *ais_msg = (const AIS_Message *)msg;
     void *async_conn = conn;
 
     ais_trace("Message from client %p", conn);
 
     if (check_message_sanity(msg, ((const AIS_Message *)msg)->data) == FALSE) {
         /* The message is corrupted - ignore */
         send_ipc_ack(conn);
         msg = NULL;
         return;
     }
 
     /* Make a copy of the message here and ACK it
      * The message is only valid until a response is sent
      * but the response must also be sent _before_ we send anything else
      */
 
     mutable = ais_msg_copy(ais_msg);
     AIS_ASSERT(check_message_sanity(mutable, mutable->data));
 
     size = mutable->header.size;
     /* ais_malloc0(ais_msg, size); */
     /* memcpy(ais_msg, msg, size); */
 
     type = mutable->sender.type;
     ais_trace
         ("type: %d local: %d conn: %p host type: %d ais: %d sender pid: %d child pid: %d size: %d",
          type, mutable->host.local, pcmk_children[type].conn, mutable->host.type, crm_msg_ais,
          mutable->sender.pid, pcmk_children[type].pid, ((int)SIZEOF(pcmk_children)));
 
     if (type > crm_msg_none && type < SIZEOF(pcmk_children)) {
         /* known child process */
         transient = FALSE;
     }
 #if 0
     /* If this check fails, the order of pcmk_children probably 
      *   doesn't match that of the crm_ais_msg_types enum
      */
     AIS_CHECK(transient || mutable->sender.pid == pcmk_children[type].pid,
               ais_err("Sender: %d, child[%d]: %d", mutable->sender.pid, type,
                       pcmk_children[type].pid); ais_free(mutable); return);
 #endif
 
     if (transient == FALSE
         && type > crm_msg_none
         && mutable->host.local
         && pcmk_children[type].conn == NULL && mutable->host.type == crm_msg_ais) {
         AIS_CHECK(mutable->sender.type != mutable->sender.pid,
                   ais_err("Pid=%d, type=%d", mutable->sender.pid, mutable->sender.type));
 
         ais_info("Recorded connection %p for %s/%d",
                  conn, pcmk_children[type].name, pcmk_children[type].pid);
         pcmk_children[type].conn = conn;
         pcmk_children[type].async_conn = async_conn;
 
         /* Make sure they have the latest membership */
         if (pcmk_children[type].flags & crm_flag_members) {
             char *update = pcmk_generate_membership_data();
 
             g_hash_table_replace(membership_notify_list, async_conn, async_conn);
             ais_info("Sending membership update " U64T " to %s",
                      membership_seq, pcmk_children[type].name);
             send_client_msg(async_conn, crm_class_members, crm_msg_none, update);
         }
 
     } else if (transient) {
         AIS_CHECK(mutable->sender.type == mutable->sender.pid,
                   ais_err("Pid=%d, type=%d", mutable->sender.pid, mutable->sender.type));
         g_hash_table_replace(ipc_client_list, async_conn, GUINT_TO_POINTER(mutable->sender.pid));
     }
 
     mutable->sender.id = local_nodeid;
     mutable->sender.size = local_uname_len;
     memset(mutable->sender.uname, 0, MAX_NAME);
     memcpy(mutable->sender.uname, local_uname, mutable->sender.size);
 
     route_ais_message(mutable, TRUE);
     send_ipc_ack(conn);
     msg = NULL;
     ais_free(mutable);
 }
 
 int
 pcmk_shutdown(void)
 {
     int lpc = 0;
     static int phase = 0;
     static int max_wait = 0;
     static time_t next_log = 0;
     static int max = SIZEOF(pcmk_children);
 
     if (use_mcp) {
         if (pcmk_children[crm_msg_crmd].conn || pcmk_children[crm_msg_stonith_ng].conn) {
             time_t now = time(NULL);
 
             if (now > next_log) {
                 next_log = now + 300;
                 ais_notice
                     ("Preventing Corosync shutdown.  Please ensure Pacemaker is stopped first.");
             }
             return -1;
         }
         ais_notice("Unloading Pacemaker plugin");
         return 0;
     }
 
     if (phase == 0) {
         ais_notice("Shuting down Pacemaker");
         phase = max;
     }
 
     wait_active = FALSE;        /* stop the wait loop */
 
     for (; phase > 0; phase--) {
         /* dont stop anything with start_seq < 1 */
 
         for (lpc = max - 1; lpc >= 0; lpc--) {
             if (phase != pcmk_children[lpc].start_seq) {
                 continue;
             }
 
             if (pcmk_children[lpc].pid) {
                 pid_t pid = 0;
                 int status = 0;
                 time_t now = time(NULL);
 
                 if (pcmk_children[lpc].respawn) {
                     max_wait = 5;       /* 5 * 30s = 2.5 minutes... plenty once the crmd is gone */
                     next_log = now + 30;
                     pcmk_children[lpc].respawn = FALSE;
                     stop_child(&(pcmk_children[lpc]), SIGTERM);
                 }
 
                 pid = wait4(pcmk_children[lpc].pid, &status, WNOHANG, NULL);
                 if (pid < 0) {
                     ais_perror("Call to wait4(%s/%d) failed - treating it as stopped",
                                pcmk_children[lpc].name, pcmk_children[lpc].pid);
 
                 } else if (pid == 0) {
                     if (now >= next_log) {
                         max_wait--;
                         next_log = now + 30;
                         ais_notice("Still waiting for %s (pid=%d, seq=%d) to terminate...",
                                    pcmk_children[lpc].name, pcmk_children[lpc].pid,
                                    pcmk_children[lpc].start_seq);
                         if (max_wait <= 0 && phase < pcmk_children[crm_msg_crmd].start_seq) {
                             ais_err("Child %s taking too long to terminate, sending SIGKILL",
                                     pcmk_children[lpc].name);
                             stop_child(&(pcmk_children[lpc]), SIGKILL);
                         }
                     }
                     /* Return control to corosync */
                     return -1;
                 }
             }
 
             /* cleanup */
             ais_notice("%s confirmed stopped", pcmk_children[lpc].name);
             pcmk_children[lpc].async_conn = NULL;
             pcmk_children[lpc].conn = NULL;
             pcmk_children[lpc].pid = 0;
         }
     }
 
     send_cluster_id();
     ais_notice("Shutdown complete");
     /* TODO: Add back the logsys flush call once its written */
 
     return 0;
 }
 
 struct member_loop_data {
     char *string;
 };
 
 static void
 member_vote_count_fn(gpointer key, gpointer value, gpointer user_data)
 {
     crm_node_t *node = value;
 
     if (ais_str_eq(CRM_NODE_MEMBER, node->state)) {
         plugin_has_votes += node->votes;
     }
 }
 
 void
 member_loop_fn(gpointer key, gpointer value, gpointer user_data)
 {
     crm_node_t *node = value;
     struct member_loop_data *data = user_data;
 
     ais_trace("Dumping node %u", node->id);
     data->string = append_member(data->string, node);
 }
 
 char *
 pcmk_generate_membership_data(void)
 {
     int size = 0;
     struct member_loop_data data;
 
     size = 256;
     ais_malloc0(data.string, size);
 
     /* Ensure the list of active processes is up-to-date */
     update_member(local_nodeid, 0, 0, -1, get_process_list(), local_uname, CRM_NODE_MEMBER, NULL);
 
     plugin_has_votes = 0;
     g_hash_table_foreach(membership_list, member_vote_count_fn, NULL);
     if (plugin_has_votes > plugin_expected_votes) {
         update_expected_votes(plugin_has_votes);
     }
 
     snprintf(data.string, size,
              "<nodes id=\"" U64T "\" quorate=\"%s\" expected=\"%u\" actual=\"%u\">",
              membership_seq, plugin_has_quorum()? "true" : "false",
              plugin_expected_votes, plugin_has_votes);
 
     g_hash_table_foreach(membership_list, member_loop_fn, &data);
     size = strlen(data.string);
     data.string = realloc(data.string, size + 9);       /* 9 = </nodes> + nul */
     sprintf(data.string + size, "</nodes>");
     return data.string;
 }
 
 void
 pcmk_nodes(void *conn, ais_void_ptr * msg)
 {
     char *data = pcmk_generate_membership_data();
     void *async_conn = conn;
 
     /* send the ACK before we send any other messages
      * - but after we no longer need to access the message
      */
     send_ipc_ack(conn);
     msg = NULL;
 
     if (async_conn) {
         send_client_msg(async_conn, crm_class_members, crm_msg_none, data);
     }
     ais_free(data);
 }
 
 void
 pcmk_remove_member(void *conn, ais_void_ptr * msg)
 {
     const AIS_Message *ais_msg = msg;
     char *data = get_ais_data(ais_msg);
 
     send_ipc_ack(conn);
     msg = NULL;
 
     if (data != NULL) {
         char *bcast = ais_concat("remove-peer", data, ':');
 
         send_cluster_msg(crm_msg_ais, NULL, bcast);
         ais_info("Sent: %s", bcast);
         ais_free(bcast);
     }
 
     ais_free(data);
 }
 
 static void
 send_quorum_details(void *conn)
 {
     int size = 256;
     char *data = NULL;
 
     ais_malloc0(data, size);
 
     snprintf(data, size, "<quorum id=\"" U64T "\" quorate=\"%s\" expected=\"%u\" actual=\"%u\"/>",
              membership_seq, plugin_has_quorum()? "true" : "false",
              plugin_expected_votes, plugin_has_votes);
 
     send_client_msg(conn, crm_class_quorum, crm_msg_none, data);
     ais_free(data);
 }
 
 void
 pcmk_quorum(void *conn, ais_void_ptr * msg)
 {
     char *dummy = NULL;
     const AIS_Message *ais_msg = msg;
     char *data = get_ais_data(ais_msg);
 
     send_ipc_ack(conn);
     msg = NULL;
 
     /* Make sure the current number of votes is accurate */
     dummy = pcmk_generate_membership_data();
     ais_free(dummy);
 
     /* Calls without data just want the current quorum details */
     if (data != NULL && strlen(data) > 0) {
         int value = ais_get_int(data, NULL);
 
         update_expected_votes(value);
     }
 
     send_quorum_details(conn);
     ais_free(data);
 }
 
 void
 pcmk_notify(void *conn, ais_void_ptr * msg)
 {
     const AIS_Message *ais_msg = msg;
     char *data = get_ais_data(ais_msg);
     void *async_conn = conn;
 
     int enable = 0;
     int sender = ais_msg->sender.pid;
 
     send_ipc_ack(conn);
     msg = NULL;
 
     if (ais_str_eq("true", data)) {
         enable = 1;
     }
 
     ais_info("%s node notifications for child %d (%p)",
              enable ? "Enabling" : "Disabling", sender, async_conn);
     if (enable) {
         g_hash_table_replace(membership_notify_list, async_conn, async_conn);
     } else {
         g_hash_table_remove(membership_notify_list, async_conn);
     }
     ais_free(data);
 }
 
 void
 pcmk_nodeid(void *conn, ais_void_ptr * msg)
 {
     static int counter = 0;
     struct crm_ais_nodeid_resp_s resp;
 
     ais_trace("Sending local nodeid: %d to %p[%d]", local_nodeid, conn, counter);
 
     resp.header.id = crm_class_nodeid;
     resp.header.size = sizeof(struct crm_ais_nodeid_resp_s);
     resp.header.error = CS_OK;
     resp.id = local_nodeid;
     resp.counter = counter++;
     memset(resp.uname, 0, MAX_NAME);
     memcpy(resp.uname, local_uname, local_uname_len);
     memset(resp.cname, 0, MAX_NAME);
     memcpy(resp.cname, local_cname, local_cname_len);
 
     pcmk_api->ipc_response_send(conn, &resp, resp.header.size);
 }
 
 static gboolean
 ghash_send_update(gpointer key, gpointer value, gpointer data)
 {
     if (send_client_msg(value, crm_class_members, crm_msg_none, data) != 0) {
         /* remove it */
         return TRUE;
     }
     return FALSE;
 }
 
 void
 send_member_notification(void)
 {
     char *update = pcmk_generate_membership_data();
 
     ais_info("Sending membership update " U64T " to %d children",
              membership_seq, g_hash_table_size(membership_notify_list));
 
     g_hash_table_foreach_remove(membership_notify_list, ghash_send_update, update);
     ais_free(update);
 }
 
 gboolean
 check_message_sanity(const AIS_Message * msg, const char *data)
 {
     gboolean sane = TRUE;
     gboolean repaired = FALSE;
     int dest = msg->host.type;
     int tmp_size = msg->header.size - sizeof(AIS_Message);
 
     if (sane && msg->header.size == 0) {
         ais_err("Message with no size");
         sane = FALSE;
     }
 
     if (sane && msg->header.error != CS_OK) {
         ais_err("Message header contains an error: %d", msg->header.error);
         sane = FALSE;
     }
 
     AIS_CHECK(msg->header.size > sizeof(AIS_Message),
               ais_err("Message %d size too small: %d < %zu",
                       msg->header.id, msg->header.size, sizeof(AIS_Message)); return FALSE);
 
     if (sane && ais_data_len(msg) != tmp_size) {
         ais_warn("Message payload size is incorrect: expected %d, got %d", ais_data_len(msg),
                  tmp_size);
         sane = TRUE;
     }
 
     if (sane && ais_data_len(msg) == 0) {
         ais_err("Message with no payload");
         sane = FALSE;
     }
 
     if (sane && data && msg->is_compressed == FALSE) {
         int str_size = strlen(data) + 1;
 
         if (ais_data_len(msg) != str_size) {
             int lpc = 0;
 
             ais_err("Message payload is corrupted: expected %d bytes, got %d",
                     ais_data_len(msg), str_size);
             sane = FALSE;
             for (lpc = (str_size - 10); lpc < msg->size; lpc++) {
                 if (lpc < 0) {
                     lpc = 0;
                 }
                 ais_trace("bad_data[%d]: %d / '%c'", lpc, data[lpc], data[lpc]);
             }
         }
     }
 
     if (sane == FALSE) {
         AIS_CHECK(sane,
                   ais_err
                   ("Invalid message %d: (dest=%s:%s, from=%s:%s.%d, compressed=%d, size=%d, total=%d)",
                    msg->id, ais_dest(&(msg->host)), msg_type2text(dest), ais_dest(&(msg->sender)),
                    msg_type2text(msg->sender.type), msg->sender.pid, msg->is_compressed,
                    ais_data_len(msg), msg->header.size));
 
     } else if (repaired) {
         ais_err
             ("Repaired message %d: (dest=%s:%s, from=%s:%s.%d, compressed=%d, size=%d, total=%d)",
              msg->id, ais_dest(&(msg->host)), msg_type2text(dest), ais_dest(&(msg->sender)),
              msg_type2text(msg->sender.type), msg->sender.pid, msg->is_compressed,
              ais_data_len(msg), msg->header.size);
     } else {
         ais_trace
             ("Verified message %d: (dest=%s:%s, from=%s:%s.%d, compressed=%d, size=%d, total=%d)",
              msg->id, ais_dest(&(msg->host)), msg_type2text(dest), ais_dest(&(msg->sender)),
              msg_type2text(msg->sender.type), msg->sender.pid, msg->is_compressed,
              ais_data_len(msg), msg->header.size);
     }
     return sane;
 }
 
 static int delivered_transient = 0;
 static void
 deliver_transient_msg(gpointer key, gpointer value, gpointer user_data)
 {
     int pid = GPOINTER_TO_INT(value);
     AIS_Message *mutable = user_data;
 
     if (pid == mutable->host.type) {
         int rc = send_client_ipc(key, mutable);
 
         delivered_transient++;
 
         ais_info("Sent message to %s.%d (rc=%d)", ais_dest(&(mutable->host)), pid, rc);
         if (rc != 0) {
             ais_warn("Sending message to %s.%d failed (rc=%d)",
                      ais_dest(&(mutable->host)), pid, rc);
             log_ais_message(LOG_DEBUG, mutable);
         }
     }
 }
 
 gboolean
 route_ais_message(const AIS_Message * msg, gboolean local_origin)
 {
     int rc = 0;
     int dest = msg->host.type;
     const char *reason = "unknown";
     AIS_Message *mutable = ais_msg_copy(msg);
     static int service_id = SERVICE_ID_MAKE(PCMK_SERVICE_ID, 0);
 
     ais_trace("Msg[%d] (dest=%s:%s, from=%s:%s.%d, remote=%s, size=%d)",
                 mutable->id, ais_dest(&(mutable->host)), msg_type2text(dest),
                 ais_dest(&(mutable->sender)), msg_type2text(mutable->sender.type),
                 mutable->sender.pid, local_origin ? "false" : "true", ais_data_len((mutable)));
 
     if (local_origin == FALSE) {
         if (mutable->host.size == 0 || ais_str_eq(local_uname, mutable->host.uname)) {
             mutable->host.local = TRUE;
         }
     }
 
     if (check_message_sanity(mutable, mutable->data) == FALSE) {
         /* Dont send this message to anyone */
         rc = 1;
         goto bail;
     }
 
     if (mutable->host.local) {
         void *conn = NULL;
         const char *lookup = NULL;
 
         if (dest == crm_msg_ais) {
             process_ais_message(mutable);
             goto bail;
 
         } else if (dest == crm_msg_lrmd) {
             /* lrmd messages are routed via the crm */
             dest = crm_msg_crmd;
 
         } else if (dest == crm_msg_te) {
             /* te messages are routed via the crm */
             dest = crm_msg_crmd;
 
         } else if (dest >= SIZEOF(pcmk_children)) {
             /* Transient client */
 
             delivered_transient = 0;
             g_hash_table_foreach(ipc_client_list, deliver_transient_msg, mutable);
             if (delivered_transient) {
                 ais_trace("Sent message to %d transient clients: %d", delivered_transient, dest);
                 goto bail;
 
             } else {
                 /* try the crmd */
                 ais_trace("Sending message to transient client %d via crmd", dest);
                 dest = crm_msg_crmd;
             }
 
         } else if (dest == 0) {
             ais_err("Invalid destination: %d", dest);
             log_ais_message(LOG_ERR, mutable);
             log_printf(LOG_ERR, "%s", get_ais_data(mutable));
             rc = 1;
             goto bail;
         }
 
         lookup = msg_type2text(dest);
         conn = pcmk_children[dest].async_conn;
 
         /* the cluster fails in weird and wonderfully obscure ways when this is not true */
         AIS_ASSERT(ais_str_eq(lookup, pcmk_children[dest].name));
 
         if (mutable->header.id == service_id) {
             mutable->header.id = 0;     /* reset this back to zero for IPC messages */
 
         } else if (mutable->header.id != 0) {
             ais_err("reset header id back to zero from %d", mutable->header.id);
             mutable->header.id = 0;     /* reset this back to zero for IPC messages */
         }
 
         reason = "ipc delivery failed";
         rc = send_client_ipc(conn, mutable);
 
     } else if (local_origin) {
         /* forward to other hosts */
         ais_trace("Forwarding to cluster");
         reason = "cluster delivery failed";
         rc = send_cluster_msg_raw(mutable);
     }
 
     if (rc != 0) {
         ais_warn("Sending message to %s.%s failed: %s (rc=%d)",
                  ais_dest(&(mutable->host)), msg_type2text(dest), reason, rc);
         log_ais_message(LOG_DEBUG, mutable);
     }
 
   bail:
     ais_free(mutable);
     return rc == 0 ? TRUE : FALSE;
 }
 
 int
 send_cluster_msg_raw(const AIS_Message * ais_msg)
 {
     int rc = 0;
     struct iovec iovec;
     static uint32_t msg_id = 0;
     AIS_Message *mutable = ais_msg_copy(ais_msg);
 
     AIS_ASSERT(local_nodeid != 0);
     AIS_ASSERT(ais_msg->header.size == (sizeof(AIS_Message) + ais_data_len(ais_msg)));
 
     if (mutable->id == 0) {
         msg_id++;
         AIS_CHECK(msg_id != 0 /* detect wrap-around */ ,
                   msg_id++;
                   ais_err("Message ID wrapped around"));
         mutable->id = msg_id;
     }
 
     mutable->header.error = CS_OK;
     mutable->header.id = SERVICE_ID_MAKE(PCMK_SERVICE_ID, 0);
 
     mutable->sender.id = local_nodeid;
     mutable->sender.size = local_uname_len;
     memset(mutable->sender.uname, 0, MAX_NAME);
     memcpy(mutable->sender.uname, local_uname, mutable->sender.size);
 
     iovec.iov_base = (char *)mutable;
     iovec.iov_len = mutable->header.size;
 
     ais_trace("Sending message (size=%u)", (unsigned int)iovec.iov_len);
     rc = pcmk_api->totem_mcast(&iovec, 1, TOTEMPG_SAFE);
 
     if (rc == 0 && mutable->is_compressed == FALSE) {
         ais_trace("Message sent: %.80s", mutable->data);
     }
 
     AIS_CHECK(rc == 0, ais_err("Message not sent (%d): %.120s", rc, mutable->data));
 
     ais_free(mutable);
     return rc;
 }
 
 #define min(x,y) (x)<(y)?(x):(y)
 
 void
 send_cluster_id(void)
 {
     int rc = 0;
     int len = 0;
     time_t now = time(NULL);
     struct iovec iovec;
     struct crm_identify_msg_s *msg = NULL;
 
     static time_t started = 0;
     static uint64_t first_seq = 0;
 
     AIS_ASSERT(local_nodeid != 0);
 
     if (started == 0) {
         started = now;
         first_seq = membership_seq;
     }
 
     if (local_born_on == 0) {
         if (started + 15 < now) {
             ais_debug("Born-on set to: " U64T " (age)", first_seq);
             local_born_on = first_seq;
 
         } else if (have_reliable_membership_id) {
             ais_debug("Born-on set to: " U64T " (peer)", membership_seq);
             local_born_on = membership_seq;
 
         } else {
             ais_debug("Leaving born-on unset: " U64T, membership_seq);
         }
     }
 
     ais_malloc0(msg, sizeof(struct crm_identify_msg_s));
     msg->header.size = sizeof(struct crm_identify_msg_s);
 
     msg->id = local_nodeid;
     /* msg->header.error = CS_OK; */
     msg->header.id = SERVICE_ID_MAKE(PCMK_SERVICE_ID, 1);
 
     len = min(local_uname_len, MAX_NAME - 1);
     memset(msg->uname, 0, MAX_NAME);
     memcpy(msg->uname, local_uname, len);
 
     len = min(strlen(VERSION), MAX_NAME - 1);
     memset(msg->version, 0, MAX_NAME);
     memcpy(msg->version, VERSION, len);
 
     msg->votes = 1;
     msg->pid = getpid();
     msg->processes = get_process_list();
     msg->born_on = local_born_on;
 
     ais_debug("Local update: id=%u, born=" U64T ", seq=" U64T "",
               local_nodeid, local_born_on, membership_seq);
     update_member(local_nodeid, local_born_on, membership_seq, msg->votes, msg->processes, NULL,
                   NULL, VERSION);
 
     iovec.iov_base = (char *)msg;
     iovec.iov_len = msg->header.size;
 
     rc = pcmk_api->totem_mcast(&iovec, 1, TOTEMPG_SAFE);
 
     AIS_CHECK(rc == 0, ais_err("Message not sent (%d)", rc));
 
     ais_free(msg);
 }
 
 static gboolean
 ghash_send_removal(gpointer key, gpointer value, gpointer data)
 {
     send_quorum_details(value);
     if (send_client_msg(value, crm_class_rmpeer, crm_msg_none, data) != 0) {
         /* remove it */
         return TRUE;
     }
     return FALSE;
 }
 
 void
 ais_remove_peer(char *node_id)
 {
     uint32_t id = ais_get_int(node_id, NULL);
     crm_node_t *node = g_hash_table_lookup(membership_list, GUINT_TO_POINTER(id));
 
     if (node == NULL) {
         ais_info("Peer %u is unknown", id);
 
     } else if (ais_str_eq(CRM_NODE_MEMBER, node->state)) {
         ais_warn("Peer %u/%s is still active", id, node->uname);
 
     } else if (g_hash_table_remove(membership_list, GUINT_TO_POINTER(id))) {
         plugin_expected_votes--;
         ais_notice("Removed dead peer %u from the membership list", id);
         ais_info("Sending removal of %u to %d children",
                  id, g_hash_table_size(membership_notify_list));
 
         g_hash_table_foreach_remove(membership_notify_list, ghash_send_removal, node_id);
 
     } else {
         ais_warn("Peer %u/%s was not removed", id, node->uname);
     }
 }
 
 gboolean
 process_ais_message(const AIS_Message * msg)
 {
     int len = ais_data_len(msg);
     char *data = get_ais_data(msg);
 
     do_ais_log(LOG_DEBUG,
                "Msg[%d] (dest=%s:%s, from=%s:%s.%d, remote=%s, size=%d): %.90s",
                msg->id, ais_dest(&(msg->host)), msg_type2text(msg->host.type),
                ais_dest(&(msg->sender)), msg_type2text(msg->sender.type),
                msg->sender.pid,
                msg->sender.uname == local_uname ? "false" : "true", ais_data_len(msg), data);
 
     if (data && len > 12 && strncmp("remove-peer:", data, 12) == 0) {
         char *node = data + 12;
 
         ais_remove_peer(node);
     }
 
     ais_free(data);
     return TRUE;
 }
 
 static void
 member_dump_fn(gpointer key, gpointer value, gpointer user_data)
 {
     crm_node_t *node = value;
 
     ais_info(" node id:%u, uname=%s state=%s processes=%.16x born=" U64T " seen=" U64T
              " addr=%s version=%s", node->id, node->uname ? node->uname : "-unknown-", node->state,
              node->processes, node->born, node->last_seen, node->addr ? node->addr : "-unknown-",
              node->version ? node->version : "-unknown-");
 }
 
 void
 pcmk_exec_dump(void)
 {
     /* Called after SIG_USR2 */
     process_ais_conf();
     ais_info("Local id: %u, uname: %s, born: " U64T, local_nodeid, local_uname, local_born_on);
     ais_info("Membership id: " U64T ", quorate: %s, expected: %u, actual: %u",
              membership_seq, plugin_has_quorum()? "true" : "false",
              plugin_expected_votes, plugin_has_votes);
 
     g_hash_table_foreach(membership_list, member_dump_fn, NULL);
 }
diff --git a/lib/ais/utils.c b/lib/ais/utils.c
index ab8f9565b3..016e48d256 100644
--- a/lib/ais/utils.c
+++ b/lib/ais/utils.c
@@ -1,751 +1,752 @@
 /*
  * Copyright (C) 2004 Andrew Beekhof <andrew@beekhof.net>
  * 
  * This library is free software; you can redistribute it and/or
  * modify it under the terms of the GNU Lesser General Public
  * License as published by the Free Software Foundation; either
  * version 2.1 of the License, or (at your option) any later version.
  * 
  * This library is distributed in the hope that it will be useful,
  * but WITHOUT ANY WARRANTY; without even the implied warranty of
  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  * Lesser General Public License for more details.
  * 
  * You should have received a copy of the GNU Lesser General Public
  * License along with this library; if not, write to the Free Software
  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
  */
 
 #include <crm_internal.h>
+#include <crm/cluster/internal.h>
 #include <sys/types.h>
 #include <stdlib.h>
 #include <unistd.h>
 #include <fcntl.h>
 #include <errno.h>
 #include <stdio.h>
 #include <signal.h>
 #include <sys/types.h>
 #include <sys/time.h>
 #include <sys/resource.h>
 
 #include <pwd.h>
 #include <glib.h>
 #include <bzlib.h>
 #include <grp.h>
 
 #include "./utils.h"
 #include "./plugin.h"
 
 struct pcmk_env_s pcmk_env;
 
 void
 log_ais_message(int level, const AIS_Message * msg)
 {
     char *data = get_ais_data(msg);
 
     qb_log_from_external_source(__func__, __FILE__,
                                 "Msg[%d] (dest=%s:%s, from=%s:%s.%d, remote=%s, size=%d): %.90s",
                                 level, __LINE__, 0,
                msg->id, ais_dest(&(msg->host)), msg_type2text(msg->host.type),
                ais_dest(&(msg->sender)), msg_type2text(msg->sender.type),
                msg->sender.pid,
                msg->sender.uname == local_uname ? "false" : "true", ais_data_len(msg), data);
 /*     do_ais_log(level, */
 /* 	       "Msg[%d] (dest=%s:%s, from=%s:%s.%d, remote=%s, size=%d): %.90s", */
 /* 	       msg->id, ais_dest(&(msg->host)), msg_type2text(msg->host.type), */
 /* 	       ais_dest(&(msg->sender)), msg_type2text(msg->sender.type), */
 /* 	       msg->sender.pid, */
 /* 	       msg->sender.uname==local_uname?"false":"true", */
 /* 	       ais_data_len(msg), data); */
     ais_free(data);
 }
 
 /*
 static gboolean ghash_find_by_uname(gpointer key, gpointer value, gpointer user_data) 
 {
     crm_node_t *node = value;
     int id = GPOINTER_TO_INT(user_data);
 
     if (node->id == id) {
 	return TRUE;
     }
     return FALSE;
 }
 */
 
 static int
 ais_string_to_boolean(const char *s)
 {
     int rc = 0;
 
     if (s == NULL) {
         return rc;
     }
 
     if (strcasecmp(s, "true") == 0
         || strcasecmp(s, "on") == 0
         || strcasecmp(s, "yes") == 0 || strcasecmp(s, "y") == 0 || strcasecmp(s, "1") == 0) {
         rc = 1;
     }
     return rc;
 }
 
 static char *opts_default[] = { NULL, NULL };
 static char *opts_vgrind[] = { NULL, NULL, NULL, NULL, NULL };
 
 gboolean
 spawn_child(crm_child_t * child)
 {
     int lpc = 0;
     uid_t uid = 0;
     struct rlimit oflimits;
     gboolean use_valgrind = FALSE;
     gboolean use_callgrind = FALSE;
     const char *devnull = "/dev/null";
     const char *env_valgrind = getenv("PCMK_valgrind_enabled");
     const char *env_callgrind = getenv("PCMK_callgrind_enabled");
 
     if (child->command == NULL) {
         ais_info("Nothing to do for child \"%s\"", child->name);
         return TRUE;
     }
 
     if (ais_string_to_boolean(env_callgrind)) {
         use_callgrind = TRUE;
         use_valgrind = TRUE;
 
     } else if (env_callgrind != NULL && strstr(env_callgrind, child->name)) {
         use_callgrind = TRUE;
         use_valgrind = TRUE;
 
     } else if (ais_string_to_boolean(env_valgrind)) {
         use_valgrind = TRUE;
 
     } else if (env_valgrind != NULL && strstr(env_valgrind, child->name)) {
         use_valgrind = TRUE;
     }
 
     if (use_valgrind && strlen(VALGRIND_BIN) == 0) {
         ais_warn("Cannot enable valgrind for %s:"
                  " The location of the valgrind binary is unknown", child->name);
         use_valgrind = FALSE;
     }
 
     if (child->uid) {
         if (pcmk_user_lookup(child->uid, &uid, NULL) < 0) {
             ais_err("Invalid uid (%s) specified for %s", child->uid, child->name);
             return FALSE;
         }
     }
 
     child->pid = fork();
     AIS_ASSERT(child->pid != -1);
 
     if (child->pid > 0) {
         /* parent */
         ais_info("Forked child %d for process %s%s", child->pid, child->name,
                  use_valgrind ? " (valgrind enabled: " VALGRIND_BIN ")" : "");
 
     } else {
         /* Setup the two alternate arg arrarys */
         opts_vgrind[0] = ais_strdup(VALGRIND_BIN);
         if (use_callgrind) {
             opts_vgrind[1] = ais_strdup("--tool=callgrind");
             opts_vgrind[2] = ais_strdup("--callgrind-out-file=" CRM_STATE_DIR "/callgrind.out.%p");
             opts_vgrind[3] = ais_strdup(child->command);
             opts_vgrind[4] = NULL;
         } else {
             opts_vgrind[1] = ais_strdup(child->command);
             opts_vgrind[2] = NULL;
             opts_vgrind[3] = NULL;
             opts_vgrind[4] = NULL;
         }
         opts_default[0] = ais_strdup(child->command);;
 
 #if 0
         /* Dont set the group for now - it prevents connection to the cluster */
         if (gid && setgid(gid) < 0) {
             ais_perror("Could not set group to %d", gid);
         }
 #endif
 
         if (uid) {
             struct passwd *pwent = getpwuid(uid);
 
             if(pwent == NULL) {
                 ais_perror("Cannot get password entry of uid: %d", uid);
 
             } else if (initgroups(pwent->pw_name, pwent->pw_gid) < 0) {
                 ais_perror("Cannot initalize groups for %s (uid=%d)", pwent->pw_name, uid);
             }
         }
 
         if (uid && setuid(uid) < 0) {
             ais_perror("Could not set user to %d (%s)", uid, child->uid);
         }
 
         /* Close all open file descriptors */
         getrlimit(RLIMIT_NOFILE, &oflimits);
         for (; lpc < oflimits.rlim_cur; lpc++) {
             close(lpc);
         }
 
         (void)open(devnull, O_RDONLY);  /* Stdin:  fd 0 */
         (void)open(devnull, O_WRONLY);  /* Stdout: fd 1 */
         (void)open(devnull, O_WRONLY);  /* Stderr: fd 2 */
 
 /* *INDENT-OFF* */
 	setenv("HA_COMPRESSION",	"bz2",             1);
 	setenv("HA_cluster_type",	"openais",	   1);
 	setenv("HA_debug",		pcmk_env.debug,    1);
 	setenv("HA_logfacility",	pcmk_env.syslog,   1);
 	setenv("HA_LOGFACILITY",	pcmk_env.syslog,   1);
 	setenv("HA_use_logd",		pcmk_env.use_logd, 1);
 	setenv("HA_quorum_type",	pcmk_env.quorum,   1);
 /* *INDENT-ON* */
 
         if (pcmk_env.logfile) {
             setenv("HA_debugfile", pcmk_env.logfile, 1);
         }
 
         if (use_valgrind) {
             (void)execvp(VALGRIND_BIN, opts_vgrind);
         } else {
             (void)execvp(child->command, opts_default);
         }
         ais_perror("FATAL: Cannot exec %s", child->command);
         exit(100);
     }
     return TRUE;
 }
 
 gboolean
 stop_child(crm_child_t * child, int signal)
 {
     if (signal == 0) {
         signal = SIGTERM;
     }
 
     if (child->command == NULL) {
         ais_info("Nothing to do for child \"%s\"", child->name);
         return TRUE;
     }
 
     ais_debug("Stopping CRM child \"%s\"", child->name);
 
     if (child->pid <= 0) {
         ais_trace("Client %s not running", child->name);
         return TRUE;
     }
 
     errno = 0;
     if (kill(child->pid, signal) == 0) {
         ais_notice("Sent -%d to %s: [%d]", signal, child->name, child->pid);
 
     } else {
         ais_perror("Sent -%d to %s: [%d]", signal, child->name, child->pid);
     }
 
     return TRUE;
 }
 
 void
 destroy_ais_node(gpointer data)
 {
     crm_node_t *node = data;
 
     ais_info("Destroying entry for node %u", node->id);
 
     ais_free(node->addr);
     ais_free(node->uname);
     ais_free(node->state);
     ais_free(node);
 }
 
 int
 update_member(unsigned int id, uint64_t born, uint64_t seq, int32_t votes,
               uint32_t procs, const char *uname, const char *state, const char *version)
 {
     int changed = 0;
     crm_node_t *node = NULL;
 
     node = g_hash_table_lookup(membership_list, GUINT_TO_POINTER(id));
 
     if (node == NULL) {
         ais_malloc0(node, sizeof(crm_node_t));
         ais_info("Creating entry for node %u born on " U64T "", id, seq);
         node->id = id;
         node->addr = NULL;
         node->state = ais_strdup("unknown");
 
         g_hash_table_insert(membership_list, GUINT_TO_POINTER(id), node);
         node = g_hash_table_lookup(membership_list, GUINT_TO_POINTER(id));
     }
     AIS_ASSERT(node != NULL);
 
     if (seq != 0) {
         node->last_seen = seq;
     }
 
     if (born != 0 && node->born != born) {
         changed = TRUE;
         node->born = born;
         ais_info("%p Node %u (%s) born on: " U64T, node, id, uname, born);
     }
 
     if (version != NULL) {
         ais_free(node->version);
         node->version = ais_strdup(version);
     }
 
     if (uname != NULL) {
         if (node->uname == NULL || ais_str_eq(node->uname, uname) == FALSE) {
             ais_info("%p Node %u now known as %s (was: %s)", node, id, uname, node->uname);
             ais_free(node->uname);
             node->uname = ais_strdup(uname);
             changed = TRUE;
         }
     }
 
     if (procs != 0 && procs != node->processes) {
         ais_info("Node %s now has process list: %.32x (%u)", node->uname, procs, procs);
         node->processes = procs;
         changed = TRUE;
     }
 
     if (votes >= 0 && votes != node->votes) {
         ais_info("Node %s now has %d quorum votes (was %d)", node->uname, votes, node->votes);
         node->votes = votes;
         changed = TRUE;
     }
 
     if (state != NULL) {
         if (node->state == NULL || ais_str_eq(node->state, state) == FALSE) {
             ais_free(node->state);
             node->state = ais_strdup(state);
             ais_info("Node %u/%s is now: %s", id, node->uname ? node->uname : "unknown", state);
             changed = TRUE;
         }
     }
 
     return changed;
 }
 
 void
 delete_member(uint32_t id, const char *uname)
 {
     if (uname == NULL) {
         g_hash_table_remove(membership_list, GUINT_TO_POINTER(id));
         return;
     }
     ais_err("Deleting by uname is not yet supported");
 }
 
 const char *
 member_uname(uint32_t id)
 {
     crm_node_t *node = g_hash_table_lookup(membership_list, GUINT_TO_POINTER(id));
 
     if (node == NULL) {
         return ".unknown.";
     }
     if (node->uname == NULL) {
         return ".pending.";
     }
     return node->uname;
 }
 
 char *
 append_member(char *data, crm_node_t * node)
 {
     int size = 1;               /* nul */
     int offset = 0;
     static int fixed_len = 4 + 8 + 7 + 6 + 6 + 7 + 11;
 
     if (data) {
         size = strlen(data);
     }
     offset = size;
 
     size += fixed_len;
     size += 32;                 /* node->id */
     size += 100;                /* node->seq, node->born */
     size += strlen(node->state);
     if (node->uname) {
         size += (7 + strlen(node->uname));
     }
     if (node->addr) {
         size += (6 + strlen(node->addr));
     }
     if (node->version) {
         size += (9 + strlen(node->version));
     }
     data = realloc(data, size);
 
     offset += snprintf(data + offset, size - offset, "<node id=\"%u\" ", node->id);
     if (node->uname) {
         offset += snprintf(data + offset, size - offset, "uname=\"%s\" ", node->uname);
     }
     offset += snprintf(data + offset, size - offset, "state=\"%s\" ", node->state);
     offset += snprintf(data + offset, size - offset, "born=\"" U64T "\" ", node->born);
     offset += snprintf(data + offset, size - offset, "seen=\"" U64T "\" ", node->last_seen);
     offset += snprintf(data + offset, size - offset, "votes=\"%d\" ", node->votes);
     offset += snprintf(data + offset, size - offset, "processes=\"%u\" ", node->processes);
     if (node->addr) {
         offset += snprintf(data + offset, size - offset, "addr=\"%s\" ", node->addr);
     }
     if (node->version) {
         offset += snprintf(data + offset, size - offset, "version=\"%s\" ", node->version);
     }
     offset += snprintf(data + offset, size - offset, "/>");
 
     return data;
 }
 
 void
 swap_sender(AIS_Message * msg)
 {
     int tmp = 0;
     char tmp_s[256];
 
     tmp = msg->host.type;
     msg->host.type = msg->sender.type;
     msg->sender.type = tmp;
 
     tmp = msg->host.type;
     msg->host.size = msg->sender.type;
     msg->sender.type = tmp;
 
     memcpy(tmp_s, msg->host.uname, 256);
     memcpy(msg->host.uname, msg->sender.uname, 256);
     memcpy(msg->sender.uname, tmp_s, 256);
 }
 
 char *
 get_ais_data(const AIS_Message * msg)
 {
     int rc = BZ_OK;
     char *uncompressed = NULL;
     unsigned int new_size = msg->size + 1;
 
     if (msg->is_compressed == FALSE) {
         uncompressed = strdup(msg->data);
 
     } else {
         ais_malloc0(uncompressed, new_size);
 
         rc = BZ2_bzBuffToBuffDecompress(uncompressed, &new_size, (char *)msg->data,
                                         msg->compressed_size, 1, 0);
         if (rc != BZ_OK) {
             ais_info("rc=%d, new=%u expected=%u", rc, new_size, msg->size);
         }
         AIS_ASSERT(rc == BZ_OK);
         AIS_ASSERT(new_size == msg->size);
     }
 
     return uncompressed;
 }
 
 int
 send_cluster_msg(enum crm_ais_msg_types type, const char *host, const char *data)
 {
     int rc = 0;
     int data_len = 0;
     AIS_Message *ais_msg = NULL;
     int total_size = sizeof(AIS_Message);
 
     AIS_ASSERT(local_nodeid != 0);
 
     if (data != NULL) {
         data_len = 1 + strlen(data);
         total_size += data_len;
     }
     ais_malloc0(ais_msg, total_size);
 
     ais_msg->header.size = total_size;
     ais_msg->header.error = CS_OK;
     ais_msg->header.id = 0;
 
     ais_msg->size = data_len;
     ais_msg->sender.type = crm_msg_ais;
     if (data != NULL) {
         memcpy(ais_msg->data, data, data_len);
     }
 
     ais_msg->host.type = type;
     ais_msg->host.id = 0;
     if (host) {
         ais_msg->host.size = strlen(host);
         memset(ais_msg->host.uname, 0, MAX_NAME);
         memcpy(ais_msg->host.uname, host, ais_msg->host.size);
 /* 	ais_msg->host.id = nodeid_lookup(host); */
 
     } else {
         ais_msg->host.type = type;
         ais_msg->host.size = 0;
         memset(ais_msg->host.uname, 0, MAX_NAME);
     }
 
     rc = send_cluster_msg_raw(ais_msg);
     ais_free(ais_msg);
 
     return rc;
 }
 
 extern struct corosync_api_v1 *pcmk_api;
 
 int
 send_client_ipc(void *conn, const AIS_Message * ais_msg)
 {
     int rc = -1;
 
     if (conn == NULL) {
         rc = -2;
 
     } else if (!libais_connection_active(conn)) {
         ais_warn("Connection no longer active");
         rc = -3;
 
 /* 	} else if ((queue->size - 1) == queue->used) { */
 /* 	    ais_err("Connection is throttled: %d", queue->size); */
 
     } else {
 #if SUPPORT_COROSYNC
         rc = pcmk_api->ipc_dispatch_send(conn, ais_msg, ais_msg->header.size);
 #endif
     }
     return rc;
 }
 
 int
 send_client_msg(void *conn, enum crm_ais_msg_class class, enum crm_ais_msg_types type,
                 const char *data)
 {
     int rc = 0;
     int data_len = 0;
     int total_size = sizeof(AIS_Message);
     AIS_Message *ais_msg = NULL;
     static int msg_id = 0;
 
     AIS_ASSERT(local_nodeid != 0);
 
     msg_id++;
     AIS_ASSERT(msg_id != 0 /* wrap-around */ );
 
     if (data != NULL) {
         data_len = 1 + strlen(data);
     }
     total_size += data_len;
 
     ais_malloc0(ais_msg, total_size);
 
     ais_msg->id = msg_id;
     ais_msg->header.id = class;
     ais_msg->header.size = total_size;
     ais_msg->header.error = CS_OK;
 
     ais_msg->size = data_len;
     if (data != NULL) {
         memcpy(ais_msg->data, data, data_len);
     }
 
     ais_msg->host.size = 0;
     ais_msg->host.type = type;
     memset(ais_msg->host.uname, 0, MAX_NAME);
     ais_msg->host.id = 0;
 
     ais_msg->sender.type = crm_msg_ais;
     ais_msg->sender.size = local_uname_len;
     memset(ais_msg->sender.uname, 0, MAX_NAME);
     memcpy(ais_msg->sender.uname, local_uname, ais_msg->sender.size);
     ais_msg->sender.id = local_nodeid;
 
     rc = send_client_ipc(conn, ais_msg);
 
     if (rc != 0) {
         ais_warn("Sending message to %s failed: %d", msg_type2text(type), rc);
         log_ais_message(LOG_DEBUG, ais_msg);
     }
 
     ais_free(ais_msg);
     return rc;
 }
 
 char *
 ais_concat(const char *prefix, const char *suffix, char join)
 {
     int len = 0;
     char *new_str = NULL;
 
     AIS_ASSERT(prefix != NULL);
     AIS_ASSERT(suffix != NULL);
     len = strlen(prefix) + strlen(suffix) + 2;
 
     ais_malloc0(new_str, (len));
     sprintf(new_str, "%s%c%s", prefix, join, suffix);
     new_str[len - 1] = 0;
     return new_str;
 }
 
 hdb_handle_t
 config_find_init(struct corosync_api_v1 * config, char *name)
 {
     hdb_handle_t local_handle = 0;
 
 #if SUPPORT_COROSYNC
     config->object_find_create(OBJECT_PARENT_HANDLE, name, strlen(name), &local_handle);
     ais_info("Local handle: %lld for %s", (long long)local_handle, name);
 #endif
 
     return local_handle;
 }
 
 hdb_handle_t
 config_find_next(struct corosync_api_v1 * config, char *name, hdb_handle_t top_handle)
 {
     int rc = 0;
     hdb_handle_t local_handle = 0;
 
 #if SUPPORT_COROSYNC
     rc = config->object_find_next(top_handle, &local_handle);
 #endif
 
     if (rc < 0) {
         ais_info("No additional configuration supplied for: %s", name);
         local_handle = 0;
     } else {
         ais_info("Processing additional %s options...", name);
     }
     return local_handle;
 }
 
 void
 config_find_done(struct corosync_api_v1 *config, hdb_handle_t local_handle)
 {
 #if SUPPORT_COROSYNC
     config->object_find_destroy(local_handle);
 #endif
 }
 
 int
 get_config_opt(struct corosync_api_v1 *config,
                hdb_handle_t object_service_handle, char *key, char **value, const char *fallback)
 {
     char *env_key = NULL;
 
     *value = NULL;
 
     if (object_service_handle > 0) {
         config->object_key_get(object_service_handle, key, strlen(key), (void **)value, NULL);
     }
 
     if (*value) {
         ais_info("Found '%s' for option: %s", *value, key);
         return 0;
     }
 
     env_key = ais_concat("HA", key, '_');
     *value = getenv(env_key);
     ais_free(env_key);
 
     if (*value) {
         ais_info("Found '%s' in ENV for option: %s", *value, key);
         return 0;
     }
 
     if (fallback) {
         ais_info("Defaulting to '%s' for option: %s", fallback, key);
         *value = ais_strdup(fallback);
 
     } else {
         ais_info("No default for option: %s", key);
     }
 
     return -1;
 }
 
 int
 ais_get_boolean(const char *value)
 {
     if (value == NULL) {
         return 0;
 
     } else if (strcasecmp(value, "true") == 0
                || strcasecmp(value, "on") == 0
                || strcasecmp(value, "yes") == 0
                || strcasecmp(value, "y") == 0 || strcasecmp(value, "1") == 0) {
         return 1;
     }
     return 0;
 }
 
 long long
 ais_get_int(const char *text, char **end_text)
 {
     long long result = -1;
     char *local_end_text = NULL;
 
     errno = 0;
 
     if (text != NULL) {
 #ifdef ANSI_ONLY
         if (end_text != NULL) {
             result = strtol(text, end_text, 10);
         } else {
             result = strtol(text, &local_end_text, 10);
         }
 #else
         if (end_text != NULL) {
             result = strtoll(text, end_text, 10);
         } else {
             result = strtoll(text, &local_end_text, 10);
         }
 #endif
 
         if (errno == EINVAL) {
             ais_err("Conversion of %s failed", text);
             result = -1;
 
         } else if (errno == ERANGE) {
             ais_err("Conversion of %s was clipped: %lld", text, result);
 
         } else if (errno != 0) {
             ais_perror("Conversion of %s failed:", text);
         }
 
         if (local_end_text != NULL && local_end_text[0] != '\0') {
             ais_err("Characters left over after parsing '%s': '%s'", text, local_end_text);
         }
     }
     return result;
 }
 
 #define PW_BUFFER_LEN 500
 
 int
 pcmk_user_lookup(const char *name, uid_t * uid, gid_t * gid)
 {
     int rc = -1;
     char *buffer = NULL;
     struct passwd pwd;
     struct passwd *pwentry = NULL;
 
     ais_malloc0(buffer, PW_BUFFER_LEN);
     getpwnam_r(name, &pwd, buffer, PW_BUFFER_LEN, &pwentry);
     if (pwentry) {
         rc = 0;
         if (uid) {
             *uid = pwentry->pw_uid;
         }
         if (gid) {
             *gid = pwentry->pw_gid;
         }
         ais_debug("Cluster user %s has uid=%d gid=%d", name, pwentry->pw_uid, pwentry->pw_gid);
 
     } else {
         ais_err("Cluster user %s does not exist", name);
     }
 
     ais_free(buffer);
     return rc;
 }
diff --git a/lib/cluster/legacy.c b/lib/cluster/legacy.c
index 987900382e..06d8e93f91 100644
--- a/lib/cluster/legacy.c
+++ b/lib/cluster/legacy.c
@@ -1,1390 +1,1391 @@
 /* 
  * Copyright (C) 2004 Andrew Beekhof <andrew@beekhof.net>
  * 
  * This library is free software; you can redistribute it and/or
  * modify it under the terms of the GNU Lesser General Public
  * License as published by the Free Software Foundation; either
  * version 2.1 of the License, or (at your option) any later version.
  * 
  * This library is distributed in the hope that it will be useful,
  * but WITHOUT ANY WARRANTY; without even the implied warranty of
  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  * Lesser General Public License for more details.
  * 
  * You should have received a copy of the GNU Lesser General Public
  * License along with this library; if not, write to the Free Software
  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
  */
 
 #include <crm_internal.h>
+#include <crm/cluster/internal.h>
 #include <bzlib.h>
 #include <crm/common/ipc.h>
 #include <crm/cluster.h>
 #include <crm/common/mainloop.h>
 #include <sys/utsname.h>
 
 #if SUPPORT_COROSYNC
 #    include <corosync/confdb.h>
 #    include <corosync/corodefs.h>
 #  include <corosync/cpg.h>
 cpg_handle_t pcmk_cpg_handle = 0;
 
 struct cpg_name pcmk_cpg_group = {
     .length = 0,
     .value[0] = 0,
 };
 #endif
 
 #if HAVE_CMAP
 #  include <corosync/cmap.h>
 #endif
 
 #if SUPPORT_CMAN
 #  include <libcman.h>
 cman_handle_t pcmk_cman_handle = NULL;
 #endif
 
 static char *pcmk_uname = NULL;
 static int pcmk_uname_len = 0;
 static uint32_t pcmk_nodeid = 0;
 int ais_membership_timer = 0;
 gboolean ais_membership_force = FALSE;
 int ais_dispatch(gpointer user_data);
 
 #define cs_repeat(counter, max, code) do {		\
 	code;						\
 	if(rc == CS_ERR_TRY_AGAIN || rc == CS_ERR_QUEUE_FULL) {  \
 	    counter++;					\
 	    crm_debug("Retrying operation after %ds", counter);	\
 	    sleep(counter);				\
 	} else {                                        \
             break;                                      \
         }                                               \
     } while(counter < max)
 
 enum crm_ais_msg_types
 text2msg_type(const char *text)
 {
     int type = crm_msg_none;
 
     CRM_CHECK(text != NULL, return type);
     if (safe_str_eq(text, "ais")) {
         type = crm_msg_ais;
     } else if (safe_str_eq(text, "crm_plugin")) {
         type = crm_msg_ais;
     } else if (safe_str_eq(text, CRM_SYSTEM_CIB)) {
         type = crm_msg_cib;
     } else if (safe_str_eq(text, CRM_SYSTEM_CRMD)) {
         type = crm_msg_crmd;
     } else if (safe_str_eq(text, CRM_SYSTEM_DC)) {
         type = crm_msg_crmd;
     } else if (safe_str_eq(text, CRM_SYSTEM_TENGINE)) {
         type = crm_msg_te;
     } else if (safe_str_eq(text, CRM_SYSTEM_PENGINE)) {
         type = crm_msg_pe;
     } else if (safe_str_eq(text, CRM_SYSTEM_LRMD)) {
         type = crm_msg_lrmd;
     } else if (safe_str_eq(text, CRM_SYSTEM_STONITHD)) {
         type = crm_msg_stonithd;
     } else if (safe_str_eq(text, "stonith-ng")) {
         type = crm_msg_stonith_ng;
     } else if (safe_str_eq(text, "attrd")) {
         type = crm_msg_attrd;
 
     } else {
         /* This will normally be a transient client rather than
          * a cluster daemon.  Set the type to the pid of the client
          */
         int scan_rc = sscanf(text, "%d", &type);
 
         if (scan_rc != 1) {
             /* Ensure its sane */
             type = crm_msg_none;
         }
     }
     return type;
 }
 
 char *
 get_ais_data(const AIS_Message * msg)
 {
     int rc = BZ_OK;
     char *uncompressed = NULL;
     unsigned int new_size = msg->size + 1;
 
     if (msg->is_compressed == FALSE) {
         crm_trace("Returning uncompressed message data");
         uncompressed = strdup(msg->data);
 
     } else {
         crm_trace("Decompressing message data");
         uncompressed = calloc(1, new_size);
 
         rc = BZ2_bzBuffToBuffDecompress(uncompressed, &new_size, (char *)msg->data,
                                         msg->compressed_size, 1, 0);
 
         CRM_ASSERT(rc == BZ_OK);
         CRM_ASSERT(new_size == msg->size);
     }
 
     return uncompressed;
 }
 
 #if SUPPORT_COROSYNC
 int ais_fd_sync = -1;
 int ais_fd_async = -1;          /* never send messages via this channel */
 void *ais_ipc_ctx = NULL;
 
 hdb_handle_t ais_ipc_handle = 0;
 static char *ais_cluster_name = NULL;
 
 gboolean
 get_ais_nodeid(uint32_t * id, char **uname)
 {
     struct iovec iov;
     int retries = 0;
     int rc = CS_OK;
     cs_ipc_header_response_t header;
     struct crm_ais_nodeid_resp_s answer;
 
     header.error = CS_OK;
     header.id = crm_class_nodeid;
     header.size = sizeof(cs_ipc_header_response_t);
 
     CRM_CHECK(id != NULL, return FALSE);
     CRM_CHECK(uname != NULL, return FALSE);
 
     iov.iov_base = &header;
     iov.iov_len = header.size;
 
   retry:
     errno = 0;
     rc = coroipcc_msg_send_reply_receive(ais_ipc_handle, &iov, 1, &answer, sizeof(answer));
     if (rc == CS_OK) {
         CRM_CHECK(answer.header.size == sizeof(struct crm_ais_nodeid_resp_s),
                   crm_err("Odd message: id=%d, size=%d, error=%d",
                           answer.header.id, answer.header.size, answer.header.error));
         CRM_CHECK(answer.header.id == crm_class_nodeid,
                   crm_err("Bad response id: %d", answer.header.id));
     }
 
     if ((rc == CS_ERR_TRY_AGAIN || rc == CS_ERR_QUEUE_FULL) && retries < 20) {
         retries++;
         crm_info("Peer overloaded: Re-sending message (Attempt %d of 20)", retries);
         sleep(retries);         /* Proportional back off */
         goto retry;
     }
 
     if (rc != CS_OK) {
         crm_err("Sending nodeid request: FAILED (rc=%d): %s", rc, ais_error2text(rc));
         return FALSE;
 
     } else if (answer.header.error != CS_OK) {
         crm_err("Bad response from peer: (rc=%d): %s", rc, ais_error2text(rc));
         return FALSE;
     }
 
     crm_info("Server details: id=%u uname=%s cname=%s", answer.id, answer.uname, answer.cname);
 
     *id = answer.id;
     *uname = strdup(answer.uname);
     ais_cluster_name = strdup(answer.cname);
 
     return TRUE;
 }
 
 gboolean
 crm_get_cluster_name(char **cname)
 {
     CRM_CHECK(cname != NULL, return FALSE);
     if (ais_cluster_name) {
         *cname = strdup(ais_cluster_name);
         return TRUE;
     }
     return FALSE;
 }
 
 gboolean
 send_ais_text(int class, const char *data,
               gboolean local, const char *node, enum crm_ais_msg_types dest)
 {
     static int msg_id = 0;
     static int local_pid = 0;
     enum cluster_type_e cluster_type = get_cluster_type();
 
     int retries = 0;
     int rc = CS_OK;
     int buf_len = sizeof(cs_ipc_header_response_t);
 
     char *buf = NULL;
     struct iovec iov;
     const char *transport = "pcmk";
     cs_ipc_header_response_t *header = NULL;
     AIS_Message *ais_msg = NULL;
     enum crm_ais_msg_types sender = text2msg_type(crm_system_name);
 
     /* There are only 6 handlers registered to crm_lib_service in plugin.c */
     CRM_CHECK(class < 6, crm_err("Invalid message class: %d", class); return FALSE);
 
     if (data == NULL) {
         data = "";
     }
 
     if (local_pid == 0) {
         local_pid = getpid();
     }
 
     if (sender == crm_msg_none) {
         sender = local_pid;
     }
 
     ais_msg = calloc(1, sizeof(AIS_Message));
 
     ais_msg->id = msg_id++;
     ais_msg->header.id = class;
     ais_msg->header.error = CS_OK;
 
     ais_msg->host.type = dest;
     ais_msg->host.local = local;
     if (node) {
         ais_msg->host.size = strlen(node);
         memset(ais_msg->host.uname, 0, MAX_NAME);
         memcpy(ais_msg->host.uname, node, ais_msg->host.size);
         ais_msg->host.id = 0;
 
     } else {
         ais_msg->host.size = 0;
         memset(ais_msg->host.uname, 0, MAX_NAME);
         ais_msg->host.id = 0;
     }
 
     ais_msg->sender.id = 0;
     ais_msg->sender.type = sender;
     ais_msg->sender.pid = local_pid;
     ais_msg->sender.size = pcmk_uname_len;
     memset(ais_msg->sender.uname, 0, MAX_NAME);
     memcpy(ais_msg->sender.uname, pcmk_uname, ais_msg->sender.size);
 
     ais_msg->size = 1 + strlen(data);
 
     if (ais_msg->size < CRM_BZ2_THRESHOLD) {
   failback:
         ais_msg = realloc(ais_msg, sizeof(AIS_Message) + ais_msg->size);
         memcpy(ais_msg->data, data, ais_msg->size);
 
     } else {
         char *compressed = NULL;
         char *uncompressed = strdup(data);
         unsigned int len = (ais_msg->size * 1.1) + 600; /* recomended size */
 
         crm_trace("Compressing message payload");
         compressed = malloc( len);
 
         rc = BZ2_bzBuffToBuffCompress(compressed, &len, uncompressed, ais_msg->size, CRM_BZ2_BLOCKS,
                                       0, CRM_BZ2_WORK);
 
         free(uncompressed);
 
         if (rc != BZ_OK) {
             crm_err("Compression failed: %d", rc);
             free(compressed);
             goto failback;
         }
 
         ais_msg = realloc(ais_msg, sizeof(AIS_Message) + len + 1);
         memcpy(ais_msg->data, compressed, len);
         ais_msg->data[len] = 0;
         free(compressed);
 
         ais_msg->is_compressed = TRUE;
         ais_msg->compressed_size = len;
 
         crm_trace("Compression details: %d -> %d", ais_msg->size, ais_data_len(ais_msg));
     }
 
     ais_msg->header.size = sizeof(AIS_Message) + ais_data_len(ais_msg);
 
     crm_trace("Sending%s message %d to %s.%s (data=%d, total=%d)",
               ais_msg->is_compressed ? " compressed" : "",
               ais_msg->id, ais_dest(&(ais_msg->host)), msg_type2text(dest),
               ais_data_len(ais_msg), ais_msg->header.size);
 
     iov.iov_base = ais_msg;
     iov.iov_len = ais_msg->header.size;
     buf = realloc(buf, buf_len);
 
     do {
         if (rc == CS_ERR_TRY_AGAIN || rc == CS_ERR_QUEUE_FULL) {
             retries++;
             crm_info("Peer overloaded or membership in flux:"
                      " Re-sending message (Attempt %d of 20)", retries);
             sleep(retries);     /* Proportional back off */
         }
 
         errno = 0;
         switch (cluster_type) {
             case pcmk_cluster_corosync:
                 CRM_ASSERT(FALSE/*Not supported here*/);
                 break;
 
             case pcmk_cluster_classic_ais:
                 rc = coroipcc_msg_send_reply_receive(ais_ipc_handle, &iov, 1, buf, buf_len);
                 header = (cs_ipc_header_response_t *) buf;
                 if (rc == CS_OK) {
                     CRM_CHECK(header->size == sizeof(cs_ipc_header_response_t),
                               crm_err("Odd message: id=%d, size=%d, class=%d, error=%d",
                                       header->id, header->size, class, header->error));
 
                     CRM_ASSERT(buf_len >= header->size);
                     CRM_CHECK(header->id == CRM_MESSAGE_IPC_ACK,
                               crm_err("Bad response id (%d) for request (%d)", header->id,
                                       ais_msg->header.id));
                     CRM_CHECK(header->error == CS_OK, rc = header->error);
                 }
                 break;
 
             case pcmk_cluster_cman:
                 transport = "cpg";
                 CRM_CHECK(dest != crm_msg_ais, rc = CS_ERR_MESSAGE_ERROR; goto bail);
                 rc = cpg_mcast_joined(pcmk_cpg_handle, CPG_TYPE_AGREED, &iov, 1);
                 if (rc == CS_ERR_TRY_AGAIN || rc == CS_ERR_QUEUE_FULL) {
                     cpg_flow_control_state_t fc_state = CPG_FLOW_CONTROL_DISABLED;
                     int rc2 = cpg_flow_control_state_get(pcmk_cpg_handle, &fc_state);
 
                     if (rc2 == CS_OK && fc_state == CPG_FLOW_CONTROL_ENABLED) {
                         crm_warn("Connection overloaded, cannot send messages");
                         goto bail;
 
                     } else if (rc2 != CS_OK) {
                         crm_warn("Could not determin the connection state: %s (%d)",
                                  ais_error2text(rc2), rc2);
                         goto bail;
                     }
                 }
                 break;
 
             case pcmk_cluster_unknown:
             case pcmk_cluster_invalid:
             case pcmk_cluster_heartbeat:
                 CRM_ASSERT(is_openais_cluster());
                 break;
         }
 
     } while ((rc == CS_ERR_TRY_AGAIN || rc == CS_ERR_QUEUE_FULL) && retries < 20);
 
   bail:
     if (rc != CS_OK) {
         crm_perror(LOG_ERR, "Sending message %d via %s: FAILED (rc=%d): %s",
                    ais_msg->id, transport, rc, ais_error2text(rc));
 
     } else {
         crm_trace("Message %d: sent", ais_msg->id);
     }
 
     free(buf);
     free(ais_msg);
     return (rc == CS_OK);
 }
 
 gboolean
 send_ais_message(xmlNode * msg, gboolean local, const char *node, enum crm_ais_msg_types dest)
 {
     gboolean rc = TRUE;
     char *data = NULL;
 
     if (is_classic_ais_cluster()) {
         if (ais_fd_async < 0) {
             crm_err("Not connected to AIS: %d", ais_fd_async);
             return FALSE;
         }
     }
 
     data = dump_xml_unformatted(msg);
     rc = send_ais_text(crm_class_cluster, data, local, node, dest);
     free(data);
     return rc;
 }
 
 void
 terminate_ais_connection(void)
 {
     crm_notice("Disconnecting from Corosync");
 
     if (is_classic_ais_cluster()) {
         if(ais_ipc_handle) {
             crm_trace("Disconnecting plugin");
             coroipcc_service_disconnect(ais_ipc_handle);
             ais_ipc_handle = 0;
         } else {
             crm_info("No plugin connection");
         }
 
     } else {
         if(pcmk_cpg_handle) {
             crm_trace("Disconnecting CPG");
             cpg_leave(pcmk_cpg_handle, &pcmk_cpg_group);
             cpg_finalize(pcmk_cpg_handle);
             pcmk_cpg_handle = 0;
 
         } else {
             crm_info("No CPG connection");
         }
     }
 
 #  if SUPPORT_CMAN
     if (is_cman_cluster()) {
         if(pcmk_cman_handle) {
             crm_trace("Disconnecting cman");
             cman_stop_notification(pcmk_cman_handle);
             cman_finish(pcmk_cman_handle);
 
         } else {
             crm_info("No cman connection");
         }
     }
 #  endif
     ais_fd_async = -1;
     ais_fd_sync = -1;
 }
 
 
 static crm_node_t *
 crm_update_ais_node(xmlNode * member, long long seq)
 {
     const char *id_s = crm_element_value(member, "id");
     const char *addr = crm_element_value(member, "addr");
     const char *uname = crm_element_value(member, "uname");
     const char *state = crm_element_value(member, "state");
     const char *born_s = crm_element_value(member, "born");
     const char *seen_s = crm_element_value(member, "seen");
     const char *votes_s = crm_element_value(member, "votes");
     const char *procs_s = crm_element_value(member, "processes");
 
     int votes = crm_int_helper(votes_s, NULL);
     unsigned int id = crm_int_helper(id_s, NULL);
     unsigned int procs = crm_int_helper(procs_s, NULL);
 
     /* TODO: These values will contain garbage if version < 0.7.1 */
     uint64_t born = crm_int_helper(born_s, NULL);
     uint64_t seen = crm_int_helper(seen_s, NULL);
 
     return crm_update_peer(__FUNCTION__, id, born, seen, votes, procs, uname, uname, addr, state);
 }
 
 static gboolean
 ais_dispatch_message(AIS_Message * msg, gboolean(*dispatch) (AIS_Message *, char *, int))
 {
     char *data = NULL;
     char *uncompressed = NULL;
 
     xmlNode *xml = NULL;
 
     CRM_ASSERT(msg != NULL);
 
     crm_trace("Got new%s message (size=%d, %d, %d)",
               msg->is_compressed ? " compressed" : "",
               ais_data_len(msg), msg->size, msg->compressed_size);
 
     data = msg->data;
     if (msg->is_compressed && msg->size > 0) {
         int rc = BZ_OK;
         unsigned int new_size = msg->size + 1;
 
         if (check_message_sanity(msg, NULL) == FALSE) {
             goto badmsg;
         }
 
         crm_trace("Decompressing message data");
         uncompressed = calloc(1, new_size);
         rc = BZ2_bzBuffToBuffDecompress(uncompressed, &new_size, data, msg->compressed_size, 1, 0);
 
         if (rc != BZ_OK) {
             crm_err("Decompression failed: %d", rc);
             goto badmsg;
         }
 
         CRM_ASSERT(rc == BZ_OK);
         CRM_ASSERT(new_size == msg->size);
 
         data = uncompressed;
 
     } else if (check_message_sanity(msg, data) == FALSE) {
         goto badmsg;
 
     } else if (safe_str_eq("identify", data)) {
         int pid = getpid();
         char *pid_s = crm_itoa(pid);
 
         send_ais_text(crm_class_cluster, pid_s, TRUE, NULL, crm_msg_ais);
         free(pid_s);
         goto done;
     }
 
     if (msg->header.id != crm_class_members) {
         crm_update_peer(__FUNCTION__, msg->sender.id, 0, 0, 0, 0, msg->sender.uname, msg->sender.uname, NULL,
                         NULL);
     }
 
     if (msg->header.id == crm_class_rmpeer) {
         uint32_t id = crm_int_helper(data, NULL);
 
         crm_info("Removing peer %s/%u", data, id);
         reap_crm_member(id);
         goto done;
 
     } else if (is_classic_ais_cluster()) { 
         if (msg->header.id == crm_class_members || msg->header.id == crm_class_quorum) {
             xmlNode *node = NULL;
             const char *value = NULL;
             gboolean quorate = FALSE;
 
             xml = string2xml(data);
             if (xml == NULL) {
                 crm_err("Invalid membership update: %s", data);
                 goto badmsg;
             }
 
             value = crm_element_value(xml, "quorate");
             CRM_CHECK(value != NULL, crm_log_xml_err(xml, "No quorum value:"); goto badmsg);
             if (crm_is_true(value)) {
                 quorate = TRUE;
             }
 
             value = crm_element_value(xml, "id");
             CRM_CHECK(value != NULL, crm_log_xml_err(xml, "No membership id"); goto badmsg);
             crm_peer_seq = crm_int_helper(value, NULL);
 
             if (quorate != crm_have_quorum) {
                 crm_notice("Membership %s: quorum %s", value, quorate ? "acquired" : "lost");
                 crm_have_quorum = quorate;
 
             } else {
                 crm_info("Membership %s: quorum %s", value, quorate ? "retained" : "still lost");
             }
 
             for (node = __xml_first_child(xml); node != NULL; node = __xml_next(node)) {
                 crm_update_ais_node(node, crm_peer_seq);
             }
         }
     }
 
     crm_trace("Payload: %s", data);
     if (dispatch != NULL) {
         dispatch(msg, data, 0);
     }
 
   done:
     free(uncompressed);
     free_xml(xml);
     return TRUE;
 
   badmsg:
     crm_err("Invalid message (id=%d, dest=%s:%s, from=%s:%s.%d):"
             " min=%d, total=%d, size=%d, bz2_size=%d",
             msg->id, ais_dest(&(msg->host)), msg_type2text(msg->host.type),
             ais_dest(&(msg->sender)), msg_type2text(msg->sender.type),
             msg->sender.pid, (int)sizeof(AIS_Message),
             msg->header.size, msg->size, msg->compressed_size);
     goto done;
 }
 
 int
 ais_dispatch(gpointer user_data)
 {
     int rc = CS_OK;
     gboolean good = TRUE;
 
     gboolean(*dispatch) (AIS_Message *, char *, int) = user_data;
 
     do {
         char *buffer = NULL;
 
         rc = coroipcc_dispatch_get(ais_ipc_handle, (void **)&buffer, 0);
         if (rc == CS_ERR_TRY_AGAIN || rc == CS_ERR_QUEUE_FULL) {
             return 0;
         }
         if (rc != CS_OK) {
             crm_perror(LOG_ERR, "Receiving message body failed: (%d) %s", rc, ais_error2text(rc));
             return -1;
         }
         if (buffer == NULL) {
             /* NULL is a legal "no message afterall" value */
             return 0;
         }
         good = ais_dispatch_message((AIS_Message *) buffer, dispatch);
         coroipcc_dispatch_put(ais_ipc_handle);
 
     } while (good && ais_ipc_handle);
 
     if(good) {
         return 0;
     }
 
     return -1;
 }
 
 static void
 ais_destroy(gpointer user_data)
 {
     crm_err("AIS connection terminated");
     ais_fd_sync = -1;
     exit(1);
 }
 
 
 #  if SUPPORT_CMAN
 
 static int
 pcmk_cman_dispatch(gpointer user_data)
 {
     int rc = cman_dispatch(pcmk_cman_handle, CMAN_DISPATCH_ALL);
 
     if (rc < 0) {
         crm_err("Connection to cman failed: %d", rc);
         return FALSE;
     }
     return TRUE;
 }
 
 #    define MAX_NODES 256
 
 static void
 cman_event_callback(cman_handle_t handle, void *privdata, int reason, int arg)
 {
     int rc = 0, lpc = 0, node_count = 0;
 
     cman_cluster_t cluster;
     static cman_node_t cman_nodes[MAX_NODES];
 
     gboolean(*dispatch) (unsigned long long, gboolean) = privdata;
 
     switch (reason) {
         case CMAN_REASON_STATECHANGE:
 
             memset(&cluster, 0, sizeof(cluster));
             rc = cman_get_cluster(pcmk_cman_handle, &cluster);
             if (rc < 0) {
                 crm_err("Couldn't query cman cluster details: %d %d", rc, errno);
                 return;
             }
 
             crm_peer_seq = cluster.ci_generation;
             if (arg != crm_have_quorum) {
                 crm_notice("Membership %llu: quorum %s", crm_peer_seq, arg ? "acquired" : "lost");
                 crm_have_quorum = arg;
 
             } else {
                 crm_info("Membership %llu: quorum %s", crm_peer_seq,
                          arg ? "retained" : "still lost");
             }
 
             rc = cman_get_nodes(pcmk_cman_handle, MAX_NODES, &node_count, cman_nodes);
             if (rc < 0) {
                 crm_err("Couldn't query cman node list: %d %d", rc, errno);
                 return;
             }
 
             for (lpc = 0; lpc < node_count; lpc++) {
                 if (cman_nodes[lpc].cn_nodeid == 0) {
                     /* Never allow node ID 0 to be considered a member #315711 */
                     cman_nodes[lpc].cn_member = 0;
                 }
                 crm_update_peer(__FUNCTION__, cman_nodes[lpc].cn_nodeid, cman_nodes[lpc].cn_incarnation,
                                 cman_nodes[lpc].cn_member ? crm_peer_seq : 0, 0, 0,
                                 cman_nodes[lpc].cn_name, cman_nodes[lpc].cn_name, NULL,
                                 cman_nodes[lpc].cn_member ? CRM_NODE_MEMBER : CRM_NODE_LOST);
             }
 
             if (dispatch) {
                 dispatch(crm_peer_seq, crm_have_quorum);
             }
             break;
 
         case CMAN_REASON_TRY_SHUTDOWN:
             /* Always reply with a negative - pacemaker needs to be stopped first */
             crm_info("CMAN wants to shut down: %s", arg ? "forced" : "optional");
             cman_replyto_shutdown(pcmk_cman_handle, 0);
             break;
 
         case CMAN_REASON_CONFIG_UPDATE:
             /* Ignore */
             break;
     }
 }
 #  endif
 
 gboolean
 init_cman_connection(gboolean(*dispatch) (unsigned long long, gboolean), void (*destroy) (gpointer))
 {
 #  if SUPPORT_CMAN
     int rc = -1, fd = -1;
     cman_cluster_t cluster;
     struct mainloop_fd_callbacks cman_fd_callbacks = {
         .dispatch = pcmk_cman_dispatch,
         .destroy = destroy,
     };
 
     crm_info("Configuring Pacemaker to obtain quorum from cman");
 
     memset(&cluster, 0, sizeof(cluster));
 
     pcmk_cman_handle = cman_init(dispatch);
     if (pcmk_cman_handle == NULL || cman_is_active(pcmk_cman_handle) == FALSE) {
         crm_err("Couldn't connect to cman");
         goto cman_bail;
     }
 
     rc = cman_get_cluster(pcmk_cman_handle, &cluster);
     if (rc < 0) {
         crm_err("Couldn't query cman cluster details: %d %d", rc, errno);
         goto cman_bail;
     }
     ais_cluster_name = strdup(cluster.ci_name);
 
     rc = cman_start_notification(pcmk_cman_handle, cman_event_callback);
     if (rc < 0) {
         crm_err("Couldn't register for cman notifications: %d %d", rc, errno);
         goto cman_bail;
     }
 
     /* Get the current membership state */
     cman_event_callback(pcmk_cman_handle, dispatch, CMAN_REASON_STATECHANGE,
                         cman_is_quorate(pcmk_cman_handle));
 
     fd = cman_get_fd(pcmk_cman_handle);
 
     mainloop_add_fd("cman", fd, dispatch, &cman_fd_callbacks);
 
   cman_bail:
     if (rc < 0) {
         cman_finish(pcmk_cman_handle);
         return FALSE;
     }
 #  else
     crm_err("cman qorum is not supported in this build");
     exit(100);
 #  endif
     return TRUE;
 }
 
 #  ifdef SUPPORT_COROSYNC
 gboolean(*pcmk_cpg_dispatch_fn) (AIS_Message *, char *, int) = NULL;
 
 static int
 pcmk_cpg_dispatch(gpointer user_data)
 {
     int rc = 0;
 
     pcmk_cpg_dispatch_fn = user_data;
     rc = cpg_dispatch(pcmk_cpg_handle, CS_DISPATCH_ALL);
     if (rc != CS_OK) {
         crm_err("Connection to the CPG API failed: %d", rc);
         return -1;
     }
     return 0;
 }
 
 static void
 pcmk_cpg_deliver(cpg_handle_t handle,
                  const struct cpg_name *groupName,
                  uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len)
 {
     AIS_Message *ais_msg = (AIS_Message *) msg;
 
     if (ais_msg->sender.id > 0 && ais_msg->sender.id != nodeid) {
         crm_err("Nodeid mismatch from %d.%d: claimed nodeid=%u", nodeid, pid, ais_msg->sender.id);
         return;
 
     } else if (ais_msg->host.size != 0 && safe_str_neq(ais_msg->host.uname, pcmk_uname)) {
         /* Not for us */
         return;
     }
 
     ais_msg->sender.id = nodeid;
     if (ais_msg->sender.size == 0) {
         crm_node_t *peer = crm_get_peer(nodeid, NULL);
 
         if (peer == NULL) {
             crm_err("Peer with nodeid=%u is unknown", nodeid);
 
         } else if (peer->uname == NULL) {
             crm_err("No uname for peer with nodeid=%u", nodeid);
 
         } else {
             crm_notice("Fixing uname for peer with nodeid=%u", nodeid);
             ais_msg->sender.size = strlen(peer->uname);
             memset(ais_msg->sender.uname, 0, MAX_NAME);
             memcpy(ais_msg->sender.uname, peer->uname, ais_msg->sender.size);
         }
     }
 
     ais_dispatch_message(ais_msg, pcmk_cpg_dispatch_fn);
 }
 
 static void
 pcmk_cpg_membership(cpg_handle_t handle,
                     const struct cpg_name *groupName,
                     const struct cpg_address *member_list, size_t member_list_entries,
                     const struct cpg_address *left_list, size_t left_list_entries,
                     const struct cpg_address *joined_list, size_t joined_list_entries)
 {
     int i;
 
     for (i = 0; i < member_list_entries; i++) {
         crm_node_t *peer = crm_get_peer(member_list[i].nodeid, NULL);
         crm_debug("Member[%d] %d ", i, member_list[i].nodeid);
         crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cpg, ONLINESTATUS);
     }
 
     for (i = 0; i < left_list_entries; i++) {
         crm_node_t *peer = crm_get_peer(left_list[i].nodeid, NULL);
         crm_debug("Left[%d] %d ", i, left_list[i].nodeid);
         crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cpg, OFFLINESTATUS);
     }
 }
 
 cpg_callbacks_t cpg_callbacks = {
     .cpg_deliver_fn = pcmk_cpg_deliver,
     .cpg_confchg_fn = pcmk_cpg_membership,
 };
 #  endif
 
 static gboolean
 init_cpg_connection(gboolean(*dispatch) (AIS_Message *, char *, int), void (*destroy) (gpointer),
                     uint32_t * nodeid)
 {
 #  ifdef SUPPORT_COROSYNC
     int rc = -1;
     int fd = 0;
     int retries = 0;
     crm_node_t *peer = NULL;
     struct mainloop_fd_callbacks cpg_fd_callbacks = {
         .dispatch = pcmk_cpg_dispatch,
         .destroy = destroy,
     };    
 
     strcpy(pcmk_cpg_group.value, crm_system_name);
     pcmk_cpg_group.length = strlen(crm_system_name) + 1;
 
     cs_repeat(retries, 30, rc = cpg_initialize(&pcmk_cpg_handle, &cpg_callbacks));
     if (rc != CS_OK) {
         crm_err("Could not connect to the Cluster Process Group API: %d\n", rc);
         goto bail;
     }
 
     retries = 0;
     cs_repeat(retries, 30, rc = cpg_local_get(pcmk_cpg_handle, (unsigned int *)nodeid));
     if (rc != CS_OK) {
         crm_err("Could not get local node id from the CPG API");
         goto bail;
     }
 
     retries = 0;
     cs_repeat(retries, 30, rc = cpg_join(pcmk_cpg_handle, &pcmk_cpg_group));
     if (rc != CS_OK) {
         crm_err("Could not join the CPG group '%s': %d", crm_system_name, rc);
         goto bail;
     }
 
     rc = cpg_fd_get(pcmk_cpg_handle, &fd);
     if (rc != CS_OK) {
         crm_err("Could not obtain the CPG API connection: %d\n", rc);
         goto bail;
     }
 
     mainloop_add_fd("corosync-cpg", fd, dispatch, &cpg_fd_callbacks);
 
   bail:
     if (rc != CS_OK) {
         cpg_finalize(pcmk_cpg_handle);
         return FALSE;
     }
 
     peer = crm_get_peer(pcmk_nodeid, pcmk_uname);
     crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cpg, ONLINESTATUS);
 
 #  else
     crm_err("The Corosync CPG API is not supported in this build");
     exit(100);
 #  endif
     return TRUE;
 }
 
 gboolean
 init_quorum_connection(gboolean(*dispatch) (unsigned long long, gboolean),
                        void (*destroy) (gpointer))
 {
     crm_err("The Corosync quorum API is not supported in this build");
     exit(100);
     return TRUE;
 }
 
 static gboolean
 init_ais_connection_classic(gboolean(*dispatch) (AIS_Message *, char *, int),
                             void (*destroy) (gpointer), char **our_uuid, char **our_uname,
                             int *nodeid)
 {
     int rc;
     int pid = 0;
     char *pid_s = NULL;
     struct utsname name;
     struct mainloop_fd_callbacks ais_fd_callbacks = {
         .dispatch = ais_dispatch,
         .destroy = destroy,
     };
 
     crm_info("Creating connection to our Corosync plugin");
     rc = coroipcc_service_connect(COROSYNC_SOCKET_NAME, PCMK_SERVICE_ID,
                                   AIS_IPC_MESSAGE_SIZE, AIS_IPC_MESSAGE_SIZE, AIS_IPC_MESSAGE_SIZE,
                                   &ais_ipc_handle);
     if (ais_ipc_handle) {
         coroipcc_fd_get(ais_ipc_handle, &ais_fd_async);
     } else {
         crm_info("Connection to our AIS plugin (%d) failed: %s (%d)",
                  PCMK_SERVICE_ID, strerror(errno), errno);
         return FALSE;
     }
     if (ais_fd_async <= 0 && rc == CS_OK) {
         crm_err("No context created, but connection reported 'ok'");
         rc = CS_ERR_LIBRARY;
     }
     if (rc != CS_OK) {
         crm_info("Connection to our AIS plugin (%d) failed: %s (%d)", PCMK_SERVICE_ID,
                  ais_error2text(rc), rc);
     }
 
     if (rc != CS_OK) {
         return FALSE;
     }
 
     if (destroy == NULL) {
         destroy = ais_destroy;
     }
 
     mainloop_add_fd("corosync-plugin", ais_fd_async, dispatch, &ais_fd_callbacks);
     crm_info("AIS connection established");
 
     pid = getpid();
     pid_s = crm_itoa(pid);
     send_ais_text(crm_class_cluster, pid_s, TRUE, NULL, crm_msg_ais);
     free(pid_s);
 
     if (uname(&name) < 0) {
         crm_perror(LOG_ERR, "Could not determin the current host");
         exit(100);
     }
 
     get_ais_nodeid(&pcmk_nodeid, &pcmk_uname);
     if (safe_str_neq(name.nodename, pcmk_uname)) {
         crm_crit("Node name mismatch!  OpenAIS supplied %s, our lookup returned %s",
                  pcmk_uname, name.nodename);
         crm_notice
             ("Node name mismatches usually occur when assigned automatically by DHCP servers");
         crm_notice("If this node was part of the cluster with a different name,"
                    " you will need to remove the old entry with crm_node --remove");
     }
     return TRUE;
 }
 
 
 static int
 pcmk_mcp_dispatch(const char *buffer, ssize_t length, gpointer userdata)
 {
     xmlNode *msg = string2xml(buffer);
 
     if (msg && is_classic_ais_cluster()) {
         xmlNode *node = NULL;
         
         for (node = __xml_first_child(msg); node != NULL; node = __xml_next(node)) {
             int id = 0;
             int children = 0;
             const char *uname = crm_element_value(node, "uname");
             
             crm_element_value_int(node, "id", &id);
             crm_element_value_int(node, "processes", &children);
             if (id == 0) {
                 crm_log_xml_err(msg, "Bad Update");
             } else {
                 crm_update_peer(__FUNCTION__, id, 0, 0, 0, children, NULL, uname, NULL, NULL);
             }
         }
     }
 
     free_xml(msg);
     return 0;
 }
 
 static void
 pcmk_mcp_destroy(gpointer user_data)
 {
     void (*callback)(gpointer data) = user_data;
     if(callback) {
         callback(NULL);
     }
 }
 
 gboolean
 init_ais_connection(gboolean(*dispatch) (AIS_Message *, char *, int), void (*destroy) (gpointer),
                     char **our_uuid, char **our_uname, int *nodeid)
 {
     int retries = 0;
     static struct ipc_client_callbacks mcp_callbacks = 
         {
             .dispatch = pcmk_mcp_dispatch,
             .destroy = pcmk_mcp_destroy
         };
 
     while (retries++ < 30) {
         int rc = init_ais_connection_once(dispatch, destroy, our_uuid, our_uname, nodeid);
 
         switch (rc) {
             case CS_OK:
                 if (getenv("HA_mcp")) {
                     xmlNode *poke = create_xml_node(NULL, "poke");
                     mainloop_io_t *ipc = mainloop_add_ipc_client(CRM_SYSTEM_MCP, 0, destroy, &mcp_callbacks);
                     crm_ipc_send(mainloop_get_ipc_client(ipc), poke, NULL, 0);
                     free_xml(poke);
                 }
                 return TRUE;
                 break;
             case CS_ERR_TRY_AGAIN:
             case CS_ERR_QUEUE_FULL:
                 break;
             default:
                 return FALSE;
         }
     }
 
     crm_err("Retry count exceeded: %d", retries);
     return FALSE;
 }
 
 static char *
 get_local_node_name(void)
 {
     char *name = NULL;
     struct utsname res;
 
     if (is_cman_cluster()) {
 #  if SUPPORT_CMAN
         cman_node_t us;
         cman_handle_t cman;
 
         cman = cman_init(NULL);
         if (cman != NULL && cman_is_active(cman)) {
             us.cn_name[0] = 0;
             cman_get_node(cman, CMAN_NODEID_US, &us);
             name = strdup(us.cn_name);
             crm_info("Using CMAN node name: %s", name);
 
         } else {
             crm_err("Couldn't determin node name from CMAN");
         }
 
         cman_finish(cman);
 #  endif
 
     } else if (uname(&res) < 0) {
         crm_perror(LOG_ERR, "Could not determin the current host");
         exit(100);
 
     } else {
         name = strdup(res.nodename);
     }
     return name;
 }
 
 extern int set_cluster_type(enum cluster_type_e type);
 
 gboolean
 init_ais_connection_once(gboolean(*dispatch) (AIS_Message *, char *, int),
                          void (*destroy) (gpointer), char **our_uuid, char **our_uname, int *nodeid)
 {
     enum cluster_type_e stack = get_cluster_type();
 
     crm_peer_init();
 
     /* Here we just initialize comms */
     switch (stack) {
         case pcmk_cluster_classic_ais:
             if (init_ais_connection_classic(dispatch, destroy, our_uuid, &pcmk_uname, nodeid) ==
                 FALSE) {
                 return FALSE;
             }
             break;
         case pcmk_cluster_cman:
             if (init_cpg_connection(dispatch, destroy, &pcmk_nodeid) == FALSE) {
                 return FALSE;
             }
             pcmk_uname = get_local_node_name();
             break;
         case pcmk_cluster_heartbeat:
             crm_info("Could not find an active corosync based cluster");
             return FALSE;
             break;
         default:
             crm_err("Invalid cluster type: %s (%d)", name_for_cluster_type(stack), stack);
             return FALSE;
             break;
     }
 
     crm_info("Connection to '%s': established", name_for_cluster_type(stack));
 
     CRM_ASSERT(pcmk_uname != NULL);
     pcmk_uname_len = strlen(pcmk_uname);
 
     if (pcmk_nodeid != 0) {
         /* Ensure the local node always exists */
         crm_update_peer(__FUNCTION__, pcmk_nodeid, 0, 0, 0, 0, pcmk_uname, pcmk_uname, NULL, NULL);
     }
 
     if (our_uuid != NULL) {
         *our_uuid = get_corosync_uuid(pcmk_nodeid, pcmk_uname);
     }
 
     if (our_uname != NULL) {
         *our_uname = strdup(pcmk_uname);
     }
 
     if (nodeid != NULL) {
         *nodeid = pcmk_nodeid;
     }
 
     return TRUE;
 }
 
 gboolean
 check_message_sanity(const AIS_Message * msg, const char *data)
 {
     gboolean sane = TRUE;
     gboolean repaired = FALSE;
     int dest = msg->host.type;
     int tmp_size = msg->header.size - sizeof(AIS_Message);
 
     if (sane && msg->header.size == 0) {
         crm_warn("Message with no size");
         sane = FALSE;
     }
 
     if (sane && msg->header.error != CS_OK) {
         crm_warn("Message header contains an error: %d", msg->header.error);
         sane = FALSE;
     }
 
     if (sane && ais_data_len(msg) != tmp_size) {
         crm_warn("Message payload size is incorrect: expected %d, got %d", ais_data_len(msg),
                  tmp_size);
         sane = TRUE;
     }
 
     if (sane && ais_data_len(msg) == 0) {
         crm_warn("Message with no payload");
         sane = FALSE;
     }
 
     if (sane && data && msg->is_compressed == FALSE) {
         int str_size = strlen(data) + 1;
 
         if (ais_data_len(msg) != str_size) {
             int lpc = 0;
 
             crm_warn("Message payload is corrupted: expected %d bytes, got %d",
                      ais_data_len(msg), str_size);
             sane = FALSE;
             for (lpc = (str_size - 10); lpc < msg->size; lpc++) {
                 if (lpc < 0) {
                     lpc = 0;
                 }
                 crm_debug("bad_data[%d]: %d / '%c'", lpc, data[lpc], data[lpc]);
             }
         }
     }
 
     if (sane == FALSE) {
         crm_err("Invalid message %d: (dest=%s:%s, from=%s:%s.%d, compressed=%d, size=%d, total=%d)",
                 msg->id, ais_dest(&(msg->host)), msg_type2text(dest),
                 ais_dest(&(msg->sender)), msg_type2text(msg->sender.type),
                 msg->sender.pid, msg->is_compressed, ais_data_len(msg), msg->header.size);
 
     } else if (repaired) {
         crm_err
             ("Repaired message %d: (dest=%s:%s, from=%s:%s.%d, compressed=%d, size=%d, total=%d)",
              msg->id, ais_dest(&(msg->host)), msg_type2text(dest), ais_dest(&(msg->sender)),
              msg_type2text(msg->sender.type), msg->sender.pid, msg->is_compressed,
              ais_data_len(msg), msg->header.size);
     } else {
         crm_trace
             ("Verfied message %d: (dest=%s:%s, from=%s:%s.%d, compressed=%d, size=%d, total=%d)",
              msg->id, ais_dest(&(msg->host)), msg_type2text(dest), ais_dest(&(msg->sender)),
              msg_type2text(msg->sender.type), msg->sender.pid, msg->is_compressed,
              ais_data_len(msg), msg->header.size);
     }
 
     return sane;
 }
 #endif
 
 static int
 get_config_opt(confdb_handle_t config,
                hdb_handle_t object_handle, const char *key, char **value, const char *fallback)
 {
     size_t len = 0;
     char *env_key = NULL;
     const char *env_value = NULL;
     char buffer[256];
 
     if (*value) {
         free(*value);
         *value = NULL;
     }
 
     if (object_handle > 0) {
         if (CS_OK == confdb_key_get(config, object_handle, key, strlen(key), &buffer, &len)) {
             *value = strdup(buffer);
         }
     }
 
     if (*value) {
         crm_info("Found '%s' for option: %s", *value, key);
         return 0;
     }
 
     env_key = crm_concat("HA", key, '_');
     env_value = getenv(env_key);
     free(env_key);
 
     if (*value) {
         crm_info("Found '%s' in ENV for option: %s", *value, key);
         *value = strdup(env_value);
         return 0;
     }
 
     if (fallback) {
         crm_info("Defaulting to '%s' for option: %s", fallback, key);
         *value = strdup(fallback);
 
     } else {
         crm_info("No default for option: %s", key);
     }
 
     return -1;
 }
 
 static confdb_handle_t
 config_find_init(confdb_handle_t config)
 {
     cs_error_t rc = CS_OK;
     confdb_handle_t local_handle = OBJECT_PARENT_HANDLE;
 
     rc = confdb_object_find_start(config, local_handle);
     if (rc == CS_OK) {
         return local_handle;
     } else {
         crm_err("Couldn't create search context: %d", rc);
     }
     return 0;
 }
 
 static hdb_handle_t
 config_find_next(confdb_handle_t config, const char *name, confdb_handle_t top_handle)
 {
     cs_error_t rc = CS_OK;
     hdb_handle_t local_handle = 0;
 
     if (top_handle == 0) {
         crm_err("Couldn't search for %s: no valid context", name);
         return 0;
     }
 
     crm_trace("Searching for %s in " HDB_X_FORMAT, name, top_handle);
     rc = confdb_object_find(config, top_handle, name, strlen(name), &local_handle);
     if (rc != CS_OK) {
         crm_info("No additional configuration supplied for: %s", name);
         local_handle = 0;
     } else {
         crm_info("Processing additional %s options...", name);
     }
     return local_handle;
 }
 
 enum cluster_type_e
 find_corosync_variant(void)
 {
     confdb_handle_t config;
     enum cluster_type_e found = pcmk_cluster_unknown;
 
     int rc;
     char *value = NULL;
     confdb_handle_t top_handle = 0;
     hdb_handle_t local_handle = 0;
     static confdb_callbacks_t callbacks = { };
 
     rc = confdb_initialize(&config, &callbacks);
     if (rc != CS_OK) {
         crm_debug("Could not initialize Cluster Configuration Database API instance error %d", rc);
         return found;
     }
 
     top_handle = config_find_init(config);
     local_handle = config_find_next(config, "service", top_handle);
     while (local_handle) {
         get_config_opt(config, local_handle, "name", &value, NULL);
         if (safe_str_eq("pacemaker", value)) {
             found = pcmk_cluster_classic_ais;
 
             get_config_opt(config, local_handle, "ver", &value, "0");
             crm_trace("Found Pacemaker plugin version: %s", value);
             break;
         }
 
         local_handle = config_find_next(config, "service", top_handle);
     }
 
     if (found == pcmk_cluster_unknown) {
         top_handle = config_find_init(config);
         local_handle = config_find_next(config, "quorum", top_handle);
         get_config_opt(config, local_handle, "provider", &value, NULL);
 
         if (safe_str_eq("quorum_cman", value)) {
             crm_trace("Found CMAN quorum provider");
             found = pcmk_cluster_cman;
         }
     }
     free(value);
 
     confdb_finalize(config);
     return found;
 }
 
 gboolean
 crm_is_corosync_peer_active(const crm_node_t * node)
 {
     enum crm_proc_flag proc = crm_proc_none;
     if (node == NULL) {
         crm_trace("NULL");
         return FALSE;
 
     } else if(safe_str_neq(node->state, CRM_NODE_MEMBER)) {
         crm_trace("%s: state=%s", node->uname, node->state);
         return FALSE;
 
     } else if(is_cman_cluster() && (node->processes & crm_proc_cpg)) {
         /* If we can still talk to our peer process on that node,
          * then its also part of the corosync membership
          */
         crm_trace("%s: processes=%.16x", node->uname, node->processes);
         return TRUE;
 
     } else if(is_classic_ais_cluster() && (node->processes & crm_proc_plugin) == 0) {
         crm_trace("%s: processes=%.16x", node->uname, node->processes);
         return FALSE;
     }
 
     proc = text2proc(crm_system_name);
     if(proc != crm_proc_none && (node->processes & proc) == 0) {
         crm_trace("%s: proc %.16x not in %.16x", node->uname, proc, node->processes);
         return FALSE;
     }
 
     return TRUE;
 }