diff --git a/TODO b/TODO index 6b6a9288..4607e655 100644 --- a/TODO +++ b/TODO @@ -1,132 +1,135 @@ -------------------------------------------------------- The Corosync Cluster Engine Topic Branches and Backlog -------------------------------------------------------- ---------------------------- Last Updated: October 2011 ---------------------------- -------------------------------------- Current priority list for Needle 2.0 -------------------------------------- 1. implement topic-map 2. replace confdb callers with map callers 3. quorum debugging and rework 4. implement topic-xmlconfig 5. remove external plug-in api 6. remove logsys.h from external headers 7. remove hardcoded values in totempg.c check_q_level 8. check max message size restrictions 9. investigate if https://github.com/asalkeld/libqb/issues/1 is still an issue. 10. allow a cluster name to autogenerate a mcastaddr 11. ring status change via corosync-notifyd +12. remove dashes in UDPU notifier and replace with dots once map code is in +13. put addition/removal of members in totem.interface.member objects +14. dynamic UDPU needs ring id settings for multiple rings -------------------------------------- Current priority list for Needle 2.1 -------------------------------------- 1. implement topic-onecrypt 2. implement add/remove nodes from udpu 3. logsys glue layer removal 4. implement topic-zerocopy 5. implement topic-rdmaud 6. harden and finish ykd algorithm We use topic branches in our git repository to develop new disruptive features that define our future roadmap. This file describes the topic branches the developers have interest in investigating further. targets can be: whitetank, needle2.0, needle3.0, or future (3.0+). Finished can be: percentage or date merged to master. Once in a shipped version, please remove from the topic list. ------------------------------------------------------------------------------ topic-map ------------------------------------------------------------------------------ Main Developer: Honza Friesse Started: not started Finished: 20% target: needle2.0 Currently confdb is very difficult to use. We use this component for our diagnostic feature set as well as storing our runtime configuration. A map is a better choice for a data structure here. Current thinking is to use the trie implementation from libqb to provide the core of this functionality ------------------------------------------------------------------------------ topic-xmlconfig ------------------------------------------------------------------------------ Main Developer: Honza Friesse Started: not started Finished: 0% target: needle2.0 Test suites and users alike would like to configure the software via XML configuration. Current thinking is we will implement a separate binary which converts xml to native config format via XSLT. This keeps libxml out of the corosync process address space. During startup, corosync could either fork and exec this process, or it could be part of the system startup mechanism. ------------------------------------------------------------------------------ topic-onecrypt ------------------------------------------------------------------------------ Main Developer: Honza Friesse Started: not started Finished: 0% target: needle2.1 Description: Currently encryption code is located in totemudp.c, totemudpu.c, and iba has no encryption support. This topic merges the encryption code into a new file such as totemcrp.c and provides a mechanism for totemnet.c to register encrypt and decrypt functions with totem[udp|iba|udpu] and use them as requested by the configuration. ------------------------------------------------------------------------------ topic-netmalloc ------------------------------------------------------------------------------ Main Developer: Honza Friesse Started: not started Finished: 0% target: needle2.1 Description: The totemiba.c driver must allocate memory and assign it to a protection domain in order for an infiniband driver to transmit memory. In the current implementation, totemsrp.c also allocates these same frames. This results in an extra memcpy when transmitting with libibverbs technology. Memory copies are to be avoided. The simple solution is to have each network driver provide a memory allocation function. When totemsrp wants a free frame, it requests it from the network driver. ------------------------------------------------------------------------------ topic-rdmaud ------------------------------------------------------------------------------ Main Developer: Honza Friesse Steven Dake Started: not started Finished: 0% target: needle2.1 Description: Currently our RDMA code uses librdmacm to setup connections. We are not certain this extra library is needed, and may be able to use only ibverbs. If this is possible, the totem code may be more reliable, especially around failure conditions. ------------------------------------------------------------------------------ topic-zerocopy ------------------------------------------------------------------------------ Main Developer: Honza Friesse Started: not started Finished: 0% target: needle2.1 Description: Totem has many copies involved in messaging which we would like to investigate removing. Our goal is to deliver wire speed performance for rdma networks, and if this can be achieved by our other topic investigations, we may not further investigate this topic. The basic idea of the topic is to handle message assembly/fragmentation in libcpg, and have totem be responsible for sending these pages that are shared via posix shared memory. ------------------------------------------------------------------------------ other topics not yet defined: * disallow binding to localhost interfae in redundant ring configuation. * doxygenize include and lib directories. * sort out binding to localhost in general * totem multiring * load balancing over different speed links in RRP diff --git a/exec/main.c b/exec/main.c index 3260aa85..e2ab9795 100644 --- a/exec/main.c +++ b/exec/main.c @@ -1,1530 +1,1646 @@ /* * Copyright (c) 2002-2006 MontaVista Software, Inc. * Copyright (c) 2006-2009 Red Hat, Inc. * * All rights reserved. * * Author: Steven Dake (sdake@redhat.com) * * This software licensed under BSD license, the text of which follows: * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * - Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * - Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * - Neither the name of the MontaVista Software, Inc. nor the names of its * contributors may be used to endorse or promote products derived from this * software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF * THE POSSIBILITY OF SUCH DAMAGE. */ /** * \mainpage Corosync * * This is the doxygen generated developer documentation for the Corosync * project. For more information about Corosync, please see the project * web site, corosync.org. * * \section license License * * This software licensed under BSD license, the text of which follows: * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * - Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * - Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * - Neither the name of the MontaVista Software, Inc. nor the names of its * contributors may be used to endorse or promote products derived from this * software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF * THE POSSIBILITY OF SUCH DAMAGE. */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "quorum.h" #include "totemsrp.h" #include "mainconfig.h" #include "totemconfig.h" #include "main.h" #include "sync.h" #include "syncv2.h" #include "timer.h" #include "util.h" #include "apidef.h" #include "service.h" #include "schedwrk.h" #include "evil.h" #ifdef HAVE_SMALL_MEMORY_FOOTPRINT #define IPC_LOGSYS_SIZE 1024*64 #else #define IPC_LOGSYS_SIZE 8192*128 #endif LOGSYS_DECLARE_SYSTEM ("corosync", LOGSYS_MODE_OUTPUT_STDERR, LOG_DAEMON, LOG_INFO); LOGSYS_DECLARE_SUBSYS ("MAIN"); #define SERVER_BACKLOG 5 static int sched_priority = 0; static unsigned int service_count = 32; static struct totem_logging_configuration totem_logging_configuration; static int num_config_modules; static struct config_iface_ver0 *config_modules[MAX_DYNAMIC_SERVICES]; static struct objdb_iface_ver0 *objdb = NULL; static struct corosync_api_v1 *api = NULL; static enum cs_sync_mode minimum_sync_mode; static int sync_in_process = 1; static qb_loop_t *corosync_poll_handle; struct sched_param global_sched_param; static hdb_handle_t object_memb_handle; static corosync_timer_handle_t corosync_stats_timer_handle; static const char *corosync_lock_file = LOCALSTATEDIR"/run/corosync.pid"; qb_loop_t *cs_poll_handle_get (void) { return (corosync_poll_handle); } int cs_poll_dispatch_add (qb_loop_t * handle, int fd, int events, void *data, int (*dispatch_fn) (int fd, int revents, void *data)) { return qb_loop_poll_add(handle, QB_LOOP_MED, fd, events, data, dispatch_fn); } int cs_poll_dispatch_delete(qb_loop_t * handle, int fd) { return qb_loop_poll_del(handle, fd); } void corosync_state_dump (void) { int i; for (i = 0; i < SERVICE_HANDLER_MAXIMUM_COUNT; i++) { if (ais_service[i] && ais_service[i]->exec_dump_fn) { ais_service[i]->exec_dump_fn (); } } } static void unlink_all_completed (void) { api->timer_delete (corosync_stats_timer_handle); qb_loop_stop (corosync_poll_handle); } void corosync_shutdown_request (void) { corosync_service_unlink_all (api, unlink_all_completed); } static int32_t sig_diag_handler (int num, void *data) { corosync_state_dump (); qb_log_blackbox_write_to_file(LOCALSTATEDIR "/lib/corosync/fdata"); return 0; } static int32_t sig_exit_handler (int num, void *data) { corosync_service_unlink_all (api, unlink_all_completed); return 0; } static void sigsegv_handler (int num) { (void)signal (SIGSEGV, SIG_DFL); qb_log_blackbox_write_to_file(LOCALSTATEDIR "/lib/corosync/fdata"); qb_log_fini(); raise (SIGSEGV); } static void sigabrt_handler (int num) { (void)signal (SIGABRT, SIG_DFL); qb_log_blackbox_write_to_file(LOCALSTATEDIR "/lib/corosync/fdata"); qb_log_fini(); raise (SIGABRT); } #define LOCALHOST_IP inet_addr("127.0.0.1") static void *corosync_group_handle; static struct totempg_group corosync_group = { .group = "a", .group_len = 1 }; static void serialize_lock (void) { } static void serialize_unlock (void) { } static void corosync_sync_completed (void) { log_printf (LOGSYS_LEVEL_NOTICE, "Completed service synchronization, ready to provide service.\n"); sync_in_process = 0; cs_ipcs_sync_state_changed(sync_in_process); } static int corosync_sync_callbacks_retrieve (int sync_id, struct sync_callbacks *callbacks) { unsigned int ais_service_index; int res; for (ais_service_index = 0; ais_service_index < SERVICE_HANDLER_MAXIMUM_COUNT; ais_service_index++) { if (ais_service[ais_service_index] != NULL && (ais_service[ais_service_index]->sync_mode == CS_SYNC_V1 || ais_service[ais_service_index]->sync_mode == CS_SYNC_V1_APIV2)) { if (ais_service_index == sync_id) { break; } } } /* * Try to load backwards compat sync engines */ if (ais_service_index == SERVICE_HANDLER_MAXIMUM_COUNT) { res = evil_callbacks_load (sync_id, callbacks); return (res); } callbacks->name = ais_service[ais_service_index]->name; callbacks->sync_init_api.sync_init_v1 = ais_service[ais_service_index]->sync_init; callbacks->api_version = 1; if (ais_service[ais_service_index]->sync_mode == CS_SYNC_V1_APIV2) { callbacks->api_version = 2; } callbacks->sync_process = ais_service[ais_service_index]->sync_process; callbacks->sync_activate = ais_service[ais_service_index]->sync_activate; callbacks->sync_abort = ais_service[ais_service_index]->sync_abort; return (0); } static int corosync_sync_v2_callbacks_retrieve ( int service_id, struct sync_callbacks *callbacks) { int res; if (minimum_sync_mode == CS_SYNC_V2 && service_id == CLM_SERVICE && ais_service[CLM_SERVICE] == NULL) { res = evil_callbacks_load (service_id, callbacks); return (res); } if (minimum_sync_mode == CS_SYNC_V2 && service_id == EVT_SERVICE && ais_service[EVT_SERVICE] == NULL) { res = evil_callbacks_load (service_id, callbacks); return (res); } if (ais_service[service_id] == NULL) { return (-1); } if (minimum_sync_mode == CS_SYNC_V1 && ais_service[service_id]->sync_mode != CS_SYNC_V2) { return (-1); } callbacks->name = ais_service[service_id]->name; callbacks->api_version = 1; if (ais_service[service_id]->sync_mode == CS_SYNC_V1_APIV2) { callbacks->api_version = 2; } callbacks->sync_init_api.sync_init_v1 = ais_service[service_id]->sync_init; callbacks->sync_process = ais_service[service_id]->sync_process; callbacks->sync_activate = ais_service[service_id]->sync_activate; callbacks->sync_abort = ais_service[service_id]->sync_abort; return (0); } static struct memb_ring_id corosync_ring_id; static void member_object_joined (unsigned int nodeid) { hdb_handle_t object_find_handle; hdb_handle_t object_node_handle; char * nodeint_str; char nodeid_str[64]; unsigned int key_incr_dummy; snprintf (nodeid_str, 64, "%d", nodeid); objdb->object_find_create ( object_memb_handle, nodeid_str, strlen (nodeid_str), &object_find_handle); if (objdb->object_find_next (object_find_handle, &object_node_handle) == 0) { objdb->object_key_increment (object_node_handle, "join_count", strlen("join_count"), &key_incr_dummy); objdb->object_key_replace (object_node_handle, "status", strlen("status"), "joined", strlen("joined")); } else { nodeint_str = (char*)api->totem_ifaces_print (nodeid); objdb->object_create (object_memb_handle, &object_node_handle, nodeid_str, strlen (nodeid_str)); objdb->object_key_create_typed (object_node_handle, "ip", nodeint_str, strlen(nodeint_str), OBJDB_VALUETYPE_STRING); key_incr_dummy = 1; objdb->object_key_create_typed (object_node_handle, "join_count", &key_incr_dummy, sizeof (key_incr_dummy), OBJDB_VALUETYPE_UINT32); objdb->object_key_create_typed (object_node_handle, "status", "joined", strlen("joined"), OBJDB_VALUETYPE_STRING); } } static void member_object_left (unsigned int nodeid) { hdb_handle_t object_find_handle; hdb_handle_t object_node_handle; char nodeid_str[64]; snprintf (nodeid_str, 64, "%u", nodeid); objdb->object_find_create ( object_memb_handle, nodeid_str, strlen (nodeid_str), &object_find_handle); if (objdb->object_find_next (object_find_handle, &object_node_handle) == 0) { objdb->object_key_replace (object_node_handle, "status", strlen("status"), "left", strlen("left")); } } static void confchg_fn ( enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id) { int i; int abort_activate = 0; if (sync_in_process == 1) { abort_activate = 1; } sync_in_process = 1; cs_ipcs_sync_state_changed(sync_in_process); memcpy (&corosync_ring_id, ring_id, sizeof (struct memb_ring_id)); for (i = 0; i < left_list_entries; i++) { member_object_left (left_list[i]); } for (i = 0; i < joined_list_entries; i++) { member_object_joined (joined_list[i]); } /* * Call configuration change for all services */ for (i = 0; i < service_count; i++) { if (ais_service[i] && ais_service[i]->confchg_fn) { ais_service[i]->confchg_fn (configuration_type, member_list, member_list_entries, left_list, left_list_entries, joined_list, joined_list_entries, ring_id); } } if (abort_activate) { sync_v2_abort (); } if (minimum_sync_mode == CS_SYNC_V2 && configuration_type == TOTEM_CONFIGURATION_TRANSITIONAL) { sync_v2_save_transitional (member_list, member_list_entries, ring_id); } if (minimum_sync_mode == CS_SYNC_V2 && configuration_type == TOTEM_CONFIGURATION_REGULAR) { sync_v2_start (member_list, member_list_entries, ring_id); } } static void priv_drop (void) { return; /* TODO: we are still not dropping privs */ } static void corosync_tty_detach (void) { FILE *r; /* * Disconnect from TTY if this is not a debug run */ switch (fork ()) { case -1: corosync_exit_error (AIS_DONE_FORK); break; case 0: /* * child which is disconnected, run this process */ break; default: exit (0); break; } /* Create new session */ (void)setsid(); /* * Map stdin/out/err to /dev/null. */ r = freopen("/dev/null", "r", stdin); if (r == NULL) { corosync_exit_error (AIS_DONE_STD_TO_NULL_REDIR); } r = freopen("/dev/null", "a", stderr); if (r == NULL) { corosync_exit_error (AIS_DONE_STD_TO_NULL_REDIR); } r = freopen("/dev/null", "a", stdout); if (r == NULL) { corosync_exit_error (AIS_DONE_STD_TO_NULL_REDIR); } } static void corosync_mlockall (void) { #if !defined(COROSYNC_BSD) || defined(COROSYNC_FREEBSD_GE_8) int res; #endif struct rlimit rlimit; rlimit.rlim_cur = RLIM_INFINITY; rlimit.rlim_max = RLIM_INFINITY; #ifndef COROSYNC_SOLARIS setrlimit (RLIMIT_MEMLOCK, &rlimit); #else setrlimit (RLIMIT_VMEM, &rlimit); #endif #if defined(COROSYNC_BSD) && !defined(COROSYNC_FREEBSD_GE_8) /* under FreeBSD < 8 a process with locked page cannot call dlopen * code disabled until FreeBSD bug i386/93396 was solved */ log_printf (LOGSYS_LEVEL_WARNING, "Could not lock memory of service to avoid page faults\n"); #else res = mlockall (MCL_CURRENT | MCL_FUTURE); if (res == -1) { LOGSYS_PERROR (errno, LOGSYS_LEVEL_WARNING, "Could not lock memory of service to avoid page faults"); }; #endif } static void corosync_totem_stats_updater (void *data) { totempg_stats_t * stats; uint32_t mtt_rx_token; uint32_t total_mtt_rx_token; uint32_t avg_backlog_calc; uint32_t total_backlog_calc; uint32_t avg_token_holdtime; uint32_t total_token_holdtime; int t, prev; int32_t token_count; uint32_t firewall_enabled_or_nic_failure; stats = api->totem_get_stats(); objdb->object_key_replace (stats->hdr.handle, "msg_reserved", strlen("msg_reserved"), &stats->msg_reserved, sizeof (stats->msg_reserved)); objdb->object_key_replace (stats->hdr.handle, "msg_queue_avail", strlen("msg_queue_avail"), &stats->msg_queue_avail, sizeof (stats->msg_queue_avail)); objdb->object_key_replace (stats->mrp->srp->hdr.handle, "orf_token_tx", strlen("orf_token_tx"), &stats->mrp->srp->orf_token_tx, sizeof (stats->mrp->srp->orf_token_tx)); objdb->object_key_replace (stats->mrp->srp->hdr.handle, "orf_token_rx", strlen("orf_token_rx"), &stats->mrp->srp->orf_token_rx, sizeof (stats->mrp->srp->orf_token_rx)); objdb->object_key_replace (stats->mrp->srp->hdr.handle, "memb_merge_detect_tx", strlen("memb_merge_detect_tx"), &stats->mrp->srp->memb_merge_detect_tx, sizeof (stats->mrp->srp->memb_merge_detect_tx)); objdb->object_key_replace (stats->mrp->srp->hdr.handle, "memb_merge_detect_rx", strlen("memb_merge_detect_rx"), &stats->mrp->srp->memb_merge_detect_rx, sizeof (stats->mrp->srp->memb_merge_detect_rx)); objdb->object_key_replace (stats->mrp->srp->hdr.handle, "memb_join_tx", strlen("memb_join_tx"), &stats->mrp->srp->memb_join_tx, sizeof (stats->mrp->srp->memb_join_tx)); objdb->object_key_replace (stats->mrp->srp->hdr.handle, "memb_join_rx", strlen("memb_join_rx"), &stats->mrp->srp->memb_join_rx, sizeof (stats->mrp->srp->memb_join_rx)); objdb->object_key_replace (stats->mrp->srp->hdr.handle, "mcast_tx", strlen("mcast_tx"), &stats->mrp->srp->mcast_tx, sizeof (stats->mrp->srp->mcast_tx)); objdb->object_key_replace (stats->mrp->srp->hdr.handle, "mcast_retx", strlen("mcast_retx"), &stats->mrp->srp->mcast_retx, sizeof (stats->mrp->srp->mcast_retx)); objdb->object_key_replace (stats->mrp->srp->hdr.handle, "mcast_rx", strlen("mcast_rx"), &stats->mrp->srp->mcast_rx, sizeof (stats->mrp->srp->mcast_rx)); objdb->object_key_replace (stats->mrp->srp->hdr.handle, "memb_commit_token_tx", strlen("memb_commit_token_tx"), &stats->mrp->srp->memb_commit_token_tx, sizeof (stats->mrp->srp->memb_commit_token_tx)); objdb->object_key_replace (stats->mrp->srp->hdr.handle, "memb_commit_token_rx", strlen("memb_commit_token_rx"), &stats->mrp->srp->memb_commit_token_rx, sizeof (stats->mrp->srp->memb_commit_token_rx)); objdb->object_key_replace (stats->mrp->srp->hdr.handle, "token_hold_cancel_tx", strlen("token_hold_cancel_tx"), &stats->mrp->srp->token_hold_cancel_tx, sizeof (stats->mrp->srp->token_hold_cancel_tx)); objdb->object_key_replace (stats->mrp->srp->hdr.handle, "token_hold_cancel_rx", strlen("token_hold_cancel_rx"), &stats->mrp->srp->token_hold_cancel_rx, sizeof (stats->mrp->srp->token_hold_cancel_rx)); objdb->object_key_replace (stats->mrp->srp->hdr.handle, "operational_entered", strlen("operational_entered"), &stats->mrp->srp->operational_entered, sizeof (stats->mrp->srp->operational_entered)); objdb->object_key_replace (stats->mrp->srp->hdr.handle, "operational_token_lost", strlen("operational_token_lost"), &stats->mrp->srp->operational_token_lost, sizeof (stats->mrp->srp->operational_token_lost)); objdb->object_key_replace (stats->mrp->srp->hdr.handle, "gather_entered", strlen("gather_entered"), &stats->mrp->srp->gather_entered, sizeof (stats->mrp->srp->gather_entered)); objdb->object_key_replace (stats->mrp->srp->hdr.handle, "gather_token_lost", strlen("gather_token_lost"), &stats->mrp->srp->gather_token_lost, sizeof (stats->mrp->srp->gather_token_lost)); objdb->object_key_replace (stats->mrp->srp->hdr.handle, "commit_entered", strlen("commit_entered"), &stats->mrp->srp->commit_entered, sizeof (stats->mrp->srp->commit_entered)); objdb->object_key_replace (stats->mrp->srp->hdr.handle, "commit_token_lost", strlen("commit_token_lost"), &stats->mrp->srp->commit_token_lost, sizeof (stats->mrp->srp->commit_token_lost)); objdb->object_key_replace (stats->mrp->srp->hdr.handle, "recovery_entered", strlen("recovery_entered"), &stats->mrp->srp->recovery_entered, sizeof (stats->mrp->srp->recovery_entered)); objdb->object_key_replace (stats->mrp->srp->hdr.handle, "recovery_token_lost", strlen("recovery_token_lost"), &stats->mrp->srp->recovery_token_lost, sizeof (stats->mrp->srp->recovery_token_lost)); objdb->object_key_replace (stats->mrp->srp->hdr.handle, "consensus_timeouts", strlen("consensus_timeouts"), &stats->mrp->srp->consensus_timeouts, sizeof (stats->mrp->srp->consensus_timeouts)); objdb->object_key_replace (stats->mrp->srp->hdr.handle, "rx_msg_dropped", strlen("rx_msg_dropped"), &stats->mrp->srp->rx_msg_dropped, sizeof (stats->mrp->srp->rx_msg_dropped)); objdb->object_key_replace (stats->mrp->srp->hdr.handle, "continuous_gather", strlen("continuous_gather"), &stats->mrp->srp->continuous_gather, sizeof (stats->mrp->srp->continuous_gather)); firewall_enabled_or_nic_failure = (stats->mrp->srp->continuous_gather > MAX_NO_CONT_GATHER ? 1 : 0); objdb->object_key_replace (stats->mrp->srp->hdr.handle, "firewall_enabled_or_nic_failure", strlen("firewall_enabled_or_nic_failure"), &firewall_enabled_or_nic_failure, sizeof (firewall_enabled_or_nic_failure)); total_mtt_rx_token = 0; total_token_holdtime = 0; total_backlog_calc = 0; token_count = 0; t = stats->mrp->srp->latest_token; while (1) { if (t == 0) prev = TOTEM_TOKEN_STATS_MAX - 1; else prev = t - 1; if (prev == stats->mrp->srp->earliest_token) break; /* if tx == 0, then dropped token (not ours) */ if (stats->mrp->srp->token[t].tx != 0 || (stats->mrp->srp->token[t].rx - stats->mrp->srp->token[prev].rx) > 0 ) { total_mtt_rx_token += (stats->mrp->srp->token[t].rx - stats->mrp->srp->token[prev].rx); total_token_holdtime += (stats->mrp->srp->token[t].tx - stats->mrp->srp->token[t].rx); total_backlog_calc += stats->mrp->srp->token[t].backlog_calc; token_count++; } t = prev; } if (token_count) { mtt_rx_token = (total_mtt_rx_token / token_count); avg_backlog_calc = (total_backlog_calc / token_count); avg_token_holdtime = (total_token_holdtime / token_count); objdb->object_key_replace (stats->mrp->srp->hdr.handle, "mtt_rx_token", strlen("mtt_rx_token"), &mtt_rx_token, sizeof (mtt_rx_token)); objdb->object_key_replace (stats->mrp->srp->hdr.handle, "avg_token_workload", strlen("avg_token_workload"), &avg_token_holdtime, sizeof (avg_token_holdtime)); objdb->object_key_replace (stats->mrp->srp->hdr.handle, "avg_backlog_calc", strlen("avg_backlog_calc"), &avg_backlog_calc, sizeof (avg_backlog_calc)); } cs_ipcs_stats_update(); api->timer_add_duration (1500 * MILLI_2_NANO_SECONDS, NULL, corosync_totem_stats_updater, &corosync_stats_timer_handle); } +static void totem_dynamic_name_to_ip (char *dest, + size_t dest_size, + const void *src, + size_t src_len) +{ + char *p; + size_t len; + + len = (src_len + 1 > dest_size) ? dest_size-1 : src_len; + memset(dest, 0, dest_size); + memcpy(dest, src, len); + for (p = dest; p != dest + len; p++) { + if (*p == '-') { + *p = '.'; + } + } +} + +static void totem_dynamic_create_notify_fn ( + hdb_handle_t parent_object_handle, + hdb_handle_t object_handle, + const void *name_pt, size_t name_len, + void *priv_data_pt) +{ + struct totem_ip_address member; + int ring_no; + char object_name[128]; + + totem_dynamic_name_to_ip (object_name, + sizeof object_name, name_pt, name_len); + log_printf (LOGSYS_LEVEL_DEBUG, + "adding dynamic member: %s\n", object_name); + + /* + * add new member + */ + if (totemip_parse (&member, object_name, 0) == 0) { + ring_no = 0; + totempg_member_add (&member, ring_no); + } +} + +static void totem_dynamic_destroy_notify_fn( + hdb_handle_t parent_object_handle, + const void *name_pt, size_t name_len, + void *priv_data_pt) +{ + struct totem_ip_address member; + int ring_no; + char object_name[128]; + + totem_dynamic_name_to_ip (object_name, sizeof object_name, + name_pt, name_len); + log_printf(LOGSYS_LEVEL_DEBUG, + "removing dynamic member: %s\n", object_name); + + /* + * remove member + */ + if (totemip_parse(&member, object_name, 0) == 0) { + ring_no = 0; + totempg_member_remove (&member, ring_no); + } +} + +static void corosync_totem_dynamic_init (void) +{ + hdb_handle_t object_find_handle; + hdb_handle_t object_totem_handle; + hdb_handle_t object_interface_handle; + hdb_handle_t object_dynamic_handle; + + if (objdb->object_find_create (OBJECT_PARENT_HANDLE, + "totem", strlen("totem"), &object_find_handle) != 0) { + log_printf(LOGSYS_LEVEL_ERROR, + "corosync_totem_dynamic_init:: FAILED to find totem!\n"); + return; + } + if (objdb->object_find_next (object_find_handle, + &object_totem_handle) != 0) { + return; + } + + if (objdb->object_find_create(object_totem_handle, + "interface", strlen("interface"), &object_find_handle) != 0) { + + log_printf(LOGSYS_LEVEL_ERROR, + "corosync_totem_dynamic_init:: FAILED to find totem.interface!\n"); + return; + } + if (objdb->object_find_next (object_find_handle, + &object_interface_handle) != 0) { + + return; + } + + /* + * create new child object: dynamic + */ + if (objdb->object_create (object_interface_handle, + &object_dynamic_handle, + "dynamic", strlen("dynamic")) != 0) { + + log_printf(LOGSYS_LEVEL_ERROR, + "unable to create object: \"totem.interface.dynamic\"\n"); + return; + } + + objdb->object_track_start (object_dynamic_handle, + OBJECT_TRACK_DEPTH_RECURSIVE, + NULL, + totem_dynamic_create_notify_fn, + totem_dynamic_destroy_notify_fn, + NULL, NULL); +} static void corosync_totem_stats_init (void) { totempg_stats_t * stats; hdb_handle_t object_find_handle; hdb_handle_t object_runtime_handle; hdb_handle_t object_totem_handle; uint32_t zero_32 = 0; uint64_t zero_64 = 0; stats = api->totem_get_stats(); objdb->object_find_create ( OBJECT_PARENT_HANDLE, "runtime", strlen ("runtime"), &object_find_handle); if (objdb->object_find_next (object_find_handle, &object_runtime_handle) == 0) { objdb->object_create (object_runtime_handle, &object_totem_handle, "totem", strlen ("totem")); objdb->object_create (object_totem_handle, &stats->hdr.handle, "pg", strlen ("pg")); objdb->object_create (stats->hdr.handle, &stats->mrp->hdr.handle, "mrp", strlen ("mrp")); objdb->object_create (stats->mrp->hdr.handle, &stats->mrp->srp->hdr.handle, "srp", strlen ("srp")); objdb->object_key_create_typed (stats->hdr.handle, "msg_reserved", &stats->msg_reserved, sizeof (stats->msg_reserved), OBJDB_VALUETYPE_UINT32); objdb->object_key_create_typed (stats->hdr.handle, "msg_queue_avail", &stats->msg_queue_avail, sizeof (stats->msg_queue_avail), OBJDB_VALUETYPE_UINT32); /* Members object */ objdb->object_create (stats->mrp->srp->hdr.handle, &object_memb_handle, "members", strlen ("members")); objdb->object_key_create_typed (stats->mrp->srp->hdr.handle, "orf_token_tx", &stats->mrp->srp->orf_token_tx, sizeof (stats->mrp->srp->orf_token_tx), OBJDB_VALUETYPE_UINT64); objdb->object_key_create_typed (stats->mrp->srp->hdr.handle, "orf_token_rx", &stats->mrp->srp->orf_token_rx, sizeof (stats->mrp->srp->orf_token_rx), OBJDB_VALUETYPE_UINT64); objdb->object_key_create_typed (stats->mrp->srp->hdr.handle, "memb_merge_detect_tx", &stats->mrp->srp->memb_merge_detect_tx, sizeof (stats->mrp->srp->memb_merge_detect_tx), OBJDB_VALUETYPE_UINT64); objdb->object_key_create_typed (stats->mrp->srp->hdr.handle, "memb_merge_detect_rx", &stats->mrp->srp->memb_merge_detect_rx, sizeof (stats->mrp->srp->memb_merge_detect_rx), OBJDB_VALUETYPE_UINT64); objdb->object_key_create_typed (stats->mrp->srp->hdr.handle, "memb_join_tx", &stats->mrp->srp->memb_join_tx, sizeof (stats->mrp->srp->memb_join_tx), OBJDB_VALUETYPE_UINT64); objdb->object_key_create_typed (stats->mrp->srp->hdr.handle, "memb_join_rx", &stats->mrp->srp->memb_join_rx, sizeof (stats->mrp->srp->memb_join_rx), OBJDB_VALUETYPE_UINT64); objdb->object_key_create_typed (stats->mrp->srp->hdr.handle, "mcast_tx", &stats->mrp->srp->mcast_tx, sizeof (stats->mrp->srp->mcast_tx), OBJDB_VALUETYPE_UINT64); objdb->object_key_create_typed (stats->mrp->srp->hdr.handle, "mcast_retx", &stats->mrp->srp->mcast_retx, sizeof (stats->mrp->srp->mcast_retx), OBJDB_VALUETYPE_UINT64); objdb->object_key_create_typed (stats->mrp->srp->hdr.handle, "mcast_rx", &stats->mrp->srp->mcast_rx, sizeof (stats->mrp->srp->mcast_rx), OBJDB_VALUETYPE_UINT64); objdb->object_key_create_typed (stats->mrp->srp->hdr.handle, "memb_commit_token_tx", &stats->mrp->srp->memb_commit_token_tx, sizeof (stats->mrp->srp->memb_commit_token_tx), OBJDB_VALUETYPE_UINT64); objdb->object_key_create_typed (stats->mrp->srp->hdr.handle, "memb_commit_token_rx", &stats->mrp->srp->memb_commit_token_rx, sizeof (stats->mrp->srp->memb_commit_token_rx), OBJDB_VALUETYPE_UINT64); objdb->object_key_create_typed (stats->mrp->srp->hdr.handle, "token_hold_cancel_tx", &stats->mrp->srp->token_hold_cancel_tx, sizeof (stats->mrp->srp->token_hold_cancel_tx), OBJDB_VALUETYPE_UINT64); objdb->object_key_create_typed (stats->mrp->srp->hdr.handle, "token_hold_cancel_rx", &stats->mrp->srp->token_hold_cancel_rx, sizeof (stats->mrp->srp->token_hold_cancel_rx), OBJDB_VALUETYPE_UINT64); objdb->object_key_create_typed (stats->mrp->srp->hdr.handle, "operational_entered", &stats->mrp->srp->operational_entered, sizeof (stats->mrp->srp->operational_entered), OBJDB_VALUETYPE_UINT64); objdb->object_key_create_typed (stats->mrp->srp->hdr.handle, "operational_token_lost", &stats->mrp->srp->operational_token_lost, sizeof (stats->mrp->srp->operational_token_lost), OBJDB_VALUETYPE_UINT64); objdb->object_key_create_typed (stats->mrp->srp->hdr.handle, "gather_entered", &stats->mrp->srp->gather_entered, sizeof (stats->mrp->srp->gather_entered), OBJDB_VALUETYPE_UINT64); objdb->object_key_create_typed (stats->mrp->srp->hdr.handle, "gather_token_lost", &stats->mrp->srp->gather_token_lost, sizeof (stats->mrp->srp->gather_token_lost), OBJDB_VALUETYPE_UINT64); objdb->object_key_create_typed (stats->mrp->srp->hdr.handle, "commit_entered", &stats->mrp->srp->commit_entered, sizeof (stats->mrp->srp->commit_entered), OBJDB_VALUETYPE_UINT64); objdb->object_key_create_typed (stats->mrp->srp->hdr.handle, "commit_token_lost", &stats->mrp->srp->commit_token_lost, sizeof (stats->mrp->srp->commit_token_lost), OBJDB_VALUETYPE_UINT64); objdb->object_key_create_typed (stats->mrp->srp->hdr.handle, "recovery_entered", &stats->mrp->srp->recovery_entered, sizeof (stats->mrp->srp->recovery_entered), OBJDB_VALUETYPE_UINT64); objdb->object_key_create_typed (stats->mrp->srp->hdr.handle, "recovery_token_lost", &stats->mrp->srp->recovery_token_lost, sizeof (stats->mrp->srp->recovery_token_lost), OBJDB_VALUETYPE_UINT64); objdb->object_key_create_typed (stats->mrp->srp->hdr.handle, "consensus_timeouts", &stats->mrp->srp->consensus_timeouts, sizeof (stats->mrp->srp->consensus_timeouts), OBJDB_VALUETYPE_UINT64); objdb->object_key_create_typed (stats->mrp->srp->hdr.handle, "mtt_rx_token", &zero_32, sizeof (zero_32), OBJDB_VALUETYPE_UINT32); objdb->object_key_create_typed (stats->mrp->srp->hdr.handle, "avg_token_workload", &zero_32, sizeof (zero_32), OBJDB_VALUETYPE_UINT32); objdb->object_key_create_typed (stats->mrp->srp->hdr.handle, "avg_backlog_calc", &zero_32, sizeof (zero_32), OBJDB_VALUETYPE_UINT32); objdb->object_key_create_typed (stats->mrp->srp->hdr.handle, "rx_msg_dropped", &zero_64, sizeof (zero_64), OBJDB_VALUETYPE_UINT64); objdb->object_key_create_typed (stats->mrp->srp->hdr.handle, "continuous_gather", &zero_32, sizeof (zero_32), OBJDB_VALUETYPE_UINT32); objdb->object_key_create_typed (stats->mrp->srp->hdr.handle, "firewall_enabled_or_nic_failure", &zero_32, sizeof (zero_32), OBJDB_VALUETYPE_UINT32); } /* start stats timer */ api->timer_add_duration (1500 * MILLI_2_NANO_SECONDS, NULL, corosync_totem_stats_updater, &corosync_stats_timer_handle); } static void deliver_fn ( unsigned int nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required) { const struct qb_ipc_request_header *header; int32_t service; int32_t fn_id; uint32_t id; uint32_t key_incr_dummy; header = msg; if (endian_conversion_required) { id = swab32 (header->id); } else { id = header->id; } /* * Call the proper executive handler */ service = id >> 16; fn_id = id & 0xffff; if (ais_service[service] == NULL && service == EVT_SERVICE) { evil_deliver_fn (nodeid, service, fn_id, msg, endian_conversion_required); } if (!ais_service[service]) { return; } if (fn_id >= ais_service[service]->exec_engine_count) { log_printf(LOGSYS_LEVEL_WARNING, "discarded unknown message %d for service %d (max id %d)", fn_id, service, ais_service[service]->exec_engine_count); return; } objdb->object_key_increment (service_stats_handle[service][fn_id], "rx", strlen("rx"), &key_incr_dummy); if (endian_conversion_required) { assert(ais_service[service]->exec_engine[fn_id].exec_endian_convert_fn != NULL); ais_service[service]->exec_engine[fn_id].exec_endian_convert_fn ((void *)msg); } ais_service[service]->exec_engine[fn_id].exec_handler_fn (msg, nodeid); } void main_get_config_modules(struct config_iface_ver0 ***modules, int *num) { *modules = config_modules; *num = num_config_modules; } int main_mcast ( const struct iovec *iovec, unsigned int iov_len, unsigned int guarantee) { const struct qb_ipc_request_header *req = iovec->iov_base; int32_t service; int32_t fn_id; uint32_t key_incr_dummy; service = req->id >> 16; fn_id = req->id & 0xffff; if (ais_service[service]) { objdb->object_key_increment (service_stats_handle[service][fn_id], "tx", strlen("tx"), &key_incr_dummy); } return (totempg_groups_mcast_joined (corosync_group_handle, iovec, iov_len, guarantee)); } static qb_loop_timer_handle recheck_the_q_level_timer; void corosync_recheck_the_q_level(void *data) { totempg_check_q_level(corosync_group_handle); if (cs_ipcs_q_level_get() == TOTEM_Q_LEVEL_CRITICAL) { qb_loop_timer_add(cs_poll_handle_get(), QB_LOOP_MED, 1*QB_TIME_NS_IN_MSEC, NULL, corosync_recheck_the_q_level, &recheck_the_q_level_timer); } } struct sending_allowed_private_data_struct { int reserved_msgs; }; int corosync_sending_allowed ( unsigned int service, unsigned int id, const void *msg, void *sending_allowed_private_data) { struct sending_allowed_private_data_struct *pd = (struct sending_allowed_private_data_struct *)sending_allowed_private_data; struct iovec reserve_iovec; struct qb_ipc_request_header *header = (struct qb_ipc_request_header *)msg; int sending_allowed; reserve_iovec.iov_base = (char *)header; reserve_iovec.iov_len = header->size; pd->reserved_msgs = totempg_groups_joined_reserve ( corosync_group_handle, &reserve_iovec, 1); if (pd->reserved_msgs == -1) { return -EINVAL; } sending_allowed = QB_FALSE; if (corosync_quorum_is_quorate() == 1 || ais_service[service]->allow_inquorate == CS_LIB_ALLOW_INQUORATE) { // we are quorate // now check flow control if (ais_service[service]->lib_engine[id].flow_control == CS_LIB_FLOW_CONTROL_NOT_REQUIRED) { sending_allowed = QB_TRUE; } else if (pd->reserved_msgs && sync_in_process == 0) { sending_allowed = QB_TRUE; } else if (pd->reserved_msgs == 0) { return -ENOBUFS; } else /* (sync_in_process) */ { return -EINPROGRESS; } } else { return -EHOSTUNREACH; } return (sending_allowed); } void corosync_sending_allowed_release (void *sending_allowed_private_data) { struct sending_allowed_private_data_struct *pd = (struct sending_allowed_private_data_struct *)sending_allowed_private_data; if (pd->reserved_msgs == -1) { return; } totempg_groups_joined_release (pd->reserved_msgs); } int message_source_is_local (const mar_message_source_t *source) { int ret = 0; assert (source != NULL); if (source->nodeid == totempg_my_nodeid_get ()) { ret = 1; } return ret; } void message_source_set ( mar_message_source_t *source, void *conn) { assert ((source != NULL) && (conn != NULL)); memset (source, 0, sizeof (mar_message_source_t)); source->nodeid = totempg_my_nodeid_get (); source->conn = conn; } static void corosync_setscheduler (void) { #if defined(HAVE_PTHREAD_SETSCHEDPARAM) && defined(HAVE_SCHED_GET_PRIORITY_MAX) && defined(HAVE_SCHED_SETSCHEDULER) int res; sched_priority = sched_get_priority_max (SCHED_RR); if (sched_priority != -1) { global_sched_param.sched_priority = sched_priority; res = sched_setscheduler (0, SCHED_RR, &global_sched_param); if (res == -1) { LOGSYS_PERROR(errno, LOGSYS_LEVEL_WARNING, "Could not set SCHED_RR at priority %d", global_sched_param.sched_priority); global_sched_param.sched_priority = 0; #ifdef HAVE_QB_LOG_THREAD_PRIORITY_SET qb_log_thread_priority_set (SCHED_OTHER, 0); #endif } else { /* * Turn on SCHED_RR in logsys system */ #ifdef HAVE_QB_LOG_THREAD_PRIORITY_SET res = qb_log_thread_priority_set (SCHED_RR, sched_priority); #else res = -1; #endif if (res == -1) { log_printf (LOGSYS_LEVEL_ERROR, "Could not set logsys thread priority." " Can't continue because of priority inversions."); corosync_exit_error (AIS_DONE_LOGSETUP); } } } else { LOGSYS_PERROR (errno, LOGSYS_LEVEL_WARNING, "Could not get maximum scheduler priority"); sched_priority = 0; } #else log_printf(LOGSYS_LEVEL_WARNING, "The Platform is missing process priority setting features. Leaving at default."); #endif } static void _logsys_log_printf(int level, int subsys, const char *function_name, const char *file_name, int file_line, const char *format, ...) __attribute__((format(printf, 6, 7))); static void _logsys_log_printf(int level, int subsys, const char *function_name, const char *file_name, int file_line, const char *format, ...) { va_list ap; va_start(ap, format); qb_log_from_external_source_va(function_name, file_name, format, level, file_line, subsys, ap); va_end(ap); } static void fplay_key_change_notify_fn ( object_change_type_t change_type, hdb_handle_t parent_object_handle, hdb_handle_t object_handle, const void *object_name_pt, size_t object_name_len, const void *key_name_pt, size_t key_len, const void *key_value_pt, size_t key_value_len, void *priv_data_pt) { if (key_len == strlen ("dump_flight_data") && memcmp ("dump_flight_data", key_name_pt, key_len) == 0) { qb_log_blackbox_write_to_file (LOCALSTATEDIR "/lib/corosync/fdata"); } if (key_len == strlen ("dump_state") && memcmp ("dump_state", key_name_pt, key_len) == 0) { corosync_state_dump (); } } static void corosync_fplay_control_init (void) { hdb_handle_t object_find_handle; hdb_handle_t object_runtime_handle; hdb_handle_t object_blackbox_handle; objdb->object_find_create (OBJECT_PARENT_HANDLE, "runtime", strlen ("runtime"), &object_find_handle); if (objdb->object_find_next (object_find_handle, &object_runtime_handle) != 0) { return; } objdb->object_create (object_runtime_handle, &object_blackbox_handle, "blackbox", strlen ("blackbox")); objdb->object_key_create_typed (object_blackbox_handle, "dump_flight_data", "no", strlen("no"), OBJDB_VALUETYPE_STRING); objdb->object_key_create_typed (object_blackbox_handle, "dump_state", "no", strlen("no"), OBJDB_VALUETYPE_STRING); objdb->object_track_start (object_blackbox_handle, OBJECT_TRACK_DEPTH_RECURSIVE, fplay_key_change_notify_fn, NULL, NULL, NULL, NULL); } static void main_service_ready (void) { int res; /* * This must occur after totempg is initialized because "this_ip" must be set */ res = corosync_service_defaults_link_and_init (api); if (res == -1) { log_printf (LOGSYS_LEVEL_ERROR, "Could not initialize default services\n"); corosync_exit_error (AIS_DONE_INIT_SERVICES); } evil_init (api); cs_ipcs_init(); corosync_totem_stats_init (); corosync_fplay_control_init (); + corosync_totem_dynamic_init (); if (minimum_sync_mode == CS_SYNC_V2) { log_printf (LOGSYS_LEVEL_NOTICE, "Compatibility mode set to none. Using V2 of the synchronization engine.\n"); sync_v2_init ( corosync_sync_v2_callbacks_retrieve, corosync_sync_completed); } else if (minimum_sync_mode == CS_SYNC_V1) { log_printf (LOGSYS_LEVEL_NOTICE, "Compatibility mode set to whitetank. Using V1 and V2 of the synchronization engine.\n"); sync_register ( corosync_sync_callbacks_retrieve, sync_v2_memb_list_determine, sync_v2_memb_list_abort, sync_v2_start); sync_v2_init ( corosync_sync_v2_callbacks_retrieve, corosync_sync_completed); } } static enum e_ais_done corosync_flock (const char *lockfile, pid_t pid) { struct flock lock; enum e_ais_done err; char pid_s[17]; int fd_flag; int lf; err = AIS_DONE_EXIT; lf = open (lockfile, O_WRONLY | O_CREAT, 0640); if (lf == -1) { log_printf (LOGSYS_LEVEL_ERROR, "Corosync Executive couldn't create lock file.\n"); return (AIS_DONE_AQUIRE_LOCK); } retry_fcntl: lock.l_type = F_WRLCK; lock.l_start = 0; lock.l_whence = SEEK_SET; lock.l_len = 0; if (fcntl (lf, F_SETLK, &lock) == -1) { switch (errno) { case EINTR: goto retry_fcntl; break; case EAGAIN: case EACCES: log_printf (LOGSYS_LEVEL_ERROR, "Another Corosync instance is already running.\n"); err = AIS_DONE_ALREADY_RUNNING; goto error_close; break; default: log_printf (LOGSYS_LEVEL_ERROR, "Corosync Executive couldn't aquire lock. Error was %s\n", strerror(errno)); err = AIS_DONE_AQUIRE_LOCK; goto error_close; break; } } if (ftruncate (lf, 0) == -1) { log_printf (LOGSYS_LEVEL_ERROR, "Corosync Executive couldn't truncate lock file. Error was %s\n", strerror (errno)); err = AIS_DONE_AQUIRE_LOCK; goto error_close_unlink; } memset (pid_s, 0, sizeof (pid_s)); snprintf (pid_s, sizeof (pid_s) - 1, "%u\n", pid); retry_write: if (write (lf, pid_s, strlen (pid_s)) != strlen (pid_s)) { if (errno == EINTR) { goto retry_write; } else { log_printf (LOGSYS_LEVEL_ERROR, "Corosync Executive couldn't write pid to lock file. " "Error was %s\n", strerror (errno)); err = AIS_DONE_AQUIRE_LOCK; goto error_close_unlink; } } if ((fd_flag = fcntl (lf, F_GETFD, 0)) == -1) { log_printf (LOGSYS_LEVEL_ERROR, "Corosync Executive couldn't get close-on-exec flag from lock file. " "Error was %s\n", strerror (errno)); err = AIS_DONE_AQUIRE_LOCK; goto error_close_unlink; } fd_flag |= FD_CLOEXEC; if (fcntl (lf, F_SETFD, fd_flag) == -1) { log_printf (LOGSYS_LEVEL_ERROR, "Corosync Executive couldn't set close-on-exec flag to lock file. " "Error was %s\n", strerror (errno)); err = AIS_DONE_AQUIRE_LOCK; goto error_close_unlink; } return (err); error_close_unlink: unlink (lockfile); error_close: close (lf); return (err); } int main (int argc, char **argv, char **envp) { const char *error_string; struct totem_config totem_config; hdb_handle_t objdb_handle; hdb_handle_t config_handle; unsigned int config_version = 0; void *objdb_p; struct config_iface_ver0 *config; void *config_p; const char *config_iface_init; char *config_iface; char *iface; char *strtok_save_pt; int res, ch; int background, setprio; struct stat stat_out; char corosync_lib_dir[PATH_MAX]; hdb_handle_t object_runtime_handle; enum e_ais_done flock_err; /* default configuration */ background = 1; setprio = 0; while ((ch = getopt (argc, argv, "fprv")) != EOF) { switch (ch) { case 'f': background = 0; logsys_config_mode_set (NULL, LOGSYS_MODE_OUTPUT_STDERR|LOGSYS_MODE_THREADED|LOGSYS_MODE_FORK); break; case 'p': break; case 'r': setprio = 1; break; case 'v': printf ("Corosync Cluster Engine, version '%s'\n", VERSION); printf ("Copyright (c) 2006-2009 Red Hat, Inc.\n"); return EXIT_SUCCESS; break; default: fprintf(stderr, \ "usage:\n"\ " -f : Start application in foreground.\n"\ " -p : Does nothing. \n"\ " -r : Set round robin realtime scheduling \n"\ " -v : Display version and SVN revision of Corosync and exit.\n"); return EXIT_FAILURE; } } /* * Set round robin realtime scheduling with priority 99 * Lock all memory to avoid page faults which may interrupt * application healthchecking */ if (setprio) { corosync_setscheduler (); } corosync_mlockall (); log_printf (LOGSYS_LEVEL_NOTICE, "Corosync Cluster Engine ('%s'): started and ready to provide service.\n", VERSION); log_printf (LOGSYS_LEVEL_INFO, "Corosync built-in features:" PACKAGE_FEATURES "\n"); corosync_poll_handle = qb_loop_create (); qb_loop_signal_add(corosync_poll_handle, QB_LOOP_LOW, SIGUSR2, NULL, sig_diag_handler, NULL); qb_loop_signal_add(corosync_poll_handle, QB_LOOP_HIGH, SIGINT, NULL, sig_exit_handler, NULL); qb_loop_signal_add(corosync_poll_handle, QB_LOOP_HIGH, SIGQUIT, NULL, sig_exit_handler, NULL); qb_loop_signal_add(corosync_poll_handle, QB_LOOP_HIGH, SIGTERM, NULL, sig_exit_handler, NULL); (void)signal (SIGSEGV, sigsegv_handler); (void)signal (SIGABRT, sigabrt_handler); #if MSG_NOSIGNAL != 0 (void)signal (SIGPIPE, SIG_IGN); #endif /* * Load the object database interface */ res = lcr_ifact_reference ( &objdb_handle, "objdb", 0, &objdb_p, 0); if (res == -1) { log_printf (LOGSYS_LEVEL_ERROR, "Corosync Executive couldn't open configuration object database component.\n"); corosync_exit_error (AIS_DONE_OBJDB); } objdb = (struct objdb_iface_ver0 *)objdb_p; objdb->objdb_init (); /* * Initialize the corosync_api_v1 definition */ apidef_init (objdb); api = apidef_get (); num_config_modules = 0; /* * Bootstrap in the default configuration parser or use * the corosync default built in parser if the configuration parser * isn't overridden */ config_iface_init = getenv("COROSYNC_DEFAULT_CONFIG_IFACE"); if (!config_iface_init) { config_iface_init = "corosync_parser"; } /* Make a copy so we can deface it with strtok */ if ((config_iface = strdup(config_iface_init)) == NULL) { log_printf (LOGSYS_LEVEL_ERROR, "exhausted virtual memory"); corosync_exit_error (AIS_DONE_OBJDB); } iface = strtok_r(config_iface, ":", &strtok_save_pt); while (iface) { res = lcr_ifact_reference ( &config_handle, iface, config_version, &config_p, 0); config = (struct config_iface_ver0 *)config_p; if (res == -1) { log_printf (LOGSYS_LEVEL_ERROR, "Corosync Executive couldn't open configuration component '%s'\n", iface); corosync_exit_error (AIS_DONE_MAINCONFIGREAD); } res = config->config_readconfig(objdb, &error_string); if (res == -1) { log_printf (LOGSYS_LEVEL_ERROR, "%s", error_string); corosync_exit_error (AIS_DONE_MAINCONFIGREAD); } log_printf (LOGSYS_LEVEL_NOTICE, "%s", error_string); config_modules[num_config_modules++] = config; iface = strtok_r(NULL, ":", &strtok_save_pt); } free(config_iface); res = corosync_main_config_read (objdb, &error_string); if (res == -1) { /* * if we are here, we _must_ flush the logsys queue * and try to inform that we couldn't read the config. * this is a desperate attempt before certain death * and there is no guarantee that we can print to stderr * nor that logsys is sending the messages where we expect. */ log_printf (LOGSYS_LEVEL_ERROR, "%s", error_string); fprintf(stderr, "%s", error_string); syslog (LOGSYS_LEVEL_ERROR, "%s", error_string); corosync_exit_error (AIS_DONE_MAINCONFIGREAD); } /* * Make sure required directory is present */ sprintf (corosync_lib_dir, "%s/lib/corosync", LOCALSTATEDIR); res = stat (corosync_lib_dir, &stat_out); if ((res == -1) || (res == 0 && !S_ISDIR(stat_out.st_mode))) { log_printf (LOGSYS_LEVEL_ERROR, "Required directory not present %s. Please create it.\n", corosync_lib_dir); corosync_exit_error (AIS_DONE_DIR_NOT_PRESENT); } res = totem_config_read (objdb, &totem_config, &error_string); if (res == -1) { log_printf (LOGSYS_LEVEL_ERROR, "%s", error_string); corosync_exit_error (AIS_DONE_MAINCONFIGREAD); } res = totem_config_keyread (objdb, &totem_config, &error_string); if (res == -1) { log_printf (LOGSYS_LEVEL_ERROR, "%s", error_string); corosync_exit_error (AIS_DONE_MAINCONFIGREAD); } res = totem_config_validate (&totem_config, &error_string); if (res == -1) { log_printf (LOGSYS_LEVEL_ERROR, "%s", error_string); corosync_exit_error (AIS_DONE_MAINCONFIGREAD); } totem_config.totem_logging_configuration = totem_logging_configuration; totem_config.totem_logging_configuration.log_subsys_id = _logsys_subsys_create("TOTEM", "totem"); totem_config.totem_logging_configuration.log_level_security = LOGSYS_LEVEL_WARNING; totem_config.totem_logging_configuration.log_level_error = LOGSYS_LEVEL_ERROR; totem_config.totem_logging_configuration.log_level_warning = LOGSYS_LEVEL_WARNING; totem_config.totem_logging_configuration.log_level_notice = LOGSYS_LEVEL_NOTICE; totem_config.totem_logging_configuration.log_level_debug = LOGSYS_LEVEL_DEBUG; totem_config.totem_logging_configuration.log_printf = _logsys_log_printf; logsys_config_apply(); res = corosync_main_config_compatibility_read (objdb, &minimum_sync_mode, &error_string); if (res == -1) { log_printf (LOGSYS_LEVEL_ERROR, "%s", error_string); corosync_exit_error (AIS_DONE_MAINCONFIGREAD); } /* create the main runtime object */ objdb->object_create (OBJECT_PARENT_HANDLE, &object_runtime_handle, "runtime", strlen ("runtime")); /* * Now we are fully initialized. */ if (background) { corosync_tty_detach (); } qb_log_thread_start(); if ((flock_err = corosync_flock (corosync_lock_file, getpid ())) != AIS_DONE_EXIT) { corosync_exit_error (flock_err); } /* * if totempg_initialize doesn't have root priveleges, it cannot * bind to a specific interface. This only matters if * there is more then one interface in a system, so * in this case, only a warning is printed */ /* * Join multicast group and setup delivery * and configuration change functions */ totempg_initialize ( corosync_poll_handle, &totem_config); totempg_service_ready_register ( main_service_ready); totempg_groups_initialize ( &corosync_group_handle, deliver_fn, confchg_fn); totempg_groups_join ( corosync_group_handle, &corosync_group, 1); /* * Drop root privleges to user 'ais' * TODO: Don't really need full root capabilities; * needed capabilities are: * CAP_NET_RAW (bindtodevice) * CAP_SYS_NICE (setscheduler) * CAP_IPC_LOCK (mlockall) */ priv_drop (); schedwrk_init ( serialize_lock, serialize_unlock); /* * Start main processing loop */ qb_loop_run (corosync_poll_handle); /* * Exit was requested */ totempg_finalize (); /* * Remove pid lock file */ unlink (corosync_lock_file); corosync_exit_error (AIS_DONE_EXIT); return EXIT_SUCCESS; } diff --git a/exec/totempg.c b/exec/totempg.c index 651d39d5..3ece489a 100644 --- a/exec/totempg.c +++ b/exec/totempg.c @@ -1,1458 +1,1464 @@ /* * Copyright (c) 2003-2005 MontaVista Software, Inc. * Copyright (c) 2005 OSDL. * Copyright (c) 2006-2009 Red Hat, Inc. * * All rights reserved. * * Author: Steven Dake (sdake@redhat.com) * Author: Mark Haverkamp (markh@osdl.org) * * This software licensed under BSD license, the text of which follows: * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * - Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * - Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * - Neither the name of the MontaVista Software, Inc. nor the names of its * contributors may be used to endorse or promote products derived from this * software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF * THE POSSIBILITY OF SUCH DAMAGE. */ /* * FRAGMENTATION AND PACKING ALGORITHM: * * Assemble the entire message into one buffer * if full fragment * store fragment into lengths list * for each full fragment * multicast fragment * set length and fragment fields of pg mesage * store remaining multicast into head of fragmentation data and set lens field * * If a message exceeds the maximum packet size allowed by the totem * single ring protocol, the protocol could lose forward progress. * Statically calculating the allowed data amount doesn't work because * the amount of data allowed depends on the number of fragments in * each message. In this implementation, the maximum fragment size * is dynamically calculated for each fragment added to the message. * It is possible for a message to be two bytes short of the maximum * packet size. This occurs when a message or collection of * messages + the mcast header + the lens are two bytes short of the * end of the packet. Since another len field consumes two bytes, the * len field would consume the rest of the packet without room for data. * * One optimization would be to forgo the final len field and determine * it from the size of the udp datagram. Then this condition would no * longer occur. */ /* * ASSEMBLY AND UNPACKING ALGORITHM: * * copy incoming packet into assembly data buffer indexed by current * location of end of fragment * * if not fragmented * deliver all messages in assembly data buffer * else * if msg_count > 1 and fragmented * deliver all messages except last message in assembly data buffer * copy last fragmented section to start of assembly data buffer * else * if msg_count = 1 and fragmented * do nothing * */ #include #ifdef HAVE_ALLOCA_H #include #endif #include #include #include #include #include #include #include #include #include #include #include #include #include #include #define LOGSYS_UTILS_ONLY 1 #include #include "totemmrp.h" #include "totemsrp.h" #define min(a,b) ((a) < (b)) ? a : b struct totempg_mcast_header { short version; short type; }; #if !(defined(__i386__) || defined(__x86_64__)) /* * Need align on architectures different then i386 or x86_64 */ #define TOTEMPG_NEED_ALIGN 1 #endif /* * totempg_mcast structure * * header: Identify the mcast. * fragmented: Set if this message continues into next message * continuation: Set if this message is a continuation from last message * msg_count Indicates how many packed messages are contained * in the mcast. * Also, the size of each packed message and the messages themselves are * appended to the end of this structure when sent. */ struct totempg_mcast { struct totempg_mcast_header header; unsigned char fragmented; unsigned char continuation; unsigned short msg_count; /* * short msg_len[msg_count]; */ /* * data for messages */ }; /* * Maximum packet size for totem pg messages */ #define TOTEMPG_PACKET_SIZE (totempg_totem_config->net_mtu - \ sizeof (struct totempg_mcast)) /* * Local variables used for packing small messages */ static unsigned short mcast_packed_msg_lens[FRAME_SIZE_MAX]; static int mcast_packed_msg_count = 0; static int totempg_reserved = 1; static unsigned int totempg_size_limit; static totem_queue_level_changed_fn totem_queue_level_changed = NULL; static uint32_t totempg_threaded_mode = 0; /* * Function and data used to log messages */ static int totempg_log_level_security; static int totempg_log_level_error; static int totempg_log_level_warning; static int totempg_log_level_notice; static int totempg_log_level_debug; static int totempg_subsys_id; static void (*totempg_log_printf) ( int level, int subsys, const char *function, const char *file, int line, const char *format, ...) __attribute__((format(printf, 6, 7))); struct totem_config *totempg_totem_config; static totempg_stats_t totempg_stats; enum throw_away_mode { THROW_AWAY_INACTIVE, THROW_AWAY_ACTIVE }; struct assembly { unsigned int nodeid; unsigned char data[MESSAGE_SIZE_MAX]; int index; unsigned char last_frag_num; enum throw_away_mode throw_away_mode; struct list_head list; }; static void assembly_deref (struct assembly *assembly); static int callback_token_received_fn (enum totem_callback_token_type type, const void *data); DECLARE_LIST_INIT(assembly_list_inuse); DECLARE_LIST_INIT(assembly_list_free); DECLARE_LIST_INIT(totempg_groups_list); /* * Staging buffer for packed messages. Messages are staged in this buffer * before sending. Multiple messages may fit which cuts down on the * number of mcasts sent. If a message doesn't completely fit, then * the mcast header has a fragment bit set that says that there are more * data to follow. fragment_size is an index into the buffer. It indicates * the size of message data and where to place new message data. * fragment_contuation indicates whether the first packed message in * the buffer is a continuation of a previously packed fragment. */ static unsigned char *fragmentation_data; static int fragment_size = 0; static int fragment_continuation = 0; static struct iovec iov_delv; struct totempg_group_instance { void (*deliver_fn) ( unsigned int nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required); void (*confchg_fn) ( enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id); struct totempg_group *groups; int groups_cnt; int32_t q_level; struct list_head list; }; static unsigned char next_fragment = 1; static pthread_mutex_t totempg_mutex = PTHREAD_MUTEX_INITIALIZER; static pthread_mutex_t callback_token_mutex = PTHREAD_MUTEX_INITIALIZER; static pthread_mutex_t mcast_msg_mutex = PTHREAD_MUTEX_INITIALIZER; #define log_printf(level, format, args...) \ do { \ totempg_log_printf(level, \ totempg_subsys_id, \ __FUNCTION__, __FILE__, __LINE__, \ format, ##args); \ } while (0); static int msg_count_send_ok (int msg_count); static int byte_count_send_ok (int byte_count); static struct assembly *assembly_ref (unsigned int nodeid) { struct assembly *assembly; struct list_head *list; /* * Search inuse list for node id and return assembly buffer if found */ for (list = assembly_list_inuse.next; list != &assembly_list_inuse; list = list->next) { assembly = list_entry (list, struct assembly, list); if (nodeid == assembly->nodeid) { return (assembly); } } /* * Nothing found in inuse list get one from free list if available */ if (list_empty (&assembly_list_free) == 0) { assembly = list_entry (assembly_list_free.next, struct assembly, list); list_del (&assembly->list); list_add (&assembly->list, &assembly_list_inuse); assembly->nodeid = nodeid; assembly->index = 0; assembly->last_frag_num = 0; assembly->throw_away_mode = THROW_AWAY_INACTIVE; return (assembly); } /* * Nothing available in inuse or free list, so allocate a new one */ assembly = malloc (sizeof (struct assembly)); /* * TODO handle memory allocation failure here */ assert (assembly); assembly->nodeid = nodeid; assembly->data[0] = 0; assembly->index = 0; assembly->last_frag_num = 0; assembly->throw_away_mode = THROW_AWAY_INACTIVE; list_init (&assembly->list); list_add (&assembly->list, &assembly_list_inuse); return (assembly); } static void assembly_deref (struct assembly *assembly) { list_del (&assembly->list); list_add (&assembly->list, &assembly_list_free); } static inline void app_confchg_fn ( enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id) { int i; struct totempg_group_instance *instance; struct assembly *assembly; struct list_head *list; /* * For every leaving processor, add to free list * This also has the side effect of clearing out the dataset * In the leaving processor's assembly buffer. */ for (i = 0; i < left_list_entries; i++) { assembly = assembly_ref (left_list[i]); list_del (&assembly->list); list_add (&assembly->list, &assembly_list_free); } for (list = totempg_groups_list.next; list != &totempg_groups_list; list = list->next) { instance = list_entry (list, struct totempg_group_instance, list); if (instance->confchg_fn) { instance->confchg_fn ( configuration_type, member_list, member_list_entries, left_list, left_list_entries, joined_list, joined_list_entries, ring_id); } } } static inline void group_endian_convert ( void *msg, int msg_len) { unsigned short *group_len; int i; char *aligned_msg; #ifdef TOTEMPG_NEED_ALIGN /* * Align data structure for not i386 or x86_64 */ if ((size_t)msg % 4 != 0) { aligned_msg = alloca(msg_len); memcpy(aligned_msg, msg, msg_len); } else { aligned_msg = msg; } #else aligned_msg = msg; #endif group_len = (unsigned short *)aligned_msg; group_len[0] = swab16(group_len[0]); for (i = 1; i < group_len[0] + 1; i++) { group_len[i] = swab16(group_len[i]); } if (aligned_msg != msg) { memcpy(msg, aligned_msg, msg_len); } } static inline int group_matches ( struct iovec *iovec, unsigned int iov_len, struct totempg_group *groups_b, unsigned int group_b_cnt, unsigned int *adjust_iovec) { unsigned short *group_len; char *group_name; int i; int j; #ifdef TOTEMPG_NEED_ALIGN struct iovec iovec_aligned = { NULL, 0 }; #endif assert (iov_len == 1); #ifdef TOTEMPG_NEED_ALIGN /* * Align data structure for not i386 or x86_64 */ if ((size_t)iovec->iov_base % 4 != 0) { iovec_aligned.iov_base = alloca(iovec->iov_len); memcpy(iovec_aligned.iov_base, iovec->iov_base, iovec->iov_len); iovec_aligned.iov_len = iovec->iov_len; iovec = &iovec_aligned; } #endif group_len = (unsigned short *)iovec->iov_base; group_name = ((char *)iovec->iov_base) + sizeof (unsigned short) * (group_len[0] + 1); /* * Calculate amount to adjust the iovec by before delivering to app */ *adjust_iovec = sizeof (unsigned short) * (group_len[0] + 1); for (i = 1; i < group_len[0] + 1; i++) { *adjust_iovec += group_len[i]; } /* * Determine if this message should be delivered to this instance */ for (i = 1; i < group_len[0] + 1; i++) { for (j = 0; j < group_b_cnt; j++) { if ((group_len[i] == groups_b[j].group_len) && (memcmp (groups_b[j].group, group_name, group_len[i]) == 0)) { return (1); } } group_name += group_len[i]; } return (0); } static inline void app_deliver_fn ( unsigned int nodeid, void *msg, unsigned int msg_len, int endian_conversion_required) { struct totempg_group_instance *instance; struct iovec stripped_iovec; unsigned int adjust_iovec; struct iovec *iovec; struct list_head *list; struct iovec aligned_iovec = { NULL, 0 }; if (endian_conversion_required) { group_endian_convert (msg, msg_len); } /* * TODO: segmentation/assembly need to be redesigned to provide aligned access * in all cases to avoid memory copies on non386 archs. Probably broke backwars * compatibility */ #ifdef TOTEMPG_NEED_ALIGN /* * Align data structure for not i386 or x86_64 */ aligned_iovec.iov_base = alloca(msg_len); aligned_iovec.iov_len = msg_len; memcpy(aligned_iovec.iov_base, msg, msg_len); #else aligned_iovec.iov_base = msg; aligned_iovec.iov_len = msg_len; #endif iovec = &aligned_iovec; for (list = totempg_groups_list.next; list != &totempg_groups_list; list = list->next) { instance = list_entry (list, struct totempg_group_instance, list); if (group_matches (iovec, 1, instance->groups, instance->groups_cnt, &adjust_iovec)) { stripped_iovec.iov_len = iovec->iov_len - adjust_iovec; stripped_iovec.iov_base = (char *)iovec->iov_base + adjust_iovec; #ifdef TOTEMPG_NEED_ALIGN /* * Align data structure for not i386 or x86_64 */ if ((char *)iovec->iov_base + adjust_iovec % 4 != 0) { /* * Deal with misalignment */ stripped_iovec.iov_base = alloca (stripped_iovec.iov_len); memcpy (stripped_iovec.iov_base, (char *)iovec->iov_base + adjust_iovec, stripped_iovec.iov_len); } #endif instance->deliver_fn ( nodeid, stripped_iovec.iov_base, stripped_iovec.iov_len, endian_conversion_required); } } } static void totempg_confchg_fn ( enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id) { // TODO optimize this app_confchg_fn (configuration_type, member_list, member_list_entries, left_list, left_list_entries, joined_list, joined_list_entries, ring_id); } static void totempg_deliver_fn ( unsigned int nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required) { struct totempg_mcast *mcast; unsigned short *msg_lens; int i; struct assembly *assembly; char header[FRAME_SIZE_MAX]; int msg_count; int continuation; int start; const char *data; int datasize; assembly = assembly_ref (nodeid); assert (assembly); /* * Assemble the header into one block of data and * assemble the packet contents into one block of data to simplify delivery */ mcast = (struct totempg_mcast *)msg; if (endian_conversion_required) { mcast->msg_count = swab16 (mcast->msg_count); } msg_count = mcast->msg_count; datasize = sizeof (struct totempg_mcast) + msg_count * sizeof (unsigned short); memcpy (header, msg, datasize); data = msg; msg_lens = (unsigned short *) (header + sizeof (struct totempg_mcast)); if (endian_conversion_required) { for (i = 0; i < mcast->msg_count; i++) { msg_lens[i] = swab16 (msg_lens[i]); } } memcpy (&assembly->data[assembly->index], &data[datasize], msg_len - datasize); /* * If the last message in the buffer is a fragment, then we * can't deliver it. We'll first deliver the full messages * then adjust the assembly buffer so we can add the rest of the * fragment when it arrives. */ msg_count = mcast->fragmented ? mcast->msg_count - 1 : mcast->msg_count; continuation = mcast->continuation; iov_delv.iov_base = (void *)&assembly->data[0]; iov_delv.iov_len = assembly->index + msg_lens[0]; /* * Make sure that if this message is a continuation, that it * matches the sequence number of the previous fragment. * Also, if the first packed message is a continuation * of a previous message, but the assembly buffer * is empty, then we need to discard it since we can't * assemble a complete message. Likewise, if this message isn't a * continuation and the assembly buffer is empty, we have to discard * the continued message. */ start = 0; if (assembly->throw_away_mode == THROW_AWAY_ACTIVE) { /* Throw away the first msg block */ if (mcast->fragmented == 0 || mcast->fragmented == 1) { assembly->throw_away_mode = THROW_AWAY_INACTIVE; assembly->index += msg_lens[0]; iov_delv.iov_base = (void *)&assembly->data[assembly->index]; iov_delv.iov_len = msg_lens[1]; start = 1; } } else if (assembly->throw_away_mode == THROW_AWAY_INACTIVE) { if (continuation == assembly->last_frag_num) { assembly->last_frag_num = mcast->fragmented; for (i = start; i < msg_count; i++) { app_deliver_fn(nodeid, iov_delv.iov_base, iov_delv.iov_len, endian_conversion_required); assembly->index += msg_lens[i]; iov_delv.iov_base = (void *)&assembly->data[assembly->index]; if (i < (msg_count - 1)) { iov_delv.iov_len = msg_lens[i + 1]; } } } else { assembly->throw_away_mode = THROW_AWAY_ACTIVE; } } if (mcast->fragmented == 0) { /* * End of messages, dereference assembly struct */ assembly->last_frag_num = 0; assembly->index = 0; assembly_deref (assembly); } else { /* * Message is fragmented, keep around assembly list */ if (mcast->msg_count > 1) { memmove (&assembly->data[0], &assembly->data[assembly->index], msg_lens[msg_count]); assembly->index = 0; } assembly->index += msg_lens[msg_count]; } } /* * Totem Process Group Abstraction * depends on poll abstraction, POSIX, IPV4 */ void *callback_token_received_handle; int callback_token_received_fn (enum totem_callback_token_type type, const void *data) { struct totempg_mcast mcast; struct iovec iovecs[3]; if (totempg_threaded_mode == 1) { pthread_mutex_lock (&mcast_msg_mutex); } if (mcast_packed_msg_count == 0) { if (totempg_threaded_mode == 1) { pthread_mutex_unlock (&mcast_msg_mutex); } return (0); } if (totemmrp_avail() == 0) { if (totempg_threaded_mode == 1) { pthread_mutex_unlock (&mcast_msg_mutex); } return (0); } mcast.header.version = 0; mcast.header.type = 0; mcast.fragmented = 0; /* * Was the first message in this buffer a continuation of a * fragmented message? */ mcast.continuation = fragment_continuation; fragment_continuation = 0; mcast.msg_count = mcast_packed_msg_count; iovecs[0].iov_base = (void *)&mcast; iovecs[0].iov_len = sizeof (struct totempg_mcast); iovecs[1].iov_base = (void *)mcast_packed_msg_lens; iovecs[1].iov_len = mcast_packed_msg_count * sizeof (unsigned short); iovecs[2].iov_base = (void *)&fragmentation_data[0]; iovecs[2].iov_len = fragment_size; (void)totemmrp_mcast (iovecs, 3, 0); mcast_packed_msg_count = 0; fragment_size = 0; if (totempg_threaded_mode == 1) { pthread_mutex_unlock (&mcast_msg_mutex); } return (0); } /* * Initialize the totem process group abstraction */ int totempg_initialize ( qb_loop_t *poll_handle, struct totem_config *totem_config) { int res; totempg_totem_config = totem_config; totempg_log_level_security = totem_config->totem_logging_configuration.log_level_security; totempg_log_level_error = totem_config->totem_logging_configuration.log_level_error; totempg_log_level_warning = totem_config->totem_logging_configuration.log_level_warning; totempg_log_level_notice = totem_config->totem_logging_configuration.log_level_notice; totempg_log_level_debug = totem_config->totem_logging_configuration.log_level_debug; totempg_log_printf = totem_config->totem_logging_configuration.log_printf; totempg_subsys_id = totem_config->totem_logging_configuration.log_subsys_id; fragmentation_data = malloc (TOTEMPG_PACKET_SIZE); if (fragmentation_data == 0) { return (-1); } totemsrp_net_mtu_adjust (totem_config); res = totemmrp_initialize ( poll_handle, totem_config, &totempg_stats, totempg_deliver_fn, totempg_confchg_fn); totemmrp_callback_token_create ( &callback_token_received_handle, TOTEM_CALLBACK_TOKEN_RECEIVED, 0, callback_token_received_fn, 0); totempg_size_limit = (totemmrp_avail() - 1) * (totempg_totem_config->net_mtu - sizeof (struct totempg_mcast) - 16); list_init (&totempg_groups_list); return (res); } void totempg_finalize (void) { if (totempg_threaded_mode == 1) { pthread_mutex_lock (&totempg_mutex); } totemmrp_finalize (); if (totempg_threaded_mode == 1) { pthread_mutex_unlock (&totempg_mutex); } } /* * Multicast a message */ static int mcast_msg ( struct iovec *iovec_in, unsigned int iov_len, int guarantee) { int res = 0; struct totempg_mcast mcast; struct iovec iovecs[3]; struct iovec iovec[64]; int i; int dest, src; int max_packet_size = 0; int copy_len = 0; int copy_base = 0; int total_size = 0; if (totempg_threaded_mode == 1) { pthread_mutex_lock (&mcast_msg_mutex); } totemmrp_event_signal (TOTEM_EVENT_NEW_MSG, 1); /* * Remove zero length iovectors from the list */ assert (iov_len < 64); for (dest = 0, src = 0; src < iov_len; src++) { if (iovec_in[src].iov_len) { memcpy (&iovec[dest++], &iovec_in[src], sizeof (struct iovec)); } } iov_len = dest; max_packet_size = TOTEMPG_PACKET_SIZE - (sizeof (unsigned short) * (mcast_packed_msg_count + 1)); mcast_packed_msg_lens[mcast_packed_msg_count] = 0; /* * Check if we would overwrite new message queue */ for (i = 0; i < iov_len; i++) { total_size += iovec[i].iov_len; } if (byte_count_send_ok (total_size + sizeof(unsigned short) * (mcast_packed_msg_count)) == 0) { if (totempg_threaded_mode == 1) { pthread_mutex_unlock (&mcast_msg_mutex); } return(-1); } mcast.header.version = 0; for (i = 0; i < iov_len; ) { mcast.fragmented = 0; mcast.continuation = fragment_continuation; copy_len = iovec[i].iov_len - copy_base; /* * If it all fits with room left over, copy it in. * We need to leave at least sizeof(short) + 1 bytes in the * fragment_buffer on exit so that max_packet_size + fragment_size * doesn't exceed the size of the fragment_buffer on the next call. */ if ((copy_len + fragment_size) < (max_packet_size - sizeof (unsigned short))) { memcpy (&fragmentation_data[fragment_size], (char *)iovec[i].iov_base + copy_base, copy_len); fragment_size += copy_len; mcast_packed_msg_lens[mcast_packed_msg_count] += copy_len; next_fragment = 1; copy_len = 0; copy_base = 0; i++; continue; /* * If it just fits or is too big, then send out what fits. */ } else { unsigned char *data_ptr; copy_len = min(copy_len, max_packet_size - fragment_size); if( copy_len == max_packet_size ) data_ptr = (unsigned char *)iovec[i].iov_base + copy_base; else { data_ptr = fragmentation_data; memcpy (&fragmentation_data[fragment_size], (unsigned char *)iovec[i].iov_base + copy_base, copy_len); } memcpy (&fragmentation_data[fragment_size], (unsigned char *)iovec[i].iov_base + copy_base, copy_len); mcast_packed_msg_lens[mcast_packed_msg_count] += copy_len; /* * if we're not on the last iovec or the iovec is too large to * fit, then indicate a fragment. This also means that the next * message will have the continuation of this one. */ if ((i < (iov_len - 1)) || ((copy_base + copy_len) < iovec[i].iov_len)) { if (!next_fragment) { next_fragment++; } fragment_continuation = next_fragment; mcast.fragmented = next_fragment++; assert(fragment_continuation != 0); assert(mcast.fragmented != 0); } else { fragment_continuation = 0; } /* * assemble the message and send it */ mcast.msg_count = ++mcast_packed_msg_count; iovecs[0].iov_base = (void *)&mcast; iovecs[0].iov_len = sizeof(struct totempg_mcast); iovecs[1].iov_base = (void *)mcast_packed_msg_lens; iovecs[1].iov_len = mcast_packed_msg_count * sizeof(unsigned short); iovecs[2].iov_base = (void *)data_ptr; iovecs[2].iov_len = max_packet_size; assert (totemmrp_avail() > 0); res = totemmrp_mcast (iovecs, 3, guarantee); if (res == -1) { goto error_exit; } /* * Recalculate counts and indexes for the next. */ mcast_packed_msg_lens[0] = 0; mcast_packed_msg_count = 0; fragment_size = 0; max_packet_size = TOTEMPG_PACKET_SIZE - (sizeof(unsigned short)); /* * If the iovec all fit, go to the next iovec */ if ((copy_base + copy_len) == iovec[i].iov_len) { copy_len = 0; copy_base = 0; i++; /* * Continue with the rest of the current iovec. */ } else { copy_base += copy_len; } } } /* * Bump only if we added message data. This may be zero if * the last buffer just fit into the fragmentation_data buffer * and we were at the last iovec. */ if (mcast_packed_msg_lens[mcast_packed_msg_count]) { mcast_packed_msg_count++; } error_exit: if (totempg_threaded_mode == 1) { pthread_mutex_unlock (&mcast_msg_mutex); } return (res); } /* * Determine if a message of msg_size could be queued */ static int msg_count_send_ok ( int msg_count) { int avail = 0; avail = totemmrp_avail (); totempg_stats.msg_queue_avail = avail; return ((avail - totempg_reserved) > msg_count); } static int byte_count_send_ok ( int byte_count) { unsigned int msg_count = 0; int avail = 0; avail = totemmrp_avail (); msg_count = (byte_count / (totempg_totem_config->net_mtu - sizeof (struct totempg_mcast) - 16)) + 1; return (avail >= msg_count); } static int send_reserve ( int msg_size) { unsigned int msg_count = 0; msg_count = (msg_size / (totempg_totem_config->net_mtu - sizeof (struct totempg_mcast) - 16)) + 1; totempg_reserved += msg_count; totempg_stats.msg_reserved = totempg_reserved; return (msg_count); } static void send_release ( int msg_count) { totempg_reserved -= msg_count; totempg_stats.msg_reserved = totempg_reserved; } int totempg_callback_token_create ( void **handle_out, enum totem_callback_token_type type, int delete, int (*callback_fn) (enum totem_callback_token_type type, const void *), const void *data) { unsigned int res; if (totempg_threaded_mode == 1) { pthread_mutex_lock (&callback_token_mutex); } res = totemmrp_callback_token_create (handle_out, type, delete, callback_fn, data); if (totempg_threaded_mode == 1) { pthread_mutex_unlock (&callback_token_mutex); } return (res); } void totempg_callback_token_destroy ( void *handle_out) { if (totempg_threaded_mode == 1) { pthread_mutex_lock (&callback_token_mutex); } totemmrp_callback_token_destroy (handle_out); if (totempg_threaded_mode == 1) { pthread_mutex_unlock (&callback_token_mutex); } } /* * vi: set autoindent tabstop=4 shiftwidth=4 : */ int totempg_groups_initialize ( void **totempg_groups_instance, void (*deliver_fn) ( unsigned int nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required), void (*confchg_fn) ( enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id)) { struct totempg_group_instance *instance; if (totempg_threaded_mode == 1) { pthread_mutex_lock (&totempg_mutex); } instance = malloc (sizeof (struct totempg_group_instance)); if (instance == NULL) { goto error_exit; } instance->deliver_fn = deliver_fn; instance->confchg_fn = confchg_fn; instance->groups = 0; instance->groups_cnt = 0; instance->q_level = QB_LOOP_MED; list_init (&instance->list); list_add (&instance->list, &totempg_groups_list); if (totempg_threaded_mode == 1) { pthread_mutex_unlock (&totempg_mutex); } *totempg_groups_instance = instance; return (0); error_exit: if (totempg_threaded_mode == 1) { pthread_mutex_unlock (&totempg_mutex); } return (-1); } int totempg_groups_join ( void *totempg_groups_instance, const struct totempg_group *groups, size_t group_cnt) { struct totempg_group_instance *instance = (struct totempg_group_instance *)totempg_groups_instance; struct totempg_group *new_groups; unsigned int res = 0; if (totempg_threaded_mode == 1) { pthread_mutex_lock (&totempg_mutex); } new_groups = realloc (instance->groups, sizeof (struct totempg_group) * (instance->groups_cnt + group_cnt)); if (new_groups == 0) { res = ENOMEM; goto error_exit; } memcpy (&new_groups[instance->groups_cnt], groups, group_cnt * sizeof (struct totempg_group)); instance->groups = new_groups; instance->groups_cnt += group_cnt; error_exit: if (totempg_threaded_mode == 1) { pthread_mutex_unlock (&totempg_mutex); } return (res); } int totempg_groups_leave ( void *totempg_groups_instance, const struct totempg_group *groups, size_t group_cnt) { if (totempg_threaded_mode == 1) { pthread_mutex_lock (&totempg_mutex); } if (totempg_threaded_mode == 1) { pthread_mutex_unlock (&totempg_mutex); } return (0); } #define MAX_IOVECS_FROM_APP 32 #define MAX_GROUPS_PER_MSG 32 int totempg_groups_mcast_joined ( void *totempg_groups_instance, const struct iovec *iovec, unsigned int iov_len, int guarantee) { struct totempg_group_instance *instance = (struct totempg_group_instance *)totempg_groups_instance; unsigned short group_len[MAX_GROUPS_PER_MSG + 1]; struct iovec iovec_mcast[MAX_GROUPS_PER_MSG + 1 + MAX_IOVECS_FROM_APP]; int i; unsigned int res; if (totempg_threaded_mode == 1) { pthread_mutex_lock (&totempg_mutex); } /* * Build group_len structure and the iovec_mcast structure */ group_len[0] = instance->groups_cnt; for (i = 0; i < instance->groups_cnt; i++) { group_len[i + 1] = instance->groups[i].group_len; iovec_mcast[i + 1].iov_len = instance->groups[i].group_len; iovec_mcast[i + 1].iov_base = (void *) instance->groups[i].group; } iovec_mcast[0].iov_len = (instance->groups_cnt + 1) * sizeof (unsigned short); iovec_mcast[0].iov_base = group_len; for (i = 0; i < iov_len; i++) { iovec_mcast[i + instance->groups_cnt + 1].iov_len = iovec[i].iov_len; iovec_mcast[i + instance->groups_cnt + 1].iov_base = iovec[i].iov_base; } res = mcast_msg (iovec_mcast, iov_len + instance->groups_cnt + 1, guarantee); if (totempg_threaded_mode == 1) { pthread_mutex_unlock (&totempg_mutex); } return (res); } static void check_q_level( void *totempg_groups_instance) { int32_t old_level; int32_t percent_used = 0; struct totempg_group_instance *instance = (struct totempg_group_instance *)totempg_groups_instance; old_level = instance->q_level; percent_used = 100 - (totemmrp_avail () * 100 / 800); /*(1024*1024/1500)*/ if (percent_used > 90 && instance->q_level != TOTEM_Q_LEVEL_CRITICAL) { instance->q_level = TOTEM_Q_LEVEL_CRITICAL; } else if (percent_used < 30 && instance->q_level != TOTEM_Q_LEVEL_LOW) { instance->q_level = TOTEM_Q_LEVEL_LOW; } else if (percent_used > 40 && percent_used < 60 && instance->q_level != TOTEM_Q_LEVEL_GOOD) { instance->q_level = TOTEM_Q_LEVEL_GOOD; } else if (percent_used > 70 && percent_used < 80 && instance->q_level != TOTEM_Q_LEVEL_HIGH) { instance->q_level = TOTEM_Q_LEVEL_HIGH; } if (totem_queue_level_changed && old_level != instance->q_level) { totem_queue_level_changed(instance->q_level); } } void totempg_check_q_level( void *totempg_groups_instance) { struct totempg_group_instance *instance = (struct totempg_group_instance *)totempg_groups_instance; check_q_level(instance); } int totempg_groups_joined_reserve ( void *totempg_groups_instance, const struct iovec *iovec, unsigned int iov_len) { struct totempg_group_instance *instance = (struct totempg_group_instance *)totempg_groups_instance; unsigned int size = 0; unsigned int i; unsigned int reserved = 0; if (totempg_threaded_mode == 1) { pthread_mutex_lock (&totempg_mutex); pthread_mutex_lock (&mcast_msg_mutex); } for (i = 0; i < instance->groups_cnt; i++) { size += instance->groups[i].group_len; } for (i = 0; i < iov_len; i++) { size += iovec[i].iov_len; } check_q_level(instance); if (size >= totempg_size_limit) { reserved = -1; goto error_exit; } reserved = send_reserve (size); if (msg_count_send_ok (reserved) == 0) { send_release (reserved); reserved = 0; } error_exit: if (totempg_threaded_mode == 1) { pthread_mutex_unlock (&mcast_msg_mutex); pthread_mutex_unlock (&totempg_mutex); } return (reserved); } int totempg_groups_joined_release (int msg_count) { if (totempg_threaded_mode == 1) { pthread_mutex_lock (&totempg_mutex); pthread_mutex_lock (&mcast_msg_mutex); } send_release (msg_count); if (totempg_threaded_mode == 1) { pthread_mutex_unlock (&mcast_msg_mutex); pthread_mutex_unlock (&totempg_mutex); } return 0; } int totempg_groups_mcast_groups ( void *totempg_groups_instance, int guarantee, const struct totempg_group *groups, size_t groups_cnt, const struct iovec *iovec, unsigned int iov_len) { unsigned short group_len[MAX_GROUPS_PER_MSG + 1]; struct iovec iovec_mcast[MAX_GROUPS_PER_MSG + 1 + MAX_IOVECS_FROM_APP]; int i; unsigned int res; if (totempg_threaded_mode == 1) { pthread_mutex_lock (&totempg_mutex); } /* * Build group_len structure and the iovec_mcast structure */ group_len[0] = groups_cnt; for (i = 0; i < groups_cnt; i++) { group_len[i + 1] = groups[i].group_len; iovec_mcast[i + 1].iov_len = groups[i].group_len; iovec_mcast[i + 1].iov_base = (void *) groups[i].group; } iovec_mcast[0].iov_len = (groups_cnt + 1) * sizeof (unsigned short); iovec_mcast[0].iov_base = group_len; for (i = 0; i < iov_len; i++) { iovec_mcast[i + groups_cnt + 1].iov_len = iovec[i].iov_len; iovec_mcast[i + groups_cnt + 1].iov_base = iovec[i].iov_base; } res = mcast_msg (iovec_mcast, iov_len + groups_cnt + 1, guarantee); if (totempg_threaded_mode == 1) { pthread_mutex_unlock (&totempg_mutex); } return (res); } /* * Returns -1 if error, 0 if can't send, 1 if can send the message */ int totempg_groups_send_ok_groups ( void *totempg_groups_instance, const struct totempg_group *groups, size_t groups_cnt, const struct iovec *iovec, unsigned int iov_len) { unsigned int size = 0; unsigned int i; unsigned int res; if (totempg_threaded_mode == 1) { pthread_mutex_lock (&totempg_mutex); } for (i = 0; i < groups_cnt; i++) { size += groups[i].group_len; } for (i = 0; i < iov_len; i++) { size += iovec[i].iov_len; } res = msg_count_send_ok (size); if (totempg_threaded_mode == 1) { pthread_mutex_unlock (&totempg_mutex); } return (res); } int totempg_ifaces_get ( unsigned int nodeid, struct totem_ip_address *interfaces, char ***status, unsigned int *iface_count) { int res; res = totemmrp_ifaces_get ( nodeid, interfaces, status, iface_count); return (res); } void totempg_event_signal (enum totem_event_type type, int value) { totemmrp_event_signal (type, value); } void* totempg_get_stats (void) { return &totempg_stats; } int totempg_crypto_set ( unsigned int type) { int res; res = totemmrp_crypto_set ( type); return (res); } int totempg_ring_reenable (void) { int res; res = totemmrp_ring_reenable (); return (res); } const char *totempg_ifaces_print (unsigned int nodeid) { static char iface_string[256 * INTERFACE_MAX]; char one_iface[64]; struct totem_ip_address interfaces[INTERFACE_MAX]; char **status; unsigned int iface_count; unsigned int i; int res; iface_string[0] = '\0'; res = totempg_ifaces_get (nodeid, interfaces, &status, &iface_count); if (res == -1) { return ("no interface found for nodeid"); } for (i = 0; i < iface_count; i++) { sprintf (one_iface, "r(%d) ip(%s) ", i, totemip_print (&interfaces[i])); strcat (iface_string, one_iface); } return (iface_string); } unsigned int totempg_my_nodeid_get (void) { return (totemmrp_my_nodeid_get()); } int totempg_my_family_get (void) { return (totemmrp_my_family_get()); } extern void totempg_service_ready_register ( void (*totem_service_ready) (void)) { totemmrp_service_ready_register (totem_service_ready); } void totempg_queue_level_register_callback (totem_queue_level_changed_fn fn) { totem_queue_level_changed = fn; } extern int totempg_member_add ( const struct totem_ip_address *member, - int ring_no); + int ring_no) +{ + return totemmrp_member_add (member, ring_no); +} extern int totempg_member_remove ( const struct totem_ip_address *member, - int ring_no); + int ring_no) +{ + return totemmrp_member_remove (member, ring_no); +} void totempg_threaded_mode_enable (void) { totempg_threaded_mode = 1; totemmrp_threaded_mode_enable (); } diff --git a/exec/totemudpu.c b/exec/totemudpu.c index 21e57c76..f9e07d05 100644 --- a/exec/totemudpu.c +++ b/exec/totemudpu.c @@ -1,1719 +1,1761 @@ /* * Copyright (c) 2005 MontaVista Software, Inc. * Copyright (c) 2006-2009 Red Hat, Inc. * * All rights reserved. * * Author: Steven Dake (sdake@redhat.com) * This software licensed under BSD license, the text of which follows: * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * - Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * - Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * - Neither the name of the MontaVista Software, Inc. nor the names of its * contributors may be used to endorse or promote products derived from this * software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF * THE POSSIBILITY OF SUCH DAMAGE. */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #define LOGSYS_UTILS_ONLY 1 #include #include "totemudpu.h" #include "crypto.h" #include "util.h" #ifdef HAVE_LIBNSS #include #include #include #include #endif #ifndef MSG_NOSIGNAL #define MSG_NOSIGNAL 0 #endif #define MCAST_SOCKET_BUFFER_SIZE (TRANSMITS_ALLOWED * FRAME_SIZE_MAX) #define NETIF_STATE_REPORT_UP 1 #define NETIF_STATE_REPORT_DOWN 2 #define BIND_STATE_UNBOUND 0 #define BIND_STATE_REGULAR 1 #define BIND_STATE_LOOPBACK 2 #define HMAC_HASH_SIZE 20 struct security_header { unsigned char hash_digest[HMAC_HASH_SIZE]; /* The hash *MUST* be first in the data structure */ unsigned char salt[16]; /* random number */ char msg[0]; } __attribute__((packed)); struct totemudpu_member { struct list_head list; struct totem_ip_address member; int fd; }; struct totemudpu_instance { hmac_state totemudpu_hmac_state; prng_state totemudpu_prng_state; #ifdef HAVE_LIBNSS PK11SymKey *nss_sym_key; PK11SymKey *nss_sym_key_sign; #endif unsigned char totemudpu_private_key[1024]; unsigned int totemudpu_private_key_len; qb_loop_t *totemudpu_poll_handle; struct totem_interface *totem_interface; int netif_state_report; int netif_bind_state; void *context; void (*totemudpu_deliver_fn) ( void *context, const void *msg, unsigned int msg_len); void (*totemudpu_iface_change_fn) ( void *context, const struct totem_ip_address *iface_address); void (*totemudpu_target_set_completed) (void *context); /* * Function and data used to log messages */ int totemudpu_log_level_security; int totemudpu_log_level_error; int totemudpu_log_level_warning; int totemudpu_log_level_notice; int totemudpu_log_level_debug; int totemudpu_subsys_id; void (*totemudpu_log_printf) ( int level, int subsys, const char *function, const char *file, int line, const char *format, ...)__attribute__((format(printf, 6, 7))); void *udpu_context; char iov_buffer[FRAME_SIZE_MAX]; struct iovec totemudpu_iov_recv; struct list_head member_list; int stats_sent; int stats_recv; int stats_delv; int stats_remcasts; int stats_orf_token; struct timeval stats_tv_start; struct totem_ip_address my_id; int firstrun; qb_loop_timer_handle timer_netif_check_timeout; unsigned int my_memb_entries; struct totem_config *totem_config; struct totem_ip_address token_target; int token_socket; }; struct work_item { const void *msg; unsigned int msg_len; struct totemudpu_instance *instance; }; static int totemudpu_build_sockets ( struct totemudpu_instance *instance, struct totem_ip_address *bindnet_address, struct totem_ip_address *bound_to); static struct totem_ip_address localhost; static void totemudpu_instance_initialize (struct totemudpu_instance *instance) { memset (instance, 0, sizeof (struct totemudpu_instance)); instance->netif_state_report = NETIF_STATE_REPORT_UP | NETIF_STATE_REPORT_DOWN; instance->totemudpu_iov_recv.iov_base = instance->iov_buffer; instance->totemudpu_iov_recv.iov_len = FRAME_SIZE_MAX; //sizeof (instance->iov_buffer); /* * There is always atleast 1 processor */ instance->my_memb_entries = 1; list_init (&instance->member_list); } #define log_printf(level, format, args...) \ do { \ instance->totemudpu_log_printf ( \ level, instance->totemudpu_subsys_id, \ __FUNCTION__, __FILE__, __LINE__, \ (const char *)format, ##args); \ } while (0); #define LOGSYS_PERROR(err_num, level, fmt, args...) \ do { \ char _error_str[LOGSYS_MAX_PERROR_MSG_LEN]; \ const char *_error_ptr = qb_strerror_r(err_num, _error_str, sizeof(_error_str)); \ instance->totemudpu_log_printf ( \ level, instance->totemudpu_subsys_id, \ __FUNCTION__, __FILE__, __LINE__, \ fmt ": %s (%d)\n", ##args, _error_ptr, err_num); \ } while(0) static int authenticate_and_decrypt_sober ( struct totemudpu_instance *instance, struct iovec *iov, unsigned int iov_len) { unsigned char keys[48]; struct security_header *header = (struct security_header *)iov[0].iov_base; prng_state keygen_prng_state; prng_state stream_prng_state; unsigned char *hmac_key = &keys[32]; unsigned char *cipher_key = &keys[16]; unsigned char *initial_vector = &keys[0]; unsigned char digest_comparison[HMAC_HASH_SIZE]; unsigned long len; /* * Generate MAC, CIPHER, IV keys from private key */ memset (keys, 0, sizeof (keys)); sober128_start (&keygen_prng_state); sober128_add_entropy (instance->totemudpu_private_key, instance->totemudpu_private_key_len, &keygen_prng_state); sober128_add_entropy (header->salt, sizeof (header->salt), &keygen_prng_state); sober128_read (keys, sizeof (keys), &keygen_prng_state); /* * Setup stream cipher */ sober128_start (&stream_prng_state); sober128_add_entropy (cipher_key, 16, &stream_prng_state); sober128_add_entropy (initial_vector, 16, &stream_prng_state); /* * Authenticate contents of message */ hmac_init (&instance->totemudpu_hmac_state, DIGEST_SHA1, hmac_key, 16); hmac_process (&instance->totemudpu_hmac_state, (unsigned char *)iov->iov_base + HMAC_HASH_SIZE, iov->iov_len - HMAC_HASH_SIZE); len = hash_descriptor[DIGEST_SHA1]->hashsize; assert (HMAC_HASH_SIZE >= len); hmac_done (&instance->totemudpu_hmac_state, digest_comparison, &len); if (memcmp (digest_comparison, header->hash_digest, len) != 0) { return (-1); } /* * Decrypt the contents of the message with the cipher key */ sober128_read ((unsigned char*)iov->iov_base + sizeof (struct security_header), iov->iov_len - sizeof (struct security_header), &stream_prng_state); return (0); } static void init_sober_crypto( struct totemudpu_instance *instance) { log_printf(instance->totemudpu_log_level_notice, "Initializing transmit/receive security: libtomcrypt SOBER128/SHA1HMAC (mode 0).\n"); rng_make_prng (128, PRNG_SOBER, &instance->totemudpu_prng_state, NULL); } #ifdef HAVE_LIBNSS static unsigned char *copy_from_iovec( const struct iovec *iov, unsigned int iov_len, size_t *buf_size) { int i; size_t bufptr; size_t buflen = 0; unsigned char *newbuf; for (i=0; i buf_size) { copylen = buf_size - bufptr; } memcpy(iov[i].iov_base, buf+bufptr, copylen); bufptr += copylen; if (iov[i].iov_len != copylen) { iov[i].iov_len = copylen; return; } } } static void init_nss_crypto( struct totemudpu_instance *instance) { PK11SlotInfo* aes_slot = NULL; PK11SlotInfo* sha1_slot = NULL; SECItem key_item; SECStatus rv; log_printf(instance->totemudpu_log_level_notice, "Initializing transmit/receive security: NSS AES128CBC/SHA1HMAC (mode 1).\n"); rv = NSS_NoDB_Init("."); if (rv != SECSuccess) { log_printf(instance->totemudpu_log_level_security, "NSS initialization failed (err %d)\n", PR_GetError()); goto out; } aes_slot = PK11_GetBestSlot(instance->totem_config->crypto_crypt_type, NULL); if (aes_slot == NULL) { log_printf(instance->totemudpu_log_level_security, "Unable to find security slot (err %d)\n", PR_GetError()); goto out; } sha1_slot = PK11_GetBestSlot(CKM_SHA_1_HMAC, NULL); if (sha1_slot == NULL) { log_printf(instance->totemudpu_log_level_security, "Unable to find security slot (err %d)\n", PR_GetError()); goto out; } /* * Make the private key into a SymKey that we can use */ key_item.type = siBuffer; key_item.data = instance->totem_config->private_key; key_item.len = 32; /* Use 128 bits */ instance->nss_sym_key = PK11_ImportSymKey(aes_slot, instance->totem_config->crypto_crypt_type, PK11_OriginUnwrap, CKA_ENCRYPT|CKA_DECRYPT, &key_item, NULL); if (instance->nss_sym_key == NULL) { log_printf(instance->totemudpu_log_level_security, "Failure to import key into NSS (err %d)\n", PR_GetError()); goto out; } instance->nss_sym_key_sign = PK11_ImportSymKey(sha1_slot, CKM_SHA_1_HMAC, PK11_OriginUnwrap, CKA_SIGN, &key_item, NULL); if (instance->nss_sym_key_sign == NULL) { log_printf(instance->totemudpu_log_level_security, "Failure to import key into NSS (err %d)\n", PR_GetError()); goto out; } out: return; } static int encrypt_and_sign_nss ( struct totemudpu_instance *instance, unsigned char *buf, size_t *buf_len, const struct iovec *iovec, unsigned int iov_len) { PK11Context* enc_context = NULL; SECStatus rv1, rv2; int tmp1_outlen; unsigned int tmp2_outlen; unsigned char *inbuf; unsigned char *data; unsigned char *outdata; size_t datalen; SECItem no_params; SECItem iv_item; struct security_header *header; SECItem *nss_sec_param; unsigned char nss_iv_data[16]; SECStatus rv; no_params.type = siBuffer; no_params.data = 0; no_params.len = 0; tmp1_outlen = tmp2_outlen = 0; inbuf = copy_from_iovec(iovec, iov_len, &datalen); if (!inbuf) { log_printf(instance->totemudpu_log_level_security, "malloc error copying buffer from iovec\n"); return -1; } data = inbuf + sizeof (struct security_header); datalen -= sizeof (struct security_header); outdata = buf + sizeof (struct security_header); header = (struct security_header *)buf; rv = PK11_GenerateRandom ( nss_iv_data, sizeof (nss_iv_data)); if (rv != SECSuccess) { log_printf(instance->totemudpu_log_level_security, "Failure to generate a random number %d\n", PR_GetError()); } memcpy(header->salt, nss_iv_data, sizeof(nss_iv_data)); iv_item.type = siBuffer; iv_item.data = nss_iv_data; iv_item.len = sizeof (nss_iv_data); nss_sec_param = PK11_ParamFromIV ( instance->totem_config->crypto_crypt_type, &iv_item); if (nss_sec_param == NULL) { log_printf(instance->totemudpu_log_level_security, "Failure to set up PKCS11 param (err %d)\n", PR_GetError()); free (inbuf); return (-1); } /* * Create cipher context for encryption */ enc_context = PK11_CreateContextBySymKey ( instance->totem_config->crypto_crypt_type, CKA_ENCRYPT, instance->nss_sym_key, nss_sec_param); if (!enc_context) { char err[1024]; PR_GetErrorText(err); err[PR_GetErrorTextLength()] = 0; log_printf(instance->totemudpu_log_level_security, "PK11_CreateContext failed (encrypt) crypt_type=%d (err %d): %s\n", instance->totem_config->crypto_crypt_type, PR_GetError(), err); free(inbuf); return -1; } rv1 = PK11_CipherOp(enc_context, outdata, &tmp1_outlen, FRAME_SIZE_MAX - sizeof(struct security_header), data, datalen); rv2 = PK11_DigestFinal(enc_context, outdata + tmp1_outlen, &tmp2_outlen, FRAME_SIZE_MAX - tmp1_outlen); PK11_DestroyContext(enc_context, PR_TRUE); *buf_len = tmp1_outlen + tmp2_outlen; free(inbuf); // memcpy(&outdata[*buf_len], nss_iv_data, sizeof(nss_iv_data)); if (rv1 != SECSuccess || rv2 != SECSuccess) goto out; /* Now do the digest */ enc_context = PK11_CreateContextBySymKey(CKM_SHA_1_HMAC, CKA_SIGN, instance->nss_sym_key_sign, &no_params); if (!enc_context) { char err[1024]; PR_GetErrorText(err); err[PR_GetErrorTextLength()] = 0; log_printf(instance->totemudpu_log_level_security, "encrypt: PK11_CreateContext failed (digest) err %d: %s\n", PR_GetError(), err); return -1; } PK11_DigestBegin(enc_context); rv1 = PK11_DigestOp(enc_context, outdata - 16, *buf_len + 16); rv2 = PK11_DigestFinal(enc_context, header->hash_digest, &tmp2_outlen, sizeof(header->hash_digest)); PK11_DestroyContext(enc_context, PR_TRUE); if (rv1 != SECSuccess || rv2 != SECSuccess) goto out; *buf_len = *buf_len + sizeof(struct security_header); SECITEM_FreeItem(nss_sec_param, PR_TRUE); return 0; out: return -1; } static int authenticate_and_decrypt_nss ( struct totemudpu_instance *instance, struct iovec *iov, unsigned int iov_len) { PK11Context* enc_context = NULL; SECStatus rv1, rv2; int tmp1_outlen; unsigned int tmp2_outlen; unsigned char outbuf[FRAME_SIZE_MAX]; unsigned char digest[HMAC_HASH_SIZE]; unsigned char *outdata; int result_len; unsigned char *data; unsigned char *inbuf; size_t datalen; struct security_header *header = (struct security_header *)iov[0].iov_base; SECItem no_params; SECItem ivdata; no_params.type = siBuffer; no_params.data = 0; no_params.len = 0; tmp1_outlen = tmp2_outlen = 0; if (iov_len > 1) { inbuf = copy_from_iovec(iov, iov_len, &datalen); if (!inbuf) { log_printf(instance->totemudpu_log_level_security, "malloc error copying buffer from iovec\n"); return -1; } } else { inbuf = (unsigned char *)iov[0].iov_base; datalen = iov[0].iov_len; } data = inbuf + sizeof (struct security_header) - 16; datalen = datalen - sizeof (struct security_header) + 16; outdata = outbuf + sizeof (struct security_header); /* Check the digest */ enc_context = PK11_CreateContextBySymKey ( CKM_SHA_1_HMAC, CKA_SIGN, instance->nss_sym_key_sign, &no_params); if (!enc_context) { char err[1024]; PR_GetErrorText(err); err[PR_GetErrorTextLength()] = 0; log_printf(instance->totemudpu_log_level_security, "PK11_CreateContext failed (check digest) err %d: %s\n", PR_GetError(), err); free (inbuf); return -1; } PK11_DigestBegin(enc_context); rv1 = PK11_DigestOp(enc_context, data, datalen); rv2 = PK11_DigestFinal(enc_context, digest, &tmp2_outlen, sizeof(digest)); PK11_DestroyContext(enc_context, PR_TRUE); if (rv1 != SECSuccess || rv2 != SECSuccess) { log_printf(instance->totemudpu_log_level_security, "Digest check failed\n"); return -1; } if (memcmp(digest, header->hash_digest, tmp2_outlen) != 0) { log_printf(instance->totemudpu_log_level_error, "Digest does not match\n"); return -1; } /* * Get rid of salt */ data += 16; datalen -= 16; /* Create cipher context for decryption */ ivdata.type = siBuffer; ivdata.data = header->salt; ivdata.len = sizeof(header->salt); enc_context = PK11_CreateContextBySymKey( instance->totem_config->crypto_crypt_type, CKA_DECRYPT, instance->nss_sym_key, &ivdata); if (!enc_context) { log_printf(instance->totemudpu_log_level_security, "PK11_CreateContext (decrypt) failed (err %d)\n", PR_GetError()); return -1; } rv1 = PK11_CipherOp(enc_context, outdata, &tmp1_outlen, sizeof(outbuf) - sizeof (struct security_header), data, datalen); if (rv1 != SECSuccess) { log_printf(instance->totemudpu_log_level_security, "PK11_CipherOp (decrypt) failed (err %d)\n", PR_GetError()); } rv2 = PK11_DigestFinal(enc_context, outdata + tmp1_outlen, &tmp2_outlen, sizeof(outbuf) - tmp1_outlen); PK11_DestroyContext(enc_context, PR_TRUE); result_len = tmp1_outlen + tmp2_outlen + sizeof (struct security_header); /* Copy it back to the buffer */ copy_to_iovec(iov, iov_len, outbuf, result_len); if (iov_len > 1) free(inbuf); if (rv1 != SECSuccess || rv2 != SECSuccess) return -1; return 0; } #endif static int encrypt_and_sign_sober ( struct totemudpu_instance *instance, unsigned char *buf, size_t *buf_len, const struct iovec *iovec, unsigned int iov_len) { int i; unsigned char *addr; unsigned char keys[48]; struct security_header *header; unsigned char *hmac_key = &keys[32]; unsigned char *cipher_key = &keys[16]; unsigned char *initial_vector = &keys[0]; unsigned long len; size_t outlen = 0; hmac_state hmac_st; prng_state keygen_prng_state; prng_state stream_prng_state; prng_state *prng_state_in = &instance->totemudpu_prng_state; header = (struct security_header *)buf; addr = buf + sizeof (struct security_header); memset (keys, 0, sizeof (keys)); memset (header->salt, 0, sizeof (header->salt)); /* * Generate MAC, CIPHER, IV keys from private key */ sober128_read (header->salt, sizeof (header->salt), prng_state_in); sober128_start (&keygen_prng_state); sober128_add_entropy (instance->totemudpu_private_key, instance->totemudpu_private_key_len, &keygen_prng_state); sober128_add_entropy (header->salt, sizeof (header->salt), &keygen_prng_state); sober128_read (keys, sizeof (keys), &keygen_prng_state); /* * Setup stream cipher */ sober128_start (&stream_prng_state); sober128_add_entropy (cipher_key, 16, &stream_prng_state); sober128_add_entropy (initial_vector, 16, &stream_prng_state); outlen = sizeof (struct security_header); /* * Copy remainder of message, then encrypt it */ for (i = 1; i < iov_len; i++) { memcpy (addr, iovec[i].iov_base, iovec[i].iov_len); addr += iovec[i].iov_len; outlen += iovec[i].iov_len; } /* * Encrypt message by XORing stream cipher data */ sober128_read (buf + sizeof (struct security_header), outlen - sizeof (struct security_header), &stream_prng_state); memset (&hmac_st, 0, sizeof (hmac_st)); /* * Sign the contents of the message with the hmac key and store signature in message */ hmac_init (&hmac_st, DIGEST_SHA1, hmac_key, 16); hmac_process (&hmac_st, buf + HMAC_HASH_SIZE, outlen - HMAC_HASH_SIZE); len = hash_descriptor[DIGEST_SHA1]->hashsize; hmac_done (&hmac_st, header->hash_digest, &len); *buf_len = outlen; return 0; } static int encrypt_and_sign_worker ( struct totemudpu_instance *instance, unsigned char *buf, size_t *buf_len, const struct iovec *iovec, unsigned int iov_len) { if (instance->totem_config->crypto_type == TOTEM_CRYPTO_SOBER || instance->totem_config->crypto_accept == TOTEM_CRYPTO_ACCEPT_OLD) return encrypt_and_sign_sober(instance, buf, buf_len, iovec, iov_len); #ifdef HAVE_LIBNSS if (instance->totem_config->crypto_type == TOTEM_CRYPTO_NSS) return encrypt_and_sign_nss(instance, buf, buf_len, iovec, iov_len); #endif return -1; } static int authenticate_and_decrypt ( struct totemudpu_instance *instance, struct iovec *iov, unsigned int iov_len) { unsigned char type; unsigned char *endbuf = (unsigned char *)iov[iov_len-1].iov_base; int res = -1; /* * Get the encryption type and remove it from the buffer */ type = endbuf[iov[iov_len-1].iov_len-1]; iov[iov_len-1].iov_len -= 1; if (type == TOTEM_CRYPTO_SOBER) res = authenticate_and_decrypt_sober(instance, iov, iov_len); /* * Only try higher crypto options if NEW has been requested */ if (instance->totem_config->crypto_accept == TOTEM_CRYPTO_ACCEPT_NEW) { #ifdef HAVE_LIBNSS if (type == TOTEM_CRYPTO_NSS) res = authenticate_and_decrypt_nss(instance, iov, iov_len); #endif } /* * If it failed, then try decrypting the whole packet as it might be * from aisexec */ if (res == -1) { iov[iov_len-1].iov_len += 1; res = authenticate_and_decrypt_sober(instance, iov, iov_len); } return res; } static void init_crypto( struct totemudpu_instance *instance) { /* * If we are expecting NEW crypto type then initialise all available * crypto options. For OLD then we only need SOBER128. */ init_sober_crypto(instance); if (instance->totem_config->crypto_accept == TOTEM_CRYPTO_ACCEPT_OLD) return; #ifdef HAVE_LIBNSS init_nss_crypto(instance); #endif } int totemudpu_crypto_set ( void *udpu_context, unsigned int type) { struct totemudpu_instance *instance = (struct totemudpu_instance *)udpu_context; int res = 0; /* * Can't set crypto type if OLD is selected */ if (instance->totem_config->crypto_accept == TOTEM_CRYPTO_ACCEPT_OLD) { res = -1; } else { /* * Validate crypto algorithm */ switch (type) { case TOTEM_CRYPTO_SOBER: log_printf(instance->totemudpu_log_level_security, "Transmit security set to: libtomcrypt SOBER128/SHA1HMAC (mode 0)"); break; case TOTEM_CRYPTO_NSS: log_printf(instance->totemudpu_log_level_security, "Transmit security set to: NSS AES128CBC/SHA1HMAC (mode 1)"); break; default: res = -1; break; } } return (res); } static inline void ucast_sendmsg ( struct totemudpu_instance *instance, struct totem_ip_address *system_to, const void *msg, unsigned int msg_len) { struct msghdr msg_ucast; int res = 0; size_t buf_len; unsigned char sheader[sizeof (struct security_header)]; unsigned char encrypt_data[FRAME_SIZE_MAX]; struct iovec iovec_encrypt[2]; const struct iovec *iovec_sendmsg; struct sockaddr_storage sockaddr; struct iovec iovec; unsigned int iov_len; int addrlen; if (instance->totem_config->secauth == 1) { iovec_encrypt[0].iov_base = (void *)sheader; iovec_encrypt[0].iov_len = sizeof (struct security_header); iovec_encrypt[1].iov_base = (void *)msg; iovec_encrypt[1].iov_len = msg_len; /* * Encrypt and digest the message */ encrypt_and_sign_worker ( instance, encrypt_data, &buf_len, iovec_encrypt, 2); if (instance->totem_config->crypto_accept == TOTEM_CRYPTO_ACCEPT_NEW) { encrypt_data[buf_len++] = instance->totem_config->crypto_type; } else { encrypt_data[buf_len++] = 0; } iovec_encrypt[0].iov_base = (void *)encrypt_data; iovec_encrypt[0].iov_len = buf_len; iovec_sendmsg = &iovec_encrypt[0]; iov_len = 1; } else { iovec.iov_base = (void *)msg; iovec.iov_len = msg_len; iovec_sendmsg = &iovec; iov_len = 1; } /* * Build unicast message */ totemip_totemip_to_sockaddr_convert(system_to, instance->totem_interface->ip_port, &sockaddr, &addrlen); msg_ucast.msg_name = &sockaddr; msg_ucast.msg_namelen = addrlen; msg_ucast.msg_iov = (void *) iovec_sendmsg; msg_ucast.msg_iovlen = iov_len; #if !defined(COROSYNC_SOLARIS) msg_ucast.msg_control = 0; msg_ucast.msg_controllen = 0; msg_ucast.msg_flags = 0; #else msg_ucast.msg_accrights = NULL; msg_ucast.msg_accrightslen = 0; #endif /* * Transmit unicast message * An error here is recovered by totemsrp */ res = sendmsg (instance->token_socket, &msg_ucast, MSG_NOSIGNAL); if (res < 0) { LOGSYS_PERROR (errno, instance->totemudpu_log_level_debug, "sendmsg(ucast) failed (non-critical)"); } } static inline void mcast_sendmsg ( struct totemudpu_instance *instance, const void *msg, unsigned int msg_len) { struct msghdr msg_mcast; int res = 0; size_t buf_len; unsigned char sheader[sizeof (struct security_header)]; unsigned char encrypt_data[FRAME_SIZE_MAX]; struct iovec iovec_encrypt[2]; struct iovec iovec; const struct iovec *iovec_sendmsg; struct sockaddr_storage sockaddr; unsigned int iov_len; int addrlen; struct list_head *list; struct totemudpu_member *member; if (instance->totem_config->secauth == 1) { iovec_encrypt[0].iov_base = (void *)sheader; iovec_encrypt[0].iov_len = sizeof (struct security_header); iovec_encrypt[1].iov_base = (void *)msg; iovec_encrypt[1].iov_len = msg_len; /* * Encrypt and digest the message */ encrypt_and_sign_worker ( instance, encrypt_data, &buf_len, iovec_encrypt, 2); if (instance->totem_config->crypto_accept == TOTEM_CRYPTO_ACCEPT_NEW) { encrypt_data[buf_len++] = instance->totem_config->crypto_type; } else { encrypt_data[buf_len++] = 0; } iovec_encrypt[0].iov_base = (void *)encrypt_data; iovec_encrypt[0].iov_len = buf_len; iovec_sendmsg = &iovec_encrypt[0]; iov_len = 1; } else { iovec.iov_base = (void *)msg; iovec.iov_len = msg_len; iovec_sendmsg = &iovec; iov_len = 1; } /* * Build multicast message */ for (list = instance->member_list.next; list != &instance->member_list; list = list->next) { member = list_entry (list, struct totemudpu_member, list); totemip_totemip_to_sockaddr_convert(&member->member, instance->totem_interface->ip_port, &sockaddr, &addrlen); msg_mcast.msg_name = &sockaddr; msg_mcast.msg_namelen = addrlen; msg_mcast.msg_iov = (void *) iovec_sendmsg; msg_mcast.msg_iovlen = iov_len; #if !defined(COROSYNC_SOLARIS) msg_mcast.msg_control = 0; msg_mcast.msg_controllen = 0; msg_mcast.msg_flags = 0; #else msg_mcast.msg_accrights = NULL; msg_mcast.msg_accrightslen = 0; #endif /* * Transmit multicast message * An error here is recovered by totemsrp */ res = sendmsg (member->fd, &msg_mcast, MSG_NOSIGNAL); if (res < 0) { LOGSYS_PERROR (errno, instance->totemudpu_log_level_debug, "sendmsg(mcast) failed (non-critical)"); } } } int totemudpu_finalize ( void *udpu_context) { struct totemudpu_instance *instance = (struct totemudpu_instance *)udpu_context; int res = 0; if (instance->token_socket > 0) { close (instance->token_socket); qb_loop_poll_del (instance->totemudpu_poll_handle, instance->token_socket); } return (res); } static int net_deliver_fn ( int fd, int revents, void *data) { struct totemudpu_instance *instance = (struct totemudpu_instance *)data; struct msghdr msg_recv; struct iovec *iovec; struct sockaddr_storage system_from; int bytes_received; int res = 0; unsigned char *msg_offset; unsigned int size_delv; iovec = &instance->totemudpu_iov_recv; /* * Receive datagram */ msg_recv.msg_name = &system_from; msg_recv.msg_namelen = sizeof (struct sockaddr_storage); msg_recv.msg_iov = iovec; msg_recv.msg_iovlen = 1; #if !defined(COROSYNC_SOLARIS) msg_recv.msg_control = 0; msg_recv.msg_controllen = 0; msg_recv.msg_flags = 0; #else msg_recv.msg_accrights = NULL; msg_recv.msg_accrightslen = 0; #endif bytes_received = recvmsg (fd, &msg_recv, MSG_NOSIGNAL | MSG_DONTWAIT); if (bytes_received == -1) { return (0); } else { instance->stats_recv += bytes_received; } if ((instance->totem_config->secauth == 1) && (bytes_received < sizeof (struct security_header))) { log_printf (instance->totemudpu_log_level_security, "Received message is too short... ignoring %d.\n", bytes_received); return (0); } iovec->iov_len = bytes_received; if (instance->totem_config->secauth == 1) { /* * Authenticate and if authenticated, decrypt datagram */ res = authenticate_and_decrypt (instance, iovec, 1); if (res == -1) { log_printf (instance->totemudpu_log_level_security, "Received message has invalid digest... ignoring.\n"); log_printf (instance->totemudpu_log_level_security, "Invalid packet data\n"); iovec->iov_len = FRAME_SIZE_MAX; return 0; } msg_offset = (unsigned char *)iovec->iov_base + sizeof (struct security_header); size_delv = bytes_received - sizeof (struct security_header); } else { msg_offset = (void *)iovec->iov_base; size_delv = bytes_received; } /* * Handle incoming message */ instance->totemudpu_deliver_fn ( instance->context, msg_offset, size_delv); iovec->iov_len = FRAME_SIZE_MAX; return (0); } static int netif_determine ( struct totemudpu_instance *instance, struct totem_ip_address *bindnet, struct totem_ip_address *bound_to, int *interface_up, int *interface_num) { int res; res = totemip_iface_check (bindnet, bound_to, interface_up, interface_num, instance->totem_config->clear_node_high_bit); return (res); } /* * If the interface is up, the sockets for totem are built. If the interface is down * this function is requeued in the timer list to retry building the sockets later. */ static void timer_function_netif_check_timeout ( void *data) { struct totemudpu_instance *instance = (struct totemudpu_instance *)data; int interface_up; int interface_num; struct totem_ip_address *bind_address; /* * Build sockets for every interface */ netif_determine (instance, &instance->totem_interface->bindnet, &instance->totem_interface->boundto, &interface_up, &interface_num); /* * If the network interface isn't back up and we are already * in loopback mode, add timer to check again and return */ if ((instance->netif_bind_state == BIND_STATE_LOOPBACK && interface_up == 0) || (instance->my_memb_entries == 1 && instance->netif_bind_state == BIND_STATE_REGULAR && interface_up == 1)) { qb_loop_timer_add (instance->totemudpu_poll_handle, QB_LOOP_MED, instance->totem_config->downcheck_timeout*QB_TIME_NS_IN_MSEC, (void *)instance, timer_function_netif_check_timeout, &instance->timer_netif_check_timeout); /* * Add a timer to check for a downed regular interface */ return; } if (instance->token_socket > 0) { close (instance->token_socket); qb_loop_poll_del (instance->totemudpu_poll_handle, instance->token_socket); } if (interface_up == 0) { /* * Interface is not up */ instance->netif_bind_state = BIND_STATE_LOOPBACK; bind_address = &localhost; /* * Add a timer to retry building interfaces and request memb_gather_enter */ qb_loop_timer_add (instance->totemudpu_poll_handle, QB_LOOP_MED, instance->totem_config->downcheck_timeout*QB_TIME_NS_IN_MSEC, (void *)instance, timer_function_netif_check_timeout, &instance->timer_netif_check_timeout); } else { /* * Interface is up */ instance->netif_bind_state = BIND_STATE_REGULAR; bind_address = &instance->totem_interface->bindnet; } /* * Create and bind the multicast and unicast sockets */ totemudpu_build_sockets (instance, bind_address, &instance->totem_interface->boundto); qb_loop_poll_add (instance->totemudpu_poll_handle, QB_LOOP_MED, instance->token_socket, POLLIN, instance, net_deliver_fn); totemip_copy (&instance->my_id, &instance->totem_interface->boundto); /* * This reports changes in the interface to the user and totemsrp */ if (instance->netif_bind_state == BIND_STATE_REGULAR) { if (instance->netif_state_report & NETIF_STATE_REPORT_UP) { log_printf (instance->totemudpu_log_level_notice, "The network interface [%s] is now up.\n", totemip_print (&instance->totem_interface->boundto)); instance->netif_state_report = NETIF_STATE_REPORT_DOWN; instance->totemudpu_iface_change_fn (instance->context, &instance->my_id); } /* * Add a timer to check for interface going down in single membership */ if (instance->my_memb_entries == 1) { qb_loop_timer_add (instance->totemudpu_poll_handle, QB_LOOP_MED, instance->totem_config->downcheck_timeout*QB_TIME_NS_IN_MSEC, (void *)instance, timer_function_netif_check_timeout, &instance->timer_netif_check_timeout); } } else { if (instance->netif_state_report & NETIF_STATE_REPORT_DOWN) { log_printf (instance->totemudpu_log_level_notice, "The network interface is down.\n"); instance->totemudpu_iface_change_fn (instance->context, &instance->my_id); } instance->netif_state_report = NETIF_STATE_REPORT_UP; } } /* Set the socket priority to INTERACTIVE to ensure that our messages don't get queued behind anything else */ static void totemudpu_traffic_control_set(struct totemudpu_instance *instance, int sock) { #ifdef SO_PRIORITY int prio = 6; /* TC_PRIO_INTERACTIVE */ if (setsockopt(sock, SOL_SOCKET, SO_PRIORITY, &prio, sizeof(int))) { LOGSYS_PERROR (errno, instance->totemudpu_log_level_warning, "Could not set traffic priority"); } #endif } static int totemudpu_build_sockets_ip ( struct totemudpu_instance *instance, struct totem_ip_address *bindnet_address, struct totem_ip_address *bound_to, int interface_num) { struct sockaddr_storage sockaddr; int addrlen; int res; unsigned int recvbuf_size; unsigned int optlen = sizeof (recvbuf_size); /* * Setup unicast socket */ instance->token_socket = socket (bindnet_address->family, SOCK_DGRAM, 0); if (instance->token_socket == -1) { LOGSYS_PERROR (errno, instance->totemudpu_log_level_warning, "socket() failed"); return (-1); } totemip_nosigpipe (instance->token_socket); res = fcntl (instance->token_socket, F_SETFL, O_NONBLOCK); if (res == -1) { LOGSYS_PERROR (errno, instance->totemudpu_log_level_warning, "Could not set non-blocking operation on token socket"); return (-1); } /* * Bind to unicast socket used for token send/receives * This has the side effect of binding to the correct interface */ totemip_totemip_to_sockaddr_convert(bound_to, instance->totem_interface->ip_port, &sockaddr, &addrlen); res = bind (instance->token_socket, (struct sockaddr *)&sockaddr, addrlen); if (res == -1) { LOGSYS_PERROR (errno, instance->totemudpu_log_level_warning, "bind token socket failed"); return (-1); } /* * the token_socket can receive many messages. Allow a large number * of receive messages on this socket */ recvbuf_size = MCAST_SOCKET_BUFFER_SIZE; res = setsockopt (instance->token_socket, SOL_SOCKET, SO_RCVBUF, &recvbuf_size, optlen); if (res == -1) { LOGSYS_PERROR (errno, instance->totemudpu_log_level_notice, "Could not set recvbuf size"); } return 0; } static int totemudpu_build_sockets ( struct totemudpu_instance *instance, struct totem_ip_address *bindnet_address, struct totem_ip_address *bound_to) { int interface_num; int interface_up; int res; /* * Determine the ip address bound to and the interface name */ res = netif_determine (instance, bindnet_address, bound_to, &interface_up, &interface_num); if (res == -1) { return (-1); } totemip_copy(&instance->my_id, bound_to); res = totemudpu_build_sockets_ip (instance, bindnet_address, bound_to, interface_num); /* We only send out of the token socket */ totemudpu_traffic_control_set(instance, instance->token_socket); return res; } /* * Totem Network interface - also does encryption/decryption * depends on poll abstraction, POSIX, IPV4 */ /* * Create an instance */ int totemudpu_initialize ( qb_loop_t *poll_handle, void **udpu_context, struct totem_config *totem_config, int interface_no, void *context, void (*deliver_fn) ( void *context, const void *msg, unsigned int msg_len), void (*iface_change_fn) ( void *context, const struct totem_ip_address *iface_address), void (*target_set_completed) ( void *context)) { struct totemudpu_instance *instance; instance = malloc (sizeof (struct totemudpu_instance)); if (instance == NULL) { return (-1); } totemudpu_instance_initialize (instance); instance->totem_config = totem_config; /* * Configure logging */ instance->totemudpu_log_level_security = 1; //totem_config->totem_logging_configuration.log_level_security; instance->totemudpu_log_level_error = totem_config->totem_logging_configuration.log_level_error; instance->totemudpu_log_level_warning = totem_config->totem_logging_configuration.log_level_warning; instance->totemudpu_log_level_notice = totem_config->totem_logging_configuration.log_level_notice; instance->totemudpu_log_level_debug = totem_config->totem_logging_configuration.log_level_debug; instance->totemudpu_subsys_id = totem_config->totem_logging_configuration.log_subsys_id; instance->totemudpu_log_printf = totem_config->totem_logging_configuration.log_printf; /* * Initialize random number generator for later use to generate salt */ memcpy (instance->totemudpu_private_key, totem_config->private_key, totem_config->private_key_len); instance->totemudpu_private_key_len = totem_config->private_key_len; init_crypto(instance); /* * Initialize local variables for totemudpu */ instance->totem_interface = &totem_config->interfaces[interface_no]; memset (instance->iov_buffer, 0, FRAME_SIZE_MAX); instance->totemudpu_poll_handle = poll_handle; instance->totem_interface->bindnet.nodeid = instance->totem_config->node_id; instance->context = context; instance->totemudpu_deliver_fn = deliver_fn; instance->totemudpu_iface_change_fn = iface_change_fn; instance->totemudpu_target_set_completed = target_set_completed; totemip_localhost (AF_INET, &localhost); localhost.nodeid = instance->totem_config->node_id; /* * RRP layer isn't ready to receive message because it hasn't * initialized yet. Add short timer to check the interfaces. */ qb_loop_timer_add (instance->totemudpu_poll_handle, QB_LOOP_MED, 100*QB_TIME_NS_IN_MSEC, (void *)instance, timer_function_netif_check_timeout, &instance->timer_netif_check_timeout); *udpu_context = instance; return (0); } void *totemudpu_buffer_alloc (void) { return malloc (FRAME_SIZE_MAX); } void totemudpu_buffer_release (void *ptr) { return free (ptr); } int totemudpu_processor_count_set ( void *udpu_context, int processor_count) { struct totemudpu_instance *instance = (struct totemudpu_instance *)udpu_context; int res = 0; instance->my_memb_entries = processor_count; qb_loop_timer_del (instance->totemudpu_poll_handle, instance->timer_netif_check_timeout); if (processor_count == 1) { qb_loop_timer_add (instance->totemudpu_poll_handle, QB_LOOP_MED, instance->totem_config->downcheck_timeout*QB_TIME_NS_IN_MSEC, (void *)instance, timer_function_netif_check_timeout, &instance->timer_netif_check_timeout); } return (res); } int totemudpu_recv_flush (void *udpu_context) { int res = 0; return (res); } int totemudpu_send_flush (void *udpu_context) { int res = 0; return (res); } int totemudpu_token_send ( void *udpu_context, const void *msg, unsigned int msg_len) { struct totemudpu_instance *instance = (struct totemudpu_instance *)udpu_context; int res = 0; ucast_sendmsg (instance, &instance->token_target, msg, msg_len); return (res); } int totemudpu_mcast_flush_send ( void *udpu_context, const void *msg, unsigned int msg_len) { struct totemudpu_instance *instance = (struct totemudpu_instance *)udpu_context; int res = 0; mcast_sendmsg (instance, msg, msg_len); return (res); } int totemudpu_mcast_noflush_send ( void *udpu_context, const void *msg, unsigned int msg_len) { struct totemudpu_instance *instance = (struct totemudpu_instance *)udpu_context; int res = 0; mcast_sendmsg (instance, msg, msg_len); return (res); } extern int totemudpu_iface_check (void *udpu_context) { struct totemudpu_instance *instance = (struct totemudpu_instance *)udpu_context; int res = 0; timer_function_netif_check_timeout (instance); return (res); } extern void totemudpu_net_mtu_adjust (void *udpu_context, struct totem_config *totem_config) { #define UDPIP_HEADER_SIZE (20 + 8) /* 20 bytes for ip 8 bytes for udp */ if (totem_config->secauth == 1) { totem_config->net_mtu -= sizeof (struct security_header) + UDPIP_HEADER_SIZE; } else { totem_config->net_mtu -= UDPIP_HEADER_SIZE; } } const char *totemudpu_iface_print (void *udpu_context) { struct totemudpu_instance *instance = (struct totemudpu_instance *)udpu_context; const char *ret_char; ret_char = totemip_print (&instance->my_id); return (ret_char); } int totemudpu_iface_get ( void *udpu_context, struct totem_ip_address *addr) { struct totemudpu_instance *instance = (struct totemudpu_instance *)udpu_context; int res = 0; memcpy (addr, &instance->my_id, sizeof (struct totem_ip_address)); return (res); } int totemudpu_token_target_set ( void *udpu_context, const struct totem_ip_address *token_target) { struct totemudpu_instance *instance = (struct totemudpu_instance *)udpu_context; int res = 0; memcpy (&instance->token_target, token_target, sizeof (struct totem_ip_address)); instance->totemudpu_target_set_completed (instance->context); return (res); } extern int totemudpu_recv_mcast_empty ( void *udpu_context) { struct totemudpu_instance *instance = (struct totemudpu_instance *)udpu_context; unsigned int res; struct sockaddr_storage system_from; struct msghdr msg_recv; struct pollfd ufd; int nfds; int msg_processed = 0; /* * Receive datagram */ msg_recv.msg_name = &system_from; msg_recv.msg_namelen = sizeof (struct sockaddr_storage); msg_recv.msg_iov = &instance->totemudpu_iov_recv; msg_recv.msg_iovlen = 1; #if !defined(COROSYNC_SOLARIS) msg_recv.msg_control = 0; msg_recv.msg_controllen = 0; msg_recv.msg_flags = 0; #else msg_recv.msg_accrights = NULL; msg_recv.msg_accrightslen = 0; #endif do { ufd.fd = instance->token_socket; ufd.events = POLLIN; nfds = poll (&ufd, 1, 0); if (nfds == 1 && ufd.revents & POLLIN) { res = recvmsg (instance->token_socket, &msg_recv, MSG_NOSIGNAL | MSG_DONTWAIT); if (res != -1) { msg_processed = 1; } else { msg_processed = -1; } } } while (nfds == 1); return (msg_processed); } int totemudpu_member_add ( void *udpu_context, const struct totem_ip_address *member) { struct totemudpu_instance *instance = (struct totemudpu_instance *)udpu_context; struct totemudpu_member *new_member; int res; unsigned int sendbuf_size; unsigned int optlen = sizeof (sendbuf_size); new_member = malloc (sizeof (struct totemudpu_member)); if (new_member == NULL) { return (-1); } + log_printf (LOGSYS_LEVEL_NOTICE, "adding new UDPU member {%s}\n", + totemip_print(member)); list_init (&new_member->list); list_add_tail (&new_member->list, &instance->member_list); memcpy (&new_member->member, member, sizeof (struct totem_ip_address)); new_member->fd = socket (member->family, SOCK_DGRAM, 0); if (new_member->fd == -1) { LOGSYS_PERROR (errno, instance->totemudpu_log_level_warning, "Could not create socket for new member"); return (-1); } totemip_nosigpipe (new_member->fd); res = fcntl (new_member->fd, F_SETFL, O_NONBLOCK); if (res == -1) { LOGSYS_PERROR (errno, instance->totemudpu_log_level_warning, "Could not set non-blocking operation on token socket"); return (-1); } /* * These sockets are used to send multicast messages, so their buffers * should be large */ sendbuf_size = MCAST_SOCKET_BUFFER_SIZE; res = setsockopt (new_member->fd, SOL_SOCKET, SO_SNDBUF, &sendbuf_size, optlen); if (res == -1) { LOGSYS_PERROR (errno, instance->totemudpu_log_level_notice, "Could not set sendbuf size"); } return (0); } int totemudpu_member_remove ( void *udpu_context, const struct totem_ip_address *token_target) { + int found = 0; + struct list_head *list; + struct totemudpu_member *member; + struct totemudpu_instance *instance = (struct totemudpu_instance *)udpu_context; + /* + * Find the member to remove and close its socket + */ + for (list = instance->member_list.next; + list != &instance->member_list; + list = list->next) { + + member = list_entry (list, + struct totemudpu_member, + list); + + if (totemip_compare (token_target, &member->member)==0) { + log_printf(LOGSYS_LEVEL_NOTICE, + "removing UDPU member {%s}\n", + totemip_print(&member->member)); + + if (member->fd > 0) { + log_printf(LOGSYS_LEVEL_DEBUG, + "Closing socket to: {%s}\n", + totemip_print(&member->member)); + qb_loop_poll_del (instance->totemudpu_poll_handle, + member->fd); + close (member->fd); + } + found = 1; + break; + } + } + + /* + * Delete the member from the list + */ + if (found) { + list_del (list); + } + instance = NULL; return (0); }