Page MenuHomeClusterLabs Projects

No OneTemporary

diff --git a/exec/totempg.c b/exec/totempg.c
index 60f7dc11..68e21c85 100644
--- a/exec/totempg.c
+++ b/exec/totempg.c
@@ -1,1330 +1,1332 @@
/*
* Copyright (c) 2003-2005 MontaVista Software, Inc.
* Copyright (c) 2005 OSDL.
* Copyright (c) 2006-2009 Red Hat, Inc.
*
* All rights reserved.
*
* Author: Steven Dake (sdake@redhat.com)
* Author: Mark Haverkamp (markh@osdl.org)
*
* This software licensed under BSD license, the text of which follows:
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* - Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* - Neither the name of the MontaVista Software, Inc. nor the names of its
* contributors may be used to endorse or promote products derived from this
* software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
* THE POSSIBILITY OF SUCH DAMAGE.
*/
/*
* FRAGMENTATION AND PACKING ALGORITHM:
*
* Assemble the entire message into one buffer
* if full fragment
* store fragment into lengths list
* for each full fragment
* multicast fragment
* set length and fragment fields of pg mesage
* store remaining multicast into head of fragmentation data and set lens field
*
* If a message exceeds the maximum packet size allowed by the totem
* single ring protocol, the protocol could lose forward progress.
* Statically calculating the allowed data amount doesn't work because
* the amount of data allowed depends on the number of fragments in
* each message. In this implementation, the maximum fragment size
* is dynamically calculated for each fragment added to the message.
* It is possible for a message to be two bytes short of the maximum
* packet size. This occurs when a message or collection of
* messages + the mcast header + the lens are two bytes short of the
* end of the packet. Since another len field consumes two bytes, the
* len field would consume the rest of the packet without room for data.
*
* One optimization would be to forgo the final len field and determine
* it from the size of the udp datagram. Then this condition would no
* longer occur.
*/
/*
* ASSEMBLY AND UNPACKING ALGORITHM:
*
* copy incoming packet into assembly data buffer indexed by current
* location of end of fragment
*
* if not fragmented
* deliver all messages in assembly data buffer
* else
* if msg_count > 1 and fragmented
* deliver all messages except last message in assembly data buffer
* copy last fragmented section to start of assembly data buffer
* else
* if msg_count = 1 and fragmented
* do nothing
*
*/
#include <config.h>
+#ifdef HAVE_ALLOCA_H
#include <alloca.h>
+#endif
#include <netinet/in.h>
#include <sys/uio.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include <pthread.h>
#include <errno.h>
#include <limits.h>
#include <corosync/swab.h>
#include <corosync/hdb.h>
#include <corosync/list.h>
#include <corosync/totem/coropoll.h>
#include <corosync/totem/totempg.h>
#define LOGSYS_UTILS_ONLY 1
#include <corosync/engine/logsys.h>
#include "totemmrp.h"
#include "totemsrp.h"
#define min(a,b) ((a) < (b)) ? a : b
struct totempg_mcast_header {
short version;
short type;
};
/*
* totempg_mcast structure
*
* header: Identify the mcast.
* fragmented: Set if this message continues into next message
* continuation: Set if this message is a continuation from last message
* msg_count Indicates how many packed messages are contained
* in the mcast.
* Also, the size of each packed message and the messages themselves are
* appended to the end of this structure when sent.
*/
struct totempg_mcast {
struct totempg_mcast_header header;
unsigned char fragmented;
unsigned char continuation;
unsigned short msg_count;
/*
* short msg_len[msg_count];
*/
/*
* data for messages
*/
};
/*
* Maximum packet size for totem pg messages
*/
#define TOTEMPG_PACKET_SIZE (totempg_totem_config->net_mtu - \
sizeof (struct totempg_mcast))
/*
* Local variables used for packing small messages
*/
static unsigned short mcast_packed_msg_lens[FRAME_SIZE_MAX];
static int mcast_packed_msg_count = 0;
static int totempg_reserved = 0;
/*
* Function and data used to log messages
*/
static int totempg_log_level_security;
static int totempg_log_level_error;
static int totempg_log_level_warning;
static int totempg_log_level_notice;
static int totempg_log_level_debug;
static int totempg_subsys_id;
static void (*totempg_log_printf) (
unsigned int rec_ident,
const char *function,
const char *file,
int line,
const char *format, ...) __attribute__((format(printf, 5, 6)));
struct totem_config *totempg_totem_config;
enum throw_away_mode {
THROW_AWAY_INACTIVE,
THROW_AWAY_ACTIVE
};
struct assembly {
unsigned int nodeid;
unsigned char data[MESSAGE_SIZE_MAX];
int index;
unsigned char last_frag_num;
enum throw_away_mode throw_away_mode;
struct list_head list;
};
static void assembly_deref (struct assembly *assembly);
static int callback_token_received_fn (enum totem_callback_token_type type,
const void *data);
DECLARE_LIST_INIT(assembly_list_inuse);
DECLARE_LIST_INIT(assembly_list_free);
/*
* Staging buffer for packed messages. Messages are staged in this buffer
* before sending. Multiple messages may fit which cuts down on the
* number of mcasts sent. If a message doesn't completely fit, then
* the mcast header has a fragment bit set that says that there are more
* data to follow. fragment_size is an index into the buffer. It indicates
* the size of message data and where to place new message data.
* fragment_contuation indicates whether the first packed message in
* the buffer is a continuation of a previously packed fragment.
*/
static unsigned char *fragmentation_data;
static int fragment_size = 0;
static int fragment_continuation = 0;
static struct iovec iov_delv;
static unsigned int totempg_max_handle = 0;
struct totempg_group_instance {
void (*deliver_fn) (
unsigned int nodeid,
const void *msg,
unsigned int msg_len,
int endian_conversion_required);
void (*confchg_fn) (
enum totem_configuration_type configuration_type,
const unsigned int *member_list, size_t member_list_entries,
const unsigned int *left_list, size_t left_list_entries,
const unsigned int *joined_list, size_t joined_list_entries,
const struct memb_ring_id *ring_id);
struct totempg_group *groups;
int groups_cnt;
};
DECLARE_HDB_DATABASE (totempg_groups_instance_database,NULL);
static unsigned char next_fragment = 1;
static pthread_mutex_t totempg_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t callback_token_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t mcast_msg_mutex = PTHREAD_MUTEX_INITIALIZER;
#define log_printf(level, format, args...) \
do { \
totempg_log_printf ( \
LOGSYS_ENCODE_RECID(level, \
totempg_subsys_id, \
LOGSYS_RECID_LOG), \
__FUNCTION__, __FILE__, __LINE__, \
format, ##args); \
} while (0);
static int msg_count_send_ok (int msg_count);
static int byte_count_send_ok (int byte_count);
static struct assembly *assembly_ref (unsigned int nodeid)
{
struct assembly *assembly;
struct list_head *list;
/*
* Search inuse list for node id and return assembly buffer if found
*/
for (list = assembly_list_inuse.next;
list != &assembly_list_inuse;
list = list->next) {
assembly = list_entry (list, struct assembly, list);
if (nodeid == assembly->nodeid) {
return (assembly);
}
}
/*
* Nothing found in inuse list get one from free list if available
*/
if (list_empty (&assembly_list_free) == 0) {
assembly = list_entry (assembly_list_free.next, struct assembly, list);
list_del (&assembly->list);
list_add (&assembly->list, &assembly_list_inuse);
assembly->nodeid = nodeid;
assembly->index = 0;
assembly->last_frag_num = 0;
assembly->throw_away_mode = THROW_AWAY_INACTIVE;
return (assembly);
}
/*
* Nothing available in inuse or free list, so allocate a new one
*/
assembly = malloc (sizeof (struct assembly));
/*
* TODO handle memory allocation failure here
*/
assert (assembly);
assembly->nodeid = nodeid;
assembly->data[0] = 0;
assembly->index = 0;
assembly->last_frag_num = 0;
assembly->throw_away_mode = THROW_AWAY_INACTIVE;
list_init (&assembly->list);
list_add (&assembly->list, &assembly_list_inuse);
return (assembly);
}
static void assembly_deref (struct assembly *assembly)
{
list_del (&assembly->list);
list_add (&assembly->list, &assembly_list_free);
}
static inline void app_confchg_fn (
enum totem_configuration_type configuration_type,
const unsigned int *member_list, size_t member_list_entries,
const unsigned int *left_list, size_t left_list_entries,
const unsigned int *joined_list, size_t joined_list_entries,
const struct memb_ring_id *ring_id)
{
int i;
struct totempg_group_instance *instance;
unsigned int res;
for (i = 0; i <= totempg_max_handle; i++) {
res = hdb_handle_get (&totempg_groups_instance_database,
hdb_nocheck_convert (i), (void *)&instance);
if (res == 0) {
if (instance->confchg_fn) {
instance->confchg_fn (
configuration_type,
member_list,
member_list_entries,
left_list,
left_list_entries,
joined_list,
joined_list_entries,
ring_id);
}
hdb_handle_put (&totempg_groups_instance_database,
hdb_nocheck_convert (i));
}
}
}
static inline void group_endian_convert (
void *msg,
int msg_len)
{
unsigned short *group_len;
int i;
char *aligned_msg;
/*
* Align data structure for sparc and ia64
*/
if ((size_t)msg % 4 != 0) {
aligned_msg = alloca(msg_len);
memcpy(aligned_msg, msg, msg_len);
} else {
aligned_msg = msg;
}
group_len = (unsigned short *)aligned_msg;
group_len[0] = swab16(group_len[0]);
for (i = 1; i < group_len[0] + 1; i++) {
group_len[i] = swab16(group_len[i]);
}
if (aligned_msg != msg) {
memcpy(msg, aligned_msg, msg_len);
}
}
static inline int group_matches (
struct iovec *iovec,
unsigned int iov_len,
struct totempg_group *groups_b,
unsigned int group_b_cnt,
unsigned int *adjust_iovec)
{
unsigned short *group_len;
char *group_name;
int i;
int j;
struct iovec iovec_aligned = { NULL, 0 };
assert (iov_len == 1);
/*
* Align data structure for sparc and ia64
*/
if ((size_t)iovec->iov_base % 4 != 0) {
iovec_aligned.iov_base = alloca(iovec->iov_len);
memcpy(iovec_aligned.iov_base, iovec->iov_base, iovec->iov_len);
iovec_aligned.iov_len = iovec->iov_len;
iovec = &iovec_aligned;
}
group_len = (unsigned short *)iovec->iov_base;
group_name = ((char *)iovec->iov_base) +
sizeof (unsigned short) * (group_len[0] + 1);
/*
* Calculate amount to adjust the iovec by before delivering to app
*/
*adjust_iovec = sizeof (unsigned short) * (group_len[0] + 1);
for (i = 1; i < group_len[0] + 1; i++) {
*adjust_iovec += group_len[i];
}
/*
* Determine if this message should be delivered to this instance
*/
for (i = 1; i < group_len[0] + 1; i++) {
for (j = 0; j < group_b_cnt; j++) {
if ((group_len[i] == groups_b[j].group_len) &&
(memcmp (groups_b[j].group, group_name, group_len[i]) == 0)) {
return (1);
}
}
group_name += group_len[i];
}
return (0);
}
static inline void app_deliver_fn (
unsigned int nodeid,
void *msg,
unsigned int msg_len,
int endian_conversion_required)
{
int i;
struct totempg_group_instance *instance;
struct iovec stripped_iovec;
unsigned int adjust_iovec;
unsigned int res;
struct iovec *iovec;
struct iovec aligned_iovec = { NULL, 0 };
if (endian_conversion_required) {
group_endian_convert (msg, msg_len);
}
/*
* TODO This function needs to be rewritten for proper alignment to avoid 3+ memory copies
*/
/*
* Align data structure for sparc and ia64
*/
aligned_iovec.iov_base = alloca(msg_len);
aligned_iovec.iov_len = msg_len;
memcpy(aligned_iovec.iov_base, msg, msg_len);
iovec = &aligned_iovec;
for (i = 0; i <= totempg_max_handle; i++) {
res = hdb_handle_get (&totempg_groups_instance_database,
hdb_nocheck_convert (i), (void *)&instance);
if (res == 0) {
if (group_matches (iovec, 1, instance->groups, instance->groups_cnt, &adjust_iovec)) {
stripped_iovec.iov_len = iovec->iov_len - adjust_iovec;
stripped_iovec.iov_base = (char *)iovec->iov_base + adjust_iovec;
/*
* Align data structure for sparc and ia64
*/
if ((char *)iovec->iov_base + adjust_iovec % 4 != 0) {
/*
* Deal with misalignment
*/
stripped_iovec.iov_base =
alloca (stripped_iovec.iov_len);
memcpy (stripped_iovec.iov_base,
(char *)iovec->iov_base + adjust_iovec,
stripped_iovec.iov_len);
}
instance->deliver_fn (
nodeid,
stripped_iovec.iov_base,
stripped_iovec.iov_len,
endian_conversion_required);
}
hdb_handle_put (&totempg_groups_instance_database, hdb_nocheck_convert(i));
}
}
}
static void totempg_confchg_fn (
enum totem_configuration_type configuration_type,
const unsigned int *member_list, size_t member_list_entries,
const unsigned int *left_list, size_t left_list_entries,
const unsigned int *joined_list, size_t joined_list_entries,
const struct memb_ring_id *ring_id)
{
// TODO optimize this
app_confchg_fn (configuration_type,
member_list, member_list_entries,
left_list, left_list_entries,
joined_list, joined_list_entries,
ring_id);
}
static void totempg_deliver_fn (
unsigned int nodeid,
const void *msg,
unsigned int msg_len,
int endian_conversion_required)
{
struct totempg_mcast *mcast;
unsigned short *msg_lens;
int i;
struct assembly *assembly;
char header[FRAME_SIZE_MAX];
int msg_count;
int continuation;
int start;
const char *data;
int datasize;
assembly = assembly_ref (nodeid);
assert (assembly);
/*
* Assemble the header into one block of data and
* assemble the packet contents into one block of data to simplify delivery
*/
mcast = (struct totempg_mcast *)msg;
if (endian_conversion_required) {
mcast->msg_count = swab16 (mcast->msg_count);
}
msg_count = mcast->msg_count;
datasize = sizeof (struct totempg_mcast) +
msg_count * sizeof (unsigned short);
memcpy (header, msg, datasize);
data = msg;
msg_lens = (unsigned short *) (header + sizeof (struct totempg_mcast));
if (endian_conversion_required) {
for (i = 0; i < mcast->msg_count; i++) {
msg_lens[i] = swab16 (msg_lens[i]);
}
}
memcpy (&assembly->data[assembly->index], &data[datasize],
msg_len - datasize);
/*
* If the last message in the buffer is a fragment, then we
* can't deliver it. We'll first deliver the full messages
* then adjust the assembly buffer so we can add the rest of the
* fragment when it arrives.
*/
msg_count = mcast->fragmented ? mcast->msg_count - 1 : mcast->msg_count;
continuation = mcast->continuation;
iov_delv.iov_base = (void *)&assembly->data[0];
iov_delv.iov_len = assembly->index + msg_lens[0];
/*
* Make sure that if this message is a continuation, that it
* matches the sequence number of the previous fragment.
* Also, if the first packed message is a continuation
* of a previous message, but the assembly buffer
* is empty, then we need to discard it since we can't
* assemble a complete message. Likewise, if this message isn't a
* continuation and the assembly buffer is empty, we have to discard
* the continued message.
*/
start = 0;
if (assembly->throw_away_mode == THROW_AWAY_ACTIVE) {
/* Throw away the first msg block */
if (mcast->fragmented == 0 || mcast->fragmented == 1) {
assembly->throw_away_mode = THROW_AWAY_INACTIVE;
assembly->index += msg_lens[0];
iov_delv.iov_base = (void *)&assembly->data[assembly->index];
iov_delv.iov_len = msg_lens[1];
start = 1;
}
} else
if (assembly->throw_away_mode == THROW_AWAY_INACTIVE) {
if (continuation == assembly->last_frag_num) {
assembly->last_frag_num = mcast->fragmented;
for (i = start; i < msg_count; i++) {
app_deliver_fn(nodeid, iov_delv.iov_base, iov_delv.iov_len,
endian_conversion_required);
assembly->index += msg_lens[i];
iov_delv.iov_base = (void *)&assembly->data[assembly->index];
if (i < (msg_count - 1)) {
iov_delv.iov_len = msg_lens[i + 1];
}
}
} else {
assembly->throw_away_mode = THROW_AWAY_ACTIVE;
}
}
if (mcast->fragmented == 0) {
/*
* End of messages, dereference assembly struct
*/
assembly->last_frag_num = 0;
assembly->index = 0;
assembly_deref (assembly);
} else {
/*
* Message is fragmented, keep around assembly list
*/
if (mcast->msg_count > 1) {
memmove (&assembly->data[0],
&assembly->data[assembly->index],
msg_lens[msg_count]);
assembly->index = 0;
}
assembly->index += msg_lens[msg_count];
}
}
/*
* Totem Process Group Abstraction
* depends on poll abstraction, POSIX, IPV4
*/
void *callback_token_received_handle;
int callback_token_received_fn (enum totem_callback_token_type type,
const void *data)
{
struct totempg_mcast mcast;
struct iovec iovecs[3];
int res;
pthread_mutex_lock (&mcast_msg_mutex);
if (mcast_packed_msg_count == 0) {
pthread_mutex_unlock (&mcast_msg_mutex);
return (0);
}
if (totemmrp_avail() == 0) {
pthread_mutex_unlock (&mcast_msg_mutex);
return (0);
}
mcast.fragmented = 0;
/*
* Was the first message in this buffer a continuation of a
* fragmented message?
*/
mcast.continuation = fragment_continuation;
fragment_continuation = 0;
mcast.msg_count = mcast_packed_msg_count;
iovecs[0].iov_base = (void *)&mcast;
iovecs[0].iov_len = sizeof (struct totempg_mcast);
iovecs[1].iov_base = (void *)mcast_packed_msg_lens;
iovecs[1].iov_len = mcast_packed_msg_count * sizeof (unsigned short);
iovecs[2].iov_base = (void *)&fragmentation_data[0];
iovecs[2].iov_len = fragment_size;
res = totemmrp_mcast (iovecs, 3, 0);
mcast_packed_msg_count = 0;
fragment_size = 0;
pthread_mutex_unlock (&mcast_msg_mutex);
return (0);
}
/*
* Initialize the totem process group abstraction
*/
int totempg_initialize (
hdb_handle_t poll_handle,
struct totem_config *totem_config)
{
int res;
totempg_totem_config = totem_config;
totempg_log_level_security = totem_config->totem_logging_configuration.log_level_security;
totempg_log_level_error = totem_config->totem_logging_configuration.log_level_error;
totempg_log_level_warning = totem_config->totem_logging_configuration.log_level_warning;
totempg_log_level_notice = totem_config->totem_logging_configuration.log_level_notice;
totempg_log_level_debug = totem_config->totem_logging_configuration.log_level_debug;
totempg_log_printf = totem_config->totem_logging_configuration.log_printf;
totempg_subsys_id = totem_config->totem_logging_configuration.log_subsys_id;
fragmentation_data = malloc (TOTEMPG_PACKET_SIZE);
if (fragmentation_data == 0) {
return (-1);
}
res = totemmrp_initialize (
poll_handle,
totem_config,
totempg_deliver_fn,
totempg_confchg_fn);
totemmrp_callback_token_create (
&callback_token_received_handle,
TOTEM_CALLBACK_TOKEN_RECEIVED,
0,
callback_token_received_fn,
0);
totemsrp_net_mtu_adjust (totem_config);
return (res);
}
void totempg_finalize (void)
{
pthread_mutex_lock (&totempg_mutex);
totemmrp_finalize ();
pthread_mutex_unlock (&totempg_mutex);
}
/*
* Multicast a message
*/
static int mcast_msg (
struct iovec *iovec_in,
unsigned int iov_len,
int guarantee)
{
int res = 0;
struct totempg_mcast mcast;
struct iovec iovecs[3];
struct iovec iovec[64];
int i;
int dest, src;
int max_packet_size = 0;
int copy_len = 0;
int copy_base = 0;
int total_size = 0;
pthread_mutex_lock (&mcast_msg_mutex);
totemmrp_new_msg_signal ();
/*
* Remove zero length iovectors from the list
*/
assert (iov_len < 64);
for (dest = 0, src = 0; src < iov_len; src++) {
if (iovec_in[src].iov_len) {
memcpy (&iovec[dest++], &iovec_in[src],
sizeof (struct iovec));
}
}
iov_len = dest;
max_packet_size = TOTEMPG_PACKET_SIZE -
(sizeof (unsigned short) * (mcast_packed_msg_count + 1));
mcast_packed_msg_lens[mcast_packed_msg_count] = 0;
/*
* Check if we would overwrite new message queue
*/
for (i = 0; i < iov_len; i++) {
total_size += iovec[i].iov_len;
}
if (byte_count_send_ok (total_size + sizeof(unsigned short) *
(mcast_packed_msg_count+1)) == 0) {
pthread_mutex_unlock (&mcast_msg_mutex);
return(-1);
}
for (i = 0; i < iov_len; ) {
mcast.fragmented = 0;
mcast.continuation = fragment_continuation;
copy_len = iovec[i].iov_len - copy_base;
/*
* If it all fits with room left over, copy it in.
* We need to leave at least sizeof(short) + 1 bytes in the
* fragment_buffer on exit so that max_packet_size + fragment_size
* doesn't exceed the size of the fragment_buffer on the next call.
*/
if ((copy_len + fragment_size) <
(max_packet_size - sizeof (unsigned short))) {
memcpy (&fragmentation_data[fragment_size],
(char *)iovec[i].iov_base + copy_base, copy_len);
fragment_size += copy_len;
mcast_packed_msg_lens[mcast_packed_msg_count] += copy_len;
next_fragment = 1;
copy_len = 0;
copy_base = 0;
i++;
continue;
/*
* If it just fits or is too big, then send out what fits.
*/
} else {
unsigned char *data_ptr;
copy_len = min(copy_len, max_packet_size - fragment_size);
if( copy_len == max_packet_size )
data_ptr = (unsigned char *)iovec[i].iov_base + copy_base;
else {
data_ptr = fragmentation_data;
memcpy (&fragmentation_data[fragment_size],
(unsigned char *)iovec[i].iov_base + copy_base, copy_len);
}
memcpy (&fragmentation_data[fragment_size],
(unsigned char *)iovec[i].iov_base + copy_base, copy_len);
mcast_packed_msg_lens[mcast_packed_msg_count] += copy_len;
/*
* if we're not on the last iovec or the iovec is too large to
* fit, then indicate a fragment. This also means that the next
* message will have the continuation of this one.
*/
if ((i < (iov_len - 1)) ||
((copy_base + copy_len) < iovec[i].iov_len)) {
if (!next_fragment) {
next_fragment++;
}
fragment_continuation = next_fragment;
mcast.fragmented = next_fragment++;
assert(fragment_continuation != 0);
assert(mcast.fragmented != 0);
} else {
fragment_continuation = 0;
}
/*
* assemble the message and send it
*/
mcast.msg_count = ++mcast_packed_msg_count;
iovecs[0].iov_base = (void *)&mcast;
iovecs[0].iov_len = sizeof(struct totempg_mcast);
iovecs[1].iov_base = (void *)mcast_packed_msg_lens;
iovecs[1].iov_len = mcast_packed_msg_count *
sizeof(unsigned short);
iovecs[2].iov_base = (void *)data_ptr;
iovecs[2].iov_len = max_packet_size;
assert (totemmrp_avail() > 0);
res = totemmrp_mcast (iovecs, 3, guarantee);
/*
* Recalculate counts and indexes for the next.
*/
mcast_packed_msg_lens[0] = 0;
mcast_packed_msg_count = 0;
fragment_size = 0;
max_packet_size = TOTEMPG_PACKET_SIZE - (sizeof(unsigned short));
/*
* If the iovec all fit, go to the next iovec
*/
if ((copy_base + copy_len) == iovec[i].iov_len) {
copy_len = 0;
copy_base = 0;
i++;
/*
* Continue with the rest of the current iovec.
*/
} else {
copy_base += copy_len;
}
}
}
/*
* Bump only if we added message data. This may be zero if
* the last buffer just fit into the fragmentation_data buffer
* and we were at the last iovec.
*/
if (mcast_packed_msg_lens[mcast_packed_msg_count]) {
mcast_packed_msg_count++;
}
pthread_mutex_unlock (&mcast_msg_mutex);
return (res);
}
/*
* Determine if a message of msg_size could be queued
*/
static int msg_count_send_ok (
int msg_count)
{
int avail = 0;
avail = totemmrp_avail () - totempg_reserved - 1;
return (avail > msg_count);
}
static int byte_count_send_ok (
int byte_count)
{
unsigned int msg_count = 0;
int avail = 0;
avail = totemmrp_avail () - 1;
msg_count = (byte_count / (totempg_totem_config->net_mtu - 25)) + 1;
return (avail > msg_count);
}
static int send_reserve (
int msg_size)
{
unsigned int msg_count = 0;
msg_count = (msg_size / (totempg_totem_config->net_mtu - 25)) + 1;
totempg_reserved += msg_count;
return (msg_count);
}
static void send_release (
int msg_count)
{
totempg_reserved -= msg_count;
}
int totempg_callback_token_create (
void **handle_out,
enum totem_callback_token_type type,
int delete,
int (*callback_fn) (enum totem_callback_token_type type, const void *),
const void *data)
{
unsigned int res;
pthread_mutex_lock (&callback_token_mutex);
res = totemmrp_callback_token_create (handle_out, type, delete,
callback_fn, data);
pthread_mutex_unlock (&callback_token_mutex);
return (res);
}
void totempg_callback_token_destroy (
void *handle_out)
{
pthread_mutex_lock (&callback_token_mutex);
totemmrp_callback_token_destroy (handle_out);
pthread_mutex_unlock (&callback_token_mutex);
}
/*
* vi: set autoindent tabstop=4 shiftwidth=4 :
*/
int totempg_groups_initialize (
hdb_handle_t *handle,
void (*deliver_fn) (
unsigned int nodeid,
const void *msg,
unsigned int msg_len,
int endian_conversion_required),
void (*confchg_fn) (
enum totem_configuration_type configuration_type,
const unsigned int *member_list, size_t member_list_entries,
const unsigned int *left_list, size_t left_list_entries,
const unsigned int *joined_list, size_t joined_list_entries,
const struct memb_ring_id *ring_id))
{
struct totempg_group_instance *instance;
unsigned int res;
pthread_mutex_lock (&totempg_mutex);
res = hdb_handle_create (&totempg_groups_instance_database,
sizeof (struct totempg_group_instance), handle);
if (res != 0) {
goto error_exit;
}
if (*handle > totempg_max_handle) {
totempg_max_handle = *handle;
}
res = hdb_handle_get (&totempg_groups_instance_database, *handle,
(void *)&instance);
if (res != 0) {
goto error_destroy;
}
instance->deliver_fn = deliver_fn;
instance->confchg_fn = confchg_fn;
instance->groups = 0;
instance->groups_cnt = 0;
hdb_handle_put (&totempg_groups_instance_database, *handle);
pthread_mutex_unlock (&totempg_mutex);
return (0);
error_destroy:
hdb_handle_destroy (&totempg_groups_instance_database, *handle);
error_exit:
pthread_mutex_unlock (&totempg_mutex);
return (-1);
}
int totempg_groups_join (
hdb_handle_t handle,
const struct totempg_group *groups,
size_t group_cnt)
{
struct totempg_group_instance *instance;
struct totempg_group *new_groups;
unsigned int res;
pthread_mutex_lock (&totempg_mutex);
res = hdb_handle_get (&totempg_groups_instance_database, handle,
(void *)&instance);
if (res != 0) {
goto error_exit;
}
new_groups = realloc (instance->groups,
sizeof (struct totempg_group) *
(instance->groups_cnt + group_cnt));
if (new_groups == 0) {
res = ENOMEM;
goto error_exit;
}
memcpy (&new_groups[instance->groups_cnt],
groups, group_cnt * sizeof (struct totempg_group));
instance->groups = new_groups;
instance->groups_cnt += group_cnt;
hdb_handle_put (&totempg_groups_instance_database, handle);
error_exit:
pthread_mutex_unlock (&totempg_mutex);
return (res);
}
int totempg_groups_leave (
hdb_handle_t handle,
const struct totempg_group *groups,
size_t group_cnt)
{
struct totempg_group_instance *instance;
unsigned int res;
pthread_mutex_lock (&totempg_mutex);
res = hdb_handle_get (&totempg_groups_instance_database, handle,
(void *)&instance);
if (res != 0) {
goto error_exit;
}
hdb_handle_put (&totempg_groups_instance_database, handle);
error_exit:
pthread_mutex_unlock (&totempg_mutex);
return (res);
}
#define MAX_IOVECS_FROM_APP 32
#define MAX_GROUPS_PER_MSG 32
int totempg_groups_mcast_joined (
hdb_handle_t handle,
const struct iovec *iovec,
unsigned int iov_len,
int guarantee)
{
struct totempg_group_instance *instance;
unsigned short group_len[MAX_GROUPS_PER_MSG + 1];
struct iovec iovec_mcast[MAX_GROUPS_PER_MSG + 1 + MAX_IOVECS_FROM_APP];
int i;
unsigned int res;
pthread_mutex_lock (&totempg_mutex);
res = hdb_handle_get (&totempg_groups_instance_database, handle,
(void *)&instance);
if (res != 0) {
goto error_exit;
}
/*
* Build group_len structure and the iovec_mcast structure
*/
group_len[0] = instance->groups_cnt;
for (i = 0; i < instance->groups_cnt; i++) {
group_len[i + 1] = instance->groups[i].group_len;
iovec_mcast[i + 1].iov_len = instance->groups[i].group_len;
iovec_mcast[i + 1].iov_base = (void *) instance->groups[i].group;
}
iovec_mcast[0].iov_len = (instance->groups_cnt + 1) * sizeof (unsigned short);
iovec_mcast[0].iov_base = group_len;
for (i = 0; i < iov_len; i++) {
iovec_mcast[i + instance->groups_cnt + 1].iov_len = iovec[i].iov_len;
iovec_mcast[i + instance->groups_cnt + 1].iov_base = iovec[i].iov_base;
}
res = mcast_msg (iovec_mcast, iov_len + instance->groups_cnt + 1, guarantee);
hdb_handle_put (&totempg_groups_instance_database, handle);
error_exit:
pthread_mutex_unlock (&totempg_mutex);
return (res);
}
int totempg_groups_joined_reserve (
hdb_handle_t handle,
const struct iovec *iovec,
unsigned int iov_len)
{
struct totempg_group_instance *instance;
unsigned int size = 0;
unsigned int i;
unsigned int res;
unsigned int reserved = 0;
pthread_mutex_lock (&totempg_mutex);
pthread_mutex_lock (&mcast_msg_mutex);
res = hdb_handle_get (&totempg_groups_instance_database, handle,
(void *)&instance);
if (res != 0) {
goto error_exit;
}
for (i = 0; i < instance->groups_cnt; i++) {
size += instance->groups[i].group_len;
}
for (i = 0; i < iov_len; i++) {
size += iovec[i].iov_len;
}
reserved = send_reserve (size);
if (msg_count_send_ok (reserved) == 0) {
send_release (reserved);
reserved = 0;
}
hdb_handle_put (&totempg_groups_instance_database, handle);
error_exit:
pthread_mutex_unlock (&mcast_msg_mutex);
pthread_mutex_unlock (&totempg_mutex);
return (reserved);
}
int totempg_groups_joined_release (int msg_count)
{
pthread_mutex_lock (&totempg_mutex);
pthread_mutex_lock (&mcast_msg_mutex);
send_release (msg_count);
pthread_mutex_unlock (&mcast_msg_mutex);
pthread_mutex_unlock (&totempg_mutex);
return 0;
}
int totempg_groups_mcast_groups (
hdb_handle_t handle,
int guarantee,
const struct totempg_group *groups,
size_t groups_cnt,
const struct iovec *iovec,
unsigned int iov_len)
{
struct totempg_group_instance *instance;
unsigned short group_len[MAX_GROUPS_PER_MSG + 1];
struct iovec iovec_mcast[MAX_GROUPS_PER_MSG + 1 + MAX_IOVECS_FROM_APP];
int i;
unsigned int res;
pthread_mutex_lock (&totempg_mutex);
res = hdb_handle_get (&totempg_groups_instance_database, handle,
(void *)&instance);
if (res != 0) {
goto error_exit;
}
/*
* Build group_len structure and the iovec_mcast structure
*/
group_len[0] = groups_cnt;
for (i = 0; i < groups_cnt; i++) {
group_len[i + 1] = groups[i].group_len;
iovec_mcast[i + 1].iov_len = groups[i].group_len;
iovec_mcast[i + 1].iov_base = (void *) groups[i].group;
}
iovec_mcast[0].iov_len = (groups_cnt + 1) * sizeof (unsigned short);
iovec_mcast[0].iov_base = group_len;
for (i = 0; i < iov_len; i++) {
iovec_mcast[i + groups_cnt + 1].iov_len = iovec[i].iov_len;
iovec_mcast[i + groups_cnt + 1].iov_base = iovec[i].iov_base;
}
res = mcast_msg (iovec_mcast, iov_len + groups_cnt + 1, guarantee);
hdb_handle_put (&totempg_groups_instance_database, handle);
error_exit:
pthread_mutex_unlock (&totempg_mutex);
return (res);
}
/*
* Returns -1 if error, 0 if can't send, 1 if can send the message
*/
int totempg_groups_send_ok_groups (
hdb_handle_t handle,
const struct totempg_group *groups,
size_t groups_cnt,
const struct iovec *iovec,
unsigned int iov_len)
{
struct totempg_group_instance *instance;
unsigned int size = 0;
unsigned int i;
unsigned int res;
pthread_mutex_lock (&totempg_mutex);
res = hdb_handle_get (&totempg_groups_instance_database, handle,
(void *)&instance);
if (res != 0) {
goto error_exit;
}
for (i = 0; i < groups_cnt; i++) {
size += groups[i].group_len;
}
for (i = 0; i < iov_len; i++) {
size += iovec[i].iov_len;
}
res = msg_count_send_ok (size);
hdb_handle_put (&totempg_groups_instance_database, handle);
error_exit:
pthread_mutex_unlock (&totempg_mutex);
return (res);
}
int totempg_ifaces_get (
unsigned int nodeid,
struct totem_ip_address *interfaces,
char ***status,
unsigned int *iface_count)
{
int res;
res = totemmrp_ifaces_get (
nodeid,
interfaces,
status,
iface_count);
return (res);
}
int totempg_crypto_set (
unsigned int type)
{
int res;
res = totemmrp_crypto_set (
type);
return (res);
}
int totempg_ring_reenable (void)
{
int res;
res = totemmrp_ring_reenable ();
return (res);
}
const char *totempg_ifaces_print (unsigned int nodeid)
{
static char iface_string[256 * INTERFACE_MAX];
char one_iface[64];
struct totem_ip_address interfaces[INTERFACE_MAX];
char **status;
unsigned int iface_count;
unsigned int i;
int res;
iface_string[0] = '\0';
res = totempg_ifaces_get (nodeid, interfaces, &status, &iface_count);
if (res == -1) {
return ("no interface found for nodeid");
}
for (i = 0; i < iface_count; i++) {
sprintf (one_iface, "r(%d) ip(%s) ",
i, totemip_print (&interfaces[i]));
strcat (iface_string, one_iface);
}
return (iface_string);
}
unsigned int totempg_my_nodeid_get (void)
{
return (totemmrp_my_nodeid_get());
}
int totempg_my_family_get (void)
{
return (totemmrp_my_family_get());
}
diff --git a/exec/totemsrp.c b/exec/totemsrp.c
index f0501a93..6ad041ef 100644
--- a/exec/totemsrp.c
+++ b/exec/totemsrp.c
@@ -1,4242 +1,4244 @@
/*
* Copyright (c) 2003-2006 MontaVista Software, Inc.
* Copyright (c) 2006-2009 Red Hat, Inc.
*
* All rights reserved.
*
* Author: Steven Dake (sdake@redhat.com)
*
* This software licensed under BSD license, the text of which follows:
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* - Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* - Neither the name of the MontaVista Software, Inc. nor the names of its
* contributors may be used to endorse or promote products derived from this
* software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
* THE POSSIBILITY OF SUCH DAMAGE.
*/
/*
* The first version of this code was based upon Yair Amir's PhD thesis:
* http://www.cs.jhu.edu/~yairamir/phd.ps) (ch4,5).
*
* The current version of totemsrp implements the Totem protocol specified in:
* http://citeseer.ist.psu.edu/amir95totem.html
*
* The deviations from the above published protocols are:
* - encryption of message contents with SOBER128
* - authentication of meessage contents with SHA1/HMAC
* - token hold mode where token doesn't rotate on unused ring - reduces cpu
* usage on 1.6ghz xeon from 35% to less then .1 % as measured by top
*/
#include <config.h>
#include <assert.h>
+#ifdef HAVE_ALLOCA_H
#include <alloca.h>
+#endif
#include <sys/mman.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/socket.h>
#include <netdb.h>
#include <sys/un.h>
#include <sys/ioctl.h>
#include <sys/param.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <sched.h>
#include <time.h>
#include <sys/time.h>
#include <sys/poll.h>
#include <limits.h>
#include <corosync/swab.h>
#include <corosync/cs_queue.h>
#include <corosync/sq.h>
#include <corosync/list.h>
#include <corosync/hdb.h>
#include <corosync/totem/coropoll.h>
#define LOGSYS_UTILS_ONLY 1
#include <corosync/engine/logsys.h>
#include "totemsrp.h"
#include "totemrrp.h"
#include "totemnet.h"
#include "wthread.h"
#include "crypto.h"
#define LOCALHOST_IP inet_addr("127.0.0.1")
#define QUEUE_RTR_ITEMS_SIZE_MAX 256 /* allow 256 retransmit items */
#define RETRANS_MESSAGE_QUEUE_SIZE_MAX 500 /* allow 500 messages to be queued */
#define RECEIVED_MESSAGE_QUEUE_SIZE_MAX 500 /* allow 500 messages to be queued */
#define MAXIOVS 5
#define RETRANSMIT_ENTRIES_MAX 30
#define TOKEN_SIZE_MAX 64000 /* bytes */
#define LEAVE_DUMMY_NODEID 0
/*
* Rollover handling:
* SEQNO_START_MSG is the starting sequence number after a new configuration
* This should remain zero, unless testing overflow in which case
* 0x7ffff000 and 0xfffff000 are good starting values.
*
* SEQNO_START_TOKEN is the starting sequence number after a new configuration
* for a token. This should remain zero, unless testing overflow in which
* case 07fffff00 or 0xffffff00 are good starting values.
*
* SEQNO_START_MSG is the starting sequence number after a new configuration
* This should remain zero, unless testing overflow in which case
* 0x7ffff000 and 0xfffff000 are good values to start with
*/
#define SEQNO_START_MSG 0x0
#define SEQNO_START_TOKEN 0x0
/*
* These can be used ot test different rollover points
* #define SEQNO_START_MSG 0xfffffe00
* #define SEQNO_START_TOKEN 0xfffffe00
*/
/*
* These can be used to test the error recovery algorithms
* #define TEST_DROP_ORF_TOKEN_PERCENTAGE 30
* #define TEST_DROP_COMMIT_TOKEN_PERCENTAGE 30
* #define TEST_DROP_MCAST_PERCENTAGE 50
* #define TEST_RECOVERY_MSG_COUNT 300
*/
/*
* we compare incoming messages to determine if their endian is
* different - if so convert them
*
* do not change
*/
#define ENDIAN_LOCAL 0xff22
enum message_type {
MESSAGE_TYPE_ORF_TOKEN = 0, /* Ordering, Reliability, Flow (ORF) control Token */
MESSAGE_TYPE_MCAST = 1, /* ring ordered multicast message */
MESSAGE_TYPE_MEMB_MERGE_DETECT = 2, /* merge rings if there are available rings */
MESSAGE_TYPE_MEMB_JOIN = 3, /* membership join message */
MESSAGE_TYPE_MEMB_COMMIT_TOKEN = 4, /* membership commit token */
MESSAGE_TYPE_TOKEN_HOLD_CANCEL = 5, /* cancel the holding of the token */
};
enum encapsulation_type {
MESSAGE_ENCAPSULATED = 1,
MESSAGE_NOT_ENCAPSULATED = 2
};
/*
* New membership algorithm local variables
*/
struct srp_addr {
struct totem_ip_address addr[INTERFACE_MAX];
};
struct consensus_list_item {
struct srp_addr addr;
int set;
};
struct token_callback_instance {
struct list_head list;
int (*callback_fn) (enum totem_callback_token_type type, const void *);
enum totem_callback_token_type callback_type;
int delete;
void *data;
};
struct totemsrp_socket {
int mcast;
int token;
};
struct message_header {
char type;
char encapsulated;
unsigned short endian_detector;
unsigned int nodeid;
} __attribute__((packed));
struct mcast {
struct message_header header;
struct srp_addr system_from;
unsigned int seq;
int this_seqno;
struct memb_ring_id ring_id;
unsigned int node_id;
int guarantee;
} __attribute__((packed));
/*
* MTU - multicast message header - IP header - UDP header
*
* On lossy switches, making use of the DF UDP flag can lead to loss of
* forward progress. So the packets must be fragmented by a higher layer
*
* This layer can only handle packets of MTU size.
*/
#define FRAGMENT_SIZE (FRAME_SIZE_MAX - sizeof (struct mcast) - 20 - 8)
struct rtr_item {
struct memb_ring_id ring_id;
unsigned int seq;
}__attribute__((packed));
struct orf_token {
struct message_header header;
unsigned int seq;
unsigned int token_seq;
unsigned int aru;
unsigned int aru_addr;
struct memb_ring_id ring_id;
unsigned int backlog;
unsigned int fcc;
int retrans_flg;
int rtr_list_entries;
struct rtr_item rtr_list[0];
}__attribute__((packed));
struct memb_join {
struct message_header header;
struct srp_addr system_from;
unsigned int proc_list_entries;
unsigned int failed_list_entries;
unsigned long long ring_seq;
unsigned char end_of_memb_join[0];
/*
* These parts of the data structure are dynamic:
* struct srp_addr proc_list[];
* struct srp_addr failed_list[];
*/
} __attribute__((packed));
struct memb_merge_detect {
struct message_header header;
struct srp_addr system_from;
struct memb_ring_id ring_id;
} __attribute__((packed));
struct token_hold_cancel {
struct message_header header;
struct memb_ring_id ring_id;
} __attribute__((packed));
struct memb_commit_token_memb_entry {
struct memb_ring_id ring_id;
unsigned int aru;
unsigned int high_delivered;
unsigned int received_flg;
}__attribute__((packed));
struct memb_commit_token {
struct message_header header;
unsigned int token_seq;
struct memb_ring_id ring_id;
unsigned int retrans_flg;
int memb_index;
int addr_entries;
unsigned char end_of_commit_token[0];
/*
* These parts of the data structure are dynamic:
*
* struct srp_addr addr[PROCESSOR_COUNT_MAX];
* struct memb_commit_token_memb_entry memb_list[PROCESSOR_COUNT_MAX];
*/
}__attribute__((packed));
struct message_item {
struct mcast *mcast;
unsigned int msg_len;
};
struct sort_queue_item {
struct mcast *mcast;
unsigned int msg_len;
};
struct orf_token_mcast_thread_state {
char iobuf[9000];
prng_state prng_state;
};
enum memb_state {
MEMB_STATE_OPERATIONAL = 1,
MEMB_STATE_GATHER = 2,
MEMB_STATE_COMMIT = 3,
MEMB_STATE_RECOVERY = 4
};
struct totemsrp_instance {
int iface_changes;
/*
* Flow control mcasts and remcasts on last and current orf_token
*/
int fcc_remcast_last;
int fcc_mcast_last;
int fcc_remcast_current;
struct consensus_list_item consensus_list[PROCESSOR_COUNT_MAX];
int consensus_list_entries;
struct srp_addr my_id;
struct srp_addr my_proc_list[PROCESSOR_COUNT_MAX];
struct srp_addr my_failed_list[PROCESSOR_COUNT_MAX];
struct srp_addr my_new_memb_list[PROCESSOR_COUNT_MAX];
struct srp_addr my_trans_memb_list[PROCESSOR_COUNT_MAX];
struct srp_addr my_memb_list[PROCESSOR_COUNT_MAX];
struct srp_addr my_deliver_memb_list[PROCESSOR_COUNT_MAX];
struct srp_addr my_left_memb_list[PROCESSOR_COUNT_MAX];
int my_proc_list_entries;
int my_failed_list_entries;
int my_new_memb_entries;
int my_trans_memb_entries;
int my_memb_entries;
int my_deliver_memb_entries;
int my_left_memb_entries;
struct memb_ring_id my_ring_id;
struct memb_ring_id my_old_ring_id;
int my_aru_count;
int my_merge_detect_timeout_outstanding;
unsigned int my_last_aru;
int my_seq_unchanged;
int my_received_flg;
unsigned int my_high_seq_received;
unsigned int my_install_seq;
int my_rotation_counter;
int my_set_retrans_flg;
int my_retrans_flg_count;
unsigned int my_high_ring_delivered;
int heartbeat_timeout;
/*
* Queues used to order, deliver, and recover messages
*/
struct cs_queue new_message_queue;
struct cs_queue retrans_message_queue;
struct sq regular_sort_queue;
struct sq recovery_sort_queue;
/*
* Received up to and including
*/
unsigned int my_aru;
unsigned int my_high_delivered;
struct list_head token_callback_received_listhead;
struct list_head token_callback_sent_listhead;
char *orf_token_retransmit[TOKEN_SIZE_MAX];
int orf_token_retransmit_size;
unsigned int my_token_seq;
/*
* Timers
*/
poll_timer_handle timer_orf_token_timeout;
poll_timer_handle timer_orf_token_retransmit_timeout;
poll_timer_handle timer_orf_token_hold_retransmit_timeout;
poll_timer_handle timer_merge_detect_timeout;
poll_timer_handle memb_timer_state_gather_join_timeout;
poll_timer_handle memb_timer_state_gather_consensus_timeout;
poll_timer_handle memb_timer_state_commit_timeout;
poll_timer_handle timer_heartbeat_timeout;
/*
* Function and data used to log messages
*/
int totemsrp_log_level_security;
int totemsrp_log_level_error;
int totemsrp_log_level_warning;
int totemsrp_log_level_notice;
int totemsrp_log_level_debug;
int totemsrp_subsys_id;
void (*totemsrp_log_printf) (
unsigned int rec_ident,
const char *function,
const char *file,
int line,
const char *format, ...)__attribute__((format(printf, 5, 6)));;
enum memb_state memb_state;
//TODO struct srp_addr next_memb;
hdb_handle_t totemsrp_poll_handle;
struct totem_ip_address mcast_address;
void (*totemsrp_deliver_fn) (
unsigned int nodeid,
const void *msg,
unsigned int msg_len,
int endian_conversion_required);
void (*totemsrp_confchg_fn) (
enum totem_configuration_type configuration_type,
const unsigned int *member_list, size_t member_list_entries,
const unsigned int *left_list, size_t left_list_entries,
const unsigned int *joined_list, size_t joined_list_entries,
const struct memb_ring_id *ring_id);
int global_seqno;
int my_token_held;
unsigned long long token_ring_id_seq;
unsigned int last_released;
unsigned int set_aru;
int old_ring_state_saved;
int old_ring_state_aru;
unsigned int old_ring_state_high_seq_received;
int ring_saved;
unsigned int my_last_seq;
struct timeval tv_old;
hdb_handle_t totemrrp_handle;
struct totem_config *totem_config;
unsigned int use_heartbeat;
unsigned int my_trc;
unsigned int my_pbl;
unsigned int my_cbl;
};
struct message_handlers {
int count;
int (*handler_functions[6]) (
struct totemsrp_instance *instance,
const void *msg,
size_t msg_len,
int endian_conversion_needed);
};
/*
* forward decls
*/
static int message_handler_orf_token (
struct totemsrp_instance *instance,
const void *msg,
size_t msg_len,
int endian_conversion_needed);
static int message_handler_mcast (
struct totemsrp_instance *instance,
const void *msg,
size_t msg_len,
int endian_conversion_needed);
static int message_handler_memb_merge_detect (
struct totemsrp_instance *instance,
const void *msg,
size_t msg_len,
int endian_conversion_needed);
static int message_handler_memb_join (
struct totemsrp_instance *instance,
const void *msg,
size_t msg_len,
int endian_conversion_needed);
static int message_handler_memb_commit_token (
struct totemsrp_instance *instance,
const void *msg,
size_t msg_len,
int endian_conversion_needed);
static int message_handler_token_hold_cancel (
struct totemsrp_instance *instance,
const void *msg,
size_t msg_len,
int endian_conversion_needed);
static void totemsrp_instance_initialize (struct totemsrp_instance *instance);
static unsigned int main_msgs_missing (void);
static void main_token_seqid_get (
const void *msg,
unsigned int *seqid,
unsigned int *token_is);
static void srp_addr_copy (struct srp_addr *dest, const struct srp_addr *src);
static void srp_addr_to_nodeid (
unsigned int *nodeid_out,
struct srp_addr *srp_addr_in,
unsigned int entries);
static int srp_addr_equal (const struct srp_addr *a, const struct srp_addr *b);
static void memb_leave_message_send (struct totemsrp_instance *instance);
static void memb_ring_id_create_or_load (struct totemsrp_instance *, struct memb_ring_id *);
static void token_callbacks_execute (struct totemsrp_instance *instance, enum totem_callback_token_type type);
static void memb_state_gather_enter (struct totemsrp_instance *instance, int gather_from);
static void messages_deliver_to_app (struct totemsrp_instance *instance, int skip, unsigned int end_point);
static int orf_token_mcast (struct totemsrp_instance *instance, struct orf_token *oken,
int fcc_mcasts_allowed);
static void messages_free (struct totemsrp_instance *instance, unsigned int token_aru);
static void memb_ring_id_set_and_store (struct totemsrp_instance *instance,
const struct memb_ring_id *ring_id);
static void memb_state_commit_token_update (struct totemsrp_instance *instance, struct memb_commit_token *commit_token);
static void memb_state_commit_token_target_set (struct totemsrp_instance *instance, struct memb_commit_token *commit_token);
static int memb_state_commit_token_send (struct totemsrp_instance *instance, struct memb_commit_token *memb_commit_token);
static void memb_state_commit_token_create (struct totemsrp_instance *instance, struct memb_commit_token *commit_token);
static int token_hold_cancel_send (struct totemsrp_instance *instance);
static void orf_token_endian_convert (const struct orf_token *in, struct orf_token *out);
static void memb_commit_token_endian_convert (const struct memb_commit_token *in, struct memb_commit_token *out);
static void memb_join_endian_convert (const struct memb_join *in, struct memb_join *out);
static void mcast_endian_convert (const struct mcast *in, struct mcast *out);
static void memb_merge_detect_endian_convert (
const struct memb_merge_detect *in,
struct memb_merge_detect *out);
static void srp_addr_copy_endian_convert (struct srp_addr *out, const struct srp_addr *in);
static void timer_function_orf_token_timeout (void *data);
static void timer_function_heartbeat_timeout (void *data);
static void timer_function_token_retransmit_timeout (void *data);
static void timer_function_token_hold_retransmit_timeout (void *data);
static void timer_function_merge_detect_timeout (void *data);
void main_deliver_fn (
void *context,
const void *msg,
unsigned int msg_len);
void main_iface_change_fn (
void *context,
const struct totem_ip_address *iface_address,
unsigned int iface_no);
/*
* All instances in one database
*/
DECLARE_HDB_DATABASE (totemsrp_instance_database,NULL);
struct message_handlers totemsrp_message_handlers = {
6,
{
message_handler_orf_token,
message_handler_mcast,
message_handler_memb_merge_detect,
message_handler_memb_join,
message_handler_memb_commit_token,
message_handler_token_hold_cancel
}
};
static const char *rundir = NULL;
#define log_printf(level, format, args...) \
do { \
instance->totemsrp_log_printf ( \
LOGSYS_ENCODE_RECID(level, \
instance->totemsrp_subsys_id, \
LOGSYS_RECID_LOG), \
__FUNCTION__, __FILE__, __LINE__, \
format, ##args); \
} while (0);
static void totemsrp_instance_initialize (struct totemsrp_instance *instance)
{
memset (instance, 0, sizeof (struct totemsrp_instance));
list_init (&instance->token_callback_received_listhead);
list_init (&instance->token_callback_sent_listhead);
instance->my_received_flg = 1;
instance->my_token_seq = SEQNO_START_TOKEN - 1;
instance->memb_state = MEMB_STATE_OPERATIONAL;
instance->set_aru = -1;
instance->my_aru = SEQNO_START_MSG;
instance->my_high_seq_received = SEQNO_START_MSG;
instance->my_high_delivered = SEQNO_START_MSG;
}
static void main_token_seqid_get (
const void *msg,
unsigned int *seqid,
unsigned int *token_is)
{
const struct orf_token *token = msg;
*seqid = 0;
*token_is = 0;
if (token->header.type == MESSAGE_TYPE_ORF_TOKEN) {
*seqid = token->token_seq;
*token_is = 1;
}
}
static unsigned int main_msgs_missing (void)
{
// TODO
return (0);
}
/*
* Exported interfaces
*/
int totemsrp_initialize (
hdb_handle_t poll_handle,
hdb_handle_t *handle,
struct totem_config *totem_config,
void (*deliver_fn) (
unsigned int nodeid,
const void *msg,
unsigned int msg_len,
int endian_conversion_required),
void (*confchg_fn) (
enum totem_configuration_type configuration_type,
const unsigned int *member_list, size_t member_list_entries,
const unsigned int *left_list, size_t left_list_entries,
const unsigned int *joined_list, size_t joined_list_entries,
const struct memb_ring_id *ring_id))
{
struct totemsrp_instance *instance;
unsigned int res;
res = hdb_handle_create (&totemsrp_instance_database,
sizeof (struct totemsrp_instance), handle);
if (res != 0) {
goto error_exit;
}
res = hdb_handle_get (&totemsrp_instance_database, *handle,
(void *)&instance);
if (res != 0) {
goto error_destroy;
}
rundir = getenv ("COROSYNC_RUN_DIR");
if (rundir == NULL) {
rundir = LOCALSTATEDIR "/lib/corosync";
}
res = mkdir (rundir, 0700);
if (res == -1 && errno != EEXIST) {
goto error_put;
}
res = chdir (rundir);
if (res == -1) {
goto error_put;
}
totemsrp_instance_initialize (instance);
instance->totem_config = totem_config;
/*
* Configure logging
*/
instance->totemsrp_log_level_security = totem_config->totem_logging_configuration.log_level_security;
instance->totemsrp_log_level_error = totem_config->totem_logging_configuration.log_level_error;
instance->totemsrp_log_level_warning = totem_config->totem_logging_configuration.log_level_warning;
instance->totemsrp_log_level_notice = totem_config->totem_logging_configuration.log_level_notice;
instance->totemsrp_log_level_debug = totem_config->totem_logging_configuration.log_level_debug;
instance->totemsrp_subsys_id = totem_config->totem_logging_configuration.log_subsys_id;
instance->totemsrp_log_printf = totem_config->totem_logging_configuration.log_printf;
/*
* Initialize local variables for totemsrp
*/
totemip_copy (&instance->mcast_address, &totem_config->interfaces[0].mcast_addr);
/*
* Display totem configuration
*/
log_printf (instance->totemsrp_log_level_notice,
"Token Timeout (%d ms) retransmit timeout (%d ms)\n",
totem_config->token_timeout, totem_config->token_retransmit_timeout);
log_printf (instance->totemsrp_log_level_notice,
"token hold (%d ms) retransmits before loss (%d retrans)\n",
totem_config->token_hold_timeout, totem_config->token_retransmits_before_loss_const);
log_printf (instance->totemsrp_log_level_notice,
"join (%d ms) send_join (%d ms) consensus (%d ms) merge (%d ms)\n",
totem_config->join_timeout,
totem_config->send_join_timeout,
totem_config->consensus_timeout,
totem_config->merge_timeout);
log_printf (instance->totemsrp_log_level_notice,
"downcheck (%d ms) fail to recv const (%d msgs)\n",
totem_config->downcheck_timeout, totem_config->fail_to_recv_const);
log_printf (instance->totemsrp_log_level_notice,
"seqno unchanged const (%d rotations) Maximum network MTU %d\n", totem_config->seqno_unchanged_const, totem_config->net_mtu);
log_printf (instance->totemsrp_log_level_notice,
"window size per rotation (%d messages) maximum messages per rotation (%d messages)\n",
totem_config->window_size, totem_config->max_messages);
log_printf (instance->totemsrp_log_level_notice,
"send threads (%d threads)\n", totem_config->threads);
log_printf (instance->totemsrp_log_level_notice,
"RRP token expired timeout (%d ms)\n",
totem_config->rrp_token_expired_timeout);
log_printf (instance->totemsrp_log_level_notice,
"RRP token problem counter (%d ms)\n",
totem_config->rrp_problem_count_timeout);
log_printf (instance->totemsrp_log_level_notice,
"RRP threshold (%d problem count)\n",
totem_config->rrp_problem_count_threshold);
log_printf (instance->totemsrp_log_level_notice,
"RRP mode set to %s.\n", instance->totem_config->rrp_mode);
log_printf (instance->totemsrp_log_level_notice,
"heartbeat_failures_allowed (%d)\n", totem_config->heartbeat_failures_allowed);
log_printf (instance->totemsrp_log_level_notice,
"max_network_delay (%d ms)\n", totem_config->max_network_delay);
cs_queue_init (&instance->retrans_message_queue, RETRANS_MESSAGE_QUEUE_SIZE_MAX,
sizeof (struct message_item));
sq_init (&instance->regular_sort_queue,
QUEUE_RTR_ITEMS_SIZE_MAX, sizeof (struct sort_queue_item), 0);
sq_init (&instance->recovery_sort_queue,
QUEUE_RTR_ITEMS_SIZE_MAX, sizeof (struct sort_queue_item), 0);
instance->totemsrp_poll_handle = poll_handle;
instance->totemsrp_deliver_fn = deliver_fn;
instance->totemsrp_confchg_fn = confchg_fn;
instance->use_heartbeat = 1;
if ( totem_config->heartbeat_failures_allowed == 0 ) {
log_printf (instance->totemsrp_log_level_notice,
"HeartBeat is Disabled. To enable set heartbeat_failures_allowed > 0\n");
instance->use_heartbeat = 0;
}
if (instance->use_heartbeat) {
instance->heartbeat_timeout
= (totem_config->heartbeat_failures_allowed) * totem_config->token_retransmit_timeout
+ totem_config->max_network_delay;
if (instance->heartbeat_timeout >= totem_config->token_timeout) {
log_printf (instance->totemsrp_log_level_notice,
"total heartbeat_timeout (%d ms) is not less than token timeout (%d ms)\n",
instance->heartbeat_timeout,
totem_config->token_timeout);
log_printf (instance->totemsrp_log_level_notice,
"heartbeat_timeout = heartbeat_failures_allowed * token_retransmit_timeout + max_network_delay\n");
log_printf (instance->totemsrp_log_level_notice,
"heartbeat timeout should be less than the token timeout. HeartBeat is Diabled !!\n");
instance->use_heartbeat = 0;
}
else {
log_printf (instance->totemsrp_log_level_notice,
"total heartbeat_timeout (%d ms)\n", instance->heartbeat_timeout);
}
}
totemrrp_initialize (
poll_handle,
&instance->totemrrp_handle,
totem_config,
instance,
main_deliver_fn,
main_iface_change_fn,
main_token_seqid_get,
main_msgs_missing);
/*
* Must have net_mtu adjusted by totemrrp_initialize first
*/
cs_queue_init (&instance->new_message_queue,
MESSAGE_QUEUE_MAX,
sizeof (struct message_item));
hdb_handle_put (&totemsrp_instance_database, *handle);
return (0);
error_put:
hdb_handle_put (&totemsrp_instance_database, *handle);
error_destroy:
hdb_handle_destroy (&totemsrp_instance_database, *handle);
error_exit:
return (-1);
}
void totemsrp_finalize (
hdb_handle_t handle)
{
struct totemsrp_instance *instance;
unsigned int res;
res = hdb_handle_get (&totemsrp_instance_database, handle,
(void *)&instance);
if (res != 0) {
return;
}
memb_leave_message_send (instance);
hdb_handle_put (&totemsrp_instance_database, handle);
}
int totemsrp_ifaces_get (
hdb_handle_t handle,
unsigned int nodeid,
struct totem_ip_address *interfaces,
char ***status,
unsigned int *iface_count)
{
struct totemsrp_instance *instance;
int res;
unsigned int found = 0;
unsigned int i;
res = hdb_handle_get (&totemsrp_instance_database, handle,
(void *)&instance);
if (res != 0) {
goto error_exit;
}
for (i = 0; i < instance->my_memb_entries; i++) {
if (instance->my_memb_list[i].addr[0].nodeid == nodeid) {
found = 1;
break;
}
}
if (found) {
memcpy (interfaces, &instance->my_memb_list[i],
sizeof (struct srp_addr));
*iface_count = instance->totem_config->interface_count;
goto finish;
}
for (i = 0; i < instance->my_left_memb_entries; i++) {
if (instance->my_left_memb_list[i].addr[0].nodeid == nodeid) {
found = 1;
break;
}
}
if (found) {
memcpy (interfaces, &instance->my_left_memb_list[i],
sizeof (struct srp_addr));
*iface_count = instance->totem_config->interface_count;
} else {
res = -1;
}
finish:
totemrrp_ifaces_get (instance->totemrrp_handle, status, NULL);
hdb_handle_put (&totemsrp_instance_database, handle);
error_exit:
return (res);
}
int totemsrp_crypto_set (
hdb_handle_t handle,
unsigned int type)
{
int res;
struct totemsrp_instance *instance;
res = hdb_handle_get (&totemsrp_instance_database, handle,
(void *)&instance);
if (res != 0) {
return (0);
}
res = totemrrp_crypto_set(instance->totemrrp_handle, type);
hdb_handle_put (&totemsrp_instance_database, handle);
return (res);
}
unsigned int totemsrp_my_nodeid_get (
hdb_handle_t handle)
{
struct totemsrp_instance *instance;
unsigned int res;
res = hdb_handle_get (&totemsrp_instance_database, handle,
(void *)&instance);
if (res != 0) {
return (0);
}
res = instance->totem_config->interfaces[0].boundto.nodeid;
hdb_handle_put (&totemsrp_instance_database, handle);
return (res);
}
int totemsrp_my_family_get (
hdb_handle_t handle)
{
struct totemsrp_instance *instance;
int res;
res = hdb_handle_get (&totemsrp_instance_database, handle,
(void *)&instance);
if (res != 0) {
return (0);
}
res = instance->totem_config->interfaces[0].boundto.family;
hdb_handle_put (&totemsrp_instance_database, handle);
return (res);
}
int totemsrp_ring_reenable (
hdb_handle_t handle)
{
struct totemsrp_instance *instance;
int res;
res = hdb_handle_get (&totemsrp_instance_database, handle,
(void *)&instance);
if (res != 0) {
goto error_exit;
}
totemrrp_ring_reenable (instance->totemrrp_handle);
hdb_handle_put (&totemsrp_instance_database, handle);
error_exit:
return (res);
}
/*
* Set operations for use by the membership algorithm
*/
static int srp_addr_equal (const struct srp_addr *a, const struct srp_addr *b)
{
unsigned int i;
unsigned int res;
for (i = 0; i < 1; i++) {
res = totemip_equal (&a->addr[i], &b->addr[i]);
if (res == 0) {
return (0);
}
}
return (1);
}
static void srp_addr_copy (struct srp_addr *dest, const struct srp_addr *src)
{
unsigned int i;
for (i = 0; i < INTERFACE_MAX; i++) {
totemip_copy (&dest->addr[i], &src->addr[i]);
}
}
static void srp_addr_to_nodeid (
unsigned int *nodeid_out,
struct srp_addr *srp_addr_in,
unsigned int entries)
{
unsigned int i;
for (i = 0; i < entries; i++) {
nodeid_out[i] = srp_addr_in[i].addr[0].nodeid;
}
}
static void srp_addr_copy_endian_convert (struct srp_addr *out, const struct srp_addr *in)
{
int i;
for (i = 0; i < INTERFACE_MAX; i++) {
totemip_copy_endian_convert (&out->addr[i], &in->addr[i]);
}
}
static void memb_consensus_reset (struct totemsrp_instance *instance)
{
instance->consensus_list_entries = 0;
}
static void memb_set_subtract (
struct srp_addr *out_list, int *out_list_entries,
struct srp_addr *one_list, int one_list_entries,
struct srp_addr *two_list, int two_list_entries)
{
int found = 0;
int i;
int j;
*out_list_entries = 0;
for (i = 0; i < one_list_entries; i++) {
for (j = 0; j < two_list_entries; j++) {
if (srp_addr_equal (&one_list[i], &two_list[j])) {
found = 1;
break;
}
}
if (found == 0) {
srp_addr_copy (&out_list[*out_list_entries], &one_list[i]);
*out_list_entries = *out_list_entries + 1;
}
found = 0;
}
}
/*
* Set consensus for a specific processor
*/
static void memb_consensus_set (
struct totemsrp_instance *instance,
const struct srp_addr *addr)
{
int found = 0;
int i;
if (addr->addr[0].nodeid == LEAVE_DUMMY_NODEID)
return;
for (i = 0; i < instance->consensus_list_entries; i++) {
if (srp_addr_equal(addr, &instance->consensus_list[i].addr)) {
found = 1;
break; /* found entry */
}
}
srp_addr_copy (&instance->consensus_list[i].addr, addr);
instance->consensus_list[i].set = 1;
if (found == 0) {
instance->consensus_list_entries++;
}
return;
}
/*
* Is consensus set for a specific processor
*/
static int memb_consensus_isset (
struct totemsrp_instance *instance,
const struct srp_addr *addr)
{
int i;
for (i = 0; i < instance->consensus_list_entries; i++) {
if (srp_addr_equal (addr, &instance->consensus_list[i].addr)) {
return (instance->consensus_list[i].set);
}
}
return (0);
}
/*
* Is consensus agreed upon based upon consensus database
*/
static int memb_consensus_agreed (
struct totemsrp_instance *instance)
{
struct srp_addr token_memb[PROCESSOR_COUNT_MAX];
int token_memb_entries = 0;
int agreed = 1;
int i;
memb_set_subtract (token_memb, &token_memb_entries,
instance->my_proc_list, instance->my_proc_list_entries,
instance->my_failed_list, instance->my_failed_list_entries);
for (i = 0; i < token_memb_entries; i++) {
if (memb_consensus_isset (instance, &token_memb[i]) == 0) {
agreed = 0;
break;
}
}
assert (token_memb_entries >= 1);
return (agreed);
}
static void memb_consensus_notset (
struct totemsrp_instance *instance,
struct srp_addr *no_consensus_list,
int *no_consensus_list_entries,
struct srp_addr *comparison_list,
int comparison_list_entries)
{
int i;
*no_consensus_list_entries = 0;
for (i = 0; i < instance->my_proc_list_entries; i++) {
if (memb_consensus_isset (instance, &instance->my_proc_list[i]) == 0) {
srp_addr_copy (&no_consensus_list[*no_consensus_list_entries], &instance->my_proc_list[i]);
*no_consensus_list_entries = *no_consensus_list_entries + 1;
}
}
}
/*
* Is set1 equal to set2 Entries can be in different orders
*/
static int memb_set_equal (
struct srp_addr *set1, int set1_entries,
struct srp_addr *set2, int set2_entries)
{
int i;
int j;
int found = 0;
if (set1_entries != set2_entries) {
return (0);
}
for (i = 0; i < set2_entries; i++) {
for (j = 0; j < set1_entries; j++) {
if (srp_addr_equal (&set1[j], &set2[i])) {
found = 1;
break;
}
}
if (found == 0) {
return (0);
}
found = 0;
}
return (1);
}
/*
* Is subset fully contained in fullset
*/
static int memb_set_subset (
const struct srp_addr *subset, int subset_entries,
const struct srp_addr *fullset, int fullset_entries)
{
int i;
int j;
int found = 0;
if (subset_entries > fullset_entries) {
return (0);
}
for (i = 0; i < subset_entries; i++) {
for (j = 0; j < fullset_entries; j++) {
if (srp_addr_equal (&subset[i], &fullset[j])) {
found = 1;
}
}
if (found == 0) {
return (0);
}
found = 0;
}
return (1);
}
/*
* merge subset into fullset taking care not to add duplicates
*/
static void memb_set_merge (
const struct srp_addr *subset, int subset_entries,
struct srp_addr *fullset, int *fullset_entries)
{
int found = 0;
int i;
int j;
for (i = 0; i < subset_entries; i++) {
for (j = 0; j < *fullset_entries; j++) {
if (srp_addr_equal (&fullset[j], &subset[i])) {
found = 1;
break;
}
}
if (found == 0) {
srp_addr_copy (&fullset[*fullset_entries], &subset[i]);
*fullset_entries = *fullset_entries + 1;
}
found = 0;
}
return;
}
static void memb_set_and (
struct srp_addr *set1, int set1_entries,
struct srp_addr *set2, int set2_entries,
struct srp_addr *and, int *and_entries)
{
int i;
int j;
int found = 0;
*and_entries = 0;
for (i = 0; i < set2_entries; i++) {
for (j = 0; j < set1_entries; j++) {
if (srp_addr_equal (&set1[j], &set2[i])) {
found = 1;
break;
}
}
if (found) {
srp_addr_copy (&and[*and_entries], &set1[j]);
*and_entries = *and_entries + 1;
}
found = 0;
}
return;
}
#ifdef CODE_COVERAGE
static void memb_set_print (
char *string,
struct srp_addr *list,
int list_entries)
{
int i;
int j;
printf ("List '%s' contains %d entries:\n", string, list_entries);
for (i = 0; i < list_entries; i++) {
for (j = 0; j < INTERFACE_MAX; j++) {
printf ("Address %d\n", i);
printf ("\tiface %d %s\n", j, totemip_print (&list[i].addr[j]));
printf ("family %d\n", list[i].addr[j].family);
}
}
}
#endif
static void reset_token_retransmit_timeout (struct totemsrp_instance *instance)
{
poll_timer_delete (instance->totemsrp_poll_handle,
instance->timer_orf_token_retransmit_timeout);
poll_timer_add (instance->totemsrp_poll_handle,
instance->totem_config->token_retransmit_timeout,
(void *)instance,
timer_function_token_retransmit_timeout,
&instance->timer_orf_token_retransmit_timeout);
}
static void start_merge_detect_timeout (struct totemsrp_instance *instance)
{
if (instance->my_merge_detect_timeout_outstanding == 0) {
poll_timer_add (instance->totemsrp_poll_handle,
instance->totem_config->merge_timeout,
(void *)instance,
timer_function_merge_detect_timeout,
&instance->timer_merge_detect_timeout);
instance->my_merge_detect_timeout_outstanding = 1;
}
}
static void cancel_merge_detect_timeout (struct totemsrp_instance *instance)
{
poll_timer_delete (instance->totemsrp_poll_handle, instance->timer_merge_detect_timeout);
instance->my_merge_detect_timeout_outstanding = 0;
}
/*
* ring_state_* is used to save and restore the sort queue
* state when a recovery operation fails (and enters gather)
*/
static void old_ring_state_save (struct totemsrp_instance *instance)
{
if (instance->old_ring_state_saved == 0) {
instance->old_ring_state_saved = 1;
instance->old_ring_state_aru = instance->my_aru;
instance->old_ring_state_high_seq_received = instance->my_high_seq_received;
log_printf (instance->totemsrp_log_level_notice,
"Saving state aru %x high seq received %x\n",
instance->my_aru, instance->my_high_seq_received);
}
}
static void ring_save (struct totemsrp_instance *instance)
{
if (instance->ring_saved == 0) {
instance->ring_saved = 1;
memcpy (&instance->my_old_ring_id, &instance->my_ring_id,
sizeof (struct memb_ring_id));
}
}
static void ring_reset (struct totemsrp_instance *instance)
{
instance->ring_saved = 0;
}
static void ring_state_restore (struct totemsrp_instance *instance)
{
if (instance->old_ring_state_saved) {
totemip_zero_set(&instance->my_ring_id.rep);
instance->my_aru = instance->old_ring_state_aru;
instance->my_high_seq_received = instance->old_ring_state_high_seq_received;
log_printf (instance->totemsrp_log_level_notice,
"Restoring instance->my_aru %x my high seq received %x\n",
instance->my_aru, instance->my_high_seq_received);
}
}
static void old_ring_state_reset (struct totemsrp_instance *instance)
{
instance->old_ring_state_saved = 0;
}
static void reset_token_timeout (struct totemsrp_instance *instance) {
poll_timer_delete (instance->totemsrp_poll_handle, instance->timer_orf_token_timeout);
poll_timer_add (instance->totemsrp_poll_handle,
instance->totem_config->token_timeout,
(void *)instance,
timer_function_orf_token_timeout,
&instance->timer_orf_token_timeout);
}
static void reset_heartbeat_timeout (struct totemsrp_instance *instance) {
poll_timer_delete (instance->totemsrp_poll_handle, instance->timer_heartbeat_timeout);
poll_timer_add (instance->totemsrp_poll_handle,
instance->heartbeat_timeout,
(void *)instance,
timer_function_heartbeat_timeout,
&instance->timer_heartbeat_timeout);
}
static void cancel_token_timeout (struct totemsrp_instance *instance) {
poll_timer_delete (instance->totemsrp_poll_handle, instance->timer_orf_token_timeout);
}
static void cancel_heartbeat_timeout (struct totemsrp_instance *instance) {
poll_timer_delete (instance->totemsrp_poll_handle, instance->timer_heartbeat_timeout);
}
static void cancel_token_retransmit_timeout (struct totemsrp_instance *instance)
{
poll_timer_delete (instance->totemsrp_poll_handle, instance->timer_orf_token_retransmit_timeout);
}
static void start_token_hold_retransmit_timeout (struct totemsrp_instance *instance)
{
poll_timer_add (instance->totemsrp_poll_handle,
instance->totem_config->token_hold_timeout,
(void *)instance,
timer_function_token_hold_retransmit_timeout,
&instance->timer_orf_token_hold_retransmit_timeout);
}
static void cancel_token_hold_retransmit_timeout (struct totemsrp_instance *instance)
{
poll_timer_delete (instance->totemsrp_poll_handle,
instance->timer_orf_token_hold_retransmit_timeout);
}
static void memb_state_consensus_timeout_expired (
struct totemsrp_instance *instance)
{
struct srp_addr no_consensus_list[PROCESSOR_COUNT_MAX];
int no_consensus_list_entries;
if (memb_consensus_agreed (instance)) {
memb_consensus_reset (instance);
memb_consensus_set (instance, &instance->my_id);
reset_token_timeout (instance); // REVIEWED
} else {
memb_consensus_notset (
instance,
no_consensus_list,
&no_consensus_list_entries,
instance->my_proc_list,
instance->my_proc_list_entries);
memb_set_merge (no_consensus_list, no_consensus_list_entries,
instance->my_failed_list, &instance->my_failed_list_entries);
memb_state_gather_enter (instance, 0);
}
}
static void memb_join_message_send (struct totemsrp_instance *instance);
static void memb_merge_detect_transmit (struct totemsrp_instance *instance);
/*
* Timers used for various states of the membership algorithm
*/
static void timer_function_orf_token_timeout (void *data)
{
struct totemsrp_instance *instance = data;
switch (instance->memb_state) {
case MEMB_STATE_OPERATIONAL:
log_printf (instance->totemsrp_log_level_notice,
"The token was lost in the OPERATIONAL state.\n");
totemrrp_iface_check (instance->totemrrp_handle);
memb_state_gather_enter (instance, 2);
break;
case MEMB_STATE_GATHER:
log_printf (instance->totemsrp_log_level_notice,
"The consensus timeout expired.\n");
memb_state_consensus_timeout_expired (instance);
memb_state_gather_enter (instance, 3);
break;
case MEMB_STATE_COMMIT:
log_printf (instance->totemsrp_log_level_notice,
"The token was lost in the COMMIT state.\n");
memb_state_gather_enter (instance, 4);
break;
case MEMB_STATE_RECOVERY:
log_printf (instance->totemsrp_log_level_notice,
"The token was lost in the RECOVERY state.\n");
ring_state_restore (instance);
memb_state_gather_enter (instance, 5);
break;
}
}
static void timer_function_heartbeat_timeout (void *data)
{
struct totemsrp_instance *instance = data;
log_printf (instance->totemsrp_log_level_notice,
"HeartBeat Timer expired Invoking token loss mechanism in state %d \n", instance->memb_state);
timer_function_orf_token_timeout(data);
}
static void memb_timer_function_state_gather (void *data)
{
struct totemsrp_instance *instance = data;
switch (instance->memb_state) {
case MEMB_STATE_OPERATIONAL:
case MEMB_STATE_RECOVERY:
assert (0); /* this should never happen */
break;
case MEMB_STATE_GATHER:
case MEMB_STATE_COMMIT:
memb_join_message_send (instance);
/*
* Restart the join timeout
`*/
poll_timer_delete (instance->totemsrp_poll_handle, instance->memb_timer_state_gather_join_timeout);
poll_timer_add (instance->totemsrp_poll_handle,
instance->totem_config->join_timeout,
(void *)instance,
memb_timer_function_state_gather,
&instance->memb_timer_state_gather_join_timeout);
break;
}
}
static void memb_timer_function_gather_consensus_timeout (void *data)
{
struct totemsrp_instance *instance = data;
memb_state_consensus_timeout_expired (instance);
}
static void deliver_messages_from_recovery_to_regular (struct totemsrp_instance *instance)
{
unsigned int i;
struct sort_queue_item *recovery_message_item;
struct sort_queue_item regular_message_item;
unsigned int range = 0;
int res;
void *ptr;
struct mcast *mcast;
log_printf (instance->totemsrp_log_level_debug,
"recovery to regular %x-%x\n", SEQNO_START_MSG + 1, instance->my_aru);
range = instance->my_aru - SEQNO_START_MSG;
/*
* Move messages from recovery to regular sort queue
*/
// todo should i be initialized to 0 or 1 ?
for (i = 1; i <= range; i++) {
res = sq_item_get (&instance->recovery_sort_queue,
i + SEQNO_START_MSG, &ptr);
if (res != 0) {
continue;
}
recovery_message_item = ptr;
/*
* Convert recovery message into regular message
*/
mcast = recovery_message_item->mcast;
if (mcast->header.encapsulated == MESSAGE_ENCAPSULATED) {
/*
* Message is a recovery message encapsulated
* in a new ring message
*/
regular_message_item.mcast =
(struct mcast *)(((char *)recovery_message_item->mcast) + sizeof (struct mcast));
regular_message_item.msg_len =
recovery_message_item->msg_len - sizeof (struct mcast);
mcast = regular_message_item.mcast;
} else {
/*
* TODO this case shouldn't happen
*/
continue;
}
log_printf (instance->totemsrp_log_level_debug,
"comparing if ring id is for this processors old ring seqno %d\n",
mcast->seq);
/*
* Only add this message to the regular sort
* queue if it was originated with the same ring
* id as the previous ring
*/
if (memcmp (&instance->my_old_ring_id, &mcast->ring_id,
sizeof (struct memb_ring_id)) == 0) {
regular_message_item.msg_len = recovery_message_item->msg_len;
res = sq_item_inuse (&instance->regular_sort_queue, mcast->seq);
if (res == 0) {
sq_item_add (&instance->regular_sort_queue,
&regular_message_item, mcast->seq);
if (sq_lt_compare (instance->old_ring_state_high_seq_received, mcast->seq)) {
instance->old_ring_state_high_seq_received = mcast->seq;
}
}
} else {
log_printf (instance->totemsrp_log_level_notice,
"-not adding msg with seq no %x\n", mcast->seq);
}
}
}
/*
* Change states in the state machine of the membership algorithm
*/
static void memb_state_operational_enter (struct totemsrp_instance *instance)
{
struct srp_addr joined_list[PROCESSOR_COUNT_MAX];
int joined_list_entries = 0;
unsigned int aru_save;
unsigned int joined_list_totemip[PROCESSOR_COUNT_MAX];
unsigned int trans_memb_list_totemip[PROCESSOR_COUNT_MAX];
unsigned int new_memb_list_totemip[PROCESSOR_COUNT_MAX];
unsigned int left_list[PROCESSOR_COUNT_MAX];
memb_consensus_reset (instance);
old_ring_state_reset (instance);
ring_reset (instance);
deliver_messages_from_recovery_to_regular (instance);
log_printf (instance->totemsrp_log_level_debug,
"Delivering to app %x to %x\n",
instance->my_high_delivered + 1, instance->old_ring_state_high_seq_received);
aru_save = instance->my_aru;
instance->my_aru = instance->old_ring_state_aru;
messages_deliver_to_app (instance, 0, instance->old_ring_state_high_seq_received);
/*
* Calculate joined and left list
*/
memb_set_subtract (instance->my_left_memb_list,
&instance->my_left_memb_entries,
instance->my_memb_list, instance->my_memb_entries,
instance->my_trans_memb_list, instance->my_trans_memb_entries);
memb_set_subtract (joined_list, &joined_list_entries,
instance->my_new_memb_list, instance->my_new_memb_entries,
instance->my_trans_memb_list, instance->my_trans_memb_entries);
/*
* Install new membership
*/
instance->my_memb_entries = instance->my_new_memb_entries;
memcpy (&instance->my_memb_list, instance->my_new_memb_list,
sizeof (struct srp_addr) * instance->my_memb_entries);
instance->last_released = 0;
instance->my_set_retrans_flg = 0;
/*
* Deliver transitional configuration to application
*/
srp_addr_to_nodeid (left_list, instance->my_left_memb_list,
instance->my_left_memb_entries);
srp_addr_to_nodeid (trans_memb_list_totemip,
instance->my_trans_memb_list, instance->my_trans_memb_entries);
instance->totemsrp_confchg_fn (TOTEM_CONFIGURATION_TRANSITIONAL,
trans_memb_list_totemip, instance->my_trans_memb_entries,
left_list, instance->my_left_memb_entries,
0, 0, &instance->my_ring_id);
// TODO we need to filter to ensure we only deliver those
// messages which are part of instance->my_deliver_memb
messages_deliver_to_app (instance, 1, instance->old_ring_state_high_seq_received);
instance->my_aru = aru_save;
/*
* Deliver regular configuration to application
*/
srp_addr_to_nodeid (new_memb_list_totemip,
instance->my_new_memb_list, instance->my_new_memb_entries);
srp_addr_to_nodeid (joined_list_totemip, joined_list,
joined_list_entries);
instance->totemsrp_confchg_fn (TOTEM_CONFIGURATION_REGULAR,
new_memb_list_totemip, instance->my_new_memb_entries,
0, 0,
joined_list_totemip, joined_list_entries, &instance->my_ring_id);
/*
* The recovery sort queue now becomes the regular
* sort queue. It is necessary to copy the state
* into the regular sort queue.
*/
sq_copy (&instance->regular_sort_queue, &instance->recovery_sort_queue);
instance->my_last_aru = SEQNO_START_MSG;
sq_items_release (&instance->regular_sort_queue, SEQNO_START_MSG - 1);
/* When making my_proc_list smaller, ensure that the
* now non-used entries are zero-ed out. There are some suspect
* assert's that assume that there is always 2 entries in the list.
* These fail when my_proc_list is reduced to 1 entry (and the
* valid [0] entry is the same as the 'unused' [1] entry).
*/
memset(instance->my_proc_list, 0,
sizeof (struct srp_addr) * instance->my_proc_list_entries);
instance->my_proc_list_entries = instance->my_new_memb_entries;
memcpy (instance->my_proc_list, instance->my_new_memb_list,
sizeof (struct srp_addr) * instance->my_memb_entries);
instance->my_failed_list_entries = 0;
instance->my_high_delivered = instance->my_aru;
// TODO the recovery messages are leaked
log_printf (instance->totemsrp_log_level_notice,
"entering OPERATIONAL state.\n");
instance->memb_state = MEMB_STATE_OPERATIONAL;
instance->my_received_flg = 1;
return;
}
static void memb_state_gather_enter (
struct totemsrp_instance *instance,
int gather_from)
{
memb_set_merge (
&instance->my_id, 1,
instance->my_proc_list, &instance->my_proc_list_entries);
assert (srp_addr_equal (&instance->my_proc_list[0], &instance->my_proc_list[1]) == 0);
memb_join_message_send (instance);
/*
* Restart the join timeout
*/
poll_timer_delete (instance->totemsrp_poll_handle, instance->memb_timer_state_gather_join_timeout);
poll_timer_add (instance->totemsrp_poll_handle,
instance->totem_config->join_timeout,
(void *)instance,
memb_timer_function_state_gather,
&instance->memb_timer_state_gather_join_timeout);
/*
* Restart the consensus timeout
*/
poll_timer_delete (instance->totemsrp_poll_handle,
instance->memb_timer_state_gather_consensus_timeout);
poll_timer_add (instance->totemsrp_poll_handle,
instance->totem_config->consensus_timeout,
(void *)instance,
memb_timer_function_gather_consensus_timeout,
&instance->memb_timer_state_gather_consensus_timeout);
/*
* Cancel the token loss and token retransmission timeouts
*/
cancel_token_retransmit_timeout (instance); // REVIEWED
cancel_token_timeout (instance); // REVIEWED
cancel_merge_detect_timeout (instance);
memb_consensus_reset (instance);
memb_consensus_set (instance, &instance->my_id);
log_printf (instance->totemsrp_log_level_notice,
"entering GATHER state from %d.\n", gather_from);
instance->memb_state = MEMB_STATE_GATHER;
return;
}
static void timer_function_token_retransmit_timeout (void *data);
static void memb_state_commit_enter (
struct totemsrp_instance *instance,
struct memb_commit_token *commit_token)
{
ring_save (instance);
old_ring_state_save (instance);
memb_state_commit_token_update (instance, commit_token);
memb_state_commit_token_target_set (instance, commit_token);
memb_ring_id_set_and_store (instance, &commit_token->ring_id);
memb_state_commit_token_send (instance, commit_token);
instance->token_ring_id_seq = instance->my_ring_id.seq;
poll_timer_delete (instance->totemsrp_poll_handle, instance->memb_timer_state_gather_join_timeout);
instance->memb_timer_state_gather_join_timeout = 0;
poll_timer_delete (instance->totemsrp_poll_handle, instance->memb_timer_state_gather_consensus_timeout);
instance->memb_timer_state_gather_consensus_timeout = 0;
reset_token_timeout (instance); // REVIEWED
reset_token_retransmit_timeout (instance); // REVIEWED
log_printf (instance->totemsrp_log_level_notice,
"entering COMMIT state.\n");
instance->memb_state = MEMB_STATE_COMMIT;
/*
* reset all flow control variables since we are starting a new ring
*/
instance->my_trc = 0;
instance->my_pbl = 0;
instance->my_cbl = 0;
return;
}
static void memb_state_recovery_enter (
struct totemsrp_instance *instance,
struct memb_commit_token *commit_token)
{
int i;
int local_received_flg = 1;
unsigned int low_ring_aru;
unsigned int range = 0;
unsigned int messages_originated = 0;
char is_originated[4096];
char not_originated[4096];
char seqno_string_hex[10];
const struct srp_addr *addr;
struct memb_commit_token_memb_entry *memb_list;
addr = (const struct srp_addr *)commit_token->end_of_commit_token;
memb_list = (struct memb_commit_token_memb_entry *)(addr + commit_token->addr_entries);
log_printf (instance->totemsrp_log_level_notice,
"entering RECOVERY state.\n");
instance->my_high_ring_delivered = 0;
sq_reinit (&instance->recovery_sort_queue, SEQNO_START_MSG);
cs_queue_reinit (&instance->retrans_message_queue);
low_ring_aru = instance->old_ring_state_high_seq_received;
memb_state_commit_token_send (instance, commit_token);
instance->my_token_seq = SEQNO_START_TOKEN - 1;
/*
* Build regular configuration
*/
totemrrp_processor_count_set (
instance->totemrrp_handle,
commit_token->addr_entries);
/*
* Build transitional configuration
*/
memb_set_and (instance->my_new_memb_list, instance->my_new_memb_entries,
instance->my_memb_list, instance->my_memb_entries,
instance->my_trans_memb_list, &instance->my_trans_memb_entries);
for (i = 0; i < instance->my_new_memb_entries; i++) {
log_printf (instance->totemsrp_log_level_notice,
"position [%d] member %s:\n", i, totemip_print (&addr[i].addr[0]));
log_printf (instance->totemsrp_log_level_notice,
"previous ring seq %lld rep %s\n",
memb_list[i].ring_id.seq,
totemip_print (&memb_list[i].ring_id.rep));
log_printf (instance->totemsrp_log_level_notice,
"aru %x high delivered %x received flag %d\n",
memb_list[i].aru,
memb_list[i].high_delivered,
memb_list[i].received_flg);
// assert (totemip_print (&memb_list[i].ring_id.rep) != 0);
}
/*
* Determine if any received flag is false
*/
for (i = 0; i < commit_token->addr_entries; i++) {
if (memb_set_subset (&instance->my_new_memb_list[i], 1,
instance->my_trans_memb_list, instance->my_trans_memb_entries) &&
memb_list[i].received_flg == 0) {
instance->my_deliver_memb_entries = instance->my_trans_memb_entries;
memcpy (instance->my_deliver_memb_list, instance->my_trans_memb_list,
sizeof (struct srp_addr) * instance->my_trans_memb_entries);
local_received_flg = 0;
break;
}
}
if (local_received_flg == 1) {
goto no_originate;
} /* Else originate messages if we should */
/*
* Calculate my_low_ring_aru, instance->my_high_ring_delivered for the transitional membership
*/
for (i = 0; i < commit_token->addr_entries; i++) {
if (memb_set_subset (&instance->my_new_memb_list[i], 1,
instance->my_deliver_memb_list,
instance->my_deliver_memb_entries) &&
memcmp (&instance->my_old_ring_id,
&memb_list[i].ring_id,
sizeof (struct memb_ring_id)) == 0) {
if (sq_lt_compare (memb_list[i].aru, low_ring_aru)) {
low_ring_aru = memb_list[i].aru;
}
if (sq_lt_compare (instance->my_high_ring_delivered, memb_list[i].high_delivered)) {
instance->my_high_ring_delivered = memb_list[i].high_delivered;
}
}
}
/*
* Copy all old ring messages to instance->retrans_message_queue
*/
range = instance->old_ring_state_high_seq_received - low_ring_aru;
if (range == 0) {
/*
* No messages to copy
*/
goto no_originate;
}
assert (range < 1024);
log_printf (instance->totemsrp_log_level_notice,
"copying all old ring messages from %x-%x.\n",
low_ring_aru + 1, instance->old_ring_state_high_seq_received);
strcpy (not_originated, "Not Originated for recovery: ");
strcpy (is_originated, "Originated for recovery: ");
for (i = 1; i <= range; i++) {
struct sort_queue_item *sort_queue_item;
struct message_item message_item;
void *ptr;
int res;
sprintf (seqno_string_hex, "%x ", low_ring_aru + i);
res = sq_item_get (&instance->regular_sort_queue,
low_ring_aru + i, &ptr);
if (res != 0) {
strcat (not_originated, seqno_string_hex);
continue;
}
strcat (is_originated, seqno_string_hex);
sort_queue_item = ptr;
messages_originated++;
memset (&message_item, 0, sizeof (struct message_item));
// TODO LEAK
message_item.mcast = malloc (10000);
assert (message_item.mcast);
message_item.mcast->header.type = MESSAGE_TYPE_MCAST;
srp_addr_copy (&message_item.mcast->system_from, &instance->my_id);
message_item.mcast->header.encapsulated = MESSAGE_ENCAPSULATED;
message_item.mcast->header.nodeid = instance->my_id.addr[0].nodeid;
assert (message_item.mcast->header.nodeid);
message_item.mcast->header.endian_detector = ENDIAN_LOCAL;
memcpy (&message_item.mcast->ring_id, &instance->my_ring_id,
sizeof (struct memb_ring_id));
message_item.msg_len = sort_queue_item->msg_len + sizeof (struct mcast);
memcpy (((char *)message_item.mcast) + sizeof (struct mcast),
sort_queue_item->mcast,
sort_queue_item->msg_len);
cs_queue_item_add (&instance->retrans_message_queue, &message_item);
}
log_printf (instance->totemsrp_log_level_notice,
"Originated %d messages in RECOVERY.\n", messages_originated);
strcat (not_originated, "\n");
strcat (is_originated, "\n");
log_printf (instance->totemsrp_log_level_notice, "%s", is_originated);
log_printf (instance->totemsrp_log_level_notice, "%s", not_originated);
goto originated;
no_originate:
log_printf (instance->totemsrp_log_level_notice,
"Did not need to originate any messages in recovery.\n");
originated:
instance->my_aru = SEQNO_START_MSG;
instance->my_aru_count = 0;
instance->my_seq_unchanged = 0;
instance->my_high_seq_received = SEQNO_START_MSG;
instance->my_install_seq = SEQNO_START_MSG;
instance->last_released = SEQNO_START_MSG;
reset_token_timeout (instance); // REVIEWED
reset_token_retransmit_timeout (instance); // REVIEWED
instance->memb_state = MEMB_STATE_RECOVERY;
return;
}
int totemsrp_new_msg_signal (hdb_handle_t handle)
{
struct totemsrp_instance *instance;
unsigned int res;
res = hdb_handle_get (&totemsrp_instance_database, handle,
(void *)&instance);
if (res != 0) {
goto error_exit;
}
token_hold_cancel_send (instance);
hdb_handle_put (&totemsrp_instance_database, handle);
return (0);
error_exit:
return (-1);
}
int totemsrp_mcast (
hdb_handle_t handle,
struct iovec *iovec,
unsigned int iov_len,
int guarantee)
{
int i;
struct message_item message_item;
struct totemsrp_instance *instance;
char *addr;
unsigned int addr_idx;
unsigned int res;
res = hdb_handle_get (&totemsrp_instance_database, handle,
(void *)&instance);
if (res != 0) {
goto error_exit;
}
if (cs_queue_is_full (&instance->new_message_queue)) {
log_printf (instance->totemsrp_log_level_warning, "queue full\n");
return (-1);
}
memset (&message_item, 0, sizeof (struct message_item));
/*
* Allocate pending item
*/
message_item.mcast = malloc (10000);
if (message_item.mcast == 0) {
goto error_mcast;
}
/*
* Set mcast header
*/
message_item.mcast->header.type = MESSAGE_TYPE_MCAST;
message_item.mcast->header.endian_detector = ENDIAN_LOCAL;
message_item.mcast->header.encapsulated = MESSAGE_NOT_ENCAPSULATED;
message_item.mcast->header.nodeid = instance->my_id.addr[0].nodeid;
assert (message_item.mcast->header.nodeid);
message_item.mcast->guarantee = guarantee;
srp_addr_copy (&message_item.mcast->system_from, &instance->my_id);
addr = (char *)message_item.mcast;
addr_idx = sizeof (struct mcast);
for (i = 0; i < iov_len; i++) {
memcpy (&addr[addr_idx], iovec[i].iov_base, iovec[i].iov_len);
addr_idx += iovec[i].iov_len;
}
message_item.msg_len = addr_idx;
log_printf (instance->totemsrp_log_level_debug, "mcasted message added to pending queue\n");
cs_queue_item_add (&instance->new_message_queue, &message_item);
hdb_handle_put (&totemsrp_instance_database, handle);
return (0);
error_mcast:
hdb_handle_put (&totemsrp_instance_database, handle);
error_exit:
return (-1);
}
/*
* Determine if there is room to queue a new message
*/
int totemsrp_avail (hdb_handle_t handle)
{
int avail;
struct totemsrp_instance *instance;
unsigned int res;
res = hdb_handle_get (&totemsrp_instance_database, handle,
(void *)&instance);
if (res != 0) {
goto error_exit;
}
cs_queue_avail (&instance->new_message_queue, &avail);
hdb_handle_put (&totemsrp_instance_database, handle);
return (avail);
error_exit:
return (0);
}
/*
* ORF Token Management
*/
/*
* Recast message to mcast group if it is available
*/
static int orf_token_remcast (
struct totemsrp_instance *instance,
int seq)
{
struct sort_queue_item *sort_queue_item;
int res;
void *ptr;
struct sq *sort_queue;
if (instance->memb_state == MEMB_STATE_RECOVERY) {
sort_queue = &instance->recovery_sort_queue;
} else {
sort_queue = &instance->regular_sort_queue;
}
res = sq_in_range (sort_queue, seq);
if (res == 0) {
log_printf (instance->totemsrp_log_level_debug, "sq not in range\n");
return (-1);
}
/*
* Get RTR item at seq, if not available, return
*/
res = sq_item_get (sort_queue, seq, &ptr);
if (res != 0) {
return -1;
}
sort_queue_item = ptr;
totemrrp_mcast_noflush_send (
instance->totemrrp_handle,
sort_queue_item->mcast,
sort_queue_item->msg_len);
return (0);
}
/*
* Free all freeable messages from ring
*/
static void messages_free (
struct totemsrp_instance *instance,
unsigned int token_aru)
{
struct sort_queue_item *regular_message;
unsigned int i;
int res;
int log_release = 0;
unsigned int release_to;
unsigned int range = 0;
release_to = token_aru;
if (sq_lt_compare (instance->my_last_aru, release_to)) {
release_to = instance->my_last_aru;
}
if (sq_lt_compare (instance->my_high_delivered, release_to)) {
release_to = instance->my_high_delivered;
}
/*
* Ensure we dont try release before an already released point
*/
if (sq_lt_compare (release_to, instance->last_released)) {
return;
}
range = release_to - instance->last_released;
assert (range < 1024);
/*
* Release retransmit list items if group aru indicates they are transmitted
*/
for (i = 1; i <= range; i++) {
void *ptr;
res = sq_item_get (&instance->regular_sort_queue,
instance->last_released + i, &ptr);
if (res == 0) {
regular_message = ptr;
free (regular_message->mcast);
}
sq_items_release (&instance->regular_sort_queue,
instance->last_released + i);
log_release = 1;
}
instance->last_released += range;
if (log_release) {
log_printf (instance->totemsrp_log_level_debug,
"releasing messages up to and including %x\n", release_to);
}
}
static void update_aru (
struct totemsrp_instance *instance)
{
unsigned int i;
int res;
struct sq *sort_queue;
unsigned int range;
unsigned int my_aru_saved = 0;
if (instance->memb_state == MEMB_STATE_RECOVERY) {
sort_queue = &instance->recovery_sort_queue;
} else {
sort_queue = &instance->regular_sort_queue;
}
range = instance->my_high_seq_received - instance->my_aru;
if (range > 1024) {
return;
}
my_aru_saved = instance->my_aru;
for (i = 1; i <= range; i++) {
void *ptr;
res = sq_item_get (sort_queue, my_aru_saved + i, &ptr);
/*
* If hole, stop updating aru
*/
if (res != 0) {
break;
}
}
instance->my_aru += i - 1;
}
/*
* Multicasts pending messages onto the ring (requires orf_token possession)
*/
static int orf_token_mcast (
struct totemsrp_instance *instance,
struct orf_token *token,
int fcc_mcasts_allowed)
{
struct message_item *message_item = 0;
struct cs_queue *mcast_queue;
struct sq *sort_queue;
struct sort_queue_item sort_queue_item;
struct sort_queue_item *sort_queue_item_ptr;
struct mcast *mcast;
unsigned int fcc_mcast_current;
if (instance->memb_state == MEMB_STATE_RECOVERY) {
mcast_queue = &instance->retrans_message_queue;
sort_queue = &instance->recovery_sort_queue;
reset_token_retransmit_timeout (instance); // REVIEWED
} else {
mcast_queue = &instance->new_message_queue;
sort_queue = &instance->regular_sort_queue;
}
for (fcc_mcast_current = 0; fcc_mcast_current < fcc_mcasts_allowed; fcc_mcast_current++) {
if (cs_queue_is_empty (mcast_queue)) {
break;
}
message_item = (struct message_item *)cs_queue_item_get (mcast_queue);
/* preincrement required by algo */
if (instance->old_ring_state_saved &&
(instance->memb_state == MEMB_STATE_GATHER ||
instance->memb_state == MEMB_STATE_COMMIT)) {
log_printf (instance->totemsrp_log_level_debug,
"not multicasting at seqno is %d\n",
token->seq);
return (0);
}
message_item->mcast->seq = ++token->seq;
message_item->mcast->this_seqno = instance->global_seqno++;
/*
* Build IO vector
*/
memset (&sort_queue_item, 0, sizeof (struct sort_queue_item));
sort_queue_item.mcast = message_item->mcast;
sort_queue_item.msg_len = message_item->msg_len;
mcast = sort_queue_item.mcast;
memcpy (&mcast->ring_id, &instance->my_ring_id, sizeof (struct memb_ring_id));
/*
* Add message to retransmit queue
*/
sort_queue_item_ptr = sq_item_add (sort_queue,
&sort_queue_item, message_item->mcast->seq);
totemrrp_mcast_noflush_send (
instance->totemrrp_handle,
message_item->mcast,
message_item->msg_len);
/*
* Delete item from pending queue
*/
cs_queue_item_remove (mcast_queue);
/*
* If messages mcasted, deliver any new messages to totempg
*/
instance->my_high_seq_received = token->seq;
}
update_aru (instance);
/*
* Return 1 if more messages are available for single node clusters
*/
return (fcc_mcast_current);
}
/*
* Remulticasts messages in orf_token's retransmit list (requires orf_token)
* Modify's orf_token's rtr to include retransmits required by this process
*/
static int orf_token_rtr (
struct totemsrp_instance *instance,
struct orf_token *orf_token,
unsigned int *fcc_allowed)
{
unsigned int res;
unsigned int i, j;
unsigned int found;
unsigned int total_entries;
struct sq *sort_queue;
struct rtr_item *rtr_list;
unsigned int range = 0;
char retransmit_msg[1024];
char value[64];
if (instance->memb_state == MEMB_STATE_RECOVERY) {
sort_queue = &instance->recovery_sort_queue;
} else {
sort_queue = &instance->regular_sort_queue;
}
rtr_list = &orf_token->rtr_list[0];
strcpy (retransmit_msg, "Retransmit List: ");
if (orf_token->rtr_list_entries) {
log_printf (instance->totemsrp_log_level_debug,
"Retransmit List %d\n", orf_token->rtr_list_entries);
for (i = 0; i < orf_token->rtr_list_entries; i++) {
sprintf (value, "%x ", rtr_list[i].seq);
strcat (retransmit_msg, value);
}
strcat (retransmit_msg, "\n");
log_printf (instance->totemsrp_log_level_notice,
"%s", retransmit_msg);
}
total_entries = orf_token->rtr_list_entries;
/*
* Retransmit messages on orf_token's RTR list from RTR queue
*/
for (instance->fcc_remcast_current = 0, i = 0;
instance->fcc_remcast_current < *fcc_allowed && i < orf_token->rtr_list_entries;) {
/*
* If this retransmit request isn't from this configuration,
* try next rtr entry
*/
if (memcmp (&rtr_list[i].ring_id, &instance->my_ring_id,
sizeof (struct memb_ring_id)) != 0) {
i += 1;
continue;
}
res = orf_token_remcast (instance, rtr_list[i].seq);
if (res == 0) {
/*
* Multicasted message, so no need to copy to new retransmit list
*/
orf_token->rtr_list_entries -= 1;
assert (orf_token->rtr_list_entries >= 0);
memmove (&rtr_list[i], &rtr_list[i + 1],
sizeof (struct rtr_item) * (orf_token->rtr_list_entries));
instance->fcc_remcast_current++;
} else {
i += 1;
}
}
*fcc_allowed = *fcc_allowed - instance->fcc_remcast_current;
/*
* Add messages to retransmit to RTR list
* but only retry if there is room in the retransmit list
*/
range = instance->my_high_seq_received - instance->my_aru;
assert (range < 100000);
for (i = 1; (orf_token->rtr_list_entries < RETRANSMIT_ENTRIES_MAX) &&
(i <= range); i++) {
/*
* Ensure message is within the sort queue range
*/
res = sq_in_range (sort_queue, instance->my_aru + i);
if (res == 0) {
break;
}
/*
* Find if a message is missing from this processor
*/
res = sq_item_inuse (sort_queue, instance->my_aru + i);
if (res == 0) {
/*
* Determine if missing message is already in retransmit list
*/
found = 0;
for (j = 0; j < orf_token->rtr_list_entries; j++) {
if (instance->my_aru + i == rtr_list[j].seq) {
found = 1;
}
}
if (found == 0) {
/*
* Missing message not found in current retransmit list so add it
*/
memcpy (&rtr_list[orf_token->rtr_list_entries].ring_id,
&instance->my_ring_id, sizeof (struct memb_ring_id));
rtr_list[orf_token->rtr_list_entries].seq = instance->my_aru + i;
orf_token->rtr_list_entries++;
}
}
}
return (instance->fcc_remcast_current);
}
static void token_retransmit (struct totemsrp_instance *instance)
{
totemrrp_token_send (instance->totemrrp_handle,
instance->orf_token_retransmit,
instance->orf_token_retransmit_size);
}
/*
* Retransmit the regular token if no mcast or token has
* been received in retransmit token period retransmit
* the token to the next processor
*/
static void timer_function_token_retransmit_timeout (void *data)
{
struct totemsrp_instance *instance = data;
switch (instance->memb_state) {
case MEMB_STATE_GATHER:
break;
case MEMB_STATE_COMMIT:
case MEMB_STATE_OPERATIONAL:
case MEMB_STATE_RECOVERY:
token_retransmit (instance);
reset_token_retransmit_timeout (instance); // REVIEWED
break;
}
}
static void timer_function_token_hold_retransmit_timeout (void *data)
{
struct totemsrp_instance *instance = data;
switch (instance->memb_state) {
case MEMB_STATE_GATHER:
break;
case MEMB_STATE_COMMIT:
break;
case MEMB_STATE_OPERATIONAL:
case MEMB_STATE_RECOVERY:
token_retransmit (instance);
break;
}
}
static void timer_function_merge_detect_timeout(void *data)
{
struct totemsrp_instance *instance = data;
instance->my_merge_detect_timeout_outstanding = 0;
switch (instance->memb_state) {
case MEMB_STATE_OPERATIONAL:
if (totemip_equal(&instance->my_ring_id.rep, &instance->my_id.addr[0])) {
memb_merge_detect_transmit (instance);
}
break;
case MEMB_STATE_GATHER:
case MEMB_STATE_COMMIT:
case MEMB_STATE_RECOVERY:
break;
}
}
/*
* Send orf_token to next member (requires orf_token)
*/
static int token_send (
struct totemsrp_instance *instance,
struct orf_token *orf_token,
int forward_token)
{
int res = 0;
unsigned int orf_token_size;
orf_token_size = sizeof (struct orf_token) +
(orf_token->rtr_list_entries * sizeof (struct rtr_item));
memcpy (instance->orf_token_retransmit, orf_token, orf_token_size);
instance->orf_token_retransmit_size = orf_token_size;
orf_token->header.nodeid = instance->my_id.addr[0].nodeid;
assert (orf_token->header.nodeid);
if (forward_token == 0) {
return (0);
}
totemrrp_token_send (instance->totemrrp_handle,
orf_token,
orf_token_size);
return (res);
}
static int token_hold_cancel_send (struct totemsrp_instance *instance)
{
struct token_hold_cancel token_hold_cancel;
/*
* Only cancel if the token is currently held
*/
if (instance->my_token_held == 0) {
return (0);
}
instance->my_token_held = 0;
/*
* Build message
*/
token_hold_cancel.header.type = MESSAGE_TYPE_TOKEN_HOLD_CANCEL;
token_hold_cancel.header.endian_detector = ENDIAN_LOCAL;
token_hold_cancel.header.nodeid = instance->my_id.addr[0].nodeid;
memcpy (&token_hold_cancel.ring_id, &instance->my_ring_id,
sizeof (struct memb_ring_id));
assert (token_hold_cancel.header.nodeid);
totemrrp_mcast_flush_send (instance->totemrrp_handle, &token_hold_cancel,
sizeof (struct token_hold_cancel));
return (0);
}
static int orf_token_send_initial (struct totemsrp_instance *instance)
{
struct orf_token orf_token;
int res;
orf_token.header.type = MESSAGE_TYPE_ORF_TOKEN;
orf_token.header.endian_detector = ENDIAN_LOCAL;
orf_token.header.encapsulated = 0;
orf_token.header.nodeid = instance->my_id.addr[0].nodeid;
assert (orf_token.header.nodeid);
orf_token.seq = SEQNO_START_MSG;
orf_token.token_seq = SEQNO_START_TOKEN;
orf_token.retrans_flg = 1;
instance->my_set_retrans_flg = 1;
if (cs_queue_is_empty (&instance->retrans_message_queue) == 1) {
orf_token.retrans_flg = 0;
instance->my_set_retrans_flg = 0;
} else {
orf_token.retrans_flg = 1;
instance->my_set_retrans_flg = 1;
}
orf_token.aru = 0;
orf_token.aru = SEQNO_START_MSG - 1;
orf_token.aru_addr = instance->my_id.addr[0].nodeid;
memcpy (&orf_token.ring_id, &instance->my_ring_id, sizeof (struct memb_ring_id));
orf_token.fcc = 0;
orf_token.backlog = 0;
orf_token.rtr_list_entries = 0;
res = token_send (instance, &orf_token, 1);
return (res);
}
static void memb_state_commit_token_update (
struct totemsrp_instance *instance,
struct memb_commit_token *commit_token)
{
struct srp_addr *addr;
struct memb_commit_token_memb_entry *memb_list;
unsigned int high_aru;
unsigned int i;
addr = (struct srp_addr *)commit_token->end_of_commit_token;
memb_list = (struct memb_commit_token_memb_entry *)(addr + commit_token->addr_entries);
memcpy (instance->my_new_memb_list, addr,
sizeof (struct srp_addr) * commit_token->addr_entries);
instance->my_new_memb_entries = commit_token->addr_entries;
memcpy (&memb_list[commit_token->memb_index].ring_id,
&instance->my_old_ring_id, sizeof (struct memb_ring_id));
assert (!totemip_zero_check(&instance->my_old_ring_id.rep));
memb_list[commit_token->memb_index].aru = instance->old_ring_state_aru;
/*
* TODO high delivered is really instance->my_aru, but with safe this
* could change?
*/
instance->my_received_flg =
(instance->my_aru == instance->my_high_seq_received);
memb_list[commit_token->memb_index].received_flg = instance->my_received_flg;
memb_list[commit_token->memb_index].high_delivered = instance->my_high_delivered;
/*
* find high aru up to current memb_index for all matching ring ids
* if any ring id matching memb_index has aru less then high aru set
* received flag for that entry to false
*/
high_aru = memb_list[commit_token->memb_index].aru;
for (i = 0; i <= commit_token->memb_index; i++) {
if (memcmp (&memb_list[commit_token->memb_index].ring_id,
&memb_list[i].ring_id,
sizeof (struct memb_ring_id)) == 0) {
if (sq_lt_compare (high_aru, memb_list[i].aru)) {
high_aru = memb_list[i].aru;
}
}
}
for (i = 0; i <= commit_token->memb_index; i++) {
if (memcmp (&memb_list[commit_token->memb_index].ring_id,
&memb_list[i].ring_id,
sizeof (struct memb_ring_id)) == 0) {
if (sq_lt_compare (memb_list[i].aru, high_aru)) {
memb_list[i].received_flg = 0;
if (i == commit_token->memb_index) {
instance->my_received_flg = 0;
}
}
}
}
commit_token->header.nodeid = instance->my_id.addr[0].nodeid;
commit_token->memb_index += 1;
assert (commit_token->memb_index <= commit_token->addr_entries);
assert (commit_token->header.nodeid);
}
static void memb_state_commit_token_target_set (
struct totemsrp_instance *instance,
struct memb_commit_token *commit_token)
{
struct srp_addr *addr;
unsigned int i;
addr = (struct srp_addr *)commit_token->end_of_commit_token;
for (i = 0; i < instance->totem_config->interface_count; i++) {
totemrrp_token_target_set (
instance->totemrrp_handle,
&addr[commit_token->memb_index %
commit_token->addr_entries].addr[i],
i);
}
}
static int memb_state_commit_token_send (
struct totemsrp_instance *instance,
struct memb_commit_token *commit_token)
{
struct srp_addr *addr;
struct memb_commit_token_memb_entry *memb_list;
unsigned int commit_token_size;
addr = (struct srp_addr *)commit_token->end_of_commit_token;
memb_list = (struct memb_commit_token_memb_entry *)(addr + commit_token->addr_entries);
commit_token->token_seq++;
commit_token_size = sizeof (struct memb_commit_token) +
((sizeof (struct srp_addr) +
sizeof (struct memb_commit_token_memb_entry)) * commit_token->addr_entries);
/*
* Make a copy for retransmission if necessary
*/
memcpy (instance->orf_token_retransmit, commit_token, commit_token_size);
instance->orf_token_retransmit_size = commit_token_size;
totemrrp_token_send (instance->totemrrp_handle,
commit_token,
commit_token_size);
/*
* Request retransmission of the commit token in case it is lost
*/
reset_token_retransmit_timeout (instance);
return (0);
}
static int memb_lowest_in_config (struct totemsrp_instance *instance)
{
struct srp_addr token_memb[PROCESSOR_COUNT_MAX];
int token_memb_entries = 0;
int i;
struct totem_ip_address *lowest_addr;
memb_set_subtract (token_memb, &token_memb_entries,
instance->my_proc_list, instance->my_proc_list_entries,
instance->my_failed_list, instance->my_failed_list_entries);
/*
* find representative by searching for smallest identifier
*/
lowest_addr = &token_memb[0].addr[0];
for (i = 1; i < token_memb_entries; i++) {
if (totemip_compare(lowest_addr, &token_memb[i].addr[0]) > 0) {
totemip_copy (lowest_addr, &token_memb[i].addr[0]);
}
}
return (totemip_compare (lowest_addr, &instance->my_id.addr[0]) == 0);
}
static int srp_addr_compare (const void *a, const void *b)
{
const struct srp_addr *srp_a = (const struct srp_addr *)a;
const struct srp_addr *srp_b = (const struct srp_addr *)b;
return (totemip_compare (&srp_a->addr[0], &srp_b->addr[0]));
}
static void memb_state_commit_token_create (
struct totemsrp_instance *instance,
struct memb_commit_token *commit_token)
{
struct srp_addr token_memb[PROCESSOR_COUNT_MAX];
struct srp_addr *addr;
struct memb_commit_token_memb_entry *memb_list;
int token_memb_entries = 0;
log_printf (instance->totemsrp_log_level_notice,
"Creating commit token because I am the rep.\n");
memb_set_subtract (token_memb, &token_memb_entries,
instance->my_proc_list, instance->my_proc_list_entries,
instance->my_failed_list, instance->my_failed_list_entries);
memset (commit_token, 0, sizeof (struct memb_commit_token));
commit_token->header.type = MESSAGE_TYPE_MEMB_COMMIT_TOKEN;
commit_token->header.endian_detector = ENDIAN_LOCAL;
commit_token->header.encapsulated = 0;
commit_token->header.nodeid = instance->my_id.addr[0].nodeid;
assert (commit_token->header.nodeid);
totemip_copy(&commit_token->ring_id.rep, &instance->my_id.addr[0]);
commit_token->ring_id.seq = instance->token_ring_id_seq + 4;
/*
* This qsort is necessary to ensure the commit token traverses
* the ring in the proper order
*/
qsort (token_memb, token_memb_entries, sizeof (struct srp_addr),
srp_addr_compare);
commit_token->memb_index = 0;
commit_token->addr_entries = token_memb_entries;
addr = (struct srp_addr *)commit_token->end_of_commit_token;
memb_list = (struct memb_commit_token_memb_entry *)(addr + commit_token->addr_entries);
memcpy (addr, token_memb,
token_memb_entries * sizeof (struct srp_addr));
memset (memb_list, 0,
sizeof (struct memb_commit_token_memb_entry) * token_memb_entries);
}
static void memb_join_message_send (struct totemsrp_instance *instance)
{
char memb_join_data[10000];
struct memb_join *memb_join = (struct memb_join *)memb_join_data;
char *addr;
unsigned int addr_idx;
memb_join->header.type = MESSAGE_TYPE_MEMB_JOIN;
memb_join->header.endian_detector = ENDIAN_LOCAL;
memb_join->header.encapsulated = 0;
memb_join->header.nodeid = instance->my_id.addr[0].nodeid;
assert (memb_join->header.nodeid);
assert (srp_addr_equal (&instance->my_proc_list[0], &instance->my_proc_list[1]) == 0);
memb_join->ring_seq = instance->my_ring_id.seq;
memb_join->proc_list_entries = instance->my_proc_list_entries;
memb_join->failed_list_entries = instance->my_failed_list_entries;
srp_addr_copy (&memb_join->system_from, &instance->my_id);
/*
* This mess adds the joined and failed processor lists into the join
* message
*/
addr = (char *)memb_join;
addr_idx = sizeof (struct memb_join);
memcpy (&addr[addr_idx],
instance->my_proc_list,
instance->my_proc_list_entries *
sizeof (struct srp_addr));
addr_idx +=
instance->my_proc_list_entries *
sizeof (struct srp_addr);
memcpy (&addr[addr_idx],
instance->my_failed_list,
instance->my_failed_list_entries *
sizeof (struct srp_addr));
addr_idx +=
instance->my_failed_list_entries *
sizeof (struct srp_addr);
if (instance->totem_config->send_join_timeout) {
usleep (random() % (instance->totem_config->send_join_timeout * 1000));
}
totemrrp_mcast_flush_send (
instance->totemrrp_handle,
memb_join,
addr_idx);
}
static void memb_leave_message_send (struct totemsrp_instance *instance)
{
char memb_join_data[10000];
struct memb_join *memb_join = (struct memb_join *)memb_join_data;
char *addr;
unsigned int addr_idx;
int active_memb_entries;
struct srp_addr active_memb[PROCESSOR_COUNT_MAX];
log_printf (instance->totemsrp_log_level_debug,
"sending join/leave message\n");
/*
* add us to the failed list, and remove us from
* the members list
*/
memb_set_merge(
&instance->my_id, 1,
instance->my_failed_list, &instance->my_failed_list_entries);
memb_set_subtract (active_memb, &active_memb_entries,
instance->my_proc_list, instance->my_proc_list_entries,
&instance->my_id, 1);
memb_join->header.type = MESSAGE_TYPE_MEMB_JOIN;
memb_join->header.endian_detector = ENDIAN_LOCAL;
memb_join->header.encapsulated = 0;
memb_join->header.nodeid = LEAVE_DUMMY_NODEID;
memb_join->ring_seq = instance->my_ring_id.seq;
memb_join->proc_list_entries = active_memb_entries;
memb_join->failed_list_entries = instance->my_failed_list_entries;
srp_addr_copy (&memb_join->system_from, &instance->my_id);
memb_join->system_from.addr[0].nodeid = LEAVE_DUMMY_NODEID;
// TODO: CC Maybe use the actual join send routine.
/*
* This mess adds the joined and failed processor lists into the join
* message
*/
addr = (char *)memb_join;
addr_idx = sizeof (struct memb_join);
memcpy (&addr[addr_idx],
active_memb,
active_memb_entries *
sizeof (struct srp_addr));
addr_idx +=
active_memb_entries *
sizeof (struct srp_addr);
memcpy (&addr[addr_idx],
instance->my_failed_list,
instance->my_failed_list_entries *
sizeof (struct srp_addr));
addr_idx +=
instance->my_failed_list_entries *
sizeof (struct srp_addr);
if (instance->totem_config->send_join_timeout) {
usleep (random() % (instance->totem_config->send_join_timeout * 1000));
}
totemrrp_mcast_flush_send (
instance->totemrrp_handle,
memb_join,
addr_idx);
}
static void memb_merge_detect_transmit (struct totemsrp_instance *instance)
{
struct memb_merge_detect memb_merge_detect;
memb_merge_detect.header.type = MESSAGE_TYPE_MEMB_MERGE_DETECT;
memb_merge_detect.header.endian_detector = ENDIAN_LOCAL;
memb_merge_detect.header.encapsulated = 0;
memb_merge_detect.header.nodeid = instance->my_id.addr[0].nodeid;
srp_addr_copy (&memb_merge_detect.system_from, &instance->my_id);
memcpy (&memb_merge_detect.ring_id, &instance->my_ring_id,
sizeof (struct memb_ring_id));
assert (memb_merge_detect.header.nodeid);
totemrrp_mcast_flush_send (instance->totemrrp_handle,
&memb_merge_detect,
sizeof (struct memb_merge_detect));
}
static void memb_ring_id_create_or_load (
struct totemsrp_instance *instance,
struct memb_ring_id *memb_ring_id)
{
int fd;
int res;
char filename[256];
snprintf (filename, sizeof(filename), "%s/ringid_%s",
rundir, totemip_print (&instance->my_id.addr[0]));
fd = open (filename, O_RDONLY, 0700);
if (fd > 0) {
res = read (fd, &memb_ring_id->seq, sizeof (unsigned long long));
assert (res == sizeof (unsigned long long));
close (fd);
} else
if (fd == -1 && errno == ENOENT) {
memb_ring_id->seq = 0;
umask(0);
fd = open (filename, O_CREAT|O_RDWR, 0700);
if (fd == -1) {
log_printf (instance->totemsrp_log_level_warning,
"Couldn't create %s %s\n", filename, strerror (errno));
}
res = write (fd, &memb_ring_id->seq, sizeof (unsigned long long));
assert (res == sizeof (unsigned long long));
close (fd);
} else {
log_printf (instance->totemsrp_log_level_warning,
"Couldn't open %s %s\n", filename, strerror (errno));
}
totemip_copy(&memb_ring_id->rep, &instance->my_id.addr[0]);
assert (!totemip_zero_check(&memb_ring_id->rep));
instance->token_ring_id_seq = memb_ring_id->seq;
}
static void memb_ring_id_set_and_store (
struct totemsrp_instance *instance,
const struct memb_ring_id *ring_id)
{
char filename[256];
int fd;
int res;
memcpy (&instance->my_ring_id, ring_id, sizeof (struct memb_ring_id));
snprintf (filename, sizeof(filename), "%s/ringid_%s",
rundir, totemip_print (&instance->my_id.addr[0]));
fd = open (filename, O_WRONLY, 0777);
if (fd == -1) {
fd = open (filename, O_CREAT|O_RDWR, 0777);
}
if (fd == -1) {
log_printf (instance->totemsrp_log_level_warning,
"Couldn't store new ring id %llx to stable storage (%s)\n",
instance->my_ring_id.seq, strerror (errno));
assert (0);
return;
}
log_printf (instance->totemsrp_log_level_notice,
"Storing new sequence id for ring %llx\n", instance->my_ring_id.seq);
//assert (fd > 0);
res = write (fd, &instance->my_ring_id.seq, sizeof (unsigned long long));
assert (res == sizeof (unsigned long long));
close (fd);
}
int totemsrp_callback_token_create (
hdb_handle_t handle,
void **handle_out,
enum totem_callback_token_type type,
int delete,
int (*callback_fn) (enum totem_callback_token_type type, const void *),
const void *data)
{
struct token_callback_instance *callback_handle;
struct totemsrp_instance *instance;
unsigned int res;
res = hdb_handle_get (&totemsrp_instance_database, handle,
(void *)&instance);
if (res != 0) {
goto error_exit;
}
token_hold_cancel_send (instance);
callback_handle = malloc (sizeof (struct token_callback_instance));
if (callback_handle == 0) {
return (-1);
}
*handle_out = (void *)callback_handle;
list_init (&callback_handle->list);
callback_handle->callback_fn = callback_fn;
callback_handle->data = (void *) data;
callback_handle->callback_type = type;
callback_handle->delete = delete;
switch (type) {
case TOTEM_CALLBACK_TOKEN_RECEIVED:
list_add (&callback_handle->list, &instance->token_callback_received_listhead);
break;
case TOTEM_CALLBACK_TOKEN_SENT:
list_add (&callback_handle->list, &instance->token_callback_sent_listhead);
break;
}
hdb_handle_put (&totemsrp_instance_database, handle);
error_exit:
return (0);
}
void totemsrp_callback_token_destroy (hdb_handle_t handle, void **handle_out)
{
struct token_callback_instance *h;
if (*handle_out) {
h = (struct token_callback_instance *)*handle_out;
list_del (&h->list);
free (h);
h = NULL;
*handle_out = 0;
}
}
static void token_callbacks_execute (
struct totemsrp_instance *instance,
enum totem_callback_token_type type)
{
struct list_head *list;
struct list_head *list_next;
struct list_head *callback_listhead = 0;
struct token_callback_instance *token_callback_instance;
int res;
int del;
switch (type) {
case TOTEM_CALLBACK_TOKEN_RECEIVED:
callback_listhead = &instance->token_callback_received_listhead;
break;
case TOTEM_CALLBACK_TOKEN_SENT:
callback_listhead = &instance->token_callback_sent_listhead;
break;
default:
assert (0);
}
for (list = callback_listhead->next; list != callback_listhead;
list = list_next) {
token_callback_instance = list_entry (list, struct token_callback_instance, list);
list_next = list->next;
del = token_callback_instance->delete;
if (del == 1) {
list_del (list);
}
res = token_callback_instance->callback_fn (
token_callback_instance->callback_type,
token_callback_instance->data);
/*
* This callback failed to execute, try it again on the next token
*/
if (res == -1 && del == 1) {
list_add (list, callback_listhead);
} else if (del) {
free (token_callback_instance);
}
}
}
/*
* Flow control functions
*/
static unsigned int backlog_get (struct totemsrp_instance *instance)
{
unsigned int backlog = 0;
if (instance->memb_state == MEMB_STATE_OPERATIONAL) {
backlog = cs_queue_used (&instance->new_message_queue);
} else
if (instance->memb_state == MEMB_STATE_RECOVERY) {
backlog = cs_queue_used (&instance->retrans_message_queue);
}
return (backlog);
}
static int fcc_calculate (
struct totemsrp_instance *instance,
struct orf_token *token)
{
unsigned int transmits_allowed;
unsigned int backlog_calc;
transmits_allowed = instance->totem_config->max_messages;
if (transmits_allowed > instance->totem_config->window_size - token->fcc) {
transmits_allowed = instance->totem_config->window_size - token->fcc;
}
instance->my_cbl = backlog_get (instance);
/*
* Only do backlog calculation if there is a backlog otherwise
* we would result in div by zero
*/
if (token->backlog + instance->my_cbl - instance->my_pbl) {
backlog_calc = (instance->totem_config->window_size * instance->my_pbl) /
(token->backlog + instance->my_cbl - instance->my_pbl);
if (backlog_calc > 0 && transmits_allowed > backlog_calc) {
transmits_allowed = backlog_calc;
}
}
return (transmits_allowed);
}
/*
* don't overflow the RTR sort queue
*/
static void fcc_rtr_limit (
struct totemsrp_instance *instance,
struct orf_token *token,
unsigned int *transmits_allowed)
{
assert ((QUEUE_RTR_ITEMS_SIZE_MAX - *transmits_allowed - instance->totem_config->window_size) >= 0);
if (sq_lt_compare (instance->last_released +
QUEUE_RTR_ITEMS_SIZE_MAX - *transmits_allowed -
instance->totem_config->window_size,
token->seq)) {
*transmits_allowed = 0;
}
}
static void fcc_token_update (
struct totemsrp_instance *instance,
struct orf_token *token,
unsigned int msgs_transmitted)
{
token->fcc += msgs_transmitted - instance->my_trc;
token->backlog += instance->my_cbl - instance->my_pbl;
assert (token->backlog >= 0);
instance->my_trc = msgs_transmitted;
instance->my_pbl = instance->my_cbl;
}
/*
* Message Handlers
*/
struct timeval tv_old;
/*
* message handler called when TOKEN message type received
*/
static int message_handler_orf_token (
struct totemsrp_instance *instance,
const void *msg,
size_t msg_len,
int endian_conversion_needed)
{
char token_storage[1500];
char token_convert[1500];
struct orf_token *token = NULL;
int forward_token;
unsigned int transmits_allowed;
unsigned int mcasted_retransmit;
unsigned int mcasted_regular;
unsigned int last_aru;
#ifdef GIVEINFO
struct timeval tv_current;
struct timeval tv_diff;
gettimeofday (&tv_current, NULL);
timersub (&tv_current, &tv_old, &tv_diff);
memcpy (&tv_old, &tv_current, sizeof (struct timeval));
log_printf (instance->totemsrp_log_level_notice,
"Time since last token %0.4f ms\n",
(((float)tv_diff.tv_sec) * 1000) + ((float)tv_diff.tv_usec)
/ 1000.0);
#endif
#ifdef TEST_DROP_ORF_TOKEN_PERCENTAGE
if (random()%100 < TEST_DROP_ORF_TOKEN_PERCENTAGE) {
return (0);
}
#endif
if (endian_conversion_needed) {
orf_token_endian_convert ((struct orf_token *)msg,
(struct orf_token *)token_convert);
msg = (struct orf_token *)token_convert;
}
/*
* Make copy of token and retransmit list in case we have
* to flush incoming messages from the kernel queue
*/
token = (struct orf_token *)token_storage;
memcpy (token, msg, sizeof (struct orf_token));
memcpy (&token->rtr_list[0], (char *)msg + sizeof (struct orf_token),
sizeof (struct rtr_item) * RETRANSMIT_ENTRIES_MAX);
/*
* Handle merge detection timeout
*/
if (token->seq == instance->my_last_seq) {
start_merge_detect_timeout (instance);
instance->my_seq_unchanged += 1;
} else {
cancel_merge_detect_timeout (instance);
cancel_token_hold_retransmit_timeout (instance);
instance->my_seq_unchanged = 0;
}
instance->my_last_seq = token->seq;
#ifdef TEST_RECOVERY_MSG_COUNT
if (instance->memb_state == MEMB_STATE_OPERATIONAL && token->seq > TEST_RECOVERY_MSG_COUNT) {
return (0);
}
#endif
totemrrp_recv_flush (instance->totemrrp_handle);
/*
* Determine if we should hold (in reality drop) the token
*/
instance->my_token_held = 0;
if (totemip_equal(&instance->my_ring_id.rep, &instance->my_id.addr[0]) &&
instance->my_seq_unchanged > instance->totem_config->seqno_unchanged_const) {
instance->my_token_held = 1;
} else
if (!totemip_equal(&instance->my_ring_id.rep, &instance->my_id.addr[0]) &&
instance->my_seq_unchanged >= instance->totem_config->seqno_unchanged_const) {
instance->my_token_held = 1;
}
/*
* Hold onto token when there is no activity on ring and
* this processor is the ring rep
*/
forward_token = 1;
if (totemip_equal(&instance->my_ring_id.rep, &instance->my_id.addr[0])) {
if (instance->my_token_held) {
forward_token = 0;
}
}
token_callbacks_execute (instance, TOTEM_CALLBACK_TOKEN_RECEIVED);
switch (instance->memb_state) {
case MEMB_STATE_COMMIT:
/* Discard token */
break;
case MEMB_STATE_OPERATIONAL:
messages_free (instance, token->aru);
case MEMB_STATE_GATHER:
/*
* DO NOT add break, we use different free mechanism in recovery state
*/
case MEMB_STATE_RECOVERY:
last_aru = instance->my_last_aru;
instance->my_last_aru = token->aru;
/*
* Discard tokens from another configuration
*/
if (memcmp (&token->ring_id, &instance->my_ring_id,
sizeof (struct memb_ring_id)) != 0) {
if ((forward_token)
&& instance->use_heartbeat) {
reset_heartbeat_timeout(instance);
}
else {
cancel_heartbeat_timeout(instance);
}
return (0); /* discard token */
}
/*
* Discard retransmitted tokens
*/
if (sq_lte_compare (token->token_seq, instance->my_token_seq)) {
/*
* If this processor receives a retransmitted token, it is sure
* the previous processor is still alive. As a result, it can
* reset its token timeout. If some processor previous to that
* has failed, it will eventually not execute a reset of the
* token timeout, and will cause a reconfiguration to occur.
*/
reset_token_timeout (instance);
if ((forward_token)
&& instance->use_heartbeat) {
reset_heartbeat_timeout(instance);
}
else {
cancel_heartbeat_timeout(instance);
}
return (0); /* discard token */
}
transmits_allowed = fcc_calculate (instance, token);
mcasted_retransmit = orf_token_rtr (instance, token, &transmits_allowed);
fcc_rtr_limit (instance, token, &transmits_allowed);
mcasted_regular = orf_token_mcast (instance, token, transmits_allowed);
fcc_token_update (instance, token, mcasted_retransmit +
mcasted_regular);
if (sq_lt_compare (instance->my_aru, token->aru) ||
instance->my_id.addr[0].nodeid == token->aru_addr ||
token->aru_addr == 0) {
token->aru = instance->my_aru;
if (token->aru == token->seq) {
token->aru_addr = 0;
} else {
token->aru_addr = instance->my_id.addr[0].nodeid;
}
}
if (token->aru == last_aru && token->aru_addr != 0) {
instance->my_aru_count += 1;
} else {
instance->my_aru_count = 0;
}
if (instance->my_aru_count > instance->totem_config->fail_to_recv_const &&
token->aru_addr != instance->my_id.addr[0].nodeid) {
log_printf (instance->totemsrp_log_level_error,
"FAILED TO RECEIVE\n");
// TODO if we fail to receive, it may be possible to end with a gather
// state of proc == failed = 0 entries
/* THIS IS A BIG TODO
memb_set_merge (&token->aru_addr, 1,
instance->my_failed_list,
&instance->my_failed_list_entries);
*/
ring_state_restore (instance);
memb_state_gather_enter (instance, 6);
} else {
instance->my_token_seq = token->token_seq;
token->token_seq += 1;
if (instance->memb_state == MEMB_STATE_RECOVERY) {
/*
* instance->my_aru == instance->my_high_seq_received means this processor
* has recovered all messages it can recover
* (ie: its retrans queue is empty)
*/
if (cs_queue_is_empty (&instance->retrans_message_queue) == 0) {
if (token->retrans_flg == 0) {
token->retrans_flg = 1;
instance->my_set_retrans_flg = 1;
}
} else
if (token->retrans_flg == 1 && instance->my_set_retrans_flg) {
token->retrans_flg = 0;
}
log_printf (instance->totemsrp_log_level_debug,
"token retrans flag is %d my set retrans flag%d retrans queue empty %d count %d, aru %x\n",
token->retrans_flg, instance->my_set_retrans_flg,
cs_queue_is_empty (&instance->retrans_message_queue),
instance->my_retrans_flg_count, token->aru);
if (token->retrans_flg == 0) {
instance->my_retrans_flg_count += 1;
} else {
instance->my_retrans_flg_count = 0;
}
if (instance->my_retrans_flg_count == 2) {
instance->my_install_seq = token->seq;
}
log_printf (instance->totemsrp_log_level_debug,
"install seq %x aru %x high seq received %x\n",
instance->my_install_seq, instance->my_aru, instance->my_high_seq_received);
if (instance->my_retrans_flg_count >= 2 &&
instance->my_received_flg == 0 &&
sq_lte_compare (instance->my_install_seq, instance->my_aru)) {
instance->my_received_flg = 1;
instance->my_deliver_memb_entries = instance->my_trans_memb_entries;
memcpy (instance->my_deliver_memb_list, instance->my_trans_memb_list,
sizeof (struct totem_ip_address) * instance->my_trans_memb_entries);
}
if (instance->my_retrans_flg_count >= 3 &&
sq_lte_compare (instance->my_install_seq, token->aru)) {
instance->my_rotation_counter += 1;
} else {
instance->my_rotation_counter = 0;
}
if (instance->my_rotation_counter == 2) {
log_printf (instance->totemsrp_log_level_debug,
"retrans flag count %x token aru %x install seq %x aru %x %x\n",
instance->my_retrans_flg_count, token->aru, instance->my_install_seq,
instance->my_aru, token->seq);
memb_state_operational_enter (instance);
instance->my_rotation_counter = 0;
instance->my_retrans_flg_count = 0;
}
}
totemrrp_send_flush (instance->totemrrp_handle);
token_send (instance, token, forward_token);
#ifdef GIVEINFO
gettimeofday (&tv_current, NULL);
timersub (&tv_current, &tv_old, &tv_diff);
memcpy (&tv_old, &tv_current, sizeof (struct timeval));
log_printf (instance->totemsrp_log_level_notice,
"I held %0.4f ms\n",
((float)tv_diff.tv_usec) / 1000.0);
#endif
if (instance->memb_state == MEMB_STATE_OPERATIONAL) {
messages_deliver_to_app (instance, 0,
instance->my_high_seq_received);
}
/*
* Deliver messages after token has been transmitted
* to improve performance
*/
reset_token_timeout (instance); // REVIEWED
reset_token_retransmit_timeout (instance); // REVIEWED
if (totemip_equal(&instance->my_id.addr[0], &instance->my_ring_id.rep) &&
instance->my_token_held == 1) {
start_token_hold_retransmit_timeout (instance);
}
token_callbacks_execute (instance, TOTEM_CALLBACK_TOKEN_SENT);
}
break;
}
if ((forward_token)
&& instance->use_heartbeat) {
reset_heartbeat_timeout(instance);
}
else {
cancel_heartbeat_timeout(instance);
}
return (0);
}
static void messages_deliver_to_app (
struct totemsrp_instance *instance,
int skip,
unsigned int end_point)
{
struct sort_queue_item *sort_queue_item_p;
unsigned int i;
int res;
struct mcast *mcast_in;
struct mcast mcast_header;
unsigned int range = 0;
int endian_conversion_required;
unsigned int my_high_delivered_stored = 0;
range = end_point - instance->my_high_delivered;
if (range) {
log_printf (instance->totemsrp_log_level_debug,
"Delivering %x to %x\n", instance->my_high_delivered,
end_point);
}
assert (range < 10240);
my_high_delivered_stored = instance->my_high_delivered;
/*
* Deliver messages in order from rtr queue to pending delivery queue
*/
for (i = 1; i <= range; i++) {
void *ptr = 0;
/*
* If out of range of sort queue, stop assembly
*/
res = sq_in_range (&instance->regular_sort_queue,
my_high_delivered_stored + i);
if (res == 0) {
break;
}
res = sq_item_get (&instance->regular_sort_queue,
my_high_delivered_stored + i, &ptr);
/*
* If hole, stop assembly
*/
if (res != 0 && skip == 0) {
break;
}
instance->my_high_delivered = my_high_delivered_stored + i;
if (res != 0) {
continue;
}
sort_queue_item_p = ptr;
mcast_in = sort_queue_item_p->mcast;
assert (mcast_in != (struct mcast *)0xdeadbeef);
endian_conversion_required = 0;
if (mcast_in->header.endian_detector != ENDIAN_LOCAL) {
endian_conversion_required = 1;
mcast_endian_convert (mcast_in, &mcast_header);
} else {
memcpy (&mcast_header, mcast_in, sizeof (struct mcast));
}
/*
* Skip messages not originated in instance->my_deliver_memb
*/
if (skip &&
memb_set_subset (&mcast_header.system_from,
1,
instance->my_deliver_memb_list,
instance->my_deliver_memb_entries) == 0) {
instance->my_high_delivered = my_high_delivered_stored + i;
continue;
}
/*
* Message found
*/
log_printf (instance->totemsrp_log_level_debug,
"Delivering MCAST message with seq %x to pending delivery queue\n",
mcast_header.seq);
/*
* Message is locally originated multicast
*/
instance->totemsrp_deliver_fn (
mcast_header.header.nodeid,
((char *)sort_queue_item_p->mcast) + sizeof (struct mcast),
sort_queue_item_p->msg_len - sizeof (struct mcast),
endian_conversion_required);
}
}
/*
* recv message handler called when MCAST message type received
*/
static int message_handler_mcast (
struct totemsrp_instance *instance,
const void *msg,
size_t msg_len,
int endian_conversion_needed)
{
struct sort_queue_item sort_queue_item;
struct sq *sort_queue;
struct mcast mcast_header;
if (endian_conversion_needed) {
mcast_endian_convert (msg, &mcast_header);
} else {
memcpy (&mcast_header, msg, sizeof (struct mcast));
}
if (mcast_header.header.encapsulated == MESSAGE_ENCAPSULATED) {
sort_queue = &instance->recovery_sort_queue;
} else {
sort_queue = &instance->regular_sort_queue;
}
assert (msg_len < FRAME_SIZE_MAX);
#ifdef TEST_DROP_MCAST_PERCENTAGE
if (random()%100 < TEST_DROP_MCAST_PERCENTAGE) {
printf ("dropping message %d\n", mcast_header.seq);
return (0);
} else {
printf ("accepting message %d\n", mcast_header.seq);
}
#endif
if (srp_addr_equal (&mcast_header.system_from, &instance->my_id) == 0) {
cancel_token_retransmit_timeout (instance);
}
/*
* If the message is foreign execute the switch below
*/
if (memcmp (&instance->my_ring_id, &mcast_header.ring_id,
sizeof (struct memb_ring_id)) != 0) {
switch (instance->memb_state) {
case MEMB_STATE_OPERATIONAL:
memb_set_merge (
&mcast_header.system_from, 1,
instance->my_proc_list, &instance->my_proc_list_entries);
memb_state_gather_enter (instance, 7);
break;
case MEMB_STATE_GATHER:
if (!memb_set_subset (
&mcast_header.system_from,
1,
instance->my_proc_list,
instance->my_proc_list_entries)) {
memb_set_merge (&mcast_header.system_from, 1,
instance->my_proc_list, &instance->my_proc_list_entries);
memb_state_gather_enter (instance, 8);
return (0);
}
break;
case MEMB_STATE_COMMIT:
/* discard message */
break;
case MEMB_STATE_RECOVERY:
/* discard message */
break;
}
return (0);
}
log_printf (instance->totemsrp_log_level_debug,
"Received ringid(%s:%lld) seq %x\n",
totemip_print (&mcast_header.ring_id.rep),
mcast_header.ring_id.seq,
mcast_header.seq);
/*
* Add mcast message to rtr queue if not already in rtr queue
* otherwise free io vectors
*/
if (msg_len > 0 && msg_len < FRAME_SIZE_MAX &&
sq_in_range (sort_queue, mcast_header.seq) &&
sq_item_inuse (sort_queue, mcast_header.seq) == 0) {
/*
* Allocate new multicast memory block
*/
// TODO LEAK
sort_queue_item.mcast = malloc (msg_len);
if (sort_queue_item.mcast == NULL) {
return (-1); /* error here is corrected by the algorithm */
}
memcpy (sort_queue_item.mcast, msg, msg_len);
sort_queue_item.msg_len = msg_len;
if (sq_lt_compare (instance->my_high_seq_received,
mcast_header.seq)) {
instance->my_high_seq_received = mcast_header.seq;
}
sq_item_add (sort_queue, &sort_queue_item, mcast_header.seq);
}
update_aru (instance);
if (instance->memb_state == MEMB_STATE_OPERATIONAL) {
messages_deliver_to_app (instance, 0, instance->my_high_seq_received);
}
/* TODO remove from retrans message queue for old ring in recovery state */
return (0);
}
static int message_handler_memb_merge_detect (
struct totemsrp_instance *instance,
const void *msg,
size_t msg_len,
int endian_conversion_needed)
{
struct memb_merge_detect memb_merge_detect;
if (endian_conversion_needed) {
memb_merge_detect_endian_convert (msg, &memb_merge_detect);
} else {
memcpy (&memb_merge_detect, msg,
sizeof (struct memb_merge_detect));
}
/*
* do nothing if this is a merge detect from this configuration
*/
if (memcmp (&instance->my_ring_id, &memb_merge_detect.ring_id,
sizeof (struct memb_ring_id)) == 0) {
return (0);
}
/*
* Execute merge operation
*/
switch (instance->memb_state) {
case MEMB_STATE_OPERATIONAL:
memb_set_merge (&memb_merge_detect.system_from, 1,
instance->my_proc_list, &instance->my_proc_list_entries);
memb_state_gather_enter (instance, 9);
break;
case MEMB_STATE_GATHER:
if (!memb_set_subset (
&memb_merge_detect.system_from,
1,
instance->my_proc_list,
instance->my_proc_list_entries)) {
memb_set_merge (&memb_merge_detect.system_from, 1,
instance->my_proc_list, &instance->my_proc_list_entries);
memb_state_gather_enter (instance, 10);
return (0);
}
break;
case MEMB_STATE_COMMIT:
/* do nothing in commit */
break;
case MEMB_STATE_RECOVERY:
/* do nothing in recovery */
break;
}
return (0);
}
static int memb_join_process (
struct totemsrp_instance *instance,
const struct memb_join *memb_join)
{
unsigned char *commit_token_storage[TOKEN_SIZE_MAX];
struct memb_commit_token *my_commit_token =
(struct memb_commit_token *)commit_token_storage;
struct srp_addr *proc_list;
struct srp_addr *failed_list;
proc_list = (struct srp_addr *)memb_join->end_of_memb_join;
failed_list = proc_list + memb_join->proc_list_entries;
if (memb_set_equal (proc_list,
memb_join->proc_list_entries,
instance->my_proc_list,
instance->my_proc_list_entries) &&
memb_set_equal (failed_list,
memb_join->failed_list_entries,
instance->my_failed_list,
instance->my_failed_list_entries)) {
memb_consensus_set (instance, &memb_join->system_from);
if (memb_consensus_agreed (instance) &&
memb_lowest_in_config (instance)) {
memb_state_commit_token_create (instance, my_commit_token);
memb_state_commit_enter (instance, my_commit_token);
} else {
return (0);
}
} else
if (memb_set_subset (proc_list,
memb_join->proc_list_entries,
instance->my_proc_list,
instance->my_proc_list_entries) &&
memb_set_subset (failed_list,
memb_join->failed_list_entries,
instance->my_failed_list,
instance->my_failed_list_entries)) {
return (0);
} else
if (memb_set_subset (&memb_join->system_from, 1,
instance->my_failed_list, instance->my_failed_list_entries)) {
return (0);
} else {
memb_set_merge (proc_list,
memb_join->proc_list_entries,
instance->my_proc_list, &instance->my_proc_list_entries);
if (memb_set_subset (
&instance->my_id, 1,
failed_list, memb_join->failed_list_entries)) {
memb_set_merge (
&memb_join->system_from, 1,
instance->my_failed_list, &instance->my_failed_list_entries);
} else {
memb_set_merge (failed_list,
memb_join->failed_list_entries,
instance->my_failed_list, &instance->my_failed_list_entries);
}
memb_state_gather_enter (instance, 11);
return (1); /* gather entered */
}
return (0); /* gather not entered */
}
static void memb_join_endian_convert (const struct memb_join *in, struct memb_join *out)
{
int i;
struct srp_addr *in_proc_list;
struct srp_addr *in_failed_list;
struct srp_addr *out_proc_list;
struct srp_addr *out_failed_list;
out->header.type = in->header.type;
out->header.endian_detector = ENDIAN_LOCAL;
out->header.nodeid = swab32 (in->header.nodeid);
srp_addr_copy_endian_convert (&out->system_from, &in->system_from);
out->proc_list_entries = swab32 (in->proc_list_entries);
out->failed_list_entries = swab32 (in->failed_list_entries);
out->ring_seq = swab64 (in->ring_seq);
in_proc_list = (struct srp_addr *)in->end_of_memb_join;
in_failed_list = in_proc_list + out->proc_list_entries;
out_proc_list = (struct srp_addr *)out->end_of_memb_join;
out_failed_list = out_proc_list + out->proc_list_entries;
for (i = 0; i < out->proc_list_entries; i++) {
srp_addr_copy_endian_convert (&out_proc_list[i], &in_proc_list[i]);
}
for (i = 0; i < out->failed_list_entries; i++) {
srp_addr_copy_endian_convert (&out_failed_list[i], &in_failed_list[i]);
}
}
static void memb_commit_token_endian_convert (const struct memb_commit_token *in, struct memb_commit_token *out)
{
int i;
struct srp_addr *in_addr = (struct srp_addr *)in->end_of_commit_token;
struct srp_addr *out_addr = (struct srp_addr *)out->end_of_commit_token;
struct memb_commit_token_memb_entry *in_memb_list;
struct memb_commit_token_memb_entry *out_memb_list;
out->header.type = in->header.type;
out->header.endian_detector = ENDIAN_LOCAL;
out->header.nodeid = swab32 (in->header.nodeid);
out->token_seq = swab32 (in->token_seq);
totemip_copy_endian_convert(&out->ring_id.rep, &in->ring_id.rep);
out->ring_id.seq = swab64 (in->ring_id.seq);
out->retrans_flg = swab32 (in->retrans_flg);
out->memb_index = swab32 (in->memb_index);
out->addr_entries = swab32 (in->addr_entries);
in_memb_list = (struct memb_commit_token_memb_entry *)(in_addr + out->addr_entries);
out_memb_list = (struct memb_commit_token_memb_entry *)(out_addr + out->addr_entries);
for (i = 0; i < out->addr_entries; i++) {
srp_addr_copy_endian_convert (&out_addr[i], &in_addr[i]);
/*
* Only convert the memb entry if it has been set
*/
if (in_memb_list[i].ring_id.rep.family != 0) {
totemip_copy_endian_convert (&out_memb_list[i].ring_id.rep,
&in_memb_list[i].ring_id.rep);
out_memb_list[i].ring_id.seq =
swab64 (in_memb_list[i].ring_id.seq);
out_memb_list[i].aru = swab32 (in_memb_list[i].aru);
out_memb_list[i].high_delivered = swab32 (in_memb_list[i].high_delivered);
out_memb_list[i].received_flg = swab32 (in_memb_list[i].received_flg);
}
}
}
static void orf_token_endian_convert (const struct orf_token *in, struct orf_token *out)
{
int i;
out->header.type = in->header.type;
out->header.endian_detector = ENDIAN_LOCAL;
out->header.nodeid = swab32 (in->header.nodeid);
out->seq = swab32 (in->seq);
out->token_seq = swab32 (in->token_seq);
out->aru = swab32 (in->aru);
totemip_copy_endian_convert(&out->ring_id.rep, &in->ring_id.rep);
out->aru_addr = swab32(in->aru_addr);
out->ring_id.seq = swab64 (in->ring_id.seq);
out->fcc = swab32 (in->fcc);
out->backlog = swab32 (in->backlog);
out->retrans_flg = swab32 (in->retrans_flg);
out->rtr_list_entries = swab32 (in->rtr_list_entries);
for (i = 0; i < out->rtr_list_entries; i++) {
totemip_copy_endian_convert(&out->rtr_list[i].ring_id.rep, &in->rtr_list[i].ring_id.rep);
out->rtr_list[i].ring_id.seq = swab64 (in->rtr_list[i].ring_id.seq);
out->rtr_list[i].seq = swab32 (in->rtr_list[i].seq);
}
}
static void mcast_endian_convert (const struct mcast *in, struct mcast *out)
{
out->header.type = in->header.type;
out->header.endian_detector = ENDIAN_LOCAL;
out->header.nodeid = swab32 (in->header.nodeid);
out->header.encapsulated = in->header.encapsulated;
out->seq = swab32 (in->seq);
out->this_seqno = swab32 (in->this_seqno);
totemip_copy_endian_convert(&out->ring_id.rep, &in->ring_id.rep);
out->ring_id.seq = swab64 (in->ring_id.seq);
out->node_id = swab32 (in->node_id);
out->guarantee = swab32 (in->guarantee);
srp_addr_copy_endian_convert (&out->system_from, &in->system_from);
}
static void memb_merge_detect_endian_convert (
const struct memb_merge_detect *in,
struct memb_merge_detect *out)
{
out->header.type = in->header.type;
out->header.endian_detector = ENDIAN_LOCAL;
out->header.nodeid = swab32 (in->header.nodeid);
totemip_copy_endian_convert(&out->ring_id.rep, &in->ring_id.rep);
out->ring_id.seq = swab64 (in->ring_id.seq);
srp_addr_copy_endian_convert (&out->system_from, &in->system_from);
}
static int message_handler_memb_join (
struct totemsrp_instance *instance,
const void *msg,
size_t msg_len,
int endian_conversion_needed)
{
const struct memb_join *memb_join;
struct memb_join *memb_join_convert = (struct memb_join *)alloca (msg_len);
int gather_entered;
if (endian_conversion_needed) {
memb_join = memb_join_convert;
memb_join_endian_convert (msg, memb_join_convert);
} else {
memb_join = msg;
}
if (instance->token_ring_id_seq < memb_join->ring_seq) {
instance->token_ring_id_seq = memb_join->ring_seq;
}
switch (instance->memb_state) {
case MEMB_STATE_OPERATIONAL:
gather_entered = memb_join_process (instance,
memb_join);
if (gather_entered == 0) {
memb_state_gather_enter (instance, 12);
}
break;
case MEMB_STATE_GATHER:
memb_join_process (instance, memb_join);
break;
case MEMB_STATE_COMMIT:
if (memb_set_subset (&memb_join->system_from,
1,
instance->my_new_memb_list,
instance->my_new_memb_entries) &&
memb_join->ring_seq >= instance->my_ring_id.seq) {
memb_join_process (instance, memb_join);
memb_state_gather_enter (instance, 13);
}
break;
case MEMB_STATE_RECOVERY:
if (memb_set_subset (&memb_join->system_from,
1,
instance->my_new_memb_list,
instance->my_new_memb_entries) &&
memb_join->ring_seq >= instance->my_ring_id.seq) {
ring_state_restore (instance);
memb_join_process (instance, memb_join);
memb_state_gather_enter (instance, 14);
}
break;
}
return (0);
}
static int message_handler_memb_commit_token (
struct totemsrp_instance *instance,
const void *msg,
size_t msg_len,
int endian_conversion_needed)
{
struct memb_commit_token *memb_commit_token_convert = alloca (msg_len);
struct memb_commit_token *memb_commit_token;
struct srp_addr sub[PROCESSOR_COUNT_MAX];
int sub_entries;
struct srp_addr *addr;
struct memb_commit_token_memb_entry *memb_list;
log_printf (instance->totemsrp_log_level_debug,
"got commit token\n");
if (endian_conversion_needed) {
memb_commit_token_endian_convert (msg, memb_commit_token_convert);
} else {
memcpy (memb_commit_token_convert, msg, msg_len);
}
memb_commit_token = memb_commit_token_convert;
addr = (struct srp_addr *)memb_commit_token->end_of_commit_token;
memb_list = (struct memb_commit_token_memb_entry *)(addr + memb_commit_token->addr_entries);
#ifdef TEST_DROP_COMMIT_TOKEN_PERCENTAGE
if (random()%100 < TEST_DROP_COMMIT_TOKEN_PERCENTAGE) {
return (0);
}
#endif
switch (instance->memb_state) {
case MEMB_STATE_OPERATIONAL:
/* discard token */
break;
case MEMB_STATE_GATHER:
memb_set_subtract (sub, &sub_entries,
instance->my_proc_list, instance->my_proc_list_entries,
instance->my_failed_list, instance->my_failed_list_entries);
if (memb_set_equal (addr,
memb_commit_token->addr_entries,
sub,
sub_entries) &&
memb_commit_token->ring_id.seq > instance->my_ring_id.seq) {
memb_state_commit_enter (instance, memb_commit_token);
}
break;
case MEMB_STATE_COMMIT:
/*
* If retransmitted commit tokens are sent on this ring
* filter them out and only enter recovery once the
* commit token has traversed the array. This is
* determined by :
* memb_commit_token->memb_index == memb_commit_token->addr_entries) {
*/
if (memb_commit_token->ring_id.seq == instance->my_ring_id.seq &&
memb_commit_token->memb_index == memb_commit_token->addr_entries) {
memb_state_recovery_enter (instance, memb_commit_token);
}
break;
case MEMB_STATE_RECOVERY:
if (totemip_equal (&instance->my_id.addr[0], &instance->my_ring_id.rep)) {
log_printf (instance->totemsrp_log_level_notice,
"Sending initial ORF token\n");
// TODO convert instead of initiate
orf_token_send_initial (instance);
reset_token_timeout (instance); // REVIEWED
reset_token_retransmit_timeout (instance); // REVIEWED
}
break;
}
return (0);
}
static int message_handler_token_hold_cancel (
struct totemsrp_instance *instance,
const void *msg,
size_t msg_len,
int endian_conversion_needed)
{
const struct token_hold_cancel *token_hold_cancel = msg;
if (memcmp (&token_hold_cancel->ring_id, &instance->my_ring_id,
sizeof (struct memb_ring_id)) == 0) {
instance->my_seq_unchanged = 0;
if (totemip_equal(&instance->my_ring_id.rep, &instance->my_id.addr[0])) {
timer_function_token_retransmit_timeout (instance);
}
}
return (0);
}
void main_deliver_fn (
void *context,
const void *msg,
unsigned int msg_len)
{
struct totemsrp_instance *instance = context;
const struct message_header *message_header = msg;
if (msg_len < sizeof (struct message_header)) {
log_printf (instance->totemsrp_log_level_security,
"Received message is too short... ignoring %u.\n",
(unsigned int)msg_len);
return;
}
if ((int)message_header->type >= totemsrp_message_handlers.count) {
log_printf (instance->totemsrp_log_level_security, "Type of received message is wrong... ignoring %d.\n", (int)message_header->type);
return;
}
/*
* Handle incoming message
*/
totemsrp_message_handlers.handler_functions[(int)message_header->type] (
instance,
msg,
msg_len,
message_header->endian_detector != ENDIAN_LOCAL);
}
void main_iface_change_fn (
void *context,
const struct totem_ip_address *iface_addr,
unsigned int iface_no)
{
struct totemsrp_instance *instance = context;
totemip_copy (&instance->my_id.addr[iface_no], iface_addr);
assert (instance->my_id.addr[iface_no].nodeid);
totemip_copy (&instance->my_memb_list[0].addr[iface_no], iface_addr);
if (instance->iface_changes++ == 0) {
memb_ring_id_create_or_load (instance, &instance->my_ring_id);
log_printf (
instance->totemsrp_log_level_notice,
"Created or loaded sequence id %lld.%s for this ring.\n",
instance->my_ring_id.seq,
totemip_print (&instance->my_ring_id.rep));
}
if (instance->iface_changes >= instance->totem_config->interface_count) {
memb_state_gather_enter (instance, 15);
}
}
void totemsrp_net_mtu_adjust (struct totem_config *totem_config) {
totem_config->net_mtu -= sizeof (struct mcast);
}
diff --git a/services/cpg.c b/services/cpg.c
index 84691ac2..303f7de0 100644
--- a/services/cpg.c
+++ b/services/cpg.c
@@ -1,1086 +1,1086 @@
/*
* Copyright (c) 2006-2009 Red Hat, Inc.
*
* All rights reserved.
*
* Author: Christine Caulfield (ccaulfie@redhat.com)
* Author: Jan Friesse (jfriesse@redhat.com)
*
* This software licensed under BSD license, the text of which follows:
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* - Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* - Neither the name of the MontaVista Software, Inc. nor the names of its
* contributors may be used to endorse or promote products derived from this
* software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
* THE POSSIBILITY OF SUCH DAMAGE.
*/
#include <config.h>
-#ifndef COROSYNC_BSD
+#ifdef HAVE_ALLOCA_H
#include <alloca.h>
#endif
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/ioctl.h>
#include <netinet/in.h>
#include <sys/uio.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <time.h>
#include <unistd.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <corosync/corotypes.h>
#include <corosync/coroipc_types.h>
#include <corosync/corodefs.h>
#include <corosync/list.h>
#include <corosync/jhash.h>
#include <corosync/lcr/lcr_comp.h>
#include <corosync/engine/logsys.h>
#include <corosync/engine/coroapi.h>
#include <corosync/cpg.h>
#include <corosync/ipc_cpg.h>
LOGSYS_DECLARE_SUBSYS ("CPG");
#define GROUP_HASH_SIZE 32
enum cpg_message_req_types {
MESSAGE_REQ_EXEC_CPG_PROCJOIN = 0,
MESSAGE_REQ_EXEC_CPG_PROCLEAVE = 1,
MESSAGE_REQ_EXEC_CPG_JOINLIST = 2,
MESSAGE_REQ_EXEC_CPG_MCAST = 3,
MESSAGE_REQ_EXEC_CPG_DOWNLIST = 4
};
/*
* state` exec deliver
* match group name, pid -> if matched deliver for YES:
* XXX indicates impossible state
*
* join leave mcast
* UNJOINED XXX XXX NO
* LEAVE_STARTED XXX YES(unjoined_enter) YES
* JOIN_STARTED YES(join_started_enter) XXX NO
* JOIN_COMPLETED XXX NO YES
*
* join_started_enter
* set JOIN_COMPLETED
* add entry to process_info list
* unjoined_enter
* set UNJOINED
* delete entry from process_info list
*
*
* library accept join error codes
* UNJOINED YES(CPG_OK) set JOIN_STARTED
* LEAVE_STARTED NO(CPG_ERR_BUSY)
* JOIN_STARTED NO(CPG_ERR_EXIST)
* JOIN_COMPlETED NO(CPG_ERR_EXIST)
*
* library accept leave error codes
* UNJOINED NO(CPG_ERR_NOT_EXIST)
* LEAVE_STARTED NO(CPG_ERR_NOT_EXIST)
* JOIN_STARTED NO(CPG_ERR_BUSY)
* JOIN_COMPLETED YES(CPG_OK) set LEAVE_STARTED
*
* library accept mcast
* UNJOINED NO(CPG_ERR_NOT_EXIST)
* LEAVE_STARTED NO(CPG_ERR_NOT_EXIST)
* JOIN_STARTED YES(CPG_OK)
* JOIN_COMPLETED YES(CPG_OK)
*/
enum cpd_state {
CPD_STATE_UNJOINED,
CPD_STATE_LEAVE_STARTED,
CPD_STATE_JOIN_STARTED,
CPD_STATE_JOIN_COMPLETED
};
struct cpg_pd {
void *conn;
mar_cpg_name_t group_name;
uint32_t pid;
enum cpd_state cpd_state;
struct list_head list;
};
DECLARE_LIST_INIT(cpg_pd_list_head);
struct process_info {
unsigned int nodeid;
uint32_t pid;
mar_cpg_name_t group;
struct list_head list; /* on the group_info members list */
};
DECLARE_LIST_INIT(process_info_list_head);
struct join_list_entry {
uint32_t pid;
mar_cpg_name_t group_name;
};
static struct corosync_api_v1 *api = NULL;
/*
* Service Interfaces required by service_message_handler struct
*/
static void cpg_confchg_fn (
enum totem_configuration_type configuration_type,
const unsigned int *member_list, size_t member_list_entries,
const unsigned int *left_list, size_t left_list_entries,
const unsigned int *joined_list, size_t joined_list_entries,
const struct memb_ring_id *ring_id);
static int cpg_exec_init_fn (struct corosync_api_v1 *);
static int cpg_lib_init_fn (void *conn);
static int cpg_lib_exit_fn (void *conn);
static void message_handler_req_exec_cpg_procjoin (
const void *message,
unsigned int nodeid);
static void message_handler_req_exec_cpg_procleave (
const void *message,
unsigned int nodeid);
static void message_handler_req_exec_cpg_joinlist (
const void *message,
unsigned int nodeid);
static void message_handler_req_exec_cpg_mcast (
const void *message,
unsigned int nodeid);
static void message_handler_req_exec_cpg_downlist (
const void *message,
unsigned int nodeid);
static void exec_cpg_procjoin_endian_convert (void *msg);
static void exec_cpg_joinlist_endian_convert (void *msg);
static void exec_cpg_mcast_endian_convert (void *msg);
static void exec_cpg_downlist_endian_convert (void *msg);
static void message_handler_req_lib_cpg_join (void *conn, const void *message);
static void message_handler_req_lib_cpg_leave (void *conn, const void *message);
static void message_handler_req_lib_cpg_mcast (void *conn, const void *message);
static void message_handler_req_lib_cpg_membership (void *conn,
const void *message);
static void message_handler_req_lib_cpg_local_get (void *conn,
const void *message);
static int cpg_node_joinleave_send (unsigned int pid, const mar_cpg_name_t *group_name, int fn, int reason);
static int cpg_exec_send_joinlist(void);
static void cpg_sync_init (void);
static int cpg_sync_process (void);
static void cpg_sync_activate (void);
static void cpg_sync_abort (void);
/*
* Library Handler Definition
*/
static struct corosync_lib_handler cpg_lib_engine[] =
{
{ /* 0 */
.lib_handler_fn = message_handler_req_lib_cpg_join,
.flow_control = CS_LIB_FLOW_CONTROL_REQUIRED
},
{ /* 1 */
.lib_handler_fn = message_handler_req_lib_cpg_leave,
.flow_control = CS_LIB_FLOW_CONTROL_REQUIRED
},
{ /* 2 */
.lib_handler_fn = message_handler_req_lib_cpg_mcast,
.flow_control = CS_LIB_FLOW_CONTROL_REQUIRED
},
{ /* 3 */
.lib_handler_fn = message_handler_req_lib_cpg_membership,
.flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED
},
{ /* 4 */
.lib_handler_fn = message_handler_req_lib_cpg_local_get,
.flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED
}
};
static struct corosync_exec_handler cpg_exec_engine[] =
{
{ /* 0 */
.exec_handler_fn = message_handler_req_exec_cpg_procjoin,
.exec_endian_convert_fn = exec_cpg_procjoin_endian_convert
},
{ /* 1 */
.exec_handler_fn = message_handler_req_exec_cpg_procleave,
.exec_endian_convert_fn = exec_cpg_procjoin_endian_convert
},
{ /* 2 */
.exec_handler_fn = message_handler_req_exec_cpg_joinlist,
.exec_endian_convert_fn = exec_cpg_joinlist_endian_convert
},
{ /* 3 */
.exec_handler_fn = message_handler_req_exec_cpg_mcast,
.exec_endian_convert_fn = exec_cpg_mcast_endian_convert
},
{ /* 4 */
.exec_handler_fn = message_handler_req_exec_cpg_downlist,
.exec_endian_convert_fn = exec_cpg_downlist_endian_convert
},
};
struct corosync_service_engine cpg_service_engine = {
.name = "corosync cluster closed process group service v1.01",
.id = CPG_SERVICE,
.priority = 1,
.private_data_size = sizeof (struct cpg_pd),
.flow_control = CS_LIB_FLOW_CONTROL_REQUIRED,
.allow_inquorate = CS_LIB_ALLOW_INQUORATE,
.lib_init_fn = cpg_lib_init_fn,
.lib_exit_fn = cpg_lib_exit_fn,
.lib_engine = cpg_lib_engine,
.lib_engine_count = sizeof (cpg_lib_engine) / sizeof (struct corosync_lib_handler),
.exec_init_fn = cpg_exec_init_fn,
.exec_dump_fn = NULL,
.exec_engine = cpg_exec_engine,
.exec_engine_count = sizeof (cpg_exec_engine) / sizeof (struct corosync_exec_handler),
.confchg_fn = cpg_confchg_fn,
.sync_init = cpg_sync_init,
.sync_process = cpg_sync_process,
.sync_activate = cpg_sync_activate,
.sync_abort = cpg_sync_abort
};
/*
* Dynamic loader definition
*/
static struct corosync_service_engine *cpg_get_service_engine_ver0 (void);
static struct corosync_service_engine_iface_ver0 cpg_service_engine_iface = {
.corosync_get_service_engine_ver0 = cpg_get_service_engine_ver0
};
static struct lcr_iface corosync_cpg_ver0[1] = {
{
.name = "corosync_cpg",
.version = 0,
.versions_replace = 0,
.versions_replace_count = 0,
.dependencies = 0,
.dependency_count = 0,
.constructor = NULL,
.destructor = NULL,
.interfaces = NULL
}
};
static struct lcr_comp cpg_comp_ver0 = {
.iface_count = 1,
.ifaces = corosync_cpg_ver0
};
static struct corosync_service_engine *cpg_get_service_engine_ver0 (void)
{
return (&cpg_service_engine);
}
#ifdef COROSYNC_SOLARIS
void corosync_lcr_component_register (void);
void corosync_lcr_component_register (void) {
#else
__attribute__ ((constructor)) static void corosync_lcr_component_register (void) {
#endif
lcr_interfaces_set (&corosync_cpg_ver0[0], &cpg_service_engine_iface);
lcr_component_register (&cpg_comp_ver0);
}
struct req_exec_cpg_procjoin {
coroipc_request_header_t header __attribute__((aligned(8)));
mar_cpg_name_t group_name __attribute__((aligned(8)));
mar_uint32_t pid __attribute__((aligned(8)));
mar_uint32_t reason __attribute__((aligned(8)));
};
struct req_exec_cpg_mcast {
coroipc_request_header_t header __attribute__((aligned(8)));
mar_cpg_name_t group_name __attribute__((aligned(8)));
mar_uint32_t msglen __attribute__((aligned(8)));
mar_uint32_t pid __attribute__((aligned(8)));
mar_message_source_t source __attribute__((aligned(8)));
mar_uint8_t message[] __attribute__((aligned(8)));
};
struct req_exec_cpg_downlist {
coroipc_request_header_t header __attribute__((aligned(8)));
mar_uint32_t left_nodes __attribute__((aligned(8)));
mar_uint32_t nodeids[PROCESSOR_COUNT_MAX] __attribute__((aligned(8)));
};
static struct req_exec_cpg_downlist g_req_exec_cpg_downlist;
static void cpg_sync_init (void)
{
}
static int cpg_sync_process (void)
{
return cpg_exec_send_joinlist();
}
static void cpg_sync_activate (void)
{
}
static void cpg_sync_abort (void)
{
}
static int notify_lib_joinlist(
const mar_cpg_name_t *group_name,
void *conn,
int joined_list_entries,
mar_cpg_address_t *joined_list,
int left_list_entries,
mar_cpg_address_t *left_list,
int id)
{
int size;
char *buf;
struct list_head *iter;
int count;
struct res_lib_cpg_confchg_callback *res;
mar_cpg_address_t *retgi;
count = 0;
for (iter = process_info_list_head.next; iter != &process_info_list_head; iter = iter->next) {
struct process_info *pi = list_entry (iter, struct process_info, list);
if (mar_name_compare (&pi->group, group_name) == 0) {
int i;
int founded = 0;
for (i = 0; i < left_list_entries; i++) {
if (left_list[i].nodeid == pi->nodeid && left_list[i].pid == pi->pid) {
founded++;
}
}
if (!founded)
count++;
}
}
size = sizeof(struct res_lib_cpg_confchg_callback) +
sizeof(mar_cpg_address_t) * (count + left_list_entries + joined_list_entries);
buf = alloca(size);
if (!buf)
return CPG_ERR_LIBRARY;
res = (struct res_lib_cpg_confchg_callback *)buf;
res->joined_list_entries = joined_list_entries;
res->left_list_entries = left_list_entries;
res->member_list_entries = count;
retgi = res->member_list;
res->header.size = size;
res->header.id = id;
res->header.error = CS_OK;
memcpy(&res->group_name, group_name, sizeof(mar_cpg_name_t));
for (iter = process_info_list_head.next; iter != &process_info_list_head; iter = iter->next) {
struct process_info *pi=list_entry (iter, struct process_info, list);
if (mar_name_compare (&pi->group, group_name) == 0) {
int i;
int founded = 0;
for (i = 0;i < left_list_entries; i++) {
if (left_list[i].nodeid == pi->nodeid && left_list[i].pid == pi->pid) {
founded++;
}
}
if (!founded) {
retgi->nodeid = pi->nodeid;
retgi->pid = pi->pid;
retgi++;
}
}
}
if (left_list_entries) {
memcpy (retgi, left_list, left_list_entries * sizeof(mar_cpg_address_t));
retgi += left_list_entries;
}
if (joined_list_entries) {
memcpy (retgi, joined_list, joined_list_entries * sizeof(mar_cpg_address_t));
retgi += joined_list_entries;
}
if (conn) {
api->ipc_dispatch_send (conn, buf, size);
} else {
for (iter = cpg_pd_list_head.next; iter != &cpg_pd_list_head; iter = iter->next) {
struct cpg_pd *cpd = list_entry (iter, struct cpg_pd, list);
if (mar_name_compare (&cpd->group_name, group_name) == 0) {
assert (left_list_entries <= 1);
assert (joined_list_entries <= 1);
if (joined_list_entries) {
if (joined_list[0].pid == cpd->pid &&
joined_list[0].nodeid == api->totem_nodeid_get()) {
cpd->cpd_state = CPD_STATE_JOIN_COMPLETED;
}
}
if (cpd->cpd_state == CPD_STATE_JOIN_COMPLETED ||
cpd->cpd_state == CPD_STATE_LEAVE_STARTED) {
api->ipc_dispatch_send (cpd->conn, buf, size);
}
if (left_list_entries) {
if (left_list[0].pid == cpd->pid &&
left_list[0].nodeid == api->totem_nodeid_get()) {
cpd->pid = 0;
memset (&cpd->group_name, 0, sizeof(cpd->group_name));
cpd->cpd_state = CPD_STATE_UNJOINED;
}
}
}
}
}
return CPG_OK;
}
static int cpg_exec_init_fn (struct corosync_api_v1 *corosync_api)
{
#ifdef COROSYNC_SOLARIS
logsys_subsys_init();
#endif
api = corosync_api;
return (0);
}
static int cpg_lib_exit_fn (void *conn)
{
struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
log_printf(LOGSYS_LEVEL_DEBUG, "exit_fn for conn=%p\n", conn);
if (cpd->group_name.length > 0) {
cpg_node_joinleave_send (cpd->pid, &cpd->group_name,
MESSAGE_REQ_EXEC_CPG_PROCLEAVE, CONFCHG_CPG_REASON_LEAVE);
}
list_del (&cpd->list);
api->ipc_refcnt_dec (conn);
return (0);
}
static int cpg_node_joinleave_send (unsigned int pid, const mar_cpg_name_t *group_name, int fn, int reason)
{
struct req_exec_cpg_procjoin req_exec_cpg_procjoin;
struct iovec req_exec_cpg_iovec;
int result;
memcpy(&req_exec_cpg_procjoin.group_name, group_name, sizeof(mar_cpg_name_t));
req_exec_cpg_procjoin.pid = pid;
req_exec_cpg_procjoin.reason = reason;
req_exec_cpg_procjoin.header.size = sizeof(req_exec_cpg_procjoin);
req_exec_cpg_procjoin.header.id = SERVICE_ID_MAKE(CPG_SERVICE, fn);
req_exec_cpg_iovec.iov_base = (char *)&req_exec_cpg_procjoin;
req_exec_cpg_iovec.iov_len = sizeof(req_exec_cpg_procjoin);
result = api->totem_mcast (&req_exec_cpg_iovec, 1, TOTEM_AGREED);
return (result);
}
static void cpg_confchg_fn (
enum totem_configuration_type configuration_type,
const unsigned int *member_list, size_t member_list_entries,
const unsigned int *left_list, size_t left_list_entries,
const unsigned int *joined_list, size_t joined_list_entries,
const struct memb_ring_id *ring_id)
{
int i;
uint32_t lowest_nodeid = 0xffffffff;
struct iovec req_exec_cpg_iovec;
/* We don't send the library joinlist in here because it can end up
out of order with the rest of the messages (which are totem ordered).
So we get the lowest nodeid to send out a list of left nodes instead.
On receipt of that message, all nodes will then notify their local clients
of the new joinlist */
if (left_list_entries) {
for (i = 0; i < member_list_entries; i++) {
if (member_list[i] < lowest_nodeid)
lowest_nodeid = member_list[i];
}
log_printf(LOGSYS_LEVEL_DEBUG, "confchg, low nodeid=%d, us = %d\n", lowest_nodeid, api->totem_nodeid_get());
if (lowest_nodeid == api->totem_nodeid_get()) {
g_req_exec_cpg_downlist.header.id = SERVICE_ID_MAKE(CPG_SERVICE, MESSAGE_REQ_EXEC_CPG_DOWNLIST);
g_req_exec_cpg_downlist.header.size = sizeof(struct req_exec_cpg_downlist);
g_req_exec_cpg_downlist.left_nodes = left_list_entries;
for (i = 0; i < left_list_entries; i++) {
g_req_exec_cpg_downlist.nodeids[i] = left_list[i];
}
log_printf(LOGSYS_LEVEL_DEBUG,
"confchg, build downlist: %lu nodes\n",
(long unsigned int) left_list_entries);
}
}
/* Don't send this message until we get the final configuration message */
if (configuration_type == TOTEM_CONFIGURATION_REGULAR && g_req_exec_cpg_downlist.left_nodes) {
req_exec_cpg_iovec.iov_base = (char *)&g_req_exec_cpg_downlist;
req_exec_cpg_iovec.iov_len = g_req_exec_cpg_downlist.header.size;
api->totem_mcast (&req_exec_cpg_iovec, 1, TOTEM_AGREED);
g_req_exec_cpg_downlist.left_nodes = 0;
log_printf(LOGSYS_LEVEL_DEBUG, "confchg, sent downlist\n");
}
}
/* Can byteswap join & leave messages */
static void exec_cpg_procjoin_endian_convert (void *msg)
{
struct req_exec_cpg_procjoin *req_exec_cpg_procjoin = msg;
req_exec_cpg_procjoin->pid = swab32(req_exec_cpg_procjoin->pid);
swab_mar_cpg_name_t (&req_exec_cpg_procjoin->group_name);
req_exec_cpg_procjoin->reason = swab32(req_exec_cpg_procjoin->reason);
}
static void exec_cpg_joinlist_endian_convert (void *msg_v)
{
char *msg = msg_v;
coroipc_response_header_t *res = (coroipc_response_header_t *)msg;
struct join_list_entry *jle = (struct join_list_entry *)(msg + sizeof(coroipc_response_header_t));
/* XXX shouldn't mar_res_header be swabbed? */
while ((const char*)jle < msg + res->size) {
jle->pid = swab32(jle->pid);
swab_mar_cpg_name_t (&jle->group_name);
jle++;
}
}
static void exec_cpg_downlist_endian_convert (void *msg)
{
struct req_exec_cpg_downlist *req_exec_cpg_downlist = msg;
unsigned int i;
req_exec_cpg_downlist->left_nodes = swab32(req_exec_cpg_downlist->left_nodes);
for (i = 0; i < req_exec_cpg_downlist->left_nodes; i++) {
req_exec_cpg_downlist->nodeids[i] = swab32(req_exec_cpg_downlist->nodeids[i]);
}
}
static void exec_cpg_mcast_endian_convert (void *msg)
{
struct req_exec_cpg_mcast *req_exec_cpg_mcast = msg;
swab_coroipc_request_header_t (&req_exec_cpg_mcast->header);
swab_mar_cpg_name_t (&req_exec_cpg_mcast->group_name);
req_exec_cpg_mcast->pid = swab32(req_exec_cpg_mcast->pid);
req_exec_cpg_mcast->msglen = swab32(req_exec_cpg_mcast->msglen);
swab_mar_message_source_t (&req_exec_cpg_mcast->source);
}
static struct process_info *process_info_find(const mar_cpg_name_t *group_name, uint32_t pid, unsigned int nodeid) {
struct list_head *iter;
for (iter = process_info_list_head.next; iter != &process_info_list_head; ) {
struct process_info *pi = list_entry (iter, struct process_info, list);
iter = iter->next;
if (pi->pid == pid && pi->nodeid == nodeid &&
mar_name_compare (&pi->group, group_name) == 0) {
return pi;
}
}
return NULL;
}
static void do_proc_join(
const mar_cpg_name_t *name,
uint32_t pid,
unsigned int nodeid,
int reason)
{
struct process_info *pi;
struct process_info *pi_entry;
mar_cpg_address_t notify_info;
struct list_head *list;
struct list_head *list_to_add = NULL;
if (process_info_find (name, pid, nodeid) != NULL) {
return ;
}
pi = malloc (sizeof (struct process_info));
if (!pi) {
log_printf(LOGSYS_LEVEL_WARNING, "Unable to allocate process_info struct");
return;
}
pi->nodeid = nodeid;
pi->pid = pid;
memcpy(&pi->group, name, sizeof(*name));
list_init(&pi->list);
/*
* Insert new process in sorted order so synchronization works properly
*/
list_to_add = &process_info_list_head;
for (list = process_info_list_head.next; list != &process_info_list_head; list = list->next) {
pi_entry = list_entry(list, struct process_info, list);
if (pi_entry->nodeid > pi->nodeid ||
(pi_entry->nodeid == pi->nodeid && pi_entry->pid > pi->pid)) {
break;
}
list_to_add = list;
}
list_splice (&pi->list, list_to_add);
notify_info.pid = pi->pid;
notify_info.nodeid = nodeid;
notify_info.reason = reason;
notify_lib_joinlist(&pi->group, NULL,
1, &notify_info,
0, NULL,
MESSAGE_RES_CPG_CONFCHG_CALLBACK);
}
static void message_handler_req_exec_cpg_downlist (
const void *message,
unsigned int nodeid)
{
const struct req_exec_cpg_downlist *req_exec_cpg_downlist = message;
int i;
mar_cpg_address_t left_list[1];
struct list_head *iter;
/*
FOR OPTIMALIZATION - Make list of lists
*/
log_printf (LOGSYS_LEVEL_DEBUG, "downlist left_list: %d\n", req_exec_cpg_downlist->left_nodes);
for (iter = process_info_list_head.next; iter != &process_info_list_head; ) {
struct process_info *pi = list_entry(iter, struct process_info, list);
iter = iter->next;
for (i = 0; i < req_exec_cpg_downlist->left_nodes; i++) {
if (pi->nodeid == req_exec_cpg_downlist->nodeids[i]) {
left_list[0].nodeid = pi->nodeid;
left_list[0].pid = pi->pid;
left_list[0].reason = CONFCHG_CPG_REASON_NODEDOWN;
notify_lib_joinlist(&pi->group, NULL,
0, NULL,
1, left_list,
MESSAGE_RES_CPG_CONFCHG_CALLBACK);
list_del (&pi->list);
free (pi);
}
}
}
}
static void message_handler_req_exec_cpg_procjoin (
const void *message,
unsigned int nodeid)
{
const struct req_exec_cpg_procjoin *req_exec_cpg_procjoin = message;
log_printf(LOGSYS_LEVEL_DEBUG, "got procjoin message from cluster node %d\n", nodeid);
do_proc_join (&req_exec_cpg_procjoin->group_name,
req_exec_cpg_procjoin->pid, nodeid,
CONFCHG_CPG_REASON_JOIN);
}
static void message_handler_req_exec_cpg_procleave (
const void *message,
unsigned int nodeid)
{
const struct req_exec_cpg_procjoin *req_exec_cpg_procjoin = message;
struct process_info *pi;
struct list_head *iter;
mar_cpg_address_t notify_info;
log_printf(LOGSYS_LEVEL_DEBUG, "got procleave message from cluster node %d\n", nodeid);
notify_info.pid = req_exec_cpg_procjoin->pid;
notify_info.nodeid = nodeid;
notify_info.reason = req_exec_cpg_procjoin->reason;
notify_lib_joinlist(&req_exec_cpg_procjoin->group_name, NULL,
0, NULL,
1, &notify_info,
MESSAGE_RES_CPG_CONFCHG_CALLBACK);
for (iter = process_info_list_head.next; iter != &process_info_list_head; ) {
pi = list_entry(iter, struct process_info, list);
iter = iter->next;
if (pi->pid == req_exec_cpg_procjoin->pid && pi->nodeid == nodeid &&
mar_name_compare (&pi->group, &req_exec_cpg_procjoin->group_name)==0) {
list_del (&pi->list);
free (pi);
}
}
}
/* Got a proclist from another node */
static void message_handler_req_exec_cpg_joinlist (
const void *message_v,
unsigned int nodeid)
{
const char *message = message_v;
const coroipc_response_header_t *res = (const coroipc_response_header_t *)message;
const struct join_list_entry *jle = (const struct join_list_entry *)(message + sizeof(coroipc_response_header_t));
log_printf(LOGSYS_LEVEL_NOTICE, "got joinlist message from node %d\n",
nodeid);
/* Ignore our own messages */
if (nodeid == api->totem_nodeid_get()) {
return;
}
while ((const char*)jle < message + res->size) {
do_proc_join (&jle->group_name, jle->pid, nodeid,
CONFCHG_CPG_REASON_NODEUP);
jle++;
}
}
static void message_handler_req_exec_cpg_mcast (
const void *message,
unsigned int nodeid)
{
const struct req_exec_cpg_mcast *req_exec_cpg_mcast = message;
struct res_lib_cpg_deliver_callback res_lib_cpg_mcast;
int msglen = req_exec_cpg_mcast->msglen;
struct list_head *iter;
struct cpg_pd *cpd;
struct iovec iovec[2];
res_lib_cpg_mcast.header.id = MESSAGE_RES_CPG_DELIVER_CALLBACK;
res_lib_cpg_mcast.header.size = sizeof(res_lib_cpg_mcast) + msglen;
res_lib_cpg_mcast.msglen = msglen;
res_lib_cpg_mcast.pid = req_exec_cpg_mcast->pid;
res_lib_cpg_mcast.nodeid = nodeid;
memcpy(&res_lib_cpg_mcast.group_name, &req_exec_cpg_mcast->group_name,
sizeof(mar_cpg_name_t));
iovec[0].iov_base = (void *)&res_lib_cpg_mcast;
iovec[0].iov_len = sizeof (res_lib_cpg_mcast);
iovec[1].iov_base = (char*)message+sizeof(*req_exec_cpg_mcast);
iovec[1].iov_len = msglen;
for (iter = cpg_pd_list_head.next; iter != &cpg_pd_list_head; ) {
cpd = list_entry(iter, struct cpg_pd, list);
iter = iter->next;
if ((cpd->cpd_state == CPD_STATE_LEAVE_STARTED || cpd->cpd_state == CPD_STATE_JOIN_COMPLETED)
&& (mar_name_compare (&cpd->group_name, &req_exec_cpg_mcast->group_name) == 0)) {
api->ipc_dispatch_iov_send (cpd->conn, iovec, 2);
}
}
}
static int cpg_exec_send_joinlist(void)
{
int count = 0;
struct list_head *iter;
coroipc_response_header_t *res;
char *buf;
struct join_list_entry *jle;
struct iovec req_exec_cpg_iovec;
for (iter = process_info_list_head.next; iter != &process_info_list_head; iter = iter->next) {
struct process_info *pi = list_entry (iter, struct process_info, list);
if (pi->nodeid == api->totem_nodeid_get ()) {
count++;
}
}
/* Nothing to send */
if (!count)
return 0;
buf = alloca(sizeof(coroipc_response_header_t) + sizeof(struct join_list_entry) * count);
if (!buf) {
log_printf(LOGSYS_LEVEL_WARNING, "Unable to allocate joinlist buffer");
return -1;
}
jle = (struct join_list_entry *)(buf + sizeof(coroipc_response_header_t));
res = (coroipc_response_header_t *)buf;
for (iter = process_info_list_head.next; iter != &process_info_list_head; iter = iter->next) {
struct process_info *pi = list_entry (iter, struct process_info, list);
if (pi->nodeid == api->totem_nodeid_get ()) {
memcpy (&jle->group_name, &pi->group, sizeof (mar_cpg_name_t));
jle->pid = pi->pid;
jle++;
}
}
res->id = SERVICE_ID_MAKE(CPG_SERVICE, MESSAGE_REQ_EXEC_CPG_JOINLIST);
res->size = sizeof(coroipc_response_header_t)+sizeof(struct join_list_entry) * count;
req_exec_cpg_iovec.iov_base = buf;
req_exec_cpg_iovec.iov_len = res->size;
return (api->totem_mcast (&req_exec_cpg_iovec, 1, TOTEM_AGREED));
}
static int cpg_lib_init_fn (void *conn)
{
struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
memset (cpd, 0, sizeof(struct cpg_pd));
cpd->conn = conn;
list_add (&cpd->list, &cpg_pd_list_head);
api->ipc_refcnt_inc (conn);
log_printf(LOGSYS_LEVEL_DEBUG, "lib_init_fn: conn=%p, cpd=%p\n", conn, cpd);
return (0);
}
/* Join message from the library */
static void message_handler_req_lib_cpg_join (void *conn, const void *message)
{
const struct req_lib_cpg_join *req_lib_cpg_join = message;
struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
struct res_lib_cpg_join res_lib_cpg_join;
cs_error_t error = CPG_OK;
switch (cpd->cpd_state) {
case CPD_STATE_UNJOINED:
error = CPG_OK;
cpd->cpd_state = CPD_STATE_JOIN_STARTED;
cpd->pid = req_lib_cpg_join->pid;
memcpy (&cpd->group_name, &req_lib_cpg_join->group_name,
sizeof (cpd->group_name));
cpg_node_joinleave_send (req_lib_cpg_join->pid,
&req_lib_cpg_join->group_name,
MESSAGE_REQ_EXEC_CPG_PROCJOIN, CONFCHG_CPG_REASON_JOIN);
break;
case CPD_STATE_LEAVE_STARTED:
error = CPG_ERR_BUSY;
break;
case CPD_STATE_JOIN_STARTED:
error = CPG_ERR_EXIST;
break;
case CPD_STATE_JOIN_COMPLETED:
error = CPG_ERR_EXIST;
break;
}
res_lib_cpg_join.header.size = sizeof(res_lib_cpg_join);
res_lib_cpg_join.header.id = MESSAGE_RES_CPG_JOIN;
res_lib_cpg_join.header.error = error;
api->ipc_response_send (conn, &res_lib_cpg_join, sizeof(res_lib_cpg_join));
}
/* Leave message from the library */
static void message_handler_req_lib_cpg_leave (void *conn, const void *message)
{
struct res_lib_cpg_leave res_lib_cpg_leave;
cs_error_t error = CPG_OK;
struct req_lib_cpg_leave *req_lib_cpg_leave = (struct req_lib_cpg_leave *)message;
struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
log_printf(LOGSYS_LEVEL_DEBUG, "got leave request on %p\n", conn);
switch (cpd->cpd_state) {
case CPD_STATE_UNJOINED:
error = CPG_ERR_NOT_EXIST;
break;
case CPD_STATE_LEAVE_STARTED:
error = CPG_ERR_NOT_EXIST;
break;
case CPD_STATE_JOIN_STARTED:
error = CPG_ERR_BUSY;
break;
case CPD_STATE_JOIN_COMPLETED:
error = CPG_OK;
cpd->cpd_state = CPD_STATE_LEAVE_STARTED;
cpg_node_joinleave_send (req_lib_cpg_leave->pid,
&req_lib_cpg_leave->group_name,
MESSAGE_REQ_EXEC_CPG_PROCLEAVE,
CONFCHG_CPG_REASON_LEAVE);
break;
}
/* send return */
res_lib_cpg_leave.header.size = sizeof(res_lib_cpg_leave);
res_lib_cpg_leave.header.id = MESSAGE_RES_CPG_LEAVE;
res_lib_cpg_leave.header.error = error;
api->ipc_response_send(conn, &res_lib_cpg_leave, sizeof(res_lib_cpg_leave));
}
/* Mcast message from the library */
static void message_handler_req_lib_cpg_mcast (void *conn, const void *message)
{
const struct req_lib_cpg_mcast *req_lib_cpg_mcast = message;
struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
mar_cpg_name_t group_name = cpd->group_name;
struct iovec req_exec_cpg_iovec[2];
struct req_exec_cpg_mcast req_exec_cpg_mcast;
struct res_lib_cpg_mcast res_lib_cpg_mcast;
int msglen = req_lib_cpg_mcast->msglen;
int result;
cs_error_t error = CPG_ERR_NOT_EXIST;
log_printf(LOGSYS_LEVEL_DEBUG, "got mcast request on %p\n", conn);
switch (cpd->cpd_state) {
case CPD_STATE_UNJOINED:
error = CPG_ERR_NOT_EXIST;
break;
case CPD_STATE_LEAVE_STARTED:
error = CPG_ERR_NOT_EXIST;
break;
case CPD_STATE_JOIN_STARTED:
error = CPG_OK;
break;
case CPD_STATE_JOIN_COMPLETED:
error = CPG_OK;
break;
}
if (error == CPG_OK) {
req_exec_cpg_mcast.header.size = sizeof(req_exec_cpg_mcast) + msglen;
req_exec_cpg_mcast.header.id = SERVICE_ID_MAKE(CPG_SERVICE,
MESSAGE_REQ_EXEC_CPG_MCAST);
req_exec_cpg_mcast.pid = cpd->pid;
req_exec_cpg_mcast.msglen = msglen;
api->ipc_source_set (&req_exec_cpg_mcast.source, conn);
memcpy(&req_exec_cpg_mcast.group_name, &group_name,
sizeof(mar_cpg_name_t));
req_exec_cpg_iovec[0].iov_base = (char *)&req_exec_cpg_mcast;
req_exec_cpg_iovec[0].iov_len = sizeof(req_exec_cpg_mcast);
req_exec_cpg_iovec[1].iov_base = (char *)&req_lib_cpg_mcast->message;
req_exec_cpg_iovec[1].iov_len = msglen;
result = api->totem_mcast (req_exec_cpg_iovec, 2, TOTEM_AGREED);
assert(result == 0);
}
res_lib_cpg_mcast.header.size = sizeof(res_lib_cpg_mcast);
res_lib_cpg_mcast.header.id = MESSAGE_RES_CPG_MCAST;
res_lib_cpg_mcast.header.error = error;
api->ipc_response_send (conn, &res_lib_cpg_mcast,
sizeof (res_lib_cpg_mcast));
}
static void message_handler_req_lib_cpg_membership (void *conn,
const void *message)
{
struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
cs_error_t error = CPG_ERR_NOT_EXIST;
coroipc_response_header_t res;
switch (cpd->cpd_state) {
case CPD_STATE_UNJOINED:
error = CPG_ERR_NOT_EXIST;
break;
case CPD_STATE_LEAVE_STARTED:
error = CPG_ERR_NOT_EXIST;
break;
case CPD_STATE_JOIN_STARTED:
error = CPG_ERR_BUSY;
break;
case CPD_STATE_JOIN_COMPLETED:
error = CPG_OK;
break;
}
res.size = sizeof (res);
res.id = MESSAGE_RES_CPG_MEMBERSHIP;
res.error = error;
api->ipc_response_send (conn, &res, sizeof(res));
return;
if (error == CPG_OK) {
notify_lib_joinlist (&cpd->group_name, conn, 0, NULL, 0, NULL,
MESSAGE_RES_CPG_MEMBERSHIP);
}
}
static void message_handler_req_lib_cpg_local_get (void *conn,
const void *message)
{
struct res_lib_cpg_local_get res_lib_cpg_local_get;
res_lib_cpg_local_get.header.size = sizeof (res_lib_cpg_local_get);
res_lib_cpg_local_get.header.id = MESSAGE_RES_CPG_LOCAL_GET;
res_lib_cpg_local_get.header.error = CPG_OK;
res_lib_cpg_local_get.local_nodeid = api->totem_nodeid_get ();
api->ipc_response_send (conn, &res_lib_cpg_local_get,
sizeof (res_lib_cpg_local_get));
}
diff --git a/services/votequorum.c b/services/votequorum.c
index 14197ad3..83076575 100644
--- a/services/votequorum.c
+++ b/services/votequorum.c
@@ -1,1665 +1,1665 @@
/*
* Copyright (c) 2009 Red Hat, Inc.
*
* All rights reserved.
*
* Author: Christine Caulfield (ccaulfie@redhat.com)
*
* This software licensed under BSD license, the text of which follows:
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* - Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* - Neither the name of the MontaVista Software, Inc. nor the names of its
* contributors may be used to endorse or promote products derived from this
* software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
* THE POSSIBILITY OF SUCH DAMAGE.
*/
#include <config.h>
#include <sys/types.h>
-#ifndef COROSYNC_BSD
+#ifdef HAVE_ALLOCA_H
#include <alloca.h>
#endif
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/time.h>
#include <sys/ioctl.h>
#include <netinet/in.h>
#include <sys/uio.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <time.h>
#include <unistd.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <corosync/corotypes.h>
#include <corosync/coroipc_types.h>
#include <corosync/corodefs.h>
#include <corosync/cfg.h>
#include <corosync/list.h>
#include <corosync/lcr/lcr_comp.h>
#include <corosync/engine/logsys.h>
#include <corosync/mar_gen.h>
#include <corosync/engine/coroapi.h>
#include <corosync/engine/quorum.h>
#include <corosync/ipc_votequorum.h>
#include <corosync/list.h>
#define VOTEQUORUM_MAJOR_VERSION 7
#define VOTEQUORUM_MINOR_VERSION 0
#define VOTEQUORUM_PATCH_VERSION 0
/* Silly default to prevent accidents! */
#define DEFAULT_EXPECTED 1024
#define DEFAULT_QDEV_POLL 10000
#define DEFAULT_LEAVE_TMO 10000
LOGSYS_DECLARE_SUBSYS ("VOTEQ");
enum quorum_message_req_types {
MESSAGE_REQ_EXEC_VOTEQUORUM_NODEINFO = 0,
MESSAGE_REQ_EXEC_VOTEQUORUM_RECONFIGURE = 1,
MESSAGE_REQ_EXEC_VOTEQUORUM_KILLNODE = 2,
};
#define NODE_FLAGS_BEENDOWN 1
#define NODE_FLAGS_SEESDISALLOWED 8
#define NODE_FLAGS_HASSTATE 16
#define NODE_FLAGS_QDISK 32
#define NODE_FLAGS_REMOVED 64
#define NODE_FLAGS_US 128
#define NODEID_US 0
#define NODEID_QDEVICE -1
typedef enum { NODESTATE_JOINING=1, NODESTATE_MEMBER,
NODESTATE_DEAD, NODESTATE_LEAVING, NODESTATE_DISALLOWED } nodestate_t;
struct cluster_node {
int flags;
int node_id;
unsigned int expected_votes;
unsigned int votes;
time_t join_time;
nodestate_t state;
struct timeval last_hello; /* Only used for quorum devices */
struct list_head list;
};
static int quorum_flags;
#define VOTEQUORUM_FLAG_FEATURE_DISALLOWED 1
#define VOTEQUORUM_FLAG_FEATURE_TWONODE 1
static int quorum;
static int cluster_is_quorate;
static int first_trans = 1;
static unsigned int quorumdev_poll = DEFAULT_QDEV_POLL;
static unsigned int leaving_timeout = DEFAULT_LEAVE_TMO;
static struct cluster_node *us;
static struct cluster_node *quorum_device = NULL;
static char quorum_device_name[VOTEQUORUM_MAX_QDISK_NAME_LEN];
static corosync_timer_handle_t quorum_device_timer;
static corosync_timer_handle_t leaving_timer;
static struct list_head cluster_members_list;
static struct corosync_api_v1 *corosync_api;
static struct list_head trackers_list;
static unsigned int quorum_members[PROCESSOR_COUNT_MAX+1];
static int quorum_members_entries = 0;
static struct memb_ring_id quorum_ringid;
#define max(a,b) (((a) > (b)) ? (a) : (b))
static struct cluster_node *find_node_by_nodeid(int nodeid);
static struct cluster_node *allocate_node(int nodeid);
static const char *kill_reason(int reason);
#define list_iterate(v, head) \
for (v = (head)->next; v != head; v = v->next)
struct quorum_pd {
unsigned char track_flags;
int tracking_enabled;
uint64_t tracking_context;
struct list_head list;
void *conn;
};
/*
* Service Interfaces required by service_message_handler struct
*/
static void votequorum_init(struct corosync_api_v1 *api,
quorum_set_quorate_fn_t report);
static void quorum_confchg_fn (
enum totem_configuration_type configuration_type,
const unsigned int *member_list, size_t member_list_entries,
const unsigned int *left_list, size_t left_list_entries,
const unsigned int *joined_list, size_t joined_list_entries,
const struct memb_ring_id *ring_id);
static int votequorum_exec_init_fn (struct corosync_api_v1 *api);
static int quorum_lib_init_fn (void *conn);
static int quorum_lib_exit_fn (void *conn);
static void message_handler_req_exec_votequorum_nodeinfo (
const void *message,
unsigned int nodeid);
static void message_handler_req_exec_votequorum_reconfigure (
const void *message,
unsigned int nodeid);
static void message_handler_req_exec_votequorum_killnode (
const void *message,
unsigned int nodeid);
static void message_handler_req_lib_votequorum_getinfo (void *conn,
const void *message);
static void message_handler_req_lib_votequorum_setexpected (void *conn,
const void *message);
static void message_handler_req_lib_votequorum_setvotes (void *conn,
const void *message);
static void message_handler_req_lib_votequorum_qdisk_register (void *conn,
const void *message);
static void message_handler_req_lib_votequorum_qdisk_unregister (void *conn,
const void *message);
static void message_handler_req_lib_votequorum_qdisk_poll (void *conn,
const void *message);
static void message_handler_req_lib_votequorum_qdisk_getinfo (void *conn,
const void *message);
static void message_handler_req_lib_votequorum_setstate (void *conn,
const void *message);
static void message_handler_req_lib_votequorum_leaving (void *conn,
const void *message);
static void message_handler_req_lib_votequorum_trackstart (void *conn,
const void *msg);
static void message_handler_req_lib_votequorum_trackstop (void *conn,
const void *msg);
static int quorum_exec_send_nodeinfo(void);
static int quorum_exec_send_reconfigure(int param, int nodeid, int value);
static int quorum_exec_send_killnode(int nodeid, unsigned int reason);
static void exec_votequorum_nodeinfo_endian_convert (void *msg);
static void exec_votequorum_reconfigure_endian_convert (void *msg);
static void exec_votequorum_killnode_endian_convert (void *msg);
static void add_votequorum_config_notification(hdb_handle_t quorum_object_handle);
static void recalculate_quorum(int allow_decrease, int by_current_nodes);
/*
* Library Handler Definition
*/
static struct corosync_lib_handler quorum_lib_service[] =
{
{ /* 0 */
.lib_handler_fn = message_handler_req_lib_votequorum_getinfo,
.flow_control = COROSYNC_LIB_FLOW_CONTROL_NOT_REQUIRED
},
{ /* 1 */
.lib_handler_fn = message_handler_req_lib_votequorum_setexpected,
.flow_control = COROSYNC_LIB_FLOW_CONTROL_NOT_REQUIRED
},
{ /* 2 */
.lib_handler_fn = message_handler_req_lib_votequorum_setvotes,
.flow_control = COROSYNC_LIB_FLOW_CONTROL_NOT_REQUIRED
},
{ /* 3 */
.lib_handler_fn = message_handler_req_lib_votequorum_qdisk_register,
.flow_control = COROSYNC_LIB_FLOW_CONTROL_NOT_REQUIRED
},
{ /* 4 */
.lib_handler_fn = message_handler_req_lib_votequorum_qdisk_unregister,
.flow_control = COROSYNC_LIB_FLOW_CONTROL_NOT_REQUIRED
},
{ /* 5 */
.lib_handler_fn = message_handler_req_lib_votequorum_qdisk_poll,
.flow_control = COROSYNC_LIB_FLOW_CONTROL_NOT_REQUIRED
},
{ /* 6 */
.lib_handler_fn = message_handler_req_lib_votequorum_qdisk_getinfo,
.flow_control = COROSYNC_LIB_FLOW_CONTROL_NOT_REQUIRED
},
{ /* 7 */
.lib_handler_fn = message_handler_req_lib_votequorum_setstate,
.flow_control = COROSYNC_LIB_FLOW_CONTROL_NOT_REQUIRED
},
{ /* 8 */
.lib_handler_fn = message_handler_req_lib_votequorum_leaving,
.flow_control = COROSYNC_LIB_FLOW_CONTROL_NOT_REQUIRED
},
{ /* 9 */
.lib_handler_fn = message_handler_req_lib_votequorum_trackstart,
.flow_control = COROSYNC_LIB_FLOW_CONTROL_NOT_REQUIRED
},
{ /* 10 */
.lib_handler_fn = message_handler_req_lib_votequorum_trackstop,
.flow_control = COROSYNC_LIB_FLOW_CONTROL_NOT_REQUIRED
}
};
static struct corosync_exec_handler votequorum_exec_engine[] =
{
{ /* 0 */
.exec_handler_fn = message_handler_req_exec_votequorum_nodeinfo,
.exec_endian_convert_fn = exec_votequorum_nodeinfo_endian_convert
},
{ /* 1 */
.exec_handler_fn = message_handler_req_exec_votequorum_reconfigure,
.exec_endian_convert_fn = exec_votequorum_reconfigure_endian_convert
},
{ /* 2 */
.exec_handler_fn = message_handler_req_exec_votequorum_killnode,
.exec_endian_convert_fn = exec_votequorum_killnode_endian_convert
},
};
static quorum_set_quorate_fn_t set_quorum;
/*
* lcrso object definition
*/
static struct quorum_services_api_ver1 votequorum_iface_ver0 = {
.init = votequorum_init
};
static struct corosync_service_engine quorum_service_handler = {
.name = "corosync votes quorum service v0.91",
.id = VOTEQUORUM_SERVICE,
.private_data_size = sizeof (struct quorum_pd),
.allow_inquorate = CS_LIB_ALLOW_INQUORATE,
.flow_control = COROSYNC_LIB_FLOW_CONTROL_REQUIRED,
.lib_init_fn = quorum_lib_init_fn,
.lib_exit_fn = quorum_lib_exit_fn,
.lib_engine = quorum_lib_service,
.lib_engine_count = sizeof (quorum_lib_service) / sizeof (struct corosync_lib_handler),
.exec_init_fn = votequorum_exec_init_fn,
.exec_engine = votequorum_exec_engine,
.exec_engine_count = sizeof (votequorum_exec_engine) / sizeof (struct corosync_exec_handler),
.confchg_fn = quorum_confchg_fn,
};
/*
* Dynamic loader definition
*/
static struct corosync_service_engine *quorum_get_service_handler_ver0 (void);
static struct corosync_service_engine_iface_ver0 quorum_service_handler_iface = {
.corosync_get_service_engine_ver0 = quorum_get_service_handler_ver0
};
static struct lcr_iface corosync_quorum_ver0[2] = {
{
.name = "corosync_votequorum",
.version = 0,
.versions_replace = 0,
.versions_replace_count = 0,
.dependencies = 0,
.dependency_count = 0,
.constructor = NULL,
.destructor = NULL,
.interfaces = (void **)(void *)&votequorum_iface_ver0
},
{
.name = "corosync_votequorum_iface",
.version = 0,
.versions_replace = 0,
.versions_replace_count = 0,
.dependencies = 0,
.dependency_count = 0,
.constructor = NULL,
.destructor = NULL,
.interfaces = NULL
}
};
static struct lcr_comp quorum_comp_ver0 = {
.iface_count = 2,
.ifaces = corosync_quorum_ver0
};
static struct corosync_service_engine *quorum_get_service_handler_ver0 (void)
{
return (&quorum_service_handler);
}
#ifdef COROSYNC_SOLARIS
void corosync_lcr_component_register (void);
void corosync_lcr_component_register (void) {
#else
__attribute__ ((constructor)) static void corosync_lcr_component_register (void) {
#endif
lcr_interfaces_set (&corosync_quorum_ver0[0], &votequorum_iface_ver0);
lcr_interfaces_set (&corosync_quorum_ver0[1], &quorum_service_handler_iface);
lcr_component_register (&quorum_comp_ver0);
}
static void votequorum_init(struct corosync_api_v1 *api,
quorum_set_quorate_fn_t report)
{
ENTER();
set_quorum = report;
/* Load the library-servicing part of this module */
api->service_link_and_init(api, "corosync_votequorum_iface", 0);
LEAVE();
}
struct req_exec_quorum_nodeinfo {
coroipc_request_header_t header __attribute__((aligned(8)));
unsigned int first_trans;
unsigned int votes;
unsigned int expected_votes;
unsigned int major_version; /* Not backwards compatible */
unsigned int minor_version; /* Backwards compatible */
unsigned int patch_version; /* Backwards/forwards compatible */
unsigned int config_version;
unsigned int flags;
} __attribute__((packed));
/* Parameters for RECONFIG command */
#define RECONFIG_PARAM_EXPECTED_VOTES 1
#define RECONFIG_PARAM_NODE_VOTES 2
#define RECONFIG_PARAM_LEAVING 3
struct req_exec_quorum_reconfigure {
coroipc_request_header_t header __attribute__((aligned(8)));
unsigned int param;
unsigned int nodeid;
unsigned int value;
};
struct req_exec_quorum_killnode {
coroipc_request_header_t header __attribute__((aligned(8)));
unsigned int reason;
unsigned int nodeid;
};
/* These just make the access a little neater */
static inline int objdb_get_string(const struct corosync_api_v1 *corosync,
hdb_handle_t object_service_handle,
char *key, char **value)
{
int res;
*value = NULL;
if ( !(res = corosync_api->object_key_get(object_service_handle,
key,
strlen(key),
(void *)value,
NULL))) {
if (*value)
return 0;
}
return -1;
}
static inline void objdb_get_int(const struct corosync_api_v1 *corosync,
hdb_handle_t object_service_handle,
const char *key, unsigned int *intvalue,
unsigned int default_value)
{
char *value = NULL;
*intvalue = default_value;
if (!corosync_api->object_key_get(object_service_handle, key, strlen(key),
(void *)&value, NULL)) {
if (value) {
*intvalue = atoi(value);
}
}
}
static void read_quorum_config(hdb_handle_t quorum_handle)
{
unsigned int value = 0;
int cluster_members = 0;
struct list_head *tmp;
struct cluster_node *node;
log_printf(LOGSYS_LEVEL_INFO, "Reading configuration\n");
objdb_get_int(corosync_api, quorum_handle, "expected_votes", &us->expected_votes, DEFAULT_EXPECTED);
objdb_get_int(corosync_api, quorum_handle, "votes", &us->votes, 1);
objdb_get_int(corosync_api, quorum_handle, "quorumdev_poll", &quorumdev_poll, DEFAULT_QDEV_POLL);
objdb_get_int(corosync_api, quorum_handle, "leaving_timeout", &leaving_timeout, DEFAULT_LEAVE_TMO);
objdb_get_int(corosync_api, quorum_handle, "disallowed", &value, 0);
if (value)
quorum_flags |= VOTEQUORUM_FLAG_FEATURE_DISALLOWED;
else
quorum_flags &= ~VOTEQUORUM_FLAG_FEATURE_DISALLOWED;
objdb_get_int(corosync_api, quorum_handle, "two_node", &value, 0);
if (value)
quorum_flags |= VOTEQUORUM_FLAG_FEATURE_TWONODE;
else
quorum_flags &= ~VOTEQUORUM_FLAG_FEATURE_TWONODE;
/*
* two_node mode is invalid if there are more than 2 nodes in the cluster!
*/
list_iterate(tmp, &cluster_members_list) {
node = list_entry(tmp, struct cluster_node, list);
cluster_members++;
}
if (quorum_flags & VOTEQUORUM_FLAG_FEATURE_TWONODE && cluster_members > 2) {
log_printf(LOGSYS_LEVEL_WARNING, "quorum.two_node was set but there are more than 2 nodes in the cluster. It will be ignored.");
quorum_flags &= ~VOTEQUORUM_FLAG_FEATURE_TWONODE;
}
}
static int votequorum_exec_init_fn (struct corosync_api_v1 *api)
{
hdb_handle_t object_handle;
hdb_handle_t find_handle;
#ifdef COROSYNC_SOLARIS
logsys_subsys_init();
#endif
ENTER();
corosync_api = api;
list_init(&cluster_members_list);
list_init(&trackers_list);
/* Allocate a cluster_node for us */
us = allocate_node(corosync_api->totem_nodeid_get());
if (!us)
return (1);
us->flags |= NODE_FLAGS_US;
us->state = NODESTATE_MEMBER;
us->expected_votes = DEFAULT_EXPECTED;
us->votes = 1;
time(&us->join_time);
/* Get configuration variables */
corosync_api->object_find_create(OBJECT_PARENT_HANDLE, "quorum", strlen("quorum"), &find_handle);
if (corosync_api->object_find_next(find_handle, &object_handle) == 0) {
read_quorum_config(object_handle);
}
recalculate_quorum(0, 0);
/* Listen for changes */
add_votequorum_config_notification(object_handle);
corosync_api->object_find_destroy(find_handle);
LEAVE();
return (0);
}
static int quorum_lib_exit_fn (void *conn)
{
struct quorum_pd *quorum_pd = (struct quorum_pd *)corosync_api->ipc_private_data_get (conn);
ENTER();
if (quorum_pd->tracking_enabled) {
list_del (&quorum_pd->list);
list_init (&quorum_pd->list);
}
LEAVE();
return (0);
}
static int send_quorum_notification(void *conn, uint64_t context)
{
struct res_lib_votequorum_notification *res_lib_votequorum_notification;
struct list_head *tmp;
struct cluster_node *node;
int cluster_members = 0;
int i = 0;
int size;
char *buf;
ENTER();
list_iterate(tmp, &cluster_members_list) {
node = list_entry(tmp, struct cluster_node, list);
cluster_members++;
}
if (quorum_device)
cluster_members++;
size = sizeof(struct res_lib_votequorum_notification) + sizeof(struct votequorum_node) * cluster_members;
buf = alloca(size);
if (!buf) {
LEAVE();
return -1;
}
res_lib_votequorum_notification = (struct res_lib_votequorum_notification *)buf;
res_lib_votequorum_notification->quorate = cluster_is_quorate;
res_lib_votequorum_notification->node_list_entries = cluster_members;
res_lib_votequorum_notification->context = context;
list_iterate(tmp, &cluster_members_list) {
node = list_entry(tmp, struct cluster_node, list);
res_lib_votequorum_notification->node_list[i].nodeid = node->node_id;
res_lib_votequorum_notification->node_list[i++].state = node->state;
}
if (quorum_device) {
res_lib_votequorum_notification->node_list[i].nodeid = 0;
res_lib_votequorum_notification->node_list[i++].state = quorum_device->state | 0x80;
}
res_lib_votequorum_notification->header.id = MESSAGE_RES_VOTEQUORUM_NOTIFICATION;
res_lib_votequorum_notification->header.size = size;
res_lib_votequorum_notification->header.error = CS_OK;
/* Send it to all interested parties */
if (conn) {
int ret = corosync_api->ipc_dispatch_send(conn, buf, size);
LEAVE();
return ret;
}
else {
struct quorum_pd *qpd;
list_iterate(tmp, &trackers_list) {
qpd = list_entry(tmp, struct quorum_pd, list);
res_lib_votequorum_notification->context = qpd->tracking_context;
corosync_api->ipc_dispatch_send(qpd->conn, buf, size);
}
}
LEAVE();
return 0;
}
static void send_expectedvotes_notification(void)
{
struct res_lib_votequorum_expectedvotes_notification res_lib_votequorum_expectedvotes_notification;
struct quorum_pd *qpd;
struct list_head *tmp;
log_printf(LOGSYS_LEVEL_DEBUG, "Sending expected votes callback\n");
res_lib_votequorum_expectedvotes_notification.header.id = MESSAGE_RES_VOTEQUORUM_EXPECTEDVOTES_NOTIFICATION;
res_lib_votequorum_expectedvotes_notification.header.size = sizeof(res_lib_votequorum_expectedvotes_notification);
res_lib_votequorum_expectedvotes_notification.header.error = CS_OK;
res_lib_votequorum_expectedvotes_notification.expected_votes = us->expected_votes;
list_iterate(tmp, &trackers_list) {
qpd = list_entry(tmp, struct quorum_pd, list);
res_lib_votequorum_expectedvotes_notification.context = qpd->tracking_context;
corosync_api->ipc_dispatch_send(qpd->conn, &res_lib_votequorum_expectedvotes_notification,
sizeof(struct res_lib_votequorum_expectedvotes_notification));
}
}
static void set_quorate(int total_votes)
{
int quorate;
ENTER();
if (quorum > total_votes) {
quorate = 0;
}
else {
quorate = 1;
}
if (cluster_is_quorate && !quorate)
log_printf(LOGSYS_LEVEL_INFO, "quorum lost, blocking activity\n");
if (!cluster_is_quorate && quorate)
log_printf(LOGSYS_LEVEL_INFO, "quorum regained, resuming activity\n");
/* If we are newly quorate, then kill any DISALLOWED nodes */
if (!cluster_is_quorate && quorate) {
struct cluster_node *node = NULL;
struct list_head *tmp;
list_iterate(tmp, &cluster_members_list) {
node = list_entry(tmp, struct cluster_node, list);
if (node->state == NODESTATE_DISALLOWED)
quorum_exec_send_killnode(node->node_id, VOTEQUORUM_REASON_KILL_REJOIN);
}
}
cluster_is_quorate = quorate;
set_quorum(quorum_members, quorum_members_entries, quorate, &quorum_ringid);
ENTER();
}
static int calculate_quorum(int allow_decrease, int max_expected, unsigned int *ret_total_votes)
{
struct list_head *nodelist;
struct cluster_node *node;
unsigned int total_votes = 0;
unsigned int highest_expected = 0;
unsigned int newquorum, q1, q2;
unsigned int total_nodes = 0;
ENTER();
list_iterate(nodelist, &cluster_members_list) {
node = list_entry(nodelist, struct cluster_node, list);
log_printf(LOGSYS_LEVEL_DEBUG, "node %x state=%d, votes=%d, expected=%d\n",
node->node_id, node->state, node->votes, node->expected_votes);
if (node->state == NODESTATE_MEMBER) {
if (max_expected)
node->expected_votes = max_expected;
else
highest_expected = max(highest_expected, node->expected_votes);
total_votes += node->votes;
total_nodes++;
}
}
if (quorum_device && quorum_device->state == NODESTATE_MEMBER)
total_votes += quorum_device->votes;
if (max_expected > 0)
highest_expected = max_expected;
/* This quorum calculation is taken from the OpenVMS Cluster Systems
* manual, but, then, you guessed that didn't you */
q1 = (highest_expected + 2) / 2;
q2 = (total_votes + 2) / 2;
newquorum = max(q1, q2);
/* Normally quorum never decreases but the system administrator can
* force it down by setting expected votes to a maximum value */
if (!allow_decrease)
newquorum = max(quorum, newquorum);
/* The special two_node mode allows each of the two nodes to retain
* quorum if the other fails. Only one of the two should live past
* fencing (as both nodes try to fence each other in split-brain.)
* Also: if there are more than two nodes, force us inquorate to avoid
* any damage or confusion.
*/
if ((quorum_flags & VOTEQUORUM_FLAG_FEATURE_TWONODE) && total_nodes <= 2)
newquorum = 1;
if (ret_total_votes)
*ret_total_votes = total_votes;
LEAVE();
return newquorum;
}
/* Recalculate cluster quorum, set quorate and notify changes */
static void recalculate_quorum(int allow_decrease, int by_current_nodes)
{
unsigned int total_votes = 0;
int cluster_members = 0;
struct list_head *nodelist;
struct cluster_node *node;
ENTER();
list_iterate(nodelist, &cluster_members_list) {
node = list_entry(nodelist, struct cluster_node, list);
if (node->state == NODESTATE_MEMBER) {
if (by_current_nodes)
cluster_members++;
total_votes += node->votes;
}
}
/* Keep expected_votes at the highest number of votes in the cluster */
log_printf(LOGSYS_LEVEL_DEBUG, "total_votes=%d, expected_votes=%d\n", total_votes, us->expected_votes);
if (total_votes > us->expected_votes) {
us->expected_votes = total_votes;
send_expectedvotes_notification();
}
quorum = calculate_quorum(allow_decrease, cluster_members, &total_votes);
set_quorate(total_votes);
send_quorum_notification(NULL, 0L);
LEAVE();
}
static int have_disallowed(void)
{
struct cluster_node *node;
struct list_head *tmp;
list_iterate(tmp, &cluster_members_list) {
node = list_entry(tmp, struct cluster_node, list);
if (node->state == NODESTATE_DISALLOWED)
return 1;
}
return 0;
}
static void node_add_ordered(struct cluster_node *newnode)
{
struct cluster_node *node = NULL;
struct list_head *tmp;
struct list_head *newlist = &newnode->list;
list_iterate(tmp, &cluster_members_list) {
node = list_entry(tmp, struct cluster_node, list);
if (newnode->node_id < node->node_id)
break;
}
if (!node)
list_add(&newnode->list, &cluster_members_list);
else {
newlist->prev = tmp->prev;
newlist->next = tmp;
tmp->prev->next = newlist;
tmp->prev = newlist;
}
}
static struct cluster_node *allocate_node(int nodeid)
{
struct cluster_node *cl;
cl = malloc(sizeof(struct cluster_node));
if (cl) {
memset(cl, 0, sizeof(struct cluster_node));
cl->node_id = nodeid;
if (nodeid)
node_add_ordered(cl);
}
return cl;
}
static struct cluster_node *find_node_by_nodeid(int nodeid)
{
struct cluster_node *node;
struct list_head *tmp;
if (nodeid == NODEID_US)
return us;
if (nodeid == NODEID_QDEVICE)
return quorum_device;
list_iterate(tmp, &cluster_members_list) {
node = list_entry(tmp, struct cluster_node, list);
if (node->node_id == nodeid)
return node;
}
return NULL;
}
static int quorum_exec_send_nodeinfo()
{
struct req_exec_quorum_nodeinfo req_exec_quorum_nodeinfo;
struct iovec iov[1];
int ret;
ENTER();
req_exec_quorum_nodeinfo.expected_votes = us->expected_votes;
req_exec_quorum_nodeinfo.votes = us->votes;
req_exec_quorum_nodeinfo.major_version = VOTEQUORUM_MAJOR_VERSION;
req_exec_quorum_nodeinfo.minor_version = VOTEQUORUM_MINOR_VERSION;
req_exec_quorum_nodeinfo.patch_version = VOTEQUORUM_PATCH_VERSION;
req_exec_quorum_nodeinfo.flags = us->flags;
req_exec_quorum_nodeinfo.first_trans = first_trans;
if (have_disallowed())
req_exec_quorum_nodeinfo.flags |= NODE_FLAGS_SEESDISALLOWED;
req_exec_quorum_nodeinfo.header.id = SERVICE_ID_MAKE(VOTEQUORUM_SERVICE, MESSAGE_REQ_EXEC_VOTEQUORUM_NODEINFO);
req_exec_quorum_nodeinfo.header.size = sizeof(req_exec_quorum_nodeinfo);
iov[0].iov_base = (void *)&req_exec_quorum_nodeinfo;
iov[0].iov_len = sizeof(req_exec_quorum_nodeinfo);
ret = corosync_api->totem_mcast (iov, 1, TOTEM_AGREED);
LEAVE();
return ret;
}
static int quorum_exec_send_reconfigure(int param, int nodeid, int value)
{
struct req_exec_quorum_reconfigure req_exec_quorum_reconfigure;
struct iovec iov[1];
int ret;
ENTER();
req_exec_quorum_reconfigure.param = param;
req_exec_quorum_reconfigure.nodeid = nodeid;
req_exec_quorum_reconfigure.value = value;
req_exec_quorum_reconfigure.header.id = SERVICE_ID_MAKE(VOTEQUORUM_SERVICE, MESSAGE_REQ_EXEC_VOTEQUORUM_RECONFIGURE);
req_exec_quorum_reconfigure.header.size = sizeof(req_exec_quorum_reconfigure);
iov[0].iov_base = (void *)&req_exec_quorum_reconfigure;
iov[0].iov_len = sizeof(req_exec_quorum_reconfigure);
ret = corosync_api->totem_mcast (iov, 1, TOTEM_AGREED);
LEAVE();
return ret;
}
static int quorum_exec_send_killnode(int nodeid, unsigned int reason)
{
struct req_exec_quorum_killnode req_exec_quorum_killnode;
struct iovec iov[1];
int ret;
ENTER();
req_exec_quorum_killnode.nodeid = nodeid;
req_exec_quorum_killnode.reason = reason;
req_exec_quorum_killnode.header.id = SERVICE_ID_MAKE(VOTEQUORUM_SERVICE, MESSAGE_REQ_EXEC_VOTEQUORUM_KILLNODE);
req_exec_quorum_killnode.header.size = sizeof(req_exec_quorum_killnode);
iov[0].iov_base = (void *)&req_exec_quorum_killnode;
iov[0].iov_len = sizeof(req_exec_quorum_killnode);
ret = corosync_api->totem_mcast (iov, 1, TOTEM_AGREED);
LEAVE();
return ret;
}
static void quorum_confchg_fn (
enum totem_configuration_type configuration_type,
const unsigned int *member_list, size_t member_list_entries,
const unsigned int *left_list, size_t left_list_entries,
const unsigned int *joined_list, size_t joined_list_entries,
const struct memb_ring_id *ring_id)
{
int i;
int leaving = 0;
struct cluster_node *node;
ENTER();
if (member_list_entries > 1)
first_trans = 0;
if (left_list_entries) {
for (i = 0; i< left_list_entries; i++) {
node = find_node_by_nodeid(left_list[i]);
if (node) {
if (node->state == NODESTATE_LEAVING)
leaving = 1;
node->state = NODESTATE_DEAD;
node->flags |= NODE_FLAGS_BEENDOWN;
}
}
recalculate_quorum(leaving, leaving);
}
if (member_list_entries) {
memcpy(quorum_members, member_list, sizeof(unsigned int) * member_list_entries);
quorum_members_entries = member_list_entries;
if (quorum_device) {
quorum_members[quorum_members_entries++] = 0;
}
quorum_exec_send_nodeinfo();
}
memcpy(&quorum_ringid, ring_id, sizeof(*ring_id));
LEAVE();
}
static void exec_votequorum_nodeinfo_endian_convert (void *msg)
{
struct req_exec_quorum_nodeinfo *nodeinfo = msg;
nodeinfo->votes = swab32(nodeinfo->votes);
nodeinfo->expected_votes = swab32(nodeinfo->expected_votes);
nodeinfo->major_version = swab32(nodeinfo->major_version);
nodeinfo->minor_version = swab32(nodeinfo->minor_version);
nodeinfo->patch_version = swab32(nodeinfo->patch_version);
nodeinfo->config_version = swab32(nodeinfo->config_version);
nodeinfo->flags = swab32(nodeinfo->flags);
}
static void exec_votequorum_reconfigure_endian_convert (void *msg)
{
struct req_exec_quorum_reconfigure *reconfigure = msg;
reconfigure->nodeid = swab32(reconfigure->nodeid);
reconfigure->value = swab32(reconfigure->value);
}
static void exec_votequorum_killnode_endian_convert (void *msg)
{
struct req_exec_quorum_killnode *killnode = msg;
killnode->reason = swab16(killnode->reason);
killnode->nodeid = swab32(killnode->nodeid);
}
static void message_handler_req_exec_votequorum_nodeinfo (
const void *message,
unsigned int nodeid)
{
const struct req_exec_quorum_nodeinfo *req_exec_quorum_nodeinfo = message;
struct cluster_node *node;
int old_votes;
int old_expected;
nodestate_t old_state;
int new_node = 0;
ENTER();
log_printf(LOGSYS_LEVEL_DEBUG, "got nodeinfo message from cluster node %d\n", nodeid);
node = find_node_by_nodeid(nodeid);
if (!node) {
node = allocate_node(nodeid);
new_node = 1;
}
if (!node) {
corosync_api->error_memory_failure();
return;
}
/*
* If the node sending the message sees disallowed nodes and we don't, then
* we have to leave
*/
if (req_exec_quorum_nodeinfo->flags & NODE_FLAGS_SEESDISALLOWED && !have_disallowed()) {
/* Must use syslog directly here or the message will never arrive */
syslog(LOGSYS_LEVEL_CRIT, "[VOTEQ]: Joined a cluster with disallowed nodes. must die");
corosync_api->fatal_error(2, __FILE__, __LINE__);
exit(2);
}
old_votes = node->votes;
old_expected = node->expected_votes;
old_state = node->state;
/* Update node state */
node->votes = req_exec_quorum_nodeinfo->votes;
node->expected_votes = req_exec_quorum_nodeinfo->expected_votes;
node->state = NODESTATE_MEMBER;
log_printf(LOGSYS_LEVEL_DEBUG, "nodeinfo message: votes: %d, expected:%d\n", req_exec_quorum_nodeinfo->votes, req_exec_quorum_nodeinfo->expected_votes);
/* Check flags for disallowed (if enabled) */
if (quorum_flags & VOTEQUORUM_FLAG_FEATURE_DISALLOWED) {
if ((req_exec_quorum_nodeinfo->flags & NODE_FLAGS_HASSTATE && node->flags & NODE_FLAGS_BEENDOWN) ||
(req_exec_quorum_nodeinfo->flags & NODE_FLAGS_HASSTATE && req_exec_quorum_nodeinfo->first_trans && !(node->flags & NODE_FLAGS_US) && (us->flags & NODE_FLAGS_HASSTATE))) {
if (node->state != NODESTATE_DISALLOWED) {
if (cluster_is_quorate) {
log_printf(LOGSYS_LEVEL_CRIT, "Killing node %d because it has rejoined the cluster with existing state", node->node_id);
node->state = NODESTATE_DISALLOWED;
quorum_exec_send_killnode(nodeid, VOTEQUORUM_REASON_KILL_REJOIN);
}
else {
log_printf(LOGSYS_LEVEL_CRIT, "Node %d not joined to quorum because it has existing state", node->node_id);
node->state = NODESTATE_DISALLOWED;
}
}
}
}
node->flags &= ~NODE_FLAGS_BEENDOWN;
if (new_node || old_votes != node->votes || old_expected != node->expected_votes || old_state != node->state)
recalculate_quorum(0, 0);
LEAVE();
}
static void message_handler_req_exec_votequorum_killnode (
const void *message,
unsigned int nodeid)
{
const struct req_exec_quorum_killnode *req_exec_quorum_killnode = message;
if (req_exec_quorum_killnode->nodeid == corosync_api->totem_nodeid_get()) {
log_printf(LOGSYS_LEVEL_CRIT, "Killed by node %d: %s\n", nodeid, kill_reason(req_exec_quorum_killnode->reason));
corosync_api->fatal_error(1, __FILE__, __LINE__);
exit(1);
}
}
static void message_handler_req_exec_votequorum_reconfigure (
const void *message,
unsigned int nodeid)
{
const struct req_exec_quorum_reconfigure *req_exec_quorum_reconfigure = message;
struct cluster_node *node;
struct list_head *nodelist;
log_printf(LOGSYS_LEVEL_DEBUG, "got reconfigure message from cluster node %d\n", nodeid);
node = find_node_by_nodeid(req_exec_quorum_reconfigure->nodeid);
if (!node)
return;
switch(req_exec_quorum_reconfigure->param)
{
case RECONFIG_PARAM_EXPECTED_VOTES:
list_iterate(nodelist, &cluster_members_list) {
node = list_entry(nodelist, struct cluster_node, list);
if (node->state == NODESTATE_MEMBER &&
node->expected_votes > req_exec_quorum_reconfigure->value) {
node->expected_votes = req_exec_quorum_reconfigure->value;
}
}
send_expectedvotes_notification();
recalculate_quorum(1, 0); /* Allow decrease */
break;
case RECONFIG_PARAM_NODE_VOTES:
node->votes = req_exec_quorum_reconfigure->value;
recalculate_quorum(1, 0); /* Allow decrease */
break;
case RECONFIG_PARAM_LEAVING:
if (req_exec_quorum_reconfigure->value == 1 && node->state == NODESTATE_MEMBER)
node->state = NODESTATE_LEAVING;
if (req_exec_quorum_reconfigure->value == 0 && node->state == NODESTATE_LEAVING)
node->state = NODESTATE_MEMBER;
break;
}
}
static int quorum_lib_init_fn (void *conn)
{
struct quorum_pd *pd = (struct quorum_pd *)corosync_api->ipc_private_data_get (conn);
ENTER();
list_init (&pd->list);
pd->conn = conn;
LEAVE();
return (0);
}
/*
* Someone called votequorum_leave AGES ago!
* Assume they forgot to shut down the node.
*/
static void leaving_timer_fn(void *arg)
{
ENTER();
if (us->state == NODESTATE_LEAVING)
us->state = NODESTATE_MEMBER;
/* Tell everyone else we made a mistake */
quorum_exec_send_reconfigure(RECONFIG_PARAM_LEAVING, us->node_id, 0);
LEAVE();
}
/* Message from the library */
static void message_handler_req_lib_votequorum_getinfo (void *conn, const void *message)
{
const struct req_lib_votequorum_getinfo *req_lib_votequorum_getinfo = message;
struct res_lib_votequorum_getinfo res_lib_votequorum_getinfo;
struct cluster_node *node;
unsigned int highest_expected = 0;
unsigned int total_votes = 0;
cs_error_t error = CS_OK;
log_printf(LOGSYS_LEVEL_DEBUG, "got getinfo request on %p for node %d\n", conn, req_lib_votequorum_getinfo->nodeid);
node = find_node_by_nodeid(req_lib_votequorum_getinfo->nodeid);
if (node) {
struct cluster_node *iternode;
struct list_head *nodelist;
list_iterate(nodelist, &cluster_members_list) {
iternode = list_entry(nodelist, struct cluster_node, list);
if (iternode->state == NODESTATE_MEMBER) {
highest_expected =
max(highest_expected, iternode->expected_votes);
total_votes += iternode->votes;
}
}
if (quorum_device && quorum_device->state == NODESTATE_MEMBER) {
total_votes += quorum_device->votes;
}
res_lib_votequorum_getinfo.votes = us->votes;
res_lib_votequorum_getinfo.expected_votes = us->expected_votes;
res_lib_votequorum_getinfo.highest_expected = highest_expected;
res_lib_votequorum_getinfo.quorum = quorum;
res_lib_votequorum_getinfo.total_votes = total_votes;
res_lib_votequorum_getinfo.flags = 0;
res_lib_votequorum_getinfo.nodeid = node->node_id;
if (us->flags & NODE_FLAGS_HASSTATE)
res_lib_votequorum_getinfo.flags |= VOTEQUORUM_INFO_FLAG_HASSTATE;
if (quorum_flags & VOTEQUORUM_FLAG_FEATURE_TWONODE)
res_lib_votequorum_getinfo.flags |= VOTEQUORUM_INFO_FLAG_TWONODE;
if (cluster_is_quorate)
res_lib_votequorum_getinfo.flags |= VOTEQUORUM_INFO_FLAG_QUORATE;
if (us->flags & NODE_FLAGS_SEESDISALLOWED)
res_lib_votequorum_getinfo.flags |= VOTEQUORUM_INFO_FLAG_DISALLOWED;
}
else {
error = CS_ERR_NOT_EXIST;
}
res_lib_votequorum_getinfo.header.size = sizeof(res_lib_votequorum_getinfo);
res_lib_votequorum_getinfo.header.id = MESSAGE_RES_VOTEQUORUM_GETINFO;
res_lib_votequorum_getinfo.header.error = error;
corosync_api->ipc_response_send(conn, &res_lib_votequorum_getinfo, sizeof(res_lib_votequorum_getinfo));
log_printf(LOGSYS_LEVEL_DEBUG, "getinfo response error: %d\n", error);
}
/* Message from the library */
static void message_handler_req_lib_votequorum_setexpected (void *conn, const void *message)
{
const struct req_lib_votequorum_setexpected *req_lib_votequorum_setexpected = message;
struct res_lib_votequorum_status res_lib_votequorum_status;
cs_error_t error = CS_OK;
unsigned int newquorum;
unsigned int total_votes;
ENTER();
/*
* If there are disallowed nodes, then we can't allow the user
* to bypass them by fiddling with expected votes.
*/
if (quorum_flags & VOTEQUORUM_FLAG_FEATURE_DISALLOWED && have_disallowed()) {
error = CS_ERR_EXIST;
goto error_exit;
}
/* Validate new expected votes */
newquorum = calculate_quorum(1, req_lib_votequorum_setexpected->expected_votes, &total_votes);
if (newquorum < total_votes / 2
|| newquorum > total_votes) {
error = CS_ERR_INVALID_PARAM;
goto error_exit;
}
quorum_exec_send_reconfigure(RECONFIG_PARAM_EXPECTED_VOTES, us->node_id, req_lib_votequorum_setexpected->expected_votes);
/* send status */
error_exit:
res_lib_votequorum_status.header.size = sizeof(res_lib_votequorum_status);
res_lib_votequorum_status.header.id = MESSAGE_RES_VOTEQUORUM_STATUS;
res_lib_votequorum_status.header.error = error;
corosync_api->ipc_response_send(conn, &res_lib_votequorum_status, sizeof(res_lib_votequorum_status));
LEAVE();
}
/* Message from the library */
static void message_handler_req_lib_votequorum_setvotes (void *conn, const void *message)
{
const struct req_lib_votequorum_setvotes *req_lib_votequorum_setvotes = message;
struct res_lib_votequorum_status res_lib_votequorum_status;
struct cluster_node *node;
unsigned int newquorum;
unsigned int total_votes;
unsigned int saved_votes;
cs_error_t error = CS_OK;
unsigned int nodeid;
ENTER();
nodeid = req_lib_votequorum_setvotes->nodeid;
node = find_node_by_nodeid(nodeid);
if (!node) {
error = CS_ERR_NAME_NOT_FOUND;
goto error_exit;
}
/* Check votes is valid */
saved_votes = node->votes;
node->votes = req_lib_votequorum_setvotes->votes;
newquorum = calculate_quorum(1, 0, &total_votes);
if (newquorum < total_votes / 2 || newquorum > total_votes) {
node->votes = saved_votes;
error = CS_ERR_INVALID_PARAM;
goto error_exit;
}
if (!nodeid)
nodeid = corosync_api->totem_nodeid_get();
quorum_exec_send_reconfigure(RECONFIG_PARAM_NODE_VOTES, nodeid,
req_lib_votequorum_setvotes->votes);
error_exit:
/* send status */
res_lib_votequorum_status.header.size = sizeof(res_lib_votequorum_status);
res_lib_votequorum_status.header.id = MESSAGE_RES_VOTEQUORUM_STATUS;
res_lib_votequorum_status.header.error = error;
corosync_api->ipc_response_send(conn, &res_lib_votequorum_status, sizeof(res_lib_votequorum_status));
LEAVE();
}
static void message_handler_req_lib_votequorum_leaving (void *conn, const void *message)
{
struct res_lib_votequorum_status res_lib_votequorum_status;
cs_error_t error = CS_OK;
ENTER();
quorum_exec_send_reconfigure(RECONFIG_PARAM_LEAVING, us->node_id, 1);
/*
* If we don't shut down in a sensible amount of time then cancel the
* leave status.
*/
if (leaving_timeout)
corosync_api->timer_add_duration((unsigned long long)leaving_timeout*1000000, NULL,
leaving_timer_fn, &leaving_timer);
/* send status */
res_lib_votequorum_status.header.size = sizeof(res_lib_votequorum_status);
res_lib_votequorum_status.header.id = MESSAGE_RES_VOTEQUORUM_STATUS;
res_lib_votequorum_status.header.error = error;
corosync_api->ipc_response_send(conn, &res_lib_votequorum_status, sizeof(res_lib_votequorum_status));
LEAVE();
}
static void quorum_device_timer_fn(void *arg)
{
struct timeval now;
ENTER();
if (!quorum_device || quorum_device->state == NODESTATE_DEAD)
return;
gettimeofday(&now, NULL);
if (quorum_device->last_hello.tv_sec + quorumdev_poll/1000 < now.tv_sec) {
quorum_device->state = NODESTATE_DEAD;
log_printf(LOGSYS_LEVEL_INFO, "lost contact with quorum device\n");
recalculate_quorum(0, 0);
}
else {
corosync_api->timer_add_duration((unsigned long long)quorumdev_poll*1000000, quorum_device,
quorum_device_timer_fn, &quorum_device_timer);
}
LEAVE();
}
static void message_handler_req_lib_votequorum_qdisk_register (void *conn,
const void *message)
{
const struct req_lib_votequorum_qdisk_register
*req_lib_votequorum_qdisk_register = message;
struct res_lib_votequorum_status res_lib_votequorum_status;
cs_error_t error = CS_OK;
ENTER();
if (quorum_device) {
error = CS_ERR_EXIST;
}
else {
quorum_device = allocate_node(0);
quorum_device->state = NODESTATE_DEAD;
quorum_device->votes = req_lib_votequorum_qdisk_register->votes;
strcpy(quorum_device_name, req_lib_votequorum_qdisk_register->name);
list_add(&quorum_device->list, &cluster_members_list);
}
/* send status */
res_lib_votequorum_status.header.size = sizeof(res_lib_votequorum_status);
res_lib_votequorum_status.header.id = MESSAGE_RES_VOTEQUORUM_STATUS;
res_lib_votequorum_status.header.error = error;
corosync_api->ipc_response_send(conn, &res_lib_votequorum_status, sizeof(res_lib_votequorum_status));
LEAVE();
}
static void message_handler_req_lib_votequorum_qdisk_unregister (void *conn,
const void *message)
{
struct res_lib_votequorum_status res_lib_votequorum_status;
cs_error_t error = CS_OK;
ENTER();
if (quorum_device) {
struct cluster_node *node = quorum_device;
quorum_device = NULL;
list_del(&node->list);
free(node);
recalculate_quorum(0, 0);
}
else {
error = CS_ERR_NOT_EXIST;
}
/* send status */
res_lib_votequorum_status.header.size = sizeof(res_lib_votequorum_status);
res_lib_votequorum_status.header.id = MESSAGE_RES_VOTEQUORUM_STATUS;
res_lib_votequorum_status.header.error = error;
corosync_api->ipc_response_send(conn, &res_lib_votequorum_status, sizeof(res_lib_votequorum_status));
LEAVE();
}
static void message_handler_req_lib_votequorum_qdisk_poll (void *conn,
const void *message)
{
const struct req_lib_votequorum_qdisk_poll
*req_lib_votequorum_qdisk_poll = message;
struct res_lib_votequorum_status res_lib_votequorum_status;
cs_error_t error = CS_OK;
ENTER();
if (quorum_device) {
if (req_lib_votequorum_qdisk_poll->state) {
gettimeofday(&quorum_device->last_hello, NULL);
if (quorum_device->state == NODESTATE_DEAD) {
quorum_device->state = NODESTATE_MEMBER;
recalculate_quorum(0, 0);
corosync_api->timer_add_duration((unsigned long long)quorumdev_poll*1000000, quorum_device,
quorum_device_timer_fn, &quorum_device_timer);
}
}
else {
if (quorum_device->state == NODESTATE_MEMBER) {
quorum_device->state = NODESTATE_DEAD;
recalculate_quorum(0, 0);
corosync_api->timer_delete(quorum_device_timer);
}
}
}
else {
error = CS_ERR_NOT_EXIST;
}
/* send status */
res_lib_votequorum_status.header.size = sizeof(res_lib_votequorum_status);
res_lib_votequorum_status.header.id = MESSAGE_RES_VOTEQUORUM_STATUS;
res_lib_votequorum_status.header.error = error;
corosync_api->ipc_response_send(conn, &res_lib_votequorum_status, sizeof(res_lib_votequorum_status));
LEAVE();
}
static void message_handler_req_lib_votequorum_qdisk_getinfo (void *conn,
const void *message)
{
struct res_lib_votequorum_qdisk_getinfo res_lib_votequorum_qdisk_getinfo;
cs_error_t error = CS_OK;
ENTER();
if (quorum_device) {
log_printf(LOGSYS_LEVEL_DEBUG, "got qdisk_getinfo state %d\n", quorum_device->state);
res_lib_votequorum_qdisk_getinfo.votes = quorum_device->votes;
if (quorum_device->state == NODESTATE_MEMBER)
res_lib_votequorum_qdisk_getinfo.state = 1;
else
res_lib_votequorum_qdisk_getinfo.state = 0;
strcpy(res_lib_votequorum_qdisk_getinfo.name, quorum_device_name);
}
else {
error = CS_ERR_NOT_EXIST;
}
/* send status */
res_lib_votequorum_qdisk_getinfo.header.size = sizeof(res_lib_votequorum_qdisk_getinfo);
res_lib_votequorum_qdisk_getinfo.header.id = MESSAGE_RES_VOTEQUORUM_GETINFO;
res_lib_votequorum_qdisk_getinfo.header.error = error;
corosync_api->ipc_response_send(conn, &res_lib_votequorum_qdisk_getinfo, sizeof(res_lib_votequorum_qdisk_getinfo));
LEAVE();
}
static void message_handler_req_lib_votequorum_setstate (void *conn,
const void *message)
{
struct res_lib_votequorum_status res_lib_votequorum_status;
cs_error_t error = CS_OK;
ENTER();
us->flags |= NODE_FLAGS_HASSTATE;
/* send status */
res_lib_votequorum_status.header.size = sizeof(res_lib_votequorum_status);
res_lib_votequorum_status.header.id = MESSAGE_RES_VOTEQUORUM_STATUS;
res_lib_votequorum_status.header.error = error;
corosync_api->ipc_response_send(conn, &res_lib_votequorum_status, sizeof(res_lib_votequorum_status));
LEAVE();
}
static void message_handler_req_lib_votequorum_trackstart (void *conn,
const void *msg)
{
const struct req_lib_votequorum_trackstart
*req_lib_votequorum_trackstart = msg;
struct res_lib_votequorum_status res_lib_votequorum_status;
struct quorum_pd *quorum_pd = (struct quorum_pd *)corosync_api->ipc_private_data_get (conn);
ENTER();
/*
* If an immediate listing of the current cluster membership
* is requested, generate membership list
*/
if (req_lib_votequorum_trackstart->track_flags & CS_TRACK_CURRENT ||
req_lib_votequorum_trackstart->track_flags & CS_TRACK_CHANGES) {
log_printf(LOGSYS_LEVEL_DEBUG, "sending initial status to %p\n", conn);
send_quorum_notification(conn, req_lib_votequorum_trackstart->context);
}
/*
* Record requests for tracking
*/
if (req_lib_votequorum_trackstart->track_flags & CS_TRACK_CHANGES ||
req_lib_votequorum_trackstart->track_flags & CS_TRACK_CHANGES_ONLY) {
quorum_pd->track_flags = req_lib_votequorum_trackstart->track_flags;
quorum_pd->tracking_enabled = 1;
quorum_pd->tracking_context = req_lib_votequorum_trackstart->context;
list_add (&quorum_pd->list, &trackers_list);
}
/* Send status */
res_lib_votequorum_status.header.size = sizeof(res_lib_votequorum_status);
res_lib_votequorum_status.header.id = MESSAGE_RES_VOTEQUORUM_STATUS;
res_lib_votequorum_status.header.error = CS_OK;
corosync_api->ipc_response_send(conn, &res_lib_votequorum_status, sizeof(res_lib_votequorum_status));
LEAVE();
}
static void message_handler_req_lib_votequorum_trackstop (void *conn,
const void *msg)
{
struct res_lib_votequorum_status res_lib_votequorum_status;
struct quorum_pd *quorum_pd = (struct quorum_pd *)corosync_api->ipc_private_data_get (conn);
int error = CS_OK;
ENTER();
if (quorum_pd->tracking_enabled) {
error = CS_OK;
quorum_pd->tracking_enabled = 0;
list_del (&quorum_pd->list);
list_init (&quorum_pd->list);
} else {
error = CS_ERR_NOT_EXIST;
}
/* send status */
res_lib_votequorum_status.header.size = sizeof(res_lib_votequorum_status);
res_lib_votequorum_status.header.id = MESSAGE_RES_VOTEQUORUM_STATUS;
res_lib_votequorum_status.header.error = error;
corosync_api->ipc_response_send(conn, &res_lib_votequorum_status, sizeof(res_lib_votequorum_status));
LEAVE();
}
static const char *kill_reason(int reason)
{
static char msg[1024];
switch (reason)
{
case VOTEQUORUM_REASON_KILL_REJECTED:
return "our membership application was rejected";
case VOTEQUORUM_REASON_KILL_APPLICATION:
return "we were killed by an application request";
case VOTEQUORUM_REASON_KILL_REJOIN:
return "we rejoined the cluster without a full restart";
default:
sprintf(msg, "we got kill message number %d", reason);
return msg;
}
}
static void reread_config(hdb_handle_t object_handle)
{
unsigned int old_votes;
unsigned int old_expected;
old_votes = us->votes;
old_expected = us->expected_votes;
/*
* Reload the configuration
*/
read_quorum_config(object_handle);
/*
* Check for fundamental changes that we need to propogate
*/
if (old_votes != us->votes) {
quorum_exec_send_reconfigure(RECONFIG_PARAM_NODE_VOTES, us->node_id, us->votes);
}
if (old_expected != us->expected_votes) {
quorum_exec_send_reconfigure(RECONFIG_PARAM_EXPECTED_VOTES, us->node_id, us->expected_votes);
}
}
static void quorum_key_change_notify(object_change_type_t change_type,
hdb_handle_t parent_object_handle,
hdb_handle_t object_handle,
const void *object_name_pt,
size_t object_name_len,
const void *key_name_pt, size_t key_len,
const void *key_value_pt, size_t key_value_len,
void *priv_data_pt)
{
if (memcmp(object_name_pt, "quorum", object_name_len) == 0)
reread_config(object_handle);
}
/* Called when the objdb is reloaded */
static void votequorum_objdb_reload_notify(
objdb_reload_notify_type_t type, int flush,
void *priv_data_pt)
{
/*
* A new quorum {} key might exist, cancel the
* existing notification at the start of reload,
* and start a new one on the new object when
* it's all settled.
*/
if (type == OBJDB_RELOAD_NOTIFY_START) {
corosync_api->object_track_stop(
quorum_key_change_notify,
NULL,
NULL,
NULL,
NULL);
}
if (type == OBJDB_RELOAD_NOTIFY_END ||
type == OBJDB_RELOAD_NOTIFY_FAILED) {
hdb_handle_t find_handle;
hdb_handle_t object_handle;
corosync_api->object_find_create(OBJECT_PARENT_HANDLE, "quorum", strlen("quorum"), &find_handle);
if (corosync_api->object_find_next(find_handle, &object_handle) == 0) {
add_votequorum_config_notification(object_handle);
reread_config(object_handle);
}
else {
log_printf(LOGSYS_LEVEL_ERROR, "votequorum objdb tracking stopped, cannot find quorum{} handle in objdb\n");
}
}
}
static void add_votequorum_config_notification(
hdb_handle_t quorum_object_handle)
{
corosync_api->object_track_start(quorum_object_handle,
1,
quorum_key_change_notify,
NULL,
NULL,
NULL,
NULL);
/*
* Reload notify must be on the parent object
*/
corosync_api->object_track_start(OBJECT_PARENT_HANDLE,
1,
NULL,
NULL,
NULL,
votequorum_objdb_reload_notify,
NULL);
}

File Metadata

Mime Type
text/x-diff
Expires
Mon, Dec 23, 10:39 PM (1 d, 18 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1129258
Default Alt Text
(237 KB)

Event Timeline