diff --git a/exec/main.c b/exec/main.c index fde77da5..582f1e2c 100644 --- a/exec/main.c +++ b/exec/main.c @@ -1,1540 +1,1540 @@ /* * Copyright (c) 2002-2006 MontaVista Software, Inc. * Copyright (c) 2006-2009 Red Hat, Inc. * * All rights reserved. * * Author: Steven Dake (sdake@redhat.com) * * This software licensed under BSD license, the text of which follows: * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * - Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * - Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * - Neither the name of the MontaVista Software, Inc. nor the names of its * contributors may be used to endorse or promote products derived from this * software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF * THE POSSIBILITY OF SUCH DAMAGE. */ /** * \mainpage Corosync * * This is the doxygen generated developer documentation for the Corosync * project. 
For more information about Corosync, please see the project * web site, corosync.org. * * \section license License * * This software licensed under BSD license, the text of which follows: * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * - Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * - Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * - Neither the name of the MontaVista Software, Inc. nor the names of its * contributors may be used to endorse or promote products derived from this * software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF * THE POSSIBILITY OF SUCH DAMAGE. 
*/ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "quorum.h" #include "totemsrp.h" #include "mainconfig.h" #include "totemconfig.h" #include "main.h" #include "sync.h" #include "syncv2.h" #include "timer.h" #include "util.h" #include "apidef.h" #include "service.h" #include "schedwrk.h" #include "evil.h" #ifdef HAVE_SMALL_MEMORY_FOOTPRINT #define IPC_LOGSYS_SIZE 1024*64 #else #define IPC_LOGSYS_SIZE 8192*128 #endif LOGSYS_DECLARE_SYSTEM ("corosync", LOGSYS_MODE_OUTPUT_STDERR, LOG_DAEMON, LOG_INFO); LOGSYS_DECLARE_SUBSYS ("MAIN"); #define SERVER_BACKLOG 5 static int sched_priority = 0; static unsigned int service_count = 32; static struct totem_logging_configuration totem_logging_configuration; static int num_config_modules; static struct config_iface_ver0 *config_modules[MAX_DYNAMIC_SERVICES]; static struct objdb_iface_ver0 *objdb = NULL; static struct corosync_api_v1 *api = NULL; static enum cs_sync_mode minimum_sync_mode; static int sync_in_process = 1; static qb_loop_t *corosync_poll_handle; struct sched_param global_sched_param; static hdb_handle_t object_memb_handle; static corosync_timer_handle_t corosync_stats_timer_handle; static const char *corosync_lock_file = LOCALSTATEDIR"/run/corosync.pid"; qb_loop_t *cs_poll_handle_get (void) { return (corosync_poll_handle); } int cs_poll_dispatch_add (qb_loop_t * handle, int fd, int events, void *data, int (*dispatch_fn) (int fd, int revents, void *data)) { return qb_loop_poll_add(handle, QB_LOOP_MED, fd, events, data, dispatch_fn); } int cs_poll_dispatch_delete(qb_loop_t * handle, int fd) { return qb_loop_poll_del(handle, fd); } void corosync_state_dump (void) { int i; for (i = 0; i < SERVICE_HANDLER_MAXIMUM_COUNT; 
i++) {
		if (ais_service[i] && ais_service[i]->exec_dump_fn) {
			ais_service[i]->exec_dump_fn ();
		}
	}
}

/*
 * Final step of shutdown: every service has been unlinked, so stop the
 * stats timer and break out of the main poll loop.
 */
static void unlink_all_completed (void)
{
	api->timer_delete (corosync_stats_timer_handle);
	qb_loop_stop (corosync_poll_handle);
}

/*
 * Request an orderly shutdown: unlink all services, then (via the
 * completion callback) stop the main loop.
 */
void corosync_shutdown_request (void)
{
	corosync_service_unlink_all (api, unlink_all_completed);
}

/*
 * Diagnostic signal handler: dump per-service state and flush the
 * blackbox flight data to disk.
 */
static int32_t sig_diag_handler (int num, void *data)
{
	corosync_state_dump ();
	qb_log_blackbox_write_to_file(LOCALSTATEDIR "/lib/corosync/fdata");
	return 0;
}

/*
 * Termination signal handler: begin an orderly shutdown.
 */
static int32_t sig_exit_handler (int num, void *data)
{
	corosync_service_unlink_all (api, unlink_all_completed);
	return 0;
}

/*
 * Fatal SIGSEGV: restore the default disposition, flush the blackbox
 * and the log, then re-raise so the default action (core dump) runs.
 */
static void sigsegv_handler (int num)
{
	(void)signal (SIGSEGV, SIG_DFL);
	qb_log_blackbox_write_to_file(LOCALSTATEDIR "/lib/corosync/fdata");
	qb_log_fini();
	raise (SIGSEGV);
}

/*
 * Fatal SIGABRT: same steps as the SIGSEGV handler.
 */
static void sigabrt_handler (int num)
{
	(void)signal (SIGABRT, SIG_DFL);
	qb_log_blackbox_write_to_file(LOCALSTATEDIR "/lib/corosync/fdata");
	qb_log_fini();
	raise (SIGABRT);
}

#define LOCALHOST_IP inet_addr("127.0.0.1")

-static hdb_handle_t corosync_group_handle;
+static void *corosync_group_handle;

/* The single totempg group ("a") all corosync exec messages travel over */
static struct totempg_group corosync_group = {
	.group		= "a",
	.group_len	= 1
};

/* totempg serialization hooks: intentionally no-ops in this daemon */
static void serialize_lock (void)
{
}

static void serialize_unlock (void)
{
}

/*
 * Invoked by the sync engine when synchronization of all services has
 * finished; clears the in-progress flag and notifies the IPC layer.
 */
static void corosync_sync_completed (void)
{
	log_printf (LOGSYS_LEVEL_NOTICE,
		"Completed service synchronization, ready to provide service.\n");
	sync_in_process = 0;
	cs_ipcs_sync_state_changed(sync_in_process);
}

/*
 * Retrieve the v1 sync callbacks for the service whose index equals
 * sync_id, provided that service is loaded and uses a v1 sync mode;
 * otherwise fall back to the backwards-compat ("evil") sync engines.
 */
static int corosync_sync_callbacks_retrieve (
	int sync_id,
	struct sync_callbacks *callbacks)
{
	unsigned int ais_service_index;
	int res;

	for (ais_service_index = 0;
		ais_service_index < SERVICE_HANDLER_MAXIMUM_COUNT;
		ais_service_index++) {

		if (ais_service[ais_service_index] != NULL
			&& (ais_service[ais_service_index]->sync_mode == CS_SYNC_V1
				|| ais_service[ais_service_index]->sync_mode == CS_SYNC_V1_APIV2)) {
			if (ais_service_index == sync_id) {
				break;
			}
		}
	}
	/*
	 * Try to load backwards compat sync engines
	 */
	if (ais_service_index ==
SERVICE_HANDLER_MAXIMUM_COUNT) { res = evil_callbacks_load (sync_id, callbacks); return (res); } callbacks->name = ais_service[ais_service_index]->name; callbacks->sync_init_api.sync_init_v1 = ais_service[ais_service_index]->sync_init; callbacks->api_version = 1; if (ais_service[ais_service_index]->sync_mode == CS_SYNC_V1_APIV2) { callbacks->api_version = 2; } callbacks->sync_process = ais_service[ais_service_index]->sync_process; callbacks->sync_activate = ais_service[ais_service_index]->sync_activate; callbacks->sync_abort = ais_service[ais_service_index]->sync_abort; return (0); } static int corosync_sync_v2_callbacks_retrieve ( int service_id, struct sync_callbacks *callbacks) { int res; if (minimum_sync_mode == CS_SYNC_V2 && service_id == CLM_SERVICE && ais_service[CLM_SERVICE] == NULL) { res = evil_callbacks_load (service_id, callbacks); return (res); } if (minimum_sync_mode == CS_SYNC_V2 && service_id == EVT_SERVICE && ais_service[EVT_SERVICE] == NULL) { res = evil_callbacks_load (service_id, callbacks); return (res); } if (ais_service[service_id] == NULL) { return (-1); } if (minimum_sync_mode == CS_SYNC_V1 && ais_service[service_id]->sync_mode != CS_SYNC_V2) { return (-1); } callbacks->name = ais_service[service_id]->name; callbacks->api_version = 1; if (ais_service[service_id]->sync_mode == CS_SYNC_V1_APIV2) { callbacks->api_version = 2; } callbacks->sync_init_api.sync_init_v1 = ais_service[service_id]->sync_init; callbacks->sync_process = ais_service[service_id]->sync_process; callbacks->sync_activate = ais_service[service_id]->sync_activate; callbacks->sync_abort = ais_service[service_id]->sync_abort; return (0); } static struct memb_ring_id corosync_ring_id; static void member_object_joined (unsigned int nodeid) { hdb_handle_t object_find_handle; hdb_handle_t object_node_handle; char * nodeint_str; char nodeid_str[64]; unsigned int key_incr_dummy; snprintf (nodeid_str, 64, "%d", nodeid); objdb->object_find_create ( object_memb_handle, nodeid_str, 
strlen (nodeid_str),
		&object_find_handle);

	if (objdb->object_find_next (object_find_handle,
			&object_node_handle) == 0) {
		/* Node already known: bump its join counter, re-mark it joined */
		objdb->object_key_increment (object_node_handle,
			"join_count", strlen("join_count"),
			&key_incr_dummy);
		objdb->object_key_replace (object_node_handle,
			"status", strlen("status"),
			"joined", strlen("joined"));
	} else {
		/* First sighting: create the node object with ip / join_count / status */
		nodeint_str = (char*)api->totem_ifaces_print (nodeid);
		objdb->object_create (object_memb_handle,
			&object_node_handle,
			nodeid_str, strlen (nodeid_str));
		objdb->object_key_create_typed (object_node_handle,
			"ip", nodeint_str, strlen(nodeint_str),
			OBJDB_VALUETYPE_STRING);

		key_incr_dummy = 1;
		objdb->object_key_create_typed (object_node_handle,
			"join_count", &key_incr_dummy, sizeof (key_incr_dummy),
			OBJDB_VALUETYPE_UINT32);
		objdb->object_key_create_typed (object_node_handle,
			"status", "joined", strlen("joined"),
			OBJDB_VALUETYPE_STRING);
	}
}

/*
 * Mark a node "left" in the objdb members tree, if that node is known.
 */
static void member_object_left (unsigned int nodeid)
{
	hdb_handle_t object_find_handle;
	hdb_handle_t object_node_handle;
	char nodeid_str[64];

	snprintf (nodeid_str, 64, "%u", nodeid);

	objdb->object_find_create (
		object_memb_handle,
		nodeid_str,
		strlen (nodeid_str),
		&object_find_handle);

	if (objdb->object_find_next (object_find_handle,
			&object_node_handle) == 0) {
		objdb->object_key_replace (object_node_handle,
			"status", strlen("status"),
			"left", strlen("left"));
	}
}

/*
 * Totem configuration-change callback: records membership changes in
 * the objdb members tree, notifies every loaded service, and drives the
 * v2 sync engine on transitional/regular configurations.
 */
static void confchg_fn (
	enum totem_configuration_type configuration_type,
	const unsigned int *member_list, size_t member_list_entries,
	const unsigned int *left_list, size_t left_list_entries,
	const unsigned int *joined_list, size_t joined_list_entries,
	const struct memb_ring_id *ring_id)
{
	int i;
	int abort_activate = 0;

	/* A new configuration arriving mid-sync invalidates the running sync */
	if (sync_in_process == 1) {
		abort_activate = 1;
	}
	sync_in_process = 1;
	cs_ipcs_sync_state_changed(sync_in_process);
	memcpy (&corosync_ring_id, ring_id, sizeof (struct memb_ring_id));

	for (i = 0; i < left_list_entries; i++) {
		member_object_left (left_list[i]);
	}
	for (i = 0; i < joined_list_entries; i++) {
		member_object_joined
(joined_list[i]);
	}

	/*
	 * Call configuration change for all services
	 */
	for (i = 0; i < service_count; i++) {
		if (ais_service[i] && ais_service[i]->confchg_fn) {
			ais_service[i]->confchg_fn (configuration_type,
				member_list, member_list_entries,
				left_list, left_list_entries,
				joined_list, joined_list_entries, ring_id);
		}
	}

	/* Abort the previous sync run before starting a new one */
	if (abort_activate) {
		sync_v2_abort ();
	}
	if (minimum_sync_mode == CS_SYNC_V2 && configuration_type == TOTEM_CONFIGURATION_TRANSITIONAL) {
		sync_v2_save_transitional (member_list, member_list_entries, ring_id);
	}
	if (minimum_sync_mode == CS_SYNC_V2 && configuration_type == TOTEM_CONFIGURATION_REGULAR) {
		sync_v2_start (member_list, member_list_entries, ring_id);
	}
}

static void priv_drop (void)
{
	return; /* TODO: we are still not dropping privs */
}

/*
 * Daemonize: fork so the parent can exit, start a new session, and
 * redirect the standard streams to /dev/null.
 */
