Page Menu
Home
ClusterLabs Projects
Search
Configure Global Search
Log In
Files
F1841358
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Flag For Later
Award Token
Size
245 KB
Referenced Files
None
Subscribers
None
View Options
diff --git a/cts/agents/common_test_agent.c b/cts/agents/common_test_agent.c
index 67d76c17..e395c316 100644
--- a/cts/agents/common_test_agent.c
+++ b/cts/agents/common_test_agent.c
@@ -1,333 +1,332 @@
/*
* Copyright (c) 2010 Red Hat, Inc.
*
* All rights reserved.
*
* Author: Angus Salkeld (asalkeld@redhat.com)
*
* This software licensed under BSD license, the text of which follows:
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* - Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* - Neither the name of the MontaVista Software, Inc. nor the names of its
* contributors may be used to endorse or promote products derived from this
* software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
* THE POSSIBILITY OF SUCH DAMAGE.
*/
#include <errno.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <poll.h>
-#include <unistd.h>
#include <fcntl.h>
#include "common_test_agent.h"
#define MAX_CLIENTS 64
static char big_and_buf_rx[HOW_BIG_AND_BUF];
ta_do_command_fn do_command;
static qb_loop_t *poll_handle;
static pre_exit_fn pre_exit = NULL;
static int client_fds[MAX_CLIENTS];
static int client_fds_pos = 0;
/* Accessor for the agent's main qb event loop handle. */
qb_loop_t *ta_poll_handle_get(void)
{
	qb_loop_t *loop = poll_handle;

	return loop;
}
static void shut_me_down(void)
{
if (pre_exit) {
pre_exit();
}
qb_loop_stop(poll_handle);
}
/*
 * Parse one serialized command of the form
 *   "<count>:<len>:<func>:<len>:<arg>:...:<len>:<arg>"
 * where <count> is the number of <len>:<value> pairs that follow.
 * The first pair names the RPC function; the remaining pairs are its
 * string arguments.  Everything is forwarded to the registered
 * do_command handler.
 *
 * NOTE(review): args[] holds at most 5 entries but the pair count comes
 * straight from the wire; a message claiming more than 6 pairs would
 * overrun args[] -- confirm senders are trusted/bounded.
 * NOTE(review): do_command is invoked with a+1 although only `a` args
 * were stored; handlers appear to depend on this count -- verify before
 * changing.
 */
static void ta_handle_command (int sock, char* msg)
{
	int num_args;
	char *saveptr = NULL;
	char *str = strdup (msg);
	char *str_len;
	char *str_arg;
	char *args[5];
	int i = 0;
	int a = 0;
	char* func = NULL;

	qb_log(LOG_DEBUG,"(MSG:%s)", msg);

	/* leading field: how many <len>:<value> pairs follow (x2 tokens) */
	str_len = strtok_r (str, ":", &saveptr);
	assert (str_len);

	num_args = atoi (str_len) * 2;
	for (i = 0; i < num_args / 2; i++) {
		str_len = strtok_r (NULL, ":", &saveptr);
		str_arg = strtok_r (NULL, ":", &saveptr);
		if (func == NULL) {
			/* first "arg" is the function */
			qb_log (LOG_TRACE, "(LEN:%s, FUNC:%s)", str_len, str_arg);
			func = str_arg;
			a = 0;
		} else {
			args[a] = str_arg;
			a++;
			qb_log (LOG_TRACE, "(LEN:%s, ARG:%s)", str_len, str_arg);
		}
	}
	do_command (sock, func, args, a+1);

	free (str);
}
/*
 * Poll callback for a connected control socket.  Receives a batch of
 * ';'-separated commands, NUL-terminates the buffer and hands each
 * command to ta_handle_command().  POLLHUP/POLLERR or a failed/closed
 * recv() shuts the whole agent down.
 * Returns 0 to stay registered with the poll loop, -1 to be removed.
 */
static int server_process_data_fn (
	int fd,
	int revents,
	void *data)
{
	char *saveptr;
	char *msg;
	char *cmd;
	int32_t nbytes;

	if (revents & POLLHUP || revents & POLLERR) {
		qb_log (LOG_INFO, "command sockect got POLLHUP exiting...");
		shut_me_down();
		return -1;
	}

	/* BUGFIX: reserve one byte for the terminating NUL; receiving the
	 * full buffer size made big_and_buf_rx[nbytes] write one byte past
	 * the end of the array. */
	if ((nbytes = recv (fd, big_and_buf_rx, sizeof (big_and_buf_rx) - 1, 0)) <= 0) {
		/* got error or connection closed by client */
		if (nbytes == 0) {
			/* connection closed */
			qb_log (LOG_WARNING, "socket %d hung up: exiting...", fd);
		} else {
			qb_perror(LOG_ERR, "recv() failed");
		}
		shut_me_down();
		return -1;
	} else {
		big_and_buf_rx[nbytes] = '\0';

		msg = strtok_r (big_and_buf_rx, ";", &saveptr);
		assert (msg);
		while (msg) {
			/* ta_handle_command tokenizes in place, so give it a
			 * private copy of each command */
			cmd = strdup (msg);
			ta_handle_command (fd, cmd);
			free (cmd);
			msg = strtok_r (NULL, ";", &saveptr);
		}
	}
	return 0;
}
/*
 * Poll callback for the listening socket: accept a new control
 * connection, make it non-blocking, remember its fd for cleanup and
 * register it with the poll loop.
 * Returns 0 (even on per-connection errors); -1 only when the listener
 * itself got POLLHUP/POLLERR and the agent is shutting down.
 */
static int server_accept_fn (
	int fd, int revents, void *data)
{
	socklen_t addrlen;
	struct sockaddr_in in_addr;
	int new_fd;
	int res;

	if (revents & POLLHUP || revents & POLLERR) {
		qb_log (LOG_INFO, "command sockect got POLLHUP exiting...");
		shut_me_down();
		return -1;
	}

	addrlen = sizeof (struct sockaddr_in);

retry_accept:
	new_fd = accept (fd, (struct sockaddr *)&in_addr, &addrlen);
	if (new_fd == -1 && errno == EINTR) {
		goto retry_accept;
	}

	if (new_fd == -1) {
		qb_log (LOG_ERR,
			"Could not accept connection: %s", strerror (errno));
		return (0); /* This is an error, but -1 would indicate disconnect from poll loop */
	}

	res = fcntl (new_fd, F_SETFL, O_NONBLOCK);
	if (res == -1) {
		qb_log (LOG_ERR,
			"Could not set non-blocking operation on connection: %s",
			strerror (errno));
		close (new_fd);
		return (0); /* This is an error, but -1 would indicate disconnect from poll loop */
	}

	/* BUGFIX: check capacity BEFORE storing; the original wrote the fd
	 * first and asserted afterwards, so a full table meant an
	 * out-of-bounds write before the assert could fire. */
	assert(client_fds_pos < MAX_CLIENTS);
	client_fds[client_fds_pos] = new_fd;
	client_fds_pos++;

	qb_loop_poll_add (poll_handle,
			  QB_LOOP_MED,
			  new_fd,
			  POLLIN|POLLNVAL,
			  NULL,
			  server_process_data_fn);
	return 0;
}
/*
 * Create, bind and listen on a TCP server socket for server_port on the
 * wildcard address (first usable address family wins).  Exits the
 * process on any unrecoverable failure.  Returns the listening fd.
 */
static int create_server_sockect (int server_port)
{
	int listener;
	int yes = 1;
	int rv;
	struct addrinfo hints, *ai, *p;
	char server_port_str[16];
	/* BUGFIX: sized for IPv6; INET_ADDRSTRLEN is too small for an
	 * AF_INET6 address, so inet_ntop() would fail for IPv6 binds. */
	char addr_str[INET6_ADDRSTRLEN];
	void *ptr = NULL;

	/* get a socket and bind it */
	snprintf(server_port_str, sizeof(server_port_str), "%d", server_port);
	memset (&hints, 0, sizeof hints);
	hints.ai_family = AF_UNSPEC;
	hints.ai_socktype = SOCK_STREAM;
	hints.ai_flags = AI_PASSIVE;
	if ((rv = getaddrinfo (NULL, server_port_str, &hints, &ai)) != 0) {
		qb_log (LOG_ERR, "%s", gai_strerror (rv));
		exit (1);
	}

	for (p = ai; p != NULL; p = p->ai_next) {
		listener = socket (p->ai_family, p->ai_socktype, p->ai_protocol);
		if (listener < 0) {
			continue;
		}

		/* lose the pesky "address already in use" error message */
		if (setsockopt (listener, SOL_SOCKET, SO_REUSEADDR,
				&yes, sizeof(int)) < 0) {
			qb_log (LOG_ERR, "setsockopt() failed: %s", strerror (errno));
		}

		switch (p->ai_family)
		{
		case AF_INET:
			ptr = &((struct sockaddr_in *) p->ai_addr)->sin_addr;
			break;
		case AF_INET6:
			ptr = &((struct sockaddr_in6 *) p->ai_addr)->sin6_addr;
			break;
		default:
			qb_log (LOG_ERR, "address family wrong");
			exit (4);
		}

		/* addr_str is only used for the bind() error message below */
		if (inet_ntop(p->ai_family, ptr, addr_str, sizeof(addr_str)) == NULL) {
			qb_log (LOG_ERR, "inet_ntop() failed: %s", strerror (errno));
		}

		if (bind (listener, p->ai_addr, p->ai_addrlen) < 0) {
			qb_log (LOG_ERR, "bind(%s) failed: %s", addr_str, strerror (errno));
			close (listener);
			continue;
		}
		break;
	}

	if (p == NULL) {
		qb_log (LOG_ERR, "failed to bind");
		exit (2);
	}

	freeaddrinfo (ai);

	if (listen (listener, 10) == -1) {
		qb_log (LOG_ERR, "listen() failed: %s", strerror(errno));
		exit (3);
	}
	return listener;
}
/*
 * qb_loop signal callback (SIGINT/SIGQUIT/SIGTERM): log which signal
 * arrived and begin the orderly shutdown sequence.
 */
static int32_t sig_exit_handler (int num, void *data)
{
	qb_log (LOG_INFO, "got signal %d, exiting", num);
	shut_me_down ();

	return 0;
}
/*
 * Common entry point for all test agents: set up logging, install
 * signal handlers (only when an exit hook is supplied), open the
 * command server on server_port and run the main loop until
 * shut_me_down() stops it.
 *
 * @prog_name:   syslog identity
 * @server_port: TCP port for the control connection
 * @func:        handler invoked for each decoded RPC command
 * @exit_fn:     optional hook run before the loop exits (may be NULL)
 *
 * Returns 0.
 */
int
test_agent_run(const char * prog_name, int server_port,
		ta_do_command_fn func, pre_exit_fn exit_fn)
{
	int listener;
	int i;

	qb_log_init(prog_name, LOG_DAEMON, LOG_DEBUG);
	qb_log_format_set(QB_LOG_SYSLOG, "%n() [%p] %b");
	qb_log (LOG_INFO, "STARTING");

	do_command = func;
	pre_exit = exit_fn;
	poll_handle = qb_loop_create ();

	if (exit_fn) {
		qb_loop_signal_add(poll_handle, QB_LOOP_HIGH,
			SIGINT, NULL, sig_exit_handler, NULL);
		qb_loop_signal_add(poll_handle, QB_LOOP_HIGH,
			SIGQUIT, NULL, sig_exit_handler, NULL);
		qb_loop_signal_add(poll_handle, QB_LOOP_HIGH,
			SIGTERM, NULL, sig_exit_handler, NULL);
	}

	listener = create_server_sockect (server_port);
	qb_loop_poll_add (poll_handle,
			  QB_LOOP_MED,
			  listener,
			  POLLIN|POLLNVAL,
			  NULL, server_accept_fn);
	qb_loop_run (poll_handle);

	close(listener);
	/* BUGFIX: close each accepted client fd; the original indexed the
	 * array with client_fds_pos, closing an invalid slot and leaking
	 * every real client socket. */
	for (i = 0; i < client_fds_pos; i++) {
		close(client_fds[i]);
	}
	qb_log (LOG_INFO, "EXITING");
	qb_log_fini();
	return 0;
}
diff --git a/cts/agents/cpg_test_agent.c b/cts/agents/cpg_test_agent.c
index a52f4f0a..0837c69c 100644
--- a/cts/agents/cpg_test_agent.c
+++ b/cts/agents/cpg_test_agent.c
@@ -1,804 +1,803 @@
/*
* Copyright (c) 2010 Red Hat, Inc.
*
* All rights reserved.
*
* Author: Angus Salkeld (asalkeld@redhat.com)
*
* This software licensed under BSD license, the text of which follows:
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* - Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* - Neither the name of the MontaVista Software, Inc. nor the names of its
* contributors may be used to endorse or promote products derived from this
* software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
* THE POSSIBILITY OF SUCH DAMAGE.
*/
#include <errno.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/uio.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <poll.h>
-#include <unistd.h>
#include <fcntl.h>
#include <corosync/list.h>
#include <qb/qbdefs.h>
#include <qb/qbutil.h>
#include <qb/qbloop.h>
#include <qb/qblog.h>
#include <corosync/cpg.h>
#include <corosync/cfg.h>
#include "common_test_agent.h"
#include <nss.h>
#include <pk11pub.h>
/* Result of validating one received CPG message. */
typedef enum {
	MSG_OK,
	MSG_NODEID_ERR,   /* sender nodeid mismatch */
	MSG_PID_ERR,      /* sender pid mismatch */
	MSG_SEQ_ERR,      /* sequence number mismatch */
	MSG_SIZE_ERR,     /* delivered length != claimed size */
	MSG_SHA1_ERR,     /* payload digest mismatch */
} msg_status_t;

/*
 * On-wire message: header (sender identity, payload SHA1, sequence
 * number, total size including this header) followed by the payload
 * as a flexible-array-style trailer.
 */
typedef struct {
	uint32_t nodeid;
	pid_t pid;
	unsigned char sha1[20];
	uint32_t seq;
	size_t size;
	unsigned char payload[0];
} msg_t;

/* Fixed length of one recorded log line (including the NUL). */
#define LOG_STR_SIZE 80
/* One recorded event/message, linked into a corosync list. */
typedef struct {
	char log[LOG_STR_SIZE];
	struct list_head list;
} log_entry_t;
static char big_and_buf[HOW_BIG_AND_BUF];
static int32_t record_config_events_g = 0;
static int32_t record_messages_g = 0;
static cpg_handle_t cpg_handle = 0;
static corosync_cfg_handle_t cfg_handle = 0;
static int32_t cpg_fd = -1;
static int32_t cfg_fd = -1;
static struct list_head config_chg_log_head;
static struct list_head msg_log_head;
static pid_t my_pid;
static uint32_t my_nodeid;
static int32_t my_seq;
static int32_t use_zcb = QB_FALSE;
static int32_t my_msgs_to_send;
static int32_t my_msgs_sent;
static int32_t total_stored_msgs = 0;
static int32_t total_msgs_revd = 0;
static int32_t in_cnchg = 0;
static int32_t pcmk_test = 0;
PK11Context* sha1_context;
static void send_some_more_messages (void * unused);
/*
 * Render a msg_status_t as a short human-readable tag in buf.
 * The result is always NUL-terminated (when buf_len > 0) and buf is
 * returned so the call can be used inline in a format argument.
 */
static char*
err_status_string (char * buf, size_t buf_len, msg_status_t status)
{
	const char *tag;

	switch (status) {
	case MSG_OK:
		tag = "OK";
		break;
	case MSG_NODEID_ERR:
		tag = "NODEID_ERR";
		break;
	case MSG_PID_ERR:
		tag = "PID_ERR";
		break;
	case MSG_SEQ_ERR:
		tag = "SEQ_ERR";
		break;
	case MSG_SIZE_ERR:
		tag = "SIZE_ERR";
		break;
	case MSG_SHA1_ERR:
		tag = "SHA1_ERR";
		break;
	default:
		tag = "UNKNOWN_ERR";
		break;
	}

	strncpy (buf, tag, buf_len);
	if (buf_len > 0) {
		buf[buf_len - 1] = '\0';
	}
	return buf;
}
/*
 * CPG delivery callback.  When recording is enabled, validate the
 * received msg_t (sender nodeid/pid, total length and payload SHA1)
 * and append a "<nodeid>:<seq>:<my_seq>:<status>;" entry to
 * msg_log_head for later collection via read_messages().
 */
static void delivery_callback (
	cpg_handle_t handle,
	const struct cpg_name *groupName,
	uint32_t nodeid,
	uint32_t pid,
	void *msg,
	size_t msg_len)
{
	log_entry_t *log_pt;
	msg_t *msg_pt = (msg_t*)msg;
	msg_status_t status = MSG_OK;
	char status_buf[20];
	unsigned char sha1_compare[20];
	unsigned int sha1_len;

	if (record_messages_g == 0) {
		return;
	}

	if (nodeid != msg_pt->nodeid) {
		status = MSG_NODEID_ERR;
	}
	if (pid != msg_pt->pid) {
		status = MSG_PID_ERR;
	}
	if (msg_len != msg_pt->size) {
		status = MSG_SIZE_ERR;
	}
	/* msg_pt->size includes the msg_t header, so the payload length is
	 * (size - sizeof(msg_t)) */
	PK11_DigestBegin(sha1_context);
	PK11_DigestOp(sha1_context, msg_pt->payload, (msg_pt->size - sizeof (msg_t)));
	PK11_DigestFinal(sha1_context, sha1_compare, &sha1_len, sizeof(sha1_compare));

	if (memcmp (sha1_compare, msg_pt->sha1, 20) != 0) {
		qb_log (LOG_ERR, "msg seq:%d; incorrect hash",
			msg_pt->seq);
		status = MSG_SHA1_ERR;
	}

	log_pt = malloc (sizeof(log_entry_t));
	if (log_pt == NULL) {
		/* BUGFIX: the allocation was unchecked; drop this entry rather
		 * than dereference NULL under memory pressure. */
		qb_log (LOG_ERR, "malloc failed, dropping message log entry");
		return;
	}
	list_init (&log_pt->list);
	snprintf (log_pt->log, LOG_STR_SIZE, "%u:%d:%d:%s;",
		msg_pt->nodeid, msg_pt->seq, my_seq,
		err_status_string (status_buf, 20, status));
	list_add_tail (&log_pt->list, &msg_log_head);
	total_stored_msgs++;
	total_msgs_revd++;
	my_seq++;

	if ((total_msgs_revd % 1000) == 0) {
		qb_log (LOG_INFO, "rx %d", total_msgs_revd);
	}
}
/*
 * CPG membership-change callback.  When event recording is enabled,
 * append one "group,nodeid,pid,left|join" line per departed/joined
 * member to config_chg_log_head (drained by read_config_event()).
 * When pcmk_test mode is on, also blast messages from inside the
 * configuration-change context.
 */
static void config_change_callback (
	cpg_handle_t handle,
	const struct cpg_name *groupName,
	const struct cpg_address *member_list, size_t member_list_entries,
	const struct cpg_address *left_list, size_t left_list_entries,
	const struct cpg_address *joined_list, size_t joined_list_entries)
{
	size_t i;	/* BUGFIX: size_t to match the *_entries bounds */
	log_entry_t *log_pt;

	/* group_name,ip,pid,join|leave */
	if (record_config_events_g > 0) {
		qb_log (LOG_INFO, "got cpg event[recording] for group %s", groupName->value);
	} else {
		qb_log (LOG_INFO, "got cpg event[ignoring] for group %s", groupName->value);
	}

	for (i = 0; i < left_list_entries; i++) {
		if (record_config_events_g > 0) {
			log_pt = malloc (sizeof(log_entry_t));
			if (log_pt == NULL) {
				/* BUGFIX: unchecked malloc; skip the entry on OOM */
				qb_log (LOG_ERR, "malloc failed, dropping cpg event");
				continue;
			}
			list_init (&log_pt->list);
			snprintf (log_pt->log, LOG_STR_SIZE, "%s,%u,%u,left",
				groupName->value, left_list[i].nodeid,left_list[i].pid);
			list_add_tail(&log_pt->list, &config_chg_log_head);
			qb_log (LOG_INFO, "cpg event %s", log_pt->log);
		}
	}
	for (i = 0; i < joined_list_entries; i++) {
		if (record_config_events_g > 0) {
			log_pt = malloc (sizeof(log_entry_t));
			if (log_pt == NULL) {
				qb_log (LOG_ERR, "malloc failed, dropping cpg event");
				continue;
			}
			list_init (&log_pt->list);
			snprintf (log_pt->log, LOG_STR_SIZE, "%s,%u,%u,join",
				groupName->value, joined_list[i].nodeid,joined_list[i].pid);
			list_add_tail (&log_pt->list, &config_chg_log_head);
			qb_log (LOG_INFO, "cpg event %s", log_pt->log);
		}
	}

	if (pcmk_test == 1) {
		/* send from inside the confchg context (pacemaker scenario) */
		in_cnchg = 1;
		send_some_more_messages (NULL);
		in_cnchg = 0;
	}
}
/*
 * CFG shutdown callback: log the request flags and always consent to a
 * requested corosync shutdown.
 */
static void my_shutdown_callback (corosync_cfg_handle_t handle,
	corosync_cfg_shutdown_flags_t flags)
{
	qb_log (LOG_CRIT, "flags:%d", flags);
	if (flags != COROSYNC_CFG_SHUTDOWN_FLAG_REQUEST) {
		return;
	}
	corosync_cfg_replyto_shutdown (cfg_handle, COROSYNC_CFG_SHUTDOWN_FLAG_YES);
}
/* CFG callbacks: only the shutdown-request callback is used. */
static corosync_cfg_callbacks_t cfg_callbacks = {
	.corosync_cfg_shutdown_callback = my_shutdown_callback,
};
/* CPG callbacks: message delivery and membership-change handling. */
static cpg_callbacks_t callbacks = {
	.cpg_deliver_fn = delivery_callback,
	.cpg_confchg_fn = config_change_callback,
};
/* RPC: turn on recording of delivered CPG messages (see read_messages). */
static void record_messages (void)
{
	record_messages_g = 1;
	qb_log (LOG_INFO, "record:%d", record_messages_g);
}
/*
 * RPC: turn on recording of CPG configuration-change events and
 * acknowledge with OK_STR on the command socket.
 */
static void record_config_events (int sock)
{
	char response[100];
	ssize_t rc;
	size_t send_len;

	record_config_events_g = 1;
	qb_log (LOG_INFO, "record:%d", record_config_events_g);

	snprintf (response, 100, "%s", OK_STR);
	send_len = strlen (response);
	rc = send (sock, response, send_len, 0);
	assert(rc == send_len);
}
/*
 * RPC: pop the oldest recorded configuration-change event and send its
 * log line on the command socket; send "None" when nothing is queued.
 */
static void read_config_event (int sock)
{
	const char *empty = "None";
	struct list_head * list = config_chg_log_head.next;
	log_entry_t *entry;
	ssize_t rc;
	size_t send_len;

	if (list == &config_chg_log_head) {
		/* nothing recorded (yet) */
		qb_log (LOG_DEBUG, "no events in list");
		send_len = strlen (empty);
		rc = send (sock, empty, send_len, 0);
	} else {
		entry = list_entry (list, log_entry_t, list);
		send_len = strlen (entry->log);
		rc = send (sock, entry->log, send_len, 0);
		list_del (&entry->list);
		free (entry);
	}
	assert(rc == send_len);
}
/*
 * RPC: pack up to atmost_str recorded message log lines into
 * big_and_buf and send them in one shot on the command socket;
 * "None" is sent when the queue is empty.  Sent entries are freed.
 */
static void read_messages (int sock, char* atmost_str)
{
	struct list_head * list;
	log_entry_t *entry;
	int atmost = atoi (atmost_str);
	int packed = 0;
	size_t out_len;
	ssize_t rc;

	/* clamp the requested count to what big_and_buf can hold */
	if (atmost == 0)
		atmost = 1;
	if (atmost > (HOW_BIG_AND_BUF / LOG_STR_SIZE))
		atmost = (HOW_BIG_AND_BUF / LOG_STR_SIZE);

	big_and_buf[0] = '\0';

	list = msg_log_head.next;
	while (!list_empty (&msg_log_head) && packed < atmost) {
		entry = list_entry (list, log_entry_t, list);
		strcat (big_and_buf, entry->log);
		packed++;
		list = list->next;
		list_del (&entry->list);
		free (entry);
		total_stored_msgs--;
	}

	if (packed == 0) {
		strcpy (big_and_buf, "None");
	} else if ((total_stored_msgs % 1000) == 0) {
		qb_log(LOG_INFO, "sending %d; total_stored_msgs:%d; len:%d",
		       packed, total_stored_msgs, (int)strlen (big_and_buf));
	}

	out_len = strlen (big_and_buf);
	rc = send (sock, big_and_buf, out_len, 0);
	assert(rc == out_len);
}
/* One-shot timer used to retry sending when CPG pushes back. */
static qb_loop_timer_handle more_messages_timer_handle;
/*
 * Drain any pending CPG events, then arm a 300ms timer that calls
 * send_some_more_messages() again to continue the interrupted blast.
 */
static void send_some_more_messages_later (void)
{
	cpg_dispatch (cpg_handle, CS_DISPATCH_ALL);
	qb_loop_timer_add (
		ta_poll_handle_get(),
		QB_LOOP_MED,
		300*QB_TIME_NS_IN_MSEC, NULL,
		send_some_more_messages,
		&more_messages_timer_handle);
}
/*
 * Send up to my_msgs_to_send messages using the CPG zero-copy-buffer
 * API.  A single random-sized payload (< ~100KB) is built once,
 * SHA1-summed into the header, and multicast repeatedly.  On flow
 * control or CS_ERR_TRY_AGAIN the remainder is rescheduled via
 * send_some_more_messages_later(); any other mcast error exits.
 *
 * NOTE(review): cpg_zcb_alloc()'s return value is not checked before
 * the buffer is dereferenced -- confirm it cannot fail here.
 * NOTE(review): unlike the normal path, my_msg->seq is set once and
 * never updated per message -- verify this is intentional.
 */
static void send_some_more_messages_zcb (void)
{
	msg_t *my_msg;
	int i;
	int send_now;
	size_t payload_size;
	size_t total_size;
	unsigned int sha1_len;
	cs_error_t res;
	cpg_flow_control_state_t fc_state;
	void *zcb_buffer;

	/* not connected to CPG yet */
	if (cpg_fd < 0)
		return;

	send_now = my_msgs_to_send;
	payload_size = (rand() % 100000);
	total_size = payload_size + sizeof (msg_t);

	cpg_zcb_alloc (cpg_handle, total_size, &zcb_buffer);
	my_msg = (msg_t*)zcb_buffer;

	qb_log(LOG_DEBUG, "send_now:%d", send_now);

	my_msg->pid = my_pid;
	my_msg->nodeid = my_nodeid;
	my_msg->size = sizeof (msg_t) + payload_size;
	my_msg->seq = my_msgs_sent;
	for (i = 0; i < payload_size; i++) {
		my_msg->payload[i] = i;
	}
	PK11_DigestBegin(sha1_context);
	PK11_DigestOp(sha1_context, my_msg->payload, payload_size);
	PK11_DigestFinal(sha1_context, my_msg->sha1, &sha1_len, sizeof(my_msg->sha1));

	for (i = 0; i < send_now; i++) {
		res = cpg_flow_control_state_get (cpg_handle, &fc_state);
		if (res == CS_OK && fc_state == CPG_FLOW_CONTROL_ENABLED) {
			/* lets do this later */
			send_some_more_messages_later ();
			qb_log (LOG_INFO, "flow control enabled.");
			goto free_buffer;
		}
		res = cpg_zcb_mcast_joined (cpg_handle, CPG_TYPE_AGREED, zcb_buffer, total_size);
		if (res == CS_ERR_TRY_AGAIN) {
			/* lets do this later */
			send_some_more_messages_later ();
			goto free_buffer;
		} else if (res != CS_OK) {
			qb_log (LOG_ERR, "cpg_mcast_joined error:%d, exiting.",
				res);
			exit (-2);
		}
		my_msgs_sent++;
		my_msgs_to_send--;
	}
free_buffer:
	cpg_zcb_free (cpg_handle, zcb_buffer);
}
/*
 * Run `code` (which must assign to a local `res`), retrying while it
 * yields CS_ERR_TRY_AGAIN; sleeps an increasing number of seconds
 * between attempts and gives up after `max` retries.  `counter` is the
 * caller-provided retry counter, incremented here.
 */
#define cs_repeat(counter, max, code) do { \
	code; \
	if (res == CS_ERR_TRY_AGAIN) { \
		counter++; \
		sleep(counter); \
	} \
} while (res == CS_ERR_TRY_AGAIN && counter < max)
/* Scratch payload for the normal (iovec) send path; max payload 10000. */
static unsigned char buffer[200000];
/*
 * Send up to my_msgs_to_send messages through the regular
 * cpg_mcast_joined() path using a two-part iovec (msg_t header +
 * payload).  A single random payload (< 10000 bytes) is built once and
 * SHA1-summed; the header's seq is refreshed for every message sent.
 *
 * Two modes:
 *  - inside a confchg with pcmk_test: block and retry via cs_repeat(),
 *    logging if we were blocked for a long time;
 *  - otherwise: back off via send_some_more_messages_later() on flow
 *    control or CS_ERR_TRY_AGAIN; exit on any other mcast error.
 */
static void send_some_more_messages_normal (void)
{
	msg_t my_msg;
	struct iovec iov[2];
	int i;
	int send_now;
	size_t payload_size;
	cs_error_t res;
	cpg_flow_control_state_t fc_state;
	int retries = 0;
	time_t before;
	unsigned int sha1_len;

	/* not connected to CPG yet */
	if (cpg_fd < 0)
		return;

	send_now = my_msgs_to_send;

	qb_log (LOG_TRACE, "send_now:%d", send_now);

	my_msg.pid = my_pid;
	my_msg.nodeid = my_nodeid;
	payload_size = (rand() % 10000);
	my_msg.size = sizeof (msg_t) + payload_size;
	my_msg.seq = my_msgs_sent;
	for (i = 0; i < payload_size; i++) {
		buffer[i] = i;
	}
	PK11_DigestBegin(sha1_context);
	PK11_DigestOp(sha1_context, buffer, payload_size);
	PK11_DigestFinal(sha1_context, my_msg.sha1, &sha1_len, sizeof(my_msg.sha1));

	iov[0].iov_len = sizeof (msg_t);
	iov[0].iov_base = &my_msg;
	iov[1].iov_len = payload_size;
	iov[1].iov_base = buffer;

	for (i = 0; i < send_now; i++) {
		if (in_cnchg && pcmk_test) {
			/* pacemaker scenario: keep retrying inside the confchg */
			retries = 0;
			before = time(NULL);
			cs_repeat(retries, 30, res = cpg_mcast_joined(cpg_handle, CPG_TYPE_AGREED, iov, 2));
			if (retries > 20) {
				qb_log (LOG_ERR, "cs_repeat: blocked for :%lu secs.",
					(unsigned long)(time(NULL) - before));
			}
			if (res != CS_OK) {
				qb_log (LOG_ERR, "cpg_mcast_joined error:%d.",
					res);
				return;
			}
		} else {
			res = cpg_flow_control_state_get (cpg_handle, &fc_state);
			if (res == CS_OK && fc_state == CPG_FLOW_CONTROL_ENABLED) {
				/* lets do this later */
				send_some_more_messages_later ();
				qb_log (LOG_INFO, "flow control enabled.");
				return;
			}
			res = cpg_mcast_joined (cpg_handle, CPG_TYPE_AGREED, iov, 2);
			if (res == CS_ERR_TRY_AGAIN) {
				/* lets do this later */
				send_some_more_messages_later ();
				if (i > 0) {
					qb_log (LOG_INFO, "TRY_AGAIN %d to send.",
						my_msgs_to_send);
				}
				return;
			} else if (res != CS_OK) {
				qb_log (LOG_ERR, "cpg_mcast_joined error:%d, exiting.",
					res);
				exit (-2);
			}
		}
		my_msgs_sent++;
		my_msg.seq = my_msgs_sent;
		my_msgs_to_send--;
	}
	qb_log (LOG_TRACE, "sent %d; to send %d.",
		my_msgs_sent, my_msgs_to_send);
}
/* Continue the current blast on whichever path (zcb/normal) is active. */
static void send_some_more_messages (void * unused)
{
	if (!use_zcb) {
		send_some_more_messages_normal ();
	} else {
		send_some_more_messages_zcb ();
	}
}
/*
 * RPC: start blasting num_to_send_str messages through the normal
 * cpg_mcast_joined() path; resets all per-run counters first.
 * The count is clamped to the range [1, 10000].
 */
static void msg_blaster (int sock, char* num_to_send_str)
{
	use_zcb = QB_FALSE;
	my_msgs_sent = 0;
	my_seq = 1;
	total_stored_msgs = 0;
	my_pid = getpid();

	cpg_local_get (cpg_handle, &my_nodeid);

	/* control the limits */
	my_msgs_to_send = atoi (num_to_send_str);
	if (my_msgs_to_send <= 0) {
		my_msgs_to_send = 1;
	} else if (my_msgs_to_send > 10000) {
		my_msgs_to_send = 10000;
	}

	send_some_more_messages_normal ();
}
/*
 * RPC: verify that cpg_context_set()/cpg_context_get() round-trips the
 * exact pointer; reply OK_STR or FAIL_STR on the command socket.
 */
static void context_test (int sock)
{
	char response[100];
	char *cmp;
	ssize_t rc;
	size_t send_len;

	cpg_context_set (cpg_handle, response);
	cpg_context_get (cpg_handle, (void**)&cmp);
	/* the very same pointer must come back */
	if (response == cmp) {
		snprintf (response, 100, "%s", OK_STR);
	} else {
		snprintf (response, 100, "%s", FAIL_STR);
	}

	send_len = strlen (response);
	rc = send (sock, response, send_len, 0);
	assert(rc == send_len);
}
static void msg_blaster_zcb (int sock, char* num_to_send_str)
{
my_msgs_to_send = atoi (num_to_send_str);
my_seq = 1;
my_pid = getpid();
use_zcb = QB_TRUE;
total_stored_msgs = 0;
cpg_local_get (cpg_handle, &my_nodeid);
/* control the limits */
if (my_msgs_to_send <= 0)
my_msgs_to_send = 1;
if (my_msgs_to_send > 10000)
my_msgs_to_send = 10000;
send_some_more_messages_zcb ();
}
static int cfg_dispatch_wrapper_fn (
int fd,
int revents,
void *data)
{
cs_error_t error;
if (revents & POLLHUP || revents & POLLERR) {
qb_log (LOG_ERR, "got POLLHUP disconnecting from CFG");
corosync_cfg_finalize(cfg_handle);
cfg_handle = 0;
return -1;
}
error = corosync_cfg_dispatch (cfg_handle, CS_DISPATCH_ALL);
if (error == CS_ERR_LIBRARY) {
qb_log (LOG_ERR, "got LIB error disconnecting from CFG.");
corosync_cfg_finalize(cfg_handle);
cfg_handle = 0;
return -1;
}
return 0;
}
static int cpg_dispatch_wrapper_fn (
int fd,
int revents,
void *data)
{
cs_error_t error;
if (revents & POLLHUP || revents & POLLERR) {
qb_log (LOG_ERR, "got POLLHUP disconnecting from CPG");
cpg_finalize(cpg_handle);
cpg_handle = 0;
return -1;
}
error = cpg_dispatch (cpg_handle, CS_DISPATCH_ALL);
if (error == CS_ERR_LIBRARY) {
qb_log (LOG_ERR, "got LIB error disconnecting from CPG");
cpg_finalize(cpg_handle);
cpg_handle = 0;
return -1;
}
return 0;
}
/*
 * RPC dispatcher for the cpg test agent: maps a decoded function name
 * plus string arguments (from common_test_agent) onto the matching
 * CPG/CFG API call.  Replies, where the command produces one, are sent
 * back on sock.  Unknown commands are logged and ignored.
 */
static void do_command (int sock, char* func, char*args[], int num_args)
{
	int result;
	char response[100];
	struct cpg_name group_name;
	ssize_t rc;
	size_t send_len;

	qb_log (LOG_TRACE, "RPC:%s() called.", func);

	if (strcmp ("cpg_mcast_joined",func) == 0) {
		struct iovec iov[5];
		int a;

		for (a = 0; a < num_args; a++) {
			iov[a].iov_base = args[a];
			iov[a].iov_len = strlen(args[a])+1;
		}
		cpg_mcast_joined (cpg_handle, CPG_TYPE_AGREED, iov, num_args);

	} else if (strcmp ("cpg_join",func) == 0) {
		if (strlen(args[0]) >= CPG_MAX_NAME_LENGTH) {
			qb_log (LOG_ERR, "Invalid group name");
			exit (1);
		}
		strcpy (group_name.value, args[0]);
		group_name.length = strlen(args[0]);

		result = cpg_join (cpg_handle, &group_name);
		if (result != CS_OK) {
			qb_log (LOG_ERR,
				"Could not join process group, error %d", result);
			exit (1);
		}
		qb_log (LOG_INFO, "called cpg_join(%s)!", group_name.value);

	} else if (strcmp ("cpg_leave",func) == 0) {
		/* BUGFIX: validate the name length just like cpg_join does;
		 * the unchecked strcpy could overflow group_name.value. */
		if (strlen(args[0]) >= CPG_MAX_NAME_LENGTH) {
			qb_log (LOG_ERR, "Invalid group name");
			exit (1);
		}
		strcpy (group_name.value, args[0]);
		group_name.length = strlen(args[0]);

		result = cpg_leave (cpg_handle, &group_name);
		if (result != CS_OK) {
			qb_log (LOG_ERR,
				"Could not leave process group, error %d", result);
			exit (1);
		}
		qb_log (LOG_INFO, "called cpg_leave(%s)!", group_name.value);

	} else if (strcmp ("cpg_initialize",func) == 0) {
		int retry_count = 0;

		result = cpg_initialize (&cpg_handle, &callbacks);
		while (result != CS_OK) {
			qb_log (LOG_ERR,
				"cpg_initialize error %d (attempt %d)",
				result, retry_count);
			if (retry_count >= 3) {
				exit (1);
			}
			sleep(1);
			retry_count++;
			result = cpg_initialize (&cpg_handle, &callbacks);
		}

		cpg_fd_get (cpg_handle, &cpg_fd);

		qb_loop_poll_add (ta_poll_handle_get(),
				  QB_LOOP_MED,
				  cpg_fd,
				  POLLIN|POLLNVAL,
				  NULL,
				  cpg_dispatch_wrapper_fn);

	} else if (strcmp ("cpg_local_get", func) == 0) {
		unsigned int local_nodeid;

		cpg_local_get (cpg_handle, &local_nodeid);
		snprintf (response, 100, "%u",local_nodeid);
		send_len = strlen (response);
		rc = send (sock, response, send_len, 0);
		assert(rc == send_len);

	} else if (strcmp ("cpg_finalize", func) == 0) {
		if (cpg_handle > 0) {
			cpg_finalize (cpg_handle);
			cpg_handle = 0;
		}

	} else if (strcmp ("record_config_events", func) == 0) {
		record_config_events (sock);
	} else if (strcmp ("record_messages", func) == 0) {
		record_messages ();
	} else if (strcmp ("read_config_event", func) == 0) {
		read_config_event (sock);
	} else if (strcmp ("read_messages", func) == 0) {
		read_messages (sock, args[0]);
	} else if (strcmp ("msg_blaster_zcb", func) == 0) {
		msg_blaster_zcb (sock, args[0]);
	} else if (strcmp ("pcmk_test", func) == 0) {
		pcmk_test = 1;
	} else if (strcmp ("msg_blaster", func) == 0) {
		msg_blaster (sock, args[0]);
	} else if (strcmp ("context_test", func) == 0) {
		context_test (sock);

	} else if (strcmp ("are_you_ok_dude", func) == 0) {
		snprintf (response, 100, "%s", OK_STR);
		send_len = strlen (response);
		/* use send_len here for consistency with the other replies */
		rc = send (sock, response, send_len, 0);
		assert(rc == send_len);

	} else if (strcmp ("cfg_shutdown", func) == 0) {
		qb_log (LOG_INFO, "calling %s() called!", func);
		result = corosync_cfg_try_shutdown (cfg_handle, COROSYNC_CFG_SHUTDOWN_FLAG_REQUEST);
		qb_log (LOG_INFO,"%s() returned %d!", func, result);

	} else if (strcmp ("cfg_initialize",func) == 0) {
		int retry_count = 0;

		qb_log (LOG_INFO,"%s() called!", func);
		result = corosync_cfg_initialize (&cfg_handle, &cfg_callbacks);
		while (result != CS_OK) {
			qb_log (LOG_ERR,
				"cfg_initialize error %d (attempt %d)",
				result, retry_count);
			if (retry_count >= 3) {
				exit (1);
			}
			sleep(1);
			retry_count++;
			result = corosync_cfg_initialize (&cfg_handle, &cfg_callbacks);
		}
		qb_log (LOG_INFO,"corosync_cfg_initialize() == %d", result);

		result = corosync_cfg_fd_get (cfg_handle, &cfg_fd);
		qb_log (LOG_INFO,"corosync_cfg_fd_get() == %d", result);

		qb_loop_poll_add (ta_poll_handle_get(),
				  QB_LOOP_MED,
				  cfg_fd,
				  POLLIN|POLLNVAL,
				  NULL,
				  cfg_dispatch_wrapper_fn);

	} else {
		qb_log(LOG_ERR, "RPC:%s not supported!", func);
	}
}
/*
 * Pre-exit hook registered with test_agent_run(): finalize any live
 * CPG/CFG handles and release the NSS digest context.
 */
static void my_pre_exit(void)
{
	qb_log (LOG_INFO, "%s PRE EXIT", __FILE__);
	if (cpg_handle > 0) {
		cpg_finalize (cpg_handle);
		cpg_handle = 0;
	}
	if (cfg_handle > 0) {
		corosync_cfg_finalize (cfg_handle);
		cfg_handle = 0;
	}
	PK11_DestroyContext(sha1_context, PR_TRUE);
}
/*
 * Entry point: initialize the event/message logs and NSS (used for the
 * SHA1 digests of message payloads), then run the common test-agent
 * loop on port 9034 with do_command as the RPC handler.
 *
 * NOTE(review): both NSS failure paths exit(0), i.e. report success
 * even though initialization failed -- confirm whether the CTS harness
 * depends on that before changing it to a non-zero status.
 */
int
main(int argc, char *argv[])
{
	list_init (&msg_log_head);
	list_init (&config_chg_log_head);

	if (NSS_NoDB_Init(".") != SECSuccess) {
		qb_log(LOG_ERR, "Couldn't initialize nss");
		exit (0);
	}

	if ((sha1_context = PK11_CreateDigestContext(SEC_OID_SHA1)) == NULL) {
		qb_log(LOG_ERR, "Couldn't initialize nss");
		exit (0);
	}

	return test_agent_run ("cpg_test_agent", 9034, do_command, my_pre_exit);
}
diff --git a/exec/cmap.c b/exec/cmap.c
index ba4c7987..950a76f5 100644
--- a/exec/cmap.c
+++ b/exec/cmap.c
@@ -1,1029 +1,1028 @@
/*
* Copyright (c) 2011-2012 Red Hat, Inc.
*
* All rights reserved.
*
* Author: Jan Friesse (jfriesse@redhat.com)
*
* This software licensed under BSD license, the text of which follows:
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* - Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* - Neither the name of the Red Hat, Inc. nor the names of its
* contributors may be used to endorse or promote products derived from this
* software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
* THE POSSIBILITY OF SUCH DAMAGE.
*/
#include <config.h>
#include <sys/types.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdlib.h>
#include <errno.h>
-#include <unistd.h>
#include <poll.h>
#include <assert.h>
#include <qb/qbloop.h>
#include <qb/qbipc_common.h>
#include <corosync/corotypes.h>
#include <corosync/corodefs.h>
#include <corosync/list.h>
#include <corosync/mar_gen.h>
#include <corosync/ipc_cmap.h>
#include <corosync/logsys.h>
#include <corosync/coroapi.h>
#include <corosync/icmap.h>
#include "service.h"
LOGSYS_DECLARE_SUBSYS ("CMAP");
/* Maximum number of key items one CMAP mcast message may carry. */
#define MAX_REQ_EXEC_CMAP_MCAST_ITEMS 32
/* Wire marker used for a key that does not exist in the map. */
#define ICMAP_VALUETYPE_NOT_EXIST 0
/* Per-library-connection state: handle databases for open iterators
 * and active trackers. */
struct cmap_conn_info {
	struct hdb_handle_database iter_db;
	struct hdb_handle_database track_db;
};
typedef uint64_t cmap_iter_handle_t;
typedef uint64_t cmap_track_handle_t;
/* Ties an icmap track instance back to the connection that owns it. */
struct cmap_track_user_data {
	void *conn;
	cmap_track_handle_t track_handle;
	uint64_t track_inst_handle;
};
/* Executive message types for this service (only one so far). */
enum cmap_message_req_types {
	MESSAGE_REQ_EXEC_CMAP_MCAST = 0,
};
/* Why a CMAP mcast was sent: sync or new config version detected. */
enum cmap_mcast_reason {
	CMAP_MCAST_REASON_SYNC = 0,
	CMAP_MCAST_REASON_NEW_CONFIG_VERSION = 1,
};
static struct corosync_api_v1 *api;
static char *cmap_exec_init_fn (struct corosync_api_v1 *corosync_api);
static int cmap_exec_exit_fn(void);
static int cmap_lib_init_fn (void *conn);
static int cmap_lib_exit_fn (void *conn);
static void message_handler_req_lib_cmap_set(void *conn, const void *message);
static void message_handler_req_lib_cmap_delete(void *conn, const void *message);
static void message_handler_req_lib_cmap_get(void *conn, const void *message);
static void message_handler_req_lib_cmap_adjust_int(void *conn, const void *message);
static void message_handler_req_lib_cmap_iter_init(void *conn, const void *message);
static void message_handler_req_lib_cmap_iter_next(void *conn, const void *message);
static void message_handler_req_lib_cmap_iter_finalize(void *conn, const void *message);
static void message_handler_req_lib_cmap_track_add(void *conn, const void *message);
static void message_handler_req_lib_cmap_track_delete(void *conn, const void *message);
static void cmap_notify_fn(int32_t event,
const char *key_name,
struct icmap_notify_value new_val,
struct icmap_notify_value old_val,
void *user_data);
static void message_handler_req_exec_cmap_mcast(
const void *message,
unsigned int nodeid);
static void exec_cmap_mcast_endian_convert(void *message);
/*
* Reson is subtype of message. argc is number of items in argv array. Argv is array
* of strings (key names) which will be send to wire. There can be maximum
* MAX_REQ_EXEC_CMAP_MCAST_ITEMS items (for more items, CS_ERR_TOO_MANY_GROUPS
* error is returned). If key is not found, item has type ICMAP_VALUETYPE_NOT_EXIST
* and length zero.
*/
static cs_error_t cmap_mcast_send(enum cmap_mcast_reason reason, int argc, char *argv[]);
static void cmap_sync_init (
const unsigned int *trans_list,
size_t trans_list_entries,
const unsigned int *member_list,
size_t member_list_entries,
const struct memb_ring_id *ring_id);
static int cmap_sync_process (void);
static void cmap_sync_activate (void);
static void cmap_sync_abort (void);
static void cmap_config_version_track_cb(
int32_t event,
const char *key_name,
struct icmap_notify_value new_value,
struct icmap_notify_value old_value,
void *user_data);
/*
* Library Handler Definition
*/
/*
 * Library Handler Definition
 *
 * Dispatch table for library requests. The array index is the request id
 * (entries annotated 0..8), so the order must stay in sync with the
 * MESSAGE_REQ_CMAP_* request ids declared in ipc_cmap.h -- do not reorder.
 */
static struct corosync_lib_handler cmap_lib_engine[] =
{
{ /* 0 */
.lib_handler_fn = message_handler_req_lib_cmap_set,
.flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED
},
{ /* 1 */
.lib_handler_fn = message_handler_req_lib_cmap_delete,
.flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED
},
{ /* 2 */
.lib_handler_fn = message_handler_req_lib_cmap_get,
.flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED
},
{ /* 3 */
.lib_handler_fn = message_handler_req_lib_cmap_adjust_int,
.flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED
},
{ /* 4 */
.lib_handler_fn = message_handler_req_lib_cmap_iter_init,
.flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED
},
{ /* 5 */
.lib_handler_fn = message_handler_req_lib_cmap_iter_next,
.flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED
},
{ /* 6 */
.lib_handler_fn = message_handler_req_lib_cmap_iter_finalize,
.flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED
},
{ /* 7 */
.lib_handler_fn = message_handler_req_lib_cmap_track_add,
.flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED
},
{ /* 8 */
.lib_handler_fn = message_handler_req_lib_cmap_track_delete,
.flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED
},
};
/*
 * Executive message dispatch table; index must match
 * enum cmap_message_req_types above.
 */
static struct corosync_exec_handler cmap_exec_engine[] =
{
{ /* 0 - MESSAGE_REQ_EXEC_CMAP_MCAST */
.exec_handler_fn = message_handler_req_exec_cmap_mcast,
.exec_endian_convert_fn = exec_cmap_mcast_endian_convert
},
};
/*
 * Service engine descriptor registered with the corosync service manager.
 * Ties together the library/exec dispatch tables, the per-connection
 * private data size and the sync (membership-change) callbacks.
 */
struct corosync_service_engine cmap_service_engine = {
.name = "corosync configuration map access",
.id = CMAP_SERVICE,
.priority = 1,
.private_data_size = sizeof(struct cmap_conn_info),
.flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED,
.allow_inquorate = CS_LIB_ALLOW_INQUORATE,
.lib_init_fn = cmap_lib_init_fn,
.lib_exit_fn = cmap_lib_exit_fn,
.lib_engine = cmap_lib_engine,
.lib_engine_count = sizeof (cmap_lib_engine) / sizeof (struct corosync_lib_handler),
.exec_init_fn = cmap_exec_init_fn,
.exec_exit_fn = cmap_exec_exit_fn,
.exec_engine = cmap_exec_engine,
.exec_engine_count = sizeof (cmap_exec_engine) / sizeof (struct corosync_exec_handler),
.sync_init = cmap_sync_init,
.sync_process = cmap_sync_process,
.sync_activate = cmap_sync_activate,
.sync_abort = cmap_sync_abort
};
/*
 * Accessor used by the service loader to obtain this engine's
 * version-0 descriptor.
 */
struct corosync_service_engine *cmap_get_service_engine_ver0 (void)
{
	struct corosync_service_engine *engine = &cmap_service_engine;

	return (engine);
}
/*
 * One key/value entry in the exec mcast message wire format. The variable
 * length value follows the fixed header as a flexible array member; each
 * item is padded to an 8-byte boundary on the wire (see MAR_ALIGN_UP use
 * in cmap_mcast_send / cmap_mcast_item_find).
 */
struct req_exec_cmap_mcast_item {
mar_name_t key_name __attribute__((aligned(8)));
mar_uint8_t value_type __attribute__((aligned(8)));
mar_size_t value_len __attribute__((aligned(8)));
uint8_t value[] __attribute__((aligned(8)));
};
/*
 * Header of the exec mcast message; no_items req_exec_cmap_mcast_item
 * entries follow it on the wire.
 */
struct req_exec_cmap_mcast {
struct qb_ipc_request_header header __attribute__((aligned(8)));
mar_uint8_t reason __attribute__((aligned(8)));
mar_uint8_t no_items __attribute__((aligned(8)));
mar_uint8_t reserved1 __attribute__((aligned(8)));
mar_uint8_t reserver2 __attribute__((aligned(8)));
/*
 * Following are array of req_exec_cmap_mcast_item alligned to 8 bytes
 */
};
/* Sync state captured in cmap_sync_init and consumed in cmap_sync_activate. */
static size_t cmap_sync_trans_list_entries = 0;
static size_t cmap_sync_member_list_entries = 0;
/* Highest totem.config_version received from other nodes in this sync round. */
static uint64_t cmap_highest_config_version_received = 0;
static uint64_t cmap_my_config_version = 0;
/* 1 until the first sync completes; version mismatch is only enforced then. */
static int cmap_first_sync = 1;
static icmap_track_t cmap_config_version_track;
/*
 * icmap tracker callback fired whenever totem.config_version changes
 * locally: refresh the cached value and broadcast it to the other nodes.
 */
static void cmap_config_version_track_cb(
	int32_t event,
	const char *key_name,
	struct icmap_notify_value new_value,
	struct icmap_notify_value old_value,
	void *user_data)
{
	const char *config_version_key = "totem.config_version";
	cs_error_t send_res;

	ENTER();

	/* Missing key is treated as an unversioned (0) configuration */
	if (icmap_get_uint64(config_version_key, &cmap_my_config_version) != CS_OK) {
		cmap_my_config_version = 0;
	}

	send_res = cmap_mcast_send(CMAP_MCAST_REASON_NEW_CONFIG_VERSION, 1,
	    (char **)&config_version_key);
	if (send_res != CS_OK) {
		log_printf(LOGSYS_LEVEL_ERROR, "Can't inform other nodes about new config version");
	}

	LEAVE();
}
static int cmap_exec_exit_fn(void)
{
if (icmap_track_delete(cmap_config_version_track) != CS_OK) {
log_printf(LOGSYS_LEVEL_ERROR, "Can't delete config_version icmap tracker");
}
return 0;
}
/*
 * Engine startup: remember the corosync API vtable and install a tracker
 * on totem.config_version so changes are broadcast to the cluster.
 * Returns NULL on success, or an error string for the service manager.
 */
static char *cmap_exec_init_fn (
	struct corosync_api_v1 *corosync_api)
{
	api = corosync_api;

	if (icmap_track_add("totem.config_version",
	    ICMAP_TRACK_ADD | ICMAP_TRACK_DELETE | ICMAP_TRACK_MODIFY,
	    cmap_config_version_track_cb,
	    NULL,
	    &cmap_config_version_track) != CS_OK) {
		return ((char *)"Can't add config_version icmap tracker");
	}

	return (NULL);
}
/*
 * New library connection: pin the IPC connection and initialize its
 * iterator/tracker handle databases (released in cmap_lib_exit_fn).
 */
static int cmap_lib_init_fn (void *conn)
{
	struct cmap_conn_info *conn_info =
	    (struct cmap_conn_info *)api->ipc_private_data_get (conn);

	log_printf(LOGSYS_LEVEL_DEBUG, "lib_init_fn: conn=%p", conn);

	api->ipc_refcnt_inc(conn);

	memset(conn_info, 0, sizeof(*conn_info));
	hdb_create(&conn_info->iter_db);
	hdb_create(&conn_info->track_db);

	return (0);
}
/*
 * Connection teardown: finalize every icmap iterator and delete every
 * tracker still registered by this connection, destroy both handle
 * databases, then drop the IPC reference taken in cmap_lib_init_fn.
 */
static int cmap_lib_exit_fn (void *conn)
{
struct cmap_conn_info *conn_info = (struct cmap_conn_info *)api->ipc_private_data_get (conn);
hdb_handle_t iter_handle = 0;
icmap_iter_t *iter;
hdb_handle_t track_handle = 0;
icmap_track_t *track;
log_printf(LOGSYS_LEVEL_DEBUG, "exit_fn for conn=%p", conn);
/* Each hdb_iterator_next takes a reference that must be put back */
hdb_iterator_reset(&conn_info->iter_db);
while (hdb_iterator_next(&conn_info->iter_db,
(void*)&iter, &iter_handle) == 0) {
icmap_iter_finalize(*iter);
(void)hdb_handle_put (&conn_info->iter_db, iter_handle);
}
hdb_destroy(&conn_info->iter_db);
/*
 * Trackers own a malloc'ed cmap_track_user_data (allocated in
 * message_handler_req_lib_cmap_track_add); release it here.
 */
hdb_iterator_reset(&conn_info->track_db);
while (hdb_iterator_next(&conn_info->track_db,
(void*)&track, &track_handle) == 0) {
free(icmap_track_get_user_data(*track));
icmap_track_delete(*track);
(void)hdb_handle_put (&conn_info->track_db, track_handle);
}
hdb_destroy(&conn_info->track_db);
api->ipc_refcnt_dec(conn);
return (0);
}
/*
 * Start of a sync round: remember the membership sizes and reset the
 * highest-received config version, then refresh our own cached
 * totem.config_version. (Node lists and ring id are not needed here.)
 */
static void cmap_sync_init (
	const unsigned int *trans_list,
	size_t trans_list_entries,
	const unsigned int *member_list,
	size_t member_list_entries,
	const struct memb_ring_id *ring_id)
{
	cmap_sync_trans_list_entries = trans_list_entries;
	cmap_sync_member_list_entries = member_list_entries;
	cmap_highest_config_version_received = 0;

	if (icmap_get_uint64("totem.config_version", &cmap_my_config_version) == CS_OK) {
		return;
	}

	/* Key absent - treat as an unversioned configuration */
	cmap_my_config_version = 0;
}
/*
 * Sync step: broadcast our totem.config_version to the cluster.
 * Returns 0 when the mcast was queued, -1 on failure (retried by caller).
 */
static int cmap_sync_process (void)
{
	const char *config_version_key = "totem.config_version";

	if (cmap_mcast_send(CMAP_MCAST_REASON_SYNC, 1, (char **)&config_version_key) != CS_OK) {
		return (-1);
	}

	return (0);
}
/*
 * End of the sync round. On the first multi-node sync only, compare our
 * totem.config_version against the highest one received from other nodes
 * and request shutdown on mismatch (a node with a stale config must not
 * join). All other situations are no-ops.
 */
static void cmap_sync_activate (void)
{
	if (cmap_sync_trans_list_entries == 0) {
		log_printf(LOGSYS_LEVEL_DEBUG, "Single node sync -> no action");

		return ;
	}

	if (!cmap_first_sync) {
		log_printf(LOGSYS_LEVEL_DEBUG, "Not first sync -> no action");

		return ;
	}
	cmap_first_sync = 0;

	/* A version of 0 (ours or theirs) means "unversioned": nothing to enforce */
	if (cmap_my_config_version == 0) {
		log_printf(LOGSYS_LEVEL_DEBUG, "My config version is 0 -> no action");

		return ;
	}

	if (cmap_highest_config_version_received == 0) {
		log_printf(LOGSYS_LEVEL_DEBUG, "Other nodes version is 0 -> no action");

		return ;
	}

	if (cmap_highest_config_version_received != cmap_my_config_version) {
		log_printf(LOGSYS_LEVEL_ERROR,
		    "Received config version (%"PRIu64") is different than my config version (%"PRIu64")! Exiting",
		    cmap_highest_config_version_received, cmap_my_config_version);
		api->shutdown_request();

		return ;
	}
}
/* Sync abort: nothing staged by cmap_sync_process, so nothing to undo. */
static void cmap_sync_abort (void)
{
}
static void message_handler_req_lib_cmap_set(void *conn, const void *message)
{
const struct req_lib_cmap_set *req_lib_cmap_set = message;
struct res_lib_cmap_set res_lib_cmap_set;
cs_error_t ret;
if (icmap_is_key_ro((char *)req_lib_cmap_set->key_name.value)) {
ret = CS_ERR_ACCESS;
} else {
ret = icmap_set((char *)req_lib_cmap_set->key_name.value, &req_lib_cmap_set->value,
req_lib_cmap_set->value_len, req_lib_cmap_set->type);
}
memset(&res_lib_cmap_set, 0, sizeof(res_lib_cmap_set));
res_lib_cmap_set.header.size = sizeof(res_lib_cmap_set);
res_lib_cmap_set.header.id = MESSAGE_RES_CMAP_SET;
res_lib_cmap_set.header.error = ret;
api->ipc_response_send(conn, &res_lib_cmap_set, sizeof(res_lib_cmap_set));
}
static void message_handler_req_lib_cmap_delete(void *conn, const void *message)
{
const struct req_lib_cmap_set *req_lib_cmap_set = message;
struct res_lib_cmap_delete res_lib_cmap_delete;
cs_error_t ret;
if (icmap_is_key_ro((char *)req_lib_cmap_set->key_name.value)) {
ret = CS_ERR_ACCESS;
} else {
ret = icmap_delete((char *)req_lib_cmap_set->key_name.value);
}
memset(&res_lib_cmap_delete, 0, sizeof(res_lib_cmap_delete));
res_lib_cmap_delete.header.size = sizeof(res_lib_cmap_delete);
res_lib_cmap_delete.header.id = MESSAGE_RES_CMAP_DELETE;
res_lib_cmap_delete.header.error = ret;
api->ipc_response_send(conn, &res_lib_cmap_delete, sizeof(res_lib_cmap_delete));
}
/*
 * Handle CMAP_GET library request: read the key's value into a reply
 * buffer sized by the client's value_len and send it back; on any error
 * send a fixed-size reply carrying only the error code.
 *
 * NOTE(review): value_len comes straight from the client and sizes the
 * malloc below without an upper bound or overflow check on the addition -
 * confirm the IPC layer bounds request sizes upstream.
 */
static void message_handler_req_lib_cmap_get(void *conn, const void *message)
{
const struct req_lib_cmap_get *req_lib_cmap_get = message;
struct res_lib_cmap_get *res_lib_cmap_get;
struct res_lib_cmap_get error_res_lib_cmap_get;
cs_error_t ret;
size_t value_len;
size_t res_lib_cmap_get_size;
icmap_value_types_t type;
void *value;
value_len = req_lib_cmap_get->value_len;
res_lib_cmap_get_size = sizeof(*res_lib_cmap_get) + value_len;
res_lib_cmap_get = malloc(res_lib_cmap_get_size);
if (res_lib_cmap_get == NULL) {
ret = CS_ERR_NO_MEMORY;
goto error_exit;
}
memset(res_lib_cmap_get, 0, res_lib_cmap_get_size);
/* With value_len == 0 only the length/type of the key is queried */
if (value_len > 0) {
value = res_lib_cmap_get->value;
} else {
value = NULL;
}
ret = icmap_get((char *)req_lib_cmap_get->key_name.value,
value,
&value_len,
&type);
if (ret != CS_OK) {
free(res_lib_cmap_get);
goto error_exit;
}
res_lib_cmap_get->header.size = res_lib_cmap_get_size;
res_lib_cmap_get->header.id = MESSAGE_RES_CMAP_GET;
res_lib_cmap_get->header.error = ret;
res_lib_cmap_get->type = type;
res_lib_cmap_get->value_len = value_len;
api->ipc_response_send(conn, res_lib_cmap_get, res_lib_cmap_get_size);
free(res_lib_cmap_get);
return ;
error_exit:
/* Error path: reply without a value payload */
memset(&error_res_lib_cmap_get, 0, sizeof(error_res_lib_cmap_get));
error_res_lib_cmap_get.header.size = sizeof(error_res_lib_cmap_get);
error_res_lib_cmap_get.header.id = MESSAGE_RES_CMAP_GET;
error_res_lib_cmap_get.header.error = ret;
api->ipc_response_send(conn, &error_res_lib_cmap_get, sizeof(error_res_lib_cmap_get));
}
static void message_handler_req_lib_cmap_adjust_int(void *conn, const void *message)
{
const struct req_lib_cmap_adjust_int *req_lib_cmap_adjust_int = message;
struct res_lib_cmap_adjust_int res_lib_cmap_adjust_int;
cs_error_t ret;
if (icmap_is_key_ro((char *)req_lib_cmap_adjust_int->key_name.value)) {
ret = CS_ERR_ACCESS;
} else {
ret = icmap_adjust_int((char *)req_lib_cmap_adjust_int->key_name.value,
req_lib_cmap_adjust_int->step);
}
memset(&res_lib_cmap_adjust_int, 0, sizeof(res_lib_cmap_adjust_int));
res_lib_cmap_adjust_int.header.size = sizeof(res_lib_cmap_adjust_int);
res_lib_cmap_adjust_int.header.id = MESSAGE_RES_CMAP_ADJUST_INT;
res_lib_cmap_adjust_int.header.error = ret;
api->ipc_response_send(conn, &res_lib_cmap_adjust_int, sizeof(res_lib_cmap_adjust_int));
}
/*
 * Handle CMAP_ITER_INIT library request: create an icmap iterator over the
 * given prefix (all keys when the prefix is empty), store it in this
 * connection's iterator handle database and return the handle.
 *
 * Fix vs. previous revision: the icmap iterator (and the hdb handle) were
 * leaked when hdb_handle_create/hdb_handle_get failed; both are now
 * released on those error paths.
 */
static void message_handler_req_lib_cmap_iter_init(void *conn, const void *message)
{
	const struct req_lib_cmap_iter_init *req_lib_cmap_iter_init = message;
	struct res_lib_cmap_iter_init res_lib_cmap_iter_init;
	cs_error_t ret;
	icmap_iter_t iter;
	icmap_iter_t *hdb_iter;
	cmap_iter_handle_t handle = 0ULL;
	const char *prefix;
	struct cmap_conn_info *conn_info = (struct cmap_conn_info *)api->ipc_private_data_get (conn);

	/* Empty prefix means "iterate everything" */
	if (req_lib_cmap_iter_init->prefix.length > 0) {
		prefix = (char *)req_lib_cmap_iter_init->prefix.value;
	} else {
		prefix = NULL;
	}

	iter = icmap_iter_init(prefix);
	if (iter == NULL) {
		ret = CS_ERR_NO_SECTIONS;
		goto reply_send;
	}

	ret = hdb_error_to_cs(hdb_handle_create(&conn_info->iter_db, sizeof(iter), &handle));
	if (ret != CS_OK) {
		/* Don't leak the icmap iterator when no handle could be created */
		icmap_iter_finalize(iter);
		goto reply_send;
	}

	ret = hdb_error_to_cs(hdb_handle_get(&conn_info->iter_db, handle, (void *)&hdb_iter));
	if (ret != CS_OK) {
		/* Handle exists but can't be referenced: drop both it and the iterator */
		icmap_iter_finalize(iter);
		(void)hdb_handle_destroy(&conn_info->iter_db, handle);
		goto reply_send;
	}

	*hdb_iter = iter;
	(void)hdb_handle_put (&conn_info->iter_db, handle);

reply_send:
	memset(&res_lib_cmap_iter_init, 0, sizeof(res_lib_cmap_iter_init));
	res_lib_cmap_iter_init.header.size = sizeof(res_lib_cmap_iter_init);
	res_lib_cmap_iter_init.header.id = MESSAGE_RES_CMAP_ITER_INIT;
	res_lib_cmap_iter_init.header.error = ret;
	res_lib_cmap_iter_init.iter_handle = handle;
	api->ipc_response_send(conn, &res_lib_cmap_iter_init, sizeof(res_lib_cmap_iter_init));
}
static void message_handler_req_lib_cmap_iter_next(void *conn, const void *message)
{
const struct req_lib_cmap_iter_next *req_lib_cmap_iter_next = message;
struct res_lib_cmap_iter_next res_lib_cmap_iter_next;
cs_error_t ret;
icmap_iter_t *iter;
size_t value_len = 0;
icmap_value_types_t type = 0;
const char *res = NULL;
struct cmap_conn_info *conn_info = (struct cmap_conn_info *)api->ipc_private_data_get (conn);
ret = hdb_error_to_cs(hdb_handle_get(&conn_info->iter_db,
req_lib_cmap_iter_next->iter_handle, (void *)&iter));
if (ret != CS_OK) {
goto reply_send;
}
res = icmap_iter_next(*iter, &value_len, &type);
if (res == NULL) {
ret = CS_ERR_NO_SECTIONS;
}
(void)hdb_handle_put (&conn_info->iter_db, req_lib_cmap_iter_next->iter_handle);
reply_send:
memset(&res_lib_cmap_iter_next, 0, sizeof(res_lib_cmap_iter_next));
res_lib_cmap_iter_next.header.size = sizeof(res_lib_cmap_iter_next);
res_lib_cmap_iter_next.header.id = MESSAGE_RES_CMAP_ITER_NEXT;
res_lib_cmap_iter_next.header.error = ret;
if (res != NULL) {
res_lib_cmap_iter_next.value_len = value_len;
res_lib_cmap_iter_next.type = type;
memcpy(res_lib_cmap_iter_next.key_name.value, res, strlen(res));
res_lib_cmap_iter_next.key_name.length = strlen(res);
}
api->ipc_response_send(conn, &res_lib_cmap_iter_next, sizeof(res_lib_cmap_iter_next));
}
/*
 * Handle CMAP_ITER_FINALIZE library request: finalize the icmap iterator
 * behind the client's handle and remove the handle from the connection's
 * iterator database.
 */
static void message_handler_req_lib_cmap_iter_finalize(void *conn, const void *message)
{
const struct req_lib_cmap_iter_finalize *req_lib_cmap_iter_finalize = message;
struct res_lib_cmap_iter_finalize res_lib_cmap_iter_finalize;
cs_error_t ret;
icmap_iter_t *iter;
struct cmap_conn_info *conn_info = (struct cmap_conn_info *)api->ipc_private_data_get (conn);
ret = hdb_error_to_cs(hdb_handle_get(&conn_info->iter_db,
req_lib_cmap_iter_finalize->iter_handle, (void *)&iter));
if (ret != CS_OK) {
goto reply_send;
}
icmap_iter_finalize(*iter);
/* Destroy marks the handle; the final put from hdb_handle_get releases it */
(void)hdb_handle_destroy(&conn_info->iter_db, req_lib_cmap_iter_finalize->iter_handle);
(void)hdb_handle_put (&conn_info->iter_db, req_lib_cmap_iter_finalize->iter_handle);
reply_send:
memset(&res_lib_cmap_iter_finalize, 0, sizeof(res_lib_cmap_iter_finalize));
res_lib_cmap_iter_finalize.header.size = sizeof(res_lib_cmap_iter_finalize);
res_lib_cmap_iter_finalize.header.id = MESSAGE_RES_CMAP_ITER_FINALIZE;
res_lib_cmap_iter_finalize.header.error = ret;
api->ipc_response_send(conn, &res_lib_cmap_iter_finalize, sizeof(res_lib_cmap_iter_finalize));
}
static void cmap_notify_fn(int32_t event,
const char *key_name,
struct icmap_notify_value new_val,
struct icmap_notify_value old_val,
void *user_data)
{
struct cmap_track_user_data *cmap_track_user_data = (struct cmap_track_user_data *)user_data;
struct res_lib_cmap_notify_callback res_lib_cmap_notify_callback;
struct iovec iov[3];
memset(&res_lib_cmap_notify_callback, 0, sizeof(res_lib_cmap_notify_callback));
res_lib_cmap_notify_callback.header.size = sizeof(res_lib_cmap_notify_callback) + new_val.len + old_val.len;
res_lib_cmap_notify_callback.header.id = MESSAGE_RES_CMAP_NOTIFY_CALLBACK;
res_lib_cmap_notify_callback.header.error = CS_OK;
res_lib_cmap_notify_callback.new_value_type = new_val.type;
res_lib_cmap_notify_callback.old_value_type = old_val.type;
res_lib_cmap_notify_callback.new_value_len = new_val.len;
res_lib_cmap_notify_callback.old_value_len = old_val.len;
res_lib_cmap_notify_callback.event = event;
res_lib_cmap_notify_callback.key_name.length = strlen(key_name);
res_lib_cmap_notify_callback.track_inst_handle = cmap_track_user_data->track_inst_handle;
memcpy(res_lib_cmap_notify_callback.key_name.value, key_name, strlen(key_name));
iov[0].iov_base = (char *)&res_lib_cmap_notify_callback;
iov[0].iov_len = sizeof(res_lib_cmap_notify_callback);
iov[1].iov_base = (char *)new_val.data;
iov[1].iov_len = new_val.len;
iov[2].iov_base = (char *)old_val.data;
iov[2].iov_len = old_val.len;
api->ipc_dispatch_iov_send(cmap_track_user_data->conn, iov, 3);
}
/*
 * Handle CMAP_TRACK_ADD library request: register an icmap tracker for
 * the given key (all keys when the name is empty) that forwards change
 * notifications to this connection via cmap_notify_fn, and return the
 * tracker handle to the client.
 *
 * Fix vs. previous revision: when hdb_handle_create/hdb_handle_get failed
 * after a successful icmap_track_add, the tracker stayed registered while
 * its user data was freed, so later notifications would have dereferenced
 * freed memory. The tracker (and the created handle) is now torn down on
 * those error paths.
 */
static void message_handler_req_lib_cmap_track_add(void *conn, const void *message)
{
	const struct req_lib_cmap_track_add *req_lib_cmap_track_add = message;
	struct res_lib_cmap_track_add res_lib_cmap_track_add;
	cs_error_t ret;
	cmap_track_handle_t handle = 0;
	icmap_track_t track = NULL;
	icmap_track_t *hdb_track;
	struct cmap_track_user_data *cmap_track_user_data;
	const char *key_name;
	struct cmap_conn_info *conn_info = (struct cmap_conn_info *)api->ipc_private_data_get (conn);

	cmap_track_user_data = malloc(sizeof(*cmap_track_user_data));
	if (cmap_track_user_data == NULL) {
		ret = CS_ERR_NO_MEMORY;
		goto reply_send;
	}
	memset(cmap_track_user_data, 0, sizeof(*cmap_track_user_data));

	/* Empty key name means "track everything" */
	if (req_lib_cmap_track_add->key_name.length > 0) {
		key_name = (char *)req_lib_cmap_track_add->key_name.value;
	} else {
		key_name = NULL;
	}

	ret = icmap_track_add(key_name,
	    req_lib_cmap_track_add->track_type,
	    cmap_notify_fn,
	    cmap_track_user_data,
	    &track);
	if (ret != CS_OK) {
		free(cmap_track_user_data);
		goto reply_send;
	}

	ret = hdb_error_to_cs(hdb_handle_create(&conn_info->track_db, sizeof(track), &handle));
	if (ret != CS_OK) {
		/*
		 * Unregister the tracker before freeing its user data, otherwise
		 * icmap would keep delivering notifications into freed memory.
		 */
		icmap_track_delete(track);
		free(cmap_track_user_data);
		goto reply_send;
	}

	ret = hdb_error_to_cs(hdb_handle_get(&conn_info->track_db, handle, (void *)&hdb_track));
	if (ret != CS_OK) {
		icmap_track_delete(track);
		(void)hdb_handle_destroy(&conn_info->track_db, handle);
		free(cmap_track_user_data);
		goto reply_send;
	}
	*hdb_track = track;

	cmap_track_user_data->conn = conn;
	cmap_track_user_data->track_handle = handle;
	cmap_track_user_data->track_inst_handle = req_lib_cmap_track_add->track_inst_handle;

	(void)hdb_handle_put (&conn_info->track_db, handle);

reply_send:
	memset(&res_lib_cmap_track_add, 0, sizeof(res_lib_cmap_track_add));
	res_lib_cmap_track_add.header.size = sizeof(res_lib_cmap_track_add);
	res_lib_cmap_track_add.header.id = MESSAGE_RES_CMAP_TRACK_ADD;
	res_lib_cmap_track_add.header.error = ret;
	res_lib_cmap_track_add.track_handle = handle;
	api->ipc_response_send(conn, &res_lib_cmap_track_add, sizeof(res_lib_cmap_track_add));
}
/*
 * Handle CMAP_TRACK_DELETE library request: remove the icmap tracker
 * behind the client's handle, free its user data and drop the handle.
 * The client's original track_inst_handle is echoed back in the reply.
 */
static void message_handler_req_lib_cmap_track_delete(void *conn, const void *message)
{
const struct req_lib_cmap_track_delete *req_lib_cmap_track_delete = message;
struct res_lib_cmap_track_delete res_lib_cmap_track_delete;
cs_error_t ret;
icmap_track_t *track;
struct cmap_conn_info *conn_info = (struct cmap_conn_info *)api->ipc_private_data_get (conn);
uint64_t track_inst_handle = 0;
ret = hdb_error_to_cs(hdb_handle_get(&conn_info->track_db,
req_lib_cmap_track_delete->track_handle, (void *)&track));
if (ret != CS_OK) {
goto reply_send;
}
/* Grab the client's instance handle before the user data is freed */
track_inst_handle = ((struct cmap_track_user_data *)icmap_track_get_user_data(*track))->track_inst_handle;
free(icmap_track_get_user_data(*track));
ret = icmap_track_delete(*track);
(void)hdb_handle_put (&conn_info->track_db, req_lib_cmap_track_delete->track_handle);
(void)hdb_handle_destroy(&conn_info->track_db, req_lib_cmap_track_delete->track_handle);
reply_send:
memset(&res_lib_cmap_track_delete, 0, sizeof(res_lib_cmap_track_delete));
res_lib_cmap_track_delete.header.size = sizeof(res_lib_cmap_track_delete);
res_lib_cmap_track_delete.header.id = MESSAGE_RES_CMAP_TRACK_DELETE;
res_lib_cmap_track_delete.header.error = ret;
res_lib_cmap_track_delete.track_inst_handle = track_inst_handle;
api->ipc_response_send(conn, &res_lib_cmap_track_delete, sizeof(res_lib_cmap_track_delete));
}
static cs_error_t cmap_mcast_send(enum cmap_mcast_reason reason, int argc, char *argv[])
{
int i;
size_t value_len;
icmap_value_types_t value_type;
cs_error_t err;
size_t item_len;
size_t msg_len = 0;
struct req_exec_cmap_mcast req_exec_cmap_mcast;
struct req_exec_cmap_mcast_item *item = NULL;
struct iovec req_exec_cmap_iovec[MAX_REQ_EXEC_CMAP_MCAST_ITEMS + 1];
ENTER();
if (argc > MAX_REQ_EXEC_CMAP_MCAST_ITEMS) {
return (CS_ERR_TOO_MANY_GROUPS);
}
memset(req_exec_cmap_iovec, 0, sizeof(req_exec_cmap_iovec));
for (i = 0; i < argc; i++) {
err = icmap_get(argv[i], NULL, &value_len, &value_type);
if (err != CS_OK && err != CS_ERR_NOT_EXIST) {
goto free_mem;
}
if (err == CS_ERR_NOT_EXIST) {
value_type = ICMAP_VALUETYPE_NOT_EXIST;
value_len = 0;
}
item_len = MAR_ALIGN_UP(sizeof(*item) + value_len, 8);
item = malloc(item_len);
if (item == NULL) {
goto free_mem;
}
memset(item, 0, item_len);
item->value_type = value_type;
item->value_len = value_len;
item->key_name.length = strlen(argv[i]);
strcpy((char *)item->key_name.value, argv[i]);
if (value_type != ICMAP_VALUETYPE_NOT_EXIST) {
err = icmap_get(argv[i], item->value, &value_len, &value_type);
if (err != CS_OK) {
goto free_mem;
}
}
req_exec_cmap_iovec[i + 1].iov_base = item;
req_exec_cmap_iovec[i + 1].iov_len = item_len;
msg_len += item_len;
qb_log(LOG_TRACE, "Item %u - type %u, len %zu", i, item->value_type, item->value_len);
item = NULL;
}
memset(&req_exec_cmap_mcast, 0, sizeof(req_exec_cmap_mcast));
req_exec_cmap_mcast.header.size = sizeof(req_exec_cmap_mcast) + msg_len;
req_exec_cmap_mcast.reason = reason;
req_exec_cmap_mcast.no_items = argc;
req_exec_cmap_iovec[0].iov_base = &req_exec_cmap_mcast;
req_exec_cmap_iovec[0].iov_len = sizeof(req_exec_cmap_mcast);
qb_log(LOG_TRACE, "Sending %u items (%u iovec) for reason %u", argc, argc + 1, reason);
err = (api->totem_mcast(req_exec_cmap_iovec, argc + 1, TOTEM_AGREED) == 0 ? CS_OK : CS_ERR_MESSAGE_ERROR);
free_mem:
for (i = 0; i < argc; i++) {
free(req_exec_cmap_iovec[i + 1].iov_base);
}
free(item);
LEAVE();
return (err);
}
/*
 * Walk the req_exec_cmap_mcast_item entries that follow the mcast
 * message header and return the item whose key name equals `key`, or
 * NULL when no item matches. Items are 8-byte aligned on the wire
 * (mirrors the layout produced by cmap_mcast_send, which also
 * NUL-terminates key_name.value via strcpy into a zeroed buffer).
 */
static struct req_exec_cmap_mcast_item *cmap_mcast_item_find(
const void *message,
char *key)
{
const struct req_exec_cmap_mcast *req_exec_cmap_mcast = message;
int i;
const char *p;
struct req_exec_cmap_mcast_item *item;
mar_uint16_t key_name_len;
/* Items start immediately after the fixed-size message header */
p = (const char *)message + sizeof(*req_exec_cmap_mcast);
for (i = 0; i < req_exec_cmap_mcast->no_items; i++) {
item = (struct req_exec_cmap_mcast_item *)p;
key_name_len = item->key_name.length;
if (strlen(key) == key_name_len && strcmp((char *)item->key_name.value, key) == 0) {
return (item);
}
p += MAR_ALIGN_UP(sizeof(*item) + item->value_len, 8);
}
return (NULL);
}
/*
 * Process a SYNC / NEW_CONFIG_VERSION mcast: extract the sender's
 * totem.config_version from the message, record the highest version seen
 * from other nodes (used by cmap_sync_activate) and publish the sender's
 * version under runtime.totem.pg.mrp.srp.members.<nodeid>.config_version.
 *
 * The `reason` parameter is currently unused: both reasons are handled
 * identically here.
 *
 * NOTE(review): value_len comes from the wire and is trusted by the
 * memcpy below; for ICMAP_VALUETYPE_UINT64 it should be 8 - confirm the
 * endian-convert/dispatch path validates message sizes.
 */
static void message_handler_req_exec_cmap_mcast_reason_sync_nv(
enum cmap_mcast_reason reason,
const void *message,
unsigned int nodeid)
{
char member_config_version[ICMAP_KEYNAME_MAXLEN];
uint64_t config_version = 0;
struct req_exec_cmap_mcast_item *item;
mar_size_t value_len;
ENTER();
item = cmap_mcast_item_find(message, (char *)"totem.config_version");
if (item != NULL) {
value_len = item->value_len;
if (item->value_type == ICMAP_VALUETYPE_NOT_EXIST) {
config_version = 0;
}
if (item->value_type == ICMAP_VALUETYPE_UINT64) {
memcpy(&config_version, item->value, value_len);
}
}
qb_log(LOG_TRACE, "Received config version %"PRIu64" from node %x", config_version, nodeid);
/* Only other nodes' versions participate in the mismatch check */
if (nodeid != api->totem_nodeid_get() &&
config_version > cmap_highest_config_version_received) {
cmap_highest_config_version_received = config_version;
}
snprintf(member_config_version, ICMAP_KEYNAME_MAXLEN,
"runtime.totem.pg.mrp.srp.members.%u.config_version", nodeid);
icmap_set_uint64(member_config_version, config_version);
LEAVE();
}
/*
 * Executive entry point for CMAP mcast messages: both known reasons are
 * handled by the same sync/new-version routine; anything else is only
 * traced.
 */
static void message_handler_req_exec_cmap_mcast(
	const void *message,
	unsigned int nodeid)
{
	const struct req_exec_cmap_mcast *req_exec_cmap_mcast = message;

	ENTER();

	switch (req_exec_cmap_mcast->reason) {
	case CMAP_MCAST_REASON_SYNC:
	case CMAP_MCAST_REASON_NEW_CONFIG_VERSION:
		message_handler_req_exec_cmap_mcast_reason_sync_nv(req_exec_cmap_mcast->reason,
		    message, nodeid);
		break;
	default:
		qb_log(LOG_TRACE, "Received mcast with unknown reason %u", req_exec_cmap_mcast->reason);
		break;
	}

	LEAVE();
}
/*
 * Endian converter for incoming CMAP mcast messages from nodes with the
 * opposite byte order: swap the IPC header, then for every item swap the
 * key-name length, the value length and the value payload according to
 * its type. value_type, reason and no_items are single octets and need
 * no conversion. Values are copied out, swapped and copied back via
 * memcpy to avoid unaligned/strict-aliasing access into the wire buffer.
 */
static void exec_cmap_mcast_endian_convert(void *message)
{
struct req_exec_cmap_mcast *req_exec_cmap_mcast = message;
const char *p;
int i;
struct req_exec_cmap_mcast_item *item;
uint16_t u16;
uint32_t u32;
uint64_t u64;
float flt;
double dbl;
swab_coroipc_request_header_t(&req_exec_cmap_mcast->header);
/* Items start immediately after the fixed-size message header */
p = (const char *)message + sizeof(*req_exec_cmap_mcast);
for (i = 0; i < req_exec_cmap_mcast->no_items; i++) {
item = (struct req_exec_cmap_mcast_item *)p;
swab_mar_uint16_t(&item->key_name.length);
/* value_len must be swapped before it is used for the stride below */
swab_mar_size_t(&item->value_len);
switch (item->value_type) {
case ICMAP_VALUETYPE_INT16:
case ICMAP_VALUETYPE_UINT16:
memcpy(&u16, item->value, sizeof(u16));
u16 = swab16(u16);
memcpy(item->value, &u16, sizeof(u16));
break;
case ICMAP_VALUETYPE_INT32:
case ICMAP_VALUETYPE_UINT32:
memcpy(&u32, item->value, sizeof(u32));
u32 = swab32(u32);
memcpy(item->value, &u32, sizeof(u32));
break;
case ICMAP_VALUETYPE_INT64:
case ICMAP_VALUETYPE_UINT64:
memcpy(&u64, item->value, sizeof(u64));
u64 = swab64(u64);
memcpy(item->value, &u64, sizeof(u64));
break;
case ICMAP_VALUETYPE_FLOAT:
memcpy(&flt, item->value, sizeof(flt));
swabflt(&flt);
memcpy(item->value, &flt, sizeof(flt));
break;
case ICMAP_VALUETYPE_DOUBLE:
memcpy(&dbl, item->value, sizeof(dbl));
swabdbl(&dbl);
memcpy(item->value, &dbl, sizeof(dbl));
break;
}
/* Strings/binary/NOT_EXIST values are byte streams: nothing to swap */
p += MAR_ALIGN_UP(sizeof(*item) + item->value_len, 8);
}
}
diff --git a/exec/cpg.c b/exec/cpg.c
index 5bd830fd..78ac1e9e 100644
--- a/exec/cpg.c
+++ b/exec/cpg.c
@@ -1,2438 +1,2436 @@
/*
* Copyright (c) 2006-2015 Red Hat, Inc.
*
* All rights reserved.
*
* Author: Christine Caulfield (ccaulfie@redhat.com)
* Author: Jan Friesse (jfriesse@redhat.com)
*
* This software licensed under BSD license, the text of which follows:
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* - Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* - Neither the name of the MontaVista Software, Inc. nor the names of its
* contributors may be used to endorse or promote products derived from this
* software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
* THE POSSIBILITY OF SUCH DAMAGE.
*/
#include <config.h>
#ifdef HAVE_ALLOCA_H
#include <alloca.h>
#endif
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/ioctl.h>
#include <netinet/in.h>
#include <sys/uio.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <time.h>
#include <assert.h>
-#include <unistd.h>
-#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/mman.h>
#include <qb/qbmap.h>
#include <corosync/corotypes.h>
#include <qb/qbipc_common.h>
#include <corosync/corodefs.h>
#include <corosync/list.h>
#include <corosync/logsys.h>
#include <corosync/coroapi.h>
#include <corosync/cpg.h>
#include <corosync/ipc_cpg.h>
#ifndef MAP_ANONYMOUS
#define MAP_ANONYMOUS MAP_ANON
#endif
#include "service.h"
LOGSYS_DECLARE_SUBSYS ("CPG");
#define GROUP_HASH_SIZE 32
enum cpg_message_req_types {
MESSAGE_REQ_EXEC_CPG_PROCJOIN = 0,
MESSAGE_REQ_EXEC_CPG_PROCLEAVE = 1,
MESSAGE_REQ_EXEC_CPG_JOINLIST = 2,
MESSAGE_REQ_EXEC_CPG_MCAST = 3,
MESSAGE_REQ_EXEC_CPG_DOWNLIST_OLD = 4,
MESSAGE_REQ_EXEC_CPG_DOWNLIST = 5,
MESSAGE_REQ_EXEC_CPG_PARTIAL_MCAST = 6,
};
/*
 * One zero-copy buffer mapping owned by a CPG connection; linked from
 * cpg_pd.zcb_mapped_list_head so it can be released on finalize
 * (addr/size describe the mapped region).
 */
struct zcb_mapped {
struct list_head list;
void *addr;
size_t size;
};
/*
* state` exec deliver
* match group name, pid -> if matched deliver for YES:
* XXX indicates impossible state
*
* join leave mcast
* UNJOINED XXX XXX NO
* LEAVE_STARTED XXX YES(unjoined_enter) YES
* JOIN_STARTED YES(join_started_enter) XXX NO
* JOIN_COMPLETED XXX NO YES
*
* join_started_enter
* set JOIN_COMPLETED
* add entry to process_info list
* unjoined_enter
* set UNJOINED
* delete entry from process_info list
*
*
* library accept join error codes
* UNJOINED YES(CS_OK) set JOIN_STARTED
* LEAVE_STARTED NO(CS_ERR_BUSY)
* JOIN_STARTED NO(CS_ERR_EXIST)
* JOIN_COMPlETED NO(CS_ERR_EXIST)
*
* library accept leave error codes
* UNJOINED NO(CS_ERR_NOT_EXIST)
* LEAVE_STARTED NO(CS_ERR_NOT_EXIST)
* JOIN_STARTED NO(CS_ERR_BUSY)
* JOIN_COMPLETED YES(CS_OK) set LEAVE_STARTED
*
* library accept mcast
* UNJOINED NO(CS_ERR_NOT_EXIST)
* LEAVE_STARTED NO(CS_ERR_NOT_EXIST)
* JOIN_STARTED YES(CS_OK)
* JOIN_COMPLETED YES(CS_OK)
*/
/*
 * Per-connection join lifecycle; legal transitions and the error codes
 * returned to the library in each state are spelled out in the big
 * comment block above.
 */
enum cpd_state {
CPD_STATE_UNJOINED,
CPD_STATE_LEAVE_STARTED,
CPD_STATE_JOIN_STARTED,
CPD_STATE_JOIN_COMPLETED
};
/* Phase of the two-step sync protocol: downlists first, then joinlists. */
enum cpg_sync_state {
CPGSYNC_DOWNLIST,
CPGSYNC_JOINLIST
};
/* Progress of downlist collection during sync. */
enum cpg_downlist_state_e {
CPG_DOWNLIST_NONE,
CPG_DOWNLIST_WAITING_FOR_MESSAGES,
CPG_DOWNLIST_APPLYING,
};
static enum cpg_downlist_state_e downlist_state;
static struct list_head downlist_messages_head;
static struct list_head joinlist_messages_head;
/*
 * Per-connection CPG private data: the (single) group this connection
 * joined, its client pid, join state, plus lists of its active
 * iteration instances and zero-copy buffer mappings.
 */
struct cpg_pd {
void *conn;
mar_cpg_name_t group_name;
uint32_t pid;
enum cpd_state cpd_state;
unsigned int flags;
int initial_totem_conf_sent;
uint64_t transition_counter; /* These two are used when sending fragmented messages */
uint64_t initial_transition_counter;
struct list_head list;
struct list_head iteration_instance_list_head;
struct list_head zcb_mapped_list_head;
};
/*
 * State of one group-membership iteration requested by a client:
 * a private copy of process_info entries plus the cursor position.
 */
struct cpg_iteration_instance {
hdb_handle_t handle;
struct list_head list;
struct list_head items_list_head; /* List of process_info */
struct list_head *current_pointer;
};
DECLARE_HDB_DATABASE(cpg_iteration_handle_t_db,NULL);
DECLARE_LIST_INIT(cpg_pd_list_head);
static unsigned int my_member_list[PROCESSOR_COUNT_MAX];
static unsigned int my_member_list_entries;
static unsigned int my_old_member_list[PROCESSOR_COUNT_MAX];
static unsigned int my_old_member_list_entries = 0;
static struct corosync_api_v1 *api = NULL;
static enum cpg_sync_state my_sync_state = CPGSYNC_DOWNLIST;
static mar_cpg_ring_id_t last_sync_ring_id;
/* One (nodeid, pid, group) membership record. */
struct process_info {
unsigned int nodeid;
uint32_t pid;
mar_cpg_name_t group;
struct list_head list; /* on the group_info members list */
};
DECLARE_LIST_INIT(process_info_list_head);
/* One entry of a joinlist exec message: a pid and the group it joined. */
struct join_list_entry {
uint32_t pid;
mar_cpg_name_t group_name;
};
/*
* Service Interfaces required by service_message_handler struct
*/
static char *cpg_exec_init_fn (struct corosync_api_v1 *);
static int cpg_lib_init_fn (void *conn);
static int cpg_lib_exit_fn (void *conn);
static void message_handler_req_exec_cpg_procjoin (
const void *message,
unsigned int nodeid);
static void message_handler_req_exec_cpg_procleave (
const void *message,
unsigned int nodeid);
static void message_handler_req_exec_cpg_joinlist (
const void *message,
unsigned int nodeid);
static void message_handler_req_exec_cpg_mcast (
const void *message,
unsigned int nodeid);
static void message_handler_req_exec_cpg_partial_mcast (
const void *message,
unsigned int nodeid);
static void message_handler_req_exec_cpg_downlist_old (
const void *message,
unsigned int nodeid);
static void message_handler_req_exec_cpg_downlist (
const void *message,
unsigned int nodeid);
static void exec_cpg_procjoin_endian_convert (void *msg);
static void exec_cpg_joinlist_endian_convert (void *msg);
static void exec_cpg_mcast_endian_convert (void *msg);
static void exec_cpg_partial_mcast_endian_convert (void *msg);
static void exec_cpg_downlist_endian_convert_old (void *msg);
static void exec_cpg_downlist_endian_convert (void *msg);
static void message_handler_req_lib_cpg_join (void *conn, const void *message);
static void message_handler_req_lib_cpg_leave (void *conn, const void *message);
static void message_handler_req_lib_cpg_finalize (void *conn, const void *message);
static void message_handler_req_lib_cpg_mcast (void *conn, const void *message);
static void message_handler_req_lib_cpg_partial_mcast (void *conn, const void *message);
static void message_handler_req_lib_cpg_membership (void *conn,
const void *message);
static void message_handler_req_lib_cpg_local_get (void *conn,
const void *message);
static void message_handler_req_lib_cpg_iteration_initialize (
void *conn,
const void *message);
static void message_handler_req_lib_cpg_iteration_next (
void *conn,
const void *message);
static void message_handler_req_lib_cpg_iteration_finalize (
void *conn,
const void *message);
static void message_handler_req_lib_cpg_zc_alloc (
void *conn,
const void *message);
static void message_handler_req_lib_cpg_zc_free (
void *conn,
const void *message);
static void message_handler_req_lib_cpg_zc_execute (
void *conn,
const void *message);
static int cpg_node_joinleave_send (unsigned int pid, const mar_cpg_name_t *group_name, int fn, int reason);
static int cpg_exec_send_downlist(void);
static int cpg_exec_send_joinlist(void);
static void downlist_messages_delete (void);
static void downlist_master_choose_and_send (void);
static void joinlist_inform_clients (void);
static void joinlist_messages_delete (void);
static void cpg_sync_init (
const unsigned int *trans_list,
size_t trans_list_entries,
const unsigned int *member_list,
size_t member_list_entries,
const struct memb_ring_id *ring_id);
static int cpg_sync_process (void);
static void cpg_sync_activate (void);
static void cpg_sync_abort (void);
static void do_proc_join(
const mar_cpg_name_t *name,
uint32_t pid,
unsigned int nodeid,
int reason);
static void do_proc_leave(
const mar_cpg_name_t *name,
uint32_t pid,
unsigned int nodeid,
int reason);
static int notify_lib_totem_membership (
void *conn,
int member_list_entries,
const unsigned int *member_list);
static inline int zcb_all_free (
struct cpg_pd *cpd);
static char *cpg_print_group_name (
const mar_cpg_name_t *group);
/*
* Library Handler Definition
*/
/*
 * Library (IPC) request dispatch table; the array index must match the
 * request id sent by the client library.
 */
static struct corosync_lib_handler cpg_lib_engine[] =
{
	{ /* 0 - MESSAGE_REQ_CPG_JOIN */
		.lib_handler_fn				= message_handler_req_lib_cpg_join,
		.flow_control				= CS_LIB_FLOW_CONTROL_REQUIRED
	},
	{ /* 1 - MESSAGE_REQ_CPG_LEAVE */
		.lib_handler_fn				= message_handler_req_lib_cpg_leave,
		.flow_control				= CS_LIB_FLOW_CONTROL_REQUIRED
	},
	{ /* 2 - MESSAGE_REQ_CPG_MCAST */
		.lib_handler_fn				= message_handler_req_lib_cpg_mcast,
		.flow_control				= CS_LIB_FLOW_CONTROL_REQUIRED
	},
	{ /* 3 - MESSAGE_REQ_CPG_MEMBERSHIP */
		.lib_handler_fn				= message_handler_req_lib_cpg_membership,
		.flow_control				= CS_LIB_FLOW_CONTROL_NOT_REQUIRED
	},
	{ /* 4 - MESSAGE_REQ_CPG_LOCAL_GET */
		.lib_handler_fn				= message_handler_req_lib_cpg_local_get,
		.flow_control				= CS_LIB_FLOW_CONTROL_NOT_REQUIRED
	},
	{ /* 5 - MESSAGE_REQ_CPG_ITERATIONINITIALIZE */
		.lib_handler_fn				= message_handler_req_lib_cpg_iteration_initialize,
		.flow_control				= CS_LIB_FLOW_CONTROL_NOT_REQUIRED
	},
	{ /* 6 - MESSAGE_REQ_CPG_ITERATIONNEXT */
		.lib_handler_fn				= message_handler_req_lib_cpg_iteration_next,
		.flow_control				= CS_LIB_FLOW_CONTROL_NOT_REQUIRED
	},
	{ /* 7 - MESSAGE_REQ_CPG_ITERATIONFINALIZE */
		.lib_handler_fn				= message_handler_req_lib_cpg_iteration_finalize,
		.flow_control				= CS_LIB_FLOW_CONTROL_NOT_REQUIRED
	},
	{ /* 8 - MESSAGE_REQ_CPG_FINALIZE */
		.lib_handler_fn				= message_handler_req_lib_cpg_finalize,
		.flow_control				= CS_LIB_FLOW_CONTROL_REQUIRED
	},
	{ /* 9 - zero-copy buffer allocate */
		.lib_handler_fn				= message_handler_req_lib_cpg_zc_alloc,
		.flow_control				= CS_LIB_FLOW_CONTROL_REQUIRED
	},
	{ /* 10 - zero-copy buffer free */
		.lib_handler_fn				= message_handler_req_lib_cpg_zc_free,
		.flow_control				= CS_LIB_FLOW_CONTROL_REQUIRED
	},
	{ /* 11 - zero-copy mcast execute */
		.lib_handler_fn				= message_handler_req_lib_cpg_zc_execute,
		.flow_control				= CS_LIB_FLOW_CONTROL_REQUIRED
	},
	{ /* 12 - partial (fragmented) mcast */
		.lib_handler_fn				= message_handler_req_lib_cpg_partial_mcast,
		.flow_control				= CS_LIB_FLOW_CONTROL_REQUIRED
	},
};
/*
 * Executive (cluster-wide totem) message dispatch table; index must
 * match the id encoded via SERVICE_ID_MAKE by the senders.
 */
static struct corosync_exec_handler cpg_exec_engine[] =
{
	{ /* 0 - MESSAGE_REQ_EXEC_CPG_PROCJOIN */
		.exec_handler_fn	= message_handler_req_exec_cpg_procjoin,
		.exec_endian_convert_fn	= exec_cpg_procjoin_endian_convert
	},
	{ /* 1 - MESSAGE_REQ_EXEC_CPG_PROCLEAVE */
		.exec_handler_fn	= message_handler_req_exec_cpg_procleave,
		.exec_endian_convert_fn	= exec_cpg_procjoin_endian_convert
	},
	{ /* 2 - MESSAGE_REQ_EXEC_CPG_JOINLIST */
		.exec_handler_fn	= message_handler_req_exec_cpg_joinlist,
		.exec_endian_convert_fn	= exec_cpg_joinlist_endian_convert
	},
	{ /* 3 - MESSAGE_REQ_EXEC_CPG_MCAST */
		.exec_handler_fn	= message_handler_req_exec_cpg_mcast,
		.exec_endian_convert_fn	= exec_cpg_mcast_endian_convert
	},
	{ /* 4 - MESSAGE_REQ_EXEC_CPG_DOWNLIST_OLD */
		.exec_handler_fn	= message_handler_req_exec_cpg_downlist_old,
		.exec_endian_convert_fn	= exec_cpg_downlist_endian_convert_old
	},
	{ /* 5 - MESSAGE_REQ_EXEC_CPG_DOWNLIST */
		.exec_handler_fn	= message_handler_req_exec_cpg_downlist,
		.exec_endian_convert_fn	= exec_cpg_downlist_endian_convert
	},
	{ /* 6 - MESSAGE_REQ_EXEC_CPG_PARTIAL_MCAST */
		.exec_handler_fn	= message_handler_req_exec_cpg_partial_mcast,
		.exec_endian_convert_fn	= exec_cpg_partial_mcast_endian_convert
	},
};
/*
 * CPG service engine definition handed to the corosync service manager
 * via cpg_get_service_engine_ver0().
 */
struct corosync_service_engine cpg_service_engine = {
	.name				= "corosync cluster closed process group service v1.01",
	.id				= CPG_SERVICE,
	.priority			= 1,
	.private_data_size		= sizeof (struct cpg_pd),
	.flow_control			= CS_LIB_FLOW_CONTROL_REQUIRED,
	.allow_inquorate		= CS_LIB_ALLOW_INQUORATE,
	.lib_init_fn			= cpg_lib_init_fn,
	.lib_exit_fn			= cpg_lib_exit_fn,
	.lib_engine			= cpg_lib_engine,
	.lib_engine_count		= sizeof (cpg_lib_engine) / sizeof (struct corosync_lib_handler),
	.exec_init_fn			= cpg_exec_init_fn,
	.exec_dump_fn			= NULL,
	.exec_engine			= cpg_exec_engine,
	.exec_engine_count		= sizeof (cpg_exec_engine) / sizeof (struct corosync_exec_handler),
	.sync_init			= cpg_sync_init,
	.sync_process			= cpg_sync_process,
	.sync_activate			= cpg_sync_activate,
	.sync_abort			= cpg_sync_abort
};
/* Entry point used by the service loader to obtain this engine. */
struct corosync_service_engine *cpg_get_service_engine_ver0 (void)
{
	return (&cpg_service_engine);
}
/*
 * Executive message body shared by PROCJOIN and PROCLEAVE (both are
 * built by cpg_node_joinleave_send).
 */
struct req_exec_cpg_procjoin {
	struct qb_ipc_request_header header __attribute__((aligned(8)));
	mar_cpg_name_t group_name __attribute__((aligned(8)));
	mar_uint32_t pid __attribute__((aligned(8)));
	mar_uint32_t reason __attribute__((aligned(8)));
};
/*
 * Executive message for a whole (unfragmented) group multicast;
 * msglen bytes of payload follow in the flexible array member.
 */
struct req_exec_cpg_mcast {
	struct qb_ipc_request_header header __attribute__((aligned(8)));
	mar_cpg_name_t group_name __attribute__((aligned(8)));
	mar_uint32_t msglen __attribute__((aligned(8)));
	mar_uint32_t pid __attribute__((aligned(8)));
	mar_message_source_t source __attribute__((aligned(8)));
	mar_uint8_t message[] __attribute__((aligned(8)));
};
/*
 * Executive message for one fragment of a large multicast: msglen is
 * the full message length, fraglen the number of payload bytes carried
 * here.  NOTE(review): 'type' presumably marks the fragment position
 * (first/continued/last) — confirm against the library-side sender.
 */
struct req_exec_cpg_partial_mcast {
	struct qb_ipc_request_header header __attribute__((aligned(8)));
	mar_cpg_name_t group_name __attribute__((aligned(8)));
	mar_uint32_t msglen __attribute__((aligned(8)));
	mar_uint32_t fraglen __attribute__((aligned(8)));
	mar_uint32_t pid __attribute__((aligned(8)));
	mar_uint32_t type __attribute__((aligned(8)));
	mar_message_source_t source __attribute__((aligned(8)));
	mar_uint8_t message[] __attribute__((aligned(8)));
};
/*
 * Old-format downlist message; kept for the dispatch-table slot but
 * only logged, never applied (message_handler_req_exec_cpg_downlist_old).
 */
struct req_exec_cpg_downlist_old {
	struct qb_ipc_request_header header __attribute__((aligned(8)));
	mar_uint32_t left_nodes __attribute__((aligned(8)));
	mar_uint32_t nodeids[PROCESSOR_COUNT_MAX]  __attribute__((aligned(8)));
};
/*
 * Downlist broadcast by every node during sync: the nodes that left
 * the membership plus the old membership size, used for master
 * election in downlist_master_choose.
 */
struct req_exec_cpg_downlist {
	struct qb_ipc_request_header header __attribute__((aligned(8)));
	/* merge decisions */
	mar_uint32_t old_members __attribute__((aligned(8)));
	/* downlist below */
	mar_uint32_t left_nodes __attribute__((aligned(8)));
	mar_uint32_t nodeids[PROCESSOR_COUNT_MAX]  __attribute__((aligned(8)));
};
/*
 * A received downlist, queued on downlist_messages_head until a
 * downlist from every current member has arrived.
 */
struct downlist_msg {
	mar_uint32_t sender_nodeid;	/* node the downlist came from */
	mar_uint32_t old_members __attribute__((aligned(8)));
	mar_uint32_t left_nodes __attribute__((aligned(8)));
	mar_uint32_t nodeids[PROCESSOR_COUNT_MAX]  __attribute__((aligned(8)));
	struct list_head list;		/* entry on downlist_messages_head */
};
/*
 * One received joinlist record, queued on joinlist_messages_head and
 * applied by joinlist_inform_clients.
 */
struct joinlist_msg {
	mar_uint32_t sender_nodeid;
	uint32_t pid;
	mar_cpg_name_t group_name;
	struct list_head list;	/* entry on joinlist_messages_head */
};
static struct req_exec_cpg_downlist g_req_exec_cpg_downlist;
/*
* Function print group name. It's not reentrant
*/
/*
 * Render a cpg group name as a printable NUL-terminated string,
 * escaping '\' as "\\" and non-printable bytes as "\xNN".
 * Uses a static buffer, hence not reentrant.
 */
static char *cpg_print_group_name(const mar_cpg_name_t *group)
{
	static char res[CPG_MAX_NAME_LENGTH * 4 + 1];
	int dest_pos = 0;
	unsigned char c;
	int i;

	for (i = 0; i < group->length; i++) {
		/*
		 * Read the byte as unsigned: with plain (possibly signed)
		 * char, a byte >= 0x80 sign-extends through default argument
		 * promotion and "%02X" prints 8 hex digits instead of 2,
		 * desynchronizing dest_pos from what was actually written.
		 */
		c = (unsigned char)group->value[i];

		if (c >= ' ' && c < 0x7f && c != '\\') {
			res[dest_pos++] = c;
		} else {
			if (c == '\\') {
				res[dest_pos++] = '\\';
				res[dest_pos++] = '\\';
			} else {
				/* exactly 4 chars for any byte value now */
				snprintf(res + dest_pos, sizeof(res) - dest_pos, "\\x%02X", c);
				dest_pos += 4;
			}
		}
	}
	res[dest_pos] = 0;

	return (res);
}
/*
 * Sync-start callback: remember the new membership and ring id, enter
 * downlist-collection state, and pre-compute the local downlist (old
 * members missing from the transitional configuration) into
 * g_req_exec_cpg_downlist for cpg_exec_send_downlist.
 */
static void cpg_sync_init (
	const unsigned int *trans_list,
	size_t trans_list_entries,
	const unsigned int *member_list,
	size_t member_list_entries,
	const struct memb_ring_id *ring_id)
{
	int entries;
	int i, j;
	int found;

	my_sync_state = CPGSYNC_DOWNLIST;

	memcpy (my_member_list, member_list, member_list_entries *
		sizeof (unsigned int));
	my_member_list_entries = member_list_entries;

	last_sync_ring_id.nodeid = ring_id->rep.nodeid;
	last_sync_ring_id.seq = ring_id->seq;

	downlist_state = CPG_DOWNLIST_WAITING_FOR_MESSAGES;

	entries = 0;
	/*
	 * Determine list of nodeids for downlist message:
	 * every old member absent from the transitional list has left.
	 */
	for (i = 0; i < my_old_member_list_entries; i++) {
		found = 0;
		for (j = 0; j < trans_list_entries; j++) {
			if (my_old_member_list[i] == trans_list[j]) {
				found = 1;
				break;
			}
		}
		if (found == 0) {
			g_req_exec_cpg_downlist.nodeids[entries++] =
				my_old_member_list[i];
		}
	}
	g_req_exec_cpg_downlist.left_nodes = entries;
}
/*
 * Sync work callback: first broadcast the downlist, then (possibly in
 * the same invocation) the joinlist.  Returns -1 while more work
 * remains, otherwise the result of the joinlist send.
 */
static int cpg_sync_process (void)
{
	int rc = -1;

	switch (my_sync_state) {
	case CPGSYNC_DOWNLIST:
		rc = cpg_exec_send_downlist ();
		if (rc == -1) {
			break;
		}
		my_sync_state = CPGSYNC_JOINLIST;
		/* fallthrough: continue straight into the joinlist phase */
	case CPGSYNC_JOINLIST:
		rc = cpg_exec_send_joinlist ();
		break;
	}

	return (rc);
}
/*
 * Sync completed successfully: commit the new membership, apply a
 * pending downlist decision if none arrived during sync, inform
 * clients of joinlists and totem membership, and drop buffered
 * sync messages.
 */
static void cpg_sync_activate (void)
{
	/* The just-synced membership becomes the "old" list for next sync */
	memcpy (my_old_member_list, my_member_list,
		my_member_list_entries * sizeof (unsigned int));
	my_old_member_list_entries = my_member_list_entries;

	/* No downlist was applied yet: choose the master and deliver leaves */
	if (downlist_state == CPG_DOWNLIST_WAITING_FOR_MESSAGES) {
		downlist_master_choose_and_send ();
	}

	joinlist_inform_clients ();

	downlist_messages_delete ();
	downlist_state = CPG_DOWNLIST_NONE;

	joinlist_messages_delete ();

	notify_lib_totem_membership (NULL, my_member_list_entries, my_member_list);
}
/*
 * Sync was interrupted: reset the downlist state machine and discard
 * everything buffered so far.  The two message queues are independent,
 * so the purge order is immaterial.
 */
static void cpg_sync_abort (void)
{
	joinlist_messages_delete ();
	downlist_messages_delete ();
	downlist_state = CPG_DOWNLIST_NONE;
}
/*
 * Deliver a totem membership-change event (member_list plus the last
 * sync ring id) to one connection, or — when conn is NULL — to every
 * connected CPG client.  Returns CS_OK, or CS_ERR_LIBRARY if the
 * response buffer could not be set up.
 */
static int notify_lib_totem_membership (
	void *conn,
	int member_list_entries,
	const unsigned int *member_list)
{
	struct list_head *iter;
	char *buf;
	int size;
	struct res_lib_cpg_totem_confchg_callback *res;

	/* Fixed header followed by one mar_uint32_t per member */
	size = sizeof(struct res_lib_cpg_totem_confchg_callback) +
		sizeof(mar_uint32_t) * (member_list_entries);
	buf = alloca(size);
	if (!buf)
		return CS_ERR_LIBRARY;

	res = (struct res_lib_cpg_totem_confchg_callback *)buf;
	res->member_list_entries = member_list_entries;
	res->header.size = size;
	res->header.id = MESSAGE_RES_CPG_TOTEM_CONFCHG_CALLBACK;
	res->header.error = CS_OK;
	memcpy (&res->ring_id, &last_sync_ring_id, sizeof (mar_cpg_ring_id_t));

	memcpy (res->member_list, member_list, res->member_list_entries * sizeof (mar_uint32_t));

	if (conn == NULL) {
		/* Broadcast to every cpg_pd on the global connection list */
		for (iter = cpg_pd_list_head.next; iter != &cpg_pd_list_head; iter = iter->next) {
			struct cpg_pd *cpg_pd = list_entry (iter, struct cpg_pd, list);
			api->ipc_dispatch_send (cpg_pd->conn, buf, size);
		}
	} else {
		api->ipc_dispatch_send (conn, buf, size);
	}

	return CS_OK;
}
/*
 * Build and deliver a confchg event for group_name carrying the current
 * member list (excluding entries in left_list), the left list, and the
 * joined list.  With conn == NULL the event fans out to every client of
 * the group and local join/leave state transitions are applied as a
 * side effect; otherwise it goes only to conn.  Returns CS_OK, or
 * CS_ERR_LIBRARY if the response buffer could not be set up.
 */
static int notify_lib_joinlist(
	const mar_cpg_name_t *group_name,
	void *conn,
	int joined_list_entries,
	mar_cpg_address_t *joined_list,
	int left_list_entries,
	mar_cpg_address_t *left_list,
	int id)
{
	int size;
	char *buf;
	struct list_head *iter;
	int count;
	struct res_lib_cpg_confchg_callback *res;
	mar_cpg_address_t *retgi;

	/*
	 * First pass: count group members that are NOT in left_list
	 * (departing members must not appear in the member list).
	 */
	count = 0;
	for (iter = process_info_list_head.next; iter != &process_info_list_head; iter = iter->next) {
		struct process_info *pi = list_entry (iter, struct process_info, list);

		if (mar_name_compare (&pi->group, group_name) == 0) {
			int i;
			int founded = 0;

			for (i = 0; i < left_list_entries; i++) {
				if (left_list[i].nodeid == pi->nodeid && left_list[i].pid == pi->pid) {
					founded++;
				}
			}

			if (!founded)
				count++;
		}
	}

	size = sizeof(struct res_lib_cpg_confchg_callback) +
		sizeof(mar_cpg_address_t) * (count + left_list_entries + joined_list_entries);
	buf = alloca(size);
	if (!buf)
		return CS_ERR_LIBRARY;

	res = (struct res_lib_cpg_confchg_callback *)buf;
	res->joined_list_entries = joined_list_entries;
	res->left_list_entries = left_list_entries;
	res->member_list_entries = count;
	retgi = res->member_list;
	res->header.size = size;
	res->header.id = id;
	res->header.error = CS_OK;
	memcpy(&res->group_name, group_name, sizeof(mar_cpg_name_t));

	/* Second pass: emit the member list, using the same left_list filter */
	for (iter = process_info_list_head.next; iter != &process_info_list_head; iter = iter->next) {
		struct process_info *pi=list_entry (iter, struct process_info, list);

		if (mar_name_compare (&pi->group, group_name) == 0) {
			int i;
			int founded = 0;

			for (i = 0;i < left_list_entries; i++) {
				if (left_list[i].nodeid == pi->nodeid && left_list[i].pid == pi->pid) {
					founded++;
				}
			}

			if (!founded) {
				retgi->nodeid = pi->nodeid;
				retgi->pid = pi->pid;
				retgi++;
			}
		}
	}

	/* Left and joined lists are appended right after the member list */
	if (left_list_entries) {
		memcpy (retgi, left_list, left_list_entries * sizeof(mar_cpg_address_t));
		retgi += left_list_entries;
	}

	if (joined_list_entries) {
		memcpy (retgi, joined_list, joined_list_entries * sizeof(mar_cpg_address_t));
		retgi += joined_list_entries;
	}

	if (conn) {
		api->ipc_dispatch_send (conn, buf, size);
	} else {
		/* Fan out to all clients of the group, updating their state */
		for (iter = cpg_pd_list_head.next; iter != &cpg_pd_list_head; iter = iter->next) {
			struct cpg_pd *cpd = list_entry (iter, struct cpg_pd, list);
			if (mar_name_compare (&cpd->group_name, group_name) == 0) {
				assert (joined_list_entries <= 1);
				if (joined_list_entries) {
					/* Our own join confirmation completes the join */
					if (joined_list[0].pid == cpd->pid &&
						joined_list[0].nodeid == api->totem_nodeid_get()) {
						cpd->cpd_state = CPD_STATE_JOIN_COMPLETED;
					}
				}
				/* Only fully joined (or leaving) clients receive events */
				if (cpd->cpd_state == CPD_STATE_JOIN_COMPLETED ||
					cpd->cpd_state == CPD_STATE_LEAVE_STARTED) {

					api->ipc_dispatch_send (cpd->conn, buf, size);
					cpd->transition_counter++;
				}
				if (left_list_entries) {
					/* Our own voluntary leave unjoins this client */
					if (left_list[0].pid == cpd->pid &&
						left_list[0].nodeid == api->totem_nodeid_get() &&
						left_list[0].reason == CONFCHG_CPG_REASON_LEAVE) {

						cpd->pid = 0;
						memset (&cpd->group_name, 0, sizeof(cpd->group_name));
						cpd->cpd_state = CPD_STATE_UNJOINED;
					}
				}
			}
		}
	}

	/*
	 * Traverse thru cpds and send totem membership for cpd, where it is not send yet
	 */
	for (iter = cpg_pd_list_head.next; iter != &cpg_pd_list_head; iter = iter->next) {
		struct cpg_pd *cpd = list_entry (iter, struct cpg_pd, list);

		if ((cpd->flags & CPG_MODEL_V1_DELIVER_INITIAL_TOTEM_CONF) && (cpd->initial_totem_conf_sent == 0)) {
			cpd->initial_totem_conf_sent = 1;

			notify_lib_totem_membership (cpd->conn, my_old_member_list_entries, my_old_member_list);
		}
	}

	return CS_OK;
}
/* Debug-log one downlist message, prefixed with a caller-supplied tag. */
static void downlist_log(const char *msg, struct downlist_msg* dl)
{
	const char *sender_ifaces = api->totem_ifaces_print(dl->sender_nodeid);

	log_printf (LOG_DEBUG,
		    "%s: sender %s; members(old:%d left:%d)",
		    msg,
		    sender_ifaces,
		    dl->old_members,
		    dl->left_nodes);
}
/*
 * Pick the "master" downlist among the received downlist messages.
 *
 * Messages whose left list contains the local node are ignored (we are
 * obviously still alive).  Of the rest, the best describes the largest
 * surviving partition: most remaining members, then most old members,
 * then lowest sender nodeid as the final tie-break.
 *
 * Returns NULL when no usable message exists (e.g. every sender listed
 * the local node as down); the caller handles a NULL result.
 */
static struct downlist_msg* downlist_master_choose (void)
{
	struct downlist_msg *cmp;
	struct downlist_msg *best = NULL;
	struct list_head *iter;
	uint32_t cmp_members;
	uint32_t best_members;
	uint32_t i;
	int ignore_msg;

	for (iter = downlist_messages_head.next;
		iter != &downlist_messages_head;
		iter = iter->next) {

		cmp = list_entry(iter, struct downlist_msg, list);
		downlist_log("comparing", cmp);

		ignore_msg = 0;
		for (i = 0; i < cmp->left_nodes; i++) {
			if (cmp->nodeids[i] == api->totem_nodeid_get()) {
				log_printf (LOG_DEBUG, "Ignoring this entry because I'm in the left list\n");

				ignore_msg = 1;
				break;
			}
		}

		if (ignore_msg) {
			continue ;
		}

		if (best == NULL) {
			best = cmp;
			continue;
		}

		best_members = best->old_members - best->left_nodes;
		cmp_members = cmp->old_members - cmp->left_nodes;

		if (cmp_members > best_members) {
			best = cmp;
		} else if (cmp_members == best_members) {
			if (cmp->old_members > best->old_members) {
				best = cmp;
			} else if (cmp->old_members == best->old_members) {
				if (cmp->sender_nodeid < best->sender_nodeid) {
					best = cmp;
				}
			}
		}
	}

	/*
	 * The former "assert (best != NULL)" was removed: best can
	 * legitimately be NULL here (every message ignored above), and the
	 * caller already checks for NULL — asserting would abort the daemon
	 * in that case and made the caller's check unreachable.
	 */
	return best;
}
static void downlist_master_choose_and_send (void)
{
struct downlist_msg *stored_msg;
struct list_head *iter;
struct process_info *left_pi;
qb_map_t *group_map;
struct cpg_name cpg_group;
mar_cpg_name_t group;
struct confchg_data{
struct cpg_name cpg_group;
mar_cpg_address_t left_list[CPG_MEMBERS_MAX];
int left_list_entries;
struct list_head list;
} *pcd;
qb_map_iter_t *miter;
int i, size;
downlist_state = CPG_DOWNLIST_APPLYING;
stored_msg = downlist_master_choose ();
if (!stored_msg) {
log_printf (LOGSYS_LEVEL_DEBUG, "NO chosen downlist");
return;
}
downlist_log("chosen downlist", stored_msg);
group_map = qb_skiplist_create();
/*
* only the cpg groups included in left nodes should receive
* confchg event, so we will collect these cpg groups and
* relative left_lists here.
*/
for (iter = process_info_list_head.next; iter != &process_info_list_head; ) {
struct process_info *pi = list_entry(iter, struct process_info, list);
iter = iter->next;
left_pi = NULL;
for (i = 0; i < stored_msg->left_nodes; i++) {
if (pi->nodeid == stored_msg->nodeids[i]) {
left_pi = pi;
break;
}
}
if (left_pi) {
marshall_from_mar_cpg_name_t(&cpg_group, &left_pi->group);
cpg_group.value[cpg_group.length] = 0;
pcd = (struct confchg_data *)qb_map_get(group_map, cpg_group.value);
if (pcd == NULL) {
pcd = (struct confchg_data *)calloc(1, sizeof(struct confchg_data));
memcpy(&pcd->cpg_group, &cpg_group, sizeof(struct cpg_name));
qb_map_put(group_map, pcd->cpg_group.value, pcd);
}
size = pcd->left_list_entries;
pcd->left_list[size].nodeid = left_pi->nodeid;
pcd->left_list[size].pid = left_pi->pid;
pcd->left_list[size].reason = CONFCHG_CPG_REASON_NODEDOWN;
pcd->left_list_entries++;
list_del (&left_pi->list);
free (left_pi);
}
}
/* send only one confchg event per cpg group */
miter = qb_map_iter_create(group_map);
while (qb_map_iter_next(miter, (void **)&pcd)) {
marshall_to_mar_cpg_name_t(&group, &pcd->cpg_group);
log_printf (LOG_DEBUG, "left_list_entries:%d", pcd->left_list_entries);
for (i=0; i<pcd->left_list_entries; i++) {
log_printf (LOG_DEBUG, "left_list[%d] group:%s, ip:%s, pid:%d",
i, cpg_print_group_name(&group),
(char*)api->totem_ifaces_print(pcd->left_list[i].nodeid),
pcd->left_list[i].pid);
}
/* send confchg event */
notify_lib_joinlist(&group, NULL,
0, NULL,
pcd->left_list_entries,
pcd->left_list,
MESSAGE_RES_CPG_CONFCHG_CALLBACK);
free(pcd);
}
qb_map_iter_free(miter);
qb_map_destroy(group_map);
}
/*
 * Remove processes that might have left the group while we were suspended:
 * any remote process_info entry not vouched for by a received joinlist
 * message is treated as gone and reported as PROCDOWN.
 */
static void joinlist_remove_zombie_pi_entries (void)
{
	struct list_head *pi_iter;
	struct list_head *jl_iter;
	struct process_info *pi;
	struct joinlist_msg *stored_msg;
	int found;

	for (pi_iter = process_info_list_head.next; pi_iter != &process_info_list_head; ) {
		pi = list_entry (pi_iter, struct process_info, list);
		/* advance before do_proc_leave may free pi below */
		pi_iter = pi_iter->next;

		/*
		 * Ignore local node
		 */
		if (pi->nodeid == api->totem_nodeid_get()) {
			continue ;
		}

		/*
		 * Try to find message in joinlist messages
		 */
		found = 0;
		for (jl_iter = joinlist_messages_head.next;
			jl_iter != &joinlist_messages_head;
			jl_iter = jl_iter->next) {

			stored_msg = list_entry(jl_iter, struct joinlist_msg, list);

			/* Our own joinlist entries cannot vouch for remote pids */
			if (stored_msg->sender_nodeid == api->totem_nodeid_get()) {
				continue ;
			}

			if (pi->nodeid == stored_msg->sender_nodeid &&
				pi->pid == stored_msg->pid &&
				mar_name_compare (&pi->group, &stored_msg->group_name) == 0) {
				found = 1;
				break ;
			}
		}

		if (!found) {
			do_proc_leave(&pi->group, pi->pid, pi->nodeid, CONFCHG_CPG_REASON_PROCDOWN);
		}
	}
}
/*
 * Replay all queued joinlist messages from other nodes as NODEUP joins,
 * then purge membership entries no joinlist vouched for.
 */
static void joinlist_inform_clients (void)
{
	struct list_head *entry;
	struct joinlist_msg *msg;
	unsigned int index = 0;

	for (entry = joinlist_messages_head.next;
	     entry != &joinlist_messages_head;
	     entry = entry->next) {

		msg = list_entry(entry, struct joinlist_msg, list);

		log_printf (LOG_DEBUG, "joinlist_messages[%u] group:%s, ip:%s, pid:%d",
			index, cpg_print_group_name(&msg->group_name),
			(char*)api->totem_ifaces_print(msg->sender_nodeid),
			msg->pid);
		index++;

		/* Ignore our own messages */
		if (msg->sender_nodeid == api->totem_nodeid_get()) {
			continue;
		}

		do_proc_join (&msg->group_name, msg->pid, msg->sender_nodeid,
			CONFCHG_CPG_REASON_NODEUP);
	}

	joinlist_remove_zombie_pi_entries ();
}
/* Free every queued downlist message. */
static void downlist_messages_delete (void)
{
	struct list_head *cur;
	struct list_head *next;
	struct downlist_msg *msg;

	cur = downlist_messages_head.next;
	while (cur != &downlist_messages_head) {
		next = cur->next;	/* save: the node is freed below */
		msg = list_entry(cur, struct downlist_msg, list);
		list_del (&msg->list);
		free (msg);
		cur = next;
	}
}
/* Free every queued joinlist message and reset the list head. */
static void joinlist_messages_delete (void)
{
	struct list_head *cur;
	struct list_head *next;
	struct joinlist_msg *msg;

	cur = joinlist_messages_head.next;
	while (cur != &joinlist_messages_head) {
		next = cur->next;	/* save: the node is freed below */
		msg = list_entry(cur, struct joinlist_msg, list);
		list_del (&msg->list);
		free (msg);
		cur = next;
	}
	list_init (&joinlist_messages_head);
}
/* Service startup: remember the corosync API and start with empty sync queues. */
static char *cpg_exec_init_fn (struct corosync_api_v1 *corosync_api)
{
	api = corosync_api;

	list_init (&downlist_messages_head);
	list_init (&joinlist_messages_head);

	return (NULL);
}
/*
 * Tear down one iterator: free its process_info snapshot, unlink it
 * from the owning cpg_pd, and destroy its handle-database entry.
 */
static void cpg_iteration_instance_finalize (struct cpg_iteration_instance *cpg_iteration_instance)
{
	struct list_head *cur;
	struct list_head *next;
	struct process_info *pi;

	cur = cpg_iteration_instance->items_list_head.next;
	while (cur != &cpg_iteration_instance->items_list_head) {
		next = cur->next;	/* save: the node is freed below */
		pi = list_entry (cur, struct process_info, list);
		list_del (&pi->list);
		free (pi);
		cur = next;
	}

	list_del (&cpg_iteration_instance->list);
	hdb_handle_destroy (&cpg_iteration_handle_t_db, cpg_iteration_instance->handle);
}
/*
 * Release all per-connection state: zero-copy buffers, open iterators,
 * and the entry on the global connection list.
 */
static void cpg_pd_finalize (struct cpg_pd *cpd)
{
	struct list_head *cur;
	struct list_head *next;
	struct cpg_iteration_instance *inst;

	zcb_all_free(cpd);

	cur = cpd->iteration_instance_list_head.next;
	while (cur != &cpd->iteration_instance_list_head) {
		next = cur->next;	/* finalize unlinks the node */
		inst = list_entry (cur, struct cpg_iteration_instance, list);
		cpg_iteration_instance_finalize (inst);
		cur = next;
	}

	list_del (&cpd->list);
}
/*
 * IPC connection teardown: if the client is still joined (and no leave
 * is already in flight) broadcast a PROCLEAVE with reason PROCDOWN on
 * its behalf, then free all per-connection state.
 */
static int cpg_lib_exit_fn (void *conn)
{
	struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);

	log_printf(LOGSYS_LEVEL_DEBUG, "exit_fn for conn=%p", conn);

	if (cpd->group_name.length > 0 && cpd->cpd_state != CPD_STATE_LEAVE_STARTED) {
		cpg_node_joinleave_send (cpd->pid, &cpd->group_name,
			MESSAGE_REQ_EXEC_CPG_PROCLEAVE, CONFCHG_CPG_REASON_PROCDOWN);
	}

	cpg_pd_finalize (cpd);

	api->ipc_refcnt_dec (conn);
	return (0);
}
static int cpg_node_joinleave_send (unsigned int pid, const mar_cpg_name_t *group_name, int fn, int reason)
{
struct req_exec_cpg_procjoin req_exec_cpg_procjoin;
struct iovec req_exec_cpg_iovec;
int result;
memcpy(&req_exec_cpg_procjoin.group_name, group_name, sizeof(mar_cpg_name_t));
req_exec_cpg_procjoin.pid = pid;
req_exec_cpg_procjoin.reason = reason;
req_exec_cpg_procjoin.header.size = sizeof(req_exec_cpg_procjoin);
req_exec_cpg_procjoin.header.id = SERVICE_ID_MAKE(CPG_SERVICE, fn);
req_exec_cpg_iovec.iov_base = (char *)&req_exec_cpg_procjoin;
req_exec_cpg_iovec.iov_len = sizeof(req_exec_cpg_procjoin);
result = api->totem_mcast (&req_exec_cpg_iovec, 1, TOTEM_AGREED);
return (result);
}
/* Can byteswap join & leave messages (both use req_exec_cpg_procjoin). */
static void exec_cpg_procjoin_endian_convert (void *msg)
{
	struct req_exec_cpg_procjoin *req = msg;

	swab_mar_cpg_name_t (&req->group_name);
	req->pid = swab32(req->pid);
	req->reason = swab32(req->reason);
}
/*
 * Byte-swap a joinlist message: a qb_ipc_response_header followed by
 * packed join_list_entry records up to res->size bytes.
 */
static void exec_cpg_joinlist_endian_convert (void *msg_v)
{
	char *msg = msg_v;
	struct qb_ipc_response_header *res = (struct qb_ipc_response_header *)msg;
	struct join_list_entry *jle = (struct join_list_entry *)(msg + sizeof(struct qb_ipc_response_header));

	/* Swap size first: it bounds the entry walk below */
	swab_mar_int32_t (&res->size);

	while ((const char*)jle < msg + res->size) {
		jle->pid = swab32(jle->pid);
		swab_mar_cpg_name_t (&jle->group_name);
		jle++;
	}
}
/*
 * Intentionally a no-op: old-format downlist messages are never applied
 * (see message_handler_req_exec_cpg_downlist_old), so nothing needs
 * byte-swapping.
 */
static void exec_cpg_downlist_endian_convert_old (void *msg)
{
}
/* Byte-swap a downlist message; left_nodes bounds the nodeids walk. */
static void exec_cpg_downlist_endian_convert (void *msg)
{
	struct req_exec_cpg_downlist *dl = msg;
	unsigned int idx;

	dl->old_members = swab32(dl->old_members);
	/* must be swapped before it is used as the loop bound */
	dl->left_nodes = swab32(dl->left_nodes);

	for (idx = 0; idx < dl->left_nodes; idx++) {
		dl->nodeids[idx] = swab32(dl->nodeids[idx]);
	}
}
/* Byte-swap a full-mcast header; the payload itself is left untouched. */
static void exec_cpg_mcast_endian_convert (void *msg)
{
	struct req_exec_cpg_mcast *req = msg;

	swab_coroipc_request_header_t (&req->header);
	swab_mar_cpg_name_t (&req->group_name);
	swab_mar_message_source_t (&req->source);
	req->pid = swab32(req->pid);
	req->msglen = swab32(req->msglen);
}
/* Byte-swap a fragment-mcast header; the payload itself is left untouched. */
static void exec_cpg_partial_mcast_endian_convert (void *msg)
{
	struct req_exec_cpg_partial_mcast *req = msg;

	swab_coroipc_request_header_t (&req->header);
	swab_mar_cpg_name_t (&req->group_name);
	swab_mar_message_source_t (&req->source);
	req->pid = swab32(req->pid);
	req->msglen = swab32(req->msglen);
	req->fraglen = swab32(req->fraglen);
	req->type = swab32(req->type);
}
/*
 * Look up the membership record for (nodeid, pid, group_name);
 * returns NULL when no such process is known.
 */
static struct process_info *process_info_find(const mar_cpg_name_t *group_name, uint32_t pid, unsigned int nodeid) {
	struct list_head *cur;
	struct list_head *next;

	for (cur = process_info_list_head.next; cur != &process_info_list_head; cur = next) {
		struct process_info *pi = list_entry (cur, struct process_info, list);
		next = cur->next;

		if (pi->nodeid == nodeid && pi->pid == pid &&
		    mar_name_compare (&pi->group, group_name) == 0) {
			return pi;
		}
	}

	return NULL;
}
/*
 * Record that (nodeid, pid) joined group 'name' and deliver a confchg
 * event with the given join reason.  Duplicate joins are ignored.
 */
static void do_proc_join(
	const mar_cpg_name_t *name,
	uint32_t pid,
	unsigned int nodeid,
	int reason)
{
	struct process_info *pi;
	struct process_info *pi_entry;
	mar_cpg_address_t notify_info;
	struct list_head *list;
	struct list_head *list_to_add = NULL;

	/* Already known: nothing to do */
	if (process_info_find (name, pid, nodeid) != NULL) {
		return ;
	}
	pi = malloc (sizeof (struct process_info));
	if (!pi) {
		log_printf(LOGSYS_LEVEL_WARNING, "Unable to allocate process_info struct");
		return;
	}
	pi->nodeid = nodeid;
	pi->pid = pid;
	memcpy(&pi->group, name, sizeof(*name));
	list_init(&pi->list);

	/*
	 * Insert new process in sorted order so synchronization works properly
	 * (ordered by nodeid, then pid; insert after the last smaller entry)
	 */
	list_to_add = &process_info_list_head;
	for (list = process_info_list_head.next; list != &process_info_list_head; list = list->next) {
		pi_entry = list_entry(list, struct process_info, list);
		if (pi_entry->nodeid > pi->nodeid ||
			(pi_entry->nodeid == pi->nodeid && pi_entry->pid > pi->pid)) {

			break;
		}
		list_to_add = list;
	}
	list_add (&pi->list, list_to_add);

	notify_info.pid = pi->pid;
	notify_info.nodeid = nodeid;
	notify_info.reason = reason;

	notify_lib_joinlist(&pi->group, NULL,
			    1, &notify_info,
			    0, NULL,
			    MESSAGE_RES_CPG_CONFCHG_CALLBACK);
}
/*
 * Deliver a leave confchg event for (nodeid, pid) in group 'name', then
 * drop the matching process_info record.  Notification happens first:
 * notify_lib_joinlist excludes left_list entries from the member list
 * it builds, so the record may still be present at that point.
 */
static void do_proc_leave(
	const mar_cpg_name_t *name,
	uint32_t pid,
	unsigned int nodeid,
	int reason)
{
	struct process_info *pi;
	struct list_head *iter;
	mar_cpg_address_t notify_info;

	notify_info.pid = pid;
	notify_info.nodeid = nodeid;
	notify_info.reason = reason;

	notify_lib_joinlist(name, NULL,
		0, NULL,
		1, &notify_info,
		MESSAGE_RES_CPG_CONFCHG_CALLBACK);

	for (iter = process_info_list_head.next; iter != &process_info_list_head; ) {
		pi = list_entry(iter, struct process_info, list);
		/* advance before a possible list_del/free below */
		iter = iter->next;

		if (pi->pid == pid && pi->nodeid == nodeid &&
			mar_name_compare (&pi->group, name)==0) {
			list_del (&pi->list);
			free (pi);
		}
	}
}
/* Obsolete wire format: acknowledged in the log but never applied. */
static void message_handler_req_exec_cpg_downlist_old (
	const void *message,
	unsigned int nodeid)
{
	log_printf (LOGSYS_LEVEL_WARNING, "downlist OLD from node 0x%x", nodeid);
}
/*
 * Store the sender's downlist and, once a downlist from every current
 * member has arrived, choose the master list and deliver leave events.
 * Messages arriving outside the collection phase are only logged.
 */
static void message_handler_req_exec_cpg_downlist(
	const void *message,
	unsigned int nodeid)
{
	const struct req_exec_cpg_downlist *req_exec_cpg_downlist = message;
	int i;
	struct list_head *iter;
	struct downlist_msg *stored_msg;
	int found;

	if (downlist_state != CPG_DOWNLIST_WAITING_FOR_MESSAGES) {
		log_printf (LOGSYS_LEVEL_WARNING, "downlist left_list: %d received in state %d",
			req_exec_cpg_downlist->left_nodes, downlist_state);
		return;
	}

	stored_msg = malloc (sizeof (struct downlist_msg));
	if (stored_msg == NULL) {
		/* Was dereferenced unchecked before; bail out instead of crashing */
		log_printf (LOGSYS_LEVEL_WARNING, "Unable to allocate downlist message");
		return;
	}
	stored_msg->sender_nodeid = nodeid;
	stored_msg->old_members = req_exec_cpg_downlist->old_members;
	stored_msg->left_nodes = req_exec_cpg_downlist->left_nodes;
	memcpy (stored_msg->nodeids, req_exec_cpg_downlist->nodeids,
		req_exec_cpg_downlist->left_nodes * sizeof (mar_uint32_t));
	list_init (&stored_msg->list);
	list_add (&stored_msg->list, &downlist_messages_head);

	/* Only proceed once every current member's downlist has been stored */
	for (i = 0; i < my_member_list_entries; i++) {
		found = 0;
		for (iter = downlist_messages_head.next;
			iter != &downlist_messages_head;
			iter = iter->next) {

			stored_msg = list_entry(iter, struct downlist_msg, list);
			if (my_member_list[i] == stored_msg->sender_nodeid) {
				found = 1;
			}
		}
		if (!found) {
			return;
		}
	}

	downlist_master_choose_and_send ();
}
/* Executive PROCJOIN: register the remote (or local) process as joined. */
static void message_handler_req_exec_cpg_procjoin (
	const void *message,
	unsigned int nodeid)
{
	const struct req_exec_cpg_procjoin *req = message;

	log_printf(LOGSYS_LEVEL_DEBUG, "got procjoin message from cluster node 0x%x (%s) for pid %u",
		nodeid,
		api->totem_ifaces_print(nodeid),
		(unsigned int)req->pid);

	do_proc_join (&req->group_name, req->pid, nodeid,
		CONFCHG_CPG_REASON_JOIN);
}
/* Executive PROCLEAVE: deregister the process with the sender's reason. */
static void message_handler_req_exec_cpg_procleave (
	const void *message,
	unsigned int nodeid)
{
	const struct req_exec_cpg_procjoin *req = message;

	log_printf(LOGSYS_LEVEL_DEBUG, "got procleave message from cluster node 0x%x (%s) for pid %u",
		nodeid,
		api->totem_ifaces_print(nodeid),
		(unsigned int)req->pid);

	do_proc_leave (&req->group_name, req->pid, nodeid,
		req->reason);
}
/* Got a proclist from another node */
static void message_handler_req_exec_cpg_joinlist (
const void *message_v,
unsigned int nodeid)
{
const char *message = message_v;
const struct qb_ipc_response_header *res = (const struct qb_ipc_response_header *)message;
const struct join_list_entry *jle = (const struct join_list_entry *)(message + sizeof(struct qb_ipc_response_header));
struct joinlist_msg *stored_msg;
log_printf(LOGSYS_LEVEL_DEBUG, "got joinlist message from node 0x%x",
nodeid);
while ((const char*)jle < message + res->size) {
stored_msg = malloc (sizeof (struct joinlist_msg));
memset(stored_msg, 0, sizeof (struct joinlist_msg));
stored_msg->sender_nodeid = nodeid;
stored_msg->pid = jle->pid;
memcpy(&stored_msg->group_name, &jle->group_name, sizeof(mar_cpg_name_t));
list_init (&stored_msg->list);
list_add (&stored_msg->list, &joinlist_messages_head);
jle++;
}
}
/*
 * Deliver a group multicast to every local client joined to the target
 * group.  The payload (which trails the executive header) is forwarded
 * behind a deliver-callback header via a two-part iovec, avoiding a
 * copy.  If the sending node has no known process in the group, the
 * message is dropped for everyone.
 */
static void message_handler_req_exec_cpg_mcast (
	const void *message,
	unsigned int nodeid)
{
	const struct req_exec_cpg_mcast *req_exec_cpg_mcast = message;
	struct res_lib_cpg_deliver_callback res_lib_cpg_mcast;
	int msglen = req_exec_cpg_mcast->msglen;
	struct list_head *iter, *pi_iter;
	struct cpg_pd *cpd;
	struct iovec iovec[2];
	int known_node = 0;

	res_lib_cpg_mcast.header.id = MESSAGE_RES_CPG_DELIVER_CALLBACK;
	res_lib_cpg_mcast.header.size = sizeof(res_lib_cpg_mcast) + msglen;
	res_lib_cpg_mcast.msglen = msglen;
	res_lib_cpg_mcast.pid = req_exec_cpg_mcast->pid;
	res_lib_cpg_mcast.nodeid = nodeid;

	memcpy(&res_lib_cpg_mcast.group_name, &req_exec_cpg_mcast->group_name,
		sizeof(mar_cpg_name_t));

	/* iovec[0] = callback header, iovec[1] = payload inside the exec message */
	iovec[0].iov_base = (void *)&res_lib_cpg_mcast;
	iovec[0].iov_len = sizeof (res_lib_cpg_mcast);

	iovec[1].iov_base = (char*)message+sizeof(*req_exec_cpg_mcast);
	iovec[1].iov_len = msglen;

	for (iter = cpg_pd_list_head.next; iter != &cpg_pd_list_head; ) {
		cpd = list_entry(iter, struct cpg_pd, list);
		iter = iter->next;

		if ((cpd->cpd_state == CPD_STATE_LEAVE_STARTED || cpd->cpd_state == CPD_STATE_JOIN_COMPLETED)
			&& (mar_name_compare (&cpd->group_name, &req_exec_cpg_mcast->group_name) == 0)) {

			if (!known_node) {
				/* Try to find, if we know the node */
				for (pi_iter = process_info_list_head.next;
					pi_iter != &process_info_list_head; pi_iter = pi_iter->next) {

					struct process_info *pi = list_entry (pi_iter, struct process_info, list);

					if (pi->nodeid == nodeid &&
						mar_name_compare (&pi->group, &req_exec_cpg_mcast->group_name) == 0) {
						known_node = 1;
						break;
					}
				}
			}

			if (!known_node) {
				log_printf(LOGSYS_LEVEL_WARNING, "Unknown node -> we will not deliver message");
				return ;
			}

			api->ipc_dispatch_iov_send (cpd->conn, iovec, 2);
		}
	}
}
/*
 * Deliver one fragment of a large (fragmented) mcast to every local
 * library connection joined to the target group.  Mirrors
 * message_handler_req_exec_cpg_mcast but carries fragment metadata
 * (fraglen, total msglen, fragment type) to the library.
 */
static void message_handler_req_exec_cpg_partial_mcast (
	const void *message,
	unsigned int nodeid)
{
	const struct req_exec_cpg_partial_mcast *req_exec_cpg_mcast = message;
	struct res_lib_cpg_partial_deliver_callback res_lib_cpg_mcast;
	int msglen = req_exec_cpg_mcast->fraglen;
	struct list_head *iter, *pi_iter;
	struct cpg_pd *cpd;
	struct iovec iovec[2];
	int known_node = 0;

	log_printf(LOGSYS_LEVEL_DEBUG, "Got fragmented message from node %d, size = %d bytes\n", nodeid, msglen);

	res_lib_cpg_mcast.header.id = MESSAGE_RES_CPG_PARTIAL_DELIVER_CALLBACK;
	res_lib_cpg_mcast.header.size = sizeof(res_lib_cpg_mcast) + msglen;
	res_lib_cpg_mcast.fraglen = msglen;
	res_lib_cpg_mcast.msglen = req_exec_cpg_mcast->msglen;
	res_lib_cpg_mcast.pid = req_exec_cpg_mcast->pid;
	res_lib_cpg_mcast.type = req_exec_cpg_mcast->type;
	res_lib_cpg_mcast.nodeid = nodeid;
	memcpy(&res_lib_cpg_mcast.group_name, &req_exec_cpg_mcast->group_name,
		sizeof(mar_cpg_name_t));

	/* iovec[0] = callback header, iovec[1] = this fragment's bytes */
	iovec[0].iov_base = (void *)&res_lib_cpg_mcast;
	iovec[0].iov_len = sizeof (res_lib_cpg_mcast);
	iovec[1].iov_base = (char*)message+sizeof(*req_exec_cpg_mcast);
	iovec[1].iov_len = msglen;

	/* iter advanced before dispatch; see non-partial handler */
	for (iter = cpg_pd_list_head.next; iter != &cpg_pd_list_head; ) {
		cpd = list_entry(iter, struct cpg_pd, list);
		iter = iter->next;

		if ((cpd->cpd_state == CPD_STATE_LEAVE_STARTED || cpd->cpd_state == CPD_STATE_JOIN_COMPLETED)
			&& (mar_name_compare (&cpd->group_name, &req_exec_cpg_mcast->group_name) == 0)) {

			if (!known_node) {
				/* Try to find, if we know the node */
				for (pi_iter = process_info_list_head.next;
					pi_iter != &process_info_list_head; pi_iter = pi_iter->next) {

					struct process_info *pi = list_entry (pi_iter, struct process_info, list);

					if (pi->nodeid == nodeid &&
						mar_name_compare (&pi->group, &req_exec_cpg_mcast->group_name) == 0) {
						known_node = 1;
						break;
					}
				}
			}

			if (!known_node) {
				/* Unknown sender: drop the fragment for everyone */
				log_printf(LOGSYS_LEVEL_WARNING, "Unknown node -> we will not deliver message");
				return ;
			}

			api->ipc_dispatch_iov_send (cpd->conn, iovec, 2);
		}
	}
}
/*
 * Multicast the globally prepared downlist (nodes that left the old
 * membership) to the cluster.  Returns the totem_mcast result.
 */
static int cpg_exec_send_downlist(void)
{
	struct iovec iov;

	g_req_exec_cpg_downlist.header.id =
		SERVICE_ID_MAKE(CPG_SERVICE, MESSAGE_REQ_EXEC_CPG_DOWNLIST);
	g_req_exec_cpg_downlist.header.size = sizeof(struct req_exec_cpg_downlist);
	g_req_exec_cpg_downlist.old_members = my_old_member_list_entries;

	iov.iov_base = (void *)&g_req_exec_cpg_downlist;
	iov.iov_len = g_req_exec_cpg_downlist.header.size;

	return (api->totem_mcast (&iov, 1, TOTEM_AGREED));
}
/*
 * Multicast this node's join list — one join_list_entry per process
 * joined locally — so other nodes can rebuild their view of our
 * groups.  Returns totem_mcast's result, 0 when there is nothing to
 * send, or -1 on allocation failure.
 */
static int cpg_exec_send_joinlist(void)
{
	int count = 0;
	struct list_head *iter;
	struct qb_ipc_response_header *res;
	char *buf;
	struct join_list_entry *jle;
	struct iovec req_exec_cpg_iovec;

	/* First pass: count local entries to size the buffer */
	for (iter = process_info_list_head.next; iter != &process_info_list_head; iter = iter->next) {
		struct process_info *pi = list_entry (iter, struct process_info, list);

		if (pi->nodeid == api->totem_nodeid_get ()) {
			count++;
		}
	}

	/* Nothing to send */
	if (!count)
		return 0;

	/* NOTE(review): alloca() does not return NULL on failure — it has
	 * undefined behavior (stack overflow) instead, so the check below
	 * is ineffective.  Verify count is bounded, or use malloc/free. */
	buf = alloca(sizeof(struct qb_ipc_response_header) + sizeof(struct join_list_entry) * count);
	if (!buf) {
		log_printf(LOGSYS_LEVEL_WARNING, "Unable to allocate joinlist buffer");
		return -1;
	}

	/* Wire layout: qb_ipc_response_header, then count entries */
	jle = (struct join_list_entry *)(buf + sizeof(struct qb_ipc_response_header));
	res = (struct qb_ipc_response_header *)buf;

	/* Second pass: fill the entries for local processes */
	for (iter = process_info_list_head.next; iter != &process_info_list_head; iter = iter->next) {
		struct process_info *pi = list_entry (iter, struct process_info, list);

		if (pi->nodeid == api->totem_nodeid_get ()) {
			memcpy (&jle->group_name, &pi->group, sizeof (mar_cpg_name_t));
			jle->pid = pi->pid;
			jle++;
		}
	}

	res->id = SERVICE_ID_MAKE(CPG_SERVICE, MESSAGE_REQ_EXEC_CPG_JOINLIST);
	res->size = sizeof(struct qb_ipc_response_header)+sizeof(struct join_list_entry) * count;

	req_exec_cpg_iovec.iov_base = buf;
	req_exec_cpg_iovec.iov_len = res->size;

	return (api->totem_mcast (&req_exec_cpg_iovec, 1, TOTEM_AGREED));
}
/*
 * Per-connection init: zero the private data, hook it onto the global
 * descriptor list, and take an IPC reference for the connection's
 * lifetime.  Always succeeds.
 */
static int cpg_lib_init_fn (void *conn)
{
	struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);

	memset (cpd, 0, sizeof(struct cpg_pd));
	cpd->conn = conn;
	list_init (&cpd->iteration_instance_list_head);
	list_init (&cpd->zcb_mapped_list_head);
	list_add (&cpd->list, &cpg_pd_list_head);

	api->ipc_refcnt_inc (conn);
	log_printf(LOGSYS_LEVEL_DEBUG, "lib_init_fn: conn=%p, cpd=%p", conn, cpd);

	return (0);
}
/* Join message from the library */
/*
 * Validates the join (no duplicate pid+group on this node, name not
 * too long, connection in a joinable state), then multicasts a
 * PROCJOIN and answers the library with the resulting cs_error_t.
 */
static void message_handler_req_lib_cpg_join (void *conn, const void *message)
{
	const struct req_lib_cpg_join *req_lib_cpg_join = message;
	struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
	struct res_lib_cpg_join res_lib_cpg_join;
	cs_error_t error = CS_OK;
	struct list_head *iter;

	/* Test, if we don't have same pid and group name joined */
	for (iter = cpg_pd_list_head.next; iter != &cpg_pd_list_head; iter = iter->next) {
		struct cpg_pd *cpd_item = list_entry (iter, struct cpg_pd, list);

		if (cpd_item->pid == req_lib_cpg_join->pid &&
			mar_name_compare(&req_lib_cpg_join->group_name, &cpd_item->group_name) == 0) {

			/* We have same pid and group name joined -> return error */
			error = CS_ERR_EXIST;
			goto response_send;
		}
	}

	/*
	 * Same check must be done in process info list, because there may be not yet delivered
	 * leave of client.
	 */
	for (iter = process_info_list_head.next; iter != &process_info_list_head; iter = iter->next) {
		struct process_info *pi = list_entry (iter, struct process_info, list);

		if (pi->nodeid == api->totem_nodeid_get () && pi->pid == req_lib_cpg_join->pid &&
		    mar_name_compare(&req_lib_cpg_join->group_name, &pi->group) == 0) {
			/* We have same pid and group name joined -> return error
			 * (TRY_AGAIN here: the pending leave may clear it) */
			error = CS_ERR_TRY_AGAIN;
			goto response_send;
		}
	}

	if (req_lib_cpg_join->group_name.length > CPG_MAX_NAME_LENGTH) {
		error = CS_ERR_NAME_TOO_LONG;
		goto response_send;
	}

	switch (cpd->cpd_state) {
	case CPD_STATE_UNJOINED:
		/* Record group/pid on the connection and announce the join
		 * to the cluster; state completes on PROCJOIN delivery */
		error = CS_OK;
		cpd->cpd_state = CPD_STATE_JOIN_STARTED;
		cpd->pid = req_lib_cpg_join->pid;
		cpd->flags = req_lib_cpg_join->flags;
		memcpy (&cpd->group_name, &req_lib_cpg_join->group_name,
			sizeof (cpd->group_name));

		cpg_node_joinleave_send (req_lib_cpg_join->pid,
			&req_lib_cpg_join->group_name,
			MESSAGE_REQ_EXEC_CPG_PROCJOIN, CONFCHG_CPG_REASON_JOIN);
		break;
	case CPD_STATE_LEAVE_STARTED:
		error = CS_ERR_BUSY;
		break;
	case CPD_STATE_JOIN_STARTED:
		error = CS_ERR_EXIST;
		break;
	case CPD_STATE_JOIN_COMPLETED:
		error = CS_ERR_EXIST;
		break;
	}

response_send:
	res_lib_cpg_join.header.size = sizeof(res_lib_cpg_join);
	res_lib_cpg_join.header.id = MESSAGE_RES_CPG_JOIN;
	res_lib_cpg_join.header.error = error;
	api->ipc_response_send (conn, &res_lib_cpg_join, sizeof(res_lib_cpg_join));
}
/* Leave message from the library */
static void message_handler_req_lib_cpg_leave (void *conn, const void *message)
{
struct res_lib_cpg_leave res_lib_cpg_leave;
cs_error_t error = CS_OK;
struct req_lib_cpg_leave *req_lib_cpg_leave = (struct req_lib_cpg_leave *)message;
struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
log_printf(LOGSYS_LEVEL_DEBUG, "got leave request on %p", conn);
switch (cpd->cpd_state) {
case CPD_STATE_UNJOINED:
error = CS_ERR_NOT_EXIST;
break;
case CPD_STATE_LEAVE_STARTED:
error = CS_ERR_NOT_EXIST;
break;
case CPD_STATE_JOIN_STARTED:
error = CS_ERR_BUSY;
break;
case CPD_STATE_JOIN_COMPLETED:
error = CS_OK;
cpd->cpd_state = CPD_STATE_LEAVE_STARTED;
cpg_node_joinleave_send (req_lib_cpg_leave->pid,
&req_lib_cpg_leave->group_name,
MESSAGE_REQ_EXEC_CPG_PROCLEAVE,
CONFCHG_CPG_REASON_LEAVE);
break;
}
/* send return */
res_lib_cpg_leave.header.size = sizeof(res_lib_cpg_leave);
res_lib_cpg_leave.header.id = MESSAGE_RES_CPG_LEAVE;
res_lib_cpg_leave.header.error = error;
api->ipc_response_send(conn, &res_lib_cpg_leave, sizeof(res_lib_cpg_leave));
}
/* Finalize message from library */
static void message_handler_req_lib_cpg_finalize (
void *conn,
const void *message)
{
struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
struct res_lib_cpg_finalize res_lib_cpg_finalize;
cs_error_t error = CS_OK;
log_printf (LOGSYS_LEVEL_DEBUG, "cpg finalize for conn=%p", conn);
/*
* We will just remove cpd from list. After this call, connection will be
* closed on lib side, and cpg_lib_exit_fn will be called
*/
list_del (&cpd->list);
list_init (&cpd->list);
res_lib_cpg_finalize.header.size = sizeof (res_lib_cpg_finalize);
res_lib_cpg_finalize.header.id = MESSAGE_RES_CPG_FINALIZE;
res_lib_cpg_finalize.header.error = error;
api->ipc_response_send (conn, &res_lib_cpg_finalize,
sizeof (res_lib_cpg_finalize));
}
/*
 * Map an existing file at `path` into memory, read/write shared,
 * sized to `bytes`.  The file is unlinked immediately after a
 * successful open — the mapping keeps the storage alive until
 * munmap — so the name never lingers on disk.
 *
 * On success *buf receives the mapping address and 0 is returned;
 * on any failure -1 is returned and *buf is untouched.
 */
static int
memory_map (
	const char *path,
	size_t bytes,
	void **buf)
{
	int32_t fd;
	void *addr;
	int32_t res;

	fd = open (path, O_RDWR, 0600);
	if (fd == -1) {
		/* Fix: previously unlink() ran before this check, touching
		 * the path even when the open had failed */
		return (-1);
	}
	unlink (path);

	res = ftruncate (fd, bytes);
	if (res == -1) {
		goto error_close;
	}

	addr = mmap (NULL, bytes, PROT_READ | PROT_WRITE,
		MAP_SHARED, fd, 0);
	if (addr == MAP_FAILED) {
		goto error_close;
	}
#ifdef MADV_NOSYNC
	/* Best-effort: don't flush dirty pages to the (unlinked) file */
	madvise(addr, bytes, MADV_NOSYNC);
#endif

	res = close (fd);
	if (res) {
		munmap (addr, bytes);
		return (-1);
	}
	*buf = addr;
	return (0);

error_close:
	/* Path already unlinked above; just release the descriptor */
	close (fd);
	return -1;
}
/*
 * Map the file backing a zero-copy buffer into the daemon and record
 * the mapping on the connection's zcb_mapped list so it can be found
 * by address and released later.
 *
 * Returns 0 on success, -1 on allocation or mapping failure (in which
 * case nothing is recorded).
 */
static inline int zcb_alloc (
	struct cpg_pd *cpd,
	const char *path_to_file,
	size_t size,
	void **addr)
{
	struct zcb_mapped *zcb_mapped;
	int res;	/* fix: was unsigned int, compared against -1 */

	zcb_mapped = malloc (sizeof (struct zcb_mapped));
	if (zcb_mapped == NULL) {
		return (-1);
	}

	res = memory_map (
		path_to_file,
		size,
		addr);
	if (res == -1) {
		free (zcb_mapped);
		return (-1);
	}

	list_init (&zcb_mapped->list);
	zcb_mapped->addr = *addr;
	zcb_mapped->size = size;
	list_add_tail (&zcb_mapped->list, &cpd->zcb_mapped_list_head);

	return (0);
}
/*
 * Unmap one zero-copy buffer and drop its tracking entry.
 * Returns munmap()'s result: 0 on success, -1 on failure (the entry
 * is removed and freed either way).
 */
static inline int zcb_free (struct zcb_mapped *zcb_mapped)
{
	int res;	/* fix: was unsigned int, munmap returns int */

	res = munmap (zcb_mapped->addr, zcb_mapped->size);
	list_del (&zcb_mapped->list);
	free (zcb_mapped);
	return (res);
}
/*
 * Free the zero-copy mapping whose address is `addr`, if the
 * connection tracks one.  Returns zcb_free()'s result for the match,
 * or 0 when no mapping with that address exists.
 */
static inline int zcb_by_addr_free (struct cpg_pd *cpd, void *addr)
{
	struct list_head *entry;
	unsigned int res = 0;

	for (entry = cpd->zcb_mapped_list_head.next;
	     entry != &cpd->zcb_mapped_list_head; entry = entry->next) {
		struct zcb_mapped *zm = list_entry (entry, struct zcb_mapped, list);

		if (zm->addr == addr) {
			res = zcb_free (zm);
			break;
		}
	}

	return (res);
}
/*
 * Release every zero-copy mapping a connection still holds.
 * Always returns 0.
 */
static inline int zcb_all_free (
	struct cpg_pd *cpd)
{
	struct list_head *entry = cpd->zcb_mapped_list_head.next;

	while (entry != &cpd->zcb_mapped_list_head) {
		struct zcb_mapped *zm = list_entry (entry, struct zcb_mapped, list);

		/* advance before the node is unlinked and freed */
		entry = entry->next;
		zcb_free (zm);
	}

	return (0);
}
/*
 * A zero-copy buffer's server-side address travels through the IPC
 * protocol as a plain uint64_t.  Convert pointer <-> integer through
 * a union so the round trip is exact on both 32- and 64-bit builds
 * without strict-aliasing casts.
 */
union u {
	uint64_t server_addr;
	void *server_ptr;
};

static uint64_t void2serveraddr (void *server_ptr)
{
	union u u;

	u.server_ptr = server_ptr;
	return (u.server_addr);
}

static void *serveraddr2void (uint64_t server_addr)
{
	union u u;

	u.server_addr = server_addr;
	return (u.server_ptr);
}
/* fix: removed stray ';' after the function definition above */
/*
 * Library request: map the client-created zero-copy buffer file into
 * the daemon, stash the daemon-side address inside the buffer's own
 * header, and acknowledge with a bare response header.
 */
static void message_handler_req_lib_cpg_zc_alloc (
	void *conn,
	const void *message)
{
	mar_req_coroipcc_zc_alloc_t *hdr = (mar_req_coroipcc_zc_alloc_t *)message;
	struct qb_ipc_response_header res_header;
	void *addr = NULL;
	struct coroipcs_zc_header *zc_header;
	unsigned int res;
	struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);

	log_printf(LOGSYS_LEVEL_DEBUG, "path: %s", hdr->path_to_file);

	res = zcb_alloc (cpd, hdr->path_to_file, hdr->map_size,
		&addr);
	/* NOTE(review): a failed zcb_alloc (OOM or mmap failure) aborts
	 * the daemon via this assert instead of returning an error to
	 * the library — confirm this is the intended policy */
	assert(res == 0);

	/* First bytes of the mapping hold the zc header; record the
	 * server-side address there for later zc_free/zc_execute calls */
	zc_header = (struct coroipcs_zc_header *)addr;
	zc_header->server_address = void2serveraddr(addr);

	res_header.size = sizeof (struct qb_ipc_response_header);
	res_header.id = 0;
	api->ipc_response_send (conn,
		&res_header,
		res_header.size);
}
static void message_handler_req_lib_cpg_zc_free (
void *conn,
const void *message)
{
mar_req_coroipcc_zc_free_t *hdr = (mar_req_coroipcc_zc_free_t *)message;
struct qb_ipc_response_header res_header;
void *addr = NULL;
struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
log_printf(LOGSYS_LEVEL_DEBUG, " free'ing");
addr = serveraddr2void (hdr->server_address);
zcb_by_addr_free (cpd, addr);
res_header.size = sizeof (struct qb_ipc_response_header);
res_header.id = 0;
api->ipc_response_send (
conn, &res_header,
res_header.size);
}
/* Fragmented mcast message from the library */
/*
 * One fragment of a large application message.  The fragment is only
 * forwarded if the connection is in a joined (or joining/leaving-
 * compatible) state and no configuration change interrupted the
 * fragment sequence since its first fragment.
 */
static void message_handler_req_lib_cpg_partial_mcast (void *conn, const void *message)
{
	const struct req_lib_cpg_partial_mcast *req_lib_cpg_mcast = message;
	struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
	mar_cpg_name_t group_name = cpd->group_name;
	struct iovec req_exec_cpg_iovec[2];
	struct req_exec_cpg_partial_mcast req_exec_cpg_mcast;
	struct res_lib_cpg_partial_send res_lib_cpg_partial_send;
	int msglen = req_lib_cpg_mcast->fraglen;
	int result;
	cs_error_t error = CS_ERR_NOT_EXIST;

	log_printf(LOGSYS_LEVEL_TRACE, "got fragmented mcast request on %p", conn);
	log_printf(LOGSYS_LEVEL_DEBUG, "Sending fragmented message size = %d bytes\n", msglen);

	switch (cpd->cpd_state) {
	case CPD_STATE_UNJOINED:
		error = CS_ERR_NOT_EXIST;
		break;
	case CPD_STATE_LEAVE_STARTED:
		error = CS_ERR_NOT_EXIST;
		break;
	case CPD_STATE_JOIN_STARTED:
		/* mcast allowed while the join is still in flight */
		error = CS_OK;
		break;
	case CPD_STATE_JOIN_COMPLETED:
		error = CS_OK;
		break;
	}

	res_lib_cpg_partial_send.header.size = sizeof(res_lib_cpg_partial_send);
	res_lib_cpg_partial_send.header.id = MESSAGE_RES_CPG_PARTIAL_SEND;

	/* Remember the membership generation at the first fragment; if a
	 * config change happens mid-sequence, abort with CS_ERR_INTERRUPT
	 * so the library can restart the whole message */
	if (req_lib_cpg_mcast->type == LIBCPG_PARTIAL_FIRST) {
		cpd->initial_transition_counter = cpd->transition_counter;
	}
	if (cpd->transition_counter != cpd->initial_transition_counter) {
		error = CS_ERR_INTERRUPT;
	}

	if (error == CS_OK) {
		req_exec_cpg_mcast.header.size = sizeof(req_exec_cpg_mcast) + msglen;
		req_exec_cpg_mcast.header.id = SERVICE_ID_MAKE(CPG_SERVICE,
			MESSAGE_REQ_EXEC_CPG_PARTIAL_MCAST);
		req_exec_cpg_mcast.pid = cpd->pid;
		req_exec_cpg_mcast.msglen = req_lib_cpg_mcast->msglen;
		req_exec_cpg_mcast.type = req_lib_cpg_mcast->type;
		req_exec_cpg_mcast.fraglen = req_lib_cpg_mcast->fraglen;
		api->ipc_source_set (&req_exec_cpg_mcast.source, conn);
		memcpy(&req_exec_cpg_mcast.group_name, &group_name,
			sizeof(mar_cpg_name_t));

		/* iovec[0] = executive header, iovec[1] = fragment bytes */
		req_exec_cpg_iovec[0].iov_base = (char *)&req_exec_cpg_mcast;
		req_exec_cpg_iovec[0].iov_len = sizeof(req_exec_cpg_mcast);
		req_exec_cpg_iovec[1].iov_base = (char *)&req_lib_cpg_mcast->message;
		req_exec_cpg_iovec[1].iov_len = msglen;

		result = api->totem_mcast (req_exec_cpg_iovec, 2, TOTEM_AGREED);
		assert(result == 0);
	} else {
		log_printf(LOGSYS_LEVEL_ERROR, "*** %p can't mcast to group %s state:%d, error:%d",
			conn, group_name.value, cpd->cpd_state, error);
	}

	res_lib_cpg_partial_send.header.error = error;
	api->ipc_response_send (conn, &res_lib_cpg_partial_send,
		sizeof (res_lib_cpg_partial_send));
}
/* Mcast message from the library */
/*
 * Forward an application mcast to totem if the connection is joined
 * (or join is in flight).  Note: unlike most lib handlers this sends
 * no IPC response — presumably delivery back to the library serves as
 * the acknowledgement; TODO confirm against libcpg flow control.
 */
static void message_handler_req_lib_cpg_mcast (void *conn, const void *message)
{
	const struct req_lib_cpg_mcast *req_lib_cpg_mcast = message;
	struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
	mar_cpg_name_t group_name = cpd->group_name;
	struct iovec req_exec_cpg_iovec[2];
	struct req_exec_cpg_mcast req_exec_cpg_mcast;
	int msglen = req_lib_cpg_mcast->msglen;
	int result;
	cs_error_t error = CS_ERR_NOT_EXIST;

	log_printf(LOGSYS_LEVEL_TRACE, "got mcast request on %p", conn);

	switch (cpd->cpd_state) {
	case CPD_STATE_UNJOINED:
		error = CS_ERR_NOT_EXIST;
		break;
	case CPD_STATE_LEAVE_STARTED:
		error = CS_ERR_NOT_EXIST;
		break;
	case CPD_STATE_JOIN_STARTED:
		/* mcast allowed while the join is still in flight */
		error = CS_OK;
		break;
	case CPD_STATE_JOIN_COMPLETED:
		error = CS_OK;
		break;
	}

	if (error == CS_OK) {
		req_exec_cpg_mcast.header.size = sizeof(req_exec_cpg_mcast) + msglen;
		req_exec_cpg_mcast.header.id = SERVICE_ID_MAKE(CPG_SERVICE,
			MESSAGE_REQ_EXEC_CPG_MCAST);
		req_exec_cpg_mcast.pid = cpd->pid;
		req_exec_cpg_mcast.msglen = msglen;
		api->ipc_source_set (&req_exec_cpg_mcast.source, conn);
		memcpy(&req_exec_cpg_mcast.group_name, &group_name,
			sizeof(mar_cpg_name_t));

		/* iovec[0] = executive header, iovec[1] = payload bytes */
		req_exec_cpg_iovec[0].iov_base = (char *)&req_exec_cpg_mcast;
		req_exec_cpg_iovec[0].iov_len = sizeof(req_exec_cpg_mcast);
		req_exec_cpg_iovec[1].iov_base = (char *)&req_lib_cpg_mcast->message;
		req_exec_cpg_iovec[1].iov_len = msglen;

		result = api->totem_mcast (req_exec_cpg_iovec, 2, TOTEM_AGREED);
		assert(result == 0);
	} else {
		log_printf(LOGSYS_LEVEL_ERROR, "*** %p can't mcast to group %s state:%d, error:%d",
			conn, group_name.value, cpd->cpd_state, error);
	}
}
/*
 * Zero-copy mcast: the actual req_lib_cpg_mcast lives inside the
 * shared zero-copy buffer (located via its recorded server address),
 * so only headers cross the IPC socket.  Responds with CS_OK,
 * CS_ERR_TRY_AGAIN (totem refused), or a state error.
 */
static void message_handler_req_lib_cpg_zc_execute (
	void *conn,
	const void *message)
{
	mar_req_coroipcc_zc_execute_t *hdr = (mar_req_coroipcc_zc_execute_t *)message;
	struct qb_ipc_request_header *header;
	struct res_lib_cpg_mcast res_lib_cpg_mcast;
	struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
	struct iovec req_exec_cpg_iovec[2];
	struct req_exec_cpg_mcast req_exec_cpg_mcast;
	struct req_lib_cpg_mcast *req_lib_cpg_mcast;
	int result;
	cs_error_t error = CS_ERR_NOT_EXIST;

	log_printf(LOGSYS_LEVEL_TRACE, "got ZC mcast request on %p", conn);

	/* The library's request sits just past the zc bookkeeping header
	 * at the start of the shared mapping */
	header = (struct qb_ipc_request_header *)(((char *)serveraddr2void(hdr->server_address) + sizeof (struct coroipcs_zc_header)));
	req_lib_cpg_mcast = (struct req_lib_cpg_mcast *)header;

	switch (cpd->cpd_state) {
	case CPD_STATE_UNJOINED:
		error = CS_ERR_NOT_EXIST;
		break;
	case CPD_STATE_LEAVE_STARTED:
		error = CS_ERR_NOT_EXIST;
		break;
	case CPD_STATE_JOIN_STARTED:
		error = CS_OK;
		break;
	case CPD_STATE_JOIN_COMPLETED:
		error = CS_OK;
		break;
	}

	res_lib_cpg_mcast.header.size = sizeof(res_lib_cpg_mcast);
	res_lib_cpg_mcast.header.id = MESSAGE_RES_CPG_MCAST;

	if (error == CS_OK) {
		req_exec_cpg_mcast.header.size = sizeof(req_exec_cpg_mcast) + req_lib_cpg_mcast->msglen;
		req_exec_cpg_mcast.header.id = SERVICE_ID_MAKE(CPG_SERVICE,
			MESSAGE_REQ_EXEC_CPG_MCAST);
		req_exec_cpg_mcast.pid = cpd->pid;
		req_exec_cpg_mcast.msglen = req_lib_cpg_mcast->msglen;
		api->ipc_source_set (&req_exec_cpg_mcast.source, conn);
		memcpy(&req_exec_cpg_mcast.group_name, &cpd->group_name,
			sizeof(mar_cpg_name_t));

		/* iovec[0] = executive header, iovec[1] = payload taken
		 * directly from the shared mapping (zero-copy) */
		req_exec_cpg_iovec[0].iov_base = (char *)&req_exec_cpg_mcast;
		req_exec_cpg_iovec[0].iov_len = sizeof(req_exec_cpg_mcast);
		req_exec_cpg_iovec[1].iov_base = (char *)header + sizeof(struct req_lib_cpg_mcast);
		req_exec_cpg_iovec[1].iov_len = req_exec_cpg_mcast.msglen;

		result = api->totem_mcast (req_exec_cpg_iovec, 2, TOTEM_AGREED);
		if (result == 0) {
			res_lib_cpg_mcast.header.error = CS_OK;
		} else {
			res_lib_cpg_mcast.header.error = CS_ERR_TRY_AGAIN;
		}
	} else {
		res_lib_cpg_mcast.header.error = error;
	}

	api->ipc_response_send (conn, &res_lib_cpg_mcast,
		sizeof (res_lib_cpg_mcast));
}
/*
 * Return the current membership (nodeid/pid pairs) of the requested
 * group to the library.
 */
static void message_handler_req_lib_cpg_membership (void *conn,
	const void *message)
{
	struct req_lib_cpg_membership_get *req_lib_cpg_membership_get =
		(struct req_lib_cpg_membership_get *)message;
	struct res_lib_cpg_membership_get res_lib_cpg_membership_get;
	struct list_head *iter;
	int member_count = 0;

	res_lib_cpg_membership_get.header.id = MESSAGE_RES_CPG_MEMBERSHIP;
	res_lib_cpg_membership_get.header.error = CS_OK;
	res_lib_cpg_membership_get.header.size =
		sizeof (struct res_lib_cpg_membership_get);

	/* NOTE(review): member_list is filled without a bounds check
	 * against its declared capacity — verify member_count cannot
	 * exceed the array size for very large groups */
	for (iter = process_info_list_head.next;
		iter != &process_info_list_head; iter = iter->next) {

		struct process_info *pi = list_entry (iter, struct process_info, list);

		if (mar_name_compare (&pi->group, &req_lib_cpg_membership_get->group_name) == 0) {
			res_lib_cpg_membership_get.member_list[member_count].nodeid = pi->nodeid;
			res_lib_cpg_membership_get.member_list[member_count].pid = pi->pid;
			member_count += 1;
		}
	}
	res_lib_cpg_membership_get.member_count = member_count;

	api->ipc_response_send (conn, &res_lib_cpg_membership_get,
		sizeof (res_lib_cpg_membership_get));
}
static void message_handler_req_lib_cpg_local_get (void *conn,
const void *message)
{
struct res_lib_cpg_local_get res_lib_cpg_local_get;
res_lib_cpg_local_get.header.size = sizeof (res_lib_cpg_local_get);
res_lib_cpg_local_get.header.id = MESSAGE_RES_CPG_LOCAL_GET;
res_lib_cpg_local_get.header.error = CS_OK;
res_lib_cpg_local_get.local_nodeid = api->totem_nodeid_get ();
api->ipc_response_send (conn, &res_lib_cpg_local_get,
sizeof (res_lib_cpg_local_get));
}
/*
 * Start a group iteration for the library: build a private snapshot
 * of process_info_list_head, "grouped by" group name, and hand back
 * an hdb handle the library uses in subsequent _next/_finalize calls.
 * iteration_type selects all entries, one group, or group names only.
 */
static void message_handler_req_lib_cpg_iteration_initialize (
	void *conn,
	const void *message)
{
	const struct req_lib_cpg_iterationinitialize *req_lib_cpg_iterationinitialize = message;
	struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
	hdb_handle_t cpg_iteration_handle = 0;
	struct res_lib_cpg_iterationinitialize res_lib_cpg_iterationinitialize;
	struct list_head *iter, *iter2;
	struct cpg_iteration_instance *cpg_iteration_instance;
	cs_error_t error = CS_OK;
	int res;

	log_printf (LOGSYS_LEVEL_DEBUG, "cpg iteration initialize");

	/* Because between calling this function and *next can be some operations which will
	 * change list, we must do full copy.
	 */

	/*
	 * Create new iteration instance
	 */
	res = hdb_handle_create (&cpg_iteration_handle_t_db, sizeof (struct cpg_iteration_instance),
		&cpg_iteration_handle);
	if (res != 0) {
		error = CS_ERR_NO_MEMORY;
		goto response_send;
	}

	res = hdb_handle_get (&cpg_iteration_handle_t_db, cpg_iteration_handle, (void *)&cpg_iteration_instance);
	if (res != 0) {
		error = CS_ERR_BAD_HANDLE;
		goto error_destroy;
	}

	list_init (&cpg_iteration_instance->items_list_head);
	cpg_iteration_instance->handle = cpg_iteration_handle;

	/*
	 * Create copy of process_info list "grouped by" group name
	 */
	for (iter = process_info_list_head.next; iter != &process_info_list_head; iter = iter->next) {
		struct process_info *pi = list_entry (iter, struct process_info, list);
		struct process_info *new_pi;

		if (req_lib_cpg_iterationinitialize->iteration_type == CPG_ITERATION_NAME_ONLY) {
			/*
			 * Try to find processed group name in our list new list
			 * (NAME_ONLY wants each group name exactly once)
			 */
			int found = 0;

			for (iter2 = cpg_iteration_instance->items_list_head.next;
				iter2 != &cpg_iteration_instance->items_list_head;
				iter2 = iter2->next) {
				struct process_info *pi2 = list_entry (iter2, struct process_info, list);

				if (mar_name_compare (&pi2->group, &pi->group) == 0) {
					found = 1;
					break;
				}
			}

			if (found) {
				/*
				 * We have this name in list -> don't add
				 */
				continue ;
			}
		} else if (req_lib_cpg_iterationinitialize->iteration_type == CPG_ITERATION_ONE_GROUP) {
			/*
			 * Test pi group name with request
			 */
			if (mar_name_compare (&pi->group, &req_lib_cpg_iterationinitialize->group_name) != 0)
				/*
				 * Not same -> don't add
				 */
				continue ;
		}

		new_pi = malloc (sizeof (struct process_info));
		if (!new_pi) {
			log_printf(LOGSYS_LEVEL_WARNING, "Unable to allocate process_info struct");

			error = CS_ERR_NO_MEMORY;

			goto error_put_destroy;
		}

		memcpy (new_pi, pi, sizeof (struct process_info));
		list_init (&new_pi->list);

		if (req_lib_cpg_iterationinitialize->iteration_type == CPG_ITERATION_NAME_ONLY) {
			/*
			 * pid and nodeid -> undefined
			 */
			new_pi->pid = new_pi->nodeid = 0;
		}

		/*
		 * We will return list "grouped" by "group name", so try to find right place to add
		 * (insert next to an existing entry of the same group, if any)
		 */
		for (iter2 = cpg_iteration_instance->items_list_head.next;
			iter2 != &cpg_iteration_instance->items_list_head;
			iter2 = iter2->next) {
			struct process_info *pi2 = list_entry (iter2, struct process_info, list);

			if (mar_name_compare (&pi2->group, &pi->group) == 0) {
				break;
			}
		}

		list_add (&new_pi->list, iter2);
	}

	/*
	 * Now we have a full "grouped by" copy of process_info list
	 */

	/*
	 * Add instance to current cpd list
	 */
	list_init (&cpg_iteration_instance->list);
	list_add (&cpg_iteration_instance->list, &cpd->iteration_instance_list_head);

	/* Cursor starts at the list head; the first _next call advances
	 * it to the first real item */
	cpg_iteration_instance->current_pointer = &cpg_iteration_instance->items_list_head;

	/* The success path falls through here too: the reference taken by
	 * hdb_handle_get above is always released */
error_put_destroy:
	hdb_handle_put (&cpg_iteration_handle_t_db, cpg_iteration_handle);

error_destroy:
	if (error != CS_OK) {
		hdb_handle_destroy (&cpg_iteration_handle_t_db, cpg_iteration_handle);
	}

response_send:
	res_lib_cpg_iterationinitialize.header.size = sizeof (res_lib_cpg_iterationinitialize);
	res_lib_cpg_iterationinitialize.header.id = MESSAGE_RES_CPG_ITERATIONINITIALIZE;
	res_lib_cpg_iterationinitialize.header.error = error;
	res_lib_cpg_iterationinitialize.iteration_handle = cpg_iteration_handle;

	api->ipc_response_send (conn, &res_lib_cpg_iterationinitialize,
		sizeof (res_lib_cpg_iterationinitialize));
}
/*
 * Advance an iteration's cursor and return the next snapshot entry.
 * Errors: CS_ERR_LIBRARY for a bad handle, CS_ERR_NO_SECTIONS when
 * the snapshot is exhausted (description is left unset in both cases).
 */
static void message_handler_req_lib_cpg_iteration_next (
	void *conn,
	const void *message)
{
	const struct req_lib_cpg_iterationnext *req_lib_cpg_iterationnext = message;
	struct res_lib_cpg_iterationnext res_lib_cpg_iterationnext;
	struct cpg_iteration_instance *cpg_iteration_instance;
	cs_error_t error = CS_OK;
	int res;
	struct process_info *pi;

	log_printf (LOGSYS_LEVEL_DEBUG, "cpg iteration next");

	res = hdb_handle_get (&cpg_iteration_handle_t_db,
		req_lib_cpg_iterationnext->iteration_handle,
		(void *)&cpg_iteration_instance);
	if (res != 0) {
		error = CS_ERR_LIBRARY;
		goto error_exit;
	}

	assert (cpg_iteration_instance);

	cpg_iteration_instance->current_pointer = cpg_iteration_instance->current_pointer->next;

	/* Back at the list head -> the whole snapshot has been walked */
	if (cpg_iteration_instance->current_pointer == &cpg_iteration_instance->items_list_head) {
		error = CS_ERR_NO_SECTIONS;
		goto error_put;
	}

	pi = list_entry (cpg_iteration_instance->current_pointer, struct process_info, list);

	/*
	 * Copy iteration data
	 */
	res_lib_cpg_iterationnext.description.nodeid = pi->nodeid;
	res_lib_cpg_iterationnext.description.pid = pi->pid;
	memcpy (&res_lib_cpg_iterationnext.description.group,
		&pi->group,
		sizeof (mar_cpg_name_t));

error_put:
	hdb_handle_put (&cpg_iteration_handle_t_db, req_lib_cpg_iterationnext->iteration_handle);

error_exit:
	res_lib_cpg_iterationnext.header.size = sizeof (res_lib_cpg_iterationnext);
	res_lib_cpg_iterationnext.header.id = MESSAGE_RES_CPG_ITERATIONNEXT;
	res_lib_cpg_iterationnext.header.error = error;

	api->ipc_response_send (conn, &res_lib_cpg_iterationnext,
		sizeof (res_lib_cpg_iterationnext));
}
static void message_handler_req_lib_cpg_iteration_finalize (
void *conn,
const void *message)
{
const struct req_lib_cpg_iterationfinalize *req_lib_cpg_iterationfinalize = message;
struct res_lib_cpg_iterationfinalize res_lib_cpg_iterationfinalize;
struct cpg_iteration_instance *cpg_iteration_instance;
cs_error_t error = CS_OK;
int res;
log_printf (LOGSYS_LEVEL_DEBUG, "cpg iteration finalize");
res = hdb_handle_get (&cpg_iteration_handle_t_db,
req_lib_cpg_iterationfinalize->iteration_handle,
(void *)&cpg_iteration_instance);
if (res != 0) {
error = CS_ERR_LIBRARY;
goto error_exit;
}
assert (cpg_iteration_instance);
cpg_iteration_instance_finalize (cpg_iteration_instance);
hdb_handle_put (&cpg_iteration_handle_t_db, cpg_iteration_instance->handle);
error_exit:
res_lib_cpg_iterationfinalize.header.size = sizeof (res_lib_cpg_iterationfinalize);
res_lib_cpg_iterationfinalize.header.id = MESSAGE_RES_CPG_ITERATIONFINALIZE;
res_lib_cpg_iterationfinalize.header.error = error;
api->ipc_response_send (conn, &res_lib_cpg_iterationfinalize,
sizeof (res_lib_cpg_iterationfinalize));
}
diff --git a/exec/quorum.c b/exec/quorum.c
index d4837a2c..323a15f8 100644
--- a/exec/quorum.c
+++ b/exec/quorum.c
@@ -1,112 +1,110 @@
/*
* Copyright (c) 2008-2012 Red Hat, Inc.
*
* All rights reserved.
*
* Author: Christine Caulfield (ccaulfie@redhat.com)
*
* This software licensed under BSD license, the text of which follows:
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* - Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* - Neither the name of the MontaVista Software, Inc. nor the names of its
* contributors may be used to endorse or promote products derived from this
* software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
* THE POSSIBILITY OF SUCH DAMAGE.
*/
#include <config.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/ioctl.h>
#include <netinet/in.h>
#include <sys/uio.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <time.h>
-#include <unistd.h>
-#include <netinet/in.h>
#include <arpa/inet.h>
#include <corosync/corotypes.h>
#include <corosync/swab.h>
#include <corosync/totem/totempg.h>
#include <corosync/totem/totem.h>
#include <corosync/logsys.h>
#include "quorum.h"
#include "main.h"
#include "vsf.h"
LOGSYS_DECLARE_SUBSYS ("QUORUM");
static struct quorum_callin_functions *corosync_quorum_fns = NULL;
int corosync_quorum_is_quorate (void)
{
if (corosync_quorum_fns) {
return corosync_quorum_fns->quorate();
}
else {
return 1;
}
}
/*
 * Register a quorum-change callback with the active provider.
 * A no-op returning 0 when no provider is registered.
 */
int corosync_quorum_register_callback (quorum_callback_fn_t fn, void *context)
{
	if (corosync_quorum_fns == NULL) {
		return 0;
	}
	return corosync_quorum_fns->register_callback(fn, context);
}
/*
 * Unregister a previously registered quorum-change callback.
 * A no-op returning 0 when no provider is registered.
 */
int corosync_quorum_unregister_callback (quorum_callback_fn_t fn, void *context)
{
	if (corosync_quorum_fns == NULL) {
		return 0;
	}
	return corosync_quorum_fns->unregister_callback(fn, context);
}
int corosync_quorum_initialize (struct quorum_callin_functions *fns)
{
if (corosync_quorum_fns)
return -1;
corosync_quorum_fns = fns;
return 0;
}
int quorum_none(void)
{
if (corosync_quorum_fns)
return 0;
else
return 1;
}
diff --git a/exec/sync.c b/exec/sync.c
index ea452e60..283634a8 100644
--- a/exec/sync.c
+++ b/exec/sync.c
@@ -1,630 +1,628 @@
/*
* Copyright (c) 2009-2012 Red Hat, Inc.
*
* All rights reserved.
*
* Author: Steven Dake (sdake@redhat.com)
*
* This software licensed under BSD license, the text of which follows:
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* - Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* - Neither the name of the MontaVista Software, Inc. nor the names of its
* contributors may be used to endorse or promote products derived from this
* software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
* THE POSSIBILITY OF SUCH DAMAGE.
*/
#include <config.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/ioctl.h>
#include <netinet/in.h>
#include <sys/uio.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <time.h>
-#include <unistd.h>
-#include <netinet/in.h>
#include <arpa/inet.h>
#include <corosync/corotypes.h>
#include <corosync/swab.h>
#include <corosync/totem/totempg.h>
#include <corosync/totem/totem.h>
#include <corosync/logsys.h>
#include <qb/qbipc_common.h>
#include "schedwrk.h"
#include "quorum.h"
#include "sync.h"
#include "main.h"
LOGSYS_DECLARE_SUBSYS ("SYNC");
#define MESSAGE_REQ_SYNC_BARRIER 0
#define MESSAGE_REQ_SYNC_SERVICE_BUILD 1
#define MESSAGE_REQ_SYNC_MEMB_DETERMINE 2
enum sync_process_state {
INIT,
PROCESS,
ACTIVATE
};
enum sync_state {
SYNC_SERVICELIST_BUILD,
SYNC_PROCESS,
SYNC_BARRIER
};
struct service_entry {
int service_id;
void (*sync_init) (
const unsigned int *trans_list,
size_t trans_list_entries,
const unsigned int *member_list,
size_t member_list_entries,
const struct memb_ring_id *ring_id);
void (*sync_abort) (void);
int (*sync_process) (void);
void (*sync_activate) (void);
enum sync_process_state state;
char name[128];
};
struct processor_entry {
int nodeid;
int received;
};
struct req_exec_memb_determine_message {
struct qb_ipc_request_header header __attribute__((aligned(8)));
struct memb_ring_id ring_id __attribute__((aligned(8)));
};
struct req_exec_service_build_message {
struct qb_ipc_request_header header __attribute__((aligned(8)));
struct memb_ring_id ring_id __attribute__((aligned(8)));
int service_list_entries __attribute__((aligned(8)));
int service_list[128] __attribute__((aligned(8)));
};
struct req_exec_barrier_message {
struct qb_ipc_request_header header __attribute__((aligned(8)));
struct memb_ring_id ring_id __attribute__((aligned(8)));
};
static enum sync_state my_state = SYNC_BARRIER;
static struct memb_ring_id my_ring_id;
static struct memb_ring_id my_memb_determine_ring_id;
static int my_memb_determine = 0;
static unsigned int my_memb_determine_list[PROCESSOR_COUNT_MAX];
static unsigned int my_memb_determine_list_entries = 0;
static int my_processing_idx = 0;
static hdb_handle_t my_schedwrk_handle;
static struct processor_entry my_processor_list[PROCESSOR_COUNT_MAX];
static unsigned int my_member_list[PROCESSOR_COUNT_MAX];
static unsigned int my_trans_list[PROCESSOR_COUNT_MAX];
static size_t my_member_list_entries = 0;
static size_t my_trans_list_entries = 0;
static int my_processor_list_entries = 0;
static struct service_entry my_service_list[SERVICES_COUNT_MAX];
static int my_service_list_entries = 0;
static void (*sync_synchronization_completed) (void);
static void sync_deliver_fn (
unsigned int nodeid,
const void *msg,
unsigned int msg_len,
int endian_conversion_required);
static int schedwrk_processor (const void *context);
static void sync_process_enter (void);
static struct totempg_group sync_group = {
.group = "sync",
.group_len = 4
};
static void *sync_group_handle;
int (*my_sync_callbacks_retrieve) (
int service_id,
struct sync_callbacks *callbacks);
/*
 * Initialize the sync subsystem: join the "sync" totempg group and
 * record the callbacks used to drive per-service synchronization.
 *
 * sync_callbacks_retrieve   - looks up the sync callbacks for a service
 *                             id; returns -1 when the service has none.
 * synchronization_completed - invoked once every service has activated.
 *
 * Returns 0 on success, -1 if the totempg group could not be
 * initialized or joined.
 */
int sync_init (
	int (*sync_callbacks_retrieve) (
		int service_id,
		struct sync_callbacks *callbacks),
	void (*synchronization_completed) (void))
{
	/*
	 * totempg_groups_initialize()/_join() return a signed int status;
	 * the result was previously held in an unsigned int, making the
	 * comparisons against -1 rely on implicit conversion.  Keep it
	 * signed so the -1 checks are exact.
	 */
	int res;

	res = totempg_groups_initialize (
		&sync_group_handle,
		sync_deliver_fn,
		NULL);
	if (res == -1) {
		log_printf (LOGSYS_LEVEL_ERROR,
			"Couldn't initialize groups interface.");
		return (-1);
	}
	res = totempg_groups_join (
		sync_group_handle,
		&sync_group,
		1);
	if (res == -1) {
		log_printf (LOGSYS_LEVEL_ERROR, "Couldn't join group.");
		return (-1);
	}
	sync_synchronization_completed = synchronization_completed;
	my_sync_callbacks_retrieve = sync_callbacks_retrieve;
	return (0);
}
/*
 * Handle a MESSAGE_REQ_SYNC_BARRIER message from a cluster node.
 *
 * Marks the sender as having reached the barrier; once every processor
 * in my_processor_list has reported, the service currently being
 * synchronized is activated and processing advances to the next
 * service (or overall synchronization completes).
 */
static void sync_barrier_handler (unsigned int nodeid, const void *msg)
{
	const struct req_exec_barrier_message *req_exec_barrier_message = msg;
	int i;
	int barrier_reached = 1;

	/* Ignore barrier messages that belong to a previous ring. */
	if (memcmp (&my_ring_id, &req_exec_barrier_message->ring_id,
	    sizeof (struct memb_ring_id)) != 0) {
		log_printf (LOGSYS_LEVEL_DEBUG, "barrier for old ring - discarding");
		return;
	}
	/* Record that this node has reached the barrier. */
	for (i = 0; i < my_processor_list_entries; i++) {
		if (my_processor_list[i].nodeid == nodeid) {
			my_processor_list[i].received = 1;
		}
	}
	/* The barrier holds only when every processor has reported. */
	for (i = 0; i < my_processor_list_entries; i++) {
		if (my_processor_list[i].received == 0) {
			barrier_reached = 0;
		}
	}
	if (barrier_reached) {
		log_printf (LOGSYS_LEVEL_DEBUG, "Committing synchronization for %s",
			my_service_list[my_processing_idx].name);
		my_service_list[my_processing_idx].state = ACTIVATE;
		/*
		 * Only call sync_activate while the service still provides
		 * sync callbacks (retrieve returns -1 otherwise).
		 */
		if (my_sync_callbacks_retrieve(my_service_list[my_processing_idx].service_id, NULL) != -1) {
			my_service_list[my_processing_idx].sync_activate ();
		}
		my_processing_idx += 1;
		if (my_service_list_entries == my_processing_idx) {
			/* All services synchronized for this ring. */
			my_memb_determine_list_entries = 0;
			sync_synchronization_completed ();
		} else {
			sync_process_enter ();
		}
	}
}
/*
 * No-op sync callbacks installed for services known only to other
 * nodes and not registered locally (see sync_service_build_handler).
 */
static void dummy_sync_init (
	const unsigned int *trans_list,
	size_t trans_list_entries,
	const unsigned int *member_list,
	size_t member_list_entries,
	const struct memb_ring_id *ring_id)
{
}

static void dummy_sync_abort (void)
{
}

/* Always reports completion so the barrier is entered immediately. */
static int dummy_sync_process (void)
{
	return (0);
}

static void dummy_sync_activate (void)
{
}
/*
 * qsort() comparator ordering service entries by ascending service_id.
 *
 * A qsort comparison function must return a negative, zero, or
 * positive value for less-than, equal and greater-than respectively.
 * The previous expression (a > b) returned only 0 or 1, so "less than"
 * and "equal" were indistinguishable and the resulting order was not
 * guaranteed to be fully sorted.  (a > b) - (a < b) yields the correct
 * three-way result without overflow.
 */
static int service_entry_compare (const void *a, const void *b)
{
	const struct service_entry *service_entry_a = a;
	const struct service_entry *service_entry_b = b;

	return (service_entry_a->service_id > service_entry_b->service_id) -
	       (service_entry_a->service_id < service_entry_b->service_id);
}
/*
 * Handle a MESSAGE_REQ_SYNC_MEMB_DETERMINE message.
 *
 * Collects the set of nodes answering for the ring currently being
 * determined into my_memb_determine_list (deduplicated) and flags that
 * a determined membership is available for sync_start().
 */
static void sync_memb_determine (unsigned int nodeid, const void *msg)
{
	const struct req_exec_memb_determine_message *req_exec_memb_determine_message = msg;
	int found = 0;
	int i;

	/* Ignore answers for a ring other than the one being determined. */
	if (memcmp (&req_exec_memb_determine_message->ring_id,
	    &my_memb_determine_ring_id, sizeof (struct memb_ring_id)) != 0) {
		log_printf (LOGSYS_LEVEL_DEBUG, "memb determine for old ring - discarding");
		return;
	}
	my_memb_determine = 1;
	/* Add nodeid to the list only if not already present. */
	for (i = 0; i < my_memb_determine_list_entries; i++) {
		if (my_memb_determine_list[i] == nodeid) {
			found = 1;
		}
	}
	if (found == 0) {
		my_memb_determine_list[my_memb_determine_list_entries] = nodeid;
		my_memb_determine_list_entries += 1;
	}
}
/*
 * Handle a MESSAGE_REQ_SYNC_SERVICE_BUILD message.
 *
 * Merges the sender's service list into my_service_list (adding
 * placeholder entries with no-op callbacks for services unknown
 * locally), keeps the list sorted by service id and marks the sender
 * in the barrier bookkeeping.  When every processor has contributed
 * its list, per-service processing begins.
 */
static void sync_service_build_handler (unsigned int nodeid, const void *msg)
{
	const struct req_exec_service_build_message *req_exec_service_build_message = msg;
	int i, j;
	int barrier_reached = 1;
	int found;
	int qsort_trigger = 0;

	/* Ignore service lists that belong to a previous ring. */
	if (memcmp (&my_ring_id, &req_exec_service_build_message->ring_id,
	    sizeof (struct memb_ring_id)) != 0) {
		log_printf (LOGSYS_LEVEL_DEBUG, "service build for old ring - discarding");
		return;
	}
	for (i = 0; i < req_exec_service_build_message->service_list_entries; i++) {
		found = 0;
		for (j = 0; j < my_service_list_entries; j++) {
			if (req_exec_service_build_message->service_list[i] ==
			    my_service_list[j].service_id) {
				found = 1;
				break;
			}
		}
		if (found == 0) {
			/*
			 * Service known only to the remote node: register a
			 * placeholder entry with no-op sync callbacks.
			 */
			my_service_list[my_service_list_entries].state =
				INIT;
			my_service_list[my_service_list_entries].service_id =
				req_exec_service_build_message->service_list[i];
			/*
			 * snprintf bounds the write to the name field
			 * (the previous sprintf was unbounded).
			 */
			snprintf (my_service_list[my_service_list_entries].name,
				sizeof (my_service_list[my_service_list_entries].name),
				"Unknown External Service (id = %d)\n",
				req_exec_service_build_message->service_list[i]);
			my_service_list[my_service_list_entries].sync_init =
				dummy_sync_init;
			my_service_list[my_service_list_entries].sync_abort =
				dummy_sync_abort;
			my_service_list[my_service_list_entries].sync_process =
				dummy_sync_process;
			my_service_list[my_service_list_entries].sync_activate =
				dummy_sync_activate;
			my_service_list_entries += 1;
			qsort_trigger = 1;
		}
	}
	/* Keep the service list ordered by service id. */
	if (qsort_trigger) {
		qsort (my_service_list, my_service_list_entries,
			sizeof (struct service_entry), service_entry_compare);
	}
	for (i = 0; i < my_processor_list_entries; i++) {
		if (my_processor_list[i].nodeid == nodeid) {
			my_processor_list[i].received = 1;
		}
	}
	for (i = 0; i < my_processor_list_entries; i++) {
		if (my_processor_list[i].received == 0) {
			barrier_reached = 0;
		}
	}
	/* Everyone contributed a service list: start processing. */
	if (barrier_reached) {
		sync_process_enter ();
	}
}
static void sync_deliver_fn (
unsigned int nodeid,
const void *msg,
unsigned int msg_len,
int endian_conversion_required)
{
struct qb_ipc_request_header *header = (struct qb_ipc_request_header *)msg;
switch (header->id) {
case MESSAGE_REQ_SYNC_BARRIER:
sync_barrier_handler (nodeid, msg);
break;
case MESSAGE_REQ_SYNC_SERVICE_BUILD:
sync_service_build_handler (nodeid, msg);
break;
case MESSAGE_REQ_SYNC_MEMB_DETERMINE:
sync_memb_determine (nodeid, msg);
break;
}
}
static void memb_determine_message_transmit (void)
{
struct iovec iovec;
struct req_exec_memb_determine_message req_exec_memb_determine_message;
req_exec_memb_determine_message.header.size = sizeof (struct req_exec_memb_determine_message);
req_exec_memb_determine_message.header.id = MESSAGE_REQ_SYNC_MEMB_DETERMINE;
memcpy (&req_exec_memb_determine_message.ring_id,
&my_memb_determine_ring_id,
sizeof (struct memb_ring_id));
iovec.iov_base = (char *)&req_exec_memb_determine_message;
iovec.iov_len = sizeof (req_exec_memb_determine_message);
(void)totempg_groups_mcast_joined (sync_group_handle,
&iovec, 1, TOTEMPG_AGREED);
}
static void barrier_message_transmit (void)
{
struct iovec iovec;
struct req_exec_barrier_message req_exec_barrier_message;
req_exec_barrier_message.header.size = sizeof (struct req_exec_barrier_message);
req_exec_barrier_message.header.id = MESSAGE_REQ_SYNC_BARRIER;
memcpy (&req_exec_barrier_message.ring_id, &my_ring_id,
sizeof (struct memb_ring_id));
iovec.iov_base = (char *)&req_exec_barrier_message;
iovec.iov_len = sizeof (req_exec_barrier_message);
(void)totempg_groups_mcast_joined (sync_group_handle,
&iovec, 1, TOTEMPG_AGREED);
}
static void service_build_message_transmit (struct req_exec_service_build_message *service_build_message)
{
struct iovec iovec;
service_build_message->header.size = sizeof (struct req_exec_service_build_message);
service_build_message->header.id = MESSAGE_REQ_SYNC_SERVICE_BUILD;
memcpy (&service_build_message->ring_id, &my_ring_id,
sizeof (struct memb_ring_id));
iovec.iov_base = (void *)service_build_message;
iovec.iov_len = sizeof (struct req_exec_service_build_message);
(void)totempg_groups_mcast_joined (sync_group_handle,
&iovec, 1, TOTEMPG_AGREED);
}
/*
 * Enter the barrier state for the service just processed and announce
 * it to the other nodes.
 */
static void sync_barrier_enter (void)
{
	my_state = SYNC_BARRIER;
	barrier_message_transmit ();
}

/*
 * Enter the PROCESS state for the current service.  With no services
 * to synchronize the whole cycle completes immediately; otherwise the
 * barrier bookkeeping is reset and a schedwrk item is created to poll
 * the service's sync_process callback.
 */
static void sync_process_enter (void)
{
	int i;

	my_state = SYNC_PROCESS;

	/*
	 * No sync services
	 */
	if (my_service_list_entries == 0) {
		my_state = SYNC_SERVICELIST_BUILD;
		my_memb_determine_list_entries = 0;
		sync_synchronization_completed ();
		return;
	}
	/* Reset per-node barrier tracking for this service. */
	for (i = 0; i < my_processor_list_entries; i++) {
		my_processor_list[i].received = 0;
	}
	schedwrk_create (&my_schedwrk_handle,
		schedwrk_processor,
		NULL);
}
/*
 * Enter the SYNC_SERVICELIST_BUILD state for a new ring.
 *
 * Records the member list and barrier bookkeeping, rebuilds the local
 * service list from the registered sync callbacks, and multicasts it
 * so a common, ordered service list can be agreed on by all nodes.
 */
static void sync_servicelist_build_enter (
	const unsigned int *member_list,
	size_t member_list_entries,
	const struct memb_ring_id *ring_id)
{
	struct req_exec_service_build_message service_build;
	int i;
	int res;
	struct sync_callbacks sync_callbacks;

	my_state = SYNC_SERVICELIST_BUILD;
	for (i = 0; i < member_list_entries; i++) {
		my_processor_list[i].nodeid = member_list[i];
		my_processor_list[i].received = 0;
	}
	my_processor_list_entries = member_list_entries;

	memcpy (my_member_list, member_list,
		member_list_entries * sizeof (unsigned int));
	my_member_list_entries = member_list_entries;

	my_processing_idx = 0;

	memset(my_service_list, 0, sizeof (struct service_entry) * SERVICES_COUNT_MAX);
	my_service_list_entries = 0;
	for (i = 0; i < SERVICES_COUNT_MAX; i++) {
		res = my_sync_callbacks_retrieve (i, &sync_callbacks);
		if (res == -1) {
			continue;
		}
		/* Services without a sync_init take no part in sync. */
		if (sync_callbacks.sync_init == NULL) {
			continue;
		}
		my_service_list[my_service_list_entries].state = INIT;
		my_service_list[my_service_list_entries].service_id = i;
		/*
		 * Bounded copy: snprintf always NUL-terminates and cannot
		 * overflow the name field (strcpy was unbounded).
		 */
		snprintf (my_service_list[my_service_list_entries].name,
			sizeof (my_service_list[my_service_list_entries].name),
			"%s", sync_callbacks.name);
		my_service_list[my_service_list_entries].sync_init = sync_callbacks.sync_init;
		my_service_list[my_service_list_entries].sync_process = sync_callbacks.sync_process;
		my_service_list[my_service_list_entries].sync_abort = sync_callbacks.sync_abort;
		my_service_list[my_service_list_entries].sync_activate = sync_callbacks.sync_activate;
		my_service_list_entries += 1;
	}

	for (i = 0; i < my_service_list_entries; i++) {
		service_build.service_list[i] =
			my_service_list[i].service_id;
	}
	service_build.service_list_entries = my_service_list_entries;

	service_build_message_transmit (&service_build);
}
/*
 * schedwrk callback driving synchronization of the current service.
 *
 * On first entry for a service (state INIT) the transitional node list
 * is intersected with the new membership and sync_init is invoked.
 * While in PROCESS, sync_process is polled: a zero return means the
 * service finished and the barrier is entered; a non-zero return keeps
 * this work item alive (return -1 reschedules it).
 */
static int schedwrk_processor (const void *context)
{
	int res = 0;

	if (my_service_list[my_processing_idx].state == INIT) {
		unsigned int old_trans_list[PROCESSOR_COUNT_MAX];
		size_t old_trans_list_entries = 0;
		int o, m;

		my_service_list[my_processing_idx].state = PROCESS;

		/*
		 * Rebuild my_trans_list as the intersection of the previous
		 * transitional list and the current member list, preserving
		 * the original transitional order.
		 */
		memcpy (old_trans_list, my_trans_list, my_trans_list_entries *
			sizeof (unsigned int));
		old_trans_list_entries = my_trans_list_entries;

		my_trans_list_entries = 0;
		for (o = 0; o < old_trans_list_entries; o++) {
			for (m = 0; m < my_member_list_entries; m++) {
				if (old_trans_list[o] == my_member_list[m]) {
					my_trans_list[my_trans_list_entries] = my_member_list[m];
					my_trans_list_entries++;
					break;
				}
			}
		}

		if (my_sync_callbacks_retrieve(my_service_list[my_processing_idx].service_id, NULL) != -1) {
			my_service_list[my_processing_idx].sync_init (my_trans_list,
				my_trans_list_entries, my_member_list,
				my_member_list_entries,
				&my_ring_id);
		}
	}
	if (my_service_list[my_processing_idx].state == PROCESS) {
		/*
		 * (A redundant re-assignment of state to PROCESS used to sit
		 * here; it was a dead store and has been removed.)
		 */
		if (my_sync_callbacks_retrieve(my_service_list[my_processing_idx].service_id, NULL) != -1) {
			res = my_service_list[my_processing_idx].sync_process ();
		} else {
			res = 0;
		}
		if (res == 0) {
			sync_barrier_enter();
		} else {
			return (-1);
		}
	}
	return (0);
}
/*
 * Begin synchronization for a new ring.  If a membership determination
 * completed earlier (my_memb_determine set by sync_memb_determine),
 * that collected list is used instead of the totem-supplied one.
 */
void sync_start (
	const unsigned int *member_list,
	size_t member_list_entries,
	const struct memb_ring_id *ring_id)
{
	ENTER();
	memcpy (&my_ring_id, ring_id, sizeof (struct memb_ring_id));

	if (my_memb_determine) {
		my_memb_determine = 0;
		sync_servicelist_build_enter (my_memb_determine_list,
			my_memb_determine_list_entries, ring_id);
	} else {
		sync_servicelist_build_enter (member_list, member_list_entries,
			ring_id);
	}
}

/*
 * Remember the transitional membership (nodes surviving from the
 * previous ring) for use by schedwrk_processor's sync_init call.
 */
void sync_save_transitional (
	const unsigned int *member_list,
	size_t member_list_entries,
	const struct memb_ring_id *ring_id)
{
	ENTER();
	memcpy (my_trans_list, member_list, member_list_entries *
		sizeof (unsigned int));
	my_trans_list_entries = member_list_entries;
}

/*
 * Abort an in-progress synchronization: stop the schedwrk item, let
 * the current service clean up, and invalidate the ring id so stale
 * barrier messages are discarded.
 */
void sync_abort (void)
{
	ENTER();
	if (my_state == SYNC_PROCESS) {
		schedwrk_destroy (my_schedwrk_handle);
		if (my_sync_callbacks_retrieve(my_service_list[my_processing_idx].service_id, NULL) != -1) {
			my_service_list[my_processing_idx].sync_abort ();
		}
	}

	/* this will cause any "old" barrier messages from causing
	 * problems.
	 */
	memset (&my_ring_id, 0, sizeof (struct memb_ring_id));
}

/*
 * Start membership determination for ring_id by multicasting a
 * determine request; answers are collected by sync_memb_determine().
 */
void sync_memb_list_determine (const struct memb_ring_id *ring_id)
{
	ENTER();
	memcpy (&my_memb_determine_ring_id, ring_id,
		sizeof (struct memb_ring_id));

	memb_determine_message_transmit ();
}

/*
 * Cancel membership determination and discard any collected answers.
 */
void sync_memb_list_abort (void)
{
	ENTER();
	my_memb_determine_list_entries = 0;
	memset (&my_memb_determine_ring_id, 0, sizeof (struct memb_ring_id));
}
diff --git a/exec/vsf_quorum.c b/exec/vsf_quorum.c
index e268deac..4bf3b0a4 100644
--- a/exec/vsf_quorum.c
+++ b/exec/vsf_quorum.c
@@ -1,488 +1,487 @@
/*
* Copyright (c) 2008-2015 Red Hat, Inc.
*
* All rights reserved.
*
* Author: Christine Caulfield (ccaulfie@redhat.com)
*
* This software licensed under BSD license, the text of which follows:
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* - Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* - Neither the name of Red Hat Inc. nor the names of its
* contributors may be used to endorse or promote products derived from this
* software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
* THE POSSIBILITY OF SUCH DAMAGE.
*/
#include <config.h>
#include <pwd.h>
#include <grp.h>
#include <sys/types.h>
#include <sys/poll.h>
#include <sys/uio.h>
#include <sys/mman.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <sched.h>
#include <time.h>
#include "quorum.h"
#include <corosync/corotypes.h>
#include <qb/qbipc_common.h>
#include <corosync/corodefs.h>
#include <corosync/swab.h>
#include <corosync/list.h>
#include <corosync/mar_gen.h>
#include <corosync/ipc_quorum.h>
-#include <corosync/mar_gen.h>
#include <corosync/coroapi.h>
#include <corosync/logsys.h>
#include <corosync/icmap.h>
#include "service.h"
#include "votequorum.h"
#include "vsf_ykd.h"
LOGSYS_DECLARE_SUBSYS ("QUORUM");
/*
 * Per-IPC-connection tracking state for library (quorum) clients.
 */
struct quorum_pd {
	unsigned char track_flags;	/* CS_TRACK_* flags requested by the client */
	int tracking_enabled;		/* non-zero while on lib_trackers_list */
	struct list_head list;		/* membership in lib_trackers_list */
	void *conn;			/* owning IPC connection */
};

/*
 * Registration record for in-daemon quorum change callbacks.
 */
struct internal_callback_pd {
	struct list_head list;		/* membership in internal_trackers_list */
	quorum_callback_fn_t callback;	/* invoked on quorate state change */
	void *context;			/* opaque user data passed to callback */
};
static void message_handler_req_lib_quorum_getquorate (void *conn,
const void *msg);
static void message_handler_req_lib_quorum_trackstart (void *conn,
const void *msg);
static void message_handler_req_lib_quorum_trackstop (void *conn,
const void *msg);
static void message_handler_req_lib_quorum_gettype (void *conn,
const void *msg);
static void send_library_notification(void *conn);
static void send_internal_notification(void);
static char *quorum_exec_init_fn (struct corosync_api_v1 *api);
static int quorum_lib_init_fn (void *conn);
static int quorum_lib_exit_fn (void *conn);
static int primary_designated = 0;
static int quorum_type = 0;
static struct corosync_api_v1 *corosync_api;
static struct list_head lib_trackers_list;
static struct list_head internal_trackers_list;
static struct memb_ring_id quorum_ring_id;
static size_t quorum_view_list_entries = 0;
static int quorum_view_list[PROCESSOR_COUNT_MAX];
struct quorum_services_api_ver1 *quorum_iface = NULL;
static char view_buf[64];
/*
 * Log the membership view list, splitting it across multiple log lines
 * when it does not fit into the static view_buf.  A trailing backslash
 * marks a line that is continued on the next one.
 */
static void log_view_list(const unsigned int *view_list, size_t view_list_entries)
{
	int total = (int)view_list_entries;
	int len, pos, ret;
	int i = 0;

	while (1) {
		len = sizeof(view_buf);
		pos = 0;
		memset(view_buf, 0, len);

		/* Pack as many node ids as fit into the buffer. */
		for (; i < total; i++) {
			ret = snprintf(view_buf + pos, len - pos, " %u", view_list[i]);
			if (ret >= len - pos)
				break;	/* would truncate: flush this line first */
			pos += ret;
		}
		log_printf (LOGSYS_LEVEL_NOTICE, "Members[%d]:%s%s",
			total, view_buf, i < total ? "\\" : "");

		if (i == total)
			break;
	}
}
/* Internal quorum API function */
/*
 * Callback invoked by the active quorum provider (votequorum or ykd)
 * when the quorate state or member view changes.
 *
 * Caches the view list and ring id, logs transitions into and out of
 * the primary component, then notifies both in-daemon and IPC trackers.
 */
static void quorum_api_set_quorum(const unsigned int *view_list,
	size_t view_list_entries,
	int quorum, struct memb_ring_id *ring_id)
{
	int old_quorum = primary_designated;
	primary_designated = quorum;

	/* Log only actual transitions, not repeated confirmations. */
	if (primary_designated && !old_quorum) {
		log_printf (LOGSYS_LEVEL_NOTICE, "This node is within the primary component and will provide service.");
	} else if (!primary_designated && old_quorum) {
		log_printf (LOGSYS_LEVEL_NOTICE, "This node is within the non-primary component and will NOT provide any services.");
	}

	quorum_view_list_entries = view_list_entries;
	memcpy(&quorum_ring_id, ring_id, sizeof (quorum_ring_id));
	memcpy(quorum_view_list, view_list, sizeof(unsigned int)*view_list_entries);

	log_view_list(view_list, view_list_entries);

	/* Tell internal listeners */
	send_internal_notification();

	/* Tell IPC listeners */
	send_library_notification(NULL);
}
/*
 * IPC request dispatch table; index must match the MESSAGE_REQ_QUORUM_*
 * ids used by the library side.
 */
static struct corosync_lib_handler quorum_lib_service[] =
{
	{ /* 0 */
		.lib_handler_fn	= message_handler_req_lib_quorum_getquorate,
		.flow_control	= CS_LIB_FLOW_CONTROL_NOT_REQUIRED
	},
	{ /* 1 */
		.lib_handler_fn	= message_handler_req_lib_quorum_trackstart,
		.flow_control	= CS_LIB_FLOW_CONTROL_NOT_REQUIRED
	},
	{ /* 2 */
		.lib_handler_fn	= message_handler_req_lib_quorum_trackstop,
		.flow_control	= CS_LIB_FLOW_CONTROL_NOT_REQUIRED
	},
	{ /* 3 */
		.lib_handler_fn	= message_handler_req_lib_quorum_gettype,
		.flow_control	= CS_LIB_FLOW_CONTROL_NOT_REQUIRED
	}
};

/*
 * Service engine descriptor registered with the corosync service
 * manager; allow_inquorate lets quorum queries work while inquorate.
 */
static struct corosync_service_engine quorum_service_handler = {
	.name				= "corosync cluster quorum service v0.1",
	.id				= QUORUM_SERVICE,
	.priority			= 1,
	.private_data_size		= sizeof (struct quorum_pd),
	.flow_control			= CS_LIB_FLOW_CONTROL_NOT_REQUIRED,
	.allow_inquorate		= CS_LIB_ALLOW_INQUORATE,
	.lib_init_fn			= quorum_lib_init_fn,
	.lib_exit_fn			= quorum_lib_exit_fn,
	.lib_engine			= quorum_lib_service,
	.exec_init_fn			= quorum_exec_init_fn,
	.lib_engine_count		= sizeof (quorum_lib_service) / sizeof (struct corosync_lib_handler)
};

/* Accessor used by the service loader to obtain this engine. */
struct corosync_service_engine *vsf_quorum_get_service_engine_ver0 (void)
{
	return (&quorum_service_handler);
}
/* -------------------------------------------------- */
/*
* Internal API functions for corosync
*/
/*
 * Internal (in-daemon) quorum API: report the current quorate state.
 */
static int quorum_quorate(void)
{
	return (primary_designated);
}

/*
 * Internal quorum API: register a callback invoked whenever the
 * quorate state changes.  Returns 0 on success, -1 on allocation
 * failure.
 */
static int quorum_register_callback(quorum_callback_fn_t function, void *context)
{
	struct internal_callback_pd *new_pd;

	new_pd = malloc(sizeof(*new_pd));
	if (new_pd == NULL) {
		return -1;
	}

	new_pd->callback = function;
	new_pd->context = context;
	list_add (&new_pd->list, &internal_trackers_list);

	return 0;
}

/*
 * Internal quorum API: remove a previously registered callback.
 * Returns 0 when the (function, context) pair was found and removed,
 * -1 otherwise.
 */
static int quorum_unregister_callback(quorum_callback_fn_t function, void *context)
{
	struct list_head *iter;

	for (iter = internal_trackers_list.next;
	     iter != &internal_trackers_list; iter = iter->next) {
		struct internal_callback_pd *entry;

		entry = list_entry(iter, struct internal_callback_pd, list);
		if (entry->callback == function && entry->context == context) {
			list_del(&entry->list);
			free(entry);
			return 0;
		}
	}
	return -1;
}
/* Callback table handed to corosync via api->quorum_initialize(). */
static struct quorum_callin_functions callins = {
	.quorate = quorum_quorate,
	.register_callback = quorum_register_callback,
	.unregister_callback = quorum_unregister_callback
};
/* --------------------------------------------------------------------- */
/*
 * Service engine initializer.
 *
 * Registers the quorum callin functions with corosync, then loads the
 * quorum provider named by the icmap key "quorum.provider" (votequorum
 * or ykd).  Without a provider the node is treated as always quorate.
 *
 * Returns NULL on success or a static error string on failure (the
 * caller treats a non-NULL return as fatal).
 */
static char *quorum_exec_init_fn (struct corosync_api_v1 *api)
{
	char *quorum_module = NULL;
	char *error;

	corosync_api = api;
	list_init (&lib_trackers_list);
	list_init (&internal_trackers_list);

	/*
	 * Tell corosync we have a quorum engine.
	 */
	api->quorum_initialize(&callins);

	/*
	 * Look for a quorum provider
	 */
	if (icmap_get_string("quorum.provider", &quorum_module) == CS_OK) {
		log_printf (LOGSYS_LEVEL_NOTICE,
			    "Using quorum provider %s", quorum_module);

		/* Overwritten by the provider's init on a recognized name. */
		error = (char *)"Invalid quorum provider";

		if (strcmp (quorum_module, "corosync_votequorum") == 0) {
			error = votequorum_init (api, quorum_api_set_quorum);
			quorum_type = 1;
		}
		if (strcmp (quorum_module, "corosync_ykd") == 0) {
			error = ykd_init (api, quorum_api_set_quorum);
			quorum_type = 1;
		}
		if (error) {
			log_printf (LOGSYS_LEVEL_CRIT,
				"Quorum provider: %s failed to initialize.",
				quorum_module);
			free(quorum_module);
			return (error);
		}
	}

	if (quorum_module) {
		free(quorum_module);
		quorum_module = NULL;
	}

	/*
	 * setting quorum_type and primary_designated in the right order is important
	 * always try to lookup/init a quorum module, then revert back to be quorate
	 */

	if (quorum_type == 0) {
		primary_designated = 1;
	}

	return (NULL);
}
/*
 * IPC connection init: set up the per-connection tracking record.
 */
static int quorum_lib_init_fn (void *conn)
{
	struct quorum_pd *pd = (struct quorum_pd *)corosync_api->ipc_private_data_get (conn);

	log_printf(LOGSYS_LEVEL_DEBUG, "lib_init_fn: conn=%p", conn);

	list_init (&pd->list);
	pd->conn = conn;

	return (0);
}

/*
 * IPC connection teardown: unlink the connection from the tracker
 * list if it was tracking quorum changes.
 */
static int quorum_lib_exit_fn (void *conn)
{
	struct quorum_pd *quorum_pd = (struct quorum_pd *)corosync_api->ipc_private_data_get (conn);

	log_printf(LOGSYS_LEVEL_DEBUG, "lib_exit_fn: conn=%p", conn);

	if (quorum_pd->tracking_enabled) {
		list_del (&quorum_pd->list);
		list_init (&quorum_pd->list);
	}
	return (0);
}
/*
 * Invoke every registered in-daemon quorum callback with the current
 * quorate state.
 */
static void send_internal_notification(void)
{
	struct list_head *iter;

	for (iter = internal_trackers_list.next;
	     iter != &internal_trackers_list; iter = iter->next) {
		struct internal_callback_pd *entry;

		entry = list_entry(iter, struct internal_callback_pd, list);
		entry->callback(primary_designated, entry->context);
	}
}
/*
 * Build a quorum notification message and send it either to a single
 * connection (conn != NULL, e.g. initial status after trackstart) or
 * to every registered library tracker (conn == NULL).
 *
 * NOTE(review): the message buffer is a variable-length array; its
 * size is bounded because quorum_view_list holds at most
 * PROCESSOR_COUNT_MAX entries.
 */
static void send_library_notification(void *conn)
{
	int size = sizeof(struct res_lib_quorum_notification) + sizeof(unsigned int)*quorum_view_list_entries;
	char buf[size];
	struct res_lib_quorum_notification *res_lib_quorum_notification = (struct res_lib_quorum_notification *)buf;
	struct list_head *tmp;
	int i;

	log_printf(LOGSYS_LEVEL_DEBUG, "sending quorum notification to %p, length = %d", conn, size);

	res_lib_quorum_notification->quorate = primary_designated;
	res_lib_quorum_notification->ring_seq = quorum_ring_id.seq;
	res_lib_quorum_notification->view_list_entries = quorum_view_list_entries;
	for (i=0; i<quorum_view_list_entries; i++) {
		res_lib_quorum_notification->view_list[i] = quorum_view_list[i];
	}

	res_lib_quorum_notification->header.id = MESSAGE_RES_QUORUM_NOTIFICATION;
	res_lib_quorum_notification->header.size = size;
	res_lib_quorum_notification->header.error = CS_OK;

	/* Send it to all interested parties */
	if (conn) {
		corosync_api->ipc_dispatch_send(conn, res_lib_quorum_notification, size);
	}
	else {
		struct quorum_pd *qpd;

		for (tmp = lib_trackers_list.next; tmp != &lib_trackers_list; tmp = tmp->next) {
			qpd = list_entry(tmp, struct quorum_pd, list);
			corosync_api->ipc_dispatch_send(qpd->conn,
			     res_lib_quorum_notification, size);
		}
	}
	return;
}
static void message_handler_req_lib_quorum_getquorate (void *conn,
const void *msg)
{
struct res_lib_quorum_getquorate res_lib_quorum_getquorate;
log_printf(LOGSYS_LEVEL_DEBUG, "got quorate request on %p", conn);
/* send status */
res_lib_quorum_getquorate.quorate = primary_designated;
res_lib_quorum_getquorate.header.size = sizeof(res_lib_quorum_getquorate);
res_lib_quorum_getquorate.header.id = MESSAGE_RES_QUORUM_GETQUORATE;
res_lib_quorum_getquorate.header.error = CS_OK;
corosync_api->ipc_response_send(conn, &res_lib_quorum_getquorate, sizeof(res_lib_quorum_getquorate));
}
/*
 * IPC handler: start quorum change tracking for a connection.
 *
 * CS_TRACK_CURRENT / CS_TRACK_CHANGES trigger an immediate status
 * notification; CS_TRACK_CHANGES / CS_TRACK_CHANGES_ONLY additionally
 * register the connection for future notifications.  Replies with
 * CS_ERR_EXIST if the connection is already tracking.
 */
static void message_handler_req_lib_quorum_trackstart (void *conn,
						       const void *msg)
{
	const struct req_lib_quorum_trackstart *req_lib_quorum_trackstart = msg;
	struct qb_ipc_response_header res;
	struct quorum_pd *quorum_pd = (struct quorum_pd *)corosync_api->ipc_private_data_get (conn);
	cs_error_t error = CS_OK;

	log_printf(LOGSYS_LEVEL_DEBUG, "got trackstart request on %p", conn);

	/*
	 * If an immediate listing of the current cluster membership
	 * is requested, generate membership list
	 */
	if (req_lib_quorum_trackstart->track_flags & CS_TRACK_CURRENT ||
	    req_lib_quorum_trackstart->track_flags & CS_TRACK_CHANGES) {
		log_printf(LOGSYS_LEVEL_DEBUG, "sending initial status to %p", conn);
		send_library_notification(conn);
	}

	/* A connection may register for tracking only once. */
	if (quorum_pd->tracking_enabled) {
		error = CS_ERR_EXIST;
		goto response_send;
	}

	/*
	 * Record requests for tracking
	 */
	if (req_lib_quorum_trackstart->track_flags & CS_TRACK_CHANGES ||
	    req_lib_quorum_trackstart->track_flags & CS_TRACK_CHANGES_ONLY) {

		quorum_pd->track_flags = req_lib_quorum_trackstart->track_flags;
		quorum_pd->tracking_enabled = 1;

		list_add (&quorum_pd->list, &lib_trackers_list);
	}

response_send:
	/* send status */
	res.size = sizeof(res);
	res.id = MESSAGE_RES_QUORUM_TRACKSTART;
	res.error = error;
	corosync_api->ipc_response_send(conn, &res, sizeof(struct qb_ipc_response_header));
}
/*
 * IPC handler: stop quorum change tracking for a connection.
 *
 * Replies CS_OK when tracking was active and has been stopped, or
 * CS_ERR_NOT_EXIST when the connection was not tracking.
 *
 * Bug fix: the original code unconditionally reset res.error to CS_OK
 * just before sending the reply, clobbering the CS_ERR_NOT_EXIST set
 * in the else branch so clients could never observe the error.  The
 * overwrite has been removed; res.error now carries the value chosen
 * by the branch above.
 */
static void message_handler_req_lib_quorum_trackstop (void *conn, const void *msg)
{
	struct qb_ipc_response_header res;
	struct quorum_pd *quorum_pd = (struct quorum_pd *)corosync_api->ipc_private_data_get (conn);

	log_printf(LOGSYS_LEVEL_DEBUG, "got trackstop request on %p", conn);

	if (quorum_pd->tracking_enabled) {
		res.error = CS_OK;
		quorum_pd->tracking_enabled = 0;
		list_del (&quorum_pd->list);
		list_init (&quorum_pd->list);
	} else {
		res.error = CS_ERR_NOT_EXIST;
	}

	/* send status */
	res.size = sizeof(res);
	res.id = MESSAGE_RES_QUORUM_TRACKSTOP;
	corosync_api->ipc_response_send(conn, &res, sizeof(struct qb_ipc_response_header));
}
static void message_handler_req_lib_quorum_gettype (void *conn,
const void *msg)
{
struct res_lib_quorum_gettype res_lib_quorum_gettype;
log_printf(LOGSYS_LEVEL_DEBUG, "got quorum_type request on %p", conn);
/* send status */
res_lib_quorum_gettype.quorum_type = quorum_type;
res_lib_quorum_gettype.header.size = sizeof(res_lib_quorum_gettype);
res_lib_quorum_gettype.header.id = MESSAGE_RES_QUORUM_GETTYPE;
res_lib_quorum_gettype.header.error = CS_OK;
corosync_api->ipc_response_send(conn, &res_lib_quorum_gettype, sizeof(res_lib_quorum_gettype));
}
diff --git a/qdevices/tlv.c b/qdevices/tlv.c
index a7eae953..5b458690 100644
--- a/qdevices/tlv.c
+++ b/qdevices/tlv.c
@@ -1,1038 +1,1037 @@
/*
* Copyright (c) 2015-2016 Red Hat, Inc.
*
* All rights reserved.
*
* Author: Jan Friesse (jfriesse@redhat.com)
*
* This software licensed under BSD license, the text of which follows:
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* - Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* - Neither the name of the Red Hat, Inc. nor the names of its
* contributors may be used to endorse or promote products derived from this
* software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
* THE POSSIBILITY OF SUCH DAMAGE.
*/
#include <sys/types.h>
#include <arpa/inet.h>
#include <inttypes.h>
#include <stdlib.h>
#include <string.h>
/*
* 64-bit variant of ntoh is not exactly standard...
*/
#if defined(__linux__)
#include <endian.h>
#elif defined(__FreeBSD__) || defined(__NetBSD__)
#include <sys/endian.h>
#elif defined(__OpenBSD__)
-#include <sys/types.h>
#define be64toh(x) betoh64(x)
#endif
#include "tlv.h"
#define TLV_TYPE_LENGTH 2
#define TLV_LENGTH_LENGTH 2
#define TLV_STATIC_SUPPORTED_OPTIONS_SIZE 22
enum tlv_opt_type tlv_static_supported_options[TLV_STATIC_SUPPORTED_OPTIONS_SIZE] = {
TLV_OPT_MSG_SEQ_NUMBER,
TLV_OPT_CLUSTER_NAME,
TLV_OPT_TLS_SUPPORTED,
TLV_OPT_TLS_CLIENT_CERT_REQUIRED,
TLV_OPT_SUPPORTED_MESSAGES,
TLV_OPT_SUPPORTED_OPTIONS,
TLV_OPT_REPLY_ERROR_CODE,
TLV_OPT_SERVER_MAXIMUM_REQUEST_SIZE,
TLV_OPT_SERVER_MAXIMUM_REPLY_SIZE,
TLV_OPT_NODE_ID,
TLV_OPT_SUPPORTED_DECISION_ALGORITHMS,
TLV_OPT_DECISION_ALGORITHM,
TLV_OPT_HEARTBEAT_INTERVAL,
TLV_OPT_RING_ID,
TLV_OPT_CONFIG_VERSION,
TLV_OPT_DATA_CENTER_ID,
TLV_OPT_NODE_STATE,
TLV_OPT_NODE_INFO,
TLV_OPT_NODE_LIST_TYPE,
TLV_OPT_VOTE,
TLV_OPT_QUORATE,
TLV_OPT_TIE_BREAKER,
};
/*
 * Append one TLV (type, length, value) triple to msg.
 * Wire format: 16-bit big-endian option type, 16-bit big-endian length,
 * followed by opt_len bytes of value.
 * Returns 0 on success, -1 when the encoded TLV would exceed the
 * dynar's maximum size.
 */
int
tlv_add(struct dynar *msg, enum tlv_opt_type opt_type, uint16_t opt_len, const void *value)
{
uint16_t nlen;
uint16_t nopt_type;
/* Check size limit before touching the buffer so msg stays unchanged on failure */
if (dynar_size(msg) + sizeof(nopt_type) + sizeof(nlen) + opt_len > dynar_max_size(msg)) {
return (-1);
}
nopt_type = htons((uint16_t)opt_type);
nlen = htons(opt_len);
dynar_cat(msg, &nopt_type, sizeof(nopt_type));
dynar_cat(msg, &nlen, sizeof(nlen));
dynar_cat(msg, value, opt_len);
return (0);
}
/* Append a 32-bit unsigned option, converted to network byte order. */
int
tlv_add_u32(struct dynar *msg, enum tlv_opt_type opt_type, uint32_t u32)
{
uint32_t nu32;
nu32 = htonl(u32);
return (tlv_add(msg, opt_type, sizeof(nu32), &nu32));
}
/* Append an 8-bit unsigned option (single byte, no byte-order conversion). */
int
tlv_add_u8(struct dynar *msg, enum tlv_opt_type opt_type, uint8_t u8)
{
return (tlv_add(msg, opt_type, sizeof(u8), &u8));
}
/* Append a 16-bit unsigned option, converted to network byte order. */
int
tlv_add_u16(struct dynar *msg, enum tlv_opt_type opt_type, uint16_t u16)
{
uint16_t nu16;
nu16 = htons(u16);
return (tlv_add(msg, opt_type, sizeof(nu16), &nu16));
}
/* Append a 64-bit unsigned option, converted to big-endian byte order. */
int
tlv_add_u64(struct dynar *msg, enum tlv_opt_type opt_type, uint64_t u64)
{
uint64_t nu64;
nu64 = htobe64(u64);
return (tlv_add(msg, opt_type, sizeof(nu64), &nu64));
}
/*
 * Append a string option. The terminating NUL is NOT stored.
 * NOTE(review): strlen() result is narrowed to tlv_add's uint16_t opt_len,
 * so strings longer than 65535 bytes would be silently truncated — callers
 * presumably pass short names only; confirm at call sites.
 */
int
tlv_add_string(struct dynar *msg, enum tlv_opt_type opt_type, const char *str)
{
return (tlv_add(msg, opt_type, strlen(str), str));
}
/* Convenience wrappers: each encodes one well-known option type. */
int
tlv_add_msg_seq_number(struct dynar *msg, uint32_t msg_seq_number)
{
return (tlv_add_u32(msg, TLV_OPT_MSG_SEQ_NUMBER, msg_seq_number));
}
int
tlv_add_cluster_name(struct dynar *msg, const char *cluster_name)
{
return (tlv_add_string(msg, TLV_OPT_CLUSTER_NAME, cluster_name));
}
int
tlv_add_tls_supported(struct dynar *msg, enum tlv_tls_supported tls_supported)
{
return (tlv_add_u8(msg, TLV_OPT_TLS_SUPPORTED, tls_supported));
}
/* tls_client_cert_required is treated as a boolean encoded in one byte */
int
tlv_add_tls_client_cert_required(struct dynar *msg, int tls_client_cert_required)
{
return (tlv_add_u8(msg, TLV_OPT_TLS_CLIENT_CERT_REQUIRED, tls_client_cert_required));
}
/*
 * Append an array of 16-bit values as one option. Each element is converted
 * to network byte order; the option length is the array's byte size.
 *
 * Returns 0 on success, -1 on allocation failure, when the option does not
 * fit into the message, or when the array is too large for the 16-bit TLV
 * length field.
 */
int
tlv_add_u16_array(struct dynar *msg, enum tlv_opt_type opt_type, const uint16_t *array,
    size_t array_size)
{
	size_t i;
	uint16_t *nu16a;
	uint16_t opt_len;
	int res;

	/*
	 * Fix: the byte length was previously computed as
	 * sizeof(uint16_t) * array_size and silently truncated when assigned
	 * to the 16-bit opt_len. Reject arrays whose encoded size cannot be
	 * represented in the TLV length field.
	 */
	if (array_size > UINT16_MAX / sizeof(uint16_t)) {
		return (-1);
	}

	nu16a = malloc(sizeof(uint16_t) * array_size);
	if (nu16a == NULL) {
		return (-1);
	}

	for (i = 0; i < array_size; i++) {
		nu16a[i] = htons(array[i]);
	}

	opt_len = sizeof(uint16_t) * array_size;
	res = tlv_add(msg, opt_type, opt_len, nu16a);
	free(nu16a);

	return (res);
}
/*
 * Encode the list of supported option types as a u16 array option.
 * Returns 0 on success, -1 on allocation failure or when the option
 * does not fit into the message.
 */
int
tlv_add_supported_options(struct dynar *msg, const enum tlv_opt_type *supported_options,
    size_t no_supported_options)
{
uint16_t *u16a;
size_t i;
int res;
/* Copy enum values into a uint16_t array first; enum width is not guaranteed */
u16a = malloc(sizeof(*u16a) * no_supported_options);
if (u16a == NULL) {
return (-1);
}
for (i = 0; i < no_supported_options; i++) {
u16a[i] = (uint16_t)supported_options[i];
}
res = (tlv_add_u16_array(msg, TLV_OPT_SUPPORTED_OPTIONS, u16a, no_supported_options));
free(u16a);
return (res);
}
/*
 * Encode the list of supported decision algorithms as a u16 array option.
 * Same contract as tlv_add_supported_options.
 */
int
tlv_add_supported_decision_algorithms(struct dynar *msg,
    const enum tlv_decision_algorithm_type *supported_algorithms, size_t no_supported_algorithms)
{
uint16_t *u16a;
size_t i;
int res;
u16a = malloc(sizeof(*u16a) * no_supported_algorithms);
if (u16a == NULL) {
return (-1);
}
for (i = 0; i < no_supported_algorithms; i++) {
u16a[i] = (uint16_t)supported_algorithms[i];
}
res = (tlv_add_u16_array(msg, TLV_OPT_SUPPORTED_DECISION_ALGORITHMS, u16a,
    no_supported_algorithms));
free(u16a);
return (res);
}
/* Convenience wrappers: each encodes one well-known option type. */
int
tlv_add_reply_error_code(struct dynar *msg, enum tlv_reply_error_code error_code)
{
return (tlv_add_u16(msg, TLV_OPT_REPLY_ERROR_CODE, (uint16_t)error_code));
}
/* NOTE(review): size_t is narrowed to 32 bits here — presumably request/reply
 * limits always fit in uint32_t; confirm against protocol definition. */
int
tlv_add_server_maximum_request_size(struct dynar *msg, size_t server_maximum_request_size)
{
return (tlv_add_u32(msg, TLV_OPT_SERVER_MAXIMUM_REQUEST_SIZE, server_maximum_request_size));
}
int
tlv_add_server_maximum_reply_size(struct dynar *msg, size_t server_maximum_reply_size)
{
return (tlv_add_u32(msg, TLV_OPT_SERVER_MAXIMUM_REPLY_SIZE, server_maximum_reply_size));
}
int
tlv_add_node_id(struct dynar *msg, uint32_t node_id)
{
return (tlv_add_u32(msg, TLV_OPT_NODE_ID, node_id));
}
int
tlv_add_decision_algorithm(struct dynar *msg, enum tlv_decision_algorithm_type decision_algorithm)
{
return (tlv_add_u16(msg, TLV_OPT_DECISION_ALGORITHM, (uint16_t)decision_algorithm));
}
int
tlv_add_heartbeat_interval(struct dynar *msg, uint32_t heartbeat_interval)
{
return (tlv_add_u32(msg, TLV_OPT_HEARTBEAT_INTERVAL, heartbeat_interval));
}
/*
 * Encode a ring id as a fixed 12-byte option:
 * 4 bytes node_id (network order) followed by 8 bytes seq (big-endian).
 * Must stay in sync with tlv_iter_decode_ring_id.
 */
int
tlv_add_ring_id(struct dynar *msg, const struct tlv_ring_id *ring_id)
{
uint64_t nu64;
uint32_t nu32;
char tmp_buf[12];
nu32 = htonl(ring_id->node_id);
nu64 = htobe64(ring_id->seq);
memcpy(tmp_buf, &nu32, sizeof(nu32));
memcpy(tmp_buf + sizeof(nu32), &nu64, sizeof(nu64));
return (tlv_add(msg, TLV_OPT_RING_ID, sizeof(tmp_buf), tmp_buf));
}
/*
 * Encode a tie breaker as a fixed 5-byte option:
 * 1 byte mode followed by 4 bytes node_id (network order).
 * node_id is only meaningful in NODE_ID mode; zero is sent otherwise.
 * Must stay in sync with tlv_iter_decode_tie_breaker.
 */
int
tlv_add_tie_breaker(struct dynar *msg, const struct tlv_tie_breaker *tie_breaker)
{
uint32_t nu32;
uint8_t u8;
char tmp_buf[5];
u8 = tie_breaker->mode;
nu32 = (tie_breaker->mode == TLV_TIE_BREAKER_MODE_NODE_ID ?
    htonl(tie_breaker->node_id) : 0);
memcpy(tmp_buf, &u8, sizeof(u8));
memcpy(tmp_buf + sizeof(u8), &nu32, sizeof(nu32));
return (tlv_add(msg, TLV_OPT_TIE_BREAKER, sizeof(tmp_buf), tmp_buf));
}
/* Convenience wrappers: each encodes one well-known option type. */
int
tlv_add_config_version(struct dynar *msg, uint64_t config_version)
{
return (tlv_add_u64(msg, TLV_OPT_CONFIG_VERSION, config_version));
}
int
tlv_add_data_center_id(struct dynar *msg, uint32_t data_center_id)
{
return (tlv_add_u32(msg, TLV_OPT_DATA_CENTER_ID, data_center_id));
}
int
tlv_add_node_state(struct dynar *msg, enum tlv_node_state node_state)
{
return (tlv_add_u8(msg, TLV_OPT_NODE_STATE, node_state));
}
/*
 * Encode node info as a nested TLV option: a sub-message is built in a
 * temporary dynar (node_id always present; data_center_id and node_state
 * only when set) and then stored as the value of TLV_OPT_NODE_INFO.
 * Returns 0 on success or the first failing tlv_add_* result.
 */
int
tlv_add_node_info(struct dynar *msg, const struct tlv_node_info *node_info)
{
struct dynar opt_value;
int res;
res = 0;
/*
 * Create sub message,
 */
dynar_init(&opt_value, 1024);
if ((res = tlv_add_node_id(&opt_value, node_info->node_id)) != 0) {
goto exit_dynar_destroy;
}
/* Zero means "not set" for data_center_id — omit the option entirely */
if (node_info->data_center_id != 0) {
if ((res = tlv_add_data_center_id(&opt_value, node_info->data_center_id)) != 0) {
goto exit_dynar_destroy;
}
}
if (node_info->node_state != TLV_NODE_STATE_NOT_SET) {
if ((res = tlv_add_node_state(&opt_value, node_info->node_state)) != 0) {
goto exit_dynar_destroy;
}
}
res = tlv_add(msg, TLV_OPT_NODE_INFO, dynar_size(&opt_value), dynar_data(&opt_value));
if (res != 0) {
/* Redundant jump (label follows immediately) but kept for symmetry */
goto exit_dynar_destroy;
}
exit_dynar_destroy:
dynar_destroy(&opt_value);
return (res);
}
/* Convenience wrappers: each encodes one well-known option type. */
int
tlv_add_node_list_type(struct dynar *msg, enum tlv_node_list_type node_list_type)
{
return (tlv_add_u8(msg, TLV_OPT_NODE_LIST_TYPE, node_list_type));
}
int
tlv_add_vote(struct dynar *msg, enum tlv_vote vote)
{
return (tlv_add_u8(msg, TLV_OPT_VOTE, vote));
}
int
tlv_add_quorate(struct dynar *msg, enum tlv_quorate quorate)
{
return (tlv_add_u8(msg, TLV_OPT_QUORATE, quorate));
}
/*
 * Initialize iterator over a raw buffer. msg_header_len bytes at the start
 * of the buffer are skipped before the first TLV (see tlv_iter_next).
 */
void
tlv_iter_init_str(const char *msg, size_t msg_len, size_t msg_header_len,
    struct tlv_iterator *tlv_iter)
{
tlv_iter->msg = msg;
tlv_iter->msg_len = msg_len;
tlv_iter->current_pos = 0;
tlv_iter->msg_header_len = msg_header_len;
tlv_iter->iter_next_called = 0;
}
/* Initialize iterator over a dynar-backed message. */
void
tlv_iter_init(const struct dynar *msg, size_t msg_header_len, struct tlv_iterator *tlv_iter)
{
tlv_iter_init_str(dynar_data(msg), dynar_size(msg), msg_header_len, tlv_iter);
}
/* Return the option type of the TLV at the current position (host order). */
enum tlv_opt_type
tlv_iter_get_type(const struct tlv_iterator *tlv_iter)
{
uint16_t ntype;
uint16_t type;
/* memcpy avoids unaligned access into the message buffer */
memcpy(&ntype, tlv_iter->msg + tlv_iter->current_pos, sizeof(ntype));
type = ntohs(ntype);
return (type);
}
/* Return the value length of the TLV at the current position (host order). */
uint16_t
tlv_iter_get_len(const struct tlv_iterator *tlv_iter)
{
uint16_t nlen;
uint16_t len;
memcpy(&nlen, tlv_iter->msg + tlv_iter->current_pos + TLV_TYPE_LENGTH, sizeof(nlen));
len = ntohs(nlen);
return (len);
}
/* Return a pointer to the value bytes of the TLV at the current position. */
const char *
tlv_iter_get_data(const struct tlv_iterator *tlv_iter)
{
return (tlv_iter->msg + tlv_iter->current_pos + TLV_TYPE_LENGTH + TLV_LENGTH_LENGTH);
}
/*
 * Advance iterator to the next TLV.
 * Returns 1 when positioned on a valid TLV, 0 at end of message,
 * -1 when the current TLV claims to extend past the end of the message.
 * The first call positions the iterator on the first TLV (after the header).
 */
int
tlv_iter_next(struct tlv_iterator *tlv_iter)
{
uint16_t len;
if (tlv_iter->iter_next_called == 0) {
tlv_iter->iter_next_called = 1;
tlv_iter->current_pos = tlv_iter->msg_header_len;
goto check_tlv_validity;
}
len = tlv_iter_get_len(tlv_iter);
/* >= : if the current TLV ends exactly at msg_len there is no next one */
if (tlv_iter->current_pos + TLV_TYPE_LENGTH + TLV_LENGTH_LENGTH + len >=
    tlv_iter->msg_len) {
return (0);
}
tlv_iter->current_pos += TLV_TYPE_LENGTH + TLV_LENGTH_LENGTH + len;
check_tlv_validity:
/*
 * Check if tlv is valid = is not larger than whole message
 */
len = tlv_iter_get_len(tlv_iter);
if (tlv_iter->current_pos + TLV_TYPE_LENGTH + TLV_LENGTH_LENGTH + len > tlv_iter->msg_len) {
return (-1);
}
return (1);
}
/*
 * Decode the current TLV value as a 32-bit unsigned integer.
 * Returns 0 on success, -1 when the option length is not exactly 4 bytes.
 */
int
tlv_iter_decode_u32(struct tlv_iterator *tlv_iter, uint32_t *res)
{
const char *opt_data;
uint16_t opt_len;
uint32_t nu32;
opt_len = tlv_iter_get_len(tlv_iter);
opt_data = tlv_iter_get_data(tlv_iter);
if (opt_len != sizeof(nu32)) {
return (-1);
}
memcpy(&nu32, opt_data, sizeof(nu32));
*res = ntohl(nu32);
return (0);
}
/*
 * Decode the current TLV value as an 8-bit unsigned integer.
 * Returns 0 on success, -1 when the option length is not exactly 1 byte.
 */
int
tlv_iter_decode_u8(struct tlv_iterator *tlv_iter, uint8_t *res)
{
const char *opt_data;
uint16_t opt_len;
opt_len = tlv_iter_get_len(tlv_iter);
opt_data = tlv_iter_get_data(tlv_iter);
if (opt_len != sizeof(*res)) {
return (-1);
}
memcpy(res, opt_data, sizeof(*res));
return (0);
}
/* Decode the client-cert-required flag (single byte, 0 or 1). */
int
tlv_iter_decode_client_cert_required(struct tlv_iterator *tlv_iter, uint8_t *client_cert_required)
{
return (tlv_iter_decode_u8(tlv_iter, client_cert_required));
}
/*
 * Decode the current TLV value as a NUL-terminated string copy.
 * On success *str is a newly-allocated buffer (caller must free) and
 * *str_len is its length without the terminator.
 * Returns 0 on success, -1 on allocation failure.
 */
int
tlv_iter_decode_str(struct tlv_iterator *tlv_iter, char **str, size_t *str_len)
{
const char *opt_data;
uint16_t opt_len;
char *tmp_str;
opt_len = tlv_iter_get_len(tlv_iter);
opt_data = tlv_iter_get_data(tlv_iter);
/* +1 for the terminator; opt_len is uint16_t so this cannot overflow */
tmp_str = malloc(opt_len + 1);
if (tmp_str == NULL) {
return (-1);
}
memcpy(tmp_str, opt_data, opt_len);
tmp_str[opt_len] = '\0';
*str = tmp_str;
*str_len = opt_len;
return (0);
}
/*
 * Decode the current TLV value as an array of 16-bit unsigned integers.
 * On success *u16a is a newly-allocated array (caller must free) of
 * *no_items elements converted to host byte order.
 * Returns 0 on success, -1 when the option length is not a multiple of 2,
 * -2 on allocation failure.
 */
int
tlv_iter_decode_u16_array(struct tlv_iterator *tlv_iter, uint16_t **u16a, size_t *no_items)
{
uint16_t opt_len;
uint16_t *u16a_res;
size_t i;
opt_len = tlv_iter_get_len(tlv_iter);
if (opt_len % sizeof(uint16_t) != 0) {
return (-1);
}
*no_items = opt_len / sizeof(uint16_t);
u16a_res = malloc(sizeof(uint16_t) * *no_items);
if (u16a_res == NULL) {
return (-2);
}
memcpy(u16a_res, tlv_iter_get_data(tlv_iter), opt_len);
/* Byte-swap in place after the bulk copy */
for (i = 0; i < *no_items; i++) {
u16a_res[i] = ntohs(u16a_res[i]);
}
*u16a = u16a_res;
return (0);
}
/*
 * Decode the supported-options TLV into a newly-allocated array of
 * enum tlv_opt_type (caller must free).
 * Returns 0 on success, -1/-2 propagated from tlv_iter_decode_u16_array,
 * -2 on allocation failure.
 */
int
tlv_iter_decode_supported_options(struct tlv_iterator *tlv_iter,
    enum tlv_opt_type **supported_options, size_t *no_supported_options)
{
uint16_t *u16a;
enum tlv_opt_type *tlv_opt_array;
size_t i;
int res;
res = tlv_iter_decode_u16_array(tlv_iter, &u16a, no_supported_options);
if (res != 0) {
return (res);
}
/* Widen each u16 into the enum type; enum width is not guaranteed to be 16 bits */
tlv_opt_array = malloc(sizeof(enum tlv_opt_type) * *no_supported_options);
if (tlv_opt_array == NULL) {
free(u16a);
return (-2);
}
for (i = 0; i < *no_supported_options; i++) {
tlv_opt_array[i] = (enum tlv_opt_type)u16a[i];
}
free(u16a);
*supported_options = tlv_opt_array;
return (0);
}
/*
 * Decode the supported-decision-algorithms TLV into a newly-allocated array
 * of enum tlv_decision_algorithm_type (caller must free).
 * Same contract as tlv_iter_decode_supported_options.
 */
int
tlv_iter_decode_supported_decision_algorithms(struct tlv_iterator *tlv_iter,
    enum tlv_decision_algorithm_type **supported_decision_algorithms,
    size_t *no_supported_decision_algorithms)
{
uint16_t *u16a;
enum tlv_decision_algorithm_type *tlv_decision_algorithm_type_array;
size_t i;
int res;
res = tlv_iter_decode_u16_array(tlv_iter, &u16a, no_supported_decision_algorithms);
if (res != 0) {
return (res);
}
tlv_decision_algorithm_type_array = malloc(
    sizeof(enum tlv_decision_algorithm_type) * *no_supported_decision_algorithms);
if (tlv_decision_algorithm_type_array == NULL) {
free(u16a);
return (-2);
}
for (i = 0; i < *no_supported_decision_algorithms; i++) {
tlv_decision_algorithm_type_array[i] = (enum tlv_decision_algorithm_type)u16a[i];
}
free(u16a);
*supported_decision_algorithms = tlv_decision_algorithm_type_array;
return (0);
}
/*
 * Decode the current TLV value as a 16-bit unsigned integer.
 * Returns 0 on success, -1 when the option length is not exactly 2 bytes.
 */
int
tlv_iter_decode_u16(struct tlv_iterator *tlv_iter, uint16_t *u16)
{
const char *opt_data;
uint16_t opt_len;
uint16_t nu16;
opt_len = tlv_iter_get_len(tlv_iter);
opt_data = tlv_iter_get_data(tlv_iter);
if (opt_len != sizeof(nu16)) {
return (-1);
}
memcpy(&nu16, opt_data, sizeof(nu16));
*u16 = ntohs(nu16);
return (0);
}
/*
 * Decode the current TLV value as a 64-bit unsigned integer (big-endian on
 * the wire). Returns 0 on success, -1 when the option length is not exactly
 * 8 bytes.
 *
 * Fix: opt_len was declared uint64_t although tlv_iter_get_len() returns
 * uint16_t; use uint16_t for consistency with every other decoder here.
 */
int
tlv_iter_decode_u64(struct tlv_iterator *tlv_iter, uint64_t *u64)
{
	const char *opt_data;
	uint16_t opt_len;
	uint64_t nu64;

	opt_len = tlv_iter_get_len(tlv_iter);
	opt_data = tlv_iter_get_data(tlv_iter);

	if (opt_len != sizeof(nu64)) {
		return (-1);
	}

	memcpy(&nu64, opt_data, sizeof(nu64));
	*u64 = be64toh(nu64);

	return (0);
}
int
tlv_iter_decode_reply_error_code(struct tlv_iterator *tlv_iter,
enum tlv_reply_error_code *reply_error_code)
{
return (tlv_iter_decode_u16(tlv_iter, (uint16_t *)reply_error_code));
}
/*
 * Decode and validate the tls-supported option.
 * Returns 0 on success, -1 on wrong length, -4 on an unknown value.
 */
int
tlv_iter_decode_tls_supported(struct tlv_iterator *tlv_iter, enum tlv_tls_supported *tls_supported)
{
uint8_t u8;
enum tlv_tls_supported tmp_tls_supported;
if (tlv_iter_decode_u8(tlv_iter, &u8) != 0) {
return (-1);
}
tmp_tls_supported = u8;
/* Only accept values the protocol defines; reject anything else */
if (tmp_tls_supported != TLV_TLS_UNSUPPORTED &&
    tmp_tls_supported != TLV_TLS_SUPPORTED &&
    tmp_tls_supported != TLV_TLS_REQUIRED) {
return (-4);
}
*tls_supported = tmp_tls_supported;
return (0);
}
/*
 * Decode the decision-algorithm option. The value is not validated here;
 * unknown algorithms are handled by the caller.
 */
int
tlv_iter_decode_decision_algorithm(struct tlv_iterator *tlv_iter,
    enum tlv_decision_algorithm_type *decision_algorithm)
{
uint16_t u16;
if (tlv_iter_decode_u16(tlv_iter, &u16) != 0) {
return (-1);
}
*decision_algorithm = (enum tlv_decision_algorithm_type)u16;
return (0);
}
/*
 * Decode the 12-byte ring-id option (4-byte node_id + 8-byte seq).
 * Inverse of tlv_add_ring_id. Returns 0 on success, -1 on wrong length.
 */
int
tlv_iter_decode_ring_id(struct tlv_iterator *tlv_iter, struct tlv_ring_id *ring_id)
{
const char *opt_data;
uint16_t opt_len;
uint32_t nu32;
uint64_t nu64;
char tmp_buf[12];
opt_len = tlv_iter_get_len(tlv_iter);
opt_data = tlv_iter_get_data(tlv_iter);
if (opt_len != sizeof(tmp_buf)) {
return (-1);
}
memcpy(&nu32, opt_data, sizeof(nu32));
memcpy(&nu64, opt_data + sizeof(nu32), sizeof(nu64));
ring_id->node_id = ntohl(nu32);
ring_id->seq = be64toh(nu64);
return (0);
}
/*
 * Decode the 5-byte tie-breaker option (1-byte mode + 4-byte node_id).
 * Inverse of tlv_add_tie_breaker.
 * Returns 0 on success, -1 on wrong length, -4 on an unknown mode.
 */
int
tlv_iter_decode_tie_breaker(struct tlv_iterator *tlv_iter, struct tlv_tie_breaker *tie_breaker)
{
const char *opt_data;
uint16_t opt_len;
uint32_t nu32;
uint8_t u8;
enum tlv_tie_breaker_mode tie_breaker_mode;
char tmp_buf[5];
opt_len = tlv_iter_get_len(tlv_iter);
opt_data = tlv_iter_get_data(tlv_iter);
if (opt_len != sizeof(tmp_buf)) {
return (-1);
}
memcpy(&u8, opt_data, sizeof(u8));
tie_breaker_mode = u8;
if (tie_breaker_mode != TLV_TIE_BREAKER_MODE_LOWEST &&
    tie_breaker_mode != TLV_TIE_BREAKER_MODE_HIGHEST &&
    tie_breaker_mode != TLV_TIE_BREAKER_MODE_NODE_ID) {
return (-4);
}
memcpy(&nu32, opt_data + sizeof(u8), sizeof(nu32));
tie_breaker->mode = tie_breaker_mode;
/* node_id only carries meaning in NODE_ID mode; normalize to 0 otherwise */
tie_breaker->node_id = (tie_breaker->mode == TLV_TIE_BREAKER_MODE_NODE_ID ?
    ntohl(nu32) : 0);
return (0);
}
/*
 * Decode and validate the node-state option.
 * Returns 0 on success, -1 on wrong length, -4 on an unknown state
 * (NOT_SET is not a valid wire value and is rejected too).
 */
int
tlv_iter_decode_node_state(struct tlv_iterator *tlv_iter, enum tlv_node_state *node_state)
{
uint8_t u8;
enum tlv_node_state tmp_node_state;
if (tlv_iter_decode_u8(tlv_iter, &u8) != 0) {
return (-1);
}
tmp_node_state = u8;
if (tmp_node_state != TLV_NODE_STATE_MEMBER &&
    tmp_node_state != TLV_NODE_STATE_DEAD &&
    tmp_node_state != TLV_NODE_STATE_LEAVING) {
return (-4);
}
*node_state = tmp_node_state;
return (0);
}
/*
 * Decode a nested node-info option (inverse of tlv_add_node_info).
 * The option value is itself a TLV sequence; walk it with a sub-iterator
 * and pick out node_id, data_center_id and node_state. Unknown nested
 * options are skipped. node_info is only written on full success.
 * Returns 0 on success, -3 when the nested TLV stream is malformed,
 * -4 when the mandatory node_id is missing, or the first nested decode error.
 */
int
tlv_iter_decode_node_info(struct tlv_iterator *tlv_iter, struct tlv_node_info *node_info)
{
struct tlv_iterator data_tlv_iter;
int iter_res;
int res;
enum tlv_opt_type opt_type;
struct tlv_node_info tmp_node_info;
/* Zeroing gives "not set" defaults for the optional fields */
memset(&tmp_node_info, 0, sizeof(tmp_node_info));
tlv_iter_init_str(tlv_iter_get_data(tlv_iter), tlv_iter_get_len(tlv_iter), 0,
    &data_tlv_iter);
while ((iter_res = tlv_iter_next(&data_tlv_iter)) > 0) {
opt_type = tlv_iter_get_type(&data_tlv_iter);
switch (opt_type) {
case TLV_OPT_NODE_ID:
if ((res = tlv_iter_decode_u32(&data_tlv_iter,
    &tmp_node_info.node_id)) != 0) {
return (res);
}
break;
case TLV_OPT_DATA_CENTER_ID:
if ((res = tlv_iter_decode_u32(&data_tlv_iter,
    &tmp_node_info.data_center_id)) != 0) {
return (res);
}
break;
case TLV_OPT_NODE_STATE:
if ((res = tlv_iter_decode_node_state(&data_tlv_iter,
    &tmp_node_info.node_state)) != 0) {
return (res);
}
break;
default:
/*
 * Other options are not processed
 */
break;
}
}
/* tlv_iter_next returned -1 -> truncated/corrupt nested TLV */
if (iter_res != 0) {
return (-3);
}
/* node_id is mandatory; 0 doubles as "never seen" */
if (tmp_node_info.node_id == 0) {
return (-4);
}
memcpy(node_info, &tmp_node_info, sizeof(tmp_node_info));
return (0);
}
/*
 * Decode and validate the node-list-type option.
 * Returns 0 on success, -1 on wrong length, -4 on an unknown value.
 */
int
tlv_iter_decode_node_list_type(struct tlv_iterator *tlv_iter,
    enum tlv_node_list_type *node_list_type)
{
uint8_t u8;
enum tlv_node_list_type tmp_node_list_type;
if (tlv_iter_decode_u8(tlv_iter, &u8) != 0) {
return (-1);
}
tmp_node_list_type = u8;
if (tmp_node_list_type != TLV_NODE_LIST_TYPE_INITIAL_CONFIG &&
    tmp_node_list_type != TLV_NODE_LIST_TYPE_CHANGED_CONFIG &&
    tmp_node_list_type != TLV_NODE_LIST_TYPE_MEMBERSHIP &&
    tmp_node_list_type != TLV_NODE_LIST_TYPE_QUORUM) {
return (-4);
}
*node_list_type = tmp_node_list_type;
return (0);
}
/*
 * Decode and validate the vote option.
 * Returns 0 on success, -1 on wrong length, -4 on an unknown value
 * (UNDEFINED is not a valid wire value).
 */
int
tlv_iter_decode_vote(struct tlv_iterator *tlv_iter, enum tlv_vote *vote)
{
uint8_t u8;
enum tlv_vote tmp_vote;
if (tlv_iter_decode_u8(tlv_iter, &u8) != 0) {
return (-1);
}
tmp_vote = u8;
if (tmp_vote != TLV_VOTE_ACK &&
    tmp_vote != TLV_VOTE_NACK &&
    tmp_vote != TLV_VOTE_ASK_LATER &&
    tmp_vote != TLV_VOTE_WAIT_FOR_REPLY &&
    tmp_vote != TLV_VOTE_NO_CHANGE) {
return (-4);
}
*vote = tmp_vote;
return (0);
}
/*
 * Decode and validate the quorate option.
 * Returns 0 on success, -1 on wrong length, -4 on an unknown value.
 */
int
tlv_iter_decode_quorate(struct tlv_iterator *tlv_iter, enum tlv_quorate *quorate)
{
uint8_t u8;
enum tlv_quorate tmp_quorate;
if (tlv_iter_decode_u8(tlv_iter, &u8) != 0) {
return (-1);
}
tmp_quorate = u8;
if (tmp_quorate != TLV_QUORATE_QUORATE &&
    tmp_quorate != TLV_QUORATE_INQUORATE) {
return (-4);
}
*quorate = tmp_quorate;
return (0);
}
/*
 * Return the static table of option types this implementation understands.
 * The returned pointer references static storage — do not free or modify.
 */
void
tlv_get_supported_options(enum tlv_opt_type **supported_options, size_t *no_supported_options)
{
*supported_options = tlv_static_supported_options;
*no_supported_options = TLV_STATIC_SUPPORTED_OPTIONS_SIZE;
}
/* Equality test for ring ids (both node_id and seq must match). */
int
tlv_ring_id_eq(const struct tlv_ring_id *rid1, const struct tlv_ring_id *rid2)
{
return (rid1->node_id == rid2->node_id && rid1->seq == rid2->seq);
}
/* Equality test for tie breakers; node_id only matters in NODE_ID mode. */
int
tlv_tie_breaker_eq(const struct tlv_tie_breaker *tb1, const struct tlv_tie_breaker *tb2)
{
if (tb1->mode == tb2->mode && tb1->mode == TLV_TIE_BREAKER_MODE_NODE_ID) {
return (tb1->node_id == tb2->node_id);
}
return (tb1->mode == tb2->mode);
}
/*
 * Human-readable name for a vote value; falls back to a generic string
 * for values outside the enum.
 */
const char *
tlv_vote_to_str(enum tlv_vote vote)
{

	switch (vote) {
	case TLV_VOTE_ACK:
		return ("ACK");
	case TLV_VOTE_NACK:
		return ("NACK");
	case TLV_VOTE_ASK_LATER:
		return ("Ask later");
	case TLV_VOTE_WAIT_FOR_REPLY:
		return ("Wait for reply");
	case TLV_VOTE_NO_CHANGE:
		return ("No change");
	case TLV_VOTE_UNDEFINED:
		break;
	}

	return ("Unknown vote value");
}

/*
 * Human-readable name for a node state.
 */
const char *
tlv_node_state_to_str(enum tlv_node_state state)
{

	switch (state) {
	case TLV_NODE_STATE_NOT_SET:
		return ("not set");
	case TLV_NODE_STATE_MEMBER:
		return ("member");
	case TLV_NODE_STATE_DEAD:
		return ("dead");
	case TLV_NODE_STATE_LEAVING:
		return ("leaving");
	}

	return ("Unhandled node state");
}

/*
 * Human-readable name for a tls-supported value.
 */
const char *
tlv_tls_supported_to_str(enum tlv_tls_supported tls_supported)
{

	switch (tls_supported) {
	case TLV_TLS_UNSUPPORTED:
		return ("Unsupported");
	case TLV_TLS_SUPPORTED:
		return ("Supported");
	case TLV_TLS_REQUIRED:
		return ("Required");
	}

	return ("Unhandled tls supported state");
}

/*
 * Human-readable name for a decision algorithm.
 */
const char *
tlv_decision_algorithm_type_to_str(enum tlv_decision_algorithm_type algorithm)
{

	switch (algorithm) {
	case TLV_DECISION_ALGORITHM_TYPE_TEST:
		return ("Test");
	case TLV_DECISION_ALGORITHM_TYPE_FFSPLIT:
		return ("Fifty-Fifty split");
	case TLV_DECISION_ALGORITHM_TYPE_2NODELMS:
		return ("2 Node LMS");
	case TLV_DECISION_ALGORITHM_TYPE_LMS:
		return ("LMS");
	}

	return ("Unknown algorithm");
}
diff --git a/test/cpgbench.c b/test/cpgbench.c
index 23fc2b0a..059effe2 100644
--- a/test/cpgbench.c
+++ b/test/cpgbench.c
@@ -1,198 +1,197 @@
/*
* Copyright (c) 2006, 2009 Red Hat, Inc.
*
* All rights reserved.
*
* Author: Steven Dake (sdake@redhat.com)
*
* This software licensed under BSD license, the text of which follows:
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* - Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* - Neither the name of the MontaVista Software, Inc. nor the names of its
* contributors may be used to endorse or promote products derived from this
* software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
* THE POSSIBILITY OF SUCH DAMAGE.
*/
#include <config.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <signal.h>
#include <unistd.h>
#include <errno.h>
-#include <unistd.h>
#include <time.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/select.h>
#include <sys/uio.h>
#include <sys/un.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <pthread.h>
#include <qb/qblog.h>
#include <qb/qbutil.h>
#include <corosync/corotypes.h>
#include <corosync/cpg.h>
static cpg_handle_t handle;
static pthread_t thread;
#ifndef timersub
#define timersub(a, b, result) \
do { \
(result)->tv_sec = (a)->tv_sec - (b)->tv_sec; \
(result)->tv_usec = (a)->tv_usec - (b)->tv_usec; \
if ((result)->tv_usec < 0) { \
--(result)->tv_sec; \
(result)->tv_usec += 1000000; \
} \
} while (0)
#endif /* timersub */
static int alarm_notice;
/* CPG configuration-change callback: intentionally a no-op for this benchmark. */
static void cpg_bm_confchg_fn (
cpg_handle_t handle_in,
const struct cpg_name *group_name,
const struct cpg_address *member_list, size_t member_list_entries,
const struct cpg_address *left_list, size_t left_list_entries,
const struct cpg_address *joined_list, size_t joined_list_entries)
{
}
static unsigned int write_count;
/* Delivery callback: just count received messages for throughput statistics. */
static void cpg_bm_deliver_fn (
cpg_handle_t handle_in,
const struct cpg_name *group_name,
uint32_t nodeid,
uint32_t pid,
void *msg,
size_t msg_len)
{
write_count++;
}
static cpg_callbacks_t callbacks = {
.cpg_deliver_fn = cpg_bm_deliver_fn,
.cpg_confchg_fn = cpg_bm_confchg_fn
};
#define ONE_MEG 1048576
/* Payload buffer; contents are irrelevant, only the size matters */
static char data[ONE_MEG];
/*
 * Multicast write_size-byte messages for up to 10 seconds (SIGALRM sets
 * alarm_notice), then print message count, runtime, TP/s and MB/s.
 * write_count is incremented by the delivery callback on the dispatch thread.
 */
static void cpg_benchmark (
	cpg_handle_t handle_in,
	int write_size)
{
	struct timeval tv1, tv2, tv_elapsed;
	struct iovec iov;
	unsigned int res;

	alarm_notice = 0;
	iov.iov_base = data;
	iov.iov_len = write_size;

	write_count = 0;
	alarm (10);

	gettimeofday (&tv1, NULL);
	do {
		res = cpg_mcast_joined (handle_in, CPG_TYPE_AGREED, &iov, 1);
	} while (alarm_notice == 0 && (res == CS_OK || res == CS_ERR_TRY_AGAIN));
	gettimeofday (&tv2, NULL);
	timersub (&tv2, &tv1, &tv_elapsed);

	/* Fix: write_count is unsigned int — use %u, not %d (format mismatch) */
	printf ("%5u messages received ", write_count);
	printf ("%5d bytes per write ", write_size);
	printf ("%7.3f Seconds runtime ",
		(tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0)));
	printf ("%9.3f TP/s ",
		((float)write_count) / (tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0)));
	printf ("%7.3f MB/s.\n",
		((float)write_count) * ((float)write_size) / ((tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0)) * 1000000.0));
}
/* SIGALRM handler: signals the benchmark loop to stop. */
static void sigalrm_handler (int num)
{
alarm_notice = 1;
}
/* CPG group joined by every benchmark instance */
static struct cpg_name group_name = {
.value = "cpg_bm",
.length = 6
};
/* Dispatch thread: blocks delivering CPG callbacks until finalize. */
static void* dispatch_thread (void *arg)
{
cpg_dispatch (handle, CS_DISPATCH_BLOCKING);
return NULL;
}
/*
 * cpgbench entry point: join the "cpg_bm" group and run the benchmark with
 * message sizes growing by a factor of 5 (64 bytes up to just under 1 MiB).
 */
int main (void) {
unsigned int size;
int i;
unsigned int res;
/* Log everything to stderr, nothing to syslog */
qb_log_init("cpgbench", LOG_USER, LOG_EMERG);
qb_log_ctl(QB_LOG_SYSLOG, QB_LOG_CONF_ENABLED, QB_FALSE);
qb_log_filter_ctl(QB_LOG_STDERR, QB_LOG_FILTER_ADD,
    QB_LOG_FILTER_FILE, "*", LOG_DEBUG);
qb_log_ctl(QB_LOG_STDERR, QB_LOG_CONF_ENABLED, QB_TRUE);
size = 64;
signal (SIGALRM, sigalrm_handler);
res = cpg_initialize (&handle, &callbacks);
if (res != CS_OK) {
printf ("cpg_initialize failed with result %d\n", res);
exit (1);
}
pthread_create (&thread, NULL, dispatch_thread, NULL);
res = cpg_join (handle, &group_name);
if (res != CS_OK) {
printf ("cpg_join failed with result %d\n", res);
exit (1);
}
for (i = 0; i < 10; i++) { /* number of repetitions - up to 50k */
cpg_benchmark (handle, size);
/* NOTE(review): re-installing the handler here looks redundant — the
 * handler set before the loop stays installed; confirm before removing */
signal (SIGALRM, sigalrm_handler);
size *= 5;
if (size >= (ONE_MEG - 100)) {
break;
}
}
res = cpg_finalize (handle);
if (res != CS_OK) {
printf ("cpg_finalize failed with result %d\n", res);
exit (1);
}
return (0);
}
diff --git a/test/cpgbenchzc.c b/test/cpgbenchzc.c
index db78289a..92f55ee9 100644
--- a/test/cpgbenchzc.c
+++ b/test/cpgbenchzc.c
@@ -1,195 +1,193 @@
/*
* Copyright (c) 2006, 2009 Red Hat, Inc.
*
* All rights reserved.
*
* Author: Steven Dake (sdake@redhat.com)
*
* This software licensed under BSD license, the text of which follows:
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* - Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* - Neither the name of the MontaVista Software, Inc. nor the names of its
* contributors may be used to endorse or promote products derived from this
* software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
* THE POSSIBILITY OF SUCH DAMAGE.
*/
#include <config.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <signal.h>
#include <unistd.h>
#include <errno.h>
-#include <unistd.h>
#include <time.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/select.h>
#include <sys/un.h>
-#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <corosync/corotypes.h>
#include <corosync/cpg.h>
#include "../lib/util.h"
#ifndef timersub
#define timersub(a, b, result) \
do { \
(result)->tv_sec = (a)->tv_sec - (b)->tv_sec; \
(result)->tv_usec = (a)->tv_usec - (b)->tv_usec; \
if ((result)->tv_usec < 0) { \
--(result)->tv_sec; \
(result)->tv_usec += 1000000; \
} \
} while (0)
#endif
static int alarm_notice;
/* CPG configuration-change callback: intentionally a no-op for this benchmark. */
static void cpg_bm_confchg_fn (
cpg_handle_t handle,
const struct cpg_name *group_name,
const struct cpg_address *member_list, size_t member_list_entries,
const struct cpg_address *left_list, size_t left_list_entries,
const struct cpg_address *joined_list, size_t joined_list_entries)
{
}
static unsigned int write_count;
/* Delivery callback: just count received messages for throughput statistics. */
static void cpg_bm_deliver_fn (
cpg_handle_t handle,
const struct cpg_name *group_name,
uint32_t nodeid,
uint32_t pid,
void *msg,
size_t msg_len)
{
write_count++;
}
static cpg_callbacks_t callbacks = {
.cpg_deliver_fn = cpg_bm_deliver_fn,
.cpg_confchg_fn = cpg_bm_confchg_fn
};
/* Zero-copy buffer allocated by cpg_zcb_alloc in main() */
void *data;
/*
 * Zero-copy benchmark: multicast the pre-allocated zcb buffer for up to
 * 10 seconds (SIGALRM sets alarm_notice), dispatching deliveries inline,
 * then print message count, runtime, TP/s and MB/s.
 */
static void cpg_benchmark (
	cpg_handle_t handle,
	int write_size)
{
	struct timeval tv1, tv2, tv_elapsed;
	unsigned int res;
	cpg_flow_control_state_t flow_control_state;

	alarm_notice = 0;
	write_count = 0;
	alarm (10);

	gettimeofday (&tv1, NULL);
	do {
		/*
		 * Test checkpoint write
		 */
		cpg_flow_control_state_get (handle, &flow_control_state);
		if (flow_control_state == CPG_FLOW_CONTROL_DISABLED) {
retry:
			res = cpg_zcb_mcast_joined (handle, CPG_TYPE_AGREED, data, write_size);
			if (res == CS_ERR_TRY_AGAIN) {
				goto retry;
			}
		}
		res = cpg_dispatch (handle, CS_DISPATCH_ALL);
		if (res != CS_OK) {
			/* Fix: res is unsigned int — use %u, not %d */
			printf ("cpg dispatch returned error %u\n", res);
			exit (1);
		}
	} while (alarm_notice == 0);
	gettimeofday (&tv2, NULL);
	timersub (&tv2, &tv1, &tv_elapsed);

	/* Fix: write_count is unsigned int — use %u, not %d (format mismatch) */
	printf ("%5u messages received ", write_count);
	printf ("%5d bytes per write ", write_size);
	printf ("%7.3f Seconds runtime ",
		(tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0)));
	printf ("%9.3f TP/s ",
		((float)write_count) / (tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0)));
	printf ("%7.3f MB/s.\n",
		((float)write_count) * ((float)write_size) / ((tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0)) * 1000000.0));
}
/* SIGALRM handler: signals the benchmark loop to stop. */
static void sigalrm_handler (int num)
{
alarm_notice = 1;
}
/* CPG group joined by every benchmark instance */
static struct cpg_name group_name = {
.value = "cpg_bm",
.length = 6
};
/*
 * cpgbenchzc entry point: allocate a zero-copy buffer, join the "cpg_bm"
 * group and run the benchmark with message sizes 1000..50000 bytes.
 */
int main (void) {
	cpg_handle_t handle;
	unsigned int size;
	int i;
	unsigned int res;

	size = 1000;
	signal (SIGALRM, sigalrm_handler);
	res = cpg_initialize (&handle, &callbacks);
	if (res != CS_OK) {
		printf ("cpg_initialize failed with result %d\n", res);
		exit (1);
	}

	/*
	 * Fix: the return value of cpg_zcb_alloc was previously discarded, so
	 * the check below tested the stale result of cpg_initialize and an
	 * allocation failure could never be detected.
	 */
	res = cpg_zcb_alloc (handle, 500000, &data);
	if (res != CS_OK) {
		printf ("cpg_zcb_alloc couldn't allocate zero copy buffer %d\n", res);
		exit (1);
	}

	res = cpg_join (handle, &group_name);
	if (res != CS_OK) {
		printf ("cpg_join failed with result %s\n", cs_strerror(res));
		exit (1);
	}

	for (i = 0; i < 50; i++) { /* number of repetitions - up to 50k */
		cpg_benchmark (handle, size);
		size += 1000;
	}

	res = cpg_finalize (handle);
	if (res != CS_OK) {
		printf ("cpg_finalize failed with result %s\n", cs_strerror(res));
		exit (1);
	}
	return (0);
}
diff --git a/test/cpghum.c b/test/cpghum.c
index 5772f818..006bf45e 100644
--- a/test/cpghum.c
+++ b/test/cpghum.c
@@ -1,437 +1,436 @@
/*
* Copyright (c) 2015 Red Hat, Inc.
*
* All rights reserved.
*
* Author: Christine Caulfield <ccaulfie@redhat.com>
*
* This software licensed under BSD license, the text of which follows:
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* - Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* - Neither the name of the MontaVista Software, Inc. nor the names of its
* contributors may be used to endorse or promote products derived from this
* software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
* THE POSSIBILITY OF SUCH DAMAGE.
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <signal.h>
#include <unistd.h>
#include <errno.h>
-#include <unistd.h>
#include <time.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/select.h>
#include <sys/uio.h>
#include <sys/un.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <pthread.h>
#include <zlib.h>
#include <libgen.h>
#include <qb/qblog.h>
#include <qb/qbutil.h>
#include <corosync/corotypes.h>
#include <corosync/cpg.h>
/* CPG connection shared by the sender (main) and the dispatch thread. */
static cpg_handle_t handle;

static pthread_t thread;

/* Fallback definition for platforms whose <sys/time.h> lacks timersub. */
#ifndef timersub
#define timersub(a, b, result)						\
	do {								\
		(result)->tv_sec = (a)->tv_sec - (b)->tv_sec;		\
		(result)->tv_usec = (a)->tv_usec - (b)->tv_usec;	\
		if ((result)->tv_usec < 0) {				\
			--(result)->tv_sec;				\
			(result)->tv_usec += 1000000;			\
		}							\
	} while (0)
#endif /* timersub */

/* Set by SIGALRM to end the current send period.
 * NOTE(review): written from a signal handler but declared plain int;
 * volatile sig_atomic_t would be the portable choice — confirm. */
static int alarm_notice;
#define ONE_MEG 1048576
#define DATASIZE (ONE_MEG*20)
/* Send buffer layout: int[0]=sequence, int[1]=CRC32 of payload, rest payload. */
static char data[DATASIZE];
static int send_counter = 0;
static int do_syslog = 0;
static int quiet = 0;
/* Set by SIGINT to stop send/listen loops. */
static volatile int stopped;

// stats
static unsigned int length_errors=0;
static unsigned int crc_errors=0;
static unsigned int sequence_errors=0;
static unsigned int packets_sent=0;
static unsigned int packets_recvd=0;
static unsigned int send_retries=0;
static unsigned int send_fails=0;
/*
 * CPG configuration-change callback.
 * This test does not track membership, so the event is ignored.
 */
static void cpg_bm_confchg_fn (
	cpg_handle_t handle_in,
	const struct cpg_name *group_name,
	const struct cpg_address *member_list, size_t member_list_entries,
	const struct cpg_address *left_list, size_t left_list_entries,
	const struct cpg_address *joined_list, size_t joined_list_entries)
{
}
/* Receive-side counters updated by the deliver callback. */
static unsigned int g_recv_count;
static unsigned int g_recv_length;
/* Expected packet size; 0 disables the length check (listen-only mode). */
static unsigned int g_write_size;
static int g_recv_counter = 0;
/*
 * CPG deliver callback: validates every received packet.
 * Layout produced by cpg_test(): int[0]=sequence, int[1]=CRC32 of the
 * remaining payload.  Checks length (when g_write_size != 0), sequence
 * continuity and CRC, updating the global error/packet counters.
 *
 * Fix: msg_len is size_t, so it must be printed with %zu (the previous
 * %lu is a format mismatch — undefined behavior on ILP32 platforms).
 */
static void cpg_bm_deliver_fn (
	cpg_handle_t handle_in,
	const struct cpg_name *group_name,
	uint32_t nodeid,
	uint32_t pid,
	void *msg,
	size_t msg_len)
{
	int *value = msg;
	uLong crc=0;
	uLong recv_crc = value[1] & 0xFFFFFFFF;

	packets_recvd++;
	g_recv_length = msg_len;

	// Basic check, packets should all be the right size
	if (g_write_size && (msg_len != g_write_size)) {
		length_errors++;
		fprintf(stderr, "%s: message sizes don't match. got %zu, expected %u\n", group_name->value, msg_len, g_write_size);
		if (do_syslog) {
			syslog(LOG_ERR, "%s: message sizes don't match. got %zu, expected %u\n", group_name->value, msg_len, g_write_size);
		}
	}

	// Sequence counters are incrementing in step?
	if (*value != g_recv_counter) {
		sequence_errors++;
		fprintf(stderr, "%s: counters don't match. got %d, expected %d\n", group_name->value, *value, g_recv_counter);
		if (do_syslog) {
			syslog(LOG_ERR, "%s: counters don't match. got %d, expected %d\n", group_name->value, *value, g_recv_counter);
		}
		// Catch up or we'll be printing errors for ever
		g_recv_counter = *value +1;
	} else {
		g_recv_counter++;
	}

	// Check crc
	crc = crc32(0, NULL, 0);
	crc = crc32(crc, (Bytef *)&value[2], msg_len-sizeof(int)*2) & 0xFFFFFFFF;
	if (crc != recv_crc) {
		crc_errors++;
		fprintf(stderr, "%s: CRCs don't match. got %lx, expected %lx\n", group_name->value, recv_crc, crc);
		if (do_syslog) {
			syslog(LOG_ERR, "%s: CRCs don't match. got %lx, expected %lx\n", group_name->value, recv_crc, crc);
		}
	}
	g_recv_count++;
}
/* Callbacks for the v1 model (cpg_model_initialize, selected with -m 1). */
static cpg_model_v1_data_t model1_data = {
	.cpg_deliver_fn = cpg_bm_deliver_fn,
	.cpg_confchg_fn = cpg_bm_confchg_fn,
};

/* Callbacks for the legacy API (cpg_initialize, selected with -m 0). */
static cpg_callbacks_t callbacks = {
	.cpg_deliver_fn = cpg_bm_deliver_fn,
	.cpg_confchg_fn = cpg_bm_confchg_fn
};

/* Default CPG group; overridable with -n. */
static struct cpg_name group_name = {
	.value = "cpghum",
	.length = 7
};
/*
 * Send sequenced, CRC-protected packets of write_size bytes for one
 * print_time-second period (terminated by SIGALRM setting alarm_notice),
 * sleeping delay_time milliseconds between sends.  CS_ERR_TRY_AGAIN is
 * retried after a short back-off; other errors are counted and stop the loop.
 *
 * Fix: the random payload is now generated only for the bytes that are
 * actually transmitted.  The previous loop filled the whole 20 MB buffer
 * before EVERY send, which dominated run time for small write sizes
 * without affecting what goes on the wire.
 */
static void cpg_test (
	cpg_handle_t handle_in,
	int write_size,
	int delay_time,
	int print_time)
{
	struct timeval tv1, tv2, tv_elapsed;
	struct iovec iov;
	unsigned int res;
	size_t i;
	unsigned int *dataint = (unsigned int *)data;
	uLong crc;

	alarm_notice = 0;
	iov.iov_base = data;
	iov.iov_len = write_size;

	g_recv_count = 0;
	alarm (print_time);
	gettimeofday (&tv1, NULL);
	do {
		dataint[0] = send_counter++;
		/* Randomize every byte that will be sent (the CRC below
		 * covers bytes [2*sizeof(int), write_size)). */
		for (i = 2; i * sizeof(unsigned int) < (size_t)write_size; i++) {
			dataint[i] = rand();
		}
		crc = crc32(0, NULL, 0);
		dataint[1] = crc32(crc, (Bytef*)&dataint[2], write_size-sizeof(int)*2);
resend:
		res = cpg_mcast_joined (handle_in, CPG_TYPE_AGREED, &iov, 1);
		if (res == CS_ERR_TRY_AGAIN) {
			usleep(10000);
			send_retries++;
			goto resend;
		}
		if (res != CS_OK) {
			fprintf(stderr, "send failed: %d\n", res);
			send_fails++;
		}
		else {
			packets_sent++;
		}
		usleep(delay_time*1000);
	} while (alarm_notice == 0 && (res == CS_OK || res == CS_ERR_TRY_AGAIN) && stopped == 0);
	gettimeofday (&tv2, NULL);
	timersub (&tv2, &tv1, &tv_elapsed);

	if (!quiet) {
		printf ("%s: %5d message%s received, ", group_name.value, g_recv_count, g_recv_count==1?"":"s");
		printf ("%5d bytes per write\n", write_size);
	}
}
/* SIGALRM handler: ends the current cpg_test() send period. */
static void sigalrm_handler (int num)
{
	alarm_notice = 1;
}
/* SIGINT handler: requests a clean shutdown of all loops. */
static void sigint_handler (int num)
{
	stopped = 1;
}
/* Thread body: pump CPG callbacks until the connection is finalized. */
static void* dispatch_thread (void *arg)
{
	(void)arg;	/* unused */
	cpg_dispatch (handle, CS_DISPATCH_BLOCKING);
	return NULL;
}
/* Print command-line help to stderr.  Output is byte-identical to the
 * original line-by-line version; adjacent literals are merged. */
static void usage(char *cmd)
{
	fprintf(stderr, "%s [OPTIONS]\n"
			"\n", cmd);
	fprintf(stderr, "%s sends CPG messages to all registered users of the CPG.\n", cmd);
	fprintf(stderr, "The messages have a sequence number and a CRC so that missing or\n"
			"corrupted messages will be detected and reported.\n"
			"\n");
	fprintf(stderr, "%s can also be asked to simply listen for (and check) packets\n", cmd);
	fprintf(stderr, "so that there is another node in the cluster connected to the CPG.\n"
			"\n"
			"When -l is present, packet size is only checked if specified by -w or -W\n"
			"and it, obviously, must match that of the sender.\n"
			"\n"
			"Multiple copies, in different CPGs, can also be run on the same or\n"
			"different nodes by using the -n option.\n"
			"\n");
	fprintf(stderr, "%s can't handle more than 1 sender in the same CPG as it messes with the\n", cmd);
	fprintf(stderr, "sequence numbers.\n"
			"\n"
			" -w Write size in Kbytes, default 4\n"
			" -W Write size in bytes, default 4096\n"
			" -n CPG name to use, default 'cpghum'\n"
			" -d Delay between sending packets (mS), default 1000\n"
			" -r Number of repetitions, default 100\n"
			" -p Delay between printing output(S), default 10s\n"
			" -l Listen and check CRCs only, don't send (^C to quit)\n"
			" -m cpg_initialise() model. Default 1.\n"
			" -s Also send errors to syslog (for daemon log correlation).\n"
			" -q Quiet. Don't print messages every 10 seconds (see also -p)\n"
			"\n");
}
int main (int argc, char *argv[]) {
int i;
unsigned int res;
uint32_t maxsize;
int opt;
int bs;
int write_size = 4096;
int delay_time = 1000;
int repetitions = 100;
int print_time = 10;
int have_size = 0;
int listen_only = 0;
int model = 1;
while ( (opt = getopt(argc, argv, "qlsn:d:r:p:m:w:W:")) != -1 ) {
switch (opt) {
case 'w': // Write size in K
bs = atoi(optarg);
if (bs > 0) {
write_size = bs*1024;
have_size = 1;
}
break;
case 'W': // Write size in bytes
bs = atoi(optarg);
if (bs > 0) {
write_size = bs;
have_size = 1;
}
break;
case 'n':
if (strlen(optarg) >= CPG_MAX_NAME_LENGTH) {
fprintf(stderr, "CPG name too long\n");
exit(1);
}
strcpy(group_name.value, optarg);
group_name.length = strlen(group_name.value);
break;
case 'd':
delay_time = atoi(optarg);
break;
case 'r':
repetitions = atoi(optarg);
break;
case 'p':
print_time = atoi(optarg);
break;
case 'l':
listen_only = 1;
break;
case 's':
do_syslog = 1;
break;
case 'q':
quiet = 1;
break;
case 'm':
model = atoi(optarg);
if (model < 0 || model > 1) {
fprintf(stderr, "%s: Model must be 0-1\n", argv[0]);
exit(1);
}
break;
case '?':
usage(basename(argv[0]));
exit(0);
}
}
qb_log_init("cpghum", LOG_USER, LOG_EMERG);
qb_log_ctl(QB_LOG_SYSLOG, QB_LOG_CONF_ENABLED, QB_FALSE);
qb_log_filter_ctl(QB_LOG_STDERR, QB_LOG_FILTER_ADD,
QB_LOG_FILTER_FILE, "*", LOG_DEBUG);
qb_log_ctl(QB_LOG_STDERR, QB_LOG_CONF_ENABLED, QB_TRUE);
g_write_size = write_size;
signal (SIGALRM, sigalrm_handler);
signal (SIGINT, sigint_handler);
switch (model) {
case 0:
res = cpg_initialize (&handle, &callbacks);
break;
case 1:
res = cpg_model_initialize (&handle, CPG_MODEL_V1, (cpg_model_data_t *)&model1_data, NULL);
break;
default:
res=999; // can't get here but it keeps the compiler happy
break;
}
if (res != CS_OK) {
printf ("cpg_initialize failed with result %d\n", res);
exit (1);
}
pthread_create (&thread, NULL, dispatch_thread, NULL);
res = cpg_join (handle, &group_name);
if (res != CS_OK) {
printf ("cpg_join failed with result %d\n", res);
exit (1);
}
if (listen_only) {
int secs = 0;
if (!quiet) {
printf("-- Listening on CPG %s\n", group_name.value);
printf("-- Ignore any starting \"counters don't match\" error while we catch up\n");
}
/* Only check packet size if specified on the command-line */
if (!have_size) {
g_write_size = 0;
}
while (!stopped) {
sleep(1);
if (++secs > print_time && !quiet) {
printf ("%s: %5d message%s received. %d bytes\n", group_name.value, g_recv_count, g_recv_count==1?"":"s", g_recv_length);
secs = 0;
g_recv_count = 0;
}
}
}
else {
cpg_max_atomic_msgsize_get (handle, &maxsize);
if ( write_size > maxsize) {
fprintf(stderr, "INFO: packet size (%d) is larger than the maximum atomic size (%d), libcpg will fragment\n",
write_size, maxsize);
}
for (i = 0; i < repetitions && !stopped; i++) {
cpg_test (handle, write_size, delay_time, print_time);
signal (SIGALRM, sigalrm_handler);
}
}
res = cpg_finalize (handle);
if (res != CS_OK) {
printf ("cpg_finalize failed with result %d\n", res);
exit (1);
}
printf("\n");
printf("Stats:\n");
if (!listen_only) {
printf(" packets sent: %d\n", packets_sent);
printf(" send failures: %d\n", send_fails);
printf(" send retries: %d\n", send_retries);
}
if (have_size) {
printf(" length errors: %d\n", length_errors);
}
printf(" packets recvd: %d\n", packets_recvd);
printf(" sequence errors: %d\n", sequence_errors);
printf(" crc errors: %d\n", crc_errors);
printf("\n");
return (0);
}
diff --git a/tools/corosync-notifyd.c b/tools/corosync-notifyd.c
index 507a2485..8f37a307 100644
--- a/tools/corosync-notifyd.c
+++ b/tools/corosync-notifyd.c
@@ -1,1222 +1,1219 @@
/*
* Copyright (c) 2011-2012 Red Hat, Inc.
*
* All rights reserved.
*
* Author: Angus Salkeld <asalkeld@redhat.com>
*
* This software licensed under BSD license, the text of which follows:
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* - Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* - Neither the name of the MontaVista Software, Inc. nor the names of its
* contributors may be used to endorse or promote products derived from this
* software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
* THE POSSIBILITY OF SUCH DAMAGE.
*/
#include <config.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/types.h>
#include <netdb.h>
#include <arpa/inet.h>
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <unistd.h>
#include <string.h>
#include <ctype.h>
#include <poll.h>
#include <signal.h>
#include <qb/qbdefs.h>
#include <qb/qbloop.h>
#include <qb/qblog.h>
-#include <qb/qbdefs.h>
-#include <qb/qbloop.h>
-
#include <corosync/corotypes.h>
#include <corosync/cfg.h>
#include <corosync/quorum.h>
#include <corosync/cmap.h>
/*
* generic declarations
*/
enum {
CS_NTF_LOG,
CS_NTF_STDOUT,
CS_NTF_SNMP,
CS_NTF_DBUS,
CS_NTF_FG,
CS_NTF_MAX,
};
static int conf[CS_NTF_MAX];
static int32_t _cs_is_quorate = 0;
typedef void (*node_membership_fn_t)(char *nodename, uint32_t nodeid, char *state, char* ip);
typedef void (*node_quorum_fn_t)(char *nodename, uint32_t nodeid, const char *state);
typedef void (*application_connection_fn_t)(char *nodename, uint32_t nodeid, char *app_name, const char *state);
typedef void (*rrp_faulty_fn_t)(char *nodename, uint32_t nodeid, uint32_t iface_no, const char *state);
struct notify_callbacks {
node_membership_fn_t node_membership_fn;
node_quorum_fn_t node_quorum_fn;
application_connection_fn_t application_connection_fn;
rrp_faulty_fn_t rrp_faulty_fn;
};
#define MAX_NOTIFIERS 5
static int num_notifiers = 0;
static struct notify_callbacks notifiers[MAX_NOTIFIERS];
static uint32_t local_nodeid = 0;
static char local_nodename[CS_MAX_NAME_LENGTH];
static qb_loop_t *main_loop;
static quorum_handle_t quorum_handle;
static void _cs_node_membership_event(char *nodename, uint32_t nodeid, char *state, char* ip);
static void _cs_node_quorum_event(const char *state);
static void _cs_application_connection_event(char *app_name, const char *state);
static void _cs_rrp_faulty_event(uint32_t iface_no, const char *state);
#ifdef HAVE_DBUS
#include <dbus/dbus.h>
/*
* dbus
*/
#define DBUS_CS_NAME "org.corosync"
#define DBUS_CS_IFACE "org.corosync"
#define DBUS_CS_PATH "/org/corosync"
static DBusConnection *db = NULL;
static char _err[512];
static int err_set = 0;
static void _cs_dbus_init(void);
#endif /* HAVE_DBUS */
#ifdef ENABLE_SNMP
#include <net-snmp/net-snmp-config.h>
#include <net-snmp/snmpv3_api.h>
#include <net-snmp/agent/agent_trap.h>
#include <net-snmp/library/mib.h>
#include <net-snmp/library/snmp_api.h>
#include <net-snmp/library/snmp_client.h>
#include <net-snmp/library/snmp_debug.h>
enum snmp_node_status {
SNMP_NODE_STATUS_UNKNOWN = 0,
SNMP_NODE_STATUS_JOINED = 1,
SNMP_NODE_STATUS_LEFT = 2
};
#define SNMP_OID_COROSYNC "1.3.6.1.4.1.35488"
#define SNMP_OID_OBJECT_ROOT SNMP_OID_COROSYNC ".1"
#define SNMP_OID_OBJECT_NODE_NAME SNMP_OID_OBJECT_ROOT ".1"
#define SNMP_OID_OBJECT_NODE_ID SNMP_OID_OBJECT_ROOT ".2"
#define SNMP_OID_OBJECT_NODE_STATUS SNMP_OID_OBJECT_ROOT ".3"
#define SNMP_OID_OBJECT_NODE_ADDR SNMP_OID_OBJECT_ROOT ".4"
#define SNMP_OID_OBJECT_RINGSEQ SNMP_OID_OBJECT_ROOT ".20"
#define SNMP_OID_OBJECT_QUORUM SNMP_OID_OBJECT_ROOT ".21"
#define SNMP_OID_OBJECT_APP_NAME SNMP_OID_OBJECT_ROOT ".40"
#define SNMP_OID_OBJECT_APP_STATUS SNMP_OID_OBJECT_ROOT ".41"
#define SNMP_OID_OBJECT_RRP_IFACE_NO SNMP_OID_OBJECT_ROOT ".60"
#define SNMP_OID_OBJECT_RRP_STATUS SNMP_OID_OBJECT_ROOT ".61"
#define SNMP_OID_TRAPS_ROOT SNMP_OID_COROSYNC ".0"
#define SNMP_OID_TRAPS_NODE SNMP_OID_TRAPS_ROOT ".1"
#define SNMP_OID_TRAPS_QUORUM SNMP_OID_TRAPS_ROOT ".2"
#define SNMP_OID_TRAPS_APP SNMP_OID_TRAPS_ROOT ".3"
#define SNMP_OID_TRAPS_RRP SNMP_OID_TRAPS_ROOT ".4"
#define CS_TIMESTAMP_STR_LEN 20
static const char *local_host = "localhost";
#endif /* ENABLE_SNMP */
static char snmp_manager_buf[CS_MAX_NAME_LENGTH];
static char *snmp_manager = NULL;
#define CMAP_MAX_RETRIES 10
/*
* cmap
*/
static cmap_handle_t cmap_handle;
static int32_t _cs_ip_to_hostname(char* ip, char* name_out)
{
struct sockaddr_in sa;
int rc;
if (strchr(ip, ':') == NULL) {
sa.sin_family = AF_INET;
} else {
sa.sin_family = AF_INET6;
}
rc = inet_pton(sa.sin_family, ip, &sa.sin_addr);
if (rc == 0) {
return -EINVAL;
}
rc = getnameinfo((struct sockaddr*)&sa, sizeof(sa),
name_out, CS_MAX_NAME_LENGTH, NULL, 0, 0);
if (rc != 0) {
qb_log(LOG_ERR, 0, "error looking up %s : %s", ip, gai_strerror(rc));
return -EINVAL;
}
return 0;
}
/*
 * cmap track callback for runtime.totem.pg.mrp.srp.members.<nodeid>.*.
 * On a "status" change it fetches the member's address, resolves it and
 * forwards a node-membership event to all notifiers.
 *
 * Fixes:
 *  - the strrchr() results were dereferenced without NULL checks; a
 *    malformed ip string ("r(0) ip(...)" not present) crashed the daemon.
 *  - nodename was passed to the notifiers uninitialized when the reverse
 *    lookup failed; it now falls back to the raw address string.
 */
static void _cs_cmap_members_key_changed (
	cmap_handle_t cmap_handle_c,
	cmap_track_handle_t cmap_track_handle,
	int32_t event,
	const char *key_name,
	struct cmap_notify_value new_value,
	struct cmap_notify_value old_value,
	void *user_data)
{
	char nodename[CS_MAX_NAME_LENGTH];
	char* open_bracket = NULL;
	char* close_bracket = NULL;
	int res;
	uint32_t nodeid;
	char *ip_str;
	char tmp_key[CMAP_KEYNAME_MAXLEN];
	cs_error_t err;
	int no_retries;

	if (event != CMAP_TRACK_ADD && event != CMAP_TRACK_MODIFY) {
		return ;
	}

	/* NOTE(review): %s into tmp_key has no field width; assumes cmap
	 * key names are < CMAP_KEYNAME_MAXLEN — confirm against libcmap. */
	res = sscanf(key_name, "runtime.totem.pg.mrp.srp.members.%u.%s", &nodeid, tmp_key);
	if (res != 2)
		return ;

	if (strcmp(tmp_key, "status") != 0) {
		return ;
	}

	snprintf(tmp_key, CMAP_KEYNAME_MAXLEN, "runtime.totem.pg.mrp.srp.members.%u.ip", nodeid);
	no_retries = 0;
	while ((err = cmap_get_string(cmap_handle, tmp_key, &ip_str)) == CS_ERR_TRY_AGAIN &&
			no_retries++ < CMAP_MAX_RETRIES) {
		sleep(1);
	}

	if (err != CS_OK) {
		return ;
	}

	/*
	 * We want the ip out of: "r(0) ip(192.168.100.92)"
	 */
	open_bracket = strrchr(ip_str, '(');
	if (open_bracket == NULL) {
		free(ip_str);
		return ;
	}
	open_bracket++;
	close_bracket = strrchr(open_bracket, ')');
	if (close_bracket == NULL) {
		free(ip_str);
		return ;
	}
	*close_bracket = '\0';
	if (_cs_ip_to_hostname(open_bracket, nodename) != 0) {
		/* reverse lookup failed: report the raw address instead */
		snprintf(nodename, sizeof(nodename), "%s", open_bracket);
	}

	_cs_node_membership_event(nodename, nodeid, (char *)new_value.data, open_bracket);
	free(ip_str);
}
/*
 * cmap track callback for runtime.connections.<conn>.*: translates the
 * appearance/disappearance of a connection's "service_id" key into an
 * application connected/disconnected event.
 */
static void _cs_cmap_connections_key_changed (
	cmap_handle_t cmap_handle_c,
	cmap_track_handle_t cmap_track_handle,
	int32_t event,
	const char *key_name,
	struct cmap_notify_value new_value,
	struct cmap_notify_value old_value,
	void *user_data)
{
	char app_name[CS_MAX_NAME_LENGTH];
	char conn_part[CMAP_KEYNAME_MAXLEN];
	char leaf[CMAP_KEYNAME_MAXLEN];

	/* Only react to the "service_id" leaf of the connection object. */
	if (sscanf(key_name, "runtime.connections.%[^.].%s", conn_part, leaf) != 2 ||
	    strcmp(leaf, "service_id") != 0) {
		return ;
	}

	snprintf(app_name, CS_MAX_NAME_LENGTH, "%s", conn_part);

	switch (event) {
	case CMAP_TRACK_ADD:
		_cs_application_connection_event(app_name, "connected");
		break;
	case CMAP_TRACK_DELETE:
		_cs_application_connection_event(app_name, "disconnected");
		break;
	default:
		break;
	}
}
/*
 * cmap track callback for runtime.totem.pg.mrp.rrp.<iface>.*: on a change
 * of the "faulty" key, reads its current value (retrying on TRY_AGAIN)
 * and forwards a faulty/operational event for that interface.
 */
static void _cs_cmap_rrp_faulty_key_changed (
	cmap_handle_t cmap_handle_c,
	cmap_track_handle_t cmap_track_handle,
	int32_t event,
	const char *key_name,
	struct cmap_notify_value new_value,
	struct cmap_notify_value old_value,
	void *user_data)
{
	uint32_t iface;
	char leaf[CMAP_KEYNAME_MAXLEN];
	uint8_t faulty = 0;
	cs_error_t err;
	int attempt = 0;

	if (sscanf(key_name, "runtime.totem.pg.mrp.rrp.%u.%s", &iface, leaf) != 2 ||
	    strcmp(leaf, "faulty") != 0) {
		return ;
	}

	/* Retry transient failures, sleeping between attempts. */
	for (;;) {
		err = cmap_get_uint8(cmap_handle, key_name, &faulty);
		if (err != CS_ERR_TRY_AGAIN || attempt++ >= CMAP_MAX_RETRIES) {
			break;
		}
		sleep(1);
	}
	if (err != CS_OK) {
		return ;
	}

	_cs_rrp_faulty_event(iface, faulty ? "faulty" : "operational");
}
static int
_cs_cmap_dispatch(int fd, int revents, void *data)
{
cs_error_t err;
err = cmap_dispatch(cmap_handle, CS_DISPATCH_ONE);
if (err != CS_OK && err != CS_ERR_TRY_AGAIN && err != CS_ERR_TIMEOUT &&
err != CS_ERR_QUEUE_FULL) {
qb_log(LOG_ERR, "Could not dispatch cmap events. Error %u", err);
qb_loop_stop(main_loop);
return -1;
}
return 0;
}
/*
 * Quorum track callback: forwards only quorate/not-quorate *transitions*
 * to the notifiers, suppressing repeated notifications of the same state.
 */
static void _cs_quorum_notification(quorum_handle_t handle,
	uint32_t quorate, uint64_t ring_seq,
	uint32_t view_list_entries, uint32_t *view_list)
{
	if (_cs_is_quorate != quorate) {
		_cs_is_quorate = quorate;
		_cs_node_quorum_event(quorate ? "quorate" : "not quorate");
	}
}
static int
_cs_quorum_dispatch(int fd, int revents, void *data)
{
cs_error_t err;
err = quorum_dispatch(quorum_handle, CS_DISPATCH_ONE);
if (err != CS_OK && err != CS_ERR_TRY_AGAIN && err != CS_ERR_TIMEOUT &&
err != CS_ERR_QUEUE_FULL) {
qb_log(LOG_ERR, "Could not dispatch quorum events. Error %u", err);
qb_loop_stop(main_loop);
return -1;
}
return 0;
}
/*
 * Connect to the quorum service, hook its fd into the main loop and
 * start change tracking.  Errors are logged and leave the daemon running
 * without quorum notifications (matching the existing error style).
 *
 * Fix: the return of quorum_fd_get() is now checked — previously a
 * failure left `fd` uninitialized and it was handed to qb_loop_poll_add().
 */
static void
_cs_quorum_init(void)
{
	cs_error_t rc;
	uint32_t quorum_type;
	int fd;

	quorum_callbacks_t quorum_callbacks = {
		.quorum_notify_fn = _cs_quorum_notification,
	};

	rc = quorum_initialize (&quorum_handle, &quorum_callbacks,
				&quorum_type);
	if (rc != CS_OK) {
		qb_log(LOG_ERR, "Could not connect to corosync(quorum)");
		return;
	}
	rc = quorum_fd_get(quorum_handle, &fd);
	if (rc != CS_OK) {
		qb_log(LOG_ERR, "Could not get quorum fd");
		return;
	}
	qb_loop_poll_add(main_loop, QB_LOOP_MED, fd, POLLIN|POLLNVAL, NULL,
			 _cs_quorum_dispatch);

	rc = quorum_trackstart(quorum_handle, CS_TRACK_CHANGES);
	if (rc != CS_OK) {
		qb_log(LOG_ERR, "Could not start tracking");
		return;
	}
}
/* Disconnect from the quorum service at shutdown. */
static void
_cs_quorum_finalize(void)
{
	quorum_finalize (quorum_handle);
}
#ifdef HAVE_DBUS
/*
* dbus notifications
*/
/*
 * Synchronously dispatch all pending inbound messages and flush all
 * queued outbound messages on the system-bus connection.  The extra
 * ref/unref pair keeps `db` alive while dispatching runs callbacks.
 */
static void
_cs_dbus_auto_flush(void)
{
	dbus_connection_ref(db);
	while (dbus_connection_get_dispatch_status(db) == DBUS_DISPATCH_DATA_REMAINS) {
		dbus_connection_dispatch(db);
	}
	while (dbus_connection_has_messages_to_send(db)) {
		dbus_connection_flush(db);
	}
	dbus_connection_unref(db);
}
static void
_cs_dbus_release(void)
{
DBusError err;
if (!db)
return;
dbus_error_init(&err);
dbus_bus_release_name(db, DBUS_CS_NAME, &err);
dbus_error_free(&err);
dbus_connection_unref(db);
db = NULL;
}
/*
 * Emit a "QuorumStateChange" D-Bus signal (nodename, nodeid, state).
 * An error latched by a previous call (err_set/_err) is logged first;
 * if the bus connection has been lost, it is released and the event
 * silently dropped — a later reconnect re-enables notifications.
 */
static void
_cs_dbus_node_quorum_event(char *nodename, uint32_t nodeid, const char *state)
{
	DBusMessage *msg = NULL;

	/* report (once) an error recorded by an earlier attempt */
	if (err_set) {
		qb_log(LOG_ERR, "%s", _err);
		err_set = 0;
	}

	if (!db) {
		goto out_free;
	}

	if (dbus_connection_get_is_connected(db) != TRUE) {
		err_set = 1;
		snprintf(_err, sizeof(_err), "DBus connection lost");
		_cs_dbus_release();
		goto out_unlock;
	}

	/* drain pending traffic before queuing the new signal */
	_cs_dbus_auto_flush();

	if (!(msg = dbus_message_new_signal(DBUS_CS_PATH,
					    DBUS_CS_IFACE,
					    "QuorumStateChange"))) {
		qb_log(LOG_ERR, "error creating dbus signal");
		goto out_unlock;
	}

	if (!dbus_message_append_args(msg,
			DBUS_TYPE_STRING, &nodename,
			DBUS_TYPE_UINT32, &nodeid,
			DBUS_TYPE_STRING, &state,
			DBUS_TYPE_INVALID)) {
		qb_log(LOG_ERR, "error adding args to quorum signal");
		goto out_unlock;
	}

	dbus_connection_send(db, msg, NULL);

out_unlock:
	if (msg) {
		dbus_message_unref(msg);
	}
out_free:
	return;
}
/*
 * Emit a "NodeStateChange" D-Bus signal (nodename, nodeid, ip, state)
 * when a cluster member joins or leaves.  Same latched-error and
 * lost-connection handling as the other dbus event emitters.
 */
static void
_cs_dbus_node_membership_event(char *nodename, uint32_t nodeid, char *state, char* ip)
{
	DBusMessage *msg = NULL;

	/* report (once) an error recorded by an earlier attempt */
	if (err_set) {
		qb_log(LOG_ERR, "%s", _err);
		err_set = 0;
	}

	if (!db) {
		goto out_free;
	}

	if (dbus_connection_get_is_connected(db) != TRUE) {
		err_set = 1;
		snprintf(_err, sizeof(_err), "DBus connection lost");
		_cs_dbus_release();
		goto out_unlock;
	}

	/* drain pending traffic before queuing the new signal */
	_cs_dbus_auto_flush();

	if (!(msg = dbus_message_new_signal(DBUS_CS_PATH,
					    DBUS_CS_IFACE,
					    "NodeStateChange"))) {
		qb_log(LOG_ERR, "error creating NodeStateChange signal");
		goto out_unlock;
	}

	if (!dbus_message_append_args(msg,
			DBUS_TYPE_STRING, &nodename,
			DBUS_TYPE_UINT32, &nodeid,
			DBUS_TYPE_STRING, &ip,
			DBUS_TYPE_STRING, &state,
			DBUS_TYPE_INVALID)) {
		qb_log(LOG_ERR, "error adding args to NodeStateChange signal");
		goto out_unlock;
	}

	dbus_connection_send(db, msg, NULL);

out_unlock:
	if (msg) {
		dbus_message_unref(msg);
	}
out_free:
	return;
}
/*
 * Emit a "ConnectionStateChange" D-Bus signal (nodename, nodeid,
 * app_name, state) when an application connects to or disconnects from
 * corosync.  Same latched-error and lost-connection handling as the
 * other dbus event emitters.
 */
static void
_cs_dbus_application_connection_event(char *nodename, uint32_t nodeid, char *app_name, const char *state)
{
	DBusMessage *msg = NULL;

	/* report (once) an error recorded by an earlier attempt */
	if (err_set) {
		qb_log(LOG_ERR, "%s", _err);
		err_set = 0;
	}

	if (!db) {
		goto out_free;
	}

	if (dbus_connection_get_is_connected(db) != TRUE) {
		err_set = 1;
		snprintf(_err, sizeof(_err), "DBus connection lost");
		_cs_dbus_release();
		goto out_unlock;
	}

	/* drain pending traffic before queuing the new signal */
	_cs_dbus_auto_flush();

	if (!(msg = dbus_message_new_signal(DBUS_CS_PATH,
					    DBUS_CS_IFACE,
					    "ConnectionStateChange"))) {
		qb_log(LOG_ERR, "error creating ConnectionStateChange signal");
		goto out_unlock;
	}

	if (!dbus_message_append_args(msg,
			DBUS_TYPE_STRING, &nodename,
			DBUS_TYPE_UINT32, &nodeid,
			DBUS_TYPE_STRING, &app_name,
			DBUS_TYPE_STRING, &state,
			DBUS_TYPE_INVALID)) {
		qb_log(LOG_ERR, "error adding args to ConnectionStateChange signal");
		goto out_unlock;
	}

	dbus_connection_send(db, msg, NULL);

out_unlock:
	if (msg) {
		dbus_message_unref(msg);
	}
out_free:
	return;
}
/*
 * Emit a D-Bus signal (nodename, nodeid, iface_no, state) when an RRP
 * interface changes between faulty and operational.
 *
 * NOTE(review): the signal name "QuorumStateChange" looks like a
 * copy/paste from _cs_dbus_node_quorum_event — the argument list here is
 * different (iface_no instead of just state).  Renaming it would change
 * the externally visible D-Bus API, so confirm with consumers before
 * fixing.
 */
static void
_cs_dbus_rrp_faulty_event(char *nodename, uint32_t nodeid, uint32_t iface_no, const char *state)
{
	DBusMessage *msg = NULL;

	/* report (once) an error recorded by an earlier attempt */
	if (err_set) {
		qb_log(LOG_ERR, "%s", _err);
		err_set = 0;
	}

	if (!db) {
		goto out_free;
	}

	if (dbus_connection_get_is_connected(db) != TRUE) {
		err_set = 1;
		snprintf(_err, sizeof(_err), "DBus connection lost");
		_cs_dbus_release();
		goto out_unlock;
	}

	/* drain pending traffic before queuing the new signal */
	_cs_dbus_auto_flush();

	if (!(msg = dbus_message_new_signal(DBUS_CS_PATH,
					    DBUS_CS_IFACE,
					    "QuorumStateChange"))) {
		qb_log(LOG_ERR, "error creating dbus signal");
		goto out_unlock;
	}

	if (!dbus_message_append_args(msg,
			DBUS_TYPE_STRING, &nodename,
			DBUS_TYPE_UINT32, &nodeid,
			DBUS_TYPE_UINT32, &iface_no,
			DBUS_TYPE_STRING, &state,
			DBUS_TYPE_INVALID)) {
		qb_log(LOG_ERR, "error adding args to rrp signal");
		goto out_unlock;
	}

	dbus_connection_send(db, msg, NULL);

out_unlock:
	if (msg) {
		dbus_message_unref(msg);
	}
out_free:
	return;
}
static void
_cs_dbus_init(void)
{
DBusConnection *dbc = NULL;
DBusError err;
dbus_error_init(&err);
dbc = dbus_bus_get(DBUS_BUS_SYSTEM, &err);
if (!dbc) {
snprintf(_err, sizeof(_err),
"dbus_bus_get: %s", err.message);
err_set = 1;
dbus_error_free(&err);
return;
}
dbus_connection_set_exit_on_disconnect(dbc, FALSE);
db = dbc;
notifiers[num_notifiers].node_membership_fn =
_cs_dbus_node_membership_event;
notifiers[num_notifiers].node_quorum_fn =
_cs_dbus_node_quorum_event;
notifiers[num_notifiers].application_connection_fn =
_cs_dbus_application_connection_event;
notifiers[num_notifiers].rrp_faulty_fn =
_cs_dbus_rrp_faulty_event;
num_notifiers++;
}
#endif /* HAVE_DBUS */
#ifdef ENABLE_SNMP
/*
 * Lazily create (and cache) the net-snmp session used to send traps to
 * `target`.  Returns the cached session on subsequent calls, or NULL on
 * failure.
 *
 * Fixes:
 *  - the malloc() result is now checked before snmp_sess_init() uses it.
 *  - qb_log() was passed a stray 0 in the format-string position.
 */
static netsnmp_session *snmp_init (const char *target)
{
	static netsnmp_session *session = NULL;
#ifndef NETSNMPV54
	char default_port[128];
	snprintf (default_port, sizeof (default_port), "%s:162", target);
#endif
	if (session) {
		return (session);
	}

	if (target == NULL) {
		return NULL;
	}

	session = malloc (sizeof (netsnmp_session));
	if (session == NULL) {
		qb_log(LOG_ERR, "Could not allocate snmp session");
		return (NULL);
	}
	snmp_sess_init (session);
	session->version = SNMP_VERSION_2c;
	session->callback = NULL;
	session->callback_magic = NULL;

	session = snmp_add(session,
#ifdef NETSNMPV54
		netsnmp_transport_open_client ("snmptrap", target),
#else
		netsnmp_tdomain_transport (default_port, 0, "udp"),
#endif
		NULL, NULL);

	if (session == NULL) {
		qb_log(LOG_ERR, "Could not create snmp transport");
	}
	return (session);
}
/*
 * Parse `prefix` as an OID and append (OID, asn_type, value) to the trap
 * PDU.  Fields whose OID fails to parse are silently skipped.
 */
static inline void add_field (
	netsnmp_pdu *trap_pdu,
	u_char asn_type,
	const char *prefix,
	void *value,
	size_t value_size)
{
	oid oid_buf[MAX_OID_LEN];
	size_t oid_len = MAX_OID_LEN;

	if (!snmp_parse_oid(prefix, oid_buf, &oid_len)) {
		return;
	}
	snmp_pdu_add_variable (trap_pdu, oid_buf, oid_len, asn_type,
			       (u_char *) value, value_size);
}
/*
 * Send an SNMPv2 trap describing a node join/leave, carrying the node
 * name, id, address and status.  On snmp_send() success the library owns
 * and frees the PDU; on failure we free it ourselves.
 *
 * Fix: time_t is not guaranteed to be `long`, so `now` is cast before
 * being formatted with %ld (format mismatch is undefined behavior on
 * platforms with 64-bit time_t and 32-bit long).
 */
static void
_cs_snmp_node_membership_event(char *nodename, uint32_t nodeid, char *state, char* ip)
{
	int ret;
	char csysuptime[CS_TIMESTAMP_STR_LEN];
	static oid snmptrap_oid[] =  { 1,3,6,1,6,3,1,1,4,1,0 };
	static oid sysuptime_oid[] = { 1,3,6,1,2,1,1,3,0 };
	time_t now = time (NULL);

	netsnmp_pdu *trap_pdu;
	netsnmp_session *session = snmp_init (snmp_manager);
	if (session == NULL) {
		qb_log(LOG_NOTICE, "Failed to init SNMP session.");
		return ;
	}

	trap_pdu = snmp_pdu_create (SNMP_MSG_TRAP2);
	if (!trap_pdu) {
		qb_log(LOG_NOTICE, "Failed to create SNMP notification.");
		return ;
	}

	/* send uptime */
	snprintf (csysuptime, CS_TIMESTAMP_STR_LEN, "%ld", (long)now);
	snmp_add_var (trap_pdu, sysuptime_oid, sizeof (sysuptime_oid) / sizeof (oid), 't', csysuptime);
	snmp_add_var (trap_pdu, snmptrap_oid, sizeof (snmptrap_oid) / sizeof (oid), 'o', SNMP_OID_TRAPS_NODE);

	/* Add entries to the trap */
	add_field (trap_pdu, ASN_OCTET_STR, SNMP_OID_OBJECT_NODE_NAME, (void*)nodename, strlen (nodename));
	add_field (trap_pdu, ASN_INTEGER, SNMP_OID_OBJECT_NODE_ID, (void*)&nodeid, sizeof (nodeid));
	add_field (trap_pdu, ASN_OCTET_STR, SNMP_OID_OBJECT_NODE_ADDR, (void*)ip, strlen (ip));
	add_field (trap_pdu, ASN_OCTET_STR, SNMP_OID_OBJECT_NODE_STATUS, (void*)state, strlen (state));

	/* Send and cleanup */
	ret = snmp_send (session, trap_pdu);
	if (ret == 0) {
		/* error */
		qb_log(LOG_ERR, "Could not send SNMP trap");
		snmp_free_pdu (trap_pdu);
	}
}
/*
 * Send an SNMPv2 trap describing a quorum state change.
 *
 * Fixes:
 *  - the uptime string was built with an unbounded sprintf() into a
 *    20-byte buffer; a 64-bit time value can need 20 digits plus the
 *    terminator, so snprintf() with the buffer size is used instead.
 *  - time_t is cast to long to match the %ld conversion.
 */
static void
_cs_snmp_node_quorum_event(char *nodename, uint32_t nodeid,
			   const char *state)
{
	int ret;
	char csysuptime[20];
	static oid snmptrap_oid[] =  { 1,3,6,1,6,3,1,1,4,1,0 };
	static oid sysuptime_oid[] = { 1,3,6,1,2,1,1,3,0 };
	time_t now = time (NULL);

	netsnmp_pdu *trap_pdu;
	netsnmp_session *session = snmp_init (snmp_manager);
	if (session == NULL) {
		qb_log(LOG_NOTICE, "Failed to init SNMP session.");
		return ;
	}

	trap_pdu = snmp_pdu_create (SNMP_MSG_TRAP2);
	if (!trap_pdu) {
		qb_log(LOG_NOTICE, "Failed to create SNMP notification.");
		return ;
	}

	/* send uptime */
	snprintf (csysuptime, sizeof (csysuptime), "%ld", (long)now);
	snmp_add_var (trap_pdu, sysuptime_oid, sizeof (sysuptime_oid) / sizeof (oid), 't', csysuptime);
	snmp_add_var (trap_pdu, snmptrap_oid, sizeof (snmptrap_oid) / sizeof (oid), 'o', SNMP_OID_TRAPS_QUORUM);

	/* Add entries to the trap */
	add_field (trap_pdu, ASN_OCTET_STR, SNMP_OID_OBJECT_NODE_NAME, (void*)nodename, strlen (nodename));
	add_field (trap_pdu, ASN_INTEGER, SNMP_OID_OBJECT_NODE_ID, (void*)&nodeid, sizeof (nodeid));
	add_field (trap_pdu, ASN_OCTET_STR, SNMP_OID_OBJECT_QUORUM, (void*)state, strlen (state));

	/* Send and cleanup */
	ret = snmp_send (session, trap_pdu);
	if (ret == 0) {
		/* error */
		qb_log(LOG_ERR, "Could not send SNMP trap");
		snmp_free_pdu (trap_pdu);
	}
}
/*
 * Send an SNMPv2 trap describing an RRP interface fault/recovery.
 *
 * Fixes (same as the quorum trap): unbounded sprintf() into a 20-byte
 * buffer replaced by snprintf(), and time_t cast to long for %ld.
 */
static void
_cs_snmp_rrp_faulty_event(char *nodename, uint32_t nodeid,
			  uint32_t iface_no, const char *state)
{
	int ret;
	char csysuptime[20];
	static oid snmptrap_oid[] =  { 1,3,6,1,6,3,1,1,4,1,0 };
	static oid sysuptime_oid[] = { 1,3,6,1,2,1,1,3,0 };
	time_t now = time (NULL);

	netsnmp_pdu *trap_pdu;
	netsnmp_session *session = snmp_init (snmp_manager);
	if (session == NULL) {
		qb_log(LOG_NOTICE, "Failed to init SNMP session.");
		return ;
	}

	trap_pdu = snmp_pdu_create (SNMP_MSG_TRAP2);
	if (!trap_pdu) {
		qb_log(LOG_NOTICE, "Failed to create SNMP notification.");
		return ;
	}

	/* send uptime */
	snprintf (csysuptime, sizeof (csysuptime), "%ld", (long)now);
	snmp_add_var (trap_pdu, sysuptime_oid, sizeof (sysuptime_oid) / sizeof (oid), 't', csysuptime);
	snmp_add_var (trap_pdu, snmptrap_oid, sizeof (snmptrap_oid) / sizeof (oid), 'o', SNMP_OID_TRAPS_RRP);

	/* Add entries to the trap */
	add_field (trap_pdu, ASN_OCTET_STR, SNMP_OID_OBJECT_NODE_NAME, (void*)nodename, strlen (nodename));
	add_field (trap_pdu, ASN_INTEGER, SNMP_OID_OBJECT_NODE_ID, (void*)&nodeid, sizeof (nodeid));
	add_field (trap_pdu, ASN_INTEGER, SNMP_OID_OBJECT_RRP_IFACE_NO, (void*)&iface_no, sizeof (iface_no));
	add_field (trap_pdu, ASN_OCTET_STR, SNMP_OID_OBJECT_RRP_STATUS, (void*)state, strlen (state));

	/* Send and cleanup */
	ret = snmp_send (session, trap_pdu);
	if (ret == 0) {
		/* error */
		qb_log(LOG_ERR, "Could not send SNMP trap");
		snmp_free_pdu (trap_pdu);
	}
}
static void
_cs_snmp_init(void)
{
if (snmp_manager == NULL) {
snmp_manager = (char*)local_host;
}
notifiers[num_notifiers].node_membership_fn =
_cs_snmp_node_membership_event;
notifiers[num_notifiers].node_quorum_fn =
_cs_snmp_node_quorum_event;
notifiers[num_notifiers].application_connection_fn = NULL;
notifiers[num_notifiers].rrp_faulty_fn =
_cs_snmp_rrp_faulty_event;
num_notifiers++;
}
#endif /* ENABLE_SNMP */
/* Log a node join/leave event via libqb logging (syslog notifier). */
static void
_cs_syslog_node_membership_event(char *nodename, uint32_t nodeid, char *state, char* ip)
{
	qb_log(LOG_NOTICE, "%s[%d] ip:%s %s", nodename, nodeid, ip, state);
}
/* Log a quorum transition via libqb logging (syslog notifier). */
static void
_cs_syslog_node_quorum_event(char *nodename, uint32_t nodeid, const char *state)
{
	if (strcmp(state, "quorate") != 0) {
		qb_log(LOG_NOTICE, "%s[%d] has lost quorum", nodename, nodeid);
	} else {
		qb_log(LOG_NOTICE, "%s[%d] is now %s", nodename, nodeid, state);
	}
}
/* Log an application connect/disconnect via libqb logging (syslog notifier). */
static void
_cs_syslog_application_connection_event(char *nodename, uint32_t nodeid, char* app_name, const char *state)
{
	if (strcmp(state, "connected") != 0) {
		qb_log(LOG_NOTICE, "%s[%d] %s is now %s from corosync", nodename, nodeid, app_name, state);
	} else {
		qb_log(LOG_NOTICE, "%s[%d] %s is now %s to corosync", nodename, nodeid, app_name, state);
	}
}
/*
 * Syslog backend: report an RRP interface state change.
 */
static void
_cs_syslog_rrp_faulty_event(char *nodename, uint32_t nodeid, uint32_t iface_no, const char *state)
{
	/* %u for both unsigned values (iface_no already used %u; nodeid
	 * previously used %d, which misprints ids >= 2^31) */
	qb_log(LOG_NOTICE, "%s[%u] interface %u is now %s", nodename, nodeid, iface_no, state);
}
/*
 * Fan a membership change out to every registered notifier that
 * implements the node_membership callback.
 */
static void
_cs_node_membership_event(char *nodename, uint32_t nodeid, char *state, char* ip)
{
	int idx;

	for (idx = 0; idx < num_notifiers; idx++) {
		if (notifiers[idx].node_membership_fn == NULL) {
			continue;
		}
		notifiers[idx].node_membership_fn(nodename, nodeid, state, ip);
	}
}
/*
 * Return the local node's id and hostname, caching both in the
 * local_nodeid/local_nodename globals after the first lookup.
 *
 * On lookup failure the node is reported as id 0 / "localhost".
 * Exits the process if the cfg API cannot be initialized at all.
 */
static void
_cs_local_node_info_get(char **nodename, uint32_t *nodeid)
{
	cs_error_t rc;
	corosync_cfg_handle_t cfg_handle;

	if (local_nodeid == 0) {
		rc = corosync_cfg_initialize(&cfg_handle, NULL);
		if (rc != CS_OK) {
			syslog (LOG_ERR, "Failed to initialize the cfg API. Error %d\n", rc);
			exit (EXIT_FAILURE);
		}

		rc = corosync_cfg_local_get (cfg_handle, &local_nodeid);
		corosync_cfg_finalize(cfg_handle);

		if (rc != CS_OK) {
			local_nodeid = 0;
			/* snprintf always NUL-terminates, unlike strncpy */
			snprintf(local_nodename, sizeof (local_nodename), "localhost");
		} else if (gethostname(local_nodename, sizeof (local_nodename)) != 0) {
			/* gethostname failed: don't hand back an
			 * indeterminate buffer */
			snprintf(local_nodename, sizeof (local_nodename), "localhost");
		} else {
			/* POSIX does not guarantee NUL termination when the
			 * hostname fills the buffer exactly */
			local_nodename[sizeof (local_nodename) - 1] = '\0';
		}
	}
	*nodeid = local_nodeid;
	*nodename = local_nodename;
}
static void
_cs_node_quorum_event(const char *state)
{
int i;
char *nodename;
uint32_t nodeid;
_cs_local_node_info_get(&nodename, &nodeid);
for (i = 0; i < num_notifiers; i++) {
if (notifiers[i].node_quorum_fn) {
notifiers[i].node_quorum_fn(nodename, nodeid, state);
}
}
}
static void
_cs_application_connection_event(char *app_name, const char *state)
{
int i;
char *nodename;
uint32_t nodeid;
_cs_local_node_info_get(&nodename, &nodeid);
for (i = 0; i < num_notifiers; i++) {
if (notifiers[i].application_connection_fn) {
notifiers[i].application_connection_fn(nodename, nodeid, app_name, state);
}
}
}
/*
 * Fan an RRP interface state change out to every registered notifier
 * that implements the rrp_faulty callback.
 */
static void
_cs_rrp_faulty_event(uint32_t iface_no, const char *state)
{
	char *nodename;
	uint32_t nodeid;
	int idx;

	_cs_local_node_info_get(&nodename, &nodeid);
	for (idx = 0; idx < num_notifiers; idx++) {
		if (notifiers[idx].rrp_faulty_fn == NULL) {
			continue;
		}
		notifiers[idx].rrp_faulty_fn(nodename, nodeid, iface_no, state);
	}
}
/*
 * qb_loop signal callback: any registered exit signal simply asks the
 * main loop to stop; cleanup happens after qb_loop_run() returns.
 */
static int32_t
sig_exit_handler(int32_t num, void *data)
{
	(void)num;	/* same action regardless of which signal fired */
	(void)data;	/* no per-signal context registered */

	qb_loop_stop(main_loop);
	return 0;
}
static void
_cs_cmap_init(void)
{
cs_error_t rc;
int cmap_fd = 0;
cmap_track_handle_t track_handle;
rc = cmap_initialize (&cmap_handle);
if (rc != CS_OK) {
qb_log(LOG_ERR, "Failed to initialize the cmap API. Error %d", rc);
exit (EXIT_FAILURE);
}
cmap_fd_get(cmap_handle, &cmap_fd);
qb_loop_poll_add(main_loop, QB_LOOP_MED, cmap_fd, POLLIN|POLLNVAL, NULL,
_cs_cmap_dispatch);
rc = cmap_track_add(cmap_handle, "runtime.connections.",
CMAP_TRACK_ADD | CMAP_TRACK_DELETE | CMAP_TRACK_PREFIX,
_cs_cmap_connections_key_changed,
NULL,
&track_handle);
if (rc != CS_OK) {
qb_log(LOG_ERR,
"Failed to track the connections key. Error %d", rc);
exit (EXIT_FAILURE);
}
rc = cmap_track_add(cmap_handle, "runtime.totem.pg.mrp.srp.members.",
CMAP_TRACK_ADD | CMAP_TRACK_MODIFY | CMAP_TRACK_PREFIX,
_cs_cmap_members_key_changed,
NULL,
&track_handle);
if (rc != CS_OK) {
qb_log(LOG_ERR,
"Failed to track the members key. Error %d", rc);
exit (EXIT_FAILURE);
}
rc = cmap_track_add(cmap_handle, "runtime.totem.pg.mrp.rrp.",
CMAP_TRACK_ADD | CMAP_TRACK_MODIFY | CMAP_TRACK_PREFIX,
_cs_cmap_rrp_faulty_key_changed,
NULL,
&track_handle);
if (rc != CS_OK) {
qb_log(LOG_ERR,
"Failed to track the rrp key. Error %d", rc);
exit (EXIT_FAILURE);
}
}
/* Drop the cmap connection established by _cs_cmap_init(). */
static void
_cs_cmap_finalize(void)
{
	cmap_finalize (cmap_handle);
}
/*
 * Sanity-check the option flags gathered from the command line and
 * abort on combinations that cannot work.
 */
static void
_cs_check_config(void)
{
	int any_backend;

	/* at least one output backend must be enabled */
	any_backend = conf[CS_NTF_LOG] || conf[CS_NTF_STDOUT] ||
		conf[CS_NTF_SNMP] || conf[CS_NTF_DBUS];
	if (!any_backend) {
		qb_log(LOG_ERR, "no event type enabled, see corosync-notifyd -h, exiting.");
		exit(EXIT_FAILURE);
	}

#ifndef ENABLE_SNMP
	if (conf[CS_NTF_SNMP]) {
		qb_log(LOG_ERR, "Not compiled with SNMP support enabled, exiting.");
		exit(EXIT_FAILURE);
	}
#endif
#ifndef HAVE_DBUS
	if (conf[CS_NTF_DBUS]) {
		qb_log(LOG_ERR, "Not compiled with DBus support enabled, exiting.");
		exit(EXIT_FAILURE);
	}
#endif

	/* stdout output is meaningless once we daemonize */
	if (conf[CS_NTF_STDOUT] && !conf[CS_NTF_FG]) {
		qb_log(LOG_ERR, "configured to print to stdout and run in the background, exiting");
		exit(EXIT_FAILURE);
	}

	/* traps plus dbus signals together is unusual; warn but allow */
	if (conf[CS_NTF_SNMP] && conf[CS_NTF_DBUS]) {
		qb_log(LOG_ERR, "configured to send snmp traps and dbus signals - are you sure?.");
	}
}
/* Print command-line help to stderr. */
static void
_cs_usage(void)
{
	static const char help_text[] =
		"usage:\n"
		" -f : Start application in foreground.\n"
		" -l : Log all events.\n"
		" -o : Print events to stdout (turns on -l).\n"
		" -s : Send SNMP traps on all events.\n"
		" -m : Set the SNMP Manager IP address (defaults to localhost).\n"
		" -d : Send DBUS signals on all events.\n"
		" -h : Print this help.\n\n";

	/* no format specifiers in the text, so fputs emits identical bytes */
	fputs(help_text, stderr);
}
/*
 * corosync-notifyd entry point: parse options, configure libqb
 * logging, optionally daemonize, register the selected notifier
 * backends, subscribe to corosync state (cmap + quorum), then run the
 * qb main loop until an exit signal stops it.
 */
int
main(int argc, char *argv[])
{
	int ch;

	/* every output backend starts disabled; options switch them on */
	conf[CS_NTF_FG] = QB_FALSE;
	conf[CS_NTF_LOG] = QB_FALSE;
	conf[CS_NTF_STDOUT] = QB_FALSE;
	conf[CS_NTF_SNMP] = QB_FALSE;
	conf[CS_NTF_DBUS] = QB_FALSE;

	while ((ch = getopt (argc, argv, "floshdm:")) != EOF) {
		switch (ch) {
		case 'f':
			conf[CS_NTF_FG] = QB_TRUE;
			break;
		case 'l':
			conf[CS_NTF_LOG] = QB_TRUE;
			break;
		case 'm':
			/* -m implies SNMP; copy the manager address and
			 * force NUL termination (strncpy alone does not
			 * guarantee it) */
			conf[CS_NTF_SNMP] = QB_TRUE;
			strncpy(snmp_manager_buf, optarg, sizeof (snmp_manager_buf));
			snmp_manager_buf[sizeof (snmp_manager_buf) - 1] = '\0';
			snmp_manager = snmp_manager_buf;
			break;
		case 'o':
			/* stdout output implies logging */
			conf[CS_NTF_LOG] = QB_TRUE;
			conf[CS_NTF_STDOUT] = QB_TRUE;
			break;
		case 's':
			conf[CS_NTF_SNMP] = QB_TRUE;
			break;
		case 'd':
			conf[CS_NTF_DBUS] = QB_TRUE;
			break;
		case 'h':
		default:
			_cs_usage();
			return EXIT_FAILURE;
		}
	}

	qb_log_init("notifyd", LOG_DAEMON, LOG_INFO);
	if (conf[CS_NTF_STDOUT]) {
		/* mirror all log output up to debug level onto stderr */
		qb_log_filter_ctl(QB_LOG_STDERR, QB_LOG_FILTER_ADD,
			QB_LOG_FILTER_FILE, "*", LOG_DEBUG);
		qb_log_ctl(QB_LOG_STDERR, QB_LOG_CONF_ENABLED, conf[CS_NTF_STDOUT]);
	}

	/* validate the flag combination before daemonizing so errors are
	 * still visible on the terminal */
	_cs_check_config();

	if (!conf[CS_NTF_FG]) {
		if (daemon(0, 0) < 0)
		{
			perror("daemon() failed");
			return EXIT_FAILURE;
		}
	}

	num_notifiers = 0;
	if (conf[CS_NTF_LOG]) {
		/* syslog backend implements every event type */
		notifiers[num_notifiers].node_membership_fn =
			_cs_syslog_node_membership_event;
		notifiers[num_notifiers].node_quorum_fn =
			_cs_syslog_node_quorum_event;
		notifiers[num_notifiers].application_connection_fn =
			_cs_syslog_application_connection_event;
		notifiers[num_notifiers].rrp_faulty_fn =
			_cs_syslog_rrp_faulty_event;
		num_notifiers++;
	}

	main_loop = qb_loop_create();

	/* subscribe to corosync state via the cmap and quorum APIs */
	_cs_cmap_init();
	_cs_quorum_init();

#ifdef HAVE_DBUS
	if (conf[CS_NTF_DBUS]) {
		_cs_dbus_init();
	}
#endif /* HAVE_DBUS */

#ifdef ENABLE_SNMP
	if (conf[CS_NTF_SNMP]) {
		_cs_snmp_init();
	}
#endif /* ENABLE_SNMP */

	/* stop the loop cleanly on INT/QUIT/TERM */
	qb_loop_signal_add(main_loop,
		QB_LOOP_HIGH,
		SIGINT,
		NULL,
		sig_exit_handler,
		NULL);
	qb_loop_signal_add(main_loop,
		QB_LOOP_HIGH,
		SIGQUIT,
		NULL,
		sig_exit_handler,
		NULL);
	qb_loop_signal_add(main_loop,
		QB_LOOP_HIGH,
		SIGTERM,
		NULL,
		sig_exit_handler,
		NULL);

	/* blocks until sig_exit_handler calls qb_loop_stop() */
	qb_loop_run(main_loop);

#ifdef HAVE_DBUS
	if (conf[CS_NTF_DBUS]) {
		_cs_dbus_release();
	}
#endif /* HAVE_DBUS */

	_cs_quorum_finalize();
	_cs_cmap_finalize();
	return 0;
}
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Sat, Nov 23, 3:27 AM (46 m, 11 s)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1018116
Default Alt Text
(245 KB)
Attached To
Mode
rC Corosync
Attached
Detach File
Event Timeline
Log In to Comment