diff --git a/exec/sync.c b/exec/sync.c index bfafd5ac..7eb4c74a 100644 --- a/exec/sync.c +++ b/exec/sync.c @@ -1,632 +1,630 @@ /* * Copyright (c) 2009-2012 Red Hat, Inc. * * All rights reserved. * * Author: Steven Dake (sdake@redhat.com) * * This software licensed under BSD license, the text of which follows: * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * - Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * - Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * - Neither the name of the MontaVista Software, Inc. nor the names of its * contributors may be used to endorse or promote products derived from this * software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF * THE POSSIBILITY OF SUCH DAMAGE. */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "schedwrk.h" #include "quorum.h" #include "sync.h" #include "main.h" LOGSYS_DECLARE_SUBSYS ("SYNC"); #define MESSAGE_REQ_SYNC_BARRIER 0 #define MESSAGE_REQ_SYNC_SERVICE_BUILD 1 #define MESSAGE_REQ_SYNC_MEMB_DETERMINE 2 enum sync_process_state { INIT, PROCESS, ACTIVATE }; enum sync_state { SYNC_SERVICELIST_BUILD, SYNC_PROCESS, SYNC_BARRIER }; struct service_entry { int service_id; void (*sync_init) ( const unsigned int *trans_list, size_t trans_list_entries, const unsigned int *member_list, size_t member_list_entries, const struct memb_ring_id *ring_id); void (*sync_abort) (void); int (*sync_process) (void); void (*sync_activate) (void); enum sync_process_state state; char name[128]; }; struct processor_entry { int nodeid; int received; }; struct req_exec_memb_determine_message { struct qb_ipc_request_header header __attribute__((aligned(8))); struct memb_ring_id ring_id __attribute__((aligned(8))); }; struct req_exec_service_build_message { struct qb_ipc_request_header header __attribute__((aligned(8))); struct memb_ring_id ring_id __attribute__((aligned(8))); int service_list_entries __attribute__((aligned(8))); int service_list[128] __attribute__((aligned(8))); }; struct req_exec_barrier_message { struct qb_ipc_request_header header __attribute__((aligned(8))); struct memb_ring_id ring_id __attribute__((aligned(8))); }; static enum sync_state my_state = SYNC_BARRIER; static struct memb_ring_id my_ring_id; static struct memb_ring_id my_memb_determine_ring_id; static int my_memb_determine = 0; static unsigned int my_memb_determine_list[PROCESSOR_COUNT_MAX]; static unsigned int my_memb_determine_list_entries = 0; static int my_processing_idx = 0; static hdb_handle_t my_schedwrk_handle; static struct processor_entry my_processor_list[PROCESSOR_COUNT_MAX]; static unsigned int my_member_list[PROCESSOR_COUNT_MAX]; static unsigned int my_trans_list[PROCESSOR_COUNT_MAX]; static size_t my_member_list_entries = 0; static size_t my_trans_list_entries = 0; static int my_processor_list_entries = 0; static struct service_entry my_service_list[SERVICES_COUNT_MAX]; static int my_service_list_entries = 0; -static const struct memb_ring_id sync_ring_id; - static void (*sync_synchronization_completed) (void); static void sync_deliver_fn ( unsigned int nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required); static int schedwrk_processor (const void *context); static void sync_process_enter (void); static struct totempg_group sync_group = { .group = "sync", .group_len = 4 }; static void *sync_group_handle; int (*my_sync_callbacks_retrieve) ( int service_id, struct sync_callbacks *callbacks); int sync_init ( int (*sync_callbacks_retrieve) ( int service_id, struct sync_callbacks *callbacks), void (*synchronization_completed) (void)) { unsigned int res; res = totempg_groups_initialize ( &sync_group_handle, sync_deliver_fn, NULL); if (res == -1) { log_printf (LOGSYS_LEVEL_ERROR, "Couldn't initialize groups interface."); return (-1); } res = totempg_groups_join ( sync_group_handle, &sync_group, 1); if (res == -1) { log_printf (LOGSYS_LEVEL_ERROR, "Couldn't join group."); return (-1); } sync_synchronization_completed = synchronization_completed; my_sync_callbacks_retrieve = sync_callbacks_retrieve; return (0); } static void sync_barrier_handler (unsigned int nodeid, const void *msg) { const struct req_exec_barrier_message *req_exec_barrier_message = msg; int i; int barrier_reached = 1; if (memcmp (&my_ring_id, &req_exec_barrier_message->ring_id, sizeof (struct memb_ring_id)) != 0) { log_printf (LOGSYS_LEVEL_DEBUG, "barrier for old ring - discarding"); return; } for (i = 0; i < my_processor_list_entries; i++) { if (my_processor_list[i].nodeid == nodeid) { my_processor_list[i].received = 1; } } for (i = 0; i < my_processor_list_entries; i++) { if (my_processor_list[i].received == 0) { barrier_reached = 0; } } if (barrier_reached) { log_printf (LOGSYS_LEVEL_DEBUG, "Committing synchronization for %s", my_service_list[my_processing_idx].name); my_service_list[my_processing_idx].state = ACTIVATE; if (my_sync_callbacks_retrieve(my_service_list[my_processing_idx].service_id, NULL) != -1) { my_service_list[my_processing_idx].sync_activate (); } my_processing_idx += 1; if (my_service_list_entries == my_processing_idx) { my_memb_determine_list_entries = 0; sync_synchronization_completed (); } else { sync_process_enter (); } } } static void dummy_sync_init ( const unsigned int *trans_list, size_t trans_list_entries, const unsigned int *member_list, size_t member_list_entries, const struct memb_ring_id *ring_id) { } static void dummy_sync_abort (void) { } static int dummy_sync_process (void) { return (0); } static void dummy_sync_activate (void) { } static int service_entry_compare (const void *a, const void *b) { const struct service_entry *service_entry_a = a; const struct service_entry *service_entry_b = b; return (service_entry_a->service_id > service_entry_b->service_id); } static void sync_memb_determine (unsigned int nodeid, const void *msg) { const struct req_exec_memb_determine_message *req_exec_memb_determine_message = msg; int found = 0; int i; if (memcmp (&req_exec_memb_determine_message->ring_id, &my_memb_determine_ring_id, sizeof (struct memb_ring_id)) != 0) { log_printf (LOGSYS_LEVEL_DEBUG, "memb determine for old ring - discarding"); return; } my_memb_determine = 1; for (i = 0; i < my_memb_determine_list_entries; i++) { if (my_memb_determine_list[i] == nodeid) { found = 1; } } if (found == 0) { my_memb_determine_list[my_memb_determine_list_entries] = nodeid; my_memb_determine_list_entries += 1; } } static void sync_service_build_handler (unsigned int nodeid, const void *msg) { const struct req_exec_service_build_message *req_exec_service_build_message = msg; int i, j; int barrier_reached = 1; int found; int qsort_trigger = 0; if (memcmp (&my_ring_id, &req_exec_service_build_message->ring_id, sizeof (struct memb_ring_id)) != 0) { log_printf (LOGSYS_LEVEL_DEBUG, "service build for old ring - discarding"); return; } for (i = 0; i < req_exec_service_build_message->service_list_entries; i++) { found = 0; for (j = 0; j < my_service_list_entries; j++) { if (req_exec_service_build_message->service_list[i] == my_service_list[j].service_id) { found = 1; break; } } if (found == 0) { my_service_list[my_service_list_entries].state = INIT; my_service_list[my_service_list_entries].service_id = req_exec_service_build_message->service_list[i]; sprintf (my_service_list[my_service_list_entries].name, "Uknown External Service (id = %d)\n", req_exec_service_build_message->service_list[i]); my_service_list[my_service_list_entries].sync_init = dummy_sync_init; my_service_list[my_service_list_entries].sync_abort = dummy_sync_abort; my_service_list[my_service_list_entries].sync_process = dummy_sync_process; my_service_list[my_service_list_entries].sync_activate = dummy_sync_activate; my_service_list_entries += 1; qsort_trigger = 1; } } if (qsort_trigger) { qsort (my_service_list, my_service_list_entries, sizeof (struct service_entry), service_entry_compare); } for (i = 0; i < my_processor_list_entries; i++) { if (my_processor_list[i].nodeid == nodeid) { my_processor_list[i].received = 1; } } for (i = 0; i < my_processor_list_entries; i++) { if (my_processor_list[i].received == 0) { barrier_reached = 0; } } if (barrier_reached) { sync_process_enter (); } } static void sync_deliver_fn ( unsigned int nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required) { struct qb_ipc_request_header *header = (struct qb_ipc_request_header *)msg; switch (header->id) { case MESSAGE_REQ_SYNC_BARRIER: sync_barrier_handler (nodeid, msg); break; case MESSAGE_REQ_SYNC_SERVICE_BUILD: sync_service_build_handler (nodeid, msg); break; case MESSAGE_REQ_SYNC_MEMB_DETERMINE: sync_memb_determine (nodeid, msg); break; } } static void memb_determine_message_transmit (void) { struct iovec iovec; struct req_exec_memb_determine_message req_exec_memb_determine_message; req_exec_memb_determine_message.header.size = sizeof (struct req_exec_memb_determine_message); req_exec_memb_determine_message.header.id = MESSAGE_REQ_SYNC_MEMB_DETERMINE; memcpy (&req_exec_memb_determine_message.ring_id, &my_memb_determine_ring_id, sizeof (struct memb_ring_id)); iovec.iov_base = (char *)&req_exec_memb_determine_message; iovec.iov_len = sizeof (req_exec_memb_determine_message); (void)totempg_groups_mcast_joined (sync_group_handle, &iovec, 1, TOTEMPG_AGREED); } static void barrier_message_transmit (void) { struct iovec iovec; struct req_exec_barrier_message req_exec_barrier_message; req_exec_barrier_message.header.size = sizeof (struct req_exec_barrier_message); req_exec_barrier_message.header.id = MESSAGE_REQ_SYNC_BARRIER; memcpy (&req_exec_barrier_message.ring_id, &my_ring_id, sizeof (struct memb_ring_id)); iovec.iov_base = (char *)&req_exec_barrier_message; iovec.iov_len = sizeof (req_exec_barrier_message); (void)totempg_groups_mcast_joined (sync_group_handle, &iovec, 1, TOTEMPG_AGREED); } static void service_build_message_transmit (struct req_exec_service_build_message *service_build_message) { struct iovec iovec; service_build_message->header.size = sizeof (struct req_exec_service_build_message); service_build_message->header.id = MESSAGE_REQ_SYNC_SERVICE_BUILD; memcpy (&service_build_message->ring_id, &my_ring_id, sizeof (struct memb_ring_id)); iovec.iov_base = (void *)service_build_message; iovec.iov_len = sizeof (struct req_exec_service_build_message); (void)totempg_groups_mcast_joined (sync_group_handle, &iovec, 1, TOTEMPG_AGREED); } static void sync_barrier_enter (void) { my_state = SYNC_BARRIER; barrier_message_transmit (); } static void sync_process_enter (void) { int i; my_state = SYNC_PROCESS; /* * No sync services */ if (my_service_list_entries == 0) { my_state = SYNC_SERVICELIST_BUILD; my_memb_determine_list_entries = 0; sync_synchronization_completed (); return; } for (i = 0; i < my_processor_list_entries; i++) { my_processor_list[i].received = 0; } schedwrk_create (&my_schedwrk_handle, schedwrk_processor, NULL); } static void sync_servicelist_build_enter ( const unsigned int *member_list, size_t member_list_entries, const struct memb_ring_id *ring_id) { struct req_exec_service_build_message service_build; int i; int res; struct sync_callbacks sync_callbacks; my_state = SYNC_SERVICELIST_BUILD; for (i = 0; i < member_list_entries; i++) { my_processor_list[i].nodeid = member_list[i]; my_processor_list[i].received = 0; } my_processor_list_entries = member_list_entries; memcpy (my_member_list, member_list, member_list_entries * sizeof (unsigned int)); my_member_list_entries = member_list_entries; my_processing_idx = 0; memset(my_service_list, 0, sizeof (struct service_entry) * SERVICES_COUNT_MAX); my_service_list_entries = 0; for (i = 0; i < SERVICES_COUNT_MAX; i++) { res = my_sync_callbacks_retrieve (i, &sync_callbacks); if (res == -1) { continue; } if (sync_callbacks.sync_init == NULL) { continue; } my_service_list[my_service_list_entries].state = INIT; my_service_list[my_service_list_entries].service_id = i; strcpy (my_service_list[my_service_list_entries].name, sync_callbacks.name); my_service_list[my_service_list_entries].sync_init = sync_callbacks.sync_init; my_service_list[my_service_list_entries].sync_process = sync_callbacks.sync_process; my_service_list[my_service_list_entries].sync_abort = sync_callbacks.sync_abort; my_service_list[my_service_list_entries].sync_activate = sync_callbacks.sync_activate; my_service_list_entries += 1; } for (i = 0; i < my_service_list_entries; i++) { service_build.service_list[i] = my_service_list[i].service_id; } service_build.service_list_entries = my_service_list_entries; service_build_message_transmit (&service_build); } static int schedwrk_processor (const void *context) { int res = 0; if (my_service_list[my_processing_idx].state == INIT) { unsigned int old_trans_list[PROCESSOR_COUNT_MAX]; size_t old_trans_list_entries = 0; int o, m; my_service_list[my_processing_idx].state = PROCESS; memcpy (old_trans_list, my_trans_list, my_trans_list_entries * sizeof (unsigned int)); old_trans_list_entries = my_trans_list_entries; my_trans_list_entries = 0; for (o = 0; o < old_trans_list_entries; o++) { for (m = 0; m < my_member_list_entries; m++) { if (old_trans_list[o] == my_member_list[m]) { my_trans_list[my_trans_list_entries] = my_member_list[m]; my_trans_list_entries++; break; } } } if (my_sync_callbacks_retrieve(my_service_list[my_processing_idx].service_id, NULL) != -1) { my_service_list[my_processing_idx].sync_init (my_trans_list, my_trans_list_entries, my_member_list, my_member_list_entries, &my_ring_id); } } if (my_service_list[my_processing_idx].state == PROCESS) { my_service_list[my_processing_idx].state = PROCESS; if (my_sync_callbacks_retrieve(my_service_list[my_processing_idx].service_id, NULL) != -1) { res = my_service_list[my_processing_idx].sync_process (); } else { res = 0; } if (res == 0) { sync_barrier_enter(); } else { return (-1); } } return (0); } void sync_start ( const unsigned int *member_list, size_t member_list_entries, const struct memb_ring_id *ring_id) { ENTER(); memcpy (&my_ring_id, ring_id, sizeof (struct memb_ring_id)); if (my_memb_determine) { my_memb_determine = 0; sync_servicelist_build_enter (my_memb_determine_list, my_memb_determine_list_entries, ring_id); } else { sync_servicelist_build_enter (member_list, member_list_entries, ring_id); } } void sync_save_transitional ( const unsigned int *member_list, size_t member_list_entries, const struct memb_ring_id *ring_id) { ENTER(); memcpy (my_trans_list, member_list, member_list_entries * sizeof (unsigned int)); my_trans_list_entries = member_list_entries; } void sync_abort (void) { ENTER(); if (my_state == SYNC_PROCESS) { schedwrk_destroy (my_schedwrk_handle); if (my_sync_callbacks_retrieve(my_service_list[my_processing_idx].service_id, NULL) != -1) { my_service_list[my_processing_idx].sync_abort (); } } /* this will cause any "old" barrier messages from causing * problems. */ memset (&my_ring_id, 0, sizeof (struct memb_ring_id)); } void sync_memb_list_determine (const struct memb_ring_id *ring_id) { ENTER(); memcpy (&my_memb_determine_ring_id, ring_id, sizeof (struct memb_ring_id)); memb_determine_message_transmit (); } void sync_memb_list_abort (void) { ENTER(); my_memb_determine_list_entries = 0; memset (&my_memb_determine_ring_id, 0, sizeof (struct memb_ring_id)); } diff --git a/test/testcpg.c b/test/testcpg.c index 5aecc130..3316f7b4 100644 --- a/test/testcpg.c +++ b/test/testcpg.c @@ -1,442 +1,438 @@ /* * Copyright (c) 2006-2009 Red Hat Inc * * All rights reserved. * * Author: Christine Caulfield * * This software licensed under BSD license, the text of which follows: * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * - Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * - Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * - Neither the name of the MontaVista Software, Inc. nor the names of its * contributors may be used to endorse or promote products derived from this * software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF * THE POSSIBILITY OF SUCH DAMAGE. */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #ifdef QBLOG #include #endif #ifndef HOST_NAME_MAX #define HOST_NAME_MAX _POSIX_HOST_NAME_MAX #endif static int quit = 0; static int show_ip = 0; static int restart = 0; static uint32_t nodeidStart = 0; static void print_localnodeid(cpg_handle_t handle); static void print_cpgname (const struct cpg_name *name) { unsigned int i; for (i = 0; i < name->length; i++) { printf ("%c", name->value[i]); } } static char * node_pid_format(unsigned int nodeid, unsigned int pid) { static char buffer[100]; if (show_ip) { struct in_addr saddr; #if __BYTE_ORDER == __LITTLE_ENDIAN saddr.s_addr = swab32(nodeid); #else saddr.s_addr = nodeid; #endif sprintf(buffer, "node/pid %s/%d", inet_ntoa(saddr),pid); } else { sprintf(buffer, "node/pid %d/%d", nodeid, pid); } return buffer; } static void print_time(void) { #define MAXLEN (256) char buf[MAXLEN]; char hostname[HOST_NAME_MAX]; struct timeval tnow; time_t t; size_t len; char *s = buf; len = sizeof(hostname); if(gethostname(hostname, len) == 0) { char *longName; hostname[len-1] = '\0'; longName = hostname; if( (longName = strstr( hostname, "." )) != NULL ) *longName = '\0'; } strcpy(s, hostname); s += strlen(hostname); s += snprintf(s, sizeof(buf)-(s-buf), ":%d", getpid()); t = time(0); gettimeofday( &tnow, 0 ); s += strftime(s, sizeof(buf)-(s-buf) , " %Y-%m-%d %T", localtime(&t)); s += snprintf(s, sizeof(buf)-(s-buf), ".%03ld", tnow.tv_usec/1000); assert(s-buf < (int)sizeof(buf)); printf("%s\n", buf); } static void DeliverCallback ( cpg_handle_t handle, const struct cpg_name *groupName, uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len) { print_time(); printf("DeliverCallback: message (len=%lu)from %s: '%s'\n", (unsigned long int) msg_len, node_pid_format(nodeid, pid), (const char *)msg); } static void ConfchgCallback ( cpg_handle_t handle, const struct cpg_name *groupName, const struct cpg_address *member_list, size_t member_list_entries, const struct cpg_address *left_list, size_t left_list_entries, const struct cpg_address *joined_list, size_t joined_list_entries) { unsigned int i; int result; uint32_t nodeid; print_time(); printf("ConfchgCallback: group '"); print_cpgname(groupName); printf("'\n"); print_localnodeid(handle); for (i=0; isin_addr.s_addr = nodeid; if(inet_ntop(AF_INET, (const void *)&v4addr->sin_addr.s_addr, addrStr, (socklen_t)sizeof(addrStr)) == NULL) { addrStr[0] = 0; } printf ("Local node id is %s/%x result %d\n", addrStr, nodeid, result); } } int main (int argc, char *argv[]) { cpg_handle_t handle; fd_set read_fds; int select_fd; int result; int retries; const char *options = "i"; int opt; unsigned int nodeid; char *fgets_res; struct cpg_address member_list[64]; int member_list_entries; int i; int recnt; int doexit; const char *exitStr = "EXIT"; doexit = 0; #ifdef QBLOG qb_log_init("testcpg", LOG_USER, LOG_ERR); qb_log_ctl(QB_LOG_SYSLOG, QB_LOG_CONF_ENABLED, QB_FALSE); qb_log_filter_ctl(QB_LOG_STDERR, QB_LOG_FILTER_ADD, QB_LOG_FILTER_FILE, "*", LOG_TRACE); qb_log_ctl(QB_LOG_STDERR, QB_LOG_CONF_ENABLED, QB_TRUE); qb_log_format_set(QB_LOG_STDERR, "[%p] %f %b"); #endif while ( (opt = getopt(argc, argv, options)) != -1 ) { switch (opt) { case 'i': show_ip = 1; break; } } if (argc > optind) { if (strlen(argv[optind]) >= CPG_MAX_NAME_LENGTH) { fprintf(stderr, "Invalid name for cpg group\n"); return (1); } strcpy(group_name.value, argv[optind]); group_name.length = strlen(argv[optind]); } else { strcpy(group_name.value, "GROUP"); group_name.length = 6; } recnt = 0; printf ("Type %s to finish\n", exitStr); restart = 1; do { if(restart) { restart = 0; retries = 0; cs_repeat_init(retries, 30, result = cpg_model_initialize (&handle, CPG_MODEL_V1, (cpg_model_data_t *)&model_data, NULL)); if (result != CS_OK) { printf ("Could not initialize Cluster Process Group API instance error %d\n", result); retrybackoff(recnt); } retries = 0; cs_repeat(retries, 30, result = cpg_local_get(handle, &nodeid)); if (result != CS_OK) { printf ("Could not get local node id\n"); retrybackoff(recnt); } printf ("Local node id is %x\n", nodeid); nodeidStart = nodeid; retries = 0; cs_repeat(retries, 30, result = cpg_join(handle, &group_name)); if (result != CS_OK) { printf ("Could not join process group, error %d\n", result); retrybackoff(recnt); } retries = 0; cs_repeat(retries, 30, result = cpg_membership_get (handle, &group_name, (struct cpg_address *)&member_list, &member_list_entries)); if (result != CS_OK) { printf ("Could not get current membership list %d\n", result); retrybackoff(recnt); } recnt = 0; printf ("membership list\n"); for (i = 0; i < member_list_entries; i++) { printf ("node id %d pid %d\n", member_list[i].nodeid, member_list[i].pid); } FD_ZERO (&read_fds); cpg_fd_get(handle, &select_fd); } FD_SET (select_fd, &read_fds); FD_SET (STDIN_FILENO, &read_fds); result = select (select_fd + 1, &read_fds, 0, 0, 0); if (result == -1) { perror ("select\n"); } if (FD_ISSET (STDIN_FILENO, &read_fds)) { char inbuf[132]; struct iovec iov; fgets_res = fgets(inbuf, (int)sizeof(inbuf), stdin); if (fgets_res == NULL) { doexit = 1; cpg_leave(handle, &group_name); } if (strncmp(inbuf, exitStr, strlen(exitStr)) == 0) { doexit = 1; cpg_leave(handle, &group_name); } else { iov.iov_base = inbuf; iov.iov_len = strlen(inbuf)+1; cpg_mcast_joined(handle, CPG_TYPE_AGREED, &iov, 1); } } if (FD_ISSET (select_fd, &read_fds)) { if (cpg_dispatch (handle, CS_DISPATCH_ALL) != CS_OK) { if(doexit) { exit(1); } restart = 1; } } if(restart) { if(!doexit) { result = cpg_finalize (handle); printf ("Finalize+restart result is %d (should be 1)\n", result); continue; } } } while (result && !quit && !doexit); result = cpg_finalize (handle); printf ("Finalize result is %d (should be 1)\n", result); return (0); } diff --git a/test/testcpgzc.c b/test/testcpgzc.c index e4fdcccd..2a6a9a81 100644 --- a/test/testcpgzc.c +++ b/test/testcpgzc.c @@ -1,251 +1,247 @@ /* * Copyright (c) 2006-2009 Red Hat Inc * * All rights reserved. * * Author: Christine Caulfield * * This software licensed under BSD license, the text of which follows: * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * - Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * - Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * - Neither the name of the MontaVista Software, Inc. nor the names of its * contributors may be used to endorse or promote products derived from this * software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF * THE POSSIBILITY OF SUCH DAMAGE. */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #define BUFFER_SIZE 8192 #define DEFAULT_GROUP_NAME "GROUP" static int quit = 0; static int show_ip = 0; static void print_cpgname (const struct cpg_name *name) { int i; for (i = 0; i < name->length; i++) { printf ("%c", name->value[i]); } } static void DeliverCallback ( cpg_handle_t handle, const struct cpg_name *groupName, uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len) { if (show_ip) { struct in_addr saddr; saddr.s_addr = nodeid; printf("DeliverCallback: message (len=%lu)from node/pid %s/%d: '%s'\n", (unsigned long int) msg_len, inet_ntoa(saddr), pid, (const char *)msg); } else { printf("DeliverCallback: message (len=%lu)from node/pid %d/%d: '%s'\n", (unsigned long int) msg_len, nodeid, pid, (const char *)msg); } } static void ConfchgCallback ( cpg_handle_t handle, const struct cpg_name *groupName, const struct cpg_address *member_list, size_t member_list_entries, const struct cpg_address *left_list, size_t left_list_entries, const struct cpg_address *joined_list, size_t joined_list_entries) { int i; struct in_addr saddr; printf("\nConfchgCallback: group '"); print_cpgname(groupName); printf("'\n"); for (i=0; i optind) { if (strlen(argv[optind]) >= CPG_MAX_NAME_LENGTH) { fprintf(stderr, "Invalid name for cpg group\n"); return (1); } strcpy(group_name.value, argv[optind]); group_name.length = strlen(argv[optind])+1; } else { strcpy(group_name.value, DEFAULT_GROUP_NAME); group_name.length = strlen(DEFAULT_GROUP_NAME) + 1; } result = cpg_initialize (&handle, &callbacks); if (result != CS_OK) { printf ("Could not initialize Cluster Process Group API instance error %d\n", result); exit (1); } cpg_zcb_alloc (handle, BUFFER_SIZE, &buffer); cpg_zcb_free (handle, buffer); cpg_zcb_alloc (handle, BUFFER_SIZE, &buffer); result = cpg_local_get (handle, &nodeid); if (result != CS_OK) { printf ("Could not get local node id\n"); exit (1); } printf ("Local node id is %x\n", nodeid); result = cpg_join(handle, &group_name); if (result != CS_OK) { printf ("Could not join process group, error %d\n", result); exit (1); } FD_ZERO (&read_fds); cpg_fd_get(handle, &select_fd); printf ("Type EXIT to finish\n"); do { FD_SET (select_fd, &read_fds); FD_SET (STDIN_FILENO, &read_fds); result = select (select_fd + 1, &read_fds, 0, 0, 0); if (result == -1) { perror ("select\n"); } if (FD_ISSET (STDIN_FILENO, &read_fds)) { fgets_res = fgets(buffer, BUFFER_SIZE, stdin); if (fgets_res == NULL) { cpg_leave(handle, &group_name); } if (strncmp(buffer, "EXIT", 4) == 0) { cpg_leave(handle, &group_name); } else { cpg_zcb_mcast_joined (handle, CPG_TYPE_AGREED, buffer, strlen (buffer) + 1); } } if (FD_ISSET (select_fd, &read_fds)) { if (cpg_dispatch (handle, CS_DISPATCH_ALL) != CS_OK) exit(1); } } while (result && !quit); result = cpg_finalize (handle); printf ("Finalize result is %d (should be 1)\n", result); return (0); } diff --git a/test/testzcgc.c b/test/testzcgc.c index 67714857..15ab56bb 100644 --- a/test/testzcgc.c +++ b/test/testzcgc.c @@ -1,178 +1,174 @@ /* * Copyright (c) 2006-2009 Red Hat Inc * * All rights reserved. * * Author: Christine Caulfield * * This software licensed under BSD license, the text of which follows: * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * - Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * - Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * - Neither the name of the MontaVista Software, Inc. nor the names of its * contributors may be used to endorse or promote products derived from this * software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF * THE POSSIBILITY OF SUCH DAMAGE. */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include static int quit = 0; static int show_ip = 0; static void print_cpgname (const struct cpg_name *name) { int i; for (i = 0; i < name->length; i++) { printf ("%c", name->value[i]); } } static void DeliverCallback ( cpg_handle_t handle, const struct cpg_name *groupName, uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len) { if (show_ip) { struct in_addr saddr; saddr.s_addr = nodeid; printf("DeliverCallback: message (len=%lu)from node/pid %s/%d: '%s'\n", (unsigned long int) msg_len, inet_ntoa(saddr), pid, (const char *)msg); } else { printf("DeliverCallback: message (len=%lu)from node/pid %d/%d: '%s'\n", (unsigned long int) msg_len, nodeid, pid, (const char *)msg); } } static void ConfchgCallback ( cpg_handle_t handle, const struct cpg_name *groupName, const struct cpg_address *member_list, size_t member_list_entries, const struct cpg_address *left_list, size_t left_list_entries, const struct cpg_address *joined_list, size_t joined_list_entries) { int i; struct in_addr saddr; printf("\nConfchgCallback: group '"); print_cpgname(groupName); printf("'\n"); for (i=0; i