Page MenuHomeClusterLabs Projects

No OneTemporary

diff --git a/vqsim/vqmain.c b/vqsim/vqmain.c
index 683cc227..c3a1327d 100644
--- a/vqsim/vqmain.c
+++ b/vqsim/vqmain.c
@@ -1,746 +1,740 @@
#include <config.h>
#include <stdio.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <qb/qblog.h>
#include <qb/qbloop.h>
#include <sys/poll.h>
#include <netinet/in.h>
#include <sys/queue.h>
#ifdef HAVE_READLINE_READLINE_H
#include <readline/readline.h>
#else
#include <unistd.h> /* isatty */
#endif
#include "../exec/votequorum.h"
#include "../exec/service.h"
#include <corosync/logsys.h>
#include <corosync/coroapi.h>
#include "icmap.h"
#include "vqsim.h"
/* Easier than including the config file with a ton of conflicting dependencies */
extern int coroparse_configparse (icmap_map_t config_map, const char **error_string);
extern int corosync_log_config_read (const char **error_string);
/* 'Keep the compiler happy' time */
const char *corosync_get_config_file(void);
/* One of these per partition */
struct vq_partition {
TAILQ_HEAD(, vq_node) nodelist;
struct memb_ring_id ring_id;
int num;
};
/* One of these per node */
struct vq_node {
vq_object_t instance;
unsigned int nodeid;
int fd;
struct vq_partition *partition;
TAILQ_ENTRY(vq_node) entries;
/* Last status */
int last_quorate;
struct memb_ring_id last_ring_id;
int last_view_list[MAX_NODES];
int last_view_list_entries;
};
static struct vq_partition partitions[MAX_PARTITIONS];
static qb_loop_t *poll_loop;
static int autofence;
static int check_for_quorum;
static FILE *output_file;
static int nosync;
static qb_loop_timer_handle kb_timer;
static ssize_t wait_count;
static ssize_t wait_count_to_unblock;
static struct vq_node *find_by_pid(pid_t pid);
static void send_partition_to_nodes(struct vq_partition *partition, int newring);
static void start_kb_input(void);
static void start_kb_input_timeout(void *data);
#ifndef HAVE_READLINE_READLINE_H
#define INPUT_BUF_SIZE 1024
static char input_buf[INPUT_BUF_SIZE];
static size_t input_buf_term = 0;
static int is_tty;
#endif
/* 'Keep the compiler happy' time */
static char corosync_config_file[PATH_MAX + 1] = COROSYSCONFDIR "/corosync.conf";
const char *corosync_get_config_file(void)
{
return (corosync_config_file);
}
/* Tell all non-quorate nodes to quit */
static void force_fence(void)
{
int i;
struct vq_node *vqn;
for (i=0; i<MAX_PARTITIONS; i++) {
TAILQ_FOREACH(vqn, &partitions[i].nodelist, entries) {
vq_quit_if_inquorate(vqn->instance);
}
}
}
/* Save quorum state from the incoming message */
static void save_quorum_state(struct vq_node *node, struct vqsim_quorum_msg *qmsg)
{
node->last_quorate = qmsg->quorate;
memcpy(&node->last_ring_id, &qmsg->ring_id, sizeof(struct memb_ring_id));
memcpy(node->last_view_list, qmsg->view_list, sizeof(int) * qmsg->view_list_entries);
node->last_view_list_entries = qmsg->view_list_entries;
/* If at least one node is quorate and autofence is enabled, then fence everyone who is not quorate */
if (check_for_quorum && qmsg->quorate & autofence) {
check_for_quorum = 0;
force_fence();
}
}
/* Print current node state */
static void print_quorum_state(struct vq_node *node)
{
int i;
if (node->last_quorate < 0) {
fprintf(output_file, "%d:%02d: q=UNINITIALIZED\n",
node->partition->num, node->nodeid);
return;
}
fprintf(output_file, "%d:%02d: q=%d ring=[%d/%lld] ", node->partition->num, node->nodeid, node->last_quorate,
node->last_ring_id.nodeid, node->last_ring_id.seq);
fprintf(output_file, "nodes=[");
for (i = 0; i < node->last_view_list_entries; i++) {
if (i) {
fprintf(output_file, " ");
}
fprintf(output_file, "%d", node->last_view_list[i]);
}
fprintf(output_file, "]\n");
}
static void propogate_vq_message(struct vq_node *vqn, const char *msg, int len)
{
struct vq_node *other_vqn;
/* Send it to everyone in that node's partition (including itself) */
TAILQ_FOREACH(other_vqn, &vqn->partition->nodelist, entries) {
write(other_vqn->fd, msg, len);
}
}
static int vq_parent_read_fn(int32_t fd, int32_t revents, void *data)
{
char msgbuf[8192];
int msglen;
struct vqsim_msg_header *msg;
struct vqsim_quorum_msg *qmsg;
struct vq_node *vqn = data;
if (revents == POLLIN) {
msglen = read(fd, msgbuf, sizeof(msgbuf));
if (msglen < 0) {
perror("read failed");
}
if (msglen > 0) {
msg = (void*)msgbuf;
switch (msg->type) {
case VQMSG_QUORUM:
if (!nosync && --wait_count_to_unblock <= 0)
qb_loop_timer_del(poll_loop, kb_timer);
qmsg = (void*)msgbuf;
save_quorum_state(vqn, qmsg);
print_quorum_state(vqn);
if (!nosync && wait_count_to_unblock <= 0)
start_kb_input();
break;
case VQMSG_EXEC:
/* Message from votequorum, pass around the partition */
propogate_vq_message(vqn, msgbuf, msglen);
break;
case VQMSG_QUIT:
case VQMSG_SYNC:
case VQMSG_QDEVICE:
case VQMSG_QUORUMQUIT:
/* not used here */
break;
}
}
}
if (revents == POLLERR) {
fprintf(stderr, "pollerr on %d\n", vqn->nodeid);
}
return 0;
}
static int read_corosync_conf(void)
{
int res;
const char *error_string;
int err = icmap_init();
if (!err) {
fprintf(stderr, "icmap_init failed\n");
}
/* Load corosync.conf */
logsys_format_set(NULL);
res = coroparse_configparse(icmap_get_global_map(), &error_string);
if (res == -1) {
log_printf (LOGSYS_LEVEL_INFO, "Error loading corosyc.conf %s", error_string);
return -1;
}
else {
res = corosync_log_config_read (&error_string);
if (res < 0) {
log_printf (LOGSYS_LEVEL_INFO, "error reading log config %s", error_string);
syslog (LOGSYS_LEVEL_INFO, "error reading log config %s", error_string);
}
else {
logsys_config_apply();
}
}
if (logsys_thread_start() != 0) {
log_printf (LOGSYS_LEVEL_ERROR, "Can't initialize log thread");
return -1;
}
return 0;
}
static void remove_node(struct vq_node *node)
{
struct vq_partition *part;
part = node->partition;
/* Remove from partition list */
TAILQ_REMOVE(&part->nodelist, node, entries);
free(node);
wait_count--;
/* Rebuild quorum */
send_partition_to_nodes(part, 1);
}
static int32_t sigchld_handler(int32_t sig, void *data)
{
pid_t pid;
int status;
struct vq_node *vqn;
const char *exit_status="";
char text[132];
pid = wait(&status);
if (WIFEXITED(status)) {
vqn = find_by_pid(pid);
if (vqn) {
switch (WEXITSTATUS(status)) {
case 0:
exit_status = "(on request)";
break;
case 1:
exit_status = "(autofenced)";
break;
default:
sprintf(text, "(exit code %d)", WEXITSTATUS(status));
break;
}
printf("%d:%02d Quit %s\n", vqn->partition->num, vqn->nodeid, exit_status);
remove_node(vqn);
}
else {
fprintf(stderr, "Unknown child %d exited with status %d\n", pid, WEXITSTATUS(status));
}
}
if (WIFSIGNALED(status)) {
vqn = find_by_pid(pid);
if (vqn) {
printf("%d:%02d exited on signal %d%s\n", vqn->partition->num, vqn->nodeid, WTERMSIG(status), WCOREDUMP(status)?" (core dumped)":"");
remove_node(vqn);
}
else {
fprintf(stderr, "Unknown child %d exited with status %d%s\n", pid, WTERMSIG(status), WCOREDUMP(status)?" (core dumped)":"");
}
}
return 0;
}
static void send_partition_to_nodes(struct vq_partition *partition, int newring)
{
struct vq_node *vqn;
int nodelist[MAX_NODES];
int nodes = 0;
int first = 1;
if (newring) {
/* Simulate corosync incrementing the seq by 4 for added authenticity */
partition->ring_id.seq += 4;
}
/* Build the node list */
TAILQ_FOREACH(vqn, &partition->nodelist, entries) {
nodelist[nodes++] = vqn->nodeid;
if (first) {
partition->ring_id.nodeid = vqn->nodeid;
first = 0;
}
}
TAILQ_FOREACH(vqn, &partition->nodelist, entries) {
vq_set_nodelist(vqn->instance, &partition->ring_id, nodelist, nodes);
}
}
static void init_partitions(void)
{
int i;
for (i=0; i<MAX_PARTITIONS; i++) {
TAILQ_INIT(&partitions[i].nodelist);
partitions[i].ring_id.nodeid = 1000+i;
partitions[i].ring_id.seq = 0;
partitions[i].num = i;
}
}
static pid_t create_node(int nodeid, int partno)
{
struct vq_node *newvq;
newvq = malloc(sizeof(struct vq_node));
if (newvq) {
if (!nosync) {
/* Number of expected "quorum" vq messages is a square
of the total nodes count, so increment the node
counter and set new square of this value as
a "to observe" counter */
wait_count++;
wait_count_to_unblock = wait_count * wait_count;
}
newvq->last_quorate = -1; /* mark "uninitialized" */
newvq->instance = vq_create_instance(poll_loop, nodeid);
if (!newvq->instance) {
fprintf(stderr,
"ERR: could not create vq instance nodeid %d\n",
nodeid);
return (pid_t) -1;
}
newvq->partition = &partitions[partno];
newvq->nodeid = nodeid;
newvq->fd = vq_get_parent_fd(newvq->instance);
TAILQ_INSERT_TAIL(&partitions[partno].nodelist, newvq, entries);
if (qb_loop_poll_add(poll_loop,
QB_LOOP_MED,
newvq->fd,
POLLIN | POLLERR,
newvq,
vq_parent_read_fn)) {
perror("qb_loop_poll_add returned error");
return (pid_t) -1;
}
/* Send sync with all the nodes so far in it. */
send_partition_to_nodes(&partitions[partno], 1);
return vq_get_pid(newvq->instance);
}
return (pid_t) -1;
}
static size_t create_nodes_from_config(void)
{
icmap_iter_t iter;
char tmp_key[ICMAP_KEYNAME_MAXLEN];
uint32_t node_pos;
uint32_t nodeid;
const char *iter_key;
int res;
pid_t pid;
size_t ret = 0;
init_partitions();
iter = icmap_iter_init("nodelist.node.");
while ((iter_key = icmap_iter_next(iter, NULL, NULL)) != NULL) {
res = sscanf(iter_key, "nodelist.node.%u.%s", &node_pos, tmp_key);
if (res != 2) {
continue;
}
if (strcmp(tmp_key, "ring0_addr") != 0) {
continue;
}
snprintf(tmp_key, ICMAP_KEYNAME_MAXLEN, "nodelist.node.%u.nodeid", node_pos);
if (icmap_get_uint32(tmp_key, &nodeid) == CS_OK) {
pid = create_node(nodeid, 0);
if (pid == (pid_t) -1) {
fprintf(stderr,
"ERR: nodeid %d could not be spawned\n",
nodeid);
exit(1);
}
ret++;
}
}
icmap_iter_finalize(iter);
return ret;
}
static struct vq_node *find_node(int nodeid)
{
int i;
struct vq_node *vqn;
for (i=0; i<MAX_PARTITIONS; i++) {
TAILQ_FOREACH(vqn, &partitions[i].nodelist, entries) {
if (vqn->nodeid == nodeid) {
return vqn;
}
}
}
return NULL;
}
static struct vq_node *find_by_pid(pid_t pid)
{
int i;
struct vq_node *vqn;
for (i=0; i<MAX_PARTITIONS; i++) {
TAILQ_FOREACH(vqn, &partitions[i].nodelist, entries) {
if (vq_get_pid(vqn->instance) == pid) {
return vqn;
}
}
}
return NULL;
}
/* Routines called from the parser */
void cmd_start_new_node(int nodeid, int partition)
{
struct vq_node *node;
node = find_node(nodeid);
if (node) {
fprintf(stderr, "ERR: nodeid %d already exists in partition %d\n", nodeid, node->partition->num);
return;
}
qb_loop_poll_del(poll_loop, STDIN_FILENO);
create_node(nodeid, partition);
if (!nosync) {
/* Delay kb input handling by 0.25 second when we've just
added a node; expect that the delay will be cancelled
substantially earlier once it has reported its quorum info
(the delay is in fact a failsafe input enabler here) */
qb_loop_timer_add(poll_loop,
QB_LOOP_MED,
250000000,
NULL,
start_kb_input_timeout,
&kb_timer);
}
}
void cmd_stop_all_nodes()
{
int i;
struct vq_node *vqn;
for (i=0; i<MAX_PARTITIONS; i++) {
TAILQ_FOREACH(vqn, &partitions[i].nodelist, entries) {
vq_quit(vqn->instance);
}
}
}
void cmd_show_node_states()
{
int i;
struct vq_node *vqn;
for (i=0; i<MAX_PARTITIONS; i++) {
TAILQ_FOREACH(vqn, &partitions[i].nodelist, entries) {
print_quorum_state(vqn);
}
}
fprintf(output_file, "#autofence: %s\n", autofence?"on":"off");
}
void cmd_stop_node(int nodeid)
{
struct vq_node *node;
node = find_node(nodeid);
if (!node) {
fprintf(stderr, "ERR: nodeid %d is not up\n", nodeid);
return;
}
/* Remove processor */
vq_quit(node->instance);
/* Node will be removed when the child process exits */
}
/* Move all nodes in 'nodelist' into partition 'partition' */
void cmd_move_nodes(int partition, int num_nodes, int *nodelist)
{
int i;
struct vq_node *node;
for (i=0; i<num_nodes; i++) {
node = find_node(nodelist[i]);
if (node) {
/* Remove it from the current partition */
TAILQ_REMOVE(&node->partition->nodelist, node, entries);
/* Add it to the new partition */
TAILQ_INSERT_TAIL(&partitions[partition].nodelist, node, entries);
node->partition = &partitions[partition];
}
else {
printf("ERR: node %d does not exist\n", nodelist[i]);
}
}
}
/* Take all the nodes in part2 and join them to part1 */
void cmd_join_partitions(int part1, int part2)
{
struct vq_node *vqn;
/* TAILQ_FOREACH is not delete safe *sigh* */
retry:
TAILQ_FOREACH(vqn, &partitions[part2].nodelist, entries) {
TAILQ_REMOVE(&vqn->partition->nodelist, vqn, entries);
TAILQ_INSERT_TAIL(&partitions[part1].nodelist, vqn, entries);
vqn->partition = &partitions[part1];
goto retry;
}
}
void cmd_set_autofence(int onoff)
{
autofence = onoff;
fprintf(output_file, "#autofence: %s\n", onoff?"on":"off");
}
void cmd_update_all_partitions(int newring)
{
int i;
check_for_quorum = 1;
for (i=0; i<MAX_PARTITIONS; i++) {
send_partition_to_nodes(&partitions[i], newring);
}
}
void cmd_qdevice_poll(int nodeid, int onoff)
{
struct vq_node *node;
node = find_node(nodeid);
if (node) {
vq_set_qdevice(node->instance, &node->partition->ring_id, onoff);
}
}
/* ---------------------------------- */
#ifndef HAVE_READLINE_READLINE_H
static void dummy_read_char(void);
static void dummy_read_char()
{
int c, flush = 0;
while (!flush) {
c = getchar();
if (++input_buf_term >= INPUT_BUF_SIZE) {
if (c != '\n' && c != EOF)
fprintf(stderr, "User input overflows the limit: %zu\n",
(size_t) INPUT_BUF_SIZE);
input_buf[INPUT_BUF_SIZE - 1] = '\0';
flush = 1;
} else if (c == '\n' || c == EOF) {
input_buf[input_buf_term - 1] = '\0';
flush = 1;
} else {
input_buf[input_buf_term - 1] = c;
}
}
parse_input_command((c == EOF) ? NULL : input_buf);
input_buf_term = 0;
if (is_tty) {
printf("vqsim> ");
fflush(stdout);
}
}
#endif
static int stdin_read_fn(int32_t fd, int32_t revents, void *data)
{
#ifdef HAVE_READLINE_READLINE_H
/* Send it to readline */
rl_callback_read_char();
#else
dummy_read_char();
#endif
return 0;
}
static void start_kb_input(void)
{
wait_count_to_unblock = 0;
#ifdef HAVE_READLINE_READLINE_H
/* Readline will deal with completed lines when they arrive */
rl_callback_handler_install("vqsim> ", parse_input_command);
#else
if (is_tty) {
printf("vqsim> ");
fflush(stdout);
}
#endif
/* Send stdin to readline */
if (qb_loop_poll_add(poll_loop,
QB_LOOP_MED,
STDIN_FILENO,
POLLIN | POLLERR,
NULL,
stdin_read_fn)) {
if (errno != EEXIST) {
perror("qb_loop_poll_add1 returned error");
}
}
}
static void start_kb_input_timeout(void *data)
{
// fprintf(stderr, "Waiting for nodes to report status timed out\n");
start_kb_input();
}
static void usage(char *program)
{
printf("Usage:\n");
printf("\n");
printf("%s [-f <config-file>] [-o <output-file>]\n", program);
printf("\n");
printf(" -f config file. defaults to /etc/corosync/corosync.conf\n");
printf(" -o output file. defaults to stdout\n");
printf(" -n no synchronization (on adding a node)\n");
printf(" -h display this help text\n");
printf("\n");
}
int main(int argc, char **argv)
{
qb_loop_signal_handle sigchld_qb_handle;
int ch;
- char *config_file_name = NULL;
char *output_file_name = NULL;
- char envstring[PATH_MAX];
while ((ch = getopt (argc, argv, "f:o:nh")) != EOF) {
switch (ch) {
case 'f':
- config_file_name = optarg;
+ strncpy(corosync_config_file, optarg, sizeof(corosync_config_file));
break;
case 'o':
output_file_name = optarg;
break;
case 'n':
nosync = 1;
break;
default:
usage(argv[0]);
exit(0);
}
}
- if (config_file_name) {
- sprintf(envstring, "COROSYNC_MAIN_CONFIG_FILE=%s", config_file_name);
- putenv(envstring);
- }
if (output_file_name) {
output_file = fopen(output_file_name, "w");
if (!output_file) {
fprintf(stderr, "Unable to open %s for output: %s\n", output_file_name, strerror(errno));
exit(-1);
}
}
else {
output_file = stdout;
}
#ifndef HAVE_READLINE_READLINE_H
is_tty = isatty(STDIN_FILENO);
#endif
qb_log_filter_ctl(QB_LOG_SYSLOG, QB_LOG_FILTER_ADD,
QB_LOG_FILTER_FUNCTION, "*", LOG_DEBUG);
qb_log_ctl(QB_LOG_STDERR, QB_LOG_CONF_ENABLED, QB_TRUE);
qb_log_filter_ctl(QB_LOG_STDERR, QB_LOG_FILTER_ADD,
QB_LOG_FILTER_FUNCTION, "*", LOG_DEBUG);
poll_loop = qb_loop_create();
/* SIGCHLD handler to reap sub-processes and reconfigure the cluster */
qb_loop_signal_add(poll_loop,
QB_LOOP_MED,
SIGCHLD,
NULL,
sigchld_handler,
&sigchld_qb_handle);
/* Create a full cluster of nodes from corosync.conf */
read_corosync_conf();
if (create_nodes_from_config() && !nosync) {
/* Delay kb input handling by 1 second when we've just
added the nodes from corosync.conf; expect that
the delay will be cancelled substantially earlier
once they all have reported their quorum info
(the delay is in fact a failsafe input enabler here) */
qb_loop_timer_add(poll_loop,
QB_LOOP_MED,
1000000000,
NULL,
start_kb_input_timeout,
&kb_timer);
} else {
start_kb_input();
}
qb_loop_run(poll_loop);
return 0;
}
diff --git a/vqsim/vqsim_vq_engine.c b/vqsim/vqsim_vq_engine.c
index 9d8b4eec..cbe1d471 100644
--- a/vqsim/vqsim_vq_engine.c
+++ b/vqsim/vqsim_vq_engine.c
@@ -1,434 +1,471 @@
/* This is the bit of VQSIM that runs in the forked process.
It represents a single votequorum instance or, if you like,
a 'node' in the cluster.
*/
#include <sys/types.h>
#include <qb/qblog.h>
#include <qb/qbloop.h>
#include <qb/qbipc_common.h>
#include <netinet/in.h>
#include <sys/poll.h>
#include <sys/socket.h>
#include <stdio.h>
#include "../exec/votequorum.h"
#include "../exec/service.h"
#include "../include/corosync/corotypes.h"
#include "../include/corosync/votequorum.h"
#include "../include/corosync/ipc_votequorum.h"
#include <corosync/logsys.h>
#include <corosync/coroapi.h>
#include "icmap.h"
#include "vqsim.h"
#define QDEVICE_NAME "VQsim_qdevice"
/* Static variables here are per-instance because we are forked */
static struct corosync_service_engine *engine;
static int parent_socket; /* Our end of the socket */
static char buffer[8192];
static int our_nodeid;
static char *private_data;
static qb_loop_t *poll_loop;
static qb_loop_timer_handle sync_timer;
static qb_loop_timer_handle qdevice_timer;
static int we_are_quorate;
static void *fake_conn = (void*)1;
static cs_error_t last_lib_error;
static struct memb_ring_id current_ring_id;
static int qdevice_registered;
static unsigned int qdevice_timeout = VOTEQUORUM_QDEVICE_DEFAULT_TIMEOUT;
/* 'Keep the compiler happy' time */
char *get_run_dir(void);
int api_timer_add_duration (
unsigned long long nanosec_duration,
void *data,
void (*timer_fn) (void *data),
corosync_timer_handle_t *handle);
static void api_error_memory_failure(void) __attribute__((noreturn));
static void api_error_memory_failure()
{
fprintf(stderr, "Out of memory error\n");
exit(-1);
}
static void api_timer_delete(corosync_timer_handle_t th)
{
qb_loop_timer_del(poll_loop, th);
}
int api_timer_add_duration (
unsigned long long nanosec_duration,
void *data,
void (*timer_fn) (void *data),
corosync_timer_handle_t *handle)
{
return qb_loop_timer_add(poll_loop,
QB_LOOP_MED,
nanosec_duration,
data,
timer_fn,
handle);
}
static unsigned int api_totem_nodeid_get(void)
{
return our_nodeid;
}
static int api_totem_mcast(const struct iovec *iov, unsigned int iovlen, unsigned int type)
{
struct vqsim_msg_header header;
struct iovec iovec[iovlen+1];
int total = sizeof(header);
int res;
int i;
header.type = VQMSG_EXEC;
header.from_nodeid = our_nodeid;
header.param = 0;
iovec[0].iov_base = &header;
iovec[0].iov_len = sizeof(header);
for (i=0; i<iovlen; i++) {
iovec[i+1].iov_base = iov[i].iov_base;
iovec[i+1].iov_len = iov[i].iov_len;
total += iov[i].iov_len;
}
res = writev(parent_socket, iovec, iovlen+1);
if (res != total) {
fprintf(stderr, "writev wrote only %d of %d bytes\n", res, total);
}
return 0;
}
static void *api_ipc_private_data_get(void *conn)
{
return private_data;
}
static int api_ipc_response_send(void *conn, const void *msg, size_t len)
{
struct qb_ipc_response_header *qb_header = (void*)msg;
/* Save the error so we can return it */
last_lib_error = qb_header->error;
return 0;
}
static struct corosync_api_v1 corosync_api = {
.error_memory_failure = api_error_memory_failure,
.timer_delete = api_timer_delete,
.timer_add_duration = api_timer_add_duration,
.totem_nodeid_get = api_totem_nodeid_get,
.totem_mcast = api_totem_mcast,
.ipc_private_data_get = api_ipc_private_data_get,
.ipc_response_send = api_ipc_response_send,
};
/* -------------------- Above is all for providing the corosync_api support routines --------------------------------------------*/
/* They need to be in the same file as the engine as they use the local 'poll_loop' variable which is per-process */
static void start_qdevice_poll(int longwait);
static void start_sync_timer(void);
/* Callback from Votequorum to tell us about the quorum state */
static void quorum_fn(const unsigned int *view_list,
size_t view_list_entries,
int quorate, struct memb_ring_id *ring_id)
{
char msgbuf[8192];
int len;
struct vqsim_quorum_msg *quorum_msg = (void*) msgbuf;
we_are_quorate = quorate;
/* Send back to parent */
quorum_msg->header.type = VQMSG_QUORUM;
quorum_msg->header.from_nodeid = our_nodeid;
quorum_msg->header.param = 0;
quorum_msg->quorate = quorate;
memcpy(&quorum_msg->ring_id, ring_id, sizeof(*ring_id));
quorum_msg->view_list_entries = view_list_entries;
memcpy(quorum_msg->view_list, view_list, sizeof(unsigned int)*view_list_entries);
if ( (len=write(parent_socket, msgbuf, sizeof(*quorum_msg) + sizeof(unsigned int)*view_list_entries)) <= 0) {
perror("write (view list to parent) failed");
}
memcpy(&current_ring_id, ring_id, sizeof(*ring_id));
}
char *corosync_service_link_and_init(struct corosync_api_v1 *api,
struct default_service *service_engine)
{
/* dummy */
return NULL;
}
/* For votequorum */
char *get_run_dir()
{
static char cwd_buffer[PATH_MAX];
return getcwd(cwd_buffer, PATH_MAX);
}
+/* This is different to the one in totemconfig.c in that we already
+ * know the 'local' node ID, so we can just search for that.
+ * It needs to be here rather than at main config read time as it's
+ * (obviously) going to be different for each instance.
+ */
+static void set_local_node_pos(struct corosync_api_v1 *api)
+{
+ icmap_iter_t iter;
+ uint32_t node_pos;
+ char name_str[ICMAP_KEYNAME_MAXLEN];
+ uint32_t nodeid;
+ const char *iter_key;
+ int res;
+
+ iter = icmap_iter_init("nodelist.node.");
+ while ((iter_key = icmap_iter_next(iter, NULL, NULL)) != NULL) {
+ res = sscanf(iter_key, "nodelist.node.%u.%s", &node_pos, name_str);
+ if (res != 2) {
+ continue;
+ }
+ if (strcmp(name_str, "nodeid")) {
+ continue;
+ }
+
+ res = icmap_get_uint32(iter_key, &nodeid);
+ if (res == CS_OK) {
+ if (nodeid == our_nodeid) {
+ res = icmap_set_uint32("nodelist.local_node_pos", node_pos);
+ if (res != CS_OK) {
+ fprintf(stderr, "Failed to find node %d in corosync.conf. Quorum calculations may not be correct:\n", our_nodeid);
+ }
+ }
+ }
+ }
+}
+
static int load_quorum_instance(struct corosync_api_v1 *api)
{
const char *error_string;
int res;
error_string = votequorum_init(api, quorum_fn);
if (error_string) {
fprintf(stderr, "Votequorum init failed: %s\n", error_string);
return -1;
}
engine = votequorum_get_service_engine_ver0();
error_string = engine->exec_init_fn(api);
if (error_string) {
fprintf(stderr, "votequorum exec init failed: %s\n", error_string);
return -1;
}
private_data = malloc(engine->private_data_size);
if (!private_data) {
perror("Malloc in child failed");
return -1;
}
res = engine->lib_init_fn(fake_conn);
return res;
}
static void sync_dispatch_fn(void *data)
{
if (engine->sync_process()) {
start_sync_timer();
}
else {
engine->sync_activate();
}
}
static void start_sync_timer()
{
qb_loop_timer_add(poll_loop,
QB_LOOP_MED,
10000000,
NULL,
sync_dispatch_fn,
&sync_timer);
}
static void send_sync(char *buf, int len)
{
struct vqsim_sync_msg *msg = (void*)buf;
/* Votequorum doesn't use the transitional node list :-) */
engine->sync_init(NULL, 0,
msg->view_list, msg->view_list_entries,
&msg->ring_id);
start_sync_timer();
}
static void send_exec_msg(char *buf, int len)
{
struct vqsim_exec_msg *execmsg = (void*)buf;
struct qb_ipc_request_header *qb_header = (void*)execmsg->execmsg;
engine->exec_engine[qb_header->id & 0xFFFF].exec_handler_fn(execmsg->execmsg, execmsg->header.from_nodeid);
}
static int send_lib_msg(int type, void *msg)
{
/* Clear this as not all lib functions return a response immediately */
last_lib_error = CS_OK;
engine->lib_engine[type].lib_handler_fn(fake_conn, msg);
return last_lib_error;
}
static int poll_qdevice(int onoff)
{
struct req_lib_votequorum_qdevice_poll pollmsg;
int res;
pollmsg.cast_vote = onoff;
pollmsg.ring_id.nodeid = current_ring_id.nodeid;
pollmsg.ring_id.seq = current_ring_id.seq;
strcpy(pollmsg.name, QDEVICE_NAME);
res = send_lib_msg(MESSAGE_REQ_VOTEQUORUM_QDEVICE_POLL, &pollmsg);
if (res != CS_OK) {
fprintf(stderr, "%d: qdevice poll failed: %d\n", our_nodeid, res);
}
return res;
}
static void qdevice_dispatch_fn(void *data)
{
if (poll_qdevice(1) == CS_OK) {
start_qdevice_poll(0);
}
}
static void start_qdevice_poll(int longwait)
{
unsigned long long timeout;
timeout = (unsigned long long)qdevice_timeout*500000; /* Half the corosync timeout */
if (longwait) {
timeout *= 2;
}
qb_loop_timer_add(poll_loop,
QB_LOOP_MED,
timeout,
NULL,
qdevice_dispatch_fn,
&qdevice_timer);
}
static void stop_qdevice_poll(void)
{
qb_loop_timer_del(poll_loop, qdevice_timer);
qdevice_timer = 0;
}
static void do_qdevice(int onoff)
{
int res;
if (onoff) {
if (!qdevice_registered) {
struct req_lib_votequorum_qdevice_register regmsg;
strcpy(regmsg.name, QDEVICE_NAME);
if ( (res=send_lib_msg(MESSAGE_REQ_VOTEQUORUM_QDEVICE_REGISTER, &regmsg)) == CS_OK) {
qdevice_registered = 1;
start_qdevice_poll(1);
}
else {
fprintf(stderr, "%d: qdevice registration failed: %d\n", our_nodeid, res);
}
}
else {
if (!qdevice_timer) {
start_qdevice_poll(0);
}
}
}
else {
poll_qdevice(0);
stop_qdevice_poll();
}
}
/* From controller */
static int parent_pipe_read_fn(int32_t fd, int32_t revents, void *data)
{
struct vqsim_msg_header *header = (void*)buffer;
int len;
len = read(fd, buffer, sizeof(buffer));
if (len > 0) {
/* Check header and route */
switch (header->type) {
case VQMSG_QUIT:
exit(0);
break;
case VQMSG_EXEC: /* For votequorum exec messages */
send_exec_msg(buffer, len);
break;
case VQMSG_SYNC:
send_sync(buffer, len);
break;
case VQMSG_QDEVICE:
do_qdevice(header->param);
break;
case VQMSG_QUORUMQUIT:
if (!we_are_quorate) {
exit(1);
}
break;
case VQMSG_QUORUM:
/* not used here */
break;
}
}
return 0;
}
static void initial_sync(int nodeid)
{
unsigned int trans_list[1] = {nodeid};
unsigned int member_list[1] = {nodeid};
struct memb_ring_id ring_id;
ring_id.nodeid = our_nodeid;
ring_id.seq = 1;
/* cluster with just us in it */
engine->sync_init(trans_list, 1,
member_list, 1,
&ring_id);
start_sync_timer();
}
/* Return pipe FDs & child PID if sucessful */
int fork_new_instance(int nodeid, int *vq_sock, pid_t *childpid)
{
int pipes[2];
pid_t pid;
if (socketpair(AF_UNIX, SOCK_SEQPACKET | SOCK_NONBLOCK, 0, pipes)) {
return -1;
}
parent_socket = pipes[0];
switch ( (pid=fork()) ) {
case -1:
perror("fork failed");
return -1;
case 0:
/* child process - continue below */
break;
default:
/* parent process */
*vq_sock = pipes[1];
*childpid = pid;
return 0;
}
our_nodeid = nodeid;
poll_loop = qb_loop_create();
if (icmap_get_uint32("quorum.device.timeout", &qdevice_timeout) != CS_OK) {
qdevice_timeout = VOTEQUORUM_QDEVICE_DEFAULT_TIMEOUT;
}
+ set_local_node_pos(&corosync_api);
load_quorum_instance(&corosync_api);
qb_loop_poll_add(poll_loop,
QB_LOOP_MED,
parent_socket,
POLLIN,
NULL,
parent_pipe_read_fn);
/* Start it up! */
initial_sync(nodeid);
qb_loop_run(poll_loop);
return 0;
}

File Metadata

Mime Type
text/x-diff
Expires
Thu, Oct 16, 12:39 AM (12 h, 35 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2530935
Default Alt Text
(29 KB)

Event Timeline