diff --git a/cts/agents/cpg_test_agent.c b/cts/agents/cpg_test_agent.c index f729ad2d..ea2e6481 100644 --- a/cts/agents/cpg_test_agent.c +++ b/cts/agents/cpg_test_agent.c @@ -1,804 +1,805 @@ /* * Copyright (c) 2010 Red Hat, Inc. * * All rights reserved. * * Author: Angus Salkeld (asalkeld@redhat.com) * * This software licensed under BSD license, the text of which follows: * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * - Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * - Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * - Neither the name of the MontaVista Software, Inc. nor the names of its * contributors may be used to endorse or promote products derived from this * software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF * THE POSSIBILITY OF SUCH DAMAGE. 
*/ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "common_test_agent.h" #include #include typedef enum { MSG_OK, MSG_NODEID_ERR, MSG_PID_ERR, MSG_SEQ_ERR, MSG_SIZE_ERR, MSG_SHA1_ERR, } msg_status_t; typedef struct { uint32_t nodeid; pid_t pid; unsigned char sha1[20]; uint32_t seq; size_t size; unsigned char payload[0]; } msg_t; #define LOG_STR_SIZE 80 typedef struct { char log[LOG_STR_SIZE]; struct qb_list_head list; } log_entry_t; static char big_and_buf[HOW_BIG_AND_BUF]; static int32_t record_config_events_g = 0; static int32_t record_messages_g = 0; static cpg_handle_t cpg_handle = 0; static corosync_cfg_handle_t cfg_handle = 0; static int32_t cpg_fd = -1; static int32_t cfg_fd = -1; static struct qb_list_head config_chg_log_head; static struct qb_list_head msg_log_head; static pid_t my_pid; static uint32_t my_nodeid; static int32_t my_seq; static int32_t use_zcb = QB_FALSE; static int32_t my_msgs_to_send; static int32_t my_msgs_sent; static int32_t total_stored_msgs = 0; static int32_t total_msgs_revd = 0; static int32_t in_cnchg = 0; static int32_t pcmk_test = 0; PK11Context* sha1_context; static void send_some_more_messages (void * unused); static char* err_status_string (char * buf, size_t buf_len, msg_status_t status) { switch (status) { case MSG_OK: strncpy (buf, "OK", buf_len); break; case MSG_NODEID_ERR: strncpy (buf, "NODEID_ERR", buf_len); break; case MSG_PID_ERR: strncpy (buf, "PID_ERR", buf_len); break; case MSG_SEQ_ERR: strncpy (buf, "SEQ_ERR", buf_len); break; case MSG_SIZE_ERR: strncpy (buf, "SIZE_ERR", buf_len); break; case MSG_SHA1_ERR: strncpy (buf, "SHA1_ERR", buf_len); break; default: strncpy (buf, "UNKNOWN_ERR", buf_len); break; } if (buf_len > 0) { buf[buf_len - 1] = '\0'; } return buf; } static void delivery_callback ( cpg_handle_t handle, const struct cpg_name *groupName, uint32_t 
nodeid, uint32_t pid, void *msg, size_t msg_len) { log_entry_t *log_pt; msg_t *msg_pt = (msg_t*)msg; msg_status_t status = MSG_OK; char status_buf[20]; unsigned char sha1_compare[20]; unsigned int sha1_len; if (record_messages_g == 0) { return; } if (nodeid != msg_pt->nodeid) { status = MSG_NODEID_ERR; } if (pid != msg_pt->pid) { status = MSG_PID_ERR; } if (msg_len != msg_pt->size) { status = MSG_SIZE_ERR; } PK11_DigestBegin(sha1_context); PK11_DigestOp(sha1_context, msg_pt->payload, (msg_pt->size - sizeof (msg_t))); PK11_DigestFinal(sha1_context, sha1_compare, &sha1_len, sizeof(sha1_compare)); if (memcmp (sha1_compare, msg_pt->sha1, 20) != 0) { qb_log (LOG_ERR, "msg seq:%d; incorrect hash", msg_pt->seq); status = MSG_SHA1_ERR; } log_pt = malloc (sizeof(log_entry_t)); qb_list_init (&log_pt->list); snprintf (log_pt->log, LOG_STR_SIZE, "%u:%d:%d:%s;", msg_pt->nodeid, msg_pt->seq, my_seq, err_status_string (status_buf, 20, status)); qb_list_add_tail (&log_pt->list, &msg_log_head); total_stored_msgs++; total_msgs_revd++; my_seq++; if ((total_msgs_revd % 1000) == 0) { qb_log (LOG_INFO, "rx %d", total_msgs_revd); } } static void config_change_callback ( cpg_handle_t handle, const struct cpg_name *groupName, const struct cpg_address *member_list, size_t member_list_entries, const struct cpg_address *left_list, size_t left_list_entries, const struct cpg_address *joined_list, size_t joined_list_entries) { int i; log_entry_t *log_pt; /* group_name,ip,pid,join|leave */ if (record_config_events_g > 0) { qb_log (LOG_INFO, "got cpg event[recording] for group %s", groupName->value); } else { qb_log (LOG_INFO, "got cpg event[ignoring] for group %s", groupName->value); } for (i = 0; i < left_list_entries; i++) { if (record_config_events_g > 0) { log_pt = malloc (sizeof(log_entry_t)); qb_list_init (&log_pt->list); snprintf (log_pt->log, LOG_STR_SIZE, "%s,%u,%u,left", groupName->value, left_list[i].nodeid,left_list[i].pid); qb_list_add_tail(&log_pt->list, &config_chg_log_head); 
qb_log (LOG_INFO, "cpg event %s", log_pt->log); } } for (i = 0; i < joined_list_entries; i++) { if (record_config_events_g > 0) { log_pt = malloc (sizeof(log_entry_t)); qb_list_init (&log_pt->list); snprintf (log_pt->log, LOG_STR_SIZE, "%s,%u,%u,join", groupName->value, joined_list[i].nodeid,joined_list[i].pid); qb_list_add_tail (&log_pt->list, &config_chg_log_head); qb_log (LOG_INFO, "cpg event %s", log_pt->log); } } if (pcmk_test == 1) { in_cnchg = 1; send_some_more_messages (NULL); in_cnchg = 0; } } static void my_shutdown_callback (corosync_cfg_handle_t handle, corosync_cfg_shutdown_flags_t flags) { qb_log (LOG_CRIT, "flags:%d", flags); if (flags == COROSYNC_CFG_SHUTDOWN_FLAG_REQUEST) { corosync_cfg_replyto_shutdown (cfg_handle, COROSYNC_CFG_SHUTDOWN_FLAG_YES); } } static corosync_cfg_callbacks_t cfg_callbacks = { .corosync_cfg_shutdown_callback = my_shutdown_callback, }; static cpg_callbacks_t callbacks = { .cpg_deliver_fn = delivery_callback, .cpg_confchg_fn = config_change_callback, }; static void record_messages (void) { record_messages_g = 1; qb_log (LOG_INFO, "record:%d", record_messages_g); } static void record_config_events (int sock) { char response[100]; ssize_t rc; size_t send_len; record_config_events_g = 1; qb_log (LOG_INFO, "record:%d", record_config_events_g); snprintf (response, 100, "%s", OK_STR); send_len = strlen (response); rc = send (sock, response, send_len, 0); assert(rc == send_len); } static void read_config_event (int sock) { const char *empty = "None"; - struct qb_list_head * list = config_chg_log_head.next; log_entry_t *entry; ssize_t rc; size_t send_len; - if (list != &config_chg_log_head) { - entry = qb_list_entry (list, log_entry_t, list); + if (qb_list_empty(&config_chg_log_head) == 0) { + entry = qb_list_first_entry (&config_chg_log_head, log_entry_t, list); send_len = strlen (entry->log); rc = send (sock, entry->log, send_len, 0); qb_list_del (&entry->list); free (entry); } else { qb_log (LOG_DEBUG, "no events in list"); 
send_len = strlen (empty); rc = send (sock, empty, send_len, 0); } assert(rc == send_len); } static void read_messages (int sock, char* atmost_str) { - struct qb_list_head * list; + struct qb_list_head *iter, *tmp_iter; + log_entry_t *entry; int atmost = atoi (atmost_str); int packed = 0; ssize_t rc; if (atmost == 0) atmost = 1; if (atmost > (HOW_BIG_AND_BUF / LOG_STR_SIZE)) atmost = (HOW_BIG_AND_BUF / LOG_STR_SIZE); big_and_buf[0] = '\0'; - for (list = msg_log_head.next; - (!qb_list_empty (&msg_log_head) && packed < atmost); ) { + qb_list_for_each_safe(iter, tmp_iter, &msg_log_head) { + + if (packed >= atmost) + break; - entry = qb_list_entry (list, log_entry_t, list); + entry = qb_list_entry (iter, log_entry_t, list); strcat (big_and_buf, entry->log); packed++; - list = list->next; qb_list_del (&entry->list); free (entry); total_stored_msgs--; } if (packed == 0) { strcpy (big_and_buf, "None"); } else { if ((total_stored_msgs % 1000) == 0) { qb_log(LOG_INFO, "sending %d; total_stored_msgs:%d; len:%d", packed, total_stored_msgs, (int)strlen (big_and_buf)); } } rc = send (sock, big_and_buf, strlen (big_and_buf), 0); assert(rc == strlen (big_and_buf)); } static qb_loop_timer_handle more_messages_timer_handle; static void send_some_more_messages_later (void) { cpg_dispatch (cpg_handle, CS_DISPATCH_ALL); qb_loop_timer_add ( ta_poll_handle_get(), QB_LOOP_MED, 300*QB_TIME_NS_IN_MSEC, NULL, send_some_more_messages, &more_messages_timer_handle); } static void send_some_more_messages_zcb (void) { msg_t *my_msg; int i; int send_now; size_t payload_size; size_t total_size; unsigned int sha1_len; cs_error_t res; cpg_flow_control_state_t fc_state; void *zcb_buffer; if (cpg_fd < 0) return; send_now = my_msgs_to_send; payload_size = (rand() % 100000); total_size = payload_size + sizeof (msg_t); cpg_zcb_alloc (cpg_handle, total_size, &zcb_buffer); my_msg = (msg_t*)zcb_buffer; qb_log(LOG_DEBUG, "send_now:%d", send_now); my_msg->pid = my_pid; my_msg->nodeid = my_nodeid; my_msg->size 
= sizeof (msg_t) + payload_size; my_msg->seq = my_msgs_sent; for (i = 0; i < payload_size; i++) { my_msg->payload[i] = i; } PK11_DigestBegin(sha1_context); PK11_DigestOp(sha1_context, my_msg->payload, payload_size); PK11_DigestFinal(sha1_context, my_msg->sha1, &sha1_len, sizeof(my_msg->sha1)); for (i = 0; i < send_now; i++) { res = cpg_flow_control_state_get (cpg_handle, &fc_state); if (res == CS_OK && fc_state == CPG_FLOW_CONTROL_ENABLED) { /* lets do this later */ send_some_more_messages_later (); qb_log (LOG_INFO, "flow control enabled."); goto free_buffer; } res = cpg_zcb_mcast_joined (cpg_handle, CPG_TYPE_AGREED, zcb_buffer, total_size); if (res == CS_ERR_TRY_AGAIN) { /* lets do this later */ send_some_more_messages_later (); goto free_buffer; } else if (res != CS_OK) { qb_log (LOG_ERR, "cpg_mcast_joined error:%d, exiting.", res); exit (-2); } my_msgs_sent++; my_msgs_to_send--; } free_buffer: cpg_zcb_free (cpg_handle, zcb_buffer); } #define cs_repeat(counter, max, code) do { \ code; \ if (res == CS_ERR_TRY_AGAIN) { \ counter++; \ sleep(counter); \ } \ } while (res == CS_ERR_TRY_AGAIN && counter < max) static unsigned char buffer[200000]; static void send_some_more_messages_normal (void) { msg_t my_msg; struct iovec iov[2]; int i; int send_now; size_t payload_size; cs_error_t res; cpg_flow_control_state_t fc_state; int retries = 0; time_t before; unsigned int sha1_len; if (cpg_fd < 0) return; send_now = my_msgs_to_send; qb_log (LOG_TRACE, "send_now:%d", send_now); my_msg.pid = my_pid; my_msg.nodeid = my_nodeid; payload_size = (rand() % 10000); my_msg.size = sizeof (msg_t) + payload_size; my_msg.seq = my_msgs_sent; for (i = 0; i < payload_size; i++) { buffer[i] = i; } PK11_DigestBegin(sha1_context); PK11_DigestOp(sha1_context, buffer, payload_size); PK11_DigestFinal(sha1_context, my_msg.sha1, &sha1_len, sizeof(my_msg.sha1)); iov[0].iov_len = sizeof (msg_t); iov[0].iov_base = &my_msg; iov[1].iov_len = payload_size; iov[1].iov_base = buffer; for (i = 0; i < 
send_now; i++) { if (in_cnchg && pcmk_test) { retries = 0; before = time(NULL); cs_repeat(retries, 30, res = cpg_mcast_joined(cpg_handle, CPG_TYPE_AGREED, iov, 2)); if (retries > 20) { qb_log (LOG_ERR, "cs_repeat: blocked for :%lu secs.", (unsigned long)(time(NULL) - before)); } if (res != CS_OK) { qb_log (LOG_ERR, "cpg_mcast_joined error:%d.", res); return; } } else { res = cpg_flow_control_state_get (cpg_handle, &fc_state); if (res == CS_OK && fc_state == CPG_FLOW_CONTROL_ENABLED) { /* lets do this later */ send_some_more_messages_later (); qb_log (LOG_INFO, "flow control enabled."); return; } res = cpg_mcast_joined (cpg_handle, CPG_TYPE_AGREED, iov, 2); if (res == CS_ERR_TRY_AGAIN) { /* lets do this later */ send_some_more_messages_later (); if (i > 0) { qb_log (LOG_INFO, "TRY_AGAIN %d to send.", my_msgs_to_send); } return; } else if (res != CS_OK) { qb_log (LOG_ERR, "cpg_mcast_joined error:%d, exiting.", res); exit (-2); } } my_msgs_sent++; my_msg.seq = my_msgs_sent; my_msgs_to_send--; } qb_log (LOG_TRACE, "sent %d; to send %d.", my_msgs_sent, my_msgs_to_send); } static void send_some_more_messages (void * unused) { if (use_zcb) { send_some_more_messages_zcb (); } else { send_some_more_messages_normal (); } } static void msg_blaster (int sock, char* num_to_send_str) { my_msgs_to_send = atoi (num_to_send_str); my_msgs_sent = 0; my_seq = 1; my_pid = getpid(); use_zcb = QB_FALSE; total_stored_msgs = 0; cpg_local_get (cpg_handle, &my_nodeid); /* control the limits */ if (my_msgs_to_send <= 0) my_msgs_to_send = 1; if (my_msgs_to_send > 10000) my_msgs_to_send = 10000; send_some_more_messages_normal (); } static void context_test (int sock) { char response[100]; char *cmp; ssize_t rc; size_t send_len; cpg_context_set (cpg_handle, response); cpg_context_get (cpg_handle, (void**)&cmp); if (response != cmp) { snprintf (response, 100, "%s", FAIL_STR); } else { snprintf (response, 100, "%s", OK_STR); } send_len = strlen (response); rc = send (sock, response, send_len, 0); 
assert(rc == send_len); } static void msg_blaster_zcb (int sock, char* num_to_send_str) { my_msgs_to_send = atoi (num_to_send_str); my_seq = 1; my_pid = getpid(); use_zcb = QB_TRUE; total_stored_msgs = 0; cpg_local_get (cpg_handle, &my_nodeid); /* control the limits */ if (my_msgs_to_send <= 0) my_msgs_to_send = 1; if (my_msgs_to_send > 10000) my_msgs_to_send = 10000; send_some_more_messages_zcb (); } static int cfg_dispatch_wrapper_fn ( int fd, int revents, void *data) { cs_error_t error; if (revents & POLLHUP || revents & POLLERR) { qb_log (LOG_ERR, "got POLLHUP disconnecting from CFG"); corosync_cfg_finalize(cfg_handle); cfg_handle = 0; return -1; } error = corosync_cfg_dispatch (cfg_handle, CS_DISPATCH_ALL); if (error == CS_ERR_LIBRARY) { qb_log (LOG_ERR, "got LIB error disconnecting from CFG."); corosync_cfg_finalize(cfg_handle); cfg_handle = 0; return -1; } return 0; } static int cpg_dispatch_wrapper_fn ( int fd, int revents, void *data) { cs_error_t error; if (revents & POLLHUP || revents & POLLERR) { qb_log (LOG_ERR, "got POLLHUP disconnecting from CPG"); cpg_finalize(cpg_handle); cpg_handle = 0; return -1; } error = cpg_dispatch (cpg_handle, CS_DISPATCH_ALL); if (error == CS_ERR_LIBRARY) { qb_log (LOG_ERR, "got LIB error disconnecting from CPG"); cpg_finalize(cpg_handle); cpg_handle = 0; return -1; } return 0; } static void do_command (int sock, char* func, char*args[], int num_args) { int result; char response[100]; struct cpg_name group_name; ssize_t rc; size_t send_len; qb_log (LOG_TRACE, "RPC:%s() called.", func); if (strcmp ("cpg_mcast_joined",func) == 0) { struct iovec iov[5]; int a; for (a = 0; a < num_args; a++) { iov[a].iov_base = args[a]; iov[a].iov_len = strlen(args[a])+1; } cpg_mcast_joined (cpg_handle, CPG_TYPE_AGREED, iov, num_args); } else if (strcmp ("cpg_join",func) == 0) { if (strlen(args[0]) >= CPG_MAX_NAME_LENGTH) { qb_log (LOG_ERR, "Invalid group name"); exit (1); } strcpy (group_name.value, args[0]); group_name.length = 
strlen(args[0]); result = cpg_join (cpg_handle, &group_name); if (result != CS_OK) { qb_log (LOG_ERR, "Could not join process group, error %d", result); exit (1); } qb_log (LOG_INFO, "called cpg_join(%s)!", group_name.value); } else if (strcmp ("cpg_leave",func) == 0) { strcpy (group_name.value, args[0]); group_name.length = strlen(args[0]); result = cpg_leave (cpg_handle, &group_name); if (result != CS_OK) { qb_log (LOG_ERR, "Could not leave process group, error %d", result); exit (1); } qb_log (LOG_INFO, "called cpg_leave(%s)!", group_name.value); } else if (strcmp ("cpg_initialize",func) == 0) { int retry_count = 0; result = cpg_initialize (&cpg_handle, &callbacks); while (result != CS_OK) { qb_log (LOG_ERR, "cpg_initialize error %d (attempt %d)", result, retry_count); if (retry_count >= 3) { exit (1); } sleep(1); retry_count++; result = cpg_initialize (&cpg_handle, &callbacks); } cpg_fd_get (cpg_handle, &cpg_fd); qb_loop_poll_add (ta_poll_handle_get(), QB_LOOP_MED, cpg_fd, POLLIN|POLLNVAL, NULL, cpg_dispatch_wrapper_fn); } else if (strcmp ("cpg_local_get", func) == 0) { unsigned int local_nodeid; cpg_local_get (cpg_handle, &local_nodeid); snprintf (response, 100, "%u",local_nodeid); send_len = strlen (response); rc = send (sock, response, send_len, 0); assert(rc == send_len); } else if (strcmp ("cpg_finalize", func) == 0) { if (cpg_handle > 0) { cpg_finalize (cpg_handle); cpg_handle = 0; } } else if (strcmp ("record_config_events", func) == 0) { record_config_events (sock); } else if (strcmp ("record_messages", func) == 0) { record_messages (); } else if (strcmp ("read_config_event", func) == 0) { read_config_event (sock); } else if (strcmp ("read_messages", func) == 0) { read_messages (sock, args[0]); } else if (strcmp ("msg_blaster_zcb", func) == 0) { msg_blaster_zcb (sock, args[0]); } else if (strcmp ("pcmk_test", func) == 0) { pcmk_test = 1; } else if (strcmp ("msg_blaster", func) == 0) { msg_blaster (sock, args[0]); } else if (strcmp ("context_test", func) 
== 0) { context_test (sock); } else if (strcmp ("are_you_ok_dude", func) == 0) { snprintf (response, 100, "%s", OK_STR); send_len = strlen (response); rc = send (sock, response, strlen (response), 0); assert(rc == send_len); } else if (strcmp ("cfg_shutdown", func) == 0) { qb_log (LOG_INFO, "calling %s() called!", func); result = corosync_cfg_try_shutdown (cfg_handle, COROSYNC_CFG_SHUTDOWN_FLAG_REQUEST); qb_log (LOG_INFO,"%s() returned %d!", func, result); } else if (strcmp ("cfg_initialize",func) == 0) { int retry_count = 0; qb_log (LOG_INFO,"%s() called!", func); result = corosync_cfg_initialize (&cfg_handle, &cfg_callbacks); while (result != CS_OK) { qb_log (LOG_ERR, "cfg_initialize error %d (attempt %d)", result, retry_count); if (retry_count >= 3) { exit (1); } sleep(1); retry_count++; result = corosync_cfg_initialize (&cfg_handle, &cfg_callbacks); } qb_log (LOG_INFO,"corosync_cfg_initialize() == %d", result); result = corosync_cfg_fd_get (cfg_handle, &cfg_fd); qb_log (LOG_INFO,"corosync_cfg_fd_get() == %d", result); qb_loop_poll_add (ta_poll_handle_get(), QB_LOOP_MED, cfg_fd, POLLIN|POLLNVAL, NULL, cfg_dispatch_wrapper_fn); } else { qb_log(LOG_ERR, "RPC:%s not supported!", func); } } static void my_pre_exit(void) { qb_log (LOG_INFO, "%s PRE EXIT", __FILE__); if (cpg_handle > 0) { cpg_finalize (cpg_handle); cpg_handle = 0; } if (cfg_handle > 0) { corosync_cfg_finalize (cfg_handle); cfg_handle = 0; } PK11_DestroyContext(sha1_context, PR_TRUE); } int main(int argc, char *argv[]) { qb_list_init (&msg_log_head); qb_list_init (&config_chg_log_head); if (NSS_NoDB_Init(".") != SECSuccess) { qb_log(LOG_ERR, "Couldn't initialize nss"); exit (0); } if ((sha1_context = PK11_CreateDigestContext(SEC_OID_SHA1)) == NULL) { qb_log(LOG_ERR, "Couldn't initialize nss"); exit (0); } return test_agent_run ("cpg_test_agent", 9034, do_command, my_pre_exit); } diff --git a/exec/totempg.c b/exec/totempg.c index 861e8b03..9c3a4291 100644 --- a/exec/totempg.c +++ b/exec/totempg.c @@ 
-1,1523 +1,1523 @@ /* * Copyright (c) 2003-2005 MontaVista Software, Inc. * Copyright (c) 2005 OSDL. * Copyright (c) 2006-2012 Red Hat, Inc. * * All rights reserved. * * Author: Steven Dake (sdake@redhat.com) * Author: Mark Haverkamp (markh@osdl.org) * * This software licensed under BSD license, the text of which follows: * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * - Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * - Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * - Neither the name of the MontaVista Software, Inc. nor the names of its * contributors may be used to endorse or promote products derived from this * software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF * THE POSSIBILITY OF SUCH DAMAGE. 
*/ /* * FRAGMENTATION AND PACKING ALGORITHM: * * Assemble the entire message into one buffer * if full fragment * store fragment into lengths list * for each full fragment * multicast fragment * set length and fragment fields of pg mesage * store remaining multicast into head of fragmentation data and set lens field * * If a message exceeds the maximum packet size allowed by the totem * single ring protocol, the protocol could lose forward progress. * Statically calculating the allowed data amount doesn't work because * the amount of data allowed depends on the number of fragments in * each message. In this implementation, the maximum fragment size * is dynamically calculated for each fragment added to the message. * It is possible for a message to be two bytes short of the maximum * packet size. This occurs when a message or collection of * messages + the mcast header + the lens are two bytes short of the * end of the packet. Since another len field consumes two bytes, the * len field would consume the rest of the packet without room for data. * * One optimization would be to forgo the final len field and determine * it from the size of the udp datagram. Then this condition would no * longer occur. */ /* * ASSEMBLY AND UNPACKING ALGORITHM: * * copy incoming packet into assembly data buffer indexed by current * location of end of fragment * * if not fragmented * deliver all messages in assembly data buffer * else * if msg_count > 1 and fragmented * deliver all messages except last message in assembly data buffer * copy last fragmented section to start of assembly data buffer * else * if msg_count = 1 and fragmented * do nothing * */ #include #ifdef HAVE_ALLOCA_H #include #endif #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #define LOGSYS_UTILS_ONLY 1 #include #include "totemsrp.h" #define min(a,b) ((a) < (b)) ? 
a : b struct totempg_mcast_header { short version; short type; }; #if !(defined(__i386__) || defined(__x86_64__)) /* * Need align on architectures different then i386 or x86_64 */ #define TOTEMPG_NEED_ALIGN 1 #endif /* * totempg_mcast structure * * header: Identify the mcast. * fragmented: Set if this message continues into next message * continuation: Set if this message is a continuation from last message * msg_count Indicates how many packed messages are contained * in the mcast. * Also, the size of each packed message and the messages themselves are * appended to the end of this structure when sent. */ struct totempg_mcast { struct totempg_mcast_header header; unsigned char fragmented; unsigned char continuation; unsigned short msg_count; /* * short msg_len[msg_count]; */ /* * data for messages */ }; /* * Maximum packet size for totem pg messages */ #define TOTEMPG_PACKET_SIZE (totempg_totem_config->net_mtu - \ sizeof (struct totempg_mcast)) /* * Local variables used for packing small messages */ static unsigned short mcast_packed_msg_lens[FRAME_SIZE_MAX]; static int mcast_packed_msg_count = 0; static int totempg_reserved = 1; static unsigned int totempg_size_limit; static totem_queue_level_changed_fn totem_queue_level_changed = NULL; static uint32_t totempg_threaded_mode = 0; static void *totemsrp_context; /* * Function and data used to log messages */ static int totempg_log_level_security; static int totempg_log_level_error; static int totempg_log_level_warning; static int totempg_log_level_notice; static int totempg_log_level_debug; static int totempg_subsys_id; static void (*totempg_log_printf) ( int level, int subsys, const char *function, const char *file, int line, const char *format, ...) 
__attribute__((format(printf, 6, 7))); struct totem_config *totempg_totem_config; static totempg_stats_t totempg_stats; enum throw_away_mode { THROW_AWAY_INACTIVE, THROW_AWAY_ACTIVE }; struct assembly { unsigned int nodeid; unsigned char data[MESSAGE_SIZE_MAX]; int index; unsigned char last_frag_num; enum throw_away_mode throw_away_mode; struct qb_list_head list; }; static void assembly_deref (struct assembly *assembly); static int callback_token_received_fn (enum totem_callback_token_type type, const void *data); QB_LIST_DECLARE(assembly_list_inuse); /* * Free list is used both for transitional and operational assemblies */ QB_LIST_DECLARE(assembly_list_free); QB_LIST_DECLARE(assembly_list_inuse_trans); QB_LIST_DECLARE(totempg_groups_list); /* * Staging buffer for packed messages. Messages are staged in this buffer * before sending. Multiple messages may fit which cuts down on the * number of mcasts sent. If a message doesn't completely fit, then * the mcast header has a fragment bit set that says that there are more * data to follow. fragment_size is an index into the buffer. It indicates * the size of message data and where to place new message data. * fragment_contuation indicates whether the first packed message in * the buffer is a continuation of a previously packed fragment. 
*/ static unsigned char *fragmentation_data; static int fragment_size = 0; static int fragment_continuation = 0; static int totempg_waiting_transack = 0; struct totempg_group_instance { void (*deliver_fn) ( unsigned int nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required); void (*confchg_fn) ( enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id); struct totempg_group *groups; int groups_cnt; int32_t q_level; struct qb_list_head list; }; static unsigned char next_fragment = 1; static pthread_mutex_t totempg_mutex = PTHREAD_MUTEX_INITIALIZER; static pthread_mutex_t callback_token_mutex = PTHREAD_MUTEX_INITIALIZER; static pthread_mutex_t mcast_msg_mutex = PTHREAD_MUTEX_INITIALIZER; #define log_printf(level, format, args...) \ do { \ totempg_log_printf(level, \ totempg_subsys_id, \ __FUNCTION__, __FILE__, __LINE__, \ format, ##args); \ } while (0); static int msg_count_send_ok (int msg_count); static int byte_count_send_ok (int byte_count); static void totempg_waiting_trans_ack_cb (int waiting_trans_ack) { log_printf(LOG_DEBUG, "waiting_trans_ack changed to %u", waiting_trans_ack); totempg_waiting_transack = waiting_trans_ack; } static struct assembly *assembly_ref (unsigned int nodeid) { struct assembly *assembly; struct qb_list_head *list; struct qb_list_head *active_assembly_list_inuse; if (totempg_waiting_transack) { active_assembly_list_inuse = &assembly_list_inuse_trans; } else { active_assembly_list_inuse = &assembly_list_inuse; } /* * Search inuse list for node id and return assembly buffer if found */ qb_list_for_each(list, active_assembly_list_inuse) { assembly = qb_list_entry (list, struct assembly, list); if (nodeid == assembly->nodeid) { return (assembly); } } /* * Nothing found in inuse list get one from free list if 
available */ if (qb_list_empty (&assembly_list_free) == 0) { - assembly = qb_list_entry (assembly_list_free.next, struct assembly, list); + assembly = qb_list_first_entry (&assembly_list_free, struct assembly, list); qb_list_del (&assembly->list); qb_list_add (&assembly->list, active_assembly_list_inuse); assembly->nodeid = nodeid; assembly->index = 0; assembly->last_frag_num = 0; assembly->throw_away_mode = THROW_AWAY_INACTIVE; return (assembly); } /* * Nothing available in inuse or free list, so allocate a new one */ assembly = malloc (sizeof (struct assembly)); /* * TODO handle memory allocation failure here */ assert (assembly); assembly->nodeid = nodeid; assembly->data[0] = 0; assembly->index = 0; assembly->last_frag_num = 0; assembly->throw_away_mode = THROW_AWAY_INACTIVE; qb_list_init (&assembly->list); qb_list_add (&assembly->list, active_assembly_list_inuse); return (assembly); } static void assembly_deref (struct assembly *assembly) { qb_list_del (&assembly->list); qb_list_add (&assembly->list, &assembly_list_free); } static void assembly_deref_from_normal_and_trans (int nodeid) { int j; struct qb_list_head *list, *tmp_iter; struct qb_list_head *active_assembly_list_inuse; struct assembly *assembly; for (j = 0; j < 2; j++) { if (j == 0) { active_assembly_list_inuse = &assembly_list_inuse; } else { active_assembly_list_inuse = &assembly_list_inuse_trans; } qb_list_for_each_safe(list, tmp_iter, active_assembly_list_inuse) { assembly = qb_list_entry (list, struct assembly, list); if (nodeid == assembly->nodeid) { qb_list_del (&assembly->list); qb_list_add (&assembly->list, &assembly_list_free); } } } } static inline void app_confchg_fn ( enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id) { int i; struct totempg_group_instance *instance; struct 
qb_list_head *list; /* * For every leaving processor, add to free list * This also has the side effect of clearing out the dataset * In the leaving processor's assembly buffer. */ for (i = 0; i < left_list_entries; i++) { assembly_deref_from_normal_and_trans (left_list[i]); } qb_list_for_each(list, &totempg_groups_list) { instance = qb_list_entry (list, struct totempg_group_instance, list); if (instance->confchg_fn) { instance->confchg_fn ( configuration_type, member_list, member_list_entries, left_list, left_list_entries, joined_list, joined_list_entries, ring_id); } } } static inline void group_endian_convert ( void *msg, int msg_len) { unsigned short *group_len; int i; char *aligned_msg; #ifdef TOTEMPG_NEED_ALIGN /* * Align data structure for not i386 or x86_64 */ if ((size_t)msg % 4 != 0) { aligned_msg = alloca(msg_len); memcpy(aligned_msg, msg, msg_len); } else { aligned_msg = msg; } #else aligned_msg = msg; #endif group_len = (unsigned short *)aligned_msg; group_len[0] = swab16(group_len[0]); for (i = 1; i < group_len[0] + 1; i++) { group_len[i] = swab16(group_len[i]); } if (aligned_msg != msg) { memcpy(msg, aligned_msg, msg_len); } } static inline int group_matches ( struct iovec *iovec, unsigned int iov_len, struct totempg_group *groups_b, unsigned int group_b_cnt, unsigned int *adjust_iovec) { unsigned short *group_len; char *group_name; int i; int j; #ifdef TOTEMPG_NEED_ALIGN struct iovec iovec_aligned = { NULL, 0 }; #endif assert (iov_len == 1); #ifdef TOTEMPG_NEED_ALIGN /* * Align data structure for not i386 or x86_64 */ if ((size_t)iovec->iov_base % 4 != 0) { iovec_aligned.iov_base = alloca(iovec->iov_len); memcpy(iovec_aligned.iov_base, iovec->iov_base, iovec->iov_len); iovec_aligned.iov_len = iovec->iov_len; iovec = &iovec_aligned; } #endif group_len = (unsigned short *)iovec->iov_base; group_name = ((char *)iovec->iov_base) + sizeof (unsigned short) * (group_len[0] + 1); /* * Calculate amount to adjust the iovec by before delivering to app */ 
*adjust_iovec = sizeof (unsigned short) * (group_len[0] + 1); for (i = 1; i < group_len[0] + 1; i++) { *adjust_iovec += group_len[i]; } /* * Determine if this message should be delivered to this instance */ for (i = 1; i < group_len[0] + 1; i++) { for (j = 0; j < group_b_cnt; j++) { if ((group_len[i] == groups_b[j].group_len) && (memcmp (groups_b[j].group, group_name, group_len[i]) == 0)) { return (1); } } group_name += group_len[i]; } return (0); } static inline void app_deliver_fn ( unsigned int nodeid, void *msg, unsigned int msg_len, int endian_conversion_required) { struct totempg_group_instance *instance; struct iovec stripped_iovec; unsigned int adjust_iovec; struct iovec *iovec; struct qb_list_head *list; struct iovec aligned_iovec = { NULL, 0 }; if (endian_conversion_required) { group_endian_convert (msg, msg_len); } /* * TODO: segmentation/assembly need to be redesigned to provide aligned access * in all cases to avoid memory copies on non386 archs. Probably broke backwars * compatibility */ #ifdef TOTEMPG_NEED_ALIGN /* * Align data structure for not i386 or x86_64 */ aligned_iovec.iov_base = alloca(msg_len); aligned_iovec.iov_len = msg_len; memcpy(aligned_iovec.iov_base, msg, msg_len); #else aligned_iovec.iov_base = msg; aligned_iovec.iov_len = msg_len; #endif iovec = &aligned_iovec; qb_list_for_each(list, &totempg_groups_list) { instance = qb_list_entry (list, struct totempg_group_instance, list); if (group_matches (iovec, 1, instance->groups, instance->groups_cnt, &adjust_iovec)) { stripped_iovec.iov_len = iovec->iov_len - adjust_iovec; stripped_iovec.iov_base = (char *)iovec->iov_base + adjust_iovec; #ifdef TOTEMPG_NEED_ALIGN /* * Align data structure for not i386 or x86_64 */ if ((char *)iovec->iov_base + adjust_iovec % 4 != 0) { /* * Deal with misalignment */ stripped_iovec.iov_base = alloca (stripped_iovec.iov_len); memcpy (stripped_iovec.iov_base, (char *)iovec->iov_base + adjust_iovec, stripped_iovec.iov_len); } #endif instance->deliver_fn ( 
nodeid, stripped_iovec.iov_base, stripped_iovec.iov_len, endian_conversion_required); } } } static void totempg_confchg_fn ( enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id) { // TODO optimize this app_confchg_fn (configuration_type, member_list, member_list_entries, left_list, left_list_entries, joined_list, joined_list_entries, ring_id); } static void totempg_deliver_fn ( unsigned int nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required) { struct totempg_mcast *mcast; unsigned short *msg_lens; int i; struct assembly *assembly; char header[FRAME_SIZE_MAX]; int msg_count; int continuation; int start; const char *data; int datasize; struct iovec iov_delv; assembly = assembly_ref (nodeid); assert (assembly); /* * Assemble the header into one block of data and * assemble the packet contents into one block of data to simplify delivery */ mcast = (struct totempg_mcast *)msg; if (endian_conversion_required) { mcast->msg_count = swab16 (mcast->msg_count); } msg_count = mcast->msg_count; datasize = sizeof (struct totempg_mcast) + msg_count * sizeof (unsigned short); memcpy (header, msg, datasize); data = msg; msg_lens = (unsigned short *) (header + sizeof (struct totempg_mcast)); if (endian_conversion_required) { for (i = 0; i < mcast->msg_count; i++) { msg_lens[i] = swab16 (msg_lens[i]); } } memcpy (&assembly->data[assembly->index], &data[datasize], msg_len - datasize); /* * If the last message in the buffer is a fragment, then we * can't deliver it. We'll first deliver the full messages * then adjust the assembly buffer so we can add the rest of the * fragment when it arrives. */ msg_count = mcast->fragmented ? 
mcast->msg_count - 1 : mcast->msg_count; continuation = mcast->continuation; iov_delv.iov_base = (void *)&assembly->data[0]; iov_delv.iov_len = assembly->index + msg_lens[0]; /* * Make sure that if this message is a continuation, that it * matches the sequence number of the previous fragment. * Also, if the first packed message is a continuation * of a previous message, but the assembly buffer * is empty, then we need to discard it since we can't * assemble a complete message. Likewise, if this message isn't a * continuation and the assembly buffer is empty, we have to discard * the continued message. */ start = 0; if (assembly->throw_away_mode == THROW_AWAY_ACTIVE) { /* Throw away the first msg block */ if (mcast->fragmented == 0 || mcast->fragmented == 1) { assembly->throw_away_mode = THROW_AWAY_INACTIVE; assembly->index += msg_lens[0]; iov_delv.iov_base = (void *)&assembly->data[assembly->index]; iov_delv.iov_len = msg_lens[1]; start = 1; } } else if (assembly->throw_away_mode == THROW_AWAY_INACTIVE) { if (continuation == assembly->last_frag_num) { assembly->last_frag_num = mcast->fragmented; for (i = start; i < msg_count; i++) { app_deliver_fn(nodeid, iov_delv.iov_base, iov_delv.iov_len, endian_conversion_required); assembly->index += msg_lens[i]; iov_delv.iov_base = (void *)&assembly->data[assembly->index]; if (i < (msg_count - 1)) { iov_delv.iov_len = msg_lens[i + 1]; } } } else { log_printf (LOG_DEBUG, "fragmented continuation %u is not equal to assembly last_frag_num %u", continuation, assembly->last_frag_num); assembly->throw_away_mode = THROW_AWAY_ACTIVE; } } if (mcast->fragmented == 0) { /* * End of messages, dereference assembly struct */ assembly->last_frag_num = 0; assembly->index = 0; assembly_deref (assembly); } else { /* * Message is fragmented, keep around assembly list */ if (mcast->msg_count > 1) { memmove (&assembly->data[0], &assembly->data[assembly->index], msg_lens[msg_count]); assembly->index = 0; } assembly->index += msg_lens[msg_count]; } 
} /* * Totem Process Group Abstraction * depends on poll abstraction, POSIX, IPV4 */ void *callback_token_received_handle; int callback_token_received_fn (enum totem_callback_token_type type, const void *data) { struct totempg_mcast mcast; struct iovec iovecs[3]; if (totempg_threaded_mode == 1) { pthread_mutex_lock (&mcast_msg_mutex); } if (mcast_packed_msg_count == 0) { if (totempg_threaded_mode == 1) { pthread_mutex_unlock (&mcast_msg_mutex); } return (0); } if (totemsrp_avail(totemsrp_context) == 0) { if (totempg_threaded_mode == 1) { pthread_mutex_unlock (&mcast_msg_mutex); } return (0); } mcast.header.version = 0; mcast.header.type = 0; mcast.fragmented = 0; /* * Was the first message in this buffer a continuation of a * fragmented message? */ mcast.continuation = fragment_continuation; fragment_continuation = 0; mcast.msg_count = mcast_packed_msg_count; iovecs[0].iov_base = (void *)&mcast; iovecs[0].iov_len = sizeof (struct totempg_mcast); iovecs[1].iov_base = (void *)mcast_packed_msg_lens; iovecs[1].iov_len = mcast_packed_msg_count * sizeof (unsigned short); iovecs[2].iov_base = (void *)&fragmentation_data[0]; iovecs[2].iov_len = fragment_size; (void)totemsrp_mcast (totemsrp_context, iovecs, 3, 0); mcast_packed_msg_count = 0; fragment_size = 0; if (totempg_threaded_mode == 1) { pthread_mutex_unlock (&mcast_msg_mutex); } return (0); } /* * Initialize the totem process group abstraction */ int totempg_initialize ( qb_loop_t *poll_handle, struct totem_config *totem_config) { int res; totempg_totem_config = totem_config; totempg_log_level_security = totem_config->totem_logging_configuration.log_level_security; totempg_log_level_error = totem_config->totem_logging_configuration.log_level_error; totempg_log_level_warning = totem_config->totem_logging_configuration.log_level_warning; totempg_log_level_notice = totem_config->totem_logging_configuration.log_level_notice; totempg_log_level_debug = totem_config->totem_logging_configuration.log_level_debug; 
totempg_log_printf = totem_config->totem_logging_configuration.log_printf; totempg_subsys_id = totem_config->totem_logging_configuration.log_subsys_id; fragmentation_data = malloc (TOTEMPG_PACKET_SIZE); if (fragmentation_data == 0) { return (-1); } totemsrp_net_mtu_adjust (totem_config); res = totemsrp_initialize ( poll_handle, &totemsrp_context, totem_config, &totempg_stats, totempg_deliver_fn, totempg_confchg_fn, totempg_waiting_trans_ack_cb); totemsrp_callback_token_create ( totemsrp_context, &callback_token_received_handle, TOTEM_CALLBACK_TOKEN_RECEIVED, 0, callback_token_received_fn, 0); totempg_size_limit = (totemsrp_avail(totemsrp_context) - 1) * (totempg_totem_config->net_mtu - sizeof (struct totempg_mcast) - 16); qb_list_init (&totempg_groups_list); return (res); } void totempg_finalize (void) { if (totempg_threaded_mode == 1) { pthread_mutex_lock (&totempg_mutex); } totemsrp_finalize (totemsrp_context); if (totempg_threaded_mode == 1) { pthread_mutex_unlock (&totempg_mutex); } } /* * Multicast a message */ static int mcast_msg ( struct iovec *iovec_in, unsigned int iov_len, int guarantee) { int res = 0; struct totempg_mcast mcast; struct iovec iovecs[3]; struct iovec iovec[64]; int i; int dest, src; int max_packet_size = 0; int copy_len = 0; int copy_base = 0; int total_size = 0; if (totempg_threaded_mode == 1) { pthread_mutex_lock (&mcast_msg_mutex); } totemsrp_event_signal (totemsrp_context, TOTEM_EVENT_NEW_MSG, 1); /* * Remove zero length iovectors from the list */ assert (iov_len < 64); for (dest = 0, src = 0; src < iov_len; src++) { if (iovec_in[src].iov_len) { memcpy (&iovec[dest++], &iovec_in[src], sizeof (struct iovec)); } } iov_len = dest; max_packet_size = TOTEMPG_PACKET_SIZE - (sizeof (unsigned short) * (mcast_packed_msg_count + 1)); mcast_packed_msg_lens[mcast_packed_msg_count] = 0; /* * Check if we would overwrite new message queue */ for (i = 0; i < iov_len; i++) { total_size += iovec[i].iov_len; } if (byte_count_send_ok (total_size + 
sizeof(unsigned short) * (mcast_packed_msg_count)) == 0) { if (totempg_threaded_mode == 1) { pthread_mutex_unlock (&mcast_msg_mutex); } return(-1); } mcast.header.version = 0; for (i = 0; i < iov_len; ) { mcast.fragmented = 0; mcast.continuation = fragment_continuation; copy_len = iovec[i].iov_len - copy_base; /* * If it all fits with room left over, copy it in. * We need to leave at least sizeof(short) + 1 bytes in the * fragment_buffer on exit so that max_packet_size + fragment_size * doesn't exceed the size of the fragment_buffer on the next call. */ if ((copy_len + fragment_size) < (max_packet_size - sizeof (unsigned short))) { memcpy (&fragmentation_data[fragment_size], (char *)iovec[i].iov_base + copy_base, copy_len); fragment_size += copy_len; mcast_packed_msg_lens[mcast_packed_msg_count] += copy_len; next_fragment = 1; copy_len = 0; copy_base = 0; i++; continue; /* * If it just fits or is too big, then send out what fits. */ } else { unsigned char *data_ptr; copy_len = min(copy_len, max_packet_size - fragment_size); if( copy_len == max_packet_size ) data_ptr = (unsigned char *)iovec[i].iov_base + copy_base; else { data_ptr = fragmentation_data; memcpy (&fragmentation_data[fragment_size], (unsigned char *)iovec[i].iov_base + copy_base, copy_len); } memcpy (&fragmentation_data[fragment_size], (unsigned char *)iovec[i].iov_base + copy_base, copy_len); mcast_packed_msg_lens[mcast_packed_msg_count] += copy_len; /* * if we're not on the last iovec or the iovec is too large to * fit, then indicate a fragment. This also means that the next * message will have the continuation of this one. 
*/ if ((i < (iov_len - 1)) || ((copy_base + copy_len) < iovec[i].iov_len)) { if (!next_fragment) { next_fragment++; } fragment_continuation = next_fragment; mcast.fragmented = next_fragment++; assert(fragment_continuation != 0); assert(mcast.fragmented != 0); } else { fragment_continuation = 0; } /* * assemble the message and send it */ mcast.msg_count = ++mcast_packed_msg_count; iovecs[0].iov_base = (void *)&mcast; iovecs[0].iov_len = sizeof(struct totempg_mcast); iovecs[1].iov_base = (void *)mcast_packed_msg_lens; iovecs[1].iov_len = mcast_packed_msg_count * sizeof(unsigned short); iovecs[2].iov_base = (void *)data_ptr; iovecs[2].iov_len = max_packet_size; assert (totemsrp_avail(totemsrp_context) > 0); res = totemsrp_mcast (totemsrp_context, iovecs, 3, guarantee); if (res == -1) { goto error_exit; } /* * Recalculate counts and indexes for the next. */ mcast_packed_msg_lens[0] = 0; mcast_packed_msg_count = 0; fragment_size = 0; max_packet_size = TOTEMPG_PACKET_SIZE - (sizeof(unsigned short)); /* * If the iovec all fit, go to the next iovec */ if ((copy_base + copy_len) == iovec[i].iov_len) { copy_len = 0; copy_base = 0; i++; /* * Continue with the rest of the current iovec. */ } else { copy_base += copy_len; } } } /* * Bump only if we added message data. This may be zero if * the last buffer just fit into the fragmentation_data buffer * and we were at the last iovec. 
*/ if (mcast_packed_msg_lens[mcast_packed_msg_count]) { mcast_packed_msg_count++; } error_exit: if (totempg_threaded_mode == 1) { pthread_mutex_unlock (&mcast_msg_mutex); } return (res); } /* * Determine if a message of msg_size could be queued */ static int msg_count_send_ok ( int msg_count) { int avail = 0; avail = totemsrp_avail (totemsrp_context); totempg_stats.msg_queue_avail = avail; return ((avail - totempg_reserved) > msg_count); } static int byte_count_send_ok ( int byte_count) { unsigned int msg_count = 0; int avail = 0; avail = totemsrp_avail (totemsrp_context); msg_count = (byte_count / (totempg_totem_config->net_mtu - sizeof (struct totempg_mcast) - 16)) + 1; return (avail >= msg_count); } static int send_reserve ( int msg_size) { unsigned int msg_count = 0; msg_count = (msg_size / (totempg_totem_config->net_mtu - sizeof (struct totempg_mcast) - 16)) + 1; totempg_reserved += msg_count; totempg_stats.msg_reserved = totempg_reserved; return (msg_count); } static void send_release ( int msg_count) { totempg_reserved -= msg_count; totempg_stats.msg_reserved = totempg_reserved; } #ifndef HAVE_SMALL_MEMORY_FOOTPRINT #undef MESSAGE_QUEUE_MAX #define MESSAGE_QUEUE_MAX ((4 * MESSAGE_SIZE_MAX) / totempg_totem_config->net_mtu) #endif /* HAVE_SMALL_MEMORY_FOOTPRINT */ static uint32_t q_level_precent_used(void) { return (100 - (((totemsrp_avail(totemsrp_context) - totempg_reserved) * 100) / MESSAGE_QUEUE_MAX)); } int totempg_callback_token_create ( void **handle_out, enum totem_callback_token_type type, int delete, int (*callback_fn) (enum totem_callback_token_type type, const void *), const void *data) { unsigned int res; if (totempg_threaded_mode == 1) { pthread_mutex_lock (&callback_token_mutex); } res = totemsrp_callback_token_create (totemsrp_context, handle_out, type, delete, callback_fn, data); if (totempg_threaded_mode == 1) { pthread_mutex_unlock (&callback_token_mutex); } return (res); } void totempg_callback_token_destroy ( void *handle_out) { if 
(totempg_threaded_mode == 1) { pthread_mutex_lock (&callback_token_mutex); } totemsrp_callback_token_destroy (totemsrp_context, handle_out); if (totempg_threaded_mode == 1) { pthread_mutex_unlock (&callback_token_mutex); } } /* * vi: set autoindent tabstop=4 shiftwidth=4 : */ int totempg_groups_initialize ( void **totempg_groups_instance, void (*deliver_fn) ( unsigned int nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required), void (*confchg_fn) ( enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id)) { struct totempg_group_instance *instance; if (totempg_threaded_mode == 1) { pthread_mutex_lock (&totempg_mutex); } instance = malloc (sizeof (struct totempg_group_instance)); if (instance == NULL) { goto error_exit; } instance->deliver_fn = deliver_fn; instance->confchg_fn = confchg_fn; instance->groups = 0; instance->groups_cnt = 0; instance->q_level = QB_LOOP_MED; qb_list_init (&instance->list); qb_list_add (&instance->list, &totempg_groups_list); if (totempg_threaded_mode == 1) { pthread_mutex_unlock (&totempg_mutex); } *totempg_groups_instance = instance; return (0); error_exit: if (totempg_threaded_mode == 1) { pthread_mutex_unlock (&totempg_mutex); } return (-1); } int totempg_groups_join ( void *totempg_groups_instance, const struct totempg_group *groups, size_t group_cnt) { struct totempg_group_instance *instance = (struct totempg_group_instance *)totempg_groups_instance; struct totempg_group *new_groups; unsigned int res = 0; if (totempg_threaded_mode == 1) { pthread_mutex_lock (&totempg_mutex); } new_groups = realloc (instance->groups, sizeof (struct totempg_group) * (instance->groups_cnt + group_cnt)); if (new_groups == 0) { res = ENOMEM; goto error_exit; } memcpy (&new_groups[instance->groups_cnt], groups, group_cnt * sizeof 
(struct totempg_group)); instance->groups = new_groups; instance->groups_cnt += group_cnt; error_exit: if (totempg_threaded_mode == 1) { pthread_mutex_unlock (&totempg_mutex); } return (res); } int totempg_groups_leave ( void *totempg_groups_instance, const struct totempg_group *groups, size_t group_cnt) { if (totempg_threaded_mode == 1) { pthread_mutex_lock (&totempg_mutex); } if (totempg_threaded_mode == 1) { pthread_mutex_unlock (&totempg_mutex); } return (0); } #define MAX_IOVECS_FROM_APP 32 #define MAX_GROUPS_PER_MSG 32 int totempg_groups_mcast_joined ( void *totempg_groups_instance, const struct iovec *iovec, unsigned int iov_len, int guarantee) { struct totempg_group_instance *instance = (struct totempg_group_instance *)totempg_groups_instance; unsigned short group_len[MAX_GROUPS_PER_MSG + 1]; struct iovec iovec_mcast[MAX_GROUPS_PER_MSG + 1 + MAX_IOVECS_FROM_APP]; int i; unsigned int res; if (totempg_threaded_mode == 1) { pthread_mutex_lock (&totempg_mutex); } /* * Build group_len structure and the iovec_mcast structure */ group_len[0] = instance->groups_cnt; for (i = 0; i < instance->groups_cnt; i++) { group_len[i + 1] = instance->groups[i].group_len; iovec_mcast[i + 1].iov_len = instance->groups[i].group_len; iovec_mcast[i + 1].iov_base = (void *) instance->groups[i].group; } iovec_mcast[0].iov_len = (instance->groups_cnt + 1) * sizeof (unsigned short); iovec_mcast[0].iov_base = group_len; for (i = 0; i < iov_len; i++) { iovec_mcast[i + instance->groups_cnt + 1].iov_len = iovec[i].iov_len; iovec_mcast[i + instance->groups_cnt + 1].iov_base = iovec[i].iov_base; } res = mcast_msg (iovec_mcast, iov_len + instance->groups_cnt + 1, guarantee); if (totempg_threaded_mode == 1) { pthread_mutex_unlock (&totempg_mutex); } return (res); } static void check_q_level( void *totempg_groups_instance) { struct totempg_group_instance *instance = (struct totempg_group_instance *)totempg_groups_instance; int32_t old_level = instance->q_level; int32_t percent_used = 
q_level_precent_used(); if (percent_used >= 75 && instance->q_level != TOTEM_Q_LEVEL_CRITICAL) { instance->q_level = TOTEM_Q_LEVEL_CRITICAL; } else if (percent_used < 30 && instance->q_level != TOTEM_Q_LEVEL_LOW) { instance->q_level = TOTEM_Q_LEVEL_LOW; } else if (percent_used > 40 && percent_used < 50 && instance->q_level != TOTEM_Q_LEVEL_GOOD) { instance->q_level = TOTEM_Q_LEVEL_GOOD; } else if (percent_used > 60 && percent_used < 70 && instance->q_level != TOTEM_Q_LEVEL_HIGH) { instance->q_level = TOTEM_Q_LEVEL_HIGH; } if (totem_queue_level_changed && old_level != instance->q_level) { totem_queue_level_changed(instance->q_level); } } void totempg_check_q_level( void *totempg_groups_instance) { struct totempg_group_instance *instance = (struct totempg_group_instance *)totempg_groups_instance; check_q_level(instance); } int totempg_groups_joined_reserve ( void *totempg_groups_instance, const struct iovec *iovec, unsigned int iov_len) { struct totempg_group_instance *instance = (struct totempg_group_instance *)totempg_groups_instance; unsigned int size = 0; unsigned int i; unsigned int reserved = 0; if (totempg_threaded_mode == 1) { pthread_mutex_lock (&totempg_mutex); pthread_mutex_lock (&mcast_msg_mutex); } for (i = 0; i < instance->groups_cnt; i++) { size += instance->groups[i].group_len; } for (i = 0; i < iov_len; i++) { size += iovec[i].iov_len; } if (size >= totempg_size_limit) { reserved = -1; goto error_exit; } if (byte_count_send_ok (size)) { reserved = send_reserve (size); } else { reserved = 0; } error_exit: check_q_level(instance); if (totempg_threaded_mode == 1) { pthread_mutex_unlock (&mcast_msg_mutex); pthread_mutex_unlock (&totempg_mutex); } return (reserved); } int totempg_groups_joined_release (int msg_count) { if (totempg_threaded_mode == 1) { pthread_mutex_lock (&totempg_mutex); pthread_mutex_lock (&mcast_msg_mutex); } send_release (msg_count); if (totempg_threaded_mode == 1) { pthread_mutex_unlock (&mcast_msg_mutex); pthread_mutex_unlock 
(&totempg_mutex); } return 0; } int totempg_groups_mcast_groups ( void *totempg_groups_instance, int guarantee, const struct totempg_group *groups, size_t groups_cnt, const struct iovec *iovec, unsigned int iov_len) { unsigned short group_len[MAX_GROUPS_PER_MSG + 1]; struct iovec iovec_mcast[MAX_GROUPS_PER_MSG + 1 + MAX_IOVECS_FROM_APP]; int i; unsigned int res; if (totempg_threaded_mode == 1) { pthread_mutex_lock (&totempg_mutex); } /* * Build group_len structure and the iovec_mcast structure */ group_len[0] = groups_cnt; for (i = 0; i < groups_cnt; i++) { group_len[i + 1] = groups[i].group_len; iovec_mcast[i + 1].iov_len = groups[i].group_len; iovec_mcast[i + 1].iov_base = (void *) groups[i].group; } iovec_mcast[0].iov_len = (groups_cnt + 1) * sizeof (unsigned short); iovec_mcast[0].iov_base = group_len; for (i = 0; i < iov_len; i++) { iovec_mcast[i + groups_cnt + 1].iov_len = iovec[i].iov_len; iovec_mcast[i + groups_cnt + 1].iov_base = iovec[i].iov_base; } res = mcast_msg (iovec_mcast, iov_len + groups_cnt + 1, guarantee); if (totempg_threaded_mode == 1) { pthread_mutex_unlock (&totempg_mutex); } return (res); } /* * Returns -1 if error, 0 if can't send, 1 if can send the message */ int totempg_groups_send_ok_groups ( void *totempg_groups_instance, const struct totempg_group *groups, size_t groups_cnt, const struct iovec *iovec, unsigned int iov_len) { unsigned int size = 0; unsigned int i; unsigned int res; if (totempg_threaded_mode == 1) { pthread_mutex_lock (&totempg_mutex); } for (i = 0; i < groups_cnt; i++) { size += groups[i].group_len; } for (i = 0; i < iov_len; i++) { size += iovec[i].iov_len; } res = msg_count_send_ok (size); if (totempg_threaded_mode == 1) { pthread_mutex_unlock (&totempg_mutex); } return (res); } int totempg_ifaces_get ( unsigned int nodeid, struct totem_ip_address *interfaces, unsigned int interfaces_size, char ***status, unsigned int *iface_count) { int res; res = totemsrp_ifaces_get ( totemsrp_context, nodeid, interfaces, 
interfaces_size, status, iface_count); return (res); } void totempg_event_signal (enum totem_event_type type, int value) { totemsrp_event_signal (totemsrp_context, type, value); } void* totempg_get_stats (void) { return &totempg_stats; } int totempg_crypto_set ( const char *cipher_type, const char *hash_type) { int res; res = totemsrp_crypto_set (totemsrp_context, cipher_type, hash_type); return (res); } int totempg_ring_reenable (void) { int res; res = totemsrp_ring_reenable (totemsrp_context); return (res); } #define ONE_IFACE_LEN 63 const char *totempg_ifaces_print (unsigned int nodeid) { static char iface_string[256 * INTERFACE_MAX]; char one_iface[ONE_IFACE_LEN+1]; struct totem_ip_address interfaces[INTERFACE_MAX]; unsigned int iface_count; unsigned int i; int res; iface_string[0] = '\0'; res = totempg_ifaces_get (nodeid, interfaces, INTERFACE_MAX, NULL, &iface_count); if (res == -1) { return ("no interface found for nodeid"); } res = totempg_ifaces_get (nodeid, interfaces, INTERFACE_MAX, NULL, &iface_count); for (i = 0; i < iface_count; i++) { snprintf (one_iface, ONE_IFACE_LEN, "r(%d) ip(%s) ", i, totemip_print (&interfaces[i])); strcat (iface_string, one_iface); } return (iface_string); } unsigned int totempg_my_nodeid_get (void) { return (totemsrp_my_nodeid_get(totemsrp_context)); } int totempg_my_family_get (void) { return (totemsrp_my_family_get(totemsrp_context)); } extern void totempg_service_ready_register ( void (*totem_service_ready) (void)) { totemsrp_service_ready_register (totemsrp_context, totem_service_ready); } void totempg_queue_level_register_callback (totem_queue_level_changed_fn fn) { totem_queue_level_changed = fn; } extern int totempg_member_add ( const struct totem_ip_address *member, int ring_no) { return totemsrp_member_add (totemsrp_context, member, ring_no); } extern int totempg_member_remove ( const struct totem_ip_address *member, int ring_no) { return totemsrp_member_remove (totemsrp_context, member, ring_no); } void 
totempg_threaded_mode_enable (void) { totempg_threaded_mode = 1; totemsrp_threaded_mode_enable (totemsrp_context); } void totempg_trans_ack (void) { totemsrp_trans_ack (totemsrp_context); } diff --git a/exec/votequorum.c b/exec/votequorum.c index d05f7b5d..401308a4 100644 --- a/exec/votequorum.c +++ b/exec/votequorum.c @@ -1,3027 +1,3023 @@ /* * Copyright (c) 2009-2015 Red Hat, Inc. * * All rights reserved. * * Authors: Christine Caulfield (ccaulfie@redhat.com) * Fabio M. Di Nitto (fdinitto@redhat.com) * * This software licensed under BSD license, the text of which follows: * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * - Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * - Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * - Neither the name of the MontaVista Software, Inc. nor the names of its * contributors may be used to endorse or promote products derived from this * software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF * THE POSSIBILITY OF SUCH DAMAGE. */ #include #include #include #include #include #include #include #include #include "quorum.h" #include #include #include #include #include #include #include "service.h" #include "util.h" LOGSYS_DECLARE_SUBSYS ("VOTEQ"); /* * interface with corosync */ static struct corosync_api_v1 *corosync_api; /* * votequorum global config vars */ static char qdevice_name[VOTEQUORUM_QDEVICE_MAX_NAME_LEN]; static struct cluster_node *qdevice = NULL; static unsigned int qdevice_timeout = VOTEQUORUM_QDEVICE_DEFAULT_TIMEOUT; static unsigned int qdevice_sync_timeout = VOTEQUORUM_QDEVICE_DEFAULT_SYNC_TIMEOUT; static uint8_t qdevice_can_operate = 1; static void *qdevice_reg_conn = NULL; static uint8_t qdevice_master_wins = 0; static uint8_t two_node = 0; static uint8_t wait_for_all = 0; static uint8_t wait_for_all_status = 0; static enum {ATB_NONE, ATB_LOWEST, ATB_HIGHEST, ATB_LIST} auto_tie_breaker = ATB_NONE; static int lowest_node_id = -1; static int highest_node_id = -1; #define DEFAULT_LMS_WIN 10000 static uint8_t last_man_standing = 0; static uint32_t last_man_standing_window = DEFAULT_LMS_WIN; static uint8_t allow_downscale = 0; static uint32_t ev_barrier = 0; static uint8_t ev_tracking = 0; static uint32_t ev_tracking_barrier = 0; static int ev_tracking_fd = -1; /* * votequorum_exec defines/structs/forward definitions */ struct req_exec_quorum_nodeinfo { struct qb_ipc_request_header header __attribute__((aligned(8))); uint32_t nodeid; uint32_t votes; uint32_t 
expected_votes; uint32_t flags; } __attribute__((packed)); struct req_exec_quorum_reconfigure { struct qb_ipc_request_header header __attribute__((aligned(8))); uint32_t nodeid; uint32_t value; uint8_t param; uint8_t _pad0; uint8_t _pad1; uint8_t _pad2; } __attribute__((packed)); struct req_exec_quorum_qdevice_reg { struct qb_ipc_request_header header __attribute__((aligned(8))); uint32_t operation; char qdevice_name[VOTEQUORUM_QDEVICE_MAX_NAME_LEN]; } __attribute__((packed)); struct req_exec_quorum_qdevice_reconfigure { struct qb_ipc_request_header header __attribute__((aligned(8))); char oldname[VOTEQUORUM_QDEVICE_MAX_NAME_LEN]; char newname[VOTEQUORUM_QDEVICE_MAX_NAME_LEN]; } __attribute__((packed)); /* * votequorum_exec onwire version (via totem) */ #include "votequorum.h" /* * votequorum_exec onwire messages (via totem) */ #define MESSAGE_REQ_EXEC_VOTEQUORUM_NODEINFO 0 #define MESSAGE_REQ_EXEC_VOTEQUORUM_RECONFIGURE 1 #define MESSAGE_REQ_EXEC_VOTEQUORUM_QDEVICE_REG 2 #define MESSAGE_REQ_EXEC_VOTEQUORUM_QDEVICE_RECONFIGURE 3 static void votequorum_exec_send_expectedvotes_notification(void); static int votequorum_exec_send_quorum_notification(void *conn, uint64_t context); static int votequorum_exec_send_nodelist_notification(void *conn, uint64_t context); #define VOTEQUORUM_RECONFIG_PARAM_EXPECTED_VOTES 1 #define VOTEQUORUM_RECONFIG_PARAM_NODE_VOTES 2 #define VOTEQUORUM_RECONFIG_PARAM_CANCEL_WFA 3 static int votequorum_exec_send_reconfigure(uint8_t param, unsigned int nodeid, uint32_t value); /* * used by req_exec_quorum_qdevice_reg */ #define VOTEQUORUM_QDEVICE_OPERATION_UNREGISTER 0 #define VOTEQUORUM_QDEVICE_OPERATION_REGISTER 1 /* * votequorum internal node status/view */ #define NODE_FLAGS_QUORATE 1 #define NODE_FLAGS_LEAVING 2 #define NODE_FLAGS_WFASTATUS 4 #define NODE_FLAGS_FIRST 8 #define NODE_FLAGS_QDEVICE_REGISTERED 16 #define NODE_FLAGS_QDEVICE_ALIVE 32 #define NODE_FLAGS_QDEVICE_CAST_VOTE 64 #define NODE_FLAGS_QDEVICE_MASTER_WINS 128 typedef enum { 
NODESTATE_MEMBER=1, NODESTATE_DEAD, NODESTATE_LEAVING } nodestate_t; struct cluster_node { int node_id; nodestate_t state; uint32_t votes; uint32_t expected_votes; uint32_t flags; struct qb_list_head list; }; /* * votequorum internal quorum status */ static uint8_t quorum; static uint8_t cluster_is_quorate; /* * votequorum membership data */ static struct cluster_node *us; static struct qb_list_head cluster_members_list; static unsigned int quorum_members[PROCESSOR_COUNT_MAX]; static unsigned int previous_quorum_members[PROCESSOR_COUNT_MAX]; static unsigned int atb_nodelist[PROCESSOR_COUNT_MAX]; static int quorum_members_entries = 0; static int previous_quorum_members_entries = 0; static int atb_nodelist_entries = 0; static struct memb_ring_id quorum_ringid; /* * pre allocate all cluster_nodes + one for qdevice */ static struct cluster_node cluster_nodes[PROCESSOR_COUNT_MAX+2]; static int cluster_nodes_entries = 0; /* * votequorum tracking */ struct quorum_pd { unsigned char track_flags; int tracking_enabled; uint64_t tracking_context; struct qb_list_head list; void *conn; }; static struct qb_list_head trackers_list; /* * votequorum timers */ static corosync_timer_handle_t qdevice_timer; static int qdevice_timer_set = 0; static corosync_timer_handle_t last_man_standing_timer; static int last_man_standing_timer_set = 0; static int sync_nodeinfo_sent = 0; static int sync_wait_for_poll_or_timeout = 0; /* * Service Interfaces required by service_message_handler struct */ static int sync_in_progress = 0; static void votequorum_sync_init ( const unsigned int *trans_list, size_t trans_list_entries, const unsigned int *member_list, size_t member_list_entries, const struct memb_ring_id *ring_id); static int votequorum_sync_process (void); static void votequorum_sync_activate (void); static void votequorum_sync_abort (void); static quorum_set_quorate_fn_t quorum_callback; /* * votequorum_exec handler and definitions */ static char *votequorum_exec_init_fn (struct 
corosync_api_v1 *api); static int votequorum_exec_exit_fn (void); static int votequorum_exec_send_nodeinfo(uint32_t nodeid); static void message_handler_req_exec_votequorum_nodeinfo ( const void *message, unsigned int nodeid); static void exec_votequorum_nodeinfo_endian_convert (void *message); static void message_handler_req_exec_votequorum_reconfigure ( const void *message, unsigned int nodeid); static void exec_votequorum_reconfigure_endian_convert (void *message); static void message_handler_req_exec_votequorum_qdevice_reg ( const void *message, unsigned int nodeid); static void exec_votequorum_qdevice_reg_endian_convert (void *message); static void message_handler_req_exec_votequorum_qdevice_reconfigure ( const void *message, unsigned int nodeid); static void exec_votequorum_qdevice_reconfigure_endian_convert (void *message); static struct corosync_exec_handler votequorum_exec_engine[] = { { /* 0 */ .exec_handler_fn = message_handler_req_exec_votequorum_nodeinfo, .exec_endian_convert_fn = exec_votequorum_nodeinfo_endian_convert }, { /* 1 */ .exec_handler_fn = message_handler_req_exec_votequorum_reconfigure, .exec_endian_convert_fn = exec_votequorum_reconfigure_endian_convert }, { /* 2 */ .exec_handler_fn = message_handler_req_exec_votequorum_qdevice_reg, .exec_endian_convert_fn = exec_votequorum_qdevice_reg_endian_convert }, { /* 3 */ .exec_handler_fn = message_handler_req_exec_votequorum_qdevice_reconfigure, .exec_endian_convert_fn = exec_votequorum_qdevice_reconfigure_endian_convert }, }; /* * Library Handler and Functions Definitions */ static int quorum_lib_init_fn (void *conn); static int quorum_lib_exit_fn (void *conn); static void qdevice_timer_fn(void *arg); static void message_handler_req_lib_votequorum_getinfo (void *conn, const void *message); static void message_handler_req_lib_votequorum_setexpected (void *conn, const void *message); static void message_handler_req_lib_votequorum_setvotes (void *conn, const void *message); static void 
message_handler_req_lib_votequorum_trackstart (void *conn, const void *message); static void message_handler_req_lib_votequorum_trackstop (void *conn, const void *message); static void message_handler_req_lib_votequorum_qdevice_register (void *conn, const void *message); static void message_handler_req_lib_votequorum_qdevice_unregister (void *conn, const void *message); static void message_handler_req_lib_votequorum_qdevice_update (void *conn, const void *message); static void message_handler_req_lib_votequorum_qdevice_poll (void *conn, const void *message); static void message_handler_req_lib_votequorum_qdevice_master_wins (void *conn, const void *message); static struct corosync_lib_handler quorum_lib_service[] = { { /* 0 */ .lib_handler_fn = message_handler_req_lib_votequorum_getinfo, .flow_control = COROSYNC_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 1 */ .lib_handler_fn = message_handler_req_lib_votequorum_setexpected, .flow_control = COROSYNC_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 2 */ .lib_handler_fn = message_handler_req_lib_votequorum_setvotes, .flow_control = COROSYNC_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 3 */ .lib_handler_fn = message_handler_req_lib_votequorum_trackstart, .flow_control = COROSYNC_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 4 */ .lib_handler_fn = message_handler_req_lib_votequorum_trackstop, .flow_control = COROSYNC_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 5 */ .lib_handler_fn = message_handler_req_lib_votequorum_qdevice_register, .flow_control = COROSYNC_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 6 */ .lib_handler_fn = message_handler_req_lib_votequorum_qdevice_unregister, .flow_control = COROSYNC_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 7 */ .lib_handler_fn = message_handler_req_lib_votequorum_qdevice_update, .flow_control = COROSYNC_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 8 */ .lib_handler_fn = message_handler_req_lib_votequorum_qdevice_poll, .flow_control = COROSYNC_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 9 */ .lib_handler_fn = 
message_handler_req_lib_votequorum_qdevice_master_wins, .flow_control = COROSYNC_LIB_FLOW_CONTROL_NOT_REQUIRED } }; static struct corosync_service_engine votequorum_service_engine = { .name = "corosync vote quorum service v1.0", .id = VOTEQUORUM_SERVICE, .priority = 2, .private_data_size = sizeof (struct quorum_pd), .allow_inquorate = CS_LIB_ALLOW_INQUORATE, .flow_control = COROSYNC_LIB_FLOW_CONTROL_REQUIRED, .lib_init_fn = quorum_lib_init_fn, .lib_exit_fn = quorum_lib_exit_fn, .lib_engine = quorum_lib_service, .lib_engine_count = sizeof (quorum_lib_service) / sizeof (struct corosync_lib_handler), .exec_init_fn = votequorum_exec_init_fn, .exec_exit_fn = votequorum_exec_exit_fn, .exec_engine = votequorum_exec_engine, .exec_engine_count = sizeof (votequorum_exec_engine) / sizeof (struct corosync_exec_handler), .sync_init = votequorum_sync_init, .sync_process = votequorum_sync_process, .sync_activate = votequorum_sync_activate, .sync_abort = votequorum_sync_abort }; struct corosync_service_engine *votequorum_get_service_engine_ver0 (void) { return (&votequorum_service_engine); } static struct default_service votequorum_service[] = { { .name = "corosync_votequorum", .ver = 0, .loader = votequorum_get_service_engine_ver0 }, }; /* * common/utility macros/functions */ #define max(a,b) (((a) > (b)) ? 
(a) : (b)) static void node_add_ordered(struct cluster_node *newnode) { struct cluster_node *node = NULL; struct qb_list_head *tmp; - struct qb_list_head *newlist = &newnode->list; ENTER(); qb_list_for_each(tmp, &cluster_members_list) { node = qb_list_entry(tmp, struct cluster_node, list); if (newnode->node_id < node->node_id) { break; } } if (!node) { qb_list_add(&newnode->list, &cluster_members_list); } else { - newlist->prev = tmp->prev; - newlist->next = tmp; - tmp->prev->next = newlist; - tmp->prev = newlist; + qb_list_add_tail(&newnode->list, &node->list); } LEAVE(); } static struct cluster_node *allocate_node(unsigned int nodeid) { struct cluster_node *cl = NULL; struct qb_list_head *tmp; ENTER(); if (cluster_nodes_entries <= PROCESSOR_COUNT_MAX + 1) { cl = (struct cluster_node *)&cluster_nodes[cluster_nodes_entries]; cluster_nodes_entries++; } else { qb_list_for_each(tmp, &cluster_members_list) { cl = qb_list_entry(tmp, struct cluster_node, list); if (cl->state == NODESTATE_DEAD) { break; } } /* * this should never happen */ if (!cl) { log_printf(LOGSYS_LEVEL_CRIT, "Unable to find memory for node %u data!!", nodeid); goto out; } qb_list_del(tmp); } memset(cl, 0, sizeof(struct cluster_node)); cl->node_id = nodeid; if (nodeid != VOTEQUORUM_QDEVICE_NODEID) { node_add_ordered(cl); } out: LEAVE(); return cl; } static struct cluster_node *find_node_by_nodeid(unsigned int nodeid) { struct cluster_node *node; struct qb_list_head *tmp; ENTER(); if (nodeid == us->node_id) { LEAVE(); return us; } if (nodeid == VOTEQUORUM_QDEVICE_NODEID) { LEAVE(); return qdevice; } qb_list_for_each(tmp, &cluster_members_list) { node = qb_list_entry(tmp, struct cluster_node, list); if (node->node_id == nodeid) { LEAVE(); return node; } } LEAVE(); return NULL; } static void get_lowest_node_id(void) { struct cluster_node *node = NULL; struct qb_list_head *tmp; ENTER(); lowest_node_id = us->node_id; qb_list_for_each(tmp, &cluster_members_list) { node = qb_list_entry(tmp, struct 
cluster_node, list); if ((node->state == NODESTATE_MEMBER) && (node->node_id < lowest_node_id)) { lowest_node_id = node->node_id; } } log_printf(LOGSYS_LEVEL_DEBUG, "lowest node id: %d us: %d", lowest_node_id, us->node_id); icmap_set_uint32("runtime.votequorum.lowest_node_id", lowest_node_id); LEAVE(); } static void get_highest_node_id(void) { struct cluster_node *node = NULL; struct qb_list_head *tmp; ENTER(); highest_node_id = us->node_id; qb_list_for_each(tmp, &cluster_members_list) { node = qb_list_entry(tmp, struct cluster_node, list); if ((node->state == NODESTATE_MEMBER) && (node->node_id > highest_node_id)) { highest_node_id = node->node_id; } } log_printf(LOGSYS_LEVEL_DEBUG, "highest node id: %d us: %d", highest_node_id, us->node_id); icmap_set_uint32("runtime.votequorum.highest_node_id", highest_node_id); LEAVE(); } static int check_low_node_id_partition(void) { struct cluster_node *node = NULL; struct qb_list_head *tmp; int found = 0; ENTER(); qb_list_for_each(tmp, &cluster_members_list) { node = qb_list_entry(tmp, struct cluster_node, list); if ((node->state == NODESTATE_MEMBER) && (node->node_id == lowest_node_id)) { found = 1; } } LEAVE(); return found; } static int check_high_node_id_partition(void) { struct cluster_node *node = NULL; struct qb_list_head *tmp; int found = 0; ENTER(); qb_list_for_each(tmp, &cluster_members_list) { node = qb_list_entry(tmp, struct cluster_node, list); if ((node->state == NODESTATE_MEMBER) && (node->node_id == highest_node_id)) { found = 1; } } LEAVE(); return found; } static int is_in_nodelist(int nodeid, unsigned int *members, int entries) { int i; ENTER(); for (i=0; istate == NODESTATE_MEMBER) && (node->flags & NODE_FLAGS_QDEVICE_MASTER_WINS) && (node->flags & NODE_FLAGS_QDEVICE_CAST_VOTE)) { found = 1; } } LEAVE(); return found; } static void decode_flags(uint32_t flags) { ENTER(); log_printf(LOGSYS_LEVEL_DEBUG, "flags: quorate: %s Leaving: %s WFA Status: %s First: %s Qdevice: %s QdeviceAlive: %s QdeviceCastVote: %s 
QdeviceMasterWins: %s", (flags & NODE_FLAGS_QUORATE)?"Yes":"No", (flags & NODE_FLAGS_LEAVING)?"Yes":"No", (flags & NODE_FLAGS_WFASTATUS)?"Yes":"No", (flags & NODE_FLAGS_FIRST)?"Yes":"No", (flags & NODE_FLAGS_QDEVICE_REGISTERED)?"Yes":"No", (flags & NODE_FLAGS_QDEVICE_ALIVE)?"Yes":"No", (flags & NODE_FLAGS_QDEVICE_CAST_VOTE)?"Yes":"No", (flags & NODE_FLAGS_QDEVICE_MASTER_WINS)?"Yes":"No"); LEAVE(); } /* * load/save are copied almost pristine from totemsrp,c */ static int load_ev_tracking_barrier(void) { int res = 0; char filename[PATH_MAX]; ENTER(); snprintf(filename, sizeof(filename) - 1, "%s/ev_tracking", get_run_dir()); ev_tracking_fd = open(filename, O_RDWR, 0700); if (ev_tracking_fd != -1) { res = read (ev_tracking_fd, &ev_tracking_barrier, sizeof(uint32_t)); close(ev_tracking_fd); if (res == sizeof (uint32_t)) { LEAVE(); return 0; } } ev_tracking_barrier = 0; umask(0); ev_tracking_fd = open (filename, O_CREAT|O_RDWR, 0700); if (ev_tracking_fd != -1) { res = write (ev_tracking_fd, &ev_tracking_barrier, sizeof (uint32_t)); if ((res == -1) || (res != sizeof (uint32_t))) { log_printf(LOGSYS_LEVEL_WARNING, "Unable to write to %s", filename); } close(ev_tracking_fd); LEAVE(); return 0; } log_printf(LOGSYS_LEVEL_WARNING, "Unable to create %s file", filename); LEAVE(); return -1; } static void update_wait_for_all_status(uint8_t wfa_status) { ENTER(); wait_for_all_status = wfa_status; if (wait_for_all_status) { us->flags |= NODE_FLAGS_WFASTATUS; } else { us->flags &= ~NODE_FLAGS_WFASTATUS; } icmap_set_uint8("runtime.votequorum.wait_for_all_status", wait_for_all_status); LEAVE(); } static void update_two_node(void) { ENTER(); icmap_set_uint8("runtime.votequorum.two_node", two_node); LEAVE(); } static void update_ev_barrier(uint32_t expected_votes) { ENTER(); ev_barrier = expected_votes; icmap_set_uint32("runtime.votequorum.ev_barrier", ev_barrier); LEAVE(); } static void update_qdevice_can_operate(uint8_t status) { ENTER(); qdevice_can_operate = status; 
icmap_set_uint8("runtime.votequorum.qdevice_can_operate", qdevice_can_operate); LEAVE(); } static void update_qdevice_master_wins(uint8_t allow) { ENTER(); qdevice_master_wins = allow; icmap_set_uint8("runtime.votequorum.qdevice_master_wins", qdevice_master_wins); LEAVE(); } static void update_ev_tracking_barrier(uint32_t ev_t_barrier) { int res; ENTER(); ev_tracking_barrier = ev_t_barrier; icmap_set_uint32("runtime.votequorum.ev_tracking_barrier", ev_tracking_barrier); if (lseek (ev_tracking_fd, 0, SEEK_SET) != 0) { log_printf(LOGSYS_LEVEL_WARNING, "Unable to update ev_tracking_barrier on disk data!!!"); LEAVE(); return; } res = write (ev_tracking_fd, &ev_tracking_barrier, sizeof (uint32_t)); if (res != sizeof (uint32_t)) { log_printf(LOGSYS_LEVEL_WARNING, "Unable to update ev_tracking_barrier on disk data!!!"); } #ifdef HAVE_FDATASYNC fdatasync(ev_tracking_fd); #else fsync(ev_tracking_fd); #endif LEAVE(); } /* * quorum calculation core bits */ static int calculate_quorum(int allow_decrease, unsigned int max_expected, unsigned int *ret_total_votes) { struct qb_list_head *nodelist; struct cluster_node *node; unsigned int total_votes = 0; unsigned int highest_expected = 0; unsigned int newquorum, q1, q2; unsigned int total_nodes = 0; ENTER(); if ((allow_downscale) && (allow_decrease) && (max_expected)) { max_expected = max(ev_barrier, max_expected); } qb_list_for_each(nodelist, &cluster_members_list) { node = qb_list_entry(nodelist, struct cluster_node, list); log_printf(LOGSYS_LEVEL_DEBUG, "node %u state=%d, votes=%u, expected=%u", node->node_id, node->state, node->votes, node->expected_votes); if (node->state == NODESTATE_MEMBER) { highest_expected = max(highest_expected, node->expected_votes); total_votes += node->votes; total_nodes++; } } if (us->flags & NODE_FLAGS_QDEVICE_CAST_VOTE) { log_printf(LOGSYS_LEVEL_DEBUG, "node 0 state=1, votes=%u", qdevice->votes); total_votes += qdevice->votes; total_nodes++; } if (max_expected > 0) { highest_expected = 
max_expected; } /* * This quorum calculation is taken from the OpenVMS Cluster Systems * manual, but, then, you guessed that didn't you */ q1 = (highest_expected + 2) / 2; q2 = (total_votes + 2) / 2; newquorum = max(q1, q2); /* * Normally quorum never decreases but the system administrator can * force it down by setting expected votes to a maximum value */ if (!allow_decrease) { newquorum = max(quorum, newquorum); } /* * The special two_node mode allows each of the two nodes to retain * quorum if the other fails. Only one of the two should live past * fencing (as both nodes try to fence each other in split-brain.) * Also: if there are more than two nodes, force us inquorate to avoid * any damage or confusion. */ if (two_node && total_nodes <= 2) { newquorum = 1; } if (ret_total_votes) { *ret_total_votes = total_votes; } LEAVE(); return newquorum; } static void update_node_expected_votes(int new_expected_votes) { struct qb_list_head *nodelist; struct cluster_node *node; if (new_expected_votes) { qb_list_for_each(nodelist, &cluster_members_list) { node = qb_list_entry(nodelist, struct cluster_node, list); if (node->state == NODESTATE_MEMBER) { node->expected_votes = new_expected_votes; } } } } static void are_we_quorate(unsigned int total_votes) { int quorate; int quorum_change = 0; ENTER(); /* * wait for all nodes to show up before granting quorum */ if ((wait_for_all) && (wait_for_all_status)) { if (total_votes != us->expected_votes) { log_printf(LOGSYS_LEVEL_NOTICE, "Waiting for all cluster members. 
" "Current votes: %d expected_votes: %d", total_votes, us->expected_votes); cluster_is_quorate = 0; return; } update_wait_for_all_status(0); } if (quorum > total_votes) { quorate = 0; } else { quorate = 1; get_lowest_node_id(); get_highest_node_id(); } if ((auto_tie_breaker != ATB_NONE) && /* Must be a half (or half-1) split */ (total_votes == (us->expected_votes / 2)) && /* If the 'other' partition in a split might have quorum then we can't run ATB */ (previous_quorum_members_entries - quorum_members_entries < quorum) && (check_auto_tie_breaker() == 1)) { quorate = 1; } if ((qdevice_master_wins) && (!quorate) && (check_qdevice_master() == 1)) { log_printf(LOGSYS_LEVEL_DEBUG, "node is quorate as part of master_wins partition"); quorate = 1; } if (cluster_is_quorate && !quorate) { quorum_change = 1; log_printf(LOGSYS_LEVEL_DEBUG, "quorum lost, blocking activity"); } if (!cluster_is_quorate && quorate) { quorum_change = 1; log_printf(LOGSYS_LEVEL_DEBUG, "quorum regained, resuming activity"); } cluster_is_quorate = quorate; if (cluster_is_quorate) { us->flags |= NODE_FLAGS_QUORATE; } else { us->flags &= ~NODE_FLAGS_QUORATE; } if (wait_for_all) { if (quorate) { update_wait_for_all_status(0); } else { update_wait_for_all_status(1); } } if ((quorum_change) && (sync_in_progress == 0)) { quorum_callback(quorum_members, quorum_members_entries, cluster_is_quorate, &quorum_ringid); votequorum_exec_send_quorum_notification(NULL, 0L); } LEAVE(); } static void get_total_votes(unsigned int *totalvotes, unsigned int *current_members) { unsigned int total_votes = 0; unsigned int cluster_members = 0; struct qb_list_head *nodelist; struct cluster_node *node; ENTER(); qb_list_for_each(nodelist, &cluster_members_list) { node = qb_list_entry(nodelist, struct cluster_node, list); if (node->state == NODESTATE_MEMBER) { cluster_members++; total_votes += node->votes; } } if (qdevice->votes) { total_votes += qdevice->votes; cluster_members++; } *totalvotes = total_votes; *current_members = 
cluster_members; LEAVE(); } /* * Recalculate cluster quorum, set quorate and notify changes */ static void recalculate_quorum(int allow_decrease, int by_current_nodes) { unsigned int total_votes = 0; unsigned int cluster_members = 0; ENTER(); get_total_votes(&total_votes, &cluster_members); if (!by_current_nodes) { cluster_members = 0; } /* * Keep expected_votes at the highest number of votes in the cluster */ log_printf(LOGSYS_LEVEL_DEBUG, "total_votes=%d, expected_votes=%d", total_votes, us->expected_votes); if (total_votes > us->expected_votes) { us->expected_votes = total_votes; votequorum_exec_send_expectedvotes_notification(); } if ((ev_tracking) && (us->expected_votes > ev_tracking_barrier)) { update_ev_tracking_barrier(us->expected_votes); } quorum = calculate_quorum(allow_decrease, cluster_members, &total_votes); update_node_expected_votes(cluster_members); are_we_quorate(total_votes); LEAVE(); } /* * configuration bits and pieces */ static int votequorum_read_nodelist_configuration(uint32_t *votes, uint32_t *nodes, uint32_t *expected_votes) { icmap_iter_t iter; const char *iter_key; char tmp_key[ICMAP_KEYNAME_MAXLEN]; uint32_t our_pos, node_pos; uint32_t nodecount = 0; uint32_t nodelist_expected_votes = 0; uint32_t node_votes = 0; int res = 0; ENTER(); if (icmap_get_uint32("nodelist.local_node_pos", &our_pos) != CS_OK) { log_printf(LOGSYS_LEVEL_DEBUG, "No nodelist defined or our node is not in the nodelist"); return 0; } iter = icmap_iter_init("nodelist.node."); while ((iter_key = icmap_iter_next(iter, NULL, NULL)) != NULL) { res = sscanf(iter_key, "nodelist.node.%u.%s", &node_pos, tmp_key); if (res != 2) { continue; } if (strcmp(tmp_key, "ring0_addr") != 0) { continue; } nodecount++; snprintf(tmp_key, ICMAP_KEYNAME_MAXLEN, "nodelist.node.%u.quorum_votes", node_pos); if (icmap_get_uint32(tmp_key, &node_votes) != CS_OK) { node_votes = 1; } nodelist_expected_votes = nodelist_expected_votes + node_votes; if (node_pos == our_pos) { *votes = node_votes; } } 
*expected_votes = nodelist_expected_votes; *nodes = nodecount; icmap_iter_finalize(iter); LEAVE(); return 1; } static int votequorum_qdevice_is_configured(uint32_t *qdevice_votes) { char *qdevice_model = NULL; int ret = 0; ENTER(); if (icmap_get_string("quorum.device.model", &qdevice_model) == CS_OK) { if (strlen(qdevice_model)) { if (icmap_get_uint32("quorum.device.votes", qdevice_votes) != CS_OK) { *qdevice_votes = -1; } if (icmap_get_uint32("quorum.device.timeout", &qdevice_timeout) != CS_OK) { qdevice_timeout = VOTEQUORUM_QDEVICE_DEFAULT_TIMEOUT; } if (icmap_get_uint32("quorum.device.sync_timeout", &qdevice_sync_timeout) != CS_OK) { qdevice_sync_timeout = VOTEQUORUM_QDEVICE_DEFAULT_SYNC_TIMEOUT; } update_qdevice_can_operate(1); ret = 1; } free(qdevice_model); } LEAVE(); return ret; } #define VOTEQUORUM_READCONFIG_STARTUP 0 #define VOTEQUORUM_READCONFIG_RUNTIME 1 static char *votequorum_readconfig(int runtime) { uint32_t node_votes = 0, qdevice_votes = 0; uint32_t node_expected_votes = 0, expected_votes = 0; uint32_t node_count = 0; uint8_t atb = 0; int have_nodelist, have_qdevice; char *atb_string = NULL; char *error = NULL; ENTER(); log_printf(LOGSYS_LEVEL_DEBUG, "Reading configuration (runtime: %d)", runtime); /* * Set the few things we re-read at runtime back to their defaults */ if (runtime) { two_node = 0; expected_votes = 0; } /* * gather basic data here */ icmap_get_uint32("quorum.expected_votes", &expected_votes); have_nodelist = votequorum_read_nodelist_configuration(&node_votes, &node_count, &node_expected_votes); have_qdevice = votequorum_qdevice_is_configured(&qdevice_votes); icmap_get_uint8("quorum.two_node", &two_node); /* * do config verification and enablement */ if ((!have_nodelist) && (!expected_votes)) { if (!runtime) { error = (char *)"configuration error: nodelist or quorum.expected_votes must be configured!"; } else { log_printf(LOGSYS_LEVEL_CRIT, "configuration error: nodelist or quorum.expected_votes must be configured!"); 
log_printf(LOGSYS_LEVEL_CRIT, "will continue with current runtime data"); } goto out; } /* * two_node and qdevice are not compatible in the same config. * try to make an educated guess of what to do */ if ((two_node) && (have_qdevice)) { if (!runtime) { error = (char *)"configuration error: two_node and quorum device cannot be configured at the same time!"; goto out; } else { log_printf(LOGSYS_LEVEL_CRIT, "configuration error: two_node and quorum device cannot be configured at the same time!"); if (us->flags & NODE_FLAGS_QDEVICE_REGISTERED) { log_printf(LOGSYS_LEVEL_CRIT, "quorum device is registered, disabling two_node"); two_node = 0; } else { log_printf(LOGSYS_LEVEL_CRIT, "quorum device is not registered, allowing two_node"); update_qdevice_can_operate(0); } } } /* * Enable special features */ if (!runtime) { if (two_node) { wait_for_all = 1; } icmap_get_uint8("quorum.allow_downscale", &allow_downscale); icmap_get_uint8("quorum.wait_for_all", &wait_for_all); icmap_get_uint8("quorum.last_man_standing", &last_man_standing); icmap_get_uint32("quorum.last_man_standing_window", &last_man_standing_window); icmap_get_uint8("quorum.expected_votes_tracking", &ev_tracking); icmap_get_uint8("quorum.auto_tie_breaker", &atb); icmap_get_string("quorum.auto_tie_breaker_node", &atb_string); /* auto_tie_breaker defaults to LOWEST */ if (atb) { auto_tie_breaker = ATB_LOWEST; icmap_set_uint32("runtime.votequorum.atb_type", auto_tie_breaker); } else { auto_tie_breaker = ATB_NONE; if (atb_string) { log_printf(LOGSYS_LEVEL_WARNING, "auto_tie_breaker_node: is meaningless if auto_tie_breaker is set to 0"); } } if (atb && atb_string) { parse_atb_string(atb_string); } free(atb_string); /* allow_downscale requires ev_tracking */ if (allow_downscale) { ev_tracking = 1; } if (ev_tracking) { if (load_ev_tracking_barrier() < 0) { LEAVE(); return ((char *)"Unable to load ev_tracking file!"); } update_ev_tracking_barrier(ev_tracking_barrier); } } /* two_node and auto_tie_breaker are not 
compatible as two_node uses * a fence race to decide quorum whereas ATB decides based on node id */ if (two_node && auto_tie_breaker != ATB_NONE) { log_printf(LOGSYS_LEVEL_CRIT, "two_node and auto_tie_breaker are both specified but are not compatible."); log_printf(LOGSYS_LEVEL_CRIT, "two_node has been disabled, please fix your corosync.conf"); two_node = 0; } /* If ATB is set and the cluster has an odd number of nodes then wait_for_all needs * to be set so that an isolated half+1 without the tie breaker node * does not have quorum on reboot. */ if ((auto_tie_breaker != ATB_NONE) && (node_expected_votes % 2) && (!wait_for_all)) { if (last_man_standing) { /* if LMS is set too, it's a fatal configuration error. We can't dictate to the user what * they might want so we'll just quit. */ log_printf(LOGSYS_LEVEL_CRIT, "auto_tie_breaker is set, the cluster has an odd number of nodes\n"); log_printf(LOGSYS_LEVEL_CRIT, "and last_man_standing is also set. With this situation a better\n"); log_printf(LOGSYS_LEVEL_CRIT, "solution would be to disable LMS, leave ATB enabled, and also\n"); log_printf(LOGSYS_LEVEL_CRIT, "enable wait_for_all (mandatory for ATB in odd-numbered clusters).\n"); log_printf(LOGSYS_LEVEL_CRIT, "Due to this ambiguity, corosync will fail to start. Please fix your corosync.conf\n"); error = (char *)"configuration error: auto_tie_breaker & last_man_standing not available in odd sized cluster"; goto out; } else { log_printf(LOGSYS_LEVEL_CRIT, "auto_tie_breaker is set and the cluster has an odd number of nodes.\n"); log_printf(LOGSYS_LEVEL_CRIT, "wait_for_all needs to be set for this configuration but it is missing\n"); log_printf(LOGSYS_LEVEL_CRIT, "Therefore auto_tie_breaker has been disabled. 
Please fix your corosync.conf\n"); auto_tie_breaker = ATB_NONE; icmap_set_uint32("runtime.votequorum.atb_type", auto_tie_breaker); } } /* * quorum device is not compatible with last_man_standing and auto_tie_breaker * neither lms or atb can be set at runtime, so there is no need to check for * runtime incompatibilities, but qdevice can be configured _after_ LMS and ATB have * been enabled at startup. */ if ((have_qdevice) && (last_man_standing)) { if (!runtime) { error = (char *)"configuration error: quorum.device is not compatible with last_man_standing"; goto out; } else { log_printf(LOGSYS_LEVEL_CRIT, "configuration error: quorum.device is not compatible with last_man_standing"); log_printf(LOGSYS_LEVEL_CRIT, "disabling quorum device operations"); update_qdevice_can_operate(0); } } if ((have_qdevice) && (auto_tie_breaker != ATB_NONE)) { if (!runtime) { error = (char *)"configuration error: quorum.device is not compatible with auto_tie_breaker"; goto out; } else { log_printf(LOGSYS_LEVEL_CRIT, "configuration error: quorum.device is not compatible with auto_tie_breaker"); log_printf(LOGSYS_LEVEL_CRIT, "disabling quorum device operations"); update_qdevice_can_operate(0); } } if ((have_qdevice) && (allow_downscale)) { if (!runtime) { error = (char *)"configuration error: quorum.device is not compatible with allow_downscale"; goto out; } else { log_printf(LOGSYS_LEVEL_CRIT, "configuration error: quorum.device is not compatible with allow_downscale"); log_printf(LOGSYS_LEVEL_CRIT, "disabling quorum device operations"); update_qdevice_can_operate(0); } } /* * if user specifies quorum.expected_votes + quorum.device but NOT the device.votes * we don't know what the quorum device should vote. 
*/ if ((expected_votes) && (have_qdevice) && (qdevice_votes == -1)) { if (!runtime) { error = (char *)"configuration error: quorum.device.votes must be specified when quorum.expected_votes is set"; goto out; } else { log_printf(LOGSYS_LEVEL_CRIT, "configuration error: quorum.device.votes must be specified when quorum.expected_votes is set"); log_printf(LOGSYS_LEVEL_CRIT, "disabling quorum device operations"); update_qdevice_can_operate(0); } } /* * if user specifies a node list with uneven votes and no device.votes * we cannot autocalculate the votes */ if ((have_qdevice) && (qdevice_votes == -1) && (have_nodelist) && (node_count != node_expected_votes)) { if (!runtime) { error = (char *)"configuration error: quorum.device.votes must be specified when not all nodes votes 1"; goto out; } else { log_printf(LOGSYS_LEVEL_CRIT, "configuration error: quorum.device.votes must be specified when not all nodes votes 1"); log_printf(LOGSYS_LEVEL_CRIT, "disabling quorum device operations"); update_qdevice_can_operate(0); } } /* * validate quorum device votes vs expected_votes */ if ((qdevice_votes > 0) && (expected_votes)) { int delta = expected_votes - qdevice_votes; if (delta < 2) { if (!runtime) { error = (char *)"configuration error: quorum.device.votes is too high or expected_votes is too low"; goto out; } else { log_printf(LOGSYS_LEVEL_CRIT, "configuration error: quorum.device.votes is too high or expected_votes is too low"); log_printf(LOGSYS_LEVEL_CRIT, "disabling quorum device operations"); update_qdevice_can_operate(0); } } } /* * automatically calculate device votes and adjust expected_votes from nodelist */ if ((have_qdevice) && (qdevice_votes == -1) && (!expected_votes) && (have_nodelist) && (node_count == node_expected_votes)) { qdevice_votes = node_expected_votes - 1; node_expected_votes = node_expected_votes + qdevice_votes; } /* * set this node votes and expected_votes */ log_printf(LOGSYS_LEVEL_DEBUG, "ev_tracking=%d, ev_tracking_barrier = %d: expected_votes 
= %d\n", ev_tracking, ev_tracking_barrier, expected_votes); if (ev_tracking) { expected_votes = ev_tracking_barrier; } if (have_nodelist) { us->votes = node_votes; us->expected_votes = node_expected_votes; } else { us->votes = 1; icmap_get_uint32("quorum.votes", &us->votes); } if (expected_votes) { us->expected_votes = expected_votes; } /* * set qdevice votes */ if (!have_qdevice) { qdevice->votes = 0; } if (qdevice_votes != -1) { qdevice->votes = qdevice_votes; } update_ev_barrier(us->expected_votes); update_two_node(); if (wait_for_all) { update_wait_for_all_status(1); } out: LEAVE(); return error; } static void votequorum_refresh_config( int32_t event, const char *key_name, struct icmap_notify_value new_val, struct icmap_notify_value old_val, void *user_data) { int old_votes, old_expected_votes; uint8_t reloading; uint8_t cancel_wfa; ENTER(); /* * If a full reload is in progress then don't do anything until it's done and * can reconfigure it all atomically */ if (icmap_get_uint8("config.totemconfig_reload_in_progress", &reloading) == CS_OK && reloading) { return ; } icmap_get_uint8("quorum.cancel_wait_for_all", &cancel_wfa); if (strcmp(key_name, "quorum.cancel_wait_for_all") == 0 && cancel_wfa >= 1) { icmap_set_uint8("quorum.cancel_wait_for_all", 0); votequorum_exec_send_reconfigure(VOTEQUORUM_RECONFIG_PARAM_CANCEL_WFA, us->node_id, 0); return; } old_votes = us->votes; old_expected_votes = us->expected_votes; /* * Reload the configuration */ votequorum_readconfig(VOTEQUORUM_READCONFIG_RUNTIME); /* * activate new config */ votequorum_exec_send_nodeinfo(us->node_id); votequorum_exec_send_nodeinfo(VOTEQUORUM_QDEVICE_NODEID); if (us->votes != old_votes) { votequorum_exec_send_reconfigure(VOTEQUORUM_RECONFIG_PARAM_NODE_VOTES, us->node_id, us->votes); } if (us->expected_votes != old_expected_votes) { votequorum_exec_send_reconfigure(VOTEQUORUM_RECONFIG_PARAM_EXPECTED_VOTES, us->node_id, us->expected_votes); } LEAVE(); } static void 
votequorum_exec_add_config_notification(void) { icmap_track_t icmap_track_nodelist = NULL; icmap_track_t icmap_track_quorum = NULL; icmap_track_t icmap_track_reload = NULL; ENTER(); icmap_track_add("nodelist.", ICMAP_TRACK_ADD | ICMAP_TRACK_DELETE | ICMAP_TRACK_MODIFY | ICMAP_TRACK_PREFIX, votequorum_refresh_config, NULL, &icmap_track_nodelist); icmap_track_add("quorum.", ICMAP_TRACK_ADD | ICMAP_TRACK_DELETE | ICMAP_TRACK_MODIFY | ICMAP_TRACK_PREFIX, votequorum_refresh_config, NULL, &icmap_track_quorum); icmap_track_add("config.totemconfig_reload_in_progress", ICMAP_TRACK_ADD | ICMAP_TRACK_MODIFY, votequorum_refresh_config, NULL, &icmap_track_reload); LEAVE(); } /* * votequorum_exec core */ static int votequorum_exec_send_reconfigure(uint8_t param, unsigned int nodeid, uint32_t value) { struct req_exec_quorum_reconfigure req_exec_quorum_reconfigure; struct iovec iov[1]; int ret; ENTER(); req_exec_quorum_reconfigure.nodeid = nodeid; req_exec_quorum_reconfigure.value = value; req_exec_quorum_reconfigure.param = param; req_exec_quorum_reconfigure._pad0 = 0; req_exec_quorum_reconfigure._pad1 = 0; req_exec_quorum_reconfigure._pad2 = 0; req_exec_quorum_reconfigure.header.id = SERVICE_ID_MAKE(VOTEQUORUM_SERVICE, MESSAGE_REQ_EXEC_VOTEQUORUM_RECONFIGURE); req_exec_quorum_reconfigure.header.size = sizeof(req_exec_quorum_reconfigure); iov[0].iov_base = (void *)&req_exec_quorum_reconfigure; iov[0].iov_len = sizeof(req_exec_quorum_reconfigure); ret = corosync_api->totem_mcast (iov, 1, TOTEM_AGREED); LEAVE(); return ret; } static int votequorum_exec_send_nodeinfo(uint32_t nodeid) { struct req_exec_quorum_nodeinfo req_exec_quorum_nodeinfo; struct iovec iov[1]; struct cluster_node *node; int ret; ENTER(); node = find_node_by_nodeid(nodeid); if (!node) { return -1; } req_exec_quorum_nodeinfo.nodeid = nodeid; req_exec_quorum_nodeinfo.votes = node->votes; req_exec_quorum_nodeinfo.expected_votes = node->expected_votes; req_exec_quorum_nodeinfo.flags = node->flags; if (nodeid != 
VOTEQUORUM_QDEVICE_NODEID) { decode_flags(node->flags); } req_exec_quorum_nodeinfo.header.id = SERVICE_ID_MAKE(VOTEQUORUM_SERVICE, MESSAGE_REQ_EXEC_VOTEQUORUM_NODEINFO); req_exec_quorum_nodeinfo.header.size = sizeof(req_exec_quorum_nodeinfo); iov[0].iov_base = (void *)&req_exec_quorum_nodeinfo; iov[0].iov_len = sizeof(req_exec_quorum_nodeinfo); ret = corosync_api->totem_mcast (iov, 1, TOTEM_AGREED); LEAVE(); return ret; } static int votequorum_exec_send_qdevice_reconfigure(const char *oldname, const char *newname) { struct req_exec_quorum_qdevice_reconfigure req_exec_quorum_qdevice_reconfigure; struct iovec iov[1]; int ret; ENTER(); req_exec_quorum_qdevice_reconfigure.header.id = SERVICE_ID_MAKE(VOTEQUORUM_SERVICE, MESSAGE_REQ_EXEC_VOTEQUORUM_QDEVICE_RECONFIGURE); req_exec_quorum_qdevice_reconfigure.header.size = sizeof(req_exec_quorum_qdevice_reconfigure); strcpy(req_exec_quorum_qdevice_reconfigure.oldname, oldname); strcpy(req_exec_quorum_qdevice_reconfigure.newname, newname); iov[0].iov_base = (void *)&req_exec_quorum_qdevice_reconfigure; iov[0].iov_len = sizeof(req_exec_quorum_qdevice_reconfigure); ret = corosync_api->totem_mcast (iov, 1, TOTEM_AGREED); LEAVE(); return ret; } static int votequorum_exec_send_qdevice_reg(uint32_t operation, const char *qdevice_name_req) { struct req_exec_quorum_qdevice_reg req_exec_quorum_qdevice_reg; struct iovec iov[1]; int ret; ENTER(); req_exec_quorum_qdevice_reg.header.id = SERVICE_ID_MAKE(VOTEQUORUM_SERVICE, MESSAGE_REQ_EXEC_VOTEQUORUM_QDEVICE_REG); req_exec_quorum_qdevice_reg.header.size = sizeof(req_exec_quorum_qdevice_reg); req_exec_quorum_qdevice_reg.operation = operation; strcpy(req_exec_quorum_qdevice_reg.qdevice_name, qdevice_name_req); iov[0].iov_base = (void *)&req_exec_quorum_qdevice_reg; iov[0].iov_len = sizeof(req_exec_quorum_qdevice_reg); ret = corosync_api->totem_mcast (iov, 1, TOTEM_AGREED); LEAVE(); return ret; } static int votequorum_exec_send_quorum_notification(void *conn, uint64_t context) { struct 
res_lib_votequorum_quorum_notification *res_lib_votequorum_notification; struct qb_list_head *tmp; struct cluster_node *node; int i = 0; int cluster_members = 0; int size; char buf[sizeof(struct res_lib_votequorum_quorum_notification) + sizeof(struct votequorum_node) * (PROCESSOR_COUNT_MAX + 2)]; ENTER(); log_printf(LOGSYS_LEVEL_DEBUG, "Sending quorum callback, quorate = %d", cluster_is_quorate); qb_list_for_each(tmp, &cluster_members_list) { node = qb_list_entry(tmp, struct cluster_node, list); cluster_members++; } if (us->flags & NODE_FLAGS_QDEVICE_REGISTERED) { cluster_members++; } size = sizeof(struct res_lib_votequorum_quorum_notification) + sizeof(struct votequorum_node) * cluster_members; res_lib_votequorum_notification = (struct res_lib_votequorum_quorum_notification *)&buf; res_lib_votequorum_notification->quorate = cluster_is_quorate; res_lib_votequorum_notification->context = context; res_lib_votequorum_notification->node_list_entries = cluster_members; res_lib_votequorum_notification->header.id = MESSAGE_RES_VOTEQUORUM_QUORUM_NOTIFICATION; res_lib_votequorum_notification->header.size = size; res_lib_votequorum_notification->header.error = CS_OK; /* Send all known nodes and their states */ qb_list_for_each(tmp, &cluster_members_list) { node = qb_list_entry(tmp, struct cluster_node, list); res_lib_votequorum_notification->node_list[i].nodeid = node->node_id; res_lib_votequorum_notification->node_list[i++].state = node->state; } if (us->flags & NODE_FLAGS_QDEVICE_REGISTERED) { res_lib_votequorum_notification->node_list[i].nodeid = VOTEQUORUM_QDEVICE_NODEID; res_lib_votequorum_notification->node_list[i++].state = qdevice->state; } /* Send it to all interested parties */ if (conn) { int ret = corosync_api->ipc_dispatch_send(conn, &buf, size); LEAVE(); return ret; } else { struct quorum_pd *qpd; qb_list_for_each(tmp, &trackers_list) { qpd = qb_list_entry(tmp, struct quorum_pd, list); res_lib_votequorum_notification->context = qpd->tracking_context; 
corosync_api->ipc_dispatch_send(qpd->conn, &buf, size); } } LEAVE(); return 0; } static int votequorum_exec_send_nodelist_notification(void *conn, uint64_t context) { struct res_lib_votequorum_nodelist_notification *res_lib_votequorum_notification; int i = 0; int size; struct qb_list_head *tmp; char buf[sizeof(struct res_lib_votequorum_nodelist_notification) + sizeof(uint32_t) * quorum_members_entries]; ENTER(); log_printf(LOGSYS_LEVEL_DEBUG, "Sending nodelist callback. ring_id = %d/%lld", quorum_ringid.rep.nodeid, quorum_ringid.seq); size = sizeof(struct res_lib_votequorum_nodelist_notification) + sizeof(uint32_t) * quorum_members_entries; res_lib_votequorum_notification = (struct res_lib_votequorum_nodelist_notification *)&buf; res_lib_votequorum_notification->node_list_entries = quorum_members_entries; res_lib_votequorum_notification->ring_id.nodeid = quorum_ringid.rep.nodeid; res_lib_votequorum_notification->ring_id.seq = quorum_ringid.seq; res_lib_votequorum_notification->context = context; for (i=0; inode_list[i] = quorum_members[i]; } res_lib_votequorum_notification->header.id = MESSAGE_RES_VOTEQUORUM_NODELIST_NOTIFICATION; res_lib_votequorum_notification->header.size = size; res_lib_votequorum_notification->header.error = CS_OK; /* Send it to all interested parties */ if (conn) { int ret = corosync_api->ipc_dispatch_send(conn, &buf, size); LEAVE(); return ret; } else { struct quorum_pd *qpd; qb_list_for_each(tmp, &trackers_list) { qpd = qb_list_entry(tmp, struct quorum_pd, list); res_lib_votequorum_notification->context = qpd->tracking_context; corosync_api->ipc_dispatch_send(qpd->conn, &buf, size); } } LEAVE(); return 0; } static void votequorum_exec_send_expectedvotes_notification(void) { struct res_lib_votequorum_expectedvotes_notification res_lib_votequorum_expectedvotes_notification; struct quorum_pd *qpd; struct qb_list_head *tmp; ENTER(); log_printf(LOGSYS_LEVEL_DEBUG, "Sending expected votes callback"); 
res_lib_votequorum_expectedvotes_notification.header.id = MESSAGE_RES_VOTEQUORUM_EXPECTEDVOTES_NOTIFICATION; res_lib_votequorum_expectedvotes_notification.header.size = sizeof(res_lib_votequorum_expectedvotes_notification); res_lib_votequorum_expectedvotes_notification.header.error = CS_OK; res_lib_votequorum_expectedvotes_notification.expected_votes = us->expected_votes; qb_list_for_each(tmp, &trackers_list) { qpd = qb_list_entry(tmp, struct quorum_pd, list); res_lib_votequorum_expectedvotes_notification.context = qpd->tracking_context; corosync_api->ipc_dispatch_send(qpd->conn, &res_lib_votequorum_expectedvotes_notification, sizeof(struct res_lib_votequorum_expectedvotes_notification)); } LEAVE(); } static void exec_votequorum_qdevice_reconfigure_endian_convert (void *message) { ENTER(); LEAVE(); } static void message_handler_req_exec_votequorum_qdevice_reconfigure ( const void *message, unsigned int nodeid) { const struct req_exec_quorum_qdevice_reconfigure *req_exec_quorum_qdevice_reconfigure = message; ENTER(); log_printf(LOGSYS_LEVEL_DEBUG, "Received qdevice name change req from node %u [from: %s to: %s]", nodeid, req_exec_quorum_qdevice_reconfigure->oldname, req_exec_quorum_qdevice_reconfigure->newname); if (!strcmp(req_exec_quorum_qdevice_reconfigure->oldname, qdevice_name)) { log_printf(LOGSYS_LEVEL_DEBUG, "Allowing qdevice rename"); memset(qdevice_name, 0, VOTEQUORUM_QDEVICE_MAX_NAME_LEN); strcpy(qdevice_name, req_exec_quorum_qdevice_reconfigure->newname); /* * TODO: notify qdevices about name change? 
* this is not relevant for now and can wait later on since * qdevices are local only and libvotequorum is not final */ } LEAVE(); } static void exec_votequorum_qdevice_reg_endian_convert (void *message) { struct req_exec_quorum_qdevice_reg *req_exec_quorum_qdevice_reg = message; ENTER(); req_exec_quorum_qdevice_reg->operation = swab32(req_exec_quorum_qdevice_reg->operation); LEAVE(); } static void message_handler_req_exec_votequorum_qdevice_reg ( const void *message, unsigned int nodeid) { const struct req_exec_quorum_qdevice_reg *req_exec_quorum_qdevice_reg = message; struct res_lib_votequorum_status res_lib_votequorum_status; int wipe_qdevice_name = 1; struct cluster_node *node = NULL; struct qb_list_head *tmp; cs_error_t error = CS_OK; ENTER(); log_printf(LOGSYS_LEVEL_DEBUG, "Received qdevice op %u req from node %u [%s]", req_exec_quorum_qdevice_reg->operation, nodeid, req_exec_quorum_qdevice_reg->qdevice_name); switch(req_exec_quorum_qdevice_reg->operation) { case VOTEQUORUM_QDEVICE_OPERATION_REGISTER: if (nodeid != us->node_id) { if (!strlen(qdevice_name)) { log_printf(LOGSYS_LEVEL_DEBUG, "Remote qdevice name recorded"); strcpy(qdevice_name, req_exec_quorum_qdevice_reg->qdevice_name); } LEAVE(); return; } /* * protect against the case where we broadcast qdevice registration * to new memebers, we receive the message back, but there is no registration * connection in progress */ if (us->flags & NODE_FLAGS_QDEVICE_REGISTERED) { LEAVE(); return; } /* * this should NEVER happen */ if (!qdevice_reg_conn) { log_printf(LOGSYS_LEVEL_WARNING, "Unable to determine origin of the qdevice register call!"); LEAVE(); return; } /* * registering our own device in this case */ if (!strlen(qdevice_name)) { strcpy(qdevice_name, req_exec_quorum_qdevice_reg->qdevice_name); } /* * check if it is our device or something else */ if ((!strncmp(req_exec_quorum_qdevice_reg->qdevice_name, qdevice_name, VOTEQUORUM_QDEVICE_MAX_NAME_LEN))) { us->flags |= NODE_FLAGS_QDEVICE_REGISTERED; 
votequorum_exec_send_nodeinfo(VOTEQUORUM_QDEVICE_NODEID); votequorum_exec_send_nodeinfo(us->node_id); } else { log_printf(LOGSYS_LEVEL_WARNING, "A new qdevice with different name (new: %s old: %s) is trying to register!", req_exec_quorum_qdevice_reg->qdevice_name, qdevice_name); error = CS_ERR_EXIST; } res_lib_votequorum_status.header.size = sizeof(res_lib_votequorum_status); res_lib_votequorum_status.header.id = MESSAGE_RES_VOTEQUORUM_STATUS; res_lib_votequorum_status.header.error = error; corosync_api->ipc_response_send(qdevice_reg_conn, &res_lib_votequorum_status, sizeof(res_lib_votequorum_status)); qdevice_reg_conn = NULL; break; case VOTEQUORUM_QDEVICE_OPERATION_UNREGISTER: qb_list_for_each(tmp, &cluster_members_list) { node = qb_list_entry(tmp, struct cluster_node, list); if ((node->state == NODESTATE_MEMBER) && (node->flags & NODE_FLAGS_QDEVICE_REGISTERED)) { wipe_qdevice_name = 0; } } if (wipe_qdevice_name) { memset(qdevice_name, 0, VOTEQUORUM_QDEVICE_MAX_NAME_LEN); } break; } LEAVE(); } static void exec_votequorum_nodeinfo_endian_convert (void *message) { struct req_exec_quorum_nodeinfo *nodeinfo = message; ENTER(); nodeinfo->nodeid = swab32(nodeinfo->nodeid); nodeinfo->votes = swab32(nodeinfo->votes); nodeinfo->expected_votes = swab32(nodeinfo->expected_votes); nodeinfo->flags = swab32(nodeinfo->flags); LEAVE(); } static void message_handler_req_exec_votequorum_nodeinfo ( const void *message, unsigned int sender_nodeid) { const struct req_exec_quorum_nodeinfo *req_exec_quorum_nodeinfo = message; struct cluster_node *node = NULL; int old_votes; int old_expected; uint32_t old_flags; nodestate_t old_state; int new_node = 0; int allow_downgrade = 0; int by_node = 0; unsigned int nodeid = req_exec_quorum_nodeinfo->nodeid; ENTER(); log_printf(LOGSYS_LEVEL_DEBUG, "got nodeinfo message from cluster node %u", sender_nodeid); log_printf(LOGSYS_LEVEL_DEBUG, "nodeinfo message[%u]: votes: %d, expected: %d flags: %d", nodeid, req_exec_quorum_nodeinfo->votes, 
req_exec_quorum_nodeinfo->expected_votes, req_exec_quorum_nodeinfo->flags); if (nodeid != VOTEQUORUM_QDEVICE_NODEID) { decode_flags(req_exec_quorum_nodeinfo->flags); } node = find_node_by_nodeid(nodeid); if (!node) { node = allocate_node(nodeid); new_node = 1; } if (!node) { corosync_api->error_memory_failure(); LEAVE(); return; } if (new_node) { old_votes = 0; old_expected = 0; old_state = NODESTATE_DEAD; old_flags = 0; } else { old_votes = node->votes; old_expected = node->expected_votes; old_state = node->state; old_flags = node->flags; } if (nodeid == VOTEQUORUM_QDEVICE_NODEID) { struct cluster_node *sender_node = find_node_by_nodeid(sender_nodeid); assert(sender_node != NULL); if ((!cluster_is_quorate) && (sender_node->flags & NODE_FLAGS_QUORATE)) { node->votes = req_exec_quorum_nodeinfo->votes; } else { node->votes = max(node->votes, req_exec_quorum_nodeinfo->votes); } goto recalculate; } /* Update node state */ node->flags = req_exec_quorum_nodeinfo->flags; node->votes = req_exec_quorum_nodeinfo->votes; node->state = NODESTATE_MEMBER; if (node->flags & NODE_FLAGS_LEAVING) { node->state = NODESTATE_LEAVING; allow_downgrade = 1; by_node = 1; } if ((!cluster_is_quorate) && (node->flags & NODE_FLAGS_QUORATE)) { allow_downgrade = 1; us->expected_votes = req_exec_quorum_nodeinfo->expected_votes; } if (node->flags & NODE_FLAGS_QUORATE || (ev_tracking)) { node->expected_votes = req_exec_quorum_nodeinfo->expected_votes; } else { node->expected_votes = us->expected_votes; } if ((last_man_standing) && (node->votes > 1)) { log_printf(LOGSYS_LEVEL_WARNING, "Last Man Standing feature is supported only when all" "cluster nodes votes are set to 1. 
Disabling LMS."); last_man_standing = 0; if (last_man_standing_timer_set) { corosync_api->timer_delete(last_man_standing_timer); last_man_standing_timer_set = 0; } } recalculate: if ((new_node) || (nodeid == us->node_id) || (node->flags & NODE_FLAGS_FIRST) || (old_votes != node->votes) || (old_expected != node->expected_votes) || (old_flags != node->flags) || (old_state != node->state)) { recalculate_quorum(allow_downgrade, by_node); } if ((wait_for_all) && (!(node->flags & NODE_FLAGS_WFASTATUS)) && (node->flags & NODE_FLAGS_QUORATE)) { update_wait_for_all_status(0); } LEAVE(); } static void exec_votequorum_reconfigure_endian_convert (void *message) { struct req_exec_quorum_reconfigure *reconfigure = message; ENTER(); reconfigure->nodeid = swab32(reconfigure->nodeid); reconfigure->value = swab32(reconfigure->value); LEAVE(); } static void message_handler_req_exec_votequorum_reconfigure ( const void *message, unsigned int nodeid) { const struct req_exec_quorum_reconfigure *req_exec_quorum_reconfigure = message; struct cluster_node *node; ENTER(); log_printf(LOGSYS_LEVEL_DEBUG, "got reconfigure message from cluster node %u for %u", nodeid, req_exec_quorum_reconfigure->nodeid); switch(req_exec_quorum_reconfigure->param) { case VOTEQUORUM_RECONFIG_PARAM_EXPECTED_VOTES: update_node_expected_votes(req_exec_quorum_reconfigure->value); votequorum_exec_send_expectedvotes_notification(); update_ev_barrier(req_exec_quorum_reconfigure->value); if (ev_tracking) { us->expected_votes = max(us->expected_votes, ev_tracking_barrier); } recalculate_quorum(1, 0); /* Allow decrease */ break; case VOTEQUORUM_RECONFIG_PARAM_NODE_VOTES: node = find_node_by_nodeid(req_exec_quorum_reconfigure->nodeid); if (!node) { LEAVE(); return; } node->votes = req_exec_quorum_reconfigure->value; recalculate_quorum(1, 0); /* Allow decrease */ break; case VOTEQUORUM_RECONFIG_PARAM_CANCEL_WFA: update_wait_for_all_status(0); log_printf(LOGSYS_LEVEL_INFO, "wait_for_all_status reset by user on node %d.", 
req_exec_quorum_reconfigure->nodeid); recalculate_quorum(0, 0); break; } LEAVE(); } static int votequorum_exec_exit_fn (void) { int ret = 0; ENTER(); /* * tell the other nodes we are leaving */ if (allow_downscale) { us->flags |= NODE_FLAGS_LEAVING; ret = votequorum_exec_send_nodeinfo(us->node_id); } if ((ev_tracking) && (ev_tracking_fd != -1)) { close(ev_tracking_fd); } LEAVE(); return ret; } static void votequorum_set_icmap_ro_keys(void) { icmap_set_ro_access("quorum.allow_downscale", CS_FALSE, CS_TRUE); icmap_set_ro_access("quorum.wait_for_all", CS_FALSE, CS_TRUE); icmap_set_ro_access("quorum.last_man_standing", CS_FALSE, CS_TRUE); icmap_set_ro_access("quorum.last_man_standing_window", CS_FALSE, CS_TRUE); icmap_set_ro_access("quorum.expected_votes_tracking", CS_FALSE, CS_TRUE); icmap_set_ro_access("quorum.auto_tie_breaker", CS_FALSE, CS_TRUE); icmap_set_ro_access("quorum.auto_tie_breaker_node", CS_FALSE, CS_TRUE); } static char *votequorum_exec_init_fn (struct corosync_api_v1 *api) { char *error = NULL; ENTER(); /* * make sure we start clean */ qb_list_init(&cluster_members_list); qb_list_init(&trackers_list); qdevice = NULL; us = NULL; memset(cluster_nodes, 0, sizeof(cluster_nodes)); /* * Allocate a cluster_node for qdevice */ qdevice = allocate_node(VOTEQUORUM_QDEVICE_NODEID); if (!qdevice) { LEAVE(); return ((char *)"Could not allocate node."); } qdevice->votes = 0; memset(qdevice_name, 0, VOTEQUORUM_QDEVICE_MAX_NAME_LEN); /* * Allocate a cluster_node for us */ us = allocate_node(corosync_api->totem_nodeid_get()); if (!us) { LEAVE(); return ((char *)"Could not allocate node."); } icmap_set_uint32("runtime.votequorum.this_node_id", us->node_id); us->state = NODESTATE_MEMBER; us->votes = 1; us->flags |= NODE_FLAGS_FIRST; error = votequorum_readconfig(VOTEQUORUM_READCONFIG_STARTUP); if (error) { return error; } recalculate_quorum(0, 0); /* * Set RO keys in icmap */ votequorum_set_icmap_ro_keys(); /* * Listen for changes */ 
votequorum_exec_add_config_notification(); /* * Start us off with one node */ votequorum_exec_send_nodeinfo(us->node_id); LEAVE(); return (NULL); } /* * votequorum service core */ static void votequorum_last_man_standing_timer_fn(void *arg) { ENTER(); last_man_standing_timer_set = 0; if (cluster_is_quorate) { recalculate_quorum(1,1); } LEAVE(); } static void votequorum_sync_init ( const unsigned int *trans_list, size_t trans_list_entries, const unsigned int *member_list, size_t member_list_entries, const struct memb_ring_id *ring_id) { int i, j; int found; int left_nodes; struct cluster_node *node; ENTER(); sync_in_progress = 1; sync_nodeinfo_sent = 0; sync_wait_for_poll_or_timeout = 0; if (member_list_entries > 1) { us->flags &= ~NODE_FLAGS_FIRST; } /* * we don't need to track which nodes have left directly, * since that info is in the node db, but we need to know * if somebody has left for last_man_standing */ left_nodes = 0; for (i = 0; i < quorum_members_entries; i++) { found = 0; for (j = 0; j < member_list_entries; j++) { if (quorum_members[i] == member_list[j]) { found = 1; break; } } if (found == 0) { left_nodes = 1; node = find_node_by_nodeid(quorum_members[i]); if (node) { node->state = NODESTATE_DEAD; } } } if (last_man_standing) { if (((member_list_entries >= quorum) && (left_nodes)) || ((member_list_entries <= quorum) && (auto_tie_breaker != ATB_NONE) && (check_low_node_id_partition() == 1))) { if (last_man_standing_timer_set) { corosync_api->timer_delete(last_man_standing_timer); last_man_standing_timer_set = 0; } corosync_api->timer_add_duration((unsigned long long)last_man_standing_window*1000000, NULL, votequorum_last_man_standing_timer_fn, &last_man_standing_timer); last_man_standing_timer_set = 1; } } memcpy(previous_quorum_members, quorum_members, sizeof(unsigned int) * quorum_members_entries); previous_quorum_members_entries = quorum_members_entries; memcpy(quorum_members, member_list, sizeof(unsigned int) * member_list_entries); 
quorum_members_entries = member_list_entries; memcpy(&quorum_ringid, ring_id, sizeof(*ring_id)); if (us->flags & NODE_FLAGS_QDEVICE_REGISTERED && us->flags & NODE_FLAGS_QDEVICE_ALIVE) { /* * Reset poll timer. Sync waiting is interrupted on valid qdevice poll or after timeout */ if (qdevice_timer_set) { corosync_api->timer_delete(qdevice_timer); } corosync_api->timer_add_duration((unsigned long long)qdevice_sync_timeout*1000000, qdevice, qdevice_timer_fn, &qdevice_timer); qdevice_timer_set = 1; sync_wait_for_poll_or_timeout = 1; log_printf(LOGSYS_LEVEL_INFO, "waiting for quorum device %s poll (but maximum for %u ms)", qdevice_name, qdevice_sync_timeout); } LEAVE(); } static int votequorum_sync_process (void) { if (!sync_nodeinfo_sent) { votequorum_exec_send_nodeinfo(us->node_id); votequorum_exec_send_nodeinfo(VOTEQUORUM_QDEVICE_NODEID); if (strlen(qdevice_name)) { votequorum_exec_send_qdevice_reg(VOTEQUORUM_QDEVICE_OPERATION_REGISTER, qdevice_name); } votequorum_exec_send_nodelist_notification(NULL, 0LL); sync_nodeinfo_sent = 1; } if (us->flags & NODE_FLAGS_QDEVICE_REGISTERED && sync_wait_for_poll_or_timeout) { /* * Waiting for qdevice to poll with new ringid or timeout */ return (-1); } return 0; } static void votequorum_sync_activate (void) { recalculate_quorum(0, 0); quorum_callback(quorum_members, quorum_members_entries, cluster_is_quorate, &quorum_ringid); votequorum_exec_send_quorum_notification(NULL, 0L); sync_in_progress = 0; } static void votequorum_sync_abort (void) { } char *votequorum_init(struct corosync_api_v1 *api, quorum_set_quorate_fn_t q_set_quorate_fn) { char *error; ENTER(); if (q_set_quorate_fn == NULL) { return ((char *)"Quorate function not set"); } corosync_api = api; quorum_callback = q_set_quorate_fn; error = corosync_service_link_and_init(corosync_api, &votequorum_service[0]); if (error) { return (error); } LEAVE(); return (NULL); } /* * Library Handler init/fini */ static int quorum_lib_init_fn (void *conn) { struct quorum_pd *pd = 
(struct quorum_pd *)corosync_api->ipc_private_data_get (conn); ENTER(); qb_list_init (&pd->list); pd->conn = conn; LEAVE(); return (0); } static int quorum_lib_exit_fn (void *conn) { struct quorum_pd *quorum_pd = (struct quorum_pd *)corosync_api->ipc_private_data_get (conn); ENTER(); if (quorum_pd->tracking_enabled) { qb_list_del (&quorum_pd->list); qb_list_init (&quorum_pd->list); } LEAVE(); return (0); } /* * library internal functions */ static void qdevice_timer_fn(void *arg) { ENTER(); if ((!(us->flags & NODE_FLAGS_QDEVICE_ALIVE)) || (!qdevice_timer_set)) { LEAVE(); return; } us->flags &= ~NODE_FLAGS_QDEVICE_ALIVE; us->flags &= ~NODE_FLAGS_QDEVICE_CAST_VOTE; log_printf(LOGSYS_LEVEL_INFO, "lost contact with quorum device %s", qdevice_name); votequorum_exec_send_nodeinfo(us->node_id); qdevice_timer_set = 0; sync_wait_for_poll_or_timeout = 0; LEAVE(); } /* * Library Handler Functions */ static void message_handler_req_lib_votequorum_getinfo (void *conn, const void *message) { const struct req_lib_votequorum_getinfo *req_lib_votequorum_getinfo = message; struct res_lib_votequorum_getinfo res_lib_votequorum_getinfo; struct cluster_node *node; unsigned int highest_expected = 0; unsigned int total_votes = 0; cs_error_t error = CS_OK; uint32_t nodeid = req_lib_votequorum_getinfo->nodeid; ENTER(); log_printf(LOGSYS_LEVEL_DEBUG, "got getinfo request on %p for node %u", conn, req_lib_votequorum_getinfo->nodeid); if (nodeid == VOTEQUORUM_QDEVICE_NODEID) { nodeid = us->node_id; } node = find_node_by_nodeid(nodeid); if (node) { struct cluster_node *iternode; struct qb_list_head *nodelist; qb_list_for_each(nodelist, &cluster_members_list) { iternode = qb_list_entry(nodelist, struct cluster_node, list); if (iternode->state == NODESTATE_MEMBER) { highest_expected = max(highest_expected, iternode->expected_votes); total_votes += iternode->votes; } } if (node->flags & NODE_FLAGS_QDEVICE_CAST_VOTE) { total_votes += qdevice->votes; } switch(node->state) { case NODESTATE_MEMBER: 
res_lib_votequorum_getinfo.state = VOTEQUORUM_NODESTATE_MEMBER; break; case NODESTATE_DEAD: res_lib_votequorum_getinfo.state = VOTEQUORUM_NODESTATE_DEAD; break; case NODESTATE_LEAVING: res_lib_votequorum_getinfo.state = VOTEQUORUM_NODESTATE_LEAVING; break; default: res_lib_votequorum_getinfo.state = node->state; break; } res_lib_votequorum_getinfo.state = node->state; res_lib_votequorum_getinfo.votes = node->votes; res_lib_votequorum_getinfo.expected_votes = node->expected_votes; res_lib_votequorum_getinfo.highest_expected = highest_expected; res_lib_votequorum_getinfo.quorum = quorum; res_lib_votequorum_getinfo.total_votes = total_votes; res_lib_votequorum_getinfo.flags = 0; res_lib_votequorum_getinfo.nodeid = node->node_id; if (two_node) { res_lib_votequorum_getinfo.flags |= VOTEQUORUM_INFO_TWONODE; } if (cluster_is_quorate) { res_lib_votequorum_getinfo.flags |= VOTEQUORUM_INFO_QUORATE; } if (wait_for_all) { res_lib_votequorum_getinfo.flags |= VOTEQUORUM_INFO_WAIT_FOR_ALL; } if (last_man_standing) { res_lib_votequorum_getinfo.flags |= VOTEQUORUM_INFO_LAST_MAN_STANDING; } if (auto_tie_breaker != ATB_NONE) { res_lib_votequorum_getinfo.flags |= VOTEQUORUM_INFO_AUTO_TIE_BREAKER; } if (allow_downscale) { res_lib_votequorum_getinfo.flags |= VOTEQUORUM_INFO_ALLOW_DOWNSCALE; } memset(res_lib_votequorum_getinfo.qdevice_name, 0, VOTEQUORUM_QDEVICE_MAX_NAME_LEN); strcpy(res_lib_votequorum_getinfo.qdevice_name, qdevice_name); res_lib_votequorum_getinfo.qdevice_votes = qdevice->votes; if (node->flags & NODE_FLAGS_QDEVICE_REGISTERED) { res_lib_votequorum_getinfo.flags |= VOTEQUORUM_INFO_QDEVICE_REGISTERED; } if (node->flags & NODE_FLAGS_QDEVICE_ALIVE) { res_lib_votequorum_getinfo.flags |= VOTEQUORUM_INFO_QDEVICE_ALIVE; } if (node->flags & NODE_FLAGS_QDEVICE_CAST_VOTE) { res_lib_votequorum_getinfo.flags |= VOTEQUORUM_INFO_QDEVICE_CAST_VOTE; } if (node->flags & NODE_FLAGS_QDEVICE_MASTER_WINS) { res_lib_votequorum_getinfo.flags |= VOTEQUORUM_INFO_QDEVICE_MASTER_WINS; } } else { 
error = CS_ERR_NOT_EXIST; } res_lib_votequorum_getinfo.header.size = sizeof(res_lib_votequorum_getinfo); res_lib_votequorum_getinfo.header.id = MESSAGE_RES_VOTEQUORUM_GETINFO; res_lib_votequorum_getinfo.header.error = error; corosync_api->ipc_response_send(conn, &res_lib_votequorum_getinfo, sizeof(res_lib_votequorum_getinfo)); log_printf(LOGSYS_LEVEL_DEBUG, "getinfo response error: %d", error); LEAVE(); } static void message_handler_req_lib_votequorum_setexpected (void *conn, const void *message) { const struct req_lib_votequorum_setexpected *req_lib_votequorum_setexpected = message; struct res_lib_votequorum_status res_lib_votequorum_status; cs_error_t error = CS_OK; unsigned int newquorum; unsigned int total_votes; uint8_t allow_downscale_status = 0; ENTER(); allow_downscale_status = allow_downscale; allow_downscale = 0; /* * Validate new expected votes */ newquorum = calculate_quorum(1, req_lib_votequorum_setexpected->expected_votes, &total_votes); allow_downscale = allow_downscale_status; if (newquorum < total_votes / 2 || newquorum > total_votes) { error = CS_ERR_INVALID_PARAM; goto error_exit; } update_node_expected_votes(req_lib_votequorum_setexpected->expected_votes); votequorum_exec_send_reconfigure(VOTEQUORUM_RECONFIG_PARAM_EXPECTED_VOTES, us->node_id, req_lib_votequorum_setexpected->expected_votes); error_exit: res_lib_votequorum_status.header.size = sizeof(res_lib_votequorum_status); res_lib_votequorum_status.header.id = MESSAGE_RES_VOTEQUORUM_STATUS; res_lib_votequorum_status.header.error = error; corosync_api->ipc_response_send(conn, &res_lib_votequorum_status, sizeof(res_lib_votequorum_status)); LEAVE(); } static void message_handler_req_lib_votequorum_setvotes (void *conn, const void *message) { const struct req_lib_votequorum_setvotes *req_lib_votequorum_setvotes = message; struct res_lib_votequorum_status res_lib_votequorum_status; struct cluster_node *node; unsigned int newquorum; unsigned int total_votes; unsigned int saved_votes; cs_error_t 
error = CS_OK; unsigned int nodeid; ENTER(); nodeid = req_lib_votequorum_setvotes->nodeid; node = find_node_by_nodeid(nodeid); if (!node) { error = CS_ERR_NAME_NOT_FOUND; goto error_exit; } /* * Check votes is valid */ saved_votes = node->votes; node->votes = req_lib_votequorum_setvotes->votes; newquorum = calculate_quorum(1, 0, &total_votes); if (newquorum < total_votes / 2 || newquorum > total_votes) { node->votes = saved_votes; error = CS_ERR_INVALID_PARAM; goto error_exit; } votequorum_exec_send_reconfigure(VOTEQUORUM_RECONFIG_PARAM_NODE_VOTES, nodeid, req_lib_votequorum_setvotes->votes); error_exit: res_lib_votequorum_status.header.size = sizeof(res_lib_votequorum_status); res_lib_votequorum_status.header.id = MESSAGE_RES_VOTEQUORUM_STATUS; res_lib_votequorum_status.header.error = error; corosync_api->ipc_response_send(conn, &res_lib_votequorum_status, sizeof(res_lib_votequorum_status)); LEAVE(); } static void message_handler_req_lib_votequorum_trackstart (void *conn, const void *message) { const struct req_lib_votequorum_trackstart *req_lib_votequorum_trackstart = message; struct res_lib_votequorum_status res_lib_votequorum_status; struct quorum_pd *quorum_pd = (struct quorum_pd *)corosync_api->ipc_private_data_get (conn); cs_error_t error = CS_OK; ENTER(); /* * If an immediate listing of the current cluster membership * is requested, generate membership list */ if (req_lib_votequorum_trackstart->track_flags & CS_TRACK_CURRENT || req_lib_votequorum_trackstart->track_flags & CS_TRACK_CHANGES) { log_printf(LOGSYS_LEVEL_DEBUG, "sending initial status to %p", conn); votequorum_exec_send_nodelist_notification(conn, req_lib_votequorum_trackstart->context); votequorum_exec_send_quorum_notification(conn, req_lib_votequorum_trackstart->context); } if (quorum_pd->tracking_enabled) { error = CS_ERR_EXIST; goto response_send; } /* * Record requests for tracking */ if (req_lib_votequorum_trackstart->track_flags & CS_TRACK_CHANGES || 
	/* Tail of the trackstart handler (head is above this chunk):
	 * CS_TRACK_CHANGES_ONLY enables ongoing tracking for this connection
	 * by queueing its per-connection state on the global trackers list. */
	    req_lib_votequorum_trackstart->track_flags & CS_TRACK_CHANGES_ONLY) {
		quorum_pd->track_flags = req_lib_votequorum_trackstart->track_flags;
		quorum_pd->tracking_enabled = 1;
		quorum_pd->tracking_context = req_lib_votequorum_trackstart->context;
		qb_list_add (&quorum_pd->list, &trackers_list);
	}

response_send:
	/* Common status reply: fixed-size header carrying only the error code. */
	res_lib_votequorum_status.header.size = sizeof(res_lib_votequorum_status);
	res_lib_votequorum_status.header.id = MESSAGE_RES_VOTEQUORUM_STATUS;
	res_lib_votequorum_status.header.error = error;
	corosync_api->ipc_response_send(conn, &res_lib_votequorum_status, sizeof(res_lib_votequorum_status));

	LEAVE();
}

/*
 * Stop tracking quorum changes for this IPC connection.
 * Unlinks the connection's tracker entry from the global trackers list;
 * replies CS_ERR_NOT_EXIST if tracking was never enabled on this connection.
 */
static void message_handler_req_lib_votequorum_trackstop (void *conn,
							  const void *message)
{
	struct res_lib_votequorum_status res_lib_votequorum_status;
	struct quorum_pd *quorum_pd = (struct quorum_pd *)corosync_api->ipc_private_data_get (conn);
	int error = CS_OK;

	ENTER();

	if (quorum_pd->tracking_enabled) {
		error = CS_OK;
		quorum_pd->tracking_enabled = 0;
		/* Re-init after del so the node is safe to qb_list_del() again later. */
		qb_list_del (&quorum_pd->list);
		qb_list_init (&quorum_pd->list);
	} else {
		error = CS_ERR_NOT_EXIST;
	}

	res_lib_votequorum_status.header.size = sizeof(res_lib_votequorum_status);
	res_lib_votequorum_status.header.id = MESSAGE_RES_VOTEQUORUM_STATUS;
	res_lib_votequorum_status.header.error = error;
	corosync_api->ipc_response_send(conn, &res_lib_votequorum_status, sizeof(res_lib_votequorum_status));

	LEAVE();
}

/*
 * Register a quorum device (qdevice) by name on behalf of the calling
 * connection.
 *
 * Outcomes:
 *  - qdevice disabled by config          -> CS_ERR_ACCESS
 *  - already registered, same name       -> CS_OK (idempotent re-register)
 *  - already registered, different name  -> CS_ERR_EXIST
 *  - another registration in flight      -> CS_ERR_TRY_AGAIN
 *  - otherwise the request is broadcast to the cluster and the IPC reply
 *    is DEFERRED (note the early return): it is sent later, presumably
 *    when the exec message is self-delivered, via qdevice_reg_conn.
 */
static void message_handler_req_lib_votequorum_qdevice_register (void *conn,
								 const void *message)
{
	const struct req_lib_votequorum_qdevice_register *req_lib_votequorum_qdevice_register = message;
	struct res_lib_votequorum_status res_lib_votequorum_status;
	cs_error_t error = CS_OK;

	ENTER();

	if (!qdevice_can_operate) {
		log_printf(LOGSYS_LEVEL_INFO, "Registration of quorum device is disabled by incorrect corosync.conf. See logs for more information");
		error = CS_ERR_ACCESS;
		goto out;
	}

	if (us->flags & NODE_FLAGS_QDEVICE_REGISTERED) {
		/* Same name: treat as a successful no-op re-registration. */
		if ((!strncmp(req_lib_votequorum_qdevice_register->name,
		    qdevice_name, VOTEQUORUM_QDEVICE_MAX_NAME_LEN))) {
			goto out;
		} else {
			log_printf(LOGSYS_LEVEL_WARNING,
				   "A new qdevice with different name (new: %s old: %s) is trying to re-register!",
				   req_lib_votequorum_qdevice_register->name, qdevice_name);
			error = CS_ERR_EXIST;
			goto out;
		}
	} else {
		/* Only one registration may be pending cluster-wide confirmation. */
		if (qdevice_reg_conn != NULL) {
			log_printf(LOGSYS_LEVEL_WARNING,
				   "Registration request already in progress");
			error = CS_ERR_TRY_AGAIN;
			goto out;
		}
		qdevice_reg_conn = conn;
		if (votequorum_exec_send_qdevice_reg(VOTEQUORUM_QDEVICE_OPERATION_REGISTER,
						     req_lib_votequorum_qdevice_register->name) != 0) {
			log_printf(LOGSYS_LEVEL_WARNING,
				   "Unable to send qdevice registration request to cluster");
			error = CS_ERR_TRY_AGAIN;
			qdevice_reg_conn = NULL;
		} else {
			/* Reply deferred until the cluster message round-trips. */
			LEAVE();
			return;
		}
	}

out:
	res_lib_votequorum_status.header.size = sizeof(res_lib_votequorum_status);
	res_lib_votequorum_status.header.id = MESSAGE_RES_VOTEQUORUM_STATUS;
	res_lib_votequorum_status.header.error = error;
	corosync_api->ipc_response_send(conn, &res_lib_votequorum_status, sizeof(res_lib_votequorum_status));

	LEAVE();
}

/*
 * Unregister the currently registered qdevice.
 * The supplied name must match the registered one (CS_ERR_INVALID_PARAM
 * otherwise; CS_ERR_NOT_EXIST if no qdevice is registered).  Cancels the
 * liveness timer, clears all qdevice flags on this node, announces the
 * new node state, and broadcasts the unregister to the cluster.
 */
static void message_handler_req_lib_votequorum_qdevice_unregister (void *conn,
								   const void *message)
{
	const struct req_lib_votequorum_qdevice_unregister *req_lib_votequorum_qdevice_unregister = message;
	struct res_lib_votequorum_status res_lib_votequorum_status;
	cs_error_t error = CS_OK;

	ENTER();

	if (us->flags & NODE_FLAGS_QDEVICE_REGISTERED) {
		if (strncmp(req_lib_votequorum_qdevice_unregister->name,
		    qdevice_name, VOTEQUORUM_QDEVICE_MAX_NAME_LEN)) {
			error = CS_ERR_INVALID_PARAM;
			goto out;
		}
		if (qdevice_timer_set) {
			corosync_api->timer_delete(qdevice_timer);
			qdevice_timer_set = 0;
			/* No longer waiting for a qdevice poll during sync. */
			sync_wait_for_poll_or_timeout = 0;
		}
		/* Drop every qdevice-related flag for this node. */
		us->flags &= ~NODE_FLAGS_QDEVICE_REGISTERED;
		us->flags &= ~NODE_FLAGS_QDEVICE_ALIVE;
		us->flags &= ~NODE_FLAGS_QDEVICE_CAST_VOTE;
		us->flags &= ~NODE_FLAGS_QDEVICE_MASTER_WINS;
		votequorum_exec_send_nodeinfo(us->node_id);
		votequorum_exec_send_qdevice_reg(VOTEQUORUM_QDEVICE_OPERATION_UNREGISTER,
						 req_lib_votequorum_qdevice_unregister->name);
	} else {
		error = CS_ERR_NOT_EXIST;
	}

out:
	res_lib_votequorum_status.header.size = sizeof(res_lib_votequorum_status);
	res_lib_votequorum_status.header.id = MESSAGE_RES_VOTEQUORUM_STATUS;
	res_lib_votequorum_status.header.error = error;
	corosync_api->ipc_response_send(conn, &res_lib_votequorum_status, sizeof(res_lib_votequorum_status));

	LEAVE();
}

/*
 * Rename the registered qdevice (oldname -> newname).
 * oldname must match the current registration; the actual rename is
 * broadcast to the cluster via votequorum_exec_send_qdevice_reconfigure().
 */
static void message_handler_req_lib_votequorum_qdevice_update (void *conn,
							       const void *message)
{
	const struct req_lib_votequorum_qdevice_update *req_lib_votequorum_qdevice_update = message;
	struct res_lib_votequorum_status res_lib_votequorum_status;
	cs_error_t error = CS_OK;

	ENTER();

	if (us->flags & NODE_FLAGS_QDEVICE_REGISTERED) {
		if (strncmp(req_lib_votequorum_qdevice_update->oldname,
		    qdevice_name, VOTEQUORUM_QDEVICE_MAX_NAME_LEN)) {
			error = CS_ERR_INVALID_PARAM;
			goto out;
		}
		votequorum_exec_send_qdevice_reconfigure(req_lib_votequorum_qdevice_update->oldname,
							 req_lib_votequorum_qdevice_update->newname);
	} else {
		error = CS_ERR_NOT_EXIST;
	}

out:
	res_lib_votequorum_status.header.size = sizeof(res_lib_votequorum_status);
	res_lib_votequorum_status.header.id = MESSAGE_RES_VOTEQUORUM_STATUS;
	res_lib_votequorum_status.header.error = error;
	corosync_api->ipc_response_send(conn, &res_lib_votequorum_status, sizeof(res_lib_votequorum_status));

	LEAVE();
}

/*
 * Periodic liveness/vote poll from the qdevice daemon.
 *
 * Rejects polls carrying a stale ring id (the daemon is answering for a
 * previous membership) with CS_ERR_MESSAGE_ERROR, and polls for the wrong
 * device name with CS_ERR_INVALID_PARAM.  On success: marks the qdevice
 * alive, applies/clears its cast-vote flag, re-broadcasts node info only
 * when the flags actually changed, and (re)arms the qdevice timeout timer.
 */
static void message_handler_req_lib_votequorum_qdevice_poll (void *conn,
							     const void *message)
{
	const struct req_lib_votequorum_qdevice_poll *req_lib_votequorum_qdevice_poll = message;
	struct res_lib_votequorum_status res_lib_votequorum_status;
	cs_error_t error = CS_OK;
	uint32_t oldflags;

	ENTER();

	if (!qdevice_can_operate) {
		error = CS_ERR_ACCESS;
		goto out;
	}

	if (us->flags & NODE_FLAGS_QDEVICE_REGISTERED) {
		/* Poll must reference the ring id of the last completed sync. */
		if (!(req_lib_votequorum_qdevice_poll->ring_id.nodeid == quorum_ringid.rep.nodeid &&
		      req_lib_votequorum_qdevice_poll->ring_id.seq == quorum_ringid.seq)) {
			log_printf(LOGSYS_LEVEL_DEBUG,
				   "Received poll ring id (%u.%"PRIu64") != last sync "
				   "ring id (%u.%"PRIu64"). Ignoring poll call.",
				   req_lib_votequorum_qdevice_poll->ring_id.nodeid,
				   req_lib_votequorum_qdevice_poll->ring_id.seq,
				   quorum_ringid.rep.nodeid, quorum_ringid.seq);
			error = CS_ERR_MESSAGE_ERROR;
			goto out;
		}
		if (strncmp(req_lib_votequorum_qdevice_poll->name,
		    qdevice_name, VOTEQUORUM_QDEVICE_MAX_NAME_LEN)) {
			error = CS_ERR_INVALID_PARAM;
			goto out;
		}

		/* Restart the liveness timeout from now. */
		if (qdevice_timer_set) {
			corosync_api->timer_delete(qdevice_timer);
			qdevice_timer_set = 0;
		}

		oldflags = us->flags;

		us->flags |= NODE_FLAGS_QDEVICE_ALIVE;

		if (req_lib_votequorum_qdevice_poll->cast_vote) {
			us->flags |= NODE_FLAGS_QDEVICE_CAST_VOTE;
		} else {
			us->flags &= ~NODE_FLAGS_QDEVICE_CAST_VOTE;
		}

		/* Avoid cluster traffic when nothing changed. */
		if (us->flags != oldflags) {
			votequorum_exec_send_nodeinfo(us->node_id);
		}

		/* qdevice_timeout is in ms; timer_add_duration takes ns. */
		corosync_api->timer_add_duration((unsigned long long)qdevice_timeout*1000000, qdevice,
						 qdevice_timer_fn, &qdevice_timer);
		qdevice_timer_set = 1;
		sync_wait_for_poll_or_timeout = 0;
	} else {
		error = CS_ERR_NOT_EXIST;
	}

out:
	res_lib_votequorum_status.header.size = sizeof(res_lib_votequorum_status);
	res_lib_votequorum_status.header.id = MESSAGE_RES_VOTEQUORUM_STATUS;
	res_lib_votequorum_status.header.error = error;
	corosync_api->ipc_response_send(conn, &res_lib_votequorum_status, sizeof(res_lib_votequorum_status));

	LEAVE();
}

/*
 * Enable or disable "master wins" mode for the registered qdevice.
 * Name must match the registration; the flag change is broadcast via
 * votequorum_exec_send_nodeinfo() only when it actually changed, then
 * update_qdevice_master_wins() applies the new policy locally.
 */
static void message_handler_req_lib_votequorum_qdevice_master_wins (void *conn,
								    const void *message)
{
	const struct req_lib_votequorum_qdevice_master_wins *req_lib_votequorum_qdevice_master_wins = message;
	struct res_lib_votequorum_status res_lib_votequorum_status;
	cs_error_t error = CS_OK;
	uint32_t oldflags = us->flags;

	ENTER();

	if (!qdevice_can_operate) {
		error = CS_ERR_ACCESS;
		goto out;
	}

	if (us->flags & NODE_FLAGS_QDEVICE_REGISTERED) {
		if (strncmp(req_lib_votequorum_qdevice_master_wins->name,
		    qdevice_name, VOTEQUORUM_QDEVICE_MAX_NAME_LEN)) {
			error = CS_ERR_INVALID_PARAM;
			goto out;
		}
		if (req_lib_votequorum_qdevice_master_wins->allow) {
			us->flags |= NODE_FLAGS_QDEVICE_MASTER_WINS;
		} else {
			us->flags &= ~NODE_FLAGS_QDEVICE_MASTER_WINS;
		}
		if (us->flags != oldflags) {
			votequorum_exec_send_nodeinfo(us->node_id);
		}
		update_qdevice_master_wins(req_lib_votequorum_qdevice_master_wins->allow);
	} else {
		error = CS_ERR_NOT_EXIST;
	}

out:
	res_lib_votequorum_status.header.size = sizeof(res_lib_votequorum_status);
	res_lib_votequorum_status.header.id = MESSAGE_RES_VOTEQUORUM_STATUS;
	res_lib_votequorum_status.header.error = error;
	corosync_api->ipc_response_send(conn, &res_lib_votequorum_status, sizeof(res_lib_votequorum_status));

	LEAVE();
}