static void corosync_tty_detach (void)
{
	FILE *r;

	/*
	 * Disconnect from TTY if this is not a debug run
	 */
	switch (fork ()) {
		case -1:
			corosync_exit_error (AIS_DONE_FORK);
			break;
		case 0:
			/*
			 * child which is disconnected, run this process
			 */
			break;
		default:
			exit (0);
			break;
	}

	/* Create new session */
	(void)setsid();

	/*
	 * Map stdin/out/err to /dev/null.
*/ r = freopen("/dev/null", "r", stdin); if (r == NULL) { corosync_exit_error (AIS_DONE_STD_TO_NULL_REDIR); } r = freopen("/dev/null", "a", stderr); if (r == NULL) { corosync_exit_error (AIS_DONE_STD_TO_NULL_REDIR); } r = freopen("/dev/null", "a", stdout); if (r == NULL) { corosync_exit_error (AIS_DONE_STD_TO_NULL_REDIR); } } static void corosync_mlockall (void) { #if !defined(COROSYNC_BSD) || defined(COROSYNC_FREEBSD_GE_8) int res; #endif struct rlimit rlimit; rlimit.rlim_cur = RLIM_INFINITY; rlimit.rlim_max = RLIM_INFINITY; #ifndef COROSYNC_SOLARIS setrlimit (RLIMIT_MEMLOCK, &rlimit); #else setrlimit (RLIMIT_VMEM, &rlimit); #endif #if defined(COROSYNC_BSD) && !defined(COROSYNC_FREEBSD_GE_8) /* under FreeBSD < 8 a process with locked page cannot call dlopen * code disabled until FreeBSD bug i386/93396 was solved */ log_printf (LOGSYS_LEVEL_WARNING, "Could not lock memory of service to avoid page faults\n"); #else res = mlockall (MCL_CURRENT | MCL_FUTURE); if (res == -1) { LOGSYS_PERROR (errno, LOGSYS_LEVEL_WARNING, "Could not lock memory of service to avoid page faults"); }; #endif } static void corosync_totem_stats_updater (void *data) { totempg_stats_t * stats; uint32_t mtt_rx_token; uint32_t total_mtt_rx_token; uint32_t avg_backlog_calc; uint32_t total_backlog_calc; uint32_t avg_token_holdtime; uint32_t total_token_holdtime; int t, prev; int32_t token_count; uint32_t firewall_enabled_or_nic_failure; stats = api->totem_get_stats(); objdb->object_key_replace (stats->hdr.handle, "msg_reserved", strlen("msg_reserved"), &stats->msg_reserved, sizeof (stats->msg_reserved)); objdb->object_key_replace (stats->hdr.handle, "msg_queue_avail", strlen("msg_queue_avail"), &stats->msg_queue_avail, sizeof (stats->msg_queue_avail)); objdb->object_key_replace (stats->mrp->srp->hdr.handle, "orf_token_tx", strlen("orf_token_tx"), &stats->mrp->srp->orf_token_tx, sizeof (stats->mrp->srp->orf_token_tx)); objdb->object_key_replace (stats->mrp->srp->hdr.handle, "orf_token_rx", 
strlen("orf_token_rx"), &stats->mrp->srp->orf_token_rx, sizeof (stats->mrp->srp->orf_token_rx)); objdb->object_key_replace (stats->mrp->srp->hdr.handle, "memb_merge_detect_tx", strlen("memb_merge_detect_tx"), &stats->mrp->srp->memb_merge_detect_tx, sizeof (stats->mrp->srp->memb_merge_detect_tx)); objdb->object_key_replace (stats->mrp->srp->hdr.handle, "memb_merge_detect_rx", strlen("memb_merge_detect_rx"), &stats->mrp->srp->memb_merge_detect_rx, sizeof (stats->mrp->srp->memb_merge_detect_rx)); objdb->object_key_replace (stats->mrp->srp->hdr.handle, "memb_join_tx", strlen("memb_join_tx"), &stats->mrp->srp->memb_join_tx, sizeof (stats->mrp->srp->memb_join_tx)); objdb->object_key_replace (stats->mrp->srp->hdr.handle, "memb_join_rx", strlen("memb_join_rx"), &stats->mrp->srp->memb_join_rx, sizeof (stats->mrp->srp->memb_join_rx)); objdb->object_key_replace (stats->mrp->srp->hdr.handle, "mcast_tx", strlen("mcast_tx"), &stats->mrp->srp->mcast_tx, sizeof (stats->mrp->srp->mcast_tx)); objdb->object_key_replace (stats->mrp->srp->hdr.handle, "mcast_retx", strlen("mcast_retx"), &stats->mrp->srp->mcast_retx, sizeof (stats->mrp->srp->mcast_retx)); objdb->object_key_replace (stats->mrp->srp->hdr.handle, "mcast_rx", strlen("mcast_rx"), &stats->mrp->srp->mcast_rx, sizeof (stats->mrp->srp->mcast_rx)); objdb->object_key_replace (stats->mrp->srp->hdr.handle, "memb_commit_token_tx", strlen("memb_commit_token_tx"), &stats->mrp->srp->memb_commit_token_tx, sizeof (stats->mrp->srp->memb_commit_token_tx)); objdb->object_key_replace (stats->mrp->srp->hdr.handle, "memb_commit_token_rx", strlen("memb_commit_token_rx"), &stats->mrp->srp->memb_commit_token_rx, sizeof (stats->mrp->srp->memb_commit_token_rx)); objdb->object_key_replace (stats->mrp->srp->hdr.handle, "token_hold_cancel_tx", strlen("token_hold_cancel_tx"), &stats->mrp->srp->token_hold_cancel_tx, sizeof (stats->mrp->srp->token_hold_cancel_tx)); objdb->object_key_replace (stats->mrp->srp->hdr.handle, "token_hold_cancel_rx", 
strlen("token_hold_cancel_rx"), &stats->mrp->srp->token_hold_cancel_rx, sizeof (stats->mrp->srp->token_hold_cancel_rx)); objdb->object_key_replace (stats->mrp->srp->hdr.handle, "operational_entered", strlen("operational_entered"), &stats->mrp->srp->operational_entered, sizeof (stats->mrp->srp->operational_entered)); objdb->object_key_replace (stats->mrp->srp->hdr.handle, "operational_token_lost", strlen("operational_token_lost"), &stats->mrp->srp->operational_token_lost, sizeof (stats->mrp->srp->operational_token_lost)); objdb->object_key_replace (stats->mrp->srp->hdr.handle, "gather_entered", strlen("gather_entered"), &stats->mrp->srp->gather_entered, sizeof (stats->mrp->srp->gather_entered)); objdb->object_key_replace (stats->mrp->srp->hdr.handle, "gather_token_lost", strlen("gather_token_lost"), &stats->mrp->srp->gather_token_lost, sizeof (stats->mrp->srp->gather_token_lost)); objdb->object_key_replace (stats->mrp->srp->hdr.handle, "commit_entered", strlen("commit_entered"), &stats->mrp->srp->commit_entered, sizeof (stats->mrp->srp->commit_entered)); objdb->object_key_replace (stats->mrp->srp->hdr.handle, "commit_token_lost", strlen("commit_token_lost"), &stats->mrp->srp->commit_token_lost, sizeof (stats->mrp->srp->commit_token_lost)); objdb->object_key_replace (stats->mrp->srp->hdr.handle, "recovery_entered", strlen("recovery_entered"), &stats->mrp->srp->recovery_entered, sizeof (stats->mrp->srp->recovery_entered)); objdb->object_key_replace (stats->mrp->srp->hdr.handle, "recovery_token_lost", strlen("recovery_token_lost"), &stats->mrp->srp->recovery_token_lost, sizeof (stats->mrp->srp->recovery_token_lost)); objdb->object_key_replace (stats->mrp->srp->hdr.handle, "consensus_timeouts", strlen("consensus_timeouts"), &stats->mrp->srp->consensus_timeouts, sizeof (stats->mrp->srp->consensus_timeouts)); objdb->object_key_replace (stats->mrp->srp->hdr.handle, "rx_msg_dropped", strlen("rx_msg_dropped"), &stats->mrp->srp->rx_msg_dropped, sizeof 
(stats->mrp->srp->rx_msg_dropped)); objdb->object_key_replace (stats->mrp->srp->hdr.handle, "continuous_gather", strlen("continuous_gather"), &stats->mrp->srp->continuous_gather, sizeof (stats->mrp->srp->continuous_gather)); firewall_enabled_or_nic_failure = (stats->mrp->srp->continuous_gather > MAX_NO_CONT_GATHER ? 1 : 0); objdb->object_key_replace (stats->mrp->srp->hdr.handle, "firewall_enabled_or_nic_failure", strlen("firewall_enabled_or_nic_failure"), &firewall_enabled_or_nic_failure, sizeof (firewall_enabled_or_nic_failure)); total_mtt_rx_token = 0; total_token_holdtime = 0; total_backlog_calc = 0; token_count = 0; t = stats->mrp->srp->latest_token; while (1) { if (t == 0) prev = TOTEM_TOKEN_STATS_MAX - 1; else prev = t - 1; if (prev == stats->mrp->srp->earliest_token) break; /* if tx == 0, then dropped token (not ours) */ if (stats->mrp->srp->token[t].tx != 0 || (stats->mrp->srp->token[t].rx - stats->mrp->srp->token[prev].rx) > 0 ) { total_mtt_rx_token += (stats->mrp->srp->token[t].rx - stats->mrp->srp->token[prev].rx); total_token_holdtime += (stats->mrp->srp->token[t].tx - stats->mrp->srp->token[t].rx); total_backlog_calc += stats->mrp->srp->token[t].backlog_calc; token_count++; } t = prev; } if (token_count) { mtt_rx_token = (total_mtt_rx_token / token_count); avg_backlog_calc = (total_backlog_calc / token_count); avg_token_holdtime = (total_token_holdtime / token_count); objdb->object_key_replace (stats->mrp->srp->hdr.handle, "mtt_rx_token", strlen("mtt_rx_token"), &mtt_rx_token, sizeof (mtt_rx_token)); objdb->object_key_replace (stats->mrp->srp->hdr.handle, "avg_token_workload", strlen("avg_token_workload"), &avg_token_holdtime, sizeof (avg_token_holdtime)); objdb->object_key_replace (stats->mrp->srp->hdr.handle, "avg_backlog_calc", strlen("avg_backlog_calc"), &avg_backlog_calc, sizeof (avg_backlog_calc)); } cs_ipcs_stats_update(); api->timer_add_duration (1500 * MILLI_2_NANO_SECONDS, NULL, corosync_totem_stats_updater, &corosync_stats_timer_handle); } 
static void corosync_totem_stats_init (void) { totempg_stats_t * stats; hdb_handle_t object_find_handle; hdb_handle_t object_runtime_handle; hdb_handle_t object_totem_handle; uint32_t zero_32 = 0; uint64_t zero_64 = 0; stats = api->totem_get_stats(); objdb->object_find_create ( OBJECT_PARENT_HANDLE, "runtime", strlen ("runtime"), &object_find_handle); if (objdb->object_find_next (object_find_handle, &object_runtime_handle) == 0) { objdb->object_create (object_runtime_handle, &object_totem_handle, "totem", strlen ("totem")); objdb->object_create (object_totem_handle, &stats->hdr.handle, "pg", strlen ("pg")); objdb->object_create (stats->hdr.handle, &stats->mrp->hdr.handle, "mrp", strlen ("mrp")); objdb->object_create (stats->mrp->hdr.handle, &stats->mrp->srp->hdr.handle, "srp", strlen ("srp")); objdb->object_key_create_typed (stats->hdr.handle, "msg_reserved", &stats->msg_reserved, sizeof (stats->msg_reserved), OBJDB_VALUETYPE_UINT32); objdb->object_key_create_typed (stats->hdr.handle, "msg_queue_avail", &stats->msg_queue_avail, sizeof (stats->msg_queue_avail), OBJDB_VALUETYPE_UINT32); /* Members object */ objdb->object_create (stats->mrp->srp->hdr.handle, &object_memb_handle, "members", strlen ("members")); objdb->object_key_create_typed (stats->mrp->srp->hdr.handle, "orf_token_tx", &stats->mrp->srp->orf_token_tx, sizeof (stats->mrp->srp->orf_token_tx), OBJDB_VALUETYPE_UINT64); objdb->object_key_create_typed (stats->mrp->srp->hdr.handle, "orf_token_rx", &stats->mrp->srp->orf_token_rx, sizeof (stats->mrp->srp->orf_token_rx), OBJDB_VALUETYPE_UINT64); objdb->object_key_create_typed (stats->mrp->srp->hdr.handle, "memb_merge_detect_tx", &stats->mrp->srp->memb_merge_detect_tx, sizeof (stats->mrp->srp->memb_merge_detect_tx), OBJDB_VALUETYPE_UINT64); objdb->object_key_create_typed (stats->mrp->srp->hdr.handle, "memb_merge_detect_rx", &stats->mrp->srp->memb_merge_detect_rx, sizeof (stats->mrp->srp->memb_merge_detect_rx), OBJDB_VALUETYPE_UINT64); 
objdb->object_key_create_typed (stats->mrp->srp->hdr.handle, "memb_join_tx", &stats->mrp->srp->memb_join_tx, sizeof (stats->mrp->srp->memb_join_tx), OBJDB_VALUETYPE_UINT64); objdb->object_key_create_typed (stats->mrp->srp->hdr.handle, "memb_join_rx", &stats->mrp->srp->memb_join_rx, sizeof (stats->mrp->srp->memb_join_rx), OBJDB_VALUETYPE_UINT64); objdb->object_key_create_typed (stats->mrp->srp->hdr.handle, "mcast_tx", &stats->mrp->srp->mcast_tx, sizeof (stats->mrp->srp->mcast_tx), OBJDB_VALUETYPE_UINT64); objdb->object_key_create_typed (stats->mrp->srp->hdr.handle, "mcast_retx", &stats->mrp->srp->mcast_retx, sizeof (stats->mrp->srp->mcast_retx), OBJDB_VALUETYPE_UINT64); objdb->object_key_create_typed (stats->mrp->srp->hdr.handle, "mcast_rx", &stats->mrp->srp->mcast_rx, sizeof (stats->mrp->srp->mcast_rx), OBJDB_VALUETYPE_UINT64); objdb->object_key_create_typed (stats->mrp->srp->hdr.handle, "memb_commit_token_tx", &stats->mrp->srp->memb_commit_token_tx, sizeof (stats->mrp->srp->memb_commit_token_tx), OBJDB_VALUETYPE_UINT64); objdb->object_key_create_typed (stats->mrp->srp->hdr.handle, "memb_commit_token_rx", &stats->mrp->srp->memb_commit_token_rx, sizeof (stats->mrp->srp->memb_commit_token_rx), OBJDB_VALUETYPE_UINT64); objdb->object_key_create_typed (stats->mrp->srp->hdr.handle, "token_hold_cancel_tx", &stats->mrp->srp->token_hold_cancel_tx, sizeof (stats->mrp->srp->token_hold_cancel_tx), OBJDB_VALUETYPE_UINT64); objdb->object_key_create_typed (stats->mrp->srp->hdr.handle, "token_hold_cancel_rx", &stats->mrp->srp->token_hold_cancel_rx, sizeof (stats->mrp->srp->token_hold_cancel_rx), OBJDB_VALUETYPE_UINT64); objdb->object_key_create_typed (stats->mrp->srp->hdr.handle, "operational_entered", &stats->mrp->srp->operational_entered, sizeof (stats->mrp->srp->operational_entered), OBJDB_VALUETYPE_UINT64); objdb->object_key_create_typed (stats->mrp->srp->hdr.handle, "operational_token_lost", &stats->mrp->srp->operational_token_lost, sizeof 
(stats->mrp->srp->operational_token_lost), OBJDB_VALUETYPE_UINT64); objdb->object_key_create_typed (stats->mrp->srp->hdr.handle, "gather_entered", &stats->mrp->srp->gather_entered, sizeof (stats->mrp->srp->gather_entered), OBJDB_VALUETYPE_UINT64); objdb->object_key_create_typed (stats->mrp->srp->hdr.handle, "gather_token_lost", &stats->mrp->srp->gather_token_lost, sizeof (stats->mrp->srp->gather_token_lost), OBJDB_VALUETYPE_UINT64); objdb->object_key_create_typed (stats->mrp->srp->hdr.handle, "commit_entered", &stats->mrp->srp->commit_entered, sizeof (stats->mrp->srp->commit_entered), OBJDB_VALUETYPE_UINT64); objdb->object_key_create_typed (stats->mrp->srp->hdr.handle, "commit_token_lost", &stats->mrp->srp->commit_token_lost, sizeof (stats->mrp->srp->commit_token_lost), OBJDB_VALUETYPE_UINT64); objdb->object_key_create_typed (stats->mrp->srp->hdr.handle, "recovery_entered", &stats->mrp->srp->recovery_entered, sizeof (stats->mrp->srp->recovery_entered), OBJDB_VALUETYPE_UINT64); objdb->object_key_create_typed (stats->mrp->srp->hdr.handle, "recovery_token_lost", &stats->mrp->srp->recovery_token_lost, sizeof (stats->mrp->srp->recovery_token_lost), OBJDB_VALUETYPE_UINT64); objdb->object_key_create_typed (stats->mrp->srp->hdr.handle, "consensus_timeouts", &stats->mrp->srp->consensus_timeouts, sizeof (stats->mrp->srp->consensus_timeouts), OBJDB_VALUETYPE_UINT64); objdb->object_key_create_typed (stats->mrp->srp->hdr.handle, "mtt_rx_token", &zero_32, sizeof (zero_32), OBJDB_VALUETYPE_UINT32); objdb->object_key_create_typed (stats->mrp->srp->hdr.handle, "avg_token_workload", &zero_32, sizeof (zero_32), OBJDB_VALUETYPE_UINT32); objdb->object_key_create_typed (stats->mrp->srp->hdr.handle, "avg_backlog_calc", &zero_32, sizeof (zero_32), OBJDB_VALUETYPE_UINT32); objdb->object_key_create_typed (stats->mrp->srp->hdr.handle, "rx_msg_dropped", &zero_64, sizeof (zero_64), OBJDB_VALUETYPE_UINT64); objdb->object_key_create_typed (stats->mrp->srp->hdr.handle, "continuous_gather", 
&zero_32, sizeof (zero_32), OBJDB_VALUETYPE_UINT32); objdb->object_key_create_typed (stats->mrp->srp->hdr.handle, "firewall_enabled_or_nic_failure", &zero_32, sizeof (zero_32), OBJDB_VALUETYPE_UINT32); } /* start stats timer */ api->timer_add_duration (1500 * MILLI_2_NANO_SECONDS, NULL, corosync_totem_stats_updater, &corosync_stats_timer_handle); } static void deliver_fn ( unsigned int nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required) { const struct qb_ipc_request_header *header; int32_t service; int32_t fn_id; uint32_t id; uint32_t key_incr_dummy; header = msg; if (endian_conversion_required) { id = swab32 (header->id); } else { id = header->id; } /* * Call the proper executive handler */ service = id >> 16; fn_id = id & 0xffff; if (ais_service[service] == NULL && service == EVT_SERVICE) { evil_deliver_fn (nodeid, service, fn_id, msg, endian_conversion_required); } if (!ais_service[service]) { return; } if (fn_id >= ais_service[service]->exec_engine_count) { log_printf(LOGSYS_LEVEL_WARNING, "discarded unknown message %d for service %d (max id %d)", fn_id, service, ais_service[service]->exec_engine_count); return; } objdb->object_key_increment (service_stats_handle[service][fn_id], "rx", strlen("rx"), &key_incr_dummy); if (endian_conversion_required) { assert(ais_service[service]->exec_engine[fn_id].exec_endian_convert_fn != NULL); ais_service[service]->exec_engine[fn_id].exec_endian_convert_fn ((void *)msg); } ais_service[service]->exec_engine[fn_id].exec_handler_fn (msg, nodeid); } void main_get_config_modules(struct config_iface_ver0 ***modules, int *num) { *modules = config_modules; *num = num_config_modules; } int main_mcast ( const struct iovec *iovec, unsigned int iov_len, unsigned int guarantee) { const struct qb_ipc_request_header *req = iovec->iov_base; int32_t service; int32_t fn_id; uint32_t key_incr_dummy; service = req->id >> 16; fn_id = req->id & 0xffff; if (ais_service[service]) { objdb->object_key_increment 
(service_stats_handle[service][fn_id],
			"tx", strlen("tx"),
			&key_incr_dummy);
	}

	return (totempg_groups_mcast_joined (corosync_group_handle, iovec, iov_len, guarantee));
}

