diff --git a/exec/sync.c b/exec/sync.c index 06cb4564..962f341b 100644 --- a/exec/sync.c +++ b/exec/sync.c @@ -1,546 +1,549 @@ /* * Copyright (c) 2009-2012 Red Hat, Inc. * * All rights reserved. * * Author: Steven Dake (sdake@redhat.com) * * This software licensed under BSD license, the text of which follows: * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * - Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * - Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * - Neither the name of the MontaVista Software, Inc. nor the names of its * contributors may be used to endorse or promote products derived from this * software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF * THE POSSIBILITY OF SUCH DAMAGE. */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "schedwrk.h" #include "quorum.h" #include "sync.h" #include "main.h" LOGSYS_DECLARE_SUBSYS ("SYNC"); #define MESSAGE_REQ_SYNC_BARRIER 0 #define MESSAGE_REQ_SYNC_SERVICE_BUILD 1 enum sync_process_state { PROCESS, ACTIVATE }; enum sync_state { SYNC_SERVICELIST_BUILD, SYNC_PROCESS, SYNC_BARRIER }; struct service_entry { int service_id; void (*sync_init) ( const unsigned int *trans_list, size_t trans_list_entries, const unsigned int *member_list, size_t member_list_entries, const struct memb_ring_id *ring_id); void (*sync_abort) (void); int (*sync_process) (void); void (*sync_activate) (void); enum sync_process_state state; char name[128]; }; struct processor_entry { int nodeid; int received; }; struct req_exec_service_build_message { struct qb_ipc_request_header header __attribute__((aligned(8))); struct memb_ring_id ring_id __attribute__((aligned(8))); int service_list_entries __attribute__((aligned(8))); int service_list[128] __attribute__((aligned(8))); }; struct req_exec_barrier_message { struct qb_ipc_request_header header __attribute__((aligned(8))); struct memb_ring_id ring_id __attribute__((aligned(8))); }; static enum sync_state my_state = SYNC_BARRIER; static struct memb_ring_id my_ring_id; static int my_processing_idx = 0; static hdb_handle_t my_schedwrk_handle; static struct processor_entry my_processor_list[PROCESSOR_COUNT_MAX]; static unsigned int my_member_list[PROCESSOR_COUNT_MAX]; static unsigned int my_trans_list[PROCESSOR_COUNT_MAX]; static size_t my_member_list_entries = 0; static size_t my_trans_list_entries = 0; static int my_processor_list_entries = 0; static struct service_entry my_service_list[SERVICES_COUNT_MAX]; static int my_service_list_entries = 0; static void (*sync_synchronization_completed) (void); static void sync_deliver_fn ( unsigned int nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required); static int schedwrk_processor (const void *context); static void sync_process_enter (void); static void sync_process_call_init (void); static struct totempg_group sync_group = { .group = "sync", .group_len = 4 }; static void *sync_group_handle; int (*my_sync_callbacks_retrieve) ( int service_id, struct sync_callbacks *callbacks); int sync_init ( int (*sync_callbacks_retrieve) ( int service_id, struct sync_callbacks *callbacks), void (*synchronization_completed) (void)) { unsigned int res; res = totempg_groups_initialize ( &sync_group_handle, sync_deliver_fn, NULL); if (res == -1) { log_printf (LOGSYS_LEVEL_ERROR, "Couldn't initialize groups interface."); return (-1); } res = totempg_groups_join ( sync_group_handle, &sync_group, 1); if (res == -1) { log_printf (LOGSYS_LEVEL_ERROR, "Couldn't join group."); return (-1); } sync_synchronization_completed = synchronization_completed; my_sync_callbacks_retrieve = sync_callbacks_retrieve; return (0); } static void sync_barrier_handler (unsigned int nodeid, const void *msg) { const struct req_exec_barrier_message *req_exec_barrier_message = msg; int i; int barrier_reached = 1; if (memcmp (&my_ring_id, &req_exec_barrier_message->ring_id, sizeof (struct memb_ring_id)) != 0) { log_printf (LOGSYS_LEVEL_DEBUG, "barrier for old ring - discarding"); return; } for (i = 0; i < my_processor_list_entries; i++) { if (my_processor_list[i].nodeid == nodeid) { my_processor_list[i].received = 1; } } for (i = 0; i < my_processor_list_entries; i++) { if (my_processor_list[i].received == 0) { barrier_reached = 0; } } if (barrier_reached) { log_printf (LOGSYS_LEVEL_DEBUG, "Committing synchronization for %s", my_service_list[my_processing_idx].name); my_service_list[my_processing_idx].state = ACTIVATE; if (my_sync_callbacks_retrieve(my_service_list[my_processing_idx].service_id, NULL) != -1) { my_service_list[my_processing_idx].sync_activate (); } my_processing_idx += 1; if (my_service_list_entries == my_processing_idx) { sync_synchronization_completed (); } else { sync_process_enter (); } } } static void dummy_sync_abort (void) { } static int dummy_sync_process (void) { return (0); } static void dummy_sync_activate (void) { } static int service_entry_compare (const void *a, const void *b) { const struct service_entry *service_entry_a = a; const struct service_entry *service_entry_b = b; return (service_entry_a->service_id > service_entry_b->service_id); } static void sync_service_build_handler (unsigned int nodeid, const void *msg) { const struct req_exec_service_build_message *req_exec_service_build_message = msg; int i, j; int barrier_reached = 1; int found; int qsort_trigger = 0; if (memcmp (&my_ring_id, &req_exec_service_build_message->ring_id, sizeof (struct memb_ring_id)) != 0) { log_printf (LOGSYS_LEVEL_DEBUG, "service build for old ring - discarding"); return; } for (i = 0; i < req_exec_service_build_message->service_list_entries; i++) { found = 0; for (j = 0; j < my_service_list_entries; j++) { if (req_exec_service_build_message->service_list[i] == my_service_list[j].service_id) { found = 1; break; } } if (found == 0) { my_service_list[my_service_list_entries].state = PROCESS; my_service_list[my_service_list_entries].service_id = req_exec_service_build_message->service_list[i]; sprintf (my_service_list[my_service_list_entries].name, "Unknown External Service (id = %d)\n", req_exec_service_build_message->service_list[i]); my_service_list[my_service_list_entries].sync_init = NULL; my_service_list[my_service_list_entries].sync_abort = dummy_sync_abort; my_service_list[my_service_list_entries].sync_process = dummy_sync_process; my_service_list[my_service_list_entries].sync_activate = dummy_sync_activate; my_service_list_entries += 1; qsort_trigger = 1; } } if (qsort_trigger) { qsort (my_service_list, my_service_list_entries, sizeof (struct service_entry), service_entry_compare); } for (i = 0; i < my_processor_list_entries; i++) { if (my_processor_list[i].nodeid == nodeid) { my_processor_list[i].received = 1; } } for (i = 0; i < my_processor_list_entries; i++) { if (my_processor_list[i].received == 0) { barrier_reached = 0; } } if (barrier_reached) { log_printf (LOGSYS_LEVEL_DEBUG, "enter sync process"); sync_process_enter (); } } static void sync_deliver_fn ( unsigned int nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required) { struct qb_ipc_request_header *header = (struct qb_ipc_request_header *)msg; switch (header->id) { case MESSAGE_REQ_SYNC_BARRIER: sync_barrier_handler (nodeid, msg); break; case MESSAGE_REQ_SYNC_SERVICE_BUILD: sync_service_build_handler (nodeid, msg); break; } } static void barrier_message_transmit (void) { struct iovec iovec; struct req_exec_barrier_message req_exec_barrier_message; memset(&req_exec_barrier_message, 0, sizeof(req_exec_barrier_message)); req_exec_barrier_message.header.size = sizeof (struct req_exec_barrier_message); req_exec_barrier_message.header.id = MESSAGE_REQ_SYNC_BARRIER; memcpy (&req_exec_barrier_message.ring_id, &my_ring_id, sizeof (struct memb_ring_id)); iovec.iov_base = (char *)&req_exec_barrier_message; iovec.iov_len = sizeof (req_exec_barrier_message); (void)totempg_groups_mcast_joined (sync_group_handle, &iovec, 1, TOTEMPG_AGREED); } static void service_build_message_transmit (struct req_exec_service_build_message *service_build_message) { struct iovec iovec; service_build_message->header.size = sizeof (struct req_exec_service_build_message); service_build_message->header.id = MESSAGE_REQ_SYNC_SERVICE_BUILD; memcpy (&service_build_message->ring_id, &my_ring_id, sizeof (struct memb_ring_id)); iovec.iov_base = (void *)service_build_message; iovec.iov_len = sizeof (struct req_exec_service_build_message); (void)totempg_groups_mcast_joined (sync_group_handle, &iovec, 1, TOTEMPG_AGREED); } static void sync_barrier_enter (void) { my_state = SYNC_BARRIER; barrier_message_transmit (); } static void sync_process_call_init (void) { unsigned int old_trans_list[PROCESSOR_COUNT_MAX]; size_t old_trans_list_entries = 0; int o, m; int i; memcpy (old_trans_list, my_trans_list, my_trans_list_entries * sizeof (unsigned int)); old_trans_list_entries = my_trans_list_entries; my_trans_list_entries = 0; for (o = 0; o < old_trans_list_entries; o++) { for (m = 0; m < my_member_list_entries; m++) { if (old_trans_list[o] == my_member_list[m]) { my_trans_list[my_trans_list_entries] = my_member_list[m]; my_trans_list_entries++; break; } } } for (i = 0; i < my_service_list_entries; i++) { if (my_sync_callbacks_retrieve(my_service_list[i].service_id, NULL) != -1) { my_service_list[i].sync_init (my_trans_list, my_trans_list_entries, my_member_list, my_member_list_entries, &my_ring_id); } } } static void sync_process_enter (void) { int i; my_state = SYNC_PROCESS; /* * No sync services */ if (my_service_list_entries == 0) { my_state = SYNC_SERVICELIST_BUILD; sync_synchronization_completed (); return; } for (i = 0; i < my_processor_list_entries; i++) { my_processor_list[i].received = 0; } schedwrk_create (&my_schedwrk_handle, schedwrk_processor, NULL); } static void sync_servicelist_build_enter ( const unsigned int *member_list, size_t member_list_entries, const struct memb_ring_id *ring_id) { struct req_exec_service_build_message service_build; int i; int res; struct sync_callbacks sync_callbacks; memset(&service_build, 0, sizeof(service_build)); my_state = SYNC_SERVICELIST_BUILD; for (i = 0; i < member_list_entries; i++) { my_processor_list[i].nodeid = member_list[i]; my_processor_list[i].received = 0; } my_processor_list_entries = member_list_entries; memcpy (my_member_list, member_list, member_list_entries * sizeof (unsigned int)); my_member_list_entries = member_list_entries; my_processing_idx = 0; memset(my_service_list, 0, sizeof (struct service_entry) * SERVICES_COUNT_MAX); my_service_list_entries = 0; for (i = 0; i < SERVICES_COUNT_MAX; i++) { res = my_sync_callbacks_retrieve (i, &sync_callbacks); if (res == -1) { continue; } if (sync_callbacks.sync_init == NULL) { continue; } my_service_list[my_service_list_entries].state = PROCESS; my_service_list[my_service_list_entries].service_id = i; + + assert(strlen(sync_callbacks.name) < sizeof(my_service_list[my_service_list_entries].name)); + strcpy (my_service_list[my_service_list_entries].name, sync_callbacks.name); my_service_list[my_service_list_entries].sync_init = sync_callbacks.sync_init; my_service_list[my_service_list_entries].sync_process = sync_callbacks.sync_process; my_service_list[my_service_list_entries].sync_abort = sync_callbacks.sync_abort; my_service_list[my_service_list_entries].sync_activate = sync_callbacks.sync_activate; my_service_list_entries += 1; } for (i = 0; i < my_service_list_entries; i++) { service_build.service_list[i] = my_service_list[i].service_id; } service_build.service_list_entries = my_service_list_entries; service_build_message_transmit (&service_build); log_printf (LOGSYS_LEVEL_DEBUG, "call init for locally known services"); sync_process_call_init (); } static int schedwrk_processor (const void *context) { int res = 0; if (my_service_list[my_processing_idx].state == PROCESS) { if (my_sync_callbacks_retrieve(my_service_list[my_processing_idx].service_id, NULL) != -1) { res = my_service_list[my_processing_idx].sync_process (); } else { res = 0; } if (res == 0) { sync_barrier_enter(); } else { return (-1); } } return (0); } void sync_start ( const unsigned int *member_list, size_t member_list_entries, const struct memb_ring_id *ring_id) { ENTER(); memcpy (&my_ring_id, ring_id, sizeof (struct memb_ring_id)); sync_servicelist_build_enter (member_list, member_list_entries, ring_id); } void sync_save_transitional ( const unsigned int *member_list, size_t member_list_entries, const struct memb_ring_id *ring_id) { ENTER(); memcpy (my_trans_list, member_list, member_list_entries * sizeof (unsigned int)); my_trans_list_entries = member_list_entries; } void sync_abort (void) { ENTER(); if (my_state == SYNC_PROCESS) { schedwrk_destroy (my_schedwrk_handle); if (my_sync_callbacks_retrieve(my_service_list[my_processing_idx].service_id, NULL) != -1) { my_service_list[my_processing_idx].sync_abort (); } } /* this will cause any "old" barrier messages from causing * problems. */ memset (&my_ring_id, 0, sizeof (struct memb_ring_id)); }