diff --git a/cts/agents/common_test_agent.c b/cts/agents/common_test_agent.c index 67d76c17..e395c316 100644 --- a/cts/agents/common_test_agent.c +++ b/cts/agents/common_test_agent.c @@ -1,333 +1,332 @@ /* * Copyright (c) 2010 Red Hat, Inc. * * All rights reserved. * * Author: Angus Salkeld (asalkeld@redhat.com) * * This software licensed under BSD license, the text of which follows: * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * - Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * - Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * - Neither the name of the MontaVista Software, Inc. nor the names of its * contributors may be used to endorse or promote products derived from this * software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF * THE POSSIBILITY OF SUCH DAMAGE. */ #include #include #include #include #include #include #include #include #include #include #include #include -#include #include #include "common_test_agent.h" #define MAX_CLIENTS 64 static char big_and_buf_rx[HOW_BIG_AND_BUF]; ta_do_command_fn do_command; static qb_loop_t *poll_handle; static pre_exit_fn pre_exit = NULL; static int client_fds[MAX_CLIENTS]; static int client_fds_pos = 0; qb_loop_t *ta_poll_handle_get(void) { return poll_handle; } static void shut_me_down(void) { if (pre_exit) { pre_exit(); } qb_loop_stop(poll_handle); } static void ta_handle_command (int sock, char* msg) { int num_args; char *saveptr = NULL; char *str = strdup (msg); char *str_len; char *str_arg; char *args[5]; int i = 0; int a = 0; char* func = NULL; qb_log(LOG_DEBUG,"(MSG:%s)", msg); str_len = strtok_r (str, ":", &saveptr); assert (str_len); num_args = atoi (str_len) * 2; for (i = 0; i < num_args / 2; i++) { str_len = strtok_r (NULL, ":", &saveptr); str_arg = strtok_r (NULL, ":", &saveptr); if (func == NULL) { /* first "arg" is the function */ qb_log (LOG_TRACE, "(LEN:%s, FUNC:%s)", str_len, str_arg); func = str_arg; a = 0; } else { args[a] = str_arg; a++; qb_log (LOG_TRACE, "(LEN:%s, ARG:%s)", str_len, str_arg); } } do_command (sock, func, args, a+1); free (str); } static int server_process_data_fn ( int fd, int revents, void *data) { char *saveptr; char *msg; char *cmd; int32_t nbytes; if (revents & POLLHUP || revents & POLLERR) { qb_log (LOG_INFO, "command sockect got POLLHUP exiting..."); shut_me_down(); return -1; } if ((nbytes = recv (fd, big_and_buf_rx, sizeof (big_and_buf_rx), 0)) <= 0) { /* got error or connection closed by client */ if (nbytes == 0) { /* connection closed */ qb_log (LOG_WARNING, "socket %d hung up: exiting...", fd); } else { qb_perror(LOG_ERR, "recv() failed"); } shut_me_down(); return -1; } else { big_and_buf_rx[nbytes] = '\0'; msg = strtok_r (big_and_buf_rx, ";", &saveptr); assert (msg); while (msg) { cmd = strdup (msg); ta_handle_command (fd, cmd); free (cmd); msg = strtok_r (NULL, ";", &saveptr); } } return 0; } static int server_accept_fn ( int fd, int revents, void *data) { socklen_t addrlen; struct sockaddr_in in_addr; int new_fd; int res; if (revents & POLLHUP || revents & POLLERR) { qb_log (LOG_INFO, "command sockect got POLLHUP exiting..."); shut_me_down(); return -1; } addrlen = sizeof (struct sockaddr_in); retry_accept: new_fd = accept (fd, (struct sockaddr *)&in_addr, &addrlen); if (new_fd == -1 && errno == EINTR) { goto retry_accept; } if (new_fd == -1) { qb_log (LOG_ERR, "Could not accept connection: %s", strerror (errno)); return (0); /* This is an error, but -1 would indicate disconnect from poll loop */ } res = fcntl (new_fd, F_SETFL, O_NONBLOCK); if (res == -1) { qb_log (LOG_ERR, "Could not set non-blocking operation on connection: %s", strerror (errno)); close (new_fd); return (0); /* This is an error, but -1 would indicate disconnect from poll loop */ } client_fds[client_fds_pos] = new_fd; client_fds_pos++; assert(client_fds_pos < MAX_CLIENTS); qb_loop_poll_add (poll_handle, QB_LOOP_MED, new_fd, POLLIN|POLLNVAL, NULL, server_process_data_fn); return 0; } static int create_server_sockect (int server_port) { int listener; int yes = 1; int rv; struct addrinfo hints, *ai, *p; char server_port_str[16]; char addr_str[INET_ADDRSTRLEN]; void *ptr = NULL; /* get a socket and bind it */ sprintf(server_port_str, "%d", server_port); memset (&hints, 0, sizeof hints); hints.ai_family = AF_UNSPEC; hints.ai_socktype = SOCK_STREAM; hints.ai_flags = AI_PASSIVE; if ((rv = getaddrinfo (NULL, server_port_str, &hints, &ai)) != 0) { qb_log (LOG_ERR, "%s", gai_strerror (rv)); exit (1); } for (p = ai; p != NULL; p = p->ai_next) { listener = socket (p->ai_family, p->ai_socktype, p->ai_protocol); if (listener < 0) { continue; } /* lose the pesky "address already in use" error message */ if (setsockopt (listener, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)) < 0) { qb_log (LOG_ERR, "setsockopt() failed: %s", strerror (errno)); } switch (p->ai_family) { case AF_INET: ptr = &((struct sockaddr_in *) p->ai_addr)->sin_addr; break; case AF_INET6: ptr = &((struct sockaddr_in6 *) p->ai_addr)->sin6_addr; break; default: qb_log (LOG_ERR, "address family wrong"); exit (4); } if (inet_ntop(p->ai_family, ptr, addr_str, INET_ADDRSTRLEN) == NULL) { qb_log (LOG_ERR, "inet_ntop() failed: %s", strerror (errno)); } if (bind (listener, p->ai_addr, p->ai_addrlen) < 0) { qb_log (LOG_ERR, "bind(%s) failed: %s", addr_str, strerror (errno)); close (listener); continue; } break; } if (p == NULL) { qb_log (LOG_ERR, "failed to bind"); exit (2); } freeaddrinfo (ai); if (listen (listener, 10) == -1) { qb_log (LOG_ERR, "listen() failed: %s", strerror(errno)); exit (3); } return listener; } static int32_t sig_exit_handler (int num, void *data) { qb_log (LOG_INFO, "got signal %d, exiting", num); shut_me_down(); return 0; } int test_agent_run(const char * prog_name, int server_port, ta_do_command_fn func, pre_exit_fn exit_fn) { int listener; int i; qb_log_init(prog_name, LOG_DAEMON, LOG_DEBUG); qb_log_format_set(QB_LOG_SYSLOG, "%n() [%p] %b"); qb_log (LOG_INFO, "STARTING"); do_command = func; pre_exit = exit_fn; poll_handle = qb_loop_create (); if (exit_fn) { qb_loop_signal_add(poll_handle, QB_LOOP_HIGH, SIGINT, NULL, sig_exit_handler, NULL); qb_loop_signal_add(poll_handle, QB_LOOP_HIGH, SIGQUIT, NULL, sig_exit_handler, NULL); qb_loop_signal_add(poll_handle, QB_LOOP_HIGH, SIGTERM, NULL, sig_exit_handler, NULL); } listener = create_server_sockect (server_port); qb_loop_poll_add (poll_handle, QB_LOOP_MED, listener, POLLIN|POLLNVAL, NULL, server_accept_fn); qb_loop_run (poll_handle); close(listener); for (i = 0; i < client_fds_pos; i++) { close(client_fds[client_fds_pos]); } qb_log (LOG_INFO, "EXITING"); qb_log_fini(); return 0; } diff --git a/cts/agents/cpg_test_agent.c b/cts/agents/cpg_test_agent.c index ea2e6481..fdb846ed 100644 --- a/cts/agents/cpg_test_agent.c +++ b/cts/agents/cpg_test_agent.c @@ -1,805 +1,804 @@ /* * Copyright (c) 2010 Red Hat, Inc. * * All rights reserved. * * Author: Angus Salkeld (asalkeld@redhat.com) * * This software licensed under BSD license, the text of which follows: * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * - Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * - Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * - Neither the name of the MontaVista Software, Inc. nor the names of its * contributors may be used to endorse or promote products derived from this * software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF * THE POSSIBILITY OF SUCH DAMAGE. */ #include #include #include #include #include #include #include #include #include #include #include #include #include -#include #include #include #include #include #include #include #include #include #include "common_test_agent.h" #include #include typedef enum { MSG_OK, MSG_NODEID_ERR, MSG_PID_ERR, MSG_SEQ_ERR, MSG_SIZE_ERR, MSG_SHA1_ERR, } msg_status_t; typedef struct { uint32_t nodeid; pid_t pid; unsigned char sha1[20]; uint32_t seq; size_t size; unsigned char payload[0]; } msg_t; #define LOG_STR_SIZE 80 typedef struct { char log[LOG_STR_SIZE]; struct qb_list_head list; } log_entry_t; static char big_and_buf[HOW_BIG_AND_BUF]; static int32_t record_config_events_g = 0; static int32_t record_messages_g = 0; static cpg_handle_t cpg_handle = 0; static corosync_cfg_handle_t cfg_handle = 0; static int32_t cpg_fd = -1; static int32_t cfg_fd = -1; static struct qb_list_head config_chg_log_head; static struct qb_list_head msg_log_head; static pid_t my_pid; static uint32_t my_nodeid; static int32_t my_seq; static int32_t use_zcb = QB_FALSE; static int32_t my_msgs_to_send; static int32_t my_msgs_sent; static int32_t total_stored_msgs = 0; static int32_t total_msgs_revd = 0; static int32_t in_cnchg = 0; static int32_t pcmk_test = 0; PK11Context* sha1_context; static void send_some_more_messages (void * unused); static char* err_status_string (char * buf, size_t buf_len, msg_status_t status) { switch (status) { case MSG_OK: strncpy (buf, "OK", buf_len); break; case MSG_NODEID_ERR: strncpy (buf, "NODEID_ERR", buf_len); break; case MSG_PID_ERR: strncpy (buf, "PID_ERR", buf_len); break; case MSG_SEQ_ERR: strncpy (buf, "SEQ_ERR", buf_len); break; case MSG_SIZE_ERR: strncpy (buf, "SIZE_ERR", buf_len); break; case MSG_SHA1_ERR: strncpy (buf, "SHA1_ERR", buf_len); break; default: strncpy (buf, "UNKNOWN_ERR", buf_len); break; } if (buf_len > 0) { buf[buf_len - 1] = '\0'; } return buf; } static void delivery_callback ( cpg_handle_t handle, const struct cpg_name *groupName, uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len) { log_entry_t *log_pt; msg_t *msg_pt = (msg_t*)msg; msg_status_t status = MSG_OK; char status_buf[20]; unsigned char sha1_compare[20]; unsigned int sha1_len; if (record_messages_g == 0) { return; } if (nodeid != msg_pt->nodeid) { status = MSG_NODEID_ERR; } if (pid != msg_pt->pid) { status = MSG_PID_ERR; } if (msg_len != msg_pt->size) { status = MSG_SIZE_ERR; } PK11_DigestBegin(sha1_context); PK11_DigestOp(sha1_context, msg_pt->payload, (msg_pt->size - sizeof (msg_t))); PK11_DigestFinal(sha1_context, sha1_compare, &sha1_len, sizeof(sha1_compare)); if (memcmp (sha1_compare, msg_pt->sha1, 20) != 0) { qb_log (LOG_ERR, "msg seq:%d; incorrect hash", msg_pt->seq); status = MSG_SHA1_ERR; } log_pt = malloc (sizeof(log_entry_t)); qb_list_init (&log_pt->list); snprintf (log_pt->log, LOG_STR_SIZE, "%u:%d:%d:%s;", msg_pt->nodeid, msg_pt->seq, my_seq, err_status_string (status_buf, 20, status)); qb_list_add_tail (&log_pt->list, &msg_log_head); total_stored_msgs++; total_msgs_revd++; my_seq++; if ((total_msgs_revd % 1000) == 0) { qb_log (LOG_INFO, "rx %d", total_msgs_revd); } } static void config_change_callback ( cpg_handle_t handle, const struct cpg_name *groupName, const struct cpg_address *member_list, size_t member_list_entries, const struct cpg_address *left_list, size_t left_list_entries, const struct cpg_address *joined_list, size_t joined_list_entries) { int i; log_entry_t *log_pt; /* group_name,ip,pid,join|leave */ if (record_config_events_g > 0) { qb_log (LOG_INFO, "got cpg event[recording] for group %s", groupName->value); } else { qb_log (LOG_INFO, "got cpg event[ignoring] for group %s", groupName->value); } for (i = 0; i < left_list_entries; i++) { if (record_config_events_g > 0) { log_pt = malloc (sizeof(log_entry_t)); qb_list_init (&log_pt->list); snprintf (log_pt->log, LOG_STR_SIZE, "%s,%u,%u,left", groupName->value, left_list[i].nodeid,left_list[i].pid); qb_list_add_tail(&log_pt->list, &config_chg_log_head); qb_log (LOG_INFO, "cpg event %s", log_pt->log); } } for (i = 0; i < joined_list_entries; i++) { if (record_config_events_g > 0) { log_pt = malloc (sizeof(log_entry_t)); qb_list_init (&log_pt->list); snprintf (log_pt->log, LOG_STR_SIZE, "%s,%u,%u,join", groupName->value, joined_list[i].nodeid,joined_list[i].pid); qb_list_add_tail (&log_pt->list, &config_chg_log_head); qb_log (LOG_INFO, "cpg event %s", log_pt->log); } } if (pcmk_test == 1) { in_cnchg = 1; send_some_more_messages (NULL); in_cnchg = 0; } } static void my_shutdown_callback (corosync_cfg_handle_t handle, corosync_cfg_shutdown_flags_t flags) { qb_log (LOG_CRIT, "flags:%d", flags); if (flags == COROSYNC_CFG_SHUTDOWN_FLAG_REQUEST) { corosync_cfg_replyto_shutdown (cfg_handle, COROSYNC_CFG_SHUTDOWN_FLAG_YES); } } static corosync_cfg_callbacks_t cfg_callbacks = { .corosync_cfg_shutdown_callback = my_shutdown_callback, }; static cpg_callbacks_t callbacks = { .cpg_deliver_fn = delivery_callback, .cpg_confchg_fn = config_change_callback, }; static void record_messages (void) { record_messages_g = 1; qb_log (LOG_INFO, "record:%d", record_messages_g); } static void record_config_events (int sock) { char response[100]; ssize_t rc; size_t send_len; record_config_events_g = 1; qb_log (LOG_INFO, "record:%d", record_config_events_g); snprintf (response, 100, "%s", OK_STR); send_len = strlen (response); rc = send (sock, response, send_len, 0); assert(rc == send_len); } static void read_config_event (int sock) { const char *empty = "None"; log_entry_t *entry; ssize_t rc; size_t send_len; if (qb_list_empty(&config_chg_log_head) == 0) { entry = qb_list_first_entry (&config_chg_log_head, log_entry_t, list); send_len = strlen (entry->log); rc = send (sock, entry->log, send_len, 0); qb_list_del (&entry->list); free (entry); } else { qb_log (LOG_DEBUG, "no events in list"); send_len = strlen (empty); rc = send (sock, empty, send_len, 0); } assert(rc == send_len); } static void read_messages (int sock, char* atmost_str) { struct qb_list_head *iter, *tmp_iter; log_entry_t *entry; int atmost = atoi (atmost_str); int packed = 0; ssize_t rc; if (atmost == 0) atmost = 1; if (atmost > (HOW_BIG_AND_BUF / LOG_STR_SIZE)) atmost = (HOW_BIG_AND_BUF / LOG_STR_SIZE); big_and_buf[0] = '\0'; qb_list_for_each_safe(iter, tmp_iter, &msg_log_head) { if (packed >= atmost) break; entry = qb_list_entry (iter, log_entry_t, list); strcat (big_and_buf, entry->log); packed++; qb_list_del (&entry->list); free (entry); total_stored_msgs--; } if (packed == 0) { strcpy (big_and_buf, "None"); } else { if ((total_stored_msgs % 1000) == 0) { qb_log(LOG_INFO, "sending %d; total_stored_msgs:%d; len:%d", packed, total_stored_msgs, (int)strlen (big_and_buf)); } } rc = send (sock, big_and_buf, strlen (big_and_buf), 0); assert(rc == strlen (big_and_buf)); } static qb_loop_timer_handle more_messages_timer_handle; static void send_some_more_messages_later (void) { cpg_dispatch (cpg_handle, CS_DISPATCH_ALL); qb_loop_timer_add ( ta_poll_handle_get(), QB_LOOP_MED, 300*QB_TIME_NS_IN_MSEC, NULL, send_some_more_messages, &more_messages_timer_handle); } static void send_some_more_messages_zcb (void) { msg_t *my_msg; int i; int send_now; size_t payload_size; size_t total_size; unsigned int sha1_len; cs_error_t res; cpg_flow_control_state_t fc_state; void *zcb_buffer; if (cpg_fd < 0) return; send_now = my_msgs_to_send; payload_size = (rand() % 100000); total_size = payload_size + sizeof (msg_t); cpg_zcb_alloc (cpg_handle, total_size, &zcb_buffer); my_msg = (msg_t*)zcb_buffer; qb_log(LOG_DEBUG, "send_now:%d", send_now); my_msg->pid = my_pid; my_msg->nodeid = my_nodeid; my_msg->size = sizeof (msg_t) + payload_size; my_msg->seq = my_msgs_sent; for (i = 0; i < payload_size; i++) { my_msg->payload[i] = i; } PK11_DigestBegin(sha1_context); PK11_DigestOp(sha1_context, my_msg->payload, payload_size); PK11_DigestFinal(sha1_context, my_msg->sha1, &sha1_len, sizeof(my_msg->sha1)); for (i = 0; i < send_now; i++) { res = cpg_flow_control_state_get (cpg_handle, &fc_state); if (res == CS_OK && fc_state == CPG_FLOW_CONTROL_ENABLED) { /* lets do this later */ send_some_more_messages_later (); qb_log (LOG_INFO, "flow control enabled."); goto free_buffer; } res = cpg_zcb_mcast_joined (cpg_handle, CPG_TYPE_AGREED, zcb_buffer, total_size); if (res == CS_ERR_TRY_AGAIN) { /* lets do this later */ send_some_more_messages_later (); goto free_buffer; } else if (res != CS_OK) { qb_log (LOG_ERR, "cpg_mcast_joined error:%d, exiting.", res); exit (-2); } my_msgs_sent++; my_msgs_to_send--; } free_buffer: cpg_zcb_free (cpg_handle, zcb_buffer); } #define cs_repeat(counter, max, code) do { \ code; \ if (res == CS_ERR_TRY_AGAIN) { \ counter++; \ sleep(counter); \ } \ } while (res == CS_ERR_TRY_AGAIN && counter < max) static unsigned char buffer[200000]; static void send_some_more_messages_normal (void) { msg_t my_msg; struct iovec iov[2]; int i; int send_now; size_t payload_size; cs_error_t res; cpg_flow_control_state_t fc_state; int retries = 0; time_t before; unsigned int sha1_len; if (cpg_fd < 0) return; send_now = my_msgs_to_send; qb_log (LOG_TRACE, "send_now:%d", send_now); my_msg.pid = my_pid; my_msg.nodeid = my_nodeid; payload_size = (rand() % 10000); my_msg.size = sizeof (msg_t) + payload_size; my_msg.seq = my_msgs_sent; for (i = 0; i < payload_size; i++) { buffer[i] = i; } PK11_DigestBegin(sha1_context); PK11_DigestOp(sha1_context, buffer, payload_size); PK11_DigestFinal(sha1_context, my_msg.sha1, &sha1_len, sizeof(my_msg.sha1)); iov[0].iov_len = sizeof (msg_t); iov[0].iov_base = &my_msg; iov[1].iov_len = payload_size; iov[1].iov_base = buffer; for (i = 0; i < send_now; i++) { if (in_cnchg && pcmk_test) { retries = 0; before = time(NULL); cs_repeat(retries, 30, res = cpg_mcast_joined(cpg_handle, CPG_TYPE_AGREED, iov, 2)); if (retries > 20) { qb_log (LOG_ERR, "cs_repeat: blocked for :%lu secs.", (unsigned long)(time(NULL) - before)); } if (res != CS_OK) { qb_log (LOG_ERR, "cpg_mcast_joined error:%d.", res); return; } } else { res = cpg_flow_control_state_get (cpg_handle, &fc_state); if (res == CS_OK && fc_state == CPG_FLOW_CONTROL_ENABLED) { /* lets do this later */ send_some_more_messages_later (); qb_log (LOG_INFO, "flow control enabled."); return; } res = cpg_mcast_joined (cpg_handle, CPG_TYPE_AGREED, iov, 2); if (res == CS_ERR_TRY_AGAIN) { /* lets do this later */ send_some_more_messages_later (); if (i > 0) { qb_log (LOG_INFO, "TRY_AGAIN %d to send.", my_msgs_to_send); } return; } else if (res != CS_OK) { qb_log (LOG_ERR, "cpg_mcast_joined error:%d, exiting.", res); exit (-2); } } my_msgs_sent++; my_msg.seq = my_msgs_sent; my_msgs_to_send--; } qb_log (LOG_TRACE, "sent %d; to send %d.", my_msgs_sent, my_msgs_to_send); } static void send_some_more_messages (void * unused) { if (use_zcb) { send_some_more_messages_zcb (); } else { send_some_more_messages_normal (); } } static void msg_blaster (int sock, char* num_to_send_str) { my_msgs_to_send = atoi (num_to_send_str); my_msgs_sent = 0; my_seq = 1; my_pid = getpid(); use_zcb = QB_FALSE; total_stored_msgs = 0; cpg_local_get (cpg_handle, &my_nodeid); /* control the limits */ if (my_msgs_to_send <= 0) my_msgs_to_send = 1; if (my_msgs_to_send > 10000) my_msgs_to_send = 10000; send_some_more_messages_normal (); } static void context_test (int sock) { char response[100]; char *cmp; ssize_t rc; size_t send_len; cpg_context_set (cpg_handle, response); cpg_context_get (cpg_handle, (void**)&cmp); if (response != cmp) { snprintf (response, 100, "%s", FAIL_STR); } else { snprintf (response, 100, "%s", OK_STR); } send_len = strlen (response); rc = send (sock, response, send_len, 0); assert(rc == send_len); } static void msg_blaster_zcb (int sock, char* num_to_send_str) { my_msgs_to_send = atoi (num_to_send_str); my_seq = 1; my_pid = getpid(); use_zcb = QB_TRUE; total_stored_msgs = 0; cpg_local_get (cpg_handle, &my_nodeid); /* control the limits */ if (my_msgs_to_send <= 0) my_msgs_to_send = 1; if (my_msgs_to_send > 10000) my_msgs_to_send = 10000; send_some_more_messages_zcb (); } static int cfg_dispatch_wrapper_fn ( int fd, int revents, void *data) { cs_error_t error; if (revents & POLLHUP || revents & POLLERR) { qb_log (LOG_ERR, "got POLLHUP disconnecting from CFG"); corosync_cfg_finalize(cfg_handle); cfg_handle = 0; return -1; } error = corosync_cfg_dispatch (cfg_handle, CS_DISPATCH_ALL); if (error == CS_ERR_LIBRARY) { qb_log (LOG_ERR, "got LIB error disconnecting from CFG."); corosync_cfg_finalize(cfg_handle); cfg_handle = 0; return -1; } return 0; } static int cpg_dispatch_wrapper_fn ( int fd, int revents, void *data) { cs_error_t error; if (revents & POLLHUP || revents & POLLERR) { qb_log (LOG_ERR, "got POLLHUP disconnecting from CPG"); cpg_finalize(cpg_handle); cpg_handle = 0; return -1; } error = cpg_dispatch (cpg_handle, CS_DISPATCH_ALL); if (error == CS_ERR_LIBRARY) { qb_log (LOG_ERR, "got LIB error disconnecting from CPG"); cpg_finalize(cpg_handle); cpg_handle = 0; return -1; } return 0; } static void do_command (int sock, char* func, char*args[], int num_args) { int result; char response[100]; struct cpg_name group_name; ssize_t rc; size_t send_len; qb_log (LOG_TRACE, "RPC:%s() called.", func); if (strcmp ("cpg_mcast_joined",func) == 0) { struct iovec iov[5]; int a; for (a = 0; a < num_args; a++) { iov[a].iov_base = args[a]; iov[a].iov_len = strlen(args[a])+1; } cpg_mcast_joined (cpg_handle, CPG_TYPE_AGREED, iov, num_args); } else if (strcmp ("cpg_join",func) == 0) { if (strlen(args[0]) >= CPG_MAX_NAME_LENGTH) { qb_log (LOG_ERR, "Invalid group name"); exit (1); } strcpy (group_name.value, args[0]); group_name.length = strlen(args[0]); result = cpg_join (cpg_handle, &group_name); if (result != CS_OK) { qb_log (LOG_ERR, "Could not join process group, error %d", result); exit (1); } qb_log (LOG_INFO, "called cpg_join(%s)!", group_name.value); } else if (strcmp ("cpg_leave",func) == 0) { strcpy (group_name.value, args[0]); group_name.length = strlen(args[0]); result = cpg_leave (cpg_handle, &group_name); if (result != CS_OK) { qb_log (LOG_ERR, "Could not leave process group, error %d", result); exit (1); } qb_log (LOG_INFO, "called cpg_leave(%s)!", group_name.value); } else if (strcmp ("cpg_initialize",func) == 0) { int retry_count = 0; result = cpg_initialize (&cpg_handle, &callbacks); while (result != CS_OK) { qb_log (LOG_ERR, "cpg_initialize error %d (attempt %d)", result, retry_count); if (retry_count >= 3) { exit (1); } sleep(1); retry_count++; result = cpg_initialize (&cpg_handle, &callbacks); } cpg_fd_get (cpg_handle, &cpg_fd); qb_loop_poll_add (ta_poll_handle_get(), QB_LOOP_MED, cpg_fd, POLLIN|POLLNVAL, NULL, cpg_dispatch_wrapper_fn); } else if (strcmp ("cpg_local_get", func) == 0) { unsigned int local_nodeid; cpg_local_get (cpg_handle, &local_nodeid); snprintf (response, 100, "%u",local_nodeid); send_len = strlen (response); rc = send (sock, response, send_len, 0); assert(rc == send_len); } else if (strcmp ("cpg_finalize", func) == 0) { if (cpg_handle > 0) { cpg_finalize (cpg_handle); cpg_handle = 0; } } else if (strcmp ("record_config_events", func) == 0) { record_config_events (sock); } else if (strcmp ("record_messages", func) == 0) { record_messages (); } else if (strcmp ("read_config_event", func) == 0) { read_config_event (sock); } else if (strcmp ("read_messages", func) == 0) { read_messages (sock, args[0]); } else if (strcmp ("msg_blaster_zcb", func) == 0) { msg_blaster_zcb (sock, args[0]); } else if (strcmp ("pcmk_test", func) == 0) { pcmk_test = 1; } else if (strcmp ("msg_blaster", func) == 0) { msg_blaster (sock, args[0]); } else if (strcmp ("context_test", func) == 0) { context_test (sock); } else if (strcmp ("are_you_ok_dude", func) == 0) { snprintf (response, 100, "%s", OK_STR); send_len = strlen (response); rc = send (sock, response, strlen (response), 0); assert(rc == send_len); } else if (strcmp ("cfg_shutdown", func) == 0) { qb_log (LOG_INFO, "calling %s() called!", func); result = corosync_cfg_try_shutdown (cfg_handle, COROSYNC_CFG_SHUTDOWN_FLAG_REQUEST); qb_log (LOG_INFO,"%s() returned %d!", func, result); } else if (strcmp ("cfg_initialize",func) == 0) { int retry_count = 0; qb_log (LOG_INFO,"%s() called!", func); result = corosync_cfg_initialize (&cfg_handle, &cfg_callbacks); while (result != CS_OK) { qb_log (LOG_ERR, "cfg_initialize error %d (attempt %d)", result, retry_count); if (retry_count >= 3) { exit (1); } sleep(1); retry_count++; result = corosync_cfg_initialize (&cfg_handle, &cfg_callbacks); } qb_log (LOG_INFO,"corosync_cfg_initialize() == %d", result); result = corosync_cfg_fd_get (cfg_handle, &cfg_fd); qb_log (LOG_INFO,"corosync_cfg_fd_get() == %d", result); qb_loop_poll_add (ta_poll_handle_get(), QB_LOOP_MED, cfg_fd, POLLIN|POLLNVAL, NULL, cfg_dispatch_wrapper_fn); } else { qb_log(LOG_ERR, "RPC:%s not supported!", func); } } static void my_pre_exit(void) { qb_log (LOG_INFO, "%s PRE EXIT", __FILE__); if (cpg_handle > 0) { cpg_finalize (cpg_handle); cpg_handle = 0; } if (cfg_handle > 0) { corosync_cfg_finalize (cfg_handle); cfg_handle = 0; } PK11_DestroyContext(sha1_context, PR_TRUE); } int main(int argc, char *argv[]) { qb_list_init (&msg_log_head); qb_list_init (&config_chg_log_head); if (NSS_NoDB_Init(".") != SECSuccess) { qb_log(LOG_ERR, "Couldn't initialize nss"); exit (0); } if ((sha1_context = PK11_CreateDigestContext(SEC_OID_SHA1)) == NULL) { qb_log(LOG_ERR, "Couldn't initialize nss"); exit (0); } return test_agent_run ("cpg_test_agent", 9034, do_command, my_pre_exit); } diff --git a/exec/cmap.c b/exec/cmap.c index 161abc5d..646ae89e 100644 --- a/exec/cmap.c +++ b/exec/cmap.c @@ -1,1028 +1,1027 @@ /* * Copyright (c) 2011-2012 Red Hat, Inc. * * All rights reserved. * * Author: Jan Friesse (jfriesse@redhat.com) * * This software licensed under BSD license, the text of which follows: * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * - Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * - Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * - Neither the name of the Red Hat, Inc. nor the names of its * contributors may be used to endorse or promote products derived from this * software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF * THE POSSIBILITY OF SUCH DAMAGE. */ #include #include #include #include #include #include -#include #include #include #include #include #include #include #include #include #include #include #include #include "service.h" LOGSYS_DECLARE_SUBSYS ("CMAP"); #define MAX_REQ_EXEC_CMAP_MCAST_ITEMS 32 #define ICMAP_VALUETYPE_NOT_EXIST 0 struct cmap_conn_info { struct hdb_handle_database iter_db; struct hdb_handle_database track_db; }; typedef uint64_t cmap_iter_handle_t; typedef uint64_t cmap_track_handle_t; struct cmap_track_user_data { void *conn; cmap_track_handle_t track_handle; uint64_t track_inst_handle; }; enum cmap_message_req_types { MESSAGE_REQ_EXEC_CMAP_MCAST = 0, }; enum cmap_mcast_reason { CMAP_MCAST_REASON_SYNC = 0, CMAP_MCAST_REASON_NEW_CONFIG_VERSION = 1, }; static struct corosync_api_v1 *api; static char *cmap_exec_init_fn (struct corosync_api_v1 *corosync_api); static int cmap_exec_exit_fn(void); static int cmap_lib_init_fn (void *conn); static int cmap_lib_exit_fn (void *conn); static void message_handler_req_lib_cmap_set(void *conn, const void *message); static void message_handler_req_lib_cmap_delete(void *conn, const void *message); static void message_handler_req_lib_cmap_get(void *conn, const void *message); static void message_handler_req_lib_cmap_adjust_int(void *conn, const void *message); static void message_handler_req_lib_cmap_iter_init(void *conn, const void *message); static void message_handler_req_lib_cmap_iter_next(void *conn, const void *message); static void message_handler_req_lib_cmap_iter_finalize(void *conn, const void *message); static void message_handler_req_lib_cmap_track_add(void *conn, const void *message); static void message_handler_req_lib_cmap_track_delete(void *conn, const void *message); static void cmap_notify_fn(int32_t event, const char *key_name, struct icmap_notify_value new_val, struct icmap_notify_value old_val, void *user_data); static void message_handler_req_exec_cmap_mcast( const void *message, unsigned int nodeid); static void exec_cmap_mcast_endian_convert(void *message); /* * Reson is subtype of message. argc is number of items in argv array. Argv is array * of strings (key names) which will be send to wire. There can be maximum * MAX_REQ_EXEC_CMAP_MCAST_ITEMS items (for more items, CS_ERR_TOO_MANY_GROUPS * error is returned). If key is not found, item has type ICMAP_VALUETYPE_NOT_EXIST * and length zero. */ static cs_error_t cmap_mcast_send(enum cmap_mcast_reason reason, int argc, char *argv[]); static void cmap_sync_init ( const unsigned int *trans_list, size_t trans_list_entries, const unsigned int *member_list, size_t member_list_entries, const struct memb_ring_id *ring_id); static int cmap_sync_process (void); static void cmap_sync_activate (void); static void cmap_sync_abort (void); static void cmap_config_version_track_cb( int32_t event, const char *key_name, struct icmap_notify_value new_value, struct icmap_notify_value old_value, void *user_data); /* * Library Handler Definition */ static struct corosync_lib_handler cmap_lib_engine[] = { { /* 0 */ .lib_handler_fn = message_handler_req_lib_cmap_set, .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 1 */ .lib_handler_fn = message_handler_req_lib_cmap_delete, .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 2 */ .lib_handler_fn = message_handler_req_lib_cmap_get, .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 3 */ .lib_handler_fn = message_handler_req_lib_cmap_adjust_int, .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 4 */ .lib_handler_fn = message_handler_req_lib_cmap_iter_init, .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 5 */ .lib_handler_fn = message_handler_req_lib_cmap_iter_next, .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 6 */ .lib_handler_fn = message_handler_req_lib_cmap_iter_finalize, .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 7 */ .lib_handler_fn = message_handler_req_lib_cmap_track_add, .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 8 */ .lib_handler_fn = message_handler_req_lib_cmap_track_delete, .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED }, }; static struct corosync_exec_handler cmap_exec_engine[] = { { /* 0 - MESSAGE_REQ_EXEC_CMAP_MCAST */ .exec_handler_fn = message_handler_req_exec_cmap_mcast, .exec_endian_convert_fn = exec_cmap_mcast_endian_convert }, }; struct corosync_service_engine cmap_service_engine = { .name = "corosync configuration map access", .id = CMAP_SERVICE, .priority = 1, .private_data_size = sizeof(struct cmap_conn_info), .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED, .allow_inquorate = CS_LIB_ALLOW_INQUORATE, .lib_init_fn = cmap_lib_init_fn, .lib_exit_fn = cmap_lib_exit_fn, .lib_engine = cmap_lib_engine, .lib_engine_count = sizeof (cmap_lib_engine) / sizeof (struct corosync_lib_handler), .exec_init_fn = cmap_exec_init_fn, .exec_exit_fn = cmap_exec_exit_fn, .exec_engine = cmap_exec_engine, .exec_engine_count = sizeof (cmap_exec_engine) / sizeof (struct corosync_exec_handler), .sync_init = cmap_sync_init, .sync_process = cmap_sync_process, .sync_activate = cmap_sync_activate, .sync_abort = cmap_sync_abort }; struct corosync_service_engine *cmap_get_service_engine_ver0 (void) { return (&cmap_service_engine); } struct req_exec_cmap_mcast_item { mar_name_t key_name __attribute__((aligned(8))); mar_uint8_t value_type __attribute__((aligned(8))); mar_size_t value_len __attribute__((aligned(8))); uint8_t value[] __attribute__((aligned(8))); }; struct req_exec_cmap_mcast { struct qb_ipc_request_header header __attribute__((aligned(8))); mar_uint8_t reason __attribute__((aligned(8))); mar_uint8_t no_items __attribute__((aligned(8))); mar_uint8_t reserved1 __attribute__((aligned(8))); mar_uint8_t reserver2 __attribute__((aligned(8))); /* * Following are array of req_exec_cmap_mcast_item alligned to 8 bytes */ }; static size_t cmap_sync_trans_list_entries = 0; static size_t cmap_sync_member_list_entries = 0; static uint64_t cmap_highest_config_version_received = 0; static uint64_t cmap_my_config_version = 0; static int cmap_first_sync = 1; static icmap_track_t cmap_config_version_track; static void cmap_config_version_track_cb( int32_t event, const char *key_name, struct icmap_notify_value new_value, struct icmap_notify_value old_value, void *user_data) { const char *key = "totem.config_version"; cs_error_t ret; ENTER(); if (icmap_get_uint64("totem.config_version", &cmap_my_config_version) != CS_OK) { cmap_my_config_version = 0; } ret = cmap_mcast_send(CMAP_MCAST_REASON_NEW_CONFIG_VERSION, 1, (char **)&key); if (ret != CS_OK) { log_printf(LOGSYS_LEVEL_ERROR, "Can't inform other nodes about new config version"); } LEAVE(); } static int cmap_exec_exit_fn(void) { if (icmap_track_delete(cmap_config_version_track) != CS_OK) { log_printf(LOGSYS_LEVEL_ERROR, "Can't delete config_version icmap tracker"); } return 0; } static char *cmap_exec_init_fn ( struct corosync_api_v1 *corosync_api) { cs_error_t ret; api = corosync_api; ret = icmap_track_add("totem.config_version", ICMAP_TRACK_ADD | ICMAP_TRACK_DELETE | ICMAP_TRACK_MODIFY, cmap_config_version_track_cb, NULL, &cmap_config_version_track); if (ret != CS_OK) { return ((char *)"Can't add config_version icmap tracker"); } return (NULL); } static int cmap_lib_init_fn (void *conn) { struct cmap_conn_info *conn_info = (struct cmap_conn_info *)api->ipc_private_data_get (conn); log_printf(LOGSYS_LEVEL_DEBUG, "lib_init_fn: conn=%p", conn); api->ipc_refcnt_inc(conn); memset(conn_info, 0, sizeof(*conn_info)); hdb_create(&conn_info->iter_db); hdb_create(&conn_info->track_db); return (0); } static int cmap_lib_exit_fn (void *conn) { struct cmap_conn_info *conn_info = (struct cmap_conn_info *)api->ipc_private_data_get (conn); hdb_handle_t iter_handle = 0; icmap_iter_t *iter; hdb_handle_t track_handle = 0; icmap_track_t *track; log_printf(LOGSYS_LEVEL_DEBUG, "exit_fn for conn=%p", conn); hdb_iterator_reset(&conn_info->iter_db); while (hdb_iterator_next(&conn_info->iter_db, (void*)&iter, &iter_handle) == 0) { icmap_iter_finalize(*iter); (void)hdb_handle_put (&conn_info->iter_db, iter_handle); } hdb_destroy(&conn_info->iter_db); hdb_iterator_reset(&conn_info->track_db); while (hdb_iterator_next(&conn_info->track_db, (void*)&track, &track_handle) == 0) { free(icmap_track_get_user_data(*track)); icmap_track_delete(*track); (void)hdb_handle_put (&conn_info->track_db, track_handle); } hdb_destroy(&conn_info->track_db); api->ipc_refcnt_dec(conn); return (0); } static void cmap_sync_init ( const unsigned int *trans_list, size_t trans_list_entries, const unsigned int *member_list, size_t member_list_entries, const struct memb_ring_id *ring_id) { cmap_sync_trans_list_entries = trans_list_entries; cmap_sync_member_list_entries = member_list_entries; cmap_highest_config_version_received = 0; if (icmap_get_uint64("totem.config_version", &cmap_my_config_version) != CS_OK) { cmap_my_config_version = 0; } } static int cmap_sync_process (void) { const char *key = "totem.config_version"; cs_error_t ret; ret = cmap_mcast_send(CMAP_MCAST_REASON_SYNC, 1, (char **)&key); return (ret == CS_OK ? 0 : -1); } static void cmap_sync_activate (void) { if (cmap_sync_trans_list_entries == 0) { log_printf(LOGSYS_LEVEL_DEBUG, "Single node sync -> no action"); return ; } if (cmap_first_sync == 1) { cmap_first_sync = 0; } else { log_printf(LOGSYS_LEVEL_DEBUG, "Not first sync -> no action"); return ; } if (cmap_my_config_version == 0) { log_printf(LOGSYS_LEVEL_DEBUG, "My config version is 0 -> no action"); return ; } if (cmap_highest_config_version_received == 0) { log_printf(LOGSYS_LEVEL_DEBUG, "Other nodes version is 0 -> no action"); return ; } if (cmap_highest_config_version_received != cmap_my_config_version) { log_printf(LOGSYS_LEVEL_ERROR, "Received config version (%"PRIu64") is different than my config version (%"PRIu64")! Exiting", cmap_highest_config_version_received, cmap_my_config_version); api->shutdown_request(); return ; } } static void cmap_sync_abort (void) { } static void message_handler_req_lib_cmap_set(void *conn, const void *message) { const struct req_lib_cmap_set *req_lib_cmap_set = message; struct res_lib_cmap_set res_lib_cmap_set; cs_error_t ret; if (icmap_is_key_ro((char *)req_lib_cmap_set->key_name.value)) { ret = CS_ERR_ACCESS; } else { ret = icmap_set((char *)req_lib_cmap_set->key_name.value, &req_lib_cmap_set->value, req_lib_cmap_set->value_len, req_lib_cmap_set->type); } memset(&res_lib_cmap_set, 0, sizeof(res_lib_cmap_set)); res_lib_cmap_set.header.size = sizeof(res_lib_cmap_set); res_lib_cmap_set.header.id = MESSAGE_RES_CMAP_SET; res_lib_cmap_set.header.error = ret; api->ipc_response_send(conn, &res_lib_cmap_set, sizeof(res_lib_cmap_set)); } static void message_handler_req_lib_cmap_delete(void *conn, const void *message) { const struct req_lib_cmap_set *req_lib_cmap_set = message; struct res_lib_cmap_delete res_lib_cmap_delete; cs_error_t ret; if (icmap_is_key_ro((char *)req_lib_cmap_set->key_name.value)) { ret = CS_ERR_ACCESS; } else { ret = icmap_delete((char *)req_lib_cmap_set->key_name.value); } memset(&res_lib_cmap_delete, 0, sizeof(res_lib_cmap_delete)); res_lib_cmap_delete.header.size = sizeof(res_lib_cmap_delete); res_lib_cmap_delete.header.id = MESSAGE_RES_CMAP_DELETE; res_lib_cmap_delete.header.error = ret; api->ipc_response_send(conn, &res_lib_cmap_delete, sizeof(res_lib_cmap_delete)); } static void message_handler_req_lib_cmap_get(void *conn, const void *message) { const struct req_lib_cmap_get *req_lib_cmap_get = message; struct res_lib_cmap_get *res_lib_cmap_get; struct res_lib_cmap_get error_res_lib_cmap_get; cs_error_t ret; size_t value_len; size_t res_lib_cmap_get_size; icmap_value_types_t type; void *value; value_len = req_lib_cmap_get->value_len; res_lib_cmap_get_size = sizeof(*res_lib_cmap_get) + value_len; res_lib_cmap_get = malloc(res_lib_cmap_get_size); if (res_lib_cmap_get == NULL) { ret = CS_ERR_NO_MEMORY; goto error_exit; } memset(res_lib_cmap_get, 0, res_lib_cmap_get_size); if (value_len > 0) { value = res_lib_cmap_get->value; } else { value = NULL; } ret = icmap_get((char *)req_lib_cmap_get->key_name.value, value, &value_len, &type); if (ret != CS_OK) { free(res_lib_cmap_get); goto error_exit; } res_lib_cmap_get->header.size = res_lib_cmap_get_size; res_lib_cmap_get->header.id = MESSAGE_RES_CMAP_GET; res_lib_cmap_get->header.error = ret; res_lib_cmap_get->type = type; res_lib_cmap_get->value_len = value_len; api->ipc_response_send(conn, res_lib_cmap_get, res_lib_cmap_get_size); free(res_lib_cmap_get); return ; error_exit: memset(&error_res_lib_cmap_get, 0, sizeof(error_res_lib_cmap_get)); error_res_lib_cmap_get.header.size = sizeof(error_res_lib_cmap_get); error_res_lib_cmap_get.header.id = MESSAGE_RES_CMAP_GET; error_res_lib_cmap_get.header.error = ret; api->ipc_response_send(conn, &error_res_lib_cmap_get, sizeof(error_res_lib_cmap_get)); } static void message_handler_req_lib_cmap_adjust_int(void *conn, const void *message) { const struct req_lib_cmap_adjust_int *req_lib_cmap_adjust_int = message; struct res_lib_cmap_adjust_int res_lib_cmap_adjust_int; cs_error_t ret; if (icmap_is_key_ro((char *)req_lib_cmap_adjust_int->key_name.value)) { ret = CS_ERR_ACCESS; } else { ret = icmap_adjust_int((char *)req_lib_cmap_adjust_int->key_name.value, req_lib_cmap_adjust_int->step); } memset(&res_lib_cmap_adjust_int, 0, sizeof(res_lib_cmap_adjust_int)); res_lib_cmap_adjust_int.header.size = sizeof(res_lib_cmap_adjust_int); res_lib_cmap_adjust_int.header.id = MESSAGE_RES_CMAP_ADJUST_INT; res_lib_cmap_adjust_int.header.error = ret; api->ipc_response_send(conn, &res_lib_cmap_adjust_int, sizeof(res_lib_cmap_adjust_int)); } static void message_handler_req_lib_cmap_iter_init(void *conn, const void *message) { const struct req_lib_cmap_iter_init *req_lib_cmap_iter_init = message; struct res_lib_cmap_iter_init res_lib_cmap_iter_init; cs_error_t ret; icmap_iter_t iter; icmap_iter_t *hdb_iter; cmap_iter_handle_t handle = 0ULL; const char *prefix; struct cmap_conn_info *conn_info = (struct cmap_conn_info *)api->ipc_private_data_get (conn); if (req_lib_cmap_iter_init->prefix.length > 0) { prefix = (char *)req_lib_cmap_iter_init->prefix.value; } else { prefix = NULL; } iter = icmap_iter_init(prefix); if (iter == NULL) { ret = CS_ERR_NO_SECTIONS; goto reply_send; } ret = hdb_error_to_cs(hdb_handle_create(&conn_info->iter_db, sizeof(iter), &handle)); if (ret != CS_OK) { goto reply_send; } ret = hdb_error_to_cs(hdb_handle_get(&conn_info->iter_db, handle, (void *)&hdb_iter)); if (ret != CS_OK) { goto reply_send; } *hdb_iter = iter; (void)hdb_handle_put (&conn_info->iter_db, handle); reply_send: memset(&res_lib_cmap_iter_init, 0, sizeof(res_lib_cmap_iter_init)); res_lib_cmap_iter_init.header.size = sizeof(res_lib_cmap_iter_init); res_lib_cmap_iter_init.header.id = MESSAGE_RES_CMAP_ITER_INIT; res_lib_cmap_iter_init.header.error = ret; res_lib_cmap_iter_init.iter_handle = handle; api->ipc_response_send(conn, &res_lib_cmap_iter_init, sizeof(res_lib_cmap_iter_init)); } static void message_handler_req_lib_cmap_iter_next(void *conn, const void *message) { const struct req_lib_cmap_iter_next *req_lib_cmap_iter_next = message; struct res_lib_cmap_iter_next res_lib_cmap_iter_next; cs_error_t ret; icmap_iter_t *iter; size_t value_len = 0; icmap_value_types_t type = 0; const char *res = NULL; struct cmap_conn_info *conn_info = (struct cmap_conn_info *)api->ipc_private_data_get (conn); ret = hdb_error_to_cs(hdb_handle_get(&conn_info->iter_db, req_lib_cmap_iter_next->iter_handle, (void *)&iter)); if (ret != CS_OK) { goto reply_send; } res = icmap_iter_next(*iter, &value_len, &type); if (res == NULL) { ret = CS_ERR_NO_SECTIONS; } (void)hdb_handle_put (&conn_info->iter_db, req_lib_cmap_iter_next->iter_handle); reply_send: memset(&res_lib_cmap_iter_next, 0, sizeof(res_lib_cmap_iter_next)); res_lib_cmap_iter_next.header.size = sizeof(res_lib_cmap_iter_next); res_lib_cmap_iter_next.header.id = MESSAGE_RES_CMAP_ITER_NEXT; res_lib_cmap_iter_next.header.error = ret; if (res != NULL) { res_lib_cmap_iter_next.value_len = value_len; res_lib_cmap_iter_next.type = type; memcpy(res_lib_cmap_iter_next.key_name.value, res, strlen(res)); res_lib_cmap_iter_next.key_name.length = strlen(res); } api->ipc_response_send(conn, &res_lib_cmap_iter_next, sizeof(res_lib_cmap_iter_next)); } static void message_handler_req_lib_cmap_iter_finalize(void *conn, const void *message) { const struct req_lib_cmap_iter_finalize *req_lib_cmap_iter_finalize = message; struct res_lib_cmap_iter_finalize res_lib_cmap_iter_finalize; cs_error_t ret; icmap_iter_t *iter; struct cmap_conn_info *conn_info = (struct cmap_conn_info *)api->ipc_private_data_get (conn); ret = hdb_error_to_cs(hdb_handle_get(&conn_info->iter_db, req_lib_cmap_iter_finalize->iter_handle, (void *)&iter)); if (ret != CS_OK) { goto reply_send; } icmap_iter_finalize(*iter); (void)hdb_handle_destroy(&conn_info->iter_db, req_lib_cmap_iter_finalize->iter_handle); (void)hdb_handle_put (&conn_info->iter_db, req_lib_cmap_iter_finalize->iter_handle); reply_send: memset(&res_lib_cmap_iter_finalize, 0, sizeof(res_lib_cmap_iter_finalize)); res_lib_cmap_iter_finalize.header.size = sizeof(res_lib_cmap_iter_finalize); res_lib_cmap_iter_finalize.header.id = MESSAGE_RES_CMAP_ITER_FINALIZE; res_lib_cmap_iter_finalize.header.error = ret; api->ipc_response_send(conn, &res_lib_cmap_iter_finalize, sizeof(res_lib_cmap_iter_finalize)); } static void cmap_notify_fn(int32_t event, const char *key_name, struct icmap_notify_value new_val, struct icmap_notify_value old_val, void *user_data) { struct cmap_track_user_data *cmap_track_user_data = (struct cmap_track_user_data *)user_data; struct res_lib_cmap_notify_callback res_lib_cmap_notify_callback; struct iovec iov[3]; memset(&res_lib_cmap_notify_callback, 0, sizeof(res_lib_cmap_notify_callback)); res_lib_cmap_notify_callback.header.size = sizeof(res_lib_cmap_notify_callback) + new_val.len + old_val.len; res_lib_cmap_notify_callback.header.id = MESSAGE_RES_CMAP_NOTIFY_CALLBACK; res_lib_cmap_notify_callback.header.error = CS_OK; res_lib_cmap_notify_callback.new_value_type = new_val.type; res_lib_cmap_notify_callback.old_value_type = old_val.type; res_lib_cmap_notify_callback.new_value_len = new_val.len; res_lib_cmap_notify_callback.old_value_len = old_val.len; res_lib_cmap_notify_callback.event = event; res_lib_cmap_notify_callback.key_name.length = strlen(key_name); res_lib_cmap_notify_callback.track_inst_handle = cmap_track_user_data->track_inst_handle; memcpy(res_lib_cmap_notify_callback.key_name.value, key_name, strlen(key_name)); iov[0].iov_base = (char *)&res_lib_cmap_notify_callback; iov[0].iov_len = sizeof(res_lib_cmap_notify_callback); iov[1].iov_base = (char *)new_val.data; iov[1].iov_len = new_val.len; iov[2].iov_base = (char *)old_val.data; iov[2].iov_len = old_val.len; api->ipc_dispatch_iov_send(cmap_track_user_data->conn, iov, 3); } static void message_handler_req_lib_cmap_track_add(void *conn, const void *message) { const struct req_lib_cmap_track_add *req_lib_cmap_track_add = message; struct res_lib_cmap_track_add res_lib_cmap_track_add; cs_error_t ret; cmap_track_handle_t handle = 0; icmap_track_t track = NULL; icmap_track_t *hdb_track; struct cmap_track_user_data *cmap_track_user_data; const char *key_name; struct cmap_conn_info *conn_info = (struct cmap_conn_info *)api->ipc_private_data_get (conn); cmap_track_user_data = malloc(sizeof(*cmap_track_user_data)); if (cmap_track_user_data == NULL) { ret = CS_ERR_NO_MEMORY; goto reply_send; } memset(cmap_track_user_data, 0, sizeof(*cmap_track_user_data)); if (req_lib_cmap_track_add->key_name.length > 0) { key_name = (char *)req_lib_cmap_track_add->key_name.value; } else { key_name = NULL; } ret = icmap_track_add(key_name, req_lib_cmap_track_add->track_type, cmap_notify_fn, cmap_track_user_data, &track); if (ret != CS_OK) { free(cmap_track_user_data); goto reply_send; } ret = hdb_error_to_cs(hdb_handle_create(&conn_info->track_db, sizeof(track), &handle)); if (ret != CS_OK) { free(cmap_track_user_data); goto reply_send; } ret = hdb_error_to_cs(hdb_handle_get(&conn_info->track_db, handle, (void *)&hdb_track)); if (ret != CS_OK) { free(cmap_track_user_data); goto reply_send; } *hdb_track = track; cmap_track_user_data->conn = conn; cmap_track_user_data->track_handle = handle; cmap_track_user_data->track_inst_handle = req_lib_cmap_track_add->track_inst_handle; (void)hdb_handle_put (&conn_info->track_db, handle); reply_send: memset(&res_lib_cmap_track_add, 0, sizeof(res_lib_cmap_track_add)); res_lib_cmap_track_add.header.size = sizeof(res_lib_cmap_track_add); res_lib_cmap_track_add.header.id = MESSAGE_RES_CMAP_TRACK_ADD; res_lib_cmap_track_add.header.error = ret; res_lib_cmap_track_add.track_handle = handle; api->ipc_response_send(conn, &res_lib_cmap_track_add, sizeof(res_lib_cmap_track_add)); } static void message_handler_req_lib_cmap_track_delete(void *conn, const void *message) { const struct req_lib_cmap_track_delete *req_lib_cmap_track_delete = message; struct res_lib_cmap_track_delete res_lib_cmap_track_delete; cs_error_t ret; icmap_track_t *track; struct cmap_conn_info *conn_info = (struct cmap_conn_info *)api->ipc_private_data_get (conn); uint64_t track_inst_handle = 0; ret = hdb_error_to_cs(hdb_handle_get(&conn_info->track_db, req_lib_cmap_track_delete->track_handle, (void *)&track)); if (ret != CS_OK) { goto reply_send; } track_inst_handle = ((struct cmap_track_user_data *)icmap_track_get_user_data(*track))->track_inst_handle; free(icmap_track_get_user_data(*track)); ret = icmap_track_delete(*track); (void)hdb_handle_put (&conn_info->track_db, req_lib_cmap_track_delete->track_handle); (void)hdb_handle_destroy(&conn_info->track_db, req_lib_cmap_track_delete->track_handle); reply_send: memset(&res_lib_cmap_track_delete, 0, sizeof(res_lib_cmap_track_delete)); res_lib_cmap_track_delete.header.size = sizeof(res_lib_cmap_track_delete); res_lib_cmap_track_delete.header.id = MESSAGE_RES_CMAP_TRACK_DELETE; res_lib_cmap_track_delete.header.error = ret; res_lib_cmap_track_delete.track_inst_handle = track_inst_handle; api->ipc_response_send(conn, &res_lib_cmap_track_delete, sizeof(res_lib_cmap_track_delete)); } static cs_error_t cmap_mcast_send(enum cmap_mcast_reason reason, int argc, char *argv[]) { int i; size_t value_len; icmap_value_types_t value_type; cs_error_t err; size_t item_len; size_t msg_len = 0; struct req_exec_cmap_mcast req_exec_cmap_mcast; struct req_exec_cmap_mcast_item *item = NULL; struct iovec req_exec_cmap_iovec[MAX_REQ_EXEC_CMAP_MCAST_ITEMS + 1]; ENTER(); if (argc > MAX_REQ_EXEC_CMAP_MCAST_ITEMS) { return (CS_ERR_TOO_MANY_GROUPS); } memset(req_exec_cmap_iovec, 0, sizeof(req_exec_cmap_iovec)); for (i = 0; i < argc; i++) { err = icmap_get(argv[i], NULL, &value_len, &value_type); if (err != CS_OK && err != CS_ERR_NOT_EXIST) { goto free_mem; } if (err == CS_ERR_NOT_EXIST) { value_type = ICMAP_VALUETYPE_NOT_EXIST; value_len = 0; } item_len = MAR_ALIGN_UP(sizeof(*item) + value_len, 8); item = malloc(item_len); if (item == NULL) { goto free_mem; } memset(item, 0, item_len); item->value_type = value_type; item->value_len = value_len; item->key_name.length = strlen(argv[i]); strcpy((char *)item->key_name.value, argv[i]); if (value_type != ICMAP_VALUETYPE_NOT_EXIST) { err = icmap_get(argv[i], item->value, &value_len, &value_type); if (err != CS_OK) { goto free_mem; } } req_exec_cmap_iovec[i + 1].iov_base = item; req_exec_cmap_iovec[i + 1].iov_len = item_len; msg_len += item_len; qb_log(LOG_TRACE, "Item %u - type %u, len %zu", i, item->value_type, item->value_len); item = NULL; } memset(&req_exec_cmap_mcast, 0, sizeof(req_exec_cmap_mcast)); req_exec_cmap_mcast.header.size = sizeof(req_exec_cmap_mcast) + msg_len; req_exec_cmap_mcast.reason = reason; req_exec_cmap_mcast.no_items = argc; req_exec_cmap_iovec[0].iov_base = &req_exec_cmap_mcast; req_exec_cmap_iovec[0].iov_len = sizeof(req_exec_cmap_mcast); qb_log(LOG_TRACE, "Sending %u items (%u iovec) for reason %u", argc, argc + 1, reason); err = (api->totem_mcast(req_exec_cmap_iovec, argc + 1, TOTEM_AGREED) == 0 ? CS_OK : CS_ERR_MESSAGE_ERROR); free_mem: for (i = 0; i < argc; i++) { free(req_exec_cmap_iovec[i + 1].iov_base); } free(item); LEAVE(); return (err); } static struct req_exec_cmap_mcast_item *cmap_mcast_item_find( const void *message, char *key) { const struct req_exec_cmap_mcast *req_exec_cmap_mcast = message; int i; const char *p; struct req_exec_cmap_mcast_item *item; mar_uint16_t key_name_len; p = (const char *)message + sizeof(*req_exec_cmap_mcast); for (i = 0; i < req_exec_cmap_mcast->no_items; i++) { item = (struct req_exec_cmap_mcast_item *)p; key_name_len = item->key_name.length; if (strlen(key) == key_name_len && strcmp((char *)item->key_name.value, key) == 0) { return (item); } p += MAR_ALIGN_UP(sizeof(*item) + item->value_len, 8); } return (NULL); } static void message_handler_req_exec_cmap_mcast_reason_sync_nv( enum cmap_mcast_reason reason, const void *message, unsigned int nodeid) { char member_config_version[ICMAP_KEYNAME_MAXLEN]; uint64_t config_version = 0; struct req_exec_cmap_mcast_item *item; mar_size_t value_len; ENTER(); item = cmap_mcast_item_find(message, (char *)"totem.config_version"); if (item != NULL) { value_len = item->value_len; if (item->value_type == ICMAP_VALUETYPE_NOT_EXIST) { config_version = 0; } if (item->value_type == ICMAP_VALUETYPE_UINT64) { memcpy(&config_version, item->value, value_len); } } qb_log(LOG_TRACE, "Received config version %"PRIu64" from node %x", config_version, nodeid); if (nodeid != api->totem_nodeid_get() && config_version > cmap_highest_config_version_received) { cmap_highest_config_version_received = config_version; } snprintf(member_config_version, ICMAP_KEYNAME_MAXLEN, "runtime.totem.pg.mrp.srp.members.%u.config_version", nodeid); icmap_set_uint64(member_config_version, config_version); LEAVE(); } static void message_handler_req_exec_cmap_mcast( const void *message, unsigned int nodeid) { const struct req_exec_cmap_mcast *req_exec_cmap_mcast = message; ENTER(); switch (req_exec_cmap_mcast->reason) { case CMAP_MCAST_REASON_SYNC: message_handler_req_exec_cmap_mcast_reason_sync_nv(req_exec_cmap_mcast->reason, message, nodeid); break; case CMAP_MCAST_REASON_NEW_CONFIG_VERSION: message_handler_req_exec_cmap_mcast_reason_sync_nv(req_exec_cmap_mcast->reason, message, nodeid); break; default: qb_log(LOG_TRACE, "Received mcast with unknown reason %u", req_exec_cmap_mcast->reason); }; LEAVE(); } static void exec_cmap_mcast_endian_convert(void *message) { struct req_exec_cmap_mcast *req_exec_cmap_mcast = message; const char *p; int i; struct req_exec_cmap_mcast_item *item; uint16_t u16; uint32_t u32; uint64_t u64; float flt; double dbl; swab_coroipc_request_header_t(&req_exec_cmap_mcast->header); p = (const char *)message + sizeof(*req_exec_cmap_mcast); for (i = 0; i < req_exec_cmap_mcast->no_items; i++) { item = (struct req_exec_cmap_mcast_item *)p; swab_mar_uint16_t(&item->key_name.length); swab_mar_size_t(&item->value_len); switch (item->value_type) { case ICMAP_VALUETYPE_INT16: case ICMAP_VALUETYPE_UINT16: memcpy(&u16, item->value, sizeof(u16)); u16 = swab16(u16); memcpy(item->value, &u16, sizeof(u16)); break; case ICMAP_VALUETYPE_INT32: case ICMAP_VALUETYPE_UINT32: memcpy(&u32, item->value, sizeof(u32)); u32 = swab32(u32); memcpy(item->value, &u32, sizeof(u32)); break; case ICMAP_VALUETYPE_INT64: case ICMAP_VALUETYPE_UINT64: memcpy(&u64, item->value, sizeof(u64)); u64 = swab64(u64); memcpy(item->value, &u64, sizeof(u64)); break; case ICMAP_VALUETYPE_FLOAT: memcpy(&flt, item->value, sizeof(flt)); swabflt(&flt); memcpy(item->value, &flt, sizeof(flt)); break; case ICMAP_VALUETYPE_DOUBLE: memcpy(&dbl, item->value, sizeof(dbl)); swabdbl(&dbl); memcpy(item->value, &dbl, sizeof(dbl)); break; } p += MAR_ALIGN_UP(sizeof(*item) + item->value_len, 8); } } diff --git a/exec/cpg.c b/exec/cpg.c index 0e117137..4c97437a 100644 --- a/exec/cpg.c +++ b/exec/cpg.c @@ -1,2384 +1,2382 @@ /* * Copyright (c) 2006-2015 Red Hat, Inc. * * All rights reserved. * * Author: Christine Caulfield (ccaulfie@redhat.com) * Author: Jan Friesse (jfriesse@redhat.com) * * This software licensed under BSD license, the text of which follows: * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * - Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * - Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * - Neither the name of the MontaVista Software, Inc. nor the names of its * contributors may be used to endorse or promote products derived from this * software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF * THE POSSIBILITY OF SUCH DAMAGE. */ #include #ifdef HAVE_ALLOCA_H #include #endif #include #include #include #include #include #include #include #include #include #include #include #include #include -#include -#include #include #include #include #include #include #include #include #include #include #include #include #ifndef MAP_ANONYMOUS #define MAP_ANONYMOUS MAP_ANON #endif #include "service.h" LOGSYS_DECLARE_SUBSYS ("CPG"); #define GROUP_HASH_SIZE 32 enum cpg_message_req_types { MESSAGE_REQ_EXEC_CPG_PROCJOIN = 0, MESSAGE_REQ_EXEC_CPG_PROCLEAVE = 1, MESSAGE_REQ_EXEC_CPG_JOINLIST = 2, MESSAGE_REQ_EXEC_CPG_MCAST = 3, MESSAGE_REQ_EXEC_CPG_DOWNLIST_OLD = 4, MESSAGE_REQ_EXEC_CPG_DOWNLIST = 5, MESSAGE_REQ_EXEC_CPG_PARTIAL_MCAST = 6, }; struct zcb_mapped { struct qb_list_head list; void *addr; size_t size; }; /* * state` exec deliver * match group name, pid -> if matched deliver for YES: * XXX indicates impossible state * * join leave mcast * UNJOINED XXX XXX NO * LEAVE_STARTED XXX YES(unjoined_enter) YES * JOIN_STARTED YES(join_started_enter) XXX NO * JOIN_COMPLETED XXX NO YES * * join_started_enter * set JOIN_COMPLETED * add entry to process_info list * unjoined_enter * set UNJOINED * delete entry from process_info list * * * library accept join error codes * UNJOINED YES(CS_OK) set JOIN_STARTED * LEAVE_STARTED NO(CS_ERR_BUSY) * JOIN_STARTED NO(CS_ERR_EXIST) * JOIN_COMPlETED NO(CS_ERR_EXIST) * * library accept leave error codes * UNJOINED NO(CS_ERR_NOT_EXIST) * LEAVE_STARTED NO(CS_ERR_NOT_EXIST) * JOIN_STARTED NO(CS_ERR_BUSY) * JOIN_COMPLETED YES(CS_OK) set LEAVE_STARTED * * library accept mcast * UNJOINED NO(CS_ERR_NOT_EXIST) * LEAVE_STARTED NO(CS_ERR_NOT_EXIST) * JOIN_STARTED YES(CS_OK) * JOIN_COMPLETED YES(CS_OK) */ enum cpd_state { CPD_STATE_UNJOINED, CPD_STATE_LEAVE_STARTED, CPD_STATE_JOIN_STARTED, CPD_STATE_JOIN_COMPLETED }; enum cpg_sync_state { CPGSYNC_DOWNLIST, CPGSYNC_JOINLIST }; enum cpg_downlist_state_e { CPG_DOWNLIST_NONE, CPG_DOWNLIST_WAITING_FOR_MESSAGES, CPG_DOWNLIST_APPLYING, }; static enum cpg_downlist_state_e downlist_state; static struct qb_list_head downlist_messages_head; static struct qb_list_head joinlist_messages_head; struct cpg_pd { void *conn; mar_cpg_name_t group_name; uint32_t pid; enum cpd_state cpd_state; unsigned int flags; int initial_totem_conf_sent; uint64_t transition_counter; /* These two are used when sending fragmented messages */ uint64_t initial_transition_counter; struct qb_list_head list; struct qb_list_head iteration_instance_list_head; struct qb_list_head zcb_mapped_list_head; }; struct cpg_iteration_instance { hdb_handle_t handle; struct qb_list_head list; struct qb_list_head items_list_head; /* List of process_info */ struct qb_list_head *current_pointer; }; DECLARE_HDB_DATABASE(cpg_iteration_handle_t_db,NULL); QB_LIST_DECLARE (cpg_pd_list_head); static unsigned int my_member_list[PROCESSOR_COUNT_MAX]; static unsigned int my_member_list_entries; static unsigned int my_old_member_list[PROCESSOR_COUNT_MAX]; static unsigned int my_old_member_list_entries = 0; static struct corosync_api_v1 *api = NULL; static enum cpg_sync_state my_sync_state = CPGSYNC_DOWNLIST; static mar_cpg_ring_id_t last_sync_ring_id; struct process_info { unsigned int nodeid; uint32_t pid; mar_cpg_name_t group; struct qb_list_head list; /* on the group_info members list */ }; QB_LIST_DECLARE (process_info_list_head); struct join_list_entry { uint32_t pid; mar_cpg_name_t group_name; }; /* * Service Interfaces required by service_message_handler struct */ static char *cpg_exec_init_fn (struct corosync_api_v1 *); static int cpg_lib_init_fn (void *conn); static int cpg_lib_exit_fn (void *conn); static void message_handler_req_exec_cpg_procjoin ( const void *message, unsigned int nodeid); static void message_handler_req_exec_cpg_procleave ( const void *message, unsigned int nodeid); static void message_handler_req_exec_cpg_joinlist ( const void *message, unsigned int nodeid); static void message_handler_req_exec_cpg_mcast ( const void *message, unsigned int nodeid); static void message_handler_req_exec_cpg_partial_mcast ( const void *message, unsigned int nodeid); static void message_handler_req_exec_cpg_downlist_old ( const void *message, unsigned int nodeid); static void message_handler_req_exec_cpg_downlist ( const void *message, unsigned int nodeid); static void exec_cpg_procjoin_endian_convert (void *msg); static void exec_cpg_joinlist_endian_convert (void *msg); static void exec_cpg_mcast_endian_convert (void *msg); static void exec_cpg_partial_mcast_endian_convert (void *msg); static void exec_cpg_downlist_endian_convert_old (void *msg); static void exec_cpg_downlist_endian_convert (void *msg); static void message_handler_req_lib_cpg_join (void *conn, const void *message); static void message_handler_req_lib_cpg_leave (void *conn, const void *message); static void message_handler_req_lib_cpg_finalize (void *conn, const void *message); static void message_handler_req_lib_cpg_mcast (void *conn, const void *message); static void message_handler_req_lib_cpg_partial_mcast (void *conn, const void *message); static void message_handler_req_lib_cpg_membership (void *conn, const void *message); static void message_handler_req_lib_cpg_local_get (void *conn, const void *message); static void message_handler_req_lib_cpg_iteration_initialize ( void *conn, const void *message); static void message_handler_req_lib_cpg_iteration_next ( void *conn, const void *message); static void message_handler_req_lib_cpg_iteration_finalize ( void *conn, const void *message); static void message_handler_req_lib_cpg_zc_alloc ( void *conn, const void *message); static void message_handler_req_lib_cpg_zc_free ( void *conn, const void *message); static void message_handler_req_lib_cpg_zc_execute ( void *conn, const void *message); static int cpg_node_joinleave_send (unsigned int pid, const mar_cpg_name_t *group_name, int fn, int reason); static int cpg_exec_send_downlist(void); static int cpg_exec_send_joinlist(void); static void downlist_messages_delete (void); static void downlist_master_choose_and_send (void); static void joinlist_inform_clients (void); static void joinlist_messages_delete (void); static void cpg_sync_init ( const unsigned int *trans_list, size_t trans_list_entries, const unsigned int *member_list, size_t member_list_entries, const struct memb_ring_id *ring_id); static int cpg_sync_process (void); static void cpg_sync_activate (void); static void cpg_sync_abort (void); static void do_proc_join( const mar_cpg_name_t *name, uint32_t pid, unsigned int nodeid, int reason); static void do_proc_leave( const mar_cpg_name_t *name, uint32_t pid, unsigned int nodeid, int reason); static int notify_lib_totem_membership ( void *conn, int member_list_entries, const unsigned int *member_list); static inline int zcb_all_free ( struct cpg_pd *cpd); static char *cpg_print_group_name ( const mar_cpg_name_t *group); /* * Library Handler Definition */ static struct corosync_lib_handler cpg_lib_engine[] = { { /* 0 - MESSAGE_REQ_CPG_JOIN */ .lib_handler_fn = message_handler_req_lib_cpg_join, .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED }, { /* 1 - MESSAGE_REQ_CPG_LEAVE */ .lib_handler_fn = message_handler_req_lib_cpg_leave, .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED }, { /* 2 - MESSAGE_REQ_CPG_MCAST */ .lib_handler_fn = message_handler_req_lib_cpg_mcast, .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED }, { /* 3 - MESSAGE_REQ_CPG_MEMBERSHIP */ .lib_handler_fn = message_handler_req_lib_cpg_membership, .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 4 - MESSAGE_REQ_CPG_LOCAL_GET */ .lib_handler_fn = message_handler_req_lib_cpg_local_get, .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 5 - MESSAGE_REQ_CPG_ITERATIONINITIALIZE */ .lib_handler_fn = message_handler_req_lib_cpg_iteration_initialize, .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 6 - MESSAGE_REQ_CPG_ITERATIONNEXT */ .lib_handler_fn = message_handler_req_lib_cpg_iteration_next, .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 7 - MESSAGE_REQ_CPG_ITERATIONFINALIZE */ .lib_handler_fn = message_handler_req_lib_cpg_iteration_finalize, .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 8 - MESSAGE_REQ_CPG_FINALIZE */ .lib_handler_fn = message_handler_req_lib_cpg_finalize, .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED }, { /* 9 */ .lib_handler_fn = message_handler_req_lib_cpg_zc_alloc, .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED }, { /* 10 */ .lib_handler_fn = message_handler_req_lib_cpg_zc_free, .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED }, { /* 11 */ .lib_handler_fn = message_handler_req_lib_cpg_zc_execute, .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED }, { /* 12 */ .lib_handler_fn = message_handler_req_lib_cpg_partial_mcast, .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED }, }; static struct corosync_exec_handler cpg_exec_engine[] = { { /* 0 - MESSAGE_REQ_EXEC_CPG_PROCJOIN */ .exec_handler_fn = message_handler_req_exec_cpg_procjoin, .exec_endian_convert_fn = exec_cpg_procjoin_endian_convert }, { /* 1 - MESSAGE_REQ_EXEC_CPG_PROCLEAVE */ .exec_handler_fn = message_handler_req_exec_cpg_procleave, .exec_endian_convert_fn = exec_cpg_procjoin_endian_convert }, { /* 2 - MESSAGE_REQ_EXEC_CPG_JOINLIST */ .exec_handler_fn = message_handler_req_exec_cpg_joinlist, .exec_endian_convert_fn = exec_cpg_joinlist_endian_convert }, { /* 3 - MESSAGE_REQ_EXEC_CPG_MCAST */ .exec_handler_fn = message_handler_req_exec_cpg_mcast, .exec_endian_convert_fn = exec_cpg_mcast_endian_convert }, { /* 4 - MESSAGE_REQ_EXEC_CPG_DOWNLIST_OLD */ .exec_handler_fn = message_handler_req_exec_cpg_downlist_old, .exec_endian_convert_fn = exec_cpg_downlist_endian_convert_old }, { /* 5 - MESSAGE_REQ_EXEC_CPG_DOWNLIST */ .exec_handler_fn = message_handler_req_exec_cpg_downlist, .exec_endian_convert_fn = exec_cpg_downlist_endian_convert }, { /* 6 - MESSAGE_REQ_EXEC_CPG_PARTIAL_MCAST */ .exec_handler_fn = message_handler_req_exec_cpg_partial_mcast, .exec_endian_convert_fn = exec_cpg_partial_mcast_endian_convert }, }; struct corosync_service_engine cpg_service_engine = { .name = "corosync cluster closed process group service v1.01", .id = CPG_SERVICE, .priority = 1, .private_data_size = sizeof (struct cpg_pd), .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED, .allow_inquorate = CS_LIB_ALLOW_INQUORATE, .lib_init_fn = cpg_lib_init_fn, .lib_exit_fn = cpg_lib_exit_fn, .lib_engine = cpg_lib_engine, .lib_engine_count = sizeof (cpg_lib_engine) / sizeof (struct corosync_lib_handler), .exec_init_fn = cpg_exec_init_fn, .exec_dump_fn = NULL, .exec_engine = cpg_exec_engine, .exec_engine_count = sizeof (cpg_exec_engine) / sizeof (struct corosync_exec_handler), .sync_init = cpg_sync_init, .sync_process = cpg_sync_process, .sync_activate = cpg_sync_activate, .sync_abort = cpg_sync_abort }; struct corosync_service_engine *cpg_get_service_engine_ver0 (void) { return (&cpg_service_engine); } struct req_exec_cpg_procjoin { struct qb_ipc_request_header header __attribute__((aligned(8))); mar_cpg_name_t group_name __attribute__((aligned(8))); mar_uint32_t pid __attribute__((aligned(8))); mar_uint32_t reason __attribute__((aligned(8))); }; struct req_exec_cpg_mcast { struct qb_ipc_request_header header __attribute__((aligned(8))); mar_cpg_name_t group_name __attribute__((aligned(8))); mar_uint32_t msglen __attribute__((aligned(8))); mar_uint32_t pid __attribute__((aligned(8))); mar_message_source_t source __attribute__((aligned(8))); mar_uint8_t message[] __attribute__((aligned(8))); }; struct req_exec_cpg_partial_mcast { struct qb_ipc_request_header header __attribute__((aligned(8))); mar_cpg_name_t group_name __attribute__((aligned(8))); mar_uint32_t msglen __attribute__((aligned(8))); mar_uint32_t fraglen __attribute__((aligned(8))); mar_uint32_t pid __attribute__((aligned(8))); mar_uint32_t type __attribute__((aligned(8))); mar_message_source_t source __attribute__((aligned(8))); mar_uint8_t message[] __attribute__((aligned(8))); }; struct req_exec_cpg_downlist_old { struct qb_ipc_request_header header __attribute__((aligned(8))); mar_uint32_t left_nodes __attribute__((aligned(8))); mar_uint32_t nodeids[PROCESSOR_COUNT_MAX] __attribute__((aligned(8))); }; struct req_exec_cpg_downlist { struct qb_ipc_request_header header __attribute__((aligned(8))); /* merge decisions */ mar_uint32_t old_members __attribute__((aligned(8))); /* downlist below */ mar_uint32_t left_nodes __attribute__((aligned(8))); mar_uint32_t nodeids[PROCESSOR_COUNT_MAX] __attribute__((aligned(8))); }; struct downlist_msg { mar_uint32_t sender_nodeid; mar_uint32_t old_members __attribute__((aligned(8))); mar_uint32_t left_nodes __attribute__((aligned(8))); mar_uint32_t nodeids[PROCESSOR_COUNT_MAX] __attribute__((aligned(8))); struct qb_list_head list; }; struct joinlist_msg { mar_uint32_t sender_nodeid; uint32_t pid; mar_cpg_name_t group_name; struct qb_list_head list; }; static struct req_exec_cpg_downlist g_req_exec_cpg_downlist; /* * Function print group name. It's not reentrant */ static char *cpg_print_group_name(const mar_cpg_name_t *group) { static char res[CPG_MAX_NAME_LENGTH * 4 + 1]; int dest_pos = 0; char c; int i; for (i = 0; i < group->length; i++) { c = group->value[i]; if (c >= ' ' && c < 0x7f && c != '\\') { res[dest_pos++] = c; } else { if (c == '\\') { res[dest_pos++] = '\\'; res[dest_pos++] = '\\'; } else { snprintf(res + dest_pos, sizeof(res) - dest_pos, "\\x%02X", c); dest_pos += 4; } } } res[dest_pos] = 0; return (res); } static void cpg_sync_init ( const unsigned int *trans_list, size_t trans_list_entries, const unsigned int *member_list, size_t member_list_entries, const struct memb_ring_id *ring_id) { int entries; int i, j; int found; my_sync_state = CPGSYNC_DOWNLIST; memcpy (my_member_list, member_list, member_list_entries * sizeof (unsigned int)); my_member_list_entries = member_list_entries; last_sync_ring_id.nodeid = ring_id->rep.nodeid; last_sync_ring_id.seq = ring_id->seq; downlist_state = CPG_DOWNLIST_WAITING_FOR_MESSAGES; entries = 0; /* * Determine list of nodeids for downlist message */ for (i = 0; i < my_old_member_list_entries; i++) { found = 0; for (j = 0; j < trans_list_entries; j++) { if (my_old_member_list[i] == trans_list[j]) { found = 1; break; } } if (found == 0) { g_req_exec_cpg_downlist.nodeids[entries++] = my_old_member_list[i]; } } g_req_exec_cpg_downlist.left_nodes = entries; } static int cpg_sync_process (void) { int res = -1; if (my_sync_state == CPGSYNC_DOWNLIST) { res = cpg_exec_send_downlist(); if (res == -1) { return (-1); } my_sync_state = CPGSYNC_JOINLIST; } if (my_sync_state == CPGSYNC_JOINLIST) { res = cpg_exec_send_joinlist(); } return (res); } static void cpg_sync_activate (void) { memcpy (my_old_member_list, my_member_list, my_member_list_entries * sizeof (unsigned int)); my_old_member_list_entries = my_member_list_entries; if (downlist_state == CPG_DOWNLIST_WAITING_FOR_MESSAGES) { downlist_master_choose_and_send (); } joinlist_inform_clients (); downlist_messages_delete (); downlist_state = CPG_DOWNLIST_NONE; joinlist_messages_delete (); notify_lib_totem_membership (NULL, my_member_list_entries, my_member_list); } static void cpg_sync_abort (void) { downlist_state = CPG_DOWNLIST_NONE; downlist_messages_delete (); joinlist_messages_delete (); } static int notify_lib_totem_membership ( void *conn, int member_list_entries, const unsigned int *member_list) { struct qb_list_head *iter; char *buf; int size; struct res_lib_cpg_totem_confchg_callback *res; size = sizeof(struct res_lib_cpg_totem_confchg_callback) + sizeof(mar_uint32_t) * (member_list_entries); buf = alloca(size); if (!buf) return CS_ERR_LIBRARY; res = (struct res_lib_cpg_totem_confchg_callback *)buf; res->member_list_entries = member_list_entries; res->header.size = size; res->header.id = MESSAGE_RES_CPG_TOTEM_CONFCHG_CALLBACK; res->header.error = CS_OK; memcpy (&res->ring_id, &last_sync_ring_id, sizeof (mar_cpg_ring_id_t)); memcpy (res->member_list, member_list, res->member_list_entries * sizeof (mar_uint32_t)); if (conn == NULL) { qb_list_for_each(iter, &cpg_pd_list_head) { struct cpg_pd *cpg_pd = qb_list_entry (iter, struct cpg_pd, list); api->ipc_dispatch_send (cpg_pd->conn, buf, size); } } else { api->ipc_dispatch_send (conn, buf, size); } return CS_OK; } static int notify_lib_joinlist( const mar_cpg_name_t *group_name, void *conn, int joined_list_entries, mar_cpg_address_t *joined_list, int left_list_entries, mar_cpg_address_t *left_list, int id) { int size; char *buf; struct qb_list_head *iter; int count; struct res_lib_cpg_confchg_callback *res; mar_cpg_address_t *retgi; count = 0; qb_list_for_each(iter, &process_info_list_head) { struct process_info *pi = qb_list_entry (iter, struct process_info, list); if (mar_name_compare (&pi->group, group_name) == 0) { int i; int founded = 0; for (i = 0; i < left_list_entries; i++) { if (left_list[i].nodeid == pi->nodeid && left_list[i].pid == pi->pid) { founded++; } } if (!founded) count++; } } size = sizeof(struct res_lib_cpg_confchg_callback) + sizeof(mar_cpg_address_t) * (count + left_list_entries + joined_list_entries); buf = alloca(size); if (!buf) return CS_ERR_LIBRARY; res = (struct res_lib_cpg_confchg_callback *)buf; res->joined_list_entries = joined_list_entries; res->left_list_entries = left_list_entries; res->member_list_entries = count; retgi = res->member_list; res->header.size = size; res->header.id = id; res->header.error = CS_OK; memcpy(&res->group_name, group_name, sizeof(mar_cpg_name_t)); qb_list_for_each(iter, &process_info_list_head) { struct process_info *pi=qb_list_entry (iter, struct process_info, list); if (mar_name_compare (&pi->group, group_name) == 0) { int i; int founded = 0; for (i = 0;i < left_list_entries; i++) { if (left_list[i].nodeid == pi->nodeid && left_list[i].pid == pi->pid) { founded++; } } if (!founded) { retgi->nodeid = pi->nodeid; retgi->pid = pi->pid; retgi++; } } } if (left_list_entries) { memcpy (retgi, left_list, left_list_entries * sizeof(mar_cpg_address_t)); retgi += left_list_entries; } if (joined_list_entries) { memcpy (retgi, joined_list, joined_list_entries * sizeof(mar_cpg_address_t)); retgi += joined_list_entries; } if (conn) { api->ipc_dispatch_send (conn, buf, size); } else { qb_list_for_each(iter, &cpg_pd_list_head) { struct cpg_pd *cpd = qb_list_entry (iter, struct cpg_pd, list); if (mar_name_compare (&cpd->group_name, group_name) == 0) { assert (joined_list_entries <= 1); if (joined_list_entries) { if (joined_list[0].pid == cpd->pid && joined_list[0].nodeid == api->totem_nodeid_get()) { cpd->cpd_state = CPD_STATE_JOIN_COMPLETED; } } if (cpd->cpd_state == CPD_STATE_JOIN_COMPLETED || cpd->cpd_state == CPD_STATE_LEAVE_STARTED) { api->ipc_dispatch_send (cpd->conn, buf, size); cpd->transition_counter++; } if (left_list_entries) { if (left_list[0].pid == cpd->pid && left_list[0].nodeid == api->totem_nodeid_get() && left_list[0].reason == CONFCHG_CPG_REASON_LEAVE) { cpd->pid = 0; memset (&cpd->group_name, 0, sizeof(cpd->group_name)); cpd->cpd_state = CPD_STATE_UNJOINED; } } } } } /* * Traverse thru cpds and send totem membership for cpd, where it is not send yet */ qb_list_for_each(iter, &cpg_pd_list_head) { struct cpg_pd *cpd = qb_list_entry (iter, struct cpg_pd, list); if ((cpd->flags & CPG_MODEL_V1_DELIVER_INITIAL_TOTEM_CONF) && (cpd->initial_totem_conf_sent == 0)) { cpd->initial_totem_conf_sent = 1; notify_lib_totem_membership (cpd->conn, my_old_member_list_entries, my_old_member_list); } } return CS_OK; } static void downlist_log(const char *msg, struct downlist_msg* dl) { log_printf (LOG_DEBUG, "%s: sender %s; members(old:%d left:%d)", msg, api->totem_ifaces_print(dl->sender_nodeid), dl->old_members, dl->left_nodes); } static struct downlist_msg* downlist_master_choose (void) { struct downlist_msg *cmp; struct downlist_msg *best = NULL; struct qb_list_head *iter; uint32_t cmp_members; uint32_t best_members; uint32_t i; int ignore_msg; qb_list_for_each(iter, &downlist_messages_head) { cmp = qb_list_entry(iter, struct downlist_msg, list); downlist_log("comparing", cmp); ignore_msg = 0; for (i = 0; i < cmp->left_nodes; i++) { if (cmp->nodeids[i] == api->totem_nodeid_get()) { log_printf (LOG_DEBUG, "Ignoring this entry because I'm in the left list\n"); ignore_msg = 1; break; } } if (ignore_msg) { continue ; } if (best == NULL) { best = cmp; continue; } best_members = best->old_members - best->left_nodes; cmp_members = cmp->old_members - cmp->left_nodes; if (cmp_members > best_members) { best = cmp; } else if (cmp_members == best_members) { if (cmp->old_members > best->old_members) { best = cmp; } else if (cmp->old_members == best->old_members) { if (cmp->sender_nodeid < best->sender_nodeid) { best = cmp; } } } } assert (best != NULL); return best; } static void downlist_master_choose_and_send (void) { struct downlist_msg *stored_msg; struct qb_list_head *iter, *tmp_iter; struct process_info *left_pi; qb_map_t *group_map; struct cpg_name cpg_group; mar_cpg_name_t group; struct confchg_data{ struct cpg_name cpg_group; mar_cpg_address_t left_list[CPG_MEMBERS_MAX]; int left_list_entries; struct qb_list_head list; } *pcd; qb_map_iter_t *miter; int i, size; downlist_state = CPG_DOWNLIST_APPLYING; stored_msg = downlist_master_choose (); if (!stored_msg) { log_printf (LOGSYS_LEVEL_DEBUG, "NO chosen downlist"); return; } downlist_log("chosen downlist", stored_msg); group_map = qb_skiplist_create(); /* * only the cpg groups included in left nodes should receive * confchg event, so we will collect these cpg groups and * relative left_lists here. */ qb_list_for_each_safe(iter, tmp_iter, &process_info_list_head) { struct process_info *pi = qb_list_entry(iter, struct process_info, list); left_pi = NULL; for (i = 0; i < stored_msg->left_nodes; i++) { if (pi->nodeid == stored_msg->nodeids[i]) { left_pi = pi; break; } } if (left_pi) { marshall_from_mar_cpg_name_t(&cpg_group, &left_pi->group); cpg_group.value[cpg_group.length] = 0; pcd = (struct confchg_data *)qb_map_get(group_map, cpg_group.value); if (pcd == NULL) { pcd = (struct confchg_data *)calloc(1, sizeof(struct confchg_data)); memcpy(&pcd->cpg_group, &cpg_group, sizeof(struct cpg_name)); qb_map_put(group_map, pcd->cpg_group.value, pcd); } size = pcd->left_list_entries; pcd->left_list[size].nodeid = left_pi->nodeid; pcd->left_list[size].pid = left_pi->pid; pcd->left_list[size].reason = CONFCHG_CPG_REASON_NODEDOWN; pcd->left_list_entries++; qb_list_del (&left_pi->list); free (left_pi); } } /* send only one confchg event per cpg group */ miter = qb_map_iter_create(group_map); while (qb_map_iter_next(miter, (void **)&pcd)) { marshall_to_mar_cpg_name_t(&group, &pcd->cpg_group); log_printf (LOG_DEBUG, "left_list_entries:%d", pcd->left_list_entries); for (i=0; ileft_list_entries; i++) { log_printf (LOG_DEBUG, "left_list[%d] group:%s, ip:%s, pid:%d", i, cpg_print_group_name(&group), (char*)api->totem_ifaces_print(pcd->left_list[i].nodeid), pcd->left_list[i].pid); } /* send confchg event */ notify_lib_joinlist(&group, NULL, 0, NULL, pcd->left_list_entries, pcd->left_list, MESSAGE_RES_CPG_CONFCHG_CALLBACK); free(pcd); } qb_map_iter_free(miter); qb_map_destroy(group_map); } /* * Remove processes that might have left the group while we were suspended. */ static void joinlist_remove_zombie_pi_entries (void) { struct qb_list_head *pi_iter, *tmp_iter; struct qb_list_head *jl_iter; struct process_info *pi; struct joinlist_msg *stored_msg; int found; qb_list_for_each_safe(pi_iter, tmp_iter, &process_info_list_head) { pi = qb_list_entry (pi_iter, struct process_info, list); /* * Ignore local node */ if (pi->nodeid == api->totem_nodeid_get()) { continue ; } /* * Try to find message in joinlist messages */ found = 0; qb_list_for_each(jl_iter, &joinlist_messages_head) { stored_msg = qb_list_entry(jl_iter, struct joinlist_msg, list); if (stored_msg->sender_nodeid == api->totem_nodeid_get()) { continue ; } if (pi->nodeid == stored_msg->sender_nodeid && pi->pid == stored_msg->pid && mar_name_compare (&pi->group, &stored_msg->group_name) == 0) { found = 1; break ; } } if (!found) { do_proc_leave(&pi->group, pi->pid, pi->nodeid, CONFCHG_CPG_REASON_PROCDOWN); } } } static void joinlist_inform_clients (void) { struct joinlist_msg *stored_msg; struct qb_list_head *iter; unsigned int i; i = 0; qb_list_for_each(iter, &joinlist_messages_head) { stored_msg = qb_list_entry(iter, struct joinlist_msg, list); log_printf (LOG_DEBUG, "joinlist_messages[%u] group:%s, ip:%s, pid:%d", i++, cpg_print_group_name(&stored_msg->group_name), (char*)api->totem_ifaces_print(stored_msg->sender_nodeid), stored_msg->pid); /* Ignore our own messages */ if (stored_msg->sender_nodeid == api->totem_nodeid_get()) { continue ; } do_proc_join (&stored_msg->group_name, stored_msg->pid, stored_msg->sender_nodeid, CONFCHG_CPG_REASON_NODEUP); } joinlist_remove_zombie_pi_entries (); } static void downlist_messages_delete (void) { struct downlist_msg *stored_msg; struct qb_list_head *iter, *tmp_iter; qb_list_for_each_safe(iter, tmp_iter, &downlist_messages_head) { stored_msg = qb_list_entry(iter, struct downlist_msg, list); qb_list_del (&stored_msg->list); free (stored_msg); } } static void joinlist_messages_delete (void) { struct joinlist_msg *stored_msg; struct qb_list_head *iter, *tmp_iter; qb_list_for_each_safe(iter, tmp_iter, &joinlist_messages_head) { stored_msg = qb_list_entry(iter, struct joinlist_msg, list); qb_list_del (&stored_msg->list); free (stored_msg); } qb_list_init (&joinlist_messages_head); } static char *cpg_exec_init_fn (struct corosync_api_v1 *corosync_api) { qb_list_init (&downlist_messages_head); qb_list_init (&joinlist_messages_head); api = corosync_api; return (NULL); } static void cpg_iteration_instance_finalize (struct cpg_iteration_instance *cpg_iteration_instance) { struct qb_list_head *iter, *tmp_iter; struct process_info *pi; qb_list_for_each_safe(iter, tmp_iter, &(cpg_iteration_instance->items_list_head)) { pi = qb_list_entry (iter, struct process_info, list); qb_list_del (&pi->list); free (pi); } qb_list_del (&cpg_iteration_instance->list); hdb_handle_destroy (&cpg_iteration_handle_t_db, cpg_iteration_instance->handle); } static void cpg_pd_finalize (struct cpg_pd *cpd) { struct qb_list_head *iter, *tmp_iter; struct cpg_iteration_instance *cpii; zcb_all_free(cpd); qb_list_for_each_safe(iter, tmp_iter, &(cpd->iteration_instance_list_head)) { cpii = qb_list_entry (iter, struct cpg_iteration_instance, list); cpg_iteration_instance_finalize (cpii); } qb_list_del (&cpd->list); } static int cpg_lib_exit_fn (void *conn) { struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn); log_printf(LOGSYS_LEVEL_DEBUG, "exit_fn for conn=%p", conn); if (cpd->group_name.length > 0 && cpd->cpd_state != CPD_STATE_LEAVE_STARTED) { cpg_node_joinleave_send (cpd->pid, &cpd->group_name, MESSAGE_REQ_EXEC_CPG_PROCLEAVE, CONFCHG_CPG_REASON_PROCDOWN); } cpg_pd_finalize (cpd); api->ipc_refcnt_dec (conn); return (0); } static int cpg_node_joinleave_send (unsigned int pid, const mar_cpg_name_t *group_name, int fn, int reason) { struct req_exec_cpg_procjoin req_exec_cpg_procjoin; struct iovec req_exec_cpg_iovec; int result; memcpy(&req_exec_cpg_procjoin.group_name, group_name, sizeof(mar_cpg_name_t)); req_exec_cpg_procjoin.pid = pid; req_exec_cpg_procjoin.reason = reason; req_exec_cpg_procjoin.header.size = sizeof(req_exec_cpg_procjoin); req_exec_cpg_procjoin.header.id = SERVICE_ID_MAKE(CPG_SERVICE, fn); req_exec_cpg_iovec.iov_base = (char *)&req_exec_cpg_procjoin; req_exec_cpg_iovec.iov_len = sizeof(req_exec_cpg_procjoin); result = api->totem_mcast (&req_exec_cpg_iovec, 1, TOTEM_AGREED); return (result); } /* Can byteswap join & leave messages */ static void exec_cpg_procjoin_endian_convert (void *msg) { struct req_exec_cpg_procjoin *req_exec_cpg_procjoin = msg; req_exec_cpg_procjoin->pid = swab32(req_exec_cpg_procjoin->pid); swab_mar_cpg_name_t (&req_exec_cpg_procjoin->group_name); req_exec_cpg_procjoin->reason = swab32(req_exec_cpg_procjoin->reason); } static void exec_cpg_joinlist_endian_convert (void *msg_v) { char *msg = msg_v; struct qb_ipc_response_header *res = (struct qb_ipc_response_header *)msg; struct join_list_entry *jle = (struct join_list_entry *)(msg + sizeof(struct qb_ipc_response_header)); swab_mar_int32_t (&res->size); while ((const char*)jle < msg + res->size) { jle->pid = swab32(jle->pid); swab_mar_cpg_name_t (&jle->group_name); jle++; } } static void exec_cpg_downlist_endian_convert_old (void *msg) { } static void exec_cpg_downlist_endian_convert (void *msg) { struct req_exec_cpg_downlist *req_exec_cpg_downlist = msg; unsigned int i; req_exec_cpg_downlist->left_nodes = swab32(req_exec_cpg_downlist->left_nodes); req_exec_cpg_downlist->old_members = swab32(req_exec_cpg_downlist->old_members); for (i = 0; i < req_exec_cpg_downlist->left_nodes; i++) { req_exec_cpg_downlist->nodeids[i] = swab32(req_exec_cpg_downlist->nodeids[i]); } } static void exec_cpg_mcast_endian_convert (void *msg) { struct req_exec_cpg_mcast *req_exec_cpg_mcast = msg; swab_coroipc_request_header_t (&req_exec_cpg_mcast->header); swab_mar_cpg_name_t (&req_exec_cpg_mcast->group_name); req_exec_cpg_mcast->pid = swab32(req_exec_cpg_mcast->pid); req_exec_cpg_mcast->msglen = swab32(req_exec_cpg_mcast->msglen); swab_mar_message_source_t (&req_exec_cpg_mcast->source); } static void exec_cpg_partial_mcast_endian_convert (void *msg) { struct req_exec_cpg_partial_mcast *req_exec_cpg_mcast = msg; swab_coroipc_request_header_t (&req_exec_cpg_mcast->header); swab_mar_cpg_name_t (&req_exec_cpg_mcast->group_name); req_exec_cpg_mcast->pid = swab32(req_exec_cpg_mcast->pid); req_exec_cpg_mcast->msglen = swab32(req_exec_cpg_mcast->msglen); req_exec_cpg_mcast->fraglen = swab32(req_exec_cpg_mcast->fraglen); req_exec_cpg_mcast->type = swab32(req_exec_cpg_mcast->type); swab_mar_message_source_t (&req_exec_cpg_mcast->source); } static struct process_info *process_info_find(const mar_cpg_name_t *group_name, uint32_t pid, unsigned int nodeid) { struct qb_list_head *iter; qb_list_for_each(iter, &process_info_list_head) { struct process_info *pi = qb_list_entry (iter, struct process_info, list); if (pi->pid == pid && pi->nodeid == nodeid && mar_name_compare (&pi->group, group_name) == 0) { return pi; } } return NULL; } static void do_proc_join( const mar_cpg_name_t *name, uint32_t pid, unsigned int nodeid, int reason) { struct process_info *pi; struct process_info *pi_entry; mar_cpg_address_t notify_info; struct qb_list_head *list; struct qb_list_head *list_to_add = NULL; if (process_info_find (name, pid, nodeid) != NULL) { return ; } pi = malloc (sizeof (struct process_info)); if (!pi) { log_printf(LOGSYS_LEVEL_WARNING, "Unable to allocate process_info struct"); return; } pi->nodeid = nodeid; pi->pid = pid; memcpy(&pi->group, name, sizeof(*name)); qb_list_init(&pi->list); /* * Insert new process in sorted order so synchronization works properly */ list_to_add = &process_info_list_head; qb_list_for_each(list, &process_info_list_head) { pi_entry = qb_list_entry(list, struct process_info, list); if (pi_entry->nodeid > pi->nodeid || (pi_entry->nodeid == pi->nodeid && pi_entry->pid > pi->pid)) { break; } list_to_add = list; } qb_list_add (&pi->list, list_to_add); notify_info.pid = pi->pid; notify_info.nodeid = nodeid; notify_info.reason = reason; notify_lib_joinlist(&pi->group, NULL, 1, ¬ify_info, 0, NULL, MESSAGE_RES_CPG_CONFCHG_CALLBACK); } static void do_proc_leave( const mar_cpg_name_t *name, uint32_t pid, unsigned int nodeid, int reason) { struct process_info *pi; struct qb_list_head *iter, *tmp_iter; mar_cpg_address_t notify_info; notify_info.pid = pid; notify_info.nodeid = nodeid; notify_info.reason = reason; notify_lib_joinlist(name, NULL, 0, NULL, 1, ¬ify_info, MESSAGE_RES_CPG_CONFCHG_CALLBACK); qb_list_for_each_safe(iter, tmp_iter, &process_info_list_head) { pi = qb_list_entry(iter, struct process_info, list); if (pi->pid == pid && pi->nodeid == nodeid && mar_name_compare (&pi->group, name)==0) { qb_list_del (&pi->list); free (pi); } } } static void message_handler_req_exec_cpg_downlist_old ( const void *message, unsigned int nodeid) { log_printf (LOGSYS_LEVEL_WARNING, "downlist OLD from node 0x%x", nodeid); } static void message_handler_req_exec_cpg_downlist( const void *message, unsigned int nodeid) { const struct req_exec_cpg_downlist *req_exec_cpg_downlist = message; int i; struct qb_list_head *iter; struct downlist_msg *stored_msg; int found; if (downlist_state != CPG_DOWNLIST_WAITING_FOR_MESSAGES) { log_printf (LOGSYS_LEVEL_WARNING, "downlist left_list: %d received in state %d", req_exec_cpg_downlist->left_nodes, downlist_state); return; } stored_msg = malloc (sizeof (struct downlist_msg)); stored_msg->sender_nodeid = nodeid; stored_msg->old_members = req_exec_cpg_downlist->old_members; stored_msg->left_nodes = req_exec_cpg_downlist->left_nodes; memcpy (stored_msg->nodeids, req_exec_cpg_downlist->nodeids, req_exec_cpg_downlist->left_nodes * sizeof (mar_uint32_t)); qb_list_init (&stored_msg->list); qb_list_add (&stored_msg->list, &downlist_messages_head); for (i = 0; i < my_member_list_entries; i++) { found = 0; qb_list_for_each(iter, &downlist_messages_head) { stored_msg = qb_list_entry(iter, struct downlist_msg, list); if (my_member_list[i] == stored_msg->sender_nodeid) { found = 1; } } if (!found) { return; } } downlist_master_choose_and_send (); } static void message_handler_req_exec_cpg_procjoin ( const void *message, unsigned int nodeid) { const struct req_exec_cpg_procjoin *req_exec_cpg_procjoin = message; log_printf(LOGSYS_LEVEL_DEBUG, "got procjoin message from cluster node 0x%x (%s) for pid %u", nodeid, api->totem_ifaces_print(nodeid), (unsigned int)req_exec_cpg_procjoin->pid); do_proc_join (&req_exec_cpg_procjoin->group_name, req_exec_cpg_procjoin->pid, nodeid, CONFCHG_CPG_REASON_JOIN); } static void message_handler_req_exec_cpg_procleave ( const void *message, unsigned int nodeid) { const struct req_exec_cpg_procjoin *req_exec_cpg_procjoin = message; log_printf(LOGSYS_LEVEL_DEBUG, "got procleave message from cluster node 0x%x (%s) for pid %u", nodeid, api->totem_ifaces_print(nodeid), (unsigned int)req_exec_cpg_procjoin->pid); do_proc_leave (&req_exec_cpg_procjoin->group_name, req_exec_cpg_procjoin->pid, nodeid, req_exec_cpg_procjoin->reason); } /* Got a proclist from another node */ static void message_handler_req_exec_cpg_joinlist ( const void *message_v, unsigned int nodeid) { const char *message = message_v; const struct qb_ipc_response_header *res = (const struct qb_ipc_response_header *)message; const struct join_list_entry *jle = (const struct join_list_entry *)(message + sizeof(struct qb_ipc_response_header)); struct joinlist_msg *stored_msg; log_printf(LOGSYS_LEVEL_DEBUG, "got joinlist message from node 0x%x", nodeid); while ((const char*)jle < message + res->size) { stored_msg = malloc (sizeof (struct joinlist_msg)); memset(stored_msg, 0, sizeof (struct joinlist_msg)); stored_msg->sender_nodeid = nodeid; stored_msg->pid = jle->pid; memcpy(&stored_msg->group_name, &jle->group_name, sizeof(mar_cpg_name_t)); qb_list_init (&stored_msg->list); qb_list_add (&stored_msg->list, &joinlist_messages_head); jle++; } } static void message_handler_req_exec_cpg_mcast ( const void *message, unsigned int nodeid) { const struct req_exec_cpg_mcast *req_exec_cpg_mcast = message; struct res_lib_cpg_deliver_callback res_lib_cpg_mcast; int msglen = req_exec_cpg_mcast->msglen; struct qb_list_head *iter, *pi_iter, *tmp_iter; struct cpg_pd *cpd; struct iovec iovec[2]; int known_node = 0; res_lib_cpg_mcast.header.id = MESSAGE_RES_CPG_DELIVER_CALLBACK; res_lib_cpg_mcast.header.size = sizeof(res_lib_cpg_mcast) + msglen; res_lib_cpg_mcast.msglen = msglen; res_lib_cpg_mcast.pid = req_exec_cpg_mcast->pid; res_lib_cpg_mcast.nodeid = nodeid; memcpy(&res_lib_cpg_mcast.group_name, &req_exec_cpg_mcast->group_name, sizeof(mar_cpg_name_t)); iovec[0].iov_base = (void *)&res_lib_cpg_mcast; iovec[0].iov_len = sizeof (res_lib_cpg_mcast); iovec[1].iov_base = (char*)message+sizeof(*req_exec_cpg_mcast); iovec[1].iov_len = msglen; qb_list_for_each_safe(iter, tmp_iter, &cpg_pd_list_head) { cpd = qb_list_entry(iter, struct cpg_pd, list); if ((cpd->cpd_state == CPD_STATE_LEAVE_STARTED || cpd->cpd_state == CPD_STATE_JOIN_COMPLETED) && (mar_name_compare (&cpd->group_name, &req_exec_cpg_mcast->group_name) == 0)) { if (!known_node) { /* Try to find, if we know the node */ qb_list_for_each(pi_iter, &process_info_list_head) { struct process_info *pi = qb_list_entry (pi_iter, struct process_info, list); if (pi->nodeid == nodeid && mar_name_compare (&pi->group, &req_exec_cpg_mcast->group_name) == 0) { known_node = 1; break; } } } if (!known_node) { log_printf(LOGSYS_LEVEL_WARNING, "Unknown node -> we will not deliver message"); return ; } api->ipc_dispatch_iov_send (cpd->conn, iovec, 2); } } } static void message_handler_req_exec_cpg_partial_mcast ( const void *message, unsigned int nodeid) { const struct req_exec_cpg_partial_mcast *req_exec_cpg_mcast = message; struct res_lib_cpg_partial_deliver_callback res_lib_cpg_mcast; int msglen = req_exec_cpg_mcast->fraglen; struct qb_list_head *iter, *pi_iter, *tmp_iter; struct cpg_pd *cpd; struct iovec iovec[2]; int known_node = 0; log_printf(LOGSYS_LEVEL_DEBUG, "Got fragmented message from node %d, size = %d bytes\n", nodeid, msglen); res_lib_cpg_mcast.header.id = MESSAGE_RES_CPG_PARTIAL_DELIVER_CALLBACK; res_lib_cpg_mcast.header.size = sizeof(res_lib_cpg_mcast) + msglen; res_lib_cpg_mcast.fraglen = msglen; res_lib_cpg_mcast.msglen = req_exec_cpg_mcast->msglen; res_lib_cpg_mcast.pid = req_exec_cpg_mcast->pid; res_lib_cpg_mcast.type = req_exec_cpg_mcast->type; res_lib_cpg_mcast.nodeid = nodeid; memcpy(&res_lib_cpg_mcast.group_name, &req_exec_cpg_mcast->group_name, sizeof(mar_cpg_name_t)); iovec[0].iov_base = (void *)&res_lib_cpg_mcast; iovec[0].iov_len = sizeof (res_lib_cpg_mcast); iovec[1].iov_base = (char*)message+sizeof(*req_exec_cpg_mcast); iovec[1].iov_len = msglen; qb_list_for_each_safe(iter, tmp_iter, &cpg_pd_list_head) { cpd = qb_list_entry(iter, struct cpg_pd, list); if ((cpd->cpd_state == CPD_STATE_LEAVE_STARTED || cpd->cpd_state == CPD_STATE_JOIN_COMPLETED) && (mar_name_compare (&cpd->group_name, &req_exec_cpg_mcast->group_name) == 0)) { if (!known_node) { /* Try to find, if we know the node */ qb_list_for_each(pi_iter, &process_info_list_head) { struct process_info *pi = qb_list_entry (pi_iter, struct process_info, list); if (pi->nodeid == nodeid && mar_name_compare (&pi->group, &req_exec_cpg_mcast->group_name) == 0) { known_node = 1; break; } } } if (!known_node) { log_printf(LOGSYS_LEVEL_WARNING, "Unknown node -> we will not deliver message"); return ; } api->ipc_dispatch_iov_send (cpd->conn, iovec, 2); } } } static int cpg_exec_send_downlist(void) { struct iovec iov; g_req_exec_cpg_downlist.header.id = SERVICE_ID_MAKE(CPG_SERVICE, MESSAGE_REQ_EXEC_CPG_DOWNLIST); g_req_exec_cpg_downlist.header.size = sizeof(struct req_exec_cpg_downlist); g_req_exec_cpg_downlist.old_members = my_old_member_list_entries; iov.iov_base = (void *)&g_req_exec_cpg_downlist; iov.iov_len = g_req_exec_cpg_downlist.header.size; return (api->totem_mcast (&iov, 1, TOTEM_AGREED)); } static int cpg_exec_send_joinlist(void) { int count = 0; struct qb_list_head *iter; struct qb_ipc_response_header *res; char *buf; struct join_list_entry *jle; struct iovec req_exec_cpg_iovec; qb_list_for_each(iter, &process_info_list_head) { struct process_info *pi = qb_list_entry (iter, struct process_info, list); if (pi->nodeid == api->totem_nodeid_get ()) { count++; } } /* Nothing to send */ if (!count) return 0; buf = alloca(sizeof(struct qb_ipc_response_header) + sizeof(struct join_list_entry) * count); if (!buf) { log_printf(LOGSYS_LEVEL_WARNING, "Unable to allocate joinlist buffer"); return -1; } jle = (struct join_list_entry *)(buf + sizeof(struct qb_ipc_response_header)); res = (struct qb_ipc_response_header *)buf; qb_list_for_each(iter, &process_info_list_head) { struct process_info *pi = qb_list_entry (iter, struct process_info, list); if (pi->nodeid == api->totem_nodeid_get ()) { memcpy (&jle->group_name, &pi->group, sizeof (mar_cpg_name_t)); jle->pid = pi->pid; jle++; } } res->id = SERVICE_ID_MAKE(CPG_SERVICE, MESSAGE_REQ_EXEC_CPG_JOINLIST); res->size = sizeof(struct qb_ipc_response_header)+sizeof(struct join_list_entry) * count; req_exec_cpg_iovec.iov_base = buf; req_exec_cpg_iovec.iov_len = res->size; return (api->totem_mcast (&req_exec_cpg_iovec, 1, TOTEM_AGREED)); } static int cpg_lib_init_fn (void *conn) { struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn); memset (cpd, 0, sizeof(struct cpg_pd)); cpd->conn = conn; qb_list_add (&cpd->list, &cpg_pd_list_head); qb_list_init (&cpd->iteration_instance_list_head); qb_list_init (&cpd->zcb_mapped_list_head); api->ipc_refcnt_inc (conn); log_printf(LOGSYS_LEVEL_DEBUG, "lib_init_fn: conn=%p, cpd=%p", conn, cpd); return (0); } /* Join message from the library */ static void message_handler_req_lib_cpg_join (void *conn, const void *message) { const struct req_lib_cpg_join *req_lib_cpg_join = message; struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn); struct res_lib_cpg_join res_lib_cpg_join; cs_error_t error = CS_OK; struct qb_list_head *iter; /* Test, if we don't have same pid and group name joined */ qb_list_for_each(iter, &cpg_pd_list_head) { struct cpg_pd *cpd_item = qb_list_entry (iter, struct cpg_pd, list); if (cpd_item->pid == req_lib_cpg_join->pid && mar_name_compare(&req_lib_cpg_join->group_name, &cpd_item->group_name) == 0) { /* We have same pid and group name joined -> return error */ error = CS_ERR_EXIST; goto response_send; } } /* * Same check must be done in process info list, because there may be not yet delivered * leave of client. */ qb_list_for_each(iter, &process_info_list_head) { struct process_info *pi = qb_list_entry (iter, struct process_info, list); if (pi->nodeid == api->totem_nodeid_get () && pi->pid == req_lib_cpg_join->pid && mar_name_compare(&req_lib_cpg_join->group_name, &pi->group) == 0) { /* We have same pid and group name joined -> return error */ error = CS_ERR_TRY_AGAIN; goto response_send; } } if (req_lib_cpg_join->group_name.length > CPG_MAX_NAME_LENGTH) { error = CS_ERR_NAME_TOO_LONG; goto response_send; } switch (cpd->cpd_state) { case CPD_STATE_UNJOINED: error = CS_OK; cpd->cpd_state = CPD_STATE_JOIN_STARTED; cpd->pid = req_lib_cpg_join->pid; cpd->flags = req_lib_cpg_join->flags; memcpy (&cpd->group_name, &req_lib_cpg_join->group_name, sizeof (cpd->group_name)); cpg_node_joinleave_send (req_lib_cpg_join->pid, &req_lib_cpg_join->group_name, MESSAGE_REQ_EXEC_CPG_PROCJOIN, CONFCHG_CPG_REASON_JOIN); break; case CPD_STATE_LEAVE_STARTED: error = CS_ERR_BUSY; break; case CPD_STATE_JOIN_STARTED: error = CS_ERR_EXIST; break; case CPD_STATE_JOIN_COMPLETED: error = CS_ERR_EXIST; break; } response_send: res_lib_cpg_join.header.size = sizeof(res_lib_cpg_join); res_lib_cpg_join.header.id = MESSAGE_RES_CPG_JOIN; res_lib_cpg_join.header.error = error; api->ipc_response_send (conn, &res_lib_cpg_join, sizeof(res_lib_cpg_join)); } /* Leave message from the library */ static void message_handler_req_lib_cpg_leave (void *conn, const void *message) { struct res_lib_cpg_leave res_lib_cpg_leave; cs_error_t error = CS_OK; struct req_lib_cpg_leave *req_lib_cpg_leave = (struct req_lib_cpg_leave *)message; struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn); log_printf(LOGSYS_LEVEL_DEBUG, "got leave request on %p", conn); switch (cpd->cpd_state) { case CPD_STATE_UNJOINED: error = CS_ERR_NOT_EXIST; break; case CPD_STATE_LEAVE_STARTED: error = CS_ERR_NOT_EXIST; break; case CPD_STATE_JOIN_STARTED: error = CS_ERR_BUSY; break; case CPD_STATE_JOIN_COMPLETED: error = CS_OK; cpd->cpd_state = CPD_STATE_LEAVE_STARTED; cpg_node_joinleave_send (req_lib_cpg_leave->pid, &req_lib_cpg_leave->group_name, MESSAGE_REQ_EXEC_CPG_PROCLEAVE, CONFCHG_CPG_REASON_LEAVE); break; } /* send return */ res_lib_cpg_leave.header.size = sizeof(res_lib_cpg_leave); res_lib_cpg_leave.header.id = MESSAGE_RES_CPG_LEAVE; res_lib_cpg_leave.header.error = error; api->ipc_response_send(conn, &res_lib_cpg_leave, sizeof(res_lib_cpg_leave)); } /* Finalize message from library */ static void message_handler_req_lib_cpg_finalize ( void *conn, const void *message) { struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn); struct res_lib_cpg_finalize res_lib_cpg_finalize; cs_error_t error = CS_OK; log_printf (LOGSYS_LEVEL_DEBUG, "cpg finalize for conn=%p", conn); /* * We will just remove cpd from list. After this call, connection will be * closed on lib side, and cpg_lib_exit_fn will be called */ qb_list_del (&cpd->list); qb_list_init (&cpd->list); res_lib_cpg_finalize.header.size = sizeof (res_lib_cpg_finalize); res_lib_cpg_finalize.header.id = MESSAGE_RES_CPG_FINALIZE; res_lib_cpg_finalize.header.error = error; api->ipc_response_send (conn, &res_lib_cpg_finalize, sizeof (res_lib_cpg_finalize)); } static int memory_map ( const char *path, size_t bytes, void **buf) { int32_t fd; void *addr; int32_t res; fd = open (path, O_RDWR, 0600); unlink (path); if (fd == -1) { return (-1); } res = ftruncate (fd, bytes); if (res == -1) { goto error_close_unlink; } addr = mmap (NULL, bytes, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); if (addr == MAP_FAILED) { goto error_close_unlink; } #ifdef MADV_NOSYNC madvise(addr, bytes, MADV_NOSYNC); #endif res = close (fd); if (res) { munmap (addr, bytes); return (-1); } *buf = addr; return (0); error_close_unlink: close (fd); unlink(path); return -1; } static inline int zcb_alloc ( struct cpg_pd *cpd, const char *path_to_file, size_t size, void **addr) { struct zcb_mapped *zcb_mapped; unsigned int res; zcb_mapped = malloc (sizeof (struct zcb_mapped)); if (zcb_mapped == NULL) { return (-1); } res = memory_map ( path_to_file, size, addr); if (res == -1) { free (zcb_mapped); return (-1); } qb_list_init (&zcb_mapped->list); zcb_mapped->addr = *addr; zcb_mapped->size = size; qb_list_add_tail (&zcb_mapped->list, &cpd->zcb_mapped_list_head); return (0); } static inline int zcb_free (struct zcb_mapped *zcb_mapped) { unsigned int res; res = munmap (zcb_mapped->addr, zcb_mapped->size); qb_list_del (&zcb_mapped->list); free (zcb_mapped); return (res); } static inline int zcb_by_addr_free (struct cpg_pd *cpd, void *addr) { struct qb_list_head *list, *tmp_iter; struct zcb_mapped *zcb_mapped; unsigned int res = 0; qb_list_for_each_safe(list, tmp_iter, &(cpd->zcb_mapped_list_head)) { zcb_mapped = qb_list_entry (list, struct zcb_mapped, list); if (zcb_mapped->addr == addr) { res = zcb_free (zcb_mapped); break; } } return (res); } static inline int zcb_all_free ( struct cpg_pd *cpd) { struct qb_list_head *list, *tmp_iter; struct zcb_mapped *zcb_mapped; qb_list_for_each_safe(list, tmp_iter, &(cpd->zcb_mapped_list_head)) { zcb_mapped = qb_list_entry (list, struct zcb_mapped, list); zcb_free (zcb_mapped); } return (0); } union u { uint64_t server_addr; void *server_ptr; }; static uint64_t void2serveraddr (void *server_ptr) { union u u; u.server_ptr = server_ptr; return (u.server_addr); } static void *serveraddr2void (uint64_t server_addr) { union u u; u.server_addr = server_addr; return (u.server_ptr); }; static void message_handler_req_lib_cpg_zc_alloc ( void *conn, const void *message) { mar_req_coroipcc_zc_alloc_t *hdr = (mar_req_coroipcc_zc_alloc_t *)message; struct qb_ipc_response_header res_header; void *addr = NULL; struct coroipcs_zc_header *zc_header; unsigned int res; struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn); log_printf(LOGSYS_LEVEL_DEBUG, "path: %s", hdr->path_to_file); res = zcb_alloc (cpd, hdr->path_to_file, hdr->map_size, &addr); assert(res == 0); zc_header = (struct coroipcs_zc_header *)addr; zc_header->server_address = void2serveraddr(addr); res_header.size = sizeof (struct qb_ipc_response_header); res_header.id = 0; api->ipc_response_send (conn, &res_header, res_header.size); } static void message_handler_req_lib_cpg_zc_free ( void *conn, const void *message) { mar_req_coroipcc_zc_free_t *hdr = (mar_req_coroipcc_zc_free_t *)message; struct qb_ipc_response_header res_header; void *addr = NULL; struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn); log_printf(LOGSYS_LEVEL_DEBUG, " free'ing"); addr = serveraddr2void (hdr->server_address); zcb_by_addr_free (cpd, addr); res_header.size = sizeof (struct qb_ipc_response_header); res_header.id = 0; api->ipc_response_send ( conn, &res_header, res_header.size); } /* Fragmented mcast message from the library */ static void message_handler_req_lib_cpg_partial_mcast (void *conn, const void *message) { const struct req_lib_cpg_partial_mcast *req_lib_cpg_mcast = message; struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn); mar_cpg_name_t group_name = cpd->group_name; struct iovec req_exec_cpg_iovec[2]; struct req_exec_cpg_partial_mcast req_exec_cpg_mcast; struct res_lib_cpg_partial_send res_lib_cpg_partial_send; int msglen = req_lib_cpg_mcast->fraglen; int result; cs_error_t error = CS_ERR_NOT_EXIST; log_printf(LOGSYS_LEVEL_TRACE, "got fragmented mcast request on %p", conn); log_printf(LOGSYS_LEVEL_DEBUG, "Sending fragmented message size = %d bytes\n", msglen); switch (cpd->cpd_state) { case CPD_STATE_UNJOINED: error = CS_ERR_NOT_EXIST; break; case CPD_STATE_LEAVE_STARTED: error = CS_ERR_NOT_EXIST; break; case CPD_STATE_JOIN_STARTED: error = CS_OK; break; case CPD_STATE_JOIN_COMPLETED: error = CS_OK; break; } res_lib_cpg_partial_send.header.size = sizeof(res_lib_cpg_partial_send); res_lib_cpg_partial_send.header.id = MESSAGE_RES_CPG_PARTIAL_SEND; if (req_lib_cpg_mcast->type == LIBCPG_PARTIAL_FIRST) { cpd->initial_transition_counter = cpd->transition_counter; } if (cpd->transition_counter != cpd->initial_transition_counter) { error = CS_ERR_INTERRUPT; } if (error == CS_OK) { req_exec_cpg_mcast.header.size = sizeof(req_exec_cpg_mcast) + msglen; req_exec_cpg_mcast.header.id = SERVICE_ID_MAKE(CPG_SERVICE, MESSAGE_REQ_EXEC_CPG_PARTIAL_MCAST); req_exec_cpg_mcast.pid = cpd->pid; req_exec_cpg_mcast.msglen = req_lib_cpg_mcast->msglen; req_exec_cpg_mcast.type = req_lib_cpg_mcast->type; req_exec_cpg_mcast.fraglen = req_lib_cpg_mcast->fraglen; api->ipc_source_set (&req_exec_cpg_mcast.source, conn); memcpy(&req_exec_cpg_mcast.group_name, &group_name, sizeof(mar_cpg_name_t)); req_exec_cpg_iovec[0].iov_base = (char *)&req_exec_cpg_mcast; req_exec_cpg_iovec[0].iov_len = sizeof(req_exec_cpg_mcast); req_exec_cpg_iovec[1].iov_base = (char *)&req_lib_cpg_mcast->message; req_exec_cpg_iovec[1].iov_len = msglen; result = api->totem_mcast (req_exec_cpg_iovec, 2, TOTEM_AGREED); assert(result == 0); } else { log_printf(LOGSYS_LEVEL_ERROR, "*** %p can't mcast to group %s state:%d, error:%d", conn, group_name.value, cpd->cpd_state, error); } res_lib_cpg_partial_send.header.error = error; api->ipc_response_send (conn, &res_lib_cpg_partial_send, sizeof (res_lib_cpg_partial_send)); } /* Mcast message from the library */ static void message_handler_req_lib_cpg_mcast (void *conn, const void *message) { const struct req_lib_cpg_mcast *req_lib_cpg_mcast = message; struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn); mar_cpg_name_t group_name = cpd->group_name; struct iovec req_exec_cpg_iovec[2]; struct req_exec_cpg_mcast req_exec_cpg_mcast; int msglen = req_lib_cpg_mcast->msglen; int result; cs_error_t error = CS_ERR_NOT_EXIST; log_printf(LOGSYS_LEVEL_TRACE, "got mcast request on %p", conn); switch (cpd->cpd_state) { case CPD_STATE_UNJOINED: error = CS_ERR_NOT_EXIST; break; case CPD_STATE_LEAVE_STARTED: error = CS_ERR_NOT_EXIST; break; case CPD_STATE_JOIN_STARTED: error = CS_OK; break; case CPD_STATE_JOIN_COMPLETED: error = CS_OK; break; } if (error == CS_OK) { req_exec_cpg_mcast.header.size = sizeof(req_exec_cpg_mcast) + msglen; req_exec_cpg_mcast.header.id = SERVICE_ID_MAKE(CPG_SERVICE, MESSAGE_REQ_EXEC_CPG_MCAST); req_exec_cpg_mcast.pid = cpd->pid; req_exec_cpg_mcast.msglen = msglen; api->ipc_source_set (&req_exec_cpg_mcast.source, conn); memcpy(&req_exec_cpg_mcast.group_name, &group_name, sizeof(mar_cpg_name_t)); req_exec_cpg_iovec[0].iov_base = (char *)&req_exec_cpg_mcast; req_exec_cpg_iovec[0].iov_len = sizeof(req_exec_cpg_mcast); req_exec_cpg_iovec[1].iov_base = (char *)&req_lib_cpg_mcast->message; req_exec_cpg_iovec[1].iov_len = msglen; result = api->totem_mcast (req_exec_cpg_iovec, 2, TOTEM_AGREED); assert(result == 0); } else { log_printf(LOGSYS_LEVEL_ERROR, "*** %p can't mcast to group %s state:%d, error:%d", conn, group_name.value, cpd->cpd_state, error); } } static void message_handler_req_lib_cpg_zc_execute ( void *conn, const void *message) { mar_req_coroipcc_zc_execute_t *hdr = (mar_req_coroipcc_zc_execute_t *)message; struct qb_ipc_request_header *header; struct res_lib_cpg_mcast res_lib_cpg_mcast; struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn); struct iovec req_exec_cpg_iovec[2]; struct req_exec_cpg_mcast req_exec_cpg_mcast; struct req_lib_cpg_mcast *req_lib_cpg_mcast; int result; cs_error_t error = CS_ERR_NOT_EXIST; log_printf(LOGSYS_LEVEL_TRACE, "got ZC mcast request on %p", conn); header = (struct qb_ipc_request_header *)(((char *)serveraddr2void(hdr->server_address) + sizeof (struct coroipcs_zc_header))); req_lib_cpg_mcast = (struct req_lib_cpg_mcast *)header; switch (cpd->cpd_state) { case CPD_STATE_UNJOINED: error = CS_ERR_NOT_EXIST; break; case CPD_STATE_LEAVE_STARTED: error = CS_ERR_NOT_EXIST; break; case CPD_STATE_JOIN_STARTED: error = CS_OK; break; case CPD_STATE_JOIN_COMPLETED: error = CS_OK; break; } res_lib_cpg_mcast.header.size = sizeof(res_lib_cpg_mcast); res_lib_cpg_mcast.header.id = MESSAGE_RES_CPG_MCAST; if (error == CS_OK) { req_exec_cpg_mcast.header.size = sizeof(req_exec_cpg_mcast) + req_lib_cpg_mcast->msglen; req_exec_cpg_mcast.header.id = SERVICE_ID_MAKE(CPG_SERVICE, MESSAGE_REQ_EXEC_CPG_MCAST); req_exec_cpg_mcast.pid = cpd->pid; req_exec_cpg_mcast.msglen = req_lib_cpg_mcast->msglen; api->ipc_source_set (&req_exec_cpg_mcast.source, conn); memcpy(&req_exec_cpg_mcast.group_name, &cpd->group_name, sizeof(mar_cpg_name_t)); req_exec_cpg_iovec[0].iov_base = (char *)&req_exec_cpg_mcast; req_exec_cpg_iovec[0].iov_len = sizeof(req_exec_cpg_mcast); req_exec_cpg_iovec[1].iov_base = (char *)header + sizeof(struct req_lib_cpg_mcast); req_exec_cpg_iovec[1].iov_len = req_exec_cpg_mcast.msglen; result = api->totem_mcast (req_exec_cpg_iovec, 2, TOTEM_AGREED); if (result == 0) { res_lib_cpg_mcast.header.error = CS_OK; } else { res_lib_cpg_mcast.header.error = CS_ERR_TRY_AGAIN; } } else { res_lib_cpg_mcast.header.error = error; } api->ipc_response_send (conn, &res_lib_cpg_mcast, sizeof (res_lib_cpg_mcast)); } static void message_handler_req_lib_cpg_membership (void *conn, const void *message) { struct req_lib_cpg_membership_get *req_lib_cpg_membership_get = (struct req_lib_cpg_membership_get *)message; struct res_lib_cpg_membership_get res_lib_cpg_membership_get; struct qb_list_head *iter; int member_count = 0; res_lib_cpg_membership_get.header.id = MESSAGE_RES_CPG_MEMBERSHIP; res_lib_cpg_membership_get.header.error = CS_OK; res_lib_cpg_membership_get.header.size = sizeof (struct res_lib_cpg_membership_get); qb_list_for_each(iter, &process_info_list_head) { struct process_info *pi = qb_list_entry (iter, struct process_info, list); if (mar_name_compare (&pi->group, &req_lib_cpg_membership_get->group_name) == 0) { res_lib_cpg_membership_get.member_list[member_count].nodeid = pi->nodeid; res_lib_cpg_membership_get.member_list[member_count].pid = pi->pid; member_count += 1; } } res_lib_cpg_membership_get.member_count = member_count; api->ipc_response_send (conn, &res_lib_cpg_membership_get, sizeof (res_lib_cpg_membership_get)); } static void message_handler_req_lib_cpg_local_get (void *conn, const void *message) { struct res_lib_cpg_local_get res_lib_cpg_local_get; res_lib_cpg_local_get.header.size = sizeof (res_lib_cpg_local_get); res_lib_cpg_local_get.header.id = MESSAGE_RES_CPG_LOCAL_GET; res_lib_cpg_local_get.header.error = CS_OK; res_lib_cpg_local_get.local_nodeid = api->totem_nodeid_get (); api->ipc_response_send (conn, &res_lib_cpg_local_get, sizeof (res_lib_cpg_local_get)); } static void message_handler_req_lib_cpg_iteration_initialize ( void *conn, const void *message) { const struct req_lib_cpg_iterationinitialize *req_lib_cpg_iterationinitialize = message; struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn); hdb_handle_t cpg_iteration_handle = 0; struct res_lib_cpg_iterationinitialize res_lib_cpg_iterationinitialize; struct qb_list_head *iter, *iter2; struct cpg_iteration_instance *cpg_iteration_instance; cs_error_t error = CS_OK; int res; log_printf (LOGSYS_LEVEL_DEBUG, "cpg iteration initialize"); /* Because between calling this function and *next can be some operations which will * change list, we must do full copy. */ /* * Create new iteration instance */ res = hdb_handle_create (&cpg_iteration_handle_t_db, sizeof (struct cpg_iteration_instance), &cpg_iteration_handle); if (res != 0) { error = CS_ERR_NO_MEMORY; goto response_send; } res = hdb_handle_get (&cpg_iteration_handle_t_db, cpg_iteration_handle, (void *)&cpg_iteration_instance); if (res != 0) { error = CS_ERR_BAD_HANDLE; goto error_destroy; } qb_list_init (&cpg_iteration_instance->items_list_head); cpg_iteration_instance->handle = cpg_iteration_handle; /* * Create copy of process_info list "grouped by" group name */ qb_list_for_each(iter, &process_info_list_head) { struct process_info *pi = qb_list_entry (iter, struct process_info, list); struct process_info *new_pi; if (req_lib_cpg_iterationinitialize->iteration_type == CPG_ITERATION_NAME_ONLY) { /* * Try to find processed group name in our list new list */ int found = 0; qb_list_for_each(iter2, &(cpg_iteration_instance->items_list_head)) { struct process_info *pi2 = qb_list_entry (iter2, struct process_info, list); if (mar_name_compare (&pi2->group, &pi->group) == 0) { found = 1; break; } } if (found) { /* * We have this name in list -> don't add */ continue ; } } else if (req_lib_cpg_iterationinitialize->iteration_type == CPG_ITERATION_ONE_GROUP) { /* * Test pi group name with request */ if (mar_name_compare (&pi->group, &req_lib_cpg_iterationinitialize->group_name) != 0) /* * Not same -> don't add */ continue ; } new_pi = malloc (sizeof (struct process_info)); if (!new_pi) { log_printf(LOGSYS_LEVEL_WARNING, "Unable to allocate process_info struct"); error = CS_ERR_NO_MEMORY; goto error_put_destroy; } memcpy (new_pi, pi, sizeof (struct process_info)); qb_list_init (&new_pi->list); if (req_lib_cpg_iterationinitialize->iteration_type == CPG_ITERATION_NAME_ONLY) { /* * pid and nodeid -> undefined */ new_pi->pid = new_pi->nodeid = 0; } /* * We will return list "grouped" by "group name", so try to find right place to add */ qb_list_for_each(iter2, &(cpg_iteration_instance->items_list_head)) { struct process_info *pi2 = qb_list_entry (iter2, struct process_info, list); if (mar_name_compare (&pi2->group, &pi->group) == 0) { break; } } qb_list_add (&new_pi->list, iter2); } /* * Now we have a full "grouped by" copy of process_info list */ /* * Add instance to current cpd list */ qb_list_init (&cpg_iteration_instance->list); qb_list_add (&cpg_iteration_instance->list, &cpd->iteration_instance_list_head); cpg_iteration_instance->current_pointer = &cpg_iteration_instance->items_list_head; error_put_destroy: hdb_handle_put (&cpg_iteration_handle_t_db, cpg_iteration_handle); error_destroy: if (error != CS_OK) { hdb_handle_destroy (&cpg_iteration_handle_t_db, cpg_iteration_handle); } response_send: res_lib_cpg_iterationinitialize.header.size = sizeof (res_lib_cpg_iterationinitialize); res_lib_cpg_iterationinitialize.header.id = MESSAGE_RES_CPG_ITERATIONINITIALIZE; res_lib_cpg_iterationinitialize.header.error = error; res_lib_cpg_iterationinitialize.iteration_handle = cpg_iteration_handle; api->ipc_response_send (conn, &res_lib_cpg_iterationinitialize, sizeof (res_lib_cpg_iterationinitialize)); } static void message_handler_req_lib_cpg_iteration_next ( void *conn, const void *message) { const struct req_lib_cpg_iterationnext *req_lib_cpg_iterationnext = message; struct res_lib_cpg_iterationnext res_lib_cpg_iterationnext; struct cpg_iteration_instance *cpg_iteration_instance; cs_error_t error = CS_OK; int res; struct process_info *pi; log_printf (LOGSYS_LEVEL_DEBUG, "cpg iteration next"); res = hdb_handle_get (&cpg_iteration_handle_t_db, req_lib_cpg_iterationnext->iteration_handle, (void *)&cpg_iteration_instance); if (res != 0) { error = CS_ERR_LIBRARY; goto error_exit; } assert (cpg_iteration_instance); cpg_iteration_instance->current_pointer = cpg_iteration_instance->current_pointer->next; if (cpg_iteration_instance->current_pointer == &cpg_iteration_instance->items_list_head) { error = CS_ERR_NO_SECTIONS; goto error_put; } pi = qb_list_entry (cpg_iteration_instance->current_pointer, struct process_info, list); /* * Copy iteration data */ res_lib_cpg_iterationnext.description.nodeid = pi->nodeid; res_lib_cpg_iterationnext.description.pid = pi->pid; memcpy (&res_lib_cpg_iterationnext.description.group, &pi->group, sizeof (mar_cpg_name_t)); error_put: hdb_handle_put (&cpg_iteration_handle_t_db, req_lib_cpg_iterationnext->iteration_handle); error_exit: res_lib_cpg_iterationnext.header.size = sizeof (res_lib_cpg_iterationnext); res_lib_cpg_iterationnext.header.id = MESSAGE_RES_CPG_ITERATIONNEXT; res_lib_cpg_iterationnext.header.error = error; api->ipc_response_send (conn, &res_lib_cpg_iterationnext, sizeof (res_lib_cpg_iterationnext)); } static void message_handler_req_lib_cpg_iteration_finalize ( void *conn, const void *message) { const struct req_lib_cpg_iterationfinalize *req_lib_cpg_iterationfinalize = message; struct res_lib_cpg_iterationfinalize res_lib_cpg_iterationfinalize; struct cpg_iteration_instance *cpg_iteration_instance; cs_error_t error = CS_OK; int res; log_printf (LOGSYS_LEVEL_DEBUG, "cpg iteration finalize"); res = hdb_handle_get (&cpg_iteration_handle_t_db, req_lib_cpg_iterationfinalize->iteration_handle, (void *)&cpg_iteration_instance); if (res != 0) { error = CS_ERR_LIBRARY; goto error_exit; } assert (cpg_iteration_instance); cpg_iteration_instance_finalize (cpg_iteration_instance); hdb_handle_put (&cpg_iteration_handle_t_db, cpg_iteration_instance->handle); error_exit: res_lib_cpg_iterationfinalize.header.size = sizeof (res_lib_cpg_iterationfinalize); res_lib_cpg_iterationfinalize.header.id = MESSAGE_RES_CPG_ITERATIONFINALIZE; res_lib_cpg_iterationfinalize.header.error = error; api->ipc_response_send (conn, &res_lib_cpg_iterationfinalize, sizeof (res_lib_cpg_iterationfinalize)); } diff --git a/exec/quorum.c b/exec/quorum.c index d4837a2c..323a15f8 100644 --- a/exec/quorum.c +++ b/exec/quorum.c @@ -1,112 +1,110 @@ /* * Copyright (c) 2008-2012 Red Hat, Inc. * * All rights reserved. * * Author: Christine Caulfield (ccaulfie@redhat.com) * * This software licensed under BSD license, the text of which follows: * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * - Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * - Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * - Neither the name of the MontaVista Software, Inc. nor the names of its * contributors may be used to endorse or promote products derived from this * software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF * THE POSSIBILITY OF SUCH DAMAGE. */ #include #include #include #include #include #include #include #include #include #include #include #include #include -#include -#include #include #include #include #include #include #include #include "quorum.h" #include "main.h" #include "vsf.h" LOGSYS_DECLARE_SUBSYS ("QUORUM"); static struct quorum_callin_functions *corosync_quorum_fns = NULL; int corosync_quorum_is_quorate (void) { if (corosync_quorum_fns) { return corosync_quorum_fns->quorate(); } else { return 1; } } int corosync_quorum_register_callback (quorum_callback_fn_t fn, void *context) { if (corosync_quorum_fns) { return corosync_quorum_fns->register_callback(fn, context); } else { return 0; } } int corosync_quorum_unregister_callback (quorum_callback_fn_t fn, void *context) { if (corosync_quorum_fns) { return corosync_quorum_fns->unregister_callback(fn, context); } else { return 0; } } int corosync_quorum_initialize (struct quorum_callin_functions *fns) { if (corosync_quorum_fns) return -1; corosync_quorum_fns = fns; return 0; } int quorum_none(void) { if (corosync_quorum_fns) return 0; else return 1; } diff --git a/exec/sync.c b/exec/sync.c index ea452e60..283634a8 100644 --- a/exec/sync.c +++ b/exec/sync.c @@ -1,630 +1,628 @@ /* * Copyright (c) 2009-2012 Red Hat, Inc. * * All rights reserved. * * Author: Steven Dake (sdake@redhat.com) * * This software licensed under BSD license, the text of which follows: * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * - Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * - Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * - Neither the name of the MontaVista Software, Inc. nor the names of its * contributors may be used to endorse or promote products derived from this * software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF * THE POSSIBILITY OF SUCH DAMAGE. */ #include #include #include #include #include #include #include #include #include #include #include #include #include -#include -#include #include #include #include #include #include #include #include #include "schedwrk.h" #include "quorum.h" #include "sync.h" #include "main.h" LOGSYS_DECLARE_SUBSYS ("SYNC"); #define MESSAGE_REQ_SYNC_BARRIER 0 #define MESSAGE_REQ_SYNC_SERVICE_BUILD 1 #define MESSAGE_REQ_SYNC_MEMB_DETERMINE 2 enum sync_process_state { INIT, PROCESS, ACTIVATE }; enum sync_state { SYNC_SERVICELIST_BUILD, SYNC_PROCESS, SYNC_BARRIER }; struct service_entry { int service_id; void (*sync_init) ( const unsigned int *trans_list, size_t trans_list_entries, const unsigned int *member_list, size_t member_list_entries, const struct memb_ring_id *ring_id); void (*sync_abort) (void); int (*sync_process) (void); void (*sync_activate) (void); enum sync_process_state state; char name[128]; }; struct processor_entry { int nodeid; int received; }; struct req_exec_memb_determine_message { struct qb_ipc_request_header header __attribute__((aligned(8))); struct memb_ring_id ring_id __attribute__((aligned(8))); }; struct req_exec_service_build_message { struct qb_ipc_request_header header __attribute__((aligned(8))); struct memb_ring_id ring_id __attribute__((aligned(8))); int service_list_entries __attribute__((aligned(8))); int service_list[128] __attribute__((aligned(8))); }; struct req_exec_barrier_message { struct qb_ipc_request_header header __attribute__((aligned(8))); struct memb_ring_id ring_id __attribute__((aligned(8))); }; static enum sync_state my_state = SYNC_BARRIER; static struct memb_ring_id my_ring_id; static struct memb_ring_id my_memb_determine_ring_id; static int my_memb_determine = 0; static unsigned int my_memb_determine_list[PROCESSOR_COUNT_MAX]; static unsigned int my_memb_determine_list_entries = 0; static int my_processing_idx = 0; static hdb_handle_t my_schedwrk_handle; static struct processor_entry my_processor_list[PROCESSOR_COUNT_MAX]; static unsigned int my_member_list[PROCESSOR_COUNT_MAX]; static unsigned int my_trans_list[PROCESSOR_COUNT_MAX]; static size_t my_member_list_entries = 0; static size_t my_trans_list_entries = 0; static int my_processor_list_entries = 0; static struct service_entry my_service_list[SERVICES_COUNT_MAX]; static int my_service_list_entries = 0; static void (*sync_synchronization_completed) (void); static void sync_deliver_fn ( unsigned int nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required); static int schedwrk_processor (const void *context); static void sync_process_enter (void); static struct totempg_group sync_group = { .group = "sync", .group_len = 4 }; static void *sync_group_handle; int (*my_sync_callbacks_retrieve) ( int service_id, struct sync_callbacks *callbacks); int sync_init ( int (*sync_callbacks_retrieve) ( int service_id, struct sync_callbacks *callbacks), void (*synchronization_completed) (void)) { unsigned int res; res = totempg_groups_initialize ( &sync_group_handle, sync_deliver_fn, NULL); if (res == -1) { log_printf (LOGSYS_LEVEL_ERROR, "Couldn't initialize groups interface."); return (-1); } res = totempg_groups_join ( sync_group_handle, &sync_group, 1); if (res == -1) { log_printf (LOGSYS_LEVEL_ERROR, "Couldn't join group."); return (-1); } sync_synchronization_completed = synchronization_completed; my_sync_callbacks_retrieve = sync_callbacks_retrieve; return (0); } static void sync_barrier_handler (unsigned int nodeid, const void *msg) { const struct req_exec_barrier_message *req_exec_barrier_message = msg; int i; int barrier_reached = 1; if (memcmp (&my_ring_id, &req_exec_barrier_message->ring_id, sizeof (struct memb_ring_id)) != 0) { log_printf (LOGSYS_LEVEL_DEBUG, "barrier for old ring - discarding"); return; } for (i = 0; i < my_processor_list_entries; i++) { if (my_processor_list[i].nodeid == nodeid) { my_processor_list[i].received = 1; } } for (i = 0; i < my_processor_list_entries; i++) { if (my_processor_list[i].received == 0) { barrier_reached = 0; } } if (barrier_reached) { log_printf (LOGSYS_LEVEL_DEBUG, "Committing synchronization for %s", my_service_list[my_processing_idx].name); my_service_list[my_processing_idx].state = ACTIVATE; if (my_sync_callbacks_retrieve(my_service_list[my_processing_idx].service_id, NULL) != -1) { my_service_list[my_processing_idx].sync_activate (); } my_processing_idx += 1; if (my_service_list_entries == my_processing_idx) { my_memb_determine_list_entries = 0; sync_synchronization_completed (); } else { sync_process_enter (); } } } static void dummy_sync_init ( const unsigned int *trans_list, size_t trans_list_entries, const unsigned int *member_list, size_t member_list_entries, const struct memb_ring_id *ring_id) { } static void dummy_sync_abort (void) { } static int dummy_sync_process (void) { return (0); } static void dummy_sync_activate (void) { } static int service_entry_compare (const void *a, const void *b) { const struct service_entry *service_entry_a = a; const struct service_entry *service_entry_b = b; return (service_entry_a->service_id > service_entry_b->service_id); } static void sync_memb_determine (unsigned int nodeid, const void *msg) { const struct req_exec_memb_determine_message *req_exec_memb_determine_message = msg; int found = 0; int i; if (memcmp (&req_exec_memb_determine_message->ring_id, &my_memb_determine_ring_id, sizeof (struct memb_ring_id)) != 0) { log_printf (LOGSYS_LEVEL_DEBUG, "memb determine for old ring - discarding"); return; } my_memb_determine = 1; for (i = 0; i < my_memb_determine_list_entries; i++) { if (my_memb_determine_list[i] == nodeid) { found = 1; } } if (found == 0) { my_memb_determine_list[my_memb_determine_list_entries] = nodeid; my_memb_determine_list_entries += 1; } } static void sync_service_build_handler (unsigned int nodeid, const void *msg) { const struct req_exec_service_build_message *req_exec_service_build_message = msg; int i, j; int barrier_reached = 1; int found; int qsort_trigger = 0; if (memcmp (&my_ring_id, &req_exec_service_build_message->ring_id, sizeof (struct memb_ring_id)) != 0) { log_printf (LOGSYS_LEVEL_DEBUG, "service build for old ring - discarding"); return; } for (i = 0; i < req_exec_service_build_message->service_list_entries; i++) { found = 0; for (j = 0; j < my_service_list_entries; j++) { if (req_exec_service_build_message->service_list[i] == my_service_list[j].service_id) { found = 1; break; } } if (found == 0) { my_service_list[my_service_list_entries].state = INIT; my_service_list[my_service_list_entries].service_id = req_exec_service_build_message->service_list[i]; sprintf (my_service_list[my_service_list_entries].name, "Unknown External Service (id = %d)\n", req_exec_service_build_message->service_list[i]); my_service_list[my_service_list_entries].sync_init = dummy_sync_init; my_service_list[my_service_list_entries].sync_abort = dummy_sync_abort; my_service_list[my_service_list_entries].sync_process = dummy_sync_process; my_service_list[my_service_list_entries].sync_activate = dummy_sync_activate; my_service_list_entries += 1; qsort_trigger = 1; } } if (qsort_trigger) { qsort (my_service_list, my_service_list_entries, sizeof (struct service_entry), service_entry_compare); } for (i = 0; i < my_processor_list_entries; i++) { if (my_processor_list[i].nodeid == nodeid) { my_processor_list[i].received = 1; } } for (i = 0; i < my_processor_list_entries; i++) { if (my_processor_list[i].received == 0) { barrier_reached = 0; } } if (barrier_reached) { sync_process_enter (); } } static void sync_deliver_fn ( unsigned int nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required) { struct qb_ipc_request_header *header = (struct qb_ipc_request_header *)msg; switch (header->id) { case MESSAGE_REQ_SYNC_BARRIER: sync_barrier_handler (nodeid, msg); break; case MESSAGE_REQ_SYNC_SERVICE_BUILD: sync_service_build_handler (nodeid, msg); break; case MESSAGE_REQ_SYNC_MEMB_DETERMINE: sync_memb_determine (nodeid, msg); break; } } static void memb_determine_message_transmit (void) { struct iovec iovec; struct req_exec_memb_determine_message req_exec_memb_determine_message; req_exec_memb_determine_message.header.size = sizeof (struct req_exec_memb_determine_message); req_exec_memb_determine_message.header.id = MESSAGE_REQ_SYNC_MEMB_DETERMINE; memcpy (&req_exec_memb_determine_message.ring_id, &my_memb_determine_ring_id, sizeof (struct memb_ring_id)); iovec.iov_base = (char *)&req_exec_memb_determine_message; iovec.iov_len = sizeof (req_exec_memb_determine_message); (void)totempg_groups_mcast_joined (sync_group_handle, &iovec, 1, TOTEMPG_AGREED); } static void barrier_message_transmit (void) { struct iovec iovec; struct req_exec_barrier_message req_exec_barrier_message; req_exec_barrier_message.header.size = sizeof (struct req_exec_barrier_message); req_exec_barrier_message.header.id = MESSAGE_REQ_SYNC_BARRIER; memcpy (&req_exec_barrier_message.ring_id, &my_ring_id, sizeof (struct memb_ring_id)); iovec.iov_base = (char *)&req_exec_barrier_message; iovec.iov_len = sizeof (req_exec_barrier_message); (void)totempg_groups_mcast_joined (sync_group_handle, &iovec, 1, TOTEMPG_AGREED); } static void service_build_message_transmit (struct req_exec_service_build_message *service_build_message) { struct iovec iovec; service_build_message->header.size = sizeof (struct req_exec_service_build_message); service_build_message->header.id = MESSAGE_REQ_SYNC_SERVICE_BUILD; memcpy (&service_build_message->ring_id, &my_ring_id, sizeof (struct memb_ring_id)); iovec.iov_base = (void *)service_build_message; iovec.iov_len = sizeof (struct req_exec_service_build_message); (void)totempg_groups_mcast_joined (sync_group_handle, &iovec, 1, TOTEMPG_AGREED); } static void sync_barrier_enter (void) { my_state = SYNC_BARRIER; barrier_message_transmit (); } static void sync_process_enter (void) { int i; my_state = SYNC_PROCESS; /* * No sync services */ if (my_service_list_entries == 0) { my_state = SYNC_SERVICELIST_BUILD; my_memb_determine_list_entries = 0; sync_synchronization_completed (); return; } for (i = 0; i < my_processor_list_entries; i++) { my_processor_list[i].received = 0; } schedwrk_create (&my_schedwrk_handle, schedwrk_processor, NULL); } static void sync_servicelist_build_enter ( const unsigned int *member_list, size_t member_list_entries, const struct memb_ring_id *ring_id) { struct req_exec_service_build_message service_build; int i; int res; struct sync_callbacks sync_callbacks; my_state = SYNC_SERVICELIST_BUILD; for (i = 0; i < member_list_entries; i++) { my_processor_list[i].nodeid = member_list[i]; my_processor_list[i].received = 0; } my_processor_list_entries = member_list_entries; memcpy (my_member_list, member_list, member_list_entries * sizeof (unsigned int)); my_member_list_entries = member_list_entries; my_processing_idx = 0; memset(my_service_list, 0, sizeof (struct service_entry) * SERVICES_COUNT_MAX); my_service_list_entries = 0; for (i = 0; i < SERVICES_COUNT_MAX; i++) { res = my_sync_callbacks_retrieve (i, &sync_callbacks); if (res == -1) { continue; } if (sync_callbacks.sync_init == NULL) { continue; } my_service_list[my_service_list_entries].state = INIT; my_service_list[my_service_list_entries].service_id = i; strcpy (my_service_list[my_service_list_entries].name, sync_callbacks.name); my_service_list[my_service_list_entries].sync_init = sync_callbacks.sync_init; my_service_list[my_service_list_entries].sync_process = sync_callbacks.sync_process; my_service_list[my_service_list_entries].sync_abort = sync_callbacks.sync_abort; my_service_list[my_service_list_entries].sync_activate = sync_callbacks.sync_activate; my_service_list_entries += 1; } for (i = 0; i < my_service_list_entries; i++) { service_build.service_list[i] = my_service_list[i].service_id; } service_build.service_list_entries = my_service_list_entries; service_build_message_transmit (&service_build); } static int schedwrk_processor (const void *context) { int res = 0; if (my_service_list[my_processing_idx].state == INIT) { unsigned int old_trans_list[PROCESSOR_COUNT_MAX]; size_t old_trans_list_entries = 0; int o, m; my_service_list[my_processing_idx].state = PROCESS; memcpy (old_trans_list, my_trans_list, my_trans_list_entries * sizeof (unsigned int)); old_trans_list_entries = my_trans_list_entries; my_trans_list_entries = 0; for (o = 0; o < old_trans_list_entries; o++) { for (m = 0; m < my_member_list_entries; m++) { if (old_trans_list[o] == my_member_list[m]) { my_trans_list[my_trans_list_entries] = my_member_list[m]; my_trans_list_entries++; break; } } } if (my_sync_callbacks_retrieve(my_service_list[my_processing_idx].service_id, NULL) != -1) { my_service_list[my_processing_idx].sync_init (my_trans_list, my_trans_list_entries, my_member_list, my_member_list_entries, &my_ring_id); } } if (my_service_list[my_processing_idx].state == PROCESS) { my_service_list[my_processing_idx].state = PROCESS; if (my_sync_callbacks_retrieve(my_service_list[my_processing_idx].service_id, NULL) != -1) { res = my_service_list[my_processing_idx].sync_process (); } else { res = 0; } if (res == 0) { sync_barrier_enter(); } else { return (-1); } } return (0); } void sync_start ( const unsigned int *member_list, size_t member_list_entries, const struct memb_ring_id *ring_id) { ENTER(); memcpy (&my_ring_id, ring_id, sizeof (struct memb_ring_id)); if (my_memb_determine) { my_memb_determine = 0; sync_servicelist_build_enter (my_memb_determine_list, my_memb_determine_list_entries, ring_id); } else { sync_servicelist_build_enter (member_list, member_list_entries, ring_id); } } void sync_save_transitional ( const unsigned int *member_list, size_t member_list_entries, const struct memb_ring_id *ring_id) { ENTER(); memcpy (my_trans_list, member_list, member_list_entries * sizeof (unsigned int)); my_trans_list_entries = member_list_entries; } void sync_abort (void) { ENTER(); if (my_state == SYNC_PROCESS) { schedwrk_destroy (my_schedwrk_handle); if (my_sync_callbacks_retrieve(my_service_list[my_processing_idx].service_id, NULL) != -1) { my_service_list[my_processing_idx].sync_abort (); } } /* this will cause any "old" barrier messages from causing * problems. */ memset (&my_ring_id, 0, sizeof (struct memb_ring_id)); } void sync_memb_list_determine (const struct memb_ring_id *ring_id) { ENTER(); memcpy (&my_memb_determine_ring_id, ring_id, sizeof (struct memb_ring_id)); memb_determine_message_transmit (); } void sync_memb_list_abort (void) { ENTER(); my_memb_determine_list_entries = 0; memset (&my_memb_determine_ring_id, 0, sizeof (struct memb_ring_id)); } diff --git a/exec/vsf_quorum.c b/exec/vsf_quorum.c index 425c21d3..f93eaf5b 100644 --- a/exec/vsf_quorum.c +++ b/exec/vsf_quorum.c @@ -1,485 +1,484 @@ /* * Copyright (c) 2008-2015 Red Hat, Inc. * * All rights reserved. * * Author: Christine Caulfield (ccaulfie@redhat.com) * * This software licensed under BSD license, the text of which follows: * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * - Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * - Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * - Neither the name of Red Hat Inc. nor the names of its * contributors may be used to endorse or promote products derived from this * software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF * THE POSSIBILITY OF SUCH DAMAGE. */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "quorum.h" #include #include #include #include #include #include #include -#include #include #include #include #include "service.h" #include "votequorum.h" #include "vsf_ykd.h" LOGSYS_DECLARE_SUBSYS ("QUORUM"); struct quorum_pd { unsigned char track_flags; int tracking_enabled; struct qb_list_head list; void *conn; }; struct internal_callback_pd { struct qb_list_head list; quorum_callback_fn_t callback; void *context; }; static void message_handler_req_lib_quorum_getquorate (void *conn, const void *msg); static void message_handler_req_lib_quorum_trackstart (void *conn, const void *msg); static void message_handler_req_lib_quorum_trackstop (void *conn, const void *msg); static void message_handler_req_lib_quorum_gettype (void *conn, const void *msg); static void send_library_notification(void *conn); static void send_internal_notification(void); static char *quorum_exec_init_fn (struct corosync_api_v1 *api); static int quorum_lib_init_fn (void *conn); static int quorum_lib_exit_fn (void *conn); static int primary_designated = 0; static int quorum_type = 0; static struct corosync_api_v1 *corosync_api; static struct qb_list_head lib_trackers_list; static struct qb_list_head internal_trackers_list; static struct memb_ring_id quorum_ring_id; static size_t quorum_view_list_entries = 0; static int quorum_view_list[PROCESSOR_COUNT_MAX]; struct quorum_services_api_ver1 *quorum_iface = NULL; static char view_buf[64]; static void log_view_list(const unsigned int *view_list, size_t view_list_entries) { int total = (int)view_list_entries; int len, pos, ret; int i = 0; while (1) { len = sizeof(view_buf); pos = 0; memset(view_buf, 0, len); for (; i < total; i++) { ret = snprintf(view_buf + pos, len - pos, " %u", view_list[i]); if (ret >= len - pos) break; pos += ret; } log_printf (LOGSYS_LEVEL_NOTICE, "Members[%d]:%s%s", total, view_buf, i < total ? "\\" : ""); if (i == total) break; } } /* Internal quorum API function */ static void quorum_api_set_quorum(const unsigned int *view_list, size_t view_list_entries, int quorum, struct memb_ring_id *ring_id) { int old_quorum = primary_designated; primary_designated = quorum; if (primary_designated && !old_quorum) { log_printf (LOGSYS_LEVEL_NOTICE, "This node is within the primary component and will provide service."); } else if (!primary_designated && old_quorum) { log_printf (LOGSYS_LEVEL_NOTICE, "This node is within the non-primary component and will NOT provide any services."); } quorum_view_list_entries = view_list_entries; memcpy(&quorum_ring_id, ring_id, sizeof (quorum_ring_id)); memcpy(quorum_view_list, view_list, sizeof(unsigned int)*view_list_entries); log_view_list(view_list, view_list_entries); /* Tell internal listeners */ send_internal_notification(); /* Tell IPC listeners */ send_library_notification(NULL); } static struct corosync_lib_handler quorum_lib_service[] = { { /* 0 */ .lib_handler_fn = message_handler_req_lib_quorum_getquorate, .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 1 */ .lib_handler_fn = message_handler_req_lib_quorum_trackstart, .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 2 */ .lib_handler_fn = message_handler_req_lib_quorum_trackstop, .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 3 */ .lib_handler_fn = message_handler_req_lib_quorum_gettype, .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED } }; static struct corosync_service_engine quorum_service_handler = { .name = "corosync cluster quorum service v0.1", .id = QUORUM_SERVICE, .priority = 1, .private_data_size = sizeof (struct quorum_pd), .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED, .allow_inquorate = CS_LIB_ALLOW_INQUORATE, .lib_init_fn = quorum_lib_init_fn, .lib_exit_fn = quorum_lib_exit_fn, .lib_engine = quorum_lib_service, .exec_init_fn = quorum_exec_init_fn, .lib_engine_count = sizeof (quorum_lib_service) / sizeof (struct corosync_lib_handler) }; struct corosync_service_engine *vsf_quorum_get_service_engine_ver0 (void) { return (&quorum_service_handler); } /* -------------------------------------------------- */ /* * Internal API functions for corosync */ static int quorum_quorate(void) { return primary_designated; } static int quorum_register_callback(quorum_callback_fn_t function, void *context) { struct internal_callback_pd *pd = malloc(sizeof(struct internal_callback_pd)); if (!pd) return -1; pd->context = context; pd->callback = function; qb_list_add (&pd->list, &internal_trackers_list); return 0; } static int quorum_unregister_callback(quorum_callback_fn_t function, void *context) { struct internal_callback_pd *pd; struct qb_list_head *tmp, *tmp_iter; qb_list_for_each_safe(tmp, tmp_iter, &internal_trackers_list) { pd = qb_list_entry(tmp, struct internal_callback_pd, list); if (pd->callback == function && pd->context == context) { qb_list_del(&pd->list); free(pd); return 0; } } return -1; } static struct quorum_callin_functions callins = { .quorate = quorum_quorate, .register_callback = quorum_register_callback, .unregister_callback = quorum_unregister_callback }; /* --------------------------------------------------------------------- */ static char *quorum_exec_init_fn (struct corosync_api_v1 *api) { char *quorum_module = NULL; char *error; corosync_api = api; qb_list_init (&lib_trackers_list); qb_list_init (&internal_trackers_list); /* * Tell corosync we have a quorum engine. */ api->quorum_initialize(&callins); /* * Look for a quorum provider */ if (icmap_get_string("quorum.provider", &quorum_module) == CS_OK) { log_printf (LOGSYS_LEVEL_NOTICE, "Using quorum provider %s", quorum_module); error = (char *)"Invalid quorum provider"; if (strcmp (quorum_module, "corosync_votequorum") == 0) { error = votequorum_init (api, quorum_api_set_quorum); quorum_type = 1; } if (strcmp (quorum_module, "corosync_ykd") == 0) { error = ykd_init (api, quorum_api_set_quorum); quorum_type = 1; } if (error) { log_printf (LOGSYS_LEVEL_CRIT, "Quorum provider: %s failed to initialize.", quorum_module); free(quorum_module); return (error); } } if (quorum_module) { free(quorum_module); quorum_module = NULL; } /* * setting quorum_type and primary_designated in the right order is important * always try to lookup/init a quorum module, then revert back to be quorate */ if (quorum_type == 0) { primary_designated = 1; } return (NULL); } static int quorum_lib_init_fn (void *conn) { struct quorum_pd *pd = (struct quorum_pd *)corosync_api->ipc_private_data_get (conn); log_printf(LOGSYS_LEVEL_DEBUG, "lib_init_fn: conn=%p", conn); qb_list_init (&pd->list); pd->conn = conn; return (0); } static int quorum_lib_exit_fn (void *conn) { struct quorum_pd *quorum_pd = (struct quorum_pd *)corosync_api->ipc_private_data_get (conn); log_printf(LOGSYS_LEVEL_DEBUG, "lib_exit_fn: conn=%p", conn); if (quorum_pd->tracking_enabled) { qb_list_del (&quorum_pd->list); qb_list_init (&quorum_pd->list); } return (0); } static void send_internal_notification(void) { struct qb_list_head *tmp; struct internal_callback_pd *pd; qb_list_for_each(tmp, &internal_trackers_list) { pd = qb_list_entry(tmp, struct internal_callback_pd, list); pd->callback(primary_designated, pd->context); } } static void send_library_notification(void *conn) { int size = sizeof(struct res_lib_quorum_notification) + sizeof(unsigned int)*quorum_view_list_entries; char buf[size]; struct res_lib_quorum_notification *res_lib_quorum_notification = (struct res_lib_quorum_notification *)buf; struct qb_list_head *tmp; int i; log_printf(LOGSYS_LEVEL_DEBUG, "sending quorum notification to %p, length = %d", conn, size); res_lib_quorum_notification->quorate = primary_designated; res_lib_quorum_notification->ring_seq = quorum_ring_id.seq; res_lib_quorum_notification->view_list_entries = quorum_view_list_entries; for (i=0; iview_list[i] = quorum_view_list[i]; } res_lib_quorum_notification->header.id = MESSAGE_RES_QUORUM_NOTIFICATION; res_lib_quorum_notification->header.size = size; res_lib_quorum_notification->header.error = CS_OK; /* Send it to all interested parties */ if (conn) { corosync_api->ipc_dispatch_send(conn, res_lib_quorum_notification, size); } else { struct quorum_pd *qpd; qb_list_for_each(tmp, &lib_trackers_list) { qpd = qb_list_entry(tmp, struct quorum_pd, list); corosync_api->ipc_dispatch_send(qpd->conn, res_lib_quorum_notification, size); } } return; } static void message_handler_req_lib_quorum_getquorate (void *conn, const void *msg) { struct res_lib_quorum_getquorate res_lib_quorum_getquorate; log_printf(LOGSYS_LEVEL_DEBUG, "got quorate request on %p", conn); /* send status */ res_lib_quorum_getquorate.quorate = primary_designated; res_lib_quorum_getquorate.header.size = sizeof(res_lib_quorum_getquorate); res_lib_quorum_getquorate.header.id = MESSAGE_RES_QUORUM_GETQUORATE; res_lib_quorum_getquorate.header.error = CS_OK; corosync_api->ipc_response_send(conn, &res_lib_quorum_getquorate, sizeof(res_lib_quorum_getquorate)); } static void message_handler_req_lib_quorum_trackstart (void *conn, const void *msg) { const struct req_lib_quorum_trackstart *req_lib_quorum_trackstart = msg; struct qb_ipc_response_header res; struct quorum_pd *quorum_pd = (struct quorum_pd *)corosync_api->ipc_private_data_get (conn); cs_error_t error = CS_OK; log_printf(LOGSYS_LEVEL_DEBUG, "got trackstart request on %p", conn); /* * If an immediate listing of the current cluster membership * is requested, generate membership list */ if (req_lib_quorum_trackstart->track_flags & CS_TRACK_CURRENT || req_lib_quorum_trackstart->track_flags & CS_TRACK_CHANGES) { log_printf(LOGSYS_LEVEL_DEBUG, "sending initial status to %p", conn); send_library_notification(conn); } if (quorum_pd->tracking_enabled) { error = CS_ERR_EXIST; goto response_send; } /* * Record requests for tracking */ if (req_lib_quorum_trackstart->track_flags & CS_TRACK_CHANGES || req_lib_quorum_trackstart->track_flags & CS_TRACK_CHANGES_ONLY) { quorum_pd->track_flags = req_lib_quorum_trackstart->track_flags; quorum_pd->tracking_enabled = 1; qb_list_add (&quorum_pd->list, &lib_trackers_list); } response_send: /* send status */ res.size = sizeof(res); res.id = MESSAGE_RES_QUORUM_TRACKSTART; res.error = error; corosync_api->ipc_response_send(conn, &res, sizeof(struct qb_ipc_response_header)); } static void message_handler_req_lib_quorum_trackstop (void *conn, const void *msg) { struct qb_ipc_response_header res; struct quorum_pd *quorum_pd = (struct quorum_pd *)corosync_api->ipc_private_data_get (conn); log_printf(LOGSYS_LEVEL_DEBUG, "got trackstop request on %p", conn); if (quorum_pd->tracking_enabled) { res.error = CS_OK; quorum_pd->tracking_enabled = 0; qb_list_del (&quorum_pd->list); qb_list_init (&quorum_pd->list); } else { res.error = CS_ERR_NOT_EXIST; } /* send status */ res.size = sizeof(res); res.id = MESSAGE_RES_QUORUM_TRACKSTOP; res.error = CS_OK; corosync_api->ipc_response_send(conn, &res, sizeof(struct qb_ipc_response_header)); } static void message_handler_req_lib_quorum_gettype (void *conn, const void *msg) { struct res_lib_quorum_gettype res_lib_quorum_gettype; log_printf(LOGSYS_LEVEL_DEBUG, "got quorum_type request on %p", conn); /* send status */ res_lib_quorum_gettype.quorum_type = quorum_type; res_lib_quorum_gettype.header.size = sizeof(res_lib_quorum_gettype); res_lib_quorum_gettype.header.id = MESSAGE_RES_QUORUM_GETTYPE; res_lib_quorum_gettype.header.error = CS_OK; corosync_api->ipc_response_send(conn, &res_lib_quorum_gettype, sizeof(res_lib_quorum_gettype)); } diff --git a/qdevices/tlv.c b/qdevices/tlv.c index a7eae953..5b458690 100644 --- a/qdevices/tlv.c +++ b/qdevices/tlv.c @@ -1,1038 +1,1037 @@ /* * Copyright (c) 2015-2016 Red Hat, Inc. * * All rights reserved. * * Author: Jan Friesse (jfriesse@redhat.com) * * This software licensed under BSD license, the text of which follows: * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * - Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * - Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * - Neither the name of the Red Hat, Inc. nor the names of its * contributors may be used to endorse or promote products derived from this * software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF * THE POSSIBILITY OF SUCH DAMAGE. */ #include #include #include #include #include /* * 64-bit variant of ntoh is not exactly standard... */ #if defined(__linux__) #include #elif defined(__FreeBSD__) || defined(__NetBSD__) #include #elif defined(__OpenBSD__) -#include #define be64toh(x) betoh64(x) #endif #include "tlv.h" #define TLV_TYPE_LENGTH 2 #define TLV_LENGTH_LENGTH 2 #define TLV_STATIC_SUPPORTED_OPTIONS_SIZE 22 enum tlv_opt_type tlv_static_supported_options[TLV_STATIC_SUPPORTED_OPTIONS_SIZE] = { TLV_OPT_MSG_SEQ_NUMBER, TLV_OPT_CLUSTER_NAME, TLV_OPT_TLS_SUPPORTED, TLV_OPT_TLS_CLIENT_CERT_REQUIRED, TLV_OPT_SUPPORTED_MESSAGES, TLV_OPT_SUPPORTED_OPTIONS, TLV_OPT_REPLY_ERROR_CODE, TLV_OPT_SERVER_MAXIMUM_REQUEST_SIZE, TLV_OPT_SERVER_MAXIMUM_REPLY_SIZE, TLV_OPT_NODE_ID, TLV_OPT_SUPPORTED_DECISION_ALGORITHMS, TLV_OPT_DECISION_ALGORITHM, TLV_OPT_HEARTBEAT_INTERVAL, TLV_OPT_RING_ID, TLV_OPT_CONFIG_VERSION, TLV_OPT_DATA_CENTER_ID, TLV_OPT_NODE_STATE, TLV_OPT_NODE_INFO, TLV_OPT_NODE_LIST_TYPE, TLV_OPT_VOTE, TLV_OPT_QUORATE, TLV_OPT_TIE_BREAKER, }; int tlv_add(struct dynar *msg, enum tlv_opt_type opt_type, uint16_t opt_len, const void *value) { uint16_t nlen; uint16_t nopt_type; if (dynar_size(msg) + sizeof(nopt_type) + sizeof(nlen) + opt_len > dynar_max_size(msg)) { return (-1); } nopt_type = htons((uint16_t)opt_type); nlen = htons(opt_len); dynar_cat(msg, &nopt_type, sizeof(nopt_type)); dynar_cat(msg, &nlen, sizeof(nlen)); dynar_cat(msg, value, opt_len); return (0); } int tlv_add_u32(struct dynar *msg, enum tlv_opt_type opt_type, uint32_t u32) { uint32_t nu32; nu32 = htonl(u32); return (tlv_add(msg, opt_type, sizeof(nu32), &nu32)); } int tlv_add_u8(struct dynar *msg, enum tlv_opt_type opt_type, uint8_t u8) { return (tlv_add(msg, opt_type, sizeof(u8), &u8)); } int tlv_add_u16(struct dynar *msg, enum tlv_opt_type opt_type, uint16_t u16) { uint16_t nu16; nu16 = htons(u16); return (tlv_add(msg, opt_type, sizeof(nu16), &nu16)); } int tlv_add_u64(struct dynar *msg, enum tlv_opt_type opt_type, uint64_t u64) { uint64_t nu64; nu64 = htobe64(u64); return (tlv_add(msg, opt_type, sizeof(nu64), &nu64)); } int tlv_add_string(struct dynar *msg, enum tlv_opt_type opt_type, const char *str) { return (tlv_add(msg, opt_type, strlen(str), str)); } int tlv_add_msg_seq_number(struct dynar *msg, uint32_t msg_seq_number) { return (tlv_add_u32(msg, TLV_OPT_MSG_SEQ_NUMBER, msg_seq_number)); } int tlv_add_cluster_name(struct dynar *msg, const char *cluster_name) { return (tlv_add_string(msg, TLV_OPT_CLUSTER_NAME, cluster_name)); } int tlv_add_tls_supported(struct dynar *msg, enum tlv_tls_supported tls_supported) { return (tlv_add_u8(msg, TLV_OPT_TLS_SUPPORTED, tls_supported)); } int tlv_add_tls_client_cert_required(struct dynar *msg, int tls_client_cert_required) { return (tlv_add_u8(msg, TLV_OPT_TLS_CLIENT_CERT_REQUIRED, tls_client_cert_required)); } int tlv_add_u16_array(struct dynar *msg, enum tlv_opt_type opt_type, const uint16_t *array, size_t array_size) { size_t i; uint16_t *nu16a; uint16_t opt_len; int res; nu16a = malloc(sizeof(uint16_t) * array_size); if (nu16a == NULL) { return (-1); } for (i = 0; i < array_size; i++) { nu16a[i] = htons(array[i]); } opt_len = sizeof(uint16_t) * array_size; res = tlv_add(msg, opt_type, opt_len, nu16a); free(nu16a); return (res); } int tlv_add_supported_options(struct dynar *msg, const enum tlv_opt_type *supported_options, size_t no_supported_options) { uint16_t *u16a; size_t i; int res; u16a = malloc(sizeof(*u16a) * no_supported_options); if (u16a == NULL) { return (-1); } for (i = 0; i < no_supported_options; i++) { u16a[i] = (uint16_t)supported_options[i]; } res = (tlv_add_u16_array(msg, TLV_OPT_SUPPORTED_OPTIONS, u16a, no_supported_options)); free(u16a); return (res); } int tlv_add_supported_decision_algorithms(struct dynar *msg, const enum tlv_decision_algorithm_type *supported_algorithms, size_t no_supported_algorithms) { uint16_t *u16a; size_t i; int res; u16a = malloc(sizeof(*u16a) * no_supported_algorithms); if (u16a == NULL) { return (-1); } for (i = 0; i < no_supported_algorithms; i++) { u16a[i] = (uint16_t)supported_algorithms[i]; } res = (tlv_add_u16_array(msg, TLV_OPT_SUPPORTED_DECISION_ALGORITHMS, u16a, no_supported_algorithms)); free(u16a); return (res); } int tlv_add_reply_error_code(struct dynar *msg, enum tlv_reply_error_code error_code) { return (tlv_add_u16(msg, TLV_OPT_REPLY_ERROR_CODE, (uint16_t)error_code)); } int tlv_add_server_maximum_request_size(struct dynar *msg, size_t server_maximum_request_size) { return (tlv_add_u32(msg, TLV_OPT_SERVER_MAXIMUM_REQUEST_SIZE, server_maximum_request_size)); } int tlv_add_server_maximum_reply_size(struct dynar *msg, size_t server_maximum_reply_size) { return (tlv_add_u32(msg, TLV_OPT_SERVER_MAXIMUM_REPLY_SIZE, server_maximum_reply_size)); } int tlv_add_node_id(struct dynar *msg, uint32_t node_id) { return (tlv_add_u32(msg, TLV_OPT_NODE_ID, node_id)); } int tlv_add_decision_algorithm(struct dynar *msg, enum tlv_decision_algorithm_type decision_algorithm) { return (tlv_add_u16(msg, TLV_OPT_DECISION_ALGORITHM, (uint16_t)decision_algorithm)); } int tlv_add_heartbeat_interval(struct dynar *msg, uint32_t heartbeat_interval) { return (tlv_add_u32(msg, TLV_OPT_HEARTBEAT_INTERVAL, heartbeat_interval)); } int tlv_add_ring_id(struct dynar *msg, const struct tlv_ring_id *ring_id) { uint64_t nu64; uint32_t nu32; char tmp_buf[12]; nu32 = htonl(ring_id->node_id); nu64 = htobe64(ring_id->seq); memcpy(tmp_buf, &nu32, sizeof(nu32)); memcpy(tmp_buf + sizeof(nu32), &nu64, sizeof(nu64)); return (tlv_add(msg, TLV_OPT_RING_ID, sizeof(tmp_buf), tmp_buf)); } int tlv_add_tie_breaker(struct dynar *msg, const struct tlv_tie_breaker *tie_breaker) { uint32_t nu32; uint8_t u8; char tmp_buf[5]; u8 = tie_breaker->mode; nu32 = (tie_breaker->mode == TLV_TIE_BREAKER_MODE_NODE_ID ? htonl(tie_breaker->node_id) : 0); memcpy(tmp_buf, &u8, sizeof(u8)); memcpy(tmp_buf + sizeof(u8), &nu32, sizeof(nu32)); return (tlv_add(msg, TLV_OPT_TIE_BREAKER, sizeof(tmp_buf), tmp_buf)); } int tlv_add_config_version(struct dynar *msg, uint64_t config_version) { return (tlv_add_u64(msg, TLV_OPT_CONFIG_VERSION, config_version)); } int tlv_add_data_center_id(struct dynar *msg, uint32_t data_center_id) { return (tlv_add_u32(msg, TLV_OPT_DATA_CENTER_ID, data_center_id)); } int tlv_add_node_state(struct dynar *msg, enum tlv_node_state node_state) { return (tlv_add_u8(msg, TLV_OPT_NODE_STATE, node_state)); } int tlv_add_node_info(struct dynar *msg, const struct tlv_node_info *node_info) { struct dynar opt_value; int res; res = 0; /* * Create sub message, */ dynar_init(&opt_value, 1024); if ((res = tlv_add_node_id(&opt_value, node_info->node_id)) != 0) { goto exit_dynar_destroy; } if (node_info->data_center_id != 0) { if ((res = tlv_add_data_center_id(&opt_value, node_info->data_center_id)) != 0) { goto exit_dynar_destroy; } } if (node_info->node_state != TLV_NODE_STATE_NOT_SET) { if ((res = tlv_add_node_state(&opt_value, node_info->node_state)) != 0) { goto exit_dynar_destroy; } } res = tlv_add(msg, TLV_OPT_NODE_INFO, dynar_size(&opt_value), dynar_data(&opt_value)); if (res != 0) { goto exit_dynar_destroy; } exit_dynar_destroy: dynar_destroy(&opt_value); return (res); } int tlv_add_node_list_type(struct dynar *msg, enum tlv_node_list_type node_list_type) { return (tlv_add_u8(msg, TLV_OPT_NODE_LIST_TYPE, node_list_type)); } int tlv_add_vote(struct dynar *msg, enum tlv_vote vote) { return (tlv_add_u8(msg, TLV_OPT_VOTE, vote)); } int tlv_add_quorate(struct dynar *msg, enum tlv_quorate quorate) { return (tlv_add_u8(msg, TLV_OPT_QUORATE, quorate)); } void tlv_iter_init_str(const char *msg, size_t msg_len, size_t msg_header_len, struct tlv_iterator *tlv_iter) { tlv_iter->msg = msg; tlv_iter->msg_len = msg_len; tlv_iter->current_pos = 0; tlv_iter->msg_header_len = msg_header_len; tlv_iter->iter_next_called = 0; } void tlv_iter_init(const struct dynar *msg, size_t msg_header_len, struct tlv_iterator *tlv_iter) { tlv_iter_init_str(dynar_data(msg), dynar_size(msg), msg_header_len, tlv_iter); } enum tlv_opt_type tlv_iter_get_type(const struct tlv_iterator *tlv_iter) { uint16_t ntype; uint16_t type; memcpy(&ntype, tlv_iter->msg + tlv_iter->current_pos, sizeof(ntype)); type = ntohs(ntype); return (type); } uint16_t tlv_iter_get_len(const struct tlv_iterator *tlv_iter) { uint16_t nlen; uint16_t len; memcpy(&nlen, tlv_iter->msg + tlv_iter->current_pos + TLV_TYPE_LENGTH, sizeof(nlen)); len = ntohs(nlen); return (len); } const char * tlv_iter_get_data(const struct tlv_iterator *tlv_iter) { return (tlv_iter->msg + tlv_iter->current_pos + TLV_TYPE_LENGTH + TLV_LENGTH_LENGTH); } int tlv_iter_next(struct tlv_iterator *tlv_iter) { uint16_t len; if (tlv_iter->iter_next_called == 0) { tlv_iter->iter_next_called = 1; tlv_iter->current_pos = tlv_iter->msg_header_len; goto check_tlv_validity; } len = tlv_iter_get_len(tlv_iter); if (tlv_iter->current_pos + TLV_TYPE_LENGTH + TLV_LENGTH_LENGTH + len >= tlv_iter->msg_len) { return (0); } tlv_iter->current_pos += TLV_TYPE_LENGTH + TLV_LENGTH_LENGTH + len; check_tlv_validity: /* * Check if tlv is valid = is not larger than whole message */ len = tlv_iter_get_len(tlv_iter); if (tlv_iter->current_pos + TLV_TYPE_LENGTH + TLV_LENGTH_LENGTH + len > tlv_iter->msg_len) { return (-1); } return (1); } int tlv_iter_decode_u32(struct tlv_iterator *tlv_iter, uint32_t *res) { const char *opt_data; uint16_t opt_len; uint32_t nu32; opt_len = tlv_iter_get_len(tlv_iter); opt_data = tlv_iter_get_data(tlv_iter); if (opt_len != sizeof(nu32)) { return (-1); } memcpy(&nu32, opt_data, sizeof(nu32)); *res = ntohl(nu32); return (0); } int tlv_iter_decode_u8(struct tlv_iterator *tlv_iter, uint8_t *res) { const char *opt_data; uint16_t opt_len; opt_len = tlv_iter_get_len(tlv_iter); opt_data = tlv_iter_get_data(tlv_iter); if (opt_len != sizeof(*res)) { return (-1); } memcpy(res, opt_data, sizeof(*res)); return (0); } int tlv_iter_decode_client_cert_required(struct tlv_iterator *tlv_iter, uint8_t *client_cert_required) { return (tlv_iter_decode_u8(tlv_iter, client_cert_required)); } int tlv_iter_decode_str(struct tlv_iterator *tlv_iter, char **str, size_t *str_len) { const char *opt_data; uint16_t opt_len; char *tmp_str; opt_len = tlv_iter_get_len(tlv_iter); opt_data = tlv_iter_get_data(tlv_iter); tmp_str = malloc(opt_len + 1); if (tmp_str == NULL) { return (-1); } memcpy(tmp_str, opt_data, opt_len); tmp_str[opt_len] = '\0'; *str = tmp_str; *str_len = opt_len; return (0); } int tlv_iter_decode_u16_array(struct tlv_iterator *tlv_iter, uint16_t **u16a, size_t *no_items) { uint16_t opt_len; uint16_t *u16a_res; size_t i; opt_len = tlv_iter_get_len(tlv_iter); if (opt_len % sizeof(uint16_t) != 0) { return (-1); } *no_items = opt_len / sizeof(uint16_t); u16a_res = malloc(sizeof(uint16_t) * *no_items); if (u16a_res == NULL) { return (-2); } memcpy(u16a_res, tlv_iter_get_data(tlv_iter), opt_len); for (i = 0; i < *no_items; i++) { u16a_res[i] = ntohs(u16a_res[i]); } *u16a = u16a_res; return (0); } int tlv_iter_decode_supported_options(struct tlv_iterator *tlv_iter, enum tlv_opt_type **supported_options, size_t *no_supported_options) { uint16_t *u16a; enum tlv_opt_type *tlv_opt_array; size_t i; int res; res = tlv_iter_decode_u16_array(tlv_iter, &u16a, no_supported_options); if (res != 0) { return (res); } tlv_opt_array = malloc(sizeof(enum tlv_opt_type) * *no_supported_options); if (tlv_opt_array == NULL) { free(u16a); return (-2); } for (i = 0; i < *no_supported_options; i++) { tlv_opt_array[i] = (enum tlv_opt_type)u16a[i]; } free(u16a); *supported_options = tlv_opt_array; return (0); } int tlv_iter_decode_supported_decision_algorithms(struct tlv_iterator *tlv_iter, enum tlv_decision_algorithm_type **supported_decision_algorithms, size_t *no_supported_decision_algorithms) { uint16_t *u16a; enum tlv_decision_algorithm_type *tlv_decision_algorithm_type_array; size_t i; int res; res = tlv_iter_decode_u16_array(tlv_iter, &u16a, no_supported_decision_algorithms); if (res != 0) { return (res); } tlv_decision_algorithm_type_array = malloc( sizeof(enum tlv_decision_algorithm_type) * *no_supported_decision_algorithms); if (tlv_decision_algorithm_type_array == NULL) { free(u16a); return (-2); } for (i = 0; i < *no_supported_decision_algorithms; i++) { tlv_decision_algorithm_type_array[i] = (enum tlv_decision_algorithm_type)u16a[i]; } free(u16a); *supported_decision_algorithms = tlv_decision_algorithm_type_array; return (0); } int tlv_iter_decode_u16(struct tlv_iterator *tlv_iter, uint16_t *u16) { const char *opt_data; uint16_t opt_len; uint16_t nu16; opt_len = tlv_iter_get_len(tlv_iter); opt_data = tlv_iter_get_data(tlv_iter); if (opt_len != sizeof(nu16)) { return (-1); } memcpy(&nu16, opt_data, sizeof(nu16)); *u16 = ntohs(nu16); return (0); } int tlv_iter_decode_u64(struct tlv_iterator *tlv_iter, uint64_t *u64) { const char *opt_data; uint64_t opt_len; uint64_t nu64; opt_len = tlv_iter_get_len(tlv_iter); opt_data = tlv_iter_get_data(tlv_iter); if (opt_len != sizeof(nu64)) { return (-1); } memcpy(&nu64, opt_data, sizeof(nu64)); *u64 = be64toh(nu64); return (0); } int tlv_iter_decode_reply_error_code(struct tlv_iterator *tlv_iter, enum tlv_reply_error_code *reply_error_code) { return (tlv_iter_decode_u16(tlv_iter, (uint16_t *)reply_error_code)); } int tlv_iter_decode_tls_supported(struct tlv_iterator *tlv_iter, enum tlv_tls_supported *tls_supported) { uint8_t u8; enum tlv_tls_supported tmp_tls_supported; if (tlv_iter_decode_u8(tlv_iter, &u8) != 0) { return (-1); } tmp_tls_supported = u8; if (tmp_tls_supported != TLV_TLS_UNSUPPORTED && tmp_tls_supported != TLV_TLS_SUPPORTED && tmp_tls_supported != TLV_TLS_REQUIRED) { return (-4); } *tls_supported = tmp_tls_supported; return (0); } int tlv_iter_decode_decision_algorithm(struct tlv_iterator *tlv_iter, enum tlv_decision_algorithm_type *decision_algorithm) { uint16_t u16; if (tlv_iter_decode_u16(tlv_iter, &u16) != 0) { return (-1); } *decision_algorithm = (enum tlv_decision_algorithm_type)u16; return (0); } int tlv_iter_decode_ring_id(struct tlv_iterator *tlv_iter, struct tlv_ring_id *ring_id) { const char *opt_data; uint16_t opt_len; uint32_t nu32; uint64_t nu64; char tmp_buf[12]; opt_len = tlv_iter_get_len(tlv_iter); opt_data = tlv_iter_get_data(tlv_iter); if (opt_len != sizeof(tmp_buf)) { return (-1); } memcpy(&nu32, opt_data, sizeof(nu32)); memcpy(&nu64, opt_data + sizeof(nu32), sizeof(nu64)); ring_id->node_id = ntohl(nu32); ring_id->seq = be64toh(nu64); return (0); } int tlv_iter_decode_tie_breaker(struct tlv_iterator *tlv_iter, struct tlv_tie_breaker *tie_breaker) { const char *opt_data; uint16_t opt_len; uint32_t nu32; uint8_t u8; enum tlv_tie_breaker_mode tie_breaker_mode; char tmp_buf[5]; opt_len = tlv_iter_get_len(tlv_iter); opt_data = tlv_iter_get_data(tlv_iter); if (opt_len != sizeof(tmp_buf)) { return (-1); } memcpy(&u8, opt_data, sizeof(u8)); tie_breaker_mode = u8; if (tie_breaker_mode != TLV_TIE_BREAKER_MODE_LOWEST && tie_breaker_mode != TLV_TIE_BREAKER_MODE_HIGHEST && tie_breaker_mode != TLV_TIE_BREAKER_MODE_NODE_ID) { return (-4); } memcpy(&nu32, opt_data + sizeof(u8), sizeof(nu32)); tie_breaker->mode = tie_breaker_mode; tie_breaker->node_id = (tie_breaker->mode == TLV_TIE_BREAKER_MODE_NODE_ID ? ntohl(nu32) : 0); return (0); } int tlv_iter_decode_node_state(struct tlv_iterator *tlv_iter, enum tlv_node_state *node_state) { uint8_t u8; enum tlv_node_state tmp_node_state; if (tlv_iter_decode_u8(tlv_iter, &u8) != 0) { return (-1); } tmp_node_state = u8; if (tmp_node_state != TLV_NODE_STATE_MEMBER && tmp_node_state != TLV_NODE_STATE_DEAD && tmp_node_state != TLV_NODE_STATE_LEAVING) { return (-4); } *node_state = tmp_node_state; return (0); } int tlv_iter_decode_node_info(struct tlv_iterator *tlv_iter, struct tlv_node_info *node_info) { struct tlv_iterator data_tlv_iter; int iter_res; int res; enum tlv_opt_type opt_type; struct tlv_node_info tmp_node_info; memset(&tmp_node_info, 0, sizeof(tmp_node_info)); tlv_iter_init_str(tlv_iter_get_data(tlv_iter), tlv_iter_get_len(tlv_iter), 0, &data_tlv_iter); while ((iter_res = tlv_iter_next(&data_tlv_iter)) > 0) { opt_type = tlv_iter_get_type(&data_tlv_iter); switch (opt_type) { case TLV_OPT_NODE_ID: if ((res = tlv_iter_decode_u32(&data_tlv_iter, &tmp_node_info.node_id)) != 0) { return (res); } break; case TLV_OPT_DATA_CENTER_ID: if ((res = tlv_iter_decode_u32(&data_tlv_iter, &tmp_node_info.data_center_id)) != 0) { return (res); } break; case TLV_OPT_NODE_STATE: if ((res = tlv_iter_decode_node_state(&data_tlv_iter, &tmp_node_info.node_state)) != 0) { return (res); } break; default: /* * Other options are not processed */ break; } } if (iter_res != 0) { return (-3); } if (tmp_node_info.node_id == 0) { return (-4); } memcpy(node_info, &tmp_node_info, sizeof(tmp_node_info)); return (0); } int tlv_iter_decode_node_list_type(struct tlv_iterator *tlv_iter, enum tlv_node_list_type *node_list_type) { uint8_t u8; enum tlv_node_list_type tmp_node_list_type; if (tlv_iter_decode_u8(tlv_iter, &u8) != 0) { return (-1); } tmp_node_list_type = u8; if (tmp_node_list_type != TLV_NODE_LIST_TYPE_INITIAL_CONFIG && tmp_node_list_type != TLV_NODE_LIST_TYPE_CHANGED_CONFIG && tmp_node_list_type != TLV_NODE_LIST_TYPE_MEMBERSHIP && tmp_node_list_type != TLV_NODE_LIST_TYPE_QUORUM) { return (-4); } *node_list_type = tmp_node_list_type; return (0); } int tlv_iter_decode_vote(struct tlv_iterator *tlv_iter, enum tlv_vote *vote) { uint8_t u8; enum tlv_vote tmp_vote; if (tlv_iter_decode_u8(tlv_iter, &u8) != 0) { return (-1); } tmp_vote = u8; if (tmp_vote != TLV_VOTE_ACK && tmp_vote != TLV_VOTE_NACK && tmp_vote != TLV_VOTE_ASK_LATER && tmp_vote != TLV_VOTE_WAIT_FOR_REPLY && tmp_vote != TLV_VOTE_NO_CHANGE) { return (-4); } *vote = tmp_vote; return (0); } int tlv_iter_decode_quorate(struct tlv_iterator *tlv_iter, enum tlv_quorate *quorate) { uint8_t u8; enum tlv_quorate tmp_quorate; if (tlv_iter_decode_u8(tlv_iter, &u8) != 0) { return (-1); } tmp_quorate = u8; if (tmp_quorate != TLV_QUORATE_QUORATE && tmp_quorate != TLV_QUORATE_INQUORATE) { return (-4); } *quorate = tmp_quorate; return (0); } void tlv_get_supported_options(enum tlv_opt_type **supported_options, size_t *no_supported_options) { *supported_options = tlv_static_supported_options; *no_supported_options = TLV_STATIC_SUPPORTED_OPTIONS_SIZE; } int tlv_ring_id_eq(const struct tlv_ring_id *rid1, const struct tlv_ring_id *rid2) { return (rid1->node_id == rid2->node_id && rid1->seq == rid2->seq); } int tlv_tie_breaker_eq(const struct tlv_tie_breaker *tb1, const struct tlv_tie_breaker *tb2) { if (tb1->mode == tb2->mode && tb1->mode == TLV_TIE_BREAKER_MODE_NODE_ID) { return (tb1->node_id == tb2->node_id); } return (tb1->mode == tb2->mode); } const char * tlv_vote_to_str(enum tlv_vote vote) { switch (vote) { case TLV_VOTE_UNDEFINED: break; case TLV_VOTE_ACK: return ("ACK"); break; case TLV_VOTE_NACK: return ("NACK"); break; case TLV_VOTE_ASK_LATER: return ("Ask later"); break; case TLV_VOTE_WAIT_FOR_REPLY: return ("Wait for reply"); break; case TLV_VOTE_NO_CHANGE: return ("No change"); break; } return ("Unknown vote value"); } const char * tlv_node_state_to_str(enum tlv_node_state state) { switch (state) { case TLV_NODE_STATE_NOT_SET: return ("not set"); break; case TLV_NODE_STATE_MEMBER: return ("member"); break; case TLV_NODE_STATE_DEAD: return ("dead"); break; case TLV_NODE_STATE_LEAVING: return ("leaving"); break; } return ("Unhandled node state"); } const char * tlv_tls_supported_to_str(enum tlv_tls_supported tls_supported) { switch (tls_supported) { case TLV_TLS_UNSUPPORTED: return ("Unsupported"); break; case TLV_TLS_SUPPORTED: return ("Supported"); break; case TLV_TLS_REQUIRED: return ("Required"); break; } return ("Unhandled tls supported state"); } const char * tlv_decision_algorithm_type_to_str(enum tlv_decision_algorithm_type algorithm) { switch (algorithm) { case TLV_DECISION_ALGORITHM_TYPE_TEST: return ("Test"); break; case TLV_DECISION_ALGORITHM_TYPE_FFSPLIT: return ("Fifty-Fifty split"); break; case TLV_DECISION_ALGORITHM_TYPE_2NODELMS: return ("2 Node LMS"); break; case TLV_DECISION_ALGORITHM_TYPE_LMS: return ("LMS"); break; } return ("Unknown algorithm"); } diff --git a/test/cpgbench.c b/test/cpgbench.c index 23fc2b0a..059effe2 100644 --- a/test/cpgbench.c +++ b/test/cpgbench.c @@ -1,198 +1,197 @@ /* * Copyright (c) 2006, 2009 Red Hat, Inc. * * All rights reserved. * * Author: Steven Dake (sdake@redhat.com) * * This software licensed under BSD license, the text of which follows: * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * - Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * - Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * - Neither the name of the MontaVista Software, Inc. nor the names of its * contributors may be used to endorse or promote products derived from this * software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF * THE POSSIBILITY OF SUCH DAMAGE. */ #include #include #include #include #include #include #include -#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include static cpg_handle_t handle; static pthread_t thread; #ifndef timersub #define timersub(a, b, result) \ do { \ (result)->tv_sec = (a)->tv_sec - (b)->tv_sec; \ (result)->tv_usec = (a)->tv_usec - (b)->tv_usec; \ if ((result)->tv_usec < 0) { \ --(result)->tv_sec; \ (result)->tv_usec += 1000000; \ } \ } while (0) #endif /* timersub */ static int alarm_notice; static void cpg_bm_confchg_fn ( cpg_handle_t handle_in, const struct cpg_name *group_name, const struct cpg_address *member_list, size_t member_list_entries, const struct cpg_address *left_list, size_t left_list_entries, const struct cpg_address *joined_list, size_t joined_list_entries) { } static unsigned int write_count; static void cpg_bm_deliver_fn ( cpg_handle_t handle_in, const struct cpg_name *group_name, uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len) { write_count++; } static cpg_callbacks_t callbacks = { .cpg_deliver_fn = cpg_bm_deliver_fn, .cpg_confchg_fn = cpg_bm_confchg_fn }; #define ONE_MEG 1048576 static char data[ONE_MEG]; static void cpg_benchmark ( cpg_handle_t handle_in, int write_size) { struct timeval tv1, tv2, tv_elapsed; struct iovec iov; unsigned int res; alarm_notice = 0; iov.iov_base = data; iov.iov_len = write_size; write_count = 0; alarm (10); gettimeofday (&tv1, NULL); do { res = cpg_mcast_joined (handle_in, CPG_TYPE_AGREED, &iov, 1); } while (alarm_notice == 0 && (res == CS_OK || res == CS_ERR_TRY_AGAIN)); gettimeofday (&tv2, NULL); timersub (&tv2, &tv1, &tv_elapsed); printf ("%5d messages received ", write_count); printf ("%5d bytes per write ", write_size); printf ("%7.3f Seconds runtime ", (tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0))); printf ("%9.3f TP/s ", ((float)write_count) / (tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0))); printf ("%7.3f MB/s.\n", ((float)write_count) * ((float)write_size) / ((tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0)) * 1000000.0)); } static void sigalrm_handler (int num) { alarm_notice = 1; } static struct cpg_name group_name = { .value = "cpg_bm", .length = 6 }; static void* dispatch_thread (void *arg) { cpg_dispatch (handle, CS_DISPATCH_BLOCKING); return NULL; } int main (void) { unsigned int size; int i; unsigned int res; qb_log_init("cpgbench", LOG_USER, LOG_EMERG); qb_log_ctl(QB_LOG_SYSLOG, QB_LOG_CONF_ENABLED, QB_FALSE); qb_log_filter_ctl(QB_LOG_STDERR, QB_LOG_FILTER_ADD, QB_LOG_FILTER_FILE, "*", LOG_DEBUG); qb_log_ctl(QB_LOG_STDERR, QB_LOG_CONF_ENABLED, QB_TRUE); size = 64; signal (SIGALRM, sigalrm_handler); res = cpg_initialize (&handle, &callbacks); if (res != CS_OK) { printf ("cpg_initialize failed with result %d\n", res); exit (1); } pthread_create (&thread, NULL, dispatch_thread, NULL); res = cpg_join (handle, &group_name); if (res != CS_OK) { printf ("cpg_join failed with result %d\n", res); exit (1); } for (i = 0; i < 10; i++) { /* number of repetitions - up to 50k */ cpg_benchmark (handle, size); signal (SIGALRM, sigalrm_handler); size *= 5; if (size >= (ONE_MEG - 100)) { break; } } res = cpg_finalize (handle); if (res != CS_OK) { printf ("cpg_finalize failed with result %d\n", res); exit (1); } return (0); } diff --git a/test/cpgbenchzc.c b/test/cpgbenchzc.c index db78289a..92f55ee9 100644 --- a/test/cpgbenchzc.c +++ b/test/cpgbenchzc.c @@ -1,195 +1,193 @@ /* * Copyright (c) 2006, 2009 Red Hat, Inc. * * All rights reserved. * * Author: Steven Dake (sdake@redhat.com) * * This software licensed under BSD license, the text of which follows: * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * - Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * - Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * - Neither the name of the MontaVista Software, Inc. nor the names of its * contributors may be used to endorse or promote products derived from this * software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF * THE POSSIBILITY OF SUCH DAMAGE. */ #include #include #include #include #include #include #include -#include #include #include #include #include #include #include -#include #include #include #include #include #include "../lib/util.h" #ifndef timersub #define timersub(a, b, result) \ do { \ (result)->tv_sec = (a)->tv_sec - (b)->tv_sec; \ (result)->tv_usec = (a)->tv_usec - (b)->tv_usec; \ if ((result)->tv_usec < 0) { \ --(result)->tv_sec; \ (result)->tv_usec += 1000000; \ } \ } while (0) #endif static int alarm_notice; static void cpg_bm_confchg_fn ( cpg_handle_t handle, const struct cpg_name *group_name, const struct cpg_address *member_list, size_t member_list_entries, const struct cpg_address *left_list, size_t left_list_entries, const struct cpg_address *joined_list, size_t joined_list_entries) { } static unsigned int write_count; static void cpg_bm_deliver_fn ( cpg_handle_t handle, const struct cpg_name *group_name, uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len) { write_count++; } static cpg_callbacks_t callbacks = { .cpg_deliver_fn = cpg_bm_deliver_fn, .cpg_confchg_fn = cpg_bm_confchg_fn }; void *data; static void cpg_benchmark ( cpg_handle_t handle, int write_size) { struct timeval tv1, tv2, tv_elapsed; unsigned int res; cpg_flow_control_state_t flow_control_state; alarm_notice = 0; write_count = 0; alarm (10); gettimeofday (&tv1, NULL); do { /* * Test checkpoint write */ cpg_flow_control_state_get (handle, &flow_control_state); if (flow_control_state == CPG_FLOW_CONTROL_DISABLED) { retry: res = cpg_zcb_mcast_joined (handle, CPG_TYPE_AGREED, data, write_size); if (res == CS_ERR_TRY_AGAIN) { goto retry; } } res = cpg_dispatch (handle, CS_DISPATCH_ALL); if (res != CS_OK) { printf ("cpg dispatch returned error %d\n", res); exit (1); } } while (alarm_notice == 0); gettimeofday (&tv2, NULL); timersub (&tv2, &tv1, &tv_elapsed); printf ("%5d messages received ", write_count); printf ("%5d bytes per write ", write_size); printf ("%7.3f Seconds runtime ", (tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0))); printf ("%9.3f TP/s ", ((float)write_count) / (tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0))); printf ("%7.3f MB/s.\n", ((float)write_count) * ((float)write_size) / ((tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0)) * 1000000.0)); } static void sigalrm_handler (int num) { alarm_notice = 1; } static struct cpg_name group_name = { .value = "cpg_bm", .length = 6 }; int main (void) { cpg_handle_t handle; unsigned int size; int i; unsigned int res; size = 1000; signal (SIGALRM, sigalrm_handler); res = cpg_initialize (&handle, &callbacks); if (res != CS_OK) { printf ("cpg_initialize failed with result %d\n", res); exit (1); } cpg_zcb_alloc (handle, 500000, &data); if (res != CS_OK) { printf ("cpg_zcb_alloc couldn't allocate zero copy buffer %d\n", res); exit (1); } res = cpg_join (handle, &group_name); if (res != CS_OK) { printf ("cpg_join failed with result %s\n", cs_strerror(res)); exit (1); } for (i = 0; i < 50; i++) { /* number of repetitions - up to 50k */ cpg_benchmark (handle, size); size += 1000; } res = cpg_finalize (handle); if (res != CS_OK) { printf ("cpg_finalize failed with result %s\n", cs_strerror(res)); exit (1); } return (0); } diff --git a/test/cpghum.c b/test/cpghum.c index 5772f818..006bf45e 100644 --- a/test/cpghum.c +++ b/test/cpghum.c @@ -1,437 +1,436 @@ /* * Copyright (c) 2015 Red Hat, Inc. * * All rights reserved. * * Author: Christine Caulfield * * This software licensed under BSD license, the text of which follows: * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * - Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * - Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * - Neither the name of the MontaVista Software, Inc. nor the names of its * contributors may be used to endorse or promote products derived from this * software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF * THE POSSIBILITY OF SUCH DAMAGE. */ #include #include #include #include #include #include -#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include static cpg_handle_t handle; static pthread_t thread; #ifndef timersub #define timersub(a, b, result) \ do { \ (result)->tv_sec = (a)->tv_sec - (b)->tv_sec; \ (result)->tv_usec = (a)->tv_usec - (b)->tv_usec; \ if ((result)->tv_usec < 0) { \ --(result)->tv_sec; \ (result)->tv_usec += 1000000; \ } \ } while (0) #endif /* timersub */ static int alarm_notice; #define ONE_MEG 1048576 #define DATASIZE (ONE_MEG*20) static char data[DATASIZE]; static int send_counter = 0; static int do_syslog = 0; static int quiet = 0; static volatile int stopped; // stats static unsigned int length_errors=0; static unsigned int crc_errors=0; static unsigned int sequence_errors=0; static unsigned int packets_sent=0; static unsigned int packets_recvd=0; static unsigned int send_retries=0; static unsigned int send_fails=0; static void cpg_bm_confchg_fn ( cpg_handle_t handle_in, const struct cpg_name *group_name, const struct cpg_address *member_list, size_t member_list_entries, const struct cpg_address *left_list, size_t left_list_entries, const struct cpg_address *joined_list, size_t joined_list_entries) { } static unsigned int g_recv_count; static unsigned int g_recv_length; static unsigned int g_write_size; static int g_recv_counter = 0; static void cpg_bm_deliver_fn ( cpg_handle_t handle_in, const struct cpg_name *group_name, uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len) { int *value = msg; uLong crc=0; uLong recv_crc = value[1] & 0xFFFFFFFF; packets_recvd++; g_recv_length = msg_len; // Basic check, packets should all be the right size if (g_write_size && (msg_len != g_write_size)) { length_errors++; fprintf(stderr, "%s: message sizes don't match. got %lu, expected %u\n", group_name->value, msg_len, g_write_size); if (do_syslog) { syslog(LOG_ERR, "%s: message sizes don't match. got %lu, expected %u\n", group_name->value, msg_len, g_write_size); } } // Sequence counters are incrementing in step? if (*value != g_recv_counter) { sequence_errors++; fprintf(stderr, "%s: counters don't match. got %d, expected %d\n", group_name->value, *value, g_recv_counter); if (do_syslog) { syslog(LOG_ERR, "%s: counters don't match. got %d, expected %d\n", group_name->value, *value, g_recv_counter); } // Catch up or we'll be printing errors for ever g_recv_counter = *value +1; } else { g_recv_counter++; } // Check crc crc = crc32(0, NULL, 0); crc = crc32(crc, (Bytef *)&value[2], msg_len-sizeof(int)*2) & 0xFFFFFFFF; if (crc != recv_crc) { crc_errors++; fprintf(stderr, "%s: CRCs don't match. got %lx, expected %lx\n", group_name->value, recv_crc, crc); if (do_syslog) { syslog(LOG_ERR, "%s: CRCs don't match. got %lx, expected %lx\n", group_name->value, recv_crc, crc); } } g_recv_count++; } static cpg_model_v1_data_t model1_data = { .cpg_deliver_fn = cpg_bm_deliver_fn, .cpg_confchg_fn = cpg_bm_confchg_fn, }; static cpg_callbacks_t callbacks = { .cpg_deliver_fn = cpg_bm_deliver_fn, .cpg_confchg_fn = cpg_bm_confchg_fn }; static struct cpg_name group_name = { .value = "cpghum", .length = 7 }; static void cpg_test ( cpg_handle_t handle_in, int write_size, int delay_time, int print_time) { struct timeval tv1, tv2, tv_elapsed; struct iovec iov; unsigned int res; int i; unsigned int *dataint = (unsigned int *)data; uLong crc; alarm_notice = 0; iov.iov_base = data; iov.iov_len = write_size; g_recv_count = 0; alarm (print_time); gettimeofday (&tv1, NULL); do { dataint[0] = send_counter++; for (i=2; i<(DATASIZE-sizeof(int)*2)/4; i++) { dataint[i] = rand(); } crc = crc32(0, NULL, 0); dataint[1] = crc32(crc, (Bytef*)&dataint[2], write_size-sizeof(int)*2); resend: res = cpg_mcast_joined (handle_in, CPG_TYPE_AGREED, &iov, 1); if (res == CS_ERR_TRY_AGAIN) { usleep(10000); send_retries++; goto resend; } if (res != CS_OK) { fprintf(stderr, "send failed: %d\n", res); send_fails++; } else { packets_sent++; } usleep(delay_time*1000); } while (alarm_notice == 0 && (res == CS_OK || res == CS_ERR_TRY_AGAIN) && stopped == 0); gettimeofday (&tv2, NULL); timersub (&tv2, &tv1, &tv_elapsed); if (!quiet) { printf ("%s: %5d message%s received, ", group_name.value, g_recv_count, g_recv_count==1?"":"s"); printf ("%5d bytes per write\n", write_size); } } static void sigalrm_handler (int num) { alarm_notice = 1; } static void sigint_handler (int num) { stopped = 1; } static void* dispatch_thread (void *arg) { cpg_dispatch (handle, CS_DISPATCH_BLOCKING); return NULL; } static void usage(char *cmd) { fprintf(stderr, "%s [OPTIONS]\n", cmd); fprintf(stderr, "\n"); fprintf(stderr, "%s sends CPG messages to all registered users of the CPG.\n", cmd); fprintf(stderr, "The messages have a sequence number and a CRC so that missing or\n"); fprintf(stderr, "corrupted messages will be detected and reported.\n"); fprintf(stderr, "\n"); fprintf(stderr, "%s can also be asked to simply listen for (and check) packets\n", cmd); fprintf(stderr, "so that there is another node in the cluster connected to the CPG.\n"); fprintf(stderr, "\n"); fprintf(stderr, "When -l is present, packet size is only checked if specified by -w or -W\n"); fprintf(stderr, "and it, obviously, must match that of the sender.\n"); fprintf(stderr, "\n"); fprintf(stderr, "Multiple copies, in different CPGs, can also be run on the same or\n"); fprintf(stderr, "different nodes by using the -n option.\n"); fprintf(stderr, "\n"); fprintf(stderr, "%s can't handle more than 1 sender in the same CPG as it messes with the\n", cmd); fprintf(stderr, "sequence numbers.\n"); fprintf(stderr, "\n"); fprintf(stderr, " -w Write size in Kbytes, default 4\n"); fprintf(stderr, " -W Write size in bytes, default 4096\n"); fprintf(stderr, " -n CPG name to use, default 'cpghum'\n"); fprintf(stderr, " -d Delay between sending packets (mS), default 1000\n"); fprintf(stderr, " -r Number of repetitions, default 100\n"); fprintf(stderr, " -p Delay between printing output(S), default 10s\n"); fprintf(stderr, " -l Listen and check CRCs only, don't send (^C to quit)\n"); fprintf(stderr, " -m cpg_initialise() model. Default 1.\n"); fprintf(stderr, " -s Also send errors to syslog (for daemon log correlation).\n"); fprintf(stderr, " -q Quiet. Don't print messages every 10 seconds (see also -p)\n"); fprintf(stderr, "\n"); } int main (int argc, char *argv[]) { int i; unsigned int res; uint32_t maxsize; int opt; int bs; int write_size = 4096; int delay_time = 1000; int repetitions = 100; int print_time = 10; int have_size = 0; int listen_only = 0; int model = 1; while ( (opt = getopt(argc, argv, "qlsn:d:r:p:m:w:W:")) != -1 ) { switch (opt) { case 'w': // Write size in K bs = atoi(optarg); if (bs > 0) { write_size = bs*1024; have_size = 1; } break; case 'W': // Write size in bytes bs = atoi(optarg); if (bs > 0) { write_size = bs; have_size = 1; } break; case 'n': if (strlen(optarg) >= CPG_MAX_NAME_LENGTH) { fprintf(stderr, "CPG name too long\n"); exit(1); } strcpy(group_name.value, optarg); group_name.length = strlen(group_name.value); break; case 'd': delay_time = atoi(optarg); break; case 'r': repetitions = atoi(optarg); break; case 'p': print_time = atoi(optarg); break; case 'l': listen_only = 1; break; case 's': do_syslog = 1; break; case 'q': quiet = 1; break; case 'm': model = atoi(optarg); if (model < 0 || model > 1) { fprintf(stderr, "%s: Model must be 0-1\n", argv[0]); exit(1); } break; case '?': usage(basename(argv[0])); exit(0); } } qb_log_init("cpghum", LOG_USER, LOG_EMERG); qb_log_ctl(QB_LOG_SYSLOG, QB_LOG_CONF_ENABLED, QB_FALSE); qb_log_filter_ctl(QB_LOG_STDERR, QB_LOG_FILTER_ADD, QB_LOG_FILTER_FILE, "*", LOG_DEBUG); qb_log_ctl(QB_LOG_STDERR, QB_LOG_CONF_ENABLED, QB_TRUE); g_write_size = write_size; signal (SIGALRM, sigalrm_handler); signal (SIGINT, sigint_handler); switch (model) { case 0: res = cpg_initialize (&handle, &callbacks); break; case 1: res = cpg_model_initialize (&handle, CPG_MODEL_V1, (cpg_model_data_t *)&model1_data, NULL); break; default: res=999; // can't get here but it keeps the compiler happy break; } if (res != CS_OK) { printf ("cpg_initialize failed with result %d\n", res); exit (1); } pthread_create (&thread, NULL, dispatch_thread, NULL); res = cpg_join (handle, &group_name); if (res != CS_OK) { printf ("cpg_join failed with result %d\n", res); exit (1); } if (listen_only) { int secs = 0; if (!quiet) { printf("-- Listening on CPG %s\n", group_name.value); printf("-- Ignore any starting \"counters don't match\" error while we catch up\n"); } /* Only check packet size if specified on the command-line */ if (!have_size) { g_write_size = 0; } while (!stopped) { sleep(1); if (++secs > print_time && !quiet) { printf ("%s: %5d message%s received. %d bytes\n", group_name.value, g_recv_count, g_recv_count==1?"":"s", g_recv_length); secs = 0; g_recv_count = 0; } } } else { cpg_max_atomic_msgsize_get (handle, &maxsize); if ( write_size > maxsize) { fprintf(stderr, "INFO: packet size (%d) is larger than the maximum atomic size (%d), libcpg will fragment\n", write_size, maxsize); } for (i = 0; i < repetitions && !stopped; i++) { cpg_test (handle, write_size, delay_time, print_time); signal (SIGALRM, sigalrm_handler); } } res = cpg_finalize (handle); if (res != CS_OK) { printf ("cpg_finalize failed with result %d\n", res); exit (1); } printf("\n"); printf("Stats:\n"); if (!listen_only) { printf(" packets sent: %d\n", packets_sent); printf(" send failures: %d\n", send_fails); printf(" send retries: %d\n", send_retries); } if (have_size) { printf(" length errors: %d\n", length_errors); } printf(" packets recvd: %d\n", packets_recvd); printf(" sequence errors: %d\n", sequence_errors); printf(" crc errors: %d\n", crc_errors); printf("\n"); return (0); } diff --git a/tools/corosync-notifyd.c b/tools/corosync-notifyd.c index bd16208d..2ba57c9c 100644 --- a/tools/corosync-notifyd.c +++ b/tools/corosync-notifyd.c @@ -1,1222 +1,1219 @@ /* * Copyright (c) 2011-2012 Red Hat, Inc. * * All rights reserved. * * Author: Angus Salkeld * * This software licensed under BSD license, the text of which follows: * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * - Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * - Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * - Neither the name of the MontaVista Software, Inc. nor the names of its * contributors may be used to endorse or promote products derived from this * software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF * THE POSSIBILITY OF SUCH DAMAGE. */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include -#include -#include - #include #include #include #include /* * generic declarations */ enum { CS_NTF_LOG, CS_NTF_STDOUT, CS_NTF_SNMP, CS_NTF_DBUS, CS_NTF_FG, CS_NTF_MAX, }; static int conf[CS_NTF_MAX]; static int32_t _cs_is_quorate = 0; typedef void (*node_membership_fn_t)(char *nodename, uint32_t nodeid, char *state, char* ip); typedef void (*node_quorum_fn_t)(char *nodename, uint32_t nodeid, const char *state); typedef void (*application_connection_fn_t)(char *nodename, uint32_t nodeid, char *app_name, const char *state); typedef void (*rrp_faulty_fn_t)(char *nodename, uint32_t nodeid, uint32_t iface_no, const char *state); struct notify_callbacks { node_membership_fn_t node_membership_fn; node_quorum_fn_t node_quorum_fn; application_connection_fn_t application_connection_fn; rrp_faulty_fn_t rrp_faulty_fn; }; #define MAX_NOTIFIERS 5 static int num_notifiers = 0; static struct notify_callbacks notifiers[MAX_NOTIFIERS]; static uint32_t local_nodeid = 0; static char local_nodename[CS_MAX_NAME_LENGTH]; static qb_loop_t *main_loop; static quorum_handle_t quorum_handle; static void _cs_node_membership_event(char *nodename, uint32_t nodeid, char *state, char* ip); static void _cs_node_quorum_event(const char *state); static void _cs_application_connection_event(char *app_name, const char *state); static void _cs_rrp_faulty_event(uint32_t iface_no, const char *state); #ifdef HAVE_DBUS #include /* * dbus */ #define DBUS_CS_NAME "org.corosync" #define DBUS_CS_IFACE "org.corosync" #define DBUS_CS_PATH "/org/corosync" static DBusConnection *db = NULL; static char _err[512]; static int err_set = 0; static void _cs_dbus_init(void); #endif /* HAVE_DBUS */ #ifdef ENABLE_SNMP #include #include #include #include #include #include #include enum snmp_node_status { SNMP_NODE_STATUS_UNKNOWN = 0, SNMP_NODE_STATUS_JOINED = 1, SNMP_NODE_STATUS_LEFT = 2 }; #define SNMP_OID_COROSYNC "1.3.6.1.4.1.35488" #define SNMP_OID_OBJECT_ROOT SNMP_OID_COROSYNC ".1" #define SNMP_OID_OBJECT_NODE_NAME SNMP_OID_OBJECT_ROOT ".1" #define SNMP_OID_OBJECT_NODE_ID SNMP_OID_OBJECT_ROOT ".2" #define SNMP_OID_OBJECT_NODE_STATUS SNMP_OID_OBJECT_ROOT ".3" #define SNMP_OID_OBJECT_NODE_ADDR SNMP_OID_OBJECT_ROOT ".4" #define SNMP_OID_OBJECT_RINGSEQ SNMP_OID_OBJECT_ROOT ".20" #define SNMP_OID_OBJECT_QUORUM SNMP_OID_OBJECT_ROOT ".21" #define SNMP_OID_OBJECT_APP_NAME SNMP_OID_OBJECT_ROOT ".40" #define SNMP_OID_OBJECT_APP_STATUS SNMP_OID_OBJECT_ROOT ".41" #define SNMP_OID_OBJECT_RRP_IFACE_NO SNMP_OID_OBJECT_ROOT ".60" #define SNMP_OID_OBJECT_RRP_STATUS SNMP_OID_OBJECT_ROOT ".61" #define SNMP_OID_TRAPS_ROOT SNMP_OID_COROSYNC ".0" #define SNMP_OID_TRAPS_NODE SNMP_OID_TRAPS_ROOT ".1" #define SNMP_OID_TRAPS_QUORUM SNMP_OID_TRAPS_ROOT ".2" #define SNMP_OID_TRAPS_APP SNMP_OID_TRAPS_ROOT ".3" #define SNMP_OID_TRAPS_RRP SNMP_OID_TRAPS_ROOT ".4" #define CS_TIMESTAMP_STR_LEN 20 static const char *local_host = "localhost"; #endif /* ENABLE_SNMP */ static char snmp_manager_buf[CS_MAX_NAME_LENGTH]; static char *snmp_manager = NULL; #define CMAP_MAX_RETRIES 10 /* * cmap */ static cmap_handle_t cmap_handle; static int32_t _cs_ip_to_hostname(char* ip, char* name_out) { struct sockaddr_in sa; int rc; if (strchr(ip, ':') == NULL) { sa.sin_family = AF_INET; } else { sa.sin_family = AF_INET6; } rc = inet_pton(sa.sin_family, ip, &sa.sin_addr); if (rc == 0) { return -EINVAL; } rc = getnameinfo((struct sockaddr*)&sa, sizeof(sa), name_out, CS_MAX_NAME_LENGTH, NULL, 0, 0); if (rc != 0) { qb_log(LOG_ERR, 0, "error looking up %s : %s", ip, gai_strerror(rc)); return -EINVAL; } return 0; } static void _cs_cmap_members_key_changed ( cmap_handle_t cmap_handle_c, cmap_track_handle_t cmap_track_handle, int32_t event, const char *key_name, struct cmap_notify_value new_value, struct cmap_notify_value old_value, void *user_data) { char nodename[CS_MAX_NAME_LENGTH]; char* open_bracket = NULL; char* close_bracket = NULL; int res; uint32_t nodeid; char *ip_str; char tmp_key[CMAP_KEYNAME_MAXLEN]; cs_error_t err; int no_retries; if (event != CMAP_TRACK_ADD && event != CMAP_TRACK_MODIFY) { return ; } res = sscanf(key_name, "runtime.totem.pg.mrp.srp.members.%u.%s", &nodeid, tmp_key); if (res != 2) return ; if (strcmp(tmp_key, "status") != 0) { return ; } snprintf(tmp_key, CMAP_KEYNAME_MAXLEN, "runtime.totem.pg.mrp.srp.members.%u.ip", nodeid); no_retries = 0; while ((err = cmap_get_string(cmap_handle, tmp_key, &ip_str)) == CS_ERR_TRY_AGAIN && no_retries++ < CMAP_MAX_RETRIES) { sleep(1); } if (err != CS_OK) { return ; } /* * We want the ip out of: "r(0) ip(192.168.100.92)" */ open_bracket = strrchr(ip_str, '('); open_bracket++; close_bracket = strrchr(open_bracket, ')'); *close_bracket = '\0'; _cs_ip_to_hostname(open_bracket, nodename); _cs_node_membership_event(nodename, nodeid, (char *)new_value.data, open_bracket); free(ip_str); } static void _cs_cmap_connections_key_changed ( cmap_handle_t cmap_handle_c, cmap_track_handle_t cmap_track_handle, int32_t event, const char *key_name, struct cmap_notify_value new_value, struct cmap_notify_value old_value, void *user_data) { char obj_name[CS_MAX_NAME_LENGTH]; char conn_str[CMAP_KEYNAME_MAXLEN]; char tmp_key[CMAP_KEYNAME_MAXLEN]; int res; res = sscanf(key_name, "runtime.connections.%[^.].%s", conn_str, tmp_key); if (res != 2) { return ; } if (strcmp(tmp_key, "service_id") != 0) { return ; } snprintf(obj_name, CS_MAX_NAME_LENGTH, "%s", conn_str); if (event == CMAP_TRACK_ADD) { _cs_application_connection_event(obj_name, "connected"); } if (event == CMAP_TRACK_DELETE) { _cs_application_connection_event(obj_name, "disconnected"); } } static void _cs_cmap_rrp_faulty_key_changed ( cmap_handle_t cmap_handle_c, cmap_track_handle_t cmap_track_handle, int32_t event, const char *key_name, struct cmap_notify_value new_value, struct cmap_notify_value old_value, void *user_data) { uint32_t iface_no; char tmp_key[CMAP_KEYNAME_MAXLEN]; int res; int no_retries; uint8_t faulty; cs_error_t err; res = sscanf(key_name, "runtime.totem.pg.mrp.rrp.%u.%s", &iface_no, tmp_key); if (res != 2) { return ; } if (strcmp(tmp_key, "faulty") != 0) { return ; } no_retries = 0; while ((err = cmap_get_uint8(cmap_handle, key_name, &faulty)) == CS_ERR_TRY_AGAIN && no_retries++ < CMAP_MAX_RETRIES) { sleep(1); } if (err != CS_OK) { return ; } if (faulty) { _cs_rrp_faulty_event(iface_no, "faulty"); } else { _cs_rrp_faulty_event(iface_no, "operational"); } } static int _cs_cmap_dispatch(int fd, int revents, void *data) { cs_error_t err; err = cmap_dispatch(cmap_handle, CS_DISPATCH_ONE); if (err != CS_OK && err != CS_ERR_TRY_AGAIN && err != CS_ERR_TIMEOUT && err != CS_ERR_QUEUE_FULL) { qb_log(LOG_ERR, "Could not dispatch cmap events. Error %u", err); qb_loop_stop(main_loop); return -1; } return 0; } static void _cs_quorum_notification(quorum_handle_t handle, uint32_t quorate, uint64_t ring_seq, uint32_t view_list_entries, uint32_t *view_list) { if (_cs_is_quorate == quorate) { return; } _cs_is_quorate = quorate; if (quorate) { _cs_node_quorum_event("quorate"); } else { _cs_node_quorum_event("not quorate"); } } static int _cs_quorum_dispatch(int fd, int revents, void *data) { cs_error_t err; err = quorum_dispatch(quorum_handle, CS_DISPATCH_ONE); if (err != CS_OK && err != CS_ERR_TRY_AGAIN && err != CS_ERR_TIMEOUT && err != CS_ERR_QUEUE_FULL) { qb_log(LOG_ERR, "Could not dispatch quorum events. Error %u", err); qb_loop_stop(main_loop); return -1; } return 0; } static void _cs_quorum_init(void) { cs_error_t rc; uint32_t quorum_type; int fd; quorum_callbacks_t quorum_callbacks = { .quorum_notify_fn = _cs_quorum_notification, }; rc = quorum_initialize (&quorum_handle, &quorum_callbacks, &quorum_type); if (rc != CS_OK) { qb_log(LOG_ERR, "Could not connect to corosync(quorum)"); return; } quorum_fd_get(quorum_handle, &fd); qb_loop_poll_add(main_loop, QB_LOOP_MED, fd, POLLIN|POLLNVAL, NULL, _cs_quorum_dispatch); rc = quorum_trackstart(quorum_handle, CS_TRACK_CHANGES); if (rc != CS_OK) { qb_log(LOG_ERR, "Could not start tracking"); return; } } static void _cs_quorum_finalize(void) { quorum_finalize (quorum_handle); } #ifdef HAVE_DBUS /* * dbus notifications */ static void _cs_dbus_auto_flush(void) { dbus_connection_ref(db); while (dbus_connection_get_dispatch_status(db) == DBUS_DISPATCH_DATA_REMAINS) { dbus_connection_dispatch(db); } while (dbus_connection_has_messages_to_send(db)) { dbus_connection_flush(db); } dbus_connection_unref(db); } static void _cs_dbus_release(void) { DBusError err; if (!db) return; dbus_error_init(&err); dbus_bus_release_name(db, DBUS_CS_NAME, &err); dbus_error_free(&err); dbus_connection_unref(db); db = NULL; } static void _cs_dbus_node_quorum_event(char *nodename, uint32_t nodeid, const char *state) { DBusMessage *msg = NULL; if (err_set) { qb_log(LOG_ERR, "%s", _err); err_set = 0; } if (!db) { goto out_free; } if (dbus_connection_get_is_connected(db) != TRUE) { err_set = 1; snprintf(_err, sizeof(_err), "DBus connection lost"); _cs_dbus_release(); goto out_unlock; } _cs_dbus_auto_flush(); if (!(msg = dbus_message_new_signal(DBUS_CS_PATH, DBUS_CS_IFACE, "QuorumStateChange"))) { qb_log(LOG_ERR, "error creating dbus signal"); goto out_unlock; } if (!dbus_message_append_args(msg, DBUS_TYPE_STRING, &nodename, DBUS_TYPE_UINT32, &nodeid, DBUS_TYPE_STRING, &state, DBUS_TYPE_INVALID)) { qb_log(LOG_ERR, "error adding args to quorum signal"); goto out_unlock; } dbus_connection_send(db, msg, NULL); out_unlock: if (msg) { dbus_message_unref(msg); } out_free: return; } static void _cs_dbus_node_membership_event(char *nodename, uint32_t nodeid, char *state, char* ip) { DBusMessage *msg = NULL; if (err_set) { qb_log(LOG_ERR, "%s", _err); err_set = 0; } if (!db) { goto out_free; } if (dbus_connection_get_is_connected(db) != TRUE) { err_set = 1; snprintf(_err, sizeof(_err), "DBus connection lost"); _cs_dbus_release(); goto out_unlock; } _cs_dbus_auto_flush(); if (!(msg = dbus_message_new_signal(DBUS_CS_PATH, DBUS_CS_IFACE, "NodeStateChange"))) { qb_log(LOG_ERR, "error creating NodeStateChange signal"); goto out_unlock; } if (!dbus_message_append_args(msg, DBUS_TYPE_STRING, &nodename, DBUS_TYPE_UINT32, &nodeid, DBUS_TYPE_STRING, &ip, DBUS_TYPE_STRING, &state, DBUS_TYPE_INVALID)) { qb_log(LOG_ERR, "error adding args to NodeStateChange signal"); goto out_unlock; } dbus_connection_send(db, msg, NULL); out_unlock: if (msg) { dbus_message_unref(msg); } out_free: return; } static void _cs_dbus_application_connection_event(char *nodename, uint32_t nodeid, char *app_name, const char *state) { DBusMessage *msg = NULL; if (err_set) { qb_log(LOG_ERR, "%s", _err); err_set = 0; } if (!db) { goto out_free; } if (dbus_connection_get_is_connected(db) != TRUE) { err_set = 1; snprintf(_err, sizeof(_err), "DBus connection lost"); _cs_dbus_release(); goto out_unlock; } _cs_dbus_auto_flush(); if (!(msg = dbus_message_new_signal(DBUS_CS_PATH, DBUS_CS_IFACE, "ConnectionStateChange"))) { qb_log(LOG_ERR, "error creating ConnectionStateChange signal"); goto out_unlock; } if (!dbus_message_append_args(msg, DBUS_TYPE_STRING, &nodename, DBUS_TYPE_UINT32, &nodeid, DBUS_TYPE_STRING, &app_name, DBUS_TYPE_STRING, &state, DBUS_TYPE_INVALID)) { qb_log(LOG_ERR, "error adding args to ConnectionStateChange signal"); goto out_unlock; } dbus_connection_send(db, msg, NULL); out_unlock: if (msg) { dbus_message_unref(msg); } out_free: return; } static void _cs_dbus_rrp_faulty_event(char *nodename, uint32_t nodeid, uint32_t iface_no, const char *state) { DBusMessage *msg = NULL; if (err_set) { qb_log(LOG_ERR, "%s", _err); err_set = 0; } if (!db) { goto out_free; } if (dbus_connection_get_is_connected(db) != TRUE) { err_set = 1; snprintf(_err, sizeof(_err), "DBus connection lost"); _cs_dbus_release(); goto out_unlock; } _cs_dbus_auto_flush(); if (!(msg = dbus_message_new_signal(DBUS_CS_PATH, DBUS_CS_IFACE, "QuorumStateChange"))) { qb_log(LOG_ERR, "error creating dbus signal"); goto out_unlock; } if (!dbus_message_append_args(msg, DBUS_TYPE_STRING, &nodename, DBUS_TYPE_UINT32, &nodeid, DBUS_TYPE_UINT32, &iface_no, DBUS_TYPE_STRING, &state, DBUS_TYPE_INVALID)) { qb_log(LOG_ERR, "error adding args to rrp signal"); goto out_unlock; } dbus_connection_send(db, msg, NULL); out_unlock: if (msg) { dbus_message_unref(msg); } out_free: return; } static void _cs_dbus_init(void) { DBusConnection *dbc = NULL; DBusError err; dbus_error_init(&err); dbc = dbus_bus_get(DBUS_BUS_SYSTEM, &err); if (!dbc) { snprintf(_err, sizeof(_err), "dbus_bus_get: %s", err.message); err_set = 1; dbus_error_free(&err); return; } dbus_connection_set_exit_on_disconnect(dbc, FALSE); db = dbc; notifiers[num_notifiers].node_membership_fn = _cs_dbus_node_membership_event; notifiers[num_notifiers].node_quorum_fn = _cs_dbus_node_quorum_event; notifiers[num_notifiers].application_connection_fn = _cs_dbus_application_connection_event; notifiers[num_notifiers].rrp_faulty_fn = _cs_dbus_rrp_faulty_event; num_notifiers++; } #endif /* HAVE_DBUS */ #ifdef ENABLE_SNMP static netsnmp_session *snmp_init (const char *target) { static netsnmp_session *session = NULL; #ifndef NETSNMPV54 char default_port[128]; snprintf (default_port, sizeof (default_port), "%s:162", target); #endif if (session) { return (session); } if (target == NULL) { return NULL; } session = malloc (sizeof (netsnmp_session)); snmp_sess_init (session); session->version = SNMP_VERSION_2c; session->callback = NULL; session->callback_magic = NULL; session = snmp_add(session, #ifdef NETSNMPV54 netsnmp_transport_open_client ("snmptrap", target), #else netsnmp_tdomain_transport (default_port, 0, "udp"), #endif NULL, NULL); if (session == NULL) { qb_log(LOG_ERR, 0, "Could not create snmp transport"); } return (session); } static inline void add_field ( netsnmp_pdu *trap_pdu, u_char asn_type, const char *prefix, void *value, size_t value_size) { oid _oid[MAX_OID_LEN]; size_t _oid_len = MAX_OID_LEN; if (snmp_parse_oid(prefix, _oid, &_oid_len)) { snmp_pdu_add_variable (trap_pdu, _oid, _oid_len, asn_type, (u_char *) value, value_size); } } static void _cs_snmp_node_membership_event(char *nodename, uint32_t nodeid, char *state, char* ip) { int ret; char csysuptime[CS_TIMESTAMP_STR_LEN]; static oid snmptrap_oid[] = { 1,3,6,1,6,3,1,1,4,1,0 }; static oid sysuptime_oid[] = { 1,3,6,1,2,1,1,3,0 }; time_t now = time (NULL); netsnmp_pdu *trap_pdu; netsnmp_session *session = snmp_init (snmp_manager); if (session == NULL) { qb_log(LOG_NOTICE, "Failed to init SNMP session."); return ; } trap_pdu = snmp_pdu_create (SNMP_MSG_TRAP2); if (!trap_pdu) { qb_log(LOG_NOTICE, "Failed to create SNMP notification."); return ; } /* send uptime */ snprintf (csysuptime, CS_TIMESTAMP_STR_LEN, "%ld", now); snmp_add_var (trap_pdu, sysuptime_oid, sizeof (sysuptime_oid) / sizeof (oid), 't', csysuptime); snmp_add_var (trap_pdu, snmptrap_oid, sizeof (snmptrap_oid) / sizeof (oid), 'o', SNMP_OID_TRAPS_NODE); /* Add extries to the trap */ add_field (trap_pdu, ASN_OCTET_STR, SNMP_OID_OBJECT_NODE_NAME, (void*)nodename, strlen (nodename)); add_field (trap_pdu, ASN_UNSIGNED, SNMP_OID_OBJECT_NODE_ID, (void*)&nodeid, sizeof (nodeid)); add_field (trap_pdu, ASN_OCTET_STR, SNMP_OID_OBJECT_NODE_ADDR, (void*)ip, strlen (ip)); add_field (trap_pdu, ASN_OCTET_STR, SNMP_OID_OBJECT_NODE_STATUS, (void*)state, strlen (state)); /* Send and cleanup */ ret = snmp_send (session, trap_pdu); if (ret == 0) { /* error */ qb_log(LOG_ERR, "Could not send SNMP trap"); snmp_free_pdu (trap_pdu); } } static void _cs_snmp_node_quorum_event(char *nodename, uint32_t nodeid, const char *state) { int ret; char csysuptime[20]; static oid snmptrap_oid[] = { 1,3,6,1,6,3,1,1,4,1,0 }; static oid sysuptime_oid[] = { 1,3,6,1,2,1,1,3,0 }; time_t now = time (NULL); netsnmp_pdu *trap_pdu; netsnmp_session *session = snmp_init (snmp_manager); if (session == NULL) { qb_log(LOG_NOTICE, "Failed to init SNMP session."); return ; } trap_pdu = snmp_pdu_create (SNMP_MSG_TRAP2); if (!trap_pdu) { qb_log(LOG_NOTICE, "Failed to create SNMP notification."); return ; } /* send uptime */ sprintf (csysuptime, "%ld", now); snmp_add_var (trap_pdu, sysuptime_oid, sizeof (sysuptime_oid) / sizeof (oid), 't', csysuptime); snmp_add_var (trap_pdu, snmptrap_oid, sizeof (snmptrap_oid) / sizeof (oid), 'o', SNMP_OID_TRAPS_QUORUM); /* Add extries to the trap */ add_field (trap_pdu, ASN_OCTET_STR, SNMP_OID_OBJECT_NODE_NAME, (void*)nodename, strlen (nodename)); add_field (trap_pdu, ASN_UNSIGNED, SNMP_OID_OBJECT_NODE_ID, (void*)&nodeid, sizeof (nodeid)); add_field (trap_pdu, ASN_OCTET_STR, SNMP_OID_OBJECT_QUORUM, (void*)state, strlen (state)); /* Send and cleanup */ ret = snmp_send (session, trap_pdu); if (ret == 0) { /* error */ qb_log(LOG_ERR, "Could not send SNMP trap"); snmp_free_pdu (trap_pdu); } } static void _cs_snmp_rrp_faulty_event(char *nodename, uint32_t nodeid, uint32_t iface_no, const char *state) { int ret; char csysuptime[20]; static oid snmptrap_oid[] = { 1,3,6,1,6,3,1,1,4,1,0 }; static oid sysuptime_oid[] = { 1,3,6,1,2,1,1,3,0 }; time_t now = time (NULL); netsnmp_pdu *trap_pdu; netsnmp_session *session = snmp_init (snmp_manager); if (session == NULL) { qb_log(LOG_NOTICE, "Failed to init SNMP session."); return ; } trap_pdu = snmp_pdu_create (SNMP_MSG_TRAP2); if (!trap_pdu) { qb_log(LOG_NOTICE, "Failed to create SNMP notification."); return ; } /* send uptime */ sprintf (csysuptime, "%ld", now); snmp_add_var (trap_pdu, sysuptime_oid, sizeof (sysuptime_oid) / sizeof (oid), 't', csysuptime); snmp_add_var (trap_pdu, snmptrap_oid, sizeof (snmptrap_oid) / sizeof (oid), 'o', SNMP_OID_TRAPS_RRP); /* Add extries to the trap */ add_field (trap_pdu, ASN_OCTET_STR, SNMP_OID_OBJECT_NODE_NAME, (void*)nodename, strlen (nodename)); add_field (trap_pdu, ASN_UNSIGNED, SNMP_OID_OBJECT_NODE_ID, (void*)&nodeid, sizeof (nodeid)); add_field (trap_pdu, ASN_INTEGER, SNMP_OID_OBJECT_RRP_IFACE_NO, (void*)&iface_no, sizeof (iface_no)); add_field (trap_pdu, ASN_OCTET_STR, SNMP_OID_OBJECT_RRP_STATUS, (void*)state, strlen (state)); /* Send and cleanup */ ret = snmp_send (session, trap_pdu); if (ret == 0) { /* error */ qb_log(LOG_ERR, "Could not send SNMP trap"); snmp_free_pdu (trap_pdu); } } static void _cs_snmp_init(void) { if (snmp_manager == NULL) { snmp_manager = (char*)local_host; } notifiers[num_notifiers].node_membership_fn = _cs_snmp_node_membership_event; notifiers[num_notifiers].node_quorum_fn = _cs_snmp_node_quorum_event; notifiers[num_notifiers].application_connection_fn = NULL; notifiers[num_notifiers].rrp_faulty_fn = _cs_snmp_rrp_faulty_event; num_notifiers++; } #endif /* ENABLE_SNMP */ static void _cs_syslog_node_membership_event(char *nodename, uint32_t nodeid, char *state, char* ip) { qb_log(LOG_NOTICE, "%s[%d] ip:%s %s", nodename, nodeid, ip, state); } static void _cs_syslog_node_quorum_event(char *nodename, uint32_t nodeid, const char *state) { if (strcmp(state, "quorate") == 0) { qb_log(LOG_NOTICE, "%s[%d] is now %s", nodename, nodeid, state); } else { qb_log(LOG_NOTICE, "%s[%d] has lost quorum", nodename, nodeid); } } static void _cs_syslog_application_connection_event(char *nodename, uint32_t nodeid, char* app_name, const char *state) { if (strcmp(state, "connected") == 0) { qb_log(LOG_NOTICE, "%s[%d] %s is now %s to corosync", nodename, nodeid, app_name, state); } else { qb_log(LOG_NOTICE, "%s[%d] %s is now %s from corosync", nodename, nodeid, app_name, state); } } static void _cs_syslog_rrp_faulty_event(char *nodename, uint32_t nodeid, uint32_t iface_no, const char *state) { qb_log(LOG_NOTICE, "%s[%d] interface %u is now %s", nodename, nodeid, iface_no, state); } static void _cs_node_membership_event(char *nodename, uint32_t nodeid, char *state, char* ip) { int i; for (i = 0; i < num_notifiers; i++) { if (notifiers[i].node_membership_fn) { notifiers[i].node_membership_fn(nodename, nodeid, state, ip); } } } static void _cs_local_node_info_get(char **nodename, uint32_t *nodeid) { cs_error_t rc; corosync_cfg_handle_t cfg_handle; if (local_nodeid == 0) { rc = corosync_cfg_initialize(&cfg_handle, NULL); if (rc != CS_OK) { syslog (LOG_ERR, "Failed to initialize the cfg API. Error %d\n", rc); exit (EXIT_FAILURE); } rc = corosync_cfg_local_get (cfg_handle, &local_nodeid); corosync_cfg_finalize(cfg_handle); if (rc != CS_OK) { local_nodeid = 0; strncpy(local_nodename, "localhost", sizeof (local_nodename)); local_nodename[sizeof (local_nodename) - 1] = '\0'; } else { gethostname(local_nodename, CS_MAX_NAME_LENGTH); } } *nodeid = local_nodeid; *nodename = local_nodename; } static void _cs_node_quorum_event(const char *state) { int i; char *nodename; uint32_t nodeid; _cs_local_node_info_get(&nodename, &nodeid); for (i = 0; i < num_notifiers; i++) { if (notifiers[i].node_quorum_fn) { notifiers[i].node_quorum_fn(nodename, nodeid, state); } } } static void _cs_application_connection_event(char *app_name, const char *state) { int i; char *nodename; uint32_t nodeid; _cs_local_node_info_get(&nodename, &nodeid); for (i = 0; i < num_notifiers; i++) { if (notifiers[i].application_connection_fn) { notifiers[i].application_connection_fn(nodename, nodeid, app_name, state); } } } static void _cs_rrp_faulty_event(uint32_t iface_no, const char *state) { int i; char *nodename; uint32_t nodeid; _cs_local_node_info_get(&nodename, &nodeid); for (i = 0; i < num_notifiers; i++) { if (notifiers[i].rrp_faulty_fn) { notifiers[i].rrp_faulty_fn(nodename, nodeid, iface_no, state); } } } static int32_t sig_exit_handler(int32_t num, void *data) { qb_loop_stop(main_loop); return 0; } static void _cs_cmap_init(void) { cs_error_t rc; int cmap_fd = 0; cmap_track_handle_t track_handle; rc = cmap_initialize (&cmap_handle); if (rc != CS_OK) { qb_log(LOG_ERR, "Failed to initialize the cmap API. Error %d", rc); exit (EXIT_FAILURE); } cmap_fd_get(cmap_handle, &cmap_fd); qb_loop_poll_add(main_loop, QB_LOOP_MED, cmap_fd, POLLIN|POLLNVAL, NULL, _cs_cmap_dispatch); rc = cmap_track_add(cmap_handle, "runtime.connections.", CMAP_TRACK_ADD | CMAP_TRACK_DELETE | CMAP_TRACK_PREFIX, _cs_cmap_connections_key_changed, NULL, &track_handle); if (rc != CS_OK) { qb_log(LOG_ERR, "Failed to track the connections key. Error %d", rc); exit (EXIT_FAILURE); } rc = cmap_track_add(cmap_handle, "runtime.totem.pg.mrp.srp.members.", CMAP_TRACK_ADD | CMAP_TRACK_MODIFY | CMAP_TRACK_PREFIX, _cs_cmap_members_key_changed, NULL, &track_handle); if (rc != CS_OK) { qb_log(LOG_ERR, "Failed to track the members key. Error %d", rc); exit (EXIT_FAILURE); } rc = cmap_track_add(cmap_handle, "runtime.totem.pg.mrp.rrp.", CMAP_TRACK_ADD | CMAP_TRACK_MODIFY | CMAP_TRACK_PREFIX, _cs_cmap_rrp_faulty_key_changed, NULL, &track_handle); if (rc != CS_OK) { qb_log(LOG_ERR, "Failed to track the rrp key. Error %d", rc); exit (EXIT_FAILURE); } } static void _cs_cmap_finalize(void) { cmap_finalize (cmap_handle); } static void _cs_check_config(void) { if (conf[CS_NTF_LOG] == QB_FALSE && conf[CS_NTF_STDOUT] == QB_FALSE && conf[CS_NTF_SNMP] == QB_FALSE && conf[CS_NTF_DBUS] == QB_FALSE) { qb_log(LOG_ERR, "no event type enabled, see corosync-notifyd -h, exiting."); exit(EXIT_FAILURE); } #ifndef ENABLE_SNMP if (conf[CS_NTF_SNMP]) { qb_log(LOG_ERR, "Not compiled with SNMP support enabled, exiting."); exit(EXIT_FAILURE); } #endif #ifndef HAVE_DBUS if (conf[CS_NTF_DBUS]) { qb_log(LOG_ERR, "Not compiled with DBus support enabled, exiting."); exit(EXIT_FAILURE); } #endif if (conf[CS_NTF_STDOUT] && !conf[CS_NTF_FG]) { qb_log(LOG_ERR, "configured to print to stdout and run in the background, exiting"); exit(EXIT_FAILURE); } if (conf[CS_NTF_SNMP] && conf[CS_NTF_DBUS]) { qb_log(LOG_ERR, "configured to send snmp traps and dbus signals - are you sure?."); } } static void _cs_usage(void) { fprintf(stderr, "usage:\n"\ " -f : Start application in foreground.\n"\ " -l : Log all events.\n"\ " -o : Print events to stdout (turns on -l).\n"\ " -s : Send SNMP traps on all events.\n"\ " -m : Set the SNMP Manager IP address (defaults to localhost).\n"\ " -d : Send DBUS signals on all events.\n"\ " -h : Print this help.\n\n"); } int main(int argc, char *argv[]) { int ch; conf[CS_NTF_FG] = QB_FALSE; conf[CS_NTF_LOG] = QB_FALSE; conf[CS_NTF_STDOUT] = QB_FALSE; conf[CS_NTF_SNMP] = QB_FALSE; conf[CS_NTF_DBUS] = QB_FALSE; while ((ch = getopt (argc, argv, "floshdm:")) != EOF) { switch (ch) { case 'f': conf[CS_NTF_FG] = QB_TRUE; break; case 'l': conf[CS_NTF_LOG] = QB_TRUE; break; case 'm': conf[CS_NTF_SNMP] = QB_TRUE; strncpy(snmp_manager_buf, optarg, sizeof (snmp_manager_buf)); snmp_manager_buf[sizeof (snmp_manager_buf) - 1] = '\0'; snmp_manager = snmp_manager_buf; break; case 'o': conf[CS_NTF_LOG] = QB_TRUE; conf[CS_NTF_STDOUT] = QB_TRUE; break; case 's': conf[CS_NTF_SNMP] = QB_TRUE; break; case 'd': conf[CS_NTF_DBUS] = QB_TRUE; break; case 'h': default: _cs_usage(); return EXIT_FAILURE; } } qb_log_init("notifyd", LOG_DAEMON, LOG_INFO); if (conf[CS_NTF_STDOUT]) { qb_log_filter_ctl(QB_LOG_STDERR, QB_LOG_FILTER_ADD, QB_LOG_FILTER_FILE, "*", LOG_DEBUG); qb_log_ctl(QB_LOG_STDERR, QB_LOG_CONF_ENABLED, conf[CS_NTF_STDOUT]); } _cs_check_config(); if (!conf[CS_NTF_FG]) { if (daemon(0, 0) < 0) { perror("daemon() failed"); return EXIT_FAILURE; } } num_notifiers = 0; if (conf[CS_NTF_LOG]) { notifiers[num_notifiers].node_membership_fn = _cs_syslog_node_membership_event; notifiers[num_notifiers].node_quorum_fn = _cs_syslog_node_quorum_event; notifiers[num_notifiers].application_connection_fn = _cs_syslog_application_connection_event; notifiers[num_notifiers].rrp_faulty_fn = _cs_syslog_rrp_faulty_event; num_notifiers++; } main_loop = qb_loop_create(); _cs_cmap_init(); _cs_quorum_init(); #ifdef HAVE_DBUS if (conf[CS_NTF_DBUS]) { _cs_dbus_init(); } #endif /* HAVE_DBUS */ #ifdef ENABLE_SNMP if (conf[CS_NTF_SNMP]) { _cs_snmp_init(); } #endif /* ENABLE_SNMP */ qb_loop_signal_add(main_loop, QB_LOOP_HIGH, SIGINT, NULL, sig_exit_handler, NULL); qb_loop_signal_add(main_loop, QB_LOOP_HIGH, SIGQUIT, NULL, sig_exit_handler, NULL); qb_loop_signal_add(main_loop, QB_LOOP_HIGH, SIGTERM, NULL, sig_exit_handler, NULL); qb_loop_run(main_loop); #ifdef HAVE_DBUS if (conf[CS_NTF_DBUS]) { _cs_dbus_release(); } #endif /* HAVE_DBUS */ _cs_quorum_finalize(); _cs_cmap_finalize(); return 0; } diff --git a/vqsim/parser.c b/vqsim/parser.c index a7954d14..2fc10431 100644 --- a/vqsim/parser.c +++ b/vqsim/parser.c @@ -1,332 +1,331 @@ /* Parses the interactive commands */ #include #include #include #include -#include #include #ifdef HAVE_READLINE_HISTORY_H #include #endif #include #include "vqsim.h" static void do_usage(void) { printf(" All node IDs in the cluster are unique and belong to a numbered 'partition' (default=0)\n"); printf("\n"); printf("up [:][[,] ...] [[:][...]] [...]\n"); printf(" bring node(s) online in the specified partition(s)\n"); printf("down ,[...]\n"); printf(" send nodes offline (shut them down)\n"); printf("move/split [:][[,] ...] [[:][...]] [...]\n"); printf(" Move nodes from one partition to another (netsplit)\n"); printf(" here is the partition to move the nodes to\n"); printf("join [] ... \n"); printf(" Join partitions together (reverse of a netsplit)\n"); printf("qdevice on|off [:][[,] ...] [[:][...]] [...]\n"); printf(" Enable quorum device in specified nodes\n"); printf("autofence on|off\n"); printf(" automatically 'down' nodes on inquorate side on netsplit\n"); printf("show Show current nodes status\n"); printf("exit\n\n"); } typedef void (*cmd_routine_t)(int argc, char **argv); static void run_up_cmd(int argc, char **argv); static void run_down_cmd(int argc, char **argv); static void run_join_cmd(int argc, char **argv); static void run_move_cmd(int argc, char **argv); static void run_exit_cmd(int argc, char **argv); static void run_show_cmd(int argc, char **argv); static void run_autofence_cmd(int argc, char **argv); static void run_qdevice_cmd(int argc, char **argv); static struct cmd_list_struct { const char *cmd; int min_args; cmd_routine_t cmd_runner; } cmd_list[] = { { "up", 1, run_up_cmd}, { "down", 1, run_down_cmd}, { "move", 2, run_move_cmd}, { "split", 2, run_move_cmd}, { "join", 2, run_join_cmd}, { "autofence", 1, run_autofence_cmd}, { "qdevice", 1, run_qdevice_cmd}, { "show", 0, run_show_cmd}, { "exit", 0, run_exit_cmd}, { "quit", 0, run_exit_cmd}, { "q", 0, run_exit_cmd}, }; static int num_cmds = (sizeof(cmd_list)) / sizeof(struct cmd_list_struct); #define MAX_ARGS 1024 /* Takes a :[[,]...] list and return it as a partition and a list of nodes. Returns 0 if successful, -1 if not */ static int parse_partition_nodelist(char *string, int *partition, int *num_nodes, int **retnodes) { int i; int nodecount; int len; int last_comma; char *nodeptr; int *nodes; char *colonptr = strchr(string, ':'); if (colonptr) { *colonptr = '\0'; nodeptr = colonptr+1; *partition = atoi(string); } else { /* Default to partition 0 */ *partition = 0; nodeptr = string; } /* Count the number of commas and allocate space for the nodes */ nodecount = 0; for (i=0; i MAX_NODES) { return -1; } nodes = malloc(sizeof(int) * nodecount); if (!nodes) { return -1; } nodecount = 0; last_comma = 0; len = strlen(nodeptr); for (i=0; i<=len; i++) { if (nodeptr[i] == ',' || nodeptr[i] == '\0') { nodeptr[i] = '\0'; nodes[nodecount++] = atoi(&nodeptr[last_comma]); last_comma = i+1; } } *num_nodes = nodecount; *retnodes = nodes; return 0; } void parse_input_command(char *rl_cmd) { int i; int argc = 0; int valid_cmd = 0; char *argv[MAX_ARGS]; int last_arg_start = 0; int last_was_space = 0; int len; char *cmd; /* ^D quits */ if (rl_cmd == NULL) { run_exit_cmd(0, NULL); } cmd = strdup(rl_cmd); /* Split cmd up into args * destroying the original string mwahahahaha */ len = strlen(cmd); /* Span leading spaces */ for (i=0; cmd[i] == ' '; i++) ; last_arg_start = i; for (; i<=len; i++) { if (cmd[i] == ' ' || cmd[i] == '\0') { /* Allow multiple spaces */ if (last_was_space) { continue; } cmd[i] = '\0'; last_was_space = 1; argv[argc] = &cmd[last_arg_start]; argc++; } else { if (last_was_space) { last_arg_start = i; } last_was_space = 0; } } /* Ignore null commands */ if (strlen(argv[0]) == 0) { free(cmd); return; } #ifdef HAVE_READLINE_HISTORY_H add_history(rl_cmd); #endif /* Dispatch command */ for (i=0; i