static qb_loop_timer_handle recheck_the_q_level_timer;

/*
 * Re-check the totem queue level; while it remains critical, re-arm a
 * 1ms timer so IPC flow control can be released promptly.
 */
void corosync_recheck_the_q_level(void *data)
{
	totempg_check_q_level(corosync_group_handle);
	if (cs_ipcs_q_level_get() == TOTEM_Q_LEVEL_CRITICAL) {
		qb_loop_timer_add(cs_poll_handle_get(), QB_LOOP_MED,
			1*QB_TIME_NS_IN_MSEC, NULL,
			corosync_recheck_the_q_level,
			&recheck_the_q_level_timer);
	}
}

/* Per-request state shared between the allow and release calls below */
struct sending_allowed_private_data_struct {
	int reserved_msgs;	/* totempg reservation count; -1 when none held */
};

/*
 * Decide whether a library request may be multicast right now.
 * Returns QB_TRUE/QB_FALSE, or a negative errno:
 *   -EINVAL       totempg reservation failed
 *   -ENOBUFS      no totem buffer space available
 *   -EINPROGRESS  service synchronization still running
 *   -EHOSTUNREACH not quorate and service does not allow inquorate use
 */
int corosync_sending_allowed (
	unsigned int service,
	unsigned int id,
	const void *msg,
	void *sending_allowed_private_data)
{
	struct sending_allowed_private_data_struct *pd =
		(struct sending_allowed_private_data_struct *)sending_allowed_private_data;
	struct iovec reserve_iovec;
	struct qb_ipc_request_header *header = (struct qb_ipc_request_header *)msg;
	int sending_allowed;

	reserve_iovec.iov_base = (char *)header;
	reserve_iovec.iov_len = header->size;

	pd->reserved_msgs = totempg_groups_joined_reserve (
		corosync_group_handle,
		&reserve_iovec, 1);
	if (pd->reserved_msgs == -1) {
		return -EINVAL;
	}

	sending_allowed = QB_FALSE;
	if (corosync_quorum_is_quorate() == 1 ||
		ais_service[service]->allow_inquorate == CS_LIB_ALLOW_INQUORATE) {
		// we are quorate
		// now check flow control
		if (ais_service[service]->lib_engine[id].flow_control == CS_LIB_FLOW_CONTROL_NOT_REQUIRED) {
			sending_allowed = QB_TRUE;
		} else if (pd->reserved_msgs && sync_in_process == 0) {
			sending_allowed = QB_TRUE;
		} else if (pd->reserved_msgs == 0) {
			return -ENOBUFS;
		} else /* (sync_in_process) */ {
			return -EINPROGRESS;
		}
	} else {
		return -EHOSTUNREACH;
	}

	return (sending_allowed);
}

/*
 * Release the totempg reservation taken by corosync_sending_allowed(),
 * if one was actually obtained.
 */
