Page MenuHomeClusterLabs Projects

No OneTemporary

diff --git a/cts/agents/common_test_agent.c b/cts/agents/common_test_agent.c
index 67d76c17..e395c316 100644
--- a/cts/agents/common_test_agent.c
+++ b/cts/agents/common_test_agent.c
@@ -1,333 +1,332 @@
/*
* Copyright (c) 2010 Red Hat, Inc.
*
* All rights reserved.
*
* Author: Angus Salkeld (asalkeld@redhat.com)
*
* This software licensed under BSD license, the text of which follows:
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* - Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* - Neither the name of the MontaVista Software, Inc. nor the names of its
* contributors may be used to endorse or promote products derived from this
* software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
* THE POSSIBILITY OF SUCH DAMAGE.
*/
#include <errno.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <poll.h>
-#include <unistd.h>
#include <fcntl.h>
#include "common_test_agent.h"
#define MAX_CLIENTS 64
static char big_and_buf_rx[HOW_BIG_AND_BUF];
ta_do_command_fn do_command;
static qb_loop_t *poll_handle;
static pre_exit_fn pre_exit = NULL;
static int client_fds[MAX_CLIENTS];
static int client_fds_pos = 0;
qb_loop_t *ta_poll_handle_get(void)
{
return poll_handle;
}
static void shut_me_down(void)
{
if (pre_exit) {
pre_exit();
}
qb_loop_stop(poll_handle);
}
static void ta_handle_command (int sock, char* msg)
{
int num_args;
char *saveptr = NULL;
char *str = strdup (msg);
char *str_len;
char *str_arg;
char *args[5];
int i = 0;
int a = 0;
char* func = NULL;
qb_log(LOG_DEBUG,"(MSG:%s)", msg);
str_len = strtok_r (str, ":", &saveptr);
assert (str_len);
num_args = atoi (str_len) * 2;
for (i = 0; i < num_args / 2; i++) {
str_len = strtok_r (NULL, ":", &saveptr);
str_arg = strtok_r (NULL, ":", &saveptr);
if (func == NULL) {
/* first "arg" is the function */
qb_log (LOG_TRACE, "(LEN:%s, FUNC:%s)", str_len, str_arg);
func = str_arg;
a = 0;
} else {
args[a] = str_arg;
a++;
qb_log (LOG_TRACE, "(LEN:%s, ARG:%s)", str_len, str_arg);
}
}
do_command (sock, func, args, a+1);
free (str);
}
static int server_process_data_fn (
int fd,
int revents,
void *data)
{
char *saveptr;
char *msg;
char *cmd;
int32_t nbytes;
if (revents & POLLHUP || revents & POLLERR) {
qb_log (LOG_INFO, "command sockect got POLLHUP exiting...");
shut_me_down();
return -1;
}
if ((nbytes = recv (fd, big_and_buf_rx, sizeof (big_and_buf_rx), 0)) <= 0) {
/* got error or connection closed by client */
if (nbytes == 0) {
/* connection closed */
qb_log (LOG_WARNING, "socket %d hung up: exiting...", fd);
} else {
qb_perror(LOG_ERR, "recv() failed");
}
shut_me_down();
return -1;
} else {
big_and_buf_rx[nbytes] = '\0';
msg = strtok_r (big_and_buf_rx, ";", &saveptr);
assert (msg);
while (msg) {
cmd = strdup (msg);
ta_handle_command (fd, cmd);
free (cmd);
msg = strtok_r (NULL, ";", &saveptr);
}
}
return 0;
}
static int server_accept_fn (
int fd, int revents, void *data)
{
socklen_t addrlen;
struct sockaddr_in in_addr;
int new_fd;
int res;
if (revents & POLLHUP || revents & POLLERR) {
qb_log (LOG_INFO, "command sockect got POLLHUP exiting...");
shut_me_down();
return -1;
}
addrlen = sizeof (struct sockaddr_in);
retry_accept:
new_fd = accept (fd, (struct sockaddr *)&in_addr, &addrlen);
if (new_fd == -1 && errno == EINTR) {
goto retry_accept;
}
if (new_fd == -1) {
qb_log (LOG_ERR,
"Could not accept connection: %s", strerror (errno));
return (0); /* This is an error, but -1 would indicate disconnect from poll loop */
}
res = fcntl (new_fd, F_SETFL, O_NONBLOCK);
if (res == -1) {
qb_log (LOG_ERR,
"Could not set non-blocking operation on connection: %s",
strerror (errno));
close (new_fd);
return (0); /* This is an error, but -1 would indicate disconnect from poll loop */
}
client_fds[client_fds_pos] = new_fd;
client_fds_pos++;
assert(client_fds_pos < MAX_CLIENTS);
qb_loop_poll_add (poll_handle,
QB_LOOP_MED,
new_fd,
POLLIN|POLLNVAL,
NULL,
server_process_data_fn);
return 0;
}
static int create_server_sockect (int server_port)
{
int listener;
int yes = 1;
int rv;
struct addrinfo hints, *ai, *p;
char server_port_str[16];
char addr_str[INET_ADDRSTRLEN];
void *ptr = NULL;
/* get a socket and bind it
*/
sprintf(server_port_str, "%d", server_port);
memset (&hints, 0, sizeof hints);
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
hints.ai_flags = AI_PASSIVE;
if ((rv = getaddrinfo (NULL, server_port_str, &hints, &ai)) != 0) {
qb_log (LOG_ERR, "%s", gai_strerror (rv));
exit (1);
}
for (p = ai; p != NULL; p = p->ai_next) {
listener = socket (p->ai_family, p->ai_socktype, p->ai_protocol);
if (listener < 0) {
continue;
}
/* lose the pesky "address already in use" error message
*/
if (setsockopt (listener, SOL_SOCKET, SO_REUSEADDR,
&yes, sizeof(int)) < 0) {
qb_log (LOG_ERR, "setsockopt() failed: %s", strerror (errno));
}
switch (p->ai_family)
{
case AF_INET:
ptr = &((struct sockaddr_in *) p->ai_addr)->sin_addr;
break;
case AF_INET6:
ptr = &((struct sockaddr_in6 *) p->ai_addr)->sin6_addr;
break;
default:
qb_log (LOG_ERR, "address family wrong");
exit (4);
}
if (inet_ntop(p->ai_family, ptr, addr_str, INET_ADDRSTRLEN) == NULL) {
qb_log (LOG_ERR, "inet_ntop() failed: %s", strerror (errno));
}
if (bind (listener, p->ai_addr, p->ai_addrlen) < 0) {
qb_log (LOG_ERR, "bind(%s) failed: %s", addr_str, strerror (errno));
close (listener);
continue;
}
break;
}
if (p == NULL) {
qb_log (LOG_ERR, "failed to bind");
exit (2);
}
freeaddrinfo (ai);
if (listen (listener, 10) == -1) {
qb_log (LOG_ERR, "listen() failed: %s", strerror(errno));
exit (3);
}
return listener;
}
static int32_t sig_exit_handler (int num, void *data)
{
qb_log (LOG_INFO, "got signal %d, exiting", num);
shut_me_down();
return 0;
}
int
test_agent_run(const char * prog_name, int server_port,
ta_do_command_fn func, pre_exit_fn exit_fn)
{
int listener;
int i;
qb_log_init(prog_name, LOG_DAEMON, LOG_DEBUG);
qb_log_format_set(QB_LOG_SYSLOG, "%n() [%p] %b");
qb_log (LOG_INFO, "STARTING");
do_command = func;
pre_exit = exit_fn;
poll_handle = qb_loop_create ();
if (exit_fn) {
qb_loop_signal_add(poll_handle, QB_LOOP_HIGH,
SIGINT, NULL, sig_exit_handler, NULL);
qb_loop_signal_add(poll_handle, QB_LOOP_HIGH,
SIGQUIT, NULL, sig_exit_handler, NULL);
qb_loop_signal_add(poll_handle, QB_LOOP_HIGH,
SIGTERM, NULL, sig_exit_handler, NULL);
}
listener = create_server_sockect (server_port);
qb_loop_poll_add (poll_handle,
QB_LOOP_MED,
listener,
POLLIN|POLLNVAL,
NULL, server_accept_fn);
qb_loop_run (poll_handle);
close(listener);
for (i = 0; i < client_fds_pos; i++) {
close(client_fds[client_fds_pos]);
}
qb_log (LOG_INFO, "EXITING");
qb_log_fini();
return 0;
}
diff --git a/cts/agents/cpg_test_agent.c b/cts/agents/cpg_test_agent.c
index a52f4f0a..0837c69c 100644
--- a/cts/agents/cpg_test_agent.c
+++ b/cts/agents/cpg_test_agent.c
@@ -1,804 +1,803 @@
/*
* Copyright (c) 2010 Red Hat, Inc.
*
* All rights reserved.
*
* Author: Angus Salkeld (asalkeld@redhat.com)
*
* This software licensed under BSD license, the text of which follows:
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* - Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* - Neither the name of the MontaVista Software, Inc. nor the names of its
* contributors may be used to endorse or promote products derived from this
* software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
* THE POSSIBILITY OF SUCH DAMAGE.
*/
#include <errno.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/uio.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <poll.h>
-#include <unistd.h>
#include <fcntl.h>
#include <corosync/list.h>
#include <qb/qbdefs.h>
#include <qb/qbutil.h>
#include <qb/qbloop.h>
#include <qb/qblog.h>
#include <corosync/cpg.h>
#include <corosync/cfg.h>
#include "common_test_agent.h"
#include <nss.h>
#include <pk11pub.h>
typedef enum {
MSG_OK,
MSG_NODEID_ERR,
MSG_PID_ERR,
MSG_SEQ_ERR,
MSG_SIZE_ERR,
MSG_SHA1_ERR,
} msg_status_t;
typedef struct {
uint32_t nodeid;
pid_t pid;
unsigned char sha1[20];
uint32_t seq;
size_t size;
unsigned char payload[0];
} msg_t;
#define LOG_STR_SIZE 80
typedef struct {
char log[LOG_STR_SIZE];
struct list_head list;
} log_entry_t;
static char big_and_buf[HOW_BIG_AND_BUF];
static int32_t record_config_events_g = 0;
static int32_t record_messages_g = 0;
static cpg_handle_t cpg_handle = 0;
static corosync_cfg_handle_t cfg_handle = 0;
static int32_t cpg_fd = -1;
static int32_t cfg_fd = -1;
static struct list_head config_chg_log_head;
static struct list_head msg_log_head;
static pid_t my_pid;
static uint32_t my_nodeid;
static int32_t my_seq;
static int32_t use_zcb = QB_FALSE;
static int32_t my_msgs_to_send;
static int32_t my_msgs_sent;
static int32_t total_stored_msgs = 0;
static int32_t total_msgs_revd = 0;
static int32_t in_cnchg = 0;
static int32_t pcmk_test = 0;
PK11Context* sha1_context;
static void send_some_more_messages (void * unused);
static char*
err_status_string (char * buf, size_t buf_len, msg_status_t status)
{
switch (status) {
case MSG_OK:
strncpy (buf, "OK", buf_len);
break;
case MSG_NODEID_ERR:
strncpy (buf, "NODEID_ERR", buf_len);
break;
case MSG_PID_ERR:
strncpy (buf, "PID_ERR", buf_len);
break;
case MSG_SEQ_ERR:
strncpy (buf, "SEQ_ERR", buf_len);
break;
case MSG_SIZE_ERR:
strncpy (buf, "SIZE_ERR", buf_len);
break;
case MSG_SHA1_ERR:
strncpy (buf, "SHA1_ERR", buf_len);
break;
default:
strncpy (buf, "UNKNOWN_ERR", buf_len);
break;
}
if (buf_len > 0) {
buf[buf_len - 1] = '\0';
}
return buf;
}
static void delivery_callback (
cpg_handle_t handle,
const struct cpg_name *groupName,
uint32_t nodeid,
uint32_t pid,
void *msg,
size_t msg_len)
{
log_entry_t *log_pt;
msg_t *msg_pt = (msg_t*)msg;
msg_status_t status = MSG_OK;
char status_buf[20];
unsigned char sha1_compare[20];
unsigned int sha1_len;
if (record_messages_g == 0) {
return;
}
if (nodeid != msg_pt->nodeid) {
status = MSG_NODEID_ERR;
}
if (pid != msg_pt->pid) {
status = MSG_PID_ERR;
}
if (msg_len != msg_pt->size) {
status = MSG_SIZE_ERR;
}
PK11_DigestBegin(sha1_context);
PK11_DigestOp(sha1_context, msg_pt->payload, (msg_pt->size - sizeof (msg_t)));
PK11_DigestFinal(sha1_context, sha1_compare, &sha1_len, sizeof(sha1_compare));
if (memcmp (sha1_compare, msg_pt->sha1, 20) != 0) {
qb_log (LOG_ERR, "msg seq:%d; incorrect hash",
msg_pt->seq);
status = MSG_SHA1_ERR;
}
log_pt = malloc (sizeof(log_entry_t));
list_init (&log_pt->list);
snprintf (log_pt->log, LOG_STR_SIZE, "%u:%d:%d:%s;",
msg_pt->nodeid, msg_pt->seq, my_seq,
err_status_string (status_buf, 20, status));
list_add_tail (&log_pt->list, &msg_log_head);
total_stored_msgs++;
total_msgs_revd++;
my_seq++;
if ((total_msgs_revd % 1000) == 0) {
qb_log (LOG_INFO, "rx %d", total_msgs_revd);
}
}
static void config_change_callback (
cpg_handle_t handle,
const struct cpg_name *groupName,
const struct cpg_address *member_list, size_t member_list_entries,
const struct cpg_address *left_list, size_t left_list_entries,
const struct cpg_address *joined_list, size_t joined_list_entries)
{
int i;
log_entry_t *log_pt;
/* group_name,ip,pid,join|leave */
if (record_config_events_g > 0) {
qb_log (LOG_INFO, "got cpg event[recording] for group %s", groupName->value);
} else {
qb_log (LOG_INFO, "got cpg event[ignoring] for group %s", groupName->value);
}
for (i = 0; i < left_list_entries; i++) {
if (record_config_events_g > 0) {
log_pt = malloc (sizeof(log_entry_t));
list_init (&log_pt->list);
snprintf (log_pt->log, LOG_STR_SIZE, "%s,%u,%u,left",
groupName->value, left_list[i].nodeid,left_list[i].pid);
list_add_tail(&log_pt->list, &config_chg_log_head);
qb_log (LOG_INFO, "cpg event %s", log_pt->log);
}
}
for (i = 0; i < joined_list_entries; i++) {
if (record_config_events_g > 0) {
log_pt = malloc (sizeof(log_entry_t));
list_init (&log_pt->list);
snprintf (log_pt->log, LOG_STR_SIZE, "%s,%u,%u,join",
groupName->value, joined_list[i].nodeid,joined_list[i].pid);
list_add_tail (&log_pt->list, &config_chg_log_head);
qb_log (LOG_INFO, "cpg event %s", log_pt->log);
}
}
if (pcmk_test == 1) {
in_cnchg = 1;
send_some_more_messages (NULL);
in_cnchg = 0;
}
}
static void my_shutdown_callback (corosync_cfg_handle_t handle,
corosync_cfg_shutdown_flags_t flags)
{
qb_log (LOG_CRIT, "flags:%d", flags);
if (flags == COROSYNC_CFG_SHUTDOWN_FLAG_REQUEST) {
corosync_cfg_replyto_shutdown (cfg_handle, COROSYNC_CFG_SHUTDOWN_FLAG_YES);
}
}
static corosync_cfg_callbacks_t cfg_callbacks = {
.corosync_cfg_shutdown_callback = my_shutdown_callback,
};
static cpg_callbacks_t callbacks = {
.cpg_deliver_fn = delivery_callback,
.cpg_confchg_fn = config_change_callback,
};
static void record_messages (void)
{
record_messages_g = 1;
qb_log (LOG_INFO, "record:%d", record_messages_g);
}
static void record_config_events (int sock)
{
char response[100];
ssize_t rc;
size_t send_len;
record_config_events_g = 1;
qb_log (LOG_INFO, "record:%d", record_config_events_g);
snprintf (response, 100, "%s", OK_STR);
send_len = strlen (response);
rc = send (sock, response, send_len, 0);
assert(rc == send_len);
}
static void read_config_event (int sock)
{
const char *empty = "None";
struct list_head * list = config_chg_log_head.next;
log_entry_t *entry;
ssize_t rc;
size_t send_len;
if (list != &config_chg_log_head) {
entry = list_entry (list, log_entry_t, list);
send_len = strlen (entry->log);
rc = send (sock, entry->log, send_len, 0);
list_del (&entry->list);
free (entry);
} else {
qb_log (LOG_DEBUG, "no events in list");
send_len = strlen (empty);
rc = send (sock, empty, send_len, 0);
}
assert(rc == send_len);
}
static void read_messages (int sock, char* atmost_str)
{
struct list_head * list;
log_entry_t *entry;
int atmost = atoi (atmost_str);
int packed = 0;
ssize_t rc;
if (atmost == 0)
atmost = 1;
if (atmost > (HOW_BIG_AND_BUF / LOG_STR_SIZE))
atmost = (HOW_BIG_AND_BUF / LOG_STR_SIZE);
big_and_buf[0] = '\0';
for (list = msg_log_head.next;
(!list_empty (&msg_log_head) && packed < atmost); ) {
entry = list_entry (list, log_entry_t, list);
strcat (big_and_buf, entry->log);
packed++;
list = list->next;
list_del (&entry->list);
free (entry);
total_stored_msgs--;
}
if (packed == 0) {
strcpy (big_and_buf, "None");
} else {
if ((total_stored_msgs % 1000) == 0) {
qb_log(LOG_INFO, "sending %d; total_stored_msgs:%d; len:%d",
packed, total_stored_msgs, (int)strlen (big_and_buf));
}
}
rc = send (sock, big_and_buf, strlen (big_and_buf), 0);
assert(rc == strlen (big_and_buf));
}
static qb_loop_timer_handle more_messages_timer_handle;
static void send_some_more_messages_later (void)
{
cpg_dispatch (cpg_handle, CS_DISPATCH_ALL);
qb_loop_timer_add (
ta_poll_handle_get(),
QB_LOOP_MED,
300*QB_TIME_NS_IN_MSEC, NULL,
send_some_more_messages,
&more_messages_timer_handle);
}
static void send_some_more_messages_zcb (void)
{
msg_t *my_msg;
int i;
int send_now;
size_t payload_size;
size_t total_size;
unsigned int sha1_len;
cs_error_t res;
cpg_flow_control_state_t fc_state;
void *zcb_buffer;
if (cpg_fd < 0)
return;
send_now = my_msgs_to_send;
payload_size = (rand() % 100000);
total_size = payload_size + sizeof (msg_t);
cpg_zcb_alloc (cpg_handle, total_size, &zcb_buffer);
my_msg = (msg_t*)zcb_buffer;
qb_log(LOG_DEBUG, "send_now:%d", send_now);
my_msg->pid = my_pid;
my_msg->nodeid = my_nodeid;
my_msg->size = sizeof (msg_t) + payload_size;
my_msg->seq = my_msgs_sent;
for (i = 0; i < payload_size; i++) {
my_msg->payload[i] = i;
}
PK11_DigestBegin(sha1_context);
PK11_DigestOp(sha1_context, my_msg->payload, payload_size);
PK11_DigestFinal(sha1_context, my_msg->sha1, &sha1_len, sizeof(my_msg->sha1));
for (i = 0; i < send_now; i++) {
res = cpg_flow_control_state_get (cpg_handle, &fc_state);
if (res == CS_OK && fc_state == CPG_FLOW_CONTROL_ENABLED) {
/* lets do this later */
send_some_more_messages_later ();
qb_log (LOG_INFO, "flow control enabled.");
goto free_buffer;
}
res = cpg_zcb_mcast_joined (cpg_handle, CPG_TYPE_AGREED, zcb_buffer, total_size);
if (res == CS_ERR_TRY_AGAIN) {
/* lets do this later */
send_some_more_messages_later ();
goto free_buffer;
} else if (res != CS_OK) {
qb_log (LOG_ERR, "cpg_mcast_joined error:%d, exiting.",
res);
exit (-2);
}
my_msgs_sent++;
my_msgs_to_send--;
}
free_buffer:
cpg_zcb_free (cpg_handle, zcb_buffer);
}
#define cs_repeat(counter, max, code) do { \
code; \
if (res == CS_ERR_TRY_AGAIN) { \
counter++; \
sleep(counter); \
} \
} while (res == CS_ERR_TRY_AGAIN && counter < max)
static unsigned char buffer[200000];
static void send_some_more_messages_normal (void)
{
msg_t my_msg;
struct iovec iov[2];
int i;
int send_now;
size_t payload_size;
cs_error_t res;
cpg_flow_control_state_t fc_state;
int retries = 0;
time_t before;
unsigned int sha1_len;
if (cpg_fd < 0)
return;
send_now = my_msgs_to_send;
qb_log (LOG_TRACE, "send_now:%d", send_now);
my_msg.pid = my_pid;
my_msg.nodeid = my_nodeid;
payload_size = (rand() % 10000);
my_msg.size = sizeof (msg_t) + payload_size;
my_msg.seq = my_msgs_sent;
for (i = 0; i < payload_size; i++) {
buffer[i] = i;
}
PK11_DigestBegin(sha1_context);
PK11_DigestOp(sha1_context, buffer, payload_size);
PK11_DigestFinal(sha1_context, my_msg.sha1, &sha1_len, sizeof(my_msg.sha1));
iov[0].iov_len = sizeof (msg_t);
iov[0].iov_base = &my_msg;
iov[1].iov_len = payload_size;
iov[1].iov_base = buffer;
for (i = 0; i < send_now; i++) {
if (in_cnchg && pcmk_test) {
retries = 0;
before = time(NULL);
cs_repeat(retries, 30, res = cpg_mcast_joined(cpg_handle, CPG_TYPE_AGREED, iov, 2));
if (retries > 20) {
qb_log (LOG_ERR, "cs_repeat: blocked for :%lu secs.",
(unsigned long)(time(NULL) - before));
}
if (res != CS_OK) {
qb_log (LOG_ERR, "cpg_mcast_joined error:%d.",
res);
return;
}
} else {
res = cpg_flow_control_state_get (cpg_handle, &fc_state);
if (res == CS_OK && fc_state == CPG_FLOW_CONTROL_ENABLED) {
/* lets do this later */
send_some_more_messages_later ();
qb_log (LOG_INFO, "flow control enabled.");
return;
}
res = cpg_mcast_joined (cpg_handle, CPG_TYPE_AGREED, iov, 2);
if (res == CS_ERR_TRY_AGAIN) {
/* lets do this later */
send_some_more_messages_later ();
if (i > 0) {
qb_log (LOG_INFO, "TRY_AGAIN %d to send.",
my_msgs_to_send);
}
return;
} else if (res != CS_OK) {
qb_log (LOG_ERR, "cpg_mcast_joined error:%d, exiting.",
res);
exit (-2);
}
}
my_msgs_sent++;
my_msg.seq = my_msgs_sent;
my_msgs_to_send--;
}
qb_log (LOG_TRACE, "sent %d; to send %d.",
my_msgs_sent, my_msgs_to_send);
}
static void send_some_more_messages (void * unused)
{
if (use_zcb) {
send_some_more_messages_zcb ();
} else {
send_some_more_messages_normal ();
}
}
static void msg_blaster (int sock, char* num_to_send_str)
{
my_msgs_to_send = atoi (num_to_send_str);
my_msgs_sent = 0;
my_seq = 1;
my_pid = getpid();
use_zcb = QB_FALSE;
total_stored_msgs = 0;
cpg_local_get (cpg_handle, &my_nodeid);
/* control the limits */
if (my_msgs_to_send <= 0)
my_msgs_to_send = 1;
if (my_msgs_to_send > 10000)
my_msgs_to_send = 10000;
send_some_more_messages_normal ();
}
static void context_test (int sock)
{
char response[100];
char *cmp;
ssize_t rc;
size_t send_len;
cpg_context_set (cpg_handle, response);
cpg_context_get (cpg_handle, (void**)&cmp);
if (response != cmp) {
snprintf (response, 100, "%s", FAIL_STR);
}
else {
snprintf (response, 100, "%s", OK_STR);
}
send_len = strlen (response);
rc = send (sock, response, send_len, 0);
assert(rc == send_len);
}
static void msg_blaster_zcb (int sock, char* num_to_send_str)
{
my_msgs_to_send = atoi (num_to_send_str);
my_seq = 1;
my_pid = getpid();
use_zcb = QB_TRUE;
total_stored_msgs = 0;
cpg_local_get (cpg_handle, &my_nodeid);
/* control the limits */
if (my_msgs_to_send <= 0)
my_msgs_to_send = 1;
if (my_msgs_to_send > 10000)
my_msgs_to_send = 10000;
send_some_more_messages_zcb ();
}
static int cfg_dispatch_wrapper_fn (
int fd,
int revents,
void *data)
{
cs_error_t error;
if (revents & POLLHUP || revents & POLLERR) {
qb_log (LOG_ERR, "got POLLHUP disconnecting from CFG");
corosync_cfg_finalize(cfg_handle);
cfg_handle = 0;
return -1;
}
error = corosync_cfg_dispatch (cfg_handle, CS_DISPATCH_ALL);
if (error == CS_ERR_LIBRARY) {
qb_log (LOG_ERR, "got LIB error disconnecting from CFG.");
corosync_cfg_finalize(cfg_handle);
cfg_handle = 0;
return -1;
}
return 0;
}
static int cpg_dispatch_wrapper_fn (
int fd,
int revents,
void *data)
{
cs_error_t error;
if (revents & POLLHUP || revents & POLLERR) {
qb_log (LOG_ERR, "got POLLHUP disconnecting from CPG");
cpg_finalize(cpg_handle);
cpg_handle = 0;
return -1;
}
error = cpg_dispatch (cpg_handle, CS_DISPATCH_ALL);
if (error == CS_ERR_LIBRARY) {
qb_log (LOG_ERR, "got LIB error disconnecting from CPG");
cpg_finalize(cpg_handle);
cpg_handle = 0;
return -1;
}
return 0;
}
static void do_command (int sock, char* func, char*args[], int num_args)
{
int result;
char response[100];
struct cpg_name group_name;
ssize_t rc;
size_t send_len;
qb_log (LOG_TRACE, "RPC:%s() called.", func);
if (strcmp ("cpg_mcast_joined",func) == 0) {
struct iovec iov[5];
int a;
for (a = 0; a < num_args; a++) {
iov[a].iov_base = args[a];
iov[a].iov_len = strlen(args[a])+1;
}
cpg_mcast_joined (cpg_handle, CPG_TYPE_AGREED, iov, num_args);
} else if (strcmp ("cpg_join",func) == 0) {
if (strlen(args[0]) >= CPG_MAX_NAME_LENGTH) {
qb_log (LOG_ERR, "Invalid group name");
exit (1);
}
strcpy (group_name.value, args[0]);
group_name.length = strlen(args[0]);
result = cpg_join (cpg_handle, &group_name);
if (result != CS_OK) {
qb_log (LOG_ERR,
"Could not join process group, error %d", result);
exit (1);
}
qb_log (LOG_INFO, "called cpg_join(%s)!", group_name.value);
} else if (strcmp ("cpg_leave",func) == 0) {
strcpy (group_name.value, args[0]);
group_name.length = strlen(args[0]);
result = cpg_leave (cpg_handle, &group_name);
if (result != CS_OK) {
qb_log (LOG_ERR,
"Could not leave process group, error %d", result);
exit (1);
}
qb_log (LOG_INFO, "called cpg_leave(%s)!", group_name.value);
} else if (strcmp ("cpg_initialize",func) == 0) {
int retry_count = 0;
result = cpg_initialize (&cpg_handle, &callbacks);
while (result != CS_OK) {
qb_log (LOG_ERR,
"cpg_initialize error %d (attempt %d)",
result, retry_count);
if (retry_count >= 3) {
exit (1);
}
sleep(1);
retry_count++;
result = cpg_initialize (&cpg_handle, &callbacks);
}
cpg_fd_get (cpg_handle, &cpg_fd);
qb_loop_poll_add (ta_poll_handle_get(),
QB_LOOP_MED,
cpg_fd,
POLLIN|POLLNVAL,
NULL,
cpg_dispatch_wrapper_fn);
} else if (strcmp ("cpg_local_get", func) == 0) {
unsigned int local_nodeid;
cpg_local_get (cpg_handle, &local_nodeid);
snprintf (response, 100, "%u",local_nodeid);
send_len = strlen (response);
rc = send (sock, response, send_len, 0);
assert(rc == send_len);
} else if (strcmp ("cpg_finalize", func) == 0) {
if (cpg_handle > 0) {
cpg_finalize (cpg_handle);
cpg_handle = 0;
}
} else if (strcmp ("record_config_events", func) == 0) {
record_config_events (sock);
} else if (strcmp ("record_messages", func) == 0) {
record_messages ();
} else if (strcmp ("read_config_event", func) == 0) {
read_config_event (sock);
} else if (strcmp ("read_messages", func) == 0) {
read_messages (sock, args[0]);
} else if (strcmp ("msg_blaster_zcb", func) == 0) {
msg_blaster_zcb (sock, args[0]);
} else if (strcmp ("pcmk_test", func) == 0) {
pcmk_test = 1;
} else if (strcmp ("msg_blaster", func) == 0) {
msg_blaster (sock, args[0]);
} else if (strcmp ("context_test", func) == 0) {
context_test (sock);
} else if (strcmp ("are_you_ok_dude", func) == 0) {
snprintf (response, 100, "%s", OK_STR);
send_len = strlen (response);
rc = send (sock, response, strlen (response), 0);
assert(rc == send_len);
} else if (strcmp ("cfg_shutdown", func) == 0) {
qb_log (LOG_INFO, "calling %s() called!", func);
result = corosync_cfg_try_shutdown (cfg_handle, COROSYNC_CFG_SHUTDOWN_FLAG_REQUEST);
qb_log (LOG_INFO,"%s() returned %d!", func, result);
} else if (strcmp ("cfg_initialize",func) == 0) {
int retry_count = 0;
qb_log (LOG_INFO,"%s() called!", func);
result = corosync_cfg_initialize (&cfg_handle, &cfg_callbacks);
while (result != CS_OK) {
qb_log (LOG_ERR,
"cfg_initialize error %d (attempt %d)",
result, retry_count);
if (retry_count >= 3) {
exit (1);
}
sleep(1);
retry_count++;
result = corosync_cfg_initialize (&cfg_handle, &cfg_callbacks);
}
qb_log (LOG_INFO,"corosync_cfg_initialize() == %d", result);
result = corosync_cfg_fd_get (cfg_handle, &cfg_fd);
qb_log (LOG_INFO,"corosync_cfg_fd_get() == %d", result);
qb_loop_poll_add (ta_poll_handle_get(),
QB_LOOP_MED,
cfg_fd,
POLLIN|POLLNVAL,
NULL,
cfg_dispatch_wrapper_fn);
} else {
qb_log(LOG_ERR, "RPC:%s not supported!", func);
}
}
static void my_pre_exit(void)
{
qb_log (LOG_INFO, "%s PRE EXIT", __FILE__);
if (cpg_handle > 0) {
cpg_finalize (cpg_handle);
cpg_handle = 0;
}
if (cfg_handle > 0) {
corosync_cfg_finalize (cfg_handle);
cfg_handle = 0;
}
PK11_DestroyContext(sha1_context, PR_TRUE);
}
int
main(int argc, char *argv[])
{
list_init (&msg_log_head);
list_init (&config_chg_log_head);
if (NSS_NoDB_Init(".") != SECSuccess) {
qb_log(LOG_ERR, "Couldn't initialize nss");
exit (0);
}
if ((sha1_context = PK11_CreateDigestContext(SEC_OID_SHA1)) == NULL) {
qb_log(LOG_ERR, "Couldn't initialize nss");
exit (0);
}
return test_agent_run ("cpg_test_agent", 9034, do_command, my_pre_exit);
}
diff --git a/exec/cmap.c b/exec/cmap.c
index ba4c7987..950a76f5 100644
--- a/exec/cmap.c
+++ b/exec/cmap.c
@@ -1,1029 +1,1028 @@
/*
* Copyright (c) 2011-2012 Red Hat, Inc.
*
* All rights reserved.
*
* Author: Jan Friesse (jfriesse@redhat.com)
*
* This software licensed under BSD license, the text of which follows:
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* - Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* - Neither the name of the Red Hat, Inc. nor the names of its
* contributors may be used to endorse or promote products derived from this
* software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
* THE POSSIBILITY OF SUCH DAMAGE.
*/
#include <config.h>
#include <sys/types.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdlib.h>
#include <errno.h>
-#include <unistd.h>
#include <poll.h>
#include <assert.h>
#include <qb/qbloop.h>
#include <qb/qbipc_common.h>
#include <corosync/corotypes.h>
#include <corosync/corodefs.h>
#include <corosync/list.h>
#include <corosync/mar_gen.h>
#include <corosync/ipc_cmap.h>
#include <corosync/logsys.h>
#include <corosync/coroapi.h>
#include <corosync/icmap.h>
#include "service.h"
LOGSYS_DECLARE_SUBSYS ("CMAP");
#define MAX_REQ_EXEC_CMAP_MCAST_ITEMS 32
#define ICMAP_VALUETYPE_NOT_EXIST 0
struct cmap_conn_info {
struct hdb_handle_database iter_db;
struct hdb_handle_database track_db;
};
typedef uint64_t cmap_iter_handle_t;
typedef uint64_t cmap_track_handle_t;
struct cmap_track_user_data {
void *conn;
cmap_track_handle_t track_handle;
uint64_t track_inst_handle;
};
enum cmap_message_req_types {
MESSAGE_REQ_EXEC_CMAP_MCAST = 0,
};
enum cmap_mcast_reason {
CMAP_MCAST_REASON_SYNC = 0,
CMAP_MCAST_REASON_NEW_CONFIG_VERSION = 1,
};
static struct corosync_api_v1 *api;
static char *cmap_exec_init_fn (struct corosync_api_v1 *corosync_api);
static int cmap_exec_exit_fn(void);
static int cmap_lib_init_fn (void *conn);
static int cmap_lib_exit_fn (void *conn);
static void message_handler_req_lib_cmap_set(void *conn, const void *message);
static void message_handler_req_lib_cmap_delete(void *conn, const void *message);
static void message_handler_req_lib_cmap_get(void *conn, const void *message);
static void message_handler_req_lib_cmap_adjust_int(void *conn, const void *message);
static void message_handler_req_lib_cmap_iter_init(void *conn, const void *message);
static void message_handler_req_lib_cmap_iter_next(void *conn, const void *message);
static void message_handler_req_lib_cmap_iter_finalize(void *conn, const void *message);
static void message_handler_req_lib_cmap_track_add(void *conn, const void *message);
static void message_handler_req_lib_cmap_track_delete(void *conn, const void *message);
static void cmap_notify_fn(int32_t event,
const char *key_name,
struct icmap_notify_value new_val,
struct icmap_notify_value old_val,
void *user_data);
static void message_handler_req_exec_cmap_mcast(
const void *message,
unsigned int nodeid);
static void exec_cmap_mcast_endian_convert(void *message);
/*
* Reson is subtype of message. argc is number of items in argv array. Argv is array
* of strings (key names) which will be send to wire. There can be maximum
* MAX_REQ_EXEC_CMAP_MCAST_ITEMS items (for more items, CS_ERR_TOO_MANY_GROUPS
* error is returned). If key is not found, item has type ICMAP_VALUETYPE_NOT_EXIST
* and length zero.
*/
static cs_error_t cmap_mcast_send(enum cmap_mcast_reason reason, int argc, char *argv[]);
static void cmap_sync_init (
const unsigned int *trans_list,
size_t trans_list_entries,
const unsigned int *member_list,
size_t member_list_entries,
const struct memb_ring_id *ring_id);
static int cmap_sync_process (void);
static void cmap_sync_activate (void);
static void cmap_sync_abort (void);
static void cmap_config_version_track_cb(
int32_t event,
const char *key_name,
struct icmap_notify_value new_value,
struct icmap_notify_value old_value,
void *user_data);
/*
* Library Handler Definition
*/
static struct corosync_lib_handler cmap_lib_engine[] =
{
{ /* 0 */
.lib_handler_fn = message_handler_req_lib_cmap_set,
.flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED
},
{ /* 1 */
.lib_handler_fn = message_handler_req_lib_cmap_delete,
.flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED
},
{ /* 2 */
.lib_handler_fn = message_handler_req_lib_cmap_get,
.flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED
},
{ /* 3 */
.lib_handler_fn = message_handler_req_lib_cmap_adjust_int,
.flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED
},
{ /* 4 */
.lib_handler_fn = message_handler_req_lib_cmap_iter_init,
.flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED
},
{ /* 5 */
.lib_handler_fn = message_handler_req_lib_cmap_iter_next,
.flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED
},
{ /* 6 */
.lib_handler_fn = message_handler_req_lib_cmap_iter_finalize,
.flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED
},
{ /* 7 */
.lib_handler_fn = message_handler_req_lib_cmap_track_add,
.flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED
},
{ /* 8 */
.lib_handler_fn = message_handler_req_lib_cmap_track_delete,
.flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED
},
};
static struct corosync_exec_handler cmap_exec_engine[] =
{
{ /* 0 - MESSAGE_REQ_EXEC_CMAP_MCAST */
.exec_handler_fn = message_handler_req_exec_cmap_mcast,
.exec_endian_convert_fn = exec_cmap_mcast_endian_convert
},
};
struct corosync_service_engine cmap_service_engine = {
.name = "corosync configuration map access",
.id = CMAP_SERVICE,
.priority = 1,
.private_data_size = sizeof(struct cmap_conn_info),
.flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED,
.allow_inquorate = CS_LIB_ALLOW_INQUORATE,
.lib_init_fn = cmap_lib_init_fn,
.lib_exit_fn = cmap_lib_exit_fn,
.lib_engine = cmap_lib_engine,
.lib_engine_count = sizeof (cmap_lib_engine) / sizeof (struct corosync_lib_handler),
.exec_init_fn = cmap_exec_init_fn,
.exec_exit_fn = cmap_exec_exit_fn,
.exec_engine = cmap_exec_engine,
.exec_engine_count = sizeof (cmap_exec_engine) / sizeof (struct corosync_exec_handler),
.sync_init = cmap_sync_init,
.sync_process = cmap_sync_process,
.sync_activate = cmap_sync_activate,
.sync_abort = cmap_sync_abort
};
struct corosync_service_engine *cmap_get_service_engine_ver0 (void)
{
return (&cmap_service_engine);
}
struct req_exec_cmap_mcast_item {
mar_name_t key_name __attribute__((aligned(8)));
mar_uint8_t value_type __attribute__((aligned(8)));
mar_size_t value_len __attribute__((aligned(8)));
uint8_t value[] __attribute__((aligned(8)));
};
struct req_exec_cmap_mcast {
struct qb_ipc_request_header header __attribute__((aligned(8)));
mar_uint8_t reason __attribute__((aligned(8)));
mar_uint8_t no_items __attribute__((aligned(8)));
mar_uint8_t reserved1 __attribute__((aligned(8)));
mar_uint8_t reserver2 __attribute__((aligned(8)));
/*
* Following are array of req_exec_cmap_mcast_item alligned to 8 bytes
*/
};
static size_t cmap_sync_trans_list_entries = 0;
static size_t cmap_sync_member_list_entries = 0;
static uint64_t cmap_highest_config_version_received = 0;
static uint64_t cmap_my_config_version = 0;
static int cmap_first_sync = 1;
static icmap_track_t cmap_config_version_track;
static void cmap_config_version_track_cb(
int32_t event,
const char *key_name,
struct icmap_notify_value new_value,
struct icmap_notify_value old_value,
void *user_data)
{
const char *key = "totem.config_version";
cs_error_t ret;
ENTER();
if (icmap_get_uint64("totem.config_version", &cmap_my_config_version) != CS_OK) {
cmap_my_config_version = 0;
}
ret = cmap_mcast_send(CMAP_MCAST_REASON_NEW_CONFIG_VERSION, 1, (char **)&key);
if (ret != CS_OK) {
log_printf(LOGSYS_LEVEL_ERROR, "Can't inform other nodes about new config version");
}
LEAVE();
}
static int cmap_exec_exit_fn(void)
{
if (icmap_track_delete(cmap_config_version_track) != CS_OK) {
log_printf(LOGSYS_LEVEL_ERROR, "Can't delete config_version icmap tracker");
}
return 0;
}
static char *cmap_exec_init_fn (
struct corosync_api_v1 *corosync_api)
{
cs_error_t ret;
api = corosync_api;
ret = icmap_track_add("totem.config_version",
ICMAP_TRACK_ADD | ICMAP_TRACK_DELETE | ICMAP_TRACK_MODIFY,
cmap_config_version_track_cb,
NULL,
&cmap_config_version_track);
if (ret != CS_OK) {
return ((char *)"Can't add config_version icmap tracker");
}
return (NULL);
}
static int cmap_lib_init_fn (void *conn)
{
struct cmap_conn_info *conn_info = (struct cmap_conn_info *)api->ipc_private_data_get (conn);
log_printf(LOGSYS_LEVEL_DEBUG, "lib_init_fn: conn=%p", conn);
api->ipc_refcnt_inc(conn);
memset(conn_info, 0, sizeof(*conn_info));
hdb_create(&conn_info->iter_db);
hdb_create(&conn_info->track_db);
return (0);
}
static int cmap_lib_exit_fn (void *conn)
{
struct cmap_conn_info *conn_info = (struct cmap_conn_info *)api->ipc_private_data_get (conn);
hdb_handle_t iter_handle = 0;
icmap_iter_t *iter;
hdb_handle_t track_handle = 0;
icmap_track_t *track;
log_printf(LOGSYS_LEVEL_DEBUG, "exit_fn for conn=%p", conn);
hdb_iterator_reset(&conn_info->iter_db);
while (hdb_iterator_next(&conn_info->iter_db,
(void*)&iter, &iter_handle) == 0) {
icmap_iter_finalize(*iter);
(void)hdb_handle_put (&conn_info->iter_db, iter_handle);
}
hdb_destroy(&conn_info->iter_db);
hdb_iterator_reset(&conn_info->track_db);
while (hdb_iterator_next(&conn_info->track_db,
(void*)&track, &track_handle) == 0) {
free(icmap_track_get_user_data(*track));
icmap_track_delete(*track);
(void)hdb_handle_put (&conn_info->track_db, track_handle);
}
hdb_destroy(&conn_info->track_db);
api->ipc_refcnt_dec(conn);
return (0);
}
static void cmap_sync_init (
const unsigned int *trans_list,
size_t trans_list_entries,
const unsigned int *member_list,
size_t member_list_entries,
const struct memb_ring_id *ring_id)
{
cmap_sync_trans_list_entries = trans_list_entries;
cmap_sync_member_list_entries = member_list_entries;
cmap_highest_config_version_received = 0;
if (icmap_get_uint64("totem.config_version", &cmap_my_config_version) != CS_OK) {
cmap_my_config_version = 0;
}
}
static int cmap_sync_process (void)
{
const char *key = "totem.config_version";
cs_error_t ret;
ret = cmap_mcast_send(CMAP_MCAST_REASON_SYNC, 1, (char **)&key);
return (ret == CS_OK ? 0 : -1);
}
static void cmap_sync_activate (void)
{
if (cmap_sync_trans_list_entries == 0) {
log_printf(LOGSYS_LEVEL_DEBUG, "Single node sync -> no action");
return ;
}
if (cmap_first_sync == 1) {
cmap_first_sync = 0;
} else {
log_printf(LOGSYS_LEVEL_DEBUG, "Not first sync -> no action");
return ;
}
if (cmap_my_config_version == 0) {
log_printf(LOGSYS_LEVEL_DEBUG, "My config version is 0 -> no action");
return ;
}
if (cmap_highest_config_version_received == 0) {
log_printf(LOGSYS_LEVEL_DEBUG, "Other nodes version is 0 -> no action");
return ;
}
if (cmap_highest_config_version_received != cmap_my_config_version) {
log_printf(LOGSYS_LEVEL_ERROR,
"Received config version (%"PRIu64") is different than my config version (%"PRIu64")! Exiting",
cmap_highest_config_version_received, cmap_my_config_version);
api->shutdown_request();
return ;
}
}
static void cmap_sync_abort (void)
{
}
static void message_handler_req_lib_cmap_set(void *conn, const void *message)
{
const struct req_lib_cmap_set *req_lib_cmap_set = message;
struct res_lib_cmap_set res_lib_cmap_set;
cs_error_t ret;
if (icmap_is_key_ro((char *)req_lib_cmap_set->key_name.value)) {
ret = CS_ERR_ACCESS;
} else {
ret = icmap_set((char *)req_lib_cmap_set->key_name.value, &req_lib_cmap_set->value,
req_lib_cmap_set->value_len, req_lib_cmap_set->type);
}
memset(&res_lib_cmap_set, 0, sizeof(res_lib_cmap_set));
res_lib_cmap_set.header.size = sizeof(res_lib_cmap_set);
res_lib_cmap_set.header.id = MESSAGE_RES_CMAP_SET;
res_lib_cmap_set.header.error = ret;
api->ipc_response_send(conn, &res_lib_cmap_set, sizeof(res_lib_cmap_set));
}
static void message_handler_req_lib_cmap_delete(void *conn, const void *message)
{
const struct req_lib_cmap_set *req_lib_cmap_set = message;
struct res_lib_cmap_delete res_lib_cmap_delete;
cs_error_t ret;
if (icmap_is_key_ro((char *)req_lib_cmap_set->key_name.value)) {
ret = CS_ERR_ACCESS;
} else {
ret = icmap_delete((char *)req_lib_cmap_set->key_name.value);
}
memset(&res_lib_cmap_delete, 0, sizeof(res_lib_cmap_delete));
res_lib_cmap_delete.header.size = sizeof(res_lib_cmap_delete);
res_lib_cmap_delete.header.id = MESSAGE_RES_CMAP_DELETE;
res_lib_cmap_delete.header.error = ret;
api->ipc_response_send(conn, &res_lib_cmap_delete, sizeof(res_lib_cmap_delete));
}
static void message_handler_req_lib_cmap_get(void *conn, const void *message)
{
const struct req_lib_cmap_get *req_lib_cmap_get = message;
struct res_lib_cmap_get *res_lib_cmap_get;
struct res_lib_cmap_get error_res_lib_cmap_get;
cs_error_t ret;
size_t value_len;
size_t res_lib_cmap_get_size;
icmap_value_types_t type;
void *value;
value_len = req_lib_cmap_get->value_len;
res_lib_cmap_get_size = sizeof(*res_lib_cmap_get) + value_len;
res_lib_cmap_get = malloc(res_lib_cmap_get_size);
if (res_lib_cmap_get == NULL) {
ret = CS_ERR_NO_MEMORY;
goto error_exit;
}
memset(res_lib_cmap_get, 0, res_lib_cmap_get_size);
if (value_len > 0) {
value = res_lib_cmap_get->value;
} else {
value = NULL;
}
ret = icmap_get((char *)req_lib_cmap_get->key_name.value,
value,
&value_len,
&type);
if (ret != CS_OK) {
free(res_lib_cmap_get);
goto error_exit;
}
res_lib_cmap_get->header.size = res_lib_cmap_get_size;
res_lib_cmap_get->header.id = MESSAGE_RES_CMAP_GET;
res_lib_cmap_get->header.error = ret;
res_lib_cmap_get->type = type;
res_lib_cmap_get->value_len = value_len;
api->ipc_response_send(conn, res_lib_cmap_get, res_lib_cmap_get_size);
free(res_lib_cmap_get);
return ;
error_exit:
memset(&error_res_lib_cmap_get, 0, sizeof(error_res_lib_cmap_get));
error_res_lib_cmap_get.header.size = sizeof(error_res_lib_cmap_get);
error_res_lib_cmap_get.header.id = MESSAGE_RES_CMAP_GET;
error_res_lib_cmap_get.header.error = ret;
api->ipc_response_send(conn, &error_res_lib_cmap_get, sizeof(error_res_lib_cmap_get));
}
static void message_handler_req_lib_cmap_adjust_int(void *conn, const void *message)
{
const struct req_lib_cmap_adjust_int *req_lib_cmap_adjust_int = message;
struct res_lib_cmap_adjust_int res_lib_cmap_adjust_int;
cs_error_t ret;
if (icmap_is_key_ro((char *)req_lib_cmap_adjust_int->key_name.value)) {
ret = CS_ERR_ACCESS;
} else {
ret = icmap_adjust_int((char *)req_lib_cmap_adjust_int->key_name.value,
req_lib_cmap_adjust_int->step);
}
memset(&res_lib_cmap_adjust_int, 0, sizeof(res_lib_cmap_adjust_int));
res_lib_cmap_adjust_int.header.size = sizeof(res_lib_cmap_adjust_int);
res_lib_cmap_adjust_int.header.id = MESSAGE_RES_CMAP_ADJUST_INT;
res_lib_cmap_adjust_int.header.error = ret;
api->ipc_response_send(conn, &res_lib_cmap_adjust_int, sizeof(res_lib_cmap_adjust_int));
}
static void message_handler_req_lib_cmap_iter_init(void *conn, const void *message)
{
const struct req_lib_cmap_iter_init *req_lib_cmap_iter_init = message;
struct res_lib_cmap_iter_init res_lib_cmap_iter_init;
cs_error_t ret;
icmap_iter_t iter;
icmap_iter_t *hdb_iter;
cmap_iter_handle_t handle = 0ULL;
const char *prefix;
struct cmap_conn_info *conn_info = (struct cmap_conn_info *)api->ipc_private_data_get (conn);
if (req_lib_cmap_iter_init->prefix.length > 0) {
prefix = (char *)req_lib_cmap_iter_init->prefix.value;
} else {
prefix = NULL;
}
iter = icmap_iter_init(prefix);
if (iter == NULL) {
ret = CS_ERR_NO_SECTIONS;
goto reply_send;
}
ret = hdb_error_to_cs(hdb_handle_create(&conn_info->iter_db, sizeof(iter), &handle));
if (ret != CS_OK) {
goto reply_send;
}
ret = hdb_error_to_cs(hdb_handle_get(&conn_info->iter_db, handle, (void *)&hdb_iter));
if (ret != CS_OK) {
goto reply_send;
}
*hdb_iter = iter;
(void)hdb_handle_put (&conn_info->iter_db, handle);
reply_send:
memset(&res_lib_cmap_iter_init, 0, sizeof(res_lib_cmap_iter_init));
res_lib_cmap_iter_init.header.size = sizeof(res_lib_cmap_iter_init);
res_lib_cmap_iter_init.header.id = MESSAGE_RES_CMAP_ITER_INIT;
res_lib_cmap_iter_init.header.error = ret;
res_lib_cmap_iter_init.iter_handle = handle;
api->ipc_response_send(conn, &res_lib_cmap_iter_init, sizeof(res_lib_cmap_iter_init));
}
static void message_handler_req_lib_cmap_iter_next(void *conn, const void *message)
{
const struct req_lib_cmap_iter_next *req_lib_cmap_iter_next = message;
struct res_lib_cmap_iter_next res_lib_cmap_iter_next;
cs_error_t ret;
icmap_iter_t *iter;
size_t value_len = 0;
icmap_value_types_t type = 0;
const char *res = NULL;
struct cmap_conn_info *conn_info = (struct cmap_conn_info *)api->ipc_private_data_get (conn);
ret = hdb_error_to_cs(hdb_handle_get(&conn_info->iter_db,
req_lib_cmap_iter_next->iter_handle, (void *)&iter));
if (ret != CS_OK) {
goto reply_send;
}
res = icmap_iter_next(*iter, &value_len, &type);
if (res == NULL) {
ret = CS_ERR_NO_SECTIONS;
}
(void)hdb_handle_put (&conn_info->iter_db, req_lib_cmap_iter_next->iter_handle);
reply_send:
memset(&res_lib_cmap_iter_next, 0, sizeof(res_lib_cmap_iter_next));
res_lib_cmap_iter_next.header.size = sizeof(res_lib_cmap_iter_next);
res_lib_cmap_iter_next.header.id = MESSAGE_RES_CMAP_ITER_NEXT;
res_lib_cmap_iter_next.header.error = ret;
if (res != NULL) {
res_lib_cmap_iter_next.value_len = value_len;
res_lib_cmap_iter_next.type = type;
memcpy(res_lib_cmap_iter_next.key_name.value, res, strlen(res));
res_lib_cmap_iter_next.key_name.length = strlen(res);
}
api->ipc_response_send(conn, &res_lib_cmap_iter_next, sizeof(res_lib_cmap_iter_next));
}
static void message_handler_req_lib_cmap_iter_finalize(void *conn, const void *message)
{
const struct req_lib_cmap_iter_finalize *req_lib_cmap_iter_finalize = message;
struct res_lib_cmap_iter_finalize res_lib_cmap_iter_finalize;
cs_error_t ret;
icmap_iter_t *iter;
struct cmap_conn_info *conn_info = (struct cmap_conn_info *)api->ipc_private_data_get (conn);
ret = hdb_error_to_cs(hdb_handle_get(&conn_info->iter_db,
req_lib_cmap_iter_finalize->iter_handle, (void *)&iter));
if (ret != CS_OK) {
goto reply_send;
}
icmap_iter_finalize(*iter);
(void)hdb_handle_destroy(&conn_info->iter_db, req_lib_cmap_iter_finalize->iter_handle);
(void)hdb_handle_put (&conn_info->iter_db, req_lib_cmap_iter_finalize->iter_handle);
reply_send:
memset(&res_lib_cmap_iter_finalize, 0, sizeof(res_lib_cmap_iter_finalize));
res_lib_cmap_iter_finalize.header.size = sizeof(res_lib_cmap_iter_finalize);
res_lib_cmap_iter_finalize.header.id = MESSAGE_RES_CMAP_ITER_FINALIZE;
res_lib_cmap_iter_finalize.header.error = ret;
api->ipc_response_send(conn, &res_lib_cmap_iter_finalize, sizeof(res_lib_cmap_iter_finalize));
}
static void cmap_notify_fn(int32_t event,
const char *key_name,
struct icmap_notify_value new_val,
struct icmap_notify_value old_val,
void *user_data)
{
struct cmap_track_user_data *cmap_track_user_data = (struct cmap_track_user_data *)user_data;
struct res_lib_cmap_notify_callback res_lib_cmap_notify_callback;
struct iovec iov[3];
memset(&res_lib_cmap_notify_callback, 0, sizeof(res_lib_cmap_notify_callback));
res_lib_cmap_notify_callback.header.size = sizeof(res_lib_cmap_notify_callback) + new_val.len + old_val.len;
res_lib_cmap_notify_callback.header.id = MESSAGE_RES_CMAP_NOTIFY_CALLBACK;
res_lib_cmap_notify_callback.header.error = CS_OK;
res_lib_cmap_notify_callback.new_value_type = new_val.type;
res_lib_cmap_notify_callback.old_value_type = old_val.type;
res_lib_cmap_notify_callback.new_value_len = new_val.len;
res_lib_cmap_notify_callback.old_value_len = old_val.len;
res_lib_cmap_notify_callback.event = event;
res_lib_cmap_notify_callback.key_name.length = strlen(key_name);
res_lib_cmap_notify_callback.track_inst_handle = cmap_track_user_data->track_inst_handle;
memcpy(res_lib_cmap_notify_callback.key_name.value, key_name, strlen(key_name));
iov[0].iov_base = (char *)&res_lib_cmap_notify_callback;
iov[0].iov_len = sizeof(res_lib_cmap_notify_callback);
iov[1].iov_base = (char *)new_val.data;
iov[1].iov_len = new_val.len;
iov[2].iov_base = (char *)old_val.data;
iov[2].iov_len = old_val.len;
api->ipc_dispatch_iov_send(cmap_track_user_data->conn, iov, 3);
}
static void message_handler_req_lib_cmap_track_add(void *conn, const void *message)
{
const struct req_lib_cmap_track_add *req_lib_cmap_track_add = message;
struct res_lib_cmap_track_add res_lib_cmap_track_add;
cs_error_t ret;
cmap_track_handle_t handle = 0;
icmap_track_t track = NULL;
icmap_track_t *hdb_track;
struct cmap_track_user_data *cmap_track_user_data;
const char *key_name;
struct cmap_conn_info *conn_info = (struct cmap_conn_info *)api->ipc_private_data_get (conn);
cmap_track_user_data = malloc(sizeof(*cmap_track_user_data));
if (cmap_track_user_data == NULL) {
ret = CS_ERR_NO_MEMORY;
goto reply_send;
}
memset(cmap_track_user_data, 0, sizeof(*cmap_track_user_data));
if (req_lib_cmap_track_add->key_name.length > 0) {
key_name = (char *)req_lib_cmap_track_add->key_name.value;
} else {
key_name = NULL;
}
ret = icmap_track_add(key_name,
req_lib_cmap_track_add->track_type,
cmap_notify_fn,
cmap_track_user_data,
&track);
if (ret != CS_OK) {
free(cmap_track_user_data);
goto reply_send;
}
ret = hdb_error_to_cs(hdb_handle_create(&conn_info->track_db, sizeof(track), &handle));
if (ret != CS_OK) {
free(cmap_track_user_data);
goto reply_send;
}
ret = hdb_error_to_cs(hdb_handle_get(&conn_info->track_db, handle, (void *)&hdb_track));
if (ret != CS_OK) {
free(cmap_track_user_data);
goto reply_send;
}
*hdb_track = track;
cmap_track_user_data->conn = conn;
cmap_track_user_data->track_handle = handle;
cmap_track_user_data->track_inst_handle = req_lib_cmap_track_add->track_inst_handle;
(void)hdb_handle_put (&conn_info->track_db, handle);
reply_send:
memset(&res_lib_cmap_track_add, 0, sizeof(res_lib_cmap_track_add));
res_lib_cmap_track_add.header.size = sizeof(res_lib_cmap_track_add);
res_lib_cmap_track_add.header.id = MESSAGE_RES_CMAP_TRACK_ADD;
res_lib_cmap_track_add.header.error = ret;
res_lib_cmap_track_add.track_handle = handle;
api->ipc_response_send(conn, &res_lib_cmap_track_add, sizeof(res_lib_cmap_track_add));
}
static void message_handler_req_lib_cmap_track_delete(void *conn, const void *message)
{
const struct req_lib_cmap_track_delete *req_lib_cmap_track_delete = message;
struct res_lib_cmap_track_delete res_lib_cmap_track_delete;
cs_error_t ret;
icmap_track_t *track;
struct cmap_conn_info *conn_info = (struct cmap_conn_info *)api->ipc_private_data_get (conn);
uint64_t track_inst_handle = 0;
ret = hdb_error_to_cs(hdb_handle_get(&conn_info->track_db,
req_lib_cmap_track_delete->track_handle, (void *)&track));
if (ret != CS_OK) {
goto reply_send;
}
track_inst_handle = ((struct cmap_track_user_data *)icmap_track_get_user_data(*track))->track_inst_handle;
free(icmap_track_get_user_data(*track));
ret = icmap_track_delete(*track);
(void)hdb_handle_put (&conn_info->track_db, req_lib_cmap_track_delete->track_handle);
(void)hdb_handle_destroy(&conn_info->track_db, req_lib_cmap_track_delete->track_handle);
reply_send:
memset(&res_lib_cmap_track_delete, 0, sizeof(res_lib_cmap_track_delete));
res_lib_cmap_track_delete.header.size = sizeof(res_lib_cmap_track_delete);
res_lib_cmap_track_delete.header.id = MESSAGE_RES_CMAP_TRACK_DELETE;
res_lib_cmap_track_delete.header.error = ret;
res_lib_cmap_track_delete.track_inst_handle = track_inst_handle;
api->ipc_response_send(conn, &res_lib_cmap_track_delete, sizeof(res_lib_cmap_track_delete));
}
static cs_error_t cmap_mcast_send(enum cmap_mcast_reason reason, int argc, char *argv[])
{
int i;
size_t value_len;
icmap_value_types_t value_type;
cs_error_t err;
size_t item_len;
size_t msg_len = 0;
struct req_exec_cmap_mcast req_exec_cmap_mcast;
struct req_exec_cmap_mcast_item *item = NULL;
struct iovec req_exec_cmap_iovec[MAX_REQ_EXEC_CMAP_MCAST_ITEMS + 1];
ENTER();
if (argc > MAX_REQ_EXEC_CMAP_MCAST_ITEMS) {
return (CS_ERR_TOO_MANY_GROUPS);
}
memset(req_exec_cmap_iovec, 0, sizeof(req_exec_cmap_iovec));
for (i = 0; i < argc; i++) {
err = icmap_get(argv[i], NULL, &value_len, &value_type);
if (err != CS_OK && err != CS_ERR_NOT_EXIST) {
goto free_mem;
}
if (err == CS_ERR_NOT_EXIST) {
value_type = ICMAP_VALUETYPE_NOT_EXIST;
value_len = 0;
}
item_len = MAR_ALIGN_UP(sizeof(*item) + value_len, 8);
item = malloc(item_len);
if (item == NULL) {
goto free_mem;
}
memset(item, 0, item_len);
item->value_type = value_type;
item->value_len = value_len;
item->key_name.length = strlen(argv[i]);
strcpy((char *)item->key_name.value, argv[i]);
if (value_type != ICMAP_VALUETYPE_NOT_EXIST) {
err = icmap_get(argv[i], item->value, &value_len, &value_type);
if (err != CS_OK) {
goto free_mem;
}
}
req_exec_cmap_iovec[i + 1].iov_base = item;
req_exec_cmap_iovec[i + 1].iov_len = item_len;
msg_len += item_len;
qb_log(LOG_TRACE, "Item %u - type %u, len %zu", i, item->value_type, item->value_len);
item = NULL;
}
memset(&req_exec_cmap_mcast, 0, sizeof(req_exec_cmap_mcast));
req_exec_cmap_mcast.header.size = sizeof(req_exec_cmap_mcast) + msg_len;
req_exec_cmap_mcast.reason = reason;
req_exec_cmap_mcast.no_items = argc;
req_exec_cmap_iovec[0].iov_base = &req_exec_cmap_mcast;
req_exec_cmap_iovec[0].iov_len = sizeof(req_exec_cmap_mcast);
qb_log(LOG_TRACE, "Sending %u items (%u iovec) for reason %u", argc, argc + 1, reason);
err = (api->totem_mcast(req_exec_cmap_iovec, argc + 1, TOTEM_AGREED) == 0 ? CS_OK : CS_ERR_MESSAGE_ERROR);
free_mem:
for (i = 0; i < argc; i++) {
free(req_exec_cmap_iovec[i + 1].iov_base);
}
free(item);
LEAVE();
return (err);
}
static struct req_exec_cmap_mcast_item *cmap_mcast_item_find(
const void *message,
char *key)
{
const struct req_exec_cmap_mcast *req_exec_cmap_mcast = message;
int i;
const char *p;
struct req_exec_cmap_mcast_item *item;
mar_uint16_t key_name_len;
p = (const char *)message + sizeof(*req_exec_cmap_mcast);
for (i = 0; i < req_exec_cmap_mcast->no_items; i++) {
item = (struct req_exec_cmap_mcast_item *)p;
key_name_len = item->key_name.length;
if (strlen(key) == key_name_len && strcmp((char *)item->key_name.value, key) == 0) {
return (item);
}
p += MAR_ALIGN_UP(sizeof(*item) + item->value_len, 8);
}
return (NULL);
}
static void message_handler_req_exec_cmap_mcast_reason_sync_nv(
enum cmap_mcast_reason reason,
const void *message,
unsigned int nodeid)
{
char member_config_version[ICMAP_KEYNAME_MAXLEN];
uint64_t config_version = 0;
struct req_exec_cmap_mcast_item *item;
mar_size_t value_len;
ENTER();
item = cmap_mcast_item_find(message, (char *)"totem.config_version");
if (item != NULL) {
value_len = item->value_len;
if (item->value_type == ICMAP_VALUETYPE_NOT_EXIST) {
config_version = 0;
}
if (item->value_type == ICMAP_VALUETYPE_UINT64) {
memcpy(&config_version, item->value, value_len);
}
}
qb_log(LOG_TRACE, "Received config version %"PRIu64" from node %x", config_version, nodeid);
if (nodeid != api->totem_nodeid_get() &&
config_version > cmap_highest_config_version_received) {
cmap_highest_config_version_received = config_version;
}
snprintf(member_config_version, ICMAP_KEYNAME_MAXLEN,
"runtime.totem.pg.mrp.srp.members.%u.config_version", nodeid);
icmap_set_uint64(member_config_version, config_version);
LEAVE();
}
static void message_handler_req_exec_cmap_mcast(
const void *message,
unsigned int nodeid)
{
const struct req_exec_cmap_mcast *req_exec_cmap_mcast = message;
ENTER();
switch (req_exec_cmap_mcast->reason) {
case CMAP_MCAST_REASON_SYNC:
message_handler_req_exec_cmap_mcast_reason_sync_nv(req_exec_cmap_mcast->reason,
message, nodeid);
break;
case CMAP_MCAST_REASON_NEW_CONFIG_VERSION:
message_handler_req_exec_cmap_mcast_reason_sync_nv(req_exec_cmap_mcast->reason,
message, nodeid);
break;
default:
qb_log(LOG_TRACE, "Received mcast with unknown reason %u", req_exec_cmap_mcast->reason);
};
LEAVE();
}
static void exec_cmap_mcast_endian_convert(void *message)
{
struct req_exec_cmap_mcast *req_exec_cmap_mcast = message;
const char *p;
int i;
struct req_exec_cmap_mcast_item *item;
uint16_t u16;
uint32_t u32;
uint64_t u64;
float flt;
double dbl;
swab_coroipc_request_header_t(&req_exec_cmap_mcast->header);
p = (const char *)message + sizeof(*req_exec_cmap_mcast);
for (i = 0; i < req_exec_cmap_mcast->no_items; i++) {
item = (struct req_exec_cmap_mcast_item *)p;
swab_mar_uint16_t(&item->key_name.length);
swab_mar_size_t(&item->value_len);
switch (item->value_type) {
case ICMAP_VALUETYPE_INT16:
case ICMAP_VALUETYPE_UINT16:
memcpy(&u16, item->value, sizeof(u16));
u16 = swab16(u16);
memcpy(item->value, &u16, sizeof(u16));
break;
case ICMAP_VALUETYPE_INT32:
case ICMAP_VALUETYPE_UINT32:
memcpy(&u32, item->value, sizeof(u32));
u32 = swab32(u32);
memcpy(item->value, &u32, sizeof(u32));
break;
case ICMAP_VALUETYPE_INT64:
case ICMAP_VALUETYPE_UINT64:
memcpy(&u64, item->value, sizeof(u64));
u64 = swab64(u64);
memcpy(item->value, &u64, sizeof(u64));
break;
case ICMAP_VALUETYPE_FLOAT:
memcpy(&flt, item->value, sizeof(flt));
swabflt(&flt);
memcpy(item->value, &flt, sizeof(flt));
break;
case ICMAP_VALUETYPE_DOUBLE:
memcpy(&dbl, item->value, sizeof(dbl));
swabdbl(&dbl);
memcpy(item->value, &dbl, sizeof(dbl));
break;
}
p += MAR_ALIGN_UP(sizeof(*item) + item->value_len, 8);
}
}
diff --git a/exec/cpg.c b/exec/cpg.c
index 5bd830fd..78ac1e9e 100644
--- a/exec/cpg.c
+++ b/exec/cpg.c
@@ -1,2438 +1,2436 @@
/*
* Copyright (c) 2006-2015 Red Hat, Inc.
*
* All rights reserved.
*
* Author: Christine Caulfield (ccaulfie@redhat.com)
* Author: Jan Friesse (jfriesse@redhat.com)
*
* This software licensed under BSD license, the text of which follows:
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* - Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* - Neither the name of the MontaVista Software, Inc. nor the names of its
* contributors may be used to endorse or promote products derived from this
* software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
* THE POSSIBILITY OF SUCH DAMAGE.
*/
#include <config.h>
#ifdef HAVE_ALLOCA_H
#include <alloca.h>
#endif
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/ioctl.h>
#include <netinet/in.h>
#include <sys/uio.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <time.h>
#include <assert.h>
-#include <unistd.h>
-#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/mman.h>
#include <qb/qbmap.h>
#include <corosync/corotypes.h>
#include <qb/qbipc_common.h>
#include <corosync/corodefs.h>
#include <corosync/list.h>
#include <corosync/logsys.h>
#include <corosync/coroapi.h>
#include <corosync/cpg.h>
#include <corosync/ipc_cpg.h>
#ifndef MAP_ANONYMOUS
#define MAP_ANONYMOUS MAP_ANON
#endif
#include "service.h"
LOGSYS_DECLARE_SUBSYS ("CPG");
#define GROUP_HASH_SIZE 32
enum cpg_message_req_types {
MESSAGE_REQ_EXEC_CPG_PROCJOIN = 0,
MESSAGE_REQ_EXEC_CPG_PROCLEAVE = 1,
MESSAGE_REQ_EXEC_CPG_JOINLIST = 2,
MESSAGE_REQ_EXEC_CPG_MCAST = 3,
MESSAGE_REQ_EXEC_CPG_DOWNLIST_OLD = 4,
MESSAGE_REQ_EXEC_CPG_DOWNLIST = 5,
MESSAGE_REQ_EXEC_CPG_PARTIAL_MCAST = 6,
};
struct zcb_mapped {
struct list_head list;
void *addr;
size_t size;
};
/*
* state` exec deliver
* match group name, pid -> if matched deliver for YES:
* XXX indicates impossible state
*
* join leave mcast
* UNJOINED XXX XXX NO
* LEAVE_STARTED XXX YES(unjoined_enter) YES
* JOIN_STARTED YES(join_started_enter) XXX NO
* JOIN_COMPLETED XXX NO YES
*
* join_started_enter
* set JOIN_COMPLETED
* add entry to process_info list
* unjoined_enter
* set UNJOINED
* delete entry from process_info list
*
*
* library accept join error codes
* UNJOINED YES(CS_OK) set JOIN_STARTED
* LEAVE_STARTED NO(CS_ERR_BUSY)
* JOIN_STARTED NO(CS_ERR_EXIST)
* JOIN_COMPlETED NO(CS_ERR_EXIST)
*
* library accept leave error codes
* UNJOINED NO(CS_ERR_NOT_EXIST)
* LEAVE_STARTED NO(CS_ERR_NOT_EXIST)
* JOIN_STARTED NO(CS_ERR_BUSY)
* JOIN_COMPLETED YES(CS_OK) set LEAVE_STARTED
*
* library accept mcast
* UNJOINED NO(CS_ERR_NOT_EXIST)
* LEAVE_STARTED NO(CS_ERR_NOT_EXIST)
* JOIN_STARTED YES(CS_OK)
* JOIN_COMPLETED YES(CS_OK)
*/
enum cpd_state {
CPD_STATE_UNJOINED,
CPD_STATE_LEAVE_STARTED,
CPD_STATE_JOIN_STARTED,
CPD_STATE_JOIN_COMPLETED
};
enum cpg_sync_state {
CPGSYNC_DOWNLIST,
CPGSYNC_JOINLIST
};
enum cpg_downlist_state_e {
CPG_DOWNLIST_NONE,
CPG_DOWNLIST_WAITING_FOR_MESSAGES,
CPG_DOWNLIST_APPLYING,
};
static enum cpg_downlist_state_e downlist_state;
static struct list_head downlist_messages_head;
static struct list_head joinlist_messages_head;
struct cpg_pd {
void *conn;
mar_cpg_name_t group_name;
uint32_t pid;
enum cpd_state cpd_state;
unsigned int flags;
int initial_totem_conf_sent;
uint64_t transition_counter; /* These two are used when sending fragmented messages */
uint64_t initial_transition_counter;
struct list_head list;
struct list_head iteration_instance_list_head;
struct list_head zcb_mapped_list_head;
};
struct cpg_iteration_instance {
hdb_handle_t handle;
struct list_head list;
struct list_head items_list_head; /* List of process_info */
struct list_head *current_pointer;
};
DECLARE_HDB_DATABASE(cpg_iteration_handle_t_db,NULL);
DECLARE_LIST_INIT(cpg_pd_list_head);
static unsigned int my_member_list[PROCESSOR_COUNT_MAX];
static unsigned int my_member_list_entries;
static unsigned int my_old_member_list[PROCESSOR_COUNT_MAX];
static unsigned int my_old_member_list_entries = 0;
static struct corosync_api_v1 *api = NULL;
static enum cpg_sync_state my_sync_state = CPGSYNC_DOWNLIST;
static mar_cpg_ring_id_t last_sync_ring_id;
struct process_info {
unsigned int nodeid;
uint32_t pid;
mar_cpg_name_t group;
struct list_head list; /* on the group_info members list */
};
DECLARE_LIST_INIT(process_info_list_head);
struct join_list_entry {
uint32_t pid;
mar_cpg_name_t group_name;
};
/*
* Service Interfaces required by service_message_handler struct
*/
static char *cpg_exec_init_fn (struct corosync_api_v1 *);
static int cpg_lib_init_fn (void *conn);
static int cpg_lib_exit_fn (void *conn);
static void message_handler_req_exec_cpg_procjoin (
const void *message,
unsigned int nodeid);
static void message_handler_req_exec_cpg_procleave (
const void *message,
unsigned int nodeid);
static void message_handler_req_exec_cpg_joinlist (
const void *message,
unsigned int nodeid);
static void message_handler_req_exec_cpg_mcast (
const void *message,
unsigned int nodeid);
static void message_handler_req_exec_cpg_partial_mcast (
const void *message,
unsigned int nodeid);
static void message_handler_req_exec_cpg_downlist_old (
const void *message,
unsigned int nodeid);
static void message_handler_req_exec_cpg_downlist (
const void *message,
unsigned int nodeid);
static void exec_cpg_procjoin_endian_convert (void *msg);
static void exec_cpg_joinlist_endian_convert (void *msg);
static void exec_cpg_mcast_endian_convert (void *msg);
static void exec_cpg_partial_mcast_endian_convert (void *msg);
static void exec_cpg_downlist_endian_convert_old (void *msg);
static void exec_cpg_downlist_endian_convert (void *msg);
static void message_handler_req_lib_cpg_join (void *conn, const void *message);
static void message_handler_req_lib_cpg_leave (void *conn, const void *message);
static void message_handler_req_lib_cpg_finalize (void *conn, const void *message);
static void message_handler_req_lib_cpg_mcast (void *conn, const void *message);
static void message_handler_req_lib_cpg_partial_mcast (void *conn, const void *message);
static void message_handler_req_lib_cpg_membership (void *conn,
const void *message);
static void message_handler_req_lib_cpg_local_get (void *conn,
const void *message);
static void message_handler_req_lib_cpg_iteration_initialize (
void *conn,
const void *message);
static void message_handler_req_lib_cpg_iteration_next (
void *conn,
const void *message);
static void message_handler_req_lib_cpg_iteration_finalize (
void *conn,
const void *message);
static void message_handler_req_lib_cpg_zc_alloc (
void *conn,
const void *message);
static void message_handler_req_lib_cpg_zc_free (
void *conn,
const void *message);
static void message_handler_req_lib_cpg_zc_execute (
void *conn,
const void *message);
static int cpg_node_joinleave_send (unsigned int pid, const mar_cpg_name_t *group_name, int fn, int reason);
static int cpg_exec_send_downlist(void);
static int cpg_exec_send_joinlist(void);
static void downlist_messages_delete (void);
static void downlist_master_choose_and_send (void);
static void joinlist_inform_clients (void);
static void joinlist_messages_delete (void);
static void cpg_sync_init (
const unsigned int *trans_list,
size_t trans_list_entries,
const unsigned int *member_list,
size_t member_list_entries,
const struct memb_ring_id *ring_id);
static int cpg_sync_process (void);
static void cpg_sync_activate (void);
static void cpg_sync_abort (void);
static void do_proc_join(
const mar_cpg_name_t *name,
uint32_t pid,
unsigned int nodeid,
int reason);
static void do_proc_leave(
const mar_cpg_name_t *name,
uint32_t pid,
unsigned int nodeid,
int reason);
static int notify_lib_totem_membership (
void *conn,
int member_list_entries,
const unsigned int *member_list);
static inline int zcb_all_free (
struct cpg_pd *cpd);
static char *cpg_print_group_name (
const mar_cpg_name_t *group);
/*
* Library Handler Definition
*/
static struct corosync_lib_handler cpg_lib_engine[] =
{
{ /* 0 - MESSAGE_REQ_CPG_JOIN */
.lib_handler_fn = message_handler_req_lib_cpg_join,
.flow_control = CS_LIB_FLOW_CONTROL_REQUIRED
},
{ /* 1 - MESSAGE_REQ_CPG_LEAVE */
.lib_handler_fn = message_handler_req_lib_cpg_leave,
.flow_control = CS_LIB_FLOW_CONTROL_REQUIRED
},
{ /* 2 - MESSAGE_REQ_CPG_MCAST */
.lib_handler_fn = message_handler_req_lib_cpg_mcast,
.flow_control = CS_LIB_FLOW_CONTROL_REQUIRED
},
{ /* 3 - MESSAGE_REQ_CPG_MEMBERSHIP */
.lib_handler_fn = message_handler_req_lib_cpg_membership,
.flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED
},
{ /* 4 - MESSAGE_REQ_CPG_LOCAL_GET */
.lib_handler_fn = message_handler_req_lib_cpg_local_get,
.flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED
},
{ /* 5 - MESSAGE_REQ_CPG_ITERATIONINITIALIZE */
.lib_handler_fn = message_handler_req_lib_cpg_iteration_initialize,
.flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED
},
{ /* 6 - MESSAGE_REQ_CPG_ITERATIONNEXT */
.lib_handler_fn = message_handler_req_lib_cpg_iteration_next,
.flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED
},
{ /* 7 - MESSAGE_REQ_CPG_ITERATIONFINALIZE */
.lib_handler_fn = message_handler_req_lib_cpg_iteration_finalize,
.flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED
},
{ /* 8 - MESSAGE_REQ_CPG_FINALIZE */
.lib_handler_fn = message_handler_req_lib_cpg_finalize,
.flow_control = CS_LIB_FLOW_CONTROL_REQUIRED
},
{ /* 9 */
.lib_handler_fn = message_handler_req_lib_cpg_zc_alloc,
.flow_control = CS_LIB_FLOW_CONTROL_REQUIRED
},
{ /* 10 */
.lib_handler_fn = message_handler_req_lib_cpg_zc_free,
.flow_control = CS_LIB_FLOW_CONTROL_REQUIRED
},
{ /* 11 */
.lib_handler_fn = message_handler_req_lib_cpg_zc_execute,
.flow_control = CS_LIB_FLOW_CONTROL_REQUIRED
},
{ /* 12 */
.lib_handler_fn = message_handler_req_lib_cpg_partial_mcast,
.flow_control = CS_LIB_FLOW_CONTROL_REQUIRED
},
};
static struct corosync_exec_handler cpg_exec_engine[] =
{
{ /* 0 - MESSAGE_REQ_EXEC_CPG_PROCJOIN */
.exec_handler_fn = message_handler_req_exec_cpg_procjoin,
.exec_endian_convert_fn = exec_cpg_procjoin_endian_convert
},
{ /* 1 - MESSAGE_REQ_EXEC_CPG_PROCLEAVE */
.exec_handler_fn = message_handler_req_exec_cpg_procleave,
.exec_endian_convert_fn = exec_cpg_procjoin_endian_convert
},
{ /* 2 - MESSAGE_REQ_EXEC_CPG_JOINLIST */
.exec_handler_fn = message_handler_req_exec_cpg_joinlist,
.exec_endian_convert_fn = exec_cpg_joinlist_endian_convert
},
{ /* 3 - MESSAGE_REQ_EXEC_CPG_MCAST */
.exec_handler_fn = message_handler_req_exec_cpg_mcast,
.exec_endian_convert_fn = exec_cpg_mcast_endian_convert
},
{ /* 4 - MESSAGE_REQ_EXEC_CPG_DOWNLIST_OLD */
.exec_handler_fn = message_handler_req_exec_cpg_downlist_old,
.exec_endian_convert_fn = exec_cpg_downlist_endian_convert_old
},
{ /* 5 - MESSAGE_REQ_EXEC_CPG_DOWNLIST */
.exec_handler_fn = message_handler_req_exec_cpg_downlist,
.exec_endian_convert_fn = exec_cpg_downlist_endian_convert
},
{ /* 6 - MESSAGE_REQ_EXEC_CPG_PARTIAL_MCAST */
.exec_handler_fn = message_handler_req_exec_cpg_partial_mcast,
.exec_endian_convert_fn = exec_cpg_partial_mcast_endian_convert
},
};
struct corosync_service_engine cpg_service_engine = {
.name = "corosync cluster closed process group service v1.01",
.id = CPG_SERVICE,
.priority = 1,
.private_data_size = sizeof (struct cpg_pd),
.flow_control = CS_LIB_FLOW_CONTROL_REQUIRED,
.allow_inquorate = CS_LIB_ALLOW_INQUORATE,
.lib_init_fn = cpg_lib_init_fn,
.lib_exit_fn = cpg_lib_exit_fn,
.lib_engine = cpg_lib_engine,
.lib_engine_count = sizeof (cpg_lib_engine) / sizeof (struct corosync_lib_handler),
.exec_init_fn = cpg_exec_init_fn,
.exec_dump_fn = NULL,
.exec_engine = cpg_exec_engine,
.exec_engine_count = sizeof (cpg_exec_engine) / sizeof (struct corosync_exec_handler),
.sync_init = cpg_sync_init,
.sync_process = cpg_sync_process,
.sync_activate = cpg_sync_activate,
.sync_abort = cpg_sync_abort
};
struct corosync_service_engine *cpg_get_service_engine_ver0 (void)
{
return (&cpg_service_engine);
}
struct req_exec_cpg_procjoin {
struct qb_ipc_request_header header __attribute__((aligned(8)));
mar_cpg_name_t group_name __attribute__((aligned(8)));
mar_uint32_t pid __attribute__((aligned(8)));
mar_uint32_t reason __attribute__((aligned(8)));
};
struct req_exec_cpg_mcast {
struct qb_ipc_request_header header __attribute__((aligned(8)));
mar_cpg_name_t group_name __attribute__((aligned(8)));
mar_uint32_t msglen __attribute__((aligned(8)));
mar_uint32_t pid __attribute__((aligned(8)));
mar_message_source_t source __attribute__((aligned(8)));
mar_uint8_t message[] __attribute__((aligned(8)));
};
struct req_exec_cpg_partial_mcast {
struct qb_ipc_request_header header __attribute__((aligned(8)));
mar_cpg_name_t group_name __attribute__((aligned(8)));
mar_uint32_t msglen __attribute__((aligned(8)));
mar_uint32_t fraglen __attribute__((aligned(8)));
mar_uint32_t pid __attribute__((aligned(8)));
mar_uint32_t type __attribute__((aligned(8)));
mar_message_source_t source __attribute__((aligned(8)));
mar_uint8_t message[] __attribute__((aligned(8)));
};
struct req_exec_cpg_downlist_old {
struct qb_ipc_request_header header __attribute__((aligned(8)));
mar_uint32_t left_nodes __attribute__((aligned(8)));
mar_uint32_t nodeids[PROCESSOR_COUNT_MAX] __attribute__((aligned(8)));
};
struct req_exec_cpg_downlist {
struct qb_ipc_request_header header __attribute__((aligned(8)));
/* merge decisions */
mar_uint32_t old_members __attribute__((aligned(8)));
/* downlist below */
mar_uint32_t left_nodes __attribute__((aligned(8)));
mar_uint32_t nodeids[PROCESSOR_COUNT_MAX] __attribute__((aligned(8)));
};
struct downlist_msg {
mar_uint32_t sender_nodeid;
mar_uint32_t old_members __attribute__((aligned(8)));
mar_uint32_t left_nodes __attribute__((aligned(8)));
mar_uint32_t nodeids[PROCESSOR_COUNT_MAX] __attribute__((aligned(8)));
struct list_head list;
};
struct joinlist_msg {
mar_uint32_t sender_nodeid;
uint32_t pid;
mar_cpg_name_t group_name;
struct list_head list;
};
static struct req_exec_cpg_downlist g_req_exec_cpg_downlist;
/*
* Function print group name. It's not reentrant
*/
static char *cpg_print_group_name(const mar_cpg_name_t *group)
{
static char res[CPG_MAX_NAME_LENGTH * 4 + 1];
int dest_pos = 0;
char c;
int i;
for (i = 0; i < group->length; i++) {
c = group->value[i];
if (c >= ' ' && c < 0x7f && c != '\\') {
res[dest_pos++] = c;
} else {
if (c == '\\') {
res[dest_pos++] = '\\';
res[dest_pos++] = '\\';
} else {
snprintf(res + dest_pos, sizeof(res) - dest_pos, "\\x%02X", c);
dest_pos += 4;
}
}
}
res[dest_pos] = 0;
return (res);
}
static void cpg_sync_init (
const unsigned int *trans_list,
size_t trans_list_entries,
const unsigned int *member_list,
size_t member_list_entries,
const struct memb_ring_id *ring_id)
{
int entries;
int i, j;
int found;
my_sync_state = CPGSYNC_DOWNLIST;
memcpy (my_member_list, member_list, member_list_entries *
sizeof (unsigned int));
my_member_list_entries = member_list_entries;
last_sync_ring_id.nodeid = ring_id->rep.nodeid;
last_sync_ring_id.seq = ring_id->seq;
downlist_state = CPG_DOWNLIST_WAITING_FOR_MESSAGES;
entries = 0;
/*
* Determine list of nodeids for downlist message
*/
for (i = 0; i < my_old_member_list_entries; i++) {
found = 0;
for (j = 0; j < trans_list_entries; j++) {
if (my_old_member_list[i] == trans_list[j]) {
found = 1;
break;
}
}
if (found == 0) {
g_req_exec_cpg_downlist.nodeids[entries++] =
my_old_member_list[i];
}
}
g_req_exec_cpg_downlist.left_nodes = entries;
}
static int cpg_sync_process (void)
{
int res = -1;
if (my_sync_state == CPGSYNC_DOWNLIST) {
res = cpg_exec_send_downlist();
if (res == -1) {
return (-1);
}
my_sync_state = CPGSYNC_JOINLIST;
}
if (my_sync_state == CPGSYNC_JOINLIST) {
res = cpg_exec_send_joinlist();
}
return (res);
}
static void cpg_sync_activate (void)
{
memcpy (my_old_member_list, my_member_list,
my_member_list_entries * sizeof (unsigned int));
my_old_member_list_entries = my_member_list_entries;
if (downlist_state == CPG_DOWNLIST_WAITING_FOR_MESSAGES) {
downlist_master_choose_and_send ();
}
joinlist_inform_clients ();
downlist_messages_delete ();
downlist_state = CPG_DOWNLIST_NONE;
joinlist_messages_delete ();
notify_lib_totem_membership (NULL, my_member_list_entries, my_member_list);
}
static void cpg_sync_abort (void)
{
downlist_state = CPG_DOWNLIST_NONE;
downlist_messages_delete ();
joinlist_messages_delete ();
}
static int notify_lib_totem_membership (
void *conn,
int member_list_entries,
const unsigned int *member_list)
{
struct list_head *iter;
char *buf;
int size;
struct res_lib_cpg_totem_confchg_callback *res;
size = sizeof(struct res_lib_cpg_totem_confchg_callback) +
sizeof(mar_uint32_t) * (member_list_entries);
buf = alloca(size);
if (!buf)
return CS_ERR_LIBRARY;
res = (struct res_lib_cpg_totem_confchg_callback *)buf;
res->member_list_entries = member_list_entries;
res->header.size = size;
res->header.id = MESSAGE_RES_CPG_TOTEM_CONFCHG_CALLBACK;
res->header.error = CS_OK;
memcpy (&res->ring_id, &last_sync_ring_id, sizeof (mar_cpg_ring_id_t));
memcpy (res->member_list, member_list, res->member_list_entries * sizeof (mar_uint32_t));
if (conn == NULL) {
for (iter = cpg_pd_list_head.next; iter != &cpg_pd_list_head; iter = iter->next) {
struct cpg_pd *cpg_pd = list_entry (iter, struct cpg_pd, list);
api->ipc_dispatch_send (cpg_pd->conn, buf, size);
}
} else {
api->ipc_dispatch_send (conn, buf, size);
}
return CS_OK;
}
static int notify_lib_joinlist(
const mar_cpg_name_t *group_name,
void *conn,
int joined_list_entries,
mar_cpg_address_t *joined_list,
int left_list_entries,
mar_cpg_address_t *left_list,
int id)
{
int size;
char *buf;
struct list_head *iter;
int count;
struct res_lib_cpg_confchg_callback *res;
mar_cpg_address_t *retgi;
count = 0;
for (iter = process_info_list_head.next; iter != &process_info_list_head; iter = iter->next) {
struct process_info *pi = list_entry (iter, struct process_info, list);
if (mar_name_compare (&pi->group, group_name) == 0) {
int i;
int founded = 0;
for (i = 0; i < left_list_entries; i++) {
if (left_list[i].nodeid == pi->nodeid && left_list[i].pid == pi->pid) {
founded++;
}
}
if (!founded)
count++;
}
}
size = sizeof(struct res_lib_cpg_confchg_callback) +
sizeof(mar_cpg_address_t) * (count + left_list_entries + joined_list_entries);
buf = alloca(size);
if (!buf)
return CS_ERR_LIBRARY;
res = (struct res_lib_cpg_confchg_callback *)buf;
res->joined_list_entries = joined_list_entries;
res->left_list_entries = left_list_entries;
res->member_list_entries = count;
retgi = res->member_list;
res->header.size = size;
res->header.id = id;
res->header.error = CS_OK;
memcpy(&res->group_name, group_name, sizeof(mar_cpg_name_t));
for (iter = process_info_list_head.next; iter != &process_info_list_head; iter = iter->next) {
struct process_info *pi=list_entry (iter, struct process_info, list);
if (mar_name_compare (&pi->group, group_name) == 0) {
int i;
int founded = 0;
for (i = 0;i < left_list_entries; i++) {
if (left_list[i].nodeid == pi->nodeid && left_list[i].pid == pi->pid) {
founded++;
}
}
if (!founded) {
retgi->nodeid = pi->nodeid;
retgi->pid = pi->pid;
retgi++;
}
}
}
if (left_list_entries) {
memcpy (retgi, left_list, left_list_entries * sizeof(mar_cpg_address_t));
retgi += left_list_entries;
}
if (joined_list_entries) {
memcpy (retgi, joined_list, joined_list_entries * sizeof(mar_cpg_address_t));
retgi += joined_list_entries;
}
if (conn) {
api->ipc_dispatch_send (conn, buf, size);
} else {
for (iter = cpg_pd_list_head.next; iter != &cpg_pd_list_head; iter = iter->next) {
struct cpg_pd *cpd = list_entry (iter, struct cpg_pd, list);
if (mar_name_compare (&cpd->group_name, group_name) == 0) {
assert (joined_list_entries <= 1);
if (joined_list_entries) {
if (joined_list[0].pid == cpd->pid &&
joined_list[0].nodeid == api->totem_nodeid_get()) {
cpd->cpd_state = CPD_STATE_JOIN_COMPLETED;
}
}
if (cpd->cpd_state == CPD_STATE_JOIN_COMPLETED ||
cpd->cpd_state == CPD_STATE_LEAVE_STARTED) {
api->ipc_dispatch_send (cpd->conn, buf, size);
cpd->transition_counter++;
}
if (left_list_entries) {
if (left_list[0].pid == cpd->pid &&
left_list[0].nodeid == api->totem_nodeid_get() &&
left_list[0].reason == CONFCHG_CPG_REASON_LEAVE) {
cpd->pid = 0;
memset (&cpd->group_name, 0, sizeof(cpd->group_name));
cpd->cpd_state = CPD_STATE_UNJOINED;
}
}
}
}
}
/*
* Traverse thru cpds and send totem membership for cpd, where it is not send yet
*/
for (iter = cpg_pd_list_head.next; iter != &cpg_pd_list_head; iter = iter->next) {
struct cpg_pd *cpd = list_entry (iter, struct cpg_pd, list);
if ((cpd->flags & CPG_MODEL_V1_DELIVER_INITIAL_TOTEM_CONF) && (cpd->initial_totem_conf_sent == 0)) {
cpd->initial_totem_conf_sent = 1;
notify_lib_totem_membership (cpd->conn, my_old_member_list_entries, my_old_member_list);
}
}
return CS_OK;
}
static void downlist_log(const char *msg, struct downlist_msg* dl)
{
log_printf (LOG_DEBUG,
"%s: sender %s; members(old:%d left:%d)",
msg,
api->totem_ifaces_print(dl->sender_nodeid),
dl->old_members,
dl->left_nodes);
}
static struct downlist_msg* downlist_master_choose (void)
{
struct downlist_msg *cmp;
struct downlist_msg *best = NULL;
struct list_head *iter;
uint32_t cmp_members;
uint32_t best_members;
uint32_t i;
int ignore_msg;
for (iter = downlist_messages_head.next;
iter != &downlist_messages_head;
iter = iter->next) {
cmp = list_entry(iter, struct downlist_msg, list);
downlist_log("comparing", cmp);
ignore_msg = 0;
for (i = 0; i < cmp->left_nodes; i++) {
if (cmp->nodeids[i] == api->totem_nodeid_get()) {
log_printf (LOG_DEBUG, "Ignoring this entry because I'm in the left list\n");
ignore_msg = 1;
break;
}
}
if (ignore_msg) {
continue ;
}
if (best == NULL) {
best = cmp;
continue;
}
best_members = best->old_members - best->left_nodes;
cmp_members = cmp->old_members - cmp->left_nodes;
if (cmp_members > best_members) {
best = cmp;
} else if (cmp_members == best_members) {
if (cmp->old_members > best->old_members) {
best = cmp;
} else if (cmp->old_members == best->old_members) {
if (cmp->sender_nodeid < best->sender_nodeid) {
best = cmp;
}
}
}
}
assert (best != NULL);
return best;
}
static void downlist_master_choose_and_send (void)
{
struct downlist_msg *stored_msg;
struct list_head *iter;
struct process_info *left_pi;
qb_map_t *group_map;
struct cpg_name cpg_group;
mar_cpg_name_t group;
struct confchg_data{
struct cpg_name cpg_group;
mar_cpg_address_t left_list[CPG_MEMBERS_MAX];
int left_list_entries;
struct list_head list;
} *pcd;
qb_map_iter_t *miter;
int i, size;
downlist_state = CPG_DOWNLIST_APPLYING;
stored_msg = downlist_master_choose ();
if (!stored_msg) {
log_printf (LOGSYS_LEVEL_DEBUG, "NO chosen downlist");
return;
}
downlist_log("chosen downlist", stored_msg);
group_map = qb_skiplist_create();
/*
* only the cpg groups included in left nodes should receive
* confchg event, so we will collect these cpg groups and
* relative left_lists here.
*/
for (iter = process_info_list_head.next; iter != &process_info_list_head; ) {
struct process_info *pi = list_entry(iter, struct process_info, list);
iter = iter->next;
left_pi = NULL;
for (i = 0; i < stored_msg->left_nodes; i++) {
if (pi->nodeid == stored_msg->nodeids[i]) {
left_pi = pi;
break;
}
}
if (left_pi) {
marshall_from_mar_cpg_name_t(&cpg_group, &left_pi->group);
cpg_group.value[cpg_group.length] = 0;
pcd = (struct confchg_data *)qb_map_get(group_map, cpg_group.value);
if (pcd == NULL) {
pcd = (struct confchg_data *)calloc(1, sizeof(struct confchg_data));
memcpy(&pcd->cpg_group, &cpg_group, sizeof(struct cpg_name));
qb_map_put(group_map, pcd->cpg_group.value, pcd);
}
size = pcd->left_list_entries;
pcd->left_list[size].nodeid = left_pi->nodeid;
pcd->left_list[size].pid = left_pi->pid;
pcd->left_list[size].reason = CONFCHG_CPG_REASON_NODEDOWN;
pcd->left_list_entries++;
list_del (&left_pi->list);
free (left_pi);
}
}
/* send only one confchg event per cpg group */
miter = qb_map_iter_create(group_map);
while (qb_map_iter_next(miter, (void **)&pcd)) {
marshall_to_mar_cpg_name_t(&group, &pcd->cpg_group);
log_printf (LOG_DEBUG, "left_list_entries:%d", pcd->left_list_entries);
for (i=0; i<pcd->left_list_entries; i++) {
log_printf (LOG_DEBUG, "left_list[%d] group:%s, ip:%s, pid:%d",
i, cpg_print_group_name(&group),
(char*)api->totem_ifaces_print(pcd->left_list[i].nodeid),
pcd->left_list[i].pid);
}
/* send confchg event */
notify_lib_joinlist(&group, NULL,
0, NULL,
pcd->left_list_entries,
pcd->left_list,
MESSAGE_RES_CPG_CONFCHG_CALLBACK);
free(pcd);
}
qb_map_iter_free(miter);
qb_map_destroy(group_map);
}
/*
* Remove processes that might have left the group while we were suspended.
*/
static void joinlist_remove_zombie_pi_entries (void)
{
struct list_head *pi_iter;
struct list_head *jl_iter;
struct process_info *pi;
struct joinlist_msg *stored_msg;
int found;
for (pi_iter = process_info_list_head.next; pi_iter != &process_info_list_head; ) {
pi = list_entry (pi_iter, struct process_info, list);
pi_iter = pi_iter->next;
/*
* Ignore local node
*/
if (pi->nodeid == api->totem_nodeid_get()) {
continue ;
}
/*
* Try to find message in joinlist messages
*/
found = 0;
for (jl_iter = joinlist_messages_head.next;
jl_iter != &joinlist_messages_head;
jl_iter = jl_iter->next) {
stored_msg = list_entry(jl_iter, struct joinlist_msg, list);
if (stored_msg->sender_nodeid == api->totem_nodeid_get()) {
continue ;
}
if (pi->nodeid == stored_msg->sender_nodeid &&
pi->pid == stored_msg->pid &&
mar_name_compare (&pi->group, &stored_msg->group_name) == 0) {
found = 1;
break ;
}
}
if (!found) {
do_proc_leave(&pi->group, pi->pid, pi->nodeid, CONFCHG_CPG_REASON_PROCDOWN);
}
}
}
static void joinlist_inform_clients (void)
{
struct joinlist_msg *stored_msg;
struct list_head *iter;
unsigned int i;
i = 0;
for (iter = joinlist_messages_head.next;
iter != &joinlist_messages_head;
iter = iter->next) {
stored_msg = list_entry(iter, struct joinlist_msg, list);
log_printf (LOG_DEBUG, "joinlist_messages[%u] group:%s, ip:%s, pid:%d",
i++, cpg_print_group_name(&stored_msg->group_name),
(char*)api->totem_ifaces_print(stored_msg->sender_nodeid),
stored_msg->pid);
/* Ignore our own messages */
if (stored_msg->sender_nodeid == api->totem_nodeid_get()) {
continue ;
}
do_proc_join (&stored_msg->group_name, stored_msg->pid, stored_msg->sender_nodeid,
CONFCHG_CPG_REASON_NODEUP);
}
joinlist_remove_zombie_pi_entries ();
}
static void downlist_messages_delete (void)
{
struct downlist_msg *stored_msg;
struct list_head *iter, *iter_next;
for (iter = downlist_messages_head.next;
iter != &downlist_messages_head;
iter = iter_next) {
iter_next = iter->next;
stored_msg = list_entry(iter, struct downlist_msg, list);
list_del (&stored_msg->list);
free (stored_msg);
}
}
static void joinlist_messages_delete (void)
{
struct joinlist_msg *stored_msg;
struct list_head *iter, *iter_next;
for (iter = joinlist_messages_head.next;
iter != &joinlist_messages_head;
iter = iter_next) {
iter_next = iter->next;
stored_msg = list_entry(iter, struct joinlist_msg, list);
list_del (&stored_msg->list);
free (stored_msg);
}
list_init (&joinlist_messages_head);
}
static char *cpg_exec_init_fn (struct corosync_api_v1 *corosync_api)
{
list_init (&downlist_messages_head);
list_init (&joinlist_messages_head);
api = corosync_api;
return (NULL);
}
static void cpg_iteration_instance_finalize (struct cpg_iteration_instance *cpg_iteration_instance)
{
struct list_head *iter, *iter_next;
struct process_info *pi;
for (iter = cpg_iteration_instance->items_list_head.next;
iter != &cpg_iteration_instance->items_list_head;
iter = iter_next) {
iter_next = iter->next;
pi = list_entry (iter, struct process_info, list);
list_del (&pi->list);
free (pi);
}
list_del (&cpg_iteration_instance->list);
hdb_handle_destroy (&cpg_iteration_handle_t_db, cpg_iteration_instance->handle);
}
static void cpg_pd_finalize (struct cpg_pd *cpd)
{
struct list_head *iter, *iter_next;
struct cpg_iteration_instance *cpii;
zcb_all_free(cpd);
for (iter = cpd->iteration_instance_list_head.next;
iter != &cpd->iteration_instance_list_head;
iter = iter_next) {
iter_next = iter->next;
cpii = list_entry (iter, struct cpg_iteration_instance, list);
cpg_iteration_instance_finalize (cpii);
}
list_del (&cpd->list);
}
static int cpg_lib_exit_fn (void *conn)
{
struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
log_printf(LOGSYS_LEVEL_DEBUG, "exit_fn for conn=%p", conn);
if (cpd->group_name.length > 0 && cpd->cpd_state != CPD_STATE_LEAVE_STARTED) {
cpg_node_joinleave_send (cpd->pid, &cpd->group_name,
MESSAGE_REQ_EXEC_CPG_PROCLEAVE, CONFCHG_CPG_REASON_PROCDOWN);
}
cpg_pd_finalize (cpd);
api->ipc_refcnt_dec (conn);
return (0);
}
static int cpg_node_joinleave_send (unsigned int pid, const mar_cpg_name_t *group_name, int fn, int reason)
{
struct req_exec_cpg_procjoin req_exec_cpg_procjoin;
struct iovec req_exec_cpg_iovec;
int result;
memcpy(&req_exec_cpg_procjoin.group_name, group_name, sizeof(mar_cpg_name_t));
req_exec_cpg_procjoin.pid = pid;
req_exec_cpg_procjoin.reason = reason;
req_exec_cpg_procjoin.header.size = sizeof(req_exec_cpg_procjoin);
req_exec_cpg_procjoin.header.id = SERVICE_ID_MAKE(CPG_SERVICE, fn);
req_exec_cpg_iovec.iov_base = (char *)&req_exec_cpg_procjoin;
req_exec_cpg_iovec.iov_len = sizeof(req_exec_cpg_procjoin);
result = api->totem_mcast (&req_exec_cpg_iovec, 1, TOTEM_AGREED);
return (result);
}
/* Can byteswap join & leave messages */
static void exec_cpg_procjoin_endian_convert (void *msg)
{
struct req_exec_cpg_procjoin *req_exec_cpg_procjoin = msg;
req_exec_cpg_procjoin->pid = swab32(req_exec_cpg_procjoin->pid);
swab_mar_cpg_name_t (&req_exec_cpg_procjoin->group_name);
req_exec_cpg_procjoin->reason = swab32(req_exec_cpg_procjoin->reason);
}
static void exec_cpg_joinlist_endian_convert (void *msg_v)
{
char *msg = msg_v;
struct qb_ipc_response_header *res = (struct qb_ipc_response_header *)msg;
struct join_list_entry *jle = (struct join_list_entry *)(msg + sizeof(struct qb_ipc_response_header));
swab_mar_int32_t (&res->size);
while ((const char*)jle < msg + res->size) {
jle->pid = swab32(jle->pid);
swab_mar_cpg_name_t (&jle->group_name);
jle++;
}
}
static void exec_cpg_downlist_endian_convert_old (void *msg)
{
}
static void exec_cpg_downlist_endian_convert (void *msg)
{
struct req_exec_cpg_downlist *req_exec_cpg_downlist = msg;
unsigned int i;
req_exec_cpg_downlist->left_nodes = swab32(req_exec_cpg_downlist->left_nodes);
req_exec_cpg_downlist->old_members = swab32(req_exec_cpg_downlist->old_members);
for (i = 0; i < req_exec_cpg_downlist->left_nodes; i++) {
req_exec_cpg_downlist->nodeids[i] = swab32(req_exec_cpg_downlist->nodeids[i]);
}
}
static void exec_cpg_mcast_endian_convert (void *msg)
{
struct req_exec_cpg_mcast *req_exec_cpg_mcast = msg;
swab_coroipc_request_header_t (&req_exec_cpg_mcast->header);
swab_mar_cpg_name_t (&req_exec_cpg_mcast->group_name);
req_exec_cpg_mcast->pid = swab32(req_exec_cpg_mcast->pid);
req_exec_cpg_mcast->msglen = swab32(req_exec_cpg_mcast->msglen);
swab_mar_message_source_t (&req_exec_cpg_mcast->source);
}
static void exec_cpg_partial_mcast_endian_convert (void *msg)
{
struct req_exec_cpg_partial_mcast *req_exec_cpg_mcast = msg;
swab_coroipc_request_header_t (&req_exec_cpg_mcast->header);
swab_mar_cpg_name_t (&req_exec_cpg_mcast->group_name);
req_exec_cpg_mcast->pid = swab32(req_exec_cpg_mcast->pid);
req_exec_cpg_mcast->msglen = swab32(req_exec_cpg_mcast->msglen);
req_exec_cpg_mcast->fraglen = swab32(req_exec_cpg_mcast->fraglen);
req_exec_cpg_mcast->type = swab32(req_exec_cpg_mcast->type);
swab_mar_message_source_t (&req_exec_cpg_mcast->source);
}
static struct process_info *process_info_find(const mar_cpg_name_t *group_name, uint32_t pid, unsigned int nodeid) {
struct list_head *iter;
for (iter = process_info_list_head.next; iter != &process_info_list_head; ) {
struct process_info *pi = list_entry (iter, struct process_info, list);
iter = iter->next;
if (pi->pid == pid && pi->nodeid == nodeid &&
mar_name_compare (&pi->group, group_name) == 0) {
return pi;
}
}
return NULL;
}
static void do_proc_join(
const mar_cpg_name_t *name,
uint32_t pid,
unsigned int nodeid,
int reason)
{
struct process_info *pi;
struct process_info *pi_entry;
mar_cpg_address_t notify_info;
struct list_head *list;
struct list_head *list_to_add = NULL;
if (process_info_find (name, pid, nodeid) != NULL) {
return ;
}
pi = malloc (sizeof (struct process_info));
if (!pi) {
log_printf(LOGSYS_LEVEL_WARNING, "Unable to allocate process_info struct");
return;
}
pi->nodeid = nodeid;
pi->pid = pid;
memcpy(&pi->group, name, sizeof(*name));
list_init(&pi->list);
/*
* Insert new process in sorted order so synchronization works properly
*/
list_to_add = &process_info_list_head;
for (list = process_info_list_head.next; list != &process_info_list_head; list = list->next) {
pi_entry = list_entry(list, struct process_info, list);
if (pi_entry->nodeid > pi->nodeid ||
(pi_entry->nodeid == pi->nodeid && pi_entry->pid > pi->pid)) {
break;
}
list_to_add = list;
}
list_add (&pi->list, list_to_add);
notify_info.pid = pi->pid;
notify_info.nodeid = nodeid;
notify_info.reason = reason;
notify_lib_joinlist(&pi->group, NULL,
1, &notify_info,
0, NULL,
MESSAGE_RES_CPG_CONFCHG_CALLBACK);
}
static void do_proc_leave(
const mar_cpg_name_t *name,
uint32_t pid,
unsigned int nodeid,
int reason)
{
struct process_info *pi;
struct list_head *iter;
mar_cpg_address_t notify_info;
notify_info.pid = pid;
notify_info.nodeid = nodeid;
notify_info.reason = reason;
notify_lib_joinlist(name, NULL,
0, NULL,
1, &notify_info,
MESSAGE_RES_CPG_CONFCHG_CALLBACK);
for (iter = process_info_list_head.next; iter != &process_info_list_head; ) {
pi = list_entry(iter, struct process_info, list);
iter = iter->next;
if (pi->pid == pid && pi->nodeid == nodeid &&
mar_name_compare (&pi->group, name)==0) {
list_del (&pi->list);
free (pi);
}
}
}
static void message_handler_req_exec_cpg_downlist_old (
const void *message,
unsigned int nodeid)
{
log_printf (LOGSYS_LEVEL_WARNING, "downlist OLD from node 0x%x",
nodeid);
}
static void message_handler_req_exec_cpg_downlist(
const void *message,
unsigned int nodeid)
{
const struct req_exec_cpg_downlist *req_exec_cpg_downlist = message;
int i;
struct list_head *iter;
struct downlist_msg *stored_msg;
int found;
if (downlist_state != CPG_DOWNLIST_WAITING_FOR_MESSAGES) {
log_printf (LOGSYS_LEVEL_WARNING, "downlist left_list: %d received in state %d",
req_exec_cpg_downlist->left_nodes, downlist_state);
return;
}
stored_msg = malloc (sizeof (struct downlist_msg));
stored_msg->sender_nodeid = nodeid;
stored_msg->old_members = req_exec_cpg_downlist->old_members;
stored_msg->left_nodes = req_exec_cpg_downlist->left_nodes;
memcpy (stored_msg->nodeids, req_exec_cpg_downlist->nodeids,
req_exec_cpg_downlist->left_nodes * sizeof (mar_uint32_t));
list_init (&stored_msg->list);
list_add (&stored_msg->list, &downlist_messages_head);
for (i = 0; i < my_member_list_entries; i++) {
found = 0;
for (iter = downlist_messages_head.next;
iter != &downlist_messages_head;
iter = iter->next) {
stored_msg = list_entry(iter, struct downlist_msg, list);
if (my_member_list[i] == stored_msg->sender_nodeid) {
found = 1;
}
}
if (!found) {
return;
}
}
downlist_master_choose_and_send ();
}
static void message_handler_req_exec_cpg_procjoin (
const void *message,
unsigned int nodeid)
{
const struct req_exec_cpg_procjoin *req_exec_cpg_procjoin = message;
log_printf(LOGSYS_LEVEL_DEBUG, "got procjoin message from cluster node 0x%x (%s) for pid %u",
nodeid,
api->totem_ifaces_print(nodeid),
(unsigned int)req_exec_cpg_procjoin->pid);
do_proc_join (&req_exec_cpg_procjoin->group_name,
req_exec_cpg_procjoin->pid, nodeid,
CONFCHG_CPG_REASON_JOIN);
}
static void message_handler_req_exec_cpg_procleave (
const void *message,
unsigned int nodeid)
{
const struct req_exec_cpg_procjoin *req_exec_cpg_procjoin = message;
log_printf(LOGSYS_LEVEL_DEBUG, "got procleave message from cluster node 0x%x (%s) for pid %u",
nodeid,
api->totem_ifaces_print(nodeid),
(unsigned int)req_exec_cpg_procjoin->pid);
do_proc_leave (&req_exec_cpg_procjoin->group_name,
req_exec_cpg_procjoin->pid, nodeid,
req_exec_cpg_procjoin->reason);
}
/* Got a proclist from another node */
static void message_handler_req_exec_cpg_joinlist (
const void *message_v,
unsigned int nodeid)
{
const char *message = message_v;
const struct qb_ipc_response_header *res = (const struct qb_ipc_response_header *)message;
const struct join_list_entry *jle = (const struct join_list_entry *)(message + sizeof(struct qb_ipc_response_header));
struct joinlist_msg *stored_msg;
log_printf(LOGSYS_LEVEL_DEBUG, "got joinlist message from node 0x%x",
nodeid);
while ((const char*)jle < message + res->size) {
stored_msg = malloc (sizeof (struct joinlist_msg));
memset(stored_msg, 0, sizeof (struct joinlist_msg));
stored_msg->sender_nodeid = nodeid;
stored_msg->pid = jle->pid;
memcpy(&stored_msg->group_name, &jle->group_name, sizeof(mar_cpg_name_t));
list_init (&stored_msg->list);
list_add (&stored_msg->list, &joinlist_messages_head);
jle++;
}
}
static void message_handler_req_exec_cpg_mcast (
const void *message,
unsigned int nodeid)
{
const struct req_exec_cpg_mcast *req_exec_cpg_mcast = message;
struct res_lib_cpg_deliver_callback res_lib_cpg_mcast;
int msglen = req_exec_cpg_mcast->msglen;
struct list_head *iter, *pi_iter;
struct cpg_pd *cpd;
struct iovec iovec[2];
int known_node = 0;
res_lib_cpg_mcast.header.id = MESSAGE_RES_CPG_DELIVER_CALLBACK;
res_lib_cpg_mcast.header.size = sizeof(res_lib_cpg_mcast) + msglen;
res_lib_cpg_mcast.msglen = msglen;
res_lib_cpg_mcast.pid = req_exec_cpg_mcast->pid;
res_lib_cpg_mcast.nodeid = nodeid;
memcpy(&res_lib_cpg_mcast.group_name, &req_exec_cpg_mcast->group_name,
sizeof(mar_cpg_name_t));
iovec[0].iov_base = (void *)&res_lib_cpg_mcast;
iovec[0].iov_len = sizeof (res_lib_cpg_mcast);
iovec[1].iov_base = (char*)message+sizeof(*req_exec_cpg_mcast);
iovec[1].iov_len = msglen;
for (iter = cpg_pd_list_head.next; iter != &cpg_pd_list_head; ) {
cpd = list_entry(iter, struct cpg_pd, list);
iter = iter->next;
if ((cpd->cpd_state == CPD_STATE_LEAVE_STARTED || cpd->cpd_state == CPD_STATE_JOIN_COMPLETED)
&& (mar_name_compare (&cpd->group_name, &req_exec_cpg_mcast->group_name) == 0)) {
if (!known_node) {
/* Try to find, if we know the node */
for (pi_iter = process_info_list_head.next;
pi_iter != &process_info_list_head; pi_iter = pi_iter->next) {
struct process_info *pi = list_entry (pi_iter, struct process_info, list);
if (pi->nodeid == nodeid &&
mar_name_compare (&pi->group, &req_exec_cpg_mcast->group_name) == 0) {
known_node = 1;
break;
}
}
}
if (!known_node) {
log_printf(LOGSYS_LEVEL_WARNING, "Unknown node -> we will not deliver message");
return ;
}
api->ipc_dispatch_iov_send (cpd->conn, iovec, 2);
}
}
}
static void message_handler_req_exec_cpg_partial_mcast (
const void *message,
unsigned int nodeid)
{
const struct req_exec_cpg_partial_mcast *req_exec_cpg_mcast = message;
struct res_lib_cpg_partial_deliver_callback res_lib_cpg_mcast;
int msglen = req_exec_cpg_mcast->fraglen;
struct list_head *iter, *pi_iter;
struct cpg_pd *cpd;
struct iovec iovec[2];
int known_node = 0;
log_printf(LOGSYS_LEVEL_DEBUG, "Got fragmented message from node %d, size = %d bytes\n", nodeid, msglen);
res_lib_cpg_mcast.header.id = MESSAGE_RES_CPG_PARTIAL_DELIVER_CALLBACK;
res_lib_cpg_mcast.header.size = sizeof(res_lib_cpg_mcast) + msglen;
res_lib_cpg_mcast.fraglen = msglen;
res_lib_cpg_mcast.msglen = req_exec_cpg_mcast->msglen;
res_lib_cpg_mcast.pid = req_exec_cpg_mcast->pid;
res_lib_cpg_mcast.type = req_exec_cpg_mcast->type;
res_lib_cpg_mcast.nodeid = nodeid;
memcpy(&res_lib_cpg_mcast.group_name, &req_exec_cpg_mcast->group_name,
sizeof(mar_cpg_name_t));
iovec[0].iov_base = (void *)&res_lib_cpg_mcast;
iovec[0].iov_len = sizeof (res_lib_cpg_mcast);
iovec[1].iov_base = (char*)message+sizeof(*req_exec_cpg_mcast);
iovec[1].iov_len = msglen;
for (iter = cpg_pd_list_head.next; iter != &cpg_pd_list_head; ) {
cpd = list_entry(iter, struct cpg_pd, list);
iter = iter->next;
if ((cpd->cpd_state == CPD_STATE_LEAVE_STARTED || cpd->cpd_state == CPD_STATE_JOIN_COMPLETED)
&& (mar_name_compare (&cpd->group_name, &req_exec_cpg_mcast->group_name) == 0)) {
if (!known_node) {
/* Try to find, if we know the node */
for (pi_iter = process_info_list_head.next;
pi_iter != &process_info_list_head; pi_iter = pi_iter->next) {
struct process_info *pi = list_entry (pi_iter, struct process_info, list);
if (pi->nodeid == nodeid &&
mar_name_compare (&pi->group, &req_exec_cpg_mcast->group_name) == 0) {
known_node = 1;
break;
}
}
}
if (!known_node) {
log_printf(LOGSYS_LEVEL_WARNING, "Unknown node -> we will not deliver message");
return ;
}
api->ipc_dispatch_iov_send (cpd->conn, iovec, 2);
}
}
}
static int cpg_exec_send_downlist(void)
{
struct iovec iov;
g_req_exec_cpg_downlist.header.id = SERVICE_ID_MAKE(CPG_SERVICE, MESSAGE_REQ_EXEC_CPG_DOWNLIST);
g_req_exec_cpg_downlist.header.size = sizeof(struct req_exec_cpg_downlist);
g_req_exec_cpg_downlist.old_members = my_old_member_list_entries;
iov.iov_base = (void *)&g_req_exec_cpg_downlist;
iov.iov_len = g_req_exec_cpg_downlist.header.size;
return (api->totem_mcast (&iov, 1, TOTEM_AGREED));
}
static int cpg_exec_send_joinlist(void)
{
int count = 0;
struct list_head *iter;
struct qb_ipc_response_header *res;
char *buf;
struct join_list_entry *jle;
struct iovec req_exec_cpg_iovec;
for (iter = process_info_list_head.next; iter != &process_info_list_head; iter = iter->next) {
struct process_info *pi = list_entry (iter, struct process_info, list);
if (pi->nodeid == api->totem_nodeid_get ()) {
count++;
}
}
/* Nothing to send */
if (!count)
return 0;
buf = alloca(sizeof(struct qb_ipc_response_header) + sizeof(struct join_list_entry) * count);
if (!buf) {
log_printf(LOGSYS_LEVEL_WARNING, "Unable to allocate joinlist buffer");
return -1;
}
jle = (struct join_list_entry *)(buf + sizeof(struct qb_ipc_response_header));
res = (struct qb_ipc_response_header *)buf;
for (iter = process_info_list_head.next; iter != &process_info_list_head; iter = iter->next) {
struct process_info *pi = list_entry (iter, struct process_info, list);
if (pi->nodeid == api->totem_nodeid_get ()) {
memcpy (&jle->group_name, &pi->group, sizeof (mar_cpg_name_t));
jle->pid = pi->pid;
jle++;
}
}
res->id = SERVICE_ID_MAKE(CPG_SERVICE, MESSAGE_REQ_EXEC_CPG_JOINLIST);
res->size = sizeof(struct qb_ipc_response_header)+sizeof(struct join_list_entry) * count;
req_exec_cpg_iovec.iov_base = buf;
req_exec_cpg_iovec.iov_len = res->size;
return (api->totem_mcast (&req_exec_cpg_iovec, 1, TOTEM_AGREED));
}
static int cpg_lib_init_fn (void *conn)
{
struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
memset (cpd, 0, sizeof(struct cpg_pd));
cpd->conn = conn;
list_add (&cpd->list, &cpg_pd_list_head);
list_init (&cpd->iteration_instance_list_head);
list_init (&cpd->zcb_mapped_list_head);
api->ipc_refcnt_inc (conn);
log_printf(LOGSYS_LEVEL_DEBUG, "lib_init_fn: conn=%p, cpd=%p", conn, cpd);
return (0);
}
/* Join message from the library */
static void message_handler_req_lib_cpg_join (void *conn, const void *message)
{
const struct req_lib_cpg_join *req_lib_cpg_join = message;
struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
struct res_lib_cpg_join res_lib_cpg_join;
cs_error_t error = CS_OK;
struct list_head *iter;
/* Test, if we don't have same pid and group name joined */
for (iter = cpg_pd_list_head.next; iter != &cpg_pd_list_head; iter = iter->next) {
struct cpg_pd *cpd_item = list_entry (iter, struct cpg_pd, list);
if (cpd_item->pid == req_lib_cpg_join->pid &&
mar_name_compare(&req_lib_cpg_join->group_name, &cpd_item->group_name) == 0) {
/* We have same pid and group name joined -> return error */
error = CS_ERR_EXIST;
goto response_send;
}
}
/*
* Same check must be done in process info list, because there may be not yet delivered
* leave of client.
*/
for (iter = process_info_list_head.next; iter != &process_info_list_head; iter = iter->next) {
struct process_info *pi = list_entry (iter, struct process_info, list);
if (pi->nodeid == api->totem_nodeid_get () && pi->pid == req_lib_cpg_join->pid &&
mar_name_compare(&req_lib_cpg_join->group_name, &pi->group) == 0) {
/* We have same pid and group name joined -> return error */
error = CS_ERR_TRY_AGAIN;
goto response_send;
}
}
if (req_lib_cpg_join->group_name.length > CPG_MAX_NAME_LENGTH) {
error = CS_ERR_NAME_TOO_LONG;
goto response_send;
}
switch (cpd->cpd_state) {
case CPD_STATE_UNJOINED:
error = CS_OK;
cpd->cpd_state = CPD_STATE_JOIN_STARTED;
cpd->pid = req_lib_cpg_join->pid;
cpd->flags = req_lib_cpg_join->flags;
memcpy (&cpd->group_name, &req_lib_cpg_join->group_name,
sizeof (cpd->group_name));
cpg_node_joinleave_send (req_lib_cpg_join->pid,
&req_lib_cpg_join->group_name,
MESSAGE_REQ_EXEC_CPG_PROCJOIN, CONFCHG_CPG_REASON_JOIN);
break;
case CPD_STATE_LEAVE_STARTED:
error = CS_ERR_BUSY;
break;
case CPD_STATE_JOIN_STARTED:
error = CS_ERR_EXIST;
break;
case CPD_STATE_JOIN_COMPLETED:
error = CS_ERR_EXIST;
break;
}
response_send:
res_lib_cpg_join.header.size = sizeof(res_lib_cpg_join);
res_lib_cpg_join.header.id = MESSAGE_RES_CPG_JOIN;
res_lib_cpg_join.header.error = error;
api->ipc_response_send (conn, &res_lib_cpg_join, sizeof(res_lib_cpg_join));
}
/* Leave message from the library */
static void message_handler_req_lib_cpg_leave (void *conn, const void *message)
{
struct res_lib_cpg_leave res_lib_cpg_leave;
cs_error_t error = CS_OK;
struct req_lib_cpg_leave *req_lib_cpg_leave = (struct req_lib_cpg_leave *)message;
struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
log_printf(LOGSYS_LEVEL_DEBUG, "got leave request on %p", conn);
switch (cpd->cpd_state) {
case CPD_STATE_UNJOINED:
error = CS_ERR_NOT_EXIST;
break;
case CPD_STATE_LEAVE_STARTED:
error = CS_ERR_NOT_EXIST;
break;
case CPD_STATE_JOIN_STARTED:
error = CS_ERR_BUSY;
break;
case CPD_STATE_JOIN_COMPLETED:
error = CS_OK;
cpd->cpd_state = CPD_STATE_LEAVE_STARTED;
cpg_node_joinleave_send (req_lib_cpg_leave->pid,
&req_lib_cpg_leave->group_name,
MESSAGE_REQ_EXEC_CPG_PROCLEAVE,
CONFCHG_CPG_REASON_LEAVE);
break;
}
/* send return */
res_lib_cpg_leave.header.size = sizeof(res_lib_cpg_leave);
res_lib_cpg_leave.header.id = MESSAGE_RES_CPG_LEAVE;
res_lib_cpg_leave.header.error = error;
api->ipc_response_send(conn, &res_lib_cpg_leave, sizeof(res_lib_cpg_leave));
}
/* Finalize message from library */
static void message_handler_req_lib_cpg_finalize (
void *conn,
const void *message)
{
struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
struct res_lib_cpg_finalize res_lib_cpg_finalize;
cs_error_t error = CS_OK;
log_printf (LOGSYS_LEVEL_DEBUG, "cpg finalize for conn=%p", conn);
/*
* We will just remove cpd from list. After this call, connection will be
* closed on lib side, and cpg_lib_exit_fn will be called
*/
list_del (&cpd->list);
list_init (&cpd->list);
res_lib_cpg_finalize.header.size = sizeof (res_lib_cpg_finalize);
res_lib_cpg_finalize.header.id = MESSAGE_RES_CPG_FINALIZE;
res_lib_cpg_finalize.header.error = error;
api->ipc_response_send (conn, &res_lib_cpg_finalize,
sizeof (res_lib_cpg_finalize));
}
static int
memory_map (
const char *path,
size_t bytes,
void **buf)
{
int32_t fd;
void *addr;
int32_t res;
fd = open (path, O_RDWR, 0600);
unlink (path);
if (fd == -1) {
return (-1);
}
res = ftruncate (fd, bytes);
if (res == -1) {
goto error_close_unlink;
}
addr = mmap (NULL, bytes, PROT_READ | PROT_WRITE,
MAP_SHARED, fd, 0);
if (addr == MAP_FAILED) {
goto error_close_unlink;
}
#ifdef MADV_NOSYNC
madvise(addr, bytes, MADV_NOSYNC);
#endif
res = close (fd);
if (res) {
munmap (addr, bytes);
return (-1);
}
*buf = addr;
return (0);
error_close_unlink:
close (fd);
unlink(path);
return -1;
}
static inline int zcb_alloc (
struct cpg_pd *cpd,
const char *path_to_file,
size_t size,
void **addr)
{
struct zcb_mapped *zcb_mapped;
unsigned int res;
zcb_mapped = malloc (sizeof (struct zcb_mapped));
if (zcb_mapped == NULL) {
return (-1);
}
res = memory_map (
path_to_file,
size,
addr);
if (res == -1) {
free (zcb_mapped);
return (-1);
}
list_init (&zcb_mapped->list);
zcb_mapped->addr = *addr;
zcb_mapped->size = size;
list_add_tail (&zcb_mapped->list, &cpd->zcb_mapped_list_head);
return (0);
}
static inline int zcb_free (struct zcb_mapped *zcb_mapped)
{
unsigned int res;
res = munmap (zcb_mapped->addr, zcb_mapped->size);
list_del (&zcb_mapped->list);
free (zcb_mapped);
return (res);
}
static inline int zcb_by_addr_free (struct cpg_pd *cpd, void *addr)
{
struct list_head *list;
struct zcb_mapped *zcb_mapped;
unsigned int res = 0;
for (list = cpd->zcb_mapped_list_head.next;
list != &cpd->zcb_mapped_list_head; list = list->next) {
zcb_mapped = list_entry (list, struct zcb_mapped, list);
if (zcb_mapped->addr == addr) {
res = zcb_free (zcb_mapped);
break;
}
}
return (res);
}
static inline int zcb_all_free (
struct cpg_pd *cpd)
{
struct list_head *list;
struct zcb_mapped *zcb_mapped;
for (list = cpd->zcb_mapped_list_head.next;
list != &cpd->zcb_mapped_list_head;) {
zcb_mapped = list_entry (list, struct zcb_mapped, list);
list = list->next;
zcb_free (zcb_mapped);
}
return (0);
}
union u {
uint64_t server_addr;
void *server_ptr;
};
static uint64_t void2serveraddr (void *server_ptr)
{
union u u;
u.server_ptr = server_ptr;
return (u.server_addr);
}
static void *serveraddr2void (uint64_t server_addr)
{
union u u;
u.server_addr = server_addr;
return (u.server_ptr);
};
static void message_handler_req_lib_cpg_zc_alloc (
void *conn,
const void *message)
{
mar_req_coroipcc_zc_alloc_t *hdr = (mar_req_coroipcc_zc_alloc_t *)message;
struct qb_ipc_response_header res_header;
void *addr = NULL;
struct coroipcs_zc_header *zc_header;
unsigned int res;
struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
log_printf(LOGSYS_LEVEL_DEBUG, "path: %s", hdr->path_to_file);
res = zcb_alloc (cpd, hdr->path_to_file, hdr->map_size,
&addr);
assert(res == 0);
zc_header = (struct coroipcs_zc_header *)addr;
zc_header->server_address = void2serveraddr(addr);
res_header.size = sizeof (struct qb_ipc_response_header);
res_header.id = 0;
api->ipc_response_send (conn,
&res_header,
res_header.size);
}
static void message_handler_req_lib_cpg_zc_free (
void *conn,
const void *message)
{
mar_req_coroipcc_zc_free_t *hdr = (mar_req_coroipcc_zc_free_t *)message;
struct qb_ipc_response_header res_header;
void *addr = NULL;
struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
log_printf(LOGSYS_LEVEL_DEBUG, " free'ing");
addr = serveraddr2void (hdr->server_address);
zcb_by_addr_free (cpd, addr);
res_header.size = sizeof (struct qb_ipc_response_header);
res_header.id = 0;
api->ipc_response_send (
conn, &res_header,
res_header.size);
}
/* Fragmented mcast message from the library */
static void message_handler_req_lib_cpg_partial_mcast (void *conn, const void *message)
{
const struct req_lib_cpg_partial_mcast *req_lib_cpg_mcast = message;
struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
mar_cpg_name_t group_name = cpd->group_name;
struct iovec req_exec_cpg_iovec[2];
struct req_exec_cpg_partial_mcast req_exec_cpg_mcast;
struct res_lib_cpg_partial_send res_lib_cpg_partial_send;
int msglen = req_lib_cpg_mcast->fraglen;
int result;
cs_error_t error = CS_ERR_NOT_EXIST;
log_printf(LOGSYS_LEVEL_TRACE, "got fragmented mcast request on %p", conn);
log_printf(LOGSYS_LEVEL_DEBUG, "Sending fragmented message size = %d bytes\n", msglen);
switch (cpd->cpd_state) {
case CPD_STATE_UNJOINED:
error = CS_ERR_NOT_EXIST;
break;
case CPD_STATE_LEAVE_STARTED:
error = CS_ERR_NOT_EXIST;
break;
case CPD_STATE_JOIN_STARTED:
error = CS_OK;
break;
case CPD_STATE_JOIN_COMPLETED:
error = CS_OK;
break;
}
res_lib_cpg_partial_send.header.size = sizeof(res_lib_cpg_partial_send);
res_lib_cpg_partial_send.header.id = MESSAGE_RES_CPG_PARTIAL_SEND;
if (req_lib_cpg_mcast->type == LIBCPG_PARTIAL_FIRST) {
cpd->initial_transition_counter = cpd->transition_counter;
}
if (cpd->transition_counter != cpd->initial_transition_counter) {
error = CS_ERR_INTERRUPT;
}
if (error == CS_OK) {
req_exec_cpg_mcast.header.size = sizeof(req_exec_cpg_mcast) + msglen;
req_exec_cpg_mcast.header.id = SERVICE_ID_MAKE(CPG_SERVICE,
MESSAGE_REQ_EXEC_CPG_PARTIAL_MCAST);
req_exec_cpg_mcast.pid = cpd->pid;
req_exec_cpg_mcast.msglen = req_lib_cpg_mcast->msglen;
req_exec_cpg_mcast.type = req_lib_cpg_mcast->type;
req_exec_cpg_mcast.fraglen = req_lib_cpg_mcast->fraglen;
api->ipc_source_set (&req_exec_cpg_mcast.source, conn);
memcpy(&req_exec_cpg_mcast.group_name, &group_name,
sizeof(mar_cpg_name_t));
req_exec_cpg_iovec[0].iov_base = (char *)&req_exec_cpg_mcast;
req_exec_cpg_iovec[0].iov_len = sizeof(req_exec_cpg_mcast);
req_exec_cpg_iovec[1].iov_base = (char *)&req_lib_cpg_mcast->message;
req_exec_cpg_iovec[1].iov_len = msglen;
result = api->totem_mcast (req_exec_cpg_iovec, 2, TOTEM_AGREED);
assert(result == 0);
} else {
log_printf(LOGSYS_LEVEL_ERROR, "*** %p can't mcast to group %s state:%d, error:%d",
conn, group_name.value, cpd->cpd_state, error);
}
res_lib_cpg_partial_send.header.error = error;
api->ipc_response_send (conn, &res_lib_cpg_partial_send,
sizeof (res_lib_cpg_partial_send));
}
/* Mcast message from the library */
static void message_handler_req_lib_cpg_mcast (void *conn, const void *message)
{
const struct req_lib_cpg_mcast *req_lib_cpg_mcast = message;
struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
mar_cpg_name_t group_name = cpd->group_name;
struct iovec req_exec_cpg_iovec[2];
struct req_exec_cpg_mcast req_exec_cpg_mcast;
int msglen = req_lib_cpg_mcast->msglen;
int result;
cs_error_t error = CS_ERR_NOT_EXIST;
log_printf(LOGSYS_LEVEL_TRACE, "got mcast request on %p", conn);
switch (cpd->cpd_state) {
case CPD_STATE_UNJOINED:
error = CS_ERR_NOT_EXIST;
break;
case CPD_STATE_LEAVE_STARTED:
error = CS_ERR_NOT_EXIST;
break;
case CPD_STATE_JOIN_STARTED:
error = CS_OK;
break;
case CPD_STATE_JOIN_COMPLETED:
error = CS_OK;
break;
}
if (error == CS_OK) {
req_exec_cpg_mcast.header.size = sizeof(req_exec_cpg_mcast) + msglen;
req_exec_cpg_mcast.header.id = SERVICE_ID_MAKE(CPG_SERVICE,
MESSAGE_REQ_EXEC_CPG_MCAST);
req_exec_cpg_mcast.pid = cpd->pid;
req_exec_cpg_mcast.msglen = msglen;
api->ipc_source_set (&req_exec_cpg_mcast.source, conn);
memcpy(&req_exec_cpg_mcast.group_name, &group_name,
sizeof(mar_cpg_name_t));
req_exec_cpg_iovec[0].iov_base = (char *)&req_exec_cpg_mcast;
req_exec_cpg_iovec[0].iov_len = sizeof(req_exec_cpg_mcast);
req_exec_cpg_iovec[1].iov_base = (char *)&req_lib_cpg_mcast->message;
req_exec_cpg_iovec[1].iov_len = msglen;
result = api->totem_mcast (req_exec_cpg_iovec, 2, TOTEM_AGREED);
assert(result == 0);
} else {
log_printf(LOGSYS_LEVEL_ERROR, "*** %p can't mcast to group %s state:%d, error:%d",
conn, group_name.value, cpd->cpd_state, error);
}
}
static void message_handler_req_lib_cpg_zc_execute (
void *conn,
const void *message)
{
mar_req_coroipcc_zc_execute_t *hdr = (mar_req_coroipcc_zc_execute_t *)message;
struct qb_ipc_request_header *header;
struct res_lib_cpg_mcast res_lib_cpg_mcast;
struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
struct iovec req_exec_cpg_iovec[2];
struct req_exec_cpg_mcast req_exec_cpg_mcast;
struct req_lib_cpg_mcast *req_lib_cpg_mcast;
int result;
cs_error_t error = CS_ERR_NOT_EXIST;
log_printf(LOGSYS_LEVEL_TRACE, "got ZC mcast request on %p", conn);
header = (struct qb_ipc_request_header *)(((char *)serveraddr2void(hdr->server_address) + sizeof (struct coroipcs_zc_header)));
req_lib_cpg_mcast = (struct req_lib_cpg_mcast *)header;
switch (cpd->cpd_state) {
case CPD_STATE_UNJOINED:
error = CS_ERR_NOT_EXIST;
break;
case CPD_STATE_LEAVE_STARTED:
error = CS_ERR_NOT_EXIST;
break;
case CPD_STATE_JOIN_STARTED:
error = CS_OK;
break;
case CPD_STATE_JOIN_COMPLETED:
error = CS_OK;
break;
}
res_lib_cpg_mcast.header.size = sizeof(res_lib_cpg_mcast);
res_lib_cpg_mcast.header.id = MESSAGE_RES_CPG_MCAST;
if (error == CS_OK) {
req_exec_cpg_mcast.header.size = sizeof(req_exec_cpg_mcast) + req_lib_cpg_mcast->msglen;
req_exec_cpg_mcast.header.id = SERVICE_ID_MAKE(CPG_SERVICE,
MESSAGE_REQ_EXEC_CPG_MCAST);
req_exec_cpg_mcast.pid = cpd->pid;
req_exec_cpg_mcast.msglen = req_lib_cpg_mcast->msglen;
api->ipc_source_set (&req_exec_cpg_mcast.source, conn);
memcpy(&req_exec_cpg_mcast.group_name, &cpd->group_name,
sizeof(mar_cpg_name_t));
req_exec_cpg_iovec[0].iov_base = (char *)&req_exec_cpg_mcast;
req_exec_cpg_iovec[0].iov_len = sizeof(req_exec_cpg_mcast);
req_exec_cpg_iovec[1].iov_base = (char *)header + sizeof(struct req_lib_cpg_mcast);
req_exec_cpg_iovec[1].iov_len = req_exec_cpg_mcast.msglen;
result = api->totem_mcast (req_exec_cpg_iovec, 2, TOTEM_AGREED);
if (result == 0) {
res_lib_cpg_mcast.header.error = CS_OK;
} else {
res_lib_cpg_mcast.header.error = CS_ERR_TRY_AGAIN;
}
} else {
res_lib_cpg_mcast.header.error = error;
}
api->ipc_response_send (conn, &res_lib_cpg_mcast,
sizeof (res_lib_cpg_mcast));
}
static void message_handler_req_lib_cpg_membership (void *conn,
const void *message)
{
struct req_lib_cpg_membership_get *req_lib_cpg_membership_get =
(struct req_lib_cpg_membership_get *)message;
struct res_lib_cpg_membership_get res_lib_cpg_membership_get;
struct list_head *iter;
int member_count = 0;
res_lib_cpg_membership_get.header.id = MESSAGE_RES_CPG_MEMBERSHIP;
res_lib_cpg_membership_get.header.error = CS_OK;
res_lib_cpg_membership_get.header.size =
sizeof (struct res_lib_cpg_membership_get);
for (iter = process_info_list_head.next;
iter != &process_info_list_head; iter = iter->next) {
struct process_info *pi = list_entry (iter, struct process_info, list);
if (mar_name_compare (&pi->group, &req_lib_cpg_membership_get->group_name) == 0) {
res_lib_cpg_membership_get.member_list[member_count].nodeid = pi->nodeid;
res_lib_cpg_membership_get.member_list[member_count].pid = pi->pid;
member_count += 1;
}
}
res_lib_cpg_membership_get.member_count = member_count;
api->ipc_response_send (conn, &res_lib_cpg_membership_get,
sizeof (res_lib_cpg_membership_get));
}
static void message_handler_req_lib_cpg_local_get (void *conn,
const void *message)
{
struct res_lib_cpg_local_get res_lib_cpg_local_get;
res_lib_cpg_local_get.header.size = sizeof (res_lib_cpg_local_get);
res_lib_cpg_local_get.header.id = MESSAGE_RES_CPG_LOCAL_GET;
res_lib_cpg_local_get.header.error = CS_OK;
res_lib_cpg_local_get.local_nodeid = api->totem_nodeid_get ();
api->ipc_response_send (conn, &res_lib_cpg_local_get,
sizeof (res_lib_cpg_local_get));
}
static void message_handler_req_lib_cpg_iteration_initialize (
void *conn,
const void *message)
{
const struct req_lib_cpg_iterationinitialize *req_lib_cpg_iterationinitialize = message;
struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
hdb_handle_t cpg_iteration_handle = 0;
struct res_lib_cpg_iterationinitialize res_lib_cpg_iterationinitialize;
struct list_head *iter, *iter2;
struct cpg_iteration_instance *cpg_iteration_instance;
cs_error_t error = CS_OK;
int res;
log_printf (LOGSYS_LEVEL_DEBUG, "cpg iteration initialize");
/* Because between calling this function and *next can be some operations which will
* change list, we must do full copy.
*/
/*
* Create new iteration instance
*/
res = hdb_handle_create (&cpg_iteration_handle_t_db, sizeof (struct cpg_iteration_instance),
&cpg_iteration_handle);
if (res != 0) {
error = CS_ERR_NO_MEMORY;
goto response_send;
}
res = hdb_handle_get (&cpg_iteration_handle_t_db, cpg_iteration_handle, (void *)&cpg_iteration_instance);
if (res != 0) {
error = CS_ERR_BAD_HANDLE;
goto error_destroy;
}
list_init (&cpg_iteration_instance->items_list_head);
cpg_iteration_instance->handle = cpg_iteration_handle;
/*
* Create copy of process_info list "grouped by" group name
*/
for (iter = process_info_list_head.next; iter != &process_info_list_head; iter = iter->next) {
struct process_info *pi = list_entry (iter, struct process_info, list);
struct process_info *new_pi;
if (req_lib_cpg_iterationinitialize->iteration_type == CPG_ITERATION_NAME_ONLY) {
/*
* Try to find processed group name in our list new list
*/
int found = 0;
for (iter2 = cpg_iteration_instance->items_list_head.next;
iter2 != &cpg_iteration_instance->items_list_head;
iter2 = iter2->next) {
struct process_info *pi2 = list_entry (iter2, struct process_info, list);
if (mar_name_compare (&pi2->group, &pi->group) == 0) {
found = 1;
break;
}
}
if (found) {
/*
* We have this name in list -> don't add
*/
continue ;
}
} else if (req_lib_cpg_iterationinitialize->iteration_type == CPG_ITERATION_ONE_GROUP) {
/*
* Test pi group name with request
*/
if (mar_name_compare (&pi->group, &req_lib_cpg_iterationinitialize->group_name) != 0)
/*
* Not same -> don't add
*/
continue ;
}
new_pi = malloc (sizeof (struct process_info));
if (!new_pi) {
log_printf(LOGSYS_LEVEL_WARNING, "Unable to allocate process_info struct");
error = CS_ERR_NO_MEMORY;
goto error_put_destroy;
}
memcpy (new_pi, pi, sizeof (struct process_info));
list_init (&new_pi->list);
if (req_lib_cpg_iterationinitialize->iteration_type == CPG_ITERATION_NAME_ONLY) {
/*
* pid and nodeid -> undefined
*/
new_pi->pid = new_pi->nodeid = 0;
}
/*
* We will return list "grouped" by "group name", so try to find right place to add
*/
for (iter2 = cpg_iteration_instance->items_list_head.next;
iter2 != &cpg_iteration_instance->items_list_head;
iter2 = iter2->next) {
struct process_info *pi2 = list_entry (iter2, struct process_info, list);
if (mar_name_compare (&pi2->group, &pi->group) == 0) {
break;
}
}
list_add (&new_pi->list, iter2);
}
/*
* Now we have a full "grouped by" copy of process_info list
*/
/*
* Add instance to current cpd list
*/
list_init (&cpg_iteration_instance->list);
list_add (&cpg_iteration_instance->list, &cpd->iteration_instance_list_head);
cpg_iteration_instance->current_pointer = &cpg_iteration_instance->items_list_head;
error_put_destroy:
hdb_handle_put (&cpg_iteration_handle_t_db, cpg_iteration_handle);
error_destroy:
if (error != CS_OK) {
hdb_handle_destroy (&cpg_iteration_handle_t_db, cpg_iteration_handle);
}
response_send:
res_lib_cpg_iterationinitialize.header.size = sizeof (res_lib_cpg_iterationinitialize);
res_lib_cpg_iterationinitialize.header.id = MESSAGE_RES_CPG_ITERATIONINITIALIZE;
res_lib_cpg_iterationinitialize.header.error = error;
res_lib_cpg_iterationinitialize.iteration_handle = cpg_iteration_handle;
api->ipc_response_send (conn, &res_lib_cpg_iterationinitialize,
sizeof (res_lib_cpg_iterationinitialize));
}
static void message_handler_req_lib_cpg_iteration_next (
void *conn,
const void *message)
{
const struct req_lib_cpg_iterationnext *req_lib_cpg_iterationnext = message;
struct res_lib_cpg_iterationnext res_lib_cpg_iterationnext;
struct cpg_iteration_instance *cpg_iteration_instance;
cs_error_t error = CS_OK;
int res;
struct process_info *pi;
log_printf (LOGSYS_LEVEL_DEBUG, "cpg iteration next");
res = hdb_handle_get (&cpg_iteration_handle_t_db,
req_lib_cpg_iterationnext->iteration_handle,
(void *)&cpg_iteration_instance);
if (res != 0) {
error = CS_ERR_LIBRARY;
goto error_exit;
}
assert (cpg_iteration_instance);
cpg_iteration_instance->current_pointer = cpg_iteration_instance->current_pointer->next;
if (cpg_iteration_instance->current_pointer == &cpg_iteration_instance->items_list_head) {
error = CS_ERR_NO_SECTIONS;
goto error_put;
}
pi = list_entry (cpg_iteration_instance->current_pointer, struct process_info, list);
/*
* Copy iteration data
*/
res_lib_cpg_iterationnext.description.nodeid = pi->nodeid;
res_lib_cpg_iterationnext.description.pid = pi->pid;
memcpy (&res_lib_cpg_iterationnext.description.group,
&pi->group,
sizeof (mar_cpg_name_t));
error_put:
hdb_handle_put (&cpg_iteration_handle_t_db, req_lib_cpg_iterationnext->iteration_handle);
error_exit:
res_lib_cpg_iterationnext.header.size = sizeof (res_lib_cpg_iterationnext);
res_lib_cpg_iterationnext.header.id = MESSAGE_RES_CPG_ITERATIONNEXT;
res_lib_cpg_iterationnext.header.error = error;
api->ipc_response_send (conn, &res_lib_cpg_iterationnext,
sizeof (res_lib_cpg_iterationnext));
}
static void message_handler_req_lib_cpg_iteration_finalize (
void *conn,
const void *message)
{
const struct req_lib_cpg_iterationfinalize *req_lib_cpg_iterationfinalize = message;
struct res_lib_cpg_iterationfinalize res_lib_cpg_iterationfinalize;
struct cpg_iteration_instance *cpg_iteration_instance;
cs_error_t error = CS_OK;
int res;
log_printf (LOGSYS_LEVEL_DEBUG, "cpg iteration finalize");
res = hdb_handle_get (&cpg_iteration_handle_t_db,
req_lib_cpg_iterationfinalize->iteration_handle,
(void *)&cpg_iteration_instance);
if (res != 0) {
error = CS_ERR_LIBRARY;
goto error_exit;
}
assert (cpg_iteration_instance);
cpg_iteration_instance_finalize (cpg_iteration_instance);
hdb_handle_put (&cpg_iteration_handle_t_db, cpg_iteration_instance->handle);
error_exit:
res_lib_cpg_iterationfinalize.header.size = sizeof (res_lib_cpg_iterationfinalize);
res_lib_cpg_iterationfinalize.header.id = MESSAGE_RES_CPG_ITERATIONFINALIZE;
res_lib_cpg_iterationfinalize.header.error = error;
api->ipc_response_send (conn, &res_lib_cpg_iterationfinalize,
sizeof (res_lib_cpg_iterationfinalize));
}
diff --git a/exec/quorum.c b/exec/quorum.c
index d4837a2c..323a15f8 100644
--- a/exec/quorum.c
+++ b/exec/quorum.c
@@ -1,112 +1,110 @@
/*
* Copyright (c) 2008-2012 Red Hat, Inc.
*
* All rights reserved.
*
* Author: Christine Caulfield (ccaulfie@redhat.com)
*
* This software licensed under BSD license, the text of which follows:
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* - Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* - Neither the name of the MontaVista Software, Inc. nor the names of its
* contributors may be used to endorse or promote products derived from this
* software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
* THE POSSIBILITY OF SUCH DAMAGE.
*/
#include <config.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/ioctl.h>
#include <netinet/in.h>
#include <sys/uio.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <time.h>
-#include <unistd.h>
-#include <netinet/in.h>
#include <arpa/inet.h>
#include <corosync/corotypes.h>
#include <corosync/swab.h>
#include <corosync/totem/totempg.h>
#include <corosync/totem/totem.h>
#include <corosync/logsys.h>
#include "quorum.h"
#include "main.h"
#include "vsf.h"
LOGSYS_DECLARE_SUBSYS ("QUORUM");
static struct quorum_callin_functions *corosync_quorum_fns = NULL;
int corosync_quorum_is_quorate (void)
{
if (corosync_quorum_fns) {
return corosync_quorum_fns->quorate();
}
else {
return 1;
}
}
int corosync_quorum_register_callback (quorum_callback_fn_t fn, void *context)
{
if (corosync_quorum_fns) {
return corosync_quorum_fns->register_callback(fn, context);
}
else {
return 0;
}
}
int corosync_quorum_unregister_callback (quorum_callback_fn_t fn, void *context)
{
if (corosync_quorum_fns) {
return corosync_quorum_fns->unregister_callback(fn, context);
}
else {
return 0;
}
}
int corosync_quorum_initialize (struct quorum_callin_functions *fns)
{
if (corosync_quorum_fns)
return -1;
corosync_quorum_fns = fns;
return 0;
}
int quorum_none(void)
{
if (corosync_quorum_fns)
return 0;
else
return 1;
}
diff --git a/exec/sync.c b/exec/sync.c
index ea452e60..283634a8 100644
--- a/exec/sync.c
+++ b/exec/sync.c
@@ -1,630 +1,628 @@
/*
* Copyright (c) 2009-2012 Red Hat, Inc.
*
* All rights reserved.
*
* Author: Steven Dake (sdake@redhat.com)
*
* This software licensed under BSD license, the text of which follows:
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* - Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* - Neither the name of the MontaVista Software, Inc. nor the names of its
* contributors may be used to endorse or promote products derived from this
* software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
* THE POSSIBILITY OF SUCH DAMAGE.
*/
#include <config.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/ioctl.h>
#include <netinet/in.h>
#include <sys/uio.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <time.h>
-#include <unistd.h>
-#include <netinet/in.h>
#include <arpa/inet.h>
#include <corosync/corotypes.h>
#include <corosync/swab.h>
#include <corosync/totem/totempg.h>
#include <corosync/totem/totem.h>
#include <corosync/logsys.h>
#include <qb/qbipc_common.h>
#include "schedwrk.h"
#include "quorum.h"
#include "sync.h"
#include "main.h"
LOGSYS_DECLARE_SUBSYS ("SYNC");
#define MESSAGE_REQ_SYNC_BARRIER 0
#define MESSAGE_REQ_SYNC_SERVICE_BUILD 1
#define MESSAGE_REQ_SYNC_MEMB_DETERMINE 2
enum sync_process_state {
INIT,
PROCESS,
ACTIVATE
};
enum sync_state {
SYNC_SERVICELIST_BUILD,
SYNC_PROCESS,
SYNC_BARRIER
};
struct service_entry {
int service_id;
void (*sync_init) (
const unsigned int *trans_list,
size_t trans_list_entries,
const unsigned int *member_list,
size_t member_list_entries,
const struct memb_ring_id *ring_id);
void (*sync_abort) (void);
int (*sync_process) (void);
void (*sync_activate) (void);
enum sync_process_state state;
char name[128];
};
struct processor_entry {
int nodeid;
int received;
};
struct req_exec_memb_determine_message {
struct qb_ipc_request_header header __attribute__((aligned(8)));
struct memb_ring_id ring_id __attribute__((aligned(8)));
};
struct req_exec_service_build_message {
struct qb_ipc_request_header header __attribute__((aligned(8)));
struct memb_ring_id ring_id __attribute__((aligned(8)));
int service_list_entries __attribute__((aligned(8)));
int service_list[128] __attribute__((aligned(8)));
};
struct req_exec_barrier_message {
struct qb_ipc_request_header header __attribute__((aligned(8)));
struct memb_ring_id ring_id __attribute__((aligned(8)));
};
static enum sync_state my_state = SYNC_BARRIER;
static struct memb_ring_id my_ring_id;
static struct memb_ring_id my_memb_determine_ring_id;
static int my_memb_determine = 0;
static unsigned int my_memb_determine_list[PROCESSOR_COUNT_MAX];
static unsigned int my_memb_determine_list_entries = 0;
static int my_processing_idx = 0;
static hdb_handle_t my_schedwrk_handle;
static struct processor_entry my_processor_list[PROCESSOR_COUNT_MAX];
static unsigned int my_member_list[PROCESSOR_COUNT_MAX];
static unsigned int my_trans_list[PROCESSOR_COUNT_MAX];
static size_t my_member_list_entries = 0;
static size_t my_trans_list_entries = 0;
static int my_processor_list_entries = 0;
static struct service_entry my_service_list[SERVICES_COUNT_MAX];
static int my_service_list_entries = 0;
static void (*sync_synchronization_completed) (void);
static void sync_deliver_fn (
unsigned int nodeid,
const void *msg,
unsigned int msg_len,
int endian_conversion_required);
static int schedwrk_processor (const void *context);
static void sync_process_enter (void);
static struct totempg_group sync_group = {
.group = "sync",
.group_len = 4
};
static void *sync_group_handle;
int (*my_sync_callbacks_retrieve) (
int service_id,
struct sync_callbacks *callbacks);
int sync_init (
int (*sync_callbacks_retrieve) (
int service_id,
struct sync_callbacks *callbacks),
void (*synchronization_completed) (void))
{
unsigned int res;
res = totempg_groups_initialize (
&sync_group_handle,
sync_deliver_fn,
NULL);
if (res == -1) {
log_printf (LOGSYS_LEVEL_ERROR,
"Couldn't initialize groups interface.");
return (-1);
}
res = totempg_groups_join (
sync_group_handle,
&sync_group,
1);
if (res == -1) {
log_printf (LOGSYS_LEVEL_ERROR, "Couldn't join group.");
return (-1);
}
sync_synchronization_completed = synchronization_completed;
my_sync_callbacks_retrieve = sync_callbacks_retrieve;
return (0);
}
static void sync_barrier_handler (unsigned int nodeid, const void *msg)
{
const struct req_exec_barrier_message *req_exec_barrier_message = msg;
int i;
int barrier_reached = 1;
if (memcmp (&my_ring_id, &req_exec_barrier_message->ring_id,
sizeof (struct memb_ring_id)) != 0) {
log_printf (LOGSYS_LEVEL_DEBUG, "barrier for old ring - discarding");
return;
}
for (i = 0; i < my_processor_list_entries; i++) {
if (my_processor_list[i].nodeid == nodeid) {
my_processor_list[i].received = 1;
}
}
for (i = 0; i < my_processor_list_entries; i++) {
if (my_processor_list[i].received == 0) {
barrier_reached = 0;
}
}
if (barrier_reached) {
log_printf (LOGSYS_LEVEL_DEBUG, "Committing synchronization for %s",
my_service_list[my_processing_idx].name);
my_service_list[my_processing_idx].state = ACTIVATE;
if (my_sync_callbacks_retrieve(my_service_list[my_processing_idx].service_id, NULL) != -1) {
my_service_list[my_processing_idx].sync_activate ();
}
my_processing_idx += 1;
if (my_service_list_entries == my_processing_idx) {
my_memb_determine_list_entries = 0;
sync_synchronization_completed ();
} else {
sync_process_enter ();
}
}
}
static void dummy_sync_init (
const unsigned int *trans_list,
size_t trans_list_entries,
const unsigned int *member_list,
size_t member_list_entries,
const struct memb_ring_id *ring_id)
{
}
static void dummy_sync_abort (void)
{
}
static int dummy_sync_process (void)
{
return (0);
}
static void dummy_sync_activate (void)
{
}
static int service_entry_compare (const void *a, const void *b)
{
const struct service_entry *service_entry_a = a;
const struct service_entry *service_entry_b = b;
return (service_entry_a->service_id > service_entry_b->service_id);
}
static void sync_memb_determine (unsigned int nodeid, const void *msg)
{
const struct req_exec_memb_determine_message *req_exec_memb_determine_message = msg;
int found = 0;
int i;
if (memcmp (&req_exec_memb_determine_message->ring_id,
&my_memb_determine_ring_id, sizeof (struct memb_ring_id)) != 0) {
log_printf (LOGSYS_LEVEL_DEBUG, "memb determine for old ring - discarding");
return;
}
my_memb_determine = 1;
for (i = 0; i < my_memb_determine_list_entries; i++) {
if (my_memb_determine_list[i] == nodeid) {
found = 1;
}
}
if (found == 0) {
my_memb_determine_list[my_memb_determine_list_entries] = nodeid;
my_memb_determine_list_entries += 1;
}
}
static void sync_service_build_handler (unsigned int nodeid, const void *msg)
{
const struct req_exec_service_build_message *req_exec_service_build_message = msg;
int i, j;
int barrier_reached = 1;
int found;
int qsort_trigger = 0;
if (memcmp (&my_ring_id, &req_exec_service_build_message->ring_id,
sizeof (struct memb_ring_id)) != 0) {
log_printf (LOGSYS_LEVEL_DEBUG, "service build for old ring - discarding");
return;
}
for (i = 0; i < req_exec_service_build_message->service_list_entries; i++) {
found = 0;
for (j = 0; j < my_service_list_entries; j++) {
if (req_exec_service_build_message->service_list[i] ==
my_service_list[j].service_id) {
found = 1;
break;
}
}
if (found == 0) {
my_service_list[my_service_list_entries].state =
INIT;
my_service_list[my_service_list_entries].service_id =
req_exec_service_build_message->service_list[i];
sprintf (my_service_list[my_service_list_entries].name,
"Unknown External Service (id = %d)\n",
req_exec_service_build_message->service_list[i]);
my_service_list[my_service_list_entries].sync_init =
dummy_sync_init;
my_service_list[my_service_list_entries].sync_abort =
dummy_sync_abort;
my_service_list[my_service_list_entries].sync_process =
dummy_sync_process;
my_service_list[my_service_list_entries].sync_activate =
dummy_sync_activate;
my_service_list_entries += 1;
qsort_trigger = 1;
}
}
if (qsort_trigger) {
qsort (my_service_list, my_service_list_entries,
sizeof (struct service_entry), service_entry_compare);
}
for (i = 0; i < my_processor_list_entries; i++) {
if (my_processor_list[i].nodeid == nodeid) {
my_processor_list[i].received = 1;
}
}
for (i = 0; i < my_processor_list_entries; i++) {
if (my_processor_list[i].received == 0) {
barrier_reached = 0;
}
}
if (barrier_reached) {
sync_process_enter ();
}
}
static void sync_deliver_fn (
unsigned int nodeid,
const void *msg,
unsigned int msg_len,
int endian_conversion_required)
{
struct qb_ipc_request_header *header = (struct qb_ipc_request_header *)msg;
switch (header->id) {
case MESSAGE_REQ_SYNC_BARRIER:
sync_barrier_handler (nodeid, msg);
break;
case MESSAGE_REQ_SYNC_SERVICE_BUILD:
sync_service_build_handler (nodeid, msg);
break;
case MESSAGE_REQ_SYNC_MEMB_DETERMINE:
sync_memb_determine (nodeid, msg);
break;
}
}
static void memb_determine_message_transmit (void)
{
struct iovec iovec;
struct req_exec_memb_determine_message req_exec_memb_determine_message;
req_exec_memb_determine_message.header.size = sizeof (struct req_exec_memb_determine_message);
req_exec_memb_determine_message.header.id = MESSAGE_REQ_SYNC_MEMB_DETERMINE;
memcpy (&req_exec_memb_determine_message.ring_id,
&my_memb_determine_ring_id,
sizeof (struct memb_ring_id));
iovec.iov_base = (char *)&req_exec_memb_determine_message;
iovec.iov_len = sizeof (req_exec_memb_determine_message);
(void)totempg_groups_mcast_joined (sync_group_handle,
&iovec, 1, TOTEMPG_AGREED);
}
static void barrier_message_transmit (void)
{
struct iovec iovec;
struct req_exec_barrier_message req_exec_barrier_message;
req_exec_barrier_message.header.size = sizeof (struct req_exec_barrier_message);
req_exec_barrier_message.header.id = MESSAGE_REQ_SYNC_BARRIER;
memcpy (&req_exec_barrier_message.ring_id, &my_ring_id,
sizeof (struct memb_ring_id));
iovec.iov_base = (char *)&req_exec_barrier_message;
iovec.iov_len = sizeof (req_exec_barrier_message);
(void)totempg_groups_mcast_joined (sync_group_handle,
&iovec, 1, TOTEMPG_AGREED);
}
static void service_build_message_transmit (struct req_exec_service_build_message *service_build_message)
{
struct iovec iovec;
service_build_message->header.size = sizeof (struct req_exec_service_build_message);
service_build_message->header.id = MESSAGE_REQ_SYNC_SERVICE_BUILD;
memcpy (&service_build_message->ring_id, &my_ring_id,
sizeof (struct memb_ring_id));
iovec.iov_base = (void *)service_build_message;
iovec.iov_len = sizeof (struct req_exec_service_build_message);
(void)totempg_groups_mcast_joined (sync_group_handle,
&iovec, 1, TOTEMPG_AGREED);
}
static void sync_barrier_enter (void)
{
my_state = SYNC_BARRIER;
barrier_message_transmit ();
}
static void sync_process_enter (void)
{
int i;
my_state = SYNC_PROCESS;
/*
* No sync services
*/
if (my_service_list_entries == 0) {
my_state = SYNC_SERVICELIST_BUILD;
my_memb_determine_list_entries = 0;
sync_synchronization_completed ();
return;
}
for (i = 0; i < my_processor_list_entries; i++) {
my_processor_list[i].received = 0;
}
schedwrk_create (&my_schedwrk_handle,
schedwrk_processor,
NULL);
}
static void sync_servicelist_build_enter (
const unsigned int *member_list,
size_t member_list_entries,
const struct memb_ring_id *ring_id)
{
struct req_exec_service_build_message service_build;
int i;
int res;
struct sync_callbacks sync_callbacks;
my_state = SYNC_SERVICELIST_BUILD;
for (i = 0; i < member_list_entries; i++) {
my_processor_list[i].nodeid = member_list[i];
my_processor_list[i].received = 0;
}
my_processor_list_entries = member_list_entries;
memcpy (my_member_list, member_list,
member_list_entries * sizeof (unsigned int));
my_member_list_entries = member_list_entries;
my_processing_idx = 0;
memset(my_service_list, 0, sizeof (struct service_entry) * SERVICES_COUNT_MAX);
my_service_list_entries = 0;
for (i = 0; i < SERVICES_COUNT_MAX; i++) {
res = my_sync_callbacks_retrieve (i, &sync_callbacks);
if (res == -1) {
continue;
}
if (sync_callbacks.sync_init == NULL) {
continue;
}
my_service_list[my_service_list_entries].state = INIT;
my_service_list[my_service_list_entries].service_id = i;
strcpy (my_service_list[my_service_list_entries].name,
sync_callbacks.name);
my_service_list[my_service_list_entries].sync_init = sync_callbacks.sync_init;
my_service_list[my_service_list_entries].sync_process = sync_callbacks.sync_process;
my_service_list[my_service_list_entries].sync_abort = sync_callbacks.sync_abort;
my_service_list[my_service_list_entries].sync_activate = sync_callbacks.sync_activate;
my_service_list_entries += 1;
}
for (i = 0; i < my_service_list_entries; i++) {
service_build.service_list[i] =
my_service_list[i].service_id;
}
service_build.service_list_entries = my_service_list_entries;
service_build_message_transmit (&service_build);
}
static int schedwrk_processor (const void *context)
{
int res = 0;
if (my_service_list[my_processing_idx].state == INIT) {
unsigned int old_trans_list[PROCESSOR_COUNT_MAX];
size_t old_trans_list_entries = 0;
int o, m;
my_service_list[my_processing_idx].state = PROCESS;
memcpy (old_trans_list, my_trans_list, my_trans_list_entries *
sizeof (unsigned int));
old_trans_list_entries = my_trans_list_entries;
my_trans_list_entries = 0;
for (o = 0; o < old_trans_list_entries; o++) {
for (m = 0; m < my_member_list_entries; m++) {
if (old_trans_list[o] == my_member_list[m]) {
my_trans_list[my_trans_list_entries] = my_member_list[m];
my_trans_list_entries++;
break;
}
}
}
if (my_sync_callbacks_retrieve(my_service_list[my_processing_idx].service_id, NULL) != -1) {
my_service_list[my_processing_idx].sync_init (my_trans_list,
my_trans_list_entries, my_member_list,
my_member_list_entries,
&my_ring_id);
}
}
if (my_service_list[my_processing_idx].state == PROCESS) {
my_service_list[my_processing_idx].state = PROCESS;
if (my_sync_callbacks_retrieve(my_service_list[my_processing_idx].service_id, NULL) != -1) {
res = my_service_list[my_processing_idx].sync_process ();
} else {
res = 0;
}
if (res == 0) {
sync_barrier_enter();
} else {
return (-1);
}
}
return (0);
}
void sync_start (
const unsigned int *member_list,
size_t member_list_entries,
const struct memb_ring_id *ring_id)
{
ENTER();
memcpy (&my_ring_id, ring_id, sizeof (struct memb_ring_id));
if (my_memb_determine) {
my_memb_determine = 0;
sync_servicelist_build_enter (my_memb_determine_list,
my_memb_determine_list_entries, ring_id);
} else {
sync_servicelist_build_enter (member_list, member_list_entries,
ring_id);
}
}
void sync_save_transitional (
const unsigned int *member_list,
size_t member_list_entries,
const struct memb_ring_id *ring_id)
{
ENTER();
memcpy (my_trans_list, member_list, member_list_entries *
sizeof (unsigned int));
my_trans_list_entries = member_list_entries;
}
void sync_abort (void)
{
ENTER();
if (my_state == SYNC_PROCESS) {
schedwrk_destroy (my_schedwrk_handle);
if (my_sync_callbacks_retrieve(my_service_list[my_processing_idx].service_id, NULL) != -1) {
my_service_list[my_processing_idx].sync_abort ();
}
}
/* this will cause any "old" barrier messages from causing
* problems.
*/
memset (&my_ring_id, 0, sizeof (struct memb_ring_id));
}
void sync_memb_list_determine (const struct memb_ring_id *ring_id)
{
ENTER();
memcpy (&my_memb_determine_ring_id, ring_id,
sizeof (struct memb_ring_id));
memb_determine_message_transmit ();
}
void sync_memb_list_abort (void)
{
ENTER();
my_memb_determine_list_entries = 0;
memset (&my_memb_determine_ring_id, 0, sizeof (struct memb_ring_id));
}
diff --git a/exec/vsf_quorum.c b/exec/vsf_quorum.c
index e268deac..4bf3b0a4 100644
--- a/exec/vsf_quorum.c
+++ b/exec/vsf_quorum.c
@@ -1,488 +1,487 @@
/*
* Copyright (c) 2008-2015 Red Hat, Inc.
*
* All rights reserved.
*
* Author: Christine Caulfield (ccaulfie@redhat.com)
*
* This software licensed under BSD license, the text of which follows:
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* - Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* - Neither the name of Red Hat Inc. nor the names of its
* contributors may be used to endorse or promote products derived from this
* software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
* THE POSSIBILITY OF SUCH DAMAGE.
*/
#include <config.h>
#include <pwd.h>
#include <grp.h>
#include <sys/types.h>
#include <sys/poll.h>
#include <sys/uio.h>
#include <sys/mman.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <sched.h>
#include <time.h>
#include "quorum.h"
#include <corosync/corotypes.h>
#include <qb/qbipc_common.h>
#include <corosync/corodefs.h>
#include <corosync/swab.h>
#include <corosync/list.h>
#include <corosync/mar_gen.h>
#include <corosync/ipc_quorum.h>
-#include <corosync/mar_gen.h>
#include <corosync/coroapi.h>
#include <corosync/logsys.h>
#include <corosync/icmap.h>
#include "service.h"
#include "votequorum.h"
#include "vsf_ykd.h"
LOGSYS_DECLARE_SUBSYS ("QUORUM");
struct quorum_pd {
unsigned char track_flags;
int tracking_enabled;
struct list_head list;
void *conn;
};
struct internal_callback_pd {
struct list_head list;
quorum_callback_fn_t callback;
void *context;
};
static void message_handler_req_lib_quorum_getquorate (void *conn,
const void *msg);
static void message_handler_req_lib_quorum_trackstart (void *conn,
const void *msg);
static void message_handler_req_lib_quorum_trackstop (void *conn,
const void *msg);
static void message_handler_req_lib_quorum_gettype (void *conn,
const void *msg);
static void send_library_notification(void *conn);
static void send_internal_notification(void);
static char *quorum_exec_init_fn (struct corosync_api_v1 *api);
static int quorum_lib_init_fn (void *conn);
static int quorum_lib_exit_fn (void *conn);
static int primary_designated = 0;
static int quorum_type = 0;
static struct corosync_api_v1 *corosync_api;
static struct list_head lib_trackers_list;
static struct list_head internal_trackers_list;
static struct memb_ring_id quorum_ring_id;
static size_t quorum_view_list_entries = 0;
static int quorum_view_list[PROCESSOR_COUNT_MAX];
struct quorum_services_api_ver1 *quorum_iface = NULL;
static char view_buf[64];
static void log_view_list(const unsigned int *view_list, size_t view_list_entries)
{
int total = (int)view_list_entries;
int len, pos, ret;
int i = 0;
while (1) {
len = sizeof(view_buf);
pos = 0;
memset(view_buf, 0, len);
for (; i < total; i++) {
ret = snprintf(view_buf + pos, len - pos, " %u", view_list[i]);
if (ret >= len - pos)
break;
pos += ret;
}
log_printf (LOGSYS_LEVEL_NOTICE, "Members[%d]:%s%s",
total, view_buf, i < total ? "\\" : "");
if (i == total)
break;
}
}
/* Internal quorum API function */
static void quorum_api_set_quorum(const unsigned int *view_list,
size_t view_list_entries,
int quorum, struct memb_ring_id *ring_id)
{
int old_quorum = primary_designated;
primary_designated = quorum;
if (primary_designated && !old_quorum) {
log_printf (LOGSYS_LEVEL_NOTICE, "This node is within the primary component and will provide service.");
} else if (!primary_designated && old_quorum) {
log_printf (LOGSYS_LEVEL_NOTICE, "This node is within the non-primary component and will NOT provide any services.");
}
quorum_view_list_entries = view_list_entries;
memcpy(&quorum_ring_id, ring_id, sizeof (quorum_ring_id));
memcpy(quorum_view_list, view_list, sizeof(unsigned int)*view_list_entries);
log_view_list(view_list, view_list_entries);
/* Tell internal listeners */
send_internal_notification();
/* Tell IPC listeners */
send_library_notification(NULL);
}
static struct corosync_lib_handler quorum_lib_service[] =
{
{ /* 0 */
.lib_handler_fn = message_handler_req_lib_quorum_getquorate,
.flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED
},
{ /* 1 */
.lib_handler_fn = message_handler_req_lib_quorum_trackstart,
.flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED
},
{ /* 2 */
.lib_handler_fn = message_handler_req_lib_quorum_trackstop,
.flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED
},
{ /* 3 */
.lib_handler_fn = message_handler_req_lib_quorum_gettype,
.flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED
}
};
static struct corosync_service_engine quorum_service_handler = {
.name = "corosync cluster quorum service v0.1",
.id = QUORUM_SERVICE,
.priority = 1,
.private_data_size = sizeof (struct quorum_pd),
.flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED,
.allow_inquorate = CS_LIB_ALLOW_INQUORATE,
.lib_init_fn = quorum_lib_init_fn,
.lib_exit_fn = quorum_lib_exit_fn,
.lib_engine = quorum_lib_service,
.exec_init_fn = quorum_exec_init_fn,
.lib_engine_count = sizeof (quorum_lib_service) / sizeof (struct corosync_lib_handler)
};
struct corosync_service_engine *vsf_quorum_get_service_engine_ver0 (void)
{
return (&quorum_service_handler);
}
/* -------------------------------------------------- */
/*
* Internal API functions for corosync
*/
static int quorum_quorate(void)
{
return primary_designated;
}
static int quorum_register_callback(quorum_callback_fn_t function, void *context)
{
struct internal_callback_pd *pd = malloc(sizeof(struct internal_callback_pd));
if (!pd)
return -1;
pd->context = context;
pd->callback = function;
list_add (&pd->list, &internal_trackers_list);
return 0;
}
static int quorum_unregister_callback(quorum_callback_fn_t function, void *context)
{
struct internal_callback_pd *pd;
struct list_head *tmp;
for (tmp = internal_trackers_list.next; tmp != &internal_trackers_list; tmp = tmp->next) {
pd = list_entry(tmp, struct internal_callback_pd, list);
if (pd->callback == function && pd->context == context) {
list_del(&pd->list);
free(pd);
return 0;
}
}
return -1;
}
static struct quorum_callin_functions callins = {
.quorate = quorum_quorate,
.register_callback = quorum_register_callback,
.unregister_callback = quorum_unregister_callback
};
/* --------------------------------------------------------------------- */
static char *quorum_exec_init_fn (struct corosync_api_v1 *api)
{
char *quorum_module = NULL;
char *error;
corosync_api = api;
list_init (&lib_trackers_list);
list_init (&internal_trackers_list);
/*
* Tell corosync we have a quorum engine.
*/
api->quorum_initialize(&callins);
/*
* Look for a quorum provider
*/
if (icmap_get_string("quorum.provider", &quorum_module) == CS_OK) {
log_printf (LOGSYS_LEVEL_NOTICE,
"Using quorum provider %s", quorum_module);
error = (char *)"Invalid quorum provider";
if (strcmp (quorum_module, "corosync_votequorum") == 0) {
error = votequorum_init (api, quorum_api_set_quorum);
quorum_type = 1;
}
if (strcmp (quorum_module, "corosync_ykd") == 0) {
error = ykd_init (api, quorum_api_set_quorum);
quorum_type = 1;
}
if (error) {
log_printf (LOGSYS_LEVEL_CRIT,
"Quorum provider: %s failed to initialize.",
quorum_module);
free(quorum_module);
return (error);
}
}
if (quorum_module) {
free(quorum_module);
quorum_module = NULL;
}
/*
* setting quorum_type and primary_designated in the right order is important
* always try to lookup/init a quorum module, then revert back to be quorate
*/
if (quorum_type == 0) {
primary_designated = 1;
}
return (NULL);
}
static int quorum_lib_init_fn (void *conn)
{
struct quorum_pd *pd = (struct quorum_pd *)corosync_api->ipc_private_data_get (conn);
log_printf(LOGSYS_LEVEL_DEBUG, "lib_init_fn: conn=%p", conn);
list_init (&pd->list);
pd->conn = conn;
return (0);
}
static int quorum_lib_exit_fn (void *conn)
{
struct quorum_pd *quorum_pd = (struct quorum_pd *)corosync_api->ipc_private_data_get (conn);
log_printf(LOGSYS_LEVEL_DEBUG, "lib_exit_fn: conn=%p", conn);
if (quorum_pd->tracking_enabled) {
list_del (&quorum_pd->list);
list_init (&quorum_pd->list);
}
return (0);
}
static void send_internal_notification(void)
{
struct list_head *tmp;
struct internal_callback_pd *pd;
for (tmp = internal_trackers_list.next; tmp != &internal_trackers_list; tmp = tmp->next) {
pd = list_entry(tmp, struct internal_callback_pd, list);
pd->callback(primary_designated, pd->context);
}
}
static void send_library_notification(void *conn)
{
int size = sizeof(struct res_lib_quorum_notification) + sizeof(unsigned int)*quorum_view_list_entries;
char buf[size];
struct res_lib_quorum_notification *res_lib_quorum_notification = (struct res_lib_quorum_notification *)buf;
struct list_head *tmp;
int i;
log_printf(LOGSYS_LEVEL_DEBUG, "sending quorum notification to %p, length = %d", conn, size);
res_lib_quorum_notification->quorate = primary_designated;
res_lib_quorum_notification->ring_seq = quorum_ring_id.seq;
res_lib_quorum_notification->view_list_entries = quorum_view_list_entries;
for (i=0; i<quorum_view_list_entries; i++) {
res_lib_quorum_notification->view_list[i] = quorum_view_list[i];
}
res_lib_quorum_notification->header.id = MESSAGE_RES_QUORUM_NOTIFICATION;
res_lib_quorum_notification->header.size = size;
res_lib_quorum_notification->header.error = CS_OK;
/* Send it to all interested parties */
if (conn) {
corosync_api->ipc_dispatch_send(conn, res_lib_quorum_notification, size);
}
else {
struct quorum_pd *qpd;
for (tmp = lib_trackers_list.next; tmp != &lib_trackers_list; tmp = tmp->next) {
qpd = list_entry(tmp, struct quorum_pd, list);
corosync_api->ipc_dispatch_send(qpd->conn,
res_lib_quorum_notification, size);
}
}
return;
}
static void message_handler_req_lib_quorum_getquorate (void *conn,
const void *msg)
{
struct res_lib_quorum_getquorate res_lib_quorum_getquorate;
log_printf(LOGSYS_LEVEL_DEBUG, "got quorate request on %p", conn);
/* send status */
res_lib_quorum_getquorate.quorate = primary_designated;
res_lib_quorum_getquorate.header.size = sizeof(res_lib_quorum_getquorate);
res_lib_quorum_getquorate.header.id = MESSAGE_RES_QUORUM_GETQUORATE;
res_lib_quorum_getquorate.header.error = CS_OK;
corosync_api->ipc_response_send(conn, &res_lib_quorum_getquorate, sizeof(res_lib_quorum_getquorate));
}
static void message_handler_req_lib_quorum_trackstart (void *conn,
const void *msg)
{
const struct req_lib_quorum_trackstart *req_lib_quorum_trackstart = msg;
struct qb_ipc_response_header res;
struct quorum_pd *quorum_pd = (struct quorum_pd *)corosync_api->ipc_private_data_get (conn);
cs_error_t error = CS_OK;
log_printf(LOGSYS_LEVEL_DEBUG, "got trackstart request on %p", conn);
/*
* If an immediate listing of the current cluster membership
* is requested, generate membership list
*/
if (req_lib_quorum_trackstart->track_flags & CS_TRACK_CURRENT ||
req_lib_quorum_trackstart->track_flags & CS_TRACK_CHANGES) {
log_printf(LOGSYS_LEVEL_DEBUG, "sending initial status to %p", conn);
send_library_notification(conn);
}
if (quorum_pd->tracking_enabled) {
error = CS_ERR_EXIST;
goto response_send;
}
/*
* Record requests for tracking
*/
if (req_lib_quorum_trackstart->track_flags & CS_TRACK_CHANGES ||
req_lib_quorum_trackstart->track_flags & CS_TRACK_CHANGES_ONLY) {
quorum_pd->track_flags = req_lib_quorum_trackstart->track_flags;
quorum_pd->tracking_enabled = 1;
list_add (&quorum_pd->list, &lib_trackers_list);
}
response_send:
/* send status */
res.size = sizeof(res);
res.id = MESSAGE_RES_QUORUM_TRACKSTART;
res.error = error;
corosync_api->ipc_response_send(conn, &res, sizeof(struct qb_ipc_response_header));
}
static void message_handler_req_lib_quorum_trackstop (void *conn, const void *msg)
{
struct qb_ipc_response_header res;
struct quorum_pd *quorum_pd = (struct quorum_pd *)corosync_api->ipc_private_data_get (conn);
log_printf(LOGSYS_LEVEL_DEBUG, "got trackstop request on %p", conn);
if (quorum_pd->tracking_enabled) {
res.error = CS_OK;
quorum_pd->tracking_enabled = 0;
list_del (&quorum_pd->list);
list_init (&quorum_pd->list);
} else {
res.error = CS_ERR_NOT_EXIST;
}
/* send status */
res.size = sizeof(res);
res.id = MESSAGE_RES_QUORUM_TRACKSTOP;
res.error = CS_OK;
corosync_api->ipc_response_send(conn, &res, sizeof(struct qb_ipc_response_header));
}
static void message_handler_req_lib_quorum_gettype (void *conn,
const void *msg)
{
struct res_lib_quorum_gettype res_lib_quorum_gettype;
log_printf(LOGSYS_LEVEL_DEBUG, "got quorum_type request on %p", conn);
/* send status */
res_lib_quorum_gettype.quorum_type = quorum_type;
res_lib_quorum_gettype.header.size = sizeof(res_lib_quorum_gettype);
res_lib_quorum_gettype.header.id = MESSAGE_RES_QUORUM_GETTYPE;
res_lib_quorum_gettype.header.error = CS_OK;
corosync_api->ipc_response_send(conn, &res_lib_quorum_gettype, sizeof(res_lib_quorum_gettype));
}
diff --git a/qdevices/tlv.c b/qdevices/tlv.c
index a7eae953..5b458690 100644
--- a/qdevices/tlv.c
+++ b/qdevices/tlv.c
@@ -1,1038 +1,1037 @@
/*
* Copyright (c) 2015-2016 Red Hat, Inc.
*
* All rights reserved.
*
* Author: Jan Friesse (jfriesse@redhat.com)
*
* This software licensed under BSD license, the text of which follows:
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* - Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* - Neither the name of the Red Hat, Inc. nor the names of its
* contributors may be used to endorse or promote products derived from this
* software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
* THE POSSIBILITY OF SUCH DAMAGE.
*/
#include <sys/types.h>
#include <arpa/inet.h>
#include <inttypes.h>
#include <stdlib.h>
#include <string.h>
/*
* 64-bit variant of ntoh is not exactly standard...
*/
#if defined(__linux__)
#include <endian.h>
#elif defined(__FreeBSD__) || defined(__NetBSD__)
#include <sys/endian.h>
#elif defined(__OpenBSD__)
-#include <sys/types.h>
#define be64toh(x) betoh64(x)
#endif
#include "tlv.h"
#define TLV_TYPE_LENGTH 2
#define TLV_LENGTH_LENGTH 2
#define TLV_STATIC_SUPPORTED_OPTIONS_SIZE 22
enum tlv_opt_type tlv_static_supported_options[TLV_STATIC_SUPPORTED_OPTIONS_SIZE] = {
TLV_OPT_MSG_SEQ_NUMBER,
TLV_OPT_CLUSTER_NAME,
TLV_OPT_TLS_SUPPORTED,
TLV_OPT_TLS_CLIENT_CERT_REQUIRED,
TLV_OPT_SUPPORTED_MESSAGES,
TLV_OPT_SUPPORTED_OPTIONS,
TLV_OPT_REPLY_ERROR_CODE,
TLV_OPT_SERVER_MAXIMUM_REQUEST_SIZE,
TLV_OPT_SERVER_MAXIMUM_REPLY_SIZE,
TLV_OPT_NODE_ID,
TLV_OPT_SUPPORTED_DECISION_ALGORITHMS,
TLV_OPT_DECISION_ALGORITHM,
TLV_OPT_HEARTBEAT_INTERVAL,
TLV_OPT_RING_ID,
TLV_OPT_CONFIG_VERSION,
TLV_OPT_DATA_CENTER_ID,
TLV_OPT_NODE_STATE,
TLV_OPT_NODE_INFO,
TLV_OPT_NODE_LIST_TYPE,
TLV_OPT_VOTE,
TLV_OPT_QUORATE,
TLV_OPT_TIE_BREAKER,
};
int
tlv_add(struct dynar *msg, enum tlv_opt_type opt_type, uint16_t opt_len, const void *value)
{
uint16_t nlen;
uint16_t nopt_type;
if (dynar_size(msg) + sizeof(nopt_type) + sizeof(nlen) + opt_len > dynar_max_size(msg)) {
return (-1);
}
nopt_type = htons((uint16_t)opt_type);
nlen = htons(opt_len);
dynar_cat(msg, &nopt_type, sizeof(nopt_type));
dynar_cat(msg, &nlen, sizeof(nlen));
dynar_cat(msg, value, opt_len);
return (0);
}
int
tlv_add_u32(struct dynar *msg, enum tlv_opt_type opt_type, uint32_t u32)
{
uint32_t nu32;
nu32 = htonl(u32);
return (tlv_add(msg, opt_type, sizeof(nu32), &nu32));
}
int
tlv_add_u8(struct dynar *msg, enum tlv_opt_type opt_type, uint8_t u8)
{
return (tlv_add(msg, opt_type, sizeof(u8), &u8));
}
int
tlv_add_u16(struct dynar *msg, enum tlv_opt_type opt_type, uint16_t u16)
{
uint16_t nu16;
nu16 = htons(u16);
return (tlv_add(msg, opt_type, sizeof(nu16), &nu16));
}
int
tlv_add_u64(struct dynar *msg, enum tlv_opt_type opt_type, uint64_t u64)
{
uint64_t nu64;
nu64 = htobe64(u64);
return (tlv_add(msg, opt_type, sizeof(nu64), &nu64));
}
int
tlv_add_string(struct dynar *msg, enum tlv_opt_type opt_type, const char *str)
{
return (tlv_add(msg, opt_type, strlen(str), str));
}
int
tlv_add_msg_seq_number(struct dynar *msg, uint32_t msg_seq_number)
{
return (tlv_add_u32(msg, TLV_OPT_MSG_SEQ_NUMBER, msg_seq_number));
}
int
tlv_add_cluster_name(struct dynar *msg, const char *cluster_name)
{
return (tlv_add_string(msg, TLV_OPT_CLUSTER_NAME, cluster_name));
}
int
tlv_add_tls_supported(struct dynar *msg, enum tlv_tls_supported tls_supported)
{
return (tlv_add_u8(msg, TLV_OPT_TLS_SUPPORTED, tls_supported));
}
int
tlv_add_tls_client_cert_required(struct dynar *msg, int tls_client_cert_required)
{
return (tlv_add_u8(msg, TLV_OPT_TLS_CLIENT_CERT_REQUIRED, tls_client_cert_required));
}
int
tlv_add_u16_array(struct dynar *msg, enum tlv_opt_type opt_type, const uint16_t *array,
size_t array_size)
{
size_t i;
uint16_t *nu16a;
uint16_t opt_len;
int res;
nu16a = malloc(sizeof(uint16_t) * array_size);
if (nu16a == NULL) {
return (-1);
}
for (i = 0; i < array_size; i++) {
nu16a[i] = htons(array[i]);
}
opt_len = sizeof(uint16_t) * array_size;
res = tlv_add(msg, opt_type, opt_len, nu16a);
free(nu16a);
return (res);
}
int
tlv_add_supported_options(struct dynar *msg, const enum tlv_opt_type *supported_options,
size_t no_supported_options)
{
uint16_t *u16a;
size_t i;
int res;
u16a = malloc(sizeof(*u16a) * no_supported_options);
if (u16a == NULL) {
return (-1);
}
for (i = 0; i < no_supported_options; i++) {
u16a[i] = (uint16_t)supported_options[i];
}
res = (tlv_add_u16_array(msg, TLV_OPT_SUPPORTED_OPTIONS, u16a, no_supported_options));
free(u16a);
return (res);
}
int
tlv_add_supported_decision_algorithms(struct dynar *msg,
const enum tlv_decision_algorithm_type *supported_algorithms, size_t no_supported_algorithms)
{
uint16_t *u16a;
size_t i;
int res;
u16a = malloc(sizeof(*u16a) * no_supported_algorithms);
if (u16a == NULL) {
return (-1);
}
for (i = 0; i < no_supported_algorithms; i++) {
u16a[i] = (uint16_t)supported_algorithms[i];
}
res = (tlv_add_u16_array(msg, TLV_OPT_SUPPORTED_DECISION_ALGORITHMS, u16a,
no_supported_algorithms));
free(u16a);
return (res);
}
int
tlv_add_reply_error_code(struct dynar *msg, enum tlv_reply_error_code error_code)
{
return (tlv_add_u16(msg, TLV_OPT_REPLY_ERROR_CODE, (uint16_t)error_code));
}
int
tlv_add_server_maximum_request_size(struct dynar *msg, size_t server_maximum_request_size)
{
return (tlv_add_u32(msg, TLV_OPT_SERVER_MAXIMUM_REQUEST_SIZE, server_maximum_request_size));
}
int
tlv_add_server_maximum_reply_size(struct dynar *msg, size_t server_maximum_reply_size)
{
return (tlv_add_u32(msg, TLV_OPT_SERVER_MAXIMUM_REPLY_SIZE, server_maximum_reply_size));
}
int
tlv_add_node_id(struct dynar *msg, uint32_t node_id)
{
return (tlv_add_u32(msg, TLV_OPT_NODE_ID, node_id));
}
int
tlv_add_decision_algorithm(struct dynar *msg, enum tlv_decision_algorithm_type decision_algorithm)
{
return (tlv_add_u16(msg, TLV_OPT_DECISION_ALGORITHM, (uint16_t)decision_algorithm));
}
int
tlv_add_heartbeat_interval(struct dynar *msg, uint32_t heartbeat_interval)
{
return (tlv_add_u32(msg, TLV_OPT_HEARTBEAT_INTERVAL, heartbeat_interval));
}
int
tlv_add_ring_id(struct dynar *msg, const struct tlv_ring_id *ring_id)
{
uint64_t nu64;
uint32_t nu32;
char tmp_buf[12];
nu32 = htonl(ring_id->node_id);
nu64 = htobe64(ring_id->seq);
memcpy(tmp_buf, &nu32, sizeof(nu32));
memcpy(tmp_buf + sizeof(nu32), &nu64, sizeof(nu64));
return (tlv_add(msg, TLV_OPT_RING_ID, sizeof(tmp_buf), tmp_buf));
}
int
tlv_add_tie_breaker(struct dynar *msg, const struct tlv_tie_breaker *tie_breaker)
{
uint32_t nu32;
uint8_t u8;
char tmp_buf[5];
u8 = tie_breaker->mode;
nu32 = (tie_breaker->mode == TLV_TIE_BREAKER_MODE_NODE_ID ?
htonl(tie_breaker->node_id) : 0);
memcpy(tmp_buf, &u8, sizeof(u8));
memcpy(tmp_buf + sizeof(u8), &nu32, sizeof(nu32));
return (tlv_add(msg, TLV_OPT_TIE_BREAKER, sizeof(tmp_buf), tmp_buf));
}
int
tlv_add_config_version(struct dynar *msg, uint64_t config_version)
{
return (tlv_add_u64(msg, TLV_OPT_CONFIG_VERSION, config_version));
}
int
tlv_add_data_center_id(struct dynar *msg, uint32_t data_center_id)
{
return (tlv_add_u32(msg, TLV_OPT_DATA_CENTER_ID, data_center_id));
}
int
tlv_add_node_state(struct dynar *msg, enum tlv_node_state node_state)
{
return (tlv_add_u8(msg, TLV_OPT_NODE_STATE, node_state));
}
int
tlv_add_node_info(struct dynar *msg, const struct tlv_node_info *node_info)
{
struct dynar opt_value;
int res;
res = 0;
/*
* Create sub message,
*/
dynar_init(&opt_value, 1024);
if ((res = tlv_add_node_id(&opt_value, node_info->node_id)) != 0) {
goto exit_dynar_destroy;
}
if (node_info->data_center_id != 0) {
if ((res = tlv_add_data_center_id(&opt_value, node_info->data_center_id)) != 0) {
goto exit_dynar_destroy;
}
}
if (node_info->node_state != TLV_NODE_STATE_NOT_SET) {
if ((res = tlv_add_node_state(&opt_value, node_info->node_state)) != 0) {
goto exit_dynar_destroy;
}
}
res = tlv_add(msg, TLV_OPT_NODE_INFO, dynar_size(&opt_value), dynar_data(&opt_value));
if (res != 0) {
goto exit_dynar_destroy;
}
exit_dynar_destroy:
dynar_destroy(&opt_value);
return (res);
}
int
tlv_add_node_list_type(struct dynar *msg, enum tlv_node_list_type node_list_type)
{
return (tlv_add_u8(msg, TLV_OPT_NODE_LIST_TYPE, node_list_type));
}
int
tlv_add_vote(struct dynar *msg, enum tlv_vote vote)
{
return (tlv_add_u8(msg, TLV_OPT_VOTE, vote));
}
int
tlv_add_quorate(struct dynar *msg, enum tlv_quorate quorate)
{
return (tlv_add_u8(msg, TLV_OPT_QUORATE, quorate));
}
void
tlv_iter_init_str(const char *msg, size_t msg_len, size_t msg_header_len,
struct tlv_iterator *tlv_iter)
{
tlv_iter->msg = msg;
tlv_iter->msg_len = msg_len;
tlv_iter->current_pos = 0;
tlv_iter->msg_header_len = msg_header_len;
tlv_iter->iter_next_called = 0;
}
void
tlv_iter_init(const struct dynar *msg, size_t msg_header_len, struct tlv_iterator *tlv_iter)
{
tlv_iter_init_str(dynar_data(msg), dynar_size(msg), msg_header_len, tlv_iter);
}
enum tlv_opt_type
tlv_iter_get_type(const struct tlv_iterator *tlv_iter)
{
uint16_t ntype;
uint16_t type;
memcpy(&ntype, tlv_iter->msg + tlv_iter->current_pos, sizeof(ntype));
type = ntohs(ntype);
return (type);
}
uint16_t
tlv_iter_get_len(const struct tlv_iterator *tlv_iter)
{
uint16_t nlen;
uint16_t len;
memcpy(&nlen, tlv_iter->msg + tlv_iter->current_pos + TLV_TYPE_LENGTH, sizeof(nlen));
len = ntohs(nlen);
return (len);
}
const char *
tlv_iter_get_data(const struct tlv_iterator *tlv_iter)
{
return (tlv_iter->msg + tlv_iter->current_pos + TLV_TYPE_LENGTH + TLV_LENGTH_LENGTH);
}
int
tlv_iter_next(struct tlv_iterator *tlv_iter)
{
uint16_t len;
if (tlv_iter->iter_next_called == 0) {
tlv_iter->iter_next_called = 1;
tlv_iter->current_pos = tlv_iter->msg_header_len;
goto check_tlv_validity;
}
len = tlv_iter_get_len(tlv_iter);
if (tlv_iter->current_pos + TLV_TYPE_LENGTH + TLV_LENGTH_LENGTH + len >=
tlv_iter->msg_len) {
return (0);
}
tlv_iter->current_pos += TLV_TYPE_LENGTH + TLV_LENGTH_LENGTH + len;
check_tlv_validity:
/*
* Check if tlv is valid = is not larger than whole message
*/
len = tlv_iter_get_len(tlv_iter);
if (tlv_iter->current_pos + TLV_TYPE_LENGTH + TLV_LENGTH_LENGTH + len > tlv_iter->msg_len) {
return (-1);
}
return (1);
}
int
tlv_iter_decode_u32(struct tlv_iterator *tlv_iter, uint32_t *res)
{
const char *opt_data;
uint16_t opt_len;
uint32_t nu32;
opt_len = tlv_iter_get_len(tlv_iter);
opt_data = tlv_iter_get_data(tlv_iter);
if (opt_len != sizeof(nu32)) {
return (-1);
}
memcpy(&nu32, opt_data, sizeof(nu32));
*res = ntohl(nu32);
return (0);
}
int
tlv_iter_decode_u8(struct tlv_iterator *tlv_iter, uint8_t *res)
{
const char *opt_data;
uint16_t opt_len;
opt_len = tlv_iter_get_len(tlv_iter);
opt_data = tlv_iter_get_data(tlv_iter);
if (opt_len != sizeof(*res)) {
return (-1);
}
memcpy(res, opt_data, sizeof(*res));
return (0);
}
int
tlv_iter_decode_client_cert_required(struct tlv_iterator *tlv_iter, uint8_t *client_cert_required)
{
return (tlv_iter_decode_u8(tlv_iter, client_cert_required));
}
int
tlv_iter_decode_str(struct tlv_iterator *tlv_iter, char **str, size_t *str_len)
{
const char *opt_data;
uint16_t opt_len;
char *tmp_str;
opt_len = tlv_iter_get_len(tlv_iter);
opt_data = tlv_iter_get_data(tlv_iter);
tmp_str = malloc(opt_len + 1);
if (tmp_str == NULL) {
return (-1);
}
memcpy(tmp_str, opt_data, opt_len);
tmp_str[opt_len] = '\0';
*str = tmp_str;
*str_len = opt_len;
return (0);
}
int
tlv_iter_decode_u16_array(struct tlv_iterator *tlv_iter, uint16_t **u16a, size_t *no_items)
{
uint16_t opt_len;
uint16_t *u16a_res;
size_t i;
opt_len = tlv_iter_get_len(tlv_iter);
if (opt_len % sizeof(uint16_t) != 0) {
return (-1);
}
*no_items = opt_len / sizeof(uint16_t);
u16a_res = malloc(sizeof(uint16_t) * *no_items);
if (u16a_res == NULL) {
return (-2);
}
memcpy(u16a_res, tlv_iter_get_data(tlv_iter), opt_len);
for (i = 0; i < *no_items; i++) {
u16a_res[i] = ntohs(u16a_res[i]);
}
*u16a = u16a_res;
return (0);
}
int
tlv_iter_decode_supported_options(struct tlv_iterator *tlv_iter,
enum tlv_opt_type **supported_options, size_t *no_supported_options)
{
uint16_t *u16a;
enum tlv_opt_type *tlv_opt_array;
size_t i;
int res;
res = tlv_iter_decode_u16_array(tlv_iter, &u16a, no_supported_options);
if (res != 0) {
return (res);
}
tlv_opt_array = malloc(sizeof(enum tlv_opt_type) * *no_supported_options);
if (tlv_opt_array == NULL) {
free(u16a);
return (-2);
}
for (i = 0; i < *no_supported_options; i++) {
tlv_opt_array[i] = (enum tlv_opt_type)u16a[i];
}
free(u16a);
*supported_options = tlv_opt_array;
return (0);
}
int
tlv_iter_decode_supported_decision_algorithms(struct tlv_iterator *tlv_iter,
enum tlv_decision_algorithm_type **supported_decision_algorithms,
size_t *no_supported_decision_algorithms)
{
uint16_t *u16a;
enum tlv_decision_algorithm_type *tlv_decision_algorithm_type_array;
size_t i;
int res;
res = tlv_iter_decode_u16_array(tlv_iter, &u16a, no_supported_decision_algorithms);
if (res != 0) {
return (res);
}
tlv_decision_algorithm_type_array = malloc(
sizeof(enum tlv_decision_algorithm_type) * *no_supported_decision_algorithms);
if (tlv_decision_algorithm_type_array == NULL) {
free(u16a);
return (-2);
}
for (i = 0; i < *no_supported_decision_algorithms; i++) {
tlv_decision_algorithm_type_array[i] = (enum tlv_decision_algorithm_type)u16a[i];
}
free(u16a);
*supported_decision_algorithms = tlv_decision_algorithm_type_array;
return (0);
}
int
tlv_iter_decode_u16(struct tlv_iterator *tlv_iter, uint16_t *u16)
{
const char *opt_data;
uint16_t opt_len;
uint16_t nu16;
opt_len = tlv_iter_get_len(tlv_iter);
opt_data = tlv_iter_get_data(tlv_iter);
if (opt_len != sizeof(nu16)) {
return (-1);
}
memcpy(&nu16, opt_data, sizeof(nu16));
*u16 = ntohs(nu16);
return (0);
}
int
tlv_iter_decode_u64(struct tlv_iterator *tlv_iter, uint64_t *u64)
{
const char *opt_data;
uint64_t opt_len;
uint64_t nu64;
opt_len = tlv_iter_get_len(tlv_iter);
opt_data = tlv_iter_get_data(tlv_iter);
if (opt_len != sizeof(nu64)) {
return (-1);
}
memcpy(&nu64, opt_data, sizeof(nu64));
*u64 = be64toh(nu64);
return (0);
}
int
tlv_iter_decode_reply_error_code(struct tlv_iterator *tlv_iter,
enum tlv_reply_error_code *reply_error_code)
{
return (tlv_iter_decode_u16(tlv_iter, (uint16_t *)reply_error_code));
}
int
tlv_iter_decode_tls_supported(struct tlv_iterator *tlv_iter, enum tlv_tls_supported *tls_supported)
{
uint8_t u8;
enum tlv_tls_supported tmp_tls_supported;
if (tlv_iter_decode_u8(tlv_iter, &u8) != 0) {
return (-1);
}
tmp_tls_supported = u8;
if (tmp_tls_supported != TLV_TLS_UNSUPPORTED &&
tmp_tls_supported != TLV_TLS_SUPPORTED &&
tmp_tls_supported != TLV_TLS_REQUIRED) {
return (-4);
}
*tls_supported = tmp_tls_supported;
return (0);
}
int
tlv_iter_decode_decision_algorithm(struct tlv_iterator *tlv_iter,
enum tlv_decision_algorithm_type *decision_algorithm)
{
uint16_t u16;
if (tlv_iter_decode_u16(tlv_iter, &u16) != 0) {
return (-1);
}
*decision_algorithm = (enum tlv_decision_algorithm_type)u16;
return (0);
}
int
tlv_iter_decode_ring_id(struct tlv_iterator *tlv_iter, struct tlv_ring_id *ring_id)
{
const char *opt_data;
uint16_t opt_len;
uint32_t nu32;
uint64_t nu64;
char tmp_buf[12];
opt_len = tlv_iter_get_len(tlv_iter);
opt_data = tlv_iter_get_data(tlv_iter);
if (opt_len != sizeof(tmp_buf)) {
return (-1);
}
memcpy(&nu32, opt_data, sizeof(nu32));
memcpy(&nu64, opt_data + sizeof(nu32), sizeof(nu64));
ring_id->node_id = ntohl(nu32);
ring_id->seq = be64toh(nu64);
return (0);
}
int
tlv_iter_decode_tie_breaker(struct tlv_iterator *tlv_iter, struct tlv_tie_breaker *tie_breaker)
{
const char *opt_data;
uint16_t opt_len;
uint32_t nu32;
uint8_t u8;
enum tlv_tie_breaker_mode tie_breaker_mode;
char tmp_buf[5];
opt_len = tlv_iter_get_len(tlv_iter);
opt_data = tlv_iter_get_data(tlv_iter);
if (opt_len != sizeof(tmp_buf)) {
return (-1);
}
memcpy(&u8, opt_data, sizeof(u8));
tie_breaker_mode = u8;
if (tie_breaker_mode != TLV_TIE_BREAKER_MODE_LOWEST &&
tie_breaker_mode != TLV_TIE_BREAKER_MODE_HIGHEST &&
tie_breaker_mode != TLV_TIE_BREAKER_MODE_NODE_ID) {
return (-4);
}
memcpy(&nu32, opt_data + sizeof(u8), sizeof(nu32));
tie_breaker->mode = tie_breaker_mode;
tie_breaker->node_id = (tie_breaker->mode == TLV_TIE_BREAKER_MODE_NODE_ID ?
ntohl(nu32) : 0);
return (0);
}
int
tlv_iter_decode_node_state(struct tlv_iterator *tlv_iter, enum tlv_node_state *node_state)
{
uint8_t u8;
enum tlv_node_state tmp_node_state;
if (tlv_iter_decode_u8(tlv_iter, &u8) != 0) {
return (-1);
}
tmp_node_state = u8;
if (tmp_node_state != TLV_NODE_STATE_MEMBER &&
tmp_node_state != TLV_NODE_STATE_DEAD &&
tmp_node_state != TLV_NODE_STATE_LEAVING) {
return (-4);
}
*node_state = tmp_node_state;
return (0);
}
int
tlv_iter_decode_node_info(struct tlv_iterator *tlv_iter, struct tlv_node_info *node_info)
{
struct tlv_iterator data_tlv_iter;
int iter_res;
int res;
enum tlv_opt_type opt_type;
struct tlv_node_info tmp_node_info;
memset(&tmp_node_info, 0, sizeof(tmp_node_info));
tlv_iter_init_str(tlv_iter_get_data(tlv_iter), tlv_iter_get_len(tlv_iter), 0,
&data_tlv_iter);
while ((iter_res = tlv_iter_next(&data_tlv_iter)) > 0) {
opt_type = tlv_iter_get_type(&data_tlv_iter);
switch (opt_type) {
case TLV_OPT_NODE_ID:
if ((res = tlv_iter_decode_u32(&data_tlv_iter,
&tmp_node_info.node_id)) != 0) {
return (res);
}
break;
case TLV_OPT_DATA_CENTER_ID:
if ((res = tlv_iter_decode_u32(&data_tlv_iter,
&tmp_node_info.data_center_id)) != 0) {
return (res);
}
break;
case TLV_OPT_NODE_STATE:
if ((res = tlv_iter_decode_node_state(&data_tlv_iter,
&tmp_node_info.node_state)) != 0) {
return (res);
}
break;
default:
/*
* Other options are not processed
*/
break;
}
}
if (iter_res != 0) {
return (-3);
}
if (tmp_node_info.node_id == 0) {
return (-4);
}
memcpy(node_info, &tmp_node_info, sizeof(tmp_node_info));
return (0);
}
int
tlv_iter_decode_node_list_type(struct tlv_iterator *tlv_iter,
enum tlv_node_list_type *node_list_type)
{
uint8_t u8;
enum tlv_node_list_type tmp_node_list_type;
if (tlv_iter_decode_u8(tlv_iter, &u8) != 0) {
return (-1);
}
tmp_node_list_type = u8;
if (tmp_node_list_type != TLV_NODE_LIST_TYPE_INITIAL_CONFIG &&
tmp_node_list_type != TLV_NODE_LIST_TYPE_CHANGED_CONFIG &&
tmp_node_list_type != TLV_NODE_LIST_TYPE_MEMBERSHIP &&
tmp_node_list_type != TLV_NODE_LIST_TYPE_QUORUM) {
return (-4);
}
*node_list_type = tmp_node_list_type;
return (0);
}
int
tlv_iter_decode_vote(struct tlv_iterator *tlv_iter, enum tlv_vote *vote)
{
uint8_t u8;
enum tlv_vote tmp_vote;
if (tlv_iter_decode_u8(tlv_iter, &u8) != 0) {
return (-1);
}
tmp_vote = u8;
if (tmp_vote != TLV_VOTE_ACK &&
tmp_vote != TLV_VOTE_NACK &&
tmp_vote != TLV_VOTE_ASK_LATER &&
tmp_vote != TLV_VOTE_WAIT_FOR_REPLY &&
tmp_vote != TLV_VOTE_NO_CHANGE) {
return (-4);
}
*vote = tmp_vote;
return (0);
}
int
tlv_iter_decode_quorate(struct tlv_iterator *tlv_iter, enum tlv_quorate *quorate)
{
uint8_t u8;
enum tlv_quorate tmp_quorate;
if (tlv_iter_decode_u8(tlv_iter, &u8) != 0) {
return (-1);
}
tmp_quorate = u8;
if (tmp_quorate != TLV_QUORATE_QUORATE &&
tmp_quorate != TLV_QUORATE_INQUORATE) {
return (-4);
}
*quorate = tmp_quorate;
return (0);
}
void
tlv_get_supported_options(enum tlv_opt_type **supported_options, size_t *no_supported_options)
{
*supported_options = tlv_static_supported_options;
*no_supported_options = TLV_STATIC_SUPPORTED_OPTIONS_SIZE;
}
int
tlv_ring_id_eq(const struct tlv_ring_id *rid1, const struct tlv_ring_id *rid2)
{
return (rid1->node_id == rid2->node_id && rid1->seq == rid2->seq);
}
int
tlv_tie_breaker_eq(const struct tlv_tie_breaker *tb1, const struct tlv_tie_breaker *tb2)
{
if (tb1->mode == tb2->mode && tb1->mode == TLV_TIE_BREAKER_MODE_NODE_ID) {
return (tb1->node_id == tb2->node_id);
}
return (tb1->mode == tb2->mode);
}
const char *
tlv_vote_to_str(enum tlv_vote vote)
{
switch (vote) {
case TLV_VOTE_UNDEFINED: break;
case TLV_VOTE_ACK: return ("ACK"); break;
case TLV_VOTE_NACK: return ("NACK"); break;
case TLV_VOTE_ASK_LATER: return ("Ask later"); break;
case TLV_VOTE_WAIT_FOR_REPLY: return ("Wait for reply"); break;
case TLV_VOTE_NO_CHANGE: return ("No change"); break;
}
return ("Unknown vote value");
}
const char *
tlv_node_state_to_str(enum tlv_node_state state)
{
switch (state) {
case TLV_NODE_STATE_NOT_SET: return ("not set"); break;
case TLV_NODE_STATE_MEMBER: return ("member"); break;
case TLV_NODE_STATE_DEAD: return ("dead"); break;
case TLV_NODE_STATE_LEAVING: return ("leaving"); break;
}
return ("Unhandled node state");
}
const char *
tlv_tls_supported_to_str(enum tlv_tls_supported tls_supported)
{
switch (tls_supported) {
case TLV_TLS_UNSUPPORTED: return ("Unsupported"); break;
case TLV_TLS_SUPPORTED: return ("Supported"); break;
case TLV_TLS_REQUIRED: return ("Required"); break;
}
return ("Unhandled tls supported state");
}
const char *
tlv_decision_algorithm_type_to_str(enum tlv_decision_algorithm_type algorithm)
{
switch (algorithm) {
case TLV_DECISION_ALGORITHM_TYPE_TEST: return ("Test"); break;
case TLV_DECISION_ALGORITHM_TYPE_FFSPLIT: return ("Fifty-Fifty split"); break;
case TLV_DECISION_ALGORITHM_TYPE_2NODELMS: return ("2 Node LMS"); break;
case TLV_DECISION_ALGORITHM_TYPE_LMS: return ("LMS"); break;
}
return ("Unknown algorithm");
}
diff --git a/test/cpgbench.c b/test/cpgbench.c
index 23fc2b0a..059effe2 100644
--- a/test/cpgbench.c
+++ b/test/cpgbench.c
@@ -1,198 +1,197 @@
/*
* Copyright (c) 2006, 2009 Red Hat, Inc.
*
* All rights reserved.
*
* Author: Steven Dake (sdake@redhat.com)
*
* This software licensed under BSD license, the text of which follows:
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* - Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* - Neither the name of the MontaVista Software, Inc. nor the names of its
* contributors may be used to endorse or promote products derived from this
* software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
* THE POSSIBILITY OF SUCH DAMAGE.
*/
#include <config.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <signal.h>
#include <unistd.h>
#include <errno.h>
-#include <unistd.h>
#include <time.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/select.h>
#include <sys/uio.h>
#include <sys/un.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <pthread.h>
#include <qb/qblog.h>
#include <qb/qbutil.h>
#include <corosync/corotypes.h>
#include <corosync/cpg.h>
static cpg_handle_t handle;
static pthread_t thread;
#ifndef timersub
#define timersub(a, b, result) \
do { \
(result)->tv_sec = (a)->tv_sec - (b)->tv_sec; \
(result)->tv_usec = (a)->tv_usec - (b)->tv_usec; \
if ((result)->tv_usec < 0) { \
--(result)->tv_sec; \
(result)->tv_usec += 1000000; \
} \
} while (0)
#endif /* timersub */
static int alarm_notice;
static void cpg_bm_confchg_fn (
cpg_handle_t handle_in,
const struct cpg_name *group_name,
const struct cpg_address *member_list, size_t member_list_entries,
const struct cpg_address *left_list, size_t left_list_entries,
const struct cpg_address *joined_list, size_t joined_list_entries)
{
}
static unsigned int write_count;
static void cpg_bm_deliver_fn (
cpg_handle_t handle_in,
const struct cpg_name *group_name,
uint32_t nodeid,
uint32_t pid,
void *msg,
size_t msg_len)
{
write_count++;
}
static cpg_callbacks_t callbacks = {
.cpg_deliver_fn = cpg_bm_deliver_fn,
.cpg_confchg_fn = cpg_bm_confchg_fn
};
#define ONE_MEG 1048576
static char data[ONE_MEG];
static void cpg_benchmark (
cpg_handle_t handle_in,
int write_size)
{
struct timeval tv1, tv2, tv_elapsed;
struct iovec iov;
unsigned int res;
alarm_notice = 0;
iov.iov_base = data;
iov.iov_len = write_size;
write_count = 0;
alarm (10);
gettimeofday (&tv1, NULL);
do {
res = cpg_mcast_joined (handle_in, CPG_TYPE_AGREED, &iov, 1);
} while (alarm_notice == 0 && (res == CS_OK || res == CS_ERR_TRY_AGAIN));
gettimeofday (&tv2, NULL);
timersub (&tv2, &tv1, &tv_elapsed);
printf ("%5d messages received ", write_count);
printf ("%5d bytes per write ", write_size);
printf ("%7.3f Seconds runtime ",
(tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0)));
printf ("%9.3f TP/s ",
((float)write_count) / (tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0)));
printf ("%7.3f MB/s.\n",
((float)write_count) * ((float)write_size) / ((tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0)) * 1000000.0));
}
static void sigalrm_handler (int num)
{
alarm_notice = 1;
}
static struct cpg_name group_name = {
.value = "cpg_bm",
.length = 6
};
static void* dispatch_thread (void *arg)
{
cpg_dispatch (handle, CS_DISPATCH_BLOCKING);
return NULL;
}
int main (void) {
unsigned int size;
int i;
unsigned int res;
qb_log_init("cpgbench", LOG_USER, LOG_EMERG);
qb_log_ctl(QB_LOG_SYSLOG, QB_LOG_CONF_ENABLED, QB_FALSE);
qb_log_filter_ctl(QB_LOG_STDERR, QB_LOG_FILTER_ADD,
QB_LOG_FILTER_FILE, "*", LOG_DEBUG);
qb_log_ctl(QB_LOG_STDERR, QB_LOG_CONF_ENABLED, QB_TRUE);
size = 64;
signal (SIGALRM, sigalrm_handler);
res = cpg_initialize (&handle, &callbacks);
if (res != CS_OK) {
printf ("cpg_initialize failed with result %d\n", res);
exit (1);
}
pthread_create (&thread, NULL, dispatch_thread, NULL);
res = cpg_join (handle, &group_name);
if (res != CS_OK) {
printf ("cpg_join failed with result %d\n", res);
exit (1);
}
for (i = 0; i < 10; i++) { /* number of repetitions - up to 50k */
cpg_benchmark (handle, size);
signal (SIGALRM, sigalrm_handler);
size *= 5;
if (size >= (ONE_MEG - 100)) {
break;
}
}
res = cpg_finalize (handle);
if (res != CS_OK) {
printf ("cpg_finalize failed with result %d\n", res);
exit (1);
}
return (0);
}
diff --git a/test/cpgbenchzc.c b/test/cpgbenchzc.c
index db78289a..92f55ee9 100644
--- a/test/cpgbenchzc.c
+++ b/test/cpgbenchzc.c
@@ -1,195 +1,193 @@
/*
* Copyright (c) 2006, 2009 Red Hat, Inc.
*
* All rights reserved.
*
* Author: Steven Dake (sdake@redhat.com)
*
* This software licensed under BSD license, the text of which follows:
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* - Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* - Neither the name of the MontaVista Software, Inc. nor the names of its
* contributors may be used to endorse or promote products derived from this
* software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
* THE POSSIBILITY OF SUCH DAMAGE.
*/
#include <config.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <signal.h>
#include <unistd.h>
#include <errno.h>
-#include <unistd.h>
#include <time.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/select.h>
#include <sys/un.h>
-#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <corosync/corotypes.h>
#include <corosync/cpg.h>
#include "../lib/util.h"
#ifndef timersub
#define timersub(a, b, result) \
do { \
(result)->tv_sec = (a)->tv_sec - (b)->tv_sec; \
(result)->tv_usec = (a)->tv_usec - (b)->tv_usec; \
if ((result)->tv_usec < 0) { \
--(result)->tv_sec; \
(result)->tv_usec += 1000000; \
} \
} while (0)
#endif
static int alarm_notice;
static void cpg_bm_confchg_fn (
cpg_handle_t handle,
const struct cpg_name *group_name,
const struct cpg_address *member_list, size_t member_list_entries,
const struct cpg_address *left_list, size_t left_list_entries,
const struct cpg_address *joined_list, size_t joined_list_entries)
{
}
static unsigned int write_count;
static void cpg_bm_deliver_fn (
cpg_handle_t handle,
const struct cpg_name *group_name,
uint32_t nodeid,
uint32_t pid,
void *msg,
size_t msg_len)
{
write_count++;
}
static cpg_callbacks_t callbacks = {
.cpg_deliver_fn = cpg_bm_deliver_fn,
.cpg_confchg_fn = cpg_bm_confchg_fn
};
void *data;
static void cpg_benchmark (
cpg_handle_t handle,
int write_size)
{
struct timeval tv1, tv2, tv_elapsed;
unsigned int res;
cpg_flow_control_state_t flow_control_state;
alarm_notice = 0;
write_count = 0;
alarm (10);
gettimeofday (&tv1, NULL);
do {
/*
* Test checkpoint write
*/
cpg_flow_control_state_get (handle, &flow_control_state);
if (flow_control_state == CPG_FLOW_CONTROL_DISABLED) {
retry:
res = cpg_zcb_mcast_joined (handle, CPG_TYPE_AGREED, data, write_size);
if (res == CS_ERR_TRY_AGAIN) {
goto retry;
}
}
res = cpg_dispatch (handle, CS_DISPATCH_ALL);
if (res != CS_OK) {
printf ("cpg dispatch returned error %d\n", res);
exit (1);
}
} while (alarm_notice == 0);
gettimeofday (&tv2, NULL);
timersub (&tv2, &tv1, &tv_elapsed);
printf ("%5d messages received ", write_count);
printf ("%5d bytes per write ", write_size);
printf ("%7.3f Seconds runtime ",
(tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0)));
printf ("%9.3f TP/s ",
((float)write_count) / (tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0)));
printf ("%7.3f MB/s.\n",
((float)write_count) * ((float)write_size) / ((tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0)) * 1000000.0));
}
static void sigalrm_handler (int num)
{
alarm_notice = 1;
}
static struct cpg_name group_name = {
.value = "cpg_bm",
.length = 6
};
int main (void) {
cpg_handle_t handle;
unsigned int size;
int i;
unsigned int res;
size = 1000;
signal (SIGALRM, sigalrm_handler);
res = cpg_initialize (&handle, &callbacks);
if (res != CS_OK) {
printf ("cpg_initialize failed with result %d\n", res);
exit (1);
}
cpg_zcb_alloc (handle, 500000, &data);
if (res != CS_OK) {
printf ("cpg_zcb_alloc couldn't allocate zero copy buffer %d\n", res);
exit (1);
}
res = cpg_join (handle, &group_name);
if (res != CS_OK) {
printf ("cpg_join failed with result %s\n", cs_strerror(res));
exit (1);
}
for (i = 0; i < 50; i++) { /* number of repetitions - up to 50k */
cpg_benchmark (handle, size);
size += 1000;
}
res = cpg_finalize (handle);
if (res != CS_OK) {
printf ("cpg_finalize failed with result %s\n", cs_strerror(res));
exit (1);
}
return (0);
}
diff --git a/test/cpghum.c b/test/cpghum.c
index 5772f818..006bf45e 100644
--- a/test/cpghum.c
+++ b/test/cpghum.c
@@ -1,437 +1,436 @@
/*
* Copyright (c) 2015 Red Hat, Inc.
*
* All rights reserved.
*
* Author: Christine Caulfield <ccaulfie@redhat.com>
*
* This software licensed under BSD license, the text of which follows:
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* - Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* - Neither the name of the MontaVista Software, Inc. nor the names of its
* contributors may be used to endorse or promote products derived from this
* software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
* THE POSSIBILITY OF SUCH DAMAGE.
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <signal.h>
#include <unistd.h>
#include <errno.h>
-#include <unistd.h>
#include <time.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/select.h>
#include <sys/uio.h>
#include <sys/un.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <pthread.h>
#include <zlib.h>
#include <libgen.h>
#include <qb/qblog.h>
#include <qb/qbutil.h>
#include <corosync/corotypes.h>
#include <corosync/cpg.h>
static cpg_handle_t handle;
static pthread_t thread;
#ifndef timersub
#define timersub(a, b, result) \
do { \
(result)->tv_sec = (a)->tv_sec - (b)->tv_sec; \
(result)->tv_usec = (a)->tv_usec - (b)->tv_usec; \
if ((result)->tv_usec < 0) { \
--(result)->tv_sec; \
(result)->tv_usec += 1000000; \
} \
} while (0)
#endif /* timersub */
static int alarm_notice;
#define ONE_MEG 1048576
#define DATASIZE (ONE_MEG*20)
static char data[DATASIZE];
static int send_counter = 0;
static int do_syslog = 0;
static int quiet = 0;
static volatile int stopped;
// stats
static unsigned int length_errors=0;
static unsigned int crc_errors=0;
static unsigned int sequence_errors=0;
static unsigned int packets_sent=0;
static unsigned int packets_recvd=0;
static unsigned int send_retries=0;
static unsigned int send_fails=0;
static void cpg_bm_confchg_fn (
cpg_handle_t handle_in,
const struct cpg_name *group_name,
const struct cpg_address *member_list, size_t member_list_entries,
const struct cpg_address *left_list, size_t left_list_entries,
const struct cpg_address *joined_list, size_t joined_list_entries)
{
}
static unsigned int g_recv_count;
static unsigned int g_recv_length;
static unsigned int g_write_size;
static int g_recv_counter = 0;
static void cpg_bm_deliver_fn (
cpg_handle_t handle_in,
const struct cpg_name *group_name,
uint32_t nodeid,
uint32_t pid,
void *msg,
size_t msg_len)
{
int *value = msg;
uLong crc=0;
uLong recv_crc = value[1] & 0xFFFFFFFF;
packets_recvd++;
g_recv_length = msg_len;
// Basic check, packets should all be the right size
if (g_write_size && (msg_len != g_write_size)) {
length_errors++;
fprintf(stderr, "%s: message sizes don't match. got %lu, expected %u\n", group_name->value, msg_len, g_write_size);
if (do_syslog) {
syslog(LOG_ERR, "%s: message sizes don't match. got %lu, expected %u\n", group_name->value, msg_len, g_write_size);
}
}
// Sequence counters are incrementing in step?
if (*value != g_recv_counter) {
sequence_errors++;
fprintf(stderr, "%s: counters don't match. got %d, expected %d\n", group_name->value, *value, g_recv_counter);
if (do_syslog) {
syslog(LOG_ERR, "%s: counters don't match. got %d, expected %d\n", group_name->value, *value, g_recv_counter);
}
// Catch up or we'll be printing errors for ever
g_recv_counter = *value +1;
} else {
g_recv_counter++;
}
// Check crc
crc = crc32(0, NULL, 0);
crc = crc32(crc, (Bytef *)&value[2], msg_len-sizeof(int)*2) & 0xFFFFFFFF;
if (crc != recv_crc) {
crc_errors++;
fprintf(stderr, "%s: CRCs don't match. got %lx, expected %lx\n", group_name->value, recv_crc, crc);
if (do_syslog) {
syslog(LOG_ERR, "%s: CRCs don't match. got %lx, expected %lx\n", group_name->value, recv_crc, crc);
}
}
g_recv_count++;
}
static cpg_model_v1_data_t model1_data = {
.cpg_deliver_fn = cpg_bm_deliver_fn,
.cpg_confchg_fn = cpg_bm_confchg_fn,
};
static cpg_callbacks_t callbacks = {
.cpg_deliver_fn = cpg_bm_deliver_fn,
.cpg_confchg_fn = cpg_bm_confchg_fn
};
static struct cpg_name group_name = {
.value = "cpghum",
.length = 7
};
static void cpg_test (
cpg_handle_t handle_in,
int write_size,
int delay_time,
int print_time)
{
struct timeval tv1, tv2, tv_elapsed;
struct iovec iov;
unsigned int res;
int i;
unsigned int *dataint = (unsigned int *)data;
uLong crc;
alarm_notice = 0;
iov.iov_base = data;
iov.iov_len = write_size;
g_recv_count = 0;
alarm (print_time);
gettimeofday (&tv1, NULL);
do {
dataint[0] = send_counter++;
for (i=2; i<(DATASIZE-sizeof(int)*2)/4; i++) {
dataint[i] = rand();
}
crc = crc32(0, NULL, 0);
dataint[1] = crc32(crc, (Bytef*)&dataint[2], write_size-sizeof(int)*2);
resend:
res = cpg_mcast_joined (handle_in, CPG_TYPE_AGREED, &iov, 1);
if (res == CS_ERR_TRY_AGAIN) {
usleep(10000);
send_retries++;
goto resend;
}
if (res != CS_OK) {
fprintf(stderr, "send failed: %d\n", res);
send_fails++;
}
else {
packets_sent++;
}
usleep(delay_time*1000);
} while (alarm_notice == 0 && (res == CS_OK || res == CS_ERR_TRY_AGAIN) && stopped == 0);
gettimeofday (&tv2, NULL);
timersub (&tv2, &tv1, &tv_elapsed);
if (!quiet) {
printf ("%s: %5d message%s received, ", group_name.value, g_recv_count, g_recv_count==1?"":"s");
printf ("%5d bytes per write\n", write_size);
}
}
static void sigalrm_handler (int num)
{
alarm_notice = 1;
}
static void sigint_handler (int num)
{
stopped = 1;
}
static void* dispatch_thread (void *arg)
{
cpg_dispatch (handle, CS_DISPATCH_BLOCKING);
return NULL;
}
static void usage(char *cmd)
{
fprintf(stderr, "%s [OPTIONS]\n", cmd);
fprintf(stderr, "\n");
fprintf(stderr, "%s sends CPG messages to all registered users of the CPG.\n", cmd);
fprintf(stderr, "The messages have a sequence number and a CRC so that missing or\n");
fprintf(stderr, "corrupted messages will be detected and reported.\n");
fprintf(stderr, "\n");
fprintf(stderr, "%s can also be asked to simply listen for (and check) packets\n", cmd);
fprintf(stderr, "so that there is another node in the cluster connected to the CPG.\n");
fprintf(stderr, "\n");
fprintf(stderr, "When -l is present, packet size is only checked if specified by -w or -W\n");
fprintf(stderr, "and it, obviously, must match that of the sender.\n");
fprintf(stderr, "\n");
fprintf(stderr, "Multiple copies, in different CPGs, can also be run on the same or\n");
fprintf(stderr, "different nodes by using the -n option.\n");
fprintf(stderr, "\n");
fprintf(stderr, "%s can't handle more than 1 sender in the same CPG as it messes with the\n", cmd);
fprintf(stderr, "sequence numbers.\n");
fprintf(stderr, "\n");
fprintf(stderr, " -w Write size in Kbytes, default 4\n");
fprintf(stderr, " -W Write size in bytes, default 4096\n");
fprintf(stderr, " -n CPG name to use, default 'cpghum'\n");
fprintf(stderr, " -d Delay between sending packets (mS), default 1000\n");
fprintf(stderr, " -r Number of repetitions, default 100\n");
fprintf(stderr, " -p Delay between printing output(S), default 10s\n");
fprintf(stderr, " -l Listen and check CRCs only, don't send (^C to quit)\n");
fprintf(stderr, " -m cpg_initialise() model. Default 1.\n");
fprintf(stderr, " -s Also send errors to syslog (for daemon log correlation).\n");
fprintf(stderr, " -q Quiet. Don't print messages every 10 seconds (see also -p)\n");
fprintf(stderr, "\n");
}
int main (int argc, char *argv[]) {
int i;
unsigned int res;
uint32_t maxsize;
int opt;
int bs;
int write_size = 4096;
int delay_time = 1000;
int repetitions = 100;
int print_time = 10;
int have_size = 0;
int listen_only = 0;
int model = 1;
while ( (opt = getopt(argc, argv, "qlsn:d:r:p:m:w:W:")) != -1 ) {
switch (opt) {
case 'w': // Write size in K
bs = atoi(optarg);
if (bs > 0) {
write_size = bs*1024;
have_size = 1;
}
break;
case 'W': // Write size in bytes
bs = atoi(optarg);
if (bs > 0) {
write_size = bs;
have_size = 1;
}
break;
case 'n':
if (strlen(optarg) >= CPG_MAX_NAME_LENGTH) {
fprintf(stderr, "CPG name too long\n");
exit(1);
}
strcpy(group_name.value, optarg);
group_name.length = strlen(group_name.value);
break;
case 'd':
delay_time = atoi(optarg);
break;
case 'r':
repetitions = atoi(optarg);
break;
case 'p':
print_time = atoi(optarg);
break;
case 'l':
listen_only = 1;
break;
case 's':
do_syslog = 1;
break;
case 'q':
quiet = 1;
break;
case 'm':
model = atoi(optarg);
if (model < 0 || model > 1) {
fprintf(stderr, "%s: Model must be 0-1\n", argv[0]);
exit(1);
}
break;
case '?':
usage(basename(argv[0]));
exit(0);
}
}
qb_log_init("cpghum", LOG_USER, LOG_EMERG);
qb_log_ctl(QB_LOG_SYSLOG, QB_LOG_CONF_ENABLED, QB_FALSE);
qb_log_filter_ctl(QB_LOG_STDERR, QB_LOG_FILTER_ADD,
QB_LOG_FILTER_FILE, "*", LOG_DEBUG);
qb_log_ctl(QB_LOG_STDERR, QB_LOG_CONF_ENABLED, QB_TRUE);
g_write_size = write_size;
signal (SIGALRM, sigalrm_handler);
signal (SIGINT, sigint_handler);
switch (model) {
case 0:
res = cpg_initialize (&handle, &callbacks);
break;
case 1:
res = cpg_model_initialize (&handle, CPG_MODEL_V1, (cpg_model_data_t *)&model1_data, NULL);
break;
default:
res=999; // can't get here but it keeps the compiler happy
break;
}
if (res != CS_OK) {
printf ("cpg_initialize failed with result %d\n", res);
exit (1);
}
pthread_create (&thread, NULL, dispatch_thread, NULL);
res = cpg_join (handle, &group_name);
if (res != CS_OK) {
printf ("cpg_join failed with result %d\n", res);
exit (1);
}
if (listen_only) {
int secs = 0;
if (!quiet) {
printf("-- Listening on CPG %s\n", group_name.value);
printf("-- Ignore any starting \"counters don't match\" error while we catch up\n");
}
/* Only check packet size if specified on the command-line */
if (!have_size) {
g_write_size = 0;
}
while (!stopped) {
sleep(1);
if (++secs > print_time && !quiet) {
printf ("%s: %5d message%s received. %d bytes\n", group_name.value, g_recv_count, g_recv_count==1?"":"s", g_recv_length);
secs = 0;
g_recv_count = 0;
}
}
}
else {
cpg_max_atomic_msgsize_get (handle, &maxsize);
if ( write_size > maxsize) {
fprintf(stderr, "INFO: packet size (%d) is larger than the maximum atomic size (%d), libcpg will fragment\n",
write_size, maxsize);
}
for (i = 0; i < repetitions && !stopped; i++) {
cpg_test (handle, write_size, delay_time, print_time);
signal (SIGALRM, sigalrm_handler);
}
}
res = cpg_finalize (handle);
if (res != CS_OK) {
printf ("cpg_finalize failed with result %d\n", res);
exit (1);
}
printf("\n");
printf("Stats:\n");
if (!listen_only) {
printf(" packets sent: %d\n", packets_sent);
printf(" send failures: %d\n", send_fails);
printf(" send retries: %d\n", send_retries);
}
if (have_size) {
printf(" length errors: %d\n", length_errors);
}
printf(" packets recvd: %d\n", packets_recvd);
printf(" sequence errors: %d\n", sequence_errors);
printf(" crc errors: %d\n", crc_errors);
printf("\n");
return (0);
}
diff --git a/tools/corosync-notifyd.c b/tools/corosync-notifyd.c
index 507a2485..8f37a307 100644
--- a/tools/corosync-notifyd.c
+++ b/tools/corosync-notifyd.c
@@ -1,1222 +1,1219 @@
/*
* Copyright (c) 2011-2012 Red Hat, Inc.
*
* All rights reserved.
*
* Author: Angus Salkeld <asalkeld@redhat.com>
*
* This software licensed under BSD license, the text of which follows:
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* - Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* - Neither the name of the MontaVista Software, Inc. nor the names of its
* contributors may be used to endorse or promote products derived from this
* software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
* THE POSSIBILITY OF SUCH DAMAGE.
*/
#include <config.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/types.h>
#include <netdb.h>
#include <arpa/inet.h>
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <unistd.h>
#include <string.h>
#include <ctype.h>
#include <poll.h>
#include <signal.h>
#include <qb/qbdefs.h>
#include <qb/qbloop.h>
#include <qb/qblog.h>
-#include <qb/qbdefs.h>
-#include <qb/qbloop.h>
-
#include <corosync/corotypes.h>
#include <corosync/cfg.h>
#include <corosync/quorum.h>
#include <corosync/cmap.h>
/*
* generic declarations
*/
enum {
CS_NTF_LOG,
CS_NTF_STDOUT,
CS_NTF_SNMP,
CS_NTF_DBUS,
CS_NTF_FG,
CS_NTF_MAX,
};
static int conf[CS_NTF_MAX];
static int32_t _cs_is_quorate = 0;
typedef void (*node_membership_fn_t)(char *nodename, uint32_t nodeid, char *state, char* ip);
typedef void (*node_quorum_fn_t)(char *nodename, uint32_t nodeid, const char *state);
typedef void (*application_connection_fn_t)(char *nodename, uint32_t nodeid, char *app_name, const char *state);
typedef void (*rrp_faulty_fn_t)(char *nodename, uint32_t nodeid, uint32_t iface_no, const char *state);
struct notify_callbacks {
node_membership_fn_t node_membership_fn;
node_quorum_fn_t node_quorum_fn;
application_connection_fn_t application_connection_fn;
rrp_faulty_fn_t rrp_faulty_fn;
};
#define MAX_NOTIFIERS 5
static int num_notifiers = 0;
static struct notify_callbacks notifiers[MAX_NOTIFIERS];
static uint32_t local_nodeid = 0;
static char local_nodename[CS_MAX_NAME_LENGTH];
static qb_loop_t *main_loop;
static quorum_handle_t quorum_handle;
static void _cs_node_membership_event(char *nodename, uint32_t nodeid, char *state, char* ip);
static void _cs_node_quorum_event(const char *state);
static void _cs_application_connection_event(char *app_name, const char *state);
static void _cs_rrp_faulty_event(uint32_t iface_no, const char *state);
#ifdef HAVE_DBUS
#include <dbus/dbus.h>
/*
* dbus
*/
#define DBUS_CS_NAME "org.corosync"
#define DBUS_CS_IFACE "org.corosync"
#define DBUS_CS_PATH "/org/corosync"
static DBusConnection *db = NULL;
static char _err[512];
static int err_set = 0;
static void _cs_dbus_init(void);
#endif /* HAVE_DBUS */
#ifdef ENABLE_SNMP
#include <net-snmp/net-snmp-config.h>
#include <net-snmp/snmpv3_api.h>
#include <net-snmp/agent/agent_trap.h>
#include <net-snmp/library/mib.h>
#include <net-snmp/library/snmp_api.h>
#include <net-snmp/library/snmp_client.h>
#include <net-snmp/library/snmp_debug.h>
enum snmp_node_status {
SNMP_NODE_STATUS_UNKNOWN = 0,
SNMP_NODE_STATUS_JOINED = 1,
SNMP_NODE_STATUS_LEFT = 2
};
#define SNMP_OID_COROSYNC "1.3.6.1.4.1.35488"
#define SNMP_OID_OBJECT_ROOT SNMP_OID_COROSYNC ".1"
#define SNMP_OID_OBJECT_NODE_NAME SNMP_OID_OBJECT_ROOT ".1"
#define SNMP_OID_OBJECT_NODE_ID SNMP_OID_OBJECT_ROOT ".2"
#define SNMP_OID_OBJECT_NODE_STATUS SNMP_OID_OBJECT_ROOT ".3"
#define SNMP_OID_OBJECT_NODE_ADDR SNMP_OID_OBJECT_ROOT ".4"
#define SNMP_OID_OBJECT_RINGSEQ SNMP_OID_OBJECT_ROOT ".20"
#define SNMP_OID_OBJECT_QUORUM SNMP_OID_OBJECT_ROOT ".21"
#define SNMP_OID_OBJECT_APP_NAME SNMP_OID_OBJECT_ROOT ".40"
#define SNMP_OID_OBJECT_APP_STATUS SNMP_OID_OBJECT_ROOT ".41"
#define SNMP_OID_OBJECT_RRP_IFACE_NO SNMP_OID_OBJECT_ROOT ".60"
#define SNMP_OID_OBJECT_RRP_STATUS SNMP_OID_OBJECT_ROOT ".61"
#define SNMP_OID_TRAPS_ROOT SNMP_OID_COROSYNC ".0"
#define SNMP_OID_TRAPS_NODE SNMP_OID_TRAPS_ROOT ".1"
#define SNMP_OID_TRAPS_QUORUM SNMP_OID_TRAPS_ROOT ".2"
#define SNMP_OID_TRAPS_APP SNMP_OID_TRAPS_ROOT ".3"
#define SNMP_OID_TRAPS_RRP SNMP_OID_TRAPS_ROOT ".4"
#define CS_TIMESTAMP_STR_LEN 20
static const char *local_host = "localhost";
#endif /* ENABLE_SNMP */
static char snmp_manager_buf[CS_MAX_NAME_LENGTH];
static char *snmp_manager = NULL;
#define CMAP_MAX_RETRIES 10
/*
* cmap
*/
static cmap_handle_t cmap_handle;
static int32_t _cs_ip_to_hostname(char* ip, char* name_out)
{
struct sockaddr_in sa;
int rc;
if (strchr(ip, ':') == NULL) {
sa.sin_family = AF_INET;
} else {
sa.sin_family = AF_INET6;
}
rc = inet_pton(sa.sin_family, ip, &sa.sin_addr);
if (rc == 0) {
return -EINVAL;
}
rc = getnameinfo((struct sockaddr*)&sa, sizeof(sa),
name_out, CS_MAX_NAME_LENGTH, NULL, 0, 0);
if (rc != 0) {
qb_log(LOG_ERR, 0, "error looking up %s : %s", ip, gai_strerror(rc));
return -EINVAL;
}
return 0;
}
static void _cs_cmap_members_key_changed (
cmap_handle_t cmap_handle_c,
cmap_track_handle_t cmap_track_handle,
int32_t event,
const char *key_name,
struct cmap_notify_value new_value,
struct cmap_notify_value old_value,
void *user_data)
{
char nodename[CS_MAX_NAME_LENGTH];
char* open_bracket = NULL;
char* close_bracket = NULL;
int res;
uint32_t nodeid;
char *ip_str;
char tmp_key[CMAP_KEYNAME_MAXLEN];
cs_error_t err;
int no_retries;
if (event != CMAP_TRACK_ADD && event != CMAP_TRACK_MODIFY) {
return ;
}
res = sscanf(key_name, "runtime.totem.pg.mrp.srp.members.%u.%s", &nodeid, tmp_key);
if (res != 2)
return ;
if (strcmp(tmp_key, "status") != 0) {
return ;
}
snprintf(tmp_key, CMAP_KEYNAME_MAXLEN, "runtime.totem.pg.mrp.srp.members.%u.ip", nodeid);
no_retries = 0;
while ((err = cmap_get_string(cmap_handle, tmp_key, &ip_str)) == CS_ERR_TRY_AGAIN &&
no_retries++ < CMAP_MAX_RETRIES) {
sleep(1);
}
if (err != CS_OK) {
return ;
}
/*
* We want the ip out of: "r(0) ip(192.168.100.92)"
*/
open_bracket = strrchr(ip_str, '(');
open_bracket++;
close_bracket = strrchr(open_bracket, ')');
*close_bracket = '\0';
_cs_ip_to_hostname(open_bracket, nodename);
_cs_node_membership_event(nodename, nodeid, (char *)new_value.data, open_bracket);
free(ip_str);
}
static void _cs_cmap_connections_key_changed (
cmap_handle_t cmap_handle_c,
cmap_track_handle_t cmap_track_handle,
int32_t event,
const char *key_name,
struct cmap_notify_value new_value,
struct cmap_notify_value old_value,
void *user_data)
{
char obj_name[CS_MAX_NAME_LENGTH];
char conn_str[CMAP_KEYNAME_MAXLEN];
char tmp_key[CMAP_KEYNAME_MAXLEN];
int res;
res = sscanf(key_name, "runtime.connections.%[^.].%s", conn_str, tmp_key);
if (res != 2) {
return ;
}
if (strcmp(tmp_key, "service_id") != 0) {
return ;
}
snprintf(obj_name, CS_MAX_NAME_LENGTH, "%s", conn_str);
if (event == CMAP_TRACK_ADD) {
_cs_application_connection_event(obj_name, "connected");
}
if (event == CMAP_TRACK_DELETE) {
_cs_application_connection_event(obj_name, "disconnected");
}
}
static void _cs_cmap_rrp_faulty_key_changed (
cmap_handle_t cmap_handle_c,
cmap_track_handle_t cmap_track_handle,
int32_t event,
const char *key_name,
struct cmap_notify_value new_value,
struct cmap_notify_value old_value,
void *user_data)
{
uint32_t iface_no;
char tmp_key[CMAP_KEYNAME_MAXLEN];
int res;
int no_retries;
uint8_t faulty;
cs_error_t err;
res = sscanf(key_name, "runtime.totem.pg.mrp.rrp.%u.%s", &iface_no, tmp_key);
if (res != 2) {
return ;
}
if (strcmp(tmp_key, "faulty") != 0) {
return ;
}
no_retries = 0;
while ((err = cmap_get_uint8(cmap_handle, key_name, &faulty)) == CS_ERR_TRY_AGAIN &&
no_retries++ < CMAP_MAX_RETRIES) {
sleep(1);
}
if (err != CS_OK) {
return ;
}
if (faulty) {
_cs_rrp_faulty_event(iface_no, "faulty");
} else {
_cs_rrp_faulty_event(iface_no, "operational");
}
}
static int
_cs_cmap_dispatch(int fd, int revents, void *data)
{
cs_error_t err;
err = cmap_dispatch(cmap_handle, CS_DISPATCH_ONE);
if (err != CS_OK && err != CS_ERR_TRY_AGAIN && err != CS_ERR_TIMEOUT &&
err != CS_ERR_QUEUE_FULL) {
qb_log(LOG_ERR, "Could not dispatch cmap events. Error %u", err);
qb_loop_stop(main_loop);
return -1;
}
return 0;
}
static void _cs_quorum_notification(quorum_handle_t handle,
uint32_t quorate, uint64_t ring_seq,
uint32_t view_list_entries, uint32_t *view_list)
{
if (_cs_is_quorate == quorate) {
return;
}
_cs_is_quorate = quorate;
if (quorate) {
_cs_node_quorum_event("quorate");
} else {
_cs_node_quorum_event("not quorate");
}
}
static int
_cs_quorum_dispatch(int fd, int revents, void *data)
{
cs_error_t err;
err = quorum_dispatch(quorum_handle, CS_DISPATCH_ONE);
if (err != CS_OK && err != CS_ERR_TRY_AGAIN && err != CS_ERR_TIMEOUT &&
err != CS_ERR_QUEUE_FULL) {
qb_log(LOG_ERR, "Could not dispatch quorum events. Error %u", err);
qb_loop_stop(main_loop);
return -1;
}
return 0;
}
static void
_cs_quorum_init(void)
{
cs_error_t rc;
uint32_t quorum_type;
int fd;
quorum_callbacks_t quorum_callbacks = {
.quorum_notify_fn = _cs_quorum_notification,
};
rc = quorum_initialize (&quorum_handle, &quorum_callbacks,
&quorum_type);
if (rc != CS_OK) {
qb_log(LOG_ERR, "Could not connect to corosync(quorum)");
return;
}
quorum_fd_get(quorum_handle, &fd);
qb_loop_poll_add(main_loop, QB_LOOP_MED, fd, POLLIN|POLLNVAL, NULL,
_cs_quorum_dispatch);
rc = quorum_trackstart(quorum_handle, CS_TRACK_CHANGES);
if (rc != CS_OK) {
qb_log(LOG_ERR, "Could not start tracking");
return;
}
}
static void
_cs_quorum_finalize(void)
{
quorum_finalize (quorum_handle);
}
#ifdef HAVE_DBUS
/*
* dbus notifications
*/
static void
_cs_dbus_auto_flush(void)
{
dbus_connection_ref(db);
while (dbus_connection_get_dispatch_status(db) == DBUS_DISPATCH_DATA_REMAINS) {
dbus_connection_dispatch(db);
}
while (dbus_connection_has_messages_to_send(db)) {
dbus_connection_flush(db);
}
dbus_connection_unref(db);
}
static void
_cs_dbus_release(void)
{
DBusError err;
if (!db)
return;
dbus_error_init(&err);
dbus_bus_release_name(db, DBUS_CS_NAME, &err);
dbus_error_free(&err);
dbus_connection_unref(db);
db = NULL;
}
static void
_cs_dbus_node_quorum_event(char *nodename, uint32_t nodeid, const char *state)
{
DBusMessage *msg = NULL;
if (err_set) {
qb_log(LOG_ERR, "%s", _err);
err_set = 0;
}
if (!db) {
goto out_free;
}
if (dbus_connection_get_is_connected(db) != TRUE) {
err_set = 1;
snprintf(_err, sizeof(_err), "DBus connection lost");
_cs_dbus_release();
goto out_unlock;
}
_cs_dbus_auto_flush();
if (!(msg = dbus_message_new_signal(DBUS_CS_PATH,
DBUS_CS_IFACE,
"QuorumStateChange"))) {
qb_log(LOG_ERR, "error creating dbus signal");
goto out_unlock;
}
if (!dbus_message_append_args(msg,
DBUS_TYPE_STRING, &nodename,
DBUS_TYPE_UINT32, &nodeid,
DBUS_TYPE_STRING, &state,
DBUS_TYPE_INVALID)) {
qb_log(LOG_ERR, "error adding args to quorum signal");
goto out_unlock;
}
dbus_connection_send(db, msg, NULL);
out_unlock:
if (msg) {
dbus_message_unref(msg);
}
out_free:
return;
}
static void
_cs_dbus_node_membership_event(char *nodename, uint32_t nodeid, char *state, char* ip)
{
DBusMessage *msg = NULL;
if (err_set) {
qb_log(LOG_ERR, "%s", _err);
err_set = 0;
}
if (!db) {
goto out_free;
}
if (dbus_connection_get_is_connected(db) != TRUE) {
err_set = 1;
snprintf(_err, sizeof(_err), "DBus connection lost");
_cs_dbus_release();
goto out_unlock;
}
_cs_dbus_auto_flush();
if (!(msg = dbus_message_new_signal(DBUS_CS_PATH,
DBUS_CS_IFACE,
"NodeStateChange"))) {
qb_log(LOG_ERR, "error creating NodeStateChange signal");
goto out_unlock;
}
if (!dbus_message_append_args(msg,
DBUS_TYPE_STRING, &nodename,
DBUS_TYPE_UINT32, &nodeid,
DBUS_TYPE_STRING, &ip,
DBUS_TYPE_STRING, &state,
DBUS_TYPE_INVALID)) {
qb_log(LOG_ERR, "error adding args to NodeStateChange signal");
goto out_unlock;
}
dbus_connection_send(db, msg, NULL);
out_unlock:
if (msg) {
dbus_message_unref(msg);
}
out_free:
return;
}
static void
_cs_dbus_application_connection_event(char *nodename, uint32_t nodeid, char *app_name, const char *state)
{
DBusMessage *msg = NULL;
if (err_set) {
qb_log(LOG_ERR, "%s", _err);
err_set = 0;
}
if (!db) {
goto out_free;
}
if (dbus_connection_get_is_connected(db) != TRUE) {
err_set = 1;
snprintf(_err, sizeof(_err), "DBus connection lost");
_cs_dbus_release();
goto out_unlock;
}
_cs_dbus_auto_flush();
if (!(msg = dbus_message_new_signal(DBUS_CS_PATH,
DBUS_CS_IFACE,
"ConnectionStateChange"))) {
qb_log(LOG_ERR, "error creating ConnectionStateChange signal");
goto out_unlock;
}
if (!dbus_message_append_args(msg,
DBUS_TYPE_STRING, &nodename,
DBUS_TYPE_UINT32, &nodeid,
DBUS_TYPE_STRING, &app_name,
DBUS_TYPE_STRING, &state,
DBUS_TYPE_INVALID)) {
qb_log(LOG_ERR, "error adding args to ConnectionStateChange signal");
goto out_unlock;
}
dbus_connection_send(db, msg, NULL);
out_unlock:
if (msg) {
dbus_message_unref(msg);
}
out_free:
return;
}
static void
_cs_dbus_rrp_faulty_event(char *nodename, uint32_t nodeid, uint32_t iface_no, const char *state)
{
DBusMessage *msg = NULL;
if (err_set) {
qb_log(LOG_ERR, "%s", _err);
err_set = 0;
}
if (!db) {
goto out_free;
}
if (dbus_connection_get_is_connected(db) != TRUE) {
err_set = 1;
snprintf(_err, sizeof(_err), "DBus connection lost");
_cs_dbus_release();
goto out_unlock;
}
_cs_dbus_auto_flush();
if (!(msg = dbus_message_new_signal(DBUS_CS_PATH,
DBUS_CS_IFACE,
"QuorumStateChange"))) {
qb_log(LOG_ERR, "error creating dbus signal");
goto out_unlock;
}
if (!dbus_message_append_args(msg,
DBUS_TYPE_STRING, &nodename,
DBUS_TYPE_UINT32, &nodeid,
DBUS_TYPE_UINT32, &iface_no,
DBUS_TYPE_STRING, &state,
DBUS_TYPE_INVALID)) {
qb_log(LOG_ERR, "error adding args to rrp signal");
goto out_unlock;
}
dbus_connection_send(db, msg, NULL);
out_unlock:
if (msg) {
dbus_message_unref(msg);
}
out_free:
return;
}
static void
_cs_dbus_init(void)
{
DBusConnection *dbc = NULL;
DBusError err;
dbus_error_init(&err);
dbc = dbus_bus_get(DBUS_BUS_SYSTEM, &err);
if (!dbc) {
snprintf(_err, sizeof(_err),
"dbus_bus_get: %s", err.message);
err_set = 1;
dbus_error_free(&err);
return;
}
dbus_connection_set_exit_on_disconnect(dbc, FALSE);
db = dbc;
notifiers[num_notifiers].node_membership_fn =
_cs_dbus_node_membership_event;
notifiers[num_notifiers].node_quorum_fn =
_cs_dbus_node_quorum_event;
notifiers[num_notifiers].application_connection_fn =
_cs_dbus_application_connection_event;
notifiers[num_notifiers].rrp_faulty_fn =
_cs_dbus_rrp_faulty_event;
num_notifiers++;
}
#endif /* HAVE_DBUS */
#ifdef ENABLE_SNMP
static netsnmp_session *snmp_init (const char *target)
{
static netsnmp_session *session = NULL;
#ifndef NETSNMPV54
char default_port[128];
snprintf (default_port, sizeof (default_port), "%s:162", target);
#endif
if (session) {
return (session);
}
if (target == NULL) {
return NULL;
}
session = malloc (sizeof (netsnmp_session));
snmp_sess_init (session);
session->version = SNMP_VERSION_2c;
session->callback = NULL;
session->callback_magic = NULL;
session = snmp_add(session,
#ifdef NETSNMPV54
netsnmp_transport_open_client ("snmptrap", target),
#else
netsnmp_tdomain_transport (default_port, 0, "udp"),
#endif
NULL, NULL);
if (session == NULL) {
qb_log(LOG_ERR, 0, "Could not create snmp transport");
}
return (session);
}
static inline void add_field (
netsnmp_pdu *trap_pdu,
u_char asn_type,
const char *prefix,
void *value,
size_t value_size)
{
oid _oid[MAX_OID_LEN];
size_t _oid_len = MAX_OID_LEN;
if (snmp_parse_oid(prefix, _oid, &_oid_len)) {
snmp_pdu_add_variable (trap_pdu, _oid, _oid_len, asn_type, (u_char *) value, value_size);
}
}
static void
_cs_snmp_node_membership_event(char *nodename, uint32_t nodeid, char *state, char* ip)
{
int ret;
char csysuptime[CS_TIMESTAMP_STR_LEN];
static oid snmptrap_oid[] = { 1,3,6,1,6,3,1,1,4,1,0 };
static oid sysuptime_oid[] = { 1,3,6,1,2,1,1,3,0 };
time_t now = time (NULL);
netsnmp_pdu *trap_pdu;
netsnmp_session *session = snmp_init (snmp_manager);
if (session == NULL) {
qb_log(LOG_NOTICE, "Failed to init SNMP session.");
return ;
}
trap_pdu = snmp_pdu_create (SNMP_MSG_TRAP2);
if (!trap_pdu) {
qb_log(LOG_NOTICE, "Failed to create SNMP notification.");
return ;
}
/* send uptime */
snprintf (csysuptime, CS_TIMESTAMP_STR_LEN, "%ld", now);
snmp_add_var (trap_pdu, sysuptime_oid, sizeof (sysuptime_oid) / sizeof (oid), 't', csysuptime);
snmp_add_var (trap_pdu, snmptrap_oid, sizeof (snmptrap_oid) / sizeof (oid), 'o', SNMP_OID_TRAPS_NODE);
/* Add extries to the trap */
add_field (trap_pdu, ASN_OCTET_STR, SNMP_OID_OBJECT_NODE_NAME, (void*)nodename, strlen (nodename));
add_field (trap_pdu, ASN_INTEGER, SNMP_OID_OBJECT_NODE_ID, (void*)&nodeid, sizeof (nodeid));
add_field (trap_pdu, ASN_OCTET_STR, SNMP_OID_OBJECT_NODE_ADDR, (void*)ip, strlen (ip));
add_field (trap_pdu, ASN_OCTET_STR, SNMP_OID_OBJECT_NODE_STATUS, (void*)state, strlen (state));
/* Send and cleanup */
ret = snmp_send (session, trap_pdu);
if (ret == 0) {
/* error */
qb_log(LOG_ERR, "Could not send SNMP trap");
snmp_free_pdu (trap_pdu);
}
}
static void
_cs_snmp_node_quorum_event(char *nodename, uint32_t nodeid,
const char *state)
{
int ret;
char csysuptime[20];
static oid snmptrap_oid[] = { 1,3,6,1,6,3,1,1,4,1,0 };
static oid sysuptime_oid[] = { 1,3,6,1,2,1,1,3,0 };
time_t now = time (NULL);
netsnmp_pdu *trap_pdu;
netsnmp_session *session = snmp_init (snmp_manager);
if (session == NULL) {
qb_log(LOG_NOTICE, "Failed to init SNMP session.");
return ;
}
trap_pdu = snmp_pdu_create (SNMP_MSG_TRAP2);
if (!trap_pdu) {
qb_log(LOG_NOTICE, "Failed to create SNMP notification.");
return ;
}
/* send uptime */
sprintf (csysuptime, "%ld", now);
snmp_add_var (trap_pdu, sysuptime_oid, sizeof (sysuptime_oid) / sizeof (oid), 't', csysuptime);
snmp_add_var (trap_pdu, snmptrap_oid, sizeof (snmptrap_oid) / sizeof (oid), 'o', SNMP_OID_TRAPS_QUORUM);
/* Add extries to the trap */
add_field (trap_pdu, ASN_OCTET_STR, SNMP_OID_OBJECT_NODE_NAME, (void*)nodename, strlen (nodename));
add_field (trap_pdu, ASN_INTEGER, SNMP_OID_OBJECT_NODE_ID, (void*)&nodeid, sizeof (nodeid));
add_field (trap_pdu, ASN_OCTET_STR, SNMP_OID_OBJECT_QUORUM, (void*)state, strlen (state));
/* Send and cleanup */
ret = snmp_send (session, trap_pdu);
if (ret == 0) {
/* error */
qb_log(LOG_ERR, "Could not send SNMP trap");
snmp_free_pdu (trap_pdu);
}
}
static void
_cs_snmp_rrp_faulty_event(char *nodename, uint32_t nodeid,
uint32_t iface_no, const char *state)
{
int ret;
char csysuptime[20];
static oid snmptrap_oid[] = { 1,3,6,1,6,3,1,1,4,1,0 };
static oid sysuptime_oid[] = { 1,3,6,1,2,1,1,3,0 };
time_t now = time (NULL);
netsnmp_pdu *trap_pdu;
netsnmp_session *session = snmp_init (snmp_manager);
if (session == NULL) {
qb_log(LOG_NOTICE, "Failed to init SNMP session.");
return ;
}
trap_pdu = snmp_pdu_create (SNMP_MSG_TRAP2);
if (!trap_pdu) {
qb_log(LOG_NOTICE, "Failed to create SNMP notification.");
return ;
}
/* send uptime */
sprintf (csysuptime, "%ld", now);
snmp_add_var (trap_pdu, sysuptime_oid, sizeof (sysuptime_oid) / sizeof (oid), 't', csysuptime);
snmp_add_var (trap_pdu, snmptrap_oid, sizeof (snmptrap_oid) / sizeof (oid), 'o', SNMP_OID_TRAPS_RRP);
/* Add extries to the trap */
add_field (trap_pdu, ASN_OCTET_STR, SNMP_OID_OBJECT_NODE_NAME, (void*)nodename, strlen (nodename));
add_field (trap_pdu, ASN_INTEGER, SNMP_OID_OBJECT_NODE_ID, (void*)&nodeid, sizeof (nodeid));
add_field (trap_pdu, ASN_INTEGER, SNMP_OID_OBJECT_RRP_IFACE_NO, (void*)&iface_no, sizeof (iface_no));
add_field (trap_pdu, ASN_OCTET_STR, SNMP_OID_OBJECT_RRP_STATUS, (void*)state, strlen (state));
/* Send and cleanup */
ret = snmp_send (session, trap_pdu);
if (ret == 0) {
/* error */
qb_log(LOG_ERR, "Could not send SNMP trap");
snmp_free_pdu (trap_pdu);
}
}
static void
_cs_snmp_init(void)
{
if (snmp_manager == NULL) {
snmp_manager = (char*)local_host;
}
notifiers[num_notifiers].node_membership_fn =
_cs_snmp_node_membership_event;
notifiers[num_notifiers].node_quorum_fn =
_cs_snmp_node_quorum_event;
notifiers[num_notifiers].application_connection_fn = NULL;
notifiers[num_notifiers].rrp_faulty_fn =
_cs_snmp_rrp_faulty_event;
num_notifiers++;
}
#endif /* ENABLE_SNMP */
static void
_cs_syslog_node_membership_event(char *nodename, uint32_t nodeid, char *state, char* ip)
{
qb_log(LOG_NOTICE, "%s[%d] ip:%s %s", nodename, nodeid, ip, state);
}
static void
_cs_syslog_node_quorum_event(char *nodename, uint32_t nodeid, const char *state)
{
if (strcmp(state, "quorate") == 0) {
qb_log(LOG_NOTICE, "%s[%d] is now %s", nodename, nodeid, state);
} else {
qb_log(LOG_NOTICE, "%s[%d] has lost quorum", nodename, nodeid);
}
}
static void
_cs_syslog_application_connection_event(char *nodename, uint32_t nodeid, char* app_name, const char *state)
{
if (strcmp(state, "connected") == 0) {
qb_log(LOG_NOTICE, "%s[%d] %s is now %s to corosync", nodename, nodeid, app_name, state);
} else {
qb_log(LOG_NOTICE, "%s[%d] %s is now %s from corosync", nodename, nodeid, app_name, state);
}
}
static void
_cs_syslog_rrp_faulty_event(char *nodename, uint32_t nodeid, uint32_t iface_no, const char *state)
{
qb_log(LOG_NOTICE, "%s[%d] interface %u is now %s", nodename, nodeid, iface_no, state);
}
static void
_cs_node_membership_event(char *nodename, uint32_t nodeid, char *state, char* ip)
{
int i;
for (i = 0; i < num_notifiers; i++) {
if (notifiers[i].node_membership_fn) {
notifiers[i].node_membership_fn(nodename, nodeid, state, ip);
}
}
}
static void
_cs_local_node_info_get(char **nodename, uint32_t *nodeid)
{
cs_error_t rc;
corosync_cfg_handle_t cfg_handle;
if (local_nodeid == 0) {
rc = corosync_cfg_initialize(&cfg_handle, NULL);
if (rc != CS_OK) {
syslog (LOG_ERR, "Failed to initialize the cfg API. Error %d\n", rc);
exit (EXIT_FAILURE);
}
rc = corosync_cfg_local_get (cfg_handle, &local_nodeid);
corosync_cfg_finalize(cfg_handle);
if (rc != CS_OK) {
local_nodeid = 0;
strncpy(local_nodename, "localhost", sizeof (local_nodename));
local_nodename[sizeof (local_nodename) - 1] = '\0';
} else {
gethostname(local_nodename, CS_MAX_NAME_LENGTH);
}
}
*nodeid = local_nodeid;
*nodename = local_nodename;
}
static void
_cs_node_quorum_event(const char *state)
{
int i;
char *nodename;
uint32_t nodeid;
_cs_local_node_info_get(&nodename, &nodeid);
for (i = 0; i < num_notifiers; i++) {
if (notifiers[i].node_quorum_fn) {
notifiers[i].node_quorum_fn(nodename, nodeid, state);
}
}
}
static void
_cs_application_connection_event(char *app_name, const char *state)
{
int i;
char *nodename;
uint32_t nodeid;
_cs_local_node_info_get(&nodename, &nodeid);
for (i = 0; i < num_notifiers; i++) {
if (notifiers[i].application_connection_fn) {
notifiers[i].application_connection_fn(nodename, nodeid, app_name, state);
}
}
}
static void
_cs_rrp_faulty_event(uint32_t iface_no, const char *state)
{
int i;
char *nodename;
uint32_t nodeid;
_cs_local_node_info_get(&nodename, &nodeid);
for (i = 0; i < num_notifiers; i++) {
if (notifiers[i].rrp_faulty_fn) {
notifiers[i].rrp_faulty_fn(nodename, nodeid, iface_no, state);
}
}
}
static int32_t
sig_exit_handler(int32_t num, void *data)
{
qb_loop_stop(main_loop);
return 0;
}
static void
_cs_cmap_init(void)
{
cs_error_t rc;
int cmap_fd = 0;
cmap_track_handle_t track_handle;
rc = cmap_initialize (&cmap_handle);
if (rc != CS_OK) {
qb_log(LOG_ERR, "Failed to initialize the cmap API. Error %d", rc);
exit (EXIT_FAILURE);
}
cmap_fd_get(cmap_handle, &cmap_fd);
qb_loop_poll_add(main_loop, QB_LOOP_MED, cmap_fd, POLLIN|POLLNVAL, NULL,
_cs_cmap_dispatch);
rc = cmap_track_add(cmap_handle, "runtime.connections.",
CMAP_TRACK_ADD | CMAP_TRACK_DELETE | CMAP_TRACK_PREFIX,
_cs_cmap_connections_key_changed,
NULL,
&track_handle);
if (rc != CS_OK) {
qb_log(LOG_ERR,
"Failed to track the connections key. Error %d", rc);
exit (EXIT_FAILURE);
}
rc = cmap_track_add(cmap_handle, "runtime.totem.pg.mrp.srp.members.",
CMAP_TRACK_ADD | CMAP_TRACK_MODIFY | CMAP_TRACK_PREFIX,
_cs_cmap_members_key_changed,
NULL,
&track_handle);
if (rc != CS_OK) {
qb_log(LOG_ERR,
"Failed to track the members key. Error %d", rc);
exit (EXIT_FAILURE);
}
rc = cmap_track_add(cmap_handle, "runtime.totem.pg.mrp.rrp.",
CMAP_TRACK_ADD | CMAP_TRACK_MODIFY | CMAP_TRACK_PREFIX,
_cs_cmap_rrp_faulty_key_changed,
NULL,
&track_handle);
if (rc != CS_OK) {
qb_log(LOG_ERR,
"Failed to track the rrp key. Error %d", rc);
exit (EXIT_FAILURE);
}
}
static void
_cs_cmap_finalize(void)
{
cmap_finalize (cmap_handle);
}
static void
_cs_check_config(void)
{
if (conf[CS_NTF_LOG] == QB_FALSE &&
conf[CS_NTF_STDOUT] == QB_FALSE &&
conf[CS_NTF_SNMP] == QB_FALSE &&
conf[CS_NTF_DBUS] == QB_FALSE) {
qb_log(LOG_ERR, "no event type enabled, see corosync-notifyd -h, exiting.");
exit(EXIT_FAILURE);
}
#ifndef ENABLE_SNMP
if (conf[CS_NTF_SNMP]) {
qb_log(LOG_ERR, "Not compiled with SNMP support enabled, exiting.");
exit(EXIT_FAILURE);
}
#endif
#ifndef HAVE_DBUS
if (conf[CS_NTF_DBUS]) {
qb_log(LOG_ERR, "Not compiled with DBus support enabled, exiting.");
exit(EXIT_FAILURE);
}
#endif
if (conf[CS_NTF_STDOUT] && !conf[CS_NTF_FG]) {
qb_log(LOG_ERR, "configured to print to stdout and run in the background, exiting");
exit(EXIT_FAILURE);
}
if (conf[CS_NTF_SNMP] && conf[CS_NTF_DBUS]) {
qb_log(LOG_ERR, "configured to send snmp traps and dbus signals - are you sure?.");
}
}
static void
_cs_usage(void)
{
fprintf(stderr, "usage:\n"\
" -f : Start application in foreground.\n"\
" -l : Log all events.\n"\
" -o : Print events to stdout (turns on -l).\n"\
" -s : Send SNMP traps on all events.\n"\
" -m : Set the SNMP Manager IP address (defaults to localhost).\n"\
" -d : Send DBUS signals on all events.\n"\
" -h : Print this help.\n\n");
}
int
main(int argc, char *argv[])
{
int ch;
conf[CS_NTF_FG] = QB_FALSE;
conf[CS_NTF_LOG] = QB_FALSE;
conf[CS_NTF_STDOUT] = QB_FALSE;
conf[CS_NTF_SNMP] = QB_FALSE;
conf[CS_NTF_DBUS] = QB_FALSE;
while ((ch = getopt (argc, argv, "floshdm:")) != EOF) {
switch (ch) {
case 'f':
conf[CS_NTF_FG] = QB_TRUE;
break;
case 'l':
conf[CS_NTF_LOG] = QB_TRUE;
break;
case 'm':
conf[CS_NTF_SNMP] = QB_TRUE;
strncpy(snmp_manager_buf, optarg, sizeof (snmp_manager_buf));
snmp_manager_buf[sizeof (snmp_manager_buf) - 1] = '\0';
snmp_manager = snmp_manager_buf;
break;
case 'o':
conf[CS_NTF_LOG] = QB_TRUE;
conf[CS_NTF_STDOUT] = QB_TRUE;
break;
case 's':
conf[CS_NTF_SNMP] = QB_TRUE;
break;
case 'd':
conf[CS_NTF_DBUS] = QB_TRUE;
break;
case 'h':
default:
_cs_usage();
return EXIT_FAILURE;
}
}
qb_log_init("notifyd", LOG_DAEMON, LOG_INFO);
if (conf[CS_NTF_STDOUT]) {
qb_log_filter_ctl(QB_LOG_STDERR, QB_LOG_FILTER_ADD,
QB_LOG_FILTER_FILE, "*", LOG_DEBUG);
qb_log_ctl(QB_LOG_STDERR, QB_LOG_CONF_ENABLED, conf[CS_NTF_STDOUT]);
}
_cs_check_config();
if (!conf[CS_NTF_FG]) {
if (daemon(0, 0) < 0)
{
perror("daemon() failed");
return EXIT_FAILURE;
}
}
num_notifiers = 0;
if (conf[CS_NTF_LOG]) {
notifiers[num_notifiers].node_membership_fn =
_cs_syslog_node_membership_event;
notifiers[num_notifiers].node_quorum_fn =
_cs_syslog_node_quorum_event;
notifiers[num_notifiers].application_connection_fn =
_cs_syslog_application_connection_event;
notifiers[num_notifiers].rrp_faulty_fn =
_cs_syslog_rrp_faulty_event;
num_notifiers++;
}
main_loop = qb_loop_create();
_cs_cmap_init();
_cs_quorum_init();
#ifdef HAVE_DBUS
if (conf[CS_NTF_DBUS]) {
_cs_dbus_init();
}
#endif /* HAVE_DBUS */
#ifdef ENABLE_SNMP
if (conf[CS_NTF_SNMP]) {
_cs_snmp_init();
}
#endif /* ENABLE_SNMP */
qb_loop_signal_add(main_loop,
QB_LOOP_HIGH,
SIGINT,
NULL,
sig_exit_handler,
NULL);
qb_loop_signal_add(main_loop,
QB_LOOP_HIGH,
SIGQUIT,
NULL,
sig_exit_handler,
NULL);
qb_loop_signal_add(main_loop,
QB_LOOP_HIGH,
SIGTERM,
NULL,
sig_exit_handler,
NULL);
qb_loop_run(main_loop);
#ifdef HAVE_DBUS
if (conf[CS_NTF_DBUS]) {
_cs_dbus_release();
}
#endif /* HAVE_DBUS */
_cs_quorum_finalize();
_cs_cmap_finalize();
return 0;
}

File Metadata

Mime Type
text/x-diff
Expires
Sat, Nov 23, 3:27 AM (46 m, 11 s)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1018116
Default Alt Text
(245 KB)

Event Timeline