Page Menu
Home
ClusterLabs Projects
Search
Configure Global Search
Log In
Files
F3151506
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Flag For Later
Award Token
Size
242 KB
Referenced Files
None
Subscribers
None
View Options
diff --git a/exec/main.c b/exec/main.c
index fc7c8c36..3dac5fb7 100644
--- a/exec/main.c
+++ b/exec/main.c
@@ -1,1261 +1,1268 @@
/*
* Copyright (c) 2002-2006 MontaVista Software, Inc.
* Copyright (c) 2006-2012 Red Hat, Inc.
*
* All rights reserved.
*
* Author: Steven Dake (sdake@redhat.com)
*
* This software licensed under BSD license, the text of which follows:
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* - Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* - Neither the name of the MontaVista Software, Inc. nor the names of its
* contributors may be used to endorse or promote products derived from this
* software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
* THE POSSIBILITY OF SUCH DAMAGE.
*/
/**
* \mainpage Corosync
*
* This is the doxygen generated developer documentation for the Corosync
* project. For more information about Corosync, please see the project
* web site, <a href="http://www.corosync.org">corosync.org</a>.
*
* \section license License
*
* This software licensed under BSD license, the text of which follows:
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* - Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* - Neither the name of the MontaVista Software, Inc. nor the names of its
* contributors may be used to endorse or promote products derived from this
* software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
* THE POSSIBILITY OF SUCH DAMAGE.
*/
#include <config.h>
#include <pthread.h>
#include <assert.h>
#include <sys/types.h>
#include <sys/file.h>
#include <sys/poll.h>
#include <sys/uio.h>
#include <sys/mman.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <sys/stat.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <signal.h>
#include <sched.h>
#include <time.h>
#include <semaphore.h>
#include <qb/qbdefs.h>
#include <qb/qblog.h>
#include <qb/qbloop.h>
#include <qb/qbutil.h>
#include <qb/qbipcs.h>
#include <corosync/swab.h>
#include <corosync/corotypes.h>
#include <corosync/corodefs.h>
#include <corosync/list.h>
#include <corosync/totem/totempg.h>
#include <corosync/logsys.h>
#include <corosync/icmap.h>
#include "quorum.h"
#include "totemsrp.h"
#include "logconfig.h"
#include "totemconfig.h"
#include "main.h"
#include "sync.h"
#include "timer.h"
#include "util.h"
#include "apidef.h"
#include "service.h"
#include "schedwrk.h"
/*
 * IPC_LOGSYS_SIZE: a smaller value is selected when the build is configured
 * with HAVE_SMALL_MEMORY_FOOTPRINT. (Presumably a buffer size in bytes --
 * TODO confirm against the users of this macro.)
 *
 * The value is parenthesized so that the macro expands as a single operand
 * when used inside a larger expression (e.g. x / IPC_LOGSYS_SIZE).
 */
#ifdef HAVE_SMALL_MEMORY_FOOTPRINT
#define IPC_LOGSYS_SIZE (1024*64)
#else
#define IPC_LOGSYS_SIZE (8192*128)
#endif
/*
 * Logging declarations for this file: the system is declared under the name
 * "corosync" and this file logs under the "MAIN" subsystem.
 * NOTE(review): the exact meaning of the remaining macro arguments is defined
 * by corosync/logsys.h -- confirm there.
 */
LOGSYS_DECLARE_SYSTEM ("corosync",
LOGSYS_MODE_OUTPUT_STDERR,
LOG_DAEMON,
LOG_INFO);
LOGSYS_DECLARE_SUBSYS ("MAIN");
/* NOTE(review): not referenced in the visible part of this file. */
#define SERVER_BACKLOG 5
/* Priority obtained from sched_get_priority_max() in corosync_setscheduler(). */
static int sched_priority = 0;
/* Upper bound used by confchg_fn() when iterating over corosync_service[]. */
static unsigned int service_count = 32;
static struct totem_logging_configuration totem_logging_configuration;
/* Executive API table; assigned from apidef_get() in main(). */
static struct corosync_api_v1 *api = NULL;
/*
 * 1 while service synchronization is running (set in confchg_fn()),
 * 0 once corosync_sync_completed() has been called.
 */
static int sync_in_process = 1;
/* Main loop handle; created with qb_loop_create() in main(). */
static qb_loop_t *corosync_poll_handle;
/* Scheduling parameters filled in by corosync_setscheduler(). */
struct sched_param global_sched_param;
/* Handle of the periodic statistics timer (see corosync_totem_stats_updater()). */
static corosync_timer_handle_t corosync_stats_timer_handle;
/* Path of the pid/lock file. NOTE(review): presumably passed to corosync_flock() -- confirm in main(). */
static const char *corosync_lock_file = LOCALSTATEDIR"/run/corosync.pid";
/*
 * Accessor for the file-local main loop handle (corosync_poll_handle).
 */
qb_loop_t *cs_poll_handle_get (void)
{
	qb_loop_t *loop = corosync_poll_handle;

	return loop;
}
/*
 * Register dispatch_fn for the given fd/events on the loop "handle".
 * The registration is always made at QB_LOOP_MED priority; the result of
 * qb_loop_poll_add() is returned unchanged.
 */
int cs_poll_dispatch_add (qb_loop_t * handle,
	int fd,
	int events,
	void *data,
	int (*dispatch_fn) (int fd, int revents, void *data))
{
	int rc;

	rc = qb_loop_poll_add(handle, QB_LOOP_MED, fd, events, data, dispatch_fn);

	return rc;
}
/*
 * Remove fd from the loop "handle"; returns the result of qb_loop_poll_del().
 */
int cs_poll_dispatch_delete(qb_loop_t * handle, int fd)
{
	int rc;

	rc = qb_loop_poll_del(handle, fd);

	return rc;
}
void corosync_state_dump (void)
{
int i;
for (i = 0; i < SERVICES_COUNT_MAX; i++) {
if (corosync_service[i] && corosync_service[i]->exec_dump_fn) {
corosync_service[i]->exec_dump_fn ();
}
}
}
static void corosync_blackbox_write_to_file (void)
{
char fname[PATH_MAX];
char time_str[PATH_MAX];
struct tm cur_time_tm;
time_t cur_time_t;
cur_time_t = time(NULL);
localtime_r(&cur_time_t, &cur_time_tm);
strftime(time_str, PATH_MAX, "%Y-%m-%dT%H:%M:%S", &cur_time_tm);
snprintf(fname, PATH_MAX, "%s/fdata-%s-%lld",
LOCALSTATEDIR "/lib/corosync",
time_str,
(long long int)getpid());
qb_log_blackbox_write_to_file(fname);
unlink(LOCALSTATEDIR "/lib/corosync/fdata");
symlink(fname, LOCALSTATEDIR "/lib/corosync/fdata");
}
/*
 * Completion callback passed to corosync_service_unlink_all():
 * cancels the statistics timer, stops the main loop and finalizes icmap,
 * in that order.
 */
static void unlink_all_completed (void)
{
	/* 1. no more periodic statistics updates */
	api->timer_delete (corosync_stats_timer_handle);

	/* 2. ask the main loop to stop */
	qb_loop_stop (corosync_poll_handle);

	/* 3. release the configuration map */
	icmap_fini();
}
/*
 * Start shutdown: unlink all services; unlink_all_completed() is handed
 * over as the completion callback.
 */
void corosync_shutdown_request (void)
{
	corosync_service_unlink_all (
		api,
		unlink_all_completed);
}
/*
 * Loop signal callback (registered for SIGUSR2 in main()): dump the state
 * of all services. Both arguments are unused. Always returns 0.
 */
static int32_t sig_diag_handler (int num, void *data)
{
	(void)num;
	(void)data;

	corosync_state_dump ();

	return 0;
}
/*
 * Loop signal callback (registered for SIGINT, SIGQUIT and SIGTERM in
 * main()): begin unlinking all services. Both arguments are unused.
 * Always returns 0.
 */
static int32_t sig_exit_handler (int num, void *data)
{
	(void)num;
	(void)data;

	corosync_service_unlink_all (api, unlink_all_completed);

	return 0;
}
/*
 * SIGSEGV handler: restore the default disposition first, write the
 * blackbox to disk, shut logging down and re-raise the signal.
 */
static void sigsegv_handler (int num)
{
	(void)num;

	/* Default action first, so the raise() below is not caught by us again. */
	(void)signal (SIGSEGV, SIG_DFL);

	corosync_blackbox_write_to_file ();
	qb_log_fini();

	raise (SIGSEGV);
}
/*
 * SIGABRT handler: restore the default disposition first, write the
 * blackbox to disk, shut logging down and re-raise the signal.
 */
static void sigabrt_handler (int num)
{
	(void)num;

	/* Default action first, so the raise() below is not caught by us again. */
	(void)signal (SIGABRT, SIG_DFL);

	corosync_blackbox_write_to_file ();
	qb_log_fini();

	raise (SIGABRT);
}
/* Expands to an inet_addr() call each time it is used. */
#define LOCALHOST_IP inet_addr("127.0.0.1")
/* Handle passed to the totempg_groups_* / totempg_check_q_level calls in this file. */
static void *corosync_group_handle;
/* The totempg group description used by the executive: name "a", length 1. */
static struct totempg_group corosync_group = {
.group = "a",
.group_len = 1
};
/*
 * Intentionally a no-op.
 */
static void serialize_lock (void)
{
	/* nothing to do */
}
/*
 * Intentionally a no-op.
 */
static void serialize_unlock (void)
{
	/* nothing to do */
}
/*
 * Callback handed to sync_init(): logs that synchronization finished,
 * clears sync_in_process, propagates the new state to the IPC layer and
 * allows IPC connections.
 */
static void corosync_sync_completed (void)
{
	log_printf (LOGSYS_LEVEL_NOTICE,
		"Completed service synchronization, ready to provide service.");

	sync_in_process = 0;
	cs_ipcs_sync_state_changed(sync_in_process);

	cs_ipc_allow_connections(1);
}
/*
 * Callback handed to sync_init(): fill "callbacks" with the name and sync_*
 * hooks of the service in slot service_id.
 *
 * Returns -1 when no service is registered in that slot, 0 otherwise.
 * A NULL "callbacks" is accepted and only queries whether the slot is used.
 */
static int corosync_sync_callbacks_retrieve (
	int service_id,
	struct sync_callbacks *callbacks)
{
	if (corosync_service[service_id] == NULL) {
		return -1;
	}

	if (callbacks != NULL) {
		callbacks->name = corosync_service[service_id]->name;
		callbacks->sync_init = corosync_service[service_id]->sync_init;
		callbacks->sync_process = corosync_service[service_id]->sync_process;
		callbacks->sync_activate = corosync_service[service_id]->sync_activate;
		callbacks->sync_abort = corosync_service[service_id]->sync_abort;
	}

	return 0;
}
/* Copy of the ring id received with the most recent confchg_fn() call. */
static struct memb_ring_id corosync_ring_id;
/*
 * Record in icmap that node "nodeid" joined.
 *
 * Keys live under runtime.totem.pg.mrp.srp.members.<nodeid>. If the ".ip"
 * key already exists the join counter is incremented; otherwise the ".ip"
 * key is created and the counter starts at 1. In both cases ".status"
 * becomes "joined".
 */
static void member_object_joined (unsigned int nodeid)
{
	char ip_key[ICMAP_KEYNAME_MAXLEN];
	char join_count_key[ICMAP_KEYNAME_MAXLEN];
	char status_key[ICMAP_KEYNAME_MAXLEN];
	int already_known;

	snprintf(ip_key, sizeof(ip_key),
		"runtime.totem.pg.mrp.srp.members.%u.ip", nodeid);
	snprintf(join_count_key, sizeof(join_count_key),
		"runtime.totem.pg.mrp.srp.members.%u.join_count", nodeid);
	snprintf(status_key, sizeof(status_key),
		"runtime.totem.pg.mrp.srp.members.%u.status", nodeid);

	already_known = (icmap_get(ip_key, NULL, NULL, NULL) == CS_OK);
	if (already_known) {
		icmap_inc(join_count_key);
	} else {
		icmap_set_string(ip_key, (char*)api->totem_ifaces_print (nodeid));
		icmap_set_uint32(join_count_key, 1);
	}
	/* The status is written last on both paths. */
	icmap_set_string(status_key, "joined");

	log_printf (LOGSYS_LEVEL_DEBUG,
		"Member joined: %s", api->totem_ifaces_print (nodeid));
}
/*
 * Record in icmap that node "nodeid" left: sets
 * runtime.totem.pg.mrp.srp.members.<nodeid>.status to "left" and logs it.
 */
static void member_object_left (unsigned int nodeid)
{
	char status_key[ICMAP_KEYNAME_MAXLEN];

	snprintf(status_key, sizeof(status_key),
		"runtime.totem.pg.mrp.srp.members.%u.status", nodeid);

	icmap_set_string(status_key, "left");

	log_printf (LOGSYS_LEVEL_DEBUG,
		"Member left: %s", api->totem_ifaces_print (nodeid));
}
/*
 * Configuration change callback.
 *
 * Marks synchronization as in progress, remembers the new ring id, updates
 * the per-member icmap objects, forwards the change to every service that
 * has a confchg_fn hook, aborts a synchronization that was still running
 * when the change arrived, and finally saves the transitional membership or
 * starts a new synchronization depending on configuration_type.
 */
static void confchg_fn (
	enum totem_configuration_type configuration_type,
	const unsigned int *member_list, size_t member_list_entries,
	const unsigned int *left_list, size_t left_list_entries,
	const unsigned int *joined_list, size_t joined_list_entries,
	const struct memb_ring_id *ring_id)
{
	int idx;
	/* A sync that was still running must be aborted further below. */
	const int abort_activate = (sync_in_process == 1);

	sync_in_process = 1;
	cs_ipcs_sync_state_changed(sync_in_process);
	memcpy (&corosync_ring_id, ring_id, sizeof (struct memb_ring_id));

	for (idx = 0; idx < left_list_entries; idx++) {
		member_object_left (left_list[idx]);
	}
	for (idx = 0; idx < joined_list_entries; idx++) {
		member_object_joined (joined_list[idx]);
	}

	/*
	 * Call configuration change for all services
	 */
	for (idx = 0; idx < service_count; idx++) {
		if (corosync_service[idx] == NULL ||
		    corosync_service[idx]->confchg_fn == NULL) {
			continue;
		}
		corosync_service[idx]->confchg_fn (configuration_type,
			member_list, member_list_entries,
			left_list, left_list_entries,
			joined_list, joined_list_entries, ring_id);
	}

	if (abort_activate) {
		sync_abort ();
	}

	switch (configuration_type) {
	case TOTEM_CONFIGURATION_TRANSITIONAL:
		sync_save_transitional (member_list, member_list_entries, ring_id);
		break;
	case TOTEM_CONFIGURATION_REGULAR:
		sync_start (member_list, member_list_entries, ring_id);
		break;
	default:
		break;
	}
}
/*
 * Placeholder: currently does nothing.
 */
static void priv_drop (void)
{
	/* TODO: we are still not dropping privs */
}
/*
 * Detach from the controlling terminal: fork (the parent exits with
 * status 0), start a new session in the child and reopen stdin, stderr and
 * stdout on /dev/null. Any failure is reported through
 * corosync_exit_error().
 */
static void corosync_tty_detach (void)
{
	pid_t child;

	/*
	 * Disconnect from TTY if this is not a debug run
	 */
	child = fork ();
	if (child == -1) {
		corosync_exit_error (COROSYNC_DONE_FORK);
	} else if (child != 0) {
		/* parent: the child carries on as the daemon */
		exit (0);
	}

	/* Create new session */
	(void)setsid();

	/*
	 * Map stdin/out/err to /dev/null.
	 */
	if (freopen("/dev/null", "r", stdin) == NULL) {
		corosync_exit_error (COROSYNC_DONE_STD_TO_NULL_REDIR);
	}
	if (freopen("/dev/null", "a", stderr) == NULL) {
		corosync_exit_error (COROSYNC_DONE_STD_TO_NULL_REDIR);
	}
	if (freopen("/dev/null", "a", stdout) == NULL) {
		corosync_exit_error (COROSYNC_DONE_STD_TO_NULL_REDIR);
	}
}
/*
 * Raise the locked-memory limit to RLIM_INFINITY and lock all current and
 * future pages. A failing mlockall() is only logged as a warning.
 */
static void corosync_mlockall (void)
{
	struct rlimit unlimited;

	unlimited.rlim_cur = RLIM_INFINITY;
	unlimited.rlim_max = RLIM_INFINITY;

	/* Fall back to RLIMIT_VMEM where RLIMIT_MEMLOCK is not defined. */
#ifndef RLIMIT_MEMLOCK
#define RLIMIT_MEMLOCK RLIMIT_VMEM
#endif

	setrlimit (RLIMIT_MEMLOCK, &unlimited);

	if (mlockall (MCL_CURRENT | MCL_FUTURE) == -1) {
		LOGSYS_PERROR (errno, LOGSYS_LEVEL_WARNING,
			"Could not lock memory of service to avoid page faults");
	}
}
/*
 * Timer callback that publishes totem statistics into icmap.
 *
 * - Fetches the statistics with api->totem_get_stats() and writes the
 *   counters under runtime.totem.pg.* keys.
 * - Logs a warning about a possible firewall/network fault when the
 *   continuous failure counters exceed their limits.
 * - Writes one "faulty" flag per interface.
 * - Averages the recorded token statistics (see the loop below) and
 *   publishes the averages when at least one token was counted.
 * - Calls cs_ipcs_stats_update() and re-arms itself through
 *   api->timer_add_duration() with 1500 * MILLI_2_NANO_SECONDS.
 *
 * The "data" argument is not used.
 *
 * NOTE(review): this block is unified-diff text; lines starting with '+' or
 * '-' are patch markers, not C source.
 */
static void corosync_totem_stats_updater (void *data)
{
totempg_stats_t * stats;
uint32_t total_mtt_rx_token;
uint32_t total_backlog_calc;
uint32_t total_token_holdtime;
int t, prev, i;
int32_t token_count;
char key_name[ICMAP_KEYNAME_MAXLEN];
stats = api->totem_get_stats();
icmap_set_uint32("runtime.totem.pg.msg_reserved", stats->msg_reserved);
icmap_set_uint32("runtime.totem.pg.msg_queue_avail", stats->msg_queue_avail);
icmap_set_uint64("runtime.totem.pg.mrp.srp.orf_token_tx", stats->mrp->srp->orf_token_tx);
icmap_set_uint64("runtime.totem.pg.mrp.srp.orf_token_rx", stats->mrp->srp->orf_token_rx);
icmap_set_uint64("runtime.totem.pg.mrp.srp.memb_merge_detect_tx", stats->mrp->srp->memb_merge_detect_tx);
icmap_set_uint64("runtime.totem.pg.mrp.srp.memb_merge_detect_rx", stats->mrp->srp->memb_merge_detect_rx);
icmap_set_uint64("runtime.totem.pg.mrp.srp.memb_join_tx", stats->mrp->srp->memb_join_tx);
icmap_set_uint64("runtime.totem.pg.mrp.srp.memb_join_rx", stats->mrp->srp->memb_join_rx);
icmap_set_uint64("runtime.totem.pg.mrp.srp.mcast_tx", stats->mrp->srp->mcast_tx);
icmap_set_uint64("runtime.totem.pg.mrp.srp.mcast_retx", stats->mrp->srp->mcast_retx);
icmap_set_uint64("runtime.totem.pg.mrp.srp.mcast_rx", stats->mrp->srp->mcast_rx);
icmap_set_uint64("runtime.totem.pg.mrp.srp.memb_commit_token_tx", stats->mrp->srp->memb_commit_token_tx);
icmap_set_uint64("runtime.totem.pg.mrp.srp.memb_commit_token_rx", stats->mrp->srp->memb_commit_token_rx);
icmap_set_uint64("runtime.totem.pg.mrp.srp.token_hold_cancel_tx", stats->mrp->srp->token_hold_cancel_tx);
icmap_set_uint64("runtime.totem.pg.mrp.srp.token_hold_cancel_rx", stats->mrp->srp->token_hold_cancel_rx);
icmap_set_uint64("runtime.totem.pg.mrp.srp.operational_entered", stats->mrp->srp->operational_entered);
icmap_set_uint64("runtime.totem.pg.mrp.srp.operational_token_lost", stats->mrp->srp->operational_token_lost);
icmap_set_uint64("runtime.totem.pg.mrp.srp.gather_entered", stats->mrp->srp->gather_entered);
icmap_set_uint64("runtime.totem.pg.mrp.srp.gather_token_lost", stats->mrp->srp->gather_token_lost);
icmap_set_uint64("runtime.totem.pg.mrp.srp.commit_entered", stats->mrp->srp->commit_entered);
icmap_set_uint64("runtime.totem.pg.mrp.srp.commit_token_lost", stats->mrp->srp->commit_token_lost);
icmap_set_uint64("runtime.totem.pg.mrp.srp.recovery_entered", stats->mrp->srp->recovery_entered);
icmap_set_uint64("runtime.totem.pg.mrp.srp.recovery_token_lost", stats->mrp->srp->recovery_token_lost);
icmap_set_uint64("runtime.totem.pg.mrp.srp.consensus_timeouts", stats->mrp->srp->consensus_timeouts);
icmap_set_uint64("runtime.totem.pg.mrp.srp.rx_msg_dropped", stats->mrp->srp->rx_msg_dropped);
icmap_set_uint32("runtime.totem.pg.mrp.srp.continuous_gather", stats->mrp->srp->continuous_gather);
+ icmap_set_uint32("runtime.totem.pg.mrp.srp.continuous_sendmsg_failures",
+ stats->mrp->srp->continuous_sendmsg_failures);
+
icmap_set_uint8("runtime.totem.pg.mrp.srp.firewall_enabled_or_nic_failure",
stats->mrp->srp->continuous_gather > MAX_NO_CONT_GATHER ? 1 : 0);
- if (stats->mrp->srp->continuous_gather > MAX_NO_CONT_GATHER) {
+ if (stats->mrp->srp->continuous_gather > MAX_NO_CONT_GATHER ||
+ stats->mrp->srp->continuous_sendmsg_failures > MAX_NO_CONT_SENDMSG_FAILURES) {
log_printf (LOGSYS_LEVEL_WARNING,
"Totem is unable to form a cluster because of an "
"operating system or network fault. The most common "
"cause of this message is that the local firewall is "
"configured improperly.");
+ icmap_set_uint8("runtime.totem.pg.mrp.srp.firewall_enabled_or_nic_failure", 1);
+ } else {
+ icmap_set_uint8("runtime.totem.pg.mrp.srp.firewall_enabled_or_nic_failure", 0);
}
for (i = 0; i < stats->mrp->srp->rrp->interface_count; i++) {
snprintf(key_name, ICMAP_KEYNAME_MAXLEN, "runtime.totem.pg.mrp.rrp.%u.faulty", i);
icmap_set_uint8(key_name, stats->mrp->srp->rrp->faulty[i]);
}
total_mtt_rx_token = 0;
total_token_holdtime = 0;
total_backlog_calc = 0;
token_count = 0;
/*
 * Walk the token[] records backwards starting at latest_token; the index
 * wraps from 0 to TOTEM_TOKEN_STATS_MAX - 1, and the walk stops once the
 * predecessor index equals earliest_token.
 */
t = stats->mrp->srp->latest_token;
while (1) {
if (t == 0)
prev = TOTEM_TOKEN_STATS_MAX - 1;
else
prev = t - 1;
if (prev == stats->mrp->srp->earliest_token)
break;
/* if tx == 0, then dropped token (not ours) */
if (stats->mrp->srp->token[t].tx != 0 ||
(stats->mrp->srp->token[t].rx - stats->mrp->srp->token[prev].rx) > 0 ) {
total_mtt_rx_token += (stats->mrp->srp->token[t].rx - stats->mrp->srp->token[prev].rx);
total_token_holdtime += (stats->mrp->srp->token[t].tx - stats->mrp->srp->token[t].rx);
total_backlog_calc += stats->mrp->srp->token[t].backlog_calc;
token_count++;
}
t = prev;
}
/* Averages are only published when at least one token was counted (avoids dividing by zero). */
if (token_count) {
icmap_set_uint32("runtime.totem.pg.mrp.srp.mtt_rx_token", (total_mtt_rx_token / token_count));
icmap_set_uint32("runtime.totem.pg.mrp.srp.avg_token_workload", (total_token_holdtime / token_count));
icmap_set_uint32("runtime.totem.pg.mrp.srp.avg_backlog_calc", (total_backlog_calc / token_count));
}
cs_ipcs_stats_update();
/* Re-arm: this function schedules its own next invocation. */
api->timer_add_duration (1500 * MILLI_2_NANO_SECONDS, NULL,
corosync_totem_stats_updater,
&corosync_stats_timer_handle);
}
/*
 * icmap tracking callback for keys below "nodelist.node.".
 *
 * Only keys of the form nodelist.node.<N>.ring<R>_addr are handled; all
 * other keys are ignored. Depending on the event, the old address is
 * removed from and/or the new address is added to ring <R>:
 *   - ICMAP_TRACK_ADD    with a string new value: add
 *   - ICMAP_TRACK_DELETE with a string old value: remove
 *   - ICMAP_TRACK_MODIFY with string old and new values: remove, then add
 * Addresses that totemip_parse() rejects are skipped.
 *
 * The node and ring numbers are declared unsigned so that their type
 * matches the "%u" conversions used by sscanf() and log_printf().
 *
 * user_data is not used.
 */
static void totem_dynamic_notify(
	int32_t event,
	const char *key_name,
	struct icmap_notify_value new_val,
	struct icmap_notify_value old_val,
	void *user_data)
{
	int res;
	unsigned int ring_no;
	unsigned int member_no;
	struct totem_ip_address member;
	int add_new_member = 0;
	int remove_old_member = 0;
	char tmp_str[ICMAP_KEYNAME_MAXLEN];

	res = sscanf(key_name, "nodelist.node.%u.ring%u%s", &member_no, &ring_no, tmp_str);
	if (res != 3) {
		return;
	}

	/* Only the ringX_addr keys describe a member address. */
	if (strcmp(tmp_str, "_addr") != 0) {
		return;
	}

	if (event == ICMAP_TRACK_ADD && new_val.type == ICMAP_VALUETYPE_STRING) {
		add_new_member = 1;
	}

	if (event == ICMAP_TRACK_DELETE && old_val.type == ICMAP_VALUETYPE_STRING) {
		remove_old_member = 1;
	}

	if (event == ICMAP_TRACK_MODIFY && new_val.type == ICMAP_VALUETYPE_STRING &&
	    old_val.type == ICMAP_VALUETYPE_STRING) {
		add_new_member = 1;
		remove_old_member = 1;
	}

	/* On modify the old address is removed before the new one is added. */
	if (remove_old_member) {
		log_printf(LOGSYS_LEVEL_DEBUG,
			"removing dynamic member %s for ring %u", (char *)old_val.data, ring_no);
		if (totemip_parse(&member, (char *)old_val.data, 0) == 0) {
			totempg_member_remove (&member, ring_no);
		}
	}

	if (add_new_member) {
		log_printf(LOGSYS_LEVEL_DEBUG,
			"adding dynamic member %s for ring %u", (char *)new_val.data, ring_no);
		if (totemip_parse(&member, (char *)new_val.data, 0) == 0) {
			totempg_member_add (&member, ring_no);
		}
	}
}
/*
 * Register totem_dynamic_notify() as the icmap tracker for add, delete and
 * modify events on every key with the "nodelist.node." prefix.
 */
static void corosync_totem_dynamic_init (void)
{
	icmap_track_t nodelist_track = NULL;
	const int32_t tracked_events =
		ICMAP_TRACK_ADD | ICMAP_TRACK_DELETE | ICMAP_TRACK_MODIFY | ICMAP_TRACK_PREFIX;

	icmap_track_add("nodelist.node.",
		tracked_events,
		totem_dynamic_notify,
		NULL,
		&nodelist_track);
}
/*
 * Create the averaged token statistics keys with value 0 and arm the
 * statistics timer for the first time.
 */
static void corosync_totem_stats_init (void)
{
	/* The averages start at zero until the updater computes real values. */
	icmap_set_uint32("runtime.totem.pg.mrp.srp.mtt_rx_token", 0);
	icmap_set_uint32("runtime.totem.pg.mrp.srp.avg_token_workload", 0);
	icmap_set_uint32("runtime.totem.pg.mrp.srp.avg_backlog_calc", 0);

	/* start stats timer */
	api->timer_add_duration (
		1500 * MILLI_2_NANO_SECONDS,
		NULL,
		corosync_totem_stats_updater,
		&corosync_stats_timer_handle);
}
static void deliver_fn (
unsigned int nodeid,
const void *msg,
unsigned int msg_len,
int endian_conversion_required)
{
const struct qb_ipc_request_header *header;
int32_t service;
int32_t fn_id;
uint32_t id;
header = msg;
if (endian_conversion_required) {
id = swab32 (header->id);
} else {
id = header->id;
}
/*
* Call the proper executive handler
*/
service = id >> 16;
fn_id = id & 0xffff;
if (!corosync_service[service]) {
return;
}
if (fn_id >= corosync_service[service]->exec_engine_count) {
log_printf(LOGSYS_LEVEL_WARNING, "discarded unknown message %d for service %d (max id %d)",
fn_id, service, corosync_service[service]->exec_engine_count);
return;
}
icmap_fast_inc(service_stats_rx[service][fn_id]);
if (endian_conversion_required) {
assert(corosync_service[service]->exec_engine[fn_id].exec_endian_convert_fn != NULL);
corosync_service[service]->exec_engine[fn_id].exec_endian_convert_fn
((void *)msg);
}
corosync_service[service]->exec_engine[fn_id].exec_handler_fn
(msg, nodeid);
}
/*
 * Multicast the iovec to the joined groups of corosync_group_handle.
 *
 * The first iovec element is interpreted as a request header; its id
 * (service in the upper 16 bits, function id in the lower 16 bits) selects
 * the transmit statistics counter that is incremented when the service is
 * registered. Returns the result of totempg_groups_mcast_joined().
 */
int main_mcast (
	const struct iovec *iovec,
	unsigned int iov_len,
	unsigned int guarantee)
{
	const struct qb_ipc_request_header *req = iovec->iov_base;
	const int32_t service = req->id >> 16;
	const int32_t fn_id = req->id & 0xffff;

	if (corosync_service[service] != NULL) {
		icmap_fast_inc(service_stats_tx[service][fn_id]);
	}

	return totempg_groups_mcast_joined (corosync_group_handle, iovec, iov_len, guarantee);
}
static qb_loop_timer_handle recheck_the_q_level_timer;

/*
 * Re-evaluate the totem queue level. While the IPC layer still reports
 * TOTEM_Q_LEVEL_CRITICAL, the function re-schedules itself on the main loop
 * with a delay of 1*QB_TIME_NS_IN_MSEC. The "data" argument is unused.
 */
void corosync_recheck_the_q_level(void *data)
{
	totempg_check_q_level(corosync_group_handle);

	if (cs_ipcs_q_level_get() != TOTEM_Q_LEVEL_CRITICAL) {
		return;
	}

	qb_loop_timer_add(cs_poll_handle_get(), QB_LOOP_MED, 1*QB_TIME_NS_IN_MSEC,
		NULL, corosync_recheck_the_q_level, &recheck_the_q_level_timer);
}
/*
 * Per-request state shared between corosync_sending_allowed() and
 * corosync_sending_allowed_release().
 */
struct sending_allowed_private_data_struct {
/* Result of totempg_groups_joined_reserve(); -1 means nothing to release. */
int reserved_msgs;
};
/*
 * Decide whether the library request "msg" for (service, id) may be sent.
 *
 * A reservation is always attempted first and stored in the private data,
 * so the caller is expected to pair this with
 * corosync_sending_allowed_release().
 *
 * Returns:
 *   QB_TRUE        sending is allowed
 *   -EINVAL        the reservation failed (returned -1)
 *   -EHOSTUNREACH  not quorate and the service does not allow inquorate use
 *   -ENOBUFS       flow control is required and nothing could be reserved
 *   -EINPROGRESS   flow control is required and synchronization is running
 */
int corosync_sending_allowed (
	unsigned int service,
	unsigned int id,
	const void *msg,
	void *sending_allowed_private_data)
{
	struct sending_allowed_private_data_struct *pd =
		(struct sending_allowed_private_data_struct *)sending_allowed_private_data;
	struct qb_ipc_request_header *header = (struct qb_ipc_request_header *)msg;
	struct iovec reserve_iovec;

	reserve_iovec.iov_base = (char *)header;
	reserve_iovec.iov_len = header->size;

	pd->reserved_msgs = totempg_groups_joined_reserve (
		corosync_group_handle,
		&reserve_iovec, 1);
	if (pd->reserved_msgs == -1) {
		return -EINVAL;
	}

	/* Quorum gate: inquorate nodes may only use services that allow it. */
	if (corosync_quorum_is_quorate() != 1 &&
	    corosync_service[service]->allow_inquorate != CS_LIB_ALLOW_INQUORATE) {
		return -EHOSTUNREACH;
	}

	/* Flow control gate. */
	if (corosync_service[service]->lib_engine[id].flow_control == CS_LIB_FLOW_CONTROL_NOT_REQUIRED) {
		return QB_TRUE;
	}
	if (pd->reserved_msgs && sync_in_process == 0) {
		return QB_TRUE;
	}
	if (pd->reserved_msgs == 0) {
		return -ENOBUFS;
	}

	/* Messages were reserved, but synchronization is still in progress. */
	return -EINPROGRESS;
}
/*
 * Give back the reservation recorded by corosync_sending_allowed().
 * Nothing is released when the recorded reservation is -1.
 */
void corosync_sending_allowed_release (void *sending_allowed_private_data)
{
	struct sending_allowed_private_data_struct *pd = sending_allowed_private_data;

	if (pd->reserved_msgs != -1) {
		totempg_groups_joined_release (pd->reserved_msgs);
	}
}
/*
 * Returns 1 when the node id stored in "source" equals this node's id
 * (totempg_my_nodeid_get()), 0 otherwise. "source" must not be NULL.
 */
int message_source_is_local (const mar_message_source_t *source)
{
	assert (source != NULL);

	return (source->nodeid == totempg_my_nodeid_get ()) ? 1 : 0;
}
/*
 * Initialize "source" to describe connection "conn" on the local node:
 * the structure is zeroed, then nodeid and conn are filled in.
 * Neither argument may be NULL.
 */
void message_source_set (
	mar_message_source_t *source,
	void *conn)
{
	assert ((source != NULL) && (conn != NULL));

	/* Start from an all-zero structure. */
	memset (source, 0, sizeof (*source));

	source->nodeid = totempg_my_nodeid_get ();
	source->conn = conn;
}
/*
 * Switch the process to SCHED_RR at the maximum priority, where the
 * platform provides the required calls.
 *
 * - If the maximum priority cannot be determined, a warning is logged and
 *   sched_priority is reset to 0.
 * - If SCHED_RR cannot be set, a warning is logged and (when available)
 *   the log thread is set to SCHED_OTHER.
 * - If SCHED_RR was set but the log thread priority cannot be raised as
 *   well, corosync_exit_error() is called.
 */
static void corosync_setscheduler (void)
{
#if defined(HAVE_PTHREAD_SETSCHEDPARAM) && defined(HAVE_SCHED_GET_PRIORITY_MAX) && defined(HAVE_SCHED_SETSCHEDULER)
	int rc;

	sched_priority = sched_get_priority_max (SCHED_RR);
	if (sched_priority == -1) {
		LOGSYS_PERROR (errno, LOGSYS_LEVEL_WARNING,
			"Could not get maximum scheduler priority");
		sched_priority = 0;
		return;
	}

	global_sched_param.sched_priority = sched_priority;
	rc = sched_setscheduler (0, SCHED_RR, &global_sched_param);
	if (rc == -1) {
		LOGSYS_PERROR(errno, LOGSYS_LEVEL_WARNING,
			"Could not set SCHED_RR at priority %d",
			global_sched_param.sched_priority);

		global_sched_param.sched_priority = 0;
#ifdef HAVE_QB_LOG_THREAD_PRIORITY_SET
		qb_log_thread_priority_set (SCHED_OTHER, 0);
#endif
		return;
	}

	/*
	 * Turn on SCHED_RR in logsys system
	 */
#ifdef HAVE_QB_LOG_THREAD_PRIORITY_SET
	rc = qb_log_thread_priority_set (SCHED_RR, sched_priority);
#else
	rc = -1;
#endif
	if (rc == -1) {
		log_printf (LOGSYS_LEVEL_ERROR,
			"Could not set logsys thread priority."
			" Can't continue because of priority inversions.");
		corosync_exit_error (COROSYNC_DONE_LOGSETUP);
	}
#else
	log_printf(LOGSYS_LEVEL_WARNING,
		"The Platform is missing process priority setting features. Leaving at default.");
#endif
}
static void
_logsys_log_printf(int level, int subsys,
	const char *function_name,
	const char *file_name,
	int file_line,
	const char *format,
	...) __attribute__((format(printf, 6, 7)));

/*
 * printf-style logging shim: collects the variadic arguments and forwards
 * them, together with the call-site information, to
 * qb_log_from_external_source_va().
 */
static void
_logsys_log_printf(int level, int subsys,
	const char *function_name,
	const char *file_name,
	int file_line,
	const char *format, ...)
{
	va_list args;

	va_start(args, format);
	qb_log_from_external_source_va(function_name, file_name,
		format, level, file_line,
		subsys, args);
	va_end(args);
}
/*
 * icmap tracking callback for the runtime.blackbox.* control keys:
 *   runtime.blackbox.dump_flight_data -> write the blackbox to a file
 *   runtime.blackbox.dump_state       -> dump the state of all services
 * The event type and the values are not inspected.
 */
static void fplay_key_change_notify_fn (
	int32_t event,
	const char *key_name,
	struct icmap_notify_value new_val,
	struct icmap_notify_value old_val,
	void *user_data)
{
	const int is_flight_data = (strcmp(key_name, "runtime.blackbox.dump_flight_data") == 0);
	const int is_state = (strcmp(key_name, "runtime.blackbox.dump_state") == 0);

	if (is_flight_data) {
		fprintf(stderr,"Writetofile\n");
		corosync_blackbox_write_to_file ();
	}

	if (is_state) {
		fprintf(stderr,"statefump\n");
		corosync_state_dump ();
	}
}
/*
 * Create the two runtime.blackbox.* control keys with the value "no" and
 * track add/delete/modify events on both with
 * fplay_key_change_notify_fn().
 */
static void corosync_fplay_control_init (void)
{
	icmap_track_t track = NULL;
	const int32_t tracked_events =
		ICMAP_TRACK_ADD | ICMAP_TRACK_DELETE | ICMAP_TRACK_MODIFY;

	/* The keys are created before tracking starts. */
	icmap_set_string("runtime.blackbox.dump_flight_data", "no");
	icmap_set_string("runtime.blackbox.dump_state", "no");

	icmap_track_add("runtime.blackbox.dump_flight_data",
		tracked_events,
		fplay_key_change_notify_fn,
		NULL, &track);

	icmap_track_add("runtime.blackbox.dump_state",
		tracked_events,
		fplay_key_change_notify_fn,
		NULL, &track);
}
/*
* Set the RO flag for keys which either don't make sense for the user to change
* (statistics) or whose changes are not reflected at runtime (totem.crypto_cipher, ...).
*
* Also, some RO keys cannot be determined at this stage, so they are set later in
* other functions (like nodelist.local_node_pos, ...)
*/
static void set_icmap_ro_keys_flag (void)
{
	/*
	 * Key prefixes of internal configuration and runtime statistics;
	 * every key below them becomes read-only.
	 */
	static const char *const ro_prefixes[] = {
		"internal_configuration.",
		"runtime.connections.",
		"runtime.totem.",
		"runtime.services.",
	};
	/*
	 * Concrete configuration keys which can't be changed during runtime.
	 */
	static const char *const ro_keys[] = {
		"totem.crypto_cipher",
		"totem.crypto_hash",
		"totem.secauth",
		"totem.rrp_mode",
		"totem.netmtu",
	};
	size_t n;

	for (n = 0; n < sizeof(ro_prefixes) / sizeof(ro_prefixes[0]); n++) {
		icmap_set_ro_access(ro_prefixes[n], CS_TRUE, CS_TRUE);
	}

	for (n = 0; n < sizeof(ro_keys) / sizeof(ro_keys[0]); n++) {
		icmap_set_ro_access(ro_keys[n], CS_FALSE, CS_TRUE);
	}
}
/*
 * Bring up everything that depends on an initialized totempg: the default
 * services, IPC, the statistics timer, the blackbox control keys, dynamic
 * nodelist tracking and the synchronization engine -- in that order.
 */
static void main_service_ready (void)
{
	/*
	 * This must occur after totempg is initialized because "this_ip" must be set
	 */
	if (corosync_service_defaults_link_and_init (api) == -1) {
		log_printf (LOGSYS_LEVEL_ERROR, "Could not initialize default services");
		corosync_exit_error (COROSYNC_DONE_INIT_SERVICES);
	}

	cs_ipcs_init();
	corosync_totem_stats_init ();
	corosync_fplay_control_init ();
	corosync_totem_dynamic_init ();

	sync_init (corosync_sync_callbacks_retrieve, corosync_sync_completed);
}
/*
 * Create and lock the pid file "lockfile" and write "pid" into it.
 *
 * Steps: open/create the file, take a write lock on the whole file with
 * fcntl(F_SETLK) (retrying on EINTR), truncate it, write "<pid>\n" and mark
 * the descriptor close-on-exec. On success the descriptor is deliberately
 * left open so the lock stays held.
 *
 * Returns:
 *   COROSYNC_DONE_EXIT             success
 *   COROSYNC_DONE_ALREADY_RUNNING  the lock is held by someone else
 *   COROSYNC_DONE_AQUIRE_LOCK      any other failure
 *
 * On failures after the lock was taken, the lock file is unlinked again.
 */
static enum e_corosync_done corosync_flock (const char *lockfile, pid_t pid)
{
	struct flock lock;
	enum e_corosync_done err;
	char pid_s[17];
	size_t pid_len;
	ssize_t written;
	int fd_flag;
	int lf;

	err = COROSYNC_DONE_EXIT;

	lf = open (lockfile, O_WRONLY | O_CREAT, 0640);
	if (lf == -1) {
		log_printf (LOGSYS_LEVEL_ERROR, "Corosync Executive couldn't create lock file.");
		return (COROSYNC_DONE_AQUIRE_LOCK);
	}

retry_fcntl:
	/* l_start == 0 and l_len == 0 from SEEK_SET: lock the whole file. */
	lock.l_type = F_WRLCK;
	lock.l_start = 0;
	lock.l_whence = SEEK_SET;
	lock.l_len = 0;
	if (fcntl (lf, F_SETLK, &lock) == -1) {
		switch (errno) {
		case EINTR:
			goto retry_fcntl;
			break;
		case EAGAIN:
		case EACCES:
			log_printf (LOGSYS_LEVEL_ERROR, "Another Corosync instance is already running.");
			err = COROSYNC_DONE_ALREADY_RUNNING;
			goto error_close;
			break;
		default:
			log_printf (LOGSYS_LEVEL_ERROR, "Corosync Executive couldn't aquire lock. Error was %s",
				strerror(errno));
			err = COROSYNC_DONE_AQUIRE_LOCK;
			goto error_close;
			break;
		}
	}

	if (ftruncate (lf, 0) == -1) {
		log_printf (LOGSYS_LEVEL_ERROR, "Corosync Executive couldn't truncate lock file. Error was %s",
			strerror (errno));
		err = COROSYNC_DONE_AQUIRE_LOCK;
		goto error_close_unlink;
	}

	memset (pid_s, 0, sizeof (pid_s));
	/* pid_t is a signed type; cast so the argument matches "%u". */
	snprintf (pid_s, sizeof (pid_s) - 1, "%u\n", (unsigned int)pid);
	pid_len = strlen (pid_s);

retry_write:
	written = write (lf, pid_s, pid_len);
	if (written == -1 && errno == EINTR) {
		goto retry_write;
	}
	if (written == -1 || (size_t)written != pid_len) {
		/*
		 * errno is only meaningful when write() itself failed; a short
		 * write is reported as such instead of through a stale errno.
		 */
		log_printf (LOGSYS_LEVEL_ERROR, "Corosync Executive couldn't write pid to lock file. "
			"Error was %s", (written == -1) ? strerror (errno) : "short write");
		err = COROSYNC_DONE_AQUIRE_LOCK;
		goto error_close_unlink;
	}

	if ((fd_flag = fcntl (lf, F_GETFD, 0)) == -1) {
		log_printf (LOGSYS_LEVEL_ERROR, "Corosync Executive couldn't get close-on-exec flag from lock file. "
			"Error was %s", strerror (errno));
		err = COROSYNC_DONE_AQUIRE_LOCK;
		goto error_close_unlink;
	}
	fd_flag |= FD_CLOEXEC;
	if (fcntl (lf, F_SETFD, fd_flag) == -1) {
		log_printf (LOGSYS_LEVEL_ERROR, "Corosync Executive couldn't set close-on-exec flag to lock file. "
			"Error was %s", strerror (errno));
		err = COROSYNC_DONE_AQUIRE_LOCK;
		goto error_close_unlink;
	}

	/* Success: lf stays open on purpose, closing it would drop the lock. */
	return (err);

error_close_unlink:
	unlink (lockfile);
error_close:
	close (lf);

	return (err);
}
int main (int argc, char **argv, char **envp)
{
const char *error_string;
struct totem_config totem_config;
int res, ch;
int background, setprio;
struct stat stat_out;
char corosync_lib_dir[PATH_MAX];
enum e_corosync_done flock_err;
uint64_t totem_config_warnings;
/* default configuration
*/
background = 1;
setprio = 0;
while ((ch = getopt (argc, argv, "fprv")) != EOF) {
switch (ch) {
case 'f':
background = 0;
logsys_config_mode_set (NULL, LOGSYS_MODE_OUTPUT_STDERR|LOGSYS_MODE_THREADED|LOGSYS_MODE_FORK);
break;
case 'p':
break;
case 'r':
setprio = 1;
break;
case 'v':
printf ("Corosync Cluster Engine, version '%s'\n", VERSION);
printf ("Copyright (c) 2006-2009 Red Hat, Inc.\n");
return EXIT_SUCCESS;
break;
default:
fprintf(stderr, \
"usage:\n"\
" -f : Start application in foreground.\n"\
" -p : Does nothing. \n"\
" -r : Set round robin realtime scheduling \n"\
" -v : Display version and SVN revision of Corosync and exit.\n");
return EXIT_FAILURE;
}
}
/*
* Set round robin realtime scheduling with priority 99
* Lock all memory to avoid page faults which may interrupt
* application healthchecking
*/
if (setprio) {
corosync_setscheduler ();
}
corosync_mlockall ();
log_printf (LOGSYS_LEVEL_NOTICE, "Corosync Cluster Engine ('%s'): started and ready to provide service.", VERSION);
log_printf (LOGSYS_LEVEL_INFO, "Corosync built-in features:" PACKAGE_FEATURES "");
corosync_poll_handle = qb_loop_create ();
qb_loop_signal_add(corosync_poll_handle, QB_LOOP_LOW,
SIGUSR2, NULL, sig_diag_handler, NULL);
qb_loop_signal_add(corosync_poll_handle, QB_LOOP_HIGH,
SIGINT, NULL, sig_exit_handler, NULL);
qb_loop_signal_add(corosync_poll_handle, QB_LOOP_HIGH,
SIGQUIT, NULL, sig_exit_handler, NULL);
qb_loop_signal_add(corosync_poll_handle, QB_LOOP_HIGH,
SIGTERM, NULL, sig_exit_handler, NULL);
(void)signal (SIGSEGV, sigsegv_handler);
(void)signal (SIGABRT, sigabrt_handler);
#if MSG_NOSIGNAL != 0
(void)signal (SIGPIPE, SIG_IGN);
#endif
if (icmap_init() != CS_OK) {
log_printf (LOGSYS_LEVEL_ERROR, "Corosync Executive couldn't initialize configuration component.");
corosync_exit_error (COROSYNC_DONE_ICMAP);
}
set_icmap_ro_keys_flag();
/*
* Initialize the corosync_api_v1 definition
*/
api = apidef_get ();
res = coroparse_configparse(&error_string);
if (res == -1) {
log_printf (LOGSYS_LEVEL_ERROR, "%s", error_string);
corosync_exit_error (COROSYNC_DONE_MAINCONFIGREAD);
}
res = corosync_log_config_read (&error_string);
if (res == -1) {
/*
* if we are here, we _must_ flush the logsys queue
* and try to inform that we couldn't read the config.
* this is a desperate attempt before certain death
* and there is no guarantee that we can print to stderr
* nor that logsys is sending the messages where we expect.
*/
log_printf (LOGSYS_LEVEL_ERROR, "%s", error_string);
fprintf(stderr, "%s", error_string);
syslog (LOGSYS_LEVEL_ERROR, "%s", error_string);
corosync_exit_error (COROSYNC_DONE_LOGCONFIGREAD);
}
/*
* Make sure required directory is present
*/
sprintf (corosync_lib_dir, "%s/lib/corosync", LOCALSTATEDIR);
res = stat (corosync_lib_dir, &stat_out);
if ((res == -1) || (res == 0 && !S_ISDIR(stat_out.st_mode))) {
log_printf (LOGSYS_LEVEL_ERROR, "Required directory not present %s. Please create it.", corosync_lib_dir);
corosync_exit_error (COROSYNC_DONE_DIR_NOT_PRESENT);
}
res = totem_config_read (&totem_config, &error_string, &totem_config_warnings);
if (res == -1) {
log_printf (LOGSYS_LEVEL_ERROR, "%s", error_string);
corosync_exit_error (COROSYNC_DONE_MAINCONFIGREAD);
}
if (totem_config_warnings & TOTEM_CONFIG_WARNING_MEMBERS_IGNORED) {
log_printf (LOGSYS_LEVEL_WARNING, "member section is used together with nodelist. Members ignored.");
}
if (totem_config_warnings & TOTEM_CONFIG_WARNING_MEMBERS_DEPRECATED) {
log_printf (LOGSYS_LEVEL_WARNING, "member section is deprecated.");
}
if (totem_config_warnings & TOTEM_CONFIG_WARNING_TOTEM_NODEID_IGNORED) {
log_printf (LOGSYS_LEVEL_WARNING, "nodeid appears both in totem section and nodelist. Nodelist one is used.");
}
if (totem_config_warnings != 0) {
log_printf (LOGSYS_LEVEL_WARNING, "Please migrate config file to nodelist.");
}
res = totem_config_keyread (&totem_config, &error_string);
if (res == -1) {
log_printf (LOGSYS_LEVEL_ERROR, "%s", error_string);
corosync_exit_error (COROSYNC_DONE_MAINCONFIGREAD);
}
res = totem_config_validate (&totem_config, &error_string);
if (res == -1) {
log_printf (LOGSYS_LEVEL_ERROR, "%s", error_string);
corosync_exit_error (COROSYNC_DONE_MAINCONFIGREAD);
}
totem_config.totem_logging_configuration = totem_logging_configuration;
totem_config.totem_logging_configuration.log_subsys_id = _logsys_subsys_create("TOTEM", "totem");
totem_config.totem_logging_configuration.log_level_security = LOGSYS_LEVEL_WARNING;
totem_config.totem_logging_configuration.log_level_error = LOGSYS_LEVEL_ERROR;
totem_config.totem_logging_configuration.log_level_warning = LOGSYS_LEVEL_WARNING;
totem_config.totem_logging_configuration.log_level_notice = LOGSYS_LEVEL_NOTICE;
totem_config.totem_logging_configuration.log_level_debug = LOGSYS_LEVEL_DEBUG;
totem_config.totem_logging_configuration.log_level_trace = LOGSYS_LEVEL_TRACE;
totem_config.totem_logging_configuration.log_printf = _logsys_log_printf;
logsys_config_apply();
/*
* Now we are fully initialized.
*/
if (background) {
corosync_tty_detach ();
}
if (logsys_thread_start() != 0) {
log_printf (LOGSYS_LEVEL_ERROR, "Can't initialize log thread");
corosync_exit_error (COROSYNC_DONE_LOGCONFIGREAD);
}
if ((flock_err = corosync_flock (corosync_lock_file, getpid ())) != COROSYNC_DONE_EXIT) {
corosync_exit_error (flock_err);
}
/*
* if totempg_initialize doesn't have root priveleges, it cannot
* bind to a specific interface. This only matters if
* there is more then one interface in a system, so
* in this case, only a warning is printed
*/
/*
* Join multicast group and setup delivery
* and configuration change functions
*/
totempg_initialize (
corosync_poll_handle,
&totem_config);
totempg_service_ready_register (
main_service_ready);
totempg_groups_initialize (
&corosync_group_handle,
deliver_fn,
confchg_fn);
totempg_groups_join (
corosync_group_handle,
&corosync_group,
1);
/*
* Drop root privleges to user 'corosync'
* TODO: Don't really need full root capabilities;
* needed capabilities are:
* CAP_NET_RAW (bindtodevice)
* CAP_SYS_NICE (setscheduler)
* CAP_IPC_LOCK (mlockall)
*/
priv_drop ();
schedwrk_init (
serialize_lock,
serialize_unlock);
/*
* Start main processing loop
*/
qb_loop_run (corosync_poll_handle);
/*
* Exit was requested
*/
totempg_finalize ();
/*
* free the loop resources
*/
qb_loop_destroy (corosync_poll_handle);
/*
* free up the icmap
*/
/*
* Remove pid lock file
*/
unlink (corosync_lock_file);
corosync_exit_error (COROSYNC_DONE_EXIT);
return EXIT_SUCCESS;
}
diff --git a/exec/totemiba.c b/exec/totemiba.c
index a419d1a5..55f79247 100644
--- a/exec/totemiba.c
+++ b/exec/totemiba.c
@@ -1,1564 +1,1567 @@
/*
* Copyright (c) 2009-2012 Red Hat, Inc.
*
* All rights reserved.
*
* Author: Steven Dake (sdake@redhat.com)
* This software licensed under BSD license, the text of which follows:
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* - Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* - Neither the name of the MontaVista Software, Inc. nor the names of its
* contributors may be used to endorse or promote products derived from this
* software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
* THE POSSIBILITY OF SUCH DAMAGE.
*/
#include <config.h>
#include <assert.h>
#include <pthread.h>
#include <sys/mman.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/socket.h>
#include <netdb.h>
#include <sys/un.h>
#include <sys/ioctl.h>
#include <sys/param.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <sched.h>
#include <time.h>
#include <sys/time.h>
#include <sys/poll.h>
#include <limits.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
#include <rdma/rdma_cma.h>
#include <assert.h>
#include <errno.h>
#include <corosync/sq.h>
#include <corosync/list.h>
#include <corosync/hdb.h>
#include <corosync/swab.h>
#include <qb/qbdefs.h>
#include <qb/qbloop.h>
#define LOGSYS_UTILS_ONLY 1
#include <corosync/logsys.h>
#include "totemiba.h"
/* Number of entries requested for every completion queue passed to ibv_create_cq() */
#define COMPLETION_QUEUE_ENTRIES 100
/* Number of receive buffers posted initially per queue pair; also used as cap.max_recv_wr */
#define TOTAL_READ_POSTS 100
/*
 * Size in bytes of the data buffer embedded in struct recv_buf and
 * struct send_buf.
 * NOTE(review): the ibv_reg_mr() calls and sge lengths in this file use a
 * literal 2048 rather than this value - confirm which one is intended.
 */
#define MAX_MTU_SIZE 4096
/*
 * Per-interface state of the totem Infiniband transport.
 *
 * The structure is zeroed and its list heads are initialized by
 * totemiba_instance_initialize(). It groups three sets of RDMA/verbs
 * resources: multicast (mcast_*), the receiving side of the token
 * connection (recv_token_*, listen_recv_token_*) and the sending side of
 * the token connection (send_token_*).
 */
struct totemiba_instance {
/* Addresses handed to the rdma_bind_addr / rdma_resolve_addr / rdma_join_multicast calls */
struct sockaddr bind_addr;
struct sockaddr send_token_bind_addr;
struct sockaddr mcast_addr;
struct sockaddr token_addr;
struct sockaddr local_mcast_bind_addr;
struct totem_interface *totem_interface;
struct totem_config *totem_config;
+ totemsrp_stats_t *stats;
+
/* Callbacks into the layer above; each is invoked with rrp_context as first argument */
void (*totemiba_iface_change_fn) (
void *context,
const struct totem_ip_address *iface_address);
void (*totemiba_deliver_fn) (
void *context,
const void *msg,
unsigned int msg_len);
void (*totemiba_target_set_completed) (
void *context);
void *rrp_context;
qb_loop_timer_handle timer_netif_check_timeout;
/* Main loop on which the completion channel and event channel fds are polled */
qb_loop_t *totemiba_poll_handle;
struct totem_ip_address my_id;
/* Multicast resources */
struct rdma_event_channel *mcast_channel;
struct rdma_cm_id *mcast_cma_id;
struct ibv_pd *mcast_pd;
struct sockaddr mcast_dest_addr;
/* Filled in from the RDMA_CM_EVENT_MULTICAST_JOIN event */
uint32_t mcast_qpn;
uint32_t mcast_qkey;
struct ibv_ah *mcast_ah;
struct ibv_comp_channel *mcast_send_completion_channel;
struct ibv_comp_channel *mcast_recv_completion_channel;
struct ibv_cq *mcast_send_cq;
struct ibv_cq *mcast_recv_cq;
/* Token receive resources; recv_token_accepted is set to 1 by recv_token_accept_setup() */
int recv_token_accepted;
struct rdma_event_channel *recv_token_channel;
struct rdma_event_channel *listen_recv_token_channel;
struct rdma_cm_id *listen_recv_token_cma_id;
struct rdma_cm_id *recv_token_cma_id;
struct ibv_pd *recv_token_pd;
struct sockaddr recv_token_dest_addr;
struct ibv_comp_channel *recv_token_send_completion_channel;
struct ibv_comp_channel *recv_token_recv_completion_channel;
struct ibv_cq *recv_token_send_cq;
struct ibv_cq *recv_token_recv_cq;
/* Token send resources; send_token_bound is set to 1 by send_token_bind() */
int send_token_bound;
struct rdma_event_channel *send_token_channel;
struct rdma_cm_id *send_token_cma_id;
struct ibv_pd *send_token_pd;
struct sockaddr send_token_dest_addr;
/* Filled in from the RDMA_CM_EVENT_ESTABLISHED event */
uint32_t send_token_qpn;
uint32_t send_token_qkey;
struct ibv_ah *send_token_ah;
struct ibv_comp_channel *send_token_send_completion_channel;
struct ibv_comp_channel *send_token_recv_completion_channel;
struct ibv_cq *send_token_send_cq;
struct ibv_cq *send_token_recv_cq;
/* Logging callback and subsystem id used by the log_printf() macro */
void (*totemiba_log_printf) (
int level,
int subsys,
const char *function,
const char *file,
int line,
const char *format,
...)__attribute__((format(printf, 6, 7)));
int totemiba_subsys_id;
/* *_free lists hold reusable send buffers, *_head lists hold every allocated buffer */
struct list_head mcast_send_buf_free;
struct list_head token_send_buf_free;
struct list_head mcast_send_buf_head;
struct list_head token_send_buf_head;
struct list_head recv_token_recv_buf_head;
};
/*
 * Overlay of a 64 bit work request id and a pointer, used by
 * void2wrid() and wrid2void() to convert between the two.
 */
union u {
uint64_t wr_id;
void *v;
};
/*
 * Log a message through the logging callback of the instance.
 *
 * The macro expects a variable named `instance` (pointing to a
 * struct totemiba_instance) to be in scope at the call site.
 *
 * The expansion deliberately does not end with a semicolon: the caller
 * supplies it, so the macro behaves like a single statement and can be
 * used safely in an unbraced if/else.
 */
#define log_printf(level, format, args...)		\
do {							\
	instance->totemiba_log_printf (			\
		level,					\
		instance->totemiba_subsys_id,		\
		__FUNCTION__, __FILE__, __LINE__,	\
		(const char *)format, ##args);		\
} while (0)
/*
 * A receive buffer together with the work request that posts it.
 * recv_wr.sg_list points at sge, and sge.addr points at buffer (see the
 * *_recv_buf_create functions).
 */
struct recv_buf {
/* Link in the list of all allocated receive buffers */
struct list_head list_all;
struct ibv_recv_wr recv_wr;
struct ibv_sge sge;
/* Memory region returned by ibv_reg_mr() for buffer */
struct ibv_mr *mr;
char buffer[MAX_MTU_SIZE];
};
/*
 * A send buffer. Buffers are recycled through a free list instead of
 * being released after each use (see the *_send_buf_get / *_send_buf_put
 * functions).
 */
struct send_buf {
/* Link in the free list while the buffer is not in use */
struct list_head list_free;
/* Link in the list of all allocated send buffers */
struct list_head list_all;
/* Memory region returned by ibv_reg_mr() for buffer */
struct ibv_mr *mr;
char buffer[MAX_MTU_SIZE];
};
static hdb_handle_t
void2wrid (void *v) { union u u; u.v = v; return u.wr_id; }
/*
 * Convert a work request id back into the pointer it was created from.
 *
 * The conversion goes through uintptr_t, the inverse of the (uintptr_t)
 * casts used to fill recv_wr.wr_id, so the round trip does not depend on
 * pointer width or byte order.
 */
static void *
wrid2void (uint64_t wr_id)
{
	return ((void *)(uintptr_t)wr_id);
}
/*
 * Reset an instance: zero the whole structure, then turn each of its
 * buffer list heads into an empty list.
 */
static void totemiba_instance_initialize (struct totemiba_instance *instance)
{
	struct list_head *const heads[] = {
		&instance->mcast_send_buf_free,
		&instance->token_send_buf_free,
		&instance->mcast_send_buf_head,
		&instance->token_send_buf_head,
		&instance->recv_token_recv_buf_head
	};
	size_t idx;

	memset (instance, 0, sizeof (*instance));

	/* list heads must be initialized after the memset */
	for (idx = 0; idx < sizeof (heads) / sizeof (heads[0]); idx++) {
		list_init (heads[idx]);
	}
}
/*
 * Obtain a send buffer for multicast traffic.
 *
 * A buffer from the multicast free list is reused when one is available.
 * Otherwise a new buffer is allocated, registered with the multicast
 * protection domain and linked into the list of all multicast send buffers.
 *
 * Returns the buffer, or NULL when allocation or memory registration fails.
 *
 * NOTE(review): 2048 bytes are registered although the buffer holds
 * MAX_MTU_SIZE bytes - confirm this is intended.
 */
static inline struct send_buf *mcast_send_buf_get (
	struct totemiba_instance *instance)
{
	struct send_buf *buf;

	/* fast path: recycle a previously returned buffer */
	if (!list_empty (&instance->mcast_send_buf_free)) {
		buf = list_entry (instance->mcast_send_buf_free.next, struct send_buf, list_free);
		list_del (&buf->list_free);
		return (buf);
	}

	buf = malloc (sizeof (struct send_buf));
	if (buf == NULL) {
		return (NULL);
	}

	buf->mr = ibv_reg_mr (instance->mcast_pd, buf->buffer,
		2048, IBV_ACCESS_LOCAL_WRITE);
	if (buf->mr != NULL) {
		/* remember the buffer so it can be found again later */
		list_init (&buf->list_all);
		list_add_tail (&buf->list_all, &instance->mcast_send_buf_head);
		return (buf);
	}

	log_printf (LOGSYS_LEVEL_ERROR, "couldn't register memory range");
	free (buf);
	return (NULL);
}
/*
 * Give a multicast send buffer back: it is appended to the tail of the
 * multicast free list so mcast_send_buf_get() can hand it out again.
 */
static inline void mcast_send_buf_put (
	struct totemiba_instance *instance,
	struct send_buf *send_buf)
{
	struct list_head *node = &send_buf->list_free;

	list_init (node);
	list_add_tail (node, &instance->mcast_send_buf_free);
}
/*
 * Obtain a send buffer for token traffic.
 *
 * A buffer from the token free list is reused when one is available.
 * Otherwise a new buffer is allocated, registered with the send token
 * protection domain and linked into the list of all token send buffers.
 *
 * Returns the buffer, or NULL when allocation or memory registration fails.
 *
 * NOTE(review): 2048 bytes are registered although the buffer holds
 * MAX_MTU_SIZE bytes - confirm this is intended.
 */
static inline struct send_buf *token_send_buf_get (
	struct totemiba_instance *instance)
{
	struct send_buf *buf;

	/* fast path: recycle a previously returned buffer */
	if (!list_empty (&instance->token_send_buf_free)) {
		buf = list_entry (instance->token_send_buf_free.next, struct send_buf, list_free);
		list_del (&buf->list_free);
		return (buf);
	}

	buf = malloc (sizeof (struct send_buf));
	if (buf == NULL) {
		return (NULL);
	}

	buf->mr = ibv_reg_mr (instance->send_token_pd, buf->buffer,
		2048, IBV_ACCESS_LOCAL_WRITE);
	if (buf->mr != NULL) {
		/* remember the buffer so token_send_buf_destroy() can release it */
		list_init (&buf->list_all);
		list_add_tail (&buf->list_all, &instance->token_send_buf_head);
		return (buf);
	}

	log_printf (LOGSYS_LEVEL_ERROR, "couldn't register memory range");
	free (buf);
	return (NULL);
}
/*
 * Release every token send buffer: each buffer on the list of all token
 * send buffers is deregistered and freed, then both token send buffer
 * lists are reset to empty.
 */
static inline void token_send_buf_destroy (struct totemiba_instance *instance)
{
	struct list_head *cursor = instance->token_send_buf_head.next;

	while (cursor != &instance->token_send_buf_head) {
		struct send_buf *victim = list_entry (cursor, struct send_buf, list_all);

		/* step forward first, the current element is about to be freed */
		cursor = cursor->next;
		ibv_dereg_mr (victim->mr);
		free (victim);
	}

	list_init (&instance->token_send_buf_free);
	list_init (&instance->token_send_buf_head);
}
/*
 * Give a token send buffer back: it is appended to the tail of the token
 * free list so token_send_buf_get() can hand it out again.
 */
static inline void token_send_buf_put (
	struct totemiba_instance *instance,
	struct send_buf *send_buf)
{
	struct list_head *node = &send_buf->list_free;

	list_init (node);
	list_add_tail (node, &instance->token_send_buf_free);
}
static inline struct recv_buf *recv_token_recv_buf_create (
struct totemiba_instance *instance)
{
struct recv_buf *recv_buf;
recv_buf = malloc (sizeof (struct recv_buf));
if (recv_buf == NULL) {
return (NULL);
}
recv_buf->mr = ibv_reg_mr (instance->recv_token_pd, &recv_buf->buffer,
2048,
IBV_ACCESS_LOCAL_WRITE);
recv_buf->recv_wr.next = NULL;
recv_buf->recv_wr.sg_list = &recv_buf->sge;
recv_buf->recv_wr.num_sge = 1;
recv_buf->recv_wr.wr_id = (uintptr_t)recv_buf;
recv_buf->sge.length = 2048;
recv_buf->sge.lkey = recv_buf->mr->lkey;
recv_buf->sge.addr = (uintptr_t)recv_buf->buffer;
list_init (&recv_buf->list_all);
list_add (&recv_buf->list_all, &instance->recv_token_recv_buf_head);
return (recv_buf);
}
/*
 * Post a receive buffer on the queue pair of the accepted token
 * connection. Returns the result of ibv_post_recv().
 */
static inline int recv_token_recv_buf_post (struct totemiba_instance *instance, struct recv_buf *recv_buf)
{
	struct ibv_recv_wr *bad_wr;

	return (ibv_post_recv (instance->recv_token_cma_id->qp, &recv_buf->recv_wr, &bad_wr));
}
static inline void recv_token_recv_buf_post_initial (struct totemiba_instance *instance)
{
struct recv_buf *recv_buf;
unsigned int i;
for (i = 0; i < TOTAL_READ_POSTS; i++) {
recv_buf = recv_token_recv_buf_create (instance);
recv_token_recv_buf_post (instance, recv_buf);
}
}
/*
 * Release every token receive buffer: each buffer on the token receive
 * buffer list is deregistered and freed, then the list is reset to empty.
 */
static inline void recv_token_recv_buf_post_destroy (
	struct totemiba_instance *instance)
{
	struct list_head *cursor = instance->recv_token_recv_buf_head.next;

	while (cursor != &instance->recv_token_recv_buf_head) {
		struct recv_buf *victim = list_entry (cursor, struct recv_buf, list_all);

		/* step forward first, the current element is about to be freed */
		cursor = cursor->next;
		ibv_dereg_mr (victim->mr);
		free (victim);
	}

	list_init (&instance->recv_token_recv_buf_head);
}
static inline struct recv_buf *mcast_recv_buf_create (struct totemiba_instance *instance)
{
struct recv_buf *recv_buf;
struct ibv_mr *mr;
recv_buf = malloc (sizeof (struct recv_buf));
if (recv_buf == NULL) {
return (NULL);
}
mr = ibv_reg_mr (instance->mcast_pd, &recv_buf->buffer,
2048,
IBV_ACCESS_LOCAL_WRITE);
recv_buf->recv_wr.next = NULL;
recv_buf->recv_wr.sg_list = &recv_buf->sge;
recv_buf->recv_wr.num_sge = 1;
recv_buf->recv_wr.wr_id = (uintptr_t)recv_buf;
recv_buf->sge.length = 2048;
recv_buf->sge.lkey = mr->lkey;
recv_buf->sge.addr = (uintptr_t)recv_buf->buffer;
return (recv_buf);
}
/*
 * Post a receive buffer on the multicast queue pair.
 * Returns the result of ibv_post_recv().
 */
static inline int mcast_recv_buf_post (struct totemiba_instance *instance, struct recv_buf *recv_buf)
{
	struct ibv_recv_wr *bad_wr;

	return (ibv_post_recv (instance->mcast_cma_id->qp, &recv_buf->recv_wr, &bad_wr));
}
static inline void mcast_recv_buf_post_initial (struct totemiba_instance *instance)
{
struct recv_buf *recv_buf;
unsigned int i;
for (i = 0; i < TOTAL_READ_POSTS; i++) {
recv_buf = mcast_recv_buf_create (instance);
mcast_recv_buf_post (instance, recv_buf);
}
}
static inline void iba_deliver_fn (struct totemiba_instance *instance, uint64_t wr_id, uint32_t bytes)
{
const char *addr;
const struct recv_buf *recv_buf;
recv_buf = wrid2void(wr_id);
addr = &recv_buf->buffer[sizeof (struct ibv_grh)];
instance->totemiba_deliver_fn (instance->rrp_context, addr, bytes);
}
static int mcast_cq_send_event_fn (int events, int suck, void *context)
{
struct totemiba_instance *instance = (struct totemiba_instance *)context;
struct ibv_wc wc[32];
struct ibv_cq *ev_cq;
void *ev_ctx;
int res;
int i;
ibv_get_cq_event (instance->mcast_send_completion_channel, &ev_cq, &ev_ctx);
ibv_ack_cq_events (ev_cq, 1);
res = ibv_req_notify_cq (ev_cq, 0);
res = ibv_poll_cq (instance->mcast_send_cq, 32, wc);
if (res > 0) {
for (i = 0; i < res; i++) {
mcast_send_buf_put (instance, wrid2void(wc[i].wr_id));
}
}
return (0);
}
static int mcast_cq_recv_event_fn (int events, int suck, void *context)
{
struct totemiba_instance *instance = (struct totemiba_instance *)context;
struct ibv_wc wc[64];
struct ibv_cq *ev_cq;
void *ev_ctx;
int res;
int i;
ibv_get_cq_event (instance->mcast_recv_completion_channel, &ev_cq, &ev_ctx);
ibv_ack_cq_events (ev_cq, 1);
res = ibv_req_notify_cq (ev_cq, 0);
res = ibv_poll_cq (instance->mcast_recv_cq, 64, wc);
if (res > 0) {
for (i = 0; i < res; i++) {
iba_deliver_fn (instance, wc[i].wr_id, wc[i].byte_len);
mcast_recv_buf_post (instance, wrid2void(wc[i].wr_id));
}
}
return (0);
}
static int mcast_rdma_event_fn (int events, int suck, void *context)
{
struct totemiba_instance *instance = (struct totemiba_instance *)context;
struct rdma_cm_event *event;
int res;
res = rdma_get_cm_event (instance->mcast_channel, &event);
if (res != 0) {
return (0);
}
switch (event->event) {
/*
* occurs when we resolve the multicast address
*/
case RDMA_CM_EVENT_ADDR_RESOLVED:
rdma_join_multicast (instance->mcast_cma_id, &instance->mcast_addr, instance);
break;
/*
* occurs when the CM joins the multicast group
*/
case RDMA_CM_EVENT_MULTICAST_JOIN:
instance->mcast_qpn = event->param.ud.qp_num;
instance->mcast_qkey = event->param.ud.qkey;
instance->mcast_ah = ibv_create_ah (instance->mcast_pd, &event->param.ud.ah_attr);
instance->totemiba_iface_change_fn (instance->rrp_context, &instance->my_id);
break;
case RDMA_CM_EVENT_ADDR_ERROR:
case RDMA_CM_EVENT_ROUTE_ERROR:
case RDMA_CM_EVENT_MULTICAST_ERROR:
log_printf (LOGSYS_LEVEL_ERROR, "multicast error");
break;
case RDMA_CM_EVENT_DEVICE_REMOVAL:
break;
default:
log_printf (LOGSYS_LEVEL_ERROR, "default %d", event->event);
break;
}
rdma_ack_cm_event (event);
return (0);
}
static int recv_token_cq_send_event_fn (
int fd,
int revents,
void *context)
{
struct totemiba_instance *instance = (struct totemiba_instance *)context;
struct ibv_wc wc[32];
struct ibv_cq *ev_cq;
void *ev_ctx;
int res;
int i;
ibv_get_cq_event (instance->recv_token_send_completion_channel, &ev_cq, &ev_ctx);
ibv_ack_cq_events (ev_cq, 1);
res = ibv_req_notify_cq (ev_cq, 0);
res = ibv_poll_cq (instance->recv_token_send_cq, 32, wc);
if (res > 0) {
for (i = 0; i < res; i++) {
iba_deliver_fn (instance, wc[i].wr_id, wc[i].byte_len);
ibv_dereg_mr (wrid2void(wc[i].wr_id));
}
}
return (0);
}
static int recv_token_cq_recv_event_fn (int events, int suck, void *context)
{
struct totemiba_instance *instance = (struct totemiba_instance *)context;
struct ibv_wc wc[32];
struct ibv_cq *ev_cq;
void *ev_ctx;
int res;
int i;
ibv_get_cq_event (instance->recv_token_recv_completion_channel, &ev_cq, &ev_ctx);
ibv_ack_cq_events (ev_cq, 1);
res = ibv_req_notify_cq (ev_cq, 0);
res = ibv_poll_cq (instance->recv_token_recv_cq, 32, wc);
if (res > 0) {
for (i = 0; i < res; i++) {
iba_deliver_fn (instance, wc[i].wr_id, wc[i].byte_len);
recv_token_recv_buf_post (instance, wrid2void(wc[i].wr_id));
}
}
return (0);
}
/*
 * Tear down the resources of the currently accepted token connection.
 *
 * Does nothing when no connection is accepted. Otherwise the completion
 * channel file descriptors are removed from the main loop first, while the
 * channels still exist; they used to be read after the channels had been
 * destroyed. Then the queue pair, receive buffers, completion queues,
 * completion channels, protection domain and connection id are released.
 *
 * The accepted flag is cleared afterwards so a second call cannot release
 * the same resources again.
 *
 * Always returns 0.
 */
static int recv_token_accept_destroy (struct totemiba_instance *instance)
{
	if (instance->recv_token_accepted == 0) {
		return (0);
	}

	/* stop polling the descriptors before the channels owning them go away */
	qb_loop_poll_del (
		instance->totemiba_poll_handle,
		instance->recv_token_recv_completion_channel->fd);
	qb_loop_poll_del (
		instance->totemiba_poll_handle,
		instance->recv_token_send_completion_channel->fd);

	rdma_destroy_qp (instance->recv_token_cma_id);
	recv_token_recv_buf_post_destroy (instance);
	ibv_destroy_cq (instance->recv_token_send_cq);
	ibv_destroy_cq (instance->recv_token_recv_cq);
	ibv_destroy_comp_channel (instance->recv_token_send_completion_channel);
	ibv_destroy_comp_channel (instance->recv_token_recv_completion_channel);
	ibv_dealloc_pd (instance->recv_token_pd);
	rdma_destroy_id (instance->recv_token_cma_id);

	instance->recv_token_accepted = 0;
	return (0);
}
/*
 * Create the verbs resources for a newly requested token connection.
 *
 * Expects instance->recv_token_cma_id to hold the id of the connection
 * request. Allocates the protection domain, one completion channel and
 * completion queue for each direction and the UD queue pair, posts the
 * initial receive buffers and registers both completion channels with the
 * main loop.
 *
 * Returns 0 on success with recv_token_accepted set to 1. Returns -1 on
 * failure; in that case everything created so far is released again and
 * recv_token_accepted is left at 0, so a later
 * recv_token_accept_destroy() does not touch half-built state.
 */
static int recv_token_accept_setup (struct totemiba_instance *instance)
{
	struct ibv_qp_init_attr init_qp_attr;
	int res = 0;

	/* nothing is usable until every step below has succeeded */
	instance->recv_token_accepted = 0;

	/*
	 * Allocate the protection domain
	 */
	instance->recv_token_pd = ibv_alloc_pd (instance->recv_token_cma_id->verbs);
	if (instance->recv_token_pd == NULL) {
		log_printf (LOGSYS_LEVEL_ERROR, "couldn't allocate protection domain");
		return (-1);
	}

	/*
	 * Create a completion channel
	 */
	instance->recv_token_recv_completion_channel = ibv_create_comp_channel (instance->recv_token_cma_id->verbs);
	if (instance->recv_token_recv_completion_channel == NULL) {
		log_printf (LOGSYS_LEVEL_ERROR, "couldn't create completion channel");
		goto error_dealloc_pd;
	}

	/*
	 * Create the completion queue
	 */
	instance->recv_token_recv_cq = ibv_create_cq (instance->recv_token_cma_id->verbs,
		COMPLETION_QUEUE_ENTRIES, instance,
		instance->recv_token_recv_completion_channel, 0);
	if (instance->recv_token_recv_cq == NULL) {
		log_printf (LOGSYS_LEVEL_ERROR, "couldn't create completion queue");
		goto error_destroy_recv_channel;
	}
	res = ibv_req_notify_cq (instance->recv_token_recv_cq, 0);
	if (res != 0) {
		log_printf (LOGSYS_LEVEL_ERROR, "couldn't request notifications of the completion queue");
		goto error_destroy_recv_cq;
	}

	/*
	 * Create a completion channel
	 */
	instance->recv_token_send_completion_channel = ibv_create_comp_channel (instance->recv_token_cma_id->verbs);
	if (instance->recv_token_send_completion_channel == NULL) {
		log_printf (LOGSYS_LEVEL_ERROR, "couldn't create completion channel");
		goto error_destroy_recv_cq;
	}

	/*
	 * Create the completion queue
	 */
	instance->recv_token_send_cq = ibv_create_cq (instance->recv_token_cma_id->verbs,
		COMPLETION_QUEUE_ENTRIES, instance,
		instance->recv_token_send_completion_channel, 0);
	if (instance->recv_token_send_cq == NULL) {
		log_printf (LOGSYS_LEVEL_ERROR, "couldn't create completion queue");
		goto error_destroy_send_channel;
	}
	res = ibv_req_notify_cq (instance->recv_token_send_cq, 0);
	if (res != 0) {
		log_printf (LOGSYS_LEVEL_ERROR, "couldn't request notifications of the completion queue");
		goto error_destroy_send_cq;
	}

	/*
	 * Create the queue pair
	 */
	memset (&init_qp_attr, 0, sizeof (struct ibv_qp_init_attr));
	init_qp_attr.cap.max_send_wr = 50;
	init_qp_attr.cap.max_recv_wr = TOTAL_READ_POSTS;
	init_qp_attr.cap.max_send_sge = 1;
	init_qp_attr.cap.max_recv_sge = 1;
	init_qp_attr.qp_context = instance;
	init_qp_attr.sq_sig_all = 0;
	init_qp_attr.qp_type = IBV_QPT_UD;
	init_qp_attr.send_cq = instance->recv_token_send_cq;
	init_qp_attr.recv_cq = instance->recv_token_recv_cq;
	res = rdma_create_qp (instance->recv_token_cma_id, instance->recv_token_pd,
		&init_qp_attr);
	if (res != 0) {
		log_printf (LOGSYS_LEVEL_ERROR, "couldn't create queue pair");
		goto error_destroy_send_cq;
	}

	recv_token_recv_buf_post_initial (instance);

	qb_loop_poll_add (
		instance->totemiba_poll_handle,
		QB_LOOP_MED,
		instance->recv_token_recv_completion_channel->fd,
		POLLIN, instance, recv_token_cq_recv_event_fn);

	qb_loop_poll_add (
		instance->totemiba_poll_handle,
		QB_LOOP_MED,
		instance->recv_token_send_completion_channel->fd,
		POLLIN, instance, recv_token_cq_send_event_fn);

	instance->recv_token_accepted = 1;
	return (res);

	/*
	 * Error unwinding, in reverse order of creation
	 */
error_destroy_send_cq:
	ibv_destroy_cq (instance->recv_token_send_cq);
error_destroy_send_channel:
	ibv_destroy_comp_channel (instance->recv_token_send_completion_channel);
error_destroy_recv_cq:
	ibv_destroy_cq (instance->recv_token_recv_cq);
error_destroy_recv_channel:
	ibv_destroy_comp_channel (instance->recv_token_recv_completion_channel);
error_dealloc_pd:
	ibv_dealloc_pd (instance->recv_token_pd);
	return (-1);
}
/*
 * Poll callback for the RDMA CM event channel of the token listener.
 *
 * On a connect request the previously accepted connection (if any) is
 * torn down, resources for the new connection id are set up and the
 * connection is accepted. Other events are logged.
 *
 * When the resource setup fails the request is not accepted: the queue
 * pair it would need does not exist in that case. A failing rdma_accept()
 * is logged.
 *
 * context is the struct totemiba_instance; the two integer arguments are
 * not used. Always returns 0.
 */
static int recv_token_rdma_event_fn (int events, int suck, void *context)
{
	struct totemiba_instance *instance = (struct totemiba_instance *)context;
	struct rdma_cm_event *event;
	struct rdma_conn_param conn_param;
	int res;

	res = rdma_get_cm_event (instance->listen_recv_token_channel, &event);
	if (res != 0) {
		return (0);
	}

	switch (event->event) {
	case RDMA_CM_EVENT_CONNECT_REQUEST:
		recv_token_accept_destroy (instance);

		instance->recv_token_cma_id = event->id;
		if (recv_token_accept_setup (instance) != 0) {
			/*
			 * NOTE(review): the new connection id is neither
			 * rejected nor destroyed here - decide how it should
			 * be disposed of.
			 */
			log_printf (LOGSYS_LEVEL_ERROR, "couldn't set up resources for token connection request");
			break;
		}

		memset (&conn_param, 0, sizeof (struct rdma_conn_param));
		conn_param.qp_num = instance->recv_token_cma_id->qp->qp_num;
		res = rdma_accept (instance->recv_token_cma_id, &conn_param);
		if (res != 0) {
			log_printf (LOGSYS_LEVEL_ERROR, "couldn't accept token connection request %d %d", res, errno);
		}
		break;
	default:
		log_printf (LOGSYS_LEVEL_ERROR, "default %d", event->event);
		break;
	}

	rdma_ack_cm_event (event);
	return (0);
}
static int send_token_cq_send_event_fn (int events, int suck, void *context)
{
struct totemiba_instance *instance = (struct totemiba_instance *)context;
struct ibv_wc wc[32];
struct ibv_cq *ev_cq;
void *ev_ctx;
int res;
int i;
ibv_get_cq_event (instance->send_token_send_completion_channel, &ev_cq, &ev_ctx);
ibv_ack_cq_events (ev_cq, 1);
res = ibv_req_notify_cq (ev_cq, 0);
res = ibv_poll_cq (instance->send_token_send_cq, 32, wc);
if (res > 0) {
for (i = 0; i < res; i++) {
token_send_buf_put (instance, wrid2void(wc[i].wr_id));
}
}
return (0);
}
static int send_token_cq_recv_event_fn (int events, int suck, void *context)
{
struct totemiba_instance *instance = (struct totemiba_instance *)context;
struct ibv_wc wc[32];
struct ibv_cq *ev_cq;
void *ev_ctx;
int res;
int i;
ibv_get_cq_event (instance->send_token_recv_completion_channel, &ev_cq, &ev_ctx);
ibv_ack_cq_events (ev_cq, 1);
res = ibv_req_notify_cq (ev_cq, 0);
res = ibv_poll_cq (instance->send_token_recv_cq, 32, wc);
if (res > 0) {
for (i = 0; i < res; i++) {
iba_deliver_fn (instance, wc[i].wr_id, wc[i].byte_len);
}
}
return (0);
}
static int send_token_rdma_event_fn (int events, int suck, void *context)
{
struct totemiba_instance *instance = (struct totemiba_instance *)context;
struct rdma_cm_event *event;
struct rdma_conn_param conn_param;
int res;
res = rdma_get_cm_event (instance->send_token_channel, &event);
if (res != 0) {
return (0);
}
switch (event->event) {
/*
* occurs when we resolve the multicast address
*/
case RDMA_CM_EVENT_ADDR_RESOLVED:
res = rdma_resolve_route (instance->send_token_cma_id, 2000);
break;
/*
* occurs when the CM joins the multicast group
*/
case RDMA_CM_EVENT_ROUTE_RESOLVED:
memset (&conn_param, 0, sizeof (struct rdma_conn_param));
conn_param.private_data = NULL;
conn_param.private_data_len = 0;
res = rdma_connect (instance->send_token_cma_id, &conn_param);
break;
case RDMA_CM_EVENT_ESTABLISHED:
instance->send_token_qpn = event->param.ud.qp_num;
instance->send_token_qkey = event->param.ud.qkey;
instance->send_token_ah = ibv_create_ah (instance->send_token_pd, &event->param.ud.ah_attr);
instance->totemiba_target_set_completed (instance->rrp_context);
break;
case RDMA_CM_EVENT_ADDR_ERROR:
case RDMA_CM_EVENT_ROUTE_ERROR:
case RDMA_CM_EVENT_MULTICAST_ERROR:
log_printf (LOGSYS_LEVEL_ERROR,
"send_token_rdma_event_fn multicast error");
break;
case RDMA_CM_EVENT_DEVICE_REMOVAL:
break;
case RDMA_CM_EVENT_UNREACHABLE:
log_printf (LOGSYS_LEVEL_ERROR,
"send_token_rdma_event_fn unreachable");
break;
default:
log_printf (LOGSYS_LEVEL_ERROR,
"send_token_rdma_event_fn unknown event %d",
event->event);
break;
}
rdma_ack_cm_event (event);
return (0);
}
/*
 * Create the resources used to send the token and start connecting to
 * the token target.
 *
 * Creates the event channel and connection id, binds to
 * send_token_bind_addr, starts resolving token_addr, then allocates the
 * protection domain, one completion channel and completion queue for each
 * direction and the UD queue pair. Finally the completion channels and the
 * event channel are registered with the main loop.
 *
 * Returns 0 on success with send_token_bound set to 1. Returns -1 on
 * failure; in that case everything created so far is released again and
 * send_token_bound is left at 0, so send_token_unbind() does not touch
 * half-built state.
 */
static int send_token_bind (struct totemiba_instance *instance)
{
	int res;
	struct ibv_qp_init_attr init_qp_attr;

	/* nothing is usable until every step below has succeeded */
	instance->send_token_bound = 0;

	instance->send_token_channel = rdma_create_event_channel();
	if (instance->send_token_channel == NULL) {
		log_printf (LOGSYS_LEVEL_ERROR, "couldn't create rdma channel");
		return (-1);
	}

	res = rdma_create_id (instance->send_token_channel,
		&instance->send_token_cma_id, NULL, RDMA_PS_UDP);
	if (res) {
		log_printf (LOGSYS_LEVEL_ERROR, "error creating send_token_cma_id");
		goto error_destroy_channel;
	}

	res = rdma_bind_addr (instance->send_token_cma_id,
		&instance->send_token_bind_addr);
	if (res) {
		log_printf (LOGSYS_LEVEL_ERROR, "error doing rdma_bind_addr for send token");
		goto error_destroy_id;
	}

	/*
	 * Resolve the send_token address into a GUID
	 */
	res = rdma_resolve_addr (instance->send_token_cma_id,
		&instance->bind_addr, &instance->token_addr, 2000);
	if (res) {
		log_printf (LOGSYS_LEVEL_ERROR, "error resolving send token address %d %d", res, errno);
		goto error_destroy_id;
	}

	/*
	 * Allocate the protection domain
	 */
	instance->send_token_pd = ibv_alloc_pd (instance->send_token_cma_id->verbs);
	if (instance->send_token_pd == NULL) {
		log_printf (LOGSYS_LEVEL_ERROR, "couldn't allocate protection domain");
		goto error_destroy_id;
	}

	/*
	 * Create a completion channel
	 */
	instance->send_token_recv_completion_channel = ibv_create_comp_channel (instance->send_token_cma_id->verbs);
	if (instance->send_token_recv_completion_channel == NULL) {
		log_printf (LOGSYS_LEVEL_ERROR, "couldn't create completion channel");
		goto error_dealloc_pd;
	}

	/*
	 * Create the completion queue
	 */
	instance->send_token_recv_cq = ibv_create_cq (instance->send_token_cma_id->verbs,
		COMPLETION_QUEUE_ENTRIES, instance,
		instance->send_token_recv_completion_channel, 0);
	if (instance->send_token_recv_cq == NULL) {
		log_printf (LOGSYS_LEVEL_ERROR, "couldn't create completion queue");
		goto error_destroy_recv_channel;
	}
	res = ibv_req_notify_cq (instance->send_token_recv_cq, 0);
	if (res != 0) {
		log_printf (LOGSYS_LEVEL_ERROR,
			"couldn't request notifications of the completion queue");
		goto error_destroy_recv_cq;
	}

	/*
	 * Create a completion channel
	 */
	instance->send_token_send_completion_channel =
		ibv_create_comp_channel (instance->send_token_cma_id->verbs);
	if (instance->send_token_send_completion_channel == NULL) {
		log_printf (LOGSYS_LEVEL_ERROR, "couldn't create completion channel");
		goto error_destroy_recv_cq;
	}

	/*
	 * Create the completion queue
	 */
	instance->send_token_send_cq = ibv_create_cq (
		instance->send_token_cma_id->verbs,
		COMPLETION_QUEUE_ENTRIES, instance,
		instance->send_token_send_completion_channel, 0);
	if (instance->send_token_send_cq == NULL) {
		log_printf (LOGSYS_LEVEL_ERROR, "couldn't create completion queue");
		goto error_destroy_send_channel;
	}
	res = ibv_req_notify_cq (instance->send_token_send_cq, 0);
	if (res != 0) {
		log_printf (LOGSYS_LEVEL_ERROR,
			"couldn't request notifications of the completion queue");
		goto error_destroy_send_cq;
	}

	/*
	 * Create the queue pair
	 */
	memset (&init_qp_attr, 0, sizeof (struct ibv_qp_init_attr));
	init_qp_attr.cap.max_send_wr = 50;
	init_qp_attr.cap.max_recv_wr = TOTAL_READ_POSTS;
	init_qp_attr.cap.max_send_sge = 1;
	init_qp_attr.cap.max_recv_sge = 1;
	init_qp_attr.qp_context = instance;
	init_qp_attr.sq_sig_all = 0;
	init_qp_attr.qp_type = IBV_QPT_UD;
	init_qp_attr.send_cq = instance->send_token_send_cq;
	init_qp_attr.recv_cq = instance->send_token_recv_cq;
	res = rdma_create_qp (instance->send_token_cma_id,
		instance->send_token_pd, &init_qp_attr);
	if (res != 0) {
		log_printf (LOGSYS_LEVEL_ERROR, "couldn't create queue pair");
		goto error_destroy_send_cq;
	}

	qb_loop_poll_add (
		instance->totemiba_poll_handle,
		QB_LOOP_MED,
		instance->send_token_recv_completion_channel->fd,
		POLLIN, instance, send_token_cq_recv_event_fn);

	qb_loop_poll_add (
		instance->totemiba_poll_handle,
		QB_LOOP_MED,
		instance->send_token_send_completion_channel->fd,
		POLLIN, instance, send_token_cq_send_event_fn);

	qb_loop_poll_add (
		instance->totemiba_poll_handle,
		QB_LOOP_MED,
		instance->send_token_channel->fd,
		POLLIN, instance, send_token_rdma_event_fn);

	instance->send_token_bound = 1;
	return (0);

	/*
	 * Error unwinding, in reverse order of creation
	 */
error_destroy_send_cq:
	ibv_destroy_cq (instance->send_token_send_cq);
error_destroy_send_channel:
	ibv_destroy_comp_channel (instance->send_token_send_completion_channel);
error_destroy_recv_cq:
	ibv_destroy_cq (instance->send_token_recv_cq);
error_destroy_recv_channel:
	ibv_destroy_comp_channel (instance->send_token_recv_completion_channel);
error_dealloc_pd:
	ibv_dealloc_pd (instance->send_token_pd);
error_destroy_id:
	rdma_destroy_id (instance->send_token_cma_id);
error_destroy_channel:
	rdma_destroy_event_channel (instance->send_token_channel);
	return (-1);
}
/*
 * Release the resources created by send_token_bind().
 *
 * Does nothing when the send token side is not bound. Otherwise the file
 * descriptors are removed from the main loop first, then the queue pair,
 * completion queues, completion channels, token send buffers, protection
 * domain, connection id and event channel are released.
 *
 * The bound flag is cleared afterwards so a second call cannot release
 * the same resources again.
 *
 * NOTE(review): send_token_ah, created when the connection is
 * established, is not destroyed here - confirm whether it is released
 * elsewhere.
 *
 * Always returns 0.
 */
static int send_token_unbind (struct totemiba_instance *instance)
{
	if (instance->send_token_bound == 0) {
		return (0);
	}

	/* stop polling the descriptors before their owners are destroyed */
	qb_loop_poll_del (
		instance->totemiba_poll_handle,
		instance->send_token_recv_completion_channel->fd);
	qb_loop_poll_del (
		instance->totemiba_poll_handle,
		instance->send_token_send_completion_channel->fd);
	qb_loop_poll_del (
		instance->totemiba_poll_handle,
		instance->send_token_channel->fd);

	rdma_destroy_qp (instance->send_token_cma_id);
	ibv_destroy_cq (instance->send_token_send_cq);
	ibv_destroy_cq (instance->send_token_recv_cq);
	ibv_destroy_comp_channel (instance->send_token_send_completion_channel);
	ibv_destroy_comp_channel (instance->send_token_recv_completion_channel);
	token_send_buf_destroy (instance);
	ibv_dealloc_pd (instance->send_token_pd);
	rdma_destroy_id (instance->send_token_cma_id);
	rdma_destroy_event_channel (instance->send_token_channel);

	instance->send_token_bound = 0;
	return (0);
}
/*
 * Start listening for incoming token connections.
 *
 * Creates the listener event channel and connection id, binds the id to
 * bind_addr, starts listening and registers the event channel with the
 * main loop so connection requests reach recv_token_rdma_event_fn().
 *
 * Returns 0 on success. Returns -1 on failure; the event channel and
 * connection id created so far are released again instead of leaked.
 */
static int recv_token_bind (struct totemiba_instance *instance)
{
	int res;

	instance->listen_recv_token_channel = rdma_create_event_channel();
	if (instance->listen_recv_token_channel == NULL) {
		log_printf (LOGSYS_LEVEL_ERROR, "couldn't create rdma channel");
		return (-1);
	}

	res = rdma_create_id (instance->listen_recv_token_channel,
		&instance->listen_recv_token_cma_id, NULL, RDMA_PS_UDP);
	if (res) {
		log_printf (LOGSYS_LEVEL_ERROR, "error creating recv_token_cma_id");
		goto error_destroy_channel;
	}

	res = rdma_bind_addr (instance->listen_recv_token_cma_id,
		&instance->bind_addr);
	if (res) {
		log_printf (LOGSYS_LEVEL_ERROR, "error doing rdma_bind_addr for recv token");
		goto error_destroy_id;
	}

	/*
	 * Listen for token connection requests
	 */
	res = rdma_listen (instance->listen_recv_token_cma_id, 10);
	if (res) {
		log_printf (LOGSYS_LEVEL_ERROR, "error listening %d %d", res, errno);
		goto error_destroy_id;
	}

	qb_loop_poll_add (
		instance->totemiba_poll_handle,
		QB_LOOP_MED,
		instance->listen_recv_token_channel->fd,
		POLLIN, instance, recv_token_rdma_event_fn);

	return (0);

	/*
	 * Error unwinding, in reverse order of creation
	 */
error_destroy_id:
	rdma_destroy_id (instance->listen_recv_token_cma_id);
error_destroy_channel:
	rdma_destroy_event_channel (instance->listen_recv_token_channel);
	return (-1);
}
/*
 * Create the multicast endpoint.
 *
 * Creates the event channel and CM id, binds to
 * instance->local_mcast_bind_addr, starts resolving instance->mcast_addr,
 * then allocates the protection domain, one completion channel and
 * completion queue each for receive and send, and a UD queue pair using
 * them.  Finally posts the initial receive buffers and adds the three
 * descriptors to the poll loop.
 *
 * Returns 0 on success, -1 (after logging) on the first step that fails.
 * NOTE(review): objects created before a failing step are not released.
 */
static int mcast_bind (struct totemiba_instance *instance)
{
	int res;
	struct ibv_qp_init_attr init_qp_attr;

	instance->mcast_channel = rdma_create_event_channel();
	if (instance->mcast_channel == NULL) {
		log_printf (LOGSYS_LEVEL_ERROR, "couldn't create rdma channel");
		return (-1);
	}

	res = rdma_create_id (instance->mcast_channel, &instance->mcast_cma_id, NULL, RDMA_PS_UDP);
	if (res) {
		log_printf (LOGSYS_LEVEL_ERROR, "error creating mcast_cma_id");
		return (-1);
	}

	res = rdma_bind_addr (instance->mcast_cma_id, &instance->local_mcast_bind_addr);
	if (res) {
		log_printf (LOGSYS_LEVEL_ERROR, "error doing rdma_bind_addr for mcast");
		return (-1);
	}

	/*
	 * Resolve the multicast address into a GUID
	 */
	res = rdma_resolve_addr (instance->mcast_cma_id, &instance->local_mcast_bind_addr,
		&instance->mcast_addr, 5000);
	if (res) {
		log_printf (LOGSYS_LEVEL_ERROR, "error resolving multicast address %d %d", res, errno);
		return (-1);
	}

	/*
	 * Allocate the protection domain
	 */
	instance->mcast_pd = ibv_alloc_pd (instance->mcast_cma_id->verbs);
	if (instance->mcast_pd == NULL) {
		/* The queue pair below cannot be created without a protection domain */
		log_printf (LOGSYS_LEVEL_ERROR, "couldn't allocate protection domain");
		return (-1);
	}

	/*
	 * Create the receive completion channel
	 */
	instance->mcast_recv_completion_channel = ibv_create_comp_channel (instance->mcast_cma_id->verbs);
	if (instance->mcast_recv_completion_channel == NULL) {
		log_printf (LOGSYS_LEVEL_ERROR, "couldn't create completion channel");
		return (-1);
	}

	/*
	 * Create the receive completion queue
	 */
	instance->mcast_recv_cq = ibv_create_cq (instance->mcast_cma_id->verbs,
		COMPLETION_QUEUE_ENTRIES, instance,
		instance->mcast_recv_completion_channel, 0);
	if (instance->mcast_recv_cq == NULL) {
		log_printf (LOGSYS_LEVEL_ERROR, "couldn't create completion queue");
		return (-1);
	}

	res = ibv_req_notify_cq (instance->mcast_recv_cq, 0);
	if (res != 0) {
		log_printf (LOGSYS_LEVEL_ERROR, "couldn't request notifications of the completion queue");
		return (-1);
	}

	/*
	 * Create the send completion channel
	 */
	instance->mcast_send_completion_channel = ibv_create_comp_channel (instance->mcast_cma_id->verbs);
	if (instance->mcast_send_completion_channel == NULL) {
		log_printf (LOGSYS_LEVEL_ERROR, "couldn't create completion channel");
		return (-1);
	}

	/*
	 * Create the send completion queue
	 */
	instance->mcast_send_cq = ibv_create_cq (instance->mcast_cma_id->verbs,
		COMPLETION_QUEUE_ENTRIES, instance,
		instance->mcast_send_completion_channel, 0);
	if (instance->mcast_send_cq == NULL) {
		log_printf (LOGSYS_LEVEL_ERROR, "couldn't create completion queue");
		return (-1);
	}

	res = ibv_req_notify_cq (instance->mcast_send_cq, 0);
	if (res != 0) {
		log_printf (LOGSYS_LEVEL_ERROR, "couldn't request notifications of the completion queue");
		return (-1);
	}

	/*
	 * Create the UD queue pair on top of the two completion queues
	 */
	memset (&init_qp_attr, 0, sizeof (struct ibv_qp_init_attr));
	init_qp_attr.cap.max_send_wr = 50;
	init_qp_attr.cap.max_recv_wr = TOTAL_READ_POSTS;
	init_qp_attr.cap.max_send_sge = 1;
	init_qp_attr.cap.max_recv_sge = 1;
	init_qp_attr.qp_context = instance;
	init_qp_attr.sq_sig_all = 0;
	init_qp_attr.qp_type = IBV_QPT_UD;
	init_qp_attr.send_cq = instance->mcast_send_cq;
	init_qp_attr.recv_cq = instance->mcast_recv_cq;
	res = rdma_create_qp (instance->mcast_cma_id, instance->mcast_pd,
		&init_qp_attr);
	if (res != 0) {
		log_printf (LOGSYS_LEVEL_ERROR, "couldn't create queue pair");
		return (-1);
	}

	mcast_recv_buf_post_initial (instance);

	qb_loop_poll_add (
		instance->totemiba_poll_handle,
		QB_LOOP_MED,
		instance->mcast_recv_completion_channel->fd,
		POLLIN, instance, mcast_cq_recv_event_fn);

	qb_loop_poll_add (
		instance->totemiba_poll_handle,
		QB_LOOP_MED,
		instance->mcast_send_completion_channel->fd,
		POLLIN, instance, mcast_cq_send_event_fn);

	qb_loop_poll_add (
		instance->totemiba_poll_handle,
		QB_LOOP_MED,
		instance->mcast_channel->fd,
		POLLIN, instance, mcast_rdma_event_fn);

	return (0);
}
/*
 * Timer callback that derives the local addresses from the configured
 * interface and then binds the token-receive and multicast endpoints.
 *
 * data is the struct totemiba_instance the timer was registered with.
 */
static void timer_function_netif_check_timeout (
	void *data)
{
	struct totemiba_instance *instance = (struct totemiba_instance *)data;
	int interface_up;
	int interface_num;
	int addr_len;

	totemip_iface_check (&instance->totem_interface->bindnet,
		&instance->totem_interface->boundto, &interface_up, &interface_num, instance->totem_config->clear_node_high_bit);

	/* Listening address: bound interface address with the configured port */
	totemip_totemip_to_sockaddr_convert(&instance->totem_interface->boundto,
		instance->totem_interface->ip_port, (struct sockaddr_storage *)&instance->bind_addr,
		&addr_len);

	/* Source addresses for token sending and multicast use port 0 */
	totemip_totemip_to_sockaddr_convert(&instance->totem_interface->boundto,
		0, (struct sockaddr_storage *)&instance->send_token_bind_addr,
		&addr_len);

	totemip_totemip_to_sockaddr_convert(&instance->totem_interface->boundto,
		0, (struct sockaddr_storage *)&instance->local_mcast_bind_addr,
		&addr_len);

	/*
	 * my_id holds a struct totem_ip_address, so it must not be the output
	 * of a sockaddr conversion.  It is filled from bind_addr and then from
	 * boundto below.
	 */
	totemip_sockaddr_to_totemip_convert(
		(const struct sockaddr_storage *)&instance->bind_addr,
		&instance->my_id);

	memcpy (&instance->my_id, &instance->totem_interface->boundto,
		sizeof (struct totem_ip_address));

	totemip_totemip_to_sockaddr_convert(&instance->totem_interface->mcast_addr,
		instance->totem_interface->ip_port,
		(struct sockaddr_storage *)&instance->mcast_addr, &addr_len);

	/*
	 * Both helpers log their own failures.  This callback returns void, so
	 * the results are deliberately discarded and mcast_bind() runs whether
	 * or not recv_token_bind() succeeded.
	 */
	(void)recv_token_bind (instance);
	(void)mcast_bind (instance);
}
/*
 * Placeholder for configuring the cipher and hash type.
 *
 * All arguments are ignored and 0 is returned.
 */
int totemiba_crypto_set (
	void *iba_context,
	const char *cipher_type,
	const char *hash_type)
{
	(void)iba_context;
	(void)cipher_type;
	(void)hash_type;

	return (0);
}
/*
 * Placeholder for shutting the transport down.
 *
 * The context is ignored; nothing is released and 0 is returned.
 */
int totemiba_finalize (
	void *iba_context)
{
	(void)iba_context;

	return (0);
}
/*
* Create an instance
*/
int totemiba_initialize (
qb_loop_t *qb_poll_handle,
void **iba_context,
struct totem_config *totem_config,
int interface_no,
void *context,
void (*deliver_fn) (
void *context,
const void *msg,
unsigned int msg_len),
void (*iface_change_fn) (
void *context,
const struct totem_ip_address *iface_address),
void (*target_set_completed) (
void *context))
{
struct totemiba_instance *instance;
int res = 0;
instance = malloc (sizeof (struct totemiba_instance));
if (instance == NULL) {
return (-1);
}
totemiba_instance_initialize (instance);
instance->totem_interface = &totem_config->interfaces[interface_no];
instance->totemiba_poll_handle = qb_poll_handle;
instance->totem_interface->bindnet.nodeid = totem_config->node_id;
instance->totemiba_deliver_fn = deliver_fn;
instance->totemiba_target_set_completed = target_set_completed;
instance->totemiba_iface_change_fn = iface_change_fn;
instance->totem_config = totem_config;
+ instance->stats = stats;
instance->rrp_context = context;
qb_loop_timer_add (instance->totemiba_poll_handle,
QB_LOOP_MED,
100*QB_TIME_NS_IN_MSEC,
(void *)instance,
timer_function_netif_check_timeout,
&instance->timer_netif_check_timeout);
instance->totemiba_subsys_id = totem_config->totem_logging_configuration.log_subsys_id;
instance->totemiba_log_printf = totem_config->totem_logging_configuration.log_printf;
*iba_context = instance;
return (res);
}
/*
 * Allocate a buffer of MAX_MTU_SIZE bytes with malloc().
 *
 * Returns the buffer, or NULL when malloc() fails.
 */
void *totemiba_buffer_alloc (void)
{
	void *buffer = malloc (MAX_MTU_SIZE);

	return (buffer);
}
/*
 * Release a buffer obtained from totemiba_buffer_alloc().
 *
 * ptr is handed to free(); passing NULL is a no-op.
 */
void totemiba_buffer_release (void *ptr)
{
	/* A void function must not return an expression, so free() is a plain call */
	free (ptr);
}
/*
 * Placeholder for recording the processor count.
 *
 * Both arguments are ignored and 0 is returned.
 */
int totemiba_processor_count_set (
	void *iba_context,
	int processor_count)
{
	(void)iba_context;
	(void)processor_count;

	return (0);
}
/*
 * Placeholder for flushing pending receives.
 *
 * The context is ignored and 0 is returned.
 */
int totemiba_recv_flush (void *iba_context)
{
	(void)iba_context;

	return (0);
}
/*
 * Placeholder for flushing pending sends.
 *
 * The context is ignored and 0 is returned.
 */
int totemiba_send_flush (void *iba_context)
{
	(void)iba_context;

	return (0);
}
/*
 * Send a token message through the token queue pair.
 *
 * Copies msg_len bytes from ms into a buffer obtained from
 * token_send_buf_get() and posts one signaled UD send work request that
 * targets the instance's send_token address handle, QPN and qkey.
 *
 * Returns -1 when no send buffer is available, otherwise the result of
 * ibv_post_send().
 */
int totemiba_token_send (
	void *iba_context,
	const void *ms,
	unsigned int msg_len)
{
	struct totemiba_instance *instance = (struct totemiba_instance *)iba_context;
	struct send_buf *send_buf;
	struct ibv_sge sge;
	struct ibv_send_wr send_wr;
	struct ibv_send_wr *bad_wr;

	send_buf = token_send_buf_get (instance);
	if (send_buf == NULL) {
		return (-1);
	}

	memcpy (send_buf->buffer, ms, msg_len);

	/* Single scatter/gather element covering the copied message */
	sge.addr = (uintptr_t)send_buf->buffer;
	sge.length = msg_len;
	sge.lkey = send_buf->mr->lkey;

	/* The buffer travels as wr_id with the work request */
	send_wr.wr_id = void2wrid(send_buf);
	send_wr.next = NULL;
	send_wr.sg_list = &sge;
	send_wr.num_sge = 1;
	send_wr.opcode = IBV_WR_SEND;
	send_wr.send_flags = IBV_SEND_SIGNALED;
	send_wr.imm_data = 0;
	send_wr.wr.ud.ah = instance->send_token_ah;
	send_wr.wr.ud.remote_qpn = instance->send_token_qpn;
	send_wr.wr.ud.remote_qkey = instance->send_token_qkey;

	return (ibv_post_send (instance->send_token_cma_id->qp, &send_wr, &bad_wr));
}
/*
 * Send a multicast message through the multicast queue pair.
 *
 * Copies msg_len bytes from ms into a buffer obtained from
 * mcast_send_buf_get() and posts one signaled UD send work request that
 * targets the instance's multicast address handle, QPN and qkey.
 *
 * Returns -1 when no send buffer is available, otherwise the result of
 * ibv_post_send().
 */
int totemiba_mcast_flush_send (
	void *iba_context,
	const void *ms,
	unsigned int msg_len)
{
	struct totemiba_instance *instance = (struct totemiba_instance *)iba_context;
	struct send_buf *send_buf;
	struct ibv_sge sge;
	struct ibv_send_wr send_wr;
	struct ibv_send_wr *bad_wr;

	send_buf = mcast_send_buf_get (instance);
	if (send_buf == NULL) {
		return (-1);
	}

	memcpy (send_buf->buffer, ms, msg_len);

	/* Single scatter/gather element covering the copied message */
	sge.addr = (uintptr_t)send_buf->buffer;
	sge.length = msg_len;
	sge.lkey = send_buf->mr->lkey;

	/* The buffer travels as wr_id with the work request */
	send_wr.wr_id = void2wrid(send_buf);
	send_wr.next = NULL;
	send_wr.sg_list = &sge;
	send_wr.num_sge = 1;
	send_wr.opcode = IBV_WR_SEND;
	send_wr.send_flags = IBV_SEND_SIGNALED;
	send_wr.imm_data = 0;
	send_wr.wr.ud.ah = instance->mcast_ah;
	send_wr.wr.ud.remote_qpn = instance->mcast_qpn;
	send_wr.wr.ud.remote_qkey = instance->mcast_qkey;

	return (ibv_post_send (instance->mcast_cma_id->qp, &send_wr, &bad_wr));
}
/*
 * Send a multicast message through the multicast queue pair.
 *
 * Performs the same steps as totemiba_mcast_flush_send(): copies msg_len
 * bytes from ms into a buffer obtained from mcast_send_buf_get() and posts
 * one signaled UD send work request to the multicast destination.
 *
 * Returns -1 when no send buffer is available, otherwise the result of
 * ibv_post_send().
 */
int totemiba_mcast_noflush_send (
	void *iba_context,
	const void *ms,
	unsigned int msg_len)
{
	struct totemiba_instance *instance = (struct totemiba_instance *)iba_context;
	struct send_buf *send_buf;
	struct ibv_sge sge;
	struct ibv_send_wr send_wr;
	struct ibv_send_wr *bad_wr;

	send_buf = mcast_send_buf_get (instance);
	if (send_buf == NULL) {
		return (-1);
	}

	memcpy (send_buf->buffer, ms, msg_len);

	/* Single scatter/gather element covering the copied message */
	sge.addr = (uintptr_t)send_buf->buffer;
	sge.length = msg_len;
	sge.lkey = send_buf->mr->lkey;

	/* The buffer travels as wr_id with the work request */
	send_wr.wr_id = void2wrid(send_buf);
	send_wr.next = NULL;
	send_wr.sg_list = &sge;
	send_wr.num_sge = 1;
	send_wr.opcode = IBV_WR_SEND;
	send_wr.send_flags = IBV_SEND_SIGNALED;
	send_wr.imm_data = 0;
	send_wr.wr.ud.ah = instance->mcast_ah;
	send_wr.wr.ud.remote_qpn = instance->mcast_qpn;
	send_wr.wr.ud.remote_qkey = instance->mcast_qkey;

	return (ibv_post_send (instance->mcast_cma_id->qp, &send_wr, &bad_wr));
}
/*
 * Placeholder for an interface check.
 *
 * The context is ignored and 0 is returned.
 */
extern int totemiba_iface_check (void *iba_context)
{
	(void)iba_context;

	return (0);
}
/*
 * Placeholder for an MTU adjustment.
 *
 * Both arguments are ignored; totem_config is not modified.
 */
extern void totemiba_net_mtu_adjust (void *iba_context, struct totem_config *totem_config)
{
	(void)iba_context;
	(void)totem_config;
}
/*
 * Return the printable form of the instance's own address (my_id), as
 * produced by totemip_print().
 */
const char *totemiba_iface_print (void *iba_context) {
	struct totemiba_instance *instance = (struct totemiba_instance *)iba_context;

	return (totemip_print (&instance->my_id));
}
/*
 * Copy the instance's own address (my_id) into *addr.
 *
 * Always returns 0.
 */
int totemiba_iface_get (
	void *iba_context,
	struct totem_ip_address *addr)
{
	struct totemiba_instance *instance = (struct totemiba_instance *)iba_context;

	memcpy (addr, &instance->my_id, sizeof (*addr));

	return (0);
}
/*
 * Point the token sending endpoint at a new target.
 *
 * Converts token_target, with the interface's configured port, into
 * instance->token_addr, then tears the sending endpoint down and binds it
 * again.
 *
 * Returns the result of send_token_bind(); the result of
 * send_token_unbind() is not reported.
 */
int totemiba_token_target_set (
	void *iba_context,
	const struct totem_ip_address *token_target)
{
	struct totemiba_instance *instance = (struct totemiba_instance *)iba_context;
	int addr_len = 16;

	totemip_totemip_to_sockaddr_convert(
		(struct totem_ip_address *)token_target,
		instance->totem_interface->ip_port,
		(struct sockaddr_storage *)&instance->token_addr,
		&addr_len);

	(void)send_token_unbind (instance);

	return (send_token_bind (instance));
}
/*
 * Placeholder for draining received multicast messages.
 *
 * The context is ignored and 0 is returned.
 */
extern int totemiba_recv_mcast_empty (
	void *iba_context)
{
	(void)iba_context;

	return (0);
}
diff --git a/exec/totemiba.h b/exec/totemiba.h
index de19756e..7e7a689a 100644
--- a/exec/totemiba.h
+++ b/exec/totemiba.h
@@ -1,117 +1,118 @@
/*
* Copyright (c) 2009-2011 Red Hat, Inc.
*
* All rights reserved.
*
* Author: Steven Dake (sdake@redhat.com)
*
* This software licensed under BSD license, the text of which follows:
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* - Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* - Neither the name of the MontaVista Software, Inc. nor the names of its
* contributors may be used to endorse or promote products derived from this
* software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
* THE POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef TOTEMIBA_H_DEFINED
#define TOTEMIBA_H_DEFINED
#include <sys/types.h>
#include <sys/socket.h>
#include <corosync/hdb.h>
#include <qb/qbloop.h>
#include <corosync/totem/totem.h>
/**
* Create an instance
*
* The new instance is returned through *iba_handle. The callbacks, the
* stats pointer and the logging configuration from totem_config are stored
* in the instance. Returns 0 on success, -1 when allocation fails.
*/
extern int totemiba_initialize (
qb_loop_t* qb_poll_handle,
void **iba_handle,
struct totem_config *totem_config,
+ totemsrp_stats_t *stats,
int interface_no,
void *context,
void (*deliver_fn) (
void *context,
const void *msg,
unsigned int msg_len),
void (*iface_change_fn) (
void *context,
const struct totem_ip_address *iface_address),
void (*target_set_completed) (
void *context));
/** Allocate a MAX_MTU_SIZE byte buffer with malloc(); NULL on failure. */
extern void *totemiba_buffer_alloc (void);
/** Release a buffer from totemiba_buffer_alloc() with free(). */
extern void totemiba_buffer_release (void *ptr);
/** Currently a no-op in totemiba.c: ignores its arguments and returns 0. */
extern int totemiba_processor_count_set (
void *iba_context,
int processor_count);
/**
* Copy msg into a token send buffer and post it on the token queue pair.
* Returns -1 when no send buffer is available, otherwise the result of
* ibv_post_send().
*/
extern int totemiba_token_send (
void *iba_context,
const void *msg,
unsigned int msg_len);
/**
* Copy msg into a multicast send buffer and post it on the multicast queue
* pair. Returns -1 when no send buffer is available, otherwise the result
* of ibv_post_send().
*/
extern int totemiba_mcast_flush_send (
void *iba_context,
const void *msg,
unsigned int msg_len);
/** Implemented with the same steps as totemiba_mcast_flush_send(). */
extern int totemiba_mcast_noflush_send (
void *iba_context,
const void *msg,
unsigned int msg_len);
/** Currently a no-op in totemiba.c: returns 0. */
extern int totemiba_recv_flush (void *iba_context);
/** Currently a no-op in totemiba.c: returns 0. */
extern int totemiba_send_flush (void *iba_context);
/** Currently a no-op in totemiba.c: returns 0. */
extern int totemiba_iface_check (void *iba_context);
/** Currently a no-op in totemiba.c: releases nothing and returns 0. */
extern int totemiba_finalize (void *iba_context);
/** Currently a no-op in totemiba.c: totem_config is left untouched. */
extern void totemiba_net_mtu_adjust (void *iba_context, struct totem_config *totem_config);
/** Printable form of the instance's own address, from totemip_print(). */
extern const char *totemiba_iface_print (void *iba_context);
/** Copy the instance's own address into *addr; always returns 0. */
extern int totemiba_iface_get (
void *iba_context,
struct totem_ip_address *addr);
/**
* Store token_target as the token destination, then unbind and rebind the
* token sending endpoint. Returns the result of the rebind.
*/
extern int totemiba_token_target_set (
void *iba_context,
const struct totem_ip_address *token_target);
/** Currently a no-op in totemiba.c: ignores its arguments and returns 0. */
extern int totemiba_crypto_set (
void *iba_context,
const char *cipher_type,
const char *hash_type);
/** Currently a no-op in totemiba.c: returns 0. */
extern int totemiba_recv_mcast_empty (
void *iba_context);
#endif /* TOTEMIBA_H_DEFINED */
diff --git a/exec/totemnet.c b/exec/totemnet.c
index fd7c76e9..2571d92a 100644
--- a/exec/totemnet.c
+++ b/exec/totemnet.c
@@ -1,492 +1,494 @@
/*
* Copyright (c) 2005 MontaVista Software, Inc.
* Copyright (c) 2006-2012 Red Hat, Inc.
*
* All rights reserved.
*
* Author: Steven Dake (sdake@redhat.com)
* This software licensed under BSD license, the text of which follows:
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* - Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* - Neither the name of the MontaVista Software, Inc. nor the names of its
* contributors may be used to endorse or promote products derived from this
* software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
* THE POSSIBILITY OF SUCH DAMAGE.
*/
#include <config.h>
#include <assert.h>
#ifdef HAVE_RDMA
#include <totemiba.h>
#endif
#include <totemudp.h>
#include <totemudpu.h>
#include <totemnet.h>
#include <qb/qbloop.h>
#define LOGSYS_UTILS_ONLY 1
#include <corosync/logsys.h>
/*
 * Table of operations implemented by one transport.
 *
 * Each totemnet_* wrapper in this file forwards to the hook of the same
 * name. member_add and member_remove may be left NULL; their wrappers
 * check for that. The other hooks are called without a NULL check.
 */
struct transport {
/* Name shown in the "Initializing transport" log message */
const char *name;
/* Creates the transport instance and returns it in *transport_instance */
int (*initialize) (
qb_loop_t *loop_pt,
void **transport_instance,
struct totem_config *totem_config,
+ totemsrp_stats_t *stats,
int interface_no,
void *context,
void (*deliver_fn) (
void *context,
const void *msg,
unsigned int msg_len),
void (*iface_change_fn) (
void *context,
const struct totem_ip_address *iface_address),
void (*target_set_completed) (
void *context));
/* Buffer management; these two hooks take no transport context */
void *(*buffer_alloc) (void);
void (*buffer_release) (void *ptr);
int (*processor_count_set) (
void *transport_context,
int processor_count);
/* Send hooks */
int (*token_send) (
void *transport_context,
const void *msg,
unsigned int msg_len);
int (*mcast_flush_send) (
void *transport_context,
const void *msg,
unsigned int msg_len);
int (*mcast_noflush_send) (
void *transport_context,
const void *msg,
unsigned int msg_len);
int (*recv_flush) (void *transport_context);
int (*send_flush) (void *transport_context);
int (*iface_check) (void *transport_context);
int (*finalize) (void *transport_context);
void (*net_mtu_adjust) (void *transport_context, struct totem_config *totem_config);
const char *(*iface_print) (void *transport_context);
int (*iface_get) (
void *transport_context,
struct totem_ip_address *addr);
int (*token_target_set) (
void *transport_context,
const struct totem_ip_address *token_target);
int (*crypto_set) (
void *transport_context,
const char *cipher_type,
const char *hash_type);
int (*recv_mcast_empty) (
void *transport_context);
/* Optional hooks: NULL when the transport has no member list */
int (*member_add) (
void *transport_context,
const struct totem_ip_address *member);
int (*member_remove) (
void *transport_context,
const struct totem_ip_address *member);
};
/*
 * Available transports. totemnet_instance_initialize() indexes this table
 * with config->transport_number, so the order of the entries matters.
 * NOTE(review): that index is not range-checked in this file - confirm it
 * is validated before it gets here.
 */
struct transport transport_entries[] = {
/* UDP multicast: member_add/member_remove are left NULL */
{
.name = "UDP/IP Multicast",
.initialize = totemudp_initialize,
.buffer_alloc = totemudp_buffer_alloc,
.buffer_release = totemudp_buffer_release,
.processor_count_set = totemudp_processor_count_set,
.token_send = totemudp_token_send,
.mcast_flush_send = totemudp_mcast_flush_send,
.mcast_noflush_send = totemudp_mcast_noflush_send,
.recv_flush = totemudp_recv_flush,
.send_flush = totemudp_send_flush,
.iface_check = totemudp_iface_check,
.finalize = totemudp_finalize,
.net_mtu_adjust = totemudp_net_mtu_adjust,
.iface_print = totemudp_iface_print,
.iface_get = totemudp_iface_get,
.token_target_set = totemudp_token_target_set,
.crypto_set = totemudp_crypto_set,
.recv_mcast_empty = totemudp_recv_mcast_empty
},
/* UDP unicast: the only entry that provides member_add/member_remove */
{
.name = "UDP/IP Unicast",
.initialize = totemudpu_initialize,
.buffer_alloc = totemudpu_buffer_alloc,
.buffer_release = totemudpu_buffer_release,
.processor_count_set = totemudpu_processor_count_set,
.token_send = totemudpu_token_send,
.mcast_flush_send = totemudpu_mcast_flush_send,
.mcast_noflush_send = totemudpu_mcast_noflush_send,
.recv_flush = totemudpu_recv_flush,
.send_flush = totemudpu_send_flush,
.iface_check = totemudpu_iface_check,
.finalize = totemudpu_finalize,
.net_mtu_adjust = totemudpu_net_mtu_adjust,
.iface_print = totemudpu_iface_print,
.iface_get = totemudpu_iface_get,
.token_target_set = totemudpu_token_target_set,
.crypto_set = totemudpu_crypto_set,
.recv_mcast_empty = totemudpu_recv_mcast_empty,
.member_add = totemudpu_member_add,
.member_remove = totemudpu_member_remove
},
/* Infiniband: only compiled in when HAVE_RDMA is defined */
#ifdef HAVE_RDMA
{
.name = "Infiniband/IP",
.initialize = totemiba_initialize,
.buffer_alloc = totemiba_buffer_alloc,
.buffer_release = totemiba_buffer_release,
.processor_count_set = totemiba_processor_count_set,
.token_send = totemiba_token_send,
.mcast_flush_send = totemiba_mcast_flush_send,
.mcast_noflush_send = totemiba_mcast_noflush_send,
.recv_flush = totemiba_recv_flush,
.send_flush = totemiba_send_flush,
.iface_check = totemiba_iface_check,
.finalize = totemiba_finalize,
.net_mtu_adjust = totemiba_net_mtu_adjust,
.iface_print = totemiba_iface_print,
.iface_get = totemiba_iface_get,
.token_target_set = totemiba_token_target_set,
.crypto_set = totemiba_crypto_set,
.recv_mcast_empty = totemiba_recv_mcast_empty
}
#endif
};
/*
 * Per-instance state of the totemnet layer: the selected transport, that
 * transport's own context, and the logging callback used by log_printf().
 */
struct totemnet_instance {
/* Opaque context filled in by the transport's initialize hook */
void *transport_context;
/* Entry of transport_entries[] selected from the configuration */
struct transport *transport;
/* Logging callback copied from the totem logging configuration */
void (*totemnet_log_printf) (
int level,
int subsys,
const char *function,
const char *file,
int line,
const char *format,
...)__attribute__((format(printf, 6, 7)));
/* Subsystem id passed as the second argument of totemnet_log_printf */
int totemnet_subsys_id;
};
/*
 * Log through the callback stored in the totemnet instance.
 *
 * Requires a variable named `instance` in scope that provides
 * totemnet_log_printf and totemnet_subsys_id.
 *
 * The expansion ends in `while (0)` without a semicolon: the caller's own
 * `;` completes the statement, so the macro can be used as the unbraced
 * body of an `if` that is followed by `else`.
 */
#define log_printf(level, format, args...)			\
do {								\
	instance->totemnet_log_printf (				\
		level,						\
		instance->totemnet_subsys_id,			\
		__FUNCTION__, __FILE__, __LINE__,		\
		(const char *)format, ##args);			\
} while (0)
static void totemnet_instance_initialize (
struct totemnet_instance *instance,
struct totem_config *config)
{
int transport;
instance->totemnet_log_printf = config->totem_logging_configuration.log_printf;
instance->totemnet_subsys_id = config->totem_logging_configuration.log_subsys_id;
transport = config->transport_number;
log_printf (LOGSYS_LEVEL_NOTICE,
"Initializing transport (%s).", transport_entries[transport].name);
instance->transport = &transport_entries[transport];
}
/*
 * Forward to the transport's crypto_set hook and return its result.
 */
int totemnet_crypto_set (
	void *net_context,
	const char *cipher_type,
	const char *hash_type)
{
	struct totemnet_instance *instance = (struct totemnet_instance *)net_context;

	return instance->transport->crypto_set (instance->transport_context,
		cipher_type, hash_type);
}
/*
 * Forward to the transport's finalize hook and return its result.
 * The totemnet instance itself is not released here.
 */
int totemnet_finalize (
	void *net_context)
{
	struct totemnet_instance *instance = (struct totemnet_instance *)net_context;

	return (instance->transport->finalize (instance->transport_context));
}
/*
 * Create a totemnet instance and initialize the configured transport.
 *
 * Allocates the instance, selects the transport from totem_config and calls
 * its initialize hook with the remaining arguments (including stats).
 *
 * On success the instance is returned through *net_context and 0 is
 * returned. On allocation failure, or when the hook returns -1, the
 * instance is freed (if allocated), *net_context is left untouched and -1
 * is returned.
 */
int totemnet_initialize (
qb_loop_t *loop_pt,
void **net_context,
struct totem_config *totem_config,
+ totemsrp_stats_t *stats,
int interface_no,
void *context,
void (*deliver_fn) (
void *context,
const void *msg,
unsigned int msg_len),
void (*iface_change_fn) (
void *context,
const struct totem_ip_address *iface_address),
void (*target_set_completed) (
void *context))
{
struct totemnet_instance *instance;
/* NOTE(review): unsigned, although the hook returns int; the -1 test below
 * relies on the usual arithmetic conversion of -1 to unsigned. */
unsigned int res;
instance = malloc (sizeof (struct totemnet_instance));
if (instance == NULL) {
return (-1);
}
totemnet_instance_initialize (instance, totem_config);
/* The transport stores its own context in instance->transport_context */
res = instance->transport->initialize (loop_pt,
- &instance->transport_context, totem_config,
+ &instance->transport_context, totem_config, stats,
interface_no, context, deliver_fn, iface_change_fn, target_set_completed);
if (res == -1) {
goto error_destroy;
}
*net_context = instance;
return (0);
error_destroy:
free (instance);
return (-1);
}
/*
 * Return a buffer from the transport's buffer_alloc hook.
 * Asserts that the instance and its transport are set.
 */
void *totemnet_buffer_alloc (void *net_context)
{
	struct totemnet_instance *net = net_context;

	assert (net != NULL);
	assert (net->transport != NULL);

	return (net->transport->buffer_alloc ());
}
/*
 * Hand ptr to the transport's buffer_release hook.
 * Asserts that the instance and its transport are set.
 */
void totemnet_buffer_release (void *net_context, void *ptr)
{
	struct totemnet_instance *net = net_context;

	assert (net != NULL);
	assert (net->transport != NULL);

	net->transport->buffer_release (ptr);
}
/*
 * Forward to the transport's processor_count_set hook and return its result.
 */
int totemnet_processor_count_set (
	void *net_context,
	int processor_count)
{
	struct totemnet_instance *instance = (struct totemnet_instance *)net_context;

	return (instance->transport->processor_count_set (
		instance->transport_context, processor_count));
}
/*
 * Forward to the transport's recv_flush hook and return its result.
 */
int totemnet_recv_flush (void *net_context)
{
	struct totemnet_instance *instance = (struct totemnet_instance *)net_context;

	return (instance->transport->recv_flush (instance->transport_context));
}
/*
 * Forward to the transport's send_flush hook and return its result.
 */
int totemnet_send_flush (void *net_context)
{
	struct totemnet_instance *instance = (struct totemnet_instance *)net_context;

	return (instance->transport->send_flush (instance->transport_context));
}
/*
 * Forward to the transport's token_send hook and return its result.
 */
int totemnet_token_send (
	void *net_context,
	const void *msg,
	unsigned int msg_len)
{
	struct totemnet_instance *instance = (struct totemnet_instance *)net_context;

	return (instance->transport->token_send (
		instance->transport_context, msg, msg_len));
}
/*
 * Forward to the transport's mcast_flush_send hook and return its result.
 */
int totemnet_mcast_flush_send (
	void *net_context,
	const void *msg,
	unsigned int msg_len)
{
	struct totemnet_instance *instance = (struct totemnet_instance *)net_context;

	return (instance->transport->mcast_flush_send (
		instance->transport_context, msg, msg_len));
}
/*
 * Forward to the transport's mcast_noflush_send hook and return its result.
 */
int totemnet_mcast_noflush_send (
	void *net_context,
	const void *msg,
	unsigned int msg_len)
{
	struct totemnet_instance *instance = (struct totemnet_instance *)net_context;

	return (instance->transport->mcast_noflush_send (
		instance->transport_context, msg, msg_len));
}
/*
 * Forward to the transport's iface_check hook and return its result.
 */
extern int totemnet_iface_check (void *net_context)
{
	struct totemnet_instance *instance = (struct totemnet_instance *)net_context;

	return (instance->transport->iface_check (instance->transport_context));
}
/*
 * Call the transport's net_mtu_adjust hook.
 *
 * The hook returns nothing, so this wrapper always returns 0.
 */
extern int totemnet_net_mtu_adjust (void *net_context, struct totem_config *totem_config)
{
	struct totemnet_instance *instance = (struct totemnet_instance *)net_context;

	instance->transport->net_mtu_adjust (instance->transport_context, totem_config);

	return (0);
}
/*
 * Return the string produced by the transport's iface_print hook.
 */
const char *totemnet_iface_print (void *net_context) {
	struct totemnet_instance *instance = (struct totemnet_instance *)net_context;

	return (instance->transport->iface_print (instance->transport_context));
}
/*
 * Forward to the transport's iface_get hook, which fills *addr, and return
 * its result.
 */
int totemnet_iface_get (
	void *net_context,
	struct totem_ip_address *addr)
{
	struct totemnet_instance *instance = (struct totemnet_instance *)net_context;
	/* int, like the hook's return type, so negative results pass through unchanged */
	int res;

	res = instance->transport->iface_get (instance->transport_context, addr);

	return (res);
}
/*
 * Forward to the transport's token_target_set hook and return its result.
 */
int totemnet_token_target_set (
	void *net_context,
	const struct totem_ip_address *token_target)
{
	struct totemnet_instance *instance = (struct totemnet_instance *)net_context;
	/* int, like the hook's return type, so negative results pass through unchanged */
	int res;

	res = instance->transport->token_target_set (instance->transport_context, token_target);

	return (res);
}
/*
 * Forward to the transport's recv_mcast_empty hook and return its result.
 */
extern int totemnet_recv_mcast_empty (
	void *net_context)
{
	struct totemnet_instance *instance = (struct totemnet_instance *)net_context;
	/* int, like the hook's return type, so negative results pass through unchanged */
	int res;

	res = instance->transport->recv_mcast_empty (instance->transport_context);

	return (res);
}
/*
 * Add a member through the transport's optional member_add hook.
 *
 * Returns the hook's result, or 0 when the transport has no such hook.
 */
extern int totemnet_member_add (
	void *net_context,
	const struct totem_ip_address *member)
{
	struct totemnet_instance *instance = (struct totemnet_instance *)net_context;
	/* int, like the hook's return type, so negative results pass through unchanged */
	int res = 0;

	if (instance->transport->member_add) {
		res = instance->transport->member_add (
			instance->transport_context,
			member);
	}

	return (res);
}
/*
 * Remove a member through the transport's optional member_remove hook.
 *
 * Returns the hook's result, or 0 when the transport has no such hook.
 */
extern int totemnet_member_remove (
	void *net_context,
	const struct totem_ip_address *member)
{
	struct totemnet_instance *instance = (struct totemnet_instance *)net_context;
	/* int, like the hook's return type, so negative results pass through unchanged */
	int res = 0;

	if (instance->transport->member_remove) {
		res = instance->transport->member_remove (
			instance->transport_context,
			member);
	}

	return (res);
}
diff --git a/exec/totemnet.h b/exec/totemnet.h
index 232c5cf1..0adc1073 100644
--- a/exec/totemnet.h
+++ b/exec/totemnet.h
@@ -1,135 +1,136 @@
/*
* Copyright (c) 2005 MontaVista Software, Inc.
* Copyright (c) 2006-2007, 2009 Red Hat, Inc.
*
* All rights reserved.
*
* Author: Steven Dake (sdake@redhat.com)
*
* This software licensed under BSD license, the text of which follows:
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* - Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* - Neither the name of the MontaVista Software, Inc. nor the names of its
* contributors may be used to endorse or promote products derived from this
* software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
* THE POSSIBILITY OF SUCH DAMAGE.
*/
/**
* @file
* Totem Network interface - also does encryption/decryption
*
* depends on poll abstraction, POSIX, IPV4
*/
#ifndef TOTEMNET_H_DEFINED
#define TOTEMNET_H_DEFINED
#include <sys/types.h>
#include <sys/socket.h>
#include <corosync/totem/totem.h>
#define TOTEMNET_NOFLUSH 0
#define TOTEMNET_FLUSH 1
/**
* Create an instance
*
* Selects the transport from totem_config and calls its initialize hook with
* the remaining arguments. The new instance is returned through
* *net_context. Returns 0 on success, -1 when allocation or the transport's
* initialize hook fails.
*/
extern int totemnet_initialize (
qb_loop_t *poll_handle,
void **net_context,
struct totem_config *totem_config,
+ totemsrp_stats_t *stats,
int interface_no,
void *context,
void (*deliver_fn) (
void *context,
const void *msg,
unsigned int msg_len),
void (*iface_change_fn) (
void *context,
const struct totem_ip_address *iface_address),
void (*target_set_completed) (
void *context));
/** Return a buffer from the transport's buffer_alloc hook. */
extern void *totemnet_buffer_alloc (void *net_context);
/** Hand ptr to the transport's buffer_release hook. */
extern void totemnet_buffer_release (void *net_context, void *ptr);
/** Forwarded to the transport's processor_count_set hook. */
extern int totemnet_processor_count_set (
void *net_context,
int processor_count);
/** Forwarded to the transport's token_send hook. */
extern int totemnet_token_send (
void *net_context,
const void *msg,
unsigned int msg_len);
/** Forwarded to the transport's mcast_flush_send hook. */
extern int totemnet_mcast_flush_send (
void *net_context,
const void *msg,
unsigned int msg_len);
/** Forwarded to the transport's mcast_noflush_send hook. */
extern int totemnet_mcast_noflush_send (
void *net_context,
const void *msg,
unsigned int msg_len);
/** Forwarded to the transport's recv_flush hook. */
extern int totemnet_recv_flush (void *net_context);
/** Forwarded to the transport's send_flush hook. */
extern int totemnet_send_flush (void *net_context);
/** Forwarded to the transport's iface_check hook. */
extern int totemnet_iface_check (void *net_context);
/** Forwarded to the transport's finalize hook. */
extern int totemnet_finalize (void *net_context);
/** Calls the transport's net_mtu_adjust hook; always returns 0. */
extern int totemnet_net_mtu_adjust (void *net_context, struct totem_config *totem_config);
/** Returns the string from the transport's iface_print hook. */
extern const char *totemnet_iface_print (void *net_context);
/** Forwarded to the transport's iface_get hook, which fills *addr. */
extern int totemnet_iface_get (
void *net_context,
struct totem_ip_address *addr);
/** Forwarded to the transport's token_target_set hook. */
extern int totemnet_token_target_set (
void *net_context,
const struct totem_ip_address *token_target);
/** Forwarded to the transport's crypto_set hook. */
extern int totemnet_crypto_set (
void *net_context,
const char *cipher_type,
const char *hash_type);
/** Forwarded to the transport's recv_mcast_empty hook. */
extern int totemnet_recv_mcast_empty (
void *net_context);
/**
* Forwarded to the transport's member_add hook; returns 0 without doing
* anything when the transport does not provide one.
*/
extern int totemnet_member_add (
void *net_context,
const struct totem_ip_address *member);
/**
* Forwarded to the transport's member_remove hook; returns 0 without doing
* anything when the transport does not provide one.
*/
extern int totemnet_member_remove (
void *net_context,
const struct totem_ip_address *member);
#endif /* TOTEMNET_H_DEFINED */
diff --git a/exec/totemrrp.c b/exec/totemrrp.c
index de6cd394..0623892f 100644
--- a/exec/totemrrp.c
+++ b/exec/totemrrp.c
@@ -1,2154 +1,2155 @@
/*
* Copyright (c) 2005 MontaVista Software, Inc.
* Copyright (c) 2006-2012 Red Hat, Inc.
*
* All rights reserved.
*
* Author: Steven Dake (sdake@redhat.com)
*
* This software licensed under BSD license, the text of which follows:
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* - Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* - Neither the name of the MontaVista Software, Inc. nor the names of its
* contributors may be used to endorse or promote products derived from this
* software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
* THE POSSIBILITY OF SUCH DAMAGE.
*/
#include <config.h>
#include <assert.h>
#include <pthread.h>
#include <sys/mman.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/socket.h>
#include <netdb.h>
#include <sys/un.h>
#include <sys/ioctl.h>
#include <sys/param.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <sched.h>
#include <time.h>
#include <sys/time.h>
#include <sys/poll.h>
#include <limits.h>
#include <corosync/sq.h>
#include <corosync/list.h>
#include <corosync/swab.h>
#include <qb/qbdefs.h>
#include <qb/qbloop.h>
#define LOGSYS_UTILS_ONLY 1
#include <corosync/logsys.h>
#include "totemnet.h"
#include "totemrrp.h"
void rrp_deliver_fn (
void *context,
const void *msg,
unsigned int msg_len);
void rrp_iface_change_fn (
void *context,
const struct totem_ip_address *iface_addr);
struct totemrrp_instance;
/*
 * Private state of the passive replication algorithm.
 * The faulty, token_recv_count and mcast_recv_count arrays are allocated by
 * passive_instance_initialize() with one entry per interface.
 */
struct passive_instance {
/* owning rrp instance; used to reach poll_handle, totem_config and the deliver callback */
struct totemrrp_instance *rrp_instance;
/* per-interface flag, 1 once passive_monitor() has marked the interface FAULTY */
unsigned int *faulty;
/* per-interface receive counters compared by passive_monitor() */
unsigned int *token_recv_count;
unsigned int *mcast_recv_count;
/* copy of a token that is held back while totemrrp_msgs_missing() is non-zero */
unsigned char token[15000];
unsigned int token_len;
/* timer that delivers the held-back token when it expires */
qb_loop_timer_handle timer_expired_token;
/* only referenced from commented-out code in the visible part of this file */
qb_loop_timer_handle timer_problem_decrementer;
/* context received with the last token; passed along when the held-back token is delivered */
void *totemrrp_context;
/* interface index selected by the most recent token / multicast send */
unsigned int token_xmit_iface;
unsigned int msg_xmit_iface;
};
/*
 * Private state of the active replication algorithm.
 * The faulty, last_token_recv and counter_problems arrays are allocated and
 * zeroed by active_instance_initialize() with one entry per interface.
 */
struct active_instance {
/* owning rrp instance, stored by active_instance_initialize() */
struct totemrrp_instance *rrp_instance;
/* per-interface fault flag; read by timer_function_test_ring_timeout() */
unsigned int *faulty;
unsigned int *last_token_recv;
unsigned int *counter_problems;
/* buffer for a stored token and its length */
unsigned char token[15000];
unsigned int token_len;
/* initialised to ARR_SEQNO_START_TOKEN - 1 */
unsigned int last_token_seq;
/* both timer handles start out as 0 */
qb_loop_timer_handle timer_expired_token;
qb_loop_timer_handle timer_problem_decrementer;
void *totemrrp_context;
};
struct rrp_algo {
const char *name;
void * (*initialize) (
struct totemrrp_instance *rrp_instance,
int interface_count);
void (*mcast_recv) (
struct totemrrp_instance *instance,
unsigned int iface_no,
void *context,
const void *msg,
unsigned int msg_len);
void (*mcast_noflush_send) (
struct totemrrp_instance *instance,
const void *msg,
unsigned int msg_len);
void (*mcast_flush_send) (
struct totemrrp_instance *instance,
const void *msg,
unsigned int msg_len);
void (*token_recv) (
struct totemrrp_instance *instance,
unsigned int iface_no,
void *context,
const void *msg,
unsigned int msg_len,
unsigned int token_seqid);
void (*token_send) (
struct totemrrp_instance *instance,
const void *msg,
unsigned int msg_len);
void (*recv_flush) (
struct totemrrp_instance *instance);
void (*send_flush) (
struct totemrrp_instance *instance);
void (*iface_check) (
struct totemrrp_instance *instance);
void (*processor_count_set) (
struct totemrrp_instance *instance,
unsigned int processor_count);
void (*token_target_set) (
struct totemrrp_instance *instance,
struct totem_ip_address *token_target,
unsigned int iface_no);
void (*ring_reenable) (
struct totemrrp_instance *instance,
unsigned int iface_no);
int (*mcast_recv_empty) (
struct totemrrp_instance *instance);
int (*member_add) (
struct totemrrp_instance *instance,
const struct totem_ip_address *member,
unsigned int iface_no);
int (*member_remove) (
struct totemrrp_instance *instance,
const struct totem_ip_address *member,
unsigned int iface_no);
};
/*
 * State of one redundant ring protocol instance: the selected algorithm,
 * the per-interface network handles and the callbacks into the upper layer.
 */
struct totemrrp_instance {
/* main loop handle used when adding timers */
qb_loop_t *poll_handle;
struct totem_interface *interfaces;
/* operations table of the selected algorithm */
struct rrp_algo *rrp_algo;
void *context;
/* per-interface status text; passive_monitor() writes the FAULTY message here */
char *status[INTERFACE_MAX];
/* delivers a received message or token to the upper layer */
void (*totemrrp_deliver_fn) (
void *context,
const void *msg,
unsigned int msg_len);
void (*totemrrp_iface_change_fn) (
void *context,
const struct totem_ip_address *iface_addr,
unsigned int iface_no);
void (*totemrrp_token_seqid_get) (
const void *msg,
unsigned int *seqid,
unsigned int *token_is);
void (*totemrrp_target_set_completed) (
void *context);
/* the passive algorithm delivers a token immediately only when this returns 0 */
unsigned int (*totemrrp_msgs_missing) (void);
/*
* Function and data used to log messages
*/
int totemrrp_log_level_security;
int totemrrp_log_level_error;
int totemrrp_log_level_warning;
int totemrrp_log_level_notice;
int totemrrp_log_level_debug;
int totemrrp_subsys_id;
void (*totemrrp_log_printf) (
int level,
int subsys,
const char *function,
const char *file,
int line,
const char *format, ...)__attribute__((format(printf, 6, 7)));
/* one totemnet handle per interface, indexed by interface number */
void **net_handles;
/* algorithm private state (struct passive_instance or struct active_instance) */
void *rrp_algo_instance;
int interface_count;
int processor_count;
/* sent as nodeid_activator in ring test messages */
int my_nodeid;
struct totem_config *totem_config;
/* per-interface context handed to timer_function_test_ring_timeout() */
void *deliver_fn_context[INTERFACE_MAX];
/* per-interface handle of the ring test timer */
qb_loop_timer_handle timer_active_test_ring_timeout[INTERFACE_MAX];
/* stats.faulty[] is maintained by stats_set_interface_faulty() */
totemrrp_stats_t stats;
};
static void stats_set_interface_faulty(struct totemrrp_instance *rrp_instance,
unsigned int iface_no, int is_faulty);
/*
* None Replication Forward Declarations
*/
static void none_mcast_recv (
struct totemrrp_instance *instance,
unsigned int iface_no,
void *context,
const void *msg,
unsigned int msg_len);
static void none_mcast_noflush_send (
struct totemrrp_instance *instance,
const void *msg,
unsigned int msg_len);
static void none_mcast_flush_send (
struct totemrrp_instance *instance,
const void *msg,
unsigned int msg_len);
static void none_token_recv (
struct totemrrp_instance *instance,
unsigned int iface_no,
void *context,
const void *msg,
unsigned int msg_len,
unsigned int token_seqid);
static void none_token_send (
struct totemrrp_instance *instance,
const void *msg,
unsigned int msg_len);
static void none_recv_flush (
struct totemrrp_instance *instance);
static void none_send_flush (
struct totemrrp_instance *instance);
static void none_iface_check (
struct totemrrp_instance *instance);
static void none_processor_count_set (
struct totemrrp_instance *instance,
unsigned int processor_count_set);
static void none_token_target_set (
struct totemrrp_instance *instance,
struct totem_ip_address *token_target,
unsigned int iface_no);
static void none_ring_reenable (
struct totemrrp_instance *instance,
unsigned int iface_no);
static int none_mcast_recv_empty (
struct totemrrp_instance *instance);
static int none_member_add (
struct totemrrp_instance *instance,
const struct totem_ip_address *member,
unsigned int iface_no);
static int none_member_remove (
struct totemrrp_instance *instance,
const struct totem_ip_address *member,
unsigned int iface_no);
/*
* Passive Replication Forward Declarations
*/
static void *passive_instance_initialize (
struct totemrrp_instance *rrp_instance,
int interface_count);
static void passive_mcast_recv (
struct totemrrp_instance *instance,
unsigned int iface_no,
void *context,
const void *msg,
unsigned int msg_len);
static void passive_mcast_noflush_send (
struct totemrrp_instance *instance,
const void *msg,
unsigned int msg_len);
static void passive_mcast_flush_send (
struct totemrrp_instance *instance,
const void *msg,
unsigned int msg_len);
static void passive_monitor (
struct totemrrp_instance *rrp_instance,
unsigned int iface_no,
int is_token_recv_count);
static void passive_token_recv (
struct totemrrp_instance *instance,
unsigned int iface_no,
void *context,
const void *msg,
unsigned int msg_len,
unsigned int token_seqid);
static void passive_token_send (
struct totemrrp_instance *instance,
const void *msg,
unsigned int msg_len);
static void passive_recv_flush (
struct totemrrp_instance *instance);
static void passive_send_flush (
struct totemrrp_instance *instance);
static void passive_iface_check (
struct totemrrp_instance *instance);
static void passive_processor_count_set (
struct totemrrp_instance *instance,
unsigned int processor_count_set);
static void passive_token_target_set (
struct totemrrp_instance *instance,
struct totem_ip_address *token_target,
unsigned int iface_no);
static void passive_ring_reenable (
struct totemrrp_instance *instance,
unsigned int iface_no);
static int passive_mcast_recv_empty (
struct totemrrp_instance *instance);
static int passive_member_add (
struct totemrrp_instance *instance,
const struct totem_ip_address *member,
unsigned int iface_no);
static int passive_member_remove (
struct totemrrp_instance *instance,
const struct totem_ip_address *member,
unsigned int iface_no);
/*
* Active Replication Forward Declarations
*/
static void *active_instance_initialize (
struct totemrrp_instance *rrp_instance,
int interface_count);
static void active_mcast_recv (
struct totemrrp_instance *instance,
unsigned int iface_no,
void *context,
const void *msg,
unsigned int msg_len);
static void active_mcast_noflush_send (
struct totemrrp_instance *instance,
const void *msg,
unsigned int msg_len);
static void active_mcast_flush_send (
struct totemrrp_instance *instance,
const void *msg,
unsigned int msg_len);
static void active_token_recv (
struct totemrrp_instance *instance,
unsigned int iface_no,
void *context,
const void *msg,
unsigned int msg_len,
unsigned int token_seqid);
static void active_token_send (
struct totemrrp_instance *instance,
const void *msg,
unsigned int msg_len);
static void active_recv_flush (
struct totemrrp_instance *instance);
static void active_send_flush (
struct totemrrp_instance *instance);
static void active_iface_check (
struct totemrrp_instance *instance);
static void active_processor_count_set (
struct totemrrp_instance *instance,
unsigned int processor_count_set);
static void active_token_target_set (
struct totemrrp_instance *instance,
struct totem_ip_address *token_target,
unsigned int iface_no);
static void active_ring_reenable (
struct totemrrp_instance *instance,
unsigned int iface_no);
static int active_mcast_recv_empty (
struct totemrrp_instance *instance);
static int active_member_add (
struct totemrrp_instance *instance,
const struct totem_ip_address *member,
unsigned int iface_no);
static int active_member_remove (
struct totemrrp_instance *instance,
const struct totem_ip_address *member,
unsigned int iface_no);
static void active_timer_expired_token_start (
struct active_instance *active_instance);
static void active_timer_expired_token_cancel (
struct active_instance *active_instance);
static void active_timer_problem_decrementer_start (
struct active_instance *active_instance);
static void active_timer_problem_decrementer_cancel (
struct active_instance *active_instance);
/*
* 0-5 reserved for totemsrp.c
*/
#define MESSAGE_TYPE_RING_TEST_ACTIVE 6
#define MESSAGE_TYPE_RING_TEST_ACTIVATE 7
#define ENDIAN_LOCAL 0xff22
/*
* Rollover handling:
*
* ARR_SEQNO_START_TOKEN is the starting sequence number of last seen sequence
* for a token for active redundant ring. This should remain zero, unless testing
* overflow in which case 0x7fffff00 or 0xffffff00 are good starting values.
* It should be the same as the one defined in totemsrp.c
*/
#define ARR_SEQNO_START_TOKEN 0x0
/*
* These can be used to test different rollover points
* #define ARR_SEQNO_START_MSG 0xfffffe00
*/
/*
* Threshold value when recv_count for passive rrp should be adjusted.
* Set this to a smaller value when testing that the adjustment works
* properly. Also keep in mind that this value must be smaller
* than rrp_problem_count_threshold
*/
#define PASSIVE_RECV_COUNT_THRESHOLD (INT_MAX / 2)
/*
 * Header of the ring test messages (MESSAGE_TYPE_RING_TEST_ACTIVE /
 * MESSAGE_TYPE_RING_TEST_ACTIVATE). The struct is packed.
 */
struct message_header {
char type;
char encapsulated;
/* set to ENDIAN_LOCAL by the sender */
unsigned short endian_detector;
/* interface number the test message refers to */
int ring_number;
/* node id of the sender (my_nodeid) */
int nodeid_activator;
} __attribute__((packed));
/*
 * Per-interface context; timer_function_test_ring_timeout() receives it as
 * its timer argument and reads instance and iface_no from it.
 */
struct deliver_fn_context {
struct totemrrp_instance *instance;
void *context;
int iface_no;
};
/*
 * Operations of the "none" algorithm. It has no private state, hence
 * initialize is NULL; its functions only use net_handles[0].
 */
struct rrp_algo none_algo = {
.name = "none",
.initialize = NULL,
.mcast_recv = none_mcast_recv,
.mcast_noflush_send = none_mcast_noflush_send,
.mcast_flush_send = none_mcast_flush_send,
.token_recv = none_token_recv,
.token_send = none_token_send,
.recv_flush = none_recv_flush,
.send_flush = none_send_flush,
.iface_check = none_iface_check,
.processor_count_set = none_processor_count_set,
.token_target_set = none_token_target_set,
.ring_reenable = none_ring_reenable,
.mcast_recv_empty = none_mcast_recv_empty,
.member_add = none_member_add,
.member_remove = none_member_remove
};
/*
 * Operations of the "passive" algorithm; private state is a
 * struct passive_instance created by passive_instance_initialize().
 */
struct rrp_algo passive_algo = {
.name = "passive",
.initialize = passive_instance_initialize,
.mcast_recv = passive_mcast_recv,
.mcast_noflush_send = passive_mcast_noflush_send,
.mcast_flush_send = passive_mcast_flush_send,
.token_recv = passive_token_recv,
.token_send = passive_token_send,
.recv_flush = passive_recv_flush,
.send_flush = passive_send_flush,
.iface_check = passive_iface_check,
.processor_count_set = passive_processor_count_set,
.token_target_set = passive_token_target_set,
.ring_reenable = passive_ring_reenable,
.mcast_recv_empty = passive_mcast_recv_empty,
.member_add = passive_member_add,
.member_remove = passive_member_remove
};
/*
 * Operations of the "active" algorithm; private state is a
 * struct active_instance created by active_instance_initialize().
 */
struct rrp_algo active_algo = {
.name = "active",
.initialize = active_instance_initialize,
.mcast_recv = active_mcast_recv,
.mcast_noflush_send = active_mcast_noflush_send,
.mcast_flush_send = active_mcast_flush_send,
.token_recv = active_token_recv,
.token_send = active_token_send,
.recv_flush = active_recv_flush,
.send_flush = active_send_flush,
.iface_check = active_iface_check,
.processor_count_set = active_processor_count_set,
.token_target_set = active_token_target_set,
.ring_reenable = active_ring_reenable,
.mcast_recv_empty = active_mcast_recv_empty,
.member_add = active_member_add,
.member_remove = active_member_remove
};
/*
 * Table of all available replication algorithms.
 */
struct rrp_algo *rrp_algos[] = {
	&none_algo,
	&passive_algo,
	&active_algo
};

/*
 * Number of entries in rrp_algos. Derived from the table so that adding or
 * removing an algorithm cannot leave the count stale; the cast keeps the
 * int type the former literal had.
 */
#define RRP_ALGOS_COUNT ((int)(sizeof (rrp_algos) / sizeof (rrp_algos[0])))
/*
 * Log a message through the instance's totemrrp_log_printf callback.
 * A variable named rrp_instance (struct totemrrp_instance *) must be in
 * scope at the call site.
 *
 * There is deliberately no semicolon after while (0): the caller's own
 * semicolon terminates the statement, so the macro can be used as the
 * single statement of an if that is followed by an else.
 */
#define log_printf(level, format, args...)			\
do {								\
	rrp_instance->totemrrp_log_printf (			\
		level, rrp_instance->totemrrp_subsys_id,	\
		__FUNCTION__, __FILE__, __LINE__,		\
		format, ##args);				\
} while (0)
/*
 * Mirror the fault state of interface iface_no into the instance statistics.
 * Any non-zero is_faulty is stored as 1, zero as 0.
 */
static void stats_set_interface_faulty(struct totemrrp_instance *rrp_instance,
	unsigned int iface_no, int is_faulty)
{
	rrp_instance->stats.faulty[iface_no] = (is_faulty != 0);
}
/*
 * Copy a ring test message header from in to out, byte-swapping the two
 * 32-bit fields and stamping the result with the local endian marker.
 */
static void test_active_msg_endian_convert(const struct message_header *in, struct message_header *out)
{
	/* single-byte fields are copied unchanged */
	out->type = in->type;
	out->encapsulated = in->encapsulated;

	/* multi-byte fields are swapped */
	out->ring_number = swab32 (in->ring_number);
	out->nodeid_activator = swab32 (in->nodeid_activator);

	out->endian_detector = ENDIAN_LOCAL;
}
static void timer_function_test_ring_timeout (void *context)
{
struct deliver_fn_context *deliver_fn_context = (struct deliver_fn_context *)context;
struct totemrrp_instance *rrp_instance = deliver_fn_context->instance;
unsigned int *faulty = NULL;
int iface_no = deliver_fn_context->iface_no;
struct message_header msg = {
.type = MESSAGE_TYPE_RING_TEST_ACTIVE,
.endian_detector = ENDIAN_LOCAL,
};
if (strcmp(rrp_instance->totem_config->rrp_mode, "active") == 0)
faulty = ((struct active_instance *)(rrp_instance->rrp_algo_instance))->faulty;
if (strcmp(rrp_instance->totem_config->rrp_mode, "passive") == 0)
faulty = ((struct passive_instance *)(rrp_instance->rrp_algo_instance))->faulty;
assert (faulty != NULL);
if (faulty[iface_no] == 1) {
msg.ring_number = iface_no;
msg.nodeid_activator = rrp_instance->my_nodeid;
totemnet_token_send (
rrp_instance->net_handles[iface_no],
&msg, sizeof (struct message_header));
qb_loop_timer_add (rrp_instance->poll_handle,
QB_LOOP_MED,
rrp_instance->totem_config->rrp_autorecovery_check_timeout*QB_TIME_NS_IN_MSEC,
(void *)deliver_fn_context,
timer_function_test_ring_timeout,
&rrp_instance->timer_active_test_ring_timeout[iface_no]);
}
}
/*
* None Replication Implementation
*/
/*
 * "none" algorithm: pass a received multicast message straight to the
 * deliver callback. iface_no is not used.
 */
static void none_mcast_recv (
	struct totemrrp_instance *rrp_instance,
	unsigned int iface_no,
	void *context,
	const void *msg,
	unsigned int msg_len)
{
	(void)iface_no;

	rrp_instance->totemrrp_deliver_fn (context, msg, msg_len);
}
/*
 * "none" algorithm: flushing multicast send on the first interface.
 */
static void none_mcast_flush_send (
	struct totemrrp_instance *instance,
	const void *msg,
	unsigned int msg_len)
{
	void *net = instance->net_handles[0];

	totemnet_mcast_flush_send (net, msg, msg_len);
}
/*
 * "none" algorithm: non-flushing multicast send on the first interface.
 */
static void none_mcast_noflush_send (
	struct totemrrp_instance *instance,
	const void *msg,
	unsigned int msg_len)
{
	void *net = instance->net_handles[0];

	totemnet_mcast_noflush_send (net, msg, msg_len);
}
/*
 * "none" algorithm: pass a received token straight to the deliver
 * callback. iface_no and token_seq are not used.
 */
static void none_token_recv (
	struct totemrrp_instance *rrp_instance,
	unsigned int iface_no,
	void *context,
	const void *msg,
	unsigned int msg_len,
	unsigned int token_seq)
{
	(void)iface_no;
	(void)token_seq;

	rrp_instance->totemrrp_deliver_fn (context, msg, msg_len);
}
/*
 * "none" algorithm: send the token on the first interface.
 */
static void none_token_send (
	struct totemrrp_instance *instance,
	const void *msg,
	unsigned int msg_len)
{
	void *net = instance->net_handles[0];

	totemnet_token_send (net, msg, msg_len);
}
/*
 * "none" algorithm: flush the receive side of the first interface.
 */
static void none_recv_flush (struct totemrrp_instance *instance)
{
	void *net = instance->net_handles[0];

	totemnet_recv_flush (net);
}
/*
 * "none" algorithm: flush the send side of the first interface.
 */
static void none_send_flush (struct totemrrp_instance *instance)
{
	void *net = instance->net_handles[0];

	totemnet_send_flush (net);
}
/*
 * "none" algorithm: run the interface check on the first interface.
 */
static void none_iface_check (struct totemrrp_instance *instance)
{
	void *net = instance->net_handles[0];

	totemnet_iface_check (net);
}
/*
 * "none" algorithm: forward the processor count to the first interface.
 */
static void none_processor_count_set (
	struct totemrrp_instance *instance,
	unsigned int processor_count)
{
	void *net = instance->net_handles[0];

	totemnet_processor_count_set (net, processor_count);
}
/*
 * "none" algorithm: set the token target on the first interface.
 * iface_no is not used.
 */
static void none_token_target_set (
	struct totemrrp_instance *instance,
	struct totem_ip_address *token_target,
	unsigned int iface_no)
{
	void *net = instance->net_handles[0];

	(void)iface_no;
	totemnet_token_target_set (net, token_target);
}
/*
 * "none" algorithm: re-enabling a ring is a no-op.
 */
static void none_ring_reenable (
	struct totemrrp_instance *instance,
	unsigned int iface_no)
{
	/* intentionally empty */
	(void)instance;
	(void)iface_no;
}
/*
 * "none" algorithm: empty the multicast receive queue of the first
 * interface and return the totemnet result unchanged.
 */
static int none_mcast_recv_empty (
	struct totemrrp_instance *instance)
{
	return (totemnet_recv_mcast_empty (instance->net_handles[0]));
}
/*
 * "none" algorithm: add a member on the first interface and return the
 * totemnet result. iface_no is not used.
 */
static int none_member_add (
	struct totemrrp_instance *instance,
	const struct totem_ip_address *member,
	unsigned int iface_no)
{
	(void)iface_no;

	return (totemnet_member_add (instance->net_handles[0], member));
}
/*
 * "none" algorithm: remove a member on the first interface and return the
 * totemnet result. iface_no is not used.
 */
static int none_member_remove (
	struct totemrrp_instance *instance,
	const struct totem_ip_address *member,
	unsigned int iface_no)
{
	(void)iface_no;

	return (totemnet_member_remove (instance->net_handles[0], member));
}
/*
* Passive Replication Implementation
*/
/*
 * Allocate and zero the private state of the passive algorithm.
 *
 * rrp_instance    owning rrp instance; stored in the new state so that the
 *                 token timer helpers can reach poll_handle, totem_config
 *                 and the deliver callback through it
 * interface_count number of entries for the per-interface arrays
 *
 * Returns the new struct passive_instance, or NULL when an allocation
 * fails (everything allocated so far is released). The statistics fault
 * flag of every interface is cleared once the faulty array exists.
 */
void *passive_instance_initialize (
	struct totemrrp_instance *rrp_instance,
	int interface_count)
{
	struct passive_instance *instance;
	int i;

	instance = calloc (1, sizeof (*instance));
	if (instance == NULL) {
		return (NULL);
	}

	instance->faulty = calloc (interface_count, sizeof (*instance->faulty));
	if (instance->faulty == NULL) {
		goto error_free;
	}
	for (i = 0; i < interface_count; i++) {
		stats_set_interface_faulty (rrp_instance, i, 0);
	}

	instance->token_recv_count = calloc (interface_count,
		sizeof (*instance->token_recv_count));
	if (instance->token_recv_count == NULL) {
		goto error_free;
	}

	instance->mcast_recv_count = calloc (interface_count,
		sizeof (*instance->mcast_recv_count));
	if (instance->mcast_recv_count == NULL) {
		goto error_free;
	}

	/*
	 * Same as active_instance_initialize(): without the back pointer
	 * the token timer start/expiry functions dereference NULL.
	 */
	instance->rrp_instance = rrp_instance;

	return ((void *)instance);

error_free:
	/* members not yet allocated are still NULL from calloc */
	free (instance->token_recv_count);
	free (instance->faulty);
	free (instance);
	return (NULL);
}
/*
 * Timer callback: the hold-back time of the stored token has expired, so
 * hand the stored token to the deliver callback.
 * context is the struct passive_instance.
 */
static void timer_function_passive_token_expired (void *context)
{
	struct passive_instance *state = context;

	state->rrp_instance->totemrrp_deliver_fn (
		state->totemrrp_context,
		state->token,
		state->token_len);
}
/* TODO
static void timer_function_passive_problem_decrementer (void *context)
{
// struct passive_instance *passive_instance = (struct passive_instance *)context;
// struct totemrrp_instance *rrp_instance = passive_instance->rrp_instance;
}
*/
static void passive_timer_expired_token_start (
struct passive_instance *passive_instance)
{
qb_loop_timer_add (
passive_instance->rrp_instance->poll_handle,
QB_LOOP_MED,
passive_instance->rrp_instance->totem_config->rrp_token_expired_timeout*QB_TIME_NS_IN_MSEC,
(void *)passive_instance,
timer_function_passive_token_expired,
&passive_instance->timer_expired_token);
}
static void passive_timer_expired_token_cancel (
struct passive_instance *passive_instance)
{
qb_loop_timer_del (
passive_instance->rrp_instance->poll_handle,
passive_instance->timer_expired_token);
}
/*
static void passive_timer_problem_decrementer_start (
struct passive_instance *passive_instance)
{
qb_loop_timer_add (
QB_LOOP_MED,
passive_instance->rrp_instance->poll_handle,
passive_instance->rrp_instance->totem_config->rrp_problem_count_timeout*QB_TIME_NS_IN_MSEC,
(void *)passive_instance,
timer_function_passive_problem_decrementer,
&passive_instance->timer_problem_decrementer);
}
static void passive_timer_problem_decrementer_cancel (
struct passive_instance *passive_instance)
{
qb_loop_timer_del (
passive_instance->rrp_instance->poll_handle,
passive_instance->timer_problem_decrementer);
}
*/
/*
 * Largest value among the first count entries of recv_count
 * (0 when count is 0).
 */
static unsigned int passive_recv_count_max (
	const unsigned int *recv_count,
	unsigned int count)
{
	unsigned int highest = 0;
	unsigned int idx;

	for (idx = 0; idx < count; idx++) {
		if (recv_count[idx] > highest) {
			highest = recv_count[idx];
		}
	}
	return (highest);
}

/*
 * Monitor function implementation from rrp paper.
 * rrp_instance is the rrp instance running the passive algorithm, iface_no
 * is the interface on which a message/token was received and
 * is_token_recv_count is a boolean that denotes whether it was a token
 * (non-zero) or a regular message (zero).
 *
 * The receive counter of iface_no is incremented. Every interface that is
 * not yet faulty and lags the highest counter by more than the configured
 * threshold is marked faulty, gets its ring test timer started and is
 * reported in the status text and the log.
 */
static void passive_monitor (
	struct totemrrp_instance *rrp_instance,
	unsigned int iface_no,
	int is_token_recv_count)
{
	struct passive_instance *passive_instance = (struct passive_instance *)rrp_instance->rrp_algo_instance;
	unsigned int count = rrp_instance->interface_count;
	unsigned int *recv_count;
	unsigned int threshold;
	unsigned int max;
	unsigned int min_all;
	unsigned int min_active;
	unsigned int i;

	/*
	 * Tokens and regular messages have separate counters and thresholds
	 */
	if (is_token_recv_count) {
		recv_count = passive_instance->token_recv_count;
		threshold = rrp_instance->totem_config->rrp_problem_count_threshold;
	} else {
		recv_count = passive_instance->mcast_recv_count;
		threshold = rrp_instance->totem_config->rrp_problem_count_mcast_threshold;
	}

	recv_count[iface_no] += 1;
	max = passive_recv_count_max (recv_count, count);

	/*
	 * Max is larger than PASSIVE_RECV_COUNT_THRESHOLD -> shift the
	 * counters down
	 */
	if (max > PASSIVE_RECV_COUNT_THRESHOLD) {
		min_all = recv_count[iface_no];
		min_active = recv_count[iface_no];

		for (i = 0; i < count; i++) {
			if (recv_count[i] < min_all) {
				min_all = recv_count[i];
			}
			if (passive_instance->faulty[i] == 0 &&
			    recv_count[i] < min_active) {
				min_active = recv_count[i];
			}
		}

		for (i = 0; i < count; i++) {
			if (min_all > 0) {
				/*
				 * Every counter is above zero: lower all of them
				 */
				recv_count[i] -= min_all;
			} else if (passive_instance->faulty[i] == 0) {
				/*
				 * Some counter is already zero: lower only the
				 * counters of non-faulty interfaces
				 */
				recv_count[i] -= min_active;
			}
		}

		/*
		 * The counters moved, so find max again
		 */
		max = passive_recv_count_max (recv_count, count);
	}

	for (i = 0; i < count; i++) {
		if (passive_instance->faulty[i] != 0) {
			continue;
		}
		if (max - recv_count[i] <= threshold) {
			continue;
		}

		passive_instance->faulty[i] = 1;
		qb_loop_timer_add (rrp_instance->poll_handle,
			QB_LOOP_MED,
			rrp_instance->totem_config->rrp_autorecovery_check_timeout*QB_TIME_NS_IN_MSEC,
			rrp_instance->deliver_fn_context[i],
			timer_function_test_ring_timeout,
			&rrp_instance->timer_active_test_ring_timeout[i]);

		stats_set_interface_faulty (rrp_instance, i, passive_instance->faulty[i]);

		sprintf (rrp_instance->status[i],
			"Marking ringid %u interface %s FAULTY",
			i,
			totemnet_iface_print (rrp_instance->net_handles[i]));

		log_printf (
			rrp_instance->totemrrp_log_level_error,
			"%s",
			rrp_instance->status[i]);
	}
}
/*
 * Passive algorithm: deliver a received multicast message. If no messages
 * are missing afterwards and the token timer handle is non-zero, the stored
 * token is delivered as well and its timer is removed. Finally the receive
 * is accounted for in passive_monitor() as a regular message.
 */
static void passive_mcast_recv (
	struct totemrrp_instance *rrp_instance,
	unsigned int iface_no,
	void *context,
	const void *msg,
	unsigned int msg_len)
{
	struct passive_instance *state = (struct passive_instance *)rrp_instance->rrp_algo_instance;

	rrp_instance->totemrrp_deliver_fn (context, msg, msg_len);

	if (rrp_instance->totemrrp_msgs_missing() == 0) {
		/*
		 * NOTE(review): the handle is not cleared after the cancel
		 * below - confirm a stale handle cannot re-enter this branch.
		 */
		if (state->timer_expired_token) {
			/*
			 * Delivers the last token
			 */
			rrp_instance->totemrrp_deliver_fn (
				state->totemrrp_context,
				state->token,
				state->token_len);
			passive_timer_expired_token_cancel (state);
		}
	}

	passive_monitor (rrp_instance, iface_no, 0);
}
/*
 * Passive algorithm: flushing multicast send on the next interface, in
 * round-robin order, that is not marked faulty. Nothing is sent when no
 * such interface is found.
 */
static void passive_mcast_flush_send (
	struct totemrrp_instance *instance,
	const void *msg,
	unsigned int msg_len)
{
	struct passive_instance *passive_instance = (struct passive_instance *)instance->rrp_algo_instance;
	int tries = 0;

	for (;;) {
		passive_instance->msg_xmit_iface = (passive_instance->msg_xmit_iface + 1) % instance->interface_count;
		tries++;
		if (tries > instance->interface_count ||
		    passive_instance->faulty[passive_instance->msg_xmit_iface] != 1) {
			break;
		}
	}

	if (tries > instance->interface_count) {
		/* every interface is faulty */
		return;
	}
	totemnet_mcast_flush_send (instance->net_handles[passive_instance->msg_xmit_iface], msg, msg_len);
}
/*
 * Passive algorithm: non-flushing multicast send on the next interface, in
 * round-robin order, that is not marked faulty. Nothing is sent when no
 * such interface is found.
 */
static void passive_mcast_noflush_send (
	struct totemrrp_instance *instance,
	const void *msg,
	unsigned int msg_len)
{
	struct passive_instance *passive_instance = (struct passive_instance *)instance->rrp_algo_instance;
	int tries = 0;

	for (;;) {
		passive_instance->msg_xmit_iface = (passive_instance->msg_xmit_iface + 1) % instance->interface_count;
		tries++;
		if (tries > instance->interface_count ||
		    passive_instance->faulty[passive_instance->msg_xmit_iface] != 1) {
			break;
		}
	}

	if (tries > instance->interface_count) {
		/* every interface is faulty */
		return;
	}
	totemnet_mcast_noflush_send (instance->net_handles[passive_instance->msg_xmit_iface], msg, msg_len);
}
/*
 * Passive algorithm: handle a token received on interface iface_no.
 *
 * When no messages are missing the token is delivered immediately.
 * Otherwise a copy of the token and its length are stored and the token
 * timer is started; the copy is delivered later by passive_mcast_recv()
 * or by the timer callback. In both cases the receive is accounted for in
 * passive_monitor() as a token. token_seq is not used.
 */
static void passive_token_recv (
	struct totemrrp_instance *rrp_instance,
	unsigned int iface_no,
	void *context,
	const void *msg,
	unsigned int msg_len,
	unsigned int token_seq)
{
	struct passive_instance *passive_instance = (struct passive_instance *)rrp_instance->rrp_algo_instance;

	passive_instance->totemrrp_context = context; // this should be in totemrrp_instance ? TODO

	if (rrp_instance->totemrrp_msgs_missing() == 0) {
		rrp_instance->totemrrp_deliver_fn (
			context,
			msg,
			msg_len);
	} else {
		memcpy (passive_instance->token, msg, msg_len);
		/*
		 * Remember the length too: the deferred delivery passes
		 * token_len, which would otherwise still be 0.
		 */
		passive_instance->token_len = msg_len;
		passive_timer_expired_token_start (passive_instance);
	}

	passive_monitor (rrp_instance, iface_no, 1);
}
/*
 * Passive algorithm: send the token on the next interface, in round-robin
 * order, that is not marked faulty. Nothing is sent when no such interface
 * is found.
 */
static void passive_token_send (
	struct totemrrp_instance *instance,
	const void *msg,
	unsigned int msg_len)
{
	struct passive_instance *passive_instance = (struct passive_instance *)instance->rrp_algo_instance;
	int tries = 0;

	for (;;) {
		passive_instance->token_xmit_iface = (passive_instance->token_xmit_iface + 1) % instance->interface_count;
		tries++;
		if (tries > instance->interface_count ||
		    passive_instance->faulty[passive_instance->token_xmit_iface] != 1) {
			break;
		}
	}

	if (tries > instance->interface_count) {
		/* every interface is faulty */
		return;
	}
	totemnet_token_send (
		instance->net_handles[passive_instance->token_xmit_iface],
		msg, msg_len);
}
/*
 * Passive algorithm: flush the receive side of every non-faulty interface.
 */
static void passive_recv_flush (struct totemrrp_instance *instance)
{
	struct passive_instance *state = (struct passive_instance *)instance->rrp_algo_instance;
	unsigned int ring;

	for (ring = 0; ring < instance->interface_count; ring++) {
		if (state->faulty[ring] != 0) {
			continue;
		}
		totemnet_recv_flush (instance->net_handles[ring]);
	}
}
/*
 * Passive algorithm: flush the send side of every non-faulty interface.
 */
static void passive_send_flush (struct totemrrp_instance *instance)
{
	struct passive_instance *state = (struct passive_instance *)instance->rrp_algo_instance;
	unsigned int ring;

	for (ring = 0; ring < instance->interface_count; ring++) {
		if (state->faulty[ring] != 0) {
			continue;
		}
		totemnet_send_flush (instance->net_handles[ring]);
	}
}
/*
 * Passive algorithm: run the interface check on every non-faulty interface.
 */
static void passive_iface_check (struct totemrrp_instance *instance)
{
	struct passive_instance *state = (struct passive_instance *)instance->rrp_algo_instance;
	unsigned int ring;

	for (ring = 0; ring < instance->interface_count; ring++) {
		if (state->faulty[ring] != 0) {
			continue;
		}
		totemnet_iface_check (instance->net_handles[ring]);
	}
}
/*
 * Passive algorithm: forward the processor count to every non-faulty
 * interface.
 */
static void passive_processor_count_set (
	struct totemrrp_instance *instance,
	unsigned int processor_count)
{
	struct passive_instance *state = (struct passive_instance *)instance->rrp_algo_instance;
	unsigned int ring;

	for (ring = 0; ring < instance->interface_count; ring++) {
		if (state->faulty[ring] != 0) {
			continue;
		}
		totemnet_processor_count_set (instance->net_handles[ring],
			processor_count);
	}
}
/*
 * Passive algorithm: set the token target on interface iface_no.
 */
static void passive_token_target_set (
	struct totemrrp_instance *instance,
	struct totem_ip_address *token_target,
	unsigned int iface_no)
{
	void *net = instance->net_handles[iface_no];

	totemnet_token_target_set (net, token_target);
}
/*
 * Passive algorithm: empty the multicast receive queue of every interface.
 * Returns -1 as soon as one interface reports -1, otherwise 1 if at least
 * one interface reported 1, otherwise 0.
 */
static int passive_mcast_recv_empty (
	struct totemrrp_instance *instance)
{
	int emptied_any = 0;
	int ring;

	for (ring = 0; ring < instance->interface_count; ring++) {
		int rc = totemnet_recv_mcast_empty (instance->net_handles[ring]);

		if (rc == -1) {
			return (-1);
		}
		if (rc == 1) {
			emptied_any = 1;
		}
	}

	return (emptied_any);
}
/*
 * Passive algorithm: add a member on interface iface_no and return the
 * totemnet result.
 */
static int passive_member_add (
	struct totemrrp_instance *instance,
	const struct totem_ip_address *member,
	unsigned int iface_no)
{
	return (totemnet_member_add (instance->net_handles[iface_no], member));
}
/*
 * Passive algorithm: remove a member on interface iface_no and return the
 * totemnet result.
 */
static int passive_member_remove (
	struct totemrrp_instance *instance,
	const struct totem_ip_address *member,
	unsigned int iface_no)
{
	return (totemnet_member_remove (instance->net_handles[iface_no], member));
}
/*
 * Passive algorithm: re-enable rings.
 * All receive counters are reset. When iface_no equals interface_count
 * the fault flag of every interface is cleared, otherwise only the flag of
 * interface iface_no. The statistics are updated to match.
 */
static void passive_ring_reenable (
	struct totemrrp_instance *instance,
	unsigned int iface_no)
{
	struct passive_instance *state = (struct passive_instance *)instance->rrp_algo_instance;
	size_t counters_size = sizeof (unsigned int) * instance->interface_count;
	int ring;

	memset (state->mcast_recv_count, 0, counters_size);
	memset (state->token_recv_count, 0, counters_size);

	if (iface_no != instance->interface_count) {
		/* a single ring was requested */
		state->faulty[iface_no] = 0;
		stats_set_interface_faulty (instance, iface_no, 0);
		return;
	}

	/* all rings were requested */
	for (ring = 0; ring < instance->interface_count; ring++) {
		state->faulty[ring] = 0;
	}
	for (ring = 0; ring < instance->interface_count; ring++) {
		stats_set_interface_faulty (instance, ring, 0);
	}
}
/*
* Active Replication Implementation
*/
/*
 * Allocate and zero the state of the active replication algorithm:
 * the instance itself plus one faulty flag, one last_token_recv flag and
 * one problem counter per interface.
 *
 * The faulty statistic of every interface is reset to 0 once the faulty
 * array has been allocated. Returns the new instance, or NULL when any
 * allocation fails (everything allocated so far is released).
 */
void *active_instance_initialize (
	struct totemrrp_instance *rrp_instance,
	int interface_count)
{
	struct active_instance *active;
	int ring;

	active = calloc (1, sizeof (struct active_instance));
	if (active == NULL) {
		return (NULL);
	}

	active->faulty = calloc (interface_count, sizeof (int));
	if (active->faulty == NULL) {
		goto free_instance;
	}
	for (ring = 0; ring < interface_count; ring++) {
		stats_set_interface_faulty (rrp_instance, ring, 0);
	}

	active->last_token_recv = calloc (interface_count, sizeof (int));
	if (active->last_token_recv == NULL) {
		goto free_faulty;
	}

	active->counter_problems = calloc (interface_count, sizeof (int));
	if (active->counter_problems == NULL) {
		goto free_last_token_recv;
	}

	active->timer_expired_token = 0;
	active->timer_problem_decrementer = 0;
	active->rrp_instance = rrp_instance;
	active->last_token_seq = ARR_SEQNO_START_TOKEN - 1;

	return ((void *)active);

free_last_token_recv:
	free (active->last_token_recv);
free_faulty:
	free (active->faulty);
free_instance:
	free (active);
	return (NULL);
}
/*
 * Timer callback: lower every non-zero per-ring problem counter by one,
 * record the new state in the ring's status string and log it at warning
 * level. The timer is started again while at least one counter was
 * non-zero on entry; otherwise the stored timer handle is reset to 0.
 */
static void timer_function_active_problem_decrementer (void *context)
{
	struct active_instance *active_instance = (struct active_instance *)context;
	struct totemrrp_instance *rrp_instance = active_instance->rrp_instance;
	unsigned int any_problem = 0;
	unsigned int ring;

	for (ring = 0; ring < rrp_instance->interface_count; ring++) {
		if (!(active_instance->counter_problems[ring] > 0)) {
			continue;
		}

		any_problem = 1;
		active_instance->counter_problems[ring] -= 1;

		if (active_instance->counter_problems[ring] != 0) {
			sprintf (rrp_instance->status[ring],
				"Decrementing problem counter for iface %s to [%d of %d]",
				totemnet_iface_print (rrp_instance->net_handles[ring]),
				active_instance->counter_problems[ring],
				rrp_instance->totem_config->rrp_problem_count_threshold);
		} else {
			sprintf (rrp_instance->status[ring],
				"ring %d active with no faults", ring);
		}

		log_printf (
			rrp_instance->totemrrp_log_level_warning,
			"%s",
			rrp_instance->status[ring]);
	}

	if (!any_problem) {
		active_instance->timer_problem_decrementer = 0;
		return;
	}
	active_timer_problem_decrementer_start (active_instance);
}
/*
 * Timer callback fired when the token-expired timer runs out.
 *
 * Pass 1: every ring whose last_token_recv flag is still 0 gets its
 * problem counter incremented (starting the decrementer timer if its
 * handle is 0) and the event is logged.
 * Pass 2: every non-faulty ring whose counter reached the configured
 * threshold is marked FAULTY, a ring test timer is armed for it and the
 * decrementer timer is cancelled.
 * Finally the stored token is delivered to the upper layer.
 */
static void timer_function_active_token_expired (void *context)
{
	struct active_instance *active_instance = (struct active_instance *)context;
	struct totemrrp_instance *rrp_instance = active_instance->rrp_instance;
	unsigned int ring;

	for (ring = 0; ring < rrp_instance->interface_count; ring++) {
		if (active_instance->last_token_recv[ring] != 0) {
			continue;
		}

		active_instance->counter_problems[ring] += 1;
		if (active_instance->timer_problem_decrementer == 0) {
			active_timer_problem_decrementer_start (active_instance);
		}

		sprintf (rrp_instance->status[ring],
			"Incrementing problem counter for seqid %d iface %s to [%d of %d]",
			active_instance->last_token_seq,
			totemnet_iface_print (rrp_instance->net_handles[ring]),
			active_instance->counter_problems[ring],
			rrp_instance->totem_config->rrp_problem_count_threshold);
		log_printf (
			rrp_instance->totemrrp_log_level_warning,
			"%s",
			rrp_instance->status[ring]);
	}

	for (ring = 0; ring < rrp_instance->interface_count; ring++) {
		if (active_instance->counter_problems[ring] < rrp_instance->totem_config->rrp_problem_count_threshold ||
			active_instance->faulty[ring] != 0) {
			continue;
		}

		active_instance->faulty[ring] = 1;

		qb_loop_timer_add (rrp_instance->poll_handle,
			QB_LOOP_MED,
			rrp_instance->totem_config->rrp_autorecovery_check_timeout*QB_TIME_NS_IN_MSEC,
			rrp_instance->deliver_fn_context[ring],
			timer_function_test_ring_timeout,
			&rrp_instance->timer_active_test_ring_timeout[ring]);

		stats_set_interface_faulty (rrp_instance, ring, active_instance->faulty[ring]);

		sprintf (rrp_instance->status[ring],
			"Marking seqid %d ringid %u interface %s FAULTY",
			active_instance->last_token_seq,
			ring,
			totemnet_iface_print (rrp_instance->net_handles[ring]));
		log_printf (
			rrp_instance->totemrrp_log_level_error,
			"%s",
			rrp_instance->status[ring]);

		active_timer_problem_decrementer_cancel (active_instance);
	}

	rrp_instance->totemrrp_deliver_fn (
		active_instance->totemrrp_context,
		active_instance->token,
		active_instance->token_len);
}
static void active_timer_expired_token_start (
struct active_instance *active_instance)
{
qb_loop_timer_add (
active_instance->rrp_instance->poll_handle,
QB_LOOP_MED,
active_instance->rrp_instance->totem_config->rrp_token_expired_timeout*QB_TIME_NS_IN_MSEC,
(void *)active_instance,
timer_function_active_token_expired,
&active_instance->timer_expired_token);
}
static void active_timer_expired_token_cancel (
struct active_instance *active_instance)
{
qb_loop_timer_del (
active_instance->rrp_instance->poll_handle,
active_instance->timer_expired_token);
}
static void active_timer_problem_decrementer_start (
struct active_instance *active_instance)
{
qb_loop_timer_add (
active_instance->rrp_instance->poll_handle,
QB_LOOP_MED,
active_instance->rrp_instance->totem_config->rrp_problem_count_timeout*QB_TIME_NS_IN_MSEC,
(void *)active_instance,
timer_function_active_problem_decrementer,
&active_instance->timer_problem_decrementer);
}
static void active_timer_problem_decrementer_cancel (
struct active_instance *active_instance)
{
qb_loop_timer_del (
active_instance->rrp_instance->poll_handle,
active_instance->timer_problem_decrementer);
}
/*
* active replication
*/
/*
 * Multicast receive hook of the active algorithm: the message is passed
 * straight to the registered deliver function; the receiving interface
 * number is not used.
 */
static void active_mcast_recv (
	struct totemrrp_instance *instance,
	unsigned int iface_no,
	void *context,
	const void *msg,
	unsigned int msg_len)
{
	(void)iface_no;

	instance->totemrrp_deliver_fn (context, msg, msg_len);
}
/*
 * Send the message with totemnet_mcast_flush_send on every ring that is
 * not marked faulty.
 */
static void active_mcast_flush_send (
	struct totemrrp_instance *instance,
	const void *msg,
	unsigned int msg_len)
{
	struct active_instance *active = (struct active_instance *)instance->rrp_algo_instance;
	int ring;

	for (ring = 0; ring < instance->interface_count; ring++) {
		if (active->faulty[ring] != 0) {
			continue;
		}
		totemnet_mcast_flush_send (instance->net_handles[ring], msg, msg_len);
	}
}
/*
 * Send the message with totemnet_mcast_noflush_send on every ring that
 * is not marked faulty.
 */
static void active_mcast_noflush_send (
	struct totemrrp_instance *instance,
	const void *msg,
	unsigned int msg_len)
{
	struct active_instance *active = (struct active_instance *)instance->rrp_algo_instance;
	int ring;

	for (ring = 0; ring < instance->interface_count; ring++) {
		if (active->faulty[ring] != 0) {
			continue;
		}
		totemnet_mcast_noflush_send (instance->net_handles[ring], msg, msg_len);
	}
}
/*
 * Token receive hook of the active algorithm.
 *
 * Records on which interfaces the token identified by token_seq has
 * arrived and delivers it to the upper layer only once no non-faulty
 * interface is still missing it. While copies are outstanding the
 * token-expired timer is running.
 *
 * rrp_instance - redundant ring instance owning the active state
 * iface_no     - interface the token arrived on
 * context      - context handed to the deliver function
 * msg, msg_len - token message and its length
 * token_seq    - sequence number of the token
 */
static void active_token_recv (
struct totemrrp_instance *rrp_instance,
unsigned int iface_no,
void *context,
const void *msg,
unsigned int msg_len,
unsigned int token_seq)
{
int i;
struct active_instance *active_instance = (struct active_instance *)rrp_instance->rrp_algo_instance;
active_instance->totemrrp_context = context;
/*
 * Token compares as newer than the last one seen (sq_lt_compare is
 * defined elsewhere - presumably a wrap-aware "less than"): keep a copy
 * for the expiry timer, forget which interfaces had the previous token
 * and start the expiry timer.
 * NOTE(review): msg_len is not checked against the size of
 * active_instance->token, which is not visible here - confirm.
 */
if (sq_lt_compare (active_instance->last_token_seq, token_seq)) {
memcpy (active_instance->token, msg, msg_len);
active_instance->token_len = msg_len;
for (i = 0; i < rrp_instance->interface_count; i++) {
active_instance->last_token_recv[i] = 0;
}
active_instance->last_token_recv[iface_no] = 1;
active_timer_expired_token_start (active_instance);
}
/*
* This doesn't follow spec because the spec assumes we will know
* when token resets occur.
*/
active_instance->last_token_seq = token_seq;
/* always true: last_token_seq was assigned token_seq on the line above */
if (token_seq == active_instance->last_token_seq) {
active_instance->last_token_recv[iface_no] = 1;
for (i = 0; i < rrp_instance->interface_count; i++) {
if ((active_instance->last_token_recv[i] == 0) &&
active_instance->faulty[i] == 0) {
return; /* don't deliver token */
}
}
/* no non-faulty interface is missing the token: stop the timer, deliver */
active_timer_expired_token_cancel (active_instance);
rrp_instance->totemrrp_deliver_fn (
context,
msg,
msg_len);
}
}
/*
 * Send the token on every ring that is not marked faulty.
 */
static void active_token_send (
	struct totemrrp_instance *instance,
	const void *msg,
	unsigned int msg_len)
{
	struct active_instance *active = (struct active_instance *)instance->rrp_algo_instance;
	int ring;

	for (ring = 0; ring < instance->interface_count; ring++) {
		if (active->faulty[ring] != 0) {
			continue;
		}
		totemnet_token_send (instance->net_handles[ring], msg, msg_len);
	}
}
/*
 * Call totemnet_recv_flush on every ring that is not marked faulty.
 */
static void active_recv_flush (struct totemrrp_instance *instance)
{
	struct active_instance *active = (struct active_instance *)instance->rrp_algo_instance;
	unsigned int ring = 0;

	while (ring < instance->interface_count) {
		if (active->faulty[ring] == 0) {
			totemnet_recv_flush (instance->net_handles[ring]);
		}
		ring++;
	}
}
/*
 * Call totemnet_send_flush on every ring that is not marked faulty.
 */
static void active_send_flush (struct totemrrp_instance *instance)
{
	struct active_instance *active = (struct active_instance *)instance->rrp_algo_instance;
	unsigned int ring = 0;

	while (ring < instance->interface_count) {
		if (active->faulty[ring] == 0) {
			totemnet_send_flush (instance->net_handles[ring]);
		}
		ring++;
	}
}
/*
 * Add a member on ring iface_no; the totemnet result is returned as is.
 */
static int active_member_add (
	struct totemrrp_instance *instance,
	const struct totem_ip_address *member,
	unsigned int iface_no)
{
	return (totemnet_member_add (instance->net_handles[iface_no], member));
}
/*
 * Remove a member on ring iface_no; the totemnet result is returned as is.
 */
static int active_member_remove (
	struct totemrrp_instance *instance,
	const struct totem_ip_address *member,
	unsigned int iface_no)
{
	return (totemnet_member_remove (instance->net_handles[iface_no], member));
}
/*
 * Run the totemnet interface check on every ring that the active
 * algorithm does not currently have marked as faulty.
 */
static void active_iface_check (struct totemrrp_instance *instance)
{
	struct active_instance *active = (struct active_instance *)instance->rrp_algo_instance;
	unsigned int ring;

	for (ring = 0; ring < instance->interface_count; ring++) {
		if (active->faulty[ring] != 0) {
			continue;
		}
		totemnet_iface_check (instance->net_handles[ring]);
	}
}
/*
 * Hand the processor count down to the totemnet layer of each ring that
 * is not marked faulty.
 */
static void active_processor_count_set (
	struct totemrrp_instance *instance,
	unsigned int processor_count)
{
	struct active_instance *active = (struct active_instance *)instance->rrp_algo_instance;
	unsigned int ring;

	for (ring = 0; ring < instance->interface_count; ring++) {
		if (active->faulty[ring] != 0) {
			continue;
		}
		totemnet_processor_count_set (
			instance->net_handles[ring],
			processor_count);
	}
}
/*
 * Set the token target address on the totemnet handle of ring iface_no.
 * The faulty state of the ring is not consulted here.
 */
static void active_token_target_set (
	struct totemrrp_instance *instance,
	struct totem_ip_address *token_target,
	unsigned int iface_no)
{
	void *net_handle = instance->net_handles[iface_no];

	totemnet_token_target_set (net_handle, token_target);
}
/*
 * Call totemnet_recv_mcast_empty on every ring (faulty or not).
 *
 * Returns -1 as soon as any ring reports -1, 1 when at least one ring
 * reported 1, and 0 otherwise.
 */
static int active_mcast_recv_empty (
	struct totemrrp_instance *instance)
{
	int any_emptied = 0;
	int ring;

	for (ring = 0; ring < instance->interface_count; ring++) {
		switch (totemnet_recv_mcast_empty (instance->net_handles[ring])) {
		case -1:
			return (-1);
		case 1:
			any_emptied = 1;
			break;
		default:
			break;
		}
	}
	return (any_emptied);
}
/*
 * Reset the last_token_recv flag, the faulty flag and the problem counter
 * of one ring, or of all rings when iface_no == interface_count, and
 * clear the matching faulty statistic(s).
 */
static void active_ring_reenable (
	struct totemrrp_instance *instance,
	unsigned int iface_no)
{
	struct active_instance *active = (struct active_instance *)instance->rrp_algo_instance;
	size_t per_ring_bytes;
	int ring;

	if (iface_no != instance->interface_count) {
		/* single ring */
		active->last_token_recv[iface_no] = 0;
		active->faulty[iface_no] = 0;
		active->counter_problems[iface_no] = 0;
		stats_set_interface_faulty (instance, iface_no, 0);
		return;
	}

	/* all rings */
	per_ring_bytes = sizeof (unsigned int) * instance->interface_count;
	memset (active->last_token_recv, 0, per_ring_bytes);
	memset (active->faulty, 0, per_ring_bytes);
	memset (active->counter_problems, 0, per_ring_bytes);
	for (ring = 0; ring < instance->interface_count; ring++) {
		stats_set_interface_faulty (instance, ring, 0);
	}
}
/*
 * Put a freshly allocated redundant ring instance into its all-zero
 * initial state.
 */
static void totemrrp_instance_initialize (struct totemrrp_instance *instance)
{
	memset (instance, 0, sizeof (*instance));
}
/*
 * Select the redundant ring algorithm named by totem_config->rrp_mode,
 * create its private state and allocate the per-interface status strings.
 *
 * Returns 0 on success and -1 when
 *  - no algorithm in rrp_algos matches rrp_mode,
 *  - the algorithm's initialize hook returns NULL, or
 *  - a status buffer cannot be allocated.
 *
 * On failure no status buffer stays allocated. The algorithm state
 * created by the initialize hook is not released when a status buffer
 * allocation fails, because no matching destructor is visible from here.
 */
static int totemrrp_algorithm_set (
	struct totem_config *totem_config,
	struct totemrrp_instance *instance)
{
	const size_t status_size = 1024;
	unsigned int i;
	unsigned int allocated;
	int found = 0;

	for (i = 0; i < RRP_ALGOS_COUNT; i++) {
		if (strcmp (totem_config->rrp_mode, rrp_algos[i]->name) != 0) {
			continue;
		}
		instance->rrp_algo = rrp_algos[i];
		if (rrp_algos[i]->initialize) {
			instance->rrp_algo_instance = rrp_algos[i]->initialize (
				instance,
				totem_config->interface_count);
			/*
			 * The algorithm callbacks dereference their private state,
			 * so a failed initialization must be reported.
			 */
			if (instance->rrp_algo_instance == NULL) {
				return (-1);
			}
		}
		found = 1;
		break;
	}

	if (!found) {
		/* unknown rrp_mode: nothing has been allocated yet */
		return (-1);
	}

	for (i = 0; i < totem_config->interface_count; i++) {
		instance->status[i] = malloc (status_size);
		if (instance->status[i] == NULL) {
			/* roll back the buffers allocated so far */
			for (allocated = 0; allocated < i; allocated++) {
				free (instance->status[allocated]);
				instance->status[allocated] = NULL;
			}
			return (-1);
		}
		snprintf (instance->status[i], status_size,
			"ring %u active with no faults", i);
	}

	return (0);
}
/*
 * Receive callback registered with every totemnet instance.
 *
 * context is the per-interface struct deliver_fn_context created in
 * totemrrp_initialize. The message is dispatched by type:
 *  - RING_TEST_ACTIVE:   answered with an activate message when this node
 *                        is the activator, otherwise forwarded unchanged
 *                        on the same interface;
 *  - RING_TEST_ACTIVATE: the receiving ring is reenabled and the message
 *                        is forwarded unless this node is the activator;
 *  - token (as reported by the token_seqid_get callback): handed to the
 *                        algorithm's token_recv hook;
 *  - anything else:      handed to the algorithm's mcast_recv hook.
 *
 * Multi-byte header fields are only read after the header has been
 * converted to local endianness.
 */
void rrp_deliver_fn (
	void *context,
	const void *msg,
	unsigned int msg_len)
{
	unsigned int token_seqid;
	unsigned int token_is;
	struct deliver_fn_context *deliver_fn_context = (struct deliver_fn_context *)context;
	struct totemrrp_instance *rrp_instance = deliver_fn_context->instance;
	const struct message_header *hdr = msg;
	struct message_header tmp_msg, activate_msg;

	memset(&tmp_msg, 0, sizeof(struct message_header));
	memset(&activate_msg, 0, sizeof(struct message_header));

	rrp_instance->totemrrp_token_seqid_get (
		msg,
		&token_seqid,
		&token_is);

	if (hdr->type == MESSAGE_TYPE_RING_TEST_ACTIVE) {
		log_printf (
			rrp_instance->totemrrp_log_level_debug,
			"received message requesting test of ring now active");

		if (hdr->endian_detector != ENDIAN_LOCAL) {
			test_active_msg_endian_convert(hdr, &tmp_msg);
			hdr = &tmp_msg;
		}

		if (hdr->nodeid_activator == rrp_instance->my_nodeid) {
			/*
			 * Send an activate message
			 */
			activate_msg.type = MESSAGE_TYPE_RING_TEST_ACTIVATE;
			activate_msg.endian_detector = ENDIAN_LOCAL;
			activate_msg.ring_number = hdr->ring_number;
			activate_msg.nodeid_activator = rrp_instance->my_nodeid;
			totemnet_token_send (
				rrp_instance->net_handles[deliver_fn_context->iface_no],
				&activate_msg, sizeof (struct message_header));
		} else {
			/*
			 * Send a ring test message
			 */
			totemnet_token_send (
				rrp_instance->net_handles[deliver_fn_context->iface_no],
				msg, msg_len);
		}
	} else
	if (hdr->type == MESSAGE_TYPE_RING_TEST_ACTIVATE) {
		/*
		 * Convert first so that the ring number logged below is in
		 * local byte order.
		 */
		if (hdr->endian_detector != ENDIAN_LOCAL) {
			test_active_msg_endian_convert(hdr, &tmp_msg);
			hdr = &tmp_msg;
		}

		log_printf (
			rrp_instance->totemrrp_log_level_notice,
			"Automatically recovered ring %d", hdr->ring_number);

		totemrrp_ring_reenable (rrp_instance, deliver_fn_context->iface_no);
		if (hdr->nodeid_activator != rrp_instance->my_nodeid) {
			totemnet_token_send (
				rrp_instance->net_handles[deliver_fn_context->iface_no],
				msg, msg_len);
		}
	} else
	if (token_is) {
		/*
		 * Deliver to the token receiver for this rrp algorithm
		 */
		rrp_instance->rrp_algo->token_recv (
			rrp_instance,
			deliver_fn_context->iface_no,
			deliver_fn_context->context,
			msg,
			msg_len,
			token_seqid);
	} else {
		/*
		 * Deliver to the mcast receiver for this rrp algorithm
		 */
		rrp_instance->rrp_algo->mcast_recv (
			rrp_instance,
			deliver_fn_context->iface_no,
			deliver_fn_context->context,
			msg,
			msg_len);
	}
}
void rrp_iface_change_fn (
void *context,
const struct totem_ip_address *iface_addr)
{
struct deliver_fn_context *deliver_fn_context = (struct deliver_fn_context *)context;
deliver_fn_context->instance->my_nodeid = iface_addr->nodeid;
deliver_fn_context->instance->totemrrp_iface_change_fn (
deliver_fn_context->context,
iface_addr,
deliver_fn_context->iface_no);
}
/*
 * Finalize every totemnet instance, then release the handle array and
 * the redundant ring instance itself. Always returns 0.
 *
 * NOTE(review): the status strings, the per-interface deliver contexts,
 * the stats.faulty array and the algorithm state are not released here -
 * confirm whether they are freed elsewhere.
 */
int totemrrp_finalize (
	void *rrp_context)
{
	struct totemrrp_instance *rrp = (struct totemrrp_instance *)rrp_context;
	int ring = 0;

	while (ring < rrp->interface_count) {
		totemnet_finalize (rrp->net_handles[ring]);
		ring++;
	}

	free (rrp->net_handles);
	free (rrp);
	return (0);
}
static void rrp_target_set_completed (void *context)
{
struct deliver_fn_context *deliver_fn_context = (struct deliver_fn_context *)context;
deliver_fn_context->instance->totemrrp_target_set_completed (deliver_fn_context->context);
}
/*
* Totem Redundant Ring interface
* depends on poll abstraction, POSIX, IPV4
*/
/*
* Create an instance
*/
/*
 * Allocate a redundant ring instance, select the replication algorithm
 * named in totem_config and initialize one totemnet instance per
 * configured interface.
 *
 * poll_handle          - libqb main loop the instance registers with
 * rrp_context          - out: receives the new instance on success
 * totem_config         - totem configuration (interfaces, logging, rrp mode)
 * stats                - statistics object; its rrp member is pointed at
 *                        the instance's own statistics
 * context              - upper-layer context passed back in the callbacks
 * deliver_fn           - called for every message delivered upwards
 * iface_change_fn      - called when an interface address changes
 * token_seqid_get      - extracts the sequence id / "is token" flag of a message
 * msgs_missing         - stored in the instance; not called in this function
 * target_set_completed - called when a token target set completes
 *
 * Returns 0 on success and -1 when the instance cannot be allocated;
 * when totemrrp_algorithm_set fails the instance is freed and its result
 * is returned.
 */
int totemrrp_initialize (
qb_loop_t *poll_handle,
void **rrp_context,
struct totem_config *totem_config,
totemsrp_stats_t *stats,
void *context,
void (*deliver_fn) (
void *context,
const void *msg,
unsigned int msg_len),
void (*iface_change_fn) (
void *context,
const struct totem_ip_address *iface_addr,
unsigned int iface_no),
void (*token_seqid_get) (
const void *msg,
unsigned int *seqid,
unsigned int *token_is),
unsigned int (*msgs_missing) (void),
void (*target_set_completed) (void *context))
{
struct totemrrp_instance *instance;
unsigned int res;
int i;
instance = malloc (sizeof (struct totemrrp_instance));
if (instance == 0) {
return (-1);
}
totemrrp_instance_initialize (instance);
instance->totem_config = totem_config;
/*
 * Publish the rrp statistics through the caller's stats object.
 * NOTE(review): on the error_destroy path below stats->rrp keeps pointing
 * into the freed instance and stats.faulty is not released; the calloc
 * result is not checked either - confirm.
 */
stats->rrp = &instance->stats;
instance->stats.interface_count = totem_config->interface_count;
instance->stats.faulty = calloc(instance->stats.interface_count, sizeof(uint8_t));
res = totemrrp_algorithm_set (
instance->totem_config,
instance);
/* res is unsigned; the comparison works because -1 converts to UINT_MAX */
if (res == -1) {
goto error_destroy;
}
/*
* Configure logging
*/
instance->totemrrp_log_level_security = totem_config->totem_logging_configuration.log_level_security;
instance->totemrrp_log_level_error = totem_config->totem_logging_configuration.log_level_error;
instance->totemrrp_log_level_warning = totem_config->totem_logging_configuration.log_level_warning;
instance->totemrrp_log_level_notice = totem_config->totem_logging_configuration.log_level_notice;
instance->totemrrp_log_level_debug = totem_config->totem_logging_configuration.log_level_debug;
instance->totemrrp_subsys_id = totem_config->totem_logging_configuration.log_subsys_id;
instance->totemrrp_log_printf = totem_config->totem_logging_configuration.log_printf;
instance->interfaces = totem_config->interfaces;
instance->poll_handle = poll_handle;
instance->totemrrp_deliver_fn = deliver_fn;
instance->totemrrp_iface_change_fn = iface_change_fn;
instance->totemrrp_token_seqid_get = token_seqid_get;
instance->totemrrp_target_set_completed = target_set_completed;
instance->totemrrp_msgs_missing = msgs_missing;
instance->interface_count = totem_config->interface_count;
/* NOTE(review): the result of this malloc is not checked before use */
instance->net_handles = malloc (sizeof (void *) * totem_config->interface_count);
instance->context = context;
instance->poll_handle = poll_handle;
/*
 * One totemnet instance per interface. Each gets its own
 * deliver_fn_context so the callbacks can tell which interface an event
 * belongs to. The return value of totemnet_initialize is not examined.
 */
for (i = 0; i < totem_config->interface_count; i++) {
struct deliver_fn_context *deliver_fn_context;
deliver_fn_context = malloc (sizeof (struct deliver_fn_context));
assert (deliver_fn_context);
deliver_fn_context->instance = instance;
deliver_fn_context->context = context;
deliver_fn_context->iface_no = i;
instance->deliver_fn_context[i] = (void *)deliver_fn_context;
totemnet_initialize (
poll_handle,
&instance->net_handles[i],
totem_config,
+ stats,
i,
(void *)deliver_fn_context,
rrp_deliver_fn,
rrp_iface_change_fn,
rrp_target_set_completed);
totemnet_net_mtu_adjust (instance->net_handles[i], totem_config);
}
*rrp_context = instance;
return (0);
error_destroy:
free (instance);
return (res);
}
/*
 * Obtain a buffer from the totemnet instance of the first interface.
 * rrp_context must not be NULL.
 */
void *totemrrp_buffer_alloc (void *rrp_context)
{
	struct totemrrp_instance *rrp = rrp_context;
	void *buffer;

	assert (rrp != NULL);

	buffer = totemnet_buffer_alloc (rrp->net_handles[0]);
	return buffer;
}
/*
 * Give a buffer back to the totemnet instance of the first interface.
 * rrp_context must not be NULL.
 */
void totemrrp_buffer_release (void *rrp_context, void *ptr)
{
	struct totemrrp_instance *rrp = rrp_context;

	assert (rrp != NULL);

	totemnet_buffer_release (rrp->net_handles[0], ptr);
}
/*
 * Forward the processor count to the active rrp algorithm and then
 * remember it in the instance. Always returns 0.
 */
int totemrrp_processor_count_set (
	void *rrp_context,
	unsigned int processor_count)
{
	struct totemrrp_instance *rrp = rrp_context;

	rrp->rrp_algo->processor_count_set (rrp, processor_count);
	rrp->processor_count = processor_count;

	return (0);
}
/*
 * Forward the token target for interface iface_no to the rrp algorithm.
 * Always returns 0.
 */
int totemrrp_token_target_set (
	void *rrp_context,
	struct totem_ip_address *addr,
	unsigned int iface_no)
{
	struct totemrrp_instance *rrp = rrp_context;

	rrp->rrp_algo->token_target_set (rrp, addr, iface_no);

	return (0);
}
/*
 * Invoke the rrp algorithm's recv_flush hook. Always returns 0.
 */
int totemrrp_recv_flush (void *rrp_context)
{
	struct totemrrp_instance *rrp = rrp_context;

	rrp->rrp_algo->recv_flush (rrp);

	return (0);
}
/*
 * Invoke the rrp algorithm's send_flush hook. Always returns 0.
 */
int totemrrp_send_flush (void *rrp_context)
{
	struct totemrrp_instance *rrp = rrp_context;

	rrp->rrp_algo->send_flush (rrp);

	return (0);
}
/*
 * Send a token through the rrp algorithm's token_send hook.
 * Always returns 0.
 */
int totemrrp_token_send (
	void *rrp_context,
	const void *msg,
	unsigned int msg_len)
{
	struct totemrrp_instance *rrp = rrp_context;

	rrp->rrp_algo->token_send (rrp, msg, msg_len);

	return (0);
}
/*
 * Multicast a message through the rrp algorithm's mcast_flush_send hook.
 * The hook has no return value, so 0 is returned unconditionally.
 */
int totemrrp_mcast_flush_send (
	void *rrp_context,
	const void *msg,
	unsigned int msg_len)
{
	struct totemrrp_instance *rrp = rrp_context;

	// TODO this needs to return the result
	rrp->rrp_algo->mcast_flush_send (rrp, msg, msg_len);

	return (0);
}
/*
 * Multicast a message through the rrp algorithm's mcast_noflush_send
 * hook, but only when more than one processor is known.
 * Always returns 0.
 */
int totemrrp_mcast_noflush_send (
	void *rrp_context,
	const void *msg,
	unsigned int msg_len)
{
	struct totemrrp_instance *rrp = rrp_context;

	/*
	 * Merge detects go out through mcast_flush_send, so with a single
	 * processor these messages can simply be dropped. That saves an
	 * encryption/hmac and a decryption/hmac.
	 */
	if (!(rrp->processor_count > 1)) {
		return (0);
	}

	// TODO this needs to return the result
	rrp->rrp_algo->mcast_noflush_send (rrp, msg, msg_len);
	return (0);
}
/*
 * Invoke the rrp algorithm's iface_check hook. Always returns 0.
 */
int totemrrp_iface_check (void *rrp_context)
{
	struct totemrrp_instance *rrp = rrp_context;

	rrp->rrp_algo->iface_check (rrp);

	return (0);
}
/*
 * Expose the per-interface status strings.
 *
 * status      - out: set to the instance's status string array
 * iface_count - out, optional: set to the number of interfaces when
 *               non-NULL
 *
 * Always returns 0.
 */
int totemrrp_ifaces_get (
	void *rrp_context,
	char ***status,
	unsigned int *iface_count)
{
	struct totemrrp_instance *rrp = rrp_context;

	if (iface_count) {
		*iface_count = rrp->interface_count;
	}
	*status = rrp->status;

	return (0);
}
/*
 * Apply the cipher and hash type to the totemnet instance of the first
 * interface and return totemnet's result.
 */
int totemrrp_crypto_set (
	void *rrp_context,
	const char *cipher_type,
	const char *hash_type)
{
	struct totemrrp_instance *rrp = rrp_context;

	return (totemnet_crypto_set(rrp->net_handles[0], cipher_type, hash_type));
}
/*
* iface_no is the interface number [0, ..., interface_count-1] of the
* specific ring to be reenabled. Passing iface_no == interface_count
* reenables all the rings.
*/
int totemrrp_ring_reenable (
	void *rrp_context,
	unsigned int iface_no)
{
	struct totemrrp_instance *rrp = rrp_context;
	unsigned int ring;

	/* let the algorithm clear its own fault bookkeeping first */
	rrp->rrp_algo->ring_reenable (rrp, iface_no);

	if (iface_no != rrp->interface_count) {
		/* a single ring: only its status string is rewritten */
		sprintf (rrp->status[iface_no], "ring %d active with no faults", iface_no);
		return (0);
	}

	/* all rings */
	for (ring = 0; ring < rrp->interface_count; ring++) {
		sprintf (rrp->status[ring], "ring %d active with no faults", ring);
	}
	return (0);
}
/*
 * Invoke the rrp algorithm's mcast_recv_empty hook and return its result.
 */
extern int totemrrp_mcast_recv_empty (
	void *rrp_context)
{
	struct totemrrp_instance *rrp = rrp_context;

	return (rrp->rrp_algo->mcast_recv_empty (rrp));
}
/*
 * Add a member on interface iface_no through the rrp algorithm and
 * return the algorithm's result.
 */
int totemrrp_member_add (
	void *rrp_context,
	const struct totem_ip_address *member,
	int iface_no)
{
	struct totemrrp_instance *rrp = rrp_context;

	return (rrp->rrp_algo->member_add (rrp, member, iface_no));
}
/*
 * Remove a member on interface iface_no through the rrp algorithm and
 * return the algorithm's result.
 */
int totemrrp_member_remove (
	void *rrp_context,
	const struct totem_ip_address *member,
	int iface_no)
{
	struct totemrrp_instance *rrp = rrp_context;

	return (rrp->rrp_algo->member_remove (rrp, member, iface_no));
}
diff --git a/exec/totemudp.c b/exec/totemudp.c
index e702a32b..a5169c2a 100644
--- a/exec/totemudp.c
+++ b/exec/totemudp.c
@@ -1,1415 +1,1423 @@
/*
* Copyright (c) 2005 MontaVista Software, Inc.
* Copyright (c) 2006-2012 Red Hat, Inc.
*
* All rights reserved.
*
* Author: Steven Dake (sdake@redhat.com)
* This software licensed under BSD license, the text of which follows:
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* - Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* - Neither the name of the MontaVista Software, Inc. nor the names of its
* contributors may be used to endorse or promote products derived from this
* software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
* THE POSSIBILITY OF SUCH DAMAGE.
*/
#include <config.h>
#include <assert.h>
#include <pthread.h>
#include <sys/mman.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/socket.h>
#include <netdb.h>
#include <sys/un.h>
#include <sys/ioctl.h>
#include <sys/param.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <sched.h>
#include <time.h>
#include <sys/time.h>
#include <sys/poll.h>
#include <sys/uio.h>
#include <limits.h>
#include <corosync/sq.h>
#include <corosync/swab.h>
#include <corosync/list.h>
#include <qb/qbdefs.h>
#include <qb/qbloop.h>
#define LOGSYS_UTILS_ONLY 1
#include <corosync/logsys.h>
#include "totemudp.h"
#include "util.h"
#include "totemcrypto.h"
#include <nss.h>
#include <pk11pub.h>
#include <pkcs11.h>
#include <prerror.h>
/*
 * MSG_NOSIGNAL is passed as the sendmsg() flag throughout this file;
 * fall back to 0 (no flag) where the platform does not define it.
 */
#ifndef MSG_NOSIGNAL
#define MSG_NOSIGNAL 0
#endif
/* Buffer size derived from the number of allowed transmits and the maximum frame size */
#define MCAST_SOCKET_BUFFER_SIZE (TRANSMITS_ALLOWED * FRAME_SIZE_MAX)
/* Bit flags combined in totemudp_instance.netif_state_report */
#define NETIF_STATE_REPORT_UP 1
#define NETIF_STATE_REPORT_DOWN 2
/* presumably the values of totemudp_instance.netif_bind_state - confirm */
#define BIND_STATE_UNBOUND 0
#define BIND_STATE_REGULAR 1
#define BIND_STATE_LOOPBACK 2
/* NOTE(review): presumably mirrors the totemsrp join message type - confirm */
#define MESSAGE_TYPE_MEMB_JOIN 3
/*
 * File descriptors used by one totemudp instance.
 */
struct totemudp_socket {
/* removed from the poll loop and closed in totemudp_finalize */
int mcast_recv;
/* used by both ucast_sendmsg and mcast_sendmsg for transmission */
int mcast_send;
/* removed from the poll loop and closed in totemudp_finalize */
int token;
/*
* Socket used for local multicast delivery. We don't rely on multicast
* loop and rather this UNIX DGRAM socket is used. Socket is created by
* socketpair call and they are used in same way as pipe (so [0] is read
* end and [1] is write end)
*/
int local_mcast_loop[2];
};
/*
 * State of one UDP transport instance.
 */
struct totemudp_instance {
/* passed to crypto_encrypt_and_sign for every outgoing packet */
struct crypto_instance *crypto_inst;
/* poll loop the sockets are registered with */
qb_loop_t *totemudp_poll_handle;
/* interface configuration; its ip_port is used as destination port */
struct totem_interface *totem_interface;
/* combination of NETIF_STATE_REPORT_* flags */
int netif_state_report;
int netif_bind_state;
void *context;
/*
 * Upper-layer callbacks
 */
void (*totemudp_deliver_fn) (
void *context,
const void *msg,
unsigned int msg_len);
void (*totemudp_iface_change_fn) (
void *context,
const struct totem_ip_address *iface_address);
void (*totemudp_target_set_completed) (void *context);
/*
* Function and data used to log messages
*/
int totemudp_log_level_security;
int totemudp_log_level_error;
int totemudp_log_level_warning;
int totemudp_log_level_notice;
int totemudp_log_level_debug;
int totemudp_subsys_id;
void (*totemudp_log_printf) (
int level,
int subsys,
const char *function,
const char *file,
int line,
const char *format,
...)__attribute__((format(printf, 6, 7)));
void *udp_context;
/* receive buffers; the two iovecs below are pointed at them on initialization */
char iov_buffer[FRAME_SIZE_MAX];
char iov_buffer_flush[FRAME_SIZE_MAX];
struct iovec totemudp_iov_recv;
struct iovec totemudp_iov_recv_flush;
struct totemudp_socket totemudp_sockets;
/* destination address of multicast transmissions */
struct totem_ip_address mcast_address;
int stats_sent;
int stats_recv;
int stats_delv;
int stats_remcasts;
int stats_orf_token;
struct timeval stats_tv_start;
struct totem_ip_address my_id;
int firstrun;
qb_loop_timer_handle timer_netif_check_timeout;
/* set to 1 on initialization: there is always at least one processor */
unsigned int my_memb_entries;
int flushing;
struct totem_config *totem_config;
/* statistics object; mcast_sendmsg keeps continuous_sendmsg_failures in it */
+ totemsrp_stats_t *stats;
+
struct totem_ip_address token_target;
};
/*
 * A message, its length and the instance it belongs to.
 * NOTE(review): no user of this struct is visible in this part of the
 * file - confirm it is still needed.
 */
struct work_item {
const void *msg;
unsigned int msg_len;
struct totemudp_instance *instance;
};
/*
 * Forward declaration; the definition is further down in this file.
 */
static int totemudp_build_sockets (
struct totemudp_instance *instance,
struct totem_ip_address *bindnet_address,
struct totem_ip_address *mcastaddress,
struct totemudp_socket *sockets,
struct totem_ip_address *bound_to);
/* presumably holds the loopback address for comparisons - confirm at its users */
static struct totem_ip_address localhost;
/*
 * Put a freshly allocated instance into its initial state: everything
 * zeroed, both interface state reports enabled, the two receive iovecs
 * pointing at their FRAME_SIZE_MAX sized buffers and a membership of one.
 */
static void totemudp_instance_initialize (struct totemudp_instance *instance)
{
	memset (instance, 0, sizeof (*instance));

	/*
	 * There is always at least 1 processor
	 */
	instance->my_memb_entries = 1;

	instance->netif_state_report = NETIF_STATE_REPORT_UP | NETIF_STATE_REPORT_DOWN;

	/* regular receive buffer */
	instance->totemudp_iov_recv.iov_len = FRAME_SIZE_MAX;
	instance->totemudp_iov_recv.iov_base = instance->iov_buffer;

	/* receive buffer used while flushing */
	instance->totemudp_iov_recv_flush.iov_len = FRAME_SIZE_MAX;
	instance->totemudp_iov_recv_flush.iov_base = instance->iov_buffer_flush;
}
/*
 * Log through the instance's log callback. Expects a variable named
 * "instance" (struct totemudp_instance *) to be in scope at the call
 * site.
 *
 * The expansion deliberately ends without a semicolon: the caller's own
 * ';' completes the statement, so the macro can be used as the body of
 * an unbraced if/else.
 */
#define log_printf(level, format, args...) \
do { \
	instance->totemudp_log_printf ( \
		level, instance->totemudp_subsys_id, \
		__FUNCTION__, __FILE__, __LINE__, \
		(const char *)format, ##args); \
} while (0)
/*
 * Log a message followed by ": <error text> (<err_num>)" and a newline
 * through the instance's log callback. The error text comes from
 * qb_strerror_r(err_num). fmt must be a string literal because it is
 * concatenated with the suffix at compile time. Expects a variable named
 * "instance" to be in scope at the call site.
 */
#define LOGSYS_PERROR(err_num, level, fmt, args...) \
do { \
char _error_str[LOGSYS_MAX_PERROR_MSG_LEN]; \
const char *_error_ptr = qb_strerror_r(err_num, _error_str, sizeof(_error_str)); \
instance->totemudp_log_printf ( \
level, instance->totemudp_subsys_id, \
__FUNCTION__, __FILE__, __LINE__, \
fmt ": %s (%d)\n", ##args, _error_ptr, err_num); \
} while(0)
/*
 * Crypto reconfiguration entry point of the UDP transport. None of the
 * arguments are used; the call always succeeds with 0.
 */
int totemudp_crypto_set (
	void *udp_context,
	const char *cipher_type,
	const char *hash_type)
{
	(void)udp_context;
	(void)cipher_type;
	(void)hash_type;

	return (0);
}
/*
 * Encrypt and sign msg, then send it as one datagram to system_to on the
 * interface's configured port, using the mcast_send socket.
 *
 * Failures are only logged: an encryption error drops the packet, and a
 * sendmsg error is left for totemsrp to recover from.
 */
static inline void ucast_sendmsg (
	struct totemudp_instance *instance,
	struct totem_ip_address *system_to,
	const void *msg,
	unsigned int msg_len)
{
	unsigned char cipher_buf[FRAME_SIZE_MAX];
	size_t cipher_len;
	struct sockaddr_storage dest;
	int dest_len;
	struct iovec vec;
	struct msghdr hdr;

	/*
	 * Encrypt and digest the message
	 */
	if (crypto_encrypt_and_sign (
		instance->crypto_inst,
		(const unsigned char *)msg,
		msg_len,
		cipher_buf,
		&cipher_len) != 0) {
		log_printf(LOGSYS_LEVEL_CRIT, "Error encrypting/signing packet (non-critical)");
		return;
	}

	vec.iov_base = (void *)cipher_buf;
	vec.iov_len = cipher_len;

	/*
	 * Build unicast message
	 */
	memset(&hdr, 0, sizeof(hdr));
	totemip_totemip_to_sockaddr_convert(system_to,
		instance->totem_interface->ip_port, &dest, &dest_len);

	hdr.msg_name = &dest;
	hdr.msg_namelen = dest_len;
	hdr.msg_iov = (void *)&vec;
	hdr.msg_iovlen = 1;
#ifdef HAVE_MSGHDR_CONTROL
	hdr.msg_control = 0;
#endif
#ifdef HAVE_MSGHDR_CONTROLLEN
	hdr.msg_controllen = 0;
#endif
#ifdef HAVE_MSGHDR_FLAGS
	hdr.msg_flags = 0;
#endif
#ifdef HAVE_MSGHDR_ACCRIGHTS
	hdr.msg_accrights = NULL;
#endif
#ifdef HAVE_MSGHDR_ACCRIGHTSLEN
	hdr.msg_accrightslen = 0;
#endif

	/*
	 * Transmit unicast message
	 * An error here is recovered by totemsrp
	 */
	if (sendmsg (instance->totemudp_sockets.mcast_send, &hdr,
		MSG_NOSIGNAL) < 0) {
		LOGSYS_PERROR (errno, instance->totemudp_log_level_debug,
			"sendmsg(ucast) failed (non-critical)");
	}
}
/*
 * Encrypt and sign msg, then send the result twice: once to the
 * multicast address on the mcast_send socket and once to the write end
 * of the local multicast loop socket pair.
 *
 * Failures are only logged: an encryption error drops the packet, and
 * sendmsg errors are left for totemsrp to recover from. The outcome of
 * the network transmission is tracked in
 * stats->continuous_sendmsg_failures (incremented on failure, reset to 0
 * on success).
 */
static inline void mcast_sendmsg (
struct totemudp_instance *instance,
const void *msg,
unsigned int msg_len)
{
struct msghdr msg_mcast;
int res = 0;
size_t buf_out_len;
unsigned char buf_out[FRAME_SIZE_MAX];
struct iovec iovec;
struct sockaddr_storage sockaddr;
int addrlen;
/*
* Encrypt and digest the message
*/
if (crypto_encrypt_and_sign (
instance->crypto_inst,
(const unsigned char *)msg,
msg_len,
buf_out,
&buf_out_len) != 0) {
log_printf(LOGSYS_LEVEL_CRIT, "Error encrypting/signing packet (non-critical)");
return;
}
iovec.iov_base = (void *)&buf_out;
iovec.iov_len = buf_out_len;
/*
* Build multicast message
*/
totemip_totemip_to_sockaddr_convert(&instance->mcast_address,
instance->totem_interface->ip_port, &sockaddr, &addrlen);
memset(&msg_mcast, 0, sizeof(msg_mcast));
msg_mcast.msg_name = &sockaddr;
msg_mcast.msg_namelen = addrlen;
msg_mcast.msg_iov = (void *)&iovec;
msg_mcast.msg_iovlen = 1;
#ifdef HAVE_MSGHDR_CONTROL
msg_mcast.msg_control = 0;
#endif
#ifdef HAVE_MSGHDR_CONTROLLEN
msg_mcast.msg_controllen = 0;
#endif
#ifdef HAVE_MSGHDR_FLAGS
msg_mcast.msg_flags = 0;
#endif
#ifdef HAVE_MSGHDR_ACCRIGHTS
msg_mcast.msg_accrights = NULL;
#endif
#ifdef HAVE_MSGHDR_ACCRIGHTSLEN
msg_mcast.msg_accrightslen = 0;
#endif
/*
* Transmit multicast message
* An error here is recovered by totemsrp
*/
res = sendmsg (instance->totemudp_sockets.mcast_send, &msg_mcast,
MSG_NOSIGNAL);
/*
 * Count consecutive transmit failures.
 * NOTE(review): instance->stats is dereferenced unconditionally; it is
 * presumably set during totemudp initialization - confirm.
 */
if (res < 0) {
LOGSYS_PERROR (errno, instance->totemudp_log_level_debug,
"sendmsg(mcast) failed (non-critical)");
+ instance->stats->continuous_sendmsg_failures++;
+ } else {
+ instance->stats->continuous_sendmsg_failures = 0;
}
/*
* Transmit multicast message to local unix mcast loop
* An error here is recovered by totemsrp
*/
/* same header and payload, but without a destination address */
msg_mcast.msg_name = NULL;
msg_mcast.msg_namelen = 0;
res = sendmsg (instance->totemudp_sockets.local_mcast_loop[1], &msg_mcast,
MSG_NOSIGNAL);
if (res < 0) {
LOGSYS_PERROR (errno, instance->totemudp_log_level_debug,
"sendmsg(local mcast loop) failed (non-critical)");
}
}
int totemudp_finalize (
void *udp_context)
{
struct totemudp_instance *instance = (struct totemudp_instance *)udp_context;
int res = 0;
if (instance->totemudp_sockets.mcast_recv > 0) {
qb_loop_poll_del (instance->totemudp_poll_handle,
instance->totemudp_sockets.mcast_recv);
close (instance->totemudp_sockets.mcast_recv);
}
if (instance->totemudp_sockets.mcast_send > 0) {
close (instance->totemudp_sockets.mcast_send);
}
if (instance->totemudp_sockets.local_mcast_loop[0] > 0) {
qb_loop_poll_del (instance->totemudp_poll_handle,
instance->totemudp_sockets.local_mcast_loop[0]);
close (instance->totemudp_sockets.local_mcast_loop[0]);
close (instance->totemudp_sockets.local_mcast_loop[1]);
}
if (instance->totemudp_sockets.token > 0) {
qb_loop_poll_del (instance->totemudp_poll_handle,
instance->totemudp_sockets.token);
close (instance->totemudp_sockets.token);
}
return (res);
}
/*
 * Poll callback: receive one datagram from fd, authenticate/decrypt it and
 * hand it to the registered deliver function.
 *
 * Only designed to work with a message with one iov.
 *
 * fd      - socket reported readable by the poll loop
 * revents - poll events (not inspected here)
 * data    - the struct totemudp_instance registered with the poll loop
 *
 * Always returns 0. A failed recvmsg, a failed authentication, or a
 * MESSAGE_TYPE_MEMB_JOIN message seen while flushing causes the datagram
 * to be dropped without delivery.
 */
static int net_deliver_fn (
	int fd,
	int revents,
	void *data)
{
	struct totemudp_instance *instance = (struct totemudp_instance *)data;
	struct msghdr msg_recv;
	struct iovec *iovec;
	struct sockaddr_storage system_from;
	int bytes_received;
	int res = 0;
	char *message_type;

	/* A separate receive buffer is used while a flush is in progress */
	if (instance->flushing == 1) {
		iovec = &instance->totemudp_iov_recv_flush;
	} else {
		iovec = &instance->totemudp_iov_recv;
	}

	/*
	 * Receive datagram
	 *
	 * Zero the header first, as the send paths do, so that no field is
	 * handed to recvmsg uninitialized when one of the HAVE_MSGHDR_*
	 * guards below is not defined.
	 */
	memset(&msg_recv, 0, sizeof(msg_recv));
	msg_recv.msg_name = &system_from;
	msg_recv.msg_namelen = sizeof (struct sockaddr_storage);
	msg_recv.msg_iov = iovec;
	msg_recv.msg_iovlen = 1;
#ifdef HAVE_MSGHDR_CONTROL
	msg_recv.msg_control = 0;
#endif
#ifdef HAVE_MSGHDR_CONTROLLEN
	msg_recv.msg_controllen = 0;
#endif
#ifdef HAVE_MSGHDR_FLAGS
	msg_recv.msg_flags = 0;
#endif
#ifdef HAVE_MSGHDR_ACCRIGHTS
	msg_recv.msg_accrights = NULL;
#endif
#ifdef HAVE_MSGHDR_ACCRIGHTSLEN
	msg_recv.msg_accrightslen = 0;
#endif

	bytes_received = recvmsg (fd, &msg_recv, MSG_NOSIGNAL | MSG_DONTWAIT);
	if (bytes_received == -1) {
		return (0);
	} else {
		instance->stats_recv += bytes_received;
	}

	/*
	 * Authenticate and if authenticated, decrypt datagram
	 * (bytes_received is passed by address and used as the payload
	 * length afterwards)
	 */
	res = crypto_authenticate_and_decrypt (instance->crypto_inst, iovec->iov_base, &bytes_received);
	if (res == -1) {
		log_printf (instance->totemudp_log_level_security, "Received message has invalid digest... ignoring.");
		log_printf (instance->totemudp_log_level_security,
			"Invalid packet data");
		/* Restore the full buffer size for the next receive */
		iovec->iov_len = FRAME_SIZE_MAX;
		return 0;
	}
	iovec->iov_len = bytes_received;

	/*
	 * Drop all non-mcast messages (more specifically join
	 * messages should be dropped)
	 */
	message_type = (char *)iovec->iov_base;
	if (instance->flushing == 1 && *message_type == MESSAGE_TYPE_MEMB_JOIN) {
		iovec->iov_len = FRAME_SIZE_MAX;
		return (0);
	}

	/*
	 * Handle incoming message
	 */
	instance->totemudp_deliver_fn (
		instance->context,
		iovec->iov_base,
		iovec->iov_len);

	/* Restore the full buffer size for the next receive */
	iovec->iov_len = FRAME_SIZE_MAX;
	return (0);
}
/*
 * Thin wrapper around totemip_iface_check that supplies the
 * clear_node_high_bit setting from the instance's totem configuration.
 * Returns whatever totemip_iface_check returns.
 */
static int netif_determine (
	struct totemudp_instance *instance,
	struct totem_ip_address *bindnet,
	struct totem_ip_address *bound_to,
	int *interface_up,
	int *interface_num)
{
	return (totemip_iface_check (bindnet, bound_to,
		interface_up, interface_num,
		instance->totem_config->clear_node_high_bit));
}
/*
* If the interface is up, the sockets for totem are built. If the interface is down
* this function is requeued in the timer list to retry building the sockets later.
*
* Steps, in order: query the interface state; return early (re-arming the
* timer) when nothing changed; close any existing sockets; pick the bind
* address (localhost when the interface is down); rebuild the sockets and
* register them with the poll loop; finally report a state change through
* totemudp_iface_change_fn.
*
* data is the struct totemudp_instance the timer was registered with.
*/
static void timer_function_netif_check_timeout (
void *data)
{
struct totemudp_instance *instance = (struct totemudp_instance *)data;
int interface_up;
int interface_num;
struct totem_ip_address *bind_address;
/*
* Determine whether the bind network interface is currently up
* NOTE(review): the return value of netif_determine is ignored, so
* interface_up/interface_num are only meaningful if totemip_iface_check
* always writes them - confirm
*/
netif_determine (instance,
&instance->totem_interface->bindnet,
&instance->totem_interface->boundto,
&interface_up, &interface_num);
/*
* If the network interface isn't back up and we are already
* in loopback mode, add timer to check again and return.
* The same is done when we are the only member, already bound to the
* regular interface, and the interface is still up.
*/
if ((instance->netif_bind_state == BIND_STATE_LOOPBACK &&
interface_up == 0) ||
(instance->my_memb_entries == 1 &&
instance->netif_bind_state == BIND_STATE_REGULAR &&
interface_up == 1)) {
qb_loop_timer_add (instance->totemudp_poll_handle,
QB_LOOP_MED,
instance->totem_config->downcheck_timeout*QB_TIME_NS_IN_MSEC,
(void *)instance,
timer_function_netif_check_timeout,
&instance->timer_netif_check_timeout);
/*
* Nothing changed: the timer re-armed above runs this check again
* after downcheck_timeout milliseconds
*/
return;
}
/*
* The bind state has to change: close every existing socket (those whose
* descriptor is greater than 0) before rebuilding them below
*/
if (instance->totemudp_sockets.mcast_recv > 0) {
qb_loop_poll_del (instance->totemudp_poll_handle,
instance->totemudp_sockets.mcast_recv);
close (instance->totemudp_sockets.mcast_recv);
}
if (instance->totemudp_sockets.mcast_send > 0) {
close (instance->totemudp_sockets.mcast_send);
}
if (instance->totemudp_sockets.local_mcast_loop[0] > 0) {
qb_loop_poll_del (instance->totemudp_poll_handle,
instance->totemudp_sockets.local_mcast_loop[0]);
close (instance->totemudp_sockets.local_mcast_loop[0]);
close (instance->totemudp_sockets.local_mcast_loop[1]);
}
if (instance->totemudp_sockets.token > 0) {
qb_loop_poll_del (instance->totemudp_poll_handle,
instance->totemudp_sockets.token);
close (instance->totemudp_sockets.token);
}
if (interface_up == 0) {
/*
* Interface is not up
*/
instance->netif_bind_state = BIND_STATE_LOOPBACK;
bind_address = &localhost;
/*
* Add a timer to retry building interfaces and request memb_gather_enter
*/
qb_loop_timer_add (instance->totemudp_poll_handle,
QB_LOOP_MED,
instance->totem_config->downcheck_timeout*QB_TIME_NS_IN_MSEC,
(void *)instance,
timer_function_netif_check_timeout,
&instance->timer_netif_check_timeout);
} else {
/*
* Interface is up
*/
instance->netif_bind_state = BIND_STATE_REGULAR;
bind_address = &instance->totem_interface->bindnet;
}
/*
* Create and bind the multicast and unicast sockets
* The result is deliberately discarded; the sockets are registered with
* the poll loop below regardless of whether building them succeeded
*/
(void)totemudp_build_sockets (instance,
&instance->mcast_address,
bind_address,
&instance->totemudp_sockets,
&instance->totem_interface->boundto);
/*
* All three receiving sockets share the same delivery callback
*/
qb_loop_poll_add (
instance->totemudp_poll_handle,
QB_LOOP_MED,
instance->totemudp_sockets.mcast_recv,
POLLIN, instance, net_deliver_fn);
qb_loop_poll_add (
instance->totemudp_poll_handle,
QB_LOOP_MED,
instance->totemudp_sockets.local_mcast_loop[0],
POLLIN, instance, net_deliver_fn);
qb_loop_poll_add (
instance->totemudp_poll_handle,
QB_LOOP_MED,
instance->totemudp_sockets.token,
POLLIN, instance, net_deliver_fn);
totemip_copy (&instance->my_id, &instance->totem_interface->boundto);
/*
* This reports changes in the interface to the user and totemsrp
* netif_state_report holds the transition that should be reported next,
* so each transition is logged and signalled only once
*/
if (instance->netif_bind_state == BIND_STATE_REGULAR) {
if (instance->netif_state_report & NETIF_STATE_REPORT_UP) {
log_printf (instance->totemudp_log_level_notice,
"The network interface [%s] is now up.",
totemip_print (&instance->totem_interface->boundto));
instance->netif_state_report = NETIF_STATE_REPORT_DOWN;
instance->totemudp_iface_change_fn (instance->context, &instance->my_id);
}
/*
* Add a timer to check for interface going down in single membership
*/
if (instance->my_memb_entries == 1) {
qb_loop_timer_add (instance->totemudp_poll_handle,
QB_LOOP_MED,
instance->totem_config->downcheck_timeout*QB_TIME_NS_IN_MSEC,
(void *)instance,
timer_function_netif_check_timeout,
&instance->timer_netif_check_timeout);
}
} else {
if (instance->netif_state_report & NETIF_STATE_REPORT_DOWN) {
log_printf (instance->totemudp_log_level_notice,
"The network interface is down.");
instance->totemudp_iface_change_fn (instance->context, &instance->my_id);
}
instance->netif_state_report = NETIF_STATE_REPORT_UP;
}
}
/*
 * Raise the priority of sock to INTERACTIVE so that our messages are not
 * queued behind other traffic. Compiles to a no-op when SO_PRIORITY is not
 * available; a setsockopt failure is logged as a warning and otherwise
 * ignored.
 */
static void totemudp_traffic_control_set(struct totemudp_instance *instance, int sock)
{
#ifdef SO_PRIORITY
	int interactive_prio = 6; /* TC_PRIO_INTERACTIVE */
	int rc;

	rc = setsockopt(sock, SOL_SOCKET, SO_PRIORITY,
		&interactive_prio, sizeof(int));
	if (rc != 0) {
		LOGSYS_PERROR (errno, instance->totemudp_log_level_warning, "Could not set traffic priority");
	}
#endif
}
/*
* Create and configure every socket stored in *sockets:
* - mcast_recv: bound to mcast_address on the interface's ip_port
* - local_mcast_loop: an AF_UNIX datagram socket pair
* - mcast_send: bound to bound_to on ip_port - 1
* - token: bound to bound_to on ip_port
* All of them are made non-blocking. Afterwards the buffer sizes are set,
* broadcast or multicast group membership is configured, multicast loopback
* is disabled, and the multicast TTL and outgoing interface are selected.
*
* interface_num is only used for the IPv6 group join and interface selection.
*
* Returns 0 on success and -1 on the first failing step. On failure the
* descriptors created so far are left open in *sockets.
*/
static int totemudp_build_sockets_ip (
struct totemudp_instance *instance,
struct totem_ip_address *mcast_address,
struct totem_ip_address *bindnet_address,
struct totemudp_socket *sockets,
struct totem_ip_address *bound_to,
int interface_num)
{
struct sockaddr_storage sockaddr;
struct ipv6_mreq mreq6;
struct ip_mreq mreq;
struct sockaddr_storage mcast_ss, boundto_ss;
struct sockaddr_in6 *mcast_sin6 = (struct sockaddr_in6 *)&mcast_ss;
struct sockaddr_in *mcast_sin = (struct sockaddr_in *)&mcast_ss;
struct sockaddr_in *boundto_sin = (struct sockaddr_in *)&boundto_ss;
unsigned int sendbuf_size;
unsigned int recvbuf_size;
unsigned int optlen = sizeof (sendbuf_size);
int addrlen;
int res;
int flag;
uint8_t sflag;
int i;
/*
* Create multicast recv socket
*/
sockets->mcast_recv = socket (bindnet_address->family, SOCK_DGRAM, 0);
if (sockets->mcast_recv == -1) {
LOGSYS_PERROR (errno, instance->totemudp_log_level_warning,
"socket() failed");
return (-1);
}
totemip_nosigpipe (sockets->mcast_recv);
res = fcntl (sockets->mcast_recv, F_SETFL, O_NONBLOCK);
if (res == -1) {
LOGSYS_PERROR (errno, instance->totemudp_log_level_warning,
"Could not set non-blocking operation on multicast socket");
return (-1);
}
/*
* Force reuse
*/
flag = 1;
if ( setsockopt(sockets->mcast_recv, SOL_SOCKET, SO_REUSEADDR, (char *)&flag, sizeof (flag)) < 0) {
LOGSYS_PERROR (errno, instance->totemudp_log_level_warning,
"setsockopt(SO_REUSEADDR) failed");
return (-1);
}
/*
* Bind to multicast socket used for multicast receives
*/
totemip_totemip_to_sockaddr_convert(mcast_address,
instance->totem_interface->ip_port, &sockaddr, &addrlen);
res = bind (sockets->mcast_recv, (struct sockaddr *)&sockaddr, addrlen);
if (res == -1) {
LOGSYS_PERROR (errno, instance->totemudp_log_level_warning,
"Unable to bind the socket to receive multicast packets");
return (-1);
}
/*
* Create local multicast loop socket
* NOTE(review): the failure below is logged as "socket() failed" although
* the failing call is socketpair()
*/
if (socketpair(AF_UNIX, SOCK_DGRAM, 0, sockets->local_mcast_loop) == -1) {
LOGSYS_PERROR (errno, instance->totemudp_log_level_warning,
"socket() failed");
return (-1);
}
for (i = 0; i < 2; i++) {
totemip_nosigpipe (sockets->local_mcast_loop[i]);
res = fcntl (sockets->local_mcast_loop[i], F_SETFL, O_NONBLOCK);
if (res == -1) {
LOGSYS_PERROR (errno, instance->totemudp_log_level_warning,
"Could not set non-blocking operation on multicast socket");
return (-1);
}
}
/*
* Setup mcast send socket
*/
sockets->mcast_send = socket (bindnet_address->family, SOCK_DGRAM, 0);
if (sockets->mcast_send == -1) {
LOGSYS_PERROR (errno, instance->totemudp_log_level_warning,
"socket() failed");
return (-1);
}
totemip_nosigpipe (sockets->mcast_send);
res = fcntl (sockets->mcast_send, F_SETFL, O_NONBLOCK);
if (res == -1) {
LOGSYS_PERROR (errno, instance->totemudp_log_level_warning,
"Could not set non-blocking operation on multicast socket");
return (-1);
}
/*
* Force reuse
*/
flag = 1;
if ( setsockopt(sockets->mcast_send, SOL_SOCKET, SO_REUSEADDR, (char *)&flag, sizeof (flag)) < 0) {
LOGSYS_PERROR (errno, instance->totemudp_log_level_warning,
"setsockopt(SO_REUSEADDR) failed");
return (-1);
}
/*
* The send socket is bound to the port just below the configured one
*/
totemip_totemip_to_sockaddr_convert(bound_to, instance->totem_interface->ip_port - 1,
&sockaddr, &addrlen);
res = bind (sockets->mcast_send, (struct sockaddr *)&sockaddr, addrlen);
if (res == -1) {
LOGSYS_PERROR (errno, instance->totemudp_log_level_warning,
"Unable to bind the socket to send multicast packets");
return (-1);
}
/*
* Setup unicast socket
*/
sockets->token = socket (bindnet_address->family, SOCK_DGRAM, 0);
if (sockets->token == -1) {
LOGSYS_PERROR (errno, instance->totemudp_log_level_warning,
"socket() failed");
return (-1);
}
totemip_nosigpipe (sockets->token);
res = fcntl (sockets->token, F_SETFL, O_NONBLOCK);
if (res == -1) {
LOGSYS_PERROR (errno, instance->totemudp_log_level_warning,
"Could not set non-blocking operation on token socket");
return (-1);
}
/*
* Force reuse
*/
flag = 1;
if ( setsockopt(sockets->token, SOL_SOCKET, SO_REUSEADDR, (char *)&flag, sizeof (flag)) < 0) {
LOGSYS_PERROR (errno, instance->totemudp_log_level_warning,
"setsockopt(SO_REUSEADDR) failed");
return (-1);
}
/*
* Bind to unicast socket used for token send/receives
* This has the side effect of binding to the correct interface
*/
totemip_totemip_to_sockaddr_convert(bound_to, instance->totem_interface->ip_port, &sockaddr, &addrlen);
res = bind (sockets->token, (struct sockaddr *)&sockaddr, addrlen);
if (res == -1) {
LOGSYS_PERROR (errno, instance->totemudp_log_level_warning,
"Unable to bind UDP unicast socket");
return (-1);
}
recvbuf_size = MCAST_SOCKET_BUFFER_SIZE;
sendbuf_size = MCAST_SOCKET_BUFFER_SIZE;
/*
* Set buffer sizes to avoid overruns
*/
res = setsockopt (sockets->mcast_recv, SOL_SOCKET, SO_RCVBUF, &recvbuf_size, optlen);
if (res == -1) {
LOGSYS_PERROR (errno, instance->totemudp_log_level_debug,
"Unable to set SO_RCVBUF size on UDP mcast socket");
return (-1);
}
res = setsockopt (sockets->mcast_send, SOL_SOCKET, SO_SNDBUF, &sendbuf_size, optlen);
if (res == -1) {
LOGSYS_PERROR (errno, instance->totemudp_log_level_debug,
"Unable to set SO_SNDBUF size on UDP mcast socket");
return (-1);
}
res = setsockopt (sockets->local_mcast_loop[0], SOL_SOCKET, SO_RCVBUF, &recvbuf_size, optlen);
if (res == -1) {
LOGSYS_PERROR (errno, instance->totemudp_log_level_debug,
"Unable to set SO_RCVBUF size on UDP local mcast loop socket");
return (-1);
}
res = setsockopt (sockets->local_mcast_loop[1], SOL_SOCKET, SO_SNDBUF, &sendbuf_size, optlen);
if (res == -1) {
LOGSYS_PERROR (errno, instance->totemudp_log_level_debug,
"Unable to set SO_SNDBUF size on UDP local mcast loop socket");
return (-1);
}
/*
* Read the effective buffer sizes back for the debug log only; a failing
* getsockopt is silently skipped
*/
res = getsockopt (sockets->mcast_recv, SOL_SOCKET, SO_RCVBUF, &recvbuf_size, &optlen);
if (res == 0) {
log_printf (instance->totemudp_log_level_debug,
"Receive multicast socket recv buffer size (%d bytes).", recvbuf_size);
}
res = getsockopt (sockets->mcast_send, SOL_SOCKET, SO_SNDBUF, &sendbuf_size, &optlen);
if (res == 0) {
log_printf (instance->totemudp_log_level_debug,
"Transmit multicast socket send buffer size (%d bytes).", sendbuf_size);
}
res = getsockopt (sockets->local_mcast_loop[0], SOL_SOCKET, SO_RCVBUF, &recvbuf_size, &optlen);
if (res == 0) {
log_printf (instance->totemudp_log_level_debug,
"Local receive multicast loop socket recv buffer size (%d bytes).", recvbuf_size);
}
res = getsockopt (sockets->local_mcast_loop[1], SOL_SOCKET, SO_SNDBUF, &sendbuf_size, &optlen);
if (res == 0) {
log_printf (instance->totemudp_log_level_debug,
"Local transmit multicast loop socket send buffer size (%d bytes).", sendbuf_size);
}
/*
* Join group membership on socket
* (when broadcast_use is set, SO_BROADCAST is enabled on both multicast
* sockets instead and no group is joined)
*/
totemip_totemip_to_sockaddr_convert(mcast_address, instance->totem_interface->ip_port, &mcast_ss, &addrlen);
totemip_totemip_to_sockaddr_convert(bound_to, instance->totem_interface->ip_port, &boundto_ss, &addrlen);
if (instance->totem_config->broadcast_use == 1) {
unsigned int broadcast = 1;
if ((setsockopt(sockets->mcast_recv, SOL_SOCKET,
SO_BROADCAST, &broadcast, sizeof (broadcast))) == -1) {
LOGSYS_PERROR (errno, instance->totemudp_log_level_warning,
"setting broadcast option failed");
return (-1);
}
if ((setsockopt(sockets->mcast_send, SOL_SOCKET,
SO_BROADCAST, &broadcast, sizeof (broadcast))) == -1) {
LOGSYS_PERROR (errno, instance->totemudp_log_level_warning,
"setting broadcast option failed");
return (-1);
}
} else {
switch (bindnet_address->family) {
case AF_INET:
memset(&mreq, 0, sizeof(mreq));
mreq.imr_multiaddr.s_addr = mcast_sin->sin_addr.s_addr;
mreq.imr_interface.s_addr = boundto_sin->sin_addr.s_addr;
res = setsockopt (sockets->mcast_recv, IPPROTO_IP, IP_ADD_MEMBERSHIP,
&mreq, sizeof (mreq));
if (res == -1) {
LOGSYS_PERROR (errno, instance->totemudp_log_level_warning,
"join ipv4 multicast group failed");
return (-1);
}
break;
case AF_INET6:
memset(&mreq6, 0, sizeof(mreq6));
memcpy(&mreq6.ipv6mr_multiaddr, &mcast_sin6->sin6_addr, sizeof(struct in6_addr));
mreq6.ipv6mr_interface = interface_num;
res = setsockopt (sockets->mcast_recv, IPPROTO_IPV6, IPV6_JOIN_GROUP,
&mreq6, sizeof (mreq6));
if (res == -1) {
LOGSYS_PERROR (errno, instance->totemudp_log_level_warning,
"join ipv6 multicast group failed");
return (-1);
}
break;
}
}
/*
* Turn off multicast loopback
* The IPv4 option is passed a one-byte flag, the IPv6 option an int.
* NOTE(review): res is not assigned by this switch for any other address
* family, so the check after it would then test the value left over from
* the last getsockopt call above
*/
flag = 0;
switch ( bindnet_address->family ) {
case AF_INET:
sflag = 0;
res = setsockopt (sockets->mcast_send, IPPROTO_IP, IP_MULTICAST_LOOP,
&sflag, sizeof (sflag));
break;
case AF_INET6:
res = setsockopt (sockets->mcast_send, IPPROTO_IPV6, IPV6_MULTICAST_LOOP,
&flag, sizeof (flag));
}
if (res == -1) {
LOGSYS_PERROR (errno, instance->totemudp_log_level_warning,
"Unable to turn off multicast loopback");
return (-1);
}
/*
* Set multicast packets TTL
*/
flag = instance->totem_interface->ttl;
if (bindnet_address->family == AF_INET6) {
res = setsockopt (sockets->mcast_send, IPPROTO_IPV6, IPV6_MULTICAST_HOPS,
&flag, sizeof (flag));
if (res == -1) {
LOGSYS_PERROR (errno, instance->totemudp_log_level_warning,
"set mcast v6 TTL failed");
return (-1);
}
} else {
sflag = flag;
res = setsockopt(sockets->mcast_send, IPPROTO_IP, IP_MULTICAST_TTL,
&sflag, sizeof(sflag));
if (res == -1) {
LOGSYS_PERROR (errno, instance->totemudp_log_level_warning,
"set mcast v4 TTL failed");
return (-1);
}
}
/*
* Bind to a specific interface for multicast send and receive
* (IPv4 selects the interface by bound address, IPv6 by interface_num)
*/
switch ( bindnet_address->family ) {
case AF_INET:
if (setsockopt (sockets->mcast_send, IPPROTO_IP, IP_MULTICAST_IF,
&boundto_sin->sin_addr, sizeof (boundto_sin->sin_addr)) < 0) {
LOGSYS_PERROR (errno, instance->totemudp_log_level_warning,
"cannot select interface for multicast packets (send)");
return (-1);
}
if (setsockopt (sockets->mcast_recv, IPPROTO_IP, IP_MULTICAST_IF,
&boundto_sin->sin_addr, sizeof (boundto_sin->sin_addr)) < 0) {
LOGSYS_PERROR (errno, instance->totemudp_log_level_warning,
"cannot select interface for multicast packets (recv)");
return (-1);
}
break;
case AF_INET6:
if (setsockopt (sockets->mcast_send, IPPROTO_IPV6, IPV6_MULTICAST_IF,
&interface_num, sizeof (interface_num)) < 0) {
LOGSYS_PERROR (errno, instance->totemudp_log_level_warning,
"cannot select interface for multicast packets (send v6)");
return (-1);
}
if (setsockopt (sockets->mcast_recv, IPPROTO_IPV6, IPV6_MULTICAST_IF,
&interface_num, sizeof (interface_num)) < 0) {
LOGSYS_PERROR (errno, instance->totemudp_log_level_warning,
"cannot select interface for multicast packets (recv v6)");
return (-1);
}
break;
}
return 0;
}
/*
 * Resolve the interface to bind to, record the bound address as the
 * instance's own id and build all totem sockets on it.
 *
 * Returns -1 when the interface lookup fails, otherwise the result of
 * totemudp_build_sockets_ip. The token socket has its traffic priority
 * set whatever that result is.
 */
static int totemudp_build_sockets (
	struct totemudp_instance *instance,
	struct totem_ip_address *mcast_address,
	struct totem_ip_address *bindnet_address,
	struct totemudp_socket *sockets,
	struct totem_ip_address *bound_to)
{
	int iface_index;
	int iface_is_up;
	int rc;

	/*
	 * Determine the ip address bound to and the interface name
	 */
	rc = netif_determine (instance, bindnet_address, bound_to,
		&iface_is_up, &iface_index);
	if (rc == -1) {
		return (-1);
	}

	totemip_copy(&instance->my_id, bound_to);

	rc = totemudp_build_sockets_ip (instance, mcast_address,
		bindnet_address, sockets, bound_to, iface_index);

	/* We only send out of the token socket */
	totemudp_traffic_control_set(instance, sockets->token);

	return rc;
}
/*
* Totem Network interface - also does encryption/decryption
* depends on poll abstraction, POSIX, IPV4
*/
/*
* Create an instance
*
* Allocates a totemudp instance, stores the configuration, statistics
* pointer and callbacks in it, initializes crypto, and schedules the first
* interface check 100 ms later; the sockets themselves are built by that
* timer, not here.
*
* On success *udp_context receives the new instance and 0 is returned.
* Returns -1 if the allocation or crypto_init fails.
*/
int totemudp_initialize (
qb_loop_t *poll_handle,
void **udp_context,
struct totem_config *totem_config,
+ totemsrp_stats_t *stats,
int interface_no,
void *context,
void (*deliver_fn) (
void *context,
const void *msg,
unsigned int msg_len),
void (*iface_change_fn) (
void *context,
const struct totem_ip_address *iface_address),
void (*target_set_completed) (
void *context))
{
struct totemudp_instance *instance;
instance = malloc (sizeof (struct totemudp_instance));
if (instance == NULL) {
return (-1);
}
totemudp_instance_initialize (instance);
instance->totem_config = totem_config;
+ instance->stats = stats;
+
/*
* Configure logging
* (the security level is hard-coded to 1; the configured value is
* commented out on that line)
*/
instance->totemudp_log_level_security = 1; //totem_config->totem_logging_configuration.log_level_security;
instance->totemudp_log_level_error = totem_config->totem_logging_configuration.log_level_error;
instance->totemudp_log_level_warning = totem_config->totem_logging_configuration.log_level_warning;
instance->totemudp_log_level_notice = totem_config->totem_logging_configuration.log_level_notice;
instance->totemudp_log_level_debug = totem_config->totem_logging_configuration.log_level_debug;
instance->totemudp_subsys_id = totem_config->totem_logging_configuration.log_subsys_id;
instance->totemudp_log_printf = totem_config->totem_logging_configuration.log_printf;
/*
* Initialize random number generator for later use to generate salt
*/
instance->crypto_inst = crypto_init (totem_config->private_key,
totem_config->private_key_len,
totem_config->crypto_cipher_type,
totem_config->crypto_hash_type,
instance->totemudp_log_printf,
instance->totemudp_log_level_security,
instance->totemudp_log_level_notice,
instance->totemudp_log_level_error,
instance->totemudp_subsys_id);
/*
* NOTE(review): the instance allocated above is not freed on this
* failure path
*/
if (instance->crypto_inst == NULL) {
return (-1);
}
/*
* Initialize local variables for totemudp
*/
instance->totem_interface = &totem_config->interfaces[interface_no];
totemip_copy (&instance->mcast_address, &instance->totem_interface->mcast_addr);
memset (instance->iov_buffer, 0, FRAME_SIZE_MAX);
instance->totemudp_poll_handle = poll_handle;
instance->totem_interface->bindnet.nodeid = instance->totem_config->node_id;
instance->context = context;
instance->totemudp_deliver_fn = deliver_fn;
instance->totemudp_iface_change_fn = iface_change_fn;
instance->totemudp_target_set_completed = target_set_completed;
/*
* localhost is the address the interface check binds to while the real
* interface is down; it carries the same node id as the bind network
*/
totemip_localhost (instance->mcast_address.family, &localhost);
localhost.nodeid = instance->totem_config->node_id;
/*
* RRP layer isn't ready to receive message because it hasn't
* initialized yet. Add short timer to check the interfaces.
*/
qb_loop_timer_add (instance->totemudp_poll_handle,
QB_LOOP_MED,
100*QB_TIME_NS_IN_MSEC,
(void *)instance,
timer_function_netif_check_timeout,
&instance->timer_netif_check_timeout);
*udp_context = instance;
return (0);
}
/*
 * Allocate one frame-sized buffer (FRAME_SIZE_MAX bytes) with malloc.
 * Returns the buffer, or NULL when malloc fails.
 */
void *totemudp_buffer_alloc (void)
{
	void *frame = malloc (FRAME_SIZE_MAX);

	return (frame);
}
/*
 * Release a buffer with free(). Passing NULL is allowed and does nothing.
 */
void totemudp_buffer_release (void *ptr)
{
	/*
	 * A void function must not return an expression, so call free() as a
	 * plain statement.
	 */
	free (ptr);
}
/*
 * Record the current number of processors in the membership.
 *
 * Any pending interface-check timer is cancelled; it is armed again with
 * the downcheck timeout only when the count is exactly 1.
 * Always returns 0.
 */
int totemudp_processor_count_set (
	void *udp_context,
	int processor_count)
{
	struct totemudp_instance *udp = (struct totemudp_instance *)udp_context;

	udp->my_memb_entries = processor_count;

	qb_loop_timer_del (udp->totemudp_poll_handle,
		udp->timer_netif_check_timeout);

	if (processor_count != 1) {
		return (0);
	}

	qb_loop_timer_add (udp->totemudp_poll_handle,
		QB_LOOP_MED,
		udp->totem_config->downcheck_timeout*QB_TIME_NS_IN_MSEC,
		(void *)udp,
		timer_function_netif_check_timeout,
		&udp->timer_netif_check_timeout);

	return (0);
}
/*
 * Deliver everything currently readable on the multicast receive socket
 * and then on the receiving end of the local multicast loop.
 *
 * The instance's flushing flag is set for the duration of the call, which
 * net_deliver_fn consults when handling each datagram. Always returns 0.
 */
int totemudp_recv_flush (void *udp_context)
{
	struct totemudp_instance *udp = (struct totemudp_instance *)udp_context;
	struct pollfd pfd;
	int ready;
	int pass;
	int fd;

	udp->flushing = 1;

	for (pass = 0; pass < 2; pass++) {
		/* The descriptor is looked up at the start of each pass */
		fd = (pass == 0)
			? udp->totemudp_sockets.mcast_recv
			: udp->totemudp_sockets.local_mcast_loop[0];
		assert(fd != -1);

		/* Zero-timeout poll: keep going while the socket reports an event */
		for (;;) {
			pfd.fd = fd;
			pfd.events = POLLIN;
			ready = poll (&pfd, 1, 0);
			if (ready != 1) {
				break;
			}
			if (pfd.revents & POLLIN) {
				net_deliver_fn (fd, pfd.revents, udp);
			}
		}
	}

	udp->flushing = 0;

	return (0);
}
/*
 * Nothing to flush on the send side; the context is ignored.
 * Always returns 0.
 */
int totemudp_send_flush (void *udp_context)
{
	(void)udp_context;

	return (0);
}
/*
 * Send msg by unicast to the instance's current token target.
 * Always returns 0; ucast_sendmsg has no return value to propagate.
 */
int totemudp_token_send (
	void *udp_context,
	const void *msg,
	unsigned int msg_len)
{
	struct totemudp_instance *udp = (struct totemudp_instance *)udp_context;

	ucast_sendmsg (udp, &udp->token_target, msg, msg_len);

	return (0);
}
/*
 * Multicast msg through mcast_sendmsg.
 * Always returns 0; mcast_sendmsg has no return value to propagate.
 */
int totemudp_mcast_flush_send (
	void *udp_context,
	const void *msg,
	unsigned int msg_len)
{
	struct totemudp_instance *udp = (struct totemudp_instance *)udp_context;

	mcast_sendmsg (udp, msg, msg_len);

	return (0);
}
/*
 * Multicast msg through mcast_sendmsg; the body is identical to
 * totemudp_mcast_flush_send.
 * Always returns 0; mcast_sendmsg has no return value to propagate.
 */
int totemudp_mcast_noflush_send (
	void *udp_context,
	const void *msg,
	unsigned int msg_len)
{
	struct totemudp_instance *udp = (struct totemudp_instance *)udp_context;

	mcast_sendmsg (udp, msg, msg_len);

	return (0);
}
/*
 * Run the interface check immediately instead of waiting for its timer.
 * Always returns 0.
 */
extern int totemudp_iface_check (void *udp_context)
{
	struct totemudp_instance *udp = (struct totemudp_instance *)udp_context;

	timer_function_netif_check_timeout (udp);

	return (0);
}
/*
 * Reduce totem_config->net_mtu by the per-packet overhead: the security
 * header size for the configured cipher/hash plus the UDP/IP headers.
 * udp_context is not used.
 */
extern void totemudp_net_mtu_adjust (void *udp_context, struct totem_config *totem_config)
{
#define UDPIP_HEADER_SIZE (20 + 8) /* 20 bytes for ip 8 bytes for udp */
	totem_config->net_mtu = totem_config->net_mtu -
		(crypto_sec_header_size(totem_config->crypto_cipher_type,
			totem_config->crypto_hash_type) +
		 UDPIP_HEADER_SIZE);
}
/*
 * Return the instance's own address (my_id) formatted by totemip_print.
 */
const char *totemudp_iface_print (void *udp_context) {
	struct totemudp_instance *udp = (struct totemudp_instance *)udp_context;

	return (totemip_print (&udp->my_id));
}
/*
 * Copy the instance's own address (my_id) into *addr.
 * Always returns 0.
 */
int totemudp_iface_get (
	void *udp_context,
	struct totem_ip_address *addr)
{
	struct totemudp_instance *udp = (struct totemudp_instance *)udp_context;

	memcpy (addr, &udp->my_id, sizeof (*addr));

	return (0);
}
int totemudp_token_target_set (
void *udp_context,
const struct totem_ip_address *token_target)
{
struct totemudp_instance *instance = (struct totemudp_instance *)udp_context;
int res = 0;
memcpy (&instance->token_target, token_target,
sizeof (struct totem_ip_address));
instance->totemudp_target_set_completed (instance->context);
return (res);
}
/*
 * Read and discard every datagram currently pending on the multicast
 * receive socket and on the receiving end of the local multicast loop.
 * Received data lands in the flush receive buffer and is not delivered.
 *
 * Returns 0 when nothing was read, 1 when the last recvmsg call succeeded
 * and -1 when the last recvmsg call failed.
 */
extern int totemudp_recv_mcast_empty (
	void *udp_context)
{
	struct totemudp_instance *instance = (struct totemudp_instance *)udp_context;
	int res;	/* signed, so the comparison with -1 below is exact */
	struct sockaddr_storage system_from;
	struct msghdr msg_recv;
	struct pollfd ufd;
	int nfds;
	int msg_processed = 0;
	int i;
	int sock;

	/*
	 * Receive datagram
	 *
	 * Zero the header first so that no field reaches recvmsg
	 * uninitialized when one of the HAVE_MSGHDR_* guards is not defined.
	 */
	memset(&msg_recv, 0, sizeof(msg_recv));
	msg_recv.msg_name = &system_from;
	msg_recv.msg_namelen = sizeof (struct sockaddr_storage);
	msg_recv.msg_iov = &instance->totemudp_iov_recv_flush;
	msg_recv.msg_iovlen = 1;
#ifdef HAVE_MSGHDR_CONTROL
	msg_recv.msg_control = 0;
#endif
#ifdef HAVE_MSGHDR_CONTROLLEN
	msg_recv.msg_controllen = 0;
#endif
#ifdef HAVE_MSGHDR_FLAGS
	msg_recv.msg_flags = 0;
#endif
#ifdef HAVE_MSGHDR_ACCRIGHTS
	msg_recv.msg_accrights = NULL;
#endif
#ifdef HAVE_MSGHDR_ACCRIGHTSLEN
	msg_recv.msg_accrightslen = 0;
#endif

	for (i = 0; i < 2; i++) {
		sock = -1;
		if (i == 0) {
			sock = instance->totemudp_sockets.mcast_recv;
		}
		if (i == 1) {
			sock = instance->totemudp_sockets.local_mcast_loop[0];
		}
		assert(sock != -1);

		do {
			ufd.fd = sock;
			ufd.events = POLLIN;
			nfds = poll (&ufd, 1, 0);
			if (nfds == 1 && ufd.revents & POLLIN) {
				/*
				 * msg_namelen is value-result: recvmsg overwrites it
				 * with the actual address length, so restore the
				 * full size before every call.
				 */
				msg_recv.msg_namelen = sizeof (struct sockaddr_storage);
				res = recvmsg (sock, &msg_recv, MSG_NOSIGNAL | MSG_DONTWAIT);
				if (res != -1) {
					msg_processed = 1;
				} else {
					msg_processed = -1;
				}
			}
		} while (nfds == 1);
	}

	return (msg_processed);
}
diff --git a/exec/totemudp.h b/exec/totemudp.h
index ba22b4b3..697307a9 100644
--- a/exec/totemudp.h
+++ b/exec/totemudp.h
@@ -1,117 +1,118 @@
/*
* Copyright (c) 2005 MontaVista Software, Inc.
* Copyright (c) 2006-2011 Red Hat, Inc.
*
* All rights reserved.
*
* Author: Steven Dake (sdake@redhat.com)
*
* This software licensed under BSD license, the text of which follows:
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* - Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* - Neither the name of the MontaVista Software, Inc. nor the names of its
* contributors may be used to endorse or promote products derived from this
* software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
* THE POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef TOTEMUDP_H_DEFINED
#define TOTEMUDP_H_DEFINED
#include <sys/types.h>
#include <sys/socket.h>
#include <qb/qbloop.h>
#include <corosync/totem/totem.h>
/**
* Create an instance
*
* On success the new instance is stored in *udp_context and 0 is returned;
* -1 is returned on failure. The callbacks are stored in the instance:
* deliver_fn receives each accepted message, iface_change_fn is called when
* the interface state changes, and target_set_completed is called after a
* token target has been stored.
*/
extern int totemudp_initialize (
qb_loop_t* poll_handle,
void **udp_context,
struct totem_config *totem_config,
+ totemsrp_stats_t *stats,
int interface_no,
void *context,
void (*deliver_fn) (
void *context,
const void *msg,
unsigned int msg_len),
void (*iface_change_fn) (
void *context,
const struct totem_ip_address *iface_address),
void (*target_set_completed) (
void *context));
/**
* Allocate a frame-sized buffer; returns NULL if the allocation fails
*/
extern void *totemudp_buffer_alloc (void);
/**
* Release a buffer
*/
extern void totemudp_buffer_release (void *ptr);
/**
* Record the number of processors in the membership; the interface check
* timer is re-armed only when the count is 1. Always returns 0.
*/
extern int totemudp_processor_count_set (
void *udp_context,
int processor_count);
/**
* Send msg by unicast to the current token target. Always returns 0.
*/
extern int totemudp_token_send (
void *udp_context,
const void *msg,
unsigned int msg_len);
/**
* Multicast msg. Always returns 0.
*/
extern int totemudp_mcast_flush_send (
void *udp_context,
const void *msg,
unsigned int msg_len);
/**
* Multicast msg. Always returns 0.
*/
extern int totemudp_mcast_noflush_send (
void *udp_context,
const void *msg,
unsigned int msg_len);
/**
* Deliver every datagram pending on the multicast receive sockets.
* Always returns 0.
*/
extern int totemudp_recv_flush (void *udp_context);
/**
* Does nothing. Always returns 0.
*/
extern int totemudp_send_flush (void *udp_context);
/**
* Run the interface check immediately. Always returns 0.
*/
extern int totemudp_iface_check (void *udp_context);
/**
* Close the instance's sockets. Always returns 0.
*/
extern int totemudp_finalize (void *udp_context);
/**
* Subtract the crypto header size and the UDP/IP header size from
* totem_config->net_mtu
*/
extern void totemudp_net_mtu_adjust (void *udp_context, struct totem_config *totem_config);
/**
* Return the instance's own address in printable form
*/
extern const char *totemudp_iface_print (void *udp_context);
/**
* Copy the instance's own address into *addr. Always returns 0.
*/
extern int totemudp_iface_get (
void *udp_context,
struct totem_ip_address *addr);
/**
* Store the token target and invoke the target_set_completed callback.
* Always returns 0.
*/
extern int totemudp_token_target_set (
void *udp_context,
const struct totem_ip_address *token_target);
extern int totemudp_crypto_set (
void *udp_context,
const char *cipher_type,
const char *hash_type);
/**
* Read and discard every datagram pending on the multicast receive sockets.
* Returns 0 if nothing was read, 1 if the last read succeeded, -1 if the
* last read failed.
*/
extern int totemudp_recv_mcast_empty (
void *udp_context);
#endif /* TOTEMUDP_H_DEFINED */
diff --git a/exec/totemudpu.c b/exec/totemudpu.c
index ed3fa603..12ec63c0 100644
--- a/exec/totemudpu.c
+++ b/exec/totemudpu.c
@@ -1,1158 +1,1163 @@
/*
* Copyright (c) 2005 MontaVista Software, Inc.
* Copyright (c) 2006-2012 Red Hat, Inc.
*
* All rights reserved.
*
* Author: Steven Dake (sdake@redhat.com)
* This software licensed under BSD license, the text of which follows:
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* - Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* - Neither the name of the MontaVista Software, Inc. nor the names of its
* contributors may be used to endorse or promote products derived from this
* software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
* THE POSSIBILITY OF SUCH DAMAGE.
*/
#include <config.h>
#include <assert.h>
#include <sys/mman.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/socket.h>
#include <netdb.h>
#include <sys/un.h>
#include <sys/ioctl.h>
#include <sys/param.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <sched.h>
#include <time.h>
#include <sys/time.h>
#include <sys/poll.h>
#include <sys/uio.h>
#include <limits.h>
#include <qb/qbdefs.h>
#include <qb/qbloop.h>
#include <corosync/sq.h>
#include <corosync/list.h>
#include <corosync/swab.h>
#define LOGSYS_UTILS_ONLY 1
#include <corosync/logsys.h>
#include "totemudpu.h"
#include "util.h"
#include "totemcrypto.h"
#include <nss.h>
#include <pk11pub.h>
#include <pkcs11.h>
#include <prerror.h>
/*
* Fallback for platforms whose headers do not define MSG_NOSIGNAL: a zero
* flag changes nothing when passed to sendmsg()/recvmsg().
*/
#ifndef MSG_NOSIGNAL
#define MSG_NOSIGNAL 0
#endif
/*
* Size requested for SO_RCVBUF on the token socket and for SO_SNDBUF on the
* per-member sending sockets.
*/
#define MCAST_SOCKET_BUFFER_SIZE (TRANSMITS_ALLOWED * FRAME_SIZE_MAX)
/*
* Bits kept in totemudpu_instance.netif_state_report: which interface
* transition may still be reported.
*/
#define NETIF_STATE_REPORT_UP 1
#define NETIF_STATE_REPORT_DOWN 2
/*
* Values of totemudpu_instance.netif_bind_state. UNBOUND is 0, i.e. the
* state a freshly zeroed instance starts in.
*/
#define BIND_STATE_UNBOUND 0
#define BIND_STATE_REGULAR 1
#define BIND_STATE_LOOPBACK 2
/*
* One entry of totemudpu_instance.member_list: a peer address together with
* the socket used to send to that peer.
*/
struct totemudpu_member {
/* Linkage in totemudpu_instance.member_list */
struct list_head list;
/* Address of the peer */
struct totem_ip_address member;
/* Result of totemudpu_create_sending_socket(); -1 when creation failed */
int fd;
};
/*
* State of one UDPU transport instance.
*/
struct totemudpu_instance {
/* Crypto context returned by crypto_init() */
struct crypto_instance *crypto_inst;
/* libqb loop used for the token socket poll entry and the interface-check timer */
qb_loop_t *totemudpu_poll_handle;
/* Points at totem_config->interfaces[interface_no] */
struct totem_interface *totem_interface;
/* NETIF_STATE_REPORT_* bits: which interface transitions may still be reported */
int netif_state_report;
/* One of BIND_STATE_* */
int netif_bind_state;
/* Opaque pointer passed as first argument to the three callbacks below */
void *context;
/* Called by net_deliver_fn() with every authenticated, decrypted frame */
void (*totemudpu_deliver_fn) (
void *context,
const void *msg,
unsigned int msg_len);
/* Called by the interface check when an up/down transition is reported */
void (*totemudpu_iface_change_fn) (
void *context,
const struct totem_ip_address *iface_address);
/* Called by totemudpu_token_target_set() after the new target is stored */
void (*totemudpu_target_set_completed) (void *context);
/*
* Function and data used to log messages
*/
int totemudpu_log_level_security;
int totemudpu_log_level_error;
int totemudpu_log_level_warning;
int totemudpu_log_level_notice;
int totemudpu_log_level_debug;
int totemudpu_subsys_id;
void (*totemudpu_log_printf) (
int level,
int subsys,
const char *function,
const char *file,
int line,
const char *format,
...)__attribute__((format(printf, 6, 7)));
/* Not referenced by any function in this file */
void *udpu_context;
/* Receive buffer; totemudpu_iov_recv points at it */
char iov_buffer[FRAME_SIZE_MAX];
struct iovec totemudpu_iov_recv;
/* List of struct totemudpu_member */
struct list_head member_list;
/*
* Counters. Only stats_recv is updated in this file (bytes received, in
* net_deliver_fn()); the others are never written after the initial memset.
*/
int stats_sent;
int stats_recv;
int stats_delv;
int stats_remcasts;
int stats_orf_token;
struct timeval stats_tv_start;
/* Copy of the address the token socket is bound to */
struct totem_ip_address my_id;
/* Not referenced by any function in this file */
int firstrun;
/* Handle of the pending timer_function_netif_check_timeout() timer */
qb_loop_timer_handle timer_netif_check_timeout;
/* Number of processors in the membership; starts at 1 */
unsigned int my_memb_entries;
struct totem_config *totem_config;
+ totemsrp_stats_t *stats;
+
/* Destination used by totemudpu_token_send() */
struct totem_ip_address token_target;
/* Socket bound to the local address; used to receive and to send the token */
int token_socket;
};
/*
* A message paired with the instance it belongs to. No function in this
* file references this type.
*/
struct work_item {
const void *msg;
unsigned int msg_len;
struct totemudpu_instance *instance;
};
/*
* Forward declarations of helpers that are used before they are defined.
*/
static int totemudpu_build_sockets (
struct totemudpu_instance *instance,
struct totem_ip_address *bindnet_address,
struct totem_ip_address *bound_to);
static int totemudpu_create_sending_socket(
void *udpu_context,
const struct totem_ip_address *member);
/*
* Has external linkage but is not declared in totemudpu.h.
*/
int totemudpu_member_list_rebind_ip (
void *udpu_context);
/*
* Loopback address used as bind address while the configured interface is
* down. Filled in (including the node id) by totemudpu_initialize().
*/
static struct totem_ip_address localhost;
/*
 * Put an instance into its start-up state: all bytes zeroed, an empty
 * member list, a membership of one, both interface transitions reportable
 * and the receive iovec covering the embedded frame buffer.
 */
static void totemudpu_instance_initialize (struct totemudpu_instance *instance)
{
	memset (instance, 0, sizeof (*instance));

	list_init (&instance->member_list);

	/*
	 * There is always at least 1 processor
	 */
	instance->my_memb_entries = 1;

	instance->netif_state_report = NETIF_STATE_REPORT_DOWN | NETIF_STATE_REPORT_UP;

	instance->totemudpu_iov_recv.iov_len = FRAME_SIZE_MAX;
	instance->totemudpu_iov_recv.iov_base = instance->iov_buffer;
}
/*
 * Log through the callback configured on the instance.
 *
 * Requires a variable named "instance" (struct totemudpu_instance *) in the
 * calling scope. The call site's function, file and line are passed along.
 *
 * The expansion intentionally does NOT end in a semicolon: the caller's own
 * ';' completes the do/while, so "log_printf (...);" is exactly one
 * statement and can be used as the unbraced body of an if/else.
 */
#define log_printf(level, format, args...) \
do { \
	instance->totemudpu_log_printf ( \
		level, instance->totemudpu_subsys_id, \
		__FUNCTION__, __FILE__, __LINE__, \
		(const char *)format, ##args); \
} while (0)
/*
* Log a message followed by ": <error text> (<error number>)".
*
* Requires a variable named "instance" in the calling scope. fmt must be a
* string literal because it is concatenated with the suffix at compile time.
* err_num is expanded twice (once for qb_strerror_r(), once as the printed
* number), so pass a plain value such as errno.
*/
#define LOGSYS_PERROR(err_num, level, fmt, args...) \
do { \
char _error_str[LOGSYS_MAX_PERROR_MSG_LEN]; \
const char *_error_ptr = qb_strerror_r(err_num, _error_str, sizeof(_error_str)); \
instance->totemudpu_log_printf ( \
level, instance->totemudpu_subsys_id, \
__FUNCTION__, __FILE__, __LINE__, \
fmt ": %s (%d)", ##args, _error_ptr, err_num); \
} while(0)
/*
 * Placeholder: accepts a cipher and hash name but changes nothing.
 *
 * All three arguments are ignored. Always returns 0.
 */
int totemudpu_crypto_set (
	void *udpu_context,
	const char *cipher_type,
	const char *hash_type)
{
	(void)udpu_context;
	(void)cipher_type;
	(void)hash_type;

	return 0;
}
/*
 * Sign/encrypt one message and send it to a single address through the
 * token socket.
 *
 * instance   transport instance (supplies crypto context, port and socket)
 * system_to  destination address
 * msg        plaintext message
 * msg_len    length of msg in bytes
 *
 * Nothing is returned: a crypto failure is logged and the message dropped,
 * a sendmsg() failure is logged at debug level.
 */
static inline void ucast_sendmsg (
	struct totemudpu_instance *instance,
	struct totem_ip_address *system_to,
	const void *msg,
	unsigned int msg_len)
{
	unsigned char wire_buf[FRAME_SIZE_MAX];
	size_t wire_len;
	struct sockaddr_storage dest;
	int dest_len;
	struct iovec vec;
	struct msghdr hdr;

	/*
	 * Encrypt and digest the message
	 */
	if (crypto_encrypt_and_sign (instance->crypto_inst,
		(const unsigned char *)msg, msg_len,
		wire_buf, &wire_len) != 0) {
		log_printf(LOGSYS_LEVEL_CRIT, "Error encrypting/signing packet (non-critical)");
		return;
	}

	vec.iov_base = (void *)wire_buf;
	vec.iov_len = wire_len;

	/*
	 * Build unicast message
	 */
	totemip_totemip_to_sockaddr_convert(system_to,
		instance->totem_interface->ip_port, &dest, &dest_len);

	memset(&hdr, 0, sizeof(hdr));
	hdr.msg_name = &dest;
	hdr.msg_namelen = dest_len;
	hdr.msg_iov = (void *)&vec;
	hdr.msg_iovlen = 1;
#ifdef HAVE_MSGHDR_CONTROL
	hdr.msg_control = 0;
#endif
#ifdef HAVE_MSGHDR_CONTROLLEN
	hdr.msg_controllen = 0;
#endif
#ifdef HAVE_MSGHDR_FLAGS
	hdr.msg_flags = 0;
#endif
#ifdef HAVE_MSGHDR_ACCRIGHTS
	hdr.msg_accrights = NULL;
#endif
#ifdef HAVE_MSGHDR_ACCRIGHTSLEN
	hdr.msg_accrightslen = 0;
#endif

	/*
	 * Transmit unicast message
	 * An error here is recovered by totemsrp
	 */
	if (sendmsg (instance->token_socket, &hdr, MSG_NOSIGNAL) < 0) {
		LOGSYS_PERROR (errno, instance->totemudpu_log_level_debug,
			"sendmsg(ucast) failed (non-critical)");
	}
}
/*
 * Sign/encrypt one message once and send the result to every entry of the
 * member list, each through that member's own socket.
 *
 * instance  transport instance (supplies crypto context, port, member list)
 * msg       plaintext message
 * msg_len   length of msg in bytes
 *
 * Nothing is returned: a crypto failure is logged and the message dropped,
 * a sendmsg() failure for one member is logged at debug level and the
 * remaining members are still tried.
 */
static inline void mcast_sendmsg (
	struct totemudpu_instance *instance,
	const void *msg,
	unsigned int msg_len)
{
	unsigned char wire_buf[FRAME_SIZE_MAX];
	size_t wire_len;
	struct sockaddr_storage dest;
	int dest_len;
	struct iovec vec;
	struct msghdr hdr;
	struct list_head *pos;
	struct totemudpu_member *peer;

	/*
	 * Encrypt and digest the message
	 */
	if (crypto_encrypt_and_sign (instance->crypto_inst,
		(const unsigned char *)msg, msg_len,
		wire_buf, &wire_len) != 0) {
		log_printf(LOGSYS_LEVEL_CRIT, "Error encrypting/signing packet (non-critical)");
		return;
	}

	vec.iov_base = (void *)wire_buf;
	vec.iov_len = wire_len;

	memset(&hdr, 0, sizeof(hdr));

	/*
	 * Build and transmit one copy per member
	 */
	pos = instance->member_list.next;
	while (pos != &instance->member_list) {
		peer = list_entry (pos, struct totemudpu_member, list);

		totemip_totemip_to_sockaddr_convert(&peer->member,
			instance->totem_interface->ip_port, &dest, &dest_len);

		hdr.msg_name = &dest;
		hdr.msg_namelen = dest_len;
		hdr.msg_iov = (void *)&vec;
		hdr.msg_iovlen = 1;
#ifdef HAVE_MSGHDR_CONTROL
		hdr.msg_control = 0;
#endif
#ifdef HAVE_MSGHDR_CONTROLLEN
		hdr.msg_controllen = 0;
#endif
#ifdef HAVE_MSGHDR_FLAGS
		hdr.msg_flags = 0;
#endif
#ifdef HAVE_MSGHDR_ACCRIGHTS
		hdr.msg_accrights = NULL;
#endif
#ifdef HAVE_MSGHDR_ACCRIGHTSLEN
		hdr.msg_accrightslen = 0;
#endif

		/*
		 * An error here is recovered by totemsrp
		 */
		if (sendmsg (peer->fd, &hdr, MSG_NOSIGNAL) < 0) {
			LOGSYS_PERROR (errno, instance->totemudpu_log_level_debug,
				"sendmsg(mcast) failed (non-critical)");
		}

		pos = pos->next;
	}
}
/*
 * Stop receiving: if a token socket exists (descriptor > 0) it is removed
 * from the poll loop and closed. Nothing else is released here.
 *
 * Always returns 0.
 */
int totemudpu_finalize (
	void *udpu_context)
{
	struct totemudpu_instance *instance = udpu_context;

	if (instance->token_socket <= 0) {
		return (0);
	}

	qb_loop_poll_del (instance->totemudpu_poll_handle,
		instance->token_socket);
	close (instance->token_socket);

	return (0);
}
/*
 * Poll callback for the token socket: read one datagram, authenticate and
 * decrypt it in place, and hand the plaintext to the deliver callback.
 *
 * fd       descriptor that became readable
 * revents  poll events (unused)
 * data     the struct totemudpu_instance registered with the poll entry
 *
 * Always returns 0. A failed recvmsg() is ignored silently; a datagram that
 * fails authentication is logged and dropped. The receive iovec length is
 * restored to FRAME_SIZE_MAX before returning from either of the last two
 * paths.
 */
static int net_deliver_fn (
	int fd,
	int revents,
	void *data)
{
	struct totemudpu_instance *instance = data;
	struct iovec *rx = &instance->totemudpu_iov_recv;
	struct sockaddr_storage sender;
	struct msghdr hdr;
	int len;

	(void)revents;

	/*
	 * Receive datagram
	 */
	hdr.msg_name = &sender;
	hdr.msg_namelen = sizeof (struct sockaddr_storage);
	hdr.msg_iov = rx;
	hdr.msg_iovlen = 1;
#ifdef HAVE_MSGHDR_CONTROL
	hdr.msg_control = 0;
#endif
#ifdef HAVE_MSGHDR_CONTROLLEN
	hdr.msg_controllen = 0;
#endif
#ifdef HAVE_MSGHDR_FLAGS
	hdr.msg_flags = 0;
#endif
#ifdef HAVE_MSGHDR_ACCRIGHTS
	hdr.msg_accrights = NULL;
#endif
#ifdef HAVE_MSGHDR_ACCRIGHTSLEN
	hdr.msg_accrightslen = 0;
#endif

	len = recvmsg (fd, &hdr, MSG_NOSIGNAL | MSG_DONTWAIT);
	if (len == -1) {
		return (0);
	}
	instance->stats_recv += len;

	/*
	 * Authenticate and if authenticated, decrypt datagram
	 */
	if (crypto_authenticate_and_decrypt (instance->crypto_inst, rx->iov_base, &len) == -1) {
		log_printf (instance->totemudpu_log_level_security, "Received message has invalid digest... ignoring.");
		log_printf (instance->totemudpu_log_level_security,
			"Invalid packet data");
		rx->iov_len = FRAME_SIZE_MAX;
		return 0;
	}

	/*
	 * Handle incoming message; the iovec length temporarily carries the
	 * plaintext length
	 */
	rx->iov_len = len;
	instance->totemudpu_deliver_fn (
		instance->context,
		rx->iov_base,
		rx->iov_len);
	rx->iov_len = FRAME_SIZE_MAX;

	return (0);
}
/*
 * Thin wrapper around totemip_iface_check() that supplies the
 * clear_node_high_bit setting from the instance's configuration.
 *
 * Returns whatever totemip_iface_check() returns.
 */
static int netif_determine (
	struct totemudpu_instance *instance,
	struct totem_ip_address *bindnet,
	struct totem_ip_address *bound_to,
	int *interface_up,
	int *interface_num)
{
	return (totemip_iface_check (bindnet, bound_to,
		interface_up, interface_num,
		instance->totem_config->clear_node_high_bit));
}
/*
* Periodic interface check; also called directly by totemudpu_iface_check().
*
* Determines whether the configured interface is up. If the bind state
* already matches what was found, the timer is simply re-armed. Otherwise
* the token socket is torn down and rebuilt, bound either to the configured
* network (interface up) or to localhost (interface down), and the change is
* reported through the log and the iface_change callback.
*
* data is the struct totemudpu_instance.
*/
static void timer_function_netif_check_timeout (
void *data)
{
struct totemudpu_instance *instance = (struct totemudpu_instance *)data;
int interface_up;
int interface_num;
struct totem_ip_address *bind_address;
/*
* Look up the current state of the configured interface.
* NOTE(review): the return value is ignored; whether interface_up is
* always written on failure depends on totemip_iface_check() - confirm.
*/
netif_determine (instance,
&instance->totem_interface->bindnet,
&instance->totem_interface->boundto,
&interface_up, &interface_num);
/*
* Nothing changed: either we are already in loopback mode and the
* interface is still down, or we are the only member, regularly bound,
* and the interface is still up. Re-arm the timer and return.
*/
if ((instance->netif_bind_state == BIND_STATE_LOOPBACK &&
interface_up == 0) ||
(instance->my_memb_entries == 1 &&
instance->netif_bind_state == BIND_STATE_REGULAR &&
interface_up == 1)) {
qb_loop_timer_add (instance->totemudpu_poll_handle,
QB_LOOP_MED,
instance->totem_config->downcheck_timeout*QB_TIME_NS_IN_MSEC,
(void *)instance,
timer_function_netif_check_timeout,
&instance->timer_netif_check_timeout);
return;
}
/*
* The bind state has to change: drop the existing token socket first
*/
if (instance->token_socket > 0) {
qb_loop_poll_del (instance->totemudpu_poll_handle,
instance->token_socket);
close (instance->token_socket);
}
if (interface_up == 0) {
/*
* Interface is not up
*/
instance->netif_bind_state = BIND_STATE_LOOPBACK;
bind_address = &localhost;
/*
* Add a timer to retry building interfaces and request memb_gather_enter
*/
qb_loop_timer_add (instance->totemudpu_poll_handle,
QB_LOOP_MED,
instance->totem_config->downcheck_timeout*QB_TIME_NS_IN_MSEC,
(void *)instance,
timer_function_netif_check_timeout,
&instance->timer_netif_check_timeout);
} else {
/*
* Interface is up
*/
instance->netif_bind_state = BIND_STATE_REGULAR;
bind_address = &instance->totem_interface->bindnet;
}
/*
* Create and bind the token socket (and rebind the member sockets), then
* register it with the poll loop. The result of
* totemudpu_build_sockets() is not checked here.
*/
totemudpu_build_sockets (instance,
bind_address,
&instance->totem_interface->boundto);
qb_loop_poll_add (instance->totemudpu_poll_handle,
QB_LOOP_MED,
instance->token_socket,
POLLIN, instance, net_deliver_fn);
totemip_copy (&instance->my_id, &instance->totem_interface->boundto);
/*
* This reports changes in the interface to the user and totemsrp.
* netif_state_report makes sure each transition is reported only once.
*/
if (instance->netif_bind_state == BIND_STATE_REGULAR) {
if (instance->netif_state_report & NETIF_STATE_REPORT_UP) {
log_printf (instance->totemudpu_log_level_notice,
"The network interface [%s] is now up.",
totemip_print (&instance->totem_interface->boundto));
instance->netif_state_report = NETIF_STATE_REPORT_DOWN;
instance->totemudpu_iface_change_fn (instance->context, &instance->my_id);
}
/*
* Add a timer to check for interface going down in single membership
*/
if (instance->my_memb_entries == 1) {
qb_loop_timer_add (instance->totemudpu_poll_handle,
QB_LOOP_MED,
instance->totem_config->downcheck_timeout*QB_TIME_NS_IN_MSEC,
(void *)instance,
timer_function_netif_check_timeout,
&instance->timer_netif_check_timeout);
}
} else {
if (instance->netif_state_report & NETIF_STATE_REPORT_DOWN) {
log_printf (instance->totemudpu_log_level_notice,
"The network interface is down.");
instance->totemudpu_iface_change_fn (instance->context, &instance->my_id);
}
instance->netif_state_report = NETIF_STATE_REPORT_UP;
}
}
/*
 * Raise the priority of a socket to "interactive" so that our messages are
 * not queued behind other traffic.
 *
 * Does nothing on platforms without SO_PRIORITY. A setsockopt() failure is
 * logged as a warning and otherwise ignored.
 */
static void totemudpu_traffic_control_set(struct totemudpu_instance *instance, int sock)
{
#ifdef SO_PRIORITY
	int interactive_prio = 6; /* TC_PRIO_INTERACTIVE */

	if (setsockopt(sock, SOL_SOCKET, SO_PRIORITY,
		&interactive_prio, sizeof(interactive_prio)) != 0) {
		LOGSYS_PERROR (errno, instance->totemudpu_log_level_warning,
			"Could not set traffic priority");
	}
#endif
}
/*
 * Create the token socket, make it non-blocking, bind it to bound_to on the
 * interface's configured port and ask for a large receive buffer.
 *
 * instance         transport instance; token_socket is overwritten
 * bindnet_address  only its address family is used
 * bound_to         local address to bind to
 * interface_num    unused
 *
 * Returns 0 on success and -1 if socket(), fcntl() or bind() fails. A
 * failure to enlarge the receive buffer is only logged.
 */
static int totemudpu_build_sockets_ip (
	struct totemudpu_instance *instance,
	struct totem_ip_address *bindnet_address,
	struct totem_ip_address *bound_to,
	int interface_num)
{
	struct sockaddr_storage local_addr;
	int local_addr_len;
	unsigned int rcvbuf_bytes = MCAST_SOCKET_BUFFER_SIZE;

	(void)interface_num;

	/*
	 * Setup unicast socket
	 */
	instance->token_socket = socket (bindnet_address->family, SOCK_DGRAM, 0);
	if (instance->token_socket == -1) {
		LOGSYS_PERROR (errno, instance->totemudpu_log_level_warning,
			"socket() failed");
		return (-1);
	}

	totemip_nosigpipe (instance->token_socket);

	if (fcntl (instance->token_socket, F_SETFL, O_NONBLOCK) == -1) {
		LOGSYS_PERROR (errno, instance->totemudpu_log_level_warning,
			"Could not set non-blocking operation on token socket");
		return (-1);
	}

	/*
	 * Bind to unicast socket used for token send/receives
	 * This has the side effect of binding to the correct interface
	 */
	totemip_totemip_to_sockaddr_convert(bound_to,
		instance->totem_interface->ip_port, &local_addr, &local_addr_len);
	if (bind (instance->token_socket,
		(struct sockaddr *)&local_addr, local_addr_len) == -1) {
		LOGSYS_PERROR (errno, instance->totemudpu_log_level_warning,
			"bind token socket failed");
		return (-1);
	}

	/*
	 * the token_socket can receive many messages. Allow a large number
	 * of receive messages on this socket
	 */
	if (setsockopt (instance->token_socket, SOL_SOCKET, SO_RCVBUF,
		&rcvbuf_bytes, sizeof (rcvbuf_bytes)) == -1) {
		LOGSYS_PERROR (errno, instance->totemudpu_log_level_notice,
			"Could not set recvbuf size");
	}

	return 0;
}
/*
 * Resolve the local address for bindnet_address, build the token socket on
 * it, raise the socket's priority and recreate every member's sending
 * socket.
 *
 * Returns -1 immediately if the address cannot be determined; otherwise the
 * result of totemudpu_build_sockets_ip(). Priority setting and member
 * rebinding are attempted even when building the token socket failed.
 */
static int totemudpu_build_sockets (
	struct totemudpu_instance *instance,
	struct totem_ip_address *bindnet_address,
	struct totem_ip_address *bound_to)
{
	int if_num;
	int if_up;
	int rc;

	/*
	 * Determine the ip address bound to and the interface name
	 */
	if (netif_determine (instance, bindnet_address, bound_to,
		&if_up, &if_num) == -1) {
		return (-1);
	}

	totemip_copy(&instance->my_id, bound_to);

	rc = totemudpu_build_sockets_ip (instance,
		bindnet_address, bound_to, if_num);

	/* We only send out of the token socket */
	totemudpu_traffic_control_set(instance, instance->token_socket);

	/*
	 * Rebind all members to new ips
	 */
	totemudpu_member_list_rebind_ip(instance);

	return rc;
}
/*
* Totem Network interface - also does encryption/decryption
* depends on poll abstraction, POSIX, IPV4
*/
/*
 * Create an instance
 *
 * Allocates and resets a totemudpu_instance, copies the logging settings
 * out of totem_config, sets up the crypto context, records the callbacks
 * and arms a 100 ms timer whose handler builds the sockets. No socket is
 * created by this function itself.
 *
 * poll_handle           libqb loop used for the timer and, later, the socket
 * udpu_context          receives the new instance on success
 * totem_config          configuration; interfaces[interface_no].bindnet.nodeid
 *                       is overwritten with totem_config->node_id
 * stats                 stored in the instance
 * interface_no          index into totem_config->interfaces
 * context               passed back as first argument of the callbacks
 * deliver_fn            called with every authenticated, decrypted frame
 * iface_change_fn       called when an interface up/down change is reported
 * target_set_completed  called from totemudpu_token_target_set()
 *
 * Returns 0 on success. Returns -1 if the instance cannot be allocated or
 * crypto_init() fails; in both cases nothing stays allocated and
 * *udpu_context is not written.
 */
int totemudpu_initialize (
	qb_loop_t *poll_handle,
	void **udpu_context,
	struct totem_config *totem_config,
+ totemsrp_stats_t *stats,
	int interface_no,
	void *context,
	void (*deliver_fn) (
		void *context,
		const void *msg,
		unsigned int msg_len),
	void (*iface_change_fn) (
		void *context,
		const struct totem_ip_address *iface_address),
	void (*target_set_completed) (
		void *context))
{
	struct totemudpu_instance *instance;

	instance = malloc (sizeof (struct totemudpu_instance));
	if (instance == NULL) {
		return (-1);
	}

	totemudpu_instance_initialize (instance);

	instance->totem_config = totem_config;
+ instance->stats = stats;
+
	/*
	 * Configure logging
	 */
	instance->totemudpu_log_level_security = 1; //totem_config->totem_logging_configuration.log_level_security;
	instance->totemudpu_log_level_error = totem_config->totem_logging_configuration.log_level_error;
	instance->totemudpu_log_level_warning = totem_config->totem_logging_configuration.log_level_warning;
	instance->totemudpu_log_level_notice = totem_config->totem_logging_configuration.log_level_notice;
	instance->totemudpu_log_level_debug = totem_config->totem_logging_configuration.log_level_debug;
	instance->totemudpu_subsys_id = totem_config->totem_logging_configuration.log_subsys_id;
	instance->totemudpu_log_printf = totem_config->totem_logging_configuration.log_printf;

	/*
	 * Set up the crypto context used to sign/encrypt outgoing and
	 * authenticate/decrypt incoming frames
	 */
	instance->crypto_inst = crypto_init (totem_config->private_key,
		totem_config->private_key_len,
		totem_config->crypto_cipher_type,
		totem_config->crypto_hash_type,
		instance->totemudpu_log_printf,
		instance->totemudpu_log_level_security,
		instance->totemudpu_log_level_notice,
		instance->totemudpu_log_level_error,
		instance->totemudpu_subsys_id);
	if (instance->crypto_inst == NULL) {
		/*
		 * Nothing else refers to the instance yet (no timer armed,
		 * *udpu_context not set), so it must be released here
		 */
		free (instance);
		return (-1);
	}

	/*
	 * Initialize local variables for totemudpu
	 */
	instance->totem_interface = &totem_config->interfaces[interface_no];

	memset (instance->iov_buffer, 0, FRAME_SIZE_MAX);

	instance->totemudpu_poll_handle = poll_handle;

	instance->totem_interface->bindnet.nodeid = instance->totem_config->node_id;

	instance->context = context;
	instance->totemudpu_deliver_fn = deliver_fn;
	instance->totemudpu_iface_change_fn = iface_change_fn;
	instance->totemudpu_target_set_completed = target_set_completed;

	totemip_localhost (AF_INET, &localhost);
	localhost.nodeid = instance->totem_config->node_id;

	/*
	 * RRP layer isn't ready to receive message because it hasn't
	 * initialized yet. Add short timer to check the interfaces.
	 */
	qb_loop_timer_add (instance->totemudpu_poll_handle,
		QB_LOOP_MED,
		100*QB_TIME_NS_IN_MSEC,
		(void *)instance,
		timer_function_netif_check_timeout,
		&instance->timer_netif_check_timeout);

	*udpu_context = instance;
	return (0);
}
/*
 * Allocate one frame-sized buffer (FRAME_SIZE_MAX bytes) with malloc().
 *
 * Returns the buffer, or NULL when malloc() fails. Release it with
 * totemudpu_buffer_release().
 */
void *totemudpu_buffer_alloc (void)
{
	void *frame = malloc (FRAME_SIZE_MAX);

	return frame;
}
/*
 * Release a buffer obtained from totemudpu_buffer_alloc().
 *
 * ptr may be NULL, in which case nothing happens (free() semantics).
 */
void totemudpu_buffer_release (void *ptr)
{
	/*
	 * No "return free (ptr);": ISO C does not allow a return statement
	 * with an expression in a function returning void.
	 */
	free (ptr);
}
/*
 * Record the current membership size and adjust the interface-check timer:
 * the pending timer is always cancelled and is re-armed (downcheck_timeout)
 * only while we are the sole member.
 *
 * Always returns 0.
 */
int totemudpu_processor_count_set (
	void *udpu_context,
	int processor_count)
{
	struct totemudpu_instance *instance = udpu_context;

	instance->my_memb_entries = processor_count;

	qb_loop_timer_del (instance->totemudpu_poll_handle,
		instance->timer_netif_check_timeout);

	if (processor_count != 1) {
		return (0);
	}

	qb_loop_timer_add (instance->totemudpu_poll_handle,
		QB_LOOP_MED,
		instance->totem_config->downcheck_timeout*QB_TIME_NS_IN_MSEC,
		(void *)instance,
		timer_function_netif_check_timeout,
		&instance->timer_netif_check_timeout);

	return (0);
}
/*
 * No-op for this transport: the argument is ignored and 0 is returned.
 */
int totemudpu_recv_flush (void *udpu_context)
{
	(void)udpu_context;

	return 0;
}
/*
 * No-op for this transport: the argument is ignored and 0 is returned.
 */
int totemudpu_send_flush (void *udpu_context)
{
	(void)udpu_context;

	return 0;
}
/*
 * Send a message to the current token target (see
 * totemudpu_token_target_set()) through the token socket.
 *
 * Always returns 0; failures are only logged by ucast_sendmsg().
 */
int totemudpu_token_send (
	void *udpu_context,
	const void *msg,
	unsigned int msg_len)
{
	struct totemudpu_instance *instance = udpu_context;

	ucast_sendmsg (instance, &instance->token_target, msg, msg_len);

	return (0);
}
/*
 * Send a message to every member in the member list.
 *
 * Always returns 0; failures are only logged by mcast_sendmsg().
 */
int totemudpu_mcast_flush_send (
	void *udpu_context,
	const void *msg,
	unsigned int msg_len)
{
	struct totemudpu_instance *instance = udpu_context;

	mcast_sendmsg (instance, msg, msg_len);

	return (0);
}
/*
 * Send a message to every member in the member list. In this transport the
 * body is the same as totemudpu_mcast_flush_send().
 *
 * Always returns 0; failures are only logged by mcast_sendmsg().
 */
int totemudpu_mcast_noflush_send (
	void *udpu_context,
	const void *msg,
	unsigned int msg_len)
{
	struct totemudpu_instance *instance = udpu_context;

	mcast_sendmsg (instance, msg, msg_len);

	return (0);
}
/*
 * Run the interface check right now instead of waiting for its timer.
 *
 * Always returns 0.
 */
extern int totemudpu_iface_check (void *udpu_context)
{
	timer_function_netif_check_timeout (
		(struct totemudpu_instance *)udpu_context);

	return (0);
}
/*
 * Reduce totem_config->net_mtu by the per-packet overhead of this
 * transport: the crypto header for the configured cipher/hash plus the
 * IP and UDP headers.
 *
 * udpu_context is not used.
 */
extern void totemudpu_net_mtu_adjust (void *udpu_context, struct totem_config *totem_config)
{
#define UDPIP_HEADER_SIZE (20 + 8) /* 20 bytes for ip 8 bytes for udp */
	totem_config->net_mtu = totem_config->net_mtu -
		(crypto_sec_header_size(totem_config->crypto_cipher_type,
			totem_config->crypto_hash_type) +
		UDPIP_HEADER_SIZE);
}
/*
 * Return the printable form of the instance's own address (my_id), as
 * produced by totemip_print().
 */
const char *totemudpu_iface_print (void *udpu_context)
{
	struct totemudpu_instance *instance = udpu_context;

	return (totemip_print (&instance->my_id));
}
/*
 * Copy the instance's own address (my_id) into *addr.
 *
 * Always returns 0.
 */
int totemudpu_iface_get (
	void *udpu_context,
	struct totem_ip_address *addr)
{
	struct totemudpu_instance *instance = udpu_context;

	memcpy (addr, &instance->my_id, sizeof (*addr));

	return (0);
}
/*
 * Store the address the token will be sent to, then notify the owner
 * through the target_set_completed callback.
 *
 * Always returns 0.
 */
int totemudpu_token_target_set (
	void *udpu_context,
	const struct totem_ip_address *token_target)
{
	struct totemudpu_instance *instance = udpu_context;

	memcpy (&instance->token_target, token_target,
		sizeof (instance->token_target));

	instance->totemudpu_target_set_completed (instance->context);

	return (0);
}
/*
 * Drain the token socket: read and discard datagrams for as long as poll()
 * reports the socket readable without blocking. The data lands in the
 * instance's receive buffer and is not delivered.
 *
 * Returns 0 if nothing was pending, 1 if the last read succeeded and -1 if
 * the last read failed.
 */
extern int totemudpu_recv_mcast_empty (
	void *udpu_context)
{
	struct totemudpu_instance *instance = (struct totemudpu_instance *)udpu_context;
	/* recvmsg() returns ssize_t; keep it signed so the -1 check is exact */
	ssize_t res;
	struct sockaddr_storage system_from;
	struct msghdr msg_recv;
	struct pollfd ufd;
	int nfds;
	int msg_processed = 0;

	/*
	 * Receive datagram. Zero the header first, as the send paths do, so
	 * that members not covered by the HAVE_MSGHDR_* blocks are defined.
	 */
	memset (&msg_recv, 0, sizeof (msg_recv));
	msg_recv.msg_name = &system_from;
	msg_recv.msg_namelen = sizeof (struct sockaddr_storage);
	msg_recv.msg_iov = &instance->totemudpu_iov_recv;
	msg_recv.msg_iovlen = 1;
#ifdef HAVE_MSGHDR_CONTROL
	msg_recv.msg_control = 0;
#endif
#ifdef HAVE_MSGHDR_CONTROLLEN
	msg_recv.msg_controllen = 0;
#endif
#ifdef HAVE_MSGHDR_FLAGS
	msg_recv.msg_flags = 0;
#endif
#ifdef HAVE_MSGHDR_ACCRIGHTS
	msg_recv.msg_accrights = NULL;
#endif
#ifdef HAVE_MSGHDR_ACCRIGHTSLEN
	msg_recv.msg_accrightslen = 0;
#endif

	for (;;) {
		ufd.fd = instance->token_socket;
		ufd.events = POLLIN;
		ufd.revents = 0;
		nfds = poll (&ufd, 1, 0);

		/*
		 * Stop when nothing is readable. This includes the case where
		 * poll() flags the descriptor only with POLLERR/POLLHUP/POLLNVAL:
		 * no read would happen, so looping again could spin forever.
		 */
		if (nfds != 1 || (ufd.revents & POLLIN) == 0) {
			break;
		}

		res = recvmsg (instance->token_socket, &msg_recv, MSG_NOSIGNAL | MSG_DONTWAIT);
		if (res != -1) {
			msg_processed = 1;
		} else {
			msg_processed = -1;
		}
	}

	return (msg_processed);
}
/*
 * Create the non-blocking UDP socket used to send to one member, with a
 * large send buffer, bound to our own address (my_id) on an ephemeral port.
 *
 * udpu_context  the struct totemudpu_instance
 * member        peer address; only its address family is used here
 *
 * Returns the new descriptor, or -1 if socket(), fcntl() or bind() fails.
 * On every failure path the descriptor is closed again, so nothing leaks.
 * A failure to enlarge the send buffer is only logged.
 */
static int totemudpu_create_sending_socket(
	void *udpu_context,
	const struct totem_ip_address *member)
{
	struct totemudpu_instance *instance = (struct totemudpu_instance *)udpu_context;
	int fd;
	int res;
	unsigned int sendbuf_size;
	unsigned int optlen = sizeof (sendbuf_size);
	struct sockaddr_storage sockaddr;
	int addrlen;

	fd = socket (member->family, SOCK_DGRAM, 0);
	if (fd == -1) {
		LOGSYS_PERROR (errno, instance->totemudpu_log_level_warning,
			"Could not create socket for new member");
		return (-1);
	}

	totemip_nosigpipe (fd);

	res = fcntl (fd, F_SETFL, O_NONBLOCK);
	if (res == -1) {
		/* Log first: close() may overwrite errno */
		LOGSYS_PERROR (errno, instance->totemudpu_log_level_warning,
			"Could not set non-blocking operation on token socket");
		close (fd);
		return (-1);
	}

	/*
	 * These sockets are used to send multicast messages, so their buffers
	 * should be large
	 */
	sendbuf_size = MCAST_SOCKET_BUFFER_SIZE;
	res = setsockopt (fd, SOL_SOCKET, SO_SNDBUF,
		&sendbuf_size, optlen);
	if (res == -1) {
		LOGSYS_PERROR (errno, instance->totemudpu_log_level_notice,
			"Could not set sendbuf size");
	}

	/*
	 * Bind to sending interface
	 */
	totemip_totemip_to_sockaddr_convert(&instance->my_id, 0, &sockaddr, &addrlen);
	res = bind (fd, (struct sockaddr *)&sockaddr, addrlen);
	if (res == -1) {
		/* Log first: close() may overwrite errno */
		LOGSYS_PERROR (errno, instance->totemudpu_log_level_warning,
			"bind token socket failed");
		close (fd);
		return (-1);
	}

	return (fd);
}
/*
 * Append a peer to the member list and create its sending socket.
 *
 * Returns -1 if the list entry cannot be allocated, 0 otherwise. A failure
 * to create the socket is not reported: the entry is kept with the
 * descriptor value returned by totemudpu_create_sending_socket().
 */
int totemudpu_member_add (
	void *udpu_context,
	const struct totem_ip_address *member)
{
	struct totemudpu_instance *instance = udpu_context;
	struct totemudpu_member *entry = malloc (sizeof (*entry));

	if (entry == NULL) {
		return (-1);
	}

	log_printf (LOGSYS_LEVEL_NOTICE, "adding new UDPU member {%s}",
		totemip_print(member));

	memcpy (&entry->member, member, sizeof (entry->member));

	list_init (&entry->list);
	list_add_tail (&entry->list, &instance->member_list);

	entry->fd = totemudpu_create_sending_socket(udpu_context, member);

	return (0);
}
/*
 * Remove the first member whose address equals token_target: its socket is
 * closed, the entry is unlinked from the member list and its memory is
 * released.
 *
 * Always returns 0, also when no matching member exists.
 */
int totemudpu_member_remove (
	void *udpu_context,
	const struct totem_ip_address *token_target)
{
	struct list_head *list;
	struct totemudpu_member *member;
	struct totemudpu_instance *instance = (struct totemudpu_instance *)udpu_context;

	for (list = instance->member_list.next;
		list != &instance->member_list;
		list = list->next) {

		member = list_entry (list,
			struct totemudpu_member,
			list);

		if (totemip_compare (token_target, &member->member) != 0) {
			continue;
		}

		log_printf(LOGSYS_LEVEL_NOTICE,
			"removing UDPU member {%s}",
			totemip_print(&member->member));

		if (member->fd > 0) {
			log_printf(LOGSYS_LEVEL_DEBUG,
				"Closing socket to: {%s}",
				totemip_print(&member->member));
			qb_loop_poll_del (instance->totemudpu_poll_handle,
				member->fd);
			close (member->fd);
		}

		/*
		 * Unlink and release the entry. The entry was malloc()ed by
		 * totemudpu_member_add(); the list held the only reference.
		 * The loop must not advance from the freed node, hence the
		 * break.
		 */
		list_del (&member->list);
		free (member);
		break;
	}

	return (0);
}
/*
 * Recreate the sending socket of every member: an existing descriptor
 * (> 0) is closed and a new one is created, which binds it to the
 * instance's current my_id.
 *
 * Always returns 0.
 */
int totemudpu_member_list_rebind_ip (
	void *udpu_context)
{
	struct totemudpu_instance *instance = udpu_context;
	struct list_head *pos = instance->member_list.next;
	struct totemudpu_member *peer;

	while (pos != &instance->member_list) {
		peer = list_entry (pos, struct totemudpu_member, list);

		if (peer->fd > 0) {
			close (peer->fd);
		}
		peer->fd = totemudpu_create_sending_socket(udpu_context, &peer->member);

		pos = pos->next;
	}

	return (0);
}
diff --git a/exec/totemudpu.h b/exec/totemudpu.h
index 136960cf..7e80ed74 100644
--- a/exec/totemudpu.h
+++ b/exec/totemudpu.h
@@ -1,125 +1,126 @@
/*
* Copyright (c) 2005 MontaVista Software, Inc.
* Copyright (c) 2006-2011 Red Hat, Inc.
*
* All rights reserved.
*
* Author: Steven Dake (sdake@redhat.com)
*
* This software licensed under BSD license, the text of which follows:
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* - Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* - Neither the name of the MontaVista Software, Inc. nor the names of its
* contributors may be used to endorse or promote products derived from this
* software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
* THE POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef TOTEMUDPU_H_DEFINED
#define TOTEMUDPU_H_DEFINED
#include <sys/types.h>
#include <sys/socket.h>
#include <qb/qbloop.h>
#include <corosync/totem/totem.h>
/**
* Create an instance
*
* Sets up crypto and logging from totem_config and arms a short timer; the
* sockets are built later from that timer, not by this call.
*
* @param poll_handle libqb loop used for the timer and the token socket
* @param udpu_context receives the new instance on success
* @param totem_config totem configuration (logging, crypto, interfaces)
* @param stats stored in the instance
* @param interface_no index into totem_config->interfaces
* @param context passed back as first argument of the callbacks
* @param deliver_fn called with each authenticated, decrypted message
* @param iface_change_fn called when an interface up/down change is reported
* @param target_set_completed called from totemudpu_token_target_set()
* @return 0 on success, -1 if allocation or crypto initialisation fails
*/
extern int totemudpu_initialize (
qb_loop_t *poll_handle,
void **udpu_context,
struct totem_config *totem_config,
+ totemsrp_stats_t *stats,
int interface_no,
void *context,
void (*deliver_fn) (
void *context,
const void *msg,
unsigned int msg_len),
void (*iface_change_fn) (
void *context,
const struct totem_ip_address *iface_address),
void (*target_set_completed) (
void *context));
/**
* Allocate a FRAME_SIZE_MAX byte buffer with malloc(); NULL on failure.
*/
extern void *totemudpu_buffer_alloc (void);
/**
* Free a buffer obtained from totemudpu_buffer_alloc().
*/
extern void totemudpu_buffer_release (void *ptr);
/**
* Record the membership size. The interface-check timer is cancelled and
* re-armed only when processor_count is 1. Always returns 0.
*/
extern int totemudpu_processor_count_set (
void *udpu_context,
int processor_count);
/**
* Sign/encrypt msg and send it to the current token target. Always returns
* 0; failures are only logged.
*/
extern int totemudpu_token_send (
void *udpu_context,
const void *msg,
unsigned int msg_len);
/**
* Sign/encrypt msg and send it to every member added with
* totemudpu_member_add(). Always returns 0; failures are only logged.
*/
extern int totemudpu_mcast_flush_send (
void *udpu_context,
const void *msg,
unsigned int msg_len);
/**
* Same implementation as totemudpu_mcast_flush_send().
*/
extern int totemudpu_mcast_noflush_send (
void *udpu_context,
const void *msg,
unsigned int msg_len);
/**
* No-ops in this transport; both always return 0.
*/
extern int totemudpu_recv_flush (void *udpu_context);
extern int totemudpu_send_flush (void *udpu_context);
/**
* Run the interface up/down check immediately. Always returns 0.
*/
extern int totemudpu_iface_check (void *udpu_context);
/**
* Remove the token socket from the poll loop and close it. Always returns 0.
*/
extern int totemudpu_finalize (void *udpu_context);
/**
* Subtract the crypto header size and 28 bytes of IP/UDP header from
* totem_config->net_mtu. udpu_context is not used.
*/
extern void totemudpu_net_mtu_adjust (void *udpu_context, struct totem_config *totem_config);
/**
* Printable form of the instance's own address, as returned by
* totemip_print().
*/
extern const char *totemudpu_iface_print (void *udpu_context);
/**
* Copy the instance's own address into *addr. Always returns 0.
*/
extern int totemudpu_iface_get (
void *udpu_context,
struct totem_ip_address *addr);
/**
* Store the address used by totemudpu_token_send() and invoke the
* target_set_completed callback. Always returns 0.
*/
extern int totemudpu_token_target_set (
void *udpu_context,
const struct totem_ip_address *token_target);
/**
* Stub: ignores its arguments and returns 0.
*/
extern int totemudpu_crypto_set (
void *udpu_context,
const char *cipher_type,
const char *hash_type);
/**
* Read and discard everything pending on the token socket. Returns 0 if
* nothing was pending, 1 if the last read succeeded, -1 if it failed.
*/
extern int totemudpu_recv_mcast_empty (
void *udpu_context);
/**
* Append a member and create its sending socket. Returns -1 if the list
* entry cannot be allocated, 0 otherwise.
*/
extern int totemudpu_member_add (
void *udpu_context,
const struct totem_ip_address *member);
/**
* Close the socket of the member with the given address and take it off the
* member list. Always returns 0, also when no such member exists.
*/
extern int totemudpu_member_remove (
void *udpu_context,
const struct totem_ip_address *member);
#endif /* TOTEMUDPU_H_DEFINED */
diff --git a/include/corosync/totem/totem.h b/include/corosync/totem/totem.h
index 11fb581a..02a8a2cb 100644
--- a/include/corosync/totem/totem.h
+++ b/include/corosync/totem/totem.h
@@ -1,278 +1,283 @@
/*
* Copyright (c) 2005 MontaVista Software, Inc.
* Copyright (c) 2006-2012 Red Hat, Inc.
*
* Author: Steven Dake (sdake@redhat.com)
*
* All rights reserved.
*
* This software licensed under BSD license, the text of which follows:
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* - Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* - Neither the name of the MontaVista Software, Inc. nor the names of its
* contributors may be used to endorse or promote products derived from this
* software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
* THE POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef TOTEM_H_DEFINED
#define TOTEM_H_DEFINED
#include "totemip.h"
#include <corosync/hdb.h>
/*
 * Compile-time size limits. Defining HAVE_SMALL_MEMORY_FOOTPRINT selects the
 * reduced set of values.
 *
 * MESSAGE_SIZE_MAX is parenthesized so that it always expands as a single
 * operand: without the parentheses "x / MESSAGE_SIZE_MAX" would be parsed as
 * "(x / 1024) * 1024".
 */
#ifdef HAVE_SMALL_MEMORY_FOOTPRINT
#define PROCESSOR_COUNT_MAX 16
#define MESSAGE_SIZE_MAX (1024*64)
#define MESSAGE_QUEUE_MAX 512
#else
#define PROCESSOR_COUNT_MAX 384
#define MESSAGE_SIZE_MAX (1024*1024) /* (1MB) */
/*
 * Not a constant: this expands to an expression that reads
 * totem_config->net_mtu, so a pointer named totem_config must be in scope
 * wherever the macro is used.
 */
#define MESSAGE_QUEUE_MAX ((4 * MESSAGE_SIZE_MAX) / totem_config->net_mtu)
#endif /* HAVE_SMALL_MEMORY_FOOTPRINT */
#define FRAME_SIZE_MAX 10000
#define TRANSMITS_ALLOWED 16
#define SEND_THREADS_MAX 16
#define INTERFACE_MAX 2
/**
* Maximum number of continuous gather states
*/
#define MAX_NO_CONT_GATHER 3
+/*
+ * Maximum number of consecutive failures returned by the sendmsg call
+ */
+#define MAX_NO_CONT_SENDMSG_FAILURES 30
struct totem_interface {
struct totem_ip_address bindnet;
struct totem_ip_address boundto;
struct totem_ip_address mcast_addr;
uint16_t ip_port;
uint16_t ttl;
int member_count;
struct totem_ip_address member_list[PROCESSOR_COUNT_MAX];
};
/*
 * Logging callback and log level / subsystem values.
 * Embedded in struct totem_config as its totem_logging_configuration member.
 */
struct totem_logging_configuration {
/*
 * printf-style logging callback. Per the format attribute, parameter 6
 * (format) is the format string and the variadic arguments start at
 * parameter 7, so the compiler can type-check calls.
 */
void (*log_printf) (
int level,
int subsys,
const char *function_name,
const char *file_name,
int file_line,
const char *format,
...) __attribute__((format(printf, 6, 7)));
/*
 * One value per severity name. NOTE(review): presumably these are the
 * values passed as log_printf's level argument -- confirm against callers.
 */
int log_level_security;
int log_level_error;
int log_level_warning;
int log_level_notice;
int log_level_debug;
int log_level_trace;
/*
 * NOTE(review): presumably passed as log_printf's subsys argument --
 * confirm against callers.
 */
int log_subsys_id;
};
/* Number of elements in totem_config.private_key (an unsigned char array). */
enum { TOTEM_PRIVATE_KEY_LEN = 128 };
/* Number of elements in totem_config.rrp_mode (a char array). */
enum { TOTEM_RRP_MODE_BYTES = 64 };
/*
 * Transport selector; this is the type of totem_config.transport_number.
 * The numeric values are assigned explicitly.
 */
typedef enum {
TOTEM_TRANSPORT_UDP = 0,
TOTEM_TRANSPORT_UDPU = 1,
TOTEM_TRANSPORT_RDMA = 2
} totem_transport_t;
struct totem_config {
int version;
/*
* network
*/
struct totem_interface *interfaces;
unsigned int interface_count;
unsigned int node_id;
unsigned int clear_node_high_bit;
/*
* key information
*/
unsigned char private_key[TOTEM_PRIVATE_KEY_LEN];
unsigned int private_key_len;
/*
* Totem configuration parameters
*/
unsigned int token_timeout;
unsigned int token_retransmit_timeout;
unsigned int token_hold_timeout;
unsigned int token_retransmits_before_loss_const;
unsigned int join_timeout;
unsigned int send_join_timeout;
unsigned int consensus_timeout;
unsigned int merge_timeout;
unsigned int downcheck_timeout;
unsigned int fail_to_recv_const;
unsigned int seqno_unchanged_const;
unsigned int rrp_token_expired_timeout;
unsigned int rrp_problem_count_timeout;
unsigned int rrp_problem_count_threshold;
unsigned int rrp_problem_count_mcast_threshold;
unsigned int rrp_autorecovery_check_timeout;
char rrp_mode[TOTEM_RRP_MODE_BYTES];
struct totem_logging_configuration totem_logging_configuration;
unsigned int net_mtu;
unsigned int threads;
unsigned int heartbeat_failures_allowed;
unsigned int max_network_delay;
unsigned int window_size;
unsigned int max_messages;
const char *vsf_type;
unsigned int broadcast_use;
char *crypto_cipher_type;
char *crypto_hash_type;
totem_transport_t transport_number;
unsigned int miss_count_const;
};
#define TOTEM_CONFIGURATION_TYPE
enum totem_configuration_type {
TOTEM_CONFIGURATION_REGULAR,
TOTEM_CONFIGURATION_TRANSITIONAL
};
#define TOTEM_CALLBACK_TOKEN_TYPE
enum totem_callback_token_type {
TOTEM_CALLBACK_TOKEN_RECEIVED = 1,
TOTEM_CALLBACK_TOKEN_SENT = 2
};
enum totem_event_type {
TOTEM_EVENT_DELIVERY_CONGESTED,
TOTEM_EVENT_NEW_MSG,
};
#define MEMB_RING_ID
struct memb_ring_id {
struct totem_ip_address rep;
unsigned long long seq;
} __attribute__((packed));
typedef struct {
int is_dirty;
time_t last_updated;
} totem_stats_header_t;
typedef struct {
totem_stats_header_t hdr;
uint32_t iface_changes;
} totemnet_stats_t;
typedef struct {
totem_stats_header_t hdr;
totemnet_stats_t *net;
char *algo_name;
uint8_t *faulty;
uint32_t interface_count;
} totemrrp_stats_t;
typedef struct {
uint32_t rx;
uint32_t tx;
int backlog_calc;
} totemsrp_token_stats_t;
typedef struct {
totem_stats_header_t hdr;
totemrrp_stats_t *rrp;
uint64_t orf_token_tx;
uint64_t orf_token_rx;
uint64_t memb_merge_detect_tx;
uint64_t memb_merge_detect_rx;
uint64_t memb_join_tx;
uint64_t memb_join_rx;
uint64_t mcast_tx;
uint64_t mcast_retx;
uint64_t mcast_rx;
uint64_t memb_commit_token_tx;
uint64_t memb_commit_token_rx;
uint64_t token_hold_cancel_tx;
uint64_t token_hold_cancel_rx;
uint64_t operational_entered;
uint64_t operational_token_lost;
uint64_t gather_entered;
uint64_t gather_token_lost;
uint64_t commit_entered;
uint64_t commit_token_lost;
uint64_t recovery_entered;
uint64_t recovery_token_lost;
uint64_t consensus_timeouts;
uint64_t rx_msg_dropped;
uint32_t continuous_gather;
+ uint32_t continuous_sendmsg_failures;
int earliest_token;
int latest_token;
#define TOTEM_TOKEN_STATS_MAX 100
totemsrp_token_stats_t token[TOTEM_TOKEN_STATS_MAX];
} totemsrp_stats_t;
#define TOTEM_CONFIGURATION_TYPE
typedef struct {
totem_stats_header_t hdr;
totemsrp_stats_t *srp;
} totemmrp_stats_t;
typedef struct {
totem_stats_header_t hdr;
totemmrp_stats_t *mrp;
uint32_t msg_reserved;
uint32_t msg_queue_avail;
} totempg_stats_t;
#endif /* TOTEM_H_DEFINED */
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Mon, Feb 24, 5:15 AM (1 d, 15 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1463972
Default Alt Text
(242 KB)
Attached To
Mode
rC Corosync
Attached
Detach File
Event Timeline
Log In to Comment