void corosync_sending_allowed_release (void *sending_allowed_private_data)
{
	struct sending_allowed_private_data_struct *pd =
		(struct sending_allowed_private_data_struct *)sending_allowed_private_data;

	if (pd->reserved_msgs == -1) {
		return;
	}
totempg_groups_joined_release (pd->reserved_msgs); } int message_source_is_local (const mar_message_source_t *source) { int ret = 0; assert (source != NULL); if (source->nodeid == totempg_my_nodeid_get ()) { ret = 1; } return ret; } void message_source_set ( mar_message_source_t *source, void *conn) { assert ((source != NULL) && (conn != NULL)); memset (source, 0, sizeof (mar_message_source_t)); source->nodeid = totempg_my_nodeid_get (); source->conn = conn; } static void corosync_setscheduler (void) { #if defined(HAVE_PTHREAD_SETSCHEDPARAM) && defined(HAVE_SCHED_GET_PRIORITY_MAX) && defined(HAVE_SCHED_SETSCHEDULER) int res; sched_priority = sched_get_priority_max (SCHED_RR); if (sched_priority != -1) { global_sched_param.sched_priority = sched_priority; res = sched_setscheduler (0, SCHED_RR, &global_sched_param); if (res == -1) { LOGSYS_PERROR(errno, LOGSYS_LEVEL_WARNING, "Could not set SCHED_RR at priority %d", global_sched_param.sched_priority); global_sched_param.sched_priority = 0; #ifdef HAVE_QB_LOG_THREAD_PRIORITY_SET qb_log_thread_priority_set (SCHED_OTHER, 0); #endif } else { /* * Turn on SCHED_RR in logsys system */ #ifdef HAVE_QB_LOG_THREAD_PRIORITY_SET res = qb_log_thread_priority_set (SCHED_RR, sched_priority); #else res = -1; #endif if (res == -1) { log_printf (LOGSYS_LEVEL_ERROR, "Could not set logsys thread priority." " Can't continue because of priority inversions."); corosync_exit_error (AIS_DONE_LOGSETUP); } } } else { LOGSYS_PERROR (errno, LOGSYS_LEVEL_WARNING, "Could not get maximum scheduler priority"); sched_priority = 0; } #else log_printf(LOGSYS_LEVEL_WARNING, "The Platform is missing process priority setting features. Leaving at default."); #endif } static void _logsys_log_printf(int level, int subsys, const char *function_name, const char *file_name, int file_line, const char *format, ...) 
__attribute__((format(printf, 6, 7))); static void _logsys_log_printf(int level, int subsys, const char *function_name, const char *file_name, int file_line, const char *format, ...) { va_list ap; char buf[QB_LOG_MAX_LEN]; size_t len; va_start(ap, format); qb_log_from_external_source_va(function_name, file_name, format, level, file_line, subsys, ap); va_end(ap); } static void fplay_key_change_notify_fn ( object_change_type_t change_type, hdb_handle_t parent_object_handle, hdb_handle_t object_handle, const void *object_name_pt, size_t object_name_len, const void *key_name_pt, size_t key_len, const void *key_value_pt, size_t key_value_len, void *priv_data_pt) { if (key_len == strlen ("dump_flight_data") && memcmp ("dump_flight_data", key_name_pt, key_len) == 0) { qb_log_blackbox_write_to_file (LOCALSTATEDIR "/lib/corosync/fdata"); } if (key_len == strlen ("dump_state") && memcmp ("dump_state", key_name_pt, key_len) == 0) { corosync_state_dump (); } } static void corosync_fplay_control_init (void) { hdb_handle_t object_find_handle; hdb_handle_t object_runtime_handle; hdb_handle_t object_blackbox_handle; objdb->object_find_create (OBJECT_PARENT_HANDLE, "runtime", strlen ("runtime"), &object_find_handle); if (objdb->object_find_next (object_find_handle, &object_runtime_handle) != 0) { return; } objdb->object_create (object_runtime_handle, &object_blackbox_handle, "blackbox", strlen ("blackbox")); objdb->object_key_create_typed (object_blackbox_handle, "dump_flight_data", "no", strlen("no"), OBJDB_VALUETYPE_STRING); objdb->object_key_create_typed (object_blackbox_handle, "dump_state", "no", strlen("no"), OBJDB_VALUETYPE_STRING); objdb->object_track_start (object_blackbox_handle, OBJECT_TRACK_DEPTH_RECURSIVE, fplay_key_change_notify_fn, NULL, NULL, NULL, NULL); } static void main_service_ready (void) { int res; /* * This must occur after totempg is initialized because "this_ip" must be set */ res = corosync_service_defaults_link_and_init (api); if (res == -1) { log_printf 
(LOGSYS_LEVEL_ERROR, "Could not initialize default services\n"); corosync_exit_error (AIS_DONE_INIT_SERVICES); } evil_init (api); cs_ipcs_init(); corosync_totem_stats_init (); corosync_fplay_control_init (); if (minimum_sync_mode == CS_SYNC_V2) { log_printf (LOGSYS_LEVEL_NOTICE, "Compatibility mode set to none. Using V2 of the synchronization engine.\n"); sync_v2_init ( corosync_sync_v2_callbacks_retrieve, corosync_sync_completed); } else if (minimum_sync_mode == CS_SYNC_V1) { log_printf (LOGSYS_LEVEL_NOTICE, "Compatibility mode set to whitetank. Using V1 and V2 of the synchronization engine.\n"); sync_register ( corosync_sync_callbacks_retrieve, sync_v2_memb_list_determine, sync_v2_memb_list_abort, sync_v2_start); sync_v2_init ( corosync_sync_v2_callbacks_retrieve, corosync_sync_completed); } } static enum e_ais_done corosync_flock (const char *lockfile, pid_t pid) { struct flock lock; enum e_ais_done err; char pid_s[17]; int fd_flag; int lf; err = AIS_DONE_EXIT; lf = open (lockfile, O_WRONLY | O_CREAT, 0640); if (lf == -1) { log_printf (LOGSYS_LEVEL_ERROR, "Corosync Executive couldn't create lock file.\n"); return (AIS_DONE_AQUIRE_LOCK); } retry_fcntl: lock.l_type = F_WRLCK; lock.l_start = 0; lock.l_whence = SEEK_SET; lock.l_len = 0; if (fcntl (lf, F_SETLK, &lock) == -1) { switch (errno) { case EINTR: goto retry_fcntl; break; case EAGAIN: case EACCES: log_printf (LOGSYS_LEVEL_ERROR, "Another Corosync instance is already running.\n"); err = AIS_DONE_ALREADY_RUNNING; goto error_close; break; default: log_printf (LOGSYS_LEVEL_ERROR, "Corosync Executive couldn't aquire lock. Error was %s\n", strerror(errno)); err = AIS_DONE_AQUIRE_LOCK; goto error_close; break; } } if (ftruncate (lf, 0) == -1) { log_printf (LOGSYS_LEVEL_ERROR, "Corosync Executive couldn't truncate lock file. 
Error was %s\n", strerror (errno)); err = AIS_DONE_AQUIRE_LOCK; goto error_close_unlink; } memset (pid_s, 0, sizeof (pid_s)); snprintf (pid_s, sizeof (pid_s) - 1, "%u\n", pid); retry_write: if (write (lf, pid_s, strlen (pid_s)) != strlen (pid_s)) { if (errno == EINTR) { goto retry_write; } else { log_printf (LOGSYS_LEVEL_ERROR, "Corosync Executive couldn't write pid to lock file. " "Error was %s\n", strerror (errno)); err = AIS_DONE_AQUIRE_LOCK; goto error_close_unlink; } } if ((fd_flag = fcntl (lf, F_GETFD, 0)) == -1) { log_printf (LOGSYS_LEVEL_ERROR, "Corosync Executive couldn't get close-on-exec flag from lock file. " "Error was %s\n", strerror (errno)); err = AIS_DONE_AQUIRE_LOCK; goto error_close_unlink; } fd_flag |= FD_CLOEXEC; if (fcntl (lf, F_SETFD, fd_flag) == -1) { log_printf (LOGSYS_LEVEL_ERROR, "Corosync Executive couldn't set close-on-exec flag to lock file. " "Error was %s\n", strerror (errno)); err = AIS_DONE_AQUIRE_LOCK; goto error_close_unlink; } return (err); error_close_unlink: unlink (lockfile); error_close: close (lf); return (err); } int main (int argc, char **argv, char **envp) { const char *error_string; struct totem_config totem_config; hdb_handle_t objdb_handle; hdb_handle_t config_handle; unsigned int config_version = 0; void *objdb_p; struct config_iface_ver0 *config; void *config_p; const char *config_iface_init; char *config_iface; char *iface; char *strtok_save_pt; int res, ch; int background, setprio; struct stat stat_out; char corosync_lib_dir[PATH_MAX]; hdb_handle_t object_runtime_handle; enum e_ais_done flock_err; /* default configuration */ background = 1; setprio = 0; while ((ch = getopt (argc, argv, "fprv")) != EOF) { switch (ch) { case 'f': background = 0; logsys_config_mode_set (NULL, LOGSYS_MODE_OUTPUT_STDERR|LOGSYS_MODE_THREADED|LOGSYS_MODE_FORK); break; case 'p': break; case 'r': setprio = 1; break; case 'v': printf ("Corosync Cluster Engine, version '%s'\n", VERSION); printf ("Copyright (c) 2006-2009 Red Hat, Inc.\n"); 
return EXIT_SUCCESS; break; default: fprintf(stderr, \ "usage:\n"\ " -f : Start application in foreground.\n"\ " -p : Does nothing. \n"\ " -r : Set round robin realtime scheduling \n"\ " -v : Display version and SVN revision of Corosync and exit.\n"); return EXIT_FAILURE; } } /* * Set round robin realtime scheduling with priority 99 * Lock all memory to avoid page faults which may interrupt * application healthchecking */ if (setprio) { corosync_setscheduler (); } corosync_mlockall (); log_printf (LOGSYS_LEVEL_NOTICE, "Corosync Cluster Engine ('%s'): started and ready to provide service.\n", VERSION); log_printf (LOGSYS_LEVEL_INFO, "Corosync built-in features:" PACKAGE_FEATURES "\n"); corosync_poll_handle = qb_loop_create (); qb_loop_signal_add(corosync_poll_handle, QB_LOOP_LOW, SIGUSR2, NULL, sig_diag_handler, NULL); qb_loop_signal_add(corosync_poll_handle, QB_LOOP_HIGH, SIGINT, NULL, sig_exit_handler, NULL); qb_loop_signal_add(corosync_poll_handle, QB_LOOP_HIGH, SIGQUIT, NULL, sig_exit_handler, NULL); qb_loop_signal_add(corosync_poll_handle, QB_LOOP_HIGH, SIGTERM, NULL, sig_exit_handler, NULL); (void)signal (SIGSEGV, sigsegv_handler); (void)signal (SIGABRT, sigabrt_handler); #if MSG_NOSIGNAL != 0 (void)signal (SIGPIPE, SIG_IGN); #endif /* * Load the object database interface */ res = lcr_ifact_reference ( &objdb_handle, "objdb", 0, &objdb_p, 0); if (res == -1) { log_printf (LOGSYS_LEVEL_ERROR, "Corosync Executive couldn't open configuration object database component.\n"); corosync_exit_error (AIS_DONE_OBJDB); } objdb = (struct objdb_iface_ver0 *)objdb_p; objdb->objdb_init (); /* * Initialize the corosync_api_v1 definition */ apidef_init (objdb); api = apidef_get (); num_config_modules = 0; /* * Bootstrap in the default configuration parser or use * the corosync default built in parser if the configuration parser * isn't overridden */ config_iface_init = getenv("COROSYNC_DEFAULT_CONFIG_IFACE"); if (!config_iface_init) { config_iface_init = "corosync_parser"; } /* 
Make a copy so we can deface it with strtok */ if ((config_iface = strdup(config_iface_init)) == NULL) { log_printf (LOGSYS_LEVEL_ERROR, "exhausted virtual memory"); corosync_exit_error (AIS_DONE_OBJDB); } iface = strtok_r(config_iface, ":", &strtok_save_pt); while (iface) { res = lcr_ifact_reference ( &config_handle, iface, config_version, &config_p, 0); config = (struct config_iface_ver0 *)config_p; if (res == -1) { log_printf (LOGSYS_LEVEL_ERROR, "Corosync Executive couldn't open configuration component '%s'\n", iface); corosync_exit_error (AIS_DONE_MAINCONFIGREAD); } res = config->config_readconfig(objdb, &error_string); if (res == -1) { log_printf (LOGSYS_LEVEL_ERROR, "%s", error_string); corosync_exit_error (AIS_DONE_MAINCONFIGREAD); } log_printf (LOGSYS_LEVEL_NOTICE, "%s", error_string); config_modules[num_config_modules++] = config; iface = strtok_r(NULL, ":", &strtok_save_pt); } free(config_iface); res = corosync_main_config_read (objdb, &error_string); if (res == -1) { /* * if we are here, we _must_ flush the logsys queue * and try to inform that we couldn't read the config. * this is a desperate attempt before certain death * and there is no guarantee that we can print to stderr * nor that logsys is sending the messages where we expect. */ log_printf (LOGSYS_LEVEL_ERROR, "%s", error_string); fprintf(stderr, "%s", error_string); syslog (LOGSYS_LEVEL_ERROR, "%s", error_string); corosync_exit_error (AIS_DONE_MAINCONFIGREAD); } /* * Make sure required directory is present */ sprintf (corosync_lib_dir, "%s/lib/corosync", LOCALSTATEDIR); res = stat (corosync_lib_dir, &stat_out); if ((res == -1) || (res == 0 && !S_ISDIR(stat_out.st_mode))) { log_printf (LOGSYS_LEVEL_ERROR, "Required directory not present %s. 
Please create it.\n", corosync_lib_dir); corosync_exit_error (AIS_DONE_DIR_NOT_PRESENT); } res = totem_config_read (objdb, &totem_config, &error_string); if (res == -1) { log_printf (LOGSYS_LEVEL_ERROR, "%s", error_string); corosync_exit_error (AIS_DONE_MAINCONFIGREAD); } res = totem_config_keyread (objdb, &totem_config, &error_string); if (res == -1) { log_printf (LOGSYS_LEVEL_ERROR, "%s", error_string); corosync_exit_error (AIS_DONE_MAINCONFIGREAD); } res = totem_config_validate (&totem_config, &error_string); if (res == -1) { log_printf (LOGSYS_LEVEL_ERROR, "%s", error_string); corosync_exit_error (AIS_DONE_MAINCONFIGREAD); } totem_config.totem_logging_configuration = totem_logging_configuration; totem_config.totem_logging_configuration.log_subsys_id = _logsys_subsys_create("TOTEM", "totem"); totem_config.totem_logging_configuration.log_level_security = LOGSYS_LEVEL_WARNING; totem_config.totem_logging_configuration.log_level_error = LOGSYS_LEVEL_ERROR; totem_config.totem_logging_configuration.log_level_warning = LOGSYS_LEVEL_WARNING; totem_config.totem_logging_configuration.log_level_notice = LOGSYS_LEVEL_NOTICE; totem_config.totem_logging_configuration.log_level_debug = LOGSYS_LEVEL_DEBUG; totem_config.totem_logging_configuration.log_printf = _logsys_log_printf; logsys_config_apply(); res = corosync_main_config_compatibility_read (objdb, &minimum_sync_mode, &error_string); if (res == -1) { log_printf (LOGSYS_LEVEL_ERROR, "%s", error_string); corosync_exit_error (AIS_DONE_MAINCONFIGREAD); } res = corosync_main_config_compatibility_read (objdb, &minimum_sync_mode, &error_string); if (res == -1) { log_printf (LOGSYS_LEVEL_ERROR, "%s", error_string); corosync_exit_error (AIS_DONE_MAINCONFIGREAD); } /* create the main runtime object */ objdb->object_create (OBJECT_PARENT_HANDLE, &object_runtime_handle, "runtime", strlen ("runtime")); /* * Now we are fully initialized. 
*/ if (background) { corosync_tty_detach (); } qb_log_thread_start(); if ((flock_err = corosync_flock (corosync_lock_file, getpid ())) != AIS_DONE_EXIT) { corosync_exit_error (flock_err); } /* * if totempg_initialize doesn't have root priveleges, it cannot * bind to a specific interface. This only matters if * there is more then one interface in a system, so * in this case, only a warning is printed */ /* * Join multicast group and setup delivery * and configuration change functions */ totempg_initialize ( corosync_poll_handle, &totem_config); totempg_service_ready_register ( main_service_ready); totempg_groups_initialize ( &corosync_group_handle, deliver_fn, confchg_fn); totempg_groups_join ( corosync_group_handle, &corosync_group, 1); /* * Drop root privleges to user 'ais' * TODO: Don't really need full root capabilities; * needed capabilities are: * CAP_NET_RAW (bindtodevice) * CAP_SYS_NICE (setscheduler) * CAP_IPC_LOCK (mlockall) */ priv_drop (); schedwrk_init ( serialize_lock, serialize_unlock); /* * Start main processing loop */ qb_loop_run (corosync_poll_handle); /* * Exit was requested */ totempg_finalize (); /* * Remove pid lock file */ unlink (corosync_lock_file); corosync_exit_error (AIS_DONE_EXIT); return EXIT_SUCCESS; } diff --git a/exec/sync.c b/exec/sync.c index b9cc84a5..ce991299 100644 --- a/exec/sync.c +++ b/exec/sync.c @@ -1,487 +1,487 @@ /* * Copyright (c) 2005-2006 MontaVista Software, Inc. * Copyright (c) 2006-2007, 2009 Red Hat, Inc. * * All rights reserved. * * Author: Steven Dake (sdake@redhat.com) * * This software licensed under BSD license, the text of which follows: * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * - Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. 
* - Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * - Neither the name of the MontaVista Software, Inc. nor the names of its * contributors may be used to endorse or promote products derived from this * software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF * THE POSSIBILITY OF SUCH DAMAGE. 
*/ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "quorum.h" #include "sync.h" LOGSYS_DECLARE_SUBSYS ("SYNC"); #define MESSAGE_REQ_SYNC_BARRIER 0 struct barrier_data { unsigned int nodeid; int completed; }; static const struct memb_ring_id *sync_ring_id; static int (*sync_callbacks_retrieve) (int sync_id, struct sync_callbacks *callack); static void (*sync_started) ( const struct memb_ring_id *ring_id); static void (*sync_aborted) (void); static struct sync_callbacks sync_callbacks; static int sync_processing = 0; static void (*sync_next_start) ( const unsigned int *member_list, size_t member_list_entries, const struct memb_ring_id *ring_id); static int sync_recovery_index = 0; static void *sync_callback_token_handle = 0; static struct barrier_data barrier_data_confchg[PROCESSOR_COUNT_MAX]; static size_t barrier_data_confchg_entries; static struct barrier_data barrier_data_process[PROCESSOR_COUNT_MAX]; static unsigned int my_member_list[PROCESSOR_COUNT_MAX]; static unsigned int my_trans_list[PROCESSOR_COUNT_MAX]; static unsigned int my_member_list_entries; static unsigned int my_trans_list_entries; static int sync_barrier_send (const struct memb_ring_id *ring_id); static int sync_start_process (enum totem_callback_token_type type, const void *data); static void sync_service_init (struct memb_ring_id *ring_id); static int sync_service_process (enum totem_callback_token_type type, const void *data); static void sync_deliver_fn ( unsigned int nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required); static void sync_confchg_fn ( enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id 
*ring_id); static void sync_primary_callback_fn ( const unsigned int *view_list, size_t view_list_entries, const struct memb_ring_id *ring_id); static struct totempg_group sync_group = { .group = "sync", .group_len = 4 }; -static hdb_handle_t sync_group_handle; +static void *sync_group_handle; struct req_exec_sync_barrier_start { struct qb_ipc_request_header header; struct memb_ring_id ring_id; }; /* * Send a barrier data structure */ static int sync_barrier_send (const struct memb_ring_id *ring_id) { struct req_exec_sync_barrier_start req_exec_sync_barrier_start; struct iovec iovec; int res; req_exec_sync_barrier_start.header.size = sizeof (struct req_exec_sync_barrier_start); req_exec_sync_barrier_start.header.id = MESSAGE_REQ_SYNC_BARRIER; memcpy (&req_exec_sync_barrier_start.ring_id, ring_id, sizeof (struct memb_ring_id)); iovec.iov_base = (char *)&req_exec_sync_barrier_start; iovec.iov_len = sizeof (req_exec_sync_barrier_start); res = totempg_groups_mcast_joined (sync_group_handle, &iovec, 1, TOTEMPG_AGREED); return (res); } static void sync_start_init (const struct memb_ring_id *ring_id) { totempg_callback_token_create ( &sync_callback_token_handle, TOTEM_CALLBACK_TOKEN_SENT, 0, /* don't delete after callback */ sync_start_process, ring_id); } static void sync_service_init (struct memb_ring_id *ring_id) { if (sync_callbacks.api_version == 1) { sync_callbacks.sync_init_api.sync_init_v1 (my_member_list, my_member_list_entries, ring_id); } else { sync_callbacks.sync_init_api.sync_init_v2 (my_trans_list, my_trans_list_entries, my_member_list, my_member_list_entries, ring_id); } totempg_callback_token_destroy (&sync_callback_token_handle); /* * Create the token callback for the processing */ totempg_callback_token_create ( &sync_callback_token_handle, TOTEM_CALLBACK_TOKEN_SENT, 0, /* don't delete after callback */ sync_service_process, ring_id); } static int sync_start_process (enum totem_callback_token_type type, const void *data) { int res; const struct 
memb_ring_id *ring_id = data;

	res = sync_barrier_send (ring_id);
	if (res == 0) {
		/*
		 * Delete the token callback for the barrier
		 */
		totempg_callback_token_destroy (&sync_callback_token_handle);
	}
	return (0);
}

/*
 * Advance sync_recovery_index to the next registered service that has
 * sync callbacks and load them into sync_callbacks.  When no further
 * services need synchronization, clear sync_processing and stop.
 */
static void sync_callbacks_load (void)
{
	int res;

	for (;;) {
		res = sync_callbacks_retrieve (sync_recovery_index, &sync_callbacks);

		/*
		 * No more service handlers have sync callbacks at this time
		 */
		if (res == -1) {
			sync_processing = 0;
			break;
		}
		sync_recovery_index += 1;
		if (sync_callbacks.sync_init_api.sync_init_v1) {
			break;
		}
	}
}

/*
 * Token callback driving the current service's sync_process() step;
 * when the step reports completion (0), switch the token callback back
 * to the barrier-start phase.
 */
static int sync_service_process (enum totem_callback_token_type type, const void *data)
{
	int res;
	const struct memb_ring_id *ring_id = data;

	/*
	 * If process operation not from this ring id, then ignore it and stop
	 * processing
	 */
	if (memcmp (ring_id, sync_ring_id, sizeof (struct memb_ring_id)) != 0) {
		return (0);
	}

	/*
	 * If process returns 0, then it's time to activate
	 * and start the next service's synchronization
	 */
	res = sync_callbacks.sync_process ();
	if (res != 0) {
		return (0);
	}
	totempg_callback_token_destroy (&sync_callback_token_handle);
	sync_start_init (ring_id);
	return (0);
}

/*
 * Register the v1 sync engine: join the "sync" totempg group and store
 * the caller-supplied callback set (per-service callback retrieval plus
 * started/aborted/next-start notifications).  Returns 0 on success,
 * -1 when the group could not be initialized or joined.
 */
int sync_register (
	int (*callbacks_retrieve) (
		int sync_id,
		struct sync_callbacks *callbacks),
	void (*started) (
		const struct memb_ring_id *ring_id),
	void (*aborted) (void),
	void (*next_start) (
		const unsigned int *member_list,
		size_t member_list_entries,
		const struct memb_ring_id *ring_id))
{
	/* NOTE(review): res is unsigned yet compared to -1 below; this works
	 * because -1 converts to UINT_MAX, but plain int would be clearer —
	 * confirm callees' return type before changing. */
	unsigned int res;

	res = totempg_groups_initialize (
		&sync_group_handle,
		sync_deliver_fn,
		sync_confchg_fn);
	if (res == -1) {
		log_printf (LOGSYS_LEVEL_ERROR,
			"Couldn't initialize groups interface.\n");
		return (-1);
	}

	res = totempg_groups_join (
		sync_group_handle,
		&sync_group,
		1);
	if (res == -1) {
		log_printf (LOGSYS_LEVEL_ERROR, "Couldn't join group.\n");
		return (-1);
	}

	sync_callbacks_retrieve = callbacks_retrieve;
	sync_next_start = next_start;
	sync_started = started;
	sync_aborted = aborted;
	return (0);
}

/* (Signature continues on the next original line.) */
static void sync_primary_callback_fn (
	const unsigned int
*view_list, size_t view_list_entries, const struct memb_ring_id *ring_id) { int i; /* * Execute configuration change for synchronization service */ sync_processing = 1; totempg_callback_token_destroy (&sync_callback_token_handle); sync_recovery_index = 0; memset (&barrier_data_confchg, 0, sizeof (barrier_data_confchg)); for (i = 0; i < view_list_entries; i++) { barrier_data_confchg[i].nodeid = view_list[i]; barrier_data_confchg[i].completed = 0; } memcpy (barrier_data_process, barrier_data_confchg, sizeof (barrier_data_confchg)); barrier_data_confchg_entries = view_list_entries; sync_start_init (sync_ring_id); } static struct memb_ring_id deliver_ring_id; static void sync_endian_convert (struct req_exec_sync_barrier_start *req_exec_sync_barrier_start) { totemip_copy_endian_convert(&req_exec_sync_barrier_start->ring_id.rep, &req_exec_sync_barrier_start->ring_id.rep); req_exec_sync_barrier_start->ring_id.seq = swab64 (req_exec_sync_barrier_start->ring_id.seq); } static void sync_deliver_fn ( unsigned int nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required) { struct req_exec_sync_barrier_start *req_exec_sync_barrier_start = (struct req_exec_sync_barrier_start *)msg; unsigned int barrier_completed; int i; log_printf (LOGSYS_LEVEL_DEBUG, "confchg entries %lu\n", (unsigned long int) barrier_data_confchg_entries); if (endian_conversion_required) { sync_endian_convert (req_exec_sync_barrier_start); } barrier_completed = 1; memcpy (&deliver_ring_id, &req_exec_sync_barrier_start->ring_id, sizeof (struct memb_ring_id)); /* * Is this barrier from this configuration, if not, ignore it */ if (memcmp (&req_exec_sync_barrier_start->ring_id, sync_ring_id, sizeof (struct memb_ring_id)) != 0) { return; } /* * Set completion for source_addr's address */ for (i = 0; i < barrier_data_confchg_entries; i++) { if (nodeid == barrier_data_process[i].nodeid) { barrier_data_process[i].completed = 1; log_printf (LOGSYS_LEVEL_DEBUG, "Barrier Start Received From %d\n", 
barrier_data_process[i].nodeid); break; } } /* * Test if barrier is complete */ for (i = 0; i < barrier_data_confchg_entries; i++) { log_printf (LOGSYS_LEVEL_DEBUG, "Barrier completion status for nodeid %d = %d. \n", barrier_data_process[i].nodeid, barrier_data_process[i].completed); if (barrier_data_process[i].completed == 0) { barrier_completed = 0; } } if (barrier_completed) { log_printf (LOGSYS_LEVEL_DEBUG, "Synchronization barrier completed\n"); } /* * This sync is complete so activate and start next service sync */ if (barrier_completed && sync_callbacks.sync_activate) { sync_callbacks.sync_activate (); log_printf (LOGSYS_LEVEL_DEBUG, "Committing synchronization for (%s)\n", sync_callbacks.name); } /* * Start synchronization if the barrier has completed */ if (barrier_completed) { memcpy (barrier_data_process, barrier_data_confchg, sizeof (barrier_data_confchg)); sync_callbacks_load(); /* * if sync service found, execute it */ if (sync_processing && sync_callbacks.sync_init_api.sync_init_v1) { log_printf (LOGSYS_LEVEL_DEBUG, "Synchronization actions starting for (%s)\n", sync_callbacks.name); sync_service_init (&deliver_ring_id); } if (sync_processing == 0) { sync_next_start (my_member_list, my_member_list_entries, sync_ring_id); } } return; } static void sync_confchg_fn ( enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id) { sync_ring_id = ring_id; if (configuration_type != TOTEM_CONFIGURATION_REGULAR) { memcpy (my_trans_list, member_list, member_list_entries * sizeof (unsigned int)); my_trans_list_entries = member_list_entries; return; } memcpy (my_member_list, member_list, member_list_entries * sizeof (unsigned int)); my_member_list_entries = member_list_entries; sync_aborted (); if (sync_processing && sync_callbacks.sync_abort != NULL) { 
sync_callbacks.sync_abort (); sync_callbacks.sync_activate = NULL; } sync_started ( ring_id); sync_primary_callback_fn ( member_list, member_list_entries, ring_id); } diff --git a/exec/syncv2.c b/exec/syncv2.c index f9eebacf..8a966152 100644 --- a/exec/syncv2.c +++ b/exec/syncv2.c @@ -1,624 +1,624 @@ /* * Copyright (c) 2009 Red Hat, Inc. * * All rights reserved. * * Author: Steven Dake (sdake@redhat.com) * * This software licensed under BSD license, the text of which follows: * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * - Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * - Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * - Neither the name of the MontaVista Software, Inc. nor the names of its * contributors may be used to endorse or promote products derived from this * software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF * THE POSSIBILITY OF SUCH DAMAGE. 
*/ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "schedwrk.h" #include "quorum.h" #include "sync.h" #include "syncv2.h" LOGSYS_DECLARE_SUBSYS ("SYNCV2"); #define MESSAGE_REQ_SYNC_BARRIER 0 #define MESSAGE_REQ_SYNC_SERVICE_BUILD 1 #define MESSAGE_REQ_SYNC_MEMB_DETERMINE 2 enum sync_process_state { INIT, PROCESS, ACTIVATE }; enum sync_state { SYNC_SERVICELIST_BUILD, SYNC_PROCESS, SYNC_BARRIER }; struct service_entry { int service_id; int api_version; union sync_init_api sync_init_api; void (*sync_abort) (void); int (*sync_process) (void); void (*sync_activate) (void); enum sync_process_state state; char name[128]; }; struct processor_entry { int nodeid; int received; }; struct req_exec_memb_determine_message { struct qb_ipc_request_header header __attribute__((aligned(8))); struct memb_ring_id ring_id __attribute__((aligned(8))); }; struct req_exec_service_build_message { struct qb_ipc_request_header header __attribute__((aligned(8))); struct memb_ring_id ring_id __attribute__((aligned(8))); int service_list_entries __attribute__((aligned(8))); int service_list[128] __attribute__((aligned(8))); }; struct req_exec_barrier_message { struct qb_ipc_request_header header __attribute__((aligned(8))); struct memb_ring_id ring_id __attribute__((aligned(8))); }; static enum sync_state my_state = SYNC_BARRIER; static struct memb_ring_id my_ring_id; static struct memb_ring_id my_memb_determine_ring_id; static int my_memb_determine = 0; static unsigned int my_memb_determine_list[PROCESSOR_COUNT_MAX]; static unsigned int my_memb_determine_list_entries = 0; static int my_processing_idx = 0; static hdb_handle_t my_schedwrk_handle; static struct processor_entry my_processor_list[PROCESSOR_COUNT_MAX]; static unsigned int my_member_list[PROCESSOR_COUNT_MAX]; static unsigned int my_trans_list[PROCESSOR_COUNT_MAX]; 
static size_t my_member_list_entries = 0; static size_t my_trans_list_entries = 0; static int my_processor_list_entries = 0; static struct service_entry my_service_list[128]; static int my_service_list_entries = 0; static const struct memb_ring_id sync_ring_id; static struct service_entry my_initial_service_list[PROCESSOR_COUNT_MAX]; static int my_initial_service_list_entries; static void (*sync_synchronization_completed) (void); static void sync_deliver_fn ( unsigned int nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required); static int schedwrk_processor (const void *context); static void sync_process_enter (void); static struct totempg_group sync_group = { .group = "syncv2", .group_len = 6 }; -static hdb_handle_t sync_group_handle; +static void *sync_group_handle; int sync_v2_init ( int (*sync_callbacks_retrieve) ( int service_id, struct sync_callbacks *callbacks), void (*synchronization_completed) (void)) { unsigned int res; int i; struct sync_callbacks sync_callbacks; res = totempg_groups_initialize ( &sync_group_handle, sync_deliver_fn, NULL); if (res == -1) { log_printf (LOGSYS_LEVEL_ERROR, "Couldn't initialize groups interface."); return (-1); } res = totempg_groups_join ( sync_group_handle, &sync_group, 1); if (res == -1) { log_printf (LOGSYS_LEVEL_ERROR, "Couldn't join group.\n"); return (-1); } sync_synchronization_completed = synchronization_completed; for (i = 0; i < 64; i++) { res = sync_callbacks_retrieve (i, &sync_callbacks); if (res == -1) { continue; } if (sync_callbacks.sync_init_api.sync_init_v1 == NULL) { continue; } my_initial_service_list[my_initial_service_list_entries].state = INIT; my_initial_service_list[my_initial_service_list_entries].service_id = i; strcpy (my_initial_service_list[my_initial_service_list_entries].name, sync_callbacks.name); my_initial_service_list[my_initial_service_list_entries].api_version = sync_callbacks.api_version; my_initial_service_list[my_initial_service_list_entries].sync_init_api = 
sync_callbacks.sync_init_api; my_initial_service_list[my_initial_service_list_entries].sync_process = sync_callbacks.sync_process; my_initial_service_list[my_initial_service_list_entries].sync_abort = sync_callbacks.sync_abort; my_initial_service_list[my_initial_service_list_entries].sync_activate = sync_callbacks.sync_activate; my_initial_service_list_entries += 1; } return (0); } static void sync_barrier_handler (unsigned int nodeid, const void *msg) { const struct req_exec_barrier_message *req_exec_barrier_message = msg; int i; int barrier_reached = 1; if (memcmp (&my_ring_id, &req_exec_barrier_message->ring_id, sizeof (struct memb_ring_id)) != 0) { log_printf (LOGSYS_LEVEL_DEBUG, "barrier for old ring - discarding\n"); return; } for (i = 0; i < my_processor_list_entries; i++) { if (my_processor_list[i].nodeid == nodeid) { my_processor_list[i].received = 1; } } for (i = 0; i < my_processor_list_entries; i++) { if (my_processor_list[i].received == 0) { barrier_reached = 0; } } if (barrier_reached) { log_printf (LOGSYS_LEVEL_DEBUG, "Committing synchronization for %s\n", my_service_list[my_processing_idx].name); my_service_list[my_processing_idx].state = ACTIVATE; my_service_list[my_processing_idx].sync_activate (); my_processing_idx += 1; if (my_service_list_entries == my_processing_idx) { my_memb_determine_list_entries = 0; sync_synchronization_completed (); } else { sync_process_enter (); } } } static void dummy_sync_init ( const unsigned int *member_list, size_t member_list_entries, const struct memb_ring_id *ring_id) { } static void dummy_sync_abort (void) { } static int dummy_sync_process (void) { return (0); } static void dummy_sync_activate (void) { } static int service_entry_compare (const void *a, const void *b) { const struct service_entry *service_entry_a = a; const struct service_entry *service_entry_b = b; return (service_entry_a->service_id > service_entry_b->service_id); } static void sync_memb_determine (unsigned int nodeid, const void *msg) { 
const struct req_exec_memb_determine_message *req_exec_memb_determine_message = msg; int found = 0; int i; if (memcmp (&req_exec_memb_determine_message->ring_id, &my_memb_determine_ring_id, sizeof (struct memb_ring_id)) != 0) { log_printf (LOGSYS_LEVEL_DEBUG, "memb determine for old ring - discarding\n"); return; } my_memb_determine = 1; for (i = 0; i < my_memb_determine_list_entries; i++) { if (my_memb_determine_list[i] == nodeid) { found = 1; } } if (found == 0) { my_memb_determine_list[my_memb_determine_list_entries] = nodeid; my_memb_determine_list_entries += 1; } } static void sync_service_build_handler (unsigned int nodeid, const void *msg) { const struct req_exec_service_build_message *req_exec_service_build_message = msg; int i, j; int barrier_reached = 1; int found; int qsort_trigger = 0; if (memcmp (&my_ring_id, &req_exec_service_build_message->ring_id, sizeof (struct memb_ring_id)) != 0) { log_printf (LOGSYS_LEVEL_DEBUG, "service build for old ring - discarding\n"); return; } for (i = 0; i < req_exec_service_build_message->service_list_entries; i++) { found = 0; for (j = 0; j < my_service_list_entries; j++) { if (req_exec_service_build_message->service_list[i] == my_service_list[j].service_id) { found = 1; break; } } if (found == 0) { my_service_list[my_service_list_entries].state = INIT; my_service_list[my_service_list_entries].service_id = req_exec_service_build_message->service_list[i]; sprintf (my_service_list[my_service_list_entries].name, "External Service (id = %d)\n", req_exec_service_build_message->service_list[i]); my_service_list[my_service_list_entries].api_version = 1; my_service_list[my_service_list_entries].sync_init_api.sync_init_v1 = dummy_sync_init; my_service_list[my_service_list_entries].sync_abort = dummy_sync_abort; my_service_list[my_service_list_entries].sync_process = dummy_sync_process; my_service_list[my_service_list_entries].sync_activate = dummy_sync_activate; my_service_list_entries += 1; qsort_trigger = 1; } } if 
(qsort_trigger) { qsort (my_service_list, my_service_list_entries, sizeof (struct service_entry), service_entry_compare); } for (i = 0; i < my_processor_list_entries; i++) { if (my_processor_list[i].nodeid == nodeid) { my_processor_list[i].received = 1; } } for (i = 0; i < my_processor_list_entries; i++) { if (my_processor_list[i].received == 0) { barrier_reached = 0; } } if (barrier_reached) { sync_process_enter (); } } static void sync_deliver_fn ( unsigned int nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required) { struct qb_ipc_request_header *header = (struct qb_ipc_request_header *)msg; switch (header->id) { case MESSAGE_REQ_SYNC_BARRIER: sync_barrier_handler (nodeid, msg); break; case MESSAGE_REQ_SYNC_SERVICE_BUILD: sync_service_build_handler (nodeid, msg); break; case MESSAGE_REQ_SYNC_MEMB_DETERMINE: sync_memb_determine (nodeid, msg); break; } } static void memb_determine_message_transmit (void) { struct iovec iovec; struct req_exec_memb_determine_message req_exec_memb_determine_message; req_exec_memb_determine_message.header.size = sizeof (struct req_exec_memb_determine_message); req_exec_memb_determine_message.header.id = MESSAGE_REQ_SYNC_MEMB_DETERMINE; memcpy (&req_exec_memb_determine_message.ring_id, &my_memb_determine_ring_id, sizeof (struct memb_ring_id)); iovec.iov_base = (char *)&req_exec_memb_determine_message; iovec.iov_len = sizeof (req_exec_memb_determine_message); (void)totempg_groups_mcast_joined (sync_group_handle, &iovec, 1, TOTEMPG_AGREED); } static void barrier_message_transmit (void) { struct iovec iovec; struct req_exec_barrier_message req_exec_barrier_message; req_exec_barrier_message.header.size = sizeof (struct req_exec_barrier_message); req_exec_barrier_message.header.id = MESSAGE_REQ_SYNC_BARRIER; memcpy (&req_exec_barrier_message.ring_id, &my_ring_id, sizeof (struct memb_ring_id)); iovec.iov_base = (char *)&req_exec_barrier_message; iovec.iov_len = sizeof (req_exec_barrier_message); 
(void)totempg_groups_mcast_joined (sync_group_handle, &iovec, 1, TOTEMPG_AGREED); } static void service_build_message_transmit (struct req_exec_service_build_message *service_build_message) { struct iovec iovec; service_build_message->header.size = sizeof (struct req_exec_service_build_message); service_build_message->header.id = MESSAGE_REQ_SYNC_SERVICE_BUILD; memcpy (&service_build_message->ring_id, &my_ring_id, sizeof (struct memb_ring_id)); iovec.iov_base = (void *)service_build_message; iovec.iov_len = sizeof (struct req_exec_service_build_message); (void)totempg_groups_mcast_joined (sync_group_handle, &iovec, 1, TOTEMPG_AGREED); } static void sync_barrier_enter (void) { my_state = SYNC_BARRIER; barrier_message_transmit (); } static void sync_process_enter (void) { int i; my_state = SYNC_PROCESS; /* * No syncv2 services */ if (my_service_list_entries == 0) { my_state = SYNC_SERVICELIST_BUILD; my_memb_determine_list_entries = 0; sync_synchronization_completed (); return; } for (i = 0; i < my_processor_list_entries; i++) { my_processor_list[i].received = 0; } schedwrk_create (&my_schedwrk_handle, schedwrk_processor, NULL); } static void sync_servicelist_build_enter ( const unsigned int *member_list, size_t member_list_entries, const struct memb_ring_id *ring_id) { struct req_exec_service_build_message service_build; int i; my_state = SYNC_SERVICELIST_BUILD; for (i = 0; i < member_list_entries; i++) { my_processor_list[i].nodeid = member_list[i]; my_processor_list[i].received = 0; } my_processor_list_entries = member_list_entries; memcpy (my_member_list, member_list, member_list_entries * sizeof (unsigned int)); my_member_list_entries = member_list_entries; my_processing_idx = 0; memcpy (my_service_list, my_initial_service_list, sizeof (struct service_entry) * my_initial_service_list_entries); my_service_list_entries = my_initial_service_list_entries; for (i = 0; i < my_initial_service_list[i].service_id; i++) { service_build.service_list[i] = 
my_initial_service_list[i].service_id; } service_build.service_list_entries = i; service_build_message_transmit (&service_build); } static int schedwrk_processor (const void *context) { int res = 0; if (my_service_list[my_processing_idx].state == INIT) { my_service_list[my_processing_idx].state = PROCESS; if (my_service_list[my_processing_idx].api_version == 1) { my_service_list[my_processing_idx].sync_init_api.sync_init_v1 (my_member_list, my_member_list_entries, &my_ring_id); } else { unsigned int old_trans_list[PROCESSOR_COUNT_MAX]; size_t old_trans_list_entries = 0; int o, m; memcpy (old_trans_list, my_trans_list, my_trans_list_entries * sizeof (unsigned int)); old_trans_list_entries = my_trans_list_entries; my_trans_list_entries = 0; for (o = 0; o < old_trans_list_entries; o++) { for (m = 0; m < my_member_list_entries; m++) { if (old_trans_list[o] == my_member_list[m]) { my_trans_list[my_trans_list_entries] = my_member_list[m]; my_trans_list_entries++; break; } } } my_service_list[my_processing_idx].sync_init_api.sync_init_v2 (my_trans_list, my_trans_list_entries, my_member_list, my_member_list_entries, &my_ring_id); } } if (my_service_list[my_processing_idx].state == PROCESS) { my_service_list[my_processing_idx].state = PROCESS; res = my_service_list[my_processing_idx].sync_process (); if (res == 0) { sync_barrier_enter(); } else { return (-1); } } return (0); } void sync_v2_start ( const unsigned int *member_list, size_t member_list_entries, const struct memb_ring_id *ring_id) { ENTER(); memcpy (&my_ring_id, ring_id, sizeof (struct memb_ring_id)); if (my_memb_determine) { my_memb_determine = 0; sync_servicelist_build_enter (my_memb_determine_list, my_memb_determine_list_entries, ring_id); } else { sync_servicelist_build_enter (member_list, member_list_entries, ring_id); } } void sync_v2_save_transitional ( const unsigned int *member_list, size_t member_list_entries, const struct memb_ring_id *ring_id) { ENTER(); memcpy (my_trans_list, member_list, 
member_list_entries * sizeof (unsigned int)); my_trans_list_entries = member_list_entries; } void sync_v2_abort (void) { ENTER(); if (my_state == SYNC_PROCESS) { schedwrk_destroy (my_schedwrk_handle); my_service_list[my_processing_idx].sync_abort (); } /* this will cause any "old" barrier messages from causing * problems. */ memset (&my_ring_id, 0, sizeof (struct memb_ring_id)); } void sync_v2_memb_list_determine (const struct memb_ring_id *ring_id) { ENTER(); memcpy (&my_memb_determine_ring_id, ring_id, sizeof (struct memb_ring_id)); memb_determine_message_transmit (); } void sync_v2_memb_list_abort (void) { ENTER(); my_memb_determine_list_entries = 0; memset (&my_memb_determine_ring_id, 0, sizeof (struct memb_ring_id)); } diff --git a/exec/totempg.c b/exec/totempg.c index c5ba01c8..a3eee15b 100644 --- a/exec/totempg.c +++ b/exec/totempg.c @@ -1,1531 +1,1460 @@ /* * Copyright (c) 2003-2005 MontaVista Software, Inc. * Copyright (c) 2005 OSDL. * Copyright (c) 2006-2009 Red Hat, Inc. * * All rights reserved. * * Author: Steven Dake (sdake@redhat.com) * Author: Mark Haverkamp (markh@osdl.org) * * This software licensed under BSD license, the text of which follows: * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * - Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * - Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * - Neither the name of the MontaVista Software, Inc. nor the names of its * contributors may be used to endorse or promote products derived from this * software without specific prior written permission. 
* * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF * THE POSSIBILITY OF SUCH DAMAGE. */ /* * FRAGMENTATION AND PACKING ALGORITHM: * * Assemble the entire message into one buffer * if full fragment * store fragment into lengths list * for each full fragment * multicast fragment * set length and fragment fields of pg mesage * store remaining multicast into head of fragmentation data and set lens field * * If a message exceeds the maximum packet size allowed by the totem * single ring protocol, the protocol could lose forward progress. * Statically calculating the allowed data amount doesn't work because * the amount of data allowed depends on the number of fragments in * each message. In this implementation, the maximum fragment size * is dynamically calculated for each fragment added to the message. * It is possible for a message to be two bytes short of the maximum * packet size. This occurs when a message or collection of * messages + the mcast header + the lens are two bytes short of the * end of the packet. Since another len field consumes two bytes, the * len field would consume the rest of the packet without room for data. * * One optimization would be to forgo the final len field and determine * it from the size of the udp datagram. Then this condition would no * longer occur. 
*/ /* * ASSEMBLY AND UNPACKING ALGORITHM: * * copy incoming packet into assembly data buffer indexed by current * location of end of fragment * * if not fragmented * deliver all messages in assembly data buffer * else * if msg_count > 1 and fragmented * deliver all messages except last message in assembly data buffer * copy last fragmented section to start of assembly data buffer * else * if msg_count = 1 and fragmented * do nothing * */ #include #ifdef HAVE_ALLOCA_H #include #endif #include #include #include #include #include #include #include #include #include #include -#include #include #include #include #include #define LOGSYS_UTILS_ONLY 1 #include #include "totemmrp.h" #include "totemsrp.h" #define min(a,b) ((a) < (b)) ? a : b struct totempg_mcast_header { short version; short type; }; #if !(defined(__i386__) || defined(__x86_64__)) /* * Need align on architectures different then i386 or x86_64 */ #define TOTEMPG_NEED_ALIGN 1 #endif /* * totempg_mcast structure * * header: Identify the mcast. * fragmented: Set if this message continues into next message * continuation: Set if this message is a continuation from last message * msg_count Indicates how many packed messages are contained * in the mcast. * Also, the size of each packed message and the messages themselves are * appended to the end of this structure when sent. 
*/ struct totempg_mcast { struct totempg_mcast_header header; unsigned char fragmented; unsigned char continuation; unsigned short msg_count; /* * short msg_len[msg_count]; */ /* * data for messages */ }; /* * Maximum packet size for totem pg messages */ #define TOTEMPG_PACKET_SIZE (totempg_totem_config->net_mtu - \ sizeof (struct totempg_mcast)) /* * Local variables used for packing small messages */ static unsigned short mcast_packed_msg_lens[FRAME_SIZE_MAX]; static int mcast_packed_msg_count = 0; static int totempg_reserved = 1; static unsigned int totempg_size_limit; static totem_queue_level_changed_fn totem_queue_level_changed = NULL; static uint32_t totempg_threaded_mode = 0; /* * Function and data used to log messages */ static int totempg_log_level_security; static int totempg_log_level_error; static int totempg_log_level_warning; static int totempg_log_level_notice; static int totempg_log_level_debug; static int totempg_subsys_id; static void (*totempg_log_printf) ( int level, int subsys, const char *function, const char *file, int line, const char *format, ...) __attribute__((format(printf, 6, 7))); struct totem_config *totempg_totem_config; static totempg_stats_t totempg_stats; enum throw_away_mode { THROW_AWAY_INACTIVE, THROW_AWAY_ACTIVE }; struct assembly { unsigned int nodeid; unsigned char data[MESSAGE_SIZE_MAX]; int index; unsigned char last_frag_num; enum throw_away_mode throw_away_mode; struct list_head list; }; static void assembly_deref (struct assembly *assembly); static int callback_token_received_fn (enum totem_callback_token_type type, const void *data); DECLARE_LIST_INIT(assembly_list_inuse); DECLARE_LIST_INIT(assembly_list_free); +DECLARE_LIST_INIT(totempg_groups_list); + /* * Staging buffer for packed messages. Messages are staged in this buffer * before sending. Multiple messages may fit which cuts down on the * number of mcasts sent. 
If a message doesn't completely fit, then * the mcast header has a fragment bit set that says that there are more * data to follow. fragment_size is an index into the buffer. It indicates * the size of message data and where to place new message data. * fragment_contuation indicates whether the first packed message in * the buffer is a continuation of a previously packed fragment. */ static unsigned char *fragmentation_data; static int fragment_size = 0; static int fragment_continuation = 0; static struct iovec iov_delv; -static unsigned int totempg_max_handle = 0; - struct totempg_group_instance { void (*deliver_fn) ( unsigned int nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required); void (*confchg_fn) ( enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id); struct totempg_group *groups; int groups_cnt; int32_t q_level; + + struct list_head list; }; DECLARE_HDB_DATABASE (totempg_groups_instance_database,NULL); static unsigned char next_fragment = 1; static pthread_mutex_t totempg_mutex = PTHREAD_MUTEX_INITIALIZER; static pthread_mutex_t callback_token_mutex = PTHREAD_MUTEX_INITIALIZER; static pthread_mutex_t mcast_msg_mutex = PTHREAD_MUTEX_INITIALIZER; #define log_printf(level, format, args...) 
\ do { \ totempg_log_printf(level, \ totempg_subsys_id, \ __FUNCTION__, __FILE__, __LINE__, \ format, ##args); \ } while (0); static int msg_count_send_ok (int msg_count); static int byte_count_send_ok (int byte_count); static struct assembly *assembly_ref (unsigned int nodeid) { struct assembly *assembly; struct list_head *list; /* * Search inuse list for node id and return assembly buffer if found */ for (list = assembly_list_inuse.next; list != &assembly_list_inuse; list = list->next) { assembly = list_entry (list, struct assembly, list); if (nodeid == assembly->nodeid) { return (assembly); } } /* * Nothing found in inuse list get one from free list if available */ if (list_empty (&assembly_list_free) == 0) { assembly = list_entry (assembly_list_free.next, struct assembly, list); list_del (&assembly->list); list_add (&assembly->list, &assembly_list_inuse); assembly->nodeid = nodeid; assembly->index = 0; assembly->last_frag_num = 0; assembly->throw_away_mode = THROW_AWAY_INACTIVE; return (assembly); } /* * Nothing available in inuse or free list, so allocate a new one */ assembly = malloc (sizeof (struct assembly)); /* * TODO handle memory allocation failure here */ assert (assembly); assembly->nodeid = nodeid; assembly->data[0] = 0; assembly->index = 0; assembly->last_frag_num = 0; assembly->throw_away_mode = THROW_AWAY_INACTIVE; list_init (&assembly->list); list_add (&assembly->list, &assembly_list_inuse); return (assembly); } static void assembly_deref (struct assembly *assembly) { list_del (&assembly->list); list_add (&assembly->list, &assembly_list_free); } static inline void app_confchg_fn ( enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id) { int i; struct totempg_group_instance *instance; struct assembly *assembly; - unsigned int res; + struct 
list_head *list; /* * For every leaving processor, add to free list * This also has the side effect of clearing out the dataset * In the leaving processor's assembly buffer. */ for (i = 0; i < left_list_entries; i++) { assembly = assembly_ref (left_list[i]); list_del (&assembly->list); list_add (&assembly->list, &assembly_list_free); } - for (i = 0; i <= totempg_max_handle; i++) { - res = hdb_handle_get (&totempg_groups_instance_database, - hdb_nocheck_convert (i), (void *)&instance); - - if (res == 0) { - if (instance->confchg_fn) { - instance->confchg_fn ( - configuration_type, - member_list, - member_list_entries, - left_list, - left_list_entries, - joined_list, - joined_list_entries, - ring_id); - } - hdb_handle_put (&totempg_groups_instance_database, - hdb_nocheck_convert (i)); + for (list = totempg_groups_list.next; + list != &totempg_groups_list; + list = list->next) { + + instance = list_entry (list, struct totempg_group_instance, list); + + if (instance->confchg_fn) { + instance->confchg_fn ( + configuration_type, + member_list, + member_list_entries, + left_list, + left_list_entries, + joined_list, + joined_list_entries, + ring_id); } } } static inline void group_endian_convert ( void *msg, int msg_len) { unsigned short *group_len; int i; char *aligned_msg; #ifdef TOTEMPG_NEED_ALIGN /* * Align data structure for not i386 or x86_64 */ if ((size_t)msg % 4 != 0) { aligned_msg = alloca(msg_len); memcpy(aligned_msg, msg, msg_len); } else { aligned_msg = msg; } #else aligned_msg = msg; #endif group_len = (unsigned short *)aligned_msg; group_len[0] = swab16(group_len[0]); for (i = 1; i < group_len[0] + 1; i++) { group_len[i] = swab16(group_len[i]); } if (aligned_msg != msg) { memcpy(msg, aligned_msg, msg_len); } } static inline int group_matches ( struct iovec *iovec, unsigned int iov_len, struct totempg_group *groups_b, unsigned int group_b_cnt, unsigned int *adjust_iovec) { unsigned short *group_len; char *group_name; int i; int j; #ifdef TOTEMPG_NEED_ALIGN 
struct iovec iovec_aligned = { NULL, 0 }; #endif assert (iov_len == 1); #ifdef TOTEMPG_NEED_ALIGN /* * Align data structure for not i386 or x86_64 */ if ((size_t)iovec->iov_base % 4 != 0) { iovec_aligned.iov_base = alloca(iovec->iov_len); memcpy(iovec_aligned.iov_base, iovec->iov_base, iovec->iov_len); iovec_aligned.iov_len = iovec->iov_len; iovec = &iovec_aligned; } #endif group_len = (unsigned short *)iovec->iov_base; group_name = ((char *)iovec->iov_base) + sizeof (unsigned short) * (group_len[0] + 1); /* * Calculate amount to adjust the iovec by before delivering to app */ *adjust_iovec = sizeof (unsigned short) * (group_len[0] + 1); for (i = 1; i < group_len[0] + 1; i++) { *adjust_iovec += group_len[i]; } /* * Determine if this message should be delivered to this instance */ for (i = 1; i < group_len[0] + 1; i++) { for (j = 0; j < group_b_cnt; j++) { if ((group_len[i] == groups_b[j].group_len) && (memcmp (groups_b[j].group, group_name, group_len[i]) == 0)) { return (1); } } group_name += group_len[i]; } return (0); } static inline void app_deliver_fn ( unsigned int nodeid, void *msg, unsigned int msg_len, int endian_conversion_required) { - int i; struct totempg_group_instance *instance; struct iovec stripped_iovec; unsigned int adjust_iovec; - unsigned int res; struct iovec *iovec; + struct list_head *list; struct iovec aligned_iovec = { NULL, 0 }; if (endian_conversion_required) { group_endian_convert (msg, msg_len); } /* * TODO: segmentation/assembly need to be redesigned to provide aligned access * in all cases to avoid memory copies on non386 archs. 
Probably broke backwars * compatibility */ #ifdef TOTEMPG_NEED_ALIGN /* * Align data structure for not i386 or x86_64 */ aligned_iovec.iov_base = alloca(msg_len); aligned_iovec.iov_len = msg_len; memcpy(aligned_iovec.iov_base, msg, msg_len); #else aligned_iovec.iov_base = msg; aligned_iovec.iov_len = msg_len; #endif iovec = &aligned_iovec; - for (i = 0; i <= totempg_max_handle; i++) { - res = hdb_handle_get (&totempg_groups_instance_database, - hdb_nocheck_convert (i), (void *)&instance); + for (list = totempg_groups_list.next; + list != &totempg_groups_list; + list = list->next) { - if (res == 0) { - if (group_matches (iovec, 1, instance->groups, instance->groups_cnt, &adjust_iovec)) { - stripped_iovec.iov_len = iovec->iov_len - adjust_iovec; - stripped_iovec.iov_base = (char *)iovec->iov_base + adjust_iovec; + instance = list_entry (list, struct totempg_group_instance, list); + if (group_matches (iovec, 1, instance->groups, instance->groups_cnt, &adjust_iovec)) { + stripped_iovec.iov_len = iovec->iov_len - adjust_iovec; + stripped_iovec.iov_base = (char *)iovec->iov_base + adjust_iovec; #ifdef TOTEMPG_NEED_ALIGN + /* + * Align data structure for not i386 or x86_64 + */ + if ((char *)iovec->iov_base + adjust_iovec % 4 != 0) { /* - * Align data structure for not i386 or x86_64 + * Deal with misalignment */ - if ((char *)iovec->iov_base + adjust_iovec % 4 != 0) { - /* - * Deal with misalignment - */ - stripped_iovec.iov_base = - alloca (stripped_iovec.iov_len); - memcpy (stripped_iovec.iov_base, - (char *)iovec->iov_base + adjust_iovec, - stripped_iovec.iov_len); - } -#endif - instance->deliver_fn ( - nodeid, - stripped_iovec.iov_base, - stripped_iovec.iov_len, - endian_conversion_required); + stripped_iovec.iov_base = + alloca (stripped_iovec.iov_len); + memcpy (stripped_iovec.iov_base, + (char *)iovec->iov_base + adjust_iovec, + stripped_iovec.iov_len); } - - hdb_handle_put (&totempg_groups_instance_database, hdb_nocheck_convert(i)); +#endif + instance->deliver_fn 
( + nodeid, + stripped_iovec.iov_base, + stripped_iovec.iov_len, + endian_conversion_required); } } } static void totempg_confchg_fn ( enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id) { // TODO optimize this app_confchg_fn (configuration_type, member_list, member_list_entries, left_list, left_list_entries, joined_list, joined_list_entries, ring_id); } static void totempg_deliver_fn ( unsigned int nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required) { struct totempg_mcast *mcast; unsigned short *msg_lens; int i; struct assembly *assembly; char header[FRAME_SIZE_MAX]; int msg_count; int continuation; int start; const char *data; int datasize; assembly = assembly_ref (nodeid); assert (assembly); /* * Assemble the header into one block of data and * assemble the packet contents into one block of data to simplify delivery */ mcast = (struct totempg_mcast *)msg; if (endian_conversion_required) { mcast->msg_count = swab16 (mcast->msg_count); } msg_count = mcast->msg_count; datasize = sizeof (struct totempg_mcast) + msg_count * sizeof (unsigned short); memcpy (header, msg, datasize); data = msg; msg_lens = (unsigned short *) (header + sizeof (struct totempg_mcast)); if (endian_conversion_required) { for (i = 0; i < mcast->msg_count; i++) { msg_lens[i] = swab16 (msg_lens[i]); } } memcpy (&assembly->data[assembly->index], &data[datasize], msg_len - datasize); /* * If the last message in the buffer is a fragment, then we * can't deliver it. We'll first deliver the full messages * then adjust the assembly buffer so we can add the rest of the * fragment when it arrives. */ msg_count = mcast->fragmented ? 
mcast->msg_count - 1 : mcast->msg_count; continuation = mcast->continuation; iov_delv.iov_base = (void *)&assembly->data[0]; iov_delv.iov_len = assembly->index + msg_lens[0]; /* * Make sure that if this message is a continuation, that it * matches the sequence number of the previous fragment. * Also, if the first packed message is a continuation * of a previous message, but the assembly buffer * is empty, then we need to discard it since we can't * assemble a complete message. Likewise, if this message isn't a * continuation and the assembly buffer is empty, we have to discard * the continued message. */ start = 0; if (assembly->throw_away_mode == THROW_AWAY_ACTIVE) { /* Throw away the first msg block */ if (mcast->fragmented == 0 || mcast->fragmented == 1) { assembly->throw_away_mode = THROW_AWAY_INACTIVE; assembly->index += msg_lens[0]; iov_delv.iov_base = (void *)&assembly->data[assembly->index]; iov_delv.iov_len = msg_lens[1]; start = 1; } } else if (assembly->throw_away_mode == THROW_AWAY_INACTIVE) { if (continuation == assembly->last_frag_num) { assembly->last_frag_num = mcast->fragmented; for (i = start; i < msg_count; i++) { app_deliver_fn(nodeid, iov_delv.iov_base, iov_delv.iov_len, endian_conversion_required); assembly->index += msg_lens[i]; iov_delv.iov_base = (void *)&assembly->data[assembly->index]; if (i < (msg_count - 1)) { iov_delv.iov_len = msg_lens[i + 1]; } } } else { assembly->throw_away_mode = THROW_AWAY_ACTIVE; } } if (mcast->fragmented == 0) { /* * End of messages, dereference assembly struct */ assembly->last_frag_num = 0; assembly->index = 0; assembly_deref (assembly); } else { /* * Message is fragmented, keep around assembly list */ if (mcast->msg_count > 1) { memmove (&assembly->data[0], &assembly->data[assembly->index], msg_lens[msg_count]); assembly->index = 0; } assembly->index += msg_lens[msg_count]; } } /* * Totem Process Group Abstraction * depends on poll abstraction, POSIX, IPV4 */ void *callback_token_received_handle; int 
callback_token_received_fn (enum totem_callback_token_type type, const void *data) { struct totempg_mcast mcast; struct iovec iovecs[3]; if (totempg_threaded_mode == 1) { pthread_mutex_lock (&mcast_msg_mutex); } if (mcast_packed_msg_count == 0) { if (totempg_threaded_mode == 1) { pthread_mutex_unlock (&mcast_msg_mutex); } return (0); } if (totemmrp_avail() == 0) { if (totempg_threaded_mode == 1) { pthread_mutex_unlock (&mcast_msg_mutex); } return (0); } mcast.header.version = 0; mcast.header.type = 0; mcast.fragmented = 0; /* * Was the first message in this buffer a continuation of a * fragmented message? */ mcast.continuation = fragment_continuation; fragment_continuation = 0; mcast.msg_count = mcast_packed_msg_count; iovecs[0].iov_base = (void *)&mcast; iovecs[0].iov_len = sizeof (struct totempg_mcast); iovecs[1].iov_base = (void *)mcast_packed_msg_lens; iovecs[1].iov_len = mcast_packed_msg_count * sizeof (unsigned short); iovecs[2].iov_base = (void *)&fragmentation_data[0]; iovecs[2].iov_len = fragment_size; (void)totemmrp_mcast (iovecs, 3, 0); mcast_packed_msg_count = 0; fragment_size = 0; if (totempg_threaded_mode == 1) { pthread_mutex_unlock (&mcast_msg_mutex); } return (0); } /* * Initialize the totem process group abstraction */ int totempg_initialize ( qb_loop_t *poll_handle, struct totem_config *totem_config) { int res; totempg_totem_config = totem_config; totempg_log_level_security = totem_config->totem_logging_configuration.log_level_security; totempg_log_level_error = totem_config->totem_logging_configuration.log_level_error; totempg_log_level_warning = totem_config->totem_logging_configuration.log_level_warning; totempg_log_level_notice = totem_config->totem_logging_configuration.log_level_notice; totempg_log_level_debug = totem_config->totem_logging_configuration.log_level_debug; totempg_log_printf = totem_config->totem_logging_configuration.log_printf; totempg_subsys_id = totem_config->totem_logging_configuration.log_subsys_id; fragmentation_data = 
malloc (TOTEMPG_PACKET_SIZE); if (fragmentation_data == 0) { return (-1); } totemsrp_net_mtu_adjust (totem_config); res = totemmrp_initialize ( poll_handle, totem_config, &totempg_stats, totempg_deliver_fn, totempg_confchg_fn); totemmrp_callback_token_create ( &callback_token_received_handle, TOTEM_CALLBACK_TOKEN_RECEIVED, 0, callback_token_received_fn, 0); totempg_size_limit = (totemmrp_avail() - 1) * (totempg_totem_config->net_mtu - sizeof (struct totempg_mcast) - 16); + list_init (&totempg_groups_list); + return (res); } void totempg_finalize (void) { if (totempg_threaded_mode == 1) { pthread_mutex_lock (&totempg_mutex); } totemmrp_finalize (); if (totempg_threaded_mode == 1) { pthread_mutex_unlock (&totempg_mutex); } } /* * Multicast a message */ static int mcast_msg ( struct iovec *iovec_in, unsigned int iov_len, int guarantee) { int res = 0; struct totempg_mcast mcast; struct iovec iovecs[3]; struct iovec iovec[64]; int i; int dest, src; int max_packet_size = 0; int copy_len = 0; int copy_base = 0; int total_size = 0; if (totempg_threaded_mode == 1) { pthread_mutex_lock (&mcast_msg_mutex); } totemmrp_event_signal (TOTEM_EVENT_NEW_MSG, 1); /* * Remove zero length iovectors from the list */ assert (iov_len < 64); for (dest = 0, src = 0; src < iov_len; src++) { if (iovec_in[src].iov_len) { memcpy (&iovec[dest++], &iovec_in[src], sizeof (struct iovec)); } } iov_len = dest; max_packet_size = TOTEMPG_PACKET_SIZE - (sizeof (unsigned short) * (mcast_packed_msg_count + 1)); mcast_packed_msg_lens[mcast_packed_msg_count] = 0; /* * Check if we would overwrite new message queue */ for (i = 0; i < iov_len; i++) { total_size += iovec[i].iov_len; } if (byte_count_send_ok (total_size + sizeof(unsigned short) * (mcast_packed_msg_count)) == 0) { if (totempg_threaded_mode == 1) { pthread_mutex_unlock (&mcast_msg_mutex); } return(-1); } mcast.header.version = 0; for (i = 0; i < iov_len; ) { mcast.fragmented = 0; mcast.continuation = fragment_continuation; copy_len = 
iovec[i].iov_len - copy_base; /* * If it all fits with room left over, copy it in. * We need to leave at least sizeof(short) + 1 bytes in the * fragment_buffer on exit so that max_packet_size + fragment_size * doesn't exceed the size of the fragment_buffer on the next call. */ if ((copy_len + fragment_size) < (max_packet_size - sizeof (unsigned short))) { memcpy (&fragmentation_data[fragment_size], (char *)iovec[i].iov_base + copy_base, copy_len); fragment_size += copy_len; mcast_packed_msg_lens[mcast_packed_msg_count] += copy_len; next_fragment = 1; copy_len = 0; copy_base = 0; i++; continue; /* * If it just fits or is too big, then send out what fits. */ } else { unsigned char *data_ptr; copy_len = min(copy_len, max_packet_size - fragment_size); if( copy_len == max_packet_size ) data_ptr = (unsigned char *)iovec[i].iov_base + copy_base; else { data_ptr = fragmentation_data; memcpy (&fragmentation_data[fragment_size], (unsigned char *)iovec[i].iov_base + copy_base, copy_len); } memcpy (&fragmentation_data[fragment_size], (unsigned char *)iovec[i].iov_base + copy_base, copy_len); mcast_packed_msg_lens[mcast_packed_msg_count] += copy_len; /* * if we're not on the last iovec or the iovec is too large to * fit, then indicate a fragment. This also means that the next * message will have the continuation of this one. 
*/ if ((i < (iov_len - 1)) || ((copy_base + copy_len) < iovec[i].iov_len)) { if (!next_fragment) { next_fragment++; } fragment_continuation = next_fragment; mcast.fragmented = next_fragment++; assert(fragment_continuation != 0); assert(mcast.fragmented != 0); } else { fragment_continuation = 0; } /* * assemble the message and send it */ mcast.msg_count = ++mcast_packed_msg_count; iovecs[0].iov_base = (void *)&mcast; iovecs[0].iov_len = sizeof(struct totempg_mcast); iovecs[1].iov_base = (void *)mcast_packed_msg_lens; iovecs[1].iov_len = mcast_packed_msg_count * sizeof(unsigned short); iovecs[2].iov_base = (void *)data_ptr; iovecs[2].iov_len = max_packet_size; assert (totemmrp_avail() > 0); res = totemmrp_mcast (iovecs, 3, guarantee); if (res == -1) { goto error_exit; } /* * Recalculate counts and indexes for the next. */ mcast_packed_msg_lens[0] = 0; mcast_packed_msg_count = 0; fragment_size = 0; max_packet_size = TOTEMPG_PACKET_SIZE - (sizeof(unsigned short)); /* * If the iovec all fit, go to the next iovec */ if ((copy_base + copy_len) == iovec[i].iov_len) { copy_len = 0; copy_base = 0; i++; /* * Continue with the rest of the current iovec. */ } else { copy_base += copy_len; } } } /* * Bump only if we added message data. This may be zero if * the last buffer just fit into the fragmentation_data buffer * and we were at the last iovec. 
*/ if (mcast_packed_msg_lens[mcast_packed_msg_count]) { mcast_packed_msg_count++; } error_exit: if (totempg_threaded_mode == 1) { pthread_mutex_unlock (&mcast_msg_mutex); } return (res); } /* * Determine if a message of msg_size could be queued */ static int msg_count_send_ok ( int msg_count) { int avail = 0; avail = totemmrp_avail (); totempg_stats.msg_queue_avail = avail; return ((avail - totempg_reserved) > msg_count); } static int byte_count_send_ok ( int byte_count) { unsigned int msg_count = 0; int avail = 0; avail = totemmrp_avail (); msg_count = (byte_count / (totempg_totem_config->net_mtu - sizeof (struct totempg_mcast) - 16)) + 1; return (avail >= msg_count); } static int send_reserve ( int msg_size) { unsigned int msg_count = 0; msg_count = (msg_size / (totempg_totem_config->net_mtu - sizeof (struct totempg_mcast) - 16)) + 1; totempg_reserved += msg_count; totempg_stats.msg_reserved = totempg_reserved; return (msg_count); } static void send_release ( int msg_count) { totempg_reserved -= msg_count; totempg_stats.msg_reserved = totempg_reserved; } int totempg_callback_token_create ( void **handle_out, enum totem_callback_token_type type, int delete, int (*callback_fn) (enum totem_callback_token_type type, const void *), const void *data) { unsigned int res; if (totempg_threaded_mode == 1) { pthread_mutex_lock (&callback_token_mutex); } res = totemmrp_callback_token_create (handle_out, type, delete, callback_fn, data); if (totempg_threaded_mode == 1) { pthread_mutex_unlock (&callback_token_mutex); } return (res); } void totempg_callback_token_destroy ( void *handle_out) { if (totempg_threaded_mode == 1) { pthread_mutex_lock (&callback_token_mutex); } totemmrp_callback_token_destroy (handle_out); if (totempg_threaded_mode == 1) { pthread_mutex_unlock (&callback_token_mutex); } } /* * vi: set autoindent tabstop=4 shiftwidth=4 : */ int totempg_groups_initialize ( - hdb_handle_t *handle, + void **totempg_groups_instance, void (*deliver_fn) ( unsigned int 
nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required), void (*confchg_fn) ( enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id)) { struct totempg_group_instance *instance; - unsigned int res; if (totempg_threaded_mode == 1) { pthread_mutex_lock (&totempg_mutex); } - res = hdb_handle_create (&totempg_groups_instance_database, - sizeof (struct totempg_group_instance), handle); - if (res != 0) { + + instance = malloc (sizeof (struct totempg_group_instance)); + if (instance == NULL) { goto error_exit; } - if (*handle > totempg_max_handle) { - totempg_max_handle = *handle; - } - - res = hdb_handle_get (&totempg_groups_instance_database, *handle, - (void *)&instance); - if (res != 0) { - goto error_destroy; - } - instance->deliver_fn = deliver_fn; instance->confchg_fn = confchg_fn; instance->groups = 0; instance->groups_cnt = 0; instance->q_level = QB_LOOP_MED; - - - hdb_handle_put (&totempg_groups_instance_database, *handle); + list_init (&instance->list); + list_add (&instance->list, &totempg_groups_list); if (totempg_threaded_mode == 1) { pthread_mutex_unlock (&totempg_mutex); } + *totempg_groups_instance = instance; return (0); -error_destroy: - hdb_handle_destroy (&totempg_groups_instance_database, *handle); error_exit: if (totempg_threaded_mode == 1) { pthread_mutex_unlock (&totempg_mutex); } return (-1); } int totempg_groups_join ( - hdb_handle_t handle, + void *totempg_groups_instance, const struct totempg_group *groups, size_t group_cnt) { - struct totempg_group_instance *instance; + struct totempg_group_instance *instance = (struct totempg_group_instance *)totempg_groups_instance; struct totempg_group *new_groups; unsigned int res; if (totempg_threaded_mode == 1) { pthread_mutex_lock (&totempg_mutex); } - res = hdb_handle_get 
(&totempg_groups_instance_database, handle, - (void *)&instance); - if (res != 0) { - goto error_exit; - } - new_groups = realloc (instance->groups, sizeof (struct totempg_group) * (instance->groups_cnt + group_cnt)); if (new_groups == 0) { res = ENOMEM; goto error_exit; } memcpy (&new_groups[instance->groups_cnt], groups, group_cnt * sizeof (struct totempg_group)); instance->groups = new_groups; instance->groups_cnt += group_cnt; - hdb_handle_put (&totempg_groups_instance_database, handle); - error_exit: if (totempg_threaded_mode == 1) { pthread_mutex_unlock (&totempg_mutex); } return (res); } int totempg_groups_leave ( - hdb_handle_t handle, + void *totempg_groups_instance, const struct totempg_group *groups, size_t group_cnt) { - struct totempg_group_instance *instance; - unsigned int res; - if (totempg_threaded_mode == 1) { pthread_mutex_lock (&totempg_mutex); } - res = hdb_handle_get (&totempg_groups_instance_database, handle, - (void *)&instance); - if (res != 0) { - goto error_exit; - } - hdb_handle_put (&totempg_groups_instance_database, handle); - -error_exit: if (totempg_threaded_mode == 1) { pthread_mutex_unlock (&totempg_mutex); } - return (res); + return (0); } #define MAX_IOVECS_FROM_APP 32 #define MAX_GROUPS_PER_MSG 32 int totempg_groups_mcast_joined ( - hdb_handle_t handle, + void *totempg_groups_instance, const struct iovec *iovec, unsigned int iov_len, int guarantee) { - struct totempg_group_instance *instance; + struct totempg_group_instance *instance = (struct totempg_group_instance *)totempg_groups_instance; unsigned short group_len[MAX_GROUPS_PER_MSG + 1]; struct iovec iovec_mcast[MAX_GROUPS_PER_MSG + 1 + MAX_IOVECS_FROM_APP]; int i; unsigned int res; if (totempg_threaded_mode == 1) { pthread_mutex_lock (&totempg_mutex); } - res = hdb_handle_get (&totempg_groups_instance_database, handle, - (void *)&instance); - if (res != 0) { - goto error_exit; - } - /* * Build group_len structure and the iovec_mcast structure */ group_len[0] = 
instance->groups_cnt; for (i = 0; i < instance->groups_cnt; i++) { group_len[i + 1] = instance->groups[i].group_len; iovec_mcast[i + 1].iov_len = instance->groups[i].group_len; iovec_mcast[i + 1].iov_base = (void *) instance->groups[i].group; } iovec_mcast[0].iov_len = (instance->groups_cnt + 1) * sizeof (unsigned short); iovec_mcast[0].iov_base = group_len; for (i = 0; i < iov_len; i++) { iovec_mcast[i + instance->groups_cnt + 1].iov_len = iovec[i].iov_len; iovec_mcast[i + instance->groups_cnt + 1].iov_base = iovec[i].iov_base; } res = mcast_msg (iovec_mcast, iov_len + instance->groups_cnt + 1, guarantee); - hdb_handle_put (&totempg_groups_instance_database, handle); -error_exit: if (totempg_threaded_mode == 1) { pthread_mutex_unlock (&totempg_mutex); } return (res); } -static void check_q_level(struct totempg_group_instance *instance) +static void check_q_level( + void *totempg_groups_instance) { int32_t old_level; int32_t percent_used = 0; + struct totempg_group_instance *instance = (struct totempg_group_instance *)totempg_groups_instance; old_level = instance->q_level; percent_used = 100 - (totemmrp_avail () * 100 / 800); /*(1024*1024/1500)*/ if (percent_used > 90 && instance->q_level != TOTEM_Q_LEVEL_CRITICAL) { instance->q_level = TOTEM_Q_LEVEL_CRITICAL; } else if (percent_used < 30 && instance->q_level != TOTEM_Q_LEVEL_LOW) { instance->q_level = TOTEM_Q_LEVEL_LOW; } else if (percent_used > 40 && percent_used < 60 && instance->q_level != TOTEM_Q_LEVEL_GOOD) { instance->q_level = TOTEM_Q_LEVEL_GOOD; } else if (percent_used > 70 && percent_used < 80 && instance->q_level != TOTEM_Q_LEVEL_HIGH) { instance->q_level = TOTEM_Q_LEVEL_HIGH; } if (totem_queue_level_changed && old_level != instance->q_level) { totem_queue_level_changed(instance->q_level); } } -void totempg_check_q_level(qb_handle_t handle) +void totempg_check_q_level( + void *totempg_groups_instance) { - struct totempg_group_instance *instance; + struct totempg_group_instance *instance = (struct 
totempg_group_instance *)totempg_groups_instance; - if (hdb_handle_get (&totempg_groups_instance_database, handle, - (void *)&instance) != 0) { - return; - } check_q_level(instance); - - hdb_handle_put (&totempg_groups_instance_database, handle); } int totempg_groups_joined_reserve ( - hdb_handle_t handle, + void *totempg_groups_instance, const struct iovec *iovec, unsigned int iov_len) { - struct totempg_group_instance *instance; + struct totempg_group_instance *instance = (struct totempg_group_instance *)totempg_groups_instance; unsigned int size = 0; unsigned int i; - unsigned int res; unsigned int reserved = 0; if (totempg_threaded_mode == 1) { pthread_mutex_lock (&totempg_mutex); pthread_mutex_lock (&mcast_msg_mutex); } - res = hdb_handle_get (&totempg_groups_instance_database, handle, - (void *)&instance); - if (res != 0) { - goto error_exit; - } for (i = 0; i < instance->groups_cnt; i++) { size += instance->groups[i].group_len; } for (i = 0; i < iov_len; i++) { size += iovec[i].iov_len; } check_q_level(instance); if (size >= totempg_size_limit) { reserved = -1; - goto error_put; + goto error_exit; } reserved = send_reserve (size); if (msg_count_send_ok (reserved) == 0) { send_release (reserved); reserved = 0; } -error_put: - hdb_handle_put (&totempg_groups_instance_database, handle); error_exit: if (totempg_threaded_mode == 1) { pthread_mutex_unlock (&mcast_msg_mutex); pthread_mutex_unlock (&totempg_mutex); } return (reserved); } int totempg_groups_joined_release (int msg_count) { if (totempg_threaded_mode == 1) { pthread_mutex_lock (&totempg_mutex); pthread_mutex_lock (&mcast_msg_mutex); } send_release (msg_count); if (totempg_threaded_mode == 1) { pthread_mutex_unlock (&mcast_msg_mutex); pthread_mutex_unlock (&totempg_mutex); } return 0; } int totempg_groups_mcast_groups ( - hdb_handle_t handle, + void *totempg_groups_instance, int guarantee, const struct totempg_group *groups, size_t groups_cnt, const struct iovec *iovec, unsigned int iov_len) { - struct 
totempg_group_instance *instance; unsigned short group_len[MAX_GROUPS_PER_MSG + 1]; struct iovec iovec_mcast[MAX_GROUPS_PER_MSG + 1 + MAX_IOVECS_FROM_APP]; int i; unsigned int res; if (totempg_threaded_mode == 1) { pthread_mutex_lock (&totempg_mutex); } - res = hdb_handle_get (&totempg_groups_instance_database, handle, - (void *)&instance); - if (res != 0) { - goto error_exit; - } /* * Build group_len structure and the iovec_mcast structure */ group_len[0] = groups_cnt; for (i = 0; i < groups_cnt; i++) { group_len[i + 1] = groups[i].group_len; iovec_mcast[i + 1].iov_len = groups[i].group_len; iovec_mcast[i + 1].iov_base = (void *) groups[i].group; } iovec_mcast[0].iov_len = (groups_cnt + 1) * sizeof (unsigned short); iovec_mcast[0].iov_base = group_len; for (i = 0; i < iov_len; i++) { iovec_mcast[i + groups_cnt + 1].iov_len = iovec[i].iov_len; iovec_mcast[i + groups_cnt + 1].iov_base = iovec[i].iov_base; } res = mcast_msg (iovec_mcast, iov_len + groups_cnt + 1, guarantee); - hdb_handle_put (&totempg_groups_instance_database, handle); - -error_exit: if (totempg_threaded_mode == 1) { pthread_mutex_unlock (&totempg_mutex); } return (res); } /* * Returns -1 if error, 0 if can't send, 1 if can send the message */ int totempg_groups_send_ok_groups ( - hdb_handle_t handle, + void *totempg_groups_instance, const struct totempg_group *groups, size_t groups_cnt, const struct iovec *iovec, unsigned int iov_len) { - struct totempg_group_instance *instance; unsigned int size = 0; unsigned int i; unsigned int res; if (totempg_threaded_mode == 1) { pthread_mutex_lock (&totempg_mutex); } - res = hdb_handle_get (&totempg_groups_instance_database, handle, - (void *)&instance); - if (res != 0) { - goto error_exit; - } for (i = 0; i < groups_cnt; i++) { size += groups[i].group_len; } for (i = 0; i < iov_len; i++) { size += iovec[i].iov_len; } res = msg_count_send_ok (size); - hdb_handle_put (&totempg_groups_instance_database, handle); -error_exit: if (totempg_threaded_mode == 1) { 
pthread_mutex_unlock (&totempg_mutex); } return (res); } int totempg_ifaces_get ( unsigned int nodeid, struct totem_ip_address *interfaces, char ***status, unsigned int *iface_count) { int res; res = totemmrp_ifaces_get ( nodeid, interfaces, status, iface_count); return (res); } void totempg_event_signal (enum totem_event_type type, int value) { totemmrp_event_signal (type, value); } void* totempg_get_stats (void) { return &totempg_stats; } int totempg_crypto_set ( unsigned int type) { int res; res = totemmrp_crypto_set ( type); return (res); } int totempg_ring_reenable (void) { int res; res = totemmrp_ring_reenable (); return (res); } const char *totempg_ifaces_print (unsigned int nodeid) { static char iface_string[256 * INTERFACE_MAX]; char one_iface[64]; struct totem_ip_address interfaces[INTERFACE_MAX]; char **status; unsigned int iface_count; unsigned int i; int res; iface_string[0] = '\0'; res = totempg_ifaces_get (nodeid, interfaces, &status, &iface_count); if (res == -1) { return ("no interface found for nodeid"); } for (i = 0; i < iface_count; i++) { sprintf (one_iface, "r(%d) ip(%s) ", i, totemip_print (&interfaces[i])); strcat (iface_string, one_iface); } return (iface_string); } unsigned int totempg_my_nodeid_get (void) { return (totemmrp_my_nodeid_get()); } int totempg_my_family_get (void) { return (totemmrp_my_family_get()); } extern void totempg_service_ready_register ( void (*totem_service_ready) (void)) { totemmrp_service_ready_register (totem_service_ready); } void totempg_queue_level_register_callback (totem_queue_level_changed_fn fn) { totem_queue_level_changed = fn; } extern int totempg_member_add ( const struct totem_ip_address *member, int ring_no); extern int totempg_member_remove ( const struct totem_ip_address *member, int ring_no); void totempg_threaded_mode_enable (void) { totempg_threaded_mode = 1; totemmrp_threaded_mode_enable (); } diff --git a/include/corosync/totem/totempg.h b/include/corosync/totem/totempg.h index 
30ccfe46..9fda82ea 100644 --- a/include/corosync/totem/totempg.h +++ b/include/corosync/totem/totempg.h @@ -1,190 +1,188 @@ /* * Copyright (c) 2003-2005 MontaVista Software, Inc. * Copyright (c) 2006-2007, 2009 Red Hat, Inc. * * All rights reserved. * * Author: Steven Dake (sdake@redhat.com) * * This software licensed under BSD license, the text of which follows: * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * - Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * - Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * - Neither the name of the MontaVista Software, Inc. nor the names of its * contributors may be used to endorse or promote products derived from this * software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF * THE POSSIBILITY OF SUCH DAMAGE. 
*/ /** * @file * Totem Single Ring Protocol * * depends on poll abstraction, POSIX, IPV4 */ #ifndef TOTEMPG_H_DEFINED #define TOTEMPG_H_DEFINED #ifdef __cplusplus extern "C" { #endif #include #include "totem.h" -#include #include struct totempg_group { const void *group; size_t group_len; }; #define TOTEMPG_AGREED 0 #define TOTEMPG_SAFE 1 /** * Initialize the totem process groups abstraction */ extern int totempg_initialize ( qb_loop_t* poll_handle, struct totem_config *totem_config ); extern void totempg_finalize (void); extern int totempg_callback_token_create (void **handle_out, enum totem_callback_token_type type, int delete, int (*callback_fn) (enum totem_callback_token_type type, const void *), const void *data); extern void totempg_callback_token_destroy (void *handle); /** * Initialize a groups instance */ extern int totempg_groups_initialize ( - hdb_handle_t *handle, + void **instance, void (*deliver_fn) ( unsigned int nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required), void (*confchg_fn) ( enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id)); -extern int totempg_groups_finalize ( - hdb_handle_t handle); +extern int totempg_groups_finalize (void *instance); extern int totempg_groups_join ( - hdb_handle_t handle, + void *instance, const struct totempg_group *groups, size_t group_cnt); extern int totempg_groups_leave ( - hdb_handle_t handle, + void *instance, const struct totempg_group *groups, size_t group_cnt); extern int totempg_groups_mcast_joined ( - hdb_handle_t handle, + void *instance, const struct iovec *iovec, unsigned int iov_len, int guarantee); extern int totempg_groups_joined_reserve ( - hdb_handle_t handle, + void *instance, const struct iovec *iovec, unsigned int iov_len); extern int 
totempg_groups_joined_release ( int msg_count); extern int totempg_groups_mcast_groups ( - hdb_handle_t handle, + void *instance, int guarantee, const struct totempg_group *groups, size_t groups_cnt, const struct iovec *iovec, unsigned int iov_len); extern int totempg_groups_send_ok_groups ( - hdb_handle_t handle, + void *instance, const struct totempg_group *groups, size_t groups_cnt, const struct iovec *iovec, unsigned int iov_len); extern int totempg_ifaces_get ( unsigned int nodeid, struct totem_ip_address *interfaces, char ***status, unsigned int *iface_count); extern void* totempg_get_stats (void); void totempg_event_signal (enum totem_event_type type, int value); extern const char *totempg_ifaces_print (unsigned int nodeid); extern unsigned int totempg_my_nodeid_get (void); extern int totempg_my_family_get (void); extern int totempg_crypto_set (unsigned int type); extern int totempg_ring_reenable (void); extern void totempg_service_ready_register ( void (*totem_service_ready) (void)); extern int totempg_member_add ( const struct totem_ip_address *member, int ring_no); extern int totempg_member_remove ( const struct totem_ip_address *member, int ring_no); enum totem_q_level { TOTEM_Q_LEVEL_LOW, TOTEM_Q_LEVEL_GOOD, TOTEM_Q_LEVEL_HIGH, TOTEM_Q_LEVEL_CRITICAL }; -void totempg_check_q_level(hdb_handle_t handle); +void totempg_check_q_level(void *instance); typedef void (*totem_queue_level_changed_fn) (enum totem_q_level level); extern void totempg_queue_level_register_callback (totem_queue_level_changed_fn); #ifdef __cplusplus } #endif #endif /* TOTEMPG_H_DEFINED */