diff --git a/exec/coroipcs.h b/exec/coroipcs.h index 60b63d26..283b2c15 100644 --- a/exec/coroipcs.h +++ b/exec/coroipcs.h @@ -1,100 +1,100 @@ /* * Copyright (c) 2006-2009 Red Hat, Inc. * * All rights reserved. * * Author: Steven Dake (sdake@redhat.com) * * This software licensed under BSD license, the text of which follows: * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * - Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * - Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * - Neither the name of the MontaVista Software, Inc. nor the names of its * contributors may be used to endorse or promote products derived from this * software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF * THE POSSIBILITY OF SUCH DAMAGE. */ #ifndef COROIPCS_H_DEFINED #define COROIPCS_H_DEFINED #include #define SOCKET_SERVICE_INIT 0xFFFFFFFF struct iovec; typedef int (*coroipcs_init_fn_lvalue) (void *conn); typedef int (*coroipcs_exit_fn_lvalue) (void *conn); -typedef void (*coroipcs_handler_fn_lvalue) (void *conn, void *msg); +typedef void (*coroipcs_handler_fn_lvalue) (void *conn, const void *msg); struct coroipcs_init_state { const char *socket_name; int sched_priority; void *(*malloc) (size_t size); void (*free) (void *ptr); void (*log_printf) ( const char *format, ...) 
__attribute__((format(printf, 1, 2))); int (*service_available)(unsigned int service); int (*private_data_size_get)(unsigned int service); int (*security_valid)(int uid, int gid); void (*serialize_lock)(void); void (*serialize_unlock)(void); int (*response_size_get)(unsigned int service, unsigned int id); int (*response_id_get)(unsigned int service, unsigned int id); int (*sending_allowed)(unsigned int service, unsigned int id, void *msg, void *sending_allowed_private_data); void (*sending_allowed_release)(void *sending_allowed_private_data); void (*poll_accept_add)(int fd); void (*poll_dispatch_add)(int fd, void *context); void (*poll_dispatch_modify)(int fd, int events); void (*fatal_error)(const char *error_msg); coroipcs_init_fn_lvalue (*init_fn_get)(unsigned int service); coroipcs_exit_fn_lvalue (*exit_fn_get)(unsigned int service); coroipcs_handler_fn_lvalue (*handler_fn_get)(unsigned int service, unsigned int id); }; extern void coroipcs_ipc_init ( struct coroipcs_init_state *init_state); extern void *coroipcs_private_data_get (void *conn); extern int coroipcs_response_send (void *conn, const void *msg, size_t mlen); extern int coroipcs_response_iov_send (void *conn, const struct iovec *iov, unsigned int iov_len); extern int coroipcs_dispatch_send (void *conn, const void *msg, size_t mlen); extern int coroipcs_dispatch_iov_send (void *conn, const struct iovec *iov, unsigned int iov_len); extern void coroipcs_refcount_inc (void *conn); extern void coroipcs_refcount_dec (void *conn); extern void coroipcs_ipc_exit (void); extern int coroipcs_handler_accept (int fd, int revent, void *context); extern int coroipcs_handler_dispatch (int fd, int revent, void *context); #endif /* COROIPCS_H_DEFINED */ diff --git a/exec/vsf_quorum.c b/exec/vsf_quorum.c index dc05458b..45f537ef 100644 --- a/exec/vsf_quorum.c +++ b/exec/vsf_quorum.c @@ -1,470 +1,475 @@ /* * Copyright (c) 2008, 2009 Red Hat, Inc. * * All rights reserved. * * Author: Christine Caulfield (ccaulfie@redhat.com) * * This software licensed under BSD license, the text of which follows: * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * - Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * - Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * - Neither the name of Red Hat Inc. nor the names of its * contributors may be used to endorse or promote products derived from this * software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF * THE POSSIBILITY OF SUCH DAMAGE. 
*/ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include LOGSYS_DECLARE_SUBSYS ("QUORUM", LOG_INFO); struct quorum_pd { unsigned char track_flags; int tracking_enabled; struct list_head list; void *conn; }; struct internal_callback_pd { struct list_head list; quorum_callback_fn_t callback; void *context; }; -static void message_handler_req_lib_quorum_getquorate (void *conn, void *msg); -static void message_handler_req_lib_quorum_trackstart (void *conn, void *msg); -static void message_handler_req_lib_quorum_trackstop (void *conn, void *msg); +static void message_handler_req_lib_quorum_getquorate (void *conn, + const void *msg); +static void message_handler_req_lib_quorum_trackstart (void *conn, + const void *msg); +static void message_handler_req_lib_quorum_trackstop (void *conn, + const void *msg); static void send_library_notification(void *conn); static void send_internal_notification(void); static int quorum_exec_init_fn (struct corosync_api_v1 *api); static int quorum_lib_init_fn (void *conn); static int quorum_lib_exit_fn (void *conn); static int primary_designated = 0; static struct corosync_api_v1 *corosync_api; static struct list_head lib_trackers_list; static struct list_head internal_trackers_list; static struct memb_ring_id quorum_ring_id; static size_t quorum_view_list_entries = 0; static int quorum_view_list[PROCESSOR_COUNT_MAX]; struct quorum_services_api_ver1 *quorum_iface = NULL; static void (*sync_primary_callback_fn) ( const unsigned int *view_list, size_t view_list_entries, int primary_designated, struct memb_ring_id *ring_id); /* Internal quorum API function */ static void quorum_api_set_quorum(const unsigned int *view_list, size_t view_list_entries, int quorum, struct memb_ring_id *ring_id) { primary_designated = quorum; if (primary_designated) { log_printf (LOG_LEVEL_NOTICE, "This node is within the primary component and will provide service.\n"); } else { log_printf (LOG_LEVEL_NOTICE, "This node is within the non-primary component and will NOT provide any services.\n"); } quorum_view_list_entries = view_list_entries; /* Tell sync() only if there is a new ring_id (ie this is not a 'fake' quorum event) */ if (memcmp(&quorum_ring_id, ring_id, sizeof (quorum_ring_id))) { sync_primary_callback_fn(view_list, view_list_entries, primary_designated, ring_id); } memcpy(&quorum_ring_id, ring_id, sizeof (quorum_ring_id)); memcpy(quorum_view_list, view_list, sizeof(unsigned int)*view_list_entries); /* Tell internal listeners */ send_internal_notification(); /* Tell IPC listeners */ send_library_notification(NULL); } static struct corosync_lib_handler quorum_lib_service[] = { { /* 0 */ .lib_handler_fn = message_handler_req_lib_quorum_getquorate, .response_size = sizeof (struct res_lib_quorum_getquorate), .response_id = MESSAGE_RES_QUORUM_GETQUORATE, .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 1 */ .lib_handler_fn = message_handler_req_lib_quorum_trackstart, .response_size = sizeof (mar_res_header_t), .response_id = MESSAGE_RES_QUORUM_NOTIFICATION, .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 2 */ .lib_handler_fn = message_handler_req_lib_quorum_trackstop, .response_size = sizeof (mar_res_header_t), .response_id = MESSAGE_RES_QUORUM_TRACKSTOP, .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED } }; static struct corosync_service_engine 
quorum_service_handler = { .name = "corosync cluster quorum service v0.1", .id = QUORUM_SERVICE, .private_data_size = sizeof (struct quorum_pd), .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED, .allow_inquorate = CS_LIB_ALLOW_INQUORATE, .lib_init_fn = quorum_lib_init_fn, .lib_exit_fn = quorum_lib_exit_fn, .lib_engine = quorum_lib_service, .exec_init_fn = quorum_exec_init_fn, .lib_engine_count = sizeof (quorum_lib_service) / sizeof (struct corosync_lib_handler), }; static struct lcr_iface corosync_quorum_ver0[1] = { { .name = "corosync_quorum", .version = 0, .versions_replace = 0, .versions_replace_count = 0, .dependencies = 0, .dependency_count = 0, .constructor = NULL, .destructor = NULL, .interfaces = NULL, }, }; static struct corosync_service_engine *quorum_get_service_handler_ver0 (void) { return (&quorum_service_handler); } static struct lcr_comp quorum_comp_ver0 = { .iface_count = 1, .ifaces = corosync_quorum_ver0 }; static struct corosync_service_engine_iface_ver0 quorum_service_handler_iface = { .corosync_get_service_engine_ver0 = quorum_get_service_handler_ver0 }; __attribute__ ((constructor)) static void quorum_comp_register (void) { lcr_component_register (&quorum_comp_ver0); lcr_interfaces_set (&corosync_quorum_ver0[0], &quorum_service_handler_iface); } /* -------------------------------------------------- */ /* * Internal API functions for corosync */ static int quorum_quorate(void) { return primary_designated; } static int quorum_register_callback(quorum_callback_fn_t function, void *context) { struct internal_callback_pd *pd = malloc(sizeof(struct internal_callback_pd)); if (!pd) return -1; pd->context = context; pd->callback = function; list_add (&pd->list, &internal_trackers_list); return 0; } static int quorum_unregister_callback(quorum_callback_fn_t function, void *context) { struct internal_callback_pd *pd; struct list_head *tmp; for (tmp = internal_trackers_list.next; tmp != &internal_trackers_list; tmp = tmp->next) { pd = list_entry(tmp, struct internal_callback_pd, list); if (pd->callback == function && pd->context == context) { list_del(&pd->list); return 0; } } return -1; } static struct quorum_callin_functions callins = { .quorate = quorum_quorate, .register_callback = quorum_register_callback, .unregister_callback = quorum_unregister_callback }; /* --------------------------------------------------------------------- */ static int quorum_exec_init_fn (struct corosync_api_v1 *api) { hdb_handle_t find_handle; hdb_handle_t quorum_handle = 0; hdb_handle_t q_handle; char *quorum_module; int res; void *quorum_iface_p; corosync_api = api; list_init (&lib_trackers_list); list_init (&internal_trackers_list); /* * Tell corosync we have a quorum engine. 
*/ api->quorum_initialize(&callins); /* * Look for a quorum provider */ api->object_find_create(OBJECT_PARENT_HANDLE, "quorum", strlen("quorum"), &find_handle); api->object_find_next(find_handle, &quorum_handle); api->object_find_destroy(find_handle); if (quorum_handle) { if ( !(res = api->object_key_get(quorum_handle, "provider", strlen("provider"), (void *)&quorum_module, NULL))) { res = lcr_ifact_reference ( &q_handle, quorum_module, 0, &quorum_iface_p, 0); if (res == -1) { log_printf (LOG_LEVEL_NOTICE, "Couldn't load quorum provider %s\n", quorum_module); return (-1); } log_printf (LOG_LEVEL_NOTICE, "Using quorum provider %s\n", quorum_module); quorum_iface = (struct quorum_services_api_ver1 *)quorum_iface_p; quorum_iface->init (api, quorum_api_set_quorum); } } if (!quorum_iface) { /* * With no quorum provider, we are always quorate */ primary_designated = 1; } return (0); } static int quorum_lib_init_fn (void *conn) { struct quorum_pd *pd = (struct quorum_pd *)corosync_api->ipc_private_data_get (conn); log_printf(LOG_LEVEL_DEBUG, "lib_init_fn: conn=%p\n", conn); list_init (&pd->list); pd->conn = conn; return (0); } static int quorum_lib_exit_fn (void *conn) { struct quorum_pd *quorum_pd = (struct quorum_pd *)corosync_api->ipc_private_data_get (conn); log_printf(LOG_LEVEL_DEBUG, "lib_exit_fn: conn=%p\n", conn); if (quorum_pd->tracking_enabled) { list_del (&quorum_pd->list); list_init (&quorum_pd->list); } return (0); } static void send_internal_notification(void) { struct list_head *tmp; struct internal_callback_pd *pd; for (tmp = internal_trackers_list.next; tmp != &internal_trackers_list; tmp = tmp->next) { pd = list_entry(tmp, struct internal_callback_pd, list); pd->callback(primary_designated, pd->context); } } static void send_library_notification(void *conn) { int size = sizeof(struct res_lib_quorum_notification) + sizeof(unsigned int)*quorum_view_list_entries; char buf[size]; struct res_lib_quorum_notification *res_lib_quorum_notification = (struct res_lib_quorum_notification *)buf; struct list_head *tmp; int i; log_printf(LOG_LEVEL_DEBUG, "sending quorum notification to %p, length = %d\n", conn, size); res_lib_quorum_notification->quorate = primary_designated; res_lib_quorum_notification->ring_seq = quorum_ring_id.seq; res_lib_quorum_notification->view_list_entries = quorum_view_list_entries; for (i=0; i<quorum_view_list_entries; i++) { res_lib_quorum_notification->view_list[i] = quorum_view_list[i]; } res_lib_quorum_notification->header.id = MESSAGE_RES_QUORUM_NOTIFICATION; res_lib_quorum_notification->header.size = size; res_lib_quorum_notification->header.error = CS_OK; /* Send it to all interested parties */ if (conn) { corosync_api->ipc_response_send(conn, res_lib_quorum_notification, size); } else { struct quorum_pd *qpd; for (tmp = lib_trackers_list.next; tmp != &lib_trackers_list; tmp = tmp->next) { qpd = list_entry(tmp, struct quorum_pd, list); corosync_api->ipc_dispatch_send(qpd->conn, res_lib_quorum_notification, size); } } return; } -static void message_handler_req_lib_quorum_getquorate (void *conn, void *msg) +static void message_handler_req_lib_quorum_getquorate (void *conn, + const void *msg) { struct res_lib_quorum_getquorate res_lib_quorum_getquorate; log_printf(LOG_LEVEL_DEBUG, "got quorate request on %p\n", conn); /* send status */ res_lib_quorum_getquorate.quorate = primary_designated; res_lib_quorum_getquorate.header.size = sizeof(res_lib_quorum_getquorate); res_lib_quorum_getquorate.header.id = MESSAGE_RES_QUORUM_GETQUORATE; res_lib_quorum_getquorate.header.error = CS_OK; corosync_api->ipc_response_send(conn,
&res_lib_quorum_getquorate, sizeof(res_lib_quorum_getquorate)); } -static void message_handler_req_lib_quorum_trackstart (void *conn, void *msg) +static void message_handler_req_lib_quorum_trackstart (void *conn, + const void *msg) { - struct req_lib_quorum_trackstart *req_lib_quorum_trackstart = (struct req_lib_quorum_trackstart *)msg; + const struct req_lib_quorum_trackstart *req_lib_quorum_trackstart = msg; mar_res_header_t res; struct quorum_pd *quorum_pd = (struct quorum_pd *)corosync_api->ipc_private_data_get (conn); log_printf(LOG_LEVEL_DEBUG, "got trackstart request on %p\n", conn); /* * If an immediate listing of the current cluster membership * is requested, generate membership list */ if (req_lib_quorum_trackstart->track_flags & CS_TRACK_CURRENT || req_lib_quorum_trackstart->track_flags & CS_TRACK_CHANGES) { log_printf(LOG_LEVEL_DEBUG, "sending initial status to %p\n", conn); send_library_notification(conn); } /* * Record requests for tracking */ if (req_lib_quorum_trackstart->track_flags & CS_TRACK_CHANGES || req_lib_quorum_trackstart->track_flags & CS_TRACK_CHANGES_ONLY) { quorum_pd->track_flags = req_lib_quorum_trackstart->track_flags; quorum_pd->tracking_enabled = 1; list_add (&quorum_pd->list, &lib_trackers_list); } /* send status */ res.size = sizeof(res); res.id = MESSAGE_RES_QUORUM_TRACKSTART; res.error = CS_OK; corosync_api->ipc_response_send(conn, &res, sizeof(mar_res_header_t)); } -static void message_handler_req_lib_quorum_trackstop (void *conn, void *msg) +static void message_handler_req_lib_quorum_trackstop (void *conn, const void *msg) { mar_res_header_t res; struct quorum_pd *quorum_pd = (struct quorum_pd *)corosync_api->ipc_private_data_get (conn); log_printf(LOG_LEVEL_DEBUG, "got trackstop request on %p\n", conn); if (quorum_pd->tracking_enabled) { res.error = CS_OK; quorum_pd->tracking_enabled = 0; list_del (&quorum_pd->list); list_init (&quorum_pd->list); } else { res.error = CS_ERR_NOT_EXIST; } /* send status */ res.size = sizeof(res); res.id = MESSAGE_RES_QUORUM_TRACKSTOP; res.error = CS_OK; corosync_api->ipc_response_send(conn, &res, sizeof(mar_res_header_t)); } diff --git a/include/corosync/engine/coroapi.h b/include/corosync/engine/coroapi.h index d143ee1d..6f6c69d8 100644 --- a/include/corosync/engine/coroapi.h +++ b/include/corosync/engine/coroapi.h @@ -1,603 +1,603 @@ /* * Copyright (c) 2008, 2009 Red Hat, Inc. * * All rights reserved. * * Author: Steven Dake (sdake@redhat.com) * * This software licensed under BSD license, the text of which follows: * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * - Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * - Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * - Neither the name of the MontaVista Software, Inc. nor the names of its * contributors may be used to endorse or promote products derived from this * software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF * THE POSSIBILITY OF SUCH DAMAGE. */ #ifndef COROAPI_H_DEFINED #define COROAPI_H_DEFINED #include #ifdef COROSYNC_BSD #include #endif #include typedef void * corosync_timer_handle_t; struct corosync_tpg_group { const void *group; size_t group_len; }; #define TOTEMIP_ADDRLEN (sizeof(struct in6_addr)) #define PROCESSOR_COUNT_MAX 384 #define INTERFACE_MAX 2 #ifndef MESSAGE_SIZE_MAX #define MESSAGE_SIZE_MAX 1024*1024 /* (1MB) */ #endif /* MESSAGE_SIZE_MAX */ #ifndef MESSAGE_QUEUE_MAX #define MESSAGE_QUEUE_MAX MESSAGE_SIZE_MAX / totem_config->net_mtu #endif /* MESSAGE_QUEUE_MAX */ #define TOTEM_AGREED 0 #define TOTEM_SAFE 1 #define MILLI_2_NANO_SECONDS 1000000ULL #if !defined(TOTEM_IP_ADDRESS) struct totem_ip_address { unsigned int nodeid; unsigned short family; unsigned char addr[TOTEMIP_ADDRLEN]; } __attribute__((packed)); #endif #if !defined(MEMB_RING_ID) struct memb_ring_id { struct totem_ip_address rep; unsigned long long seq; } __attribute__((packed)); #endif #if !defined(TOTEM_CONFIGURATION_TYPE) enum totem_configuration_type { TOTEM_CONFIGURATION_REGULAR, TOTEM_CONFIGURATION_TRANSITIONAL }; #endif #if !defined(TOTEM_CALLBACK_TOKEN_TYPE) enum totem_callback_token_type { TOTEM_CALLBACK_TOKEN_RECEIVED = 1, TOTEM_CALLBACK_TOKEN_SENT = 2 }; #endif enum cs_lib_flow_control { CS_LIB_FLOW_CONTROL_REQUIRED = 1, CS_LIB_FLOW_CONTROL_NOT_REQUIRED = 2 }; #define corosync_lib_flow_control cs_lib_flow_control #define COROSYNC_LIB_FLOW_CONTROL_REQUIRED CS_LIB_FLOW_CONTROL_REQUIRED #define COROSYNC_LIB_FLOW_CONTROL_NOT_REQUIRED CS_LIB_FLOW_CONTROL_NOT_REQUIRED enum cs_lib_allow_inquorate { CS_LIB_DISALLOW_INQUORATE = 0, /* default */ CS_LIB_ALLOW_INQUORATE = 1 }; #if !defined (COROSYNC_FLOW_CONTROL_STATE) enum cs_flow_control_state { CS_FLOW_CONTROL_STATE_DISABLED, CS_FLOW_CONTROL_STATE_ENABLED }; #define corosync_flow_control_state cs_flow_control_state #define CS_FLOW_CONTROL_STATE_DISABLED CS_FLOW_CONTROL_STATE_DISABLED #define CS_FLOW_CONTROL_STATE_ENABLED CS_FLOW_CONTROL_STATE_ENABLED #endif /* COROSYNC_FLOW_CONTROL_STATE */ typedef enum { COROSYNC_FATAL_ERROR_EXIT = -1, COROSYNC_LIBAIS_SOCKET = -6, COROSYNC_LIBAIS_BIND = -7, COROSYNC_READKEY = -8, COROSYNC_INVALID_CONFIG = -9, COROSYNC_DYNAMICLOAD = -12, COROSYNC_OUT_OF_MEMORY = -15, COROSYNC_FATAL_ERR = -16 } cs_fatal_error_t; #define corosync_fatal_error_t cs_fatal_error_t; #ifndef OBJECT_PARENT_HANDLE #define OBJECT_PARENT_HANDLE 0xffffffff00000000ULL struct object_valid { char *object_name; size_t object_len; }; struct object_key_valid { char *key_name; size_t key_len; int (*validate_callback) (const void *key, size_t key_len, const void *value, size_t value_len); }; /* deprecated */ typedef enum { OBJECT_TRACK_DEPTH_ONE, OBJECT_TRACK_DEPTH_RECURSIVE } object_track_depth_t; typedef enum { OBJECT_KEY_CREATED, OBJECT_KEY_REPLACED, OBJECT_KEY_DELETED } object_change_type_t; typedef enum { OBJDB_RELOAD_NOTIFY_START, OBJDB_RELOAD_NOTIFY_END, OBJDB_RELOAD_NOTIFY_FAILED } objdb_reload_notify_type_t; typedef void (*object_key_change_notify_fn_t)( object_change_type_t 
change_type, hdb_handle_t parent_object_handle, hdb_handle_t object_handle, const void *object_name_pt, size_t object_name_len, const void *key_name_pt, size_t key_len, const void *key_value_pt, size_t key_value_len, void *priv_data_pt); typedef void (*object_create_notify_fn_t) ( hdb_handle_t parent_object_handle, hdb_handle_t object_handle, const uint8_t *name_pt, size_t name_len, void *priv_data_pt); typedef void (*object_destroy_notify_fn_t) ( hdb_handle_t parent_object_handle, const uint8_t *name_pt, size_t name_len, void *priv_data_pt); typedef void (*object_notify_callback_fn_t)( hdb_handle_t object_handle, const void *key_name, size_t key_len, const void *value, size_t value_len, object_change_type_t type, const void * priv_data_pt); typedef void (*object_reload_notify_fn_t) ( objdb_reload_notify_type_t, int flush, void *priv_data_pt); #endif /* OBJECT_PARENT_HANDLE_DEFINED */ #ifndef QUORUM_H_DEFINED typedef void (*quorum_callback_fn_t) (int quorate, void *context); struct quorum_callin_functions { int (*quorate) (void); int (*register_callback) (quorum_callback_fn_t callback_fn, void *context); int (*unregister_callback) (quorum_callback_fn_t callback_fn, void *context); }; typedef void (*sync_callback_fn_t) ( const unsigned int *view_list, size_t view_list_entries, int primary_designated, struct memb_ring_id *ring_id); #endif /* QUORUM_H_DEFINED */ struct corosync_api_v1 { /* * Object and configuration APIs */ int (*object_create) ( hdb_handle_t parent_object_handle, hdb_handle_t *object_handle, const void *object_name, size_t object_name_len); int (*object_priv_set) ( hdb_handle_t object_handle, void *priv); int (*object_key_create) ( hdb_handle_t object_handle, const void *key_name, size_t key_len, const void *value, size_t value_len); int (*object_destroy) ( hdb_handle_t object_handle); int (*object_valid_set) ( hdb_handle_t object_handle, struct object_valid *object_valid_list, unsigned int object_valid_list_entries); int (*object_key_valid_set) ( hdb_handle_t object_handle, struct object_key_valid *object_key_valid_list, unsigned int object_key_valid_list_entries); int (*object_find_create) ( hdb_handle_t parent_object_handle, const void *object_name, size_t object_name_len, hdb_handle_t *object_find_handle); int (*object_find_next) ( hdb_handle_t object_find_handle, hdb_handle_t *object_handle); int (*object_find_destroy) ( hdb_handle_t object_find_handle); int (*object_key_get) ( hdb_handle_t object_handle, const void *key_name, size_t key_len, void **value, size_t *value_len); int (*object_priv_get) ( hdb_handle_t jobject_handle, void **priv); int (*object_key_replace) ( hdb_handle_t object_handle, const void *key_name, size_t key_len, const void *new_value, size_t new_value_len); int (*object_key_delete) ( hdb_handle_t object_handle, const void *key_name, size_t key_len); int (*object_iter_reset) ( hdb_handle_t parent_object_handle); int (*object_iter) ( hdb_handle_t parent_object_handle, void **object_name, size_t *name_len, hdb_handle_t *object_handle); int (*object_key_iter_reset) ( hdb_handle_t object_handle); int (*object_key_iter) ( hdb_handle_t parent_object_handle, void **key_name, size_t *key_len, void **value, size_t *value_len); int (*object_parent_get) ( hdb_handle_t object_handle, hdb_handle_t *parent_handle); int (*object_name_get) ( hdb_handle_t object_handle, char *object_name, size_t *object_name_len); int (*object_dump) ( hdb_handle_t object_handle, FILE *file); int (*object_key_iter_from) ( hdb_handle_t parent_object_handle, hdb_handle_t start_pos, 
void **key_name, size_t *key_len, void **value, size_t *value_len); int (*object_track_start) ( hdb_handle_t object_handle, object_track_depth_t depth, object_key_change_notify_fn_t key_change_notify_fn, object_create_notify_fn_t object_create_notify_fn, object_destroy_notify_fn_t object_destroy_notify_fn, object_reload_notify_fn_t object_reload_notify_fn, void * priv_data_pt); void (*object_track_stop) ( object_key_change_notify_fn_t key_change_notify_fn, object_create_notify_fn_t object_create_notify_fn, object_destroy_notify_fn_t object_destroy_notify_fn, object_reload_notify_fn_t object_reload_notify_fn, void * priv_data_pt); int (*object_write_config) (const char **error_string); int (*object_reload_config) (int flush, const char **error_string); int (*object_key_increment) ( hdb_handle_t object_handle, const void *key_name, size_t key_len, unsigned int *value); int (*object_key_decrement) ( hdb_handle_t object_handle, const void *key_name, size_t key_len, unsigned int *value); /* * Time and timer APIs */ int (*timer_add_duration) ( unsigned long long nanoseconds_in_future, void *data, void (*timer_nf) (void *data), corosync_timer_handle_t *handle); int (*timer_add_absolute) ( unsigned long long nanoseconds_from_epoch, void *data, void (*timer_fn) (void *data), corosync_timer_handle_t *handle); void (*timer_delete) ( corosync_timer_handle_t timer_handle); unsigned long long (*timer_time_get) (void); unsigned long long (*timer_expire_time_get) ( corosync_timer_handle_t timer_handle); /* * IPC APIs */ void (*ipc_source_set) (mar_message_source_t *source, void *conn); int (*ipc_source_is_local) (const mar_message_source_t *source); void *(*ipc_private_data_get) (void *conn); int (*ipc_response_send) (void *conn, const void *msg, size_t mlen); int (*ipc_response_iov_send) (void *conn, const struct iovec *iov, unsigned int iov_len); int (*ipc_dispatch_send) (void *conn, const void *msg, size_t mlen); int (*ipc_dispatch_iov_send) (void *conn, const struct iovec *iov, unsigned int iov_len); void (*ipc_refcnt_inc) (void *conn); void (*ipc_refcnt_dec) (void *conn); /* * Totem APIs */ unsigned int (*totem_nodeid_get) (void); int (*totem_family_get) (void); int (*totem_ring_reenable) (void); int (*totem_mcast) (struct iovec *iovec, unsigned int iov_len, unsigned int guarantee); int (*totem_ifaces_get) ( unsigned int nodeid, struct totem_ip_address *interfaces, char ***status, unsigned int *iface_count); const char *(*totem_ifaces_print) (unsigned int nodeid); const char *(*totem_ip_print) (const struct totem_ip_address *addr); int (*totem_callback_token_create) ( void **handle_out, enum totem_callback_token_type type, int delete, int (*callback_fn) (enum totem_callback_token_type type, const void *), const void *data); /* * Totem open process groups API for those service engines * wanting their own groups */ int (*tpg_init) ( hdb_handle_t *handle, void (*deliver_fn) ( unsigned int nodeid, struct iovec *iovec, unsigned int iov_len, int endian_conversion_required), void (*confchg_fn) ( enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id)); int (*tpg_exit) ( hdb_handle_t handle); int (*tpg_join) ( hdb_handle_t handle, struct corosync_tpg_group *groups, size_t group_cnt); int (*tpg_leave) ( hdb_handle_t handle, struct corosync_tpg_group *groups, size_t group_cnt); int (*tpg_joined_mcast) ( 
hdb_handle_t handle, const struct iovec *iovec, unsigned int iov_len, int guarantee); int (*tpg_joined_reserve) ( hdb_handle_t handle, const struct iovec *iovec, unsigned int iov_len); int (*tpg_joined_release) ( int reserved_msgs); int (*tpg_groups_mcast) ( hdb_handle_t handle, int guarantee, const struct corosync_tpg_group *groups, size_t groups_cnt, const struct iovec *iovec, unsigned int iov_len); int (*tpg_groups_reserve) ( hdb_handle_t handle, const struct corosync_tpg_group *groups, size_t groups_cnt, const struct iovec *iovec, unsigned int iov_len); int (*tpg_groups_release) ( int reserved_msgs); int (*sync_request) ( const char *service_name); /* * User plugin-callable functions for quorum */ int (*quorum_is_quorate) (void); int (*quorum_register_callback) (quorum_callback_fn_t callback_fn, void *context); int (*quorum_unregister_callback) (quorum_callback_fn_t callback_fn, void *context); /* * This one is for the quorum management plugin's use */ int (*quorum_initialize)(struct quorum_callin_functions *fns); /* * Plugin loading and unloading */ int (*plugin_interface_reference) ( hdb_handle_t *handle, const char *iface_name, int version, void **interface, void *context); int (*plugin_interface_release) (hdb_handle_t handle); /* * Service loading and unloading APIs */ unsigned int (*service_link_and_init) ( struct corosync_api_v1 *corosync_api_v1, const char *service_name, unsigned int service_ver); unsigned int (*service_unlink_and_exit) ( struct corosync_api_v1 *corosync_api_v1, const char *service_name, unsigned int service_ver); /* * Error handling APIs */ void (*error_memory_failure) (void); #define corosync_fatal_error(err) api->fatal_error ((err), __FILE__, __LINE__) void (*fatal_error) (cs_fatal_error_t err, const char *file, unsigned int line); }; #define SERVICE_ID_MAKE(a,b) ( ((a)<<16) | (b) ) #define SERVICE_HANDLER_MAXIMUM_COUNT 64 struct corosync_lib_handler { - void (*lib_handler_fn) (void *conn, void *msg); + void (*lib_handler_fn) (void *conn, const void *msg); int response_size; int response_id; enum cs_lib_flow_control flow_control; }; struct corosync_exec_handler { void (*exec_handler_fn) (const void *msg, unsigned int nodeid); void (*exec_endian_convert_fn) (void *msg); }; struct corosync_service_engine_iface_ver0 { struct corosync_service_engine *(*corosync_get_service_engine_ver0) (void); }; struct corosync_service_engine { const char *name; unsigned short id; unsigned int private_data_size; enum cs_lib_flow_control flow_control; enum cs_lib_allow_inquorate allow_inquorate; int (*exec_init_fn) (struct corosync_api_v1 *); int (*exec_exit_fn) (void); void (*exec_dump_fn) (void); int (*lib_init_fn) (void *conn); int (*lib_exit_fn) (void *conn); struct corosync_lib_handler *lib_engine; int lib_engine_count; struct corosync_exec_handler *exec_engine; int exec_engine_count; int (*config_init_fn) (struct corosync_api_v1 *); void (*confchg_fn) ( enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id); void (*sync_init) (void); int (*sync_process) (void); void (*sync_activate) (void); void (*sync_abort) (void); }; #endif /* COROAPI_H_DEFINED */ diff --git a/services/cfg.c b/services/cfg.c index d93324ad..fdb9046a 100644 --- a/services/cfg.c +++ b/services/cfg.c @@ -1,1058 +1,1059 @@ /* * Copyright (c) 2005-2006 MontaVista Software, Inc. 
* Copyright (c) 2006-2009 Red Hat, Inc. * * All rights reserved. * * Author: Steven Dake (sdake@redhat.com) * * This software licensed under BSD license, the text of which follows: * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * - Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * - Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * - Neither the name of the MontaVista Software, Inc. nor the names of its * contributors may be used to endorse or promote products derived from this * software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF * THE POSSIBILITY OF SUCH DAMAGE. */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include LOGSYS_DECLARE_SUBSYS ("CFG", LOG_INFO); enum cfg_message_req_types { MESSAGE_REQ_EXEC_CFG_RINGREENABLE = 0, MESSAGE_REQ_EXEC_CFG_KILLNODE = 1, MESSAGE_REQ_EXEC_CFG_SHUTDOWN = 2 }; #define DEFAULT_SHUTDOWN_TIMEOUT 5 static struct list_head trackers_list; /* * Variables controlling a requested shutdown */ static corosync_timer_handle_t shutdown_timer; static struct cfg_info *shutdown_con; static uint32_t shutdown_flags; static int shutdown_yes; static int shutdown_no; static int shutdown_expected; struct cfg_info { struct list_head list; void *conn; void *tracker_conn; enum {SHUTDOWN_REPLY_UNKNOWN, SHUTDOWN_REPLY_YES, SHUTDOWN_REPLY_NO} shutdown_reply; }; static void cfg_confchg_fn ( enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id); static int cfg_exec_init_fn (struct corosync_api_v1 *corosync_api_v1); static struct corosync_api_v1 *api; static int cfg_lib_init_fn (void *conn); static int cfg_lib_exit_fn (void *conn); static void message_handler_req_exec_cfg_ringreenable ( const void *message, unsigned int nodeid); static void message_handler_req_exec_cfg_killnode ( const void *message, unsigned int nodeid); static void message_handler_req_exec_cfg_shutdown ( const void *message, unsigned int nodeid); static void exec_cfg_killnode_endian_convert (void *msg); static void message_handler_req_lib_cfg_ringstatusget ( void *conn, - void *msg); + const void *msg); static void message_handler_req_lib_cfg_ringreenable ( void 
*conn, - void *msg); + const void *msg); static void message_handler_req_lib_cfg_statetrack ( void *conn, - void *msg); + const void *msg); static void message_handler_req_lib_cfg_statetrackstop ( void *conn, - void *msg); + const void *msg); static void message_handler_req_lib_cfg_administrativestateset ( void *conn, - void *msg); + const void *msg); static void message_handler_req_lib_cfg_administrativestateget ( void *conn, - void *msg); + const void *msg); static void message_handler_req_lib_cfg_serviceload ( void *conn, - void *msg); + const void *msg); static void message_handler_req_lib_cfg_serviceunload ( void *conn, - void *msg); + const void *msg); static void message_handler_req_lib_cfg_killnode ( void *conn, - void *msg); + const void *msg); static void message_handler_req_lib_cfg_tryshutdown ( void *conn, - void *msg); + const void *msg); static void message_handler_req_lib_cfg_replytoshutdown ( void *conn, - void *msg); + const void *msg); static void message_handler_req_lib_cfg_get_node_addrs ( void *conn, - void *msg); + const void *msg); static void message_handler_req_lib_cfg_local_get ( void *conn, - void *msg); + const void *msg); /* * Service Handler Definition */ static struct corosync_lib_handler cfg_lib_engine[] = { { /* 0 */ .lib_handler_fn = message_handler_req_lib_cfg_ringstatusget, .response_size = sizeof (struct res_lib_cfg_ringstatusget), .response_id = MESSAGE_RES_CFG_RINGSTATUSGET, .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED }, { /* 1 */ .lib_handler_fn = message_handler_req_lib_cfg_ringreenable, .response_size = sizeof (struct res_lib_cfg_ringreenable), .response_id = MESSAGE_RES_CFG_RINGREENABLE, .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED }, { /* 2 */ .lib_handler_fn = message_handler_req_lib_cfg_statetrack, .response_size = sizeof (struct res_lib_cfg_statetrack), .response_id = MESSAGE_RES_CFG_STATETRACKSTART, .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED }, { /* 3 */ .lib_handler_fn = message_handler_req_lib_cfg_statetrackstop, .response_size = sizeof (struct res_lib_cfg_statetrackstop), .response_id = MESSAGE_RES_CFG_STATETRACKSTOP, .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED }, { /* 4 */ .lib_handler_fn = message_handler_req_lib_cfg_administrativestateset, .response_size = sizeof (struct res_lib_cfg_administrativestateset), .response_id = MESSAGE_RES_CFG_ADMINISTRATIVESTATESET, .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 5 */ .lib_handler_fn = message_handler_req_lib_cfg_administrativestateget, .response_size = sizeof (struct res_lib_cfg_administrativestateget), .response_id = MESSAGE_RES_CFG_ADMINISTRATIVESTATEGET, .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 6 */ .lib_handler_fn = message_handler_req_lib_cfg_serviceload, .response_size = sizeof (struct res_lib_cfg_serviceload), .response_id = MESSAGE_RES_CFG_SERVICELOAD, .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 7 */ .lib_handler_fn = message_handler_req_lib_cfg_serviceunload, .response_size = sizeof (struct res_lib_cfg_serviceunload), .response_id = MESSAGE_RES_CFG_SERVICEUNLOAD, .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 8 */ .lib_handler_fn = message_handler_req_lib_cfg_killnode, .response_size = sizeof (struct res_lib_cfg_killnode), .response_id = MESSAGE_RES_CFG_KILLNODE, .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 9 */ .lib_handler_fn = message_handler_req_lib_cfg_tryshutdown, .response_size = sizeof (struct res_lib_cfg_tryshutdown), .response_id = MESSAGE_RES_CFG_TRYSHUTDOWN, .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED 
}, { /* 10 */ .lib_handler_fn = message_handler_req_lib_cfg_replytoshutdown, .response_size = sizeof (struct res_lib_cfg_replytoshutdown), .response_id = MESSAGE_RES_CFG_REPLYTOSHUTDOWN, .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 11 */ .lib_handler_fn = message_handler_req_lib_cfg_get_node_addrs, .response_size = sizeof (struct res_lib_cfg_get_node_addrs), .response_id = MESSAGE_RES_CFG_GET_NODE_ADDRS, .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 12 */ .lib_handler_fn = message_handler_req_lib_cfg_local_get, .response_size = sizeof (struct res_lib_cfg_local_get), .response_id = MESSAGE_RES_CFG_LOCAL_GET, .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED } }; static struct corosync_exec_handler cfg_exec_engine[] = { { /* 0 */ .exec_handler_fn = message_handler_req_exec_cfg_ringreenable, }, { /* 1 */ .exec_handler_fn = message_handler_req_exec_cfg_killnode, .exec_endian_convert_fn = exec_cfg_killnode_endian_convert }, { /* 2 */ .exec_handler_fn = message_handler_req_exec_cfg_shutdown, } }; /* * Exports the interface for the service */ struct corosync_service_engine cfg_service_engine = { .name = "corosync configuration service", .id = CFG_SERVICE, .private_data_size = sizeof(struct cfg_info), .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED, .allow_inquorate = CS_LIB_ALLOW_INQUORATE, .lib_init_fn = cfg_lib_init_fn, .lib_exit_fn = cfg_lib_exit_fn, .lib_engine = cfg_lib_engine, .lib_engine_count = sizeof (cfg_lib_engine) / sizeof (struct corosync_lib_handler), .exec_init_fn = cfg_exec_init_fn, .exec_engine = cfg_exec_engine, .exec_engine_count = 0, /* sizeof (cfg_aisexec_handler_fns) / sizeof (coroync_exec_handler), */ .confchg_fn = cfg_confchg_fn, }; /* * Dynamic Loader definition */ static struct corosync_service_engine *cfg_get_service_engine_ver0 (void); static struct corosync_service_engine_iface_ver0 cfg_service_engine_iface = { .corosync_get_service_engine_ver0 = cfg_get_service_engine_ver0 }; static struct lcr_iface corosync_cfg_ver0[1] = { { .name = "corosync_cfg", .version = 0, .versions_replace = 0, .versions_replace_count = 0, .dependencies = 0, .dependency_count = 0, .constructor = NULL, .destructor = NULL, .interfaces = NULL } }; static struct lcr_comp cfg_comp_ver0 = { .iface_count = 1, .ifaces = corosync_cfg_ver0 }; static struct corosync_service_engine *cfg_get_service_engine_ver0 (void) { return (&cfg_service_engine); } __attribute__ ((constructor)) static void register_this_component (void) { lcr_interfaces_set (&corosync_cfg_ver0[0], &cfg_service_engine_iface); lcr_component_register (&cfg_comp_ver0); } struct req_exec_cfg_ringreenable { mar_req_header_t header __attribute__((aligned(8))); mar_message_source_t source __attribute__((aligned(8))); }; struct req_exec_cfg_killnode { mar_req_header_t header __attribute__((aligned(8))); mar_uint32_t nodeid __attribute__((aligned(8))); mar_name_t reason __attribute__((aligned(8))); }; struct req_exec_cfg_shutdown { mar_req_header_t header __attribute__((aligned(8))); }; /* IMPL */ static int cfg_exec_init_fn ( struct corosync_api_v1 *corosync_api_v1) { api = corosync_api_v1; list_init(&trackers_list); return (0); } static void cfg_confchg_fn ( enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id) { } /* * Tell other nodes we are shutting down */ static int send_shutdown(void) { struct 
req_exec_cfg_shutdown req_exec_cfg_shutdown; struct iovec iovec; ENTER(); req_exec_cfg_shutdown.header.size = sizeof (struct req_exec_cfg_shutdown); req_exec_cfg_shutdown.header.id = SERVICE_ID_MAKE (CFG_SERVICE, MESSAGE_REQ_EXEC_CFG_SHUTDOWN); iovec.iov_base = (char *)&req_exec_cfg_shutdown; iovec.iov_len = sizeof (struct req_exec_cfg_shutdown); assert (api->totem_mcast (&iovec, 1, TOTEM_SAFE) == 0); LEAVE(); return 0; } static void send_test_shutdown(void *only_conn, void *exclude_conn, int status) { struct res_lib_cfg_testshutdown res_lib_cfg_testshutdown; struct list_head *iter; ENTER(); res_lib_cfg_testshutdown.header.size = sizeof(struct res_lib_cfg_testshutdown); res_lib_cfg_testshutdown.header.id = MESSAGE_RES_CFG_TESTSHUTDOWN; res_lib_cfg_testshutdown.header.error = status; res_lib_cfg_testshutdown.flags = shutdown_flags; if (only_conn) { TRACE1("sending testshutdown to only %p", only_conn); api->ipc_dispatch_send(only_conn, &res_lib_cfg_testshutdown, sizeof(res_lib_cfg_testshutdown)); } else { for (iter = trackers_list.next; iter != &trackers_list; iter = iter->next) { struct cfg_info *ci = list_entry(iter, struct cfg_info, list); if (ci->conn != exclude_conn) { TRACE1("sending testshutdown to %p", ci->tracker_conn); api->ipc_dispatch_send(ci->tracker_conn, &res_lib_cfg_testshutdown, sizeof(res_lib_cfg_testshutdown)); } } } LEAVE(); } static void check_shutdown_status(void) { ENTER(); /* * Shutdown client might have gone away */ if (!shutdown_con) { LEAVE(); return; } /* * All replies safely gathered in ? */ if (shutdown_yes + shutdown_no >= shutdown_expected) { struct res_lib_cfg_tryshutdown res_lib_cfg_tryshutdown; api->timer_delete(shutdown_timer); if (shutdown_yes >= shutdown_expected || shutdown_flags == CFG_SHUTDOWN_FLAG_REGARDLESS) { TRACE1("shutdown confirmed"); res_lib_cfg_tryshutdown.header.size = sizeof(struct res_lib_cfg_tryshutdown); res_lib_cfg_tryshutdown.header.id = MESSAGE_RES_CFG_TRYSHUTDOWN; res_lib_cfg_tryshutdown.header.error = CS_OK; /* * Tell originator that shutdown was confirmed */ api->ipc_response_send(shutdown_con->conn, &res_lib_cfg_tryshutdown, sizeof(res_lib_cfg_tryshutdown)); shutdown_con = NULL; /* * Tell other nodes we are going down */ send_shutdown(); } else { TRACE1("shutdown cancelled"); res_lib_cfg_tryshutdown.header.size = sizeof(struct res_lib_cfg_tryshutdown); res_lib_cfg_tryshutdown.header.id = MESSAGE_RES_CFG_TRYSHUTDOWN; res_lib_cfg_tryshutdown.header.error = CS_ERR_BUSY; /* * Tell originator that shutdown was cancelled */ api->ipc_response_send(shutdown_con->conn, &res_lib_cfg_tryshutdown, sizeof(res_lib_cfg_tryshutdown)); shutdown_con = NULL; } log_printf(LOG_DEBUG, "shutdown decision is: (yes count: %d, no count: %d) flags=%x\n", shutdown_yes, shutdown_no, shutdown_flags); } LEAVE(); } /* * Not all nodes responded to the shutdown (in time) */ static void shutdown_timer_fn(void *arg) { ENTER(); /* * Mark undecideds as "NO" */ shutdown_no = shutdown_expected; check_shutdown_status(); send_test_shutdown(NULL, NULL, CS_ERR_TIMEOUT); LEAVE(); } static void remove_ci_from_shutdown(struct cfg_info *ci) { ENTER(); /* * If the controlling shutdown process has quit, then cancel the * shutdown session */ if (ci == shutdown_con) { shutdown_con = NULL; api->timer_delete(shutdown_timer); } if (!list_empty(&ci->list)) { list_del(&ci->list); list_init(&ci->list); /* * Remove our option */ if (shutdown_con) { if (ci->shutdown_reply == SHUTDOWN_REPLY_YES) shutdown_yes--; if (ci->shutdown_reply == SHUTDOWN_REPLY_NO) shutdown_no--; } /* * If we are 
leaving, then that's an implicit YES to shutdown */ ci->shutdown_reply = SHUTDOWN_REPLY_YES; shutdown_yes++; check_shutdown_status(); } LEAVE(); } int cfg_lib_exit_fn (void *conn) { struct cfg_info *ci = (struct cfg_info *)api->ipc_private_data_get (conn); ENTER(); if (!list_empty(&ci->list)) { list_del(&ci->list); remove_ci_from_shutdown(ci); } LEAVE(); return (0); } static int cfg_lib_init_fn (void *conn) { struct cfg_info *ci = (struct cfg_info *)api->ipc_private_data_get (conn); ENTER(); list_init(&ci->list); LEAVE(); return (0); } /* * Executive message handlers */ static void message_handler_req_exec_cfg_ringreenable ( const void *message, unsigned int nodeid) { const struct req_exec_cfg_ringreenable *req_exec_cfg_ringreenable = message; struct res_lib_cfg_ringreenable res_lib_cfg_ringreenable; ENTER(); api->totem_ring_reenable (); if (api->ipc_source_is_local(&req_exec_cfg_ringreenable->source)) { res_lib_cfg_ringreenable.header.id = MESSAGE_RES_CFG_RINGREENABLE; res_lib_cfg_ringreenable.header.size = sizeof (struct res_lib_cfg_ringreenable); res_lib_cfg_ringreenable.header.error = CS_OK; api->ipc_response_send ( req_exec_cfg_ringreenable->source.conn, &res_lib_cfg_ringreenable, sizeof (struct res_lib_cfg_ringreenable)); } LEAVE(); } static void exec_cfg_killnode_endian_convert (void *msg) { struct req_exec_cfg_killnode *req_exec_cfg_killnode = (struct req_exec_cfg_killnode *)msg; ENTER(); swab_mar_name_t(&req_exec_cfg_killnode->reason); LEAVE(); } static void message_handler_req_exec_cfg_killnode ( const void *message, unsigned int nodeid) { const struct req_exec_cfg_killnode *req_exec_cfg_killnode = message; cs_name_t reason; ENTER(); log_printf(LOG_DEBUG, "request to kill node %d(us=%d): %s\n", req_exec_cfg_killnode->nodeid, api->totem_nodeid_get(), reason.value); if (req_exec_cfg_killnode->nodeid == api->totem_nodeid_get()) { marshall_from_mar_name_t(&reason, &req_exec_cfg_killnode->reason); log_printf(LOG_NOTICE, "Killed by node %d: %s\n", nodeid, reason.value); corosync_fatal_error(COROSYNC_FATAL_ERROR_EXIT); } LEAVE(); } /* * Self shutdown */ static void message_handler_req_exec_cfg_shutdown ( const void *message, unsigned int nodeid) { ENTER(); log_printf(LOG_NOTICE, "Node %d was shut down by sysadmin\n", nodeid); if (nodeid == api->totem_nodeid_get()) { corosync_fatal_error(COROSYNC_FATAL_ERROR_EXIT); } LEAVE(); } /* * Library Interface Implementation */ static void message_handler_req_lib_cfg_ringstatusget ( void *conn, - void *msg) + const void *msg) { struct res_lib_cfg_ringstatusget res_lib_cfg_ringstatusget; struct totem_ip_address interfaces[INTERFACE_MAX]; unsigned int iface_count; char **status; - char *totem_ip_string; + const char *totem_ip_string; unsigned int i; ENTER(); res_lib_cfg_ringstatusget.header.id = MESSAGE_RES_CFG_RINGSTATUSGET; res_lib_cfg_ringstatusget.header.size = sizeof (struct res_lib_cfg_ringstatusget); res_lib_cfg_ringstatusget.header.error = CS_OK; api->totem_ifaces_get ( api->totem_nodeid_get(), interfaces, &status, &iface_count); res_lib_cfg_ringstatusget.interface_count = iface_count; for (i = 0; i < iface_count; i++) { - totem_ip_string = (char *)api->totem_ip_print (&interfaces[i]); + totem_ip_string + = (const char *)api->totem_ip_print (&interfaces[i]); strcpy ((char *)&res_lib_cfg_ringstatusget.interface_status[i], status[i]); strcpy ((char *)&res_lib_cfg_ringstatusget.interface_name[i], totem_ip_string); } api->ipc_response_send ( conn, &res_lib_cfg_ringstatusget, sizeof (struct res_lib_cfg_ringstatusget)); LEAVE(); } static void 
message_handler_req_lib_cfg_ringreenable ( void *conn, - void *msg) + const void *msg) { struct req_exec_cfg_ringreenable req_exec_cfg_ringreenable; struct iovec iovec; ENTER(); req_exec_cfg_ringreenable.header.size = sizeof (struct req_exec_cfg_ringreenable); req_exec_cfg_ringreenable.header.id = SERVICE_ID_MAKE (CFG_SERVICE, MESSAGE_REQ_EXEC_CFG_RINGREENABLE); api->ipc_source_set (&req_exec_cfg_ringreenable.source, conn); iovec.iov_base = (char *)&req_exec_cfg_ringreenable; iovec.iov_len = sizeof (struct req_exec_cfg_ringreenable); assert (api->totem_mcast (&iovec, 1, TOTEM_SAFE) == 0); LEAVE(); } static void message_handler_req_lib_cfg_statetrack ( void *conn, - void *msg) + const void *msg) { struct cfg_info *ci = (struct cfg_info *)api->ipc_private_data_get (conn); struct res_lib_cfg_statetrack res_lib_cfg_statetrack; ENTER(); /* * We only do shutdown tracking at the moment */ if (list_empty(&ci->list)) { list_add(&ci->list, &trackers_list); ci->tracker_conn = conn; if (shutdown_con) { /* * Shutdown already in progress, ask the newcomer's opinion */ ci->shutdown_reply = SHUTDOWN_REPLY_UNKNOWN; shutdown_expected++; send_test_shutdown(conn, NULL, CS_OK); } } res_lib_cfg_statetrack.header.size = sizeof(struct res_lib_cfg_statetrack); res_lib_cfg_statetrack.header.id = MESSAGE_RES_CFG_STATETRACKSTART; res_lib_cfg_statetrack.header.error = CS_OK; api->ipc_response_send(conn, &res_lib_cfg_statetrack, sizeof(res_lib_cfg_statetrack)); LEAVE(); } static void message_handler_req_lib_cfg_statetrackstop ( void *conn, - void *msg) + const void *msg) { struct cfg_info *ci = (struct cfg_info *)api->ipc_private_data_get (conn); // struct req_lib_cfg_statetrackstop *req_lib_cfg_statetrackstop = (struct req_lib_cfg_statetrackstop *)message; ENTER(); remove_ci_from_shutdown(ci); LEAVE(); } static void message_handler_req_lib_cfg_administrativestateset ( void *conn, - void *msg) + const void *msg) { // struct req_lib_cfg_administrativestateset *req_lib_cfg_administrativestateset = (struct req_lib_cfg_administrativestateset *)message; ENTER(); LEAVE(); } static void message_handler_req_lib_cfg_administrativestateget ( void *conn, - void *msg) + const void *msg) { // struct req_lib_cfg_administrativestateget *req_lib_cfg_administrativestateget = (struct req_lib_cfg_administrativestateget *)message; ENTER(); LEAVE(); } static void message_handler_req_lib_cfg_serviceload ( void *conn, - void *msg) + const void *msg) { - struct req_lib_cfg_serviceload *req_lib_cfg_serviceload = - (struct req_lib_cfg_serviceload *)msg; + const struct req_lib_cfg_serviceload *req_lib_cfg_serviceload = msg; struct res_lib_cfg_serviceload res_lib_cfg_serviceload; ENTER(); api->service_link_and_init ( api, - (char *)req_lib_cfg_serviceload->service_name, + (const char *)req_lib_cfg_serviceload->service_name, req_lib_cfg_serviceload->service_ver); res_lib_cfg_serviceload.header.id = MESSAGE_RES_CFG_SERVICEUNLOAD; res_lib_cfg_serviceload.header.size = sizeof (struct res_lib_cfg_serviceload); res_lib_cfg_serviceload.header.error = CS_OK; api->ipc_response_send ( conn, &res_lib_cfg_serviceload, sizeof (struct res_lib_cfg_serviceload)); LEAVE(); } static void message_handler_req_lib_cfg_serviceunload ( void *conn, - void *msg) + const void *msg) { - struct req_lib_cfg_serviceunload *req_lib_cfg_serviceunload = - (struct req_lib_cfg_serviceunload *)msg; + const struct req_lib_cfg_serviceunload *req_lib_cfg_serviceunload = msg; struct res_lib_cfg_serviceunload res_lib_cfg_serviceunload; ENTER(); api->service_unlink_and_exit ( api, - 
(char *)req_lib_cfg_serviceunload->service_name, + (const char *)req_lib_cfg_serviceunload->service_name, req_lib_cfg_serviceunload->service_ver); res_lib_cfg_serviceunload.header.id = MESSAGE_RES_CFG_SERVICEUNLOAD; res_lib_cfg_serviceunload.header.size = sizeof (struct res_lib_cfg_serviceunload); res_lib_cfg_serviceunload.header.error = CS_OK; api->ipc_response_send ( conn, &res_lib_cfg_serviceunload, sizeof (struct res_lib_cfg_serviceunload)); LEAVE(); } static void message_handler_req_lib_cfg_killnode ( void *conn, - void *msg) + const void *msg) { - struct req_lib_cfg_killnode *req_lib_cfg_killnode = (struct req_lib_cfg_killnode *)msg; + const struct req_lib_cfg_killnode *req_lib_cfg_killnode = msg; struct res_lib_cfg_killnode res_lib_cfg_killnode; struct req_exec_cfg_killnode req_exec_cfg_killnode; struct iovec iovec; int res; ENTER(); req_exec_cfg_killnode.header.size = sizeof (struct req_exec_cfg_killnode); req_exec_cfg_killnode.header.id = SERVICE_ID_MAKE (CFG_SERVICE, MESSAGE_REQ_EXEC_CFG_KILLNODE); req_exec_cfg_killnode.nodeid = req_lib_cfg_killnode->nodeid; marshall_to_mar_name_t(&req_exec_cfg_killnode.reason, &req_lib_cfg_killnode->reason); iovec.iov_base = (char *)&req_exec_cfg_killnode; iovec.iov_len = sizeof (struct req_exec_cfg_killnode); res = api->totem_mcast (&iovec, 1, TOTEM_SAFE); res_lib_cfg_killnode.header.size = sizeof(struct res_lib_cfg_killnode); res_lib_cfg_killnode.header.id = MESSAGE_RES_CFG_KILLNODE; res_lib_cfg_killnode.header.error = CS_OK; api->ipc_response_send(conn, &res_lib_cfg_killnode, sizeof(res_lib_cfg_killnode)); LEAVE(); } static void message_handler_req_lib_cfg_tryshutdown ( void *conn, - void *msg) + const void *msg) { struct cfg_info *ci = (struct cfg_info *)api->ipc_private_data_get (conn); - struct req_lib_cfg_tryshutdown *req_lib_cfg_tryshutdown = (struct req_lib_cfg_tryshutdown *)msg; + const struct req_lib_cfg_tryshutdown *req_lib_cfg_tryshutdown = msg; struct res_lib_cfg_tryshutdown res_lib_cfg_tryshutdown; struct list_head *iter; ENTER(); if (req_lib_cfg_tryshutdown->flags == CFG_SHUTDOWN_FLAG_IMMEDIATE) { /* * Tell other nodes */ send_shutdown(); res_lib_cfg_tryshutdown.header.size = sizeof(struct res_lib_cfg_tryshutdown); res_lib_cfg_tryshutdown.header.id = MESSAGE_RES_CFG_TRYSHUTDOWN; res_lib_cfg_tryshutdown.header.error = CS_OK; api->ipc_response_send(conn, &res_lib_cfg_tryshutdown, sizeof(res_lib_cfg_tryshutdown)); LEAVE(); return; } /* * Shutdown in progress, return an error */ if (shutdown_con) { struct res_lib_cfg_tryshutdown res_lib_cfg_tryshutdown; res_lib_cfg_tryshutdown.header.size = sizeof(struct res_lib_cfg_tryshutdown); res_lib_cfg_tryshutdown.header.id = MESSAGE_RES_CFG_TRYSHUTDOWN; res_lib_cfg_tryshutdown.header.error = CS_ERR_EXIST; api->ipc_response_send(conn, &res_lib_cfg_tryshutdown, sizeof(res_lib_cfg_tryshutdown)); LEAVE(); return; } ci->conn = conn; shutdown_con = (struct cfg_info *)api->ipc_private_data_get (conn); shutdown_flags = req_lib_cfg_tryshutdown->flags; shutdown_yes = 0; shutdown_no = 0; /* * Count the number of listeners */ shutdown_expected = 0; for (iter = trackers_list.next; iter != &trackers_list; iter = iter->next) { struct cfg_info *testci = list_entry(iter, struct cfg_info, list); /* * It is assumed that we will allow shutdown */ if (testci != ci) { testci->shutdown_reply = SHUTDOWN_REPLY_UNKNOWN; shutdown_expected++; } } /* * If no-one is listening for events then we can just go down now */ if (shutdown_expected == 0) { struct res_lib_cfg_tryshutdown res_lib_cfg_tryshutdown; 
res_lib_cfg_tryshutdown.header.size = sizeof(struct res_lib_cfg_tryshutdown); res_lib_cfg_tryshutdown.header.id = MESSAGE_RES_CFG_TRYSHUTDOWN; res_lib_cfg_tryshutdown.header.error = CS_OK; /* * Tell originator that shutdown was confirmed */ api->ipc_response_send(conn, &res_lib_cfg_tryshutdown, sizeof(res_lib_cfg_tryshutdown)); send_shutdown(); LEAVE(); return; } else { hdb_handle_t cfg_handle; hdb_handle_t find_handle; char *timeout_str; unsigned int shutdown_timeout = DEFAULT_SHUTDOWN_TIMEOUT; /* * Look for a shutdown timeout in objdb */ api->object_find_create(OBJECT_PARENT_HANDLE, "cfg", strlen("cfg"), &find_handle); api->object_find_next(find_handle, &cfg_handle); api->object_find_destroy(find_handle); if (cfg_handle) { if ( !api->object_key_get(cfg_handle, "shutdown_timeout", strlen("shutdown_timeout"), (void *)&timeout_str, NULL)) { shutdown_timeout = atoi(timeout_str); } } /* * Start the timer. If we don't get a full set of replies before this goes * off we'll cancel the shutdown */ api->timer_add_duration((unsigned long long)shutdown_timeout*1000000000, NULL, shutdown_timer_fn, &shutdown_timer); /* * Tell the users we would like to shut down */ send_test_shutdown(NULL, conn, CS_OK); } /* * We don't sent a reply to the caller here. * We send it when we know if we can shut down or not */ LEAVE(); } static void message_handler_req_lib_cfg_replytoshutdown ( void *conn, - void *msg) + const void *msg) { struct cfg_info *ci = (struct cfg_info *)api->ipc_private_data_get (conn); - struct req_lib_cfg_replytoshutdown *req_lib_cfg_replytoshutdown = (struct req_lib_cfg_replytoshutdown *)msg; + const struct req_lib_cfg_replytoshutdown *req_lib_cfg_replytoshutdown = msg; struct res_lib_cfg_replytoshutdown res_lib_cfg_replytoshutdown; int status = CS_OK; ENTER(); if (!shutdown_con) { status = CS_ERR_ACCESS; goto exit_fn; } if (req_lib_cfg_replytoshutdown->response) { shutdown_yes++; ci->shutdown_reply = SHUTDOWN_REPLY_YES; } else { shutdown_no++; ci->shutdown_reply = SHUTDOWN_REPLY_NO; } check_shutdown_status(); exit_fn: res_lib_cfg_replytoshutdown.header.error = status; res_lib_cfg_replytoshutdown.header.id = MESSAGE_RES_CFG_REPLYTOSHUTDOWN; res_lib_cfg_replytoshutdown.header.size = sizeof(res_lib_cfg_replytoshutdown); api->ipc_response_send(conn, &res_lib_cfg_replytoshutdown, sizeof(res_lib_cfg_replytoshutdown)); LEAVE(); } -static void message_handler_req_lib_cfg_get_node_addrs (void *conn, void *msg) +static void message_handler_req_lib_cfg_get_node_addrs (void *conn, + const void *msg) { struct totem_ip_address node_ifs[INTERFACE_MAX]; char buf[PIPE_BUF]; char **status; unsigned int num_interfaces = 0; int ret = CS_OK; int i; - struct req_lib_cfg_get_node_addrs *req_lib_cfg_get_node_addrs = (struct req_lib_cfg_get_node_addrs *)msg; + const struct req_lib_cfg_get_node_addrs *req_lib_cfg_get_node_addrs = msg; struct res_lib_cfg_get_node_addrs *res_lib_cfg_get_node_addrs = (struct res_lib_cfg_get_node_addrs *)buf; + unsigned int nodeid = req_lib_cfg_get_node_addrs->nodeid; - if (req_lib_cfg_get_node_addrs->nodeid == 0) - req_lib_cfg_get_node_addrs->nodeid = api->totem_nodeid_get(); + if (nodeid == 0) + nodeid = api->totem_nodeid_get(); - api->totem_ifaces_get(req_lib_cfg_get_node_addrs->nodeid, node_ifs, &status, &num_interfaces); + api->totem_ifaces_get(nodeid, node_ifs, &status, &num_interfaces); res_lib_cfg_get_node_addrs->header.size = sizeof(struct res_lib_cfg_get_node_addrs) + (num_interfaces * TOTEMIP_ADDRLEN); res_lib_cfg_get_node_addrs->header.id = MESSAGE_RES_CFG_GET_NODE_ADDRS; 
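/*
 * A minimal sketch (hypothetical names, not part of the patch) of the shutdown
 * vote bookkeeping used by the tryshutdown/replytoshutdown handlers above:
 * count how many trackers are expected to answer, convert the configured
 * timeout from seconds to the nanosecond duration the timer API expects, and
 * only decide once every reply is in (or the timer fires first).  The
 * "refused if anyone says no" rule here is an assumption about
 * check_shutdown_status(), not taken from the patch.
 */
#include <stdio.h>

static unsigned int shutdown_expected;
static unsigned int shutdown_yes;
static unsigned int shutdown_no;

static void check_votes (void)
{
	if (shutdown_yes + shutdown_no < shutdown_expected)
		return;			/* still waiting for replies */
	printf ("shutdown %s\n", shutdown_no ? "refused" : "confirmed");
}

int main (void)
{
	unsigned int timeout_sec = 30;	/* stand-in for the objdb "shutdown_timeout" key */
	unsigned long long timeout_ns =
	    (unsigned long long)timeout_sec * 1000000000ULL;

	printf ("timer armed for %llu ns\n", timeout_ns);

	shutdown_expected = 2;
	shutdown_yes++; check_votes ();	/* first reply: still waiting */
	shutdown_yes++; check_votes ();	/* second reply: confirmed */
	return 0;
}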
res_lib_cfg_get_node_addrs->header.error = ret; res_lib_cfg_get_node_addrs->num_addrs = num_interfaces; if (num_interfaces) { res_lib_cfg_get_node_addrs->family = node_ifs[0].family; for (i = 0; i < num_interfaces; i++) { memcpy(&res_lib_cfg_get_node_addrs->addrs[i][0], node_ifs[i].addr, TOTEMIP_ADDRLEN); } } else { res_lib_cfg_get_node_addrs->header.error = CS_ERR_NOT_EXIST; } api->ipc_response_send(conn, res_lib_cfg_get_node_addrs, res_lib_cfg_get_node_addrs->header.size); } -static void message_handler_req_lib_cfg_local_get (void *conn, void *message) +static void message_handler_req_lib_cfg_local_get (void *conn, const void *msg) { struct res_lib_cfg_local_get res_lib_cfg_local_get; res_lib_cfg_local_get.header.size = sizeof(res_lib_cfg_local_get); res_lib_cfg_local_get.header.id = MESSAGE_RES_CFG_LOCAL_GET; res_lib_cfg_local_get.header.error = CS_OK; res_lib_cfg_local_get.local_nodeid = api->totem_nodeid_get (); api->ipc_response_send(conn, &res_lib_cfg_local_get, sizeof(res_lib_cfg_local_get)); } diff --git a/services/confdb.c b/services/confdb.c index ad19a207..6ae5fdb1 100644 --- a/services/confdb.c +++ b/services/confdb.c @@ -1,718 +1,760 @@ /* * Copyright (c) 2008-2009 Red Hat, Inc. * * All rights reserved. * * Author: Christine Caulfield (ccaulfie@redhat.com) * * This software licensed under BSD license, the text of which follows: * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * - Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * - Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * - Neither the name of the MontaVista Software, Inc. nor the names of its * contributors may be used to endorse or promote products derived from this * software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF * THE POSSIBILITY OF SUCH DAMAGE.
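/*
 * A minimal sketch (not part of the patch) of the const-request pattern the
 * library handlers above now follow: the request arrives as "const void *",
 * is assigned straight to a const struct pointer (no cast needed in C), and
 * any field the old code patched in place -- e.g. a zero nodeid meaning
 * "local node" in get_node_addrs -- is copied into a local variable instead.
 * The struct and function names here are hypothetical.
 */
#include <stdio.h>

struct example_req {
	unsigned int nodeid;	/* 0 means "use the local node id" */
};

static unsigned int example_local_nodeid (void)
{
	return 42;		/* stand-in for api->totem_nodeid_get() */
}

static void example_handler (void *conn, const void *msg)
{
	const struct example_req *req = msg;	/* implicit const void * -> const struct * */
	unsigned int nodeid = req->nodeid;	/* local copy; the request stays untouched */

	if (nodeid == 0)
		nodeid = example_local_nodeid ();

	printf ("conn=%p nodeid=%u\n", conn, nodeid);
}

int main (void)
{
	struct example_req req = { .nodeid = 0 };

	example_handler (NULL, &req);
	return 0;
}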
*/ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include LOGSYS_DECLARE_SUBSYS ("CONFDB", LOG_INFO); static struct corosync_api_v1 *api; static int confdb_exec_init_fn ( struct corosync_api_v1 *corosync_api); static int confdb_lib_init_fn (void *conn); static int confdb_lib_exit_fn (void *conn); -static void message_handler_req_lib_confdb_object_create (void *conn, void *message); -static void message_handler_req_lib_confdb_object_destroy (void *conn, void *message); -static void message_handler_req_lib_confdb_object_find_destroy (void *conn, void *message); - -static void message_handler_req_lib_confdb_key_create (void *conn, void *message); -static void message_handler_req_lib_confdb_key_get (void *conn, void *message); -static void message_handler_req_lib_confdb_key_replace (void *conn, void *message); -static void message_handler_req_lib_confdb_key_delete (void *conn, void *message); -static void message_handler_req_lib_confdb_key_iter (void *conn, void *message); - -static void message_handler_req_lib_confdb_key_increment (void *conn, void *message); -static void message_handler_req_lib_confdb_key_decrement (void *conn, void *message); - -static void message_handler_req_lib_confdb_object_iter (void *conn, void *message); -static void message_handler_req_lib_confdb_object_find (void *conn, void *message); - -static void message_handler_req_lib_confdb_object_parent_get (void *conn, void *message); -static void message_handler_req_lib_confdb_write (void *conn, void *message); -static void message_handler_req_lib_confdb_reload (void *conn, void *message); - -static void message_handler_req_lib_confdb_track_start (void *conn, void *message); -static void message_handler_req_lib_confdb_track_stop (void *conn, void *message); +static void message_handler_req_lib_confdb_object_create (void *conn, + const void *message); +static void message_handler_req_lib_confdb_object_destroy (void *conn, + const void *message); +static void message_handler_req_lib_confdb_object_find_destroy (void *conn, + const void *message); + +static void message_handler_req_lib_confdb_key_create (void *conn, + const void *message); +static void message_handler_req_lib_confdb_key_get (void *conn, + const void *message); +static void message_handler_req_lib_confdb_key_replace (void *conn, + const void *message); +static void message_handler_req_lib_confdb_key_delete (void *conn, + const void *message); +static void message_handler_req_lib_confdb_key_iter (void *conn, + const void *message); + +static void message_handler_req_lib_confdb_key_increment (void *conn, + const void *message); +static void message_handler_req_lib_confdb_key_decrement (void *conn, + const void *message); + +static void message_handler_req_lib_confdb_object_iter (void *conn, + const void *message); +static void message_handler_req_lib_confdb_object_find (void *conn, + const void *message); + +static void message_handler_req_lib_confdb_object_parent_get (void *conn, + const void *message); +static void message_handler_req_lib_confdb_write (void *conn, + const void *message); +static void message_handler_req_lib_confdb_reload (void *conn, + const void *message); + +static void message_handler_req_lib_confdb_track_start (void *conn, + const void *message); +static void message_handler_req_lib_confdb_track_stop (void *conn, + const void *message); static void confdb_notify_lib_of_key_change( object_change_type_t change_type, hdb_handle_t 
parent_object_handle, hdb_handle_t object_handle, const void *object_name_pt, size_t object_name_len, const void *key_name_pt, size_t key_name_len, const void *key_value_pt, size_t key_value_len, void *priv_data_pt); static void confdb_notify_lib_of_new_object( hdb_handle_t parent_object_handle, hdb_handle_t object_handle, const uint8_t *name_pt, size_t name_len, void *priv_data_pt); static void confdb_notify_lib_of_destroyed_object( hdb_handle_t parent_object_handle, const uint8_t *name_pt, size_t name_len, void *priv_data_pt); /* * Library Handler Definition */ static struct corosync_lib_handler confdb_lib_engine[] = { { /* 0 */ .lib_handler_fn = message_handler_req_lib_confdb_object_create, .response_size = sizeof (mar_res_header_t), .response_id = MESSAGE_RES_CONFDB_OBJECT_CREATE, .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 1 */ .lib_handler_fn = message_handler_req_lib_confdb_object_destroy, .response_size = sizeof (mar_res_header_t), .response_id = MESSAGE_RES_CONFDB_OBJECT_DESTROY, .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 2 */ .lib_handler_fn = message_handler_req_lib_confdb_object_find, .response_size = sizeof (struct res_lib_confdb_object_find), .response_id = MESSAGE_RES_CONFDB_OBJECT_FIND, .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 3 */ .lib_handler_fn = message_handler_req_lib_confdb_key_create, .response_size = sizeof (mar_res_header_t), .response_id = MESSAGE_RES_CONFDB_KEY_CREATE, .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 4 */ .lib_handler_fn = message_handler_req_lib_confdb_key_get, .response_size = sizeof (struct res_lib_confdb_key_get), .response_id = MESSAGE_RES_CONFDB_KEY_GET, .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 5 */ .lib_handler_fn = message_handler_req_lib_confdb_key_replace, .response_size = sizeof (mar_res_header_t), .response_id = MESSAGE_RES_CONFDB_KEY_REPLACE, .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 6 */ .lib_handler_fn = message_handler_req_lib_confdb_key_delete, .response_size = sizeof (mar_res_header_t), .response_id = MESSAGE_RES_CONFDB_KEY_DELETE, .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 7 */ .lib_handler_fn = message_handler_req_lib_confdb_object_iter, .response_size = sizeof (struct res_lib_confdb_object_iter), .response_id = MESSAGE_RES_CONFDB_OBJECT_ITER, .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 8 */ .lib_handler_fn = message_handler_req_lib_confdb_object_parent_get, .response_size = sizeof (struct res_lib_confdb_object_parent_get), .response_id = MESSAGE_RES_CONFDB_OBJECT_PARENT_GET, .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 9 */ .lib_handler_fn = message_handler_req_lib_confdb_key_iter, .response_size = sizeof (struct res_lib_confdb_key_iter), .response_id = MESSAGE_RES_CONFDB_KEY_ITER, .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 10 */ .lib_handler_fn = message_handler_req_lib_confdb_track_start, .response_size = sizeof (mar_res_header_t), .response_id = MESSAGE_RES_CONFDB_TRACK_START, .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 11 */ .lib_handler_fn = message_handler_req_lib_confdb_track_stop, .response_size = sizeof (mar_res_header_t), .response_id = MESSAGE_RES_CONFDB_TRACK_STOP, .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 12 */ .lib_handler_fn = message_handler_req_lib_confdb_write, .response_size = sizeof (struct res_lib_confdb_write), .response_id = MESSAGE_RES_CONFDB_WRITE, .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 13 */ .lib_handler_fn = 
message_handler_req_lib_confdb_reload, .response_size = sizeof (struct res_lib_confdb_reload), .response_id = MESSAGE_RES_CONFDB_RELOAD, .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 14 */ .lib_handler_fn = message_handler_req_lib_confdb_object_find_destroy, .response_size = sizeof (mar_res_header_t), .response_id = MESSAGE_RES_CONFDB_OBJECT_FIND_DESTROY, .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 15 */ .lib_handler_fn = message_handler_req_lib_confdb_key_increment, .response_size = sizeof (struct res_lib_confdb_key_incdec), .response_id = MESSAGE_RES_CONFDB_KEY_INCREMENT, .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 16 */ .lib_handler_fn = message_handler_req_lib_confdb_key_decrement, .response_size = sizeof (struct res_lib_confdb_key_incdec), .response_id = MESSAGE_RES_CONFDB_KEY_DECREMENT, .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED }, }; struct corosync_service_engine confdb_service_engine = { .name = "corosync cluster config database access v1.01", .id = CONFDB_SERVICE, .private_data_size = 0, .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED, .allow_inquorate = CS_LIB_ALLOW_INQUORATE, .lib_init_fn = confdb_lib_init_fn, .lib_exit_fn = confdb_lib_exit_fn, .lib_engine = confdb_lib_engine, .lib_engine_count = sizeof (confdb_lib_engine) / sizeof (struct corosync_lib_handler), .exec_init_fn = confdb_exec_init_fn, }; /* * Dynamic loader definition */ static struct corosync_service_engine *confdb_get_service_engine_ver0 (void); static struct corosync_service_engine_iface_ver0 confdb_service_engine_iface = { .corosync_get_service_engine_ver0 = confdb_get_service_engine_ver0 }; static struct lcr_iface corosync_confdb_ver0[1] = { { .name = "corosync_confdb", .version = 0, .versions_replace = 0, .versions_replace_count = 0, .dependencies = 0, .dependency_count = 0, .constructor = NULL, .destructor = NULL, .interfaces = NULL } }; static struct lcr_comp confdb_comp_ver0 = { .iface_count = 1, .ifaces = corosync_confdb_ver0 }; static struct corosync_service_engine *confdb_get_service_engine_ver0 (void) { return (&confdb_service_engine); } __attribute__ ((constructor)) static void confdb_comp_register (void) { lcr_interfaces_set (&corosync_confdb_ver0[0], &confdb_service_engine_iface); lcr_component_register (&confdb_comp_ver0); } static int confdb_exec_init_fn ( struct corosync_api_v1 *corosync_api) { api = corosync_api; return 0; } static int confdb_lib_init_fn (void *conn) { log_printf(LOG_LEVEL_DEBUG, "lib_init_fn: conn=%p\n", conn); return (0); } static int confdb_lib_exit_fn (void *conn) { log_printf(LOG_LEVEL_DEBUG, "exit_fn for conn=%p\n", conn); /* cleanup the object trackers for this client. 
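/*
 * Sketch (made-up names, not from the patch) of the dispatch-table idea used
 * by confdb_lib_engine[] above: each request id indexes an entry holding the
 * handler pointer plus its fixed response size and id, and every handler
 * shares the same "void *conn, const void *msg" signature -- which is why the
 * const change has to be applied to the whole table at once.
 */
#include <stdio.h>
#include <stddef.h>

typedef void (*demo_handler_fn) (void *conn, const void *msg);

struct demo_lib_handler {
	demo_handler_fn lib_handler_fn;
	size_t response_size;
	int response_id;
};

static void demo_object_create (void *conn, const void *msg)
{
	printf ("object_create on conn %p\n", conn);
	(void)msg;
}

static void demo_object_destroy (void *conn, const void *msg)
{
	printf ("object_destroy on conn %p\n", conn);
	(void)msg;
}

static struct demo_lib_handler demo_engine[] = {
	{ .lib_handler_fn = demo_object_create,  .response_size = 8, .response_id = 1 },
	{ .lib_handler_fn = demo_object_destroy, .response_size = 8, .response_id = 2 },
};

static void demo_dispatch (unsigned int id, void *conn, const void *msg)
{
	if (id < sizeof (demo_engine) / sizeof (demo_engine[0]))
		demo_engine[id].lib_handler_fn (conn, msg);
}

int main (void)
{
	demo_dispatch (0, NULL, NULL);
	demo_dispatch (1, NULL, NULL);
	return 0;
}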
*/ api->object_track_stop(confdb_notify_lib_of_key_change, confdb_notify_lib_of_new_object, confdb_notify_lib_of_destroyed_object, NULL, conn); return (0); } -static void message_handler_req_lib_confdb_object_create (void *conn, void *message) +static void message_handler_req_lib_confdb_object_create (void *conn, + const void *message) { - struct req_lib_confdb_object_create *req_lib_confdb_object_create = (struct req_lib_confdb_object_create *)message; + const struct req_lib_confdb_object_create *req_lib_confdb_object_create + = message; struct res_lib_confdb_object_create res_lib_confdb_object_create; hdb_handle_t object_handle; int ret = CS_OK; if (api->object_create(req_lib_confdb_object_create->parent_object_handle, &object_handle, req_lib_confdb_object_create->object_name.value, req_lib_confdb_object_create->object_name.length)) ret = CS_ERR_ACCESS; res_lib_confdb_object_create.object_handle = object_handle; res_lib_confdb_object_create.header.size = sizeof(res_lib_confdb_object_create); res_lib_confdb_object_create.header.id = MESSAGE_RES_CONFDB_OBJECT_CREATE; res_lib_confdb_object_create.header.error = ret; api->ipc_response_send(conn, &res_lib_confdb_object_create, sizeof(res_lib_confdb_object_create)); } -static void message_handler_req_lib_confdb_object_destroy (void *conn, void *message) +static void message_handler_req_lib_confdb_object_destroy (void *conn, + const void *message) { - struct req_lib_confdb_object_destroy *req_lib_confdb_object_destroy = (struct req_lib_confdb_object_destroy *)message; + const struct req_lib_confdb_object_destroy *req_lib_confdb_object_destroy + = message; mar_res_header_t res; int ret = CS_OK; if (api->object_destroy(req_lib_confdb_object_destroy->object_handle)) ret = CS_ERR_ACCESS; res.size = sizeof(res); res.id = MESSAGE_RES_CONFDB_OBJECT_DESTROY; res.error = ret; api->ipc_response_send(conn, &res, sizeof(res)); } -static void message_handler_req_lib_confdb_object_find_destroy (void *conn, void *message) +static void message_handler_req_lib_confdb_object_find_destroy (void *conn, + const void *message) { - struct req_lib_confdb_object_find_destroy *req_lib_confdb_object_find_destroy = (struct req_lib_confdb_object_find_destroy *)message; + const struct req_lib_confdb_object_find_destroy + *req_lib_confdb_object_find_destroy = message; mar_res_header_t res; int ret = CS_OK; if (api->object_find_destroy(req_lib_confdb_object_find_destroy->find_handle)) ret = CS_ERR_ACCESS; res.size = sizeof(res); res.id = MESSAGE_RES_CONFDB_OBJECT_FIND_DESTROY; res.error = ret; api->ipc_response_send(conn, &res, sizeof(res)); } -static void message_handler_req_lib_confdb_key_create (void *conn, void *message) +static void message_handler_req_lib_confdb_key_create (void *conn, + const void *message) { - struct req_lib_confdb_key_create *req_lib_confdb_key_create = (struct req_lib_confdb_key_create *)message; + const struct req_lib_confdb_key_create *req_lib_confdb_key_create + = message; mar_res_header_t res; int ret = CS_OK; if (api->object_key_create(req_lib_confdb_key_create->object_handle, req_lib_confdb_key_create->key_name.value, req_lib_confdb_key_create->key_name.length, req_lib_confdb_key_create->value.value, req_lib_confdb_key_create->value.length)) ret = CS_ERR_ACCESS; res.size = sizeof(res); res.id = MESSAGE_RES_CONFDB_KEY_CREATE; res.error = ret; api->ipc_response_send(conn, &res, sizeof(res)); } -static void message_handler_req_lib_confdb_key_get (void *conn, void *message) +static void message_handler_req_lib_confdb_key_get (void *conn, + const 
void *message) { - struct req_lib_confdb_key_get *req_lib_confdb_key_get = (struct req_lib_confdb_key_get *)message; + const struct req_lib_confdb_key_get *req_lib_confdb_key_get = message; struct res_lib_confdb_key_get res_lib_confdb_key_get; size_t value_len; void *value; int ret = CS_OK; if (api->object_key_get(req_lib_confdb_key_get->parent_object_handle, req_lib_confdb_key_get->key_name.value, req_lib_confdb_key_get->key_name.length, &value, &value_len)) ret = CS_ERR_ACCESS; else { memcpy(res_lib_confdb_key_get.value.value, value, value_len); res_lib_confdb_key_get.value.length = value_len; } res_lib_confdb_key_get.header.size = sizeof(res_lib_confdb_key_get); res_lib_confdb_key_get.header.id = MESSAGE_RES_CONFDB_KEY_GET; res_lib_confdb_key_get.header.error = ret; api->ipc_response_send(conn, &res_lib_confdb_key_get, sizeof(res_lib_confdb_key_get)); } -static void message_handler_req_lib_confdb_key_increment (void *conn, void *message) +static void message_handler_req_lib_confdb_key_increment (void *conn, + const void *message) { - struct req_lib_confdb_key_get *req_lib_confdb_key_get = (struct req_lib_confdb_key_get *)message; + const struct req_lib_confdb_key_get *req_lib_confdb_key_get = message; struct res_lib_confdb_key_incdec res_lib_confdb_key_incdec; int ret = CS_OK; if (api->object_key_increment(req_lib_confdb_key_get->parent_object_handle, req_lib_confdb_key_get->key_name.value, req_lib_confdb_key_get->key_name.length, &res_lib_confdb_key_incdec.value)) ret = CS_ERR_ACCESS; res_lib_confdb_key_incdec.header.size = sizeof(res_lib_confdb_key_incdec); res_lib_confdb_key_incdec.header.id = MESSAGE_RES_CONFDB_KEY_INCREMENT; res_lib_confdb_key_incdec.header.error = ret; api->ipc_response_send(conn, &res_lib_confdb_key_incdec, sizeof(res_lib_confdb_key_incdec)); } -static void message_handler_req_lib_confdb_key_decrement (void *conn, void *message) +static void message_handler_req_lib_confdb_key_decrement (void *conn, + const void *message) { - struct req_lib_confdb_key_get *req_lib_confdb_key_get = (struct req_lib_confdb_key_get *)message; + const struct req_lib_confdb_key_get *req_lib_confdb_key_get = message; struct res_lib_confdb_key_incdec res_lib_confdb_key_incdec; int ret = CS_OK; if (api->object_key_decrement(req_lib_confdb_key_get->parent_object_handle, req_lib_confdb_key_get->key_name.value, req_lib_confdb_key_get->key_name.length, &res_lib_confdb_key_incdec.value)) ret = CS_ERR_ACCESS; res_lib_confdb_key_incdec.header.size = sizeof(res_lib_confdb_key_incdec); res_lib_confdb_key_incdec.header.id = MESSAGE_RES_CONFDB_KEY_DECREMENT; res_lib_confdb_key_incdec.header.error = ret; api->ipc_response_send(conn, &res_lib_confdb_key_incdec, sizeof(res_lib_confdb_key_incdec)); } -static void message_handler_req_lib_confdb_key_replace (void *conn, void *message) +static void message_handler_req_lib_confdb_key_replace (void *conn, + const void *message) { const struct req_lib_confdb_key_replace *req_lib_confdb_key_replace = message; mar_res_header_t res; int ret = CS_OK; if (api->object_key_replace(req_lib_confdb_key_replace->object_handle, req_lib_confdb_key_replace->key_name.value, req_lib_confdb_key_replace->key_name.length, req_lib_confdb_key_replace->new_value.value, req_lib_confdb_key_replace->new_value.length)) ret = CS_ERR_ACCESS; res.size = sizeof(res); res.id = MESSAGE_RES_CONFDB_KEY_REPLACE; res.error = ret; api->ipc_response_send(conn, &res, sizeof(res)); } -static void message_handler_req_lib_confdb_key_delete (void *conn, void *message) +static void 
message_handler_req_lib_confdb_key_delete (void *conn, + const void *message) { - struct req_lib_confdb_key_delete *req_lib_confdb_key_delete = (struct req_lib_confdb_key_delete *)message; + const struct req_lib_confdb_key_delete *req_lib_confdb_key_delete + = message; mar_res_header_t res; int ret = CS_OK; if (api->object_key_delete(req_lib_confdb_key_delete->object_handle, req_lib_confdb_key_delete->key_name.value, req_lib_confdb_key_delete->key_name.length)) ret = CS_ERR_ACCESS; res.size = sizeof(res); res.id = MESSAGE_RES_CONFDB_KEY_DELETE; res.error = ret; api->ipc_response_send(conn, &res, sizeof(res)); } -static void message_handler_req_lib_confdb_object_parent_get (void *conn, void *message) +static void message_handler_req_lib_confdb_object_parent_get (void *conn, + const void *message) { - struct req_lib_confdb_object_parent_get *req_lib_confdb_object_parent_get = (struct req_lib_confdb_object_parent_get *)message; + const struct req_lib_confdb_object_parent_get + *req_lib_confdb_object_parent_get = message; struct res_lib_confdb_object_parent_get res_lib_confdb_object_parent_get; hdb_handle_t object_handle; int ret = CS_OK; if (api->object_parent_get(req_lib_confdb_object_parent_get->object_handle, &object_handle)) ret = CS_ERR_ACCESS; res_lib_confdb_object_parent_get.parent_object_handle = object_handle; res_lib_confdb_object_parent_get.header.size = sizeof(res_lib_confdb_object_parent_get); res_lib_confdb_object_parent_get.header.id = MESSAGE_RES_CONFDB_OBJECT_CREATE; res_lib_confdb_object_parent_get.header.error = ret; api->ipc_response_send(conn, &res_lib_confdb_object_parent_get, sizeof(res_lib_confdb_object_parent_get)); } -static void message_handler_req_lib_confdb_key_iter (void *conn, void *message) +static void message_handler_req_lib_confdb_key_iter (void *conn, + const void *message) { - struct req_lib_confdb_key_iter *req_lib_confdb_key_iter = (struct req_lib_confdb_key_iter *)message; + const struct req_lib_confdb_key_iter *req_lib_confdb_key_iter = message; struct res_lib_confdb_key_iter res_lib_confdb_key_iter; void *key_name; size_t key_name_len; void *value; size_t value_len; int ret = CS_OK; if (api->object_key_iter_from(req_lib_confdb_key_iter->parent_object_handle, req_lib_confdb_key_iter->next_entry, &key_name, &key_name_len, &value, &value_len)) ret = CS_ERR_ACCESS; else { memcpy(res_lib_confdb_key_iter.key_name.value, key_name, key_name_len); memcpy(res_lib_confdb_key_iter.value.value, value, value_len); res_lib_confdb_key_iter.key_name.length = key_name_len; res_lib_confdb_key_iter.value.length = value_len; } res_lib_confdb_key_iter.header.size = sizeof(res_lib_confdb_key_iter); res_lib_confdb_key_iter.header.id = MESSAGE_RES_CONFDB_KEY_ITER; res_lib_confdb_key_iter.header.error = ret; api->ipc_response_send(conn, &res_lib_confdb_key_iter, sizeof(res_lib_confdb_key_iter)); } -static void message_handler_req_lib_confdb_object_iter (void *conn, void *message) +static void message_handler_req_lib_confdb_object_iter (void *conn, + const void *message) { - struct req_lib_confdb_object_iter *req_lib_confdb_object_iter = (struct req_lib_confdb_object_iter *)message; + const struct req_lib_confdb_object_iter *req_lib_confdb_object_iter + = message; struct res_lib_confdb_object_iter res_lib_confdb_object_iter; size_t object_name_len; int ret = CS_OK; if (!req_lib_confdb_object_iter->find_handle) { api->object_find_create(req_lib_confdb_object_iter->parent_object_handle, NULL, 0, &res_lib_confdb_object_iter.find_handle); } else 
res_lib_confdb_object_iter.find_handle = req_lib_confdb_object_iter->find_handle; if (api->object_find_next(res_lib_confdb_object_iter.find_handle, &res_lib_confdb_object_iter.object_handle)) { ret = CS_ERR_ACCESS; api->object_find_destroy(res_lib_confdb_object_iter.find_handle); } else { api->object_name_get(res_lib_confdb_object_iter.object_handle, (char *)res_lib_confdb_object_iter.object_name.value, &object_name_len); res_lib_confdb_object_iter.object_name.length = object_name_len; } res_lib_confdb_object_iter.header.size = sizeof(res_lib_confdb_object_iter); res_lib_confdb_object_iter.header.id = MESSAGE_RES_CONFDB_OBJECT_ITER; res_lib_confdb_object_iter.header.error = ret; api->ipc_response_send(conn, &res_lib_confdb_object_iter, sizeof(res_lib_confdb_object_iter)); } -static void message_handler_req_lib_confdb_object_find (void *conn, void *message) +static void message_handler_req_lib_confdb_object_find (void *conn, + const void *message) { - struct req_lib_confdb_object_find *req_lib_confdb_object_find = (struct req_lib_confdb_object_find *)message; + const struct req_lib_confdb_object_find *req_lib_confdb_object_find + = message; struct res_lib_confdb_object_find res_lib_confdb_object_find; int ret = CS_OK; if (!req_lib_confdb_object_find->find_handle) { api->object_find_create(req_lib_confdb_object_find->parent_object_handle, req_lib_confdb_object_find->object_name.value, req_lib_confdb_object_find->object_name.length, &res_lib_confdb_object_find.find_handle); } else res_lib_confdb_object_find.find_handle = req_lib_confdb_object_find->find_handle; if (api->object_find_next(res_lib_confdb_object_find.find_handle, &res_lib_confdb_object_find.object_handle)) { ret = CS_ERR_ACCESS; api->object_find_destroy(res_lib_confdb_object_find.find_handle); } res_lib_confdb_object_find.header.size = sizeof(res_lib_confdb_object_find); res_lib_confdb_object_find.header.id = MESSAGE_RES_CONFDB_OBJECT_FIND; res_lib_confdb_object_find.header.error = ret; api->ipc_response_send(conn, &res_lib_confdb_object_find, sizeof(res_lib_confdb_object_find)); } -static void message_handler_req_lib_confdb_write (void *conn, void *message) +static void message_handler_req_lib_confdb_write (void *conn, + const void *message) { struct res_lib_confdb_write res_lib_confdb_write; int ret = CS_OK; const char *error_string = NULL; if (api->object_write_config(&error_string)) ret = CS_ERR_ACCESS; res_lib_confdb_write.header.size = sizeof(res_lib_confdb_write); res_lib_confdb_write.header.id = MESSAGE_RES_CONFDB_WRITE; res_lib_confdb_write.header.error = ret; if (error_string) { strcpy((char *)res_lib_confdb_write.error.value, error_string); res_lib_confdb_write.error.length = strlen(error_string) + 1; } else res_lib_confdb_write.error.length = 0; api->ipc_response_send(conn, &res_lib_confdb_write, sizeof(res_lib_confdb_write)); } -static void message_handler_req_lib_confdb_reload (void *conn, void *message) +static void message_handler_req_lib_confdb_reload (void *conn, + const void *message) { - struct req_lib_confdb_reload *req_lib_confdb_reload = (struct req_lib_confdb_reload *)message; + const struct req_lib_confdb_reload *req_lib_confdb_reload = message; struct res_lib_confdb_reload res_lib_confdb_reload; int ret = CS_OK; const char *error_string = NULL; if (api->object_reload_config(req_lib_confdb_reload->flush, &error_string)) ret = CS_ERR_ACCESS; res_lib_confdb_reload.header.size = sizeof(res_lib_confdb_reload); res_lib_confdb_reload.header.id = MESSAGE_RES_CONFDB_RELOAD; res_lib_confdb_reload.header.error = 
ret; if(error_string) { strcpy((char *)res_lib_confdb_reload.error.value, error_string); res_lib_confdb_reload.error.length = strlen(error_string) + 1; } else res_lib_confdb_reload.error.length = 0; api->ipc_response_send(conn, &res_lib_confdb_reload, sizeof(res_lib_confdb_reload)); } static void confdb_notify_lib_of_key_change(object_change_type_t change_type, hdb_handle_t parent_object_handle, hdb_handle_t object_handle, const void *object_name_pt, size_t object_name_len, const void *key_name_pt, size_t key_name_len, const void *key_value_pt, size_t key_value_len, void *priv_data_pt) { struct res_lib_confdb_key_change_callback res; res.header.size = sizeof(res); res.header.id = MESSAGE_RES_CONFDB_KEY_CHANGE_CALLBACK; res.header.error = CS_OK; // handle & type res.change_type = change_type; res.parent_object_handle = parent_object_handle; res.object_handle = object_handle; //object memcpy(res.object_name.value, object_name_pt, object_name_len); res.object_name.length = object_name_len; //key name memcpy(res.key_name.value, key_name_pt, key_name_len); res.key_name.length = key_name_len; //key value memcpy(res.key_value.value, key_value_pt, key_value_len); res.key_value.length = key_value_len; api->ipc_dispatch_send(priv_data_pt, &res, sizeof(res)); } static void confdb_notify_lib_of_new_object(hdb_handle_t parent_object_handle, hdb_handle_t object_handle, const uint8_t *name_pt, size_t name_len, void *priv_data_pt) { struct res_lib_confdb_object_create_callback res; res.header.size = sizeof(res); res.header.id = MESSAGE_RES_CONFDB_OBJECT_CREATE_CALLBACK; res.header.error = CS_OK; res.parent_object_handle = parent_object_handle; res.object_handle = object_handle; memcpy(res.name.value, name_pt, name_len); res.name.length = name_len; api->ipc_dispatch_send(priv_data_pt, &res, sizeof(res)); } static void confdb_notify_lib_of_destroyed_object( hdb_handle_t parent_object_handle, const uint8_t *name_pt, size_t name_len, void *priv_data_pt) { struct res_lib_confdb_object_destroy_callback res; res.header.size = sizeof(res); res.header.id = MESSAGE_RES_CONFDB_OBJECT_DESTROY_CALLBACK; res.header.error = CS_OK; res.parent_object_handle = parent_object_handle; memcpy(res.name.value, name_pt, name_len); res.name.length = name_len; api->ipc_dispatch_send(priv_data_pt, &res, sizeof(res)); } -static void message_handler_req_lib_confdb_track_start (void *conn, void *message) +static void message_handler_req_lib_confdb_track_start (void *conn, + const void *message) { - struct req_lib_confdb_object_track_start *req = (struct req_lib_confdb_object_track_start *)message; + const struct req_lib_confdb_object_track_start *req = message; mar_res_header_t res; api->object_track_start(req->object_handle, req->flags, confdb_notify_lib_of_key_change, confdb_notify_lib_of_new_object, confdb_notify_lib_of_destroyed_object, NULL, conn); res.size = sizeof(res); res.id = MESSAGE_RES_CONFDB_TRACK_START; res.error = CS_OK; api->ipc_response_send(conn, &res, sizeof(res)); } -static void message_handler_req_lib_confdb_track_stop (void *conn, void *message) +static void message_handler_req_lib_confdb_track_stop (void *conn, + const void *message) { mar_res_header_t res; api->object_track_stop(confdb_notify_lib_of_key_change, confdb_notify_lib_of_new_object, confdb_notify_lib_of_destroyed_object, NULL, conn); res.size = sizeof(res); res.id = MESSAGE_RES_CONFDB_TRACK_STOP; res.error = CS_OK; api->ipc_response_send(conn, &res, sizeof(res)); } diff --git a/services/cpg.c b/services/cpg.c index 7bae0fd3..f60427a3 100644 --- 
a/services/cpg.c +++ b/services/cpg.c @@ -1,1231 +1,1241 @@ /* * Copyright (c) 2006-2009 Red Hat, Inc. * * All rights reserved. * * Author: Christine Caulfield (ccaulfie@redhat.com) * * This software licensed under BSD license, the text of which follows: * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * - Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * - Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * - Neither the name of the MontaVista Software, Inc. nor the names of its * contributors may be used to endorse or promote products derived from this * software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF * THE POSSIBILITY OF SUCH DAMAGE. */ #include #ifndef COROSYNC_BSD #include #endif #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include LOGSYS_DECLARE_SUBSYS ("CPG", LOG_INFO); #define GROUP_HASH_SIZE 32 #define PI_FLAG_MEMBER 1 enum cpg_message_req_types { MESSAGE_REQ_EXEC_CPG_PROCJOIN = 0, MESSAGE_REQ_EXEC_CPG_PROCLEAVE = 1, MESSAGE_REQ_EXEC_CPG_JOINLIST = 2, MESSAGE_REQ_EXEC_CPG_MCAST = 3, MESSAGE_REQ_EXEC_CPG_DOWNLIST = 4 }; struct removed_group { struct group_info *gi; struct list_head list; /* on removed_list */ int left_list_entries; mar_cpg_address_t left_list[PROCESSOR_COUNT_MAX]; int left_list_size; }; struct group_info { mar_cpg_name_t group_name; struct list_head members; struct list_head list; /* on hash list */ struct removed_group *rg; /* when a node goes down */ }; struct process_info { unsigned int nodeid; uint32_t pid; uint32_t flags; void *conn; void *trackerconn; struct group_info *group; struct list_head list; /* on the group_info members list */ }; struct join_list_entry { uint32_t pid; mar_cpg_name_t group_name; }; static struct list_head group_lists[GROUP_HASH_SIZE]; static struct corosync_api_v1 *api = NULL; /* * Service Interfaces required by service_message_handler struct */ static void cpg_confchg_fn ( enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id); static int cpg_exec_init_fn (struct corosync_api_v1 *); static int cpg_lib_init_fn (void *conn); static int cpg_lib_exit_fn (void *conn); static void 
message_handler_req_exec_cpg_procjoin ( const void *message, unsigned int nodeid); static void message_handler_req_exec_cpg_procleave ( const void *message, unsigned int nodeid); static void message_handler_req_exec_cpg_joinlist ( const void *message, unsigned int nodeid); static void message_handler_req_exec_cpg_mcast ( const void *message, unsigned int nodeid); static void message_handler_req_exec_cpg_downlist ( const void *message, unsigned int nodeid); static void exec_cpg_procjoin_endian_convert (void *msg); static void exec_cpg_joinlist_endian_convert (void *msg); static void exec_cpg_mcast_endian_convert (void *msg); static void exec_cpg_downlist_endian_convert (void *msg); -static void message_handler_req_lib_cpg_join (void *conn, void *message); +static void message_handler_req_lib_cpg_join (void *conn, const void *message); -static void message_handler_req_lib_cpg_leave (void *conn, void *message); +static void message_handler_req_lib_cpg_leave (void *conn, const void *message); -static void message_handler_req_lib_cpg_mcast (void *conn, void *message); +static void message_handler_req_lib_cpg_mcast (void *conn, const void *message); -static void message_handler_req_lib_cpg_membership (void *conn, void *message); +static void message_handler_req_lib_cpg_membership (void *conn, + const void *message); -static void message_handler_req_lib_cpg_trackstart (void *conn, void *message); +static void message_handler_req_lib_cpg_trackstart (void *conn, + const void *message); -static void message_handler_req_lib_cpg_trackstop (void *conn, void *message); +static void message_handler_req_lib_cpg_trackstop (void *conn, + const void *message); -static void message_handler_req_lib_cpg_local_get (void *conn, void *message); +static void message_handler_req_lib_cpg_local_get (void *conn, + const void *message); -static void message_handler_req_lib_cpg_groups_get (void *conn, void *message); +static void message_handler_req_lib_cpg_groups_get (void *conn, + const void *message); static int cpg_node_joinleave_send (struct group_info *gi, struct process_info *pi, int fn, int reason); static int cpg_exec_send_joinlist(void); static void cpg_sync_init (void); static int cpg_sync_process (void); static void cpg_sync_activate (void); static void cpg_sync_abort (void); /* * Library Handler Definition */ static struct corosync_lib_handler cpg_lib_engine[] = { { /* 0 */ .lib_handler_fn = message_handler_req_lib_cpg_join, .response_size = sizeof (struct res_lib_cpg_join), .response_id = MESSAGE_RES_CPG_JOIN, .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED }, { /* 1 */ .lib_handler_fn = message_handler_req_lib_cpg_leave, .response_size = sizeof (struct res_lib_cpg_leave), .response_id = MESSAGE_RES_CPG_LEAVE, .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED }, { /* 2 */ .lib_handler_fn = message_handler_req_lib_cpg_mcast, .response_size = sizeof (struct res_lib_cpg_mcast), .response_id = MESSAGE_RES_CPG_MCAST, .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED }, { /* 3 */ .lib_handler_fn = message_handler_req_lib_cpg_membership, .response_size = sizeof (mar_res_header_t), .response_id = MESSAGE_RES_CPG_MEMBERSHIP, .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 4 */ .lib_handler_fn = message_handler_req_lib_cpg_trackstart, .response_size = sizeof (struct res_lib_cpg_trackstart), .response_id = MESSAGE_RES_CPG_TRACKSTART, .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 5 */ .lib_handler_fn = message_handler_req_lib_cpg_trackstop, .response_size = sizeof (struct res_lib_cpg_trackstart), .response_id = 
MESSAGE_RES_CPG_TRACKSTOP, .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 6 */ .lib_handler_fn = message_handler_req_lib_cpg_local_get, .response_size = sizeof (struct res_lib_cpg_local_get), .response_id = MESSAGE_RES_CPG_LOCAL_GET, .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 7 */ .lib_handler_fn = message_handler_req_lib_cpg_groups_get, .response_size = sizeof (struct res_lib_cpg_groups_get), .response_id = MESSAGE_RES_CPG_GROUPS_GET, .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED } }; static struct corosync_exec_handler cpg_exec_engine[] = { { /* 0 */ .exec_handler_fn = message_handler_req_exec_cpg_procjoin, .exec_endian_convert_fn = exec_cpg_procjoin_endian_convert }, { /* 1 */ .exec_handler_fn = message_handler_req_exec_cpg_procleave, .exec_endian_convert_fn = exec_cpg_procjoin_endian_convert }, { /* 2 */ .exec_handler_fn = message_handler_req_exec_cpg_joinlist, .exec_endian_convert_fn = exec_cpg_joinlist_endian_convert }, { /* 3 */ .exec_handler_fn = message_handler_req_exec_cpg_mcast, .exec_endian_convert_fn = exec_cpg_mcast_endian_convert }, { /* 4 */ .exec_handler_fn = message_handler_req_exec_cpg_downlist, .exec_endian_convert_fn = exec_cpg_downlist_endian_convert }, }; struct corosync_service_engine cpg_service_engine = { .name = "corosync cluster closed process group service v1.01", .id = CPG_SERVICE, .private_data_size = sizeof (struct process_info), .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED, .allow_inquorate = CS_LIB_ALLOW_INQUORATE, .lib_init_fn = cpg_lib_init_fn, .lib_exit_fn = cpg_lib_exit_fn, .lib_engine = cpg_lib_engine, .lib_engine_count = sizeof (cpg_lib_engine) / sizeof (struct corosync_lib_handler), .exec_init_fn = cpg_exec_init_fn, .exec_dump_fn = NULL, .exec_engine = cpg_exec_engine, .exec_engine_count = sizeof (cpg_exec_engine) / sizeof (struct corosync_exec_handler), .confchg_fn = cpg_confchg_fn, .sync_init = cpg_sync_init, .sync_process = cpg_sync_process, .sync_activate = cpg_sync_activate, .sync_abort = cpg_sync_abort }; /* * Dynamic loader definition */ static struct corosync_service_engine *cpg_get_service_engine_ver0 (void); static struct corosync_service_engine_iface_ver0 cpg_service_engine_iface = { .corosync_get_service_engine_ver0 = cpg_get_service_engine_ver0 }; static struct lcr_iface corosync_cpg_ver0[1] = { { .name = "corosync_cpg", .version = 0, .versions_replace = 0, .versions_replace_count = 0, .dependencies = 0, .dependency_count = 0, .constructor = NULL, .destructor = NULL, .interfaces = NULL } }; static struct lcr_comp cpg_comp_ver0 = { .iface_count = 1, .ifaces = corosync_cpg_ver0 }; static struct corosync_service_engine *cpg_get_service_engine_ver0 (void) { return (&cpg_service_engine); } __attribute__ ((constructor)) static void cpg_comp_register (void) { lcr_interfaces_set (&corosync_cpg_ver0[0], &cpg_service_engine_iface); lcr_component_register (&cpg_comp_ver0); } struct req_exec_cpg_procjoin { mar_req_header_t header __attribute__((aligned(8))); mar_cpg_name_t group_name __attribute__((aligned(8))); mar_uint32_t pid __attribute__((aligned(8))); mar_uint32_t reason __attribute__((aligned(8))); }; struct req_exec_cpg_mcast { mar_req_header_t header __attribute__((aligned(8))); mar_cpg_name_t group_name __attribute__((aligned(8))); mar_uint32_t msglen __attribute__((aligned(8))); mar_uint32_t pid __attribute__((aligned(8))); mar_message_source_t source __attribute__((aligned(8))); mar_uint8_t message[] __attribute__((aligned(8))); }; struct req_exec_cpg_downlist { mar_req_header_t header 
__attribute__((aligned(8))); mar_uint32_t left_nodes __attribute__((aligned(8))); mar_uint32_t nodeids[PROCESSOR_COUNT_MAX] __attribute__((aligned(8))); }; static struct req_exec_cpg_downlist req_exec_cpg_downlist; static void cpg_sync_init (void) { } static int cpg_sync_process (void) { return cpg_exec_send_joinlist(); } static void cpg_sync_activate (void) { } static void cpg_sync_abort (void) { } static int notify_lib_joinlist( struct group_info *gi, void *conn, int joined_list_entries, mar_cpg_address_t *joined_list, int left_list_entries, mar_cpg_address_t *left_list, int id) { int count = 0; char *buf; struct res_lib_cpg_confchg_callback *res; struct list_head *iter; struct list_head *tmp; mar_cpg_address_t *retgi; int size; /* First, we need to know how many nodes are in the list. While we're traversing this list, look for the 'us' entry so we know which connection to send back down */ for (iter = gi->members.next; iter != &gi->members; iter = iter->next) { struct process_info *pi = list_entry(iter, struct process_info, list); if (pi->pid) count++; } log_printf(LOG_LEVEL_DEBUG, "Sending new joinlist (%d elements) to clients\n", count); size = sizeof(struct res_lib_cpg_confchg_callback) + sizeof(mar_cpg_address_t) * (count + left_list_entries + joined_list_entries); buf = alloca(size); if (!buf) return CS_ERR_NO_SPACE; res = (struct res_lib_cpg_confchg_callback *)buf; res->joined_list_entries = joined_list_entries; res->left_list_entries = left_list_entries; retgi = res->member_list; res->header.size = size; res->header.id = id; res->header.error = CS_OK; memcpy(&res->group_name, &gi->group_name, sizeof(mar_cpg_name_t)); /* Build up the message */ count = 0; for (iter = gi->members.next; iter != &gi->members; iter = iter->next) { struct process_info *pi = list_entry(iter, struct process_info, list); if (pi->pid) { /* Processes leaving will be removed AFTER this is done (so that they get their own leave notifications), so exclude them from the members list here */ int i; for (i=0; ipid && left_list[i].nodeid == pi->nodeid) goto next_member; } retgi->nodeid = pi->nodeid; retgi->pid = pi->pid; retgi++; count++; next_member: ; } } res->member_list_entries = count; if (left_list_entries) { memcpy(retgi, left_list, left_list_entries * sizeof(mar_cpg_address_t)); retgi += left_list_entries; } if (joined_list_entries) { memcpy(retgi, joined_list, joined_list_entries * sizeof(mar_cpg_address_t)); retgi += joined_list_entries; } if (conn) { api->ipc_dispatch_send(conn, buf, size); } else { /* Send it to all listeners */ for (iter = gi->members.next, tmp=iter->next; iter != &gi->members; iter = tmp, tmp=iter->next) { struct process_info *pi = list_entry(iter, struct process_info, list); if (pi->trackerconn && (pi->flags & PI_FLAG_MEMBER)) { if (api->ipc_dispatch_send(pi->trackerconn, buf, size) == -1) { // Error ?? 
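/*
 * Sketch (hypothetical types, not part of the patch) of how
 * notify_lib_joinlist above sizes and fills a variable-length callback: the
 * buffer is header + (members + left + joined) address records, the current
 * members are written first, then the left/joined lists are appended with
 * memcpy and the counts are stored in the header.
 */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

struct demo_address {
	unsigned int nodeid;
	unsigned int pid;
};

struct demo_confchg {
	size_t size;
	int member_entries;
	int left_entries;
	struct demo_address list[];	/* members first, then the left list */
};

int main (void)
{
	struct demo_address members[] = { { 1, 100 }, { 2, 200 } };
	struct demo_address left[]    = { { 3, 300 } };
	int nmem = 2, nleft = 1;

	size_t size = sizeof (struct demo_confchg) +
	    (nmem + nleft) * sizeof (struct demo_address);
	struct demo_confchg *msg = malloc (size);

	if (!msg)
		return 1;

	msg->size = size;
	msg->member_entries = nmem;
	msg->left_entries = nleft;
	memcpy (msg->list, members, nmem * sizeof (struct demo_address));
	memcpy (msg->list + nmem, left, nleft * sizeof (struct demo_address));

	printf ("confchg: %zu bytes, %d members, %d left\n",
	    msg->size, msg->member_entries, msg->left_entries);
	free (msg);
	return 0;
}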
} } } } return CS_OK; } static void remove_group(struct group_info *gi) { list_del(&gi->list); free(gi); } static int cpg_exec_init_fn (struct corosync_api_v1 *corosync_api) { int i; for (i=0; iipc_private_data_get (conn); struct group_info *gi = pi->group; mar_cpg_address_t notify_info; log_printf(LOG_LEVEL_DEBUG, "exit_fn for conn=%p\n", conn); if (gi) { notify_info.pid = pi->pid; notify_info.nodeid = api->totem_nodeid_get(); notify_info.reason = CONFCHG_CPG_REASON_PROCDOWN; cpg_node_joinleave_send(gi, pi, MESSAGE_REQ_EXEC_CPG_PROCLEAVE, CONFCHG_CPG_REASON_PROCDOWN); } if (pi->pid) list_del(&pi->list); api->ipc_refcnt_dec (conn); return (0); } static int count_groups(void) { struct list_head *iter; int num_groups = 0; uint32_t hash; for (hash=0 ; hash < GROUP_HASH_SIZE; hash++) { for (iter = group_lists[hash].next; iter != &group_lists[hash]; iter = iter->next) { num_groups++; } } return num_groups; } static struct group_info *get_group(const mar_cpg_name_t *name) { struct list_head *iter; struct group_info *gi = NULL; struct group_info *itergi; uint32_t hash = jhash(name->value, name->length, 0) % GROUP_HASH_SIZE; for (iter = group_lists[hash].next; iter != &group_lists[hash]; iter = iter->next) { itergi = list_entry(iter, struct group_info, list); if (memcmp(itergi->group_name.value, name->value, name->length) == 0) { gi = itergi; break; } } if (!gi) { gi = malloc(sizeof(struct group_info)); if (!gi) { log_printf(LOG_LEVEL_WARNING, "Unable to allocate group_info struct"); return NULL; } memcpy(&gi->group_name, name, sizeof(mar_cpg_name_t)); gi->rg = NULL; list_init(&gi->members); list_add(&gi->list, &group_lists[hash]); } return gi; } static int cpg_node_joinleave_send (struct group_info *gi, struct process_info *pi, int fn, int reason) { struct req_exec_cpg_procjoin req_exec_cpg_procjoin; struct iovec req_exec_cpg_iovec; int result; memcpy(&req_exec_cpg_procjoin.group_name, &gi->group_name, sizeof(mar_cpg_name_t)); req_exec_cpg_procjoin.pid = pi->pid; req_exec_cpg_procjoin.reason = reason; req_exec_cpg_procjoin.header.size = sizeof(req_exec_cpg_procjoin); req_exec_cpg_procjoin.header.id = SERVICE_ID_MAKE(CPG_SERVICE, fn); req_exec_cpg_iovec.iov_base = (char *)&req_exec_cpg_procjoin; req_exec_cpg_iovec.iov_len = sizeof(req_exec_cpg_procjoin); result = api->totem_mcast (&req_exec_cpg_iovec, 1, TOTEM_AGREED); return (result); } static void remove_node_from_groups( unsigned int nodeid, struct list_head *remlist) { int i; struct list_head *iter, *iter2, *tmp; struct process_info *pi; struct group_info *gi; for (i=0; i < GROUP_HASH_SIZE; i++) { for (iter = group_lists[i].next; iter != &group_lists[i]; iter = iter->next) { gi = list_entry(iter, struct group_info, list); for (iter2 = gi->members.next, tmp = iter2->next; iter2 != &gi->members; iter2 = tmp, tmp = iter2->next) { pi = list_entry(iter2, struct process_info, list); if (pi->nodeid == nodeid) { /* Add it to the list of nodes to send notifications for */ if (!gi->rg) { gi->rg = malloc(sizeof(struct removed_group)); if (gi->rg) { list_add(&gi->rg->list, remlist); gi->rg->gi = gi; gi->rg->left_list_entries = 0; gi->rg->left_list_size = PROCESSOR_COUNT_MAX; } else { log_printf(LOG_LEVEL_CRIT, "Unable to allocate removed group struct. CPG callbacks will be junk."); return; } } /* Do we need to increase the size ? * Yes, I increase this exponentially. 
Generally, if you've got a lot of groups, * you'll have a /lot/ of groups, and cgp_groupinfo is pretty small anyway */ if (gi->rg->left_list_size == gi->rg->left_list_entries) { int newsize; struct removed_group *newrg; list_del(&gi->rg->list); newsize = gi->rg->left_list_size * 2; newrg = realloc(gi->rg, sizeof(struct removed_group) + newsize*sizeof(mar_cpg_address_t)); if (!newrg) { log_printf(LOG_LEVEL_CRIT, "Unable to realloc removed group struct. CPG callbacks will be junk."); return; } newrg->left_list_size = newsize+PROCESSOR_COUNT_MAX; gi->rg = newrg; list_add(&gi->rg->list, remlist); } gi->rg->left_list[gi->rg->left_list_entries].pid = pi->pid; gi->rg->left_list[gi->rg->left_list_entries].nodeid = pi->nodeid; gi->rg->left_list[gi->rg->left_list_entries].reason = CONFCHG_CPG_REASON_NODEDOWN; gi->rg->left_list_entries++; /* Remove node info for dead node */ list_del(&pi->list); free(pi); } } } } } static void cpg_confchg_fn ( enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id) { int i; uint32_t lowest_nodeid = 0xffffffff; struct iovec req_exec_cpg_iovec; /* We don't send the library joinlist in here because it can end up out of order with the rest of the messages (which are totem ordered). So we get the lowest nodeid to send out a list of left nodes instead. On receipt of that message, all nodes will then notify their local clients of the new joinlist */ if (left_list_entries) { for (i = 0; i < member_list_entries; i++) { if (member_list[i] < lowest_nodeid) lowest_nodeid = member_list[i]; } log_printf(LOG_LEVEL_DEBUG, "confchg, low nodeid=%d, us = %d\n", lowest_nodeid, api->totem_nodeid_get()); if (lowest_nodeid == api->totem_nodeid_get()) { req_exec_cpg_downlist.header.id = SERVICE_ID_MAKE(CPG_SERVICE, MESSAGE_REQ_EXEC_CPG_DOWNLIST); req_exec_cpg_downlist.header.size = sizeof(struct req_exec_cpg_downlist); req_exec_cpg_downlist.left_nodes = left_list_entries; for (i = 0; i < left_list_entries; i++) { req_exec_cpg_downlist.nodeids[i] = left_list[i]; } log_printf(LOG_LEVEL_DEBUG, "confchg, build downlist: %lu nodes\n", (long unsigned int) left_list_entries); } } /* Don't send this message until we get the final configuration message */ if (configuration_type == TOTEM_CONFIGURATION_REGULAR && req_exec_cpg_downlist.left_nodes) { req_exec_cpg_iovec.iov_base = (char *)&req_exec_cpg_downlist; req_exec_cpg_iovec.iov_len = req_exec_cpg_downlist.header.size; api->totem_mcast (&req_exec_cpg_iovec, 1, TOTEM_AGREED); req_exec_cpg_downlist.left_nodes = 0; log_printf(LOG_LEVEL_DEBUG, "confchg, sent downlist\n"); } } /* Can byteswap join & leave messages */ static void exec_cpg_procjoin_endian_convert (void *msg) { - struct req_exec_cpg_procjoin *req_exec_cpg_procjoin = (struct req_exec_cpg_procjoin *)msg; + struct req_exec_cpg_procjoin *req_exec_cpg_procjoin = msg; req_exec_cpg_procjoin->pid = swab32(req_exec_cpg_procjoin->pid); swab_mar_cpg_name_t (&req_exec_cpg_procjoin->group_name); req_exec_cpg_procjoin->reason = swab32(req_exec_cpg_procjoin->reason); } static void exec_cpg_joinlist_endian_convert (void *msg_v) { char *msg = msg_v; mar_res_header_t *res = (mar_res_header_t *)msg; struct join_list_entry *jle = (struct join_list_entry *)(msg + sizeof(mar_res_header_t)); /* XXX shouldn't mar_res_header be swabbed? 
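/*
 * Sketch (own helper and struct names, not from the patch) of the in-place
 * endian conversion done by the exec_cpg_*_endian_convert functions nearby:
 * each fixed-width field of the received struct is byte-swapped where it
 * sits, before the regular exec handler reads it.
 */
#include <stdio.h>
#include <stdint.h>

static uint32_t demo_swab32 (uint32_t x)
{
	return ((x & 0x000000ffU) << 24) |
	       ((x & 0x0000ff00U) << 8)  |
	       ((x & 0x00ff0000U) >> 8)  |
	       ((x & 0xff000000U) >> 24);
}

struct demo_downlist {
	uint32_t left_nodes;
	uint32_t nodeids[4];
};

static void demo_downlist_endian_convert (void *msg)
{
	struct demo_downlist *dl = msg;
	uint32_t i;

	dl->left_nodes = demo_swab32 (dl->left_nodes);
	for (i = 0; i < dl->left_nodes && i < 4; i++)
		dl->nodeids[i] = demo_swab32 (dl->nodeids[i]);
}

int main (void)
{
	/* pretend the message arrived with the opposite byte order */
	struct demo_downlist dl = { demo_swab32 (2), { demo_swab32 (5), demo_swab32 (9) } };

	demo_downlist_endian_convert (&dl);
	printf ("left_nodes=%u first=%u second=%u\n",
	    dl.left_nodes, dl.nodeids[0], dl.nodeids[1]);
	return 0;
}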
*/ while ((const char*)jle < msg + res->size) { jle->pid = swab32(jle->pid); swab_mar_cpg_name_t (&jle->group_name); jle++; } } static void exec_cpg_downlist_endian_convert (void *msg) { - struct req_exec_cpg_downlist *req_exec_cpg_downlist = (struct req_exec_cpg_downlist *)msg; + struct req_exec_cpg_downlist *req_exec_cpg_downlist = msg; unsigned int i; req_exec_cpg_downlist->left_nodes = swab32(req_exec_cpg_downlist->left_nodes); for (i = 0; i < req_exec_cpg_downlist->left_nodes; i++) { req_exec_cpg_downlist->nodeids[i] = swab32(req_exec_cpg_downlist->nodeids[i]); } } static void exec_cpg_mcast_endian_convert (void *msg) { - struct req_exec_cpg_mcast *req_exec_cpg_mcast = (struct req_exec_cpg_mcast *)msg; + struct req_exec_cpg_mcast *req_exec_cpg_mcast = msg; swab_mar_req_header_t (&req_exec_cpg_mcast->header); swab_mar_cpg_name_t (&req_exec_cpg_mcast->group_name); req_exec_cpg_mcast->pid = swab32(req_exec_cpg_mcast->pid); req_exec_cpg_mcast->msglen = swab32(req_exec_cpg_mcast->msglen); swab_mar_message_source_t (&req_exec_cpg_mcast->source); } static void do_proc_join( const mar_cpg_name_t *name, uint32_t pid, unsigned int nodeid, int reason) { struct group_info *gi; struct process_info *pi; struct list_head *iter; mar_cpg_address_t notify_info; gi = get_group(name); /* this will always succeed ! */ assert(gi); /* See if it already exists in this group */ for (iter = gi->members.next; iter != &gi->members; iter = iter->next) { pi = list_entry(iter, struct process_info, list); if (pi->pid == pid && pi->nodeid == nodeid) { /* It could be a local join message */ if ((nodeid == api->totem_nodeid_get()) && (!pi->flags & PI_FLAG_MEMBER)) { goto local_join; } else { return; } } } pi = malloc(sizeof(struct process_info)); if (!pi) { log_printf(LOG_LEVEL_WARNING, "Unable to allocate process_info struct"); return; } pi->nodeid = nodeid; pi->pid = pid; pi->group = gi; pi->conn = NULL; pi->trackerconn = NULL; list_add_tail(&pi->list, &gi->members); local_join: pi->flags = PI_FLAG_MEMBER; notify_info.pid = pi->pid; notify_info.nodeid = nodeid; notify_info.reason = reason; notify_lib_joinlist(gi, NULL, 1, ¬ify_info, 0, NULL, MESSAGE_RES_CPG_CONFCHG_CALLBACK); } static void message_handler_req_exec_cpg_downlist ( const void *message, unsigned int nodeid) { const struct req_exec_cpg_downlist *req_exec_cpg_downlist = message; int i; struct list_head removed_list; log_printf(LOG_LEVEL_DEBUG, "downlist left_list: %d\n", req_exec_cpg_downlist->left_nodes); list_init(&removed_list); /* Remove nodes from joined groups and add removed groups to the list */ for (i = 0; i < req_exec_cpg_downlist->left_nodes; i++) { remove_node_from_groups( req_exec_cpg_downlist->nodeids[i], &removed_list); } if (!list_empty(&removed_list)) { struct list_head *iter, *tmp; for (iter = removed_list.next, tmp=iter->next; iter != &removed_list; iter = tmp, tmp = iter->next) { struct removed_group *rg = list_entry(iter, struct removed_group, list); notify_lib_joinlist(rg->gi, NULL, 0, NULL, rg->left_list_entries, rg->left_list, MESSAGE_RES_CPG_CONFCHG_CALLBACK); rg->gi->rg = NULL; free(rg); } } } static void message_handler_req_exec_cpg_procjoin ( const void *message, unsigned int nodeid) { const struct req_exec_cpg_procjoin *req_exec_cpg_procjoin = message; log_printf(LOG_LEVEL_DEBUG, "got procjoin message from cluster node %d\n", nodeid); do_proc_join(&req_exec_cpg_procjoin->group_name, req_exec_cpg_procjoin->pid, nodeid, CONFCHG_CPG_REASON_JOIN); } static void message_handler_req_exec_cpg_procleave ( const void *message, unsigned 
int nodeid) { const struct req_exec_cpg_procjoin *req_exec_cpg_procjoin = message; struct group_info *gi; struct process_info *pi; struct list_head *iter; mar_cpg_address_t notify_info; log_printf(LOG_LEVEL_DEBUG, "got procleave message from cluster node %d\n", nodeid); gi = get_group(&req_exec_cpg_procjoin->group_name); /* this will always succeed ! */ assert(gi); notify_info.pid = req_exec_cpg_procjoin->pid; notify_info.nodeid = nodeid; notify_info.reason = req_exec_cpg_procjoin->reason; notify_lib_joinlist(gi, NULL, 0, NULL, 1, &notify_info, MESSAGE_RES_CPG_CONFCHG_CALLBACK); /* Find the node/PID to remove */ for (iter = gi->members.next; iter != &gi->members; iter = iter->next) { pi = list_entry(iter, struct process_info, list); if (pi->pid == req_exec_cpg_procjoin->pid && pi->nodeid == nodeid) { list_del(&pi->list); if (!pi->conn) free(pi); else pi->pid = 0; if (list_empty(&gi->members)) { remove_group(gi); } break; } } } /* Got a proclist from another node */ static void message_handler_req_exec_cpg_joinlist ( const void *message_v, unsigned int nodeid) { const char *message = message_v; const mar_res_header_t *res = (const mar_res_header_t *)message; const struct join_list_entry *jle = (const struct join_list_entry *)(message + sizeof(mar_res_header_t)); log_printf(LOG_LEVEL_NOTICE, "got joinlist message from node %d\n", nodeid); /* Ignore our own messages */ if (nodeid == api->totem_nodeid_get()) { return; } while ((const char*)jle < message + res->size) { do_proc_join(&jle->group_name, jle->pid, nodeid, CONFCHG_CPG_REASON_NODEUP); jle++; } } static void message_handler_req_exec_cpg_mcast ( const void *message, unsigned int nodeid) { const struct req_exec_cpg_mcast *req_exec_cpg_mcast = message; struct res_lib_cpg_deliver_callback *res_lib_cpg_mcast; int msglen = req_exec_cpg_mcast->msglen; char buf[sizeof(*res_lib_cpg_mcast) + msglen]; struct group_info *gi; struct list_head *iter; /* * Track local messages so that flow is controlled on the local node */ gi = get_group(&req_exec_cpg_mcast->group_name); /* this will always succeed !
*/ assert(gi); res_lib_cpg_mcast = (struct res_lib_cpg_deliver_callback *)buf; res_lib_cpg_mcast->header.id = MESSAGE_RES_CPG_DELIVER_CALLBACK; res_lib_cpg_mcast->header.size = sizeof(*res_lib_cpg_mcast) + msglen; res_lib_cpg_mcast->msglen = msglen; res_lib_cpg_mcast->pid = req_exec_cpg_mcast->pid; res_lib_cpg_mcast->nodeid = nodeid; if (api->ipc_source_is_local (&req_exec_cpg_mcast->source)) { api->ipc_refcnt_dec (req_exec_cpg_mcast->source.conn); } memcpy(&res_lib_cpg_mcast->group_name, &gi->group_name, sizeof(mar_cpg_name_t)); memcpy(&res_lib_cpg_mcast->message, (const char*)message+sizeof(*req_exec_cpg_mcast), msglen); /* Send to all interested members */ for (iter = gi->members.next; iter != &gi->members; iter = iter->next) { struct process_info *pi = list_entry(iter, struct process_info, list); if (pi->trackerconn && (pi->flags & PI_FLAG_MEMBER)) { api->ipc_dispatch_send( pi->trackerconn, buf, res_lib_cpg_mcast->header.size); } } } static int cpg_exec_send_joinlist(void) { int count = 0; char *buf; int i; struct list_head *iter; struct list_head *iter2; struct group_info *gi; mar_res_header_t *res; struct join_list_entry *jle; struct iovec req_exec_cpg_iovec; log_printf(LOG_LEVEL_DEBUG, "sending joinlist to cluster\n"); /* Count the number of groups we are a member of */ for (i=0; inext) { gi = list_entry(iter, struct group_info, list); for (iter2 = gi->members.next; iter2 != &gi->members; iter2 = iter2->next) { struct process_info *pi = list_entry(iter2, struct process_info, list); if (pi->pid && pi->nodeid == api->totem_nodeid_get()) { count++; } } } } /* Nothing to send */ if (!count) return 0; buf = alloca(sizeof(mar_res_header_t) + sizeof(struct join_list_entry) * count); if (!buf) { log_printf(LOG_LEVEL_WARNING, "Unable to allocate joinlist buffer"); return -1; } jle = (struct join_list_entry *)(buf + sizeof(mar_res_header_t)); res = (mar_res_header_t *)buf; for (i=0; inext) { gi = list_entry(iter, struct group_info, list); for (iter2 = gi->members.next; iter2 != &gi->members; iter2 = iter2->next) { struct process_info *pi = list_entry(iter2, struct process_info, list); if (pi->pid && pi->nodeid == api->totem_nodeid_get()) { memcpy(&jle->group_name, &gi->group_name, sizeof(mar_cpg_name_t)); jle->pid = pi->pid; jle++; } } } } res->id = SERVICE_ID_MAKE(CPG_SERVICE, MESSAGE_REQ_EXEC_CPG_JOINLIST); res->size = sizeof(mar_res_header_t)+sizeof(struct join_list_entry) * count; req_exec_cpg_iovec.iov_base = buf; req_exec_cpg_iovec.iov_len = res->size; return (api->totem_mcast (&req_exec_cpg_iovec, 1, TOTEM_AGREED)); } static int cpg_lib_init_fn (void *conn) { struct process_info *pi = (struct process_info *)api->ipc_private_data_get (conn); api->ipc_refcnt_inc (conn); pi->conn = conn; log_printf(LOG_LEVEL_DEBUG, "lib_init_fn: conn=%p, pi=%p\n", conn, pi); return (0); } /* Join message from the library */ -static void message_handler_req_lib_cpg_join (void *conn, void *message) +static void message_handler_req_lib_cpg_join (void *conn, const void *message) { - struct req_lib_cpg_join *req_lib_cpg_join = (struct req_lib_cpg_join *)message; + const struct req_lib_cpg_join *req_lib_cpg_join = message; struct process_info *pi = (struct process_info *)api->ipc_private_data_get (conn); struct res_lib_cpg_join res_lib_cpg_join; struct group_info *gi; cs_error_t error = CS_OK; log_printf(LOG_LEVEL_DEBUG, "got join request on %p, pi=%p, pi->pid=%d\n", conn, pi, pi->pid); /* Already joined on this conn */ if (pi->pid) { error = CS_ERR_INVALID_PARAM; goto join_err; } gi = 
get_group(&req_lib_cpg_join->group_name); if (!gi) { error = CS_ERR_NO_SPACE; goto join_err; } /* Add a node entry for us */ pi->nodeid = api->totem_nodeid_get(); pi->pid = req_lib_cpg_join->pid; pi->group = gi; list_add(&pi->list, &gi->members); /* Tell the rest of the cluster */ cpg_node_joinleave_send(gi, pi, MESSAGE_REQ_EXEC_CPG_PROCJOIN, CONFCHG_CPG_REASON_JOIN); join_err: res_lib_cpg_join.header.size = sizeof(res_lib_cpg_join); res_lib_cpg_join.header.id = MESSAGE_RES_CPG_JOIN; res_lib_cpg_join.header.error = error; api->ipc_response_send(conn, &res_lib_cpg_join, sizeof(res_lib_cpg_join)); } /* Leave message from the library */ -static void message_handler_req_lib_cpg_leave (void *conn, void *message) +static void message_handler_req_lib_cpg_leave (void *conn, const void *message) { struct process_info *pi = (struct process_info *)api->ipc_private_data_get (conn); struct res_lib_cpg_leave res_lib_cpg_leave; struct group_info *gi; cs_error_t error = CS_OK; log_printf(LOG_LEVEL_DEBUG, "got leave request on %p\n", conn); if (!pi || !pi->pid || !pi->group) { error = CS_ERR_INVALID_PARAM; goto leave_ret; } gi = pi->group; /* Tell other nodes we are leaving. When we get this message back we will leave too */ cpg_node_joinleave_send(gi, pi, MESSAGE_REQ_EXEC_CPG_PROCLEAVE, CONFCHG_CPG_REASON_LEAVE); pi->group = NULL; leave_ret: /* send return */ res_lib_cpg_leave.header.size = sizeof(res_lib_cpg_leave); res_lib_cpg_leave.header.id = MESSAGE_RES_CPG_LEAVE; res_lib_cpg_leave.header.error = error; api->ipc_response_send(conn, &res_lib_cpg_leave, sizeof(res_lib_cpg_leave)); } /* Mcast message from the library */ -static void message_handler_req_lib_cpg_mcast (void *conn, void *message) +static void message_handler_req_lib_cpg_mcast (void *conn, const void *message) { - struct req_lib_cpg_mcast *req_lib_cpg_mcast = (struct req_lib_cpg_mcast *)message; + const struct req_lib_cpg_mcast *req_lib_cpg_mcast = message; struct process_info *pi = (struct process_info *)api->ipc_private_data_get (conn); struct group_info *gi = pi->group; struct iovec req_exec_cpg_iovec[2]; struct req_exec_cpg_mcast req_exec_cpg_mcast; struct res_lib_cpg_mcast res_lib_cpg_mcast; int msglen = req_lib_cpg_mcast->msglen; int result; log_printf(LOG_LEVEL_DEBUG, "got mcast request on %p\n", conn); /* Can't send if we're not joined */ if (!gi) { res_lib_cpg_mcast.header.size = sizeof(res_lib_cpg_mcast); res_lib_cpg_mcast.header.id = MESSAGE_RES_CPG_MCAST; res_lib_cpg_mcast.header.error = CS_ERR_ACCESS; /* TODO Better error code ?? */ api->ipc_response_send(conn, &res_lib_cpg_mcast, sizeof(res_lib_cpg_mcast)); return; } req_exec_cpg_mcast.header.size = sizeof(req_exec_cpg_mcast) + msglen; req_exec_cpg_mcast.header.id = SERVICE_ID_MAKE(CPG_SERVICE, MESSAGE_REQ_EXEC_CPG_MCAST); req_exec_cpg_mcast.pid = pi->pid; req_exec_cpg_mcast.msglen = msglen; api->ipc_source_set (&req_exec_cpg_mcast.source, conn); memcpy(&req_exec_cpg_mcast.group_name, &gi->group_name, sizeof(mar_cpg_name_t)); req_exec_cpg_iovec[0].iov_base = (char *)&req_exec_cpg_mcast; req_exec_cpg_iovec[0].iov_len = sizeof(req_exec_cpg_mcast); req_exec_cpg_iovec[1].iov_base = (char *)&req_lib_cpg_mcast->message; req_exec_cpg_iovec[1].iov_len = msglen; // TODO: guarantee type... 
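The mcast handler above forwards a library request by pairing the fixed-size executive header with the caller's payload in a two-element iovec, so the payload never has to be copied into one contiguous buffer before api->totem_mcast() transmits it. Below is a minimal standalone sketch of that scatter-gather idea, using writev() on a pipe as a stand-in transport; struct demo_header and the demo values are hypothetical, not corosync types.

#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <sys/uio.h>
#include <unistd.h>

/* Hypothetical fixed-size header, standing in for req_exec_cpg_mcast. */
struct demo_header {
        uint32_t id;
        uint32_t size;
        uint32_t msglen;
};

int main(void)
{
        const char payload[] = "hello, group";
        struct demo_header hdr, rxhdr;
        struct iovec iov[2];
        int fds[2];
        char rxbuf[64];
        ssize_t n;

        hdr.id = 42;
        hdr.msglen = sizeof(payload);
        hdr.size = sizeof(hdr) + hdr.msglen;

        /* Header and payload stay in separate buffers; the transport gathers them. */
        iov[0].iov_base = &hdr;
        iov[0].iov_len = sizeof(hdr);
        iov[1].iov_base = (void *)payload; /* discard const, as the code above does */
        iov[1].iov_len = sizeof(payload);

        if (pipe(fds) != 0)
                return 1;
        if (writev(fds[1], iov, 2) != (ssize_t)hdr.size)
                return 1;
        n = read(fds[0], rxbuf, sizeof(rxbuf));
        if (n != (ssize_t)hdr.size)
                return 1;

        /* The receiver sees one contiguous message: header followed by payload. */
        memcpy(&rxhdr, rxbuf, sizeof(rxhdr));
        printf("received %u bytes, payload: %s\n",
                (unsigned int)rxhdr.size, rxbuf + sizeof(hdr));
        return 0;
}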
result = api->totem_mcast (req_exec_cpg_iovec, 2, TOTEM_AGREED); api->ipc_refcnt_inc (conn); res_lib_cpg_mcast.header.size = sizeof(res_lib_cpg_mcast); res_lib_cpg_mcast.header.id = MESSAGE_RES_CPG_MCAST; res_lib_cpg_mcast.header.error = CS_OK; api->ipc_response_send(conn, &res_lib_cpg_mcast, sizeof(res_lib_cpg_mcast)); } -static void message_handler_req_lib_cpg_membership (void *conn, void *message) +static void message_handler_req_lib_cpg_membership (void *conn, + const void *message) { struct process_info *pi = (struct process_info *)api->ipc_private_data_get (conn); log_printf(LOG_LEVEL_DEBUG, "got membership request on %p\n", conn); if (!pi->group) { mar_res_header_t res; res.size = sizeof(res); res.id = MESSAGE_RES_CPG_MEMBERSHIP; res.error = CS_ERR_ACCESS; /* TODO Better error code */ api->ipc_response_send(conn, &res, sizeof(res)); return; } notify_lib_joinlist(pi->group, conn, 0, NULL, 0, NULL, MESSAGE_RES_CPG_MEMBERSHIP); } -static void message_handler_req_lib_cpg_trackstart (void *conn, void *message) +static void message_handler_req_lib_cpg_trackstart (void *conn, + const void *message) { - struct req_lib_cpg_trackstart *req_lib_cpg_trackstart = (struct req_lib_cpg_trackstart *)message; + const struct req_lib_cpg_trackstart *req_lib_cpg_trackstart = message; struct res_lib_cpg_trackstart res_lib_cpg_trackstart; struct group_info *gi; struct process_info *otherpi; cs_error_t error = CS_OK; log_printf(LOG_LEVEL_DEBUG, "got trackstart request on %p\n", conn); gi = get_group(&req_lib_cpg_trackstart->group_name); if (!gi) { error = CS_ERR_NO_SPACE; goto tstart_ret; } /* Find the partner connection and add us to it's process_info struct */ otherpi = (struct process_info *)api->ipc_private_data_get (conn); otherpi->trackerconn = conn; tstart_ret: res_lib_cpg_trackstart.header.size = sizeof(res_lib_cpg_trackstart); res_lib_cpg_trackstart.header.id = MESSAGE_RES_CPG_TRACKSTART; res_lib_cpg_trackstart.header.error = CS_OK; api->ipc_response_send(conn, &res_lib_cpg_trackstart, sizeof(res_lib_cpg_trackstart)); } -static void message_handler_req_lib_cpg_trackstop (void *conn, void *message) +static void message_handler_req_lib_cpg_trackstop (void *conn, + const void *message) { - struct req_lib_cpg_trackstop *req_lib_cpg_trackstop = (struct req_lib_cpg_trackstop *)message; + const struct req_lib_cpg_trackstop *req_lib_cpg_trackstop = message; struct res_lib_cpg_trackstop res_lib_cpg_trackstop; struct process_info *otherpi; struct group_info *gi; cs_error_t error = CS_OK; log_printf(LOG_LEVEL_DEBUG, "got trackstop request on %p\n", conn); gi = get_group(&req_lib_cpg_trackstop->group_name); if (!gi) { error = CS_ERR_NO_SPACE; goto tstop_ret; } /* Find the partner connection and add us to it's process_info struct */ otherpi = (struct process_info *)api->ipc_private_data_get (conn); otherpi->trackerconn = NULL; tstop_ret: res_lib_cpg_trackstop.header.size = sizeof(res_lib_cpg_trackstop); res_lib_cpg_trackstop.header.id = MESSAGE_RES_CPG_TRACKSTOP; res_lib_cpg_trackstop.header.error = CS_OK; api->ipc_response_send(conn, &res_lib_cpg_trackstop.header, sizeof(res_lib_cpg_trackstop)); } -static void message_handler_req_lib_cpg_local_get (void *conn, void *message) +static void message_handler_req_lib_cpg_local_get (void *conn, + const void *message) { struct res_lib_cpg_local_get res_lib_cpg_local_get; res_lib_cpg_local_get.header.size = sizeof(res_lib_cpg_local_get); res_lib_cpg_local_get.header.id = MESSAGE_RES_CPG_LOCAL_GET; res_lib_cpg_local_get.header.error = CS_OK; 
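Most of this patch is mechanical const-propagation: library handlers now take const void *msg, the local aliases drop their explicit casts (a void pointer converts implicitly in C), and the qualifier only has to be discarded where a read-only payload is loaded into struct iovec, whose iov_base is plain void *. A small sketch of that idiom under hypothetical demo types (not corosync's):

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/uio.h>

/* Hypothetical request layout: a fixed part followed by a variable payload. */
struct demo_request {
        unsigned int msglen;
        char message[];                 /* flexible array member */
};

/* Handler in the style this patch introduces: the message is read-only. */
static void demo_handler(void *conn, const void *msg)
{
        const struct demo_request *req = msg;  /* void * converts without a cast */
        struct iovec iov;

        /* iov_base is plain 'void *', so the qualifier has to be discarded
         * explicitly; the transport only ever reads through this pointer. */
        iov.iov_base = (void *)req->message;
        iov.iov_len = req->msglen;

        (void)conn;
        printf("would send %zu bytes: %.*s\n",
                iov.iov_len, (int)iov.iov_len, (const char *)iov.iov_base);
}

int main(void)
{
        struct demo_request *req = malloc(sizeof(*req) + 16);

        if (req == NULL)
                return 1;
        req->msglen = 13;
        memcpy(req->message, "const payload", 13);
        demo_handler(NULL, req);
        free(req);
        return 0;
}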
res_lib_cpg_local_get.local_nodeid = api->totem_nodeid_get (); api->ipc_response_send(conn, &res_lib_cpg_local_get, sizeof(res_lib_cpg_local_get)); } -static void message_handler_req_lib_cpg_groups_get (void *conn, void *message) +static void message_handler_req_lib_cpg_groups_get (void *conn, + const void *message) { struct res_lib_cpg_groups_get res_lib_cpg_groups_get; res_lib_cpg_groups_get.header.size = sizeof(res_lib_cpg_groups_get); res_lib_cpg_groups_get.header.id = MESSAGE_RES_CPG_GROUPS_GET; res_lib_cpg_groups_get.header.error = CS_OK; res_lib_cpg_groups_get.num_groups = count_groups(); api->ipc_response_send(conn, &res_lib_cpg_groups_get, sizeof(res_lib_cpg_groups_get)); } diff --git a/services/evs.c b/services/evs.c index 389af981..24fff4dd 100644 --- a/services/evs.c +++ b/services/evs.c @@ -1,527 +1,527 @@ /* * Copyright (c) 2004-2006 MontaVista Software, Inc. * Copyright (c) 2006-2009 Red Hat, Inc. * * All rights reserved. * * Author: Steven Dake (sdake@redhat.com) * * This software licensed under BSD license, the text of which follows: - * + * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * - Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * - Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * - Neither the name of the MontaVista Software, Inc. nor the names of its * contributors may be used to endorse or promote products derived from this * software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF * THE POSSIBILITY OF SUCH DAMAGE. 
*/ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include LOGSYS_DECLARE_SUBSYS ("EVS", LOG_INFO); enum evs_exec_message_req_types { MESSAGE_REQ_EXEC_EVS_MCAST = 0 }; /* * Service Interfaces required by service_message_handler struct */ static int evs_exec_init_fn ( struct corosync_api_v1 *corosync_api); static void evs_confchg_fn ( enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id); static void message_handler_req_exec_mcast (const void *msg, unsigned int nodeid); static void req_exec_mcast_endian_convert (void *msg); -static void message_handler_req_evs_join (void *conn, void *msg); -static void message_handler_req_evs_leave (void *conn, void *msg); -static void message_handler_req_evs_mcast_joined (void *conn, void *msg); -static void message_handler_req_evs_mcast_groups (void *conn, void *msg); -static void message_handler_req_evs_membership_get (void *conn, void *msg); +static void message_handler_req_evs_join (void *conn, const void *msg); +static void message_handler_req_evs_leave (void *conn, const void *msg); +static void message_handler_req_evs_mcast_joined (void *conn, const void *msg); +static void message_handler_req_evs_mcast_groups (void *conn, const void *msg); +static void message_handler_req_evs_membership_get (void *conn, const void *msg); static int evs_lib_init_fn (void *conn); static int evs_lib_exit_fn (void *conn); struct evs_pd { struct evs_group *groups; int group_entries; struct list_head list; void *conn; }; - + static struct corosync_api_v1 *api; static struct corosync_lib_handler evs_lib_engine[] = { { /* 0 */ .lib_handler_fn = message_handler_req_evs_join, .response_size = sizeof (struct res_lib_evs_join), .response_id = MESSAGE_RES_EVS_JOIN, .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 1 */ .lib_handler_fn = message_handler_req_evs_leave, .response_size = sizeof (struct res_lib_evs_leave), .response_id = MESSAGE_RES_EVS_LEAVE, .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 2 */ .lib_handler_fn = message_handler_req_evs_mcast_joined, .response_size = sizeof (struct res_lib_evs_mcast_joined), .response_id = MESSAGE_RES_EVS_MCAST_JOINED, .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED }, { /* 3 */ .lib_handler_fn = message_handler_req_evs_mcast_groups, .response_size = sizeof (struct res_lib_evs_mcast_groups), .response_id = MESSAGE_RES_EVS_MCAST_GROUPS, .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED }, { /* 4 */ .lib_handler_fn = message_handler_req_evs_membership_get, .response_size = sizeof (struct res_lib_evs_membership_get), .response_id = MESSAGE_RES_EVS_MEMBERSHIP_GET, .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED } }; static struct corosync_exec_handler evs_exec_engine[] = { { .exec_handler_fn = message_handler_req_exec_mcast, .exec_endian_convert_fn = req_exec_mcast_endian_convert } }; struct corosync_service_engine evs_service_engine = { .name = "corosync extended virtual synchrony service", .id = EVS_SERVICE, .private_data_size = sizeof (struct evs_pd), - .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED, + .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED, .lib_init_fn = evs_lib_init_fn, .lib_exit_fn = evs_lib_exit_fn, .lib_engine = evs_lib_engine, .lib_engine_count = 
sizeof (evs_lib_engine) / sizeof (struct corosync_lib_handler), .exec_engine = evs_exec_engine, .exec_engine_count = sizeof (evs_exec_engine) / sizeof (struct corosync_exec_handler), .confchg_fn = evs_confchg_fn, .exec_init_fn = evs_exec_init_fn, .exec_dump_fn = NULL }; static DECLARE_LIST_INIT (confchg_notify); /* * Dynamic loading descriptor */ static struct corosync_service_engine *evs_get_service_engine_ver0 (void); static struct corosync_service_engine_iface_ver0 evs_service_engine_iface = { .corosync_get_service_engine_ver0 = evs_get_service_engine_ver0 }; static struct lcr_iface corosync_evs_ver0[1] = { { .name = "corosync_evs", .version = 0, .versions_replace = 0, .versions_replace_count = 0, .dependencies = 0, .dependency_count = 0, .constructor = NULL, .destructor = NULL, .interfaces = NULL, } }; static struct lcr_comp evs_comp_ver0 = { .iface_count = 1, .ifaces = corosync_evs_ver0 }; static struct corosync_service_engine *evs_get_service_engine_ver0 (void) { return (&evs_service_engine); } __attribute__ ((constructor)) static void evs_comp_register (void) { lcr_interfaces_set (&corosync_evs_ver0[0], &evs_service_engine_iface); lcr_component_register (&evs_comp_ver0); } static int evs_exec_init_fn ( struct corosync_api_v1 *corosync_api) { api = corosync_api; return 0; } struct res_evs_confchg_callback res_evs_confchg_callback; static void evs_confchg_fn ( enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id) { struct list_head *list; struct evs_pd *evs_pd; /* * Build configuration change message */ res_evs_confchg_callback.header.size = sizeof (struct res_evs_confchg_callback); res_evs_confchg_callback.header.id = MESSAGE_RES_EVS_CONFCHG_CALLBACK; res_evs_confchg_callback.header.error = CS_OK; memcpy (res_evs_confchg_callback.member_list, member_list, member_list_entries * sizeof(*member_list)); res_evs_confchg_callback.member_list_entries = member_list_entries; memcpy (res_evs_confchg_callback.left_list, left_list, left_list_entries * sizeof(*left_list)); res_evs_confchg_callback.left_list_entries = left_list_entries; memcpy (res_evs_confchg_callback.joined_list, joined_list, joined_list_entries * sizeof(*joined_list)); res_evs_confchg_callback.joined_list_entries = joined_list_entries; /* * Send configuration change message to every EVS library user */ for (list = confchg_notify.next; list != &confchg_notify; list = list->next) { evs_pd = list_entry (list, struct evs_pd, list); api->ipc_response_send (evs_pd->conn, &res_evs_confchg_callback, sizeof (res_evs_confchg_callback)); } } static int evs_lib_init_fn (void *conn) { struct evs_pd *evs_pd = (struct evs_pd *)api->ipc_private_data_get (conn); log_printf (LOG_LEVEL_DEBUG, "Got request to initalize evs service.\n"); evs_pd->groups = NULL; evs_pd->group_entries = 0; evs_pd->conn = conn; list_init (&evs_pd->list); list_add (&evs_pd->list, &confchg_notify); api->ipc_response_send (conn, &res_evs_confchg_callback, sizeof (res_evs_confchg_callback)); return (0); } static int evs_lib_exit_fn (void *conn) { struct evs_pd *evs_pd = (struct evs_pd *)api->ipc_private_data_get (conn); list_del (&evs_pd->list); return (0); } -static void message_handler_req_evs_join (void *conn, void *msg) +static void message_handler_req_evs_join (void *conn, const void *msg) { cs_error_t error = CS_OK; - struct req_lib_evs_join *req_lib_evs_join 
= (struct req_lib_evs_join *)msg; + const struct req_lib_evs_join *req_lib_evs_join = msg; struct res_lib_evs_join res_lib_evs_join; void *addr; struct evs_pd *evs_pd = (struct evs_pd *)api->ipc_private_data_get (conn); if (req_lib_evs_join->group_entries > 50) { error = CS_ERR_TOO_MANY_GROUPS; goto exit_error; } - addr = realloc (evs_pd->groups, sizeof (struct evs_group) * + addr = realloc (evs_pd->groups, sizeof (struct evs_group) * (evs_pd->group_entries + req_lib_evs_join->group_entries)); if (addr == NULL) { error = CS_ERR_NO_MEMORY; goto exit_error; } evs_pd->groups = addr; memcpy (&evs_pd->groups[evs_pd->group_entries], req_lib_evs_join->groups, sizeof (struct evs_group) * req_lib_evs_join->group_entries); evs_pd->group_entries += req_lib_evs_join->group_entries; exit_error: res_lib_evs_join.header.size = sizeof (struct res_lib_evs_join); res_lib_evs_join.header.id = MESSAGE_RES_EVS_JOIN; res_lib_evs_join.header.error = error; api->ipc_response_send (conn, &res_lib_evs_join, sizeof (struct res_lib_evs_join)); } -static void message_handler_req_evs_leave (void *conn, void *msg) +static void message_handler_req_evs_leave (void *conn, const void *msg) { - struct req_lib_evs_leave *req_lib_evs_leave = (struct req_lib_evs_leave *)msg; + const struct req_lib_evs_leave *req_lib_evs_leave = msg; struct res_lib_evs_leave res_lib_evs_leave; cs_error_t error = CS_OK; int error_index; int i, j; int found; struct evs_pd *evs_pd = (struct evs_pd *)api->ipc_private_data_get (conn); for (i = 0; i < req_lib_evs_leave->group_entries; i++) { found = 0; for (j = 0; j < evs_pd->group_entries;) { if (memcmp (&req_lib_evs_leave->groups[i], &evs_pd->groups[j], sizeof (struct evs_group)) == 0) { /* * Delete entry */ memmove (&evs_pd->groups[j], &evs_pd->groups[j + 1], (evs_pd->group_entries - j - 1) * sizeof (struct evs_group)); evs_pd->group_entries -= 1; found = 1; break; } else { j++; } } if (found == 0) { error = CS_ERR_NOT_EXIST; error_index = i; break; } } res_lib_evs_leave.header.size = sizeof (struct res_lib_evs_leave); res_lib_evs_leave.header.id = MESSAGE_RES_EVS_LEAVE; res_lib_evs_leave.header.error = error; api->ipc_response_send (conn, &res_lib_evs_leave, sizeof (struct res_lib_evs_leave)); } -static void message_handler_req_evs_mcast_joined (void *conn, void *msg) +static void message_handler_req_evs_mcast_joined (void *conn, const void *msg) { cs_error_t error = CS_ERR_TRY_AGAIN; - struct req_lib_evs_mcast_joined *req_lib_evs_mcast_joined = (struct req_lib_evs_mcast_joined *)msg; + const struct req_lib_evs_mcast_joined *req_lib_evs_mcast_joined = msg; struct res_lib_evs_mcast_joined res_lib_evs_mcast_joined; struct iovec req_exec_evs_mcast_iovec[3]; struct req_exec_evs_mcast req_exec_evs_mcast; int res; struct evs_pd *evs_pd = (struct evs_pd *)api->ipc_private_data_get (conn); req_exec_evs_mcast.header.size = sizeof (struct req_exec_evs_mcast) + evs_pd->group_entries * sizeof (struct evs_group) + req_lib_evs_mcast_joined->msg_len; req_exec_evs_mcast.header.id = SERVICE_ID_MAKE (EVS_SERVICE, MESSAGE_REQ_EXEC_EVS_MCAST); req_exec_evs_mcast.msg_len = req_lib_evs_mcast_joined->msg_len; req_exec_evs_mcast.group_entries = evs_pd->group_entries; req_exec_evs_mcast_iovec[0].iov_base = (char *)&req_exec_evs_mcast; req_exec_evs_mcast_iovec[0].iov_len = sizeof (req_exec_evs_mcast); req_exec_evs_mcast_iovec[1].iov_base = (char *)evs_pd->groups; req_exec_evs_mcast_iovec[1].iov_len = evs_pd->group_entries * sizeof (struct evs_group); req_exec_evs_mcast_iovec[2].iov_base = (char 
*)&req_lib_evs_mcast_joined->msg; req_exec_evs_mcast_iovec[2].iov_len = req_lib_evs_mcast_joined->msg_len; res = api->totem_mcast (req_exec_evs_mcast_iovec, 3, TOTEM_AGREED); // TODO if (res == 0) { error = CS_OK; } res_lib_evs_mcast_joined.header.size = sizeof (struct res_lib_evs_mcast_joined); res_lib_evs_mcast_joined.header.id = MESSAGE_RES_EVS_MCAST_JOINED; res_lib_evs_mcast_joined.header.error = error; api->ipc_response_send (conn, &res_lib_evs_mcast_joined, sizeof (struct res_lib_evs_mcast_joined)); } -static void message_handler_req_evs_mcast_groups (void *conn, void *msg) +static void message_handler_req_evs_mcast_groups (void *conn, const void *msg) { cs_error_t error = CS_ERR_TRY_AGAIN; - struct req_lib_evs_mcast_groups *req_lib_evs_mcast_groups = (struct req_lib_evs_mcast_groups *)msg; + const struct req_lib_evs_mcast_groups *req_lib_evs_mcast_groups = msg; struct res_lib_evs_mcast_groups res_lib_evs_mcast_groups; struct iovec req_exec_evs_mcast_iovec[3]; struct req_exec_evs_mcast req_exec_evs_mcast; - char *msg_addr; + const char *msg_addr; int res; req_exec_evs_mcast.header.size = sizeof (struct req_exec_evs_mcast) + sizeof (struct evs_group) * req_lib_evs_mcast_groups->group_entries + req_lib_evs_mcast_groups->msg_len; req_exec_evs_mcast.header.id = SERVICE_ID_MAKE (EVS_SERVICE, MESSAGE_REQ_EXEC_EVS_MCAST); req_exec_evs_mcast.msg_len = req_lib_evs_mcast_groups->msg_len; req_exec_evs_mcast.group_entries = req_lib_evs_mcast_groups->group_entries; - msg_addr = (char *)req_lib_evs_mcast_groups + - sizeof (struct req_lib_evs_mcast_groups) + + msg_addr = (const char *)req_lib_evs_mcast_groups + + sizeof (struct req_lib_evs_mcast_groups) + (sizeof (struct evs_group) * req_lib_evs_mcast_groups->group_entries); req_exec_evs_mcast_iovec[0].iov_base = (char *)&req_exec_evs_mcast; req_exec_evs_mcast_iovec[0].iov_len = sizeof (req_exec_evs_mcast); req_exec_evs_mcast_iovec[1].iov_base = (char *)&req_lib_evs_mcast_groups->groups; req_exec_evs_mcast_iovec[1].iov_len = sizeof (struct evs_group) * req_lib_evs_mcast_groups->group_entries; - req_exec_evs_mcast_iovec[2].iov_base = msg_addr; + req_exec_evs_mcast_iovec[2].iov_base = (void *) msg_addr; /* discard const */ req_exec_evs_mcast_iovec[2].iov_len = req_lib_evs_mcast_groups->msg_len; - + res = api->totem_mcast (req_exec_evs_mcast_iovec, 3, TOTEM_AGREED); if (res == 0) { error = CS_OK; } res_lib_evs_mcast_groups.header.size = sizeof (struct res_lib_evs_mcast_groups); res_lib_evs_mcast_groups.header.id = MESSAGE_RES_EVS_MCAST_GROUPS; res_lib_evs_mcast_groups.header.error = error; api->ipc_response_send (conn, &res_lib_evs_mcast_groups, sizeof (struct res_lib_evs_mcast_groups)); } -static void message_handler_req_evs_membership_get (void *conn, void *msg) +static void message_handler_req_evs_membership_get (void *conn, const void *msg) { struct res_lib_evs_membership_get res_lib_evs_membership_get; res_lib_evs_membership_get.header.size = sizeof (struct res_lib_evs_membership_get); res_lib_evs_membership_get.header.id = MESSAGE_RES_EVS_MEMBERSHIP_GET; res_lib_evs_membership_get.header.error = CS_OK; res_lib_evs_membership_get.local_nodeid = api->totem_nodeid_get (); memcpy (&res_lib_evs_membership_get.member_list, &res_evs_confchg_callback.member_list, sizeof (res_lib_evs_membership_get.member_list)); res_lib_evs_membership_get.member_list_entries = res_evs_confchg_callback.member_list_entries; api->ipc_response_send (conn, &res_lib_evs_membership_get, sizeof (struct res_lib_evs_membership_get)); } static void req_exec_mcast_endian_convert 
(void *msg) { struct req_exec_evs_mcast *req_exec_evs_mcast = (struct req_exec_evs_mcast *)msg; req_exec_evs_mcast->group_entries = swab32 (req_exec_evs_mcast->group_entries); req_exec_evs_mcast->msg_len = swab32 (req_exec_evs_mcast->msg_len); } static void message_handler_req_exec_mcast ( const void *msg, unsigned int nodeid) { const struct req_exec_evs_mcast *req_exec_evs_mcast = msg; struct res_evs_deliver_callback res_evs_deliver_callback; const char *msg_addr; struct list_head *list; int found = 0; int i, j; struct evs_pd *evs_pd; struct iovec iov[2]; res_evs_deliver_callback.header.size = sizeof (struct res_evs_deliver_callback) + req_exec_evs_mcast->msg_len; res_evs_deliver_callback.header.id = MESSAGE_RES_EVS_DELIVER_CALLBACK; res_evs_deliver_callback.header.error = CS_OK; res_evs_deliver_callback.msglen = req_exec_evs_mcast->msg_len; msg_addr = (const char *)req_exec_evs_mcast + sizeof (struct req_exec_evs_mcast) + (sizeof (struct evs_group) * req_exec_evs_mcast->group_entries); for (list = confchg_notify.next; list != &confchg_notify; list = list->next) { found = 0; evs_pd = list_entry (list, struct evs_pd, list); for (i = 0; i < evs_pd->group_entries; i++) { for (j = 0; j < req_exec_evs_mcast->group_entries; j++) { if (memcmp (&evs_pd->groups[i], &req_exec_evs_mcast->groups[j], sizeof (struct evs_group)) == 0) { found = 1; break; } } if (found) { break; } } if (found) { res_evs_deliver_callback.local_nodeid = nodeid; iov[0].iov_base = &res_evs_deliver_callback; iov[0].iov_len = sizeof (struct res_evs_deliver_callback); iov[1].iov_base = (void *) msg_addr; /* discard const */ iov[1].iov_len = req_exec_evs_mcast->msg_len; api->ipc_dispatch_iov_send ( evs_pd->conn, iov, 2); } } } diff --git a/services/pload.c b/services/pload.c index 2dbe974b..424abe6e 100644 --- a/services/pload.c +++ b/services/pload.c @@ -1,359 +1,359 @@ /* * Copyright (c) 2008-2009 Red Hat, Inc. * * All rights reserved. * * Author: Steven Dake (sdake@redhat.com) * * This software licensed under BSD license, the text of which follows: * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * - Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * - Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * - Neither the name of the MontaVista Software, Inc. nor the names of its * contributors may be used to endorse or promote products derived from this * software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF * THE POSSIBILITY OF SUCH DAMAGE. 
*/ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include LOGSYS_DECLARE_SUBSYS ("PLOAD", LOG_INFO); enum pload_exec_message_req_types { MESSAGE_REQ_EXEC_PLOAD_START = 0, MESSAGE_REQ_EXEC_PLOAD_MCAST = 1 }; /* * Service Interfaces required by service_message_handler struct */ static int pload_exec_init_fn ( struct corosync_api_v1 *corosync_api); static void pload_confchg_fn ( enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id); static void message_handler_req_exec_pload_start (const void *msg, unsigned int nodeid); static void message_handler_req_exec_pload_mcast (const void *msg, unsigned int nodeid); static void req_exec_pload_start_endian_convert (void *msg); static void req_exec_pload_mcast_endian_convert (void *msg); -static void message_handler_req_pload_start (void *conn, void *msg); +static void message_handler_req_pload_start (void *conn, const void *msg); static int pload_lib_init_fn (void *conn); static int pload_lib_exit_fn (void *conn); static char buffer[1000000]; static unsigned int msgs_delivered = 0; static unsigned int msgs_wanted = 0; static unsigned int msg_size = 0; static unsigned int msg_code = 1; static unsigned int msgs_sent = 0; static struct corosync_api_v1 *api; struct req_exec_pload_start { mar_req_header_t header; unsigned int msg_code; unsigned int msg_count; unsigned int msg_size; unsigned int time_interval; }; struct req_exec_pload_mcast { mar_req_header_t header; unsigned int msg_code; }; static struct corosync_lib_handler pload_lib_engine[] = { { /* 0 */ .lib_handler_fn = message_handler_req_pload_start, .response_size = sizeof (struct res_lib_pload_start), .response_id = MESSAGE_RES_PLOAD_START, .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED } }; static struct corosync_exec_handler pload_exec_engine[] = { { .exec_handler_fn = message_handler_req_exec_pload_start, .exec_endian_convert_fn = req_exec_pload_start_endian_convert }, { .exec_handler_fn = message_handler_req_exec_pload_mcast, .exec_endian_convert_fn = req_exec_pload_mcast_endian_convert } }; struct corosync_service_engine pload_service_engine = { .name = "corosync profile loading service", .id = PLOAD_SERVICE, .private_data_size = 0, .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED, .lib_init_fn = pload_lib_init_fn, .lib_exit_fn = pload_lib_exit_fn, .lib_engine = pload_lib_engine, .lib_engine_count = sizeof (pload_lib_engine) / sizeof (struct corosync_lib_handler), .exec_engine = pload_exec_engine, .exec_engine_count = sizeof (pload_exec_engine) / sizeof (struct corosync_exec_handler), .confchg_fn = pload_confchg_fn, .exec_init_fn = pload_exec_init_fn, .exec_dump_fn = NULL }; static DECLARE_LIST_INIT (confchg_notify); /* * Dynamic loading descriptor */ static struct corosync_service_engine *pload_get_service_engine_ver0 (void); static struct corosync_service_engine_iface_ver0 pload_service_engine_iface = { .corosync_get_service_engine_ver0 = pload_get_service_engine_ver0 }; static struct lcr_iface corosync_pload_ver0[1] = { { .name = "corosync_pload", .version = 0, .versions_replace = 0, .versions_replace_count = 0, .dependencies = 0, .dependency_count = 0, .constructor = NULL, .destructor = NULL, .interfaces = 
NULL, } }; static struct lcr_comp pload_comp_ver0 = { .iface_count = 1, .ifaces = corosync_pload_ver0 }; static struct corosync_service_engine *pload_get_service_engine_ver0 (void) { return (&pload_service_engine); } __attribute__ ((constructor)) static void pload_comp_register (void) { lcr_interfaces_set (&corosync_pload_ver0[0], &pload_service_engine_iface); lcr_component_register (&pload_comp_ver0); } static int pload_exec_init_fn ( struct corosync_api_v1 *corosync_api) { api = corosync_api; return 0; } static void pload_confchg_fn ( enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id) { } static int pload_lib_init_fn (void *conn) { return (0); } static int pload_lib_exit_fn (void *conn) { return (0); } -static void message_handler_req_pload_start (void *conn, void *msg) +static void message_handler_req_pload_start (void *conn, const void *msg) { - struct req_lib_pload_start *req_lib_pload_start = (struct req_lib_pload_start *)msg; + const struct req_lib_pload_start *req_lib_pload_start = msg; struct req_exec_pload_start req_exec_pload_start; struct iovec iov; req_exec_pload_start.header.id = SERVICE_ID_MAKE (PLOAD_SERVICE, MESSAGE_REQ_EXEC_PLOAD_START); req_exec_pload_start.msg_code = req_lib_pload_start->msg_code; req_exec_pload_start.msg_size = req_lib_pload_start->msg_size; req_exec_pload_start.msg_count = req_lib_pload_start->msg_count; req_exec_pload_start.time_interval = req_lib_pload_start->time_interval; iov.iov_base = &req_exec_pload_start; iov.iov_len = sizeof (struct req_exec_pload_start); api->totem_mcast (&iov, 1, TOTEM_AGREED); } static void req_exec_pload_start_endian_convert (void *msg) { } static void req_exec_pload_mcast_endian_convert (void *msg) { } static int send_message (enum totem_callback_token_type type, const void *arg) { struct req_exec_pload_mcast req_exec_pload_mcast; struct iovec iov[2]; unsigned int res; unsigned int iov_len = 1; req_exec_pload_mcast.header.id = SERVICE_ID_MAKE (PLOAD_SERVICE, MESSAGE_REQ_EXEC_PLOAD_MCAST); req_exec_pload_mcast.header.size = sizeof (struct req_exec_pload_mcast) + msg_size; iov[0].iov_base = &req_exec_pload_mcast; iov[0].iov_len = sizeof (struct req_exec_pload_mcast); if (msg_size > sizeof (req_exec_pload_mcast)) { iov[1].iov_base = buffer; iov[1].iov_len = msg_size - sizeof (req_exec_pload_mcast); iov_len = 2; } do { res = api->totem_mcast (iov, iov_len, TOTEM_AGREED); if (res == -1) { break; } else { msgs_sent++; msg_code++; } } while (msgs_sent <= msgs_wanted); if (msgs_sent == msgs_wanted) { return (0); } else { return (-1); } } static void *token_callback; static void start_mcasting (void) { api->totem_callback_token_create ( &token_callback, TOTEM_CALLBACK_TOKEN_RECEIVED, 1, send_message, &token_callback); } static void message_handler_req_exec_pload_start ( const void *msg, unsigned int nodeid) { const struct req_exec_pload_start *req_exec_pload_start = msg; msgs_wanted = req_exec_pload_start->msg_count; msg_size = req_exec_pload_start->msg_size; msg_code = req_exec_pload_start->msg_code; start_mcasting (); } # define timersub(a, b, result) \ do { \ (result)->tv_sec = (a)->tv_sec - (b)->tv_sec; \ (result)->tv_usec = (a)->tv_usec - (b)->tv_usec; \ if ((result)->tv_usec < 0) { \ --(result)->tv_sec; \ (result)->tv_usec += 1000000; \ } \ } while (0) struct timeval tv1; struct timeval tv2; struct timeval 
tv_elapsed; int last_msg_no = 0; static void message_handler_req_exec_pload_mcast ( const void *msg, unsigned int nodeid) { const struct req_exec_pload_mcast *pload_mcast = msg; assert (pload_mcast->msg_code - 1 == last_msg_no); last_msg_no = pload_mcast->msg_code; if (msgs_delivered == 0) { gettimeofday (&tv1, NULL); } msgs_delivered += 1; if (msgs_delivered == msgs_wanted) { gettimeofday (&tv2, NULL); timersub (&tv2, &tv1, &tv_elapsed); printf ("%5d Writes ", msgs_delivered); printf ("%5d bytes per write ", msg_size); printf ("%7.3f Seconds runtime ", (tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0))); printf ("%9.3f TP/s ", ((float)msgs_delivered) / (tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0))); printf ("%7.3f MB/s.\n", ((float)msgs_delivered) * ((float)msg_size) / ((tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0)) * 1000000.0)); } } diff --git a/services/votequorum.c b/services/votequorum.c index dc0fc083..5834ed05 100644 --- a/services/votequorum.c +++ b/services/votequorum.c @@ -1,1700 +1,1724 @@ /* * Copyright (c) 2009 Red Hat, Inc. * * All rights reserved. * * Author: Christine Caulfield (ccaulfie@redhat.com) * * This software licensed under BSD license, the text of which follows: * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * - Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * - Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * - Neither the name of the MontaVista Software, Inc. nor the names of its * contributors may be used to endorse or promote products derived from this * software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF * THE POSSIBILITY OF SUCH DAMAGE. */ #include #include #ifndef COROSYNC_BSD #include #endif #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #define VOTEQUORUM_MAJOR_VERSION 6 #define VOTEQUORUM_MINOR_VERSION 3 #define VOTEQUORUM_PATCH_VERSION 0 /* Silly default to prevent accidents! 
*/ #define DEFAULT_EXPECTED 1024 #define DEFAULT_QDEV_POLL 10000 #define DEFAULT_LEAVE_TMO 10000 LOGSYS_DECLARE_SUBSYS ("VOTEQ", LOG_INFO); enum quorum_message_req_types { MESSAGE_REQ_EXEC_VOTEQUORUM_NODEINFO = 0, MESSAGE_REQ_EXEC_VOTEQUORUM_RECONFIGURE = 1, MESSAGE_REQ_EXEC_VOTEQUORUM_KILLNODE = 2, }; #define NODE_FLAGS_BEENDOWN 1 #define NODE_FLAGS_SEESDISALLOWED 8 #define NODE_FLAGS_HASSTATE 16 #define NODE_FLAGS_QDISK 32 #define NODE_FLAGS_REMOVED 64 #define NODE_FLAGS_US 128 typedef enum { NODESTATE_JOINING=1, NODESTATE_MEMBER, NODESTATE_DEAD, NODESTATE_LEAVING, NODESTATE_DISALLOWED } nodestate_t; /* This structure is tacked onto the start of a cluster message packet for our * own nefarious purposes. */ struct q_protheader { unsigned char tgtport; /* Target port number */ unsigned char srcport; /* Source (originating) port number */ unsigned short pad; unsigned int flags; int srcid; /* Node ID of the sender */ int tgtid; /* Node ID of the target */ } __attribute__((packed)); struct cluster_node { int flags; int node_id; unsigned int expected_votes; unsigned int votes; time_t join_time; nodestate_t state; struct timeval last_hello; /* Only used for quorum devices */ struct list_head list; }; static int quorum_flags; #define VOTEQUORUM_FLAG_FEATURE_DISALLOWED 1 #define VOTEQUORUM_FLAG_FEATURE_TWONODE 1 static int quorum; static int cluster_is_quorate; static int first_trans = 1; static unsigned int quorumdev_poll = DEFAULT_QDEV_POLL; static unsigned int leaving_timeout = DEFAULT_LEAVE_TMO; static struct cluster_node *us; static struct cluster_node *quorum_device = NULL; static char quorum_device_name[VOTEQUORUM_MAX_QDISK_NAME_LEN]; static corosync_timer_handle_t quorum_device_timer; static corosync_timer_handle_t leaving_timer; static struct list_head cluster_members_list; static struct corosync_api_v1 *corosync_api; static struct list_head trackers_list; static unsigned int quorum_members[PROCESSOR_COUNT_MAX+1]; static int quorum_members_entries = 0; static struct memb_ring_id quorum_ringid; static hdb_handle_t group_handle; #define max(a,b) (((a) > (b)) ? 
(a) : (b)) static struct cluster_node *find_node_by_nodeid(int nodeid); static struct cluster_node *allocate_node(int nodeid); static const char *kill_reason(int reason); static struct corosync_tpg_group quorum_group[1] = { { .group = "VOTEQ", .group_len = 5}, }; #define list_iterate(v, head) \ for (v = (head)->next; v != head; v = v->next) struct quorum_pd { unsigned char track_flags; int tracking_enabled; uint64_t tracking_context; struct list_head list; void *conn; }; /* * Service Interfaces required by service_message_handler struct */ static void votequorum_init(struct corosync_api_v1 *api, quorum_set_quorate_fn_t report); static void quorum_confchg_fn ( enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id); static void quorum_deliver_fn(unsigned int nodeid, struct iovec *iovec, unsigned int iov_len, int endian_conversion_required); static int votequorum_exec_init_fn (struct corosync_api_v1 *corosync_api); static int quorum_lib_init_fn (void *conn); static int quorum_lib_exit_fn (void *conn); static void message_handler_req_exec_quorum_nodeinfo ( - void *message, + const void *message, unsigned int nodeid); static void message_handler_req_exec_quorum_reconfigure ( - void *message, + const void *message, unsigned int nodeid); static void message_handler_req_exec_quorum_killnode ( - void *message, + const void *message, unsigned int nodeid); -static void message_handler_req_lib_votequorum_getinfo (void *conn, void *message); +static void message_handler_req_lib_votequorum_getinfo (void *conn, + const void *message); -static void message_handler_req_lib_votequorum_setexpected (void *conn, void *message); +static void message_handler_req_lib_votequorum_setexpected (void *conn, + const void *message); -static void message_handler_req_lib_votequorum_setvotes (void *conn, void *message); +static void message_handler_req_lib_votequorum_setvotes (void *conn, + const void *message); -static void message_handler_req_lib_votequorum_qdisk_register (void *conn, void *message); +static void message_handler_req_lib_votequorum_qdisk_register (void *conn, + const void *message); -static void message_handler_req_lib_votequorum_qdisk_unregister (void *conn, void *message); +static void message_handler_req_lib_votequorum_qdisk_unregister (void *conn, + const void *message); -static void message_handler_req_lib_votequorum_qdisk_poll (void *conn, void *message); +static void message_handler_req_lib_votequorum_qdisk_poll (void *conn, + const void *message); -static void message_handler_req_lib_votequorum_qdisk_getinfo (void *conn, void *message); +static void message_handler_req_lib_votequorum_qdisk_getinfo (void *conn, + const void *message); -static void message_handler_req_lib_votequorum_setstate (void *conn, void *message); +static void message_handler_req_lib_votequorum_setstate (void *conn, + const void *message); -static void message_handler_req_lib_votequorum_leaving (void *conn, void *message); -static void message_handler_req_lib_votequorum_trackstart (void *conn, void *msg); -static void message_handler_req_lib_votequorum_trackstop (void *conn, void *msg); +static void message_handler_req_lib_votequorum_leaving (void *conn, + const void *message); +static void message_handler_req_lib_votequorum_trackstart (void *conn, + const void *msg); +static void 
message_handler_req_lib_votequorum_trackstop (void *conn, + const void *msg); static int quorum_exec_send_nodeinfo(void); static int quorum_exec_send_reconfigure(int param, int nodeid, int value); static int quorum_exec_send_killnode(int nodeid, unsigned int reason); static void add_votequorum_config_notification(hdb_handle_t quorum_object_handle); static void recalculate_quorum(int allow_decrease, int by_current_nodes); /* * Library Handler Definition */ static struct corosync_lib_handler quorum_lib_service[] = { { /* 0 */ .lib_handler_fn = message_handler_req_lib_votequorum_getinfo, .response_size = sizeof (struct res_lib_votequorum_getinfo), .response_id = MESSAGE_RES_VOTEQUORUM_GETINFO, .flow_control = COROSYNC_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 1 */ .lib_handler_fn = message_handler_req_lib_votequorum_setexpected, .response_size = sizeof (struct res_lib_votequorum_status), .response_id = MESSAGE_RES_VOTEQUORUM_STATUS, .flow_control = COROSYNC_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 2 */ .lib_handler_fn = message_handler_req_lib_votequorum_setvotes, .response_size = sizeof (struct res_lib_votequorum_status), .response_id = MESSAGE_RES_VOTEQUORUM_STATUS, .flow_control = COROSYNC_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 3 */ .lib_handler_fn = message_handler_req_lib_votequorum_qdisk_register, .response_size = sizeof (struct res_lib_votequorum_status), .response_id = MESSAGE_RES_VOTEQUORUM_STATUS, .flow_control = COROSYNC_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 4 */ .lib_handler_fn = message_handler_req_lib_votequorum_qdisk_unregister, .response_size = sizeof (struct res_lib_votequorum_status), .response_id = MESSAGE_RES_VOTEQUORUM_STATUS, .flow_control = COROSYNC_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 5 */ .lib_handler_fn = message_handler_req_lib_votequorum_qdisk_poll, .response_size = sizeof (struct res_lib_votequorum_status), .response_id = MESSAGE_RES_VOTEQUORUM_STATUS, .flow_control = COROSYNC_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 6 */ .lib_handler_fn = message_handler_req_lib_votequorum_qdisk_getinfo, .response_size = sizeof (struct res_lib_votequorum_qdisk_getinfo), .response_id = MESSAGE_RES_VOTEQUORUM_QDISK_GETINFO, .flow_control = COROSYNC_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 7 */ .lib_handler_fn = message_handler_req_lib_votequorum_setstate, .response_size = sizeof (struct res_lib_votequorum_status), .response_id = MESSAGE_RES_VOTEQUORUM_STATUS, .flow_control = COROSYNC_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 8 */ .lib_handler_fn = message_handler_req_lib_votequorum_leaving, .response_size = sizeof (struct res_lib_votequorum_status), .response_id = MESSAGE_RES_VOTEQUORUM_STATUS, .flow_control = COROSYNC_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 9 */ .lib_handler_fn = message_handler_req_lib_votequorum_trackstart, .response_size = sizeof (struct res_lib_votequorum_status), .response_id = MESSAGE_RES_VOTEQUORUM_STATUS, .flow_control = COROSYNC_LIB_FLOW_CONTROL_NOT_REQUIRED }, { /* 10 */ .lib_handler_fn = message_handler_req_lib_votequorum_trackstop, .response_size = sizeof (struct res_lib_votequorum_status), .response_id = MESSAGE_RES_VOTEQUORUM_STATUS, .flow_control = COROSYNC_LIB_FLOW_CONTROL_NOT_REQUIRED } }; static quorum_set_quorate_fn_t set_quorum; /* * lcrso object definition */ static struct quorum_services_api_ver1 votequorum_iface_ver0 = { .init = votequorum_init }; static struct corosync_service_engine quorum_service_handler = { .name = "corosync votes quorum service v0.90", .id = VOTEQUORUM_SERVICE, .private_data_size = sizeof (struct quorum_pd), .allow_inquorate = 
CS_LIB_ALLOW_INQUORATE, .flow_control = COROSYNC_LIB_FLOW_CONTROL_REQUIRED, .lib_init_fn = quorum_lib_init_fn, .lib_exit_fn = quorum_lib_exit_fn, .lib_engine = quorum_lib_service, .lib_engine_count = sizeof (quorum_lib_service) / sizeof (struct corosync_lib_handler), .exec_init_fn = votequorum_exec_init_fn, .exec_engine = NULL, .exec_engine_count = 0, .confchg_fn = NULL, }; /* * Dynamic loader definition */ static struct corosync_service_engine *quorum_get_service_handler_ver0 (void); static struct corosync_service_engine_iface_ver0 quorum_service_handler_iface = { .corosync_get_service_engine_ver0 = quorum_get_service_handler_ver0 }; static struct lcr_iface corosync_quorum_ver0[2] = { { .name = "corosync_votequorum", .version = 0, .versions_replace = 0, .versions_replace_count = 0, .dependencies = 0, .dependency_count = 0, .constructor = NULL, .destructor = NULL, .interfaces = (void **)(void *)&votequorum_iface_ver0 }, { .name = "corosync_votequorum_iface", .version = 0, .versions_replace = 0, .versions_replace_count = 0, .dependencies = 0, .dependency_count = 0, .constructor = NULL, .destructor = NULL, .interfaces = NULL } }; static struct lcr_comp quorum_comp_ver0 = { .iface_count = 2, .ifaces = corosync_quorum_ver0 }; static struct corosync_service_engine *quorum_get_service_handler_ver0 (void) { return (&quorum_service_handler); } __attribute__ ((constructor)) static void quorum_comp_register (void) { lcr_interfaces_set (&corosync_quorum_ver0[0], &votequorum_iface_ver0); lcr_interfaces_set (&corosync_quorum_ver0[1], &quorum_service_handler_iface); lcr_component_register (&quorum_comp_ver0); } static void votequorum_init(struct corosync_api_v1 *api, quorum_set_quorate_fn_t report) { ENTER(); set_quorum = report; /* Load the library-servicing part of this module */ api->service_link_and_init(api, "corosync_votequorum_iface", 0); LEAVE(); } /* Message types */ #define VOTEQUORUM_MSG_NODEINFO 5 #define VOTEQUORUM_MSG_KILLNODE 6 #define VOTEQUORUM_MSG_RECONFIGURE 8 struct req_exec_quorum_nodeinfo { unsigned char cmd; unsigned char first_trans; unsigned int votes; unsigned int expected_votes; unsigned int major_version; /* Not backwards compatible */ unsigned int minor_version; /* Backwards compatible */ unsigned int patch_version; /* Backwards/forwards compatible */ unsigned int config_version; unsigned int flags; } __attribute__((packed)); /* Parameters for RECONFIG command */ #define RECONFIG_PARAM_EXPECTED_VOTES 1 #define RECONFIG_PARAM_NODE_VOTES 2 #define RECONFIG_PARAM_LEAVING 3 struct req_exec_quorum_reconfigure { unsigned char cmd; unsigned char param; unsigned short pad; int nodeid; unsigned int value; }; struct req_exec_quorum_killnode { unsigned char cmd; unsigned char pad1; uint16_t reason; int nodeid; }; /* These just make the access a little neater */ static inline int objdb_get_string(const struct corosync_api_v1 *corosync, unsigned int object_service_handle, char *key, char **value) { int res; *value = NULL; if ( !(res = corosync_api->object_key_get(object_service_handle, key, strlen(key), (void *)value, NULL))) { if (*value) return 0; } return -1; } static inline void objdb_get_int(const struct corosync_api_v1 *corosync, unsigned int object_service_handle, const char *key, unsigned int *intvalue, unsigned int default_value) { char *value = NULL; *intvalue = default_value; if (!corosync_api->object_key_get(object_service_handle, key, strlen(key), (void *)&value, NULL)) { if (value) { *intvalue = atoi(value); } } } -static int votequorum_send_message(void *message, int len) 
+static int votequorum_send_message(const void *message, size_t len) { struct iovec iov[2]; struct q_protheader header; header.tgtport = 0; header.srcport = 0; header.flags = 0; header.srcid = us->node_id; header.tgtid = 0; iov[0].iov_base = &header; iov[0].iov_len = sizeof(header); - iov[1].iov_base = message; + iov[1].iov_base = (void *) message; iov[1].iov_len = len; return corosync_api->tpg_joined_mcast(group_handle, iov, 2, TOTEM_AGREED); } static void read_quorum_config(unsigned int quorum_handle) { unsigned int value = 0; int cluster_members = 0; struct list_head *tmp; struct cluster_node *node; log_printf(LOG_INFO, "Reading configuration\n"); objdb_get_int(corosync_api, quorum_handle, "expected_votes", &us->expected_votes, DEFAULT_EXPECTED); objdb_get_int(corosync_api, quorum_handle, "votes", &us->votes, 1); objdb_get_int(corosync_api, quorum_handle, "quorumdev_poll", &quorumdev_poll, DEFAULT_QDEV_POLL); objdb_get_int(corosync_api, quorum_handle, "leaving_timeout", &leaving_timeout, DEFAULT_LEAVE_TMO); objdb_get_int(corosync_api, quorum_handle, "disallowed", &value, 0); if (value) quorum_flags |= VOTEQUORUM_FLAG_FEATURE_DISALLOWED; else quorum_flags &= ~VOTEQUORUM_FLAG_FEATURE_DISALLOWED; objdb_get_int(corosync_api, quorum_handle, "two_node", &value, 0); if (value) quorum_flags |= VOTEQUORUM_FLAG_FEATURE_TWONODE; else quorum_flags &= ~VOTEQUORUM_FLAG_FEATURE_TWONODE; /* * two_node mode is invalid if there are more than 2 nodes in the cluster! */ list_iterate(tmp, &cluster_members_list) { node = list_entry(tmp, struct cluster_node, list); cluster_members++; } if (quorum_flags & VOTEQUORUM_FLAG_FEATURE_TWONODE && cluster_members > 2) { log_printf(LOG_WARNING, "quorum.two_node was set but there are more than 2 nodes in the cluster. It will be ignored."); quorum_flags &= ~VOTEQUORUM_FLAG_FEATURE_TWONODE; } } static int votequorum_exec_init_fn (struct corosync_api_v1 *api) { hdb_handle_t object_handle; hdb_handle_t find_handle; ENTER(); corosync_api = api; list_init(&cluster_members_list); list_init(&trackers_list); /* Allocate a cluster_node for us */ us = allocate_node(corosync_api->totem_nodeid_get()); if (!us) return (1); us->flags |= NODE_FLAGS_US; us->state = NODESTATE_MEMBER; us->expected_votes = DEFAULT_EXPECTED; us->votes = 1; time(&us->join_time); /* Get configuration variables */ corosync_api->object_find_create(OBJECT_PARENT_HANDLE, "quorum", strlen("quorum"), &find_handle); if (corosync_api->object_find_next(find_handle, &object_handle) == 0) { read_quorum_config(object_handle); } recalculate_quorum(0, 0); /* Listen for changes */ add_votequorum_config_notification(object_handle); corosync_api->object_find_destroy(find_handle); api->tpg_init(&group_handle, quorum_deliver_fn, quorum_confchg_fn); api->tpg_join(group_handle, quorum_group, 1); LEAVE(); return (0); } static int quorum_lib_exit_fn (void *conn) { struct quorum_pd *quorum_pd = (struct quorum_pd *)corosync_api->ipc_private_data_get (conn); ENTER(); if (quorum_pd->tracking_enabled) { list_del (&quorum_pd->list); list_init (&quorum_pd->list); } LEAVE(); return (0); } static int send_quorum_notification(void *conn, uint64_t context) { struct res_lib_votequorum_notification *res_lib_votequorum_notification; struct list_head *tmp; struct cluster_node *node; int cluster_members = 0; int i = 0; int size; char *buf; ENTER(); list_iterate(tmp, &cluster_members_list) { node = list_entry(tmp, struct cluster_node, list); cluster_members++; } if (quorum_device) cluster_members++; size = sizeof(struct 
	res_lib_votequorum_notification) + sizeof(struct votequorum_node) * cluster_members;
	buf = alloca(size);
	if (!buf) {
		LEAVE();
		return -1;
	}

	res_lib_votequorum_notification = (struct res_lib_votequorum_notification *)buf;
	res_lib_votequorum_notification->quorate = cluster_is_quorate;
	res_lib_votequorum_notification->node_list_entries = cluster_members;
	res_lib_votequorum_notification->context = context;
	list_iterate(tmp, &cluster_members_list) {
		node = list_entry(tmp, struct cluster_node, list);
		res_lib_votequorum_notification->node_list[i].nodeid = node->node_id;
		res_lib_votequorum_notification->node_list[i++].state = node->state;
	}
	if (quorum_device) {
		res_lib_votequorum_notification->node_list[i].nodeid = 0;
		res_lib_votequorum_notification->node_list[i++].state = quorum_device->state | 0x80;
	}
	res_lib_votequorum_notification->header.id = MESSAGE_RES_VOTEQUORUM_NOTIFICATION;
	res_lib_votequorum_notification->header.size = size;
	res_lib_votequorum_notification->header.error = CS_OK;

	/* Send it to all interested parties */
	if (conn) {
		int ret = corosync_api->ipc_dispatch_send(conn, buf, size);
		LEAVE();
		return ret;
	}
	else {
		struct quorum_pd *qpd;

		list_iterate(tmp, &trackers_list) {
			qpd = list_entry(tmp, struct quorum_pd, list);
			res_lib_votequorum_notification->context = qpd->tracking_context;
			corosync_api->ipc_dispatch_send(qpd->conn, buf, size);
		}
	}
	LEAVE();
	return 0;
}

static void send_expectedvotes_notification(void)
{
	struct res_lib_votequorum_expectedvotes_notification res_lib_votequorum_expectedvotes_notification;
	struct quorum_pd *qpd;
	struct list_head *tmp;

	log_printf(LOG_DEBUG, "Sending expected votes callback\n");

	res_lib_votequorum_expectedvotes_notification.header.id = MESSAGE_RES_VOTEQUORUM_EXPECTEDVOTES_NOTIFICATION;
	res_lib_votequorum_expectedvotes_notification.header.size = sizeof(res_lib_votequorum_expectedvotes_notification);
	res_lib_votequorum_expectedvotes_notification.header.error = CS_OK;
	res_lib_votequorum_expectedvotes_notification.expected_votes = us->expected_votes;

	list_iterate(tmp, &trackers_list) {
		qpd = list_entry(tmp, struct quorum_pd, list);
		res_lib_votequorum_expectedvotes_notification.context = qpd->tracking_context;
		corosync_api->ipc_dispatch_send(qpd->conn, &res_lib_votequorum_expectedvotes_notification,
			sizeof(struct res_lib_votequorum_expectedvotes_notification));
	}
}

static void set_quorate(int total_votes)
{
	int quorate;

	ENTER();
	if (quorum > total_votes) {
		quorate = 0;
	}
	else {
		quorate = 1;
	}

	if (cluster_is_quorate && !quorate)
		log_printf(LOG_INFO, "quorum lost, blocking activity\n");
	if (!cluster_is_quorate && quorate)
		log_printf(LOG_INFO, "quorum regained, resuming activity\n");

	/* If we are newly quorate, then kill any DISALLOWED nodes */
	if (!cluster_is_quorate && quorate) {
		struct cluster_node *node = NULL;
		struct list_head *tmp;

		list_iterate(tmp, &cluster_members_list) {
			node = list_entry(tmp, struct cluster_node, list);
			if (node->state == NODESTATE_DISALLOWED)
				quorum_exec_send_killnode(node->node_id, VOTEQUORUM_REASON_KILL_REJOIN);
		}
	}
	cluster_is_quorate = quorate;
	set_quorum(quorum_members, quorum_members_entries, quorate, &quorum_ringid);
	LEAVE();
}

static int calculate_quorum(int allow_decrease, int max_expected, unsigned int *ret_total_votes)
{
	struct list_head *nodelist;
	struct cluster_node *node;
	unsigned int total_votes = 0;
	unsigned int highest_expected = 0;
	unsigned int newquorum, q1, q2;
	unsigned int total_nodes = 0;

	ENTER();
	list_iterate(nodelist, &cluster_members_list) {
		node = list_entry(nodelist, struct cluster_node, list);
log_printf(LOG_DEBUG, "node %x state=%d, votes=%d, expected=%d\n", node->node_id, node->state, node->votes, node->expected_votes); if (node->state == NODESTATE_MEMBER) { if (max_expected) node->expected_votes = max_expected; else highest_expected = max(highest_expected, node->expected_votes); total_votes += node->votes; total_nodes++; } } if (quorum_device && quorum_device->state == NODESTATE_MEMBER) total_votes += quorum_device->votes; if (max_expected > 0) highest_expected = max_expected; /* This quorum calculation is taken from the OpenVMS Cluster Systems * manual, but, then, you guessed that didn't you */ q1 = (highest_expected + 2) / 2; q2 = (total_votes + 2) / 2; newquorum = max(q1, q2); /* Normally quorum never decreases but the system administrator can * force it down by setting expected votes to a maximum value */ if (!allow_decrease) newquorum = max(quorum, newquorum); /* The special two_node mode allows each of the two nodes to retain * quorum if the other fails. Only one of the two should live past * fencing (as both nodes try to fence each other in split-brain.) * Also: if there are more than two nodes, force us inquorate to avoid * any damage or confusion. */ if ((quorum_flags & VOTEQUORUM_FLAG_FEATURE_TWONODE) && total_nodes <= 2) newquorum = 1; if (ret_total_votes) *ret_total_votes = total_votes; LEAVE(); return newquorum; } /* Recalculate cluster quorum, set quorate and notify changes */ static void recalculate_quorum(int allow_decrease, int by_current_nodes) { unsigned int total_votes = 0; int cluster_members = 0; struct list_head *nodelist; struct cluster_node *node; ENTER(); list_iterate(nodelist, &cluster_members_list) { node = list_entry(nodelist, struct cluster_node, list); if (node->state == NODESTATE_MEMBER) { if (by_current_nodes) cluster_members++; total_votes += node->votes; } } /* Keep expected_votes at the highest number of votes in the cluster */ log_printf(LOG_DEBUG, "total_votes=%d, expected_votes=%d\n", total_votes, us->expected_votes); if (total_votes > us->expected_votes) { us->expected_votes = total_votes; send_expectedvotes_notification(); } quorum = calculate_quorum(allow_decrease, cluster_members, &total_votes); set_quorate(total_votes); send_quorum_notification(NULL, 0L); LEAVE(); } static int have_disallowed(void) { struct cluster_node *node; struct list_head *tmp; list_iterate(tmp, &cluster_members_list) { node = list_entry(tmp, struct cluster_node, list); if (node->state == NODESTATE_DISALLOWED) return 1; } return 0; } static void node_add_ordered(struct cluster_node *newnode) { struct cluster_node *node = NULL; struct list_head *tmp; struct list_head *newlist = &newnode->list; list_iterate(tmp, &cluster_members_list) { node = list_entry(tmp, struct cluster_node, list); if (newnode->node_id < node->node_id) break; } if (!node) list_add(&newnode->list, &cluster_members_list); else { newlist->prev = tmp->prev; newlist->next = tmp; tmp->prev->next = newlist; tmp->prev = newlist; } } static struct cluster_node *allocate_node(int nodeid) { struct cluster_node *cl; cl = malloc(sizeof(struct cluster_node)); if (cl) { memset(cl, 0, sizeof(struct cluster_node)); cl->node_id = nodeid; if (nodeid) node_add_ordered(cl); } return cl; } static struct cluster_node *find_node_by_nodeid(int nodeid) { struct cluster_node *node; struct list_head *tmp; list_iterate(tmp, &cluster_members_list) { node = list_entry(tmp, struct cluster_node, list); if (node->node_id == nodeid) return node; } return NULL; } static int quorum_exec_send_nodeinfo() { struct 
req_exec_quorum_nodeinfo req_exec_quorum_nodeinfo; int ret; ENTER(); req_exec_quorum_nodeinfo.cmd = VOTEQUORUM_MSG_NODEINFO; req_exec_quorum_nodeinfo.expected_votes = us->expected_votes; req_exec_quorum_nodeinfo.votes = us->votes; req_exec_quorum_nodeinfo.major_version = VOTEQUORUM_MAJOR_VERSION; req_exec_quorum_nodeinfo.minor_version = VOTEQUORUM_MINOR_VERSION; req_exec_quorum_nodeinfo.patch_version = VOTEQUORUM_PATCH_VERSION; req_exec_quorum_nodeinfo.flags = us->flags; req_exec_quorum_nodeinfo.first_trans = first_trans; if (have_disallowed()) req_exec_quorum_nodeinfo.flags |= NODE_FLAGS_SEESDISALLOWED; ret = votequorum_send_message(&req_exec_quorum_nodeinfo, sizeof(req_exec_quorum_nodeinfo)); LEAVE(); return ret; } static int quorum_exec_send_reconfigure(int param, int nodeid, int value) { struct req_exec_quorum_reconfigure req_exec_quorum_reconfigure; int ret; ENTER(); req_exec_quorum_reconfigure.cmd = VOTEQUORUM_MSG_RECONFIGURE; req_exec_quorum_reconfigure.param = param; req_exec_quorum_reconfigure.nodeid = nodeid; req_exec_quorum_reconfigure.value = value; ret = votequorum_send_message(&req_exec_quorum_reconfigure, sizeof(req_exec_quorum_reconfigure)); LEAVE(); return ret; } static int quorum_exec_send_killnode(int nodeid, unsigned int reason) { struct req_exec_quorum_killnode req_exec_quorum_killnode; int ret; ENTER(); req_exec_quorum_killnode.cmd = VOTEQUORUM_MSG_KILLNODE; req_exec_quorum_killnode.nodeid = nodeid; req_exec_quorum_killnode.reason = reason; ret = votequorum_send_message(&req_exec_quorum_killnode, sizeof(req_exec_quorum_killnode)); LEAVE(); return ret; } static void quorum_confchg_fn ( enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id) { int i; int leaving = 0; struct cluster_node *node; ENTER(); if (member_list_entries > 1) first_trans = 0; if (left_list_entries) { for (i = 0; i< left_list_entries; i++) { node = find_node_by_nodeid(left_list[i]); if (node) { if (node->state == NODESTATE_LEAVING) leaving = 1; node->state = NODESTATE_DEAD; node->flags |= NODE_FLAGS_BEENDOWN; } } recalculate_quorum(leaving, leaving); } if (member_list_entries) { memcpy(quorum_members, member_list, sizeof(unsigned int) * member_list_entries); quorum_members_entries = member_list_entries; if (quorum_device) { quorum_members[quorum_members_entries++] = 0; } quorum_exec_send_nodeinfo(); } memcpy(&quorum_ringid, ring_id, sizeof(*ring_id)); LEAVE(); } static void exec_quorum_nodeinfo_endian_convert (void *msg) { - struct req_exec_quorum_nodeinfo *nodeinfo = (struct req_exec_quorum_nodeinfo *)msg; + struct req_exec_quorum_nodeinfo *nodeinfo = msg; nodeinfo->votes = swab32(nodeinfo->votes); nodeinfo->expected_votes = swab32(nodeinfo->expected_votes); nodeinfo->major_version = swab32(nodeinfo->major_version); nodeinfo->minor_version = swab32(nodeinfo->minor_version); nodeinfo->patch_version = swab32(nodeinfo->patch_version); nodeinfo->config_version = swab32(nodeinfo->config_version); nodeinfo->flags = swab32(nodeinfo->flags); } static void exec_quorum_reconfigure_endian_convert (void *msg) { - struct req_exec_quorum_reconfigure *reconfigure = (struct req_exec_quorum_reconfigure *)msg; + struct req_exec_quorum_reconfigure *reconfigure = msg; reconfigure->nodeid = swab32(reconfigure->nodeid); reconfigure->value = swab32(reconfigure->value); } static void 
exec_quorum_killnode_endian_convert (void *msg) { - struct req_exec_quorum_killnode *killnode = (struct req_exec_quorum_killnode *)msg; + struct req_exec_quorum_killnode *killnode = msg; killnode->reason = swab16(killnode->reason); killnode->nodeid = swab32(killnode->nodeid); } static void quorum_deliver_fn(unsigned int nodeid, struct iovec *iovec, unsigned int iov_len, int endian_conversion_required) { struct q_protheader *header = iovec->iov_base; char *buf; ENTER(); if (endian_conversion_required) { header->srcid = swab32(header->srcid); header->tgtid = swab32(header->tgtid); header->flags = swab32(header->flags); } /* Only pass on messages for us or everyone */ if (header->tgtport == 0 && (header->tgtid == us->node_id || header->tgtid == 0)) { buf = (char *)(iovec->iov_base) + sizeof(struct q_protheader); switch (*buf) { case VOTEQUORUM_MSG_NODEINFO: if (endian_conversion_required) exec_quorum_nodeinfo_endian_convert(buf); message_handler_req_exec_quorum_nodeinfo (buf, header->srcid); break; case VOTEQUORUM_MSG_RECONFIGURE: if (endian_conversion_required) exec_quorum_reconfigure_endian_convert(buf); message_handler_req_exec_quorum_reconfigure (buf, header->srcid); break; case VOTEQUORUM_MSG_KILLNODE: if (endian_conversion_required) exec_quorum_killnode_endian_convert(buf); message_handler_req_exec_quorum_killnode (buf, header->srcid); break; /* Just ignore other messages */ } } LEAVE(); } static void message_handler_req_exec_quorum_nodeinfo ( - void *message, + const void *message, unsigned int nodeid) { - struct req_exec_quorum_nodeinfo *req_exec_quorum_nodeinfo = (struct req_exec_quorum_nodeinfo *)message; + const struct req_exec_quorum_nodeinfo *req_exec_quorum_nodeinfo = message; struct cluster_node *node; int old_votes; int old_expected; nodestate_t old_state; int new_node = 0; ENTER(); log_printf(LOG_LEVEL_DEBUG, "got nodeinfo message from cluster node %d\n", nodeid); node = find_node_by_nodeid(nodeid); if (!node) { node = allocate_node(nodeid); new_node = 1; } if (!node) { corosync_api->error_memory_failure(); return; } /* * If the node sending the message sees disallowed nodes and we don't, then * we have to leave */ if (req_exec_quorum_nodeinfo->flags & NODE_FLAGS_SEESDISALLOWED && !have_disallowed()) { /* Must use syslog directly here or the message will never arrive */ syslog(LOG_CRIT, "[VOTEQ]: Joined a cluster with disallowed nodes. 
must die"); corosync_api->fatal_error(2, __FILE__, __LINE__); exit(2); } old_votes = node->votes; old_expected = node->expected_votes; old_state = node->state; /* Update node state */ if (req_exec_quorum_nodeinfo->minor_version >= 2) node->votes = req_exec_quorum_nodeinfo->votes; node->expected_votes = req_exec_quorum_nodeinfo->expected_votes; node->state = NODESTATE_MEMBER; /* Check flags for disallowed (if enabled) */ if (quorum_flags & VOTEQUORUM_FLAG_FEATURE_DISALLOWED) { if ((req_exec_quorum_nodeinfo->flags & NODE_FLAGS_HASSTATE && node->flags & NODE_FLAGS_BEENDOWN) || (req_exec_quorum_nodeinfo->flags & NODE_FLAGS_HASSTATE && req_exec_quorum_nodeinfo->first_trans && !(node->flags & NODE_FLAGS_US) && (us->flags & NODE_FLAGS_HASSTATE))) { if (node->state != NODESTATE_DISALLOWED) { if (cluster_is_quorate) { log_printf(LOG_CRIT, "Killing node %d because it has rejoined the cluster with existing state", node->node_id); node->state = NODESTATE_DISALLOWED; quorum_exec_send_killnode(nodeid, VOTEQUORUM_REASON_KILL_REJOIN); } else { log_printf(LOG_CRIT, "Node %d not joined to quorum because it has existing state", node->node_id); node->state = NODESTATE_DISALLOWED; } } } } node->flags &= ~NODE_FLAGS_BEENDOWN; if (new_node || old_votes != node->votes || old_expected != node->expected_votes || old_state != node->state) recalculate_quorum(0, 0); LEAVE(); } static void message_handler_req_exec_quorum_killnode ( - void *message, + const void *message, unsigned int nodeid) { - struct req_exec_quorum_killnode *req_exec_quorum_killnode = (struct req_exec_quorum_killnode *)message; + const struct req_exec_quorum_killnode *req_exec_quorum_killnode = message; if (req_exec_quorum_killnode->nodeid == corosync_api->totem_nodeid_get()) { log_printf(LOG_CRIT, "Killed by node %d: %s\n", nodeid, kill_reason(req_exec_quorum_killnode->reason)); corosync_api->fatal_error(1, __FILE__, __LINE__); exit(1); } } static void message_handler_req_exec_quorum_reconfigure ( - void *message, + const void *message, unsigned int nodeid) { - struct req_exec_quorum_reconfigure *req_exec_quorum_reconfigure = (struct req_exec_quorum_reconfigure *)message; + const struct req_exec_quorum_reconfigure *req_exec_quorum_reconfigure = message; struct cluster_node *node; struct list_head *nodelist; log_printf(LOG_LEVEL_DEBUG, "got reconfigure message from cluster node %d\n", nodeid); node = find_node_by_nodeid(req_exec_quorum_reconfigure->nodeid); if (!node) return; switch(req_exec_quorum_reconfigure->param) { case RECONFIG_PARAM_EXPECTED_VOTES: list_iterate(nodelist, &cluster_members_list) { node = list_entry(nodelist, struct cluster_node, list); if (node->state == NODESTATE_MEMBER && node->expected_votes > req_exec_quorum_reconfigure->value) { node->expected_votes = req_exec_quorum_reconfigure->value; } } send_expectedvotes_notification(); recalculate_quorum(1, 0); /* Allow decrease */ break; case RECONFIG_PARAM_NODE_VOTES: node->votes = req_exec_quorum_reconfigure->value; recalculate_quorum(1, 0); /* Allow decrease */ break; case RECONFIG_PARAM_LEAVING: if (req_exec_quorum_reconfigure->value == 1 && node->state == NODESTATE_MEMBER) node->state = NODESTATE_LEAVING; if (req_exec_quorum_reconfigure->value == 0 && node->state == NODESTATE_LEAVING) node->state = NODESTATE_MEMBER; break; } } static int quorum_lib_init_fn (void *conn) { struct quorum_pd *pd = (struct quorum_pd *)corosync_api->ipc_private_data_get (conn); ENTER(); list_init (&pd->list); pd->conn = conn; LEAVE(); return (0); } /* * Someone called votequorum_leave AGES ago! 
* Assume they forgot to shut down the node. */ static void leaving_timer_fn(void *arg) { ENTER(); if (us->state == NODESTATE_LEAVING) us->state = NODESTATE_MEMBER; /* Tell everyone else we made a mistake */ quorum_exec_send_reconfigure(RECONFIG_PARAM_LEAVING, us->node_id, 0); LEAVE(); } /* Message from the library */ -static void message_handler_req_lib_votequorum_getinfo (void *conn, void *message) +static void message_handler_req_lib_votequorum_getinfo (void *conn, const void *message) { - struct req_lib_votequorum_getinfo *req_lib_votequorum_getinfo = (struct req_lib_votequorum_getinfo *)message; + const struct req_lib_votequorum_getinfo *req_lib_votequorum_getinfo = message; struct res_lib_votequorum_getinfo res_lib_votequorum_getinfo; struct cluster_node *node; unsigned int highest_expected = 0; unsigned int total_votes = 0; cs_error_t error = CS_OK; log_printf(LOG_LEVEL_DEBUG, "got getinfo request on %p for node %d\n", conn, req_lib_votequorum_getinfo->nodeid); if (req_lib_votequorum_getinfo->nodeid) { node = find_node_by_nodeid(req_lib_votequorum_getinfo->nodeid); } else { node = us; } if (node) { struct cluster_node *iternode; struct list_head *nodelist; list_iterate(nodelist, &cluster_members_list) { iternode = list_entry(nodelist, struct cluster_node, list); if (iternode->state == NODESTATE_MEMBER) { highest_expected = max(highest_expected, iternode->expected_votes); total_votes += iternode->votes; } } if (quorum_device && quorum_device->state == NODESTATE_MEMBER) { total_votes += quorum_device->votes; } res_lib_votequorum_getinfo.votes = us->votes; res_lib_votequorum_getinfo.expected_votes = us->expected_votes; res_lib_votequorum_getinfo.highest_expected = highest_expected; res_lib_votequorum_getinfo.quorum = quorum; res_lib_votequorum_getinfo.total_votes = total_votes; res_lib_votequorum_getinfo.flags = 0; res_lib_votequorum_getinfo.nodeid = node->node_id; if (us->flags & NODE_FLAGS_HASSTATE) res_lib_votequorum_getinfo.flags |= VOTEQUORUM_INFO_FLAG_HASSTATE; if (quorum_flags & VOTEQUORUM_FLAG_FEATURE_TWONODE) res_lib_votequorum_getinfo.flags |= VOTEQUORUM_INFO_FLAG_TWONODE; if (cluster_is_quorate) res_lib_votequorum_getinfo.flags |= VOTEQUORUM_INFO_FLAG_QUORATE; if (us->flags & NODE_FLAGS_SEESDISALLOWED) res_lib_votequorum_getinfo.flags |= VOTEQUORUM_INFO_FLAG_DISALLOWED; } else { error = CS_ERR_NOT_EXIST; } res_lib_votequorum_getinfo.header.size = sizeof(res_lib_votequorum_getinfo); res_lib_votequorum_getinfo.header.id = MESSAGE_RES_VOTEQUORUM_GETINFO; res_lib_votequorum_getinfo.header.error = error; corosync_api->ipc_response_send(conn, &res_lib_votequorum_getinfo, sizeof(res_lib_votequorum_getinfo)); log_printf(LOG_LEVEL_DEBUG, "getinfo response error: %d\n", error); } /* Message from the library */ -static void message_handler_req_lib_votequorum_setexpected (void *conn, void *message) +static void message_handler_req_lib_votequorum_setexpected (void *conn, const void *message) { - struct req_lib_votequorum_setexpected *req_lib_votequorum_setexpected = (struct req_lib_votequorum_setexpected *)message; + const struct req_lib_votequorum_setexpected *req_lib_votequorum_setexpected = message; struct res_lib_votequorum_status res_lib_votequorum_status; cs_error_t error = CS_OK; unsigned int newquorum; unsigned int total_votes; ENTER(); /* * If there are disallowed nodes, then we can't allow the user * to bypass them by fiddling with expected votes. 
*/ if (quorum_flags & VOTEQUORUM_FLAG_FEATURE_DISALLOWED && have_disallowed()) { error = CS_ERR_EXIST; goto error_exit; } /* Validate new expected votes */ newquorum = calculate_quorum(1, req_lib_votequorum_setexpected->expected_votes, &total_votes); if (newquorum < total_votes / 2 || newquorum > total_votes) { error = CS_ERR_INVALID_PARAM; goto error_exit; } quorum_exec_send_reconfigure(RECONFIG_PARAM_EXPECTED_VOTES, us->node_id, req_lib_votequorum_setexpected->expected_votes); /* send status */ error_exit: res_lib_votequorum_status.header.size = sizeof(res_lib_votequorum_status); res_lib_votequorum_status.header.id = MESSAGE_RES_VOTEQUORUM_STATUS; res_lib_votequorum_status.header.error = error; corosync_api->ipc_response_send(conn, &res_lib_votequorum_status, sizeof(res_lib_votequorum_status)); LEAVE(); } /* Message from the library */ -static void message_handler_req_lib_votequorum_setvotes (void *conn, void *message) +static void message_handler_req_lib_votequorum_setvotes (void *conn, const void *message) { - struct req_lib_votequorum_setvotes *req_lib_votequorum_setvotes = (struct req_lib_votequorum_setvotes *)message; + const struct req_lib_votequorum_setvotes *req_lib_votequorum_setvotes = message; struct res_lib_votequorum_status res_lib_votequorum_status; struct cluster_node *node; unsigned int newquorum; unsigned int total_votes; unsigned int saved_votes; cs_error_t error = CS_OK; + unsigned int nodeid; ENTER(); - node = find_node_by_nodeid(req_lib_votequorum_setvotes->nodeid); + nodeid = req_lib_votequorum_setvotes->nodeid; + node = find_node_by_nodeid(nodeid); if (!node) { error = CS_ERR_NAME_NOT_FOUND; goto error_exit; } /* Check votes is valid */ saved_votes = node->votes; node->votes = req_lib_votequorum_setvotes->votes; newquorum = calculate_quorum(1, 0, &total_votes); if (newquorum < total_votes / 2 || newquorum > total_votes) { node->votes = saved_votes; error = CS_ERR_INVALID_PARAM; goto error_exit; } - if (!req_lib_votequorum_setvotes->nodeid) - req_lib_votequorum_setvotes->nodeid = corosync_api->totem_nodeid_get(); + if (!nodeid) + nodeid = corosync_api->totem_nodeid_get(); - quorum_exec_send_reconfigure(RECONFIG_PARAM_NODE_VOTES, req_lib_votequorum_setvotes->nodeid, req_lib_votequorum_setvotes->votes); + quorum_exec_send_reconfigure(RECONFIG_PARAM_NODE_VOTES, nodeid, + req_lib_votequorum_setvotes->votes); error_exit: /* send status */ res_lib_votequorum_status.header.size = sizeof(res_lib_votequorum_status); res_lib_votequorum_status.header.id = MESSAGE_RES_VOTEQUORUM_STATUS; res_lib_votequorum_status.header.error = error; corosync_api->ipc_response_send(conn, &res_lib_votequorum_status, sizeof(res_lib_votequorum_status)); LEAVE(); } -static void message_handler_req_lib_votequorum_leaving (void *conn, void *message) +static void message_handler_req_lib_votequorum_leaving (void *conn, const void *message) { struct res_lib_votequorum_status res_lib_votequorum_status; cs_error_t error = CS_OK; ENTER(); quorum_exec_send_reconfigure(RECONFIG_PARAM_LEAVING, us->node_id, 1); /* * If we don't shut down in a sensible amount of time then cancel the * leave status. 
*/ if (leaving_timeout) corosync_api->timer_add_duration((unsigned long long)leaving_timeout*1000000, NULL, leaving_timer_fn, &leaving_timer); /* send status */ res_lib_votequorum_status.header.size = sizeof(res_lib_votequorum_status); res_lib_votequorum_status.header.id = MESSAGE_RES_VOTEQUORUM_STATUS; res_lib_votequorum_status.header.error = error; corosync_api->ipc_response_send(conn, &res_lib_votequorum_status, sizeof(res_lib_votequorum_status)); LEAVE(); } static void quorum_device_timer_fn(void *arg) { struct timeval now; ENTER(); if (!quorum_device || quorum_device->state == NODESTATE_DEAD) return; gettimeofday(&now, NULL); if (quorum_device->last_hello.tv_sec + quorumdev_poll/1000 < now.tv_sec) { quorum_device->state = NODESTATE_DEAD; log_printf(LOG_INFO, "lost contact with quorum device\n"); recalculate_quorum(0, 0); } else { corosync_api->timer_add_duration((unsigned long long)quorumdev_poll*1000000, quorum_device, quorum_device_timer_fn, &quorum_device_timer); } LEAVE(); } -static void message_handler_req_lib_votequorum_qdisk_register (void *conn, void *message) +static void message_handler_req_lib_votequorum_qdisk_register (void *conn, + const void *message) { - struct req_lib_votequorum_qdisk_register *req_lib_votequorum_qdisk_register = (struct req_lib_votequorum_qdisk_register *)message; + const struct req_lib_votequorum_qdisk_register + *req_lib_votequorum_qdisk_register = message; struct res_lib_votequorum_status res_lib_votequorum_status; cs_error_t error = CS_OK; ENTER(); if (quorum_device) { error = CS_ERR_EXIST; } else { quorum_device = allocate_node(0); quorum_device->state = NODESTATE_DEAD; quorum_device->votes = req_lib_votequorum_qdisk_register->votes; strcpy(quorum_device_name, req_lib_votequorum_qdisk_register->name); list_add(&quorum_device->list, &cluster_members_list); } /* send status */ res_lib_votequorum_status.header.size = sizeof(res_lib_votequorum_status); res_lib_votequorum_status.header.id = MESSAGE_RES_VOTEQUORUM_STATUS; res_lib_votequorum_status.header.error = error; corosync_api->ipc_response_send(conn, &res_lib_votequorum_status, sizeof(res_lib_votequorum_status)); LEAVE(); } -static void message_handler_req_lib_votequorum_qdisk_unregister (void *conn, void *message) +static void message_handler_req_lib_votequorum_qdisk_unregister (void *conn, + const void *message) { struct res_lib_votequorum_status res_lib_votequorum_status; cs_error_t error = CS_OK; ENTER(); if (quorum_device) { struct cluster_node *node = quorum_device; quorum_device = NULL; list_del(&node->list); free(node); recalculate_quorum(0, 0); } else { error = CS_ERR_NOT_EXIST; } /* send status */ res_lib_votequorum_status.header.size = sizeof(res_lib_votequorum_status); res_lib_votequorum_status.header.id = MESSAGE_RES_VOTEQUORUM_STATUS; res_lib_votequorum_status.header.error = error; corosync_api->ipc_response_send(conn, &res_lib_votequorum_status, sizeof(res_lib_votequorum_status)); LEAVE(); } -static void message_handler_req_lib_votequorum_qdisk_poll (void *conn, void *message) +static void message_handler_req_lib_votequorum_qdisk_poll (void *conn, + const void *message) { - struct req_lib_votequorum_qdisk_poll *req_lib_votequorum_qdisk_poll = (struct req_lib_votequorum_qdisk_poll *)message; + const struct req_lib_votequorum_qdisk_poll + *req_lib_votequorum_qdisk_poll = message; struct res_lib_votequorum_status res_lib_votequorum_status; cs_error_t error = CS_OK; ENTER(); if (quorum_device) { if (req_lib_votequorum_qdisk_poll->state) { gettimeofday(&quorum_device->last_hello, NULL); 
if (quorum_device->state == NODESTATE_DEAD) { quorum_device->state = NODESTATE_MEMBER; recalculate_quorum(0, 0); corosync_api->timer_add_duration((unsigned long long)quorumdev_poll*1000000, quorum_device, quorum_device_timer_fn, &quorum_device_timer); } } else { if (quorum_device->state == NODESTATE_MEMBER) { quorum_device->state = NODESTATE_DEAD; recalculate_quorum(0, 0); corosync_api->timer_delete(quorum_device_timer); } } } else { error = CS_ERR_NOT_EXIST; } /* send status */ res_lib_votequorum_status.header.size = sizeof(res_lib_votequorum_status); res_lib_votequorum_status.header.id = MESSAGE_RES_VOTEQUORUM_STATUS; res_lib_votequorum_status.header.error = error; corosync_api->ipc_response_send(conn, &res_lib_votequorum_status, sizeof(res_lib_votequorum_status)); LEAVE(); } -static void message_handler_req_lib_votequorum_qdisk_getinfo (void *conn, void *message) +static void message_handler_req_lib_votequorum_qdisk_getinfo (void *conn, + const void *message) { struct res_lib_votequorum_qdisk_getinfo res_lib_votequorum_qdisk_getinfo; cs_error_t error = CS_OK; ENTER(); if (quorum_device) { log_printf(LOG_LEVEL_DEBUG, "got qdisk_getinfo state %d\n", quorum_device->state); res_lib_votequorum_qdisk_getinfo.votes = quorum_device->votes; if (quorum_device->state == NODESTATE_MEMBER) res_lib_votequorum_qdisk_getinfo.state = 1; else res_lib_votequorum_qdisk_getinfo.state = 0; strcpy(res_lib_votequorum_qdisk_getinfo.name, quorum_device_name); } else { error = CS_ERR_NOT_EXIST; } /* send status */ res_lib_votequorum_qdisk_getinfo.header.size = sizeof(res_lib_votequorum_qdisk_getinfo); res_lib_votequorum_qdisk_getinfo.header.id = MESSAGE_RES_VOTEQUORUM_GETINFO; res_lib_votequorum_qdisk_getinfo.header.error = error; corosync_api->ipc_response_send(conn, &res_lib_votequorum_qdisk_getinfo, sizeof(res_lib_votequorum_qdisk_getinfo)); LEAVE(); } -static void message_handler_req_lib_votequorum_setstate (void *conn, void *message) +static void message_handler_req_lib_votequorum_setstate (void *conn, + const void *message) { struct res_lib_votequorum_status res_lib_votequorum_status; cs_error_t error = CS_OK; ENTER(); us->flags |= NODE_FLAGS_HASSTATE; /* send status */ res_lib_votequorum_status.header.size = sizeof(res_lib_votequorum_status); res_lib_votequorum_status.header.id = MESSAGE_RES_VOTEQUORUM_STATUS; res_lib_votequorum_status.header.error = error; corosync_api->ipc_response_send(conn, &res_lib_votequorum_status, sizeof(res_lib_votequorum_status)); LEAVE(); } -static void message_handler_req_lib_votequorum_trackstart (void *conn, void *msg) +static void message_handler_req_lib_votequorum_trackstart (void *conn, + const void *msg) { - struct req_lib_votequorum_trackstart *req_lib_votequorum_trackstart = (struct req_lib_votequorum_trackstart *)msg; + const struct req_lib_votequorum_trackstart + *req_lib_votequorum_trackstart = msg; struct res_lib_votequorum_status res_lib_votequorum_status; struct quorum_pd *quorum_pd = (struct quorum_pd *)corosync_api->ipc_private_data_get (conn); ENTER(); /* * If an immediate listing of the current cluster membership * is requested, generate membership list */ if (req_lib_votequorum_trackstart->track_flags & CS_TRACK_CURRENT || req_lib_votequorum_trackstart->track_flags & CS_TRACK_CHANGES) { log_printf(LOG_LEVEL_DEBUG, "sending initial status to %p\n", conn); send_quorum_notification(conn, req_lib_votequorum_trackstart->context); } /* * Record requests for tracking */ if (req_lib_votequorum_trackstart->track_flags & CS_TRACK_CHANGES || 
	    req_lib_votequorum_trackstart->track_flags & CS_TRACK_CHANGES_ONLY) {
		quorum_pd->track_flags = req_lib_votequorum_trackstart->track_flags;
		quorum_pd->tracking_enabled = 1;
		quorum_pd->tracking_context = req_lib_votequorum_trackstart->context;
		list_add (&quorum_pd->list, &trackers_list);
	}

	/* Send status */
	res_lib_votequorum_status.header.size = sizeof(res_lib_votequorum_status);
	res_lib_votequorum_status.header.id = MESSAGE_RES_VOTEQUORUM_STATUS;
	res_lib_votequorum_status.header.error = CS_OK;
	corosync_api->ipc_response_send(conn, &res_lib_votequorum_status, sizeof(res_lib_votequorum_status));

	LEAVE();
}

-static void message_handler_req_lib_votequorum_trackstop (void *conn, void *msg)
+static void message_handler_req_lib_votequorum_trackstop (void *conn,
+	const void *msg)
{
	struct res_lib_votequorum_status res_lib_votequorum_status;
	struct quorum_pd *quorum_pd = (struct quorum_pd *)corosync_api->ipc_private_data_get (conn);
	int error = CS_OK;

	ENTER();

	if (quorum_pd->tracking_enabled) {
		error = CS_OK;
		quorum_pd->tracking_enabled = 0;
		list_del (&quorum_pd->list);
		list_init (&quorum_pd->list);
	} else {
		error = CS_ERR_NOT_EXIST;
	}

	/* send status */
	res_lib_votequorum_status.header.size = sizeof(res_lib_votequorum_status);
	res_lib_votequorum_status.header.id = MESSAGE_RES_VOTEQUORUM_STATUS;
	res_lib_votequorum_status.header.error = error;
	corosync_api->ipc_response_send(conn, &res_lib_votequorum_status, sizeof(res_lib_votequorum_status));

	LEAVE();
}

static const char *kill_reason(int reason)
{
	static char msg[1024];

	switch (reason) {
	case VOTEQUORUM_REASON_KILL_REJECTED:
		return "our membership application was rejected";
	case VOTEQUORUM_REASON_KILL_APPLICATION:
		return "we were killed by an application request";
	case VOTEQUORUM_REASON_KILL_REJOIN:
		return "we rejoined the cluster without a full restart";
	default:
		sprintf(msg, "we got kill message number %d", reason);
		return msg;
	}
}

static void reread_config(hdb_handle_t object_handle)
{
	unsigned int old_votes;
	unsigned int old_expected;

	old_votes = us->votes;
	old_expected = us->expected_votes;

	/*
	 * Reload the configuration
	 */
	read_quorum_config(object_handle);

	/*
	 * Check for fundamental changes that we need to propagate
	 */
	if (old_votes != us->votes) {
		quorum_exec_send_reconfigure(RECONFIG_PARAM_NODE_VOTES, us->node_id, us->votes);
	}
	if (old_expected != us->expected_votes) {
		quorum_exec_send_reconfigure(RECONFIG_PARAM_EXPECTED_VOTES, us->node_id, us->expected_votes);
	}
}

static void quorum_key_change_notify(object_change_type_t change_type,
	hdb_handle_t parent_object_handle,
	hdb_handle_t object_handle,
	const void *object_name_pt, size_t object_name_len,
	const void *key_name_pt, size_t key_len,
	const void *key_value_pt, size_t key_value_len,
	void *priv_data_pt)
{
	if (memcmp(object_name_pt, "quorum", object_name_len) == 0)
		reread_config(object_handle);
}

/* Called when the objdb is reloaded */
static void votequorum_objdb_reload_notify(
	objdb_reload_notify_type_t type, int flush,
	void *priv_data_pt)
{
	/*
	 * A new quorum {} key might exist, cancel the
	 * existing notification at the start of reload,
	 * and start a new one on the new object when
	 * it's all settled.
*/ if (type == OBJDB_RELOAD_NOTIFY_START) { corosync_api->object_track_stop( quorum_key_change_notify, NULL, NULL, NULL, NULL); } if (type == OBJDB_RELOAD_NOTIFY_END || type == OBJDB_RELOAD_NOTIFY_FAILED) { hdb_handle_t find_handle; hdb_handle_t object_handle; corosync_api->object_find_create(OBJECT_PARENT_HANDLE, "quorum", strlen("quorum"), &find_handle); if (corosync_api->object_find_next(find_handle, &object_handle) == 0) { add_votequorum_config_notification(object_handle); reread_config(object_handle); } else { log_printf(LOG_LEVEL_ERROR, "votequorum objdb tracking stopped, cannot find quorum{} handle in objdb\n"); } } } static void add_votequorum_config_notification( hdb_handle_t quorum_object_handle) { corosync_api->object_track_start(quorum_object_handle, 1, quorum_key_change_notify, NULL, NULL, NULL, NULL); /* * Reload notify must be on the parent object */ corosync_api->object_track_start(OBJECT_PARENT_HANDLE, 1, NULL, NULL, NULL, votequorum_objdb_reload_notify, NULL); }
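
For reference, a minimal sketch of a library message handler written against the const-qualified prototype this patch introduces (handlers take "void *conn, const void *message", and the request pointer is const-qualified instead of cast). The request structure req_lib_example and its value field are hypothetical placeholders, not corosync definitions; the response path mirrors the votequorum status handlers above. If such a handler were actually wired up, it would be registered through the service's corosync_lib_handler table (quorum_lib_service), like the existing handlers.

/*
 * Illustrative sketch only: "req_lib_example" and its "value" field are
 * hypothetical and do not exist in corosync. The shape follows the
 * const-qualified handler prototype and the response pattern used by the
 * votequorum handlers in this file.
 */
struct req_lib_example {
	unsigned int value;
};

static void message_handler_req_lib_example (void *conn, const void *message)
{
	/* No cast needed: a const void * assigns directly to a const struct pointer */
	const struct req_lib_example *req_lib_example = message;
	struct res_lib_votequorum_status res_lib_votequorum_status;
	cs_error_t error = CS_OK;

	ENTER();

	log_printf(LOG_LEVEL_DEBUG, "example request, value %u\n",
		req_lib_example->value);

	/* send status */
	res_lib_votequorum_status.header.size = sizeof(res_lib_votequorum_status);
	res_lib_votequorum_status.header.id = MESSAGE_RES_VOTEQUORUM_STATUS;
	res_lib_votequorum_status.header.error = error;
	corosync_api->ipc_response_send(conn, &res_lib_votequorum_status,
		sizeof(res_lib_votequorum_status));

	LEAVE();
}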