diff --git a/exec/main.c b/exec/main.c index d427cbcd..cd680ecc 100644 --- a/exec/main.c +++ b/exec/main.c @@ -1,924 +1,924 @@ /* * Copyright (c) 2002-2006 MontaVista Software, Inc. * Copyright (c) 2006-2009 Red Hat, Inc. * * All rights reserved. * * Author: Steven Dake (sdake@redhat.com) * * This software licensed under BSD license, the text of which follows: * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * - Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * - Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * - Neither the name of the MontaVista Software, Inc. nor the names of its * contributors may be used to endorse or promote products derived from this * software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF * THE POSSIBILITY OF SUCH DAMAGE. 
*/ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "quorum.h" #include "totemsrp.h" #include "mempool.h" #include "mainconfig.h" #include "totemconfig.h" #include "main.h" #include "sync.h" #include "tlist.h" #include "coroipcs.h" #include "timer.h" #include "util.h" #include "apidef.h" #include "service.h" #include "version.h" LOGSYS_DECLARE_SYSTEM ("corosync", LOG_MODE_OUTPUT_STDERR | LOG_MODE_THREADED | LOG_MODE_FORK, NULL, LOG_DAEMON, NULL, 1000000); LOGSYS_DECLARE_SUBSYS ("MAIN", LOG_INFO); #define SERVER_BACKLOG 5 static int sched_priority = 0; static unsigned int service_count = 32; #if defined(HAVE_PTHREAD_SPIN_LOCK) static pthread_spinlock_t serialize_spin; #else static pthread_mutex_t serialize_mutex = PTHREAD_MUTEX_INITIALIZER; #endif static struct totem_logging_configuration totem_logging_configuration; static char delivery_data[MESSAGE_SIZE_MAX]; static int num_config_modules; static struct config_iface_ver0 *config_modules[MAX_DYNAMIC_SERVICES]; static struct objdb_iface_ver0 *objdb = NULL; static struct corosync_api_v1 *api = NULL; static struct main_config main_config; unsigned long long *(*main_clm_get_by_nodeid) (unsigned int node_id); hdb_handle_t corosync_poll_handle; static void sigusr2_handler (int num) { int i; for (i = 0; ais_service[i]; i++) { if (ais_service[i]->exec_dump_fn) { ais_service[i]->exec_dump_fn (); } } } static void *corosync_exit (void *arg) __attribute__((__noreturn__)); static void *corosync_exit (void *arg) { if (api) { corosync_service_unlink_all (api); } #ifdef DEBUG_MEMPOOL int stats_inuse[MEMPOOL_GROUP_SIZE]; int stats_avail[MEMPOOL_GROUP_SIZE]; int stats_memoryused[MEMPOOL_GROUP_SIZE]; int i; mempool_getstats (stats_inuse, stats_avail, stats_memoryused); log_printf (LOG_LEVEL_DEBUG, 
"Memory pools:\n"); for (i = 0; i < MEMPOOL_GROUP_SIZE; i++) { log_printf (LOG_LEVEL_DEBUG, "order %d size %d inuse %d avail %d memory used %d\n", i, 1<name = ais_service[ais_service_index]->name; callbacks->sync_init = ais_service[ais_service_index]->sync_init; callbacks->sync_process = ais_service[ais_service_index]->sync_process; callbacks->sync_activate = ais_service[ais_service_index]->sync_activate; callbacks->sync_abort = ais_service[ais_service_index]->sync_abort; return (0); } static struct memb_ring_id corosync_ring_id; static void confchg_fn ( enum totem_configuration_type configuration_type, - unsigned int *member_list, int member_list_entries, - unsigned int *left_list, int left_list_entries, - unsigned int *joined_list, int joined_list_entries, - struct memb_ring_id *ring_id) + const unsigned int *member_list, size_t member_list_entries, + const unsigned int *left_list, size_t left_list_entries, + const unsigned int *joined_list, size_t joined_list_entries, + const struct memb_ring_id *ring_id) { int i; serialize_lock (); memcpy (&corosync_ring_id, ring_id, sizeof (struct memb_ring_id)); /* * Call configuration change for all services */ for (i = 0; i < service_count; i++) { if (ais_service[i] && ais_service[i]->confchg_fn) { ais_service[i]->confchg_fn (configuration_type, member_list, member_list_entries, left_list, left_list_entries, joined_list, joined_list_entries, ring_id); } } serialize_unlock (); } static void priv_drop (void) { return; /* TODO: we are still not dropping privs */ setuid (main_config.uid); setegid (main_config.gid); } static void corosync_mempool_init (void) { int res; res = mempool_init (pool_sizes); if (res == ENOMEM) { log_printf (LOG_LEVEL_ERROR, "Couldn't allocate memory pools, not enough memory"); corosync_exit_error (AIS_DONE_MEMPOOL_INIT); } } static void corosync_tty_detach (void) { int fd; /* * Disconnect from TTY if this is not a debug run */ switch (fork ()) { case -1: corosync_exit_error (AIS_DONE_FORK); break; case 
0: /* * child which is disconnected, run this process */ /* setset(); close (0); close (1); close (2); */ break; default: exit (0); break; } /* Create new session */ (void)setsid(); /* * Map stdin/out/err to /dev/null. */ fd = open("/dev/null", O_RDWR); if (fd >= 0) { /* dup2 to 0 / 1 / 2 (stdin / stdout / stderr) */ dup2(fd, STDIN_FILENO); /* 0 */ dup2(fd, STDOUT_FILENO); /* 1 */ dup2(fd, STDERR_FILENO); /* 2 */ /* Should be 0, but just in case it isn't... */ if (fd > 2) close(fd); } } static void corosync_setscheduler (void) { #if ! defined(TS_CLASS) && (defined(COROSYNC_BSD) || defined(COROSYNC_LINUX) || defined(COROSYNC_SOLARIS)) struct sched_param sched_param; int res; sched_priority = sched_get_priority_max (SCHED_RR); if (sched_priority != -1) { sched_param.sched_priority = sched_priority; res = sched_setscheduler (0, SCHED_RR, &sched_param); if (res == -1) { log_printf (LOG_LEVEL_WARNING, "Could not set SCHED_RR at priority %d: %s\n", sched_param.sched_priority, strerror (errno)); } } else { log_printf (LOG_LEVEL_WARNING, "Could not get maximum scheduler priority: %s\n", strerror (errno)); sched_priority = 0; } #else log_printf(LOG_LEVEL_WARNING, "Scheduler priority left to default value (no OS support)\n"); #endif } static void corosync_mlockall (void) { #if !defined(COROSYNC_BSD) int res; #endif struct rlimit rlimit; rlimit.rlim_cur = RLIM_INFINITY; rlimit.rlim_max = RLIM_INFINITY; #ifndef COROSYNC_SOLARIS setrlimit (RLIMIT_MEMLOCK, &rlimit); #else setrlimit (RLIMIT_VMEM, &rlimit); #endif #if defined(COROSYNC_BSD) /* under FreeBSD a process with locked page cannot call dlopen * code disabled until FreeBSD bug i386/93396 was solved */ log_printf (LOG_LEVEL_WARNING, "Could not lock memory of service to avoid page faults\n"); #else res = mlockall (MCL_CURRENT | MCL_FUTURE); if (res == -1) { log_printf (LOG_LEVEL_WARNING, "Could not lock memory of service to avoid page faults: %s\n", strerror (errno)); }; #endif } static void deliver_fn ( unsigned int nodeid, 
struct iovec *iovec, int iov_len, int endian_conversion_required) { mar_req_header_t *header; int pos = 0; int i; int service; int fn_id; /* * Build buffer without iovecs to make processing easier * This is only used for messages which are multicast with iovecs * and self-delivered. All other mechanisms avoid the copy. */ if (iov_len > 1) { for (i = 0; i < iov_len; i++) { memcpy (&delivery_data[pos], iovec[i].iov_base, iovec[i].iov_len); pos += iovec[i].iov_len; assert (pos < MESSAGE_SIZE_MAX); } header = (mar_req_header_t *)delivery_data; } else { header = (mar_req_header_t *)iovec[0].iov_base; } if (endian_conversion_required) { header->id = swab32 (header->id); header->size = swab32 (header->size); } // assert(iovec->iov_len == header->size); /* * Call the proper executive handler */ service = header->id >> 16; fn_id = header->id & 0xffff; if (!ais_service[service]) return; serialize_lock(); if (endian_conversion_required) { assert(ais_service[service]->exec_engine[fn_id].exec_endian_convert_fn != NULL); ais_service[service]->exec_engine[fn_id].exec_endian_convert_fn (header); } ais_service[service]->exec_engine[fn_id].exec_handler_fn (header, nodeid); serialize_unlock(); } void main_get_config_modules(struct config_iface_ver0 ***modules, int *num) { *modules = config_modules; *num = num_config_modules; } int main_mcast ( struct iovec *iovec, int iov_len, unsigned int guarantee) { return (totempg_groups_mcast_joined (corosync_group_handle, iovec, iov_len, guarantee)); } int message_source_is_local (const mar_message_source_t *source) { int ret = 0; assert (source != NULL); if (source->nodeid == totempg_my_nodeid_get ()) { ret = 1; } return ret; } void message_source_set ( mar_message_source_t *source, void *conn) { assert ((source != NULL) && (conn != NULL)); memset (source, 0, sizeof (mar_message_source_t)); source->nodeid = totempg_my_nodeid_get (); source->conn = conn; } /* * Provides the glue from corosync to the IPC Service */ static int 
corosync_private_data_size_get (unsigned int service) { return (ais_service[service]->private_data_size); } static coroipcs_init_fn_lvalue corosync_init_fn_get (unsigned int service) { return (ais_service[service]->lib_init_fn); } static coroipcs_exit_fn_lvalue corosync_exit_fn_get (unsigned int service) { return (ais_service[service]->lib_exit_fn); } static coroipcs_handler_fn_lvalue corosync_handler_fn_get (unsigned int service, unsigned int id) { return (ais_service[service]->lib_engine[id].lib_handler_fn); } static int corosync_security_valid (int euid, int egid) { if (euid == 0 || egid == 0) { return (1); } if (euid == main_config.uid || egid == main_config.gid) { return (1); } return (0); } static int corosync_service_available (unsigned int service) { return (ais_service[service]); } static int corosync_response_size_get (unsigned int service, unsigned int id) { return (ais_service[service]->lib_engine[id].response_size); } static int corosync_response_id_get (unsigned int service, unsigned int id) { return (ais_service[service]->lib_engine[id].response_id); } struct sending_allowed_private_data_struct { int reserved_msgs; }; static int corosync_sending_allowed ( unsigned int service, unsigned int id, void *msg, void *sending_allowed_private_data) { struct sending_allowed_private_data_struct *pd = (struct sending_allowed_private_data_struct *)sending_allowed_private_data; struct iovec reserve_iovec; mar_req_header_t *header = (mar_req_header_t *)msg; int sending_allowed; reserve_iovec.iov_base = (char *)header; reserve_iovec.iov_len = header->size; pd->reserved_msgs = totempg_groups_joined_reserve ( corosync_group_handle, &reserve_iovec, 1); sending_allowed = (corosync_quorum_is_quorate() == 1 || ais_service[service]->allow_inquorate == CS_LIB_ALLOW_INQUORATE) && ((ais_service[service]->lib_engine[id].flow_control == CS_LIB_FLOW_CONTROL_NOT_REQUIRED) || ((ais_service[service]->lib_engine[id].flow_control == CS_LIB_FLOW_CONTROL_REQUIRED) && 
(pd->reserved_msgs) && (sync_in_process() == 0))); return (sending_allowed); } static void corosync_sending_allowed_release (void *sending_allowed_private_data) { struct sending_allowed_private_data_struct *pd = (struct sending_allowed_private_data_struct *)sending_allowed_private_data; totempg_groups_joined_release (pd->reserved_msgs); } static int ipc_subsys_id = -1; static void ipc_log_printf (const char *format, ...) { va_list ap; va_start (ap, format); _logsys_log_printf (ipc_subsys_id, __FUNCTION__, __FILE__, __LINE__, LOG_LEVEL_ERROR, format, ap); va_end (ap); } static int corosync_poll_handler_accept ( hdb_handle_t handle, int fd, int revent, void *context) { return (coroipcs_handler_accept (fd, revent, context)); } static int corosync_poll_handler_dispatch ( hdb_handle_t handle, int fd, int revent, void *context) { return (coroipcs_handler_dispatch (fd, revent, context)); } static void corosync_poll_accept_add ( int fd) { poll_dispatch_add (corosync_poll_handle, fd, POLLIN|POLLNVAL, 0, corosync_poll_handler_accept); } static void corosync_poll_dispatch_add ( int fd, void *context) { poll_dispatch_add (corosync_poll_handle, fd, POLLIN|POLLNVAL, context, corosync_poll_handler_dispatch); } static void corosync_poll_dispatch_modify ( int fd, int events) { poll_dispatch_modify (corosync_poll_handle, fd, events, corosync_poll_handler_dispatch); } struct coroipcs_init_state ipc_init_state = { .socket_name = IPC_SOCKET_NAME, .malloc = malloc, .free = free, .log_printf = ipc_log_printf, .security_valid = corosync_security_valid, .service_available = corosync_service_available, .private_data_size_get = corosync_private_data_size_get, .serialize_lock = serialize_lock, .serialize_unlock = serialize_unlock, .sending_allowed = corosync_sending_allowed, .sending_allowed_release = corosync_sending_allowed_release, .response_size_get = corosync_response_size_get, .response_id_get = corosync_response_id_get, .poll_accept_add = corosync_poll_accept_add, .poll_dispatch_add = 
corosync_poll_dispatch_add, .poll_dispatch_modify = corosync_poll_dispatch_modify, .init_fn_get = corosync_init_fn_get, .exit_fn_get = corosync_exit_fn_get, .handler_fn_get = corosync_handler_fn_get }; int main (int argc, char **argv) { const char *error_string; struct totem_config totem_config; hdb_handle_t objdb_handle; hdb_handle_t config_handle; unsigned int config_version = 0; void *objdb_p; struct config_iface_ver0 *config; void *config_p; const char *config_iface_init; char *config_iface; char *iface; int res, ch; int background, setprio; #if defined(HAVE_PTHREAD_SPIN_LOCK) pthread_spin_init (&serialize_spin, 0); #endif /* default configuration */ background = 1; setprio = 1; while ((ch = getopt (argc, argv, "fp")) != EOF) { switch (ch) { case 'f': background = 0; logsys_config_mode_set (LOG_MODE_OUTPUT_STDERR|LOG_MODE_THREADED|LOG_MODE_FORK); break; case 'p': setprio = 0; break; default: fprintf(stderr, \ "usage:\n"\ " -f : Start application in foreground.\n"\ " -p : Do not set process priority. 
\n"); return EXIT_FAILURE; } } if (background) corosync_tty_detach (); log_printf (LOG_LEVEL_NOTICE, "Corosync Executive Service RELEASE '%s'\n", RELEASE_VERSION); log_printf (LOG_LEVEL_NOTICE, "Copyright (C) 2002-2006 MontaVista Software, Inc and contributors.\n"); log_printf (LOG_LEVEL_NOTICE, "Copyright (C) 2006-2008 Red Hat, Inc.\n"); (void)signal (SIGINT, sigintr_handler); (void)signal (SIGUSR2, sigusr2_handler); (void)signal (SIGSEGV, sigsegv_handler); (void)signal (SIGABRT, sigabrt_handler); (void)signal (SIGQUIT, sigquit_handler); #if MSG_NOSIGNAL == 0 (void)signal (SIGPIPE, SIG_IGN); #endif corosync_timer_init ( serialize_lock, serialize_unlock, sched_priority); log_printf (LOG_LEVEL_NOTICE, "Corosync Executive Service: started and ready to provide service.\n"); corosync_poll_handle = poll_create (); /* * Load the object database interface */ res = lcr_ifact_reference ( &objdb_handle, "objdb", 0, &objdb_p, 0); if (res == -1) { log_printf (LOG_LEVEL_ERROR, "Corosync Executive couldn't open configuration object database component.\n"); corosync_exit_error (AIS_DONE_OBJDB); } objdb = (struct objdb_iface_ver0 *)objdb_p; objdb->objdb_init (); /* * Initialize the corosync_api_v1 definition */ apidef_init (objdb); api = apidef_get (); num_config_modules = 0; /* * Bootstrap in the default configuration parser or use * the corosync default built in parser if the configuration parser * isn't overridden */ config_iface_init = getenv("COROSYNC_DEFAULT_CONFIG_IFACE"); if (!config_iface_init) { config_iface_init = "corosync_parser"; } /* Make a copy so we can deface it with strtok */ config_iface = strdup(config_iface_init); iface = strtok(config_iface, ":"); while (iface) { res = lcr_ifact_reference ( &config_handle, iface, config_version, &config_p, 0); config = (struct config_iface_ver0 *)config_p; if (res == -1) { log_printf (LOG_LEVEL_ERROR, "Corosync Executive couldn't open configuration component '%s'\n", iface); corosync_exit_error (AIS_DONE_MAINCONFIGREAD); } 
res = config->config_readconfig(objdb, &error_string); if (res == -1) { log_printf (LOG_LEVEL_ERROR, "%s", error_string); corosync_exit_error (AIS_DONE_MAINCONFIGREAD); } log_printf (LOG_LEVEL_NOTICE, "%s", error_string); config_modules[num_config_modules++] = config; iface = strtok(NULL, ":"); } if (config_iface) free(config_iface); res = corosync_main_config_read (objdb, &error_string, &main_config); if (res == -1) { log_printf (LOG_LEVEL_ERROR, "%s", error_string); corosync_exit_error (AIS_DONE_MAINCONFIGREAD); } res = totem_config_read (objdb, &totem_config, &error_string); if (res == -1) { log_printf (LOG_LEVEL_ERROR, "%s", error_string); corosync_exit_error (AIS_DONE_MAINCONFIGREAD); } res = totem_config_keyread (objdb, &totem_config, &error_string); if (res == -1) { log_printf (LOG_LEVEL_ERROR, "%s", error_string); corosync_exit_error (AIS_DONE_MAINCONFIGREAD); } res = totem_config_validate (&totem_config, &error_string); if (res == -1) { log_printf (LOG_LEVEL_ERROR, "%s", error_string); corosync_exit_error (AIS_DONE_MAINCONFIGREAD); } /* * Set round robin realtime scheduling with priority 99 * Lock all memory to avoid page faults which may interrupt * application healthchecking */ if (setprio) corosync_setscheduler (); corosync_mlockall (); totem_config.totem_logging_configuration = totem_logging_configuration; totem_config.totem_logging_configuration.log_subsys_id = _logsys_subsys_create ("TOTEM", LOG_INFO); totem_config.totem_logging_configuration.log_level_security = LOG_LEVEL_SECURITY; totem_config.totem_logging_configuration.log_level_error = LOG_LEVEL_ERROR; totem_config.totem_logging_configuration.log_level_warning = LOG_LEVEL_WARNING; totem_config.totem_logging_configuration.log_level_notice = LOG_LEVEL_NOTICE; totem_config.totem_logging_configuration.log_level_debug = LOG_LEVEL_DEBUG; totem_config.totem_logging_configuration.log_printf = _logsys_log_printf; /* * Sleep for a while to let other nodes in the cluster * understand that this node has 
been away (if it was * an corosync restart). */ // TODO what is this hack for? usleep(totem_config.token_timeout * 2000); /* * if totempg_initialize doesn't have root priveleges, it cannot * bind to a specific interface. This only matters if * there is more then one interface in a system, so * in this case, only a warning is printed */ /* * Join multicast group and setup delivery * and configuration change functions */ totempg_initialize ( corosync_poll_handle, &totem_config); totempg_groups_initialize ( &corosync_group_handle, deliver_fn, confchg_fn); totempg_groups_join ( corosync_group_handle, &corosync_group, 1); /* * This must occur after totempg is initialized because "this_ip" must be set */ res = corosync_service_defaults_link_and_init (api); if (res == -1) { log_printf (LOG_LEVEL_ERROR, "Could not initialize default services\n"); corosync_exit_error (AIS_DONE_INIT_SERVICES); } sync_register (corosync_sync_callbacks_retrieve, corosync_sync_completed); /* * Drop root privleges to user 'ais' * TODO: Don't really need full root capabilities; * needed capabilities are: * CAP_NET_RAW (bindtodevice) * CAP_SYS_NICE (setscheduler) * CAP_IPC_LOCK (mlockall) */ priv_drop (); corosync_mempool_init (); ipc_subsys_id = _logsys_subsys_create ("IPC", LOG_INFO); ipc_init_state.sched_priority = sched_priority; coroipcs_ipc_init (&ipc_init_state); /* * Start main processing loop */ poll_run (corosync_poll_handle); return EXIT_SUCCESS; } diff --git a/exec/sync.c b/exec/sync.c index 0173afcd..68cdcc44 100644 --- a/exec/sync.c +++ b/exec/sync.c @@ -1,451 +1,451 @@ /* * Copyright (c) 2005-2006 MontaVista Software, Inc. * Copyright (c) 2006-2007, 2009 Red Hat, Inc. * * All rights reserved. 
* * Author: Steven Dake (sdake@redhat.com) * * This software licensed under BSD license, the text of which follows: - * + * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * - Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * - Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * - Neither the name of the MontaVista Software, Inc. nor the names of its * contributors may be used to endorse or promote products derived from this * software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF * THE POSSIBILITY OF SUCH DAMAGE. 
*/ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "quorum.h" #include "sync.h" LOGSYS_DECLARE_SUBSYS ("SYNC", LOG_INFO); #define MESSAGE_REQ_SYNC_BARRIER 0 struct barrier_data { unsigned int nodeid; int completed; }; -static struct memb_ring_id *sync_ring_id; +static const struct memb_ring_id *sync_ring_id; static int vsf_none = 0; static int (*sync_callbacks_retrieve) (int sync_id, struct sync_callbacks *callack); static struct sync_callbacks sync_callbacks; static int sync_processing = 0; static void (*sync_synchronization_completed) (void); static int sync_recovery_index = 0; static void *sync_callback_token_handle = 0; static struct barrier_data barrier_data_confchg[PROCESSOR_COUNT_MAX]; static size_t barrier_data_confchg_entries; static struct barrier_data barrier_data_process[PROCESSOR_COUNT_MAX]; static struct openais_vsf_iface_ver0 *vsf_iface; static int sync_barrier_send (struct memb_ring_id *ring_id); static int sync_start_process (enum totem_callback_token_type type, void *data); static void sync_service_init (struct memb_ring_id *ring_id); static int sync_service_process (enum totem_callback_token_type type, void *data); static void sync_deliver_fn ( unsigned int nodeid, struct iovec *iovec, int iov_len, int endian_conversion_required); static void sync_confchg_fn ( enum totem_configuration_type configuration_type, - unsigned int *member_list, int member_list_entries, - unsigned int *left_list, int left_list_entries, - unsigned int *joined_list, int joined_list_entries, - struct memb_ring_id *ring_id); + const unsigned int *member_list, size_t member_list_entries, + const unsigned int *left_list, size_t left_list_entries, + const unsigned int *joined_list, size_t joined_list_entries, + const struct memb_ring_id *ring_id); static void sync_primary_callback_fn ( - unsigned int 
*view_list, - int view_list_entries, + const unsigned int *view_list, + size_t view_list_entries, int primary_designated, - struct memb_ring_id *ring_id); + const struct memb_ring_id *ring_id); static struct totempg_group sync_group = { .group = "sync", .group_len = 4 }; static hdb_handle_t sync_group_handle; struct req_exec_sync_barrier_start { mar_req_header_t header; struct memb_ring_id ring_id; }; /* * Send a barrier data structure */ static int sync_barrier_send (struct memb_ring_id *ring_id) { struct req_exec_sync_barrier_start req_exec_sync_barrier_start; struct iovec iovec; int res; req_exec_sync_barrier_start.header.size = sizeof (struct req_exec_sync_barrier_start); req_exec_sync_barrier_start.header.id = MESSAGE_REQ_SYNC_BARRIER; memcpy (&req_exec_sync_barrier_start.ring_id, ring_id, sizeof (struct memb_ring_id)); iovec.iov_base = (char *)&req_exec_sync_barrier_start; iovec.iov_len = sizeof (req_exec_sync_barrier_start); res = totempg_groups_mcast_joined (sync_group_handle, &iovec, 1, TOTEMPG_AGREED); return (res); } -static void sync_start_init (struct memb_ring_id *ring_id) +static void sync_start_init (const struct memb_ring_id *ring_id) { totempg_callback_token_create ( &sync_callback_token_handle, TOTEM_CALLBACK_TOKEN_SENT, 0, /* don't delete after callback */ sync_start_process, (void *)ring_id); } static void sync_service_init (struct memb_ring_id *ring_id) { sync_callbacks.sync_init (); totempg_callback_token_destroy (&sync_callback_token_handle); /* * Create the token callback for the processing */ totempg_callback_token_create ( &sync_callback_token_handle, TOTEM_CALLBACK_TOKEN_SENT, 0, /* don't delete after callback */ sync_service_process, (void *)ring_id); } static int sync_start_process (enum totem_callback_token_type type, void *data) { int res; struct memb_ring_id *ring_id = (struct memb_ring_id *)data; res = sync_barrier_send (ring_id); if (res == 0) { /* * Delete the token callback for the barrier */ totempg_callback_token_destroy 
(&sync_callback_token_handle); } return (0); } static void sync_callbacks_load (void) { int res; for (;;) { res = sync_callbacks_retrieve (sync_recovery_index, &sync_callbacks); /* * No more service handlers have sync callbacks at this time ` */ if (res == -1) { sync_processing = 0; break; } sync_recovery_index += 1; if (sync_callbacks.sync_init) { break; } } } static int sync_service_process (enum totem_callback_token_type type, void *data) { int res; struct memb_ring_id *ring_id = (struct memb_ring_id *)data; - + /* * If process operation not from this ring id, then ignore it and stop * processing */ if (memcmp (ring_id, sync_ring_id, sizeof (struct memb_ring_id)) != 0) { return (0); } - + /* * If process returns 0, then its time to activate * and start the next service's synchronization */ res = sync_callbacks.sync_process (); if (res != 0) { return (0); } totempg_callback_token_destroy (&sync_callback_token_handle); sync_start_init (ring_id); return (0); } int sync_register ( int (*callbacks_retrieve) (int sync_id, struct sync_callbacks *callack), void (*synchronization_completed) (void)) { unsigned int res; res = totempg_groups_initialize ( &sync_group_handle, sync_deliver_fn, sync_confchg_fn); if (res == -1) { log_printf (LOG_LEVEL_ERROR, "Couldn't initialize groups interface.\n"); return (-1); } res = totempg_groups_join ( sync_group_handle, &sync_group, 1); if (res == -1) { log_printf (LOG_LEVEL_ERROR, "Couldn't join group.\n"); return (-1); } sync_callbacks_retrieve = callbacks_retrieve; sync_synchronization_completed = synchronization_completed; return (0); } static void sync_primary_callback_fn ( const unsigned int *view_list, size_t view_list_entries, int primary_designated, - struct memb_ring_id *ring_id) + const struct memb_ring_id *ring_id) { int i; if (primary_designated) { log_printf (LOG_LEVEL_DEBUG, "This node is within the primary component and will provide service.\n"); } else { log_printf (LOG_LEVEL_DEBUG, "This node is within the non-primary 
component and will NOT provide any services.\n"); return; } /* * Execute configuration change for synchronization service */ sync_processing = 1; totempg_callback_token_destroy (&sync_callback_token_handle); sync_recovery_index = 0; memset (&barrier_data_confchg, 0, sizeof (barrier_data_confchg)); for (i = 0; i < view_list_entries; i++) { barrier_data_confchg[i].nodeid = view_list[i]; barrier_data_confchg[i].completed = 0; } memcpy (barrier_data_process, barrier_data_confchg, sizeof (barrier_data_confchg)); barrier_data_confchg_entries = view_list_entries; sync_start_init (sync_ring_id); } static struct memb_ring_id deliver_ring_id; static void sync_endian_convert (struct req_exec_sync_barrier_start *req_exec_sync_barrier_start) { totemip_copy_endian_convert(&req_exec_sync_barrier_start->ring_id.rep, &req_exec_sync_barrier_start->ring_id.rep); req_exec_sync_barrier_start->ring_id.seq = swab64 (req_exec_sync_barrier_start->ring_id.seq); } static void sync_deliver_fn ( unsigned int nodeid, struct iovec *iovec, int iov_len, int endian_conversion_required) { struct req_exec_sync_barrier_start *req_exec_sync_barrier_start = (struct req_exec_sync_barrier_start *)iovec[0].iov_base; unsigned int barrier_completed; int i; log_printf (LOG_LEVEL_DEBUG, "confchg entries %d\n", barrier_data_confchg_entries); if (endian_conversion_required) { sync_endian_convert (req_exec_sync_barrier_start); } barrier_completed = 1; memcpy (&deliver_ring_id, &req_exec_sync_barrier_start->ring_id, sizeof (struct memb_ring_id)); /* * Is this barrier from this configuration, if not, ignore it */ if (memcmp (&req_exec_sync_barrier_start->ring_id, sync_ring_id, sizeof (struct memb_ring_id)) != 0) { return; } /* * Set completion for source_addr's address */ for (i = 0; i < barrier_data_confchg_entries; i++) { if (nodeid == barrier_data_process[i].nodeid) { barrier_data_process[i].completed = 1; log_printf (LOG_LEVEL_DEBUG, "Barrier Start Recieved From %d\n", barrier_data_process[i].nodeid); break; } 
} /* * Test if barrier is complete */ for (i = 0; i < barrier_data_confchg_entries; i++) { log_printf (LOG_LEVEL_DEBUG, - "Barrier completion status for nodeid %d = %d. \n", + "Barrier completion status for nodeid %d = %d. \n", barrier_data_process[i].nodeid, barrier_data_process[i].completed); if (barrier_data_process[i].completed == 0) { barrier_completed = 0; } } if (barrier_completed) { log_printf (LOG_LEVEL_DEBUG, "Synchronization barrier completed\n"); } /* * This sync is complete so activate and start next service sync */ if (barrier_completed && sync_callbacks.sync_activate) { sync_callbacks.sync_activate (); - + log_printf (LOG_LEVEL_DEBUG, "Committing synchronization for (%s)\n", sync_callbacks.name); } /* * Start synchronization if the barrier has completed */ if (barrier_completed) { memcpy (barrier_data_process, barrier_data_confchg, sizeof (barrier_data_confchg)); sync_callbacks_load(); /* * if sync service found, execute it */ if (sync_processing && sync_callbacks.sync_init) { log_printf (LOG_LEVEL_DEBUG, "Synchronization actions starting for (%s)\n", sync_callbacks.name); sync_service_init (&deliver_ring_id); } } return; } static void sync_confchg_fn ( enum totem_configuration_type configuration_type, - unsigned int *member_list, int member_list_entries, - unsigned int *left_list, int left_list_entries, - unsigned int *joined_list, int joined_list_entries, - struct memb_ring_id *ring_id) + const unsigned int *member_list, size_t member_list_entries, + const unsigned int *left_list, size_t left_list_entries, + const unsigned int *joined_list, size_t joined_list_entries, + const struct memb_ring_id *ring_id) { sync_ring_id = ring_id; if (configuration_type != TOTEM_CONFIGURATION_REGULAR) { return; } if (sync_processing && sync_callbacks.sync_abort != NULL) { sync_callbacks.sync_abort (); sync_callbacks.sync_activate = NULL; } sync_primary_callback_fn ( member_list, member_list_entries, 1, ring_id); } int sync_in_process (void) { return 
(sync_processing); } int sync_primary_designated (void) { return (1); } diff --git a/exec/totemmrp.c b/exec/totemmrp.c index d48c0440..22ff37c0 100644 --- a/exec/totemmrp.c +++ b/exec/totemmrp.c @@ -1,232 +1,231 @@ /* * Copyright (c) 2005 MontaVista Software, Inc. - * Copyright (c) 2006-2007 Red Hat, Inc. + * Copyright (c) 2006-2007, 2009 Red Hat, Inc. * * All rights reserved. * * Author: Steven Dake (sdake@redhat.com) * * This software licensed under BSD license, the text of which follows: - * + * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * - Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * - Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * - Neither the name of the MontaVista Software, Inc. nor the names of its * contributors may be used to endorse or promote products derived from this * software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF * THE POSSIBILITY OF SUCH DAMAGE. 
*/ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "totemmrp.h" #include "totemsrp.h" hdb_handle_t totemsrp_handle_in; void totemmrp_deliver_fn ( unsigned int nodeid, struct iovec *iovec, int iov_len, int endian_conversion_required); void totemmrp_confchg_fn ( enum totem_configuration_type configuration_type, - unsigned int *member_list, int member_list_entries, - unsigned int *left_list, int left_list_entries, - unsigned int *joined_list, int joined_list_entries, - struct memb_ring_id *ring_id); + const unsigned int *member_list, size_t member_list_entries, + const unsigned int *left_list, size_t left_list_entries, + const unsigned int *joined_list, size_t joined_list_entries, + const struct memb_ring_id *ring_id); void (*pg_deliver_fn) ( unsigned int nodeid, struct iovec *iovec, int iov_len, int endian_conversion_required) = 0; void (*pg_confchg_fn) ( enum totem_configuration_type configuration_type, - unsigned int *member_list, int member_list_entries, - unsigned int *left_list, int left_list_entries, - unsigned int *joined_list, int joined_list_entries, - struct memb_ring_id *ring_id) = 0; + const unsigned int *member_list, size_t member_list_entries, + const unsigned int *left_list, size_t left_list_entries, + const unsigned int *joined_list, size_t joined_list_entries, + const struct memb_ring_id *ring_id) = 0; void totemmrp_deliver_fn ( unsigned int nodeid, struct iovec *iovec, int iov_len, int endian_conversion_required) { pg_deliver_fn (nodeid, iovec, iov_len, endian_conversion_required); } void totemmrp_confchg_fn ( enum totem_configuration_type configuration_type, - unsigned int *member_list, int member_list_entries, - unsigned int *left_list, int left_list_entries, - unsigned int *joined_list, int joined_list_entries, - struct memb_ring_id *ring_id) + const unsigned int 
*member_list, size_t member_list_entries, + const unsigned int *left_list, size_t left_list_entries, + const unsigned int *joined_list, size_t joined_list_entries, + const struct memb_ring_id *ring_id) { pg_confchg_fn (configuration_type, member_list, member_list_entries, left_list, left_list_entries, joined_list, joined_list_entries, ring_id); } /* * Initialize the totem multiple ring protocol */ int totemmrp_initialize ( hdb_handle_t poll_handle, struct totem_config *totem_config, void (*deliver_fn) ( unsigned int nodeid, struct iovec *iovec, int iov_len, int endian_conversion_required), void (*confchg_fn) ( enum totem_configuration_type configuration_type, - unsigned int *member_list, int member_list_entries, - unsigned int *left_list, int left_list_entries, - unsigned int *joined_list, int joined_list_entries, - struct memb_ring_id *ring_id)) + const unsigned int *member_list, size_t member_list_entries, + const unsigned int *left_list, size_t left_list_entries, + const unsigned int *joined_list, size_t joined_list_entries, + const struct memb_ring_id *ring_id)) { int result; pg_deliver_fn = deliver_fn; pg_confchg_fn = confchg_fn; result = totemsrp_initialize ( poll_handle, &totemsrp_handle_in, totem_config, totemmrp_deliver_fn, totemmrp_confchg_fn); return (result); } void totemmrp_finalize (void) { totemsrp_finalize (totemsrp_handle_in); } /* * Multicast a message */ int totemmrp_mcast ( struct iovec *iovec, int iov_len, int priority) { return totemsrp_mcast (totemsrp_handle_in, iovec, iov_len, priority); } /* * Return number of available messages that can be queued */ int totemmrp_avail (void) { return (totemsrp_avail (totemsrp_handle_in)); } int totemmrp_callback_token_create ( void **handle_out, enum totem_callback_token_type type, int delete, int (*callback_fn) (enum totem_callback_token_type type, void *), void *data) { return totemsrp_callback_token_create (totemsrp_handle_in, handle_out, type, delete, callback_fn, data); } void 
totemmrp_callback_token_destroy ( void *handle_out) { totemsrp_callback_token_destroy (totemsrp_handle_in, handle_out); } void totemmrp_new_msg_signal (void) { totemsrp_new_msg_signal (totemsrp_handle_in); } int totemmrp_ifaces_get ( unsigned int nodeid, struct totem_ip_address *interfaces, char ***status, unsigned int *iface_count) { int res; res = totemsrp_ifaces_get ( totemsrp_handle_in, nodeid, interfaces, status, iface_count); return (res); } unsigned int totemmrp_my_nodeid_get (void) { return (totemsrp_my_nodeid_get (totemsrp_handle_in)); } int totemmrp_my_family_get (void) { return (totemsrp_my_family_get (totemsrp_handle_in)); } extern int totemmrp_ring_reenable (void) { int res; res = totemsrp_ring_reenable ( totemsrp_handle_in); return (res); } - diff --git a/exec/totemmrp.h b/exec/totemmrp.h index 1a3e5142..72511f17 100644 --- a/exec/totemmrp.h +++ b/exec/totemmrp.h @@ -1,113 +1,113 @@ /* * Copyright (c) 2005 MontaVista Software, Inc. - * Copyright (c) 2006-2007 Red Hat, Inc. + * Copyright (c) 2006-2007, 2009 Red Hat, Inc. * * All rights reserved. * * Author: Steven Dake (sdake@redhat.com) * * This software licensed under BSD license, the text of which follows: * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * - Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * - Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * - Neither the name of the MontaVista Software, Inc. nor the names of its * contributors may be used to endorse or promote products derived from this * software without specific prior written permission. 
* * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF * THE POSSIBILITY OF SUCH DAMAGE. */ #ifndef TOTEMMRP_H_DEFINED #define TOTEMMRP_H_DEFINED #include /* * Totem Single Ring Protocol * depends on poll abstraction, POSIX, IPV4 */ /* * Initialize the logger */ extern void totemmrp_log_printf_init ( void (*log_printf) (int , char *, ...), int log_level_security, int log_level_error, int log_level_warning, int log_level_notice, int log_level_debug); /* * Initialize the group messaging interface */ extern int totemmrp_initialize ( hdb_handle_t poll_handle, struct totem_config *totem_config, void (*deliver_fn) ( unsigned int nodeid, struct iovec *iovec, int iov_len, int endian_conversion_required), void (*confchg_fn) ( enum totem_configuration_type configuration_type, - unsigned int *member_list, int member_list_entries, - unsigned int *left_list, int left_list_entries, - unsigned int *joined_list, int joined_list_entries, - struct memb_ring_id *ring_id)); + const unsigned int *member_list, size_t member_list_entries, + const unsigned int *left_list, size_t left_list_entries, + const unsigned int *joined_list, size_t joined_list_entries, + const struct memb_ring_id *ring_id)); extern void totemmrp_finalize (void); /* * Multicast a message */ extern int totemmrp_mcast ( struct iovec *iovec, 
int iov_len, int priority); /* * Return number of available messages that can be queued */ extern int totemmrp_avail (void); extern int totemmrp_callback_token_create ( void **handle_out, enum totem_callback_token_type type, int delete, int (*callback_fn) (enum totem_callback_token_type type, void *), void *data); extern void totemmrp_callback_token_destroy ( void *handle_out); extern void totemmrp_new_msg_signal (void); extern int totemmrp_ifaces_get ( unsigned int nodeid, struct totem_ip_address *interfaces, char ***status, unsigned int *iface_count); extern unsigned int totemmrp_my_nodeid_get (void); extern int totemmrp_my_family_get (void); extern int totemmrp_ring_reenable (void); #endif /* TOTEMMRP_H_DEFINED */ diff --git a/exec/totempg.c b/exec/totempg.c index b9e97e4d..bdd357f5 100644 --- a/exec/totempg.c +++ b/exec/totempg.c @@ -1,1337 +1,1337 @@ /* * Copyright (c) 2003-2005 MontaVista Software, Inc. * Copyright (c) 2005 OSDL. * Copyright (c) 2006-2009 Red Hat, Inc. * * All rights reserved. * * Author: Steven Dake (sdake@redhat.com) * Author: Mark Haverkamp (markh@osdl.org) * * This software licensed under BSD license, the text of which follows: - * + * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * - Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * - Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * - Neither the name of the MontaVista Software, Inc. nor the names of its * contributors may be used to endorse or promote products derived from this * software without specific prior written permission. 
* * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF * THE POSSIBILITY OF SUCH DAMAGE. */ /* * FRAGMENTATION AND PACKING ALGORITHM: * * Assemble the entire message into one buffer * if full fragment * store fragment into lengths list * for each full fragment * multicast fragment * set length and fragment fields of pg mesage * store remaining multicast into head of fragmentation data and set lens field * * If a message exceeds the maximum packet size allowed by the totem * single ring protocol, the protocol could lose forward progress. * Statically calculating the allowed data amount doesn't work because * the amount of data allowed depends on the number of fragments in * each message. In this implementation, the maximum fragment size * is dynamically calculated for each fragment added to the message. * It is possible for a message to be two bytes short of the maximum * packet size. This occurs when a message or collection of * messages + the mcast header + the lens are two bytes short of the * end of the packet. Since another len field consumes two bytes, the * len field would consume the rest of the packet without room for data. * * One optimization would be to forgo the final len field and determine * it from the size of the udp datagram. Then this condition would no * longer occur. 
*/ /* * ASSEMBLY AND UNPACKING ALGORITHM: * * copy incoming packet into assembly data buffer indexed by current * location of end of fragment * * if not fragmented * deliver all messages in assembly data buffer * else * if msg_count > 1 and fragmented * deliver all messages except last message in assembly data buffer * copy last fragmented section to start of assembly data buffer * else * if msg_count = 1 and fragmented * do nothing * */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "totemmrp.h" #include "totemsrp.h" #define min(a,b) ((a) < (b)) ? a : b struct totempg_mcast_header { short version; short type; }; /* * totempg_mcast structure * * header: Identify the mcast. * fragmented: Set if this message continues into next message * continuation: Set if this message is a continuation from last message * msg_count Indicates how many packed messages are contained * in the mcast. * Also, the size of each packed message and the messages themselves are * appended to the end of this structure when sent. 
*/ struct totempg_mcast { struct totempg_mcast_header header; unsigned char fragmented; unsigned char continuation; unsigned short msg_count; /* * short msg_len[msg_count]; */ /* * data for messages */ }; /* * Maximum packet size for totem pg messages */ #define TOTEMPG_PACKET_SIZE (totempg_totem_config->net_mtu - \ sizeof (struct totempg_mcast)) /* * Local variables used for packing small messages */ static unsigned short mcast_packed_msg_lens[FRAME_SIZE_MAX]; static int mcast_packed_msg_count = 0; static int totempg_reserved = 0; /* * Function and data used to log messages */ static int totempg_log_level_security; static int totempg_log_level_error; static int totempg_log_level_warning; static int totempg_log_level_notice; static int totempg_log_level_debug; static int totempg_subsys_id; static void (*totempg_log_printf) (int subsys_id, const char *function, const char *file, int line, unsigned int level, const char *format, ...) __attribute__((format(printf, 6, 7))); struct totem_config *totempg_totem_config; struct assembly { unsigned int nodeid; unsigned char data[MESSAGE_SIZE_MAX]; int index; unsigned char last_frag_num; struct list_head list; }; static void assembly_deref (struct assembly *assembly); static int callback_token_received_fn (enum totem_callback_token_type type, void *data); enum throw_away_mode_t { THROW_AWAY_INACTIVE, THROW_AWAY_ACTIVE }; static enum throw_away_mode_t throw_away_mode = THROW_AWAY_INACTIVE; DECLARE_LIST_INIT(assembly_list_inuse); DECLARE_LIST_INIT(assembly_list_free); /* * Staging buffer for packed messages. Messages are staged in this buffer * before sending. Multiple messages may fit which cuts down on the * number of mcasts sent. If a message doesn't completely fit, then * the mcast header has a fragment bit set that says that there are more * data to follow. fragment_size is an index into the buffer. It indicates * the size of message data and where to place new message data. 
* fragment_contuation indicates whether the first packed message in * the buffer is a continuation of a previously packed fragment. */ static unsigned char *fragmentation_data; static int fragment_size = 0; static int fragment_continuation = 0; static struct iovec iov_delv; static unsigned int totempg_max_handle = 0; struct totempg_group_instance { void (*deliver_fn) ( unsigned int nodeid, struct iovec *iovec, int iov_len, int endian_conversion_required); void (*confchg_fn) ( enum totem_configuration_type configuration_type, - unsigned int *member_list, int member_list_entries, - unsigned int *left_list, int left_list_entries, - unsigned int *joined_list, int joined_list_entries, - struct memb_ring_id *ring_id); + const unsigned int *member_list, size_t member_list_entries, + const unsigned int *left_list, size_t left_list_entries, + const unsigned int *joined_list, size_t joined_list_entries, + const struct memb_ring_id *ring_id); struct totempg_group *groups; int groups_cnt; }; static struct hdb_handle_database totempg_groups_instance_database = { .handle_count = 0, .handles = 0, .iterator = 0, .mutex = PTHREAD_MUTEX_INITIALIZER }; static unsigned char next_fragment = 1; static pthread_mutex_t totempg_mutex = PTHREAD_MUTEX_INITIALIZER; static pthread_mutex_t callback_token_mutex = PTHREAD_MUTEX_INITIALIZER; static pthread_mutex_t mcast_msg_mutex = PTHREAD_MUTEX_INITIALIZER; #define log_printf(level, format, args...) 
\ do { \ totempg_log_printf (totempg_subsys_id, __FUNCTION__, \ __FILE__, __LINE__, level, format, ##args); \ } while (0); static int msg_count_send_ok (int msg_count); static int byte_count_send_ok (int byte_count); static struct assembly *assembly_ref (unsigned int nodeid) { struct assembly *assembly; struct list_head *list; /* * Search inuse list for node id and return assembly buffer if found */ for (list = assembly_list_inuse.next; list != &assembly_list_inuse; list = list->next) { assembly = list_entry (list, struct assembly, list); if (nodeid == assembly->nodeid) { return (assembly); } } /* * Nothing found in inuse list get one from free list if available */ if (list_empty (&assembly_list_free) == 0) { assembly = list_entry (assembly_list_free.next, struct assembly, list); list_del (&assembly->list); list_add (&assembly->list, &assembly_list_inuse); assembly->nodeid = nodeid; return (assembly); } /* * Nothing available in inuse or free list, so allocate a new one */ assembly = malloc (sizeof (struct assembly)); memset (assembly, 0, sizeof (struct assembly)); /* * TODO handle memory allocation failure here */ assert (assembly); assembly->nodeid = nodeid; list_init (&assembly->list); list_add (&assembly->list, &assembly_list_inuse); return (assembly); } static void assembly_deref (struct assembly *assembly) { list_del (&assembly->list); list_add (&assembly->list, &assembly_list_free); } static inline void app_confchg_fn ( enum totem_configuration_type configuration_type, unsigned int *member_list, int member_list_entries, unsigned int *left_list, int left_list_entries, unsigned int *joined_list, int joined_list_entries, struct memb_ring_id *ring_id) { int i; struct totempg_group_instance *instance; unsigned int res; for (i = 0; i <= totempg_max_handle; i++) { res = hdb_handle_get (&totempg_groups_instance_database, hdb_nocheck_convert (i), (void *)&instance); if (res == 0) { if (instance->confchg_fn) { instance->confchg_fn ( configuration_type, member_list, 
member_list_entries, left_list, left_list_entries, joined_list, joined_list_entries, ring_id); } hdb_handle_put (&totempg_groups_instance_database, hdb_nocheck_convert (i)); } } } static inline void group_endian_convert ( struct iovec *iovec) { unsigned short *group_len; int i; struct iovec iovec_aligned = { NULL, 0 }; struct iovec *iovec_swab; /* * Align data structure for sparc and ia64 */ if ((size_t)iovec->iov_base % 4 != 0) { iovec_aligned.iov_base = alloca(iovec->iov_len); memcpy(iovec_aligned.iov_base, iovec->iov_base, iovec->iov_len); iovec_aligned.iov_len = iovec->iov_len; iovec_swab = &iovec_aligned; } else { iovec_swab = iovec; } group_len = (unsigned short *)iovec_swab->iov_base; group_len[0] = swab16(group_len[0]); for (i = 1; i < group_len[0] + 1; i++) { group_len[i] = swab16(group_len[i]); } if (iovec_swab == &iovec_aligned) { memcpy(iovec->iov_base, iovec_aligned.iov_base, iovec->iov_len); } } static inline int group_matches ( struct iovec *iovec, unsigned int iov_len, struct totempg_group *groups_b, unsigned int group_b_cnt, unsigned int *adjust_iovec) { unsigned short *group_len; char *group_name; int i; int j; struct iovec iovec_aligned = { NULL, 0 }; assert (iov_len == 1); /* * Align data structure for sparc and ia64 */ if ((size_t)iovec->iov_base % 4 != 0) { iovec_aligned.iov_base = alloca(iovec->iov_len); memcpy(iovec_aligned.iov_base, iovec->iov_base, iovec->iov_len); iovec_aligned.iov_len = iovec->iov_len; iovec = &iovec_aligned; } group_len = (unsigned short *)iovec->iov_base; group_name = ((char *)iovec->iov_base) + sizeof (unsigned short) * (group_len[0] + 1); /* * Calculate amount to adjust the iovec by before delivering to app */ *adjust_iovec = sizeof (unsigned short) * (group_len[0] + 1); for (i = 1; i < group_len[0] + 1; i++) { *adjust_iovec += group_len[i]; } /* * Determine if this message should be delivered to this instance */ for (i = 1; i < group_len[0] + 1; i++) { for (j = 0; j < group_b_cnt; j++) { if ((group_len[i] == 
groups_b[j].group_len) && (memcmp (groups_b[j].group, group_name, group_len[i]) == 0)) { return (1); } } group_name += group_len[i]; } return (0); } static inline void app_deliver_fn ( unsigned int nodeid, struct iovec *iovec, unsigned int iov_len, int endian_conversion_required) { int i; struct totempg_group_instance *instance; struct iovec stripped_iovec; unsigned int adjust_iovec; unsigned int res; struct iovec aligned_iovec = { NULL, 0 }; if (endian_conversion_required) { group_endian_convert (iovec); } /* * Align data structure for sparc and ia64 */ aligned_iovec.iov_base = alloca(iovec->iov_len); aligned_iovec.iov_len = iovec->iov_len; memcpy(aligned_iovec.iov_base, iovec->iov_base, iovec->iov_len); iovec = &aligned_iovec; for (i = 0; i <= totempg_max_handle; i++) { res = hdb_handle_get (&totempg_groups_instance_database, hdb_nocheck_convert (i), (void *)&instance); if (res == 0) { assert (iov_len == 1); if (group_matches (iovec, iov_len, instance->groups, instance->groups_cnt, &adjust_iovec)) { stripped_iovec.iov_len = iovec->iov_len - adjust_iovec; // stripped_iovec.iov_base = (char *)iovec->iov_base + adjust_iovec; /* * Align data structure for sparc and ia64 */ if ((char *)iovec->iov_base + adjust_iovec % 4 != 0) { /* * Deal with misalignment */ stripped_iovec.iov_base = alloca (stripped_iovec.iov_len); memcpy (stripped_iovec.iov_base, (char *)iovec->iov_base + adjust_iovec, stripped_iovec.iov_len); } instance->deliver_fn ( nodeid, &stripped_iovec, iov_len, endian_conversion_required); } hdb_handle_put (&totempg_groups_instance_database, hdb_nocheck_convert(i)); } } } static void totempg_confchg_fn ( enum totem_configuration_type configuration_type, - unsigned int *member_list, int member_list_entries, - unsigned int *left_list, int left_list_entries, - unsigned int *joined_list, int joined_list_entries, - struct memb_ring_id *ring_id) + const unsigned int *member_list, size_t member_list_entries, + const unsigned int *left_list, size_t left_list_entries, 
+ const unsigned int *joined_list, size_t joined_list_entries, + const struct memb_ring_id *ring_id) { // TODO optimize this app_confchg_fn (configuration_type, member_list, member_list_entries, left_list, left_list_entries, joined_list, joined_list_entries, ring_id); } static void totempg_deliver_fn ( unsigned int nodeid, struct iovec *iovec, int iov_len, int endian_conversion_required) { struct totempg_mcast *mcast; unsigned short *msg_lens; int i; struct assembly *assembly; char header[FRAME_SIZE_MAX]; int h_index; int a_i = 0; int msg_count; int continuation; int start; assembly = assembly_ref (nodeid); assert (assembly); /* * Assemble the header into one block of data and * assemble the packet contents into one block of data to simplify delivery */ if (iov_len == 1) { /* * This message originated from external processor * because there is only one iovec for the full msg. */ char *data; int datasize; mcast = (struct totempg_mcast *)iovec[0].iov_base; if (endian_conversion_required) { mcast->msg_count = swab16 (mcast->msg_count); } msg_count = mcast->msg_count; datasize = sizeof (struct totempg_mcast) + msg_count * sizeof (unsigned short); memcpy (header, iovec[0].iov_base, datasize); assert(iovec); data = iovec[0].iov_base; msg_lens = (unsigned short *) (header + sizeof (struct totempg_mcast)); if (endian_conversion_required) { for (i = 0; i < mcast->msg_count; i++) { msg_lens[i] = swab16 (msg_lens[i]); } } memcpy (&assembly->data[assembly->index], &data[datasize], iovec[0].iov_len - datasize); } else { /* * The message originated from local processor * becasue there is greater than one iovec for then full msg. 
*/ h_index = 0; for (i = 0; i < 2; i++) { memcpy (&header[h_index], iovec[i].iov_base, iovec[i].iov_len); h_index += iovec[i].iov_len; } mcast = (struct totempg_mcast *)header; // TODO make sure we are using a copy of mcast not the actual data itself msg_lens = (unsigned short *) (header + sizeof (struct totempg_mcast)); for (i = 2; i < iov_len; i++) { a_i = assembly->index; assert (iovec[i].iov_len + a_i <= MESSAGE_SIZE_MAX); memcpy (&assembly->data[a_i], iovec[i].iov_base, iovec[i].iov_len); a_i += msg_lens[i - 2]; } iov_len -= 2; } /* * If the last message in the buffer is a fragment, then we * can't deliver it. We'll first deliver the full messages * then adjust the assembly buffer so we can add the rest of the * fragment when it arrives. */ msg_count = mcast->fragmented ? mcast->msg_count - 1 : mcast->msg_count; continuation = mcast->continuation; iov_delv.iov_base = &assembly->data[0]; iov_delv.iov_len = assembly->index + msg_lens[0]; /* * Make sure that if this message is a continuation, that it * matches the sequence number of the previous fragment. * Also, if the first packed message is a continuation * of a previous message, but the assembly buffer * is empty, then we need to discard it since we can't * assemble a complete message. Likewise, if this message isn't a * continuation and the assembly buffer is empty, we have to discard * the continued message. 
*/ start = 0; if (throw_away_mode == THROW_AWAY_ACTIVE) { /* Throw away the first msg block */ if (mcast->fragmented == 0 || mcast->fragmented == 1) { throw_away_mode = THROW_AWAY_INACTIVE; assembly->index += msg_lens[0]; iov_delv.iov_base = &assembly->data[assembly->index]; iov_delv.iov_len = msg_lens[1]; start = 1; } } else if (throw_away_mode == THROW_AWAY_INACTIVE) { if (continuation == assembly->last_frag_num) { assembly->last_frag_num = mcast->fragmented; for (i = start; i < msg_count; i++) { app_deliver_fn(nodeid, &iov_delv, 1, endian_conversion_required); assembly->index += msg_lens[i]; iov_delv.iov_base = &assembly->data[assembly->index]; if (i < (msg_count - 1)) { iov_delv.iov_len = msg_lens[i + 1]; } } } else { throw_away_mode = THROW_AWAY_ACTIVE; } } if (mcast->fragmented == 0) { /* * End of messages, dereference assembly struct */ assembly->last_frag_num = 0; assembly->index = 0; assembly_deref (assembly); } else { /* * Message is fragmented, keep around assembly list */ if (mcast->msg_count > 1) { memmove (&assembly->data[0], &assembly->data[assembly->index], msg_lens[msg_count]); assembly->index = 0; } assembly->index += msg_lens[msg_count]; } } /* * Totem Process Group Abstraction * depends on poll abstraction, POSIX, IPV4 */ void *callback_token_received_handle; int callback_token_received_fn (enum totem_callback_token_type type, void *data) { struct totempg_mcast mcast; struct iovec iovecs[3]; int res; pthread_mutex_lock (&mcast_msg_mutex); if (mcast_packed_msg_count == 0) { pthread_mutex_unlock (&mcast_msg_mutex); return (0); } if (totemmrp_avail() == 0) { pthread_mutex_unlock (&mcast_msg_mutex); return (0); } mcast.fragmented = 0; /* * Was the first message in this buffer a continuation of a * fragmented message? 
*/ mcast.continuation = fragment_continuation; fragment_continuation = 0; mcast.msg_count = mcast_packed_msg_count; iovecs[0].iov_base = &mcast; iovecs[0].iov_len = sizeof (struct totempg_mcast); iovecs[1].iov_base = mcast_packed_msg_lens; iovecs[1].iov_len = mcast_packed_msg_count * sizeof (unsigned short); iovecs[2].iov_base = &fragmentation_data[0]; iovecs[2].iov_len = fragment_size; res = totemmrp_mcast (iovecs, 3, 0); mcast_packed_msg_count = 0; fragment_size = 0; pthread_mutex_unlock (&mcast_msg_mutex); return (0); } /* * Initialize the totem process group abstraction */ int totempg_initialize ( hdb_handle_t poll_handle, struct totem_config *totem_config) { int res; totempg_totem_config = totem_config; totempg_log_level_security = totem_config->totem_logging_configuration.log_level_security; totempg_log_level_error = totem_config->totem_logging_configuration.log_level_error; totempg_log_level_warning = totem_config->totem_logging_configuration.log_level_warning; totempg_log_level_notice = totem_config->totem_logging_configuration.log_level_notice; totempg_log_level_debug = totem_config->totem_logging_configuration.log_level_debug; totempg_log_printf = totem_config->totem_logging_configuration.log_printf; totempg_subsys_id = totem_config->totem_logging_configuration.log_subsys_id; fragmentation_data = malloc (TOTEMPG_PACKET_SIZE); if (fragmentation_data == 0) { return (-1); } res = totemmrp_initialize ( poll_handle, totem_config, totempg_deliver_fn, totempg_confchg_fn); totemmrp_callback_token_create ( &callback_token_received_handle, TOTEM_CALLBACK_TOKEN_RECEIVED, 0, callback_token_received_fn, 0); totemsrp_net_mtu_adjust (totem_config); return (res); } void totempg_finalize (void) { pthread_mutex_lock (&totempg_mutex); totemmrp_finalize (); pthread_mutex_unlock (&totempg_mutex); } /* * Multicast a message */ static int mcast_msg ( struct iovec *iovec_in, int iov_len, int guarantee) { int res = 0; struct totempg_mcast mcast; struct iovec iovecs[3]; struct 
iovec iovec[64]; int i; int dest, src; int max_packet_size = 0; int copy_len = 0; int copy_base = 0; int total_size = 0; pthread_mutex_lock (&mcast_msg_mutex); totemmrp_new_msg_signal (); /* * Remove zero length iovectors from the list */ assert (iov_len < 64); for (dest = 0, src = 0; src < iov_len; src++) { if (iovec_in[src].iov_len) { memcpy (&iovec[dest++], &iovec_in[src], sizeof (struct iovec)); } } iov_len = dest; max_packet_size = TOTEMPG_PACKET_SIZE - (sizeof (unsigned short) * (mcast_packed_msg_count + 1)); mcast_packed_msg_lens[mcast_packed_msg_count] = 0; /* * Check if we would overwrite new message queue */ for (i = 0; i < iov_len; i++) { total_size += iovec[i].iov_len; } if (byte_count_send_ok (total_size + sizeof(unsigned short) * (mcast_packed_msg_count+1)) == 0) { pthread_mutex_unlock (&mcast_msg_mutex); return(-1); } for (i = 0; i < iov_len; ) { mcast.fragmented = 0; mcast.continuation = fragment_continuation; copy_len = iovec[i].iov_len - copy_base; /* * If it all fits with room left over, copy it in. * We need to leave at least sizeof(short) + 1 bytes in the * fragment_buffer on exit so that max_packet_size + fragment_size * doesn't exceed the size of the fragment_buffer on the next call. */ if ((copy_len + fragment_size) < (max_packet_size - sizeof (unsigned short))) { memcpy (&fragmentation_data[fragment_size], (char *)iovec[i].iov_base + copy_base, copy_len); fragment_size += copy_len; mcast_packed_msg_lens[mcast_packed_msg_count] += copy_len; next_fragment = 1; copy_len = 0; copy_base = 0; i++; continue; /* * If it just fits or is too big, then send out what fits. 
*/ } else { unsigned char *data_ptr; copy_len = min(copy_len, max_packet_size - fragment_size); if( copy_len == max_packet_size ) data_ptr = (unsigned char *)iovec[i].iov_base + copy_base; else { data_ptr = fragmentation_data; memcpy (&fragmentation_data[fragment_size], (unsigned char *)iovec[i].iov_base + copy_base, copy_len); } /* NOTE(review): a second, unconditional memcpy of the same bytes into fragmentation_data was removed here: it exactly duplicated the copy already done in the else branch above, and when copy_len == max_packet_size the packet is sent straight from the caller's iovec (data_ptr), so staging the bytes in fragmentation_data was pure wasted work. */ mcast_packed_msg_lens[mcast_packed_msg_count] += copy_len; /* * if we're not on the last iovec or the iovec is too large to * fit, then indicate a fragment. This also means that the next * message will have the continuation of this one. */ if ((i < (iov_len - 1)) || ((copy_base + copy_len) < iovec[i].iov_len)) { if (!next_fragment) { next_fragment++; } fragment_continuation = next_fragment; mcast.fragmented = next_fragment++; assert(fragment_continuation != 0); assert(mcast.fragmented != 0); } else { fragment_continuation = 0; } /* * assemble the message and send it */ mcast.msg_count = ++mcast_packed_msg_count; iovecs[0].iov_base = &mcast; iovecs[0].iov_len = sizeof(struct totempg_mcast); iovecs[1].iov_base = mcast_packed_msg_lens; iovecs[1].iov_len = mcast_packed_msg_count * sizeof(unsigned short); iovecs[2].iov_base = data_ptr; iovecs[2].iov_len = max_packet_size; assert (totemmrp_avail() > 0); res = totemmrp_mcast (iovecs, 3, guarantee); /* * Recalculate counts and indexes for the next. */ mcast_packed_msg_lens[0] = 0; mcast_packed_msg_count = 0; fragment_size = 0; max_packet_size = TOTEMPG_PACKET_SIZE - (sizeof(unsigned short)); /* * If the iovec all fit, go to the next iovec */ if ((copy_base + copy_len) == iovec[i].iov_len) { copy_len = 0; copy_base = 0; i++; /* * Continue with the rest of the current iovec. */ } else { copy_base += copy_len; } } } /* * Bump only if we added message data. This may be zero if * the last buffer just fit into the fragmentation_data buffer * and we were at the last iovec.
*/ if (mcast_packed_msg_lens[mcast_packed_msg_count]) { mcast_packed_msg_count++; } pthread_mutex_unlock (&mcast_msg_mutex); return (res); } /* * Determine if a message of msg_size could be queued */ static int msg_count_send_ok ( int msg_count) { int avail = 0; avail = totemmrp_avail () - totempg_reserved - 1; return (avail > msg_count); } static int byte_count_send_ok ( int byte_count) { /* NOTE(review): msg_count is kept signed here - with the original 'unsigned int', a negative 'avail' (totemmrp_avail() == 0 gives avail == -1) was promoted to a huge unsigned value by the comparison below, wrongly reporting "ok to send" exactly when no queue space is available. */ int msg_count = 0; int avail = 0; avail = totemmrp_avail () - 1; msg_count = (byte_count / (totempg_totem_config->net_mtu - 25)) + 1; return (avail > msg_count); } static int send_reserve ( int msg_size) { unsigned int msg_count = 0; msg_count = (msg_size / (totempg_totem_config->net_mtu - 25)) + 1; totempg_reserved += msg_count; return (msg_count); } static void send_release ( int msg_count) { totempg_reserved -= msg_count; } int totempg_callback_token_create ( void **handle_out, enum totem_callback_token_type type, int delete, int (*callback_fn) (enum totem_callback_token_type type, void *), void *data) { unsigned int res; pthread_mutex_lock (&callback_token_mutex); res = totemmrp_callback_token_create (handle_out, type, delete, callback_fn, data); pthread_mutex_unlock (&callback_token_mutex); return (res); } void totempg_callback_token_destroy ( void *handle_out) { pthread_mutex_lock (&callback_token_mutex); totemmrp_callback_token_destroy (handle_out); pthread_mutex_unlock (&callback_token_mutex); } /* * vi: set autoindent tabstop=4 shiftwidth=4 : */ int totempg_groups_initialize ( hdb_handle_t *handle, void (*deliver_fn) ( unsigned int nodeid, struct iovec *iovec, int iov_len, int endian_conversion_required), void (*confchg_fn) ( enum totem_configuration_type configuration_type, - unsigned int *member_list, int member_list_entries, - unsigned int *left_list, int left_list_entries, - unsigned int *joined_list, int joined_list_entries, - struct memb_ring_id *ring_id)) + const unsigned int *member_list, size_t member_list_entries, + const unsigned int *left_list,
size_t left_list_entries, + const unsigned int *joined_list, size_t joined_list_entries, + const struct memb_ring_id *ring_id)) { struct totempg_group_instance *instance; unsigned int res; pthread_mutex_lock (&totempg_mutex); res = hdb_handle_create (&totempg_groups_instance_database, sizeof (struct totempg_group_instance), handle); if (res != 0) { goto error_exit; } if (*handle > totempg_max_handle) { totempg_max_handle = *handle; } res = hdb_handle_get (&totempg_groups_instance_database, *handle, (void *)&instance); if (res != 0) { goto error_destroy; } instance->deliver_fn = deliver_fn; instance->confchg_fn = confchg_fn; instance->groups = 0; instance->groups_cnt = 0; hdb_handle_put (&totempg_groups_instance_database, *handle); pthread_mutex_unlock (&totempg_mutex); return (0); error_destroy: hdb_handle_destroy (&totempg_groups_instance_database, *handle); error_exit: pthread_mutex_unlock (&totempg_mutex); return (-1); } int totempg_groups_join ( hdb_handle_t handle, const struct totempg_group *groups, size_t group_cnt) { struct totempg_group_instance *instance; struct totempg_group *new_groups; unsigned int res; pthread_mutex_lock (&totempg_mutex); res = hdb_handle_get (&totempg_groups_instance_database, handle, (void *)&instance); if (res != 0) { goto error_exit; } new_groups = realloc (instance->groups, sizeof (struct totempg_group) * (instance->groups_cnt + group_cnt)); if (new_groups == 0) { res = ENOMEM; goto error_exit; } memcpy (&new_groups[instance->groups_cnt], groups, group_cnt * sizeof (struct totempg_group)); instance->groups = new_groups; /* NOTE(review): accumulate the count - the original double assignment "instance->groups_cnt = instance->groups_cnt = group_cnt;" overwrote the count with only the newly joined groups, silently dropping every group joined earlier (the realloc/memcpy above clearly grow the array by group_cnt entries). */ instance->groups_cnt = instance->groups_cnt + group_cnt; hdb_handle_put (&totempg_groups_instance_database, handle); error_exit: pthread_mutex_unlock (&totempg_mutex); return (res); } int totempg_groups_leave ( hdb_handle_t handle, const struct totempg_group *groups, size_t group_cnt) { struct totempg_group_instance *instance; unsigned int res; pthread_mutex_lock (&totempg_mutex); res = hdb_handle_get
(&totempg_groups_instance_database, handle, (void *)&instance); if (res != 0) { goto error_exit; } hdb_handle_put (&totempg_groups_instance_database, handle); error_exit: pthread_mutex_unlock (&totempg_mutex); return (res); } #define MAX_IOVECS_FROM_APP 32 #define MAX_GROUPS_PER_MSG 32 int totempg_groups_mcast_joined ( hdb_handle_t handle, const struct iovec *iovec, int iov_len, int guarantee) { struct totempg_group_instance *instance; unsigned short group_len[MAX_GROUPS_PER_MSG + 1]; struct iovec iovec_mcast[MAX_GROUPS_PER_MSG + 1 + MAX_IOVECS_FROM_APP]; int i; unsigned int res; pthread_mutex_lock (&totempg_mutex); res = hdb_handle_get (&totempg_groups_instance_database, handle, (void *)&instance); if (res != 0) { goto error_exit; } /* * Build group_len structure and the iovec_mcast structure */ group_len[0] = instance->groups_cnt; for (i = 0; i < instance->groups_cnt; i++) { group_len[i + 1] = instance->groups[i].group_len; iovec_mcast[i + 1].iov_len = instance->groups[i].group_len; iovec_mcast[i + 1].iov_base = instance->groups[i].group; } iovec_mcast[0].iov_len = (instance->groups_cnt + 1) * sizeof (unsigned short); iovec_mcast[0].iov_base = group_len; for (i = 0; i < iov_len; i++) { iovec_mcast[i + instance->groups_cnt + 1].iov_len = iovec[i].iov_len; iovec_mcast[i + instance->groups_cnt + 1].iov_base = iovec[i].iov_base; } res = mcast_msg (iovec_mcast, iov_len + instance->groups_cnt + 1, guarantee); hdb_handle_put (&totempg_groups_instance_database, handle); error_exit: pthread_mutex_unlock (&totempg_mutex); return (res); } int totempg_groups_joined_reserve ( hdb_handle_t handle, const struct iovec *iovec, int iov_len) { struct totempg_group_instance *instance; unsigned int size = 0; unsigned int i; unsigned int res; unsigned int reserved = 0; pthread_mutex_lock (&totempg_mutex); pthread_mutex_lock (&mcast_msg_mutex); res = hdb_handle_get (&totempg_groups_instance_database, handle, (void *)&instance); if (res != 0) { goto error_exit; } for (i = 0; i < 
instance->groups_cnt; i++) { size += instance->groups[i].group_len; } for (i = 0; i < iov_len; i++) { size += iovec[i].iov_len; } reserved = send_reserve (size); if (msg_count_send_ok (reserved) == 0) { send_release (reserved); reserved = 0; } hdb_handle_put (&totempg_groups_instance_database, handle); error_exit: pthread_mutex_unlock (&mcast_msg_mutex); pthread_mutex_unlock (&totempg_mutex); return (reserved); } void totempg_groups_joined_release (int msg_count) { pthread_mutex_lock (&totempg_mutex); pthread_mutex_lock (&mcast_msg_mutex); send_release (msg_count); pthread_mutex_unlock (&mcast_msg_mutex); pthread_mutex_unlock (&totempg_mutex); } int totempg_groups_mcast_groups ( hdb_handle_t handle, int guarantee, const struct totempg_group *groups, size_t groups_cnt, const struct iovec *iovec, size_t iov_len) { struct totempg_group_instance *instance; unsigned short group_len[MAX_GROUPS_PER_MSG + 1]; struct iovec iovec_mcast[MAX_GROUPS_PER_MSG + 1 + MAX_IOVECS_FROM_APP]; int i; unsigned int res; pthread_mutex_lock (&totempg_mutex); res = hdb_handle_get (&totempg_groups_instance_database, handle, (void *)&instance); if (res != 0) { goto error_exit; } /* * Build group_len structure and the iovec_mcast structure */ group_len[0] = groups_cnt; for (i = 0; i < groups_cnt; i++) { group_len[i + 1] = groups[i].group_len; iovec_mcast[i + 1].iov_len = groups[i].group_len; iovec_mcast[i + 1].iov_base = groups[i].group; } iovec_mcast[0].iov_len = (groups_cnt + 1) * sizeof (unsigned short); iovec_mcast[0].iov_base = group_len; for (i = 0; i < iov_len; i++) { iovec_mcast[i + groups_cnt + 1].iov_len = iovec[i].iov_len; iovec_mcast[i + groups_cnt + 1].iov_base = iovec[i].iov_base; } res = mcast_msg (iovec_mcast, iov_len + groups_cnt + 1, guarantee); hdb_handle_put (&totempg_groups_instance_database, handle); error_exit: pthread_mutex_unlock (&totempg_mutex); return (res); } /* * Returns -1 if error, 0 if can't send, 1 if can send the message */ int totempg_groups_send_ok_groups ( 
hdb_handle_t handle, const struct totempg_group *groups, size_t groups_cnt, const struct iovec *iovec, size_t iov_len) { struct totempg_group_instance *instance; unsigned int size = 0; unsigned int i; unsigned int res; pthread_mutex_lock (&totempg_mutex); res = hdb_handle_get (&totempg_groups_instance_database, handle, (void *)&instance); if (res != 0) { goto error_exit; } for (i = 0; i < groups_cnt; i++) { size += groups[i].group_len; } for (i = 0; i < iov_len; i++) { size += iovec[i].iov_len; } res = msg_count_send_ok (size); hdb_handle_put (&totempg_groups_instance_database, handle); error_exit: pthread_mutex_unlock (&totempg_mutex); return (res); } int totempg_ifaces_get ( unsigned int nodeid, struct totem_ip_address *interfaces, char ***status, unsigned int *iface_count) { int res; res = totemmrp_ifaces_get ( nodeid, interfaces, status, iface_count); return (res); } int totempg_ring_reenable (void) { int res; res = totemmrp_ring_reenable (); return (res); } const char *totempg_ifaces_print (unsigned int nodeid) { static char iface_string[256 * INTERFACE_MAX]; char one_iface[64]; struct totem_ip_address interfaces[INTERFACE_MAX]; char **status; unsigned int iface_count; unsigned int i; int res; iface_string[0] = '\0'; res = totempg_ifaces_get (nodeid, interfaces, &status, &iface_count); if (res == -1) { return ("no interface found for nodeid"); } for (i = 0; i < iface_count; i++) { sprintf (one_iface, "r(%d) ip(%s) ", i, totemip_print (&interfaces[i])); strcat (iface_string, one_iface); } return (iface_string); } unsigned int totempg_my_nodeid_get (void) { return (totemmrp_my_nodeid_get()); } int totempg_my_family_get (void) { return (totemmrp_my_family_get()); } diff --git a/include/corosync/engine/coroapi.h b/include/corosync/engine/coroapi.h index ece4c216..915a8185 100644 --- a/include/corosync/engine/coroapi.h +++ b/include/corosync/engine/coroapi.h @@ -1,599 +1,601 @@ /* * Copyright (c) 2008, 2009 Red Hat, Inc. * * All rights reserved. 
* * Author: Steven Dake (sdake@redhat.com) * * This software licensed under BSD license, the text of which follows: * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * - Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * - Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * - Neither the name of the MontaVista Software, Inc. nor the names of its * contributors may be used to endorse or promote products derived from this * software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF * THE POSSIBILITY OF SUCH DAMAGE. 
*/ #ifndef COROAPI_H_DEFINED #define COROAPI_H_DEFINED #include #ifdef COROSYNC_BSD #include #endif #include typedef void * corosync_timer_handle_t; struct corosync_tpg_group { const void *group; int group_len; }; #define TOTEMIP_ADDRLEN (sizeof(struct in6_addr)) #define PROCESSOR_COUNT_MAX 384 #define INTERFACE_MAX 2 #ifndef MESSAGE_SIZE_MAX #define MESSAGE_SIZE_MAX 1024*1024 /* (1MB) */ #endif /* MESSAGE_SIZE_MAX */ #ifndef MESSAGE_QUEUE_MAX #define MESSAGE_QUEUE_MAX MESSAGE_SIZE_MAX / totem_config->net_mtu #endif /* MESSAGE_QUEUE_MAX */ #define TOTEM_AGREED 0 #define TOTEM_SAFE 1 #define MILLI_2_NANO_SECONDS 1000000ULL #if !defined(TOTEM_IP_ADDRESS) struct totem_ip_address { unsigned int nodeid; unsigned short family; unsigned char addr[TOTEMIP_ADDRLEN]; } __attribute__((packed)); #endif #if !defined(MEMB_RING_ID) struct memb_ring_id { struct totem_ip_address rep; unsigned long long seq; } __attribute__((packed)); #endif #if !defined(TOTEM_CONFIGURATION_TYPE) enum totem_configuration_type { TOTEM_CONFIGURATION_REGULAR, TOTEM_CONFIGURATION_TRANSITIONAL }; #endif #if !defined(TOTEM_CALLBACK_TOKEN_TYPE) enum totem_callback_token_type { TOTEM_CALLBACK_TOKEN_RECEIVED = 1, TOTEM_CALLBACK_TOKEN_SENT = 2 }; #endif enum cs_lib_flow_control { CS_LIB_FLOW_CONTROL_REQUIRED = 1, CS_LIB_FLOW_CONTROL_NOT_REQUIRED = 2 }; #define corosync_lib_flow_control cs_lib_flow_control #define COROSYNC_LIB_FLOW_CONTROL_REQUIRED CS_LIB_FLOW_CONTROL_REQUIRED #define COROSYNC_LIB_FLOW_CONTROL_NOT_REQUIRED CS_LIB_FLOW_CONTROL_NOT_REQUIRED enum cs_lib_allow_inquorate { CS_LIB_DISALLOW_INQUORATE = 0, /* default */ CS_LIB_ALLOW_INQUORATE = 1 }; #if !defined (COROSYNC_FLOW_CONTROL_STATE) enum cs_flow_control_state { CS_FLOW_CONTROL_STATE_DISABLED, CS_FLOW_CONTROL_STATE_ENABLED }; #define corosync_flow_control_state cs_flow_control_state #define CS_FLOW_CONTROL_STATE_DISABLED CS_FLOW_CONTROL_STATE_DISABLED #define CS_FLOW_CONTROL_STATE_ENABLED CS_FLOW_CONTROL_STATE_ENABLED #endif /* 
COROSYNC_FLOW_CONTROL_STATE */ typedef enum { COROSYNC_FATAL_ERROR_EXIT = -1, COROSYNC_LIBAIS_SOCKET = -6, COROSYNC_LIBAIS_BIND = -7, COROSYNC_READKEY = -8, COROSYNC_INVALID_CONFIG = -9, COROSYNC_DYNAMICLOAD = -12, COROSYNC_OUT_OF_MEMORY = -15, COROSYNC_FATAL_ERR = -16 } cs_fatal_error_t; /* NOTE(review): stray trailing ';' removed from the alias below - it made every use such as "corosync_fatal_error_t e;" expand to "cs_fatal_error_t; e;", which does not compile. */ #define corosync_fatal_error_t cs_fatal_error_t #ifndef OBJECT_PARENT_HANDLE #define OBJECT_PARENT_HANDLE 0xffffffff00000000ULL struct object_valid { char *object_name; int object_len; }; struct object_key_valid { char *key_name; int key_len; int (*validate_callback) (const void *key, int key_len, const void *value, int value_len); }; /* deprecated */ typedef enum { OBJECT_TRACK_DEPTH_ONE, OBJECT_TRACK_DEPTH_RECURSIVE } object_track_depth_t; typedef enum { OBJECT_KEY_CREATED, OBJECT_KEY_REPLACED, OBJECT_KEY_DELETED } object_change_type_t; typedef enum { OBJDB_RELOAD_NOTIFY_START, OBJDB_RELOAD_NOTIFY_END, OBJDB_RELOAD_NOTIFY_FAILED } objdb_reload_notify_type_t; typedef void (*object_key_change_notify_fn_t)( object_change_type_t change_type, hdb_handle_t parent_object_handle, hdb_handle_t object_handle, const void *object_name_pt, int object_name_len, const void *key_name_pt, int key_len, const void *key_value_pt, int key_value_len, void *priv_data_pt); typedef void (*object_create_notify_fn_t) ( hdb_handle_t parent_object_handle, hdb_handle_t object_handle, const uint8_t *name_pt, int name_len, void *priv_data_pt); typedef void (*object_destroy_notify_fn_t) ( hdb_handle_t parent_object_handle, const uint8_t *name_pt, int name_len, void *priv_data_pt); typedef void (*object_notify_callback_fn_t)( hdb_handle_t object_handle, const void *key_name, int key_len, const void *value, int value_len, object_change_type_t type, void * priv_data_pt); typedef void (*object_reload_notify_fn_t) ( objdb_reload_notify_type_t, int flush, void *priv_data_pt); #endif /* OBJECT_PARENT_HANDLE_DEFINED */ #ifndef QUORUM_H_DEFINED typedef void (*quorum_callback_fn_t) (int quorate, void *context); struct
quorum_callin_functions { int (*quorate) (void); int (*register_callback) (quorum_callback_fn_t callback_fn, void *context); int (*unregister_callback) (quorum_callback_fn_t callback_fn, void *context); }; typedef void (*sync_callback_fn_t) ( const unsigned int *view_list, size_t view_list_entries, int primary_designated, struct memb_ring_id *ring_id); #endif /* QUORUM_H_DEFINED */ struct corosync_api_v1 { /* * Object and configuration APIs */ int (*object_create) ( hdb_handle_t parent_object_handle, hdb_handle_t *object_handle, const void *object_name, unsigned int object_name_len); int (*object_priv_set) ( hdb_handle_t object_handle, void *priv); int (*object_key_create) ( hdb_handle_t object_handle, const void *key_name, int key_len, const void *value, int value_len); int (*object_destroy) ( hdb_handle_t object_handle); int (*object_valid_set) ( hdb_handle_t object_handle, struct object_valid *object_valid_list, unsigned int object_valid_list_entries); int (*object_key_valid_set) ( hdb_handle_t object_handle, struct object_key_valid *object_key_valid_list, unsigned int object_key_valid_list_entries); int (*object_find_create) ( hdb_handle_t parent_object_handle, const void *object_name, int object_name_len, hdb_handle_t *object_find_handle); int (*object_find_next) ( hdb_handle_t object_find_handle, hdb_handle_t *object_handle); int (*object_find_destroy) ( hdb_handle_t object_find_handle); int (*object_key_get) ( hdb_handle_t object_handle, const void *key_name, int key_len, void **value, int *value_len); int (*object_priv_get) ( hdb_handle_t jobject_handle, void **priv); int (*object_key_replace) ( hdb_handle_t object_handle, const void *key_name, int key_len, const void *new_value, int new_value_len); int (*object_key_delete) ( hdb_handle_t object_handle, const void *key_name, int key_len); int (*object_iter_reset) ( hdb_handle_t parent_object_handle); int (*object_iter) ( hdb_handle_t parent_object_handle, void **object_name, int *name_len, hdb_handle_t 
*object_handle); int (*object_key_iter_reset) ( hdb_handle_t object_handle); int (*object_key_iter) ( hdb_handle_t parent_object_handle, void **key_name, int *key_len, void **value, int *value_len); int (*object_parent_get) ( hdb_handle_t object_handle, hdb_handle_t *parent_handle); int (*object_name_get) ( hdb_handle_t object_handle, char *object_name, int *object_name_len); int (*object_dump) ( hdb_handle_t object_handle, FILE *file); int (*object_key_iter_from) ( hdb_handle_t parent_object_handle, hdb_handle_t start_pos, void **key_name, int *key_len, void **value, int *value_len); int (*object_track_start) ( hdb_handle_t object_handle, object_track_depth_t depth, object_key_change_notify_fn_t key_change_notify_fn, object_create_notify_fn_t object_create_notify_fn, object_destroy_notify_fn_t object_destroy_notify_fn, object_reload_notify_fn_t object_reload_notify_fn, void * priv_data_pt); void (*object_track_stop) ( object_key_change_notify_fn_t key_change_notify_fn, object_create_notify_fn_t object_create_notify_fn, object_destroy_notify_fn_t object_destroy_notify_fn, object_reload_notify_fn_t object_reload_notify_fn, void * priv_data_pt); int (*object_write_config) (const char **error_string); int (*object_reload_config) (int flush, const char **error_string); int (*object_key_increment) ( hdb_handle_t object_handle, const void *key_name, int key_len, unsigned int *value); int (*object_key_decrement) ( hdb_handle_t object_handle, const void *key_name, int key_len, unsigned int *value); /* * Time and timer APIs */ int (*timer_add_duration) ( unsigned long long nanoseconds_in_future, void *data, void (*timer_nf) (void *data), corosync_timer_handle_t *handle); int (*timer_add_absolute) ( unsigned long long nanoseconds_from_epoch, void *data, void (*timer_fn) (void *data), corosync_timer_handle_t *handle); void (*timer_delete) ( corosync_timer_handle_t timer_handle); unsigned long long (*timer_time_get) (void); unsigned long long (*timer_expire_time_get) ( 
corosync_timer_handle_t timer_handle); /* * IPC APIs */ void (*ipc_source_set) (mar_message_source_t *source, void *conn); int (*ipc_source_is_local) (const mar_message_source_t *source); void *(*ipc_private_data_get) (void *conn); int (*ipc_response_send) (void *conn, const void *msg, int mlen); int (*ipc_response_iov_send) (void *conn, const struct iovec *iov, int iov_len); int (*ipc_dispatch_send) (void *conn, const void *msg, int mlen); int (*ipc_dispatch_iov_send) (void *conn, const struct iovec *iov, int iov_len); void (*ipc_refcnt_inc) (void *conn); void (*ipc_refcnt_dec) (void *conn); /* * Totem APIs */ unsigned int (*totem_nodeid_get) (void); int (*totem_family_get) (void); int (*totem_ring_reenable) (void); int (*totem_mcast) (struct iovec *iovec, int iov_len, unsigned int guarantee); int (*totem_ifaces_get) ( unsigned int nodeid, struct totem_ip_address *interfaces, char ***status, unsigned int *iface_count); char *(*totem_ifaces_print) (unsigned int nodeid); char *(*totem_ip_print) (struct totem_ip_address *addr); int (*totem_callback_token_create) ( void **handle_out, enum totem_callback_token_type type, int delete, int (*callback_fn) (enum totem_callback_token_type type, void *), void *data); /* * Totem open process groups API for those service engines * wanting their own groups */ int (*tpg_init) ( hdb_handle_t *handle, void (*deliver_fn) ( unsigned int nodeid, struct iovec *iovec, int iov_len, int endian_conversion_required), void (*confchg_fn) ( enum totem_configuration_type configuration_type, - unsigned int *member_list, int member_list_entries, - unsigned int *left_list, int left_list_entries, - unsigned int *joined_list, int joined_list_entries, - struct memb_ring_id *ring_id)); + const unsigned int *member_list, + size_t member_list_entries, + const unsigned int *left_list, + size_t left_list_entries, + const unsigned int *joined_list, + size_t joined_list_entries, + const struct memb_ring_id *ring_id)); int (*tpg_exit) ( hdb_handle_t handle); 
int (*tpg_join) ( hdb_handle_t handle, struct corosync_tpg_group *groups, int group_cnt); int (*tpg_leave) ( hdb_handle_t handle, struct corosync_tpg_group *groups, int group_cnt); int (*tpg_joined_mcast) ( hdb_handle_t handle, const struct iovec *iovec, int iov_len, int guarantee); int (*tpg_joined_reserve) ( hdb_handle_t handle, const struct iovec *iovec, int iov_len); int (*tpg_joined_release) ( int reserved_msgs); int (*tpg_groups_mcast) ( hdb_handle_t handle, int guarantee, const struct corosync_tpg_group *groups, int groups_cnt, const struct iovec *iovec, int iov_len); int (*tpg_groups_reserve) ( hdb_handle_t handle, const struct corosync_tpg_group *groups, int groups_cnt, const struct iovec *iovec, int iov_len); int (*tpg_groups_release) ( int reserved_msgs); int (*sync_request) ( char *service_name); /* * User plugin-callable functions for quorum */ int (*quorum_is_quorate) (void); int (*quorum_register_callback) (quorum_callback_fn_t callback_fn, void *context); int (*quorum_unregister_callback) (quorum_callback_fn_t callback_fn, void *context); /* * This one is for the quorum management plugin's use */ int (*quorum_initialize)(struct quorum_callin_functions *fns); /* * Plugin loading and unloading */ int (*plugin_interface_reference) ( hdb_handle_t *handle, const char *iface_name, int version, void **interface, void *context); int (*plugin_interface_release) (hdb_handle_t handle); /* * Service loading and unloading APIs */ unsigned int (*service_link_and_init) ( struct corosync_api_v1 *corosync_api_v1, const char *service_name, unsigned int service_ver); unsigned int (*service_unlink_and_exit) ( struct corosync_api_v1 *corosync_api_v1, const char *service_name, unsigned int service_ver); /* * Error handling APIs */ void (*error_memory_failure) (void); #define corosync_fatal_error(err) api->fatal_error ((err), __FILE__, __LINE__) void (*fatal_error) (cs_fatal_error_t err, const char *file, unsigned int line); }; #define SERVICE_ID_MAKE(a,b) ( ((a)<<16) | 
(b) ) #define SERVICE_HANDLER_MAXIMUM_COUNT 64 struct corosync_lib_handler { void (*lib_handler_fn) (void *conn, void *msg); int response_size; int response_id; enum cs_lib_flow_control flow_control; }; struct corosync_exec_handler { void (*exec_handler_fn) (const void *msg, unsigned int nodeid); void (*exec_endian_convert_fn) (void *msg); }; struct corosync_service_engine_iface_ver0 { struct corosync_service_engine *(*corosync_get_service_engine_ver0) (void); }; struct corosync_service_engine { const char *name; unsigned short id; unsigned int private_data_size; enum cs_lib_flow_control flow_control; enum cs_lib_allow_inquorate allow_inquorate; int (*exec_init_fn) (struct corosync_api_v1 *); int (*exec_exit_fn) (void); void (*exec_dump_fn) (void); int (*lib_init_fn) (void *conn); int (*lib_exit_fn) (void *conn); struct corosync_lib_handler *lib_engine; int lib_engine_count; struct corosync_exec_handler *exec_engine; int exec_engine_count; int (*config_init_fn) (struct corosync_api_v1 *); void (*confchg_fn) ( enum totem_configuration_type configuration_type, - unsigned int *member_list, int member_list_entries, - unsigned int *left_list, int left_list_entries, - unsigned int *joined_list, int joined_list_entries, - struct memb_ring_id *ring_id); + const unsigned int *member_list, size_t member_list_entries, + const unsigned int *left_list, size_t left_list_entries, + const unsigned int *joined_list, size_t joined_list_entries, + const struct memb_ring_id *ring_id); void (*sync_init) (void); int (*sync_process) (void); void (*sync_activate) (void); void (*sync_abort) (void); }; #endif /* COROAPI_H_DEFINED */ - diff --git a/include/corosync/totem/totempg.h b/include/corosync/totem/totempg.h index 3268374b..b58df9f7 100644 --- a/include/corosync/totem/totempg.h +++ b/include/corosync/totem/totempg.h @@ -1,150 +1,150 @@ /* * Copyright (c) 2003-2005 MontaVista Software, Inc. * Copyright (c) 2006-2007, 2009 Red Hat, Inc. * * All rights reserved. 
* * Author: Steven Dake (sdake@redhat.com) * * This software licensed under BSD license, the text of which follows: * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * - Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * - Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * - Neither the name of the MontaVista Software, Inc. nor the names of its * contributors may be used to endorse or promote products derived from this * software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF * THE POSSIBILITY OF SUCH DAMAGE. 
*/ #ifndef TOTEMPG_H_DEFINED #define TOTEMPG_H_DEFINED #include #include "totem.h" #include "coropoll.h" #include struct totempg_group { const void *group; int group_len; }; #define TOTEMPG_AGREED 0 #define TOTEMPG_SAFE 1 /* * Totem Single Ring Protocol * depends on poll abstraction, POSIX, IPV4 */ /* * Initialize the totem process groups abstraction */ extern int totempg_initialize ( hdb_handle_t poll_handle, struct totem_config *totem_config ); extern void totempg_finalize (void); extern int totempg_callback_token_create (void **handle_out, enum totem_callback_token_type type, int delete, int (*callback_fn) (enum totem_callback_token_type type, void *), void *data); extern void totempg_callback_token_destroy (void *handle); /* * Initialize a groups instance */ extern int totempg_groups_initialize ( hdb_handle_t *handle, void (*deliver_fn) ( unsigned int nodeid, struct iovec *iovec, int iov_len, int endian_conversion_required), void (*confchg_fn) ( enum totem_configuration_type configuration_type, - unsigned int *member_list, int member_list_entries, - unsigned int *left_list, int left_list_entries, - unsigned int *joined_list, int joined_list_entries, - struct memb_ring_id *ring_id)); + const unsigned int *member_list, size_t member_list_entries, + const unsigned int *left_list, size_t left_list_entries, + const unsigned int *joined_list, size_t joined_list_entries, + const struct memb_ring_id *ring_id)); extern int totempg_groups_finalize ( hdb_handle_t handle); extern int totempg_groups_join ( hdb_handle_t handle, const struct totempg_group *groups, size_t group_cnt); extern int totempg_groups_leave ( hdb_handle_t handle, const struct totempg_group *groups, size_t group_cnt); extern int totempg_groups_mcast_joined ( hdb_handle_t handle, const struct iovec *iovec, int iov_len, int guarantee); extern int totempg_groups_joined_reserve ( hdb_handle_t handle, const struct iovec *iovec, int iov_len); extern void totempg_groups_joined_release ( int msg_count); extern 
int totempg_groups_mcast_groups ( hdb_handle_t handle, int guarantee, const struct totempg_group *groups, size_t groups_cnt, const struct iovec *iovec, size_t iov_len); extern int totempg_groups_send_ok_groups ( hdb_handle_t handle, const struct totempg_group *groups, size_t groups_cnt, const struct iovec *iovec, size_t iov_len); extern int totempg_ifaces_get ( unsigned int nodeid, struct totem_ip_address *interfaces, char ***status, unsigned int *iface_count); extern const char *totempg_ifaces_print (unsigned int nodeid); extern unsigned int totempg_my_nodeid_get (void); extern int totempg_my_family_get (void); extern int totempg_ring_reenable (void); #endif /* TOTEMPG_H_DEFINED */ diff --git a/services/cfg.c b/services/cfg.c index 27e3842d..d93324ad 100644 --- a/services/cfg.c +++ b/services/cfg.c @@ -1,1058 +1,1058 @@ /* * Copyright (c) 2005-2006 MontaVista Software, Inc. * Copyright (c) 2006-2009 Red Hat, Inc. * * All rights reserved. * * Author: Steven Dake (sdake@redhat.com) * * This software licensed under BSD license, the text of which follows: * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * - Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * - Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * - Neither the name of the MontaVista Software, Inc. nor the names of its * contributors may be used to endorse or promote products derived from this * software without specific prior written permission. 
* * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF * THE POSSIBILITY OF SUCH DAMAGE. */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include LOGSYS_DECLARE_SUBSYS ("CFG", LOG_INFO); enum cfg_message_req_types { MESSAGE_REQ_EXEC_CFG_RINGREENABLE = 0, MESSAGE_REQ_EXEC_CFG_KILLNODE = 1, MESSAGE_REQ_EXEC_CFG_SHUTDOWN = 2 }; #define DEFAULT_SHUTDOWN_TIMEOUT 5 static struct list_head trackers_list; /* * Variables controlling a requested shutdown */ static corosync_timer_handle_t shutdown_timer; static struct cfg_info *shutdown_con; static uint32_t shutdown_flags; static int shutdown_yes; static int shutdown_no; static int shutdown_expected; struct cfg_info { struct list_head list; void *conn; void *tracker_conn; enum {SHUTDOWN_REPLY_UNKNOWN, SHUTDOWN_REPLY_YES, SHUTDOWN_REPLY_NO} shutdown_reply; }; static void cfg_confchg_fn ( enum totem_configuration_type configuration_type, - unsigned int *member_list, int member_list_entries, - unsigned int *left_list, int left_list_entries, - unsigned int *joined_list, int joined_list_entries, - struct memb_ring_id *ring_id); + const unsigned int *member_list, size_t 
member_list_entries, + const unsigned int *left_list, size_t left_list_entries, + const unsigned int *joined_list, size_t joined_list_entries, + const struct memb_ring_id *ring_id); static int cfg_exec_init_fn (struct corosync_api_v1 *corosync_api_v1); static struct corosync_api_v1 *api; static int cfg_lib_init_fn (void *conn); static int cfg_lib_exit_fn (void *conn); static void message_handler_req_exec_cfg_ringreenable ( const void *message, unsigned int nodeid); static void message_handler_req_exec_cfg_killnode ( const void *message, unsigned int nodeid); static void message_handler_req_exec_cfg_shutdown ( const void *message, unsigned int nodeid); static void exec_cfg_killnode_endian_convert (void *msg); static void message_handler_req_lib_cfg_ringstatusget ( void *conn, void *msg); static void message_handler_req_lib_cfg_ringreenable ( void *conn, void *msg); static void message_handler_req_lib_cfg_statetrack ( void *conn, void *msg); static void message_handler_req_lib_cfg_statetrackstop ( void *conn, void *msg); static void message_handler_req_lib_cfg_administrativestateset ( void *conn, void *msg); static void message_handler_req_lib_cfg_administrativestateget ( void *conn, void *msg); static void message_handler_req_lib_cfg_serviceload ( void *conn, void *msg); static void message_handler_req_lib_cfg_serviceunload ( void *conn, void *msg); static void message_handler_req_lib_cfg_killnode ( void *conn, void *msg); static void message_handler_req_lib_cfg_tryshutdown ( void *conn, void *msg); static void message_handler_req_lib_cfg_replytoshutdown ( void *conn, void *msg); static void message_handler_req_lib_cfg_get_node_addrs ( void *conn, void *msg); static void message_handler_req_lib_cfg_local_get ( void *conn, void *msg); /* * Service Handler Definition */ static struct corosync_lib_handler cfg_lib_engine[] = { { /* 0 */ .lib_handler_fn = message_handler_req_lib_cfg_ringstatusget, .response_size = sizeof (struct res_lib_cfg_ringstatusget), .response_id = 
MESSAGE_RES_CFG_RINGSTATUSGET, .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED }, { /* 1 */ .lib_handler_fn = message_handler_req_lib_cfg_ringreenable, .response_size = sizeof (struct res_lib_cfg_ringreenable), .response_id = MESSAGE_RES_CFG_RINGREENABLE, .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED }, { /* 2 */ .lib_handler_fn = message_handler_req_lib_cfg_statetrack, .response_size = sizeof (struct res_lib_cfg_statetrack), .response_id = MESSAGE_RES_CFG_STATETRACKSTART, .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED }, { /* 3 */ .lib_handler_fn = message_handler_req_lib_cfg_statetrackstop, .response_size = sizeof (struct res_lib_cfg_statetrackstop), .response_id = MESSAGE_RES_CFG_STATETRACKSTOP, .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED }, { /* 4 */ .lib_handler_fn = message_handler_req_lib_cfg_administrativestateset, .response_size = sizeof (struct res_lib_cfg_administrativestateset), .response_id = MESSAGE_RES_CFG_ADMINISTRATIVESTATESET, .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 5 */ .lib_handler_fn = message_handler_req_lib_cfg_administrativestateget, .response_size = sizeof (struct res_lib_cfg_administrativestateget), .response_id = MESSAGE_RES_CFG_ADMINISTRATIVESTATEGET, .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 6 */ .lib_handler_fn = message_handler_req_lib_cfg_serviceload, .response_size = sizeof (struct res_lib_cfg_serviceload), .response_id = MESSAGE_RES_CFG_SERVICELOAD, .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 7 */ .lib_handler_fn = message_handler_req_lib_cfg_serviceunload, .response_size = sizeof (struct res_lib_cfg_serviceunload), .response_id = MESSAGE_RES_CFG_SERVICEUNLOAD, .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 8 */ .lib_handler_fn = message_handler_req_lib_cfg_killnode, .response_size = sizeof (struct res_lib_cfg_killnode), .response_id = MESSAGE_RES_CFG_KILLNODE, .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 9 */ .lib_handler_fn = message_handler_req_lib_cfg_tryshutdown, 
.response_size = sizeof (struct res_lib_cfg_tryshutdown), .response_id = MESSAGE_RES_CFG_TRYSHUTDOWN, .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 10 */ .lib_handler_fn = message_handler_req_lib_cfg_replytoshutdown, .response_size = sizeof (struct res_lib_cfg_replytoshutdown), .response_id = MESSAGE_RES_CFG_REPLYTOSHUTDOWN, .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 11 */ .lib_handler_fn = message_handler_req_lib_cfg_get_node_addrs, .response_size = sizeof (struct res_lib_cfg_get_node_addrs), .response_id = MESSAGE_RES_CFG_GET_NODE_ADDRS, .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 12 */ .lib_handler_fn = message_handler_req_lib_cfg_local_get, .response_size = sizeof (struct res_lib_cfg_local_get), .response_id = MESSAGE_RES_CFG_LOCAL_GET, .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED } }; static struct corosync_exec_handler cfg_exec_engine[] = { { /* 0 */ .exec_handler_fn = message_handler_req_exec_cfg_ringreenable, }, { /* 1 */ .exec_handler_fn = message_handler_req_exec_cfg_killnode, .exec_endian_convert_fn = exec_cfg_killnode_endian_convert }, { /* 2 */ .exec_handler_fn = message_handler_req_exec_cfg_shutdown, } }; /* * Exports the interface for the service */ struct corosync_service_engine cfg_service_engine = { .name = "corosync configuration service", .id = CFG_SERVICE, .private_data_size = sizeof(struct cfg_info), .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED, .allow_inquorate = CS_LIB_ALLOW_INQUORATE, .lib_init_fn = cfg_lib_init_fn, .lib_exit_fn = cfg_lib_exit_fn, .lib_engine = cfg_lib_engine, .lib_engine_count = sizeof (cfg_lib_engine) / sizeof (struct corosync_lib_handler), .exec_init_fn = cfg_exec_init_fn, .exec_engine = cfg_exec_engine, .exec_engine_count = 0, /* sizeof (cfg_aisexec_handler_fns) / sizeof (coroync_exec_handler), */ .confchg_fn = cfg_confchg_fn, }; /* * Dynamic Loader definition */ static struct corosync_service_engine *cfg_get_service_engine_ver0 (void); static struct 
corosync_service_engine_iface_ver0 cfg_service_engine_iface = { .corosync_get_service_engine_ver0 = cfg_get_service_engine_ver0 }; static struct lcr_iface corosync_cfg_ver0[1] = { { .name = "corosync_cfg", .version = 0, .versions_replace = 0, .versions_replace_count = 0, .dependencies = 0, .dependency_count = 0, .constructor = NULL, .destructor = NULL, .interfaces = NULL } }; static struct lcr_comp cfg_comp_ver0 = { .iface_count = 1, .ifaces = corosync_cfg_ver0 }; static struct corosync_service_engine *cfg_get_service_engine_ver0 (void) { return (&cfg_service_engine); } __attribute__ ((constructor)) static void register_this_component (void) { lcr_interfaces_set (&corosync_cfg_ver0[0], &cfg_service_engine_iface); lcr_component_register (&cfg_comp_ver0); } struct req_exec_cfg_ringreenable { mar_req_header_t header __attribute__((aligned(8))); mar_message_source_t source __attribute__((aligned(8))); }; struct req_exec_cfg_killnode { mar_req_header_t header __attribute__((aligned(8))); mar_uint32_t nodeid __attribute__((aligned(8))); mar_name_t reason __attribute__((aligned(8))); }; struct req_exec_cfg_shutdown { mar_req_header_t header __attribute__((aligned(8))); }; /* IMPL */ static int cfg_exec_init_fn ( struct corosync_api_v1 *corosync_api_v1) { api = corosync_api_v1; list_init(&trackers_list); return (0); } static void cfg_confchg_fn ( enum totem_configuration_type configuration_type, - unsigned int *member_list, int member_list_entries, - unsigned int *left_list, int left_list_entries, - unsigned int *joined_list, int joined_list_entries, - struct memb_ring_id *ring_id) + const unsigned int *member_list, size_t member_list_entries, + const unsigned int *left_list, size_t left_list_entries, + const unsigned int *joined_list, size_t joined_list_entries, + const struct memb_ring_id *ring_id) { } /* * Tell other nodes we are shutting down */ static int send_shutdown(void) { struct req_exec_cfg_shutdown req_exec_cfg_shutdown; struct iovec iovec; ENTER(); 
req_exec_cfg_shutdown.header.size = sizeof (struct req_exec_cfg_shutdown); req_exec_cfg_shutdown.header.id = SERVICE_ID_MAKE (CFG_SERVICE, MESSAGE_REQ_EXEC_CFG_SHUTDOWN); iovec.iov_base = (char *)&req_exec_cfg_shutdown; iovec.iov_len = sizeof (struct req_exec_cfg_shutdown); assert (api->totem_mcast (&iovec, 1, TOTEM_SAFE) == 0); LEAVE(); return 0; } static void send_test_shutdown(void *only_conn, void *exclude_conn, int status) { struct res_lib_cfg_testshutdown res_lib_cfg_testshutdown; struct list_head *iter; ENTER(); res_lib_cfg_testshutdown.header.size = sizeof(struct res_lib_cfg_testshutdown); res_lib_cfg_testshutdown.header.id = MESSAGE_RES_CFG_TESTSHUTDOWN; res_lib_cfg_testshutdown.header.error = status; res_lib_cfg_testshutdown.flags = shutdown_flags; if (only_conn) { TRACE1("sending testshutdown to only %p", only_conn); api->ipc_dispatch_send(only_conn, &res_lib_cfg_testshutdown, sizeof(res_lib_cfg_testshutdown)); } else { for (iter = trackers_list.next; iter != &trackers_list; iter = iter->next) { struct cfg_info *ci = list_entry(iter, struct cfg_info, list); if (ci->conn != exclude_conn) { TRACE1("sending testshutdown to %p", ci->tracker_conn); api->ipc_dispatch_send(ci->tracker_conn, &res_lib_cfg_testshutdown, sizeof(res_lib_cfg_testshutdown)); } } } LEAVE(); } static void check_shutdown_status(void) { ENTER(); /* * Shutdown client might have gone away */ if (!shutdown_con) { LEAVE(); return; } /* * All replies safely gathered in ? 
*/ if (shutdown_yes + shutdown_no >= shutdown_expected) { struct res_lib_cfg_tryshutdown res_lib_cfg_tryshutdown; api->timer_delete(shutdown_timer); if (shutdown_yes >= shutdown_expected || shutdown_flags == CFG_SHUTDOWN_FLAG_REGARDLESS) { TRACE1("shutdown confirmed"); res_lib_cfg_tryshutdown.header.size = sizeof(struct res_lib_cfg_tryshutdown); res_lib_cfg_tryshutdown.header.id = MESSAGE_RES_CFG_TRYSHUTDOWN; res_lib_cfg_tryshutdown.header.error = CS_OK; /* * Tell originator that shutdown was confirmed */ api->ipc_response_send(shutdown_con->conn, &res_lib_cfg_tryshutdown, sizeof(res_lib_cfg_tryshutdown)); shutdown_con = NULL; /* * Tell other nodes we are going down */ send_shutdown(); } else { TRACE1("shutdown cancelled"); res_lib_cfg_tryshutdown.header.size = sizeof(struct res_lib_cfg_tryshutdown); res_lib_cfg_tryshutdown.header.id = MESSAGE_RES_CFG_TRYSHUTDOWN; res_lib_cfg_tryshutdown.header.error = CS_ERR_BUSY; /* * Tell originator that shutdown was cancelled */ api->ipc_response_send(shutdown_con->conn, &res_lib_cfg_tryshutdown, sizeof(res_lib_cfg_tryshutdown)); shutdown_con = NULL; } log_printf(LOG_DEBUG, "shutdown decision is: (yes count: %d, no count: %d) flags=%x\n", shutdown_yes, shutdown_no, shutdown_flags); } LEAVE(); } /* * Not all nodes responded to the shutdown (in time) */ static void shutdown_timer_fn(void *arg) { ENTER(); /* * Mark undecideds as "NO" */ shutdown_no = shutdown_expected; check_shutdown_status(); send_test_shutdown(NULL, NULL, CS_ERR_TIMEOUT); LEAVE(); } static void remove_ci_from_shutdown(struct cfg_info *ci) { ENTER(); /* * If the controlling shutdown process has quit, then cancel the * shutdown session */ if (ci == shutdown_con) { shutdown_con = NULL; api->timer_delete(shutdown_timer); } if (!list_empty(&ci->list)) { list_del(&ci->list); list_init(&ci->list); /* * Remove our option */ if (shutdown_con) { if (ci->shutdown_reply == SHUTDOWN_REPLY_YES) shutdown_yes--; if (ci->shutdown_reply == SHUTDOWN_REPLY_NO) shutdown_no--; } /* * 
If we are leaving, then that's an implicit YES to shutdown */ ci->shutdown_reply = SHUTDOWN_REPLY_YES; shutdown_yes++; check_shutdown_status(); } LEAVE(); } int cfg_lib_exit_fn (void *conn) { struct cfg_info *ci = (struct cfg_info *)api->ipc_private_data_get (conn); ENTER(); if (!list_empty(&ci->list)) { list_del(&ci->list); remove_ci_from_shutdown(ci); } LEAVE(); return (0); } static int cfg_lib_init_fn (void *conn) { struct cfg_info *ci = (struct cfg_info *)api->ipc_private_data_get (conn); ENTER(); list_init(&ci->list); LEAVE(); return (0); } /* * Executive message handlers */ static void message_handler_req_exec_cfg_ringreenable ( const void *message, unsigned int nodeid) { - struct req_exec_cfg_ringreenable *req_exec_cfg_ringreenable = - (struct req_exec_cfg_ringreenable *)message; + const struct req_exec_cfg_ringreenable *req_exec_cfg_ringreenable + = message; struct res_lib_cfg_ringreenable res_lib_cfg_ringreenable; ENTER(); api->totem_ring_reenable (); if (api->ipc_source_is_local(&req_exec_cfg_ringreenable->source)) { res_lib_cfg_ringreenable.header.id = MESSAGE_RES_CFG_RINGREENABLE; res_lib_cfg_ringreenable.header.size = sizeof (struct res_lib_cfg_ringreenable); res_lib_cfg_ringreenable.header.error = CS_OK; api->ipc_response_send ( req_exec_cfg_ringreenable->source.conn, &res_lib_cfg_ringreenable, sizeof (struct res_lib_cfg_ringreenable)); } LEAVE(); } static void exec_cfg_killnode_endian_convert (void *msg) { struct req_exec_cfg_killnode *req_exec_cfg_killnode = (struct req_exec_cfg_killnode *)msg; ENTER(); swab_mar_name_t(&req_exec_cfg_killnode->reason); LEAVE(); } static void message_handler_req_exec_cfg_killnode ( const void *message, unsigned int nodeid) { const struct req_exec_cfg_killnode *req_exec_cfg_killnode = message; cs_name_t reason; ENTER(); log_printf(LOG_DEBUG, "request to kill node %d(us=%d): %s\n", req_exec_cfg_killnode->nodeid, api->totem_nodeid_get(), reason.value); if (req_exec_cfg_killnode->nodeid == api->totem_nodeid_get()) { 
marshall_from_mar_name_t(&reason, &req_exec_cfg_killnode->reason); log_printf(LOG_NOTICE, "Killed by node %d: %s\n", nodeid, reason.value); corosync_fatal_error(COROSYNC_FATAL_ERROR_EXIT); } LEAVE(); } /* * Self shutdown */ static void message_handler_req_exec_cfg_shutdown ( const void *message, unsigned int nodeid) { ENTER(); log_printf(LOG_NOTICE, "Node %d was shut down by sysadmin\n", nodeid); if (nodeid == api->totem_nodeid_get()) { corosync_fatal_error(COROSYNC_FATAL_ERROR_EXIT); } LEAVE(); } /* * Library Interface Implementation */ static void message_handler_req_lib_cfg_ringstatusget ( void *conn, void *msg) { struct res_lib_cfg_ringstatusget res_lib_cfg_ringstatusget; struct totem_ip_address interfaces[INTERFACE_MAX]; unsigned int iface_count; char **status; char *totem_ip_string; unsigned int i; ENTER(); res_lib_cfg_ringstatusget.header.id = MESSAGE_RES_CFG_RINGSTATUSGET; res_lib_cfg_ringstatusget.header.size = sizeof (struct res_lib_cfg_ringstatusget); res_lib_cfg_ringstatusget.header.error = CS_OK; api->totem_ifaces_get ( api->totem_nodeid_get(), interfaces, &status, &iface_count); res_lib_cfg_ringstatusget.interface_count = iface_count; for (i = 0; i < iface_count; i++) { totem_ip_string = (char *)api->totem_ip_print (&interfaces[i]); strcpy ((char *)&res_lib_cfg_ringstatusget.interface_status[i], status[i]); strcpy ((char *)&res_lib_cfg_ringstatusget.interface_name[i], totem_ip_string); } api->ipc_response_send ( conn, &res_lib_cfg_ringstatusget, sizeof (struct res_lib_cfg_ringstatusget)); LEAVE(); } static void message_handler_req_lib_cfg_ringreenable ( void *conn, void *msg) { struct req_exec_cfg_ringreenable req_exec_cfg_ringreenable; struct iovec iovec; ENTER(); req_exec_cfg_ringreenable.header.size = sizeof (struct req_exec_cfg_ringreenable); req_exec_cfg_ringreenable.header.id = SERVICE_ID_MAKE (CFG_SERVICE, MESSAGE_REQ_EXEC_CFG_RINGREENABLE); api->ipc_source_set (&req_exec_cfg_ringreenable.source, conn); iovec.iov_base = (char 
*)&req_exec_cfg_ringreenable; iovec.iov_len = sizeof (struct req_exec_cfg_ringreenable); assert (api->totem_mcast (&iovec, 1, TOTEM_SAFE) == 0); LEAVE(); } static void message_handler_req_lib_cfg_statetrack ( void *conn, void *msg) { struct cfg_info *ci = (struct cfg_info *)api->ipc_private_data_get (conn); struct res_lib_cfg_statetrack res_lib_cfg_statetrack; ENTER(); /* * We only do shutdown tracking at the moment */ if (list_empty(&ci->list)) { list_add(&ci->list, &trackers_list); ci->tracker_conn = conn; if (shutdown_con) { /* * Shutdown already in progress, ask the newcomer's opinion */ ci->shutdown_reply = SHUTDOWN_REPLY_UNKNOWN; shutdown_expected++; send_test_shutdown(conn, NULL, CS_OK); } } res_lib_cfg_statetrack.header.size = sizeof(struct res_lib_cfg_statetrack); res_lib_cfg_statetrack.header.id = MESSAGE_RES_CFG_STATETRACKSTART; res_lib_cfg_statetrack.header.error = CS_OK; api->ipc_response_send(conn, &res_lib_cfg_statetrack, sizeof(res_lib_cfg_statetrack)); LEAVE(); } static void message_handler_req_lib_cfg_statetrackstop ( void *conn, void *msg) { struct cfg_info *ci = (struct cfg_info *)api->ipc_private_data_get (conn); // struct req_lib_cfg_statetrackstop *req_lib_cfg_statetrackstop = (struct req_lib_cfg_statetrackstop *)message; ENTER(); remove_ci_from_shutdown(ci); LEAVE(); } static void message_handler_req_lib_cfg_administrativestateset ( void *conn, void *msg) { // struct req_lib_cfg_administrativestateset *req_lib_cfg_administrativestateset = (struct req_lib_cfg_administrativestateset *)message; ENTER(); LEAVE(); } static void message_handler_req_lib_cfg_administrativestateget ( void *conn, void *msg) { // struct req_lib_cfg_administrativestateget *req_lib_cfg_administrativestateget = (struct req_lib_cfg_administrativestateget *)message; ENTER(); LEAVE(); } static void message_handler_req_lib_cfg_serviceload ( void *conn, void *msg) { struct req_lib_cfg_serviceload *req_lib_cfg_serviceload = (struct req_lib_cfg_serviceload *)msg; struct 
res_lib_cfg_serviceload res_lib_cfg_serviceload; ENTER(); api->service_link_and_init ( api, (char *)req_lib_cfg_serviceload->service_name, req_lib_cfg_serviceload->service_ver); res_lib_cfg_serviceload.header.id = MESSAGE_RES_CFG_SERVICEUNLOAD; res_lib_cfg_serviceload.header.size = sizeof (struct res_lib_cfg_serviceload); res_lib_cfg_serviceload.header.error = CS_OK; api->ipc_response_send ( conn, &res_lib_cfg_serviceload, sizeof (struct res_lib_cfg_serviceload)); LEAVE(); } static void message_handler_req_lib_cfg_serviceunload ( void *conn, void *msg) { struct req_lib_cfg_serviceunload *req_lib_cfg_serviceunload = (struct req_lib_cfg_serviceunload *)msg; struct res_lib_cfg_serviceunload res_lib_cfg_serviceunload; ENTER(); api->service_unlink_and_exit ( api, (char *)req_lib_cfg_serviceunload->service_name, req_lib_cfg_serviceunload->service_ver); res_lib_cfg_serviceunload.header.id = MESSAGE_RES_CFG_SERVICEUNLOAD; res_lib_cfg_serviceunload.header.size = sizeof (struct res_lib_cfg_serviceunload); res_lib_cfg_serviceunload.header.error = CS_OK; api->ipc_response_send ( conn, &res_lib_cfg_serviceunload, sizeof (struct res_lib_cfg_serviceunload)); LEAVE(); } static void message_handler_req_lib_cfg_killnode ( void *conn, void *msg) { struct req_lib_cfg_killnode *req_lib_cfg_killnode = (struct req_lib_cfg_killnode *)msg; struct res_lib_cfg_killnode res_lib_cfg_killnode; struct req_exec_cfg_killnode req_exec_cfg_killnode; struct iovec iovec; int res; ENTER(); req_exec_cfg_killnode.header.size = sizeof (struct req_exec_cfg_killnode); req_exec_cfg_killnode.header.id = SERVICE_ID_MAKE (CFG_SERVICE, MESSAGE_REQ_EXEC_CFG_KILLNODE); req_exec_cfg_killnode.nodeid = req_lib_cfg_killnode->nodeid; marshall_to_mar_name_t(&req_exec_cfg_killnode.reason, &req_lib_cfg_killnode->reason); iovec.iov_base = (char *)&req_exec_cfg_killnode; iovec.iov_len = sizeof (struct req_exec_cfg_killnode); res = api->totem_mcast (&iovec, 1, TOTEM_SAFE); res_lib_cfg_killnode.header.size = sizeof(struct 
res_lib_cfg_killnode); res_lib_cfg_killnode.header.id = MESSAGE_RES_CFG_KILLNODE; res_lib_cfg_killnode.header.error = CS_OK; api->ipc_response_send(conn, &res_lib_cfg_killnode, sizeof(res_lib_cfg_killnode)); LEAVE(); } static void message_handler_req_lib_cfg_tryshutdown ( void *conn, void *msg) { struct cfg_info *ci = (struct cfg_info *)api->ipc_private_data_get (conn); struct req_lib_cfg_tryshutdown *req_lib_cfg_tryshutdown = (struct req_lib_cfg_tryshutdown *)msg; struct res_lib_cfg_tryshutdown res_lib_cfg_tryshutdown; struct list_head *iter; ENTER(); if (req_lib_cfg_tryshutdown->flags == CFG_SHUTDOWN_FLAG_IMMEDIATE) { /* * Tell other nodes */ send_shutdown(); res_lib_cfg_tryshutdown.header.size = sizeof(struct res_lib_cfg_tryshutdown); res_lib_cfg_tryshutdown.header.id = MESSAGE_RES_CFG_TRYSHUTDOWN; res_lib_cfg_tryshutdown.header.error = CS_OK; api->ipc_response_send(conn, &res_lib_cfg_tryshutdown, sizeof(res_lib_cfg_tryshutdown)); LEAVE(); return; } /* * Shutdown in progress, return an error */ if (shutdown_con) { struct res_lib_cfg_tryshutdown res_lib_cfg_tryshutdown; res_lib_cfg_tryshutdown.header.size = sizeof(struct res_lib_cfg_tryshutdown); res_lib_cfg_tryshutdown.header.id = MESSAGE_RES_CFG_TRYSHUTDOWN; res_lib_cfg_tryshutdown.header.error = CS_ERR_EXIST; api->ipc_response_send(conn, &res_lib_cfg_tryshutdown, sizeof(res_lib_cfg_tryshutdown)); LEAVE(); return; } ci->conn = conn; shutdown_con = (struct cfg_info *)api->ipc_private_data_get (conn); shutdown_flags = req_lib_cfg_tryshutdown->flags; shutdown_yes = 0; shutdown_no = 0; /* * Count the number of listeners */ shutdown_expected = 0; for (iter = trackers_list.next; iter != &trackers_list; iter = iter->next) { struct cfg_info *testci = list_entry(iter, struct cfg_info, list); /* * It is assumed that we will allow shutdown */ if (testci != ci) { testci->shutdown_reply = SHUTDOWN_REPLY_UNKNOWN; shutdown_expected++; } } /* * If no-one is listening for events then we can just go down now */ if 
(shutdown_expected == 0) { struct res_lib_cfg_tryshutdown res_lib_cfg_tryshutdown; res_lib_cfg_tryshutdown.header.size = sizeof(struct res_lib_cfg_tryshutdown); res_lib_cfg_tryshutdown.header.id = MESSAGE_RES_CFG_TRYSHUTDOWN; res_lib_cfg_tryshutdown.header.error = CS_OK; /* * Tell originator that shutdown was confirmed */ api->ipc_response_send(conn, &res_lib_cfg_tryshutdown, sizeof(res_lib_cfg_tryshutdown)); send_shutdown(); LEAVE(); return; } else { hdb_handle_t cfg_handle; hdb_handle_t find_handle; char *timeout_str; unsigned int shutdown_timeout = DEFAULT_SHUTDOWN_TIMEOUT; /* * Look for a shutdown timeout in objdb */ api->object_find_create(OBJECT_PARENT_HANDLE, "cfg", strlen("cfg"), &find_handle); api->object_find_next(find_handle, &cfg_handle); api->object_find_destroy(find_handle); if (cfg_handle) { if ( !api->object_key_get(cfg_handle, "shutdown_timeout", strlen("shutdown_timeout"), (void *)&timeout_str, NULL)) { shutdown_timeout = atoi(timeout_str); } } /* * Start the timer. If we don't get a full set of replies before this goes * off we'll cancel the shutdown */ api->timer_add_duration((unsigned long long)shutdown_timeout*1000000000, NULL, shutdown_timer_fn, &shutdown_timer); /* * Tell the users we would like to shut down */ send_test_shutdown(NULL, conn, CS_OK); } /* * We don't sent a reply to the caller here. 
* We send it when we know if we can shut down or not */ LEAVE(); } static void message_handler_req_lib_cfg_replytoshutdown ( void *conn, void *msg) { struct cfg_info *ci = (struct cfg_info *)api->ipc_private_data_get (conn); struct req_lib_cfg_replytoshutdown *req_lib_cfg_replytoshutdown = (struct req_lib_cfg_replytoshutdown *)msg; struct res_lib_cfg_replytoshutdown res_lib_cfg_replytoshutdown; int status = CS_OK; ENTER(); if (!shutdown_con) { status = CS_ERR_ACCESS; goto exit_fn; } if (req_lib_cfg_replytoshutdown->response) { shutdown_yes++; ci->shutdown_reply = SHUTDOWN_REPLY_YES; } else { shutdown_no++; ci->shutdown_reply = SHUTDOWN_REPLY_NO; } check_shutdown_status(); exit_fn: res_lib_cfg_replytoshutdown.header.error = status; res_lib_cfg_replytoshutdown.header.id = MESSAGE_RES_CFG_REPLYTOSHUTDOWN; res_lib_cfg_replytoshutdown.header.size = sizeof(res_lib_cfg_replytoshutdown); api->ipc_response_send(conn, &res_lib_cfg_replytoshutdown, sizeof(res_lib_cfg_replytoshutdown)); LEAVE(); } static void message_handler_req_lib_cfg_get_node_addrs (void *conn, void *msg) { struct totem_ip_address node_ifs[INTERFACE_MAX]; char buf[PIPE_BUF]; char **status; unsigned int num_interfaces = 0; int ret = CS_OK; int i; struct req_lib_cfg_get_node_addrs *req_lib_cfg_get_node_addrs = (struct req_lib_cfg_get_node_addrs *)msg; struct res_lib_cfg_get_node_addrs *res_lib_cfg_get_node_addrs = (struct res_lib_cfg_get_node_addrs *)buf; if (req_lib_cfg_get_node_addrs->nodeid == 0) req_lib_cfg_get_node_addrs->nodeid = api->totem_nodeid_get(); api->totem_ifaces_get(req_lib_cfg_get_node_addrs->nodeid, node_ifs, &status, &num_interfaces); res_lib_cfg_get_node_addrs->header.size = sizeof(struct res_lib_cfg_get_node_addrs) + (num_interfaces * TOTEMIP_ADDRLEN); res_lib_cfg_get_node_addrs->header.id = MESSAGE_RES_CFG_GET_NODE_ADDRS; res_lib_cfg_get_node_addrs->header.error = ret; res_lib_cfg_get_node_addrs->num_addrs = num_interfaces; if (num_interfaces) { res_lib_cfg_get_node_addrs->family = 
node_ifs[0].family; for (i = 0; i < num_interfaces; i++) { memcpy(&res_lib_cfg_get_node_addrs->addrs[i][0], node_ifs[i].addr, TOTEMIP_ADDRLEN); } } else { res_lib_cfg_get_node_addrs->header.error = CS_ERR_NOT_EXIST; } api->ipc_response_send(conn, res_lib_cfg_get_node_addrs, res_lib_cfg_get_node_addrs->header.size); } static void message_handler_req_lib_cfg_local_get (void *conn, void *message) { struct res_lib_cfg_local_get res_lib_cfg_local_get; res_lib_cfg_local_get.header.size = sizeof(res_lib_cfg_local_get); res_lib_cfg_local_get.header.id = MESSAGE_RES_CFG_LOCAL_GET; res_lib_cfg_local_get.header.error = CS_OK; res_lib_cfg_local_get.local_nodeid = api->totem_nodeid_get (); api->ipc_response_send(conn, &res_lib_cfg_local_get, sizeof(res_lib_cfg_local_get)); } diff --git a/services/cpg.c b/services/cpg.c index 7cedcd6c..ede9f8f1 100644 --- a/services/cpg.c +++ b/services/cpg.c @@ -1,1229 +1,1229 @@ /* * Copyright (c) 2006-2009 Red Hat, Inc. * * All rights reserved. * * Author: Christine Caulfield (ccaulfie@redhat.com) * * This software licensed under BSD license, the text of which follows: * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * - Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * - Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * - Neither the name of the MontaVista Software, Inc. nor the names of its * contributors may be used to endorse or promote products derived from this * software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED.
IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF * THE POSSIBILITY OF SUCH DAMAGE. */ #include #ifndef COROSYNC_BSD #include #endif #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include LOGSYS_DECLARE_SUBSYS ("CPG", LOG_INFO); #define GROUP_HASH_SIZE 32 #define PI_FLAG_MEMBER 1 enum cpg_message_req_types { MESSAGE_REQ_EXEC_CPG_PROCJOIN = 0, MESSAGE_REQ_EXEC_CPG_PROCLEAVE = 1, MESSAGE_REQ_EXEC_CPG_JOINLIST = 2, MESSAGE_REQ_EXEC_CPG_MCAST = 3, MESSAGE_REQ_EXEC_CPG_DOWNLIST = 4 }; struct removed_group { struct group_info *gi; struct list_head list; /* on removed_list */ int left_list_entries; mar_cpg_address_t left_list[PROCESSOR_COUNT_MAX]; int left_list_size; }; struct group_info { mar_cpg_name_t group_name; struct list_head members; struct list_head list; /* on hash list */ struct removed_group *rg; /* when a node goes down */ }; struct process_info { unsigned int nodeid; uint32_t pid; uint32_t flags; void *conn; void *trackerconn; struct group_info *group; struct list_head list; /* on the group_info members list */ }; struct join_list_entry { uint32_t pid; mar_cpg_name_t group_name; }; static struct list_head group_lists[GROUP_HASH_SIZE]; static struct corosync_api_v1 *api = NULL; /* * Service Interfaces required by service_message_handler struct */ static void cpg_confchg_fn ( enum totem_configuration_type configuration_type, - unsigned int 
*member_list, int member_list_entries, - unsigned int *left_list, int left_list_entries, - unsigned int *joined_list, int joined_list_entries, - struct memb_ring_id *ring_id); + const unsigned int *member_list, size_t member_list_entries, + const unsigned int *left_list, size_t left_list_entries, + const unsigned int *joined_list, size_t joined_list_entries, + const struct memb_ring_id *ring_id); static int cpg_exec_init_fn (struct corosync_api_v1 *); static int cpg_lib_init_fn (void *conn); static int cpg_lib_exit_fn (void *conn); static void message_handler_req_exec_cpg_procjoin ( const void *message, unsigned int nodeid); static void message_handler_req_exec_cpg_procleave ( const void *message, unsigned int nodeid); static void message_handler_req_exec_cpg_joinlist ( const void *message, unsigned int nodeid); static void message_handler_req_exec_cpg_mcast ( const void *message, unsigned int nodeid); static void message_handler_req_exec_cpg_downlist ( const void *message, unsigned int nodeid); static void exec_cpg_procjoin_endian_convert (void *msg); static void exec_cpg_joinlist_endian_convert (void *msg); static void exec_cpg_mcast_endian_convert (void *msg); static void exec_cpg_downlist_endian_convert (void *msg); static void message_handler_req_lib_cpg_join (void *conn, void *message); static void message_handler_req_lib_cpg_leave (void *conn, void *message); static void message_handler_req_lib_cpg_mcast (void *conn, void *message); static void message_handler_req_lib_cpg_membership (void *conn, void *message); static void message_handler_req_lib_cpg_trackstart (void *conn, void *message); static void message_handler_req_lib_cpg_trackstop (void *conn, void *message); static void message_handler_req_lib_cpg_local_get (void *conn, void *message); static void message_handler_req_lib_cpg_groups_get (void *conn, void *message); static int cpg_node_joinleave_send (struct group_info *gi, struct process_info *pi, int fn, int reason); static int 
cpg_exec_send_joinlist(void); static void cpg_sync_init (void); static int cpg_sync_process (void); static void cpg_sync_activate (void); static void cpg_sync_abort (void); /* * Library Handler Definition */ static struct corosync_lib_handler cpg_lib_engine[] = { { /* 0 */ .lib_handler_fn = message_handler_req_lib_cpg_join, .response_size = sizeof (struct res_lib_cpg_join), .response_id = MESSAGE_RES_CPG_JOIN, .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED }, { /* 1 */ .lib_handler_fn = message_handler_req_lib_cpg_leave, .response_size = sizeof (struct res_lib_cpg_leave), .response_id = MESSAGE_RES_CPG_LEAVE, .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED }, { /* 2 */ .lib_handler_fn = message_handler_req_lib_cpg_mcast, .response_size = sizeof (struct res_lib_cpg_mcast), .response_id = MESSAGE_RES_CPG_MCAST, .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED }, { /* 3 */ .lib_handler_fn = message_handler_req_lib_cpg_membership, .response_size = sizeof (mar_res_header_t), .response_id = MESSAGE_RES_CPG_MEMBERSHIP, .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 4 */ .lib_handler_fn = message_handler_req_lib_cpg_trackstart, .response_size = sizeof (struct res_lib_cpg_trackstart), .response_id = MESSAGE_RES_CPG_TRACKSTART, .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 5 */ .lib_handler_fn = message_handler_req_lib_cpg_trackstop, .response_size = sizeof (struct res_lib_cpg_trackstart), .response_id = MESSAGE_RES_CPG_TRACKSTOP, .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 6 */ .lib_handler_fn = message_handler_req_lib_cpg_local_get, .response_size = sizeof (struct res_lib_cpg_local_get), .response_id = MESSAGE_RES_CPG_LOCAL_GET, .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 7 */ .lib_handler_fn = message_handler_req_lib_cpg_groups_get, .response_size = sizeof (struct res_lib_cpg_groups_get), .response_id = MESSAGE_RES_CPG_GROUPS_GET, .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED } }; static struct corosync_exec_handler 
cpg_exec_engine[] = { { /* 0 */ .exec_handler_fn = message_handler_req_exec_cpg_procjoin, .exec_endian_convert_fn = exec_cpg_procjoin_endian_convert }, { /* 1 */ .exec_handler_fn = message_handler_req_exec_cpg_procleave, .exec_endian_convert_fn = exec_cpg_procjoin_endian_convert }, { /* 2 */ .exec_handler_fn = message_handler_req_exec_cpg_joinlist, .exec_endian_convert_fn = exec_cpg_joinlist_endian_convert }, { /* 3 */ .exec_handler_fn = message_handler_req_exec_cpg_mcast, .exec_endian_convert_fn = exec_cpg_mcast_endian_convert }, { /* 4 */ .exec_handler_fn = message_handler_req_exec_cpg_downlist, .exec_endian_convert_fn = exec_cpg_downlist_endian_convert }, }; struct corosync_service_engine cpg_service_engine = { .name = "corosync cluster closed process group service v1.01", .id = CPG_SERVICE, .private_data_size = sizeof (struct process_info), .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED, .allow_inquorate = CS_LIB_ALLOW_INQUORATE, .lib_init_fn = cpg_lib_init_fn, .lib_exit_fn = cpg_lib_exit_fn, .lib_engine = cpg_lib_engine, .lib_engine_count = sizeof (cpg_lib_engine) / sizeof (struct corosync_lib_handler), .exec_init_fn = cpg_exec_init_fn, .exec_dump_fn = NULL, .exec_engine = cpg_exec_engine, .exec_engine_count = sizeof (cpg_exec_engine) / sizeof (struct corosync_exec_handler), .confchg_fn = cpg_confchg_fn, .sync_init = cpg_sync_init, .sync_process = cpg_sync_process, .sync_activate = cpg_sync_activate, .sync_abort = cpg_sync_abort }; /* * Dynamic loader definition */ static struct corosync_service_engine *cpg_get_service_engine_ver0 (void); static struct corosync_service_engine_iface_ver0 cpg_service_engine_iface = { .corosync_get_service_engine_ver0 = cpg_get_service_engine_ver0 }; static struct lcr_iface corosync_cpg_ver0[1] = { { .name = "corosync_cpg", .version = 0, .versions_replace = 0, .versions_replace_count = 0, .dependencies = 0, .dependency_count = 0, .constructor = NULL, .destructor = NULL, .interfaces = NULL } }; static struct lcr_comp cpg_comp_ver0 = 
{ .iface_count = 1, .ifaces = corosync_cpg_ver0 }; static struct corosync_service_engine *cpg_get_service_engine_ver0 (void) { return (&cpg_service_engine); } __attribute__ ((constructor)) static void cpg_comp_register (void) { lcr_interfaces_set (&corosync_cpg_ver0[0], &cpg_service_engine_iface); lcr_component_register (&cpg_comp_ver0); } struct req_exec_cpg_procjoin { mar_req_header_t header __attribute__((aligned(8))); mar_cpg_name_t group_name __attribute__((aligned(8))); mar_uint32_t pid __attribute__((aligned(8))); mar_uint32_t reason __attribute__((aligned(8))); }; struct req_exec_cpg_mcast { mar_req_header_t header __attribute__((aligned(8))); mar_cpg_name_t group_name __attribute__((aligned(8))); mar_uint32_t msglen __attribute__((aligned(8))); mar_uint32_t pid __attribute__((aligned(8))); mar_message_source_t source __attribute__((aligned(8))); mar_uint8_t message[] __attribute__((aligned(8))); }; struct req_exec_cpg_downlist { mar_req_header_t header __attribute__((aligned(8))); mar_uint32_t left_nodes __attribute__((aligned(8))); mar_uint32_t nodeids[PROCESSOR_COUNT_MAX] __attribute__((aligned(8))); }; static struct req_exec_cpg_downlist req_exec_cpg_downlist; static void cpg_sync_init (void) { } static int cpg_sync_process (void) { return cpg_exec_send_joinlist(); } static void cpg_sync_activate (void) { } static void cpg_sync_abort (void) { } static int notify_lib_joinlist( struct group_info *gi, void *conn, int joined_list_entries, mar_cpg_address_t *joined_list, int left_list_entries, mar_cpg_address_t *left_list, int id) { int count = 0; char *buf; struct res_lib_cpg_confchg_callback *res; struct list_head *iter; struct list_head *tmp; mar_cpg_address_t *retgi; int size; /* First, we need to know how many nodes are in the list. 
While we're traversing this list, look for the 'us' entry so we know which connection to send back down */ for (iter = gi->members.next; iter != &gi->members; iter = iter->next) { struct process_info *pi = list_entry(iter, struct process_info, list); if (pi->pid) count++; } log_printf(LOG_LEVEL_DEBUG, "Sending new joinlist (%d elements) to clients\n", count); size = sizeof(struct res_lib_cpg_confchg_callback) + sizeof(mar_cpg_address_t) * (count + left_list_entries + joined_list_entries); buf = alloca(size); if (!buf) return CS_ERR_NO_SPACE; res = (struct res_lib_cpg_confchg_callback *)buf; res->joined_list_entries = joined_list_entries; res->left_list_entries = left_list_entries; retgi = res->member_list; res->header.size = size; res->header.id = id; res->header.error = CS_OK; memcpy(&res->group_name, &gi->group_name, sizeof(mar_cpg_name_t)); /* Build up the message */ count = 0; for (iter = gi->members.next; iter != &gi->members; iter = iter->next) { struct process_info *pi = list_entry(iter, struct process_info, list); if (pi->pid) { /* Processes leaving will be removed AFTER this is done (so that they get their own leave notifications), so exclude them from the members list here */ int i; for (i=0; ipid && left_list[i].nodeid == pi->nodeid) goto next_member; } retgi->nodeid = pi->nodeid; retgi->pid = pi->pid; retgi++; count++; next_member: ; } } res->member_list_entries = count; if (left_list_entries) { memcpy(retgi, left_list, left_list_entries * sizeof(mar_cpg_address_t)); retgi += left_list_entries; } if (joined_list_entries) { memcpy(retgi, joined_list, joined_list_entries * sizeof(mar_cpg_address_t)); retgi += joined_list_entries; } if (conn) { api->ipc_dispatch_send(conn, buf, size); } else { /* Send it to all listeners */ for (iter = gi->members.next, tmp=iter->next; iter != &gi->members; iter = tmp, tmp=iter->next) { struct process_info *pi = list_entry(iter, struct process_info, list); if (pi->trackerconn && (pi->flags & PI_FLAG_MEMBER)) { if 
(api->ipc_dispatch_send(pi->trackerconn, buf, size) == -1) { // Error ?? } } } } return CS_OK; } static void remove_group(struct group_info *gi) { list_del(&gi->list); free(gi); } static int cpg_exec_init_fn (struct corosync_api_v1 *corosync_api) { int i; for (i=0; iipc_private_data_get (conn); struct group_info *gi = pi->group; mar_cpg_address_t notify_info; log_printf(LOG_LEVEL_DEBUG, "exit_fn for conn=%p\n", conn); if (gi) { notify_info.pid = pi->pid; notify_info.nodeid = api->totem_nodeid_get(); notify_info.reason = CONFCHG_CPG_REASON_PROCDOWN; cpg_node_joinleave_send(gi, pi, MESSAGE_REQ_EXEC_CPG_PROCLEAVE, CONFCHG_CPG_REASON_PROCDOWN); } if (pi->pid) list_del(&pi->list); api->ipc_refcnt_dec (conn); return (0); } static int count_groups(void) { struct list_head *iter; int num_groups = 0; uint32_t hash; for (hash=0 ; hash < GROUP_HASH_SIZE; hash++) { for (iter = group_lists[hash].next; iter != &group_lists[hash]; iter = iter->next) { num_groups++; } } return num_groups; } static struct group_info *get_group(const mar_cpg_name_t *name) { struct list_head *iter; struct group_info *gi = NULL; struct group_info *itergi; uint32_t hash = jhash(name->value, name->length, 0) % GROUP_HASH_SIZE; for (iter = group_lists[hash].next; iter != &group_lists[hash]; iter = iter->next) { itergi = list_entry(iter, struct group_info, list); if (memcmp(itergi->group_name.value, name->value, name->length) == 0) { gi = itergi; break; } } if (!gi) { gi = malloc(sizeof(struct group_info)); if (!gi) { log_printf(LOG_LEVEL_WARNING, "Unable to allocate group_info struct"); return NULL; } memcpy(&gi->group_name, name, sizeof(mar_cpg_name_t)); gi->rg = NULL; list_init(&gi->members); list_add(&gi->list, &group_lists[hash]); } return gi; } static int cpg_node_joinleave_send (struct group_info *gi, struct process_info *pi, int fn, int reason) { struct req_exec_cpg_procjoin req_exec_cpg_procjoin; struct iovec req_exec_cpg_iovec; int result; memcpy(&req_exec_cpg_procjoin.group_name, 
&gi->group_name, sizeof(mar_cpg_name_t)); req_exec_cpg_procjoin.pid = pi->pid; req_exec_cpg_procjoin.reason = reason; req_exec_cpg_procjoin.header.size = sizeof(req_exec_cpg_procjoin); req_exec_cpg_procjoin.header.id = SERVICE_ID_MAKE(CPG_SERVICE, fn); req_exec_cpg_iovec.iov_base = (char *)&req_exec_cpg_procjoin; req_exec_cpg_iovec.iov_len = sizeof(req_exec_cpg_procjoin); result = api->totem_mcast (&req_exec_cpg_iovec, 1, TOTEM_AGREED); return (result); } static void remove_node_from_groups( unsigned int nodeid, struct list_head *remlist) { int i; struct list_head *iter, *iter2, *tmp; struct process_info *pi; struct group_info *gi; for (i=0; i < GROUP_HASH_SIZE; i++) { for (iter = group_lists[i].next; iter != &group_lists[i]; iter = iter->next) { gi = list_entry(iter, struct group_info, list); for (iter2 = gi->members.next, tmp = iter2->next; iter2 != &gi->members; iter2 = tmp, tmp = iter2->next) { pi = list_entry(iter2, struct process_info, list); if (pi->nodeid == nodeid) { /* Add it to the list of nodes to send notifications for */ if (!gi->rg) { gi->rg = malloc(sizeof(struct removed_group)); if (gi->rg) { list_add(&gi->rg->list, remlist); gi->rg->gi = gi; gi->rg->left_list_entries = 0; gi->rg->left_list_size = PROCESSOR_COUNT_MAX; } else { log_printf(LOG_LEVEL_CRIT, "Unable to allocate removed group struct. CPG callbacks will be junk."); return; } } /* Do we need to increase the size ? * Yes, I increase this exponentially. Generally, if you've got a lot of groups, * you'll have a /lot/ of groups, and cgp_groupinfo is pretty small anyway */ if (gi->rg->left_list_size == gi->rg->left_list_entries) { int newsize; struct removed_group *newrg; list_del(&gi->rg->list); newsize = gi->rg->left_list_size * 2; newrg = realloc(gi->rg, sizeof(struct removed_group) + newsize*sizeof(mar_cpg_address_t)); if (!newrg) { log_printf(LOG_LEVEL_CRIT, "Unable to realloc removed group struct. 
CPG callbacks will be junk."); return; } newrg->left_list_size = newsize+PROCESSOR_COUNT_MAX; gi->rg = newrg; list_add(&gi->rg->list, remlist); } gi->rg->left_list[gi->rg->left_list_entries].pid = pi->pid; gi->rg->left_list[gi->rg->left_list_entries].nodeid = pi->nodeid; gi->rg->left_list[gi->rg->left_list_entries].reason = CONFCHG_CPG_REASON_NODEDOWN; gi->rg->left_list_entries++; /* Remove node info for dead node */ list_del(&pi->list); free(pi); } } } } } static void cpg_confchg_fn ( enum totem_configuration_type configuration_type, - unsigned int *member_list, int member_list_entries, - unsigned int *left_list, int left_list_entries, - unsigned int *joined_list, int joined_list_entries, - struct memb_ring_id *ring_id) + const unsigned int *member_list, size_t member_list_entries, + const unsigned int *left_list, size_t left_list_entries, + const unsigned int *joined_list, size_t joined_list_entries, + const struct memb_ring_id *ring_id) { int i; uint32_t lowest_nodeid = 0xffffffff; struct iovec req_exec_cpg_iovec; /* We don't send the library joinlist in here because it can end up out of order with the rest of the messages (which are totem ordered). So we get the lowest nodeid to send out a list of left nodes instead. 
On receipt of that message, all nodes will then notify their local clients of the new joinlist */ if (left_list_entries) { for (i = 0; i < member_list_entries; i++) { if (member_list[i] < lowest_nodeid) lowest_nodeid = member_list[i]; } log_printf(LOG_LEVEL_DEBUG, "confchg, low nodeid=%d, us = %d\n", lowest_nodeid, api->totem_nodeid_get()); if (lowest_nodeid == api->totem_nodeid_get()) { req_exec_cpg_downlist.header.id = SERVICE_ID_MAKE(CPG_SERVICE, MESSAGE_REQ_EXEC_CPG_DOWNLIST); req_exec_cpg_downlist.header.size = sizeof(struct req_exec_cpg_downlist); req_exec_cpg_downlist.left_nodes = left_list_entries; for (i = 0; i < left_list_entries; i++) { req_exec_cpg_downlist.nodeids[i] = left_list[i]; } log_printf(LOG_LEVEL_DEBUG, "confchg, build downlist: %d nodes\n", left_list_entries); } } /* Don't send this message until we get the final configuration message */ if (configuration_type == TOTEM_CONFIGURATION_REGULAR && req_exec_cpg_downlist.left_nodes) { req_exec_cpg_iovec.iov_base = (char *)&req_exec_cpg_downlist; req_exec_cpg_iovec.iov_len = req_exec_cpg_downlist.header.size; api->totem_mcast (&req_exec_cpg_iovec, 1, TOTEM_AGREED); req_exec_cpg_downlist.left_nodes = 0; log_printf(LOG_LEVEL_DEBUG, "confchg, sent downlist\n"); } } /* Can byteswap join & leave messages */ static void exec_cpg_procjoin_endian_convert (void *msg) { struct req_exec_cpg_procjoin *req_exec_cpg_procjoin = (struct req_exec_cpg_procjoin *)msg; req_exec_cpg_procjoin->pid = swab32(req_exec_cpg_procjoin->pid); swab_mar_cpg_name_t (&req_exec_cpg_procjoin->group_name); req_exec_cpg_procjoin->reason = swab32(req_exec_cpg_procjoin->reason); } static void exec_cpg_joinlist_endian_convert (void *msg_v) { char *msg = msg_v; mar_res_header_t *res = (mar_res_header_t *)msg; struct join_list_entry *jle = (struct join_list_entry *)(msg + sizeof(mar_res_header_t)); /* XXX shouldn't mar_res_header be swabbed? 
*/ while ((const char*)jle < msg + res->size) { jle->pid = swab32(jle->pid); swab_mar_cpg_name_t (&jle->group_name); jle++; } } static void exec_cpg_downlist_endian_convert (void *msg) { struct req_exec_cpg_downlist *req_exec_cpg_downlist = (struct req_exec_cpg_downlist *)msg; unsigned int i; req_exec_cpg_downlist->left_nodes = swab32(req_exec_cpg_downlist->left_nodes); for (i = 0; i < req_exec_cpg_downlist->left_nodes; i++) { req_exec_cpg_downlist->nodeids[i] = swab32(req_exec_cpg_downlist->nodeids[i]); } } static void exec_cpg_mcast_endian_convert (void *msg) { struct req_exec_cpg_mcast *req_exec_cpg_mcast = (struct req_exec_cpg_mcast *)msg; swab_mar_req_header_t (&req_exec_cpg_mcast->header); swab_mar_cpg_name_t (&req_exec_cpg_mcast->group_name); req_exec_cpg_mcast->pid = swab32(req_exec_cpg_mcast->pid); req_exec_cpg_mcast->msglen = swab32(req_exec_cpg_mcast->msglen); swab_mar_message_source_t (&req_exec_cpg_mcast->source); } static void do_proc_join( const mar_cpg_name_t *name, uint32_t pid, unsigned int nodeid, int reason) { struct group_info *gi; struct process_info *pi; struct list_head *iter; mar_cpg_address_t notify_info; gi = get_group(name); /* this will always succeed ! 
*/ assert(gi); /* See if it already exists in this group */ for (iter = gi->members.next; iter != &gi->members; iter = iter->next) { pi = list_entry(iter, struct process_info, list); if (pi->pid == pid && pi->nodeid == nodeid) { /* It could be a local join message */ if ((nodeid == api->totem_nodeid_get()) && (!pi->flags & PI_FLAG_MEMBER)) { goto local_join; } else { return; } } } pi = malloc(sizeof(struct process_info)); if (!pi) { log_printf(LOG_LEVEL_WARNING, "Unable to allocate process_info struct"); return; } pi->nodeid = nodeid; pi->pid = pid; pi->group = gi; pi->conn = NULL; pi->trackerconn = NULL; list_add_tail(&pi->list, &gi->members); local_join: pi->flags = PI_FLAG_MEMBER; notify_info.pid = pi->pid; notify_info.nodeid = nodeid; notify_info.reason = reason; notify_lib_joinlist(gi, NULL, 1, ¬ify_info, 0, NULL, MESSAGE_RES_CPG_CONFCHG_CALLBACK); } static void message_handler_req_exec_cpg_downlist ( const void *message, unsigned int nodeid) { const struct req_exec_cpg_downlist *req_exec_cpg_downlist = message; int i; struct list_head removed_list; log_printf(LOG_LEVEL_DEBUG, "downlist left_list: %d\n", req_exec_cpg_downlist->left_nodes); list_init(&removed_list); /* Remove nodes from joined groups and add removed groups to the list */ for (i = 0; i < req_exec_cpg_downlist->left_nodes; i++) { remove_node_from_groups( req_exec_cpg_downlist->nodeids[i], &removed_list); } if (!list_empty(&removed_list)) { struct list_head *iter, *tmp; for (iter = removed_list.next, tmp=iter->next; iter != &removed_list; iter = tmp, tmp = iter->next) { struct removed_group *rg = list_entry(iter, struct removed_group, list); notify_lib_joinlist(rg->gi, NULL, 0, NULL, rg->left_list_entries, rg->left_list, MESSAGE_RES_CPG_CONFCHG_CALLBACK); rg->gi->rg = NULL; free(rg); } } } static void message_handler_req_exec_cpg_procjoin ( const void *message, unsigned int nodeid) { const struct req_exec_cpg_procjoin *req_exec_cpg_procjoin = message; log_printf(LOG_LEVEL_DEBUG, "got procjoin 
message from cluster node %d\n", nodeid); do_proc_join(&req_exec_cpg_procjoin->group_name, req_exec_cpg_procjoin->pid, nodeid, CONFCHG_CPG_REASON_JOIN); } static void message_handler_req_exec_cpg_procleave ( const void *message, unsigned int nodeid) { const struct req_exec_cpg_procjoin *req_exec_cpg_procjoin = message; struct group_info *gi; struct process_info *pi; struct list_head *iter; mar_cpg_address_t notify_info; log_printf(LOG_LEVEL_DEBUG, "got procleave message from cluster node %d\n", nodeid); gi = get_group(&req_exec_cpg_procjoin->group_name); /* this will always succeed ! */ assert(gi); notify_info.pid = req_exec_cpg_procjoin->pid; notify_info.nodeid = nodeid; notify_info.reason = req_exec_cpg_procjoin->reason; notify_lib_joinlist(gi, NULL, 0, NULL, 1, ¬ify_info, MESSAGE_RES_CPG_CONFCHG_CALLBACK); /* Find the node/PID to remove */ for (iter = gi->members.next; iter != &gi->members; iter = iter->next) { pi = list_entry(iter, struct process_info, list); if (pi->pid == req_exec_cpg_procjoin->pid && pi->nodeid == nodeid) { list_del(&pi->list); if (!pi->conn) free(pi); else pi->pid = 0; if (list_empty(&gi->members)) { remove_group(gi); } break; } } } /* Got a proclist from another node */ static void message_handler_req_exec_cpg_joinlist ( const void *message_v, unsigned int nodeid) { const char *message = message_v; const mar_res_header_t *res = (const mar_res_header_t *)message; const struct join_list_entry *jle = (const struct join_list_entry *)(message + sizeof(mar_res_header_t)); log_printf(LOG_LEVEL_NOTICE, "got joinlist message from node %d\n", nodeid); /* Ignore our own messages */ if (nodeid == api->totem_nodeid_get()) { return; } while ((const char*)jle < message + res->size) { do_proc_join(&jle->group_name, jle->pid, nodeid, CONFCHG_CPG_REASON_NODEUP); jle++; } } static void message_handler_req_exec_cpg_mcast ( const void *message, unsigned int nodeid) { const struct req_exec_cpg_mcast *req_exec_cpg_mcast = message; struct 
res_lib_cpg_deliver_callback *res_lib_cpg_mcast; int msglen = req_exec_cpg_mcast->msglen; char buf[sizeof(*res_lib_cpg_mcast) + msglen]; struct group_info *gi; struct list_head *iter; /* * Track local messages so that flow is controlled on the local node */ gi = get_group(&req_exec_cpg_mcast->group_name); /* this will always succeed ! */ assert(gi); res_lib_cpg_mcast = (struct res_lib_cpg_deliver_callback *)buf; res_lib_cpg_mcast->header.id = MESSAGE_RES_CPG_DELIVER_CALLBACK; res_lib_cpg_mcast->header.size = sizeof(*res_lib_cpg_mcast) + msglen; res_lib_cpg_mcast->msglen = msglen; res_lib_cpg_mcast->pid = req_exec_cpg_mcast->pid; res_lib_cpg_mcast->nodeid = nodeid; if (api->ipc_source_is_local (&req_exec_cpg_mcast->source)) { api->ipc_refcnt_dec (req_exec_cpg_mcast->source.conn); } memcpy(&res_lib_cpg_mcast->group_name, &gi->group_name, sizeof(mar_cpg_name_t)); memcpy(&res_lib_cpg_mcast->message, (const char*)message+sizeof(*req_exec_cpg_mcast), msglen); /* Send to all interested members */ for (iter = gi->members.next; iter != &gi->members; iter = iter->next) { struct process_info *pi = list_entry(iter, struct process_info, list); if (pi->trackerconn && (pi->flags & PI_FLAG_MEMBER)) { api->ipc_dispatch_send( pi->trackerconn, buf, res_lib_cpg_mcast->header.size); } } } static int cpg_exec_send_joinlist(void) { int count = 0; char *buf; int i; struct list_head *iter; struct list_head *iter2; struct group_info *gi; mar_res_header_t *res; struct join_list_entry *jle; struct iovec req_exec_cpg_iovec; log_printf(LOG_LEVEL_DEBUG, "sending joinlist to cluster\n"); /* Count the number of groups we are a member of */ for (i=0; inext) { gi = list_entry(iter, struct group_info, list); for (iter2 = gi->members.next; iter2 != &gi->members; iter2 = iter2->next) { struct process_info *pi = list_entry(iter2, struct process_info, list); if (pi->pid && pi->nodeid == api->totem_nodeid_get()) { count++; } } } } /* Nothing to send */ if (!count) return 0; buf = 
alloca(sizeof(mar_res_header_t) + sizeof(struct join_list_entry) * count); if (!buf) { log_printf(LOG_LEVEL_WARNING, "Unable to allocate joinlist buffer"); return -1; } jle = (struct join_list_entry *)(buf + sizeof(mar_res_header_t)); res = (mar_res_header_t *)buf; for (i=0; inext) { gi = list_entry(iter, struct group_info, list); for (iter2 = gi->members.next; iter2 != &gi->members; iter2 = iter2->next) { struct process_info *pi = list_entry(iter2, struct process_info, list); if (pi->pid && pi->nodeid == api->totem_nodeid_get()) { memcpy(&jle->group_name, &gi->group_name, sizeof(mar_cpg_name_t)); jle->pid = pi->pid; jle++; } } } } res->id = SERVICE_ID_MAKE(CPG_SERVICE, MESSAGE_REQ_EXEC_CPG_JOINLIST); res->size = sizeof(mar_res_header_t)+sizeof(struct join_list_entry) * count; req_exec_cpg_iovec.iov_base = buf; req_exec_cpg_iovec.iov_len = res->size; return (api->totem_mcast (&req_exec_cpg_iovec, 1, TOTEM_AGREED)); } static int cpg_lib_init_fn (void *conn) { struct process_info *pi = (struct process_info *)api->ipc_private_data_get (conn); api->ipc_refcnt_inc (conn); pi->conn = conn; log_printf(LOG_LEVEL_DEBUG, "lib_init_fn: conn=%p, pi=%p\n", conn, pi); return (0); } /* Join message from the library */ static void message_handler_req_lib_cpg_join (void *conn, void *message) { struct req_lib_cpg_join *req_lib_cpg_join = (struct req_lib_cpg_join *)message; struct process_info *pi = (struct process_info *)api->ipc_private_data_get (conn); struct res_lib_cpg_join res_lib_cpg_join; struct group_info *gi; cs_error_t error = CS_OK; log_printf(LOG_LEVEL_DEBUG, "got join request on %p, pi=%p, pi->pid=%d\n", conn, pi, pi->pid); /* Already joined on this conn */ if (pi->pid) { error = CS_ERR_INVALID_PARAM; goto join_err; } gi = get_group(&req_lib_cpg_join->group_name); if (!gi) { error = CS_ERR_NO_SPACE; goto join_err; } /* Add a node entry for us */ pi->nodeid = api->totem_nodeid_get(); pi->pid = req_lib_cpg_join->pid; pi->group = gi; list_add(&pi->list, &gi->members); /* 
Tell the rest of the cluster */ cpg_node_joinleave_send(gi, pi, MESSAGE_REQ_EXEC_CPG_PROCJOIN, CONFCHG_CPG_REASON_JOIN); join_err: res_lib_cpg_join.header.size = sizeof(res_lib_cpg_join); res_lib_cpg_join.header.id = MESSAGE_RES_CPG_JOIN; res_lib_cpg_join.header.error = error; api->ipc_response_send(conn, &res_lib_cpg_join, sizeof(res_lib_cpg_join)); } /* Leave message from the library */ static void message_handler_req_lib_cpg_leave (void *conn, void *message) { struct process_info *pi = (struct process_info *)api->ipc_private_data_get (conn); struct res_lib_cpg_leave res_lib_cpg_leave; struct group_info *gi; cs_error_t error = CS_OK; log_printf(LOG_LEVEL_DEBUG, "got leave request on %p\n", conn); if (!pi || !pi->pid || !pi->group) { error = CS_ERR_INVALID_PARAM; goto leave_ret; } gi = pi->group; /* Tell other nodes we are leaving. When we get this message back we will leave too */ cpg_node_joinleave_send(gi, pi, MESSAGE_REQ_EXEC_CPG_PROCLEAVE, CONFCHG_CPG_REASON_LEAVE); pi->group = NULL; leave_ret: /* send return */ res_lib_cpg_leave.header.size = sizeof(res_lib_cpg_leave); res_lib_cpg_leave.header.id = MESSAGE_RES_CPG_LEAVE; res_lib_cpg_leave.header.error = error; api->ipc_response_send(conn, &res_lib_cpg_leave, sizeof(res_lib_cpg_leave)); } /* Mcast message from the library */ static void message_handler_req_lib_cpg_mcast (void *conn, void *message) { struct req_lib_cpg_mcast *req_lib_cpg_mcast = (struct req_lib_cpg_mcast *)message; struct process_info *pi = (struct process_info *)api->ipc_private_data_get (conn); struct group_info *gi = pi->group; struct iovec req_exec_cpg_iovec[2]; struct req_exec_cpg_mcast req_exec_cpg_mcast; struct res_lib_cpg_mcast res_lib_cpg_mcast; int msglen = req_lib_cpg_mcast->msglen; int result; log_printf(LOG_LEVEL_DEBUG, "got mcast request on %p\n", conn); /* Can't send if we're not joined */ if (!gi) { res_lib_cpg_mcast.header.size = sizeof(res_lib_cpg_mcast); res_lib_cpg_mcast.header.id = MESSAGE_RES_CPG_MCAST; 
res_lib_cpg_mcast.header.error = CS_ERR_ACCESS; /* TODO Better error code ?? */ api->ipc_response_send(conn, &res_lib_cpg_mcast, sizeof(res_lib_cpg_mcast)); return; } req_exec_cpg_mcast.header.size = sizeof(req_exec_cpg_mcast) + msglen; req_exec_cpg_mcast.header.id = SERVICE_ID_MAKE(CPG_SERVICE, MESSAGE_REQ_EXEC_CPG_MCAST); req_exec_cpg_mcast.pid = pi->pid; req_exec_cpg_mcast.msglen = msglen; api->ipc_source_set (&req_exec_cpg_mcast.source, conn); memcpy(&req_exec_cpg_mcast.group_name, &gi->group_name, sizeof(mar_cpg_name_t)); req_exec_cpg_iovec[0].iov_base = (char *)&req_exec_cpg_mcast; req_exec_cpg_iovec[0].iov_len = sizeof(req_exec_cpg_mcast); req_exec_cpg_iovec[1].iov_base = (char *)&req_lib_cpg_mcast->message; req_exec_cpg_iovec[1].iov_len = msglen; // TODO: guarantee type... result = api->totem_mcast (req_exec_cpg_iovec, 2, TOTEM_AGREED); api->ipc_refcnt_inc (conn); res_lib_cpg_mcast.header.size = sizeof(res_lib_cpg_mcast); res_lib_cpg_mcast.header.id = MESSAGE_RES_CPG_MCAST; res_lib_cpg_mcast.header.error = CS_OK; api->ipc_response_send(conn, &res_lib_cpg_mcast, sizeof(res_lib_cpg_mcast)); } static void message_handler_req_lib_cpg_membership (void *conn, void *message) { struct process_info *pi = (struct process_info *)api->ipc_private_data_get (conn); log_printf(LOG_LEVEL_DEBUG, "got membership request on %p\n", conn); if (!pi->group) { mar_res_header_t res; res.size = sizeof(res); res.id = MESSAGE_RES_CPG_MEMBERSHIP; res.error = CS_ERR_ACCESS; /* TODO Better error code */ api->ipc_response_send(conn, &res, sizeof(res)); return; } notify_lib_joinlist(pi->group, conn, 0, NULL, 0, NULL, MESSAGE_RES_CPG_MEMBERSHIP); } static void message_handler_req_lib_cpg_trackstart (void *conn, void *message) { struct req_lib_cpg_trackstart *req_lib_cpg_trackstart = (struct req_lib_cpg_trackstart *)message; struct res_lib_cpg_trackstart res_lib_cpg_trackstart; struct group_info *gi; struct process_info *otherpi; cs_error_t error = CS_OK; log_printf(LOG_LEVEL_DEBUG, "got 
trackstart request on %p\n", conn); gi = get_group(&req_lib_cpg_trackstart->group_name); if (!gi) { error = CS_ERR_NO_SPACE; goto tstart_ret; } /* Find the partner connection and add us to it's process_info struct */ otherpi = (struct process_info *)api->ipc_private_data_get (conn); otherpi->trackerconn = conn; tstart_ret: res_lib_cpg_trackstart.header.size = sizeof(res_lib_cpg_trackstart); res_lib_cpg_trackstart.header.id = MESSAGE_RES_CPG_TRACKSTART; res_lib_cpg_trackstart.header.error = CS_OK; api->ipc_response_send(conn, &res_lib_cpg_trackstart, sizeof(res_lib_cpg_trackstart)); } static void message_handler_req_lib_cpg_trackstop (void *conn, void *message) { struct req_lib_cpg_trackstop *req_lib_cpg_trackstop = (struct req_lib_cpg_trackstop *)message; struct res_lib_cpg_trackstop res_lib_cpg_trackstop; struct process_info *otherpi; struct group_info *gi; cs_error_t error = CS_OK; log_printf(LOG_LEVEL_DEBUG, "got trackstop request on %p\n", conn); gi = get_group(&req_lib_cpg_trackstop->group_name); if (!gi) { error = CS_ERR_NO_SPACE; goto tstop_ret; } /* Find the partner connection and add us to it's process_info struct */ otherpi = (struct process_info *)api->ipc_private_data_get (conn); otherpi->trackerconn = NULL; tstop_ret: res_lib_cpg_trackstop.header.size = sizeof(res_lib_cpg_trackstop); res_lib_cpg_trackstop.header.id = MESSAGE_RES_CPG_TRACKSTOP; res_lib_cpg_trackstop.header.error = CS_OK; api->ipc_response_send(conn, &res_lib_cpg_trackstop.header, sizeof(res_lib_cpg_trackstop)); } static void message_handler_req_lib_cpg_local_get (void *conn, void *message) { struct res_lib_cpg_local_get res_lib_cpg_local_get; res_lib_cpg_local_get.header.size = sizeof(res_lib_cpg_local_get); res_lib_cpg_local_get.header.id = MESSAGE_RES_CPG_LOCAL_GET; res_lib_cpg_local_get.header.error = CS_OK; res_lib_cpg_local_get.local_nodeid = api->totem_nodeid_get (); api->ipc_response_send(conn, &res_lib_cpg_local_get, sizeof(res_lib_cpg_local_get)); } static void 
message_handler_req_lib_cpg_groups_get (void *conn, void *message) { struct res_lib_cpg_groups_get res_lib_cpg_groups_get; res_lib_cpg_groups_get.header.size = sizeof(res_lib_cpg_groups_get); res_lib_cpg_groups_get.header.id = MESSAGE_RES_CPG_GROUPS_GET; res_lib_cpg_groups_get.header.error = CS_OK; res_lib_cpg_groups_get.num_groups = count_groups(); api->ipc_response_send(conn, &res_lib_cpg_groups_get, sizeof(res_lib_cpg_groups_get)); } diff --git a/services/evs.c b/services/evs.c index 113447da..389af981 100644 --- a/services/evs.c +++ b/services/evs.c @@ -1,527 +1,527 @@ /* * Copyright (c) 2004-2006 MontaVista Software, Inc. * Copyright (c) 2006-2009 Red Hat, Inc. * * All rights reserved. * * Author: Steven Dake (sdake@redhat.com) * * This software licensed under BSD license, the text of which follows: * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * - Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * - Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * - Neither the name of the MontaVista Software, Inc. nor the names of its * contributors may be used to endorse or promote products derived from this * software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF * THE POSSIBILITY OF SUCH DAMAGE. */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include LOGSYS_DECLARE_SUBSYS ("EVS", LOG_INFO); enum evs_exec_message_req_types { MESSAGE_REQ_EXEC_EVS_MCAST = 0 }; /* * Service Interfaces required by service_message_handler struct */ static int evs_exec_init_fn ( struct corosync_api_v1 *corosync_api); static void evs_confchg_fn ( enum totem_configuration_type configuration_type, - unsigned int *member_list, int member_list_entries, - unsigned int *left_list, int left_list_entries, - unsigned int *joined_list, int joined_list_entries, - struct memb_ring_id *ring_id); + const unsigned int *member_list, size_t member_list_entries, + const unsigned int *left_list, size_t left_list_entries, + const unsigned int *joined_list, size_t joined_list_entries, + const struct memb_ring_id *ring_id); static void message_handler_req_exec_mcast (const void *msg, unsigned int nodeid); static void req_exec_mcast_endian_convert (void *msg); static void message_handler_req_evs_join (void *conn, void *msg); static void message_handler_req_evs_leave (void *conn, void *msg); static void message_handler_req_evs_mcast_joined (void *conn, void *msg); static void message_handler_req_evs_mcast_groups (void *conn, void *msg); static void message_handler_req_evs_membership_get (void *conn, void *msg); static int 
evs_lib_init_fn (void *conn); static int evs_lib_exit_fn (void *conn); struct evs_pd { struct evs_group *groups; int group_entries; struct list_head list; void *conn; }; static struct corosync_api_v1 *api; static struct corosync_lib_handler evs_lib_engine[] = { { /* 0 */ .lib_handler_fn = message_handler_req_evs_join, .response_size = sizeof (struct res_lib_evs_join), .response_id = MESSAGE_RES_EVS_JOIN, .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 1 */ .lib_handler_fn = message_handler_req_evs_leave, .response_size = sizeof (struct res_lib_evs_leave), .response_id = MESSAGE_RES_EVS_LEAVE, .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 2 */ .lib_handler_fn = message_handler_req_evs_mcast_joined, .response_size = sizeof (struct res_lib_evs_mcast_joined), .response_id = MESSAGE_RES_EVS_MCAST_JOINED, .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED }, { /* 3 */ .lib_handler_fn = message_handler_req_evs_mcast_groups, .response_size = sizeof (struct res_lib_evs_mcast_groups), .response_id = MESSAGE_RES_EVS_MCAST_GROUPS, .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED }, { /* 4 */ .lib_handler_fn = message_handler_req_evs_membership_get, .response_size = sizeof (struct res_lib_evs_membership_get), .response_id = MESSAGE_RES_EVS_MEMBERSHIP_GET, .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED } }; static struct corosync_exec_handler evs_exec_engine[] = { { .exec_handler_fn = message_handler_req_exec_mcast, .exec_endian_convert_fn = req_exec_mcast_endian_convert } }; struct corosync_service_engine evs_service_engine = { .name = "corosync extended virtual synchrony service", .id = EVS_SERVICE, .private_data_size = sizeof (struct evs_pd), .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED, .lib_init_fn = evs_lib_init_fn, .lib_exit_fn = evs_lib_exit_fn, .lib_engine = evs_lib_engine, .lib_engine_count = sizeof (evs_lib_engine) / sizeof (struct corosync_lib_handler), .exec_engine = evs_exec_engine, .exec_engine_count = sizeof (evs_exec_engine) / sizeof (struct 
corosync_exec_handler), .confchg_fn = evs_confchg_fn, .exec_init_fn = evs_exec_init_fn, .exec_dump_fn = NULL }; static DECLARE_LIST_INIT (confchg_notify); /* * Dynamic loading descriptor */ static struct corosync_service_engine *evs_get_service_engine_ver0 (void); static struct corosync_service_engine_iface_ver0 evs_service_engine_iface = { .corosync_get_service_engine_ver0 = evs_get_service_engine_ver0 }; static struct lcr_iface corosync_evs_ver0[1] = { { .name = "corosync_evs", .version = 0, .versions_replace = 0, .versions_replace_count = 0, .dependencies = 0, .dependency_count = 0, .constructor = NULL, .destructor = NULL, .interfaces = NULL, } }; static struct lcr_comp evs_comp_ver0 = { .iface_count = 1, .ifaces = corosync_evs_ver0 }; static struct corosync_service_engine *evs_get_service_engine_ver0 (void) { return (&evs_service_engine); } __attribute__ ((constructor)) static void evs_comp_register (void) { lcr_interfaces_set (&corosync_evs_ver0[0], &evs_service_engine_iface); lcr_component_register (&evs_comp_ver0); } static int evs_exec_init_fn ( struct corosync_api_v1 *corosync_api) { api = corosync_api; return 0; } struct res_evs_confchg_callback res_evs_confchg_callback; static void evs_confchg_fn ( enum totem_configuration_type configuration_type, - unsigned int *member_list, int member_list_entries, - unsigned int *left_list, int left_list_entries, - unsigned int *joined_list, int joined_list_entries, - struct memb_ring_id *ring_id) + const unsigned int *member_list, size_t member_list_entries, + const unsigned int *left_list, size_t left_list_entries, + const unsigned int *joined_list, size_t joined_list_entries, + const struct memb_ring_id *ring_id) { struct list_head *list; struct evs_pd *evs_pd; /* * Build configuration change message */ res_evs_confchg_callback.header.size = sizeof (struct res_evs_confchg_callback); res_evs_confchg_callback.header.id = MESSAGE_RES_EVS_CONFCHG_CALLBACK; res_evs_confchg_callback.header.error = CS_OK; memcpy 
(res_evs_confchg_callback.member_list, member_list, member_list_entries * sizeof(*member_list)); res_evs_confchg_callback.member_list_entries = member_list_entries; memcpy (res_evs_confchg_callback.left_list, left_list, left_list_entries * sizeof(*left_list)); res_evs_confchg_callback.left_list_entries = left_list_entries; memcpy (res_evs_confchg_callback.joined_list, joined_list, joined_list_entries * sizeof(*joined_list)); res_evs_confchg_callback.joined_list_entries = joined_list_entries; /* * Send configuration change message to every EVS library user */ for (list = confchg_notify.next; list != &confchg_notify; list = list->next) { evs_pd = list_entry (list, struct evs_pd, list); api->ipc_response_send (evs_pd->conn, &res_evs_confchg_callback, sizeof (res_evs_confchg_callback)); } } static int evs_lib_init_fn (void *conn) { struct evs_pd *evs_pd = (struct evs_pd *)api->ipc_private_data_get (conn); log_printf (LOG_LEVEL_DEBUG, "Got request to initalize evs service.\n"); evs_pd->groups = NULL; evs_pd->group_entries = 0; evs_pd->conn = conn; list_init (&evs_pd->list); list_add (&evs_pd->list, &confchg_notify); api->ipc_response_send (conn, &res_evs_confchg_callback, sizeof (res_evs_confchg_callback)); return (0); } static int evs_lib_exit_fn (void *conn) { struct evs_pd *evs_pd = (struct evs_pd *)api->ipc_private_data_get (conn); list_del (&evs_pd->list); return (0); } static void message_handler_req_evs_join (void *conn, void *msg) { cs_error_t error = CS_OK; struct req_lib_evs_join *req_lib_evs_join = (struct req_lib_evs_join *)msg; struct res_lib_evs_join res_lib_evs_join; void *addr; struct evs_pd *evs_pd = (struct evs_pd *)api->ipc_private_data_get (conn); if (req_lib_evs_join->group_entries > 50) { error = CS_ERR_TOO_MANY_GROUPS; goto exit_error; } addr = realloc (evs_pd->groups, sizeof (struct evs_group) * (evs_pd->group_entries + req_lib_evs_join->group_entries)); if (addr == NULL) { error = CS_ERR_NO_MEMORY; goto exit_error; } evs_pd->groups = addr; 
memcpy (&evs_pd->groups[evs_pd->group_entries], req_lib_evs_join->groups, sizeof (struct evs_group) * req_lib_evs_join->group_entries); evs_pd->group_entries += req_lib_evs_join->group_entries; exit_error: res_lib_evs_join.header.size = sizeof (struct res_lib_evs_join); res_lib_evs_join.header.id = MESSAGE_RES_EVS_JOIN; res_lib_evs_join.header.error = error; api->ipc_response_send (conn, &res_lib_evs_join, sizeof (struct res_lib_evs_join)); } static void message_handler_req_evs_leave (void *conn, void *msg) { struct req_lib_evs_leave *req_lib_evs_leave = (struct req_lib_evs_leave *)msg; struct res_lib_evs_leave res_lib_evs_leave; cs_error_t error = CS_OK; int error_index; int i, j; int found; struct evs_pd *evs_pd = (struct evs_pd *)api->ipc_private_data_get (conn); for (i = 0; i < req_lib_evs_leave->group_entries; i++) { found = 0; for (j = 0; j < evs_pd->group_entries;) { if (memcmp (&req_lib_evs_leave->groups[i], &evs_pd->groups[j], sizeof (struct evs_group)) == 0) { /* * Delete entry */ memmove (&evs_pd->groups[j], &evs_pd->groups[j + 1], (evs_pd->group_entries - j - 1) * sizeof (struct evs_group)); evs_pd->group_entries -= 1; found = 1; break; } else { j++; } } if (found == 0) { error = CS_ERR_NOT_EXIST; error_index = i; break; } } res_lib_evs_leave.header.size = sizeof (struct res_lib_evs_leave); res_lib_evs_leave.header.id = MESSAGE_RES_EVS_LEAVE; res_lib_evs_leave.header.error = error; api->ipc_response_send (conn, &res_lib_evs_leave, sizeof (struct res_lib_evs_leave)); } static void message_handler_req_evs_mcast_joined (void *conn, void *msg) { cs_error_t error = CS_ERR_TRY_AGAIN; struct req_lib_evs_mcast_joined *req_lib_evs_mcast_joined = (struct req_lib_evs_mcast_joined *)msg; struct res_lib_evs_mcast_joined res_lib_evs_mcast_joined; struct iovec req_exec_evs_mcast_iovec[3]; struct req_exec_evs_mcast req_exec_evs_mcast; int res; struct evs_pd *evs_pd = (struct evs_pd *)api->ipc_private_data_get (conn); req_exec_evs_mcast.header.size = sizeof (struct 
req_exec_evs_mcast) + evs_pd->group_entries * sizeof (struct evs_group) + req_lib_evs_mcast_joined->msg_len; req_exec_evs_mcast.header.id = SERVICE_ID_MAKE (EVS_SERVICE, MESSAGE_REQ_EXEC_EVS_MCAST); req_exec_evs_mcast.msg_len = req_lib_evs_mcast_joined->msg_len; req_exec_evs_mcast.group_entries = evs_pd->group_entries; req_exec_evs_mcast_iovec[0].iov_base = (char *)&req_exec_evs_mcast; req_exec_evs_mcast_iovec[0].iov_len = sizeof (req_exec_evs_mcast); req_exec_evs_mcast_iovec[1].iov_base = (char *)evs_pd->groups; req_exec_evs_mcast_iovec[1].iov_len = evs_pd->group_entries * sizeof (struct evs_group); req_exec_evs_mcast_iovec[2].iov_base = (char *)&req_lib_evs_mcast_joined->msg; req_exec_evs_mcast_iovec[2].iov_len = req_lib_evs_mcast_joined->msg_len; res = api->totem_mcast (req_exec_evs_mcast_iovec, 3, TOTEM_AGREED); // TODO if (res == 0) { error = CS_OK; } res_lib_evs_mcast_joined.header.size = sizeof (struct res_lib_evs_mcast_joined); res_lib_evs_mcast_joined.header.id = MESSAGE_RES_EVS_MCAST_JOINED; res_lib_evs_mcast_joined.header.error = error; api->ipc_response_send (conn, &res_lib_evs_mcast_joined, sizeof (struct res_lib_evs_mcast_joined)); } static void message_handler_req_evs_mcast_groups (void *conn, void *msg) { cs_error_t error = CS_ERR_TRY_AGAIN; struct req_lib_evs_mcast_groups *req_lib_evs_mcast_groups = (struct req_lib_evs_mcast_groups *)msg; struct res_lib_evs_mcast_groups res_lib_evs_mcast_groups; struct iovec req_exec_evs_mcast_iovec[3]; struct req_exec_evs_mcast req_exec_evs_mcast; char *msg_addr; int res; req_exec_evs_mcast.header.size = sizeof (struct req_exec_evs_mcast) + sizeof (struct evs_group) * req_lib_evs_mcast_groups->group_entries + req_lib_evs_mcast_groups->msg_len; req_exec_evs_mcast.header.id = SERVICE_ID_MAKE (EVS_SERVICE, MESSAGE_REQ_EXEC_EVS_MCAST); req_exec_evs_mcast.msg_len = req_lib_evs_mcast_groups->msg_len; req_exec_evs_mcast.group_entries = req_lib_evs_mcast_groups->group_entries; msg_addr = (char *)req_lib_evs_mcast_groups + 
sizeof (struct req_lib_evs_mcast_groups) + (sizeof (struct evs_group) * req_lib_evs_mcast_groups->group_entries); req_exec_evs_mcast_iovec[0].iov_base = (char *)&req_exec_evs_mcast; req_exec_evs_mcast_iovec[0].iov_len = sizeof (req_exec_evs_mcast); req_exec_evs_mcast_iovec[1].iov_base = (char *)&req_lib_evs_mcast_groups->groups; req_exec_evs_mcast_iovec[1].iov_len = sizeof (struct evs_group) * req_lib_evs_mcast_groups->group_entries; req_exec_evs_mcast_iovec[2].iov_base = msg_addr; req_exec_evs_mcast_iovec[2].iov_len = req_lib_evs_mcast_groups->msg_len; res = api->totem_mcast (req_exec_evs_mcast_iovec, 3, TOTEM_AGREED); if (res == 0) { error = CS_OK; } res_lib_evs_mcast_groups.header.size = sizeof (struct res_lib_evs_mcast_groups); res_lib_evs_mcast_groups.header.id = MESSAGE_RES_EVS_MCAST_GROUPS; res_lib_evs_mcast_groups.header.error = error; api->ipc_response_send (conn, &res_lib_evs_mcast_groups, sizeof (struct res_lib_evs_mcast_groups)); } static void message_handler_req_evs_membership_get (void *conn, void *msg) { struct res_lib_evs_membership_get res_lib_evs_membership_get; res_lib_evs_membership_get.header.size = sizeof (struct res_lib_evs_membership_get); res_lib_evs_membership_get.header.id = MESSAGE_RES_EVS_MEMBERSHIP_GET; res_lib_evs_membership_get.header.error = CS_OK; res_lib_evs_membership_get.local_nodeid = api->totem_nodeid_get (); memcpy (&res_lib_evs_membership_get.member_list, &res_evs_confchg_callback.member_list, sizeof (res_lib_evs_membership_get.member_list)); res_lib_evs_membership_get.member_list_entries = res_evs_confchg_callback.member_list_entries; api->ipc_response_send (conn, &res_lib_evs_membership_get, sizeof (struct res_lib_evs_membership_get)); } static void req_exec_mcast_endian_convert (void *msg) { struct req_exec_evs_mcast *req_exec_evs_mcast = (struct req_exec_evs_mcast *)msg; req_exec_evs_mcast->group_entries = swab32 (req_exec_evs_mcast->group_entries); req_exec_evs_mcast->msg_len = swab32 (req_exec_evs_mcast->msg_len); } 
static void message_handler_req_exec_mcast ( const void *msg, unsigned int nodeid) { const struct req_exec_evs_mcast *req_exec_evs_mcast = msg; struct res_evs_deliver_callback res_evs_deliver_callback; const char *msg_addr; struct list_head *list; int found = 0; int i, j; struct evs_pd *evs_pd; struct iovec iov[2]; res_evs_deliver_callback.header.size = sizeof (struct res_evs_deliver_callback) + req_exec_evs_mcast->msg_len; res_evs_deliver_callback.header.id = MESSAGE_RES_EVS_DELIVER_CALLBACK; res_evs_deliver_callback.header.error = CS_OK; res_evs_deliver_callback.msglen = req_exec_evs_mcast->msg_len; msg_addr = (const char *)req_exec_evs_mcast + sizeof (struct req_exec_evs_mcast) + (sizeof (struct evs_group) * req_exec_evs_mcast->group_entries); for (list = confchg_notify.next; list != &confchg_notify; list = list->next) { found = 0; evs_pd = list_entry (list, struct evs_pd, list); for (i = 0; i < evs_pd->group_entries; i++) { for (j = 0; j < req_exec_evs_mcast->group_entries; j++) { if (memcmp (&evs_pd->groups[i], &req_exec_evs_mcast->groups[j], sizeof (struct evs_group)) == 0) { found = 1; break; } } if (found) { break; } } if (found) { res_evs_deliver_callback.local_nodeid = nodeid; iov[0].iov_base = &res_evs_deliver_callback; iov[0].iov_len = sizeof (struct res_evs_deliver_callback); iov[1].iov_base = (void *) msg_addr; /* discard const */ iov[1].iov_len = req_exec_evs_mcast->msg_len; api->ipc_dispatch_iov_send ( evs_pd->conn, iov, 2); } } } diff --git a/services/pload.c b/services/pload.c index 52713c0e..2ed7eac8 100644 --- a/services/pload.c +++ b/services/pload.c @@ -1,359 +1,359 @@ /* * Copyright (c) 2008-2009 Red Hat, Inc. * * All rights reserved. 
* * Author: Steven Dake (sdake@redhat.com) * * This software licensed under BSD license, the text of which follows: * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * - Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * - Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * - Neither the name of the MontaVista Software, Inc. nor the names of its * contributors may be used to endorse or promote products derived from this * software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF * THE POSSIBILITY OF SUCH DAMAGE. 
*/ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include LOGSYS_DECLARE_SUBSYS ("PLOAD", LOG_INFO); enum pload_exec_message_req_types { MESSAGE_REQ_EXEC_PLOAD_START = 0, MESSAGE_REQ_EXEC_PLOAD_MCAST = 1 }; /* * Service Interfaces required by service_message_handler struct */ static int pload_exec_init_fn ( struct corosync_api_v1 *corosync_api); static void pload_confchg_fn ( enum totem_configuration_type configuration_type, - unsigned int *member_list, int member_list_entries, - unsigned int *left_list, int left_list_entries, - unsigned int *joined_list, int joined_list_entries, - struct memb_ring_id *ring_id); + const unsigned int *member_list, size_t member_list_entries, + const unsigned int *left_list, size_t left_list_entries, + const unsigned int *joined_list, size_t joined_list_entries, + const struct memb_ring_id *ring_id); static void message_handler_req_exec_pload_start (const void *msg, unsigned int nodeid); static void message_handler_req_exec_pload_mcast (const void *msg, unsigned int nodeid); static void req_exec_pload_start_endian_convert (void *msg); static void req_exec_pload_mcast_endian_convert (void *msg); static void message_handler_req_pload_start (void *conn, void *msg); static int pload_lib_init_fn (void *conn); static int pload_lib_exit_fn (void *conn); static char buffer[1000000]; static unsigned int msgs_delivered = 0; static unsigned int msgs_wanted = 0; static unsigned int msg_size = 0; static unsigned int msg_code = 1; static unsigned int msgs_sent = 0; static struct corosync_api_v1 *api; struct req_exec_pload_start { mar_req_header_t header; unsigned int msg_code; unsigned int msg_count; unsigned int msg_size; unsigned int time_interval; }; struct req_exec_pload_mcast { mar_req_header_t header; unsigned int msg_code; }; static struct corosync_lib_handler 
pload_lib_engine[] = { { /* 0 */ .lib_handler_fn = message_handler_req_pload_start, .response_size = sizeof (struct res_lib_pload_start), .response_id = MESSAGE_RES_PLOAD_START, .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED } }; static struct corosync_exec_handler pload_exec_engine[] = { { .exec_handler_fn = message_handler_req_exec_pload_start, .exec_endian_convert_fn = req_exec_pload_start_endian_convert }, { .exec_handler_fn = message_handler_req_exec_pload_mcast, .exec_endian_convert_fn = req_exec_pload_mcast_endian_convert } }; struct corosync_service_engine pload_service_engine = { .name = "corosync profile loading service", .id = PLOAD_SERVICE, .private_data_size = 0, .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED, .lib_init_fn = pload_lib_init_fn, .lib_exit_fn = pload_lib_exit_fn, .lib_engine = pload_lib_engine, .lib_engine_count = sizeof (pload_lib_engine) / sizeof (struct corosync_lib_handler), .exec_engine = pload_exec_engine, .exec_engine_count = sizeof (pload_exec_engine) / sizeof (struct corosync_exec_handler), .confchg_fn = pload_confchg_fn, .exec_init_fn = pload_exec_init_fn, .exec_dump_fn = NULL }; static DECLARE_LIST_INIT (confchg_notify); /* * Dynamic loading descriptor */ static struct corosync_service_engine *pload_get_service_engine_ver0 (void); static struct corosync_service_engine_iface_ver0 pload_service_engine_iface = { .corosync_get_service_engine_ver0 = pload_get_service_engine_ver0 }; static struct lcr_iface corosync_pload_ver0[1] = { { .name = "corosync_pload", .version = 0, .versions_replace = 0, .versions_replace_count = 0, .dependencies = 0, .dependency_count = 0, .constructor = NULL, .destructor = NULL, .interfaces = NULL, } }; static struct lcr_comp pload_comp_ver0 = { .iface_count = 1, .ifaces = corosync_pload_ver0 }; static struct corosync_service_engine *pload_get_service_engine_ver0 (void) { return (&pload_service_engine); } __attribute__ ((constructor)) static void pload_comp_register (void) { lcr_interfaces_set 
(&corosync_pload_ver0[0], &pload_service_engine_iface); lcr_component_register (&pload_comp_ver0); } static int pload_exec_init_fn ( struct corosync_api_v1 *corosync_api) { api = corosync_api; return 0; } static void pload_confchg_fn ( enum totem_configuration_type configuration_type, - unsigned int *member_list, int member_list_entries, - unsigned int *left_list, int left_list_entries, - unsigned int *joined_list, int joined_list_entries, - struct memb_ring_id *ring_id) + const unsigned int *member_list, size_t member_list_entries, + const unsigned int *left_list, size_t left_list_entries, + const unsigned int *joined_list, size_t joined_list_entries, + const struct memb_ring_id *ring_id) { } static int pload_lib_init_fn (void *conn) { return (0); } static int pload_lib_exit_fn (void *conn) { return (0); } static void message_handler_req_pload_start (void *conn, void *msg) { struct req_lib_pload_start *req_lib_pload_start = (struct req_lib_pload_start *)msg; struct req_exec_pload_start req_exec_pload_start; struct iovec iov; req_exec_pload_start.header.id = SERVICE_ID_MAKE (PLOAD_SERVICE, MESSAGE_REQ_EXEC_PLOAD_START); req_exec_pload_start.msg_code = req_lib_pload_start->msg_code; req_exec_pload_start.msg_size = req_lib_pload_start->msg_size; req_exec_pload_start.msg_count = req_lib_pload_start->msg_count; req_exec_pload_start.time_interval = req_lib_pload_start->time_interval; iov.iov_base = &req_exec_pload_start; iov.iov_len = sizeof (struct req_exec_pload_start); api->totem_mcast (&iov, 1, TOTEM_AGREED); } static void req_exec_pload_start_endian_convert (void *msg) { } static void req_exec_pload_mcast_endian_convert (void *msg) { } static int send_message (enum totem_callback_token_type type, void *arg) { struct req_exec_pload_mcast req_exec_pload_mcast; struct iovec iov[2]; unsigned int res; int iov_len = 1; req_exec_pload_mcast.header.id = SERVICE_ID_MAKE (PLOAD_SERVICE, MESSAGE_REQ_EXEC_PLOAD_MCAST); req_exec_pload_mcast.header.size = sizeof (struct 
req_exec_pload_mcast) + msg_size; iov[0].iov_base = &req_exec_pload_mcast; iov[0].iov_len = sizeof (struct req_exec_pload_mcast); if (msg_size > sizeof (req_exec_pload_mcast)) { iov[1].iov_base = buffer; iov[1].iov_len = msg_size - sizeof (req_exec_pload_mcast); iov_len = 2; } do { res = api->totem_mcast (iov, iov_len, TOTEM_AGREED); if (res == -1) { break; } else { msgs_sent++; msg_code++; } } while (msgs_sent <= msgs_wanted); if (msgs_sent == msgs_wanted) { return (0); } else { return (-1); } } static void *token_callback; static void start_mcasting (void) { api->totem_callback_token_create ( &token_callback, TOTEM_CALLBACK_TOKEN_RECEIVED, 1, send_message, &token_callback); } static void message_handler_req_exec_pload_start ( const void *msg, unsigned int nodeid) { const struct req_exec_pload_start *req_exec_pload_start = msg; msgs_wanted = req_exec_pload_start->msg_count; msg_size = req_exec_pload_start->msg_size; msg_code = req_exec_pload_start->msg_code; start_mcasting (); } # define timersub(a, b, result) \ do { \ (result)->tv_sec = (a)->tv_sec - (b)->tv_sec; \ (result)->tv_usec = (a)->tv_usec - (b)->tv_usec; \ if ((result)->tv_usec < 0) { \ --(result)->tv_sec; \ (result)->tv_usec += 1000000; \ } \ } while (0) struct timeval tv1; struct timeval tv2; struct timeval tv_elapsed; int last_msg_no = 0; static void message_handler_req_exec_pload_mcast ( const void *msg, unsigned int nodeid) { const struct req_exec_pload_mcast *pload_mcast = msg; assert (pload_mcast->msg_code - 1 == last_msg_no); last_msg_no = pload_mcast->msg_code; if (msgs_delivered == 0) { gettimeofday (&tv1, NULL); } msgs_delivered += 1; if (msgs_delivered == msgs_wanted) { gettimeofday (&tv2, NULL); timersub (&tv2, &tv1, &tv_elapsed); printf ("%5d Writes ", msgs_delivered); printf ("%5d bytes per write ", msg_size); printf ("%7.3f Seconds runtime ", (tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0))); printf ("%9.3f TP/s ", ((float)msgs_delivered) / (tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 
1000000.0))); printf ("%7.3f MB/s.\n", ((float)msgs_delivered) * ((float)msg_size) / ((tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0)) * 1000000.0)); } }