Page Menu
Home
ClusterLabs Projects
Search
Configure Global Search
Log In
Files
F1841718
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Flag For Later
Award Token
Size
69 KB
Referenced Files
None
Subscribers
None
View Options
diff --git a/exec/Makefile.am b/exec/Makefile.am
index 754dfe81..e1fd810c 100644
--- a/exec/Makefile.am
+++ b/exec/Makefile.am
@@ -1,166 +1,166 @@
# Copyright (c) 2009 Red Hat, Inc.
#
# Authors: Andrew Beekhof
# Steven Dake (sdake@redhat.com)
#
# This software licensed under BSD license, the text of which follows:
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# - Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# - Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# - Neither the name of the MontaVista Software, Inc. nor the names of its
# contributors may be used to endorse or promote products derived from this
# software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
# THE POSSIBILITY OF SUCH DAMAGE.
# Generated file that `make maintainer-clean` should remove.
MAINTAINERCLEANFILES = Makefile.in
# Objects are compiled position-independent; the same objects are linked into
# the shared libraries built by the rules further down.
AM_CFLAGS = -fPIC
INCLUDES = -I$(top_builddir)/include -I$(top_srcdir)/include $(nss_CFLAGS)
# Source groups, one per library. Note wthread.c is listed in both TOTEM_SRC
# and LOGSYS_SRC.
TOTEM_SRC = coropoll.c totemip.c totemnet.c totemrrp.c \
totemsrp.c totemmrp.c totempg.c crypto.c wthread.c
LOGSYS_SRC = wthread.c logsys.c
COROIPCS_SRC = coroipcs.c
# Sources built into loadable .lcrso components (one .lcrso per .c file).
LCRSO_SRC = objdb.c vsf_ykd.c coroparse.c vsf_quorum.c
LCRSO_OBJS = $(LCRSO_SRC:%.c=%.o)
LCRSO = $(LCRSO_SRC:%.c=%.lcrso)
lib_LIBRARIES = libtotem_pg.a liblogsys.a libcoroipcs.a
sbin_PROGRAMS = corosync
libtotem_pg_a_SOURCES = $(TOTEM_SRC)
liblogsys_a_SOURCES = $(LOGSYS_SRC)
libcoroipcs_a_SOURCES = $(COROIPCS_SRC)
corosync_SOURCES = main.c util.c sync.c apidef.c service.c \
timer.c totemconfig.c mainconfig.c quorum.c schedwrk.c \
- ../lcr/lcr_ifact.c evil.c
+ ../lcr/lcr_ifact.c evil.c syncv2.c
# The daemon links against the libraries found in the build directory (-L./)
# and depends on the versioned shared objects produced by the hand-written rules.
corosync_LDADD = -ltotem_pg -llogsys -lcoroipcs
corosync_DEPENDENCIES = libtotem_pg.so.$(SONAME) liblogsys.so.$(SONAME) libcoroipcs.so.$(SONAME)
corosync_LDFLAGS = $(OS_DYFLAGS) -L./
TOTEM_OBJS = $(TOTEM_SRC:%.c=%.o)
LOGSYS_OBJS = $(LOGSYS_SRC:%.c=%.o)
COROIPCS_OBJS = $(COROIPCS_SRC:%.c=%.o)
# Shared-library file names derived from lib_LIBRARIES:
#   SHARED_LIBS        lib*.so.$(SONAME)   (the real file)
#   SHARED_LIBS_SO     lib*.so             (symlink)
#   SHARED_LIBS_SO_TWO lib*.so.$(SOMAJOR)  (symlink)
SHARED_LIBS = $(lib_LIBRARIES:%.a=%.so.$(SONAME))
SHARED_LIBS_SO = $(SHARED_LIBS:%.so.$(SONAME)=%.so)
SHARED_LIBS_SO_TWO = $(SHARED_LIBS:%.so.$(SONAME)=%.so.$(SOMAJOR))
noinst_HEADERS = apidef.h crypto.h mainconfig.h main.h \
quorum.h service.h sync.h timer.h tlist.h totemconfig.h \
totemmrp.h totemnet.h totemrrp.h totemsrp.h util.h \
- version.h vsf.h wthread.h schedwrk.h evil.h
+ version.h vsf.h wthread.h schedwrk.h evil.h syncv2.h
# The .lcrso sources are not part of any *_SOURCES list, so ship them explicitly.
EXTRA_DIST = $(LCRSO_SRC)
# Platform-specific link rules. Each branch defines the same four targets:
# the %.lcrso pattern rule and the three versioned shared libraries. Every
# library recipe also creates the lib*.so and lib*.so.$(SOMAJOR) symlinks.
if BUILD_DARWIN
# Darwin: .lcrso components are bundles loaded against the ./corosync binary.
%.lcrso: %.o
$(CC) $(CFLAGS) -L$(top_builddir)/exec -llogsys -bundle -bind_at_load -bundle_loader ./corosync $^ -o $@
libtotem_pg.so.$(SONAME): $(TOTEM_OBJS)
$(CC) $(LDFLAGS) $(DARWIN_OPTS) $(TOTEM_OBJS) -o $@ -lpthread
ln -sf libtotem_pg.so.$(SONAME) libtotem_pg.so
ln -sf libtotem_pg.so.$(SONAME) libtotem_pg.so.$(SOMAJOR)
liblogsys.so.$(SONAME): $(LOGSYS_OBJS)
$(CC) $(LDFLAGS) $(DARWIN_OPTS) $(LOGSYS_OBJS) -o $@ -lpthread
ln -sf liblogsys.so.$(SONAME) liblogsys.so
ln -sf liblogsys.so.$(SONAME) liblogsys.so.$(SOMAJOR)
libcoroipcs.so.$(SONAME): $(COROIPCS_OBJS)
$(CC) $(LDFLAGS) $(DARWIN_OPTS) $(COROIPCS_OBJS) -o $@ -lpthread
ln -sf libcoroipcs.so.$(SONAME) libcoroipcs.so
ln -sf libcoroipcs.so.$(SONAME) libcoroipcs.so.$(SOMAJOR)
else
if BUILD_SOLARIS
# Solaris: link directly with $(LD) -G.
%.lcrso: %.o
$(LD) -G $^ -o $@
libtotem_pg.so.$(SONAME): $(TOTEM_OBJS)
$(LD) -G $(TOTEM_OBJS) -o $@ -lpthread
ln -sf libtotem_pg.so.$(SONAME) libtotem_pg.so
ln -sf libtotem_pg.so.$(SONAME) libtotem_pg.so.$(SOMAJOR)
liblogsys.so.$(SONAME): $(LOGSYS_OBJS)
$(LD) -G $(LOGSYS_OBJS) -o $@ -lpthread
ln -sf liblogsys.so.$(SONAME) liblogsys.so
ln -sf liblogsys.so.$(SONAME) liblogsys.so.$(SOMAJOR)
libcoroipcs.so.$(SONAME): $(COROIPCS_OBJS)
$(LD) -G $(COROIPCS_OBJS) -o $@ -lpthread
ln -sf libcoroipcs.so.$(SONAME) libcoroipcs.so
ln -sf libcoroipcs.so.$(SONAME) libcoroipcs.so.$(SOMAJOR)
else
# Default: $(CC) -shared with an explicit soname of lib*.so.$(SOMAJOR).
# Only this branch passes $(nss_LIBS) when linking libtotem_pg.
%.lcrso: %.o
$(CC) $(CFLAGS) -shared -Wl,-soname=$@ $^ -o $@
libtotem_pg.so.$(SONAME): $(TOTEM_OBJS)
$(CC) -shared -o $@ \
-Wl,-soname=libtotem_pg.so.$(SOMAJOR) \
$^ $(LDFLAGS) $(nss_LIBS) -lpthread
ln -sf libtotem_pg.so.$(SONAME) libtotem_pg.so
ln -sf libtotem_pg.so.$(SONAME) libtotem_pg.so.$(SOMAJOR)
liblogsys.so.$(SONAME): $(LOGSYS_OBJS)
$(CC) -shared -o $@ \
-Wl,-soname=liblogsys.so.$(SOMAJOR) \
$^ $(LDFLAGS) -lpthread
ln -sf liblogsys.so.$(SONAME) liblogsys.so
ln -sf liblogsys.so.$(SONAME) liblogsys.so.$(SOMAJOR)
libcoroipcs.so.$(SONAME): $(COROIPCS_OBJS)
$(CC) -shared -o $@ \
-Wl,-soname=libcoroipcs.so.$(SOMAJOR) \
$^ $(LDFLAGS) -lpthread
ln -sf libcoroipcs.so.$(SONAME) libcoroipcs.so
ln -sf libcoroipcs.so.$(SONAME) libcoroipcs.so.$(SOMAJOR)
endif
endif
# Static analysis over every C file in this directory. `lint` is a command,
# not a file, so declare it phony: a stray file named "lint" must not
# silently disable the target. The leading '-' lets make carry on when
# splint reports problems.
.PHONY: lint
lint:
	-splint $(LINT_FLAGS) $(CFLAGS) *.c
# Hook run as part of `make all`: builds the .lcrso objects, the .lcrso
# components and the versioned shared libraries, then prints a banner.
all-local: $(LCRSO_OBJS) $(LCRSO) $(SHARED_LIBS)
	@echo "Built corosync Executive"
# Install the versioned shared libraries, their symlinks, and the .lcrso
# components. DESTDIR is prepended directly to the (absolute) install
# directory: writing $(DESTDIR)/$(libdir) would produce a path starting with
# "//" whenever DESTDIR is empty.
# NOTE(review): assumes LCRSODIR is an absolute path like libdir - confirm in
# configure.ac.
install-exec-local:
	$(INSTALL) -d $(DESTDIR)$(libdir)
	$(INSTALL) -m 755 $(SHARED_LIBS) $(DESTDIR)$(libdir)
	$(CP) -a $(SHARED_LIBS_SO) $(SHARED_LIBS_SO_TWO) $(DESTDIR)$(libdir)
	$(INSTALL) -d $(DESTDIR)$(LCRSODIR)
	$(INSTALL) -m 755 $(LCRSO) $(DESTDIR)$(LCRSODIR)
# Remove the installed shared libraries, their symlinks, and the .lcrso
# components. DESTDIR is prepended directly to the (absolute) install
# directory so an empty DESTDIR does not produce a path starting with "//".
# Each `cd ... && rm` pair is one recipe line because every recipe line runs
# in its own shell.
# NOTE(review): assumes LCRSODIR is an absolute path like libdir - confirm in
# configure.ac.
uninstall-local:
	cd $(DESTDIR)$(libdir) && \
		rm -f $(SHARED_LIBS) $(SHARED_LIBS_SO) $(SHARED_LIBS_SO_TWO)
	cd $(DESTDIR)$(LCRSODIR) && \
		rm -f $(LCRSO)
# Hook run as part of `make clean`: removes the daemon binary, objects,
# .lcrso components, profiling/coverage leftovers and all shared-library
# files and symlinks from the build directory.
clean-local:
	rm -f corosync *.o *.lcrso \
		gmon.out *.da *.bb *.bbg \
		*.so*
diff --git a/exec/main.c b/exec/main.c
index bc9c1e01..022f4298 100644
--- a/exec/main.c
+++ b/exec/main.c
@@ -1,975 +1,1030 @@
/*
* Copyright (c) 2002-2006 MontaVista Software, Inc.
* Copyright (c) 2006-2009 Red Hat, Inc.
*
* All rights reserved.
*
* Author: Steven Dake (sdake@redhat.com)
*
* This software licensed under BSD license, the text of which follows:
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* - Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* - Neither the name of the MontaVista Software, Inc. nor the names of its
* contributors may be used to endorse or promote products derived from this
* software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
* THE POSSIBILITY OF SUCH DAMAGE.
*/
#include <config.h>
#include <pthread.h>
#include <assert.h>
#include <sys/types.h>
#include <sys/poll.h>
#include <sys/uio.h>
#include <sys/mman.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <sys/stat.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <signal.h>
#include <sched.h>
#include <time.h>
#include <corosync/swab.h>
#include <corosync/corotypes.h>
#include <corosync/coroipc_types.h>
#include <corosync/corodefs.h>
#include <corosync/list.h>
#include <corosync/lcr/lcr_ifact.h>
#include <corosync/totem/coropoll.h>
#include <corosync/totem/totempg.h>
#include <corosync/engine/objdb.h>
#include <corosync/engine/config.h>
#include <corosync/engine/logsys.h>
#include <corosync/coroipcs.h>
#include "quorum.h"
#include "totemsrp.h"
#include "mainconfig.h"
#include "totemconfig.h"
#include "main.h"
#include "sync.h"
+#include "syncv2.h"
#include "tlist.h"
#include "timer.h"
#include "util.h"
#include "apidef.h"
#include "service.h"
#include "schedwrk.h"
#include "version.h"
#include "evil.h"
/*
 * Logging setup for the "corosync" system and this file's "MAIN" subsystem.
 */
LOGSYS_DECLARE_SYSTEM ("corosync",
LOGSYS_MODE_OUTPUT_STDERR | LOGSYS_MODE_THREADED | LOGSYS_MODE_FORK,
0,
NULL,
LOG_INFO,
LOG_DAEMON,
LOG_INFO,
NULL,
1000000);
LOGSYS_DECLARE_SUBSYS ("MAIN");
#define SERVER_BACKLOG 5
/* Scheduler priority chosen by corosync_setscheduler(); handed to corosync_timer_init() in main(). */
static int sched_priority = 0;
/* Upper bound used by confchg_fn() when iterating over ais_service[]. */
static unsigned int service_count = 32;
/*
 * Lock behind serialize_lock()/serialize_unlock(): a spinlock when the
 * platform provides one, otherwise a mutex.
 */
#if defined(HAVE_PTHREAD_SPIN_LOCK)
static pthread_spinlock_t serialize_spin;
#else
static pthread_mutex_t serialize_mutex = PTHREAD_MUTEX_INITIALIZER;
#endif
static struct totem_logging_configuration totem_logging_configuration;
/* Configuration parser modules loaded in main(); exposed via main_get_config_modules(). */
static int num_config_modules;
static struct config_iface_ver0 *config_modules[MAX_DYNAMIC_SERVICES];
static struct objdb_iface_ver0 *objdb = NULL;
static struct corosync_api_v1 *api = NULL;
/*
 * NOTE(review): minimum_sync_mode is declared twice in a row; the second
 * tentative definition is redundant and can be dropped.
 */
static enum cs_sync_mode minimum_sync_mode;
static enum cs_sync_mode minimum_sync_mode;
/*
 * Non-zero while synchronization is outstanding: starts at 1, is set to 1 by
 * confchg_fn() and cleared by corosync_sync_completed().
 */
+static int sync_in_process = 1;
+
unsigned long long *(*main_clm_get_by_nodeid) (unsigned int node_id);
hdb_handle_t corosync_poll_handle;
struct sched_param global_sched_param;
void corosync_state_dump (void)
{
int i;
for (i = 0; i < SERVICE_HANDLER_MAXIMUM_COUNT; i++) {
if (ais_service[i] && ais_service[i]->exec_dump_fn) {
ais_service[i]->exec_dump_fn ();
}
}
}
/*
 * SIGUSR2: dump the state of all service engines.
 *
 * TODO remove this from sigusr2 handler and access via cfg service
 * engine api - corosync-cfgtool
 */
static void sigusr2_handler (int num)
{
	(void)num;

	corosync_state_dump ();
}
/*
 * Orderly shutdown: unlink all services (only when the API table has been
 * initialized), stop the poll loop, finalize totempg, close IPC and exit
 * with AIS_DONE_EXIT.
 *
 * TODO this function needs some love
 */
void corosync_shutdown_request (void)
{
	if (api != NULL) {
		corosync_service_unlink_all (api);
	}

	poll_stop (0);
	totempg_finalize ();
	coroipcs_ipc_exit ();

	corosync_exit_error (AIS_DONE_EXIT);
}
/*
 * SIGQUIT: request a shutdown of the executive.
 */
static void sigquit_handler (int num)
{
	(void)num;

	corosync_shutdown_request ();
}
/*
 * SIGINT: request a shutdown of the executive.
 */
static void sigintr_handler (int num)
{
	(void)num;

	corosync_shutdown_request ();
}
/*
 * SIGSEGV handler. Restores the default disposition first, runs
 * logsys_atexit(), stores the log record to
 * LOCALSTATEDIR "/lib/corosync/fdata", then re-raises SIGSEGV so the
 * default action takes place.
 */
static void sigsegv_handler (int num)
{
(void)signal (SIGSEGV, SIG_DFL);
logsys_atexit();
logsys_log_rec_store (LOCALSTATEDIR "/lib/corosync/fdata");
raise (SIGSEGV);
}
/*
 * SIGABRT handler. Restores the default disposition first, runs
 * logsys_atexit(), stores the log record to
 * LOCALSTATEDIR "/lib/corosync/fdata", then re-raises SIGABRT so the
 * default action takes place.
 */
static void sigabrt_handler (int num)
{
(void)signal (SIGABRT, SIG_DFL);
logsys_atexit();
logsys_log_rec_store (LOCALSTATEDIR "/lib/corosync/fdata");
raise (SIGABRT);
}
/* NOTE(review): LOCALHOST_IP is not referenced in the visible part of this file - confirm it is still needed. */
#define LOCALHOST_IP inet_addr("127.0.0.1")
/* Handle filled in by totempg_groups_initialize() in main() and used by main_mcast(). */
hdb_handle_t corosync_group_handle;
/* The single one-byte-named group ("a") that main() joins via totempg_groups_join(). */
struct totempg_group corosync_group = {
.group = "a",
.group_len = 1
};
/*
 * serialize_lock()/serialize_unlock() take and release the single
 * file-scope serialization lock. The pair is registered with the timer,
 * schedwrk and IPC layers and wraps service callbacks in this file.
 * The backing primitive is a spinlock where available, otherwise a mutex.
 */
#if defined(HAVE_PTHREAD_SPIN_LOCK)
static void serialize_lock (void)
{
	(void)pthread_spin_lock (&serialize_spin);
}

static void serialize_unlock (void)
{
	(void)pthread_spin_unlock (&serialize_spin);
}
#else
static void serialize_lock (void)
{
	(void)pthread_mutex_lock (&serialize_mutex);
}

static void serialize_unlock (void)
{
	(void)pthread_mutex_unlock (&serialize_mutex);
}
#endif
/*
 * Completion callback handed to the sync engine in main(): logs that
 * synchronization finished and clears sync_in_process, which
 * corosync_sending_allowed() checks before permitting flow-controlled sends.
 */
static void corosync_sync_completed (void)
{
+ log_printf (LOGSYS_LEVEL_NOTICE,
+ "Completed service synchronization, ready to provide service.\n");
+ sync_in_process = 0;
}
/*
 * Callback lookup for the V1 sync engine (registered via sync_register()).
 *
 * Fills *callbacks with the sync handlers of the service whose index equals
 * sync_id, provided that service is loaded and declares CS_SYNC_V1. When no
 * such service exists the request is forwarded to evil_callbacks_load() and
 * its result is returned. Returns 0 when the callbacks were filled in here.
 */
static int corosync_sync_callbacks_retrieve (int sync_id,
struct sync_callbacks *callbacks)
{
unsigned int ais_service_index;
int res;
/* Stop at the slot matching sync_id; run off the end if it is not eligible. */
for (ais_service_index = 0;
ais_service_index < SERVICE_HANDLER_MAXIMUM_COUNT;
ais_service_index++) {
- if (ais_service[ais_service_index] != NULL) {
+ if (ais_service[ais_service_index] != NULL
+ && ais_service[ais_service_index]->sync_mode == CS_SYNC_V1) {
if (ais_service_index == sync_id) {
break;
}
}
}
/*
* Try to load backwards compat sync engines
*/
if (ais_service_index == SERVICE_HANDLER_MAXIMUM_COUNT) {
res = evil_callbacks_load (sync_id, callbacks);
return (res);
}
callbacks->name = ais_service[ais_service_index]->name;
callbacks->sync_init = ais_service[ais_service_index]->sync_init;
callbacks->sync_process = ais_service[ais_service_index]->sync_process;
callbacks->sync_activate = ais_service[ais_service_index]->sync_activate;
callbacks->sync_abort = ais_service[ais_service_index]->sync_abort;
return (0);
}
+/*
+ * Callback lookup for the V2 sync engine (registered via sync_v2_init()).
+ *
+ * Fills *callbacks with the sync handlers of service_id.
+ *
+ * Returns 0 when callbacks were filled in, the result of
+ * evil_callbacks_load() when CLM is requested but not loaded, and -1 when
+ * service_id is out of range, the service is not loaded, or the node runs in
+ * CS_SYNC_V1 compatibility mode and the service is not a CS_SYNC_V2 service.
+ */
+static int corosync_sync_v2_callbacks_retrieve (
+	int service_id,
+	struct sync_callbacks *callbacks)
+{
+	int res;
+
+	/* Never index ais_service[] outside its bounds. */
+	if (service_id < 0 || service_id >= SERVICE_HANDLER_MAXIMUM_COUNT) {
+		return (-1);
+	}
+	/* CLM missing: fall back to the backwards-compatibility sync engine. */
+	if (service_id == CLM_SERVICE && ais_service[CLM_SERVICE] == NULL) {
+		res = evil_callbacks_load (service_id, callbacks);
+		return (res);
+	}
+	if (ais_service[service_id] == NULL) {
+		return (-1);
+	}
+	if (minimum_sync_mode == CS_SYNC_V1 && ais_service[service_id]->sync_mode != CS_SYNC_V2) {
+		return (-1);
+	}
+
+	callbacks->name = ais_service[service_id]->name;
+	callbacks->sync_init = ais_service[service_id]->sync_init;
+	callbacks->sync_process = ais_service[service_id]->sync_process;
+	callbacks->sync_activate = ais_service[service_id]->sync_activate;
+	callbacks->sync_abort = ais_service[service_id]->sync_abort;
+	return (0);
+}
+
/* Copy of the ring id delivered with the most recent configuration change. */
static struct memb_ring_id corosync_ring_id;
/*
 * Configuration-change callback registered with totempg in main().
 *
 * Records the new ring id, forwards the membership change to every service
 * that has a confchg_fn (under the serialize lock), aborts a V2
 * synchronization that was still marked in progress, and - in CS_SYNC_V2
 * mode, for regular configurations only - starts a new V2 synchronization.
 */
static void confchg_fn (
enum totem_configuration_type configuration_type,
const unsigned int *member_list, size_t member_list_entries,
const unsigned int *left_list, size_t left_list_entries,
const unsigned int *joined_list, size_t joined_list_entries,
const struct memb_ring_id *ring_id)
{
int i;
/* Remember whether a sync was still outstanding before marking a new one. */
+ int abort_activate = 0;
+ if (sync_in_process == 1) {
+ abort_activate = 1;
+ }
+ sync_in_process = 1;
serialize_lock ();
memcpy (&corosync_ring_id, ring_id, sizeof (struct memb_ring_id));
/*
* Call configuration change for all services
*/
for (i = 0; i < service_count; i++) {
if (ais_service[i] && ais_service[i]->confchg_fn) {
ais_service[i]->confchg_fn (configuration_type,
member_list, member_list_entries,
left_list, left_list_entries,
joined_list, joined_list_entries, ring_id);
}
}
serialize_unlock ();
+
+ if (abort_activate) {
+ sync_v2_abort ();
+ }
+ if (minimum_sync_mode == CS_SYNC_V2 && configuration_type == TOTEM_CONFIGURATION_REGULAR) {
+ sync_v2_start (member_list, member_list_entries, ring_id);
+ }
}
/*
 * Placeholder for dropping root privileges; currently does nothing.
 */
static void priv_drop (void)
{
	/* TODO: we are still not dropping privs */
}
/*
 * Detach from the controlling terminal when not running in the foreground:
 * fork (the parent exits with status 0, a failed fork ends in
 * corosync_exit_error (AIS_DONE_FORK)), start a new session in the child,
 * and redirect stdin/stdout/stderr to /dev/null when it can be opened.
 */
static void corosync_tty_detach (void)
{
	pid_t child;
	int null_fd;

	child = fork ();
	if (child == -1) {
		corosync_exit_error (AIS_DONE_FORK);
	} else if (child != 0) {
		/* Parent: the detached child carries on as the daemon. */
		exit (0);
	}

	/* Create new session */
	(void)setsid();

	/*
	 * Map stdin/out/err to /dev/null.
	 */
	null_fd = open("/dev/null", O_RDWR);
	if (null_fd < 0) {
		return;
	}

	dup2(null_fd, STDIN_FILENO);	/* 0 */
	dup2(null_fd, STDOUT_FILENO);	/* 1 */
	dup2(null_fd, STDERR_FILENO);	/* 2 */

	/* Keep the descriptor only if it already is one of 0/1/2. */
	if (null_fd > 2) {
		close(null_fd);
	}
}
/*
 * Raise the memory-lock resource limit to unlimited (RLIMIT_VMEM on
 * Solaris builds, RLIMIT_MEMLOCK elsewhere) and lock all current and
 * future pages. On BSD builds locking is skipped and a warning is logged;
 * elsewhere a warning is logged only when mlockall() fails.
 */
static void corosync_mlockall (void)
{
	struct rlimit unlimited;

	unlimited.rlim_max = RLIM_INFINITY;
	unlimited.rlim_cur = RLIM_INFINITY;

#ifdef COROSYNC_SOLARIS
	setrlimit (RLIMIT_VMEM, &unlimited);
#else
	setrlimit (RLIMIT_MEMLOCK, &unlimited);
#endif

#if defined(COROSYNC_BSD)
	/* under FreeBSD a process with locked page cannot call dlopen
	 * code disabled until FreeBSD bug i386/93396 was solved
	 */
	log_printf (LOGSYS_LEVEL_WARNING, "Could not lock memory of service to avoid page faults\n");
#else
	if (mlockall (MCL_CURRENT | MCL_FUTURE) == -1) {
		log_printf (LOGSYS_LEVEL_WARNING, "Could not lock memory of service to avoid page faults: %s\n", strerror (errno));
	}
#endif
}
static void deliver_fn (
unsigned int nodeid,
const void *msg,
unsigned int msg_len,
int endian_conversion_required)
{
const coroipc_request_header_t *header;
int service;
int fn_id;
unsigned int id;
unsigned int size;
header = msg;
if (endian_conversion_required) {
id = swab32 (header->id);
size = swab32 (header->size);
} else {
id = header->id;
size = header->size;
}
/*
* Call the proper executive handler
*/
service = id >> 16;
fn_id = id & 0xffff;
if (!ais_service[service])
return;
serialize_lock();
if (endian_conversion_required) {
assert(ais_service[service]->exec_engine[fn_id].exec_endian_convert_fn != NULL);
ais_service[service]->exec_engine[fn_id].exec_endian_convert_fn
((void *)msg);
}
ais_service[service]->exec_engine[fn_id].exec_handler_fn
(msg, nodeid);
serialize_unlock();
}
/*
 * Hand out the table of loaded configuration modules: *num receives the
 * number of entries, *modules the address of the table.
 */
void main_get_config_modules(struct config_iface_ver0 ***modules, int *num)
{
	*num = num_config_modules;
	*modules = config_modules;
}
/*
 * Multicast an iovec through the corosync totempg group; returns whatever
 * totempg_groups_mcast_joined() returns.
 */
int main_mcast (
	const struct iovec *iovec,
	unsigned int iov_len,
	unsigned int guarantee)
{
	int res;

	res = totempg_groups_mcast_joined (
		corosync_group_handle,
		iovec,
		iov_len,
		guarantee);

	return (res);
}
/*
 * Returns 1 when the message source carries this node's id, 0 otherwise.
 * source must not be NULL.
 */
int message_source_is_local (const mar_message_source_t *source)
{
	assert (source != NULL);

	if (source->nodeid != totempg_my_nodeid_get ()) {
		return 0;
	}
	return 1;
}
/*
 * Initialize *source to describe a local connection: the structure is
 * zeroed, then stamped with conn and this node's id. Neither argument may
 * be NULL.
 */
void message_source_set (
	mar_message_source_t *source,
	void *conn)
{
	assert ((source != NULL) && (conn != NULL));

	memset (source, 0, sizeof (*source));
	source->conn = conn;
	source->nodeid = totempg_my_nodeid_get ();
}
/*
 * Provides the glue from corosync to the IPC Service
 */

/*
 * Size of the per-connection private data declared by the given service.
 */
static int corosync_private_data_size_get (unsigned int service)
{
	int private_size;

	private_size = ais_service[service]->private_data_size;
	return (private_size);
}
/*
 * Library-connection init hook (lib_init_fn) of the given service.
 */
static coroipcs_init_fn_lvalue corosync_init_fn_get (unsigned int service)
{
	coroipcs_init_fn_lvalue init_fn;

	init_fn = ais_service[service]->lib_init_fn;
	return (init_fn);
}
/*
 * Library-connection exit hook (lib_exit_fn) of the given service.
 */
static coroipcs_exit_fn_lvalue corosync_exit_fn_get (unsigned int service)
{
	coroipcs_exit_fn_lvalue exit_fn;

	exit_fn = ais_service[service]->lib_exit_fn;
	return (exit_fn);
}
/*
 * Library request handler number id from the given service's lib_engine
 * table.
 */
static coroipcs_handler_fn_lvalue corosync_handler_fn_get (unsigned int service, unsigned int id)
{
	coroipcs_handler_fn_lvalue handler_fn;

	handler_fn = ais_service[service]->lib_engine[id].lib_handler_fn;
	return (handler_fn);
}
static int corosync_security_valid (int euid, int egid)
{
struct list_head *iter;
if (euid == 0 || egid == 0) {
return (1);
}
for (iter = uidgid_list_head.next; iter != &uidgid_list_head;
iter = iter->next) {
struct uidgid_item *ugi = list_entry (iter, struct uidgid_item,
list);
if (euid == ugi->uid || egid == ugi->gid)
return (1);
}
return (0);
}
/*
 * Returns 1 when a service engine is loaded in the given slot, 0 otherwise.
 */
static int corosync_service_available (unsigned int service)
{
	if (ais_service[service] == NULL) {
		return (0);
	}
	return (1);
}
/*
 * Scratch state shared between corosync_sending_allowed() and
 * corosync_sending_allowed_release().
 */
struct sending_allowed_private_data_struct {
/* Value returned by totempg_groups_joined_reserve(); later passed to totempg_groups_joined_release(). */
int reserved_msgs;
};
/*
 * IPC hook deciding whether a library request may be sent right now.
 *
 * Always reserves totempg space for the request first and stores the
 * reservation in the caller-provided private data (the caller is expected
 * to pair this with corosync_sending_allowed_release()).
 *
 * Sending is allowed when the cluster is quorate or the service permits
 * inquorate use, AND either the handler needs no flow control, or it needs
 * flow control, the reservation succeeded and no synchronization is in
 * progress. Returns non-zero when allowed.
 */
static int corosync_sending_allowed (
unsigned int service,
unsigned int id,
const void *msg,
void *sending_allowed_private_data)
{
struct sending_allowed_private_data_struct *pd =
(struct sending_allowed_private_data_struct *)sending_allowed_private_data;
struct iovec reserve_iovec;
coroipc_request_header_t *header = (coroipc_request_header_t *)msg;
int sending_allowed;
/* Describe the whole request (header->size bytes) for the reservation. */
reserve_iovec.iov_base = (char *)header;
reserve_iovec.iov_len = header->size;
pd->reserved_msgs = totempg_groups_joined_reserve (
corosync_group_handle,
&reserve_iovec, 1);
sending_allowed =
(corosync_quorum_is_quorate() == 1 ||
ais_service[service]->allow_inquorate == CS_LIB_ALLOW_INQUORATE) &&
((ais_service[service]->lib_engine[id].flow_control == CS_LIB_FLOW_CONTROL_NOT_REQUIRED) ||
((ais_service[service]->lib_engine[id].flow_control == CS_LIB_FLOW_CONTROL_REQUIRED) &&
(pd->reserved_msgs) &&
- (sync_in_process() == 0)));
+ (sync_in_process == 0)));
return (sending_allowed);
}
/*
 * Release the totempg reservation recorded in the private data by
 * corosync_sending_allowed().
 */
static void corosync_sending_allowed_release (void *sending_allowed_private_data)
{
	struct sending_allowed_private_data_struct *reservation;

	reservation = (struct sending_allowed_private_data_struct *)sending_allowed_private_data;
	totempg_groups_joined_release (reservation->reserved_msgs);
}
/* Logsys subsystem id for "IPC"; -1 until main() creates the subsystem. */
static int ipc_subsys_id = -1;
/*
 * printf-style logging hook handed to the IPC layer through ipc_init_state.
 * Every message is logged at LOGSYS_LEVEL_ERROR under the IPC subsystem.
 */
static void ipc_log_printf (const char *format, ...) __attribute__((format(printf, 1, 2)));
static void ipc_log_printf (const char *format, ...) {
va_list ap;
va_start (ap, format);
_logsys_log_vprintf (
LOGSYS_ENCODE_RECID(LOGSYS_LEVEL_ERROR,
ipc_subsys_id,
LOGSYS_RECID_LOG),
__FUNCTION__, __FILE__, __LINE__,
format, ap);
va_end (ap);
}
/*
 * Fatal-error hook handed to the IPC layer through ipc_init_state: logs
 * error_msg at LOGSYS_LEVEL_ERROR under the IPC subsystem and terminates
 * the process with EXIT_FAILURE. Does not return.
 */
static void ipc_fatal_error(const char *error_msg) {
_logsys_log_printf (
LOGSYS_ENCODE_RECID(LOGSYS_LEVEL_ERROR,
ipc_subsys_id,
LOGSYS_RECID_LOG),
__FUNCTION__, __FILE__, __LINE__,
"%s", error_msg);
exit(EXIT_FAILURE);
}
/*
 * Poll callback for the IPC listening socket; forwards to
 * coroipcs_handler_accept() and returns its result. The poll handle is
 * not used.
 */
static int corosync_poll_handler_accept (
	hdb_handle_t handle,
	int fd,
	int revent,
	void *context)
{
	int res;

	res = coroipcs_handler_accept (fd, revent, context);
	return (res);
}
/*
 * Poll callback for an established IPC connection; forwards to
 * coroipcs_handler_dispatch() and returns its result. The poll handle is
 * not used.
 */
static int corosync_poll_handler_dispatch (
	hdb_handle_t handle,
	int fd,
	int revent,
	void *context)
{
	int res;

	res = coroipcs_handler_dispatch (fd, revent, context);
	return (res);
}
/*
 * Register the IPC listening descriptor with the main poll loop for
 * POLLIN|POLLNVAL, with no context, dispatching to
 * corosync_poll_handler_accept().
 */
static void corosync_poll_accept_add (
	int fd)
{
	poll_dispatch_add (
		corosync_poll_handle,
		fd,
		POLLIN|POLLNVAL,
		0,
		corosync_poll_handler_accept);
}
/*
 * Register an IPC connection descriptor with the main poll loop for
 * POLLIN|POLLNVAL, carrying the caller's context and dispatching to
 * corosync_poll_handler_dispatch().
 */
static void corosync_poll_dispatch_add (
	int fd,
	void *context)
{
	poll_dispatch_add (
		corosync_poll_handle,
		fd,
		POLLIN|POLLNVAL,
		context,
		corosync_poll_handler_dispatch);
}
/*
 * Change the set of poll events watched for an IPC connection descriptor;
 * the dispatch callback stays corosync_poll_handler_dispatch().
 */
static void corosync_poll_dispatch_modify (
	int fd,
	int events)
{
	poll_dispatch_modify (
		corosync_poll_handle,
		fd,
		events,
		corosync_poll_handler_dispatch);
}
static struct coroipcs_init_state ipc_init_state = {
.socket_name = COROSYNC_SOCKET_NAME,
.sched_policy = SCHED_OTHER,
.sched_param = &global_sched_param,
.malloc = malloc,
.free = free,
.log_printf = ipc_log_printf,
.fatal_error = ipc_fatal_error,
.security_valid = corosync_security_valid,
.service_available = corosync_service_available,
.private_data_size_get = corosync_private_data_size_get,
.serialize_lock = serialize_lock,
.serialize_unlock = serialize_unlock,
.sending_allowed = corosync_sending_allowed,
.sending_allowed_release = corosync_sending_allowed_release,
.poll_accept_add = corosync_poll_accept_add,
.poll_dispatch_add = corosync_poll_dispatch_add,
.poll_dispatch_modify = corosync_poll_dispatch_modify,
.init_fn_get = corosync_init_fn_get,
.exit_fn_get = corosync_exit_fn_get,
.handler_fn_get = corosync_handler_fn_get
};
/*
 * Try to run the executive under SCHED_RR at the highest priority the
 * platform offers.
 *
 * On success the IPC layer is told to use SCHED_RR as well. When the
 * priority cannot be queried or the scheduler cannot be changed a warning
 * is logged and global_sched_param (or sched_priority) falls back to 0.
 * On platforms without the required calls only a warning is logged.
 */
static void corosync_setscheduler (void)
{
#if defined(HAVE_PTHREAD_SETSCHEDPARAM) && defined(HAVE_SCHED_GET_PRIORITY_MAX)
	int res;

	sched_priority = sched_get_priority_max (SCHED_RR);
	if (sched_priority != -1) {
		global_sched_param.sched_priority = sched_priority;
		res = sched_setscheduler (0, SCHED_RR, &global_sched_param);
		if (res == -1) {
			/*
			 * Log the priority that was actually attempted before
			 * resetting it; resetting first made the warning
			 * always report priority 0.
			 */
			log_printf (LOGSYS_LEVEL_WARNING, "Could not set SCHED_RR at priority %d: %s\n",
				global_sched_param.sched_priority, strerror (errno));
			global_sched_param.sched_priority = 0;
		} else {
			/*
			 * Turn on SCHED_RR in ipc system
			 */
			ipc_init_state.sched_policy = SCHED_RR;
		}
	} else {
		log_printf (LOGSYS_LEVEL_WARNING, "Could not get maximum scheduler priority: %s\n", strerror (errno));
		sched_priority = 0;
	}
#else
	log_printf(LOGSYS_LEVEL_WARNING,
		"The Platform is missing process priority setting features. Leaving at default.");
#endif
}
/*
 * Entry point of the corosync executive.
 *
 * Parses -f (stay in foreground) and -p (do not change process priority),
 * optionally detaches from the terminal, sets scheduling and memory locking,
 * installs signal handlers, loads the object database and configuration
 * parser modules, reads and validates the configuration, initializes totempg
 * and joins the corosync group, loads the default services, registers the
 * synchronization engine(s) for the configured compatibility mode, brings up
 * IPC and finally runs the poll loop. Fatal setup errors end in
 * corosync_exit_error(). Returns EXIT_SUCCESS when the poll loop returns and
 * EXIT_FAILURE on a bad command-line option.
 */
int main (int argc, char **argv)
{
const char *error_string;
struct totem_config totem_config;
hdb_handle_t objdb_handle;
hdb_handle_t config_handle;
unsigned int config_version = 0;
void *objdb_p;
struct config_iface_ver0 *config;
void *config_p;
const char *config_iface_init;
char *config_iface;
char *iface;
int res, ch;
int background, setprio;
struct stat stat_out;
char corosync_lib_dir[PATH_MAX];
#if defined(HAVE_PTHREAD_SPIN_LOCK)
pthread_spin_init (&serialize_spin, 0);
#endif
/* default configuration
*/
background = 1;
setprio = 1;
while ((ch = getopt (argc, argv, "fp")) != EOF) {
switch (ch) {
case 'f':
background = 0;
logsys_config_mode_set (NULL, LOGSYS_MODE_OUTPUT_STDERR|LOGSYS_MODE_THREADED|LOGSYS_MODE_FORK);
break;
case 'p':
setprio = 0;
break;
default:
fprintf(stderr, \
"usage:\n"\
" -f : Start application in foreground.\n"\
" -p : Do not set process priority. \n");
return EXIT_FAILURE;
}
}
if (background)
corosync_tty_detach ();
/*
* Set round robin realtime scheduling at the maximum SCHED_RR priority
* Lock all memory to avoid page faults which may interrupt
* application healthchecking
*/
if (setprio) {
corosync_setscheduler ();
}
corosync_mlockall ();
log_printf (LOGSYS_LEVEL_NOTICE, "Corosync Cluster Engine ('%s'): started and ready to provide service.\n", RELEASE_VERSION);
(void)signal (SIGINT, sigintr_handler);
(void)signal (SIGUSR2, sigusr2_handler);
(void)signal (SIGSEGV, sigsegv_handler);
(void)signal (SIGABRT, sigabrt_handler);
(void)signal (SIGQUIT, sigquit_handler);
/* Without MSG_NOSIGNAL support, ignore SIGPIPE process-wide instead. */
#if MSG_NOSIGNAL == 0
(void)signal (SIGPIPE, SIG_IGN);
#endif
corosync_timer_init (
serialize_lock,
serialize_unlock,
sched_priority);
corosync_poll_handle = poll_create ();
/*
* Load the object database interface
*/
res = lcr_ifact_reference (
&objdb_handle,
"objdb",
0,
&objdb_p,
0);
if (res == -1) {
log_printf (LOGSYS_LEVEL_ERROR, "Corosync Executive couldn't open configuration object database component.\n");
corosync_exit_error (AIS_DONE_OBJDB);
}
objdb = (struct objdb_iface_ver0 *)objdb_p;
objdb->objdb_init ();
/*
* Initialize the corosync_api_v1 definition
*/
apidef_init (objdb);
api = apidef_get ();
num_config_modules = 0;
/*
* Bootstrap in the default configuration parser or use
* the corosync default built in parser if the configuration parser
* isn't overridden
*/
config_iface_init = getenv("COROSYNC_DEFAULT_CONFIG_IFACE");
if (!config_iface_init) {
config_iface_init = "corosync_parser";
}
/* Make a copy so we can deface it with strtok */
if ((config_iface = strdup(config_iface_init)) == NULL) {
log_printf (LOGSYS_LEVEL_ERROR, "exhausted virtual memory");
corosync_exit_error (AIS_DONE_OBJDB);
}
/* The interface list is colon-separated; load and run each parser in turn. */
iface = strtok(config_iface, ":");
while (iface)
{
res = lcr_ifact_reference (
&config_handle,
iface,
config_version,
&config_p,
0);
config = (struct config_iface_ver0 *)config_p;
if (res == -1) {
log_printf (LOGSYS_LEVEL_ERROR, "Corosync Executive couldn't open configuration component '%s'\n", iface);
corosync_exit_error (AIS_DONE_MAINCONFIGREAD);
}
res = config->config_readconfig(objdb, &error_string);
if (res == -1) {
log_printf (LOGSYS_LEVEL_ERROR, "%s", error_string);
corosync_exit_error (AIS_DONE_MAINCONFIGREAD);
}
log_printf (LOGSYS_LEVEL_NOTICE, "%s", error_string);
/* NOTE(review): no check that num_config_modules stays below MAX_DYNAMIC_SERVICES. */
config_modules[num_config_modules++] = config;
iface = strtok(NULL, ":");
}
free(config_iface);
res = corosync_main_config_read (objdb, &error_string);
if (res == -1) {
/*
* if we are here, we _must_ flush the logsys queue
* and try to inform that we couldn't read the config.
* this is a desperate attempt before certain death
* and there is no guarantee that we can print to stderr
* nor that logsys is sending the messages where we expect.
*/
log_printf (LOGSYS_LEVEL_ERROR, "%s", error_string);
fprintf(stderr, "%s", error_string);
syslog (LOGSYS_LEVEL_ERROR, "%s", error_string);
corosync_exit_error (AIS_DONE_MAINCONFIGREAD);
}
logsys_fork_completed();
if (setprio) {
res = logsys_thread_priority_set (SCHED_RR, &global_sched_param, 10);
if (res == -1) {
log_printf (LOGSYS_LEVEL_ERROR,
"Could not set logsys thread priority. Can't continue because of priority inversions.");
corosync_exit_error (AIS_DONE_LOGSETUP);
}
} else {
res = logsys_thread_priority_set (SCHED_OTHER, NULL, 1);
}
/*
* Make sure required directory is present
*/
/* NOTE(review): unbounded sprintf into a PATH_MAX buffer; snprintf would be safer. */
sprintf (corosync_lib_dir, "%s/lib/corosync", LOCALSTATEDIR);
res = stat (corosync_lib_dir, &stat_out);
if ((res == -1) || (res == 0 && !S_ISDIR(stat_out.st_mode))) {
log_printf (LOGSYS_LEVEL_ERROR, "Required directory not present %s. Please create it.\n", corosync_lib_dir);
corosync_exit_error (AIS_DONE_DIR_NOT_PRESENT);
}
res = totem_config_read (objdb, &totem_config, &error_string);
if (res == -1) {
log_printf (LOGSYS_LEVEL_ERROR, "%s", error_string);
corosync_exit_error (AIS_DONE_MAINCONFIGREAD);
}
res = totem_config_keyread (objdb, &totem_config, &error_string);
if (res == -1) {
log_printf (LOGSYS_LEVEL_ERROR, "%s", error_string);
corosync_exit_error (AIS_DONE_MAINCONFIGREAD);
}
res = totem_config_validate (&totem_config, &error_string);
if (res == -1) {
log_printf (LOGSYS_LEVEL_ERROR, "%s", error_string);
corosync_exit_error (AIS_DONE_MAINCONFIGREAD);
}
totem_config.totem_logging_configuration = totem_logging_configuration;
totem_config.totem_logging_configuration.log_subsys_id =
_logsys_subsys_create ("TOTEM");
if (totem_config.totem_logging_configuration.log_subsys_id < 0) {
log_printf (LOGSYS_LEVEL_ERROR,
"Unable to initialize TOTEM logging subsystem\n");
corosync_exit_error (AIS_DONE_MAINCONFIGREAD);
}
totem_config.totem_logging_configuration.log_level_security = LOGSYS_LEVEL_WARNING;
totem_config.totem_logging_configuration.log_level_error = LOGSYS_LEVEL_ERROR;
totem_config.totem_logging_configuration.log_level_warning = LOGSYS_LEVEL_WARNING;
totem_config.totem_logging_configuration.log_level_notice = LOGSYS_LEVEL_NOTICE;
totem_config.totem_logging_configuration.log_level_debug = LOGSYS_LEVEL_DEBUG;
totem_config.totem_logging_configuration.log_printf = _logsys_log_printf;
res = corosync_main_config_compatibility_read (objdb,
&minimum_sync_mode,
&error_string);
if (res == -1) {
log_printf (LOGSYS_LEVEL_ERROR, "%s", error_string);
corosync_exit_error (AIS_DONE_MAINCONFIGREAD);
}
/*
* NOTE(review): the compatibility read below repeats the call directly
* above with identical arguments; it looks like an accidental duplicate.
*/
res = corosync_main_config_compatibility_read (objdb,
&minimum_sync_mode,
&error_string);
if (res == -1) {
log_printf (LOGSYS_LEVEL_ERROR, "%s", error_string);
corosync_exit_error (AIS_DONE_MAINCONFIGREAD);
}
/*
* Sleep for a while to let other nodes in the cluster
* understand that this node has been away (if it was
* a corosync restart).
*/
// TODO what is this hack for? usleep(totem_config.token_timeout * 2000);
/*
* if totempg_initialize doesn't have root privileges, it cannot
* bind to a specific interface. This only matters if
* there is more than one interface in a system, so
* in this case, only a warning is printed
*/
/*
* Join multicast group and setup delivery
* and configuration change functions
*/
totempg_initialize (
corosync_poll_handle,
&totem_config);
totempg_groups_initialize (
&corosync_group_handle,
deliver_fn,
confchg_fn);
totempg_groups_join (
corosync_group_handle,
&corosync_group,
1);
- if (minimum_sync_mode == 1) {
- log_printf (LOGSYS_LEVEL_NOTICE, "Compatibility mode set to none. Using V2 of the synchronization engine.\n");
- } else
- if (minimum_sync_mode == 0) {
- log_printf (LOGSYS_LEVEL_NOTICE, "Compatibility mode set to whitetank. Using V1 and V2 of the synchronization engine.\n");
- }
-
/*
* This must occur after totempg is initialized because "this_ip" must be set
*/
res = corosync_service_defaults_link_and_init (api);
if (res == -1) {
log_printf (LOGSYS_LEVEL_ERROR, "Could not initialize default services\n");
corosync_exit_error (AIS_DONE_INIT_SERVICES);
}
evil_init (api);
/*
* Register the synchronization engine(s). In the V2-only case the V2 engine
* reports completion directly; in the mixed case the V1 engine is given
* sync_v2_start as its completion callback, and the V2 engine then reports
* the final completion.
* NOTE(review): minimum_sync_mode is compared against the literals 1 and 0
* here while other code in this file uses CS_SYNC_V1 / CS_SYNC_V2.
*/
- sync_register (corosync_sync_callbacks_retrieve, corosync_sync_completed);
+ if (minimum_sync_mode == 1) {
+ log_printf (LOGSYS_LEVEL_NOTICE, "Compatibility mode set to none. Using V2 of the synchronization engine.\n");
+ sync_v2_init (
+ corosync_sync_v2_callbacks_retrieve,
+ corosync_sync_completed);
+ } else
+ if (minimum_sync_mode == 0) {
+ log_printf (LOGSYS_LEVEL_NOTICE, "Compatibility mode set to whitetank. Using V1 and V2 of the synchronization engine.\n");
+ sync_register (
+ corosync_sync_callbacks_retrieve,
+ sync_v2_start);
+
+ sync_v2_init (
+ corosync_sync_v2_callbacks_retrieve,
+ corosync_sync_completed);
+ }
+
/*
* Drop root privileges to user 'ais'
* TODO: Don't really need full root capabilities;
* needed capabilities are:
* CAP_NET_RAW (bindtodevice)
* CAP_SYS_NICE (setscheduler)
* CAP_IPC_LOCK (mlockall)
*/
priv_drop ();
schedwrk_init (
serialize_lock,
serialize_unlock);
ipc_subsys_id = _logsys_subsys_create ("IPC");
if (ipc_subsys_id < 0) {
log_printf (LOGSYS_LEVEL_ERROR,
"Could not initialize IPC logging subsystem\n");
corosync_exit_error (AIS_DONE_INIT_SERVICES);
}
coroipcs_ipc_init (&ipc_init_state);
/*
* Start main processing loop
*/
poll_run (corosync_poll_handle);
return EXIT_SUCCESS;
}
diff --git a/exec/service.h b/exec/service.h
index d68fcd2b..7e331b05 100644
--- a/exec/service.h
+++ b/exec/service.h
@@ -1,75 +1,77 @@
/*
* Copyright (c) 2002-2006 MontaVista Software, Inc.
* Copyright (c) 2006-2007, 2009 Red Hat, Inc.
*
* All rights reserved.
*
* Author: Steven Dake (sdake@redhat.com)
*
* This software licensed under BSD license, the text of which follows:
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* - Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* - Neither the name of the MontaVista Software, Inc. nor the names of its
* contributors may be used to endorse or promote products derived from this
* software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
* THE POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef COROSYNC_SERVICE_H_DEFINED
#define COROSYNC_SERVICE_H_DEFINED
/*
* Link and initialize a service
*/
+struct corosync_api_v1;
+
extern unsigned int corosync_service_link_and_init (
struct corosync_api_v1 *objdb,
const char *service_name,
unsigned int service_ver);
/*
* Unlink and exit a service based on service priority
*/
extern unsigned int corosync_service_unlink_priority (
struct corosync_api_v1 *corosync_api,
int priority);
/*
* Unlink and exit a service
*/
extern unsigned int corosync_service_unlink_and_exit (
struct corosync_api_v1 *objdb,
const char *service_name,
unsigned int service_ver);
/*
* Unlink and exit all corosync services
*/
extern unsigned int corosync_service_unlink_all (
struct corosync_api_v1 *objdb);
/*
* Load all of the default services
*/
extern unsigned int corosync_service_defaults_link_and_init (
struct corosync_api_v1 *objdb);
extern struct corosync_service_engine *ais_service[];
#endif /* SERVICE_H_DEFINED */
diff --git a/exec/sync.c b/exec/sync.c
index 4edc053a..805badbf 100644
--- a/exec/sync.c
+++ b/exec/sync.c
@@ -1,451 +1,466 @@
/*
* Copyright (c) 2005-2006 MontaVista Software, Inc.
* Copyright (c) 2006-2007, 2009 Red Hat, Inc.
*
* All rights reserved.
*
* Author: Steven Dake (sdake@redhat.com)
*
* This software licensed under BSD license, the text of which follows:
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* - Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* - Neither the name of the MontaVista Software, Inc. nor the names of its
* contributors may be used to endorse or promote products derived from this
* software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
* THE POSSIBILITY OF SUCH DAMAGE.
*/
#include <config.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/ioctl.h>
#include <netinet/in.h>
#include <sys/uio.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <time.h>
#include <unistd.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <corosync/corotypes.h>
#include <corosync/swab.h>
#include <corosync/totem/totempg.h>
#include <corosync/totem/totem.h>
#include <corosync/lcr/lcr_ifact.h>
#include <corosync/engine/logsys.h>
#include <corosync/coroipc_types.h>
#include "quorum.h"
#include "sync.h"
LOGSYS_DECLARE_SUBSYS ("SYNC");
#define MESSAGE_REQ_SYNC_BARRIER 0
struct barrier_data {
unsigned int nodeid;
int completed;
};
static const struct memb_ring_id *sync_ring_id;
static int (*sync_callbacks_retrieve) (int sync_id, struct sync_callbacks *callack);
static struct sync_callbacks sync_callbacks;
static int sync_processing = 0;
-static void (*sync_synchronization_completed) (void);
+static void (*sync_next_start) (
+ unsigned int *member_list,
+ size_t member_list_entries,
+ const struct memb_ring_id *ring_id);
static int sync_recovery_index = 0;
static void *sync_callback_token_handle = 0;
static struct barrier_data barrier_data_confchg[PROCESSOR_COUNT_MAX];
static size_t barrier_data_confchg_entries;
static struct barrier_data barrier_data_process[PROCESSOR_COUNT_MAX];
+static unsigned int my_member_list[PROCESSOR_COUNT_MAX];
+
+static unsigned int my_member_list_entries;
+
static int sync_barrier_send (const struct memb_ring_id *ring_id);
static int sync_start_process (enum totem_callback_token_type type,
const void *data);
static void sync_service_init (struct memb_ring_id *ring_id);
static int sync_service_process (enum totem_callback_token_type type,
const void *data);
static void sync_deliver_fn (
unsigned int nodeid,
const void *msg,
unsigned int msg_len,
int endian_conversion_required);
static void sync_confchg_fn (
enum totem_configuration_type configuration_type,
- const unsigned int *member_list, size_t member_list_entries,
- const unsigned int *left_list, size_t left_list_entries,
- const unsigned int *joined_list, size_t joined_list_entries,
+ const unsigned int *member_list,
+ size_t member_list_entries,
+ const unsigned int *left_list,
+ size_t left_list_entries,
+ const unsigned int *joined_list,
+ size_t joined_list_entries,
const struct memb_ring_id *ring_id);
static void sync_primary_callback_fn (
const unsigned int *view_list,
size_t view_list_entries,
int primary_designated,
const struct memb_ring_id *ring_id);
static struct totempg_group sync_group = {
.group = "sync",
.group_len = 4
};
static hdb_handle_t sync_group_handle;
struct req_exec_sync_barrier_start {
coroipc_request_header_t header;
struct memb_ring_id ring_id;
};
/*
* Send a barrier data structure
*/
static int sync_barrier_send (const struct memb_ring_id *ring_id)
{
struct req_exec_sync_barrier_start req_exec_sync_barrier_start;
struct iovec iovec;
int res;
req_exec_sync_barrier_start.header.size = sizeof (struct req_exec_sync_barrier_start);
req_exec_sync_barrier_start.header.id = MESSAGE_REQ_SYNC_BARRIER;
memcpy (&req_exec_sync_barrier_start.ring_id, ring_id,
sizeof (struct memb_ring_id));
iovec.iov_base = (char *)&req_exec_sync_barrier_start;
iovec.iov_len = sizeof (req_exec_sync_barrier_start);
res = totempg_groups_mcast_joined (sync_group_handle, &iovec, 1, TOTEMPG_AGREED);
return (res);
}
static void sync_start_init (const struct memb_ring_id *ring_id)
{
totempg_callback_token_create (
&sync_callback_token_handle,
TOTEM_CALLBACK_TOKEN_SENT,
0, /* don't delete after callback */
sync_start_process,
ring_id);
}
static void sync_service_init (struct memb_ring_id *ring_id)
{
sync_callbacks.sync_init ();
totempg_callback_token_destroy (&sync_callback_token_handle);
/*
* Create the token callback for the processing
*/
totempg_callback_token_create (
&sync_callback_token_handle,
TOTEM_CALLBACK_TOKEN_SENT,
0, /* don't delete after callback */
sync_service_process,
ring_id);
}
static int sync_start_process (enum totem_callback_token_type type,
const void *data)
{
int res;
const struct memb_ring_id *ring_id = data;
res = sync_barrier_send (ring_id);
if (res == 0) {
/*
* Delete the token callback for the barrier
*/
totempg_callback_token_destroy (&sync_callback_token_handle);
}
return (0);
}
static void sync_callbacks_load (void)
{
int res;
for (;;) {
res = sync_callbacks_retrieve (sync_recovery_index,
&sync_callbacks);
/*
* No more service handlers have sync callbacks at this time
` */
if (res == -1) {
sync_processing = 0;
break;
}
sync_recovery_index += 1;
if (sync_callbacks.sync_init) {
break;
}
}
}
static int sync_service_process (enum totem_callback_token_type type,
const void *data)
{
int res;
const struct memb_ring_id *ring_id = data;
/*
* If process operation not from this ring id, then ignore it and stop
* processing
*/
if (memcmp (ring_id, sync_ring_id, sizeof (struct memb_ring_id)) != 0) {
return (0);
}
/*
* If process returns 0, then its time to activate
* and start the next service's synchronization
*/
res = sync_callbacks.sync_process ();
if (res != 0) {
return (0);
}
totempg_callback_token_destroy (&sync_callback_token_handle);
sync_start_init (ring_id);
return (0);
}
int sync_register (
- int (*callbacks_retrieve) (int sync_id, struct sync_callbacks *callack),
- void (*synchronization_completed) (void))
+ int (*callbacks_retrieve) (
+ int sync_id,
+ struct sync_callbacks *callbacks),
+
+ void (*next_start) (
+ unsigned int *member_list,
+ size_t member_list_entries,
+ const struct memb_ring_id *ring_id))
{
unsigned int res;
res = totempg_groups_initialize (
&sync_group_handle,
sync_deliver_fn,
sync_confchg_fn);
if (res == -1) {
log_printf (LOGSYS_LEVEL_ERROR,
"Couldn't initialize groups interface.\n");
return (-1);
}
res = totempg_groups_join (
sync_group_handle,
&sync_group,
1);
if (res == -1) {
log_printf (LOGSYS_LEVEL_ERROR, "Couldn't join group.\n");
return (-1);
}
sync_callbacks_retrieve = callbacks_retrieve;
- sync_synchronization_completed = synchronization_completed;
+ sync_next_start = next_start;
return (0);
}
static void sync_primary_callback_fn (
const unsigned int *view_list,
size_t view_list_entries,
int primary_designated,
const struct memb_ring_id *ring_id)
{
int i;
if (primary_designated) {
log_printf (LOGSYS_LEVEL_DEBUG, "This node is within the primary component and will provide service.\n");
} else {
log_printf (LOGSYS_LEVEL_DEBUG, "This node is within the non-primary component and will NOT provide any services.\n");
return;
}
/*
* Execute configuration change for synchronization service
*/
sync_processing = 1;
totempg_callback_token_destroy (&sync_callback_token_handle);
sync_recovery_index = 0;
memset (&barrier_data_confchg, 0, sizeof (barrier_data_confchg));
for (i = 0; i < view_list_entries; i++) {
barrier_data_confchg[i].nodeid = view_list[i];
barrier_data_confchg[i].completed = 0;
}
memcpy (barrier_data_process, barrier_data_confchg,
sizeof (barrier_data_confchg));
barrier_data_confchg_entries = view_list_entries;
sync_start_init (sync_ring_id);
}
static struct memb_ring_id deliver_ring_id;
static void sync_endian_convert (struct req_exec_sync_barrier_start
*req_exec_sync_barrier_start)
{
totemip_copy_endian_convert(&req_exec_sync_barrier_start->ring_id.rep,
&req_exec_sync_barrier_start->ring_id.rep);
req_exec_sync_barrier_start->ring_id.seq = swab64 (req_exec_sync_barrier_start->ring_id.seq);
}
static void sync_deliver_fn (
unsigned int nodeid,
const void *msg,
unsigned int msg_len,
int endian_conversion_required)
{
struct req_exec_sync_barrier_start *req_exec_sync_barrier_start =
(struct req_exec_sync_barrier_start *)msg;
unsigned int barrier_completed;
int i;
log_printf (LOGSYS_LEVEL_DEBUG, "confchg entries %lu\n",
(unsigned long int) barrier_data_confchg_entries);
if (endian_conversion_required) {
sync_endian_convert (req_exec_sync_barrier_start);
}
barrier_completed = 1;
memcpy (&deliver_ring_id, &req_exec_sync_barrier_start->ring_id,
sizeof (struct memb_ring_id));
/*
* Is this barrier from this configuration, if not, ignore it
*/
if (memcmp (&req_exec_sync_barrier_start->ring_id, sync_ring_id,
sizeof (struct memb_ring_id)) != 0) {
return;
}
/*
* Set completion for source_addr's address
*/
for (i = 0; i < barrier_data_confchg_entries; i++) {
if (nodeid == barrier_data_process[i].nodeid) {
barrier_data_process[i].completed = 1;
log_printf (LOGSYS_LEVEL_DEBUG,
"Barrier Start Received From %d\n",
barrier_data_process[i].nodeid);
break;
}
}
/*
* Test if barrier is complete
*/
for (i = 0; i < barrier_data_confchg_entries; i++) {
log_printf (LOGSYS_LEVEL_DEBUG,
"Barrier completion status for nodeid %d = %d. \n",
barrier_data_process[i].nodeid,
barrier_data_process[i].completed);
if (barrier_data_process[i].completed == 0) {
barrier_completed = 0;
}
}
if (barrier_completed) {
log_printf (LOGSYS_LEVEL_DEBUG,
"Synchronization barrier completed\n");
}
/*
* This sync is complete so activate and start next service sync
*/
if (barrier_completed && sync_callbacks.sync_activate) {
sync_callbacks.sync_activate ();
log_printf (LOGSYS_LEVEL_DEBUG,
"Committing synchronization for (%s)\n",
sync_callbacks.name);
}
/*
* Start synchronization if the barrier has completed
*/
if (barrier_completed) {
memcpy (barrier_data_process, barrier_data_confchg,
sizeof (barrier_data_confchg));
sync_callbacks_load();
/*
* if sync service found, execute it
*/
if (sync_processing && sync_callbacks.sync_init) {
log_printf (LOGSYS_LEVEL_DEBUG,
"Synchronization actions starting for (%s)\n",
sync_callbacks.name);
sync_service_init (&deliver_ring_id);
}
+ if (sync_processing == 0) {
+ sync_next_start (my_member_list, my_member_list_entries, sync_ring_id);
+ }
}
return;
}
static void sync_confchg_fn (
enum totem_configuration_type configuration_type,
- const unsigned int *member_list, size_t member_list_entries,
- const unsigned int *left_list, size_t left_list_entries,
- const unsigned int *joined_list, size_t joined_list_entries,
+ const unsigned int *member_list,
+ size_t member_list_entries,
+ const unsigned int *left_list,
+ size_t left_list_entries,
+ const unsigned int *joined_list,
+ size_t joined_list_entries,
const struct memb_ring_id *ring_id)
{
sync_ring_id = ring_id;
if (configuration_type != TOTEM_CONFIGURATION_REGULAR) {
return;
}
+ memcpy (my_member_list, member_list, member_list_entries * sizeof (unsigned int));
+ my_member_list_entries = member_list_entries;
+
if (sync_processing && sync_callbacks.sync_abort != NULL) {
sync_callbacks.sync_abort ();
sync_callbacks.sync_activate = NULL;
}
sync_primary_callback_fn (
member_list,
member_list_entries,
1,
ring_id);
}
-
-int sync_in_process (void)
-{
- return (sync_processing);
-}
-
-int sync_primary_designated (void)
-{
- return (1);
-}
diff --git a/exec/sync.h b/exec/sync.h
index cb6f44b1..c57b8694 100644
--- a/exec/sync.h
+++ b/exec/sync.h
@@ -1,67 +1,61 @@
/*
* Copyright (c) 2002-2004 MontaVista Software, Inc.
*
* All rights reserved.
*
* Author: Steven Dake (sdake@redhat.com)
*
* This software licensed under BSD license, the text of which follows:
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* - Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* - Neither the name of the MontaVista Software, Inc. nor the names of its
* contributors may be used to endorse or promote products derived from this
* software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
* THE POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef SYNC_H_DEFINED
#define SYNC_H_DEFINED
#include <netinet/in.h>
#include <corosync/totem/totempg.h>
#include "totemsrp.h"
struct sync_callbacks {
void (*sync_init) (void);
int (*sync_process) (void);
void (*sync_activate) (void);
void (*sync_abort) (void);
const char *name;
};
-struct corosync_api_v1;
int sync_register (
- int (*sync_callbacks_retrieve) (int sync_id, struct sync_callbacks *callbacks),
- void (*synchronization_completed) (void));
+ int (*sync_callbacks_retrieve) (
+ int sync_id,
+ struct sync_callbacks *callbacks),
-int sync_in_process (void);
+ void (*next_start) (
+ unsigned int *member_list,
+ size_t member_list_entries,
+ const struct memb_ring_id *ring_id));
-int sync_primary_designated (void);
-
-/**
- * Execute synchronization upon request for the named service
- * @param name service handler name to synchronize
- *
- * @return int 0 OK, error code otherwise
- */
-extern int sync_request (const char *name);
#endif /* SYNC_H_DEFINED */
diff --git a/exec/syncv2.c b/exec/syncv2.c
new file mode 100644
index 00000000..211a53a9
--- /dev/null
+++ b/exec/syncv2.c
@@ -0,0 +1,478 @@
+/*
+ * Copyright (c) 2009 Red Hat, Inc.
+ *
+ * All rights reserved.
+ *
+ * Author: Steven Dake (sdake@redhat.com)
+ *
+ * This software licensed under BSD license, the text of which follows:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * - Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * - Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ * - Neither the name of the MontaVista Software, Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from this
+ * software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
+ * THE POSSIBILITY OF SUCH DAMAGE.
+ */
+#include <config.h>
+
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <sys/ioctl.h>
+#include <netinet/in.h>
+#include <sys/uio.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <errno.h>
+#include <time.h>
+#include <unistd.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+
+#include <corosync/corotypes.h>
+#include <corosync/swab.h>
+#include <corosync/totem/totempg.h>
+#include <corosync/totem/totem.h>
+#include <corosync/lcr/lcr_ifact.h>
+#include <corosync/engine/logsys.h>
+#include <corosync/coroipc_types.h>
+#include "schedwrk.h"
+#include "quorum.h"
+#include "sync.h"
+#include "syncv2.h"
+
+LOGSYS_DECLARE_SUBSYS ("SYNCV2");
+
+#define MESSAGE_REQ_SYNC_BARRIER 0
+#define MESSAGE_REQ_SYNC_SERVICE_BUILD 1
+
+enum sync_process_state { /* per-service progress, advanced by schedwrk_processor */
+ INIT, /* sync_init has not been called yet */
+ PROCESS, /* sync_process is being called until it reports completion */
+ ACTIVATE /* sync_activate is called next, then the barrier is entered */
+};
+
+enum sync_state { /* phase of the whole engine, stored in my_state */
+ SYNC_SERVICELIST_BUILD, /* set by sync_servicelist_build_enter: service lists are being exchanged */
+ SYNC_PROCESS, /* set by sync_process_enter: the current service is being synchronized */
+ SYNC_BARRIER /* set by sync_barrier_enter: barrier messages are being collected */
+};
+
+struct service_entry { /* one service taking part in a synchronization round */
+ int service_id; /* id passed to the callbacks_retrieve function, or received from another node */
+ void (*sync_init) (void); /* called once when the entry leaves INIT */
+ void (*sync_abort) (void); /* called by sync_v2_abort while in SYNC_PROCESS */
+ int (*sync_process) (void); /* called from schedwrk_processor in the PROCESS state */
+ void (*sync_activate) (void); /* called in the ACTIVATE state, before the barrier */
+ enum sync_process_state state;
+ char name[128]; /* name used in log messages */
+};
+
+struct processor_entry { /* one member of the configuration being synchronized */
+ int nodeid; /* copied from the member list handed to sync_v2_start */
+ int received; /* 1 once this node's message for the current phase was delivered */
+};
+
+struct req_exec_service_build_message { /* multicast with id MESSAGE_REQ_SYNC_SERVICE_BUILD */
+ coroipc_request_header_t header;
+ struct memb_ring_id ring_id; /* receivers ignore the message unless this equals my_ring_id */
+ int service_list[128]; /* service ids known to the sender */
+ int service_list_entries; /* number of valid entries in service_list */
+};
+
+struct req_exec_barrier_message { /* multicast with id MESSAGE_REQ_SYNC_BARRIER */
+ coroipc_request_header_t header;
+ struct memb_ring_id ring_id; /* receivers ignore the message unless this equals my_ring_id */
+};
+
+static enum sync_state my_state = SYNC_BARRIER; /* current engine phase */
+
+static struct memb_ring_id my_ring_id; /* copied in by sync_v2_start; messages carrying another ring id are ignored */
+
+static int my_processing_idx = 0; /* index into my_service_list of the service being synchronized */
+
+static hdb_handle_t my_schedwrk_handle; /* handle filled in by schedwrk_create for schedwrk_processor */
+
+static struct processor_entry my_processor_list[128]; /* one entry per member passed to sync_v2_start */
+
+static int my_processor_list_entries = 0;
+
+static struct service_entry my_service_list[128]; /* services of the current round: local ones plus ids reported by other nodes */
+
+static int my_service_list_entries = 0;
+
+static const struct memb_ring_id sync_ring_id; /* NOTE(review): never referenced in this file */
+
+static struct service_entry my_initial_service_list[128]; /* local services collected by sync_v2_init */
+
+static int my_initial_service_list_entries;
+
+static void (*sync_synchronization_completed) (void); /* callback supplied to sync_v2_init */
+
+static void sync_deliver_fn (
+ unsigned int nodeid,
+ const void *msg,
+ unsigned int msg_len,
+ int endian_conversion_required);
+
+static int schedwrk_processor (const void *context);
+
+static void sync_process_enter (void);
+
+static struct totempg_group sync_group = {
+ .group = "syncv2",
+ .group_len = 6 /* length of "syncv2" without the terminating NUL */
+};
+
+static hdb_handle_t sync_group_handle;
+
+/*
+ * Initialize the V2 synchronization engine.
+ *
+ * Joins the "syncv2" totempg group, remembers the completion callback and
+ * records every local service that supplies a sync_init callback in
+ * my_initial_service_list.  Service ids 0..63 are probed.
+ *
+ * sync_callbacks_retrieve - fills in the callbacks for a service id;
+ *	an id for which it returns -1 is skipped.
+ * synchronization_completed - called when all services have synchronized.
+ *
+ * Returns 0 on success or -1 if the totempg group could not be
+ * initialized or joined.
+ */
+int sync_v2_init (
+	int (*sync_callbacks_retrieve) (
+		int service_id,
+		struct sync_callbacks *callbacks),
+	void (*synchronization_completed) (void))
+{
+	int res;
+	int i;
+	struct sync_callbacks sync_callbacks;
+	struct service_entry *entry;
+
+	res = totempg_groups_initialize (
+		&sync_group_handle,
+		sync_deliver_fn,
+		NULL);
+	if (res == -1) {
+		log_printf (LOGSYS_LEVEL_ERROR,
+			"Couldn't initialize groups interface.\n");
+		return (-1);
+	}
+
+	res = totempg_groups_join (
+		sync_group_handle,
+		&sync_group,
+		1);
+	if (res == -1) {
+		log_printf (LOGSYS_LEVEL_ERROR, "Couldn't join group.\n");
+		return (-1);
+	}
+
+	sync_synchronization_completed = synchronization_completed;
+
+	/*
+	 * Start from an empty list so a repeated call cannot append duplicates
+	 */
+	my_initial_service_list_entries = 0;
+	for (i = 0; i < 64; i++) {
+		res = sync_callbacks_retrieve (i, &sync_callbacks);
+		if (res == -1) {
+			continue;
+		}
+		if (sync_callbacks.sync_init == NULL) {
+			continue;
+		}
+		entry = &my_initial_service_list[my_initial_service_list_entries];
+		entry->state = INIT;
+		entry->service_id = i;
+		/*
+		 * Bounded copy: an over-long service name is truncated instead
+		 * of overflowing name[]
+		 */
+		snprintf (entry->name, sizeof (entry->name), "%s",
+			sync_callbacks.name);
+		entry->sync_init = sync_callbacks.sync_init;
+		entry->sync_process = sync_callbacks.sync_process;
+		entry->sync_abort = sync_callbacks.sync_abort;
+		entry->sync_activate = sync_callbacks.sync_activate;
+		my_initial_service_list_entries += 1;
+	}
+	return (0);
+}
+
+/*
+ * Handle a MESSAGE_REQ_SYNC_BARRIER message sent by nodeid.
+ *
+ * A message for a ring other than my_ring_id is dropped.  The sender is
+ * marked as received; once no processor is outstanding the engine moves
+ * on to the next service, or reports completion after the last one.
+ */
+static void sync_barrier_handler (unsigned int nodeid, const void *msg)
+{
+	const struct req_exec_barrier_message *barrier_msg = msg;
+	int idx;
+	int outstanding = 0;
+
+	if (memcmp (&my_ring_id, &barrier_msg->ring_id,
+		sizeof (struct memb_ring_id)) != 0) {
+
+		return;
+	}
+
+	/*
+	 * Record the sender and count the processors still missing in one pass
+	 */
+	for (idx = 0; idx < my_processor_list_entries; idx++) {
+		if (my_processor_list[idx].nodeid == nodeid) {
+			my_processor_list[idx].received = 1;
+		}
+		if (my_processor_list[idx].received == 0) {
+			outstanding += 1;
+		}
+	}
+	if (outstanding != 0) {
+		return;
+	}
+
+	my_processing_idx += 1;
+	if (my_service_list_entries != my_processing_idx) {
+		sync_process_enter ();
+		return;
+	}
+	sync_synchronization_completed ();
+}
+
+static void dummy_sync_init (void) /* no-op sync_init for service ids that were only reported by another node */
+{
+}
+
+static void dummy_sync_abort (void) /* no-op sync_abort for such services */
+{
+}
+
+static int dummy_sync_process (void) /* no-op sync_process for such services */
+{
+ return (0); /* 0 is not -1, so schedwrk_processor moves straight on to ACTIVATE */
+}
+
+static void dummy_sync_activate (void) /* no-op sync_activate for such services */
+{
+}
+
+/*
+ * qsort() comparator ordering service entries by ascending service_id.
+ *
+ * Returns a negative, zero or positive value as qsort() requires.  The
+ * plain result of '>' is only ever 0 or 1, which never reports "less
+ * than" and therefore does not describe a consistent ordering.
+ */
+static int service_entry_compare (const void *a, const void *b)
+{
+	const struct service_entry *service_entry_a = a;
+	const struct service_entry *service_entry_b = b;
+
+	return ((service_entry_a->service_id > service_entry_b->service_id) -
+		(service_entry_a->service_id < service_entry_b->service_id));
+}
+
+/*
+ * Handle a MESSAGE_REQ_SYNC_SERVICE_BUILD message sent by nodeid.
+ *
+ * A message for a ring other than my_ring_id is dropped.  Every service id
+ * in the message that is not yet in my_service_list is added with the no-op
+ * dummy_sync_* callbacks (presumably so that every node walks the same
+ * list), and the list is re-sorted by service id.  The sender is then marked
+ * as received; once every processor has reported, sync_process_enter() is
+ * called.
+ */
+static void sync_service_build_handler (unsigned int nodeid, const void *msg)
+{
+	const struct req_exec_service_build_message *build_msg = msg;
+	const int message_list_max =
+		(int)(sizeof (build_msg->service_list) /
+		sizeof (build_msg->service_list[0]));
+	const int service_list_max =
+		(int)(sizeof (my_service_list) / sizeof (my_service_list[0]));
+	struct service_entry *entry;
+	int message_list_entries;
+	int i, j;
+	int barrier_reached = 1;
+	int found;
+	int qsort_trigger = 0;
+
+	if (memcmp (&my_ring_id, &build_msg->ring_id,
+		sizeof (struct memb_ring_id)) != 0) {
+
+		return;
+	}
+
+	/*
+	 * The count arrives in a network message: never index past the end
+	 * of service_list[]
+	 */
+	message_list_entries = build_msg->service_list_entries;
+	if (message_list_entries > message_list_max) {
+		message_list_entries = message_list_max;
+	}
+	for (i = 0; i < message_list_entries; i++) {
+
+		found = 0;
+		for (j = 0; j < my_service_list_entries; j++) {
+			if (build_msg->service_list[i] ==
+				my_service_list[j].service_id) {
+				found = 1;
+				break;
+			}
+		}
+		if (found) {
+			continue;
+		}
+		if (my_service_list_entries >= service_list_max) {
+			log_printf (LOGSYS_LEVEL_ERROR,
+				"Service list is full, ignoring service id %d\n",
+				build_msg->service_list[i]);
+			continue;
+		}
+		entry = &my_service_list[my_service_list_entries];
+		entry->state = INIT;
+		entry->service_id = build_msg->service_list[i];
+		/*
+		 * No trailing newline: the name is printed through formats
+		 * that already end in "\n"
+		 */
+		snprintf (entry->name, sizeof (entry->name),
+			"External Service (id = %d)",
+			build_msg->service_list[i]);
+		entry->sync_init = dummy_sync_init;
+		entry->sync_abort = dummy_sync_abort;
+		entry->sync_process = dummy_sync_process;
+		entry->sync_activate = dummy_sync_activate;
+		my_service_list_entries += 1;
+
+		qsort_trigger = 1;
+	}
+	if (qsort_trigger) {
+		qsort (my_service_list, my_service_list_entries,
+			sizeof (struct service_entry), service_entry_compare);
+	}
+	for (i = 0; i < my_processor_list_entries; i++) {
+		if (my_processor_list[i].nodeid == nodeid) {
+			my_processor_list[i].received = 1;
+		}
+	}
+	for (i = 0; i < my_processor_list_entries; i++) {
+		if (my_processor_list[i].received == 0) {
+			barrier_reached = 0;
+		}
+	}
+	if (barrier_reached) {
+		sync_process_enter ();
+	}
+}
+
+/*
+ * totempg delivery callback for the "syncv2" group.
+ *
+ * Routes the message to the handler selected by the id in its header; a
+ * message with any other id is dropped.  msg_len and
+ * endian_conversion_required are not consulted.
+ */
+static void sync_deliver_fn (
+	unsigned int nodeid,
+	const void *msg,
+	unsigned int msg_len,
+	int endian_conversion_required)
+{
+	const coroipc_request_header_t *request_header = msg;
+
+	if (request_header->id == MESSAGE_REQ_SYNC_BARRIER) {
+		sync_barrier_handler (nodeid, msg);
+	} else if (request_header->id == MESSAGE_REQ_SYNC_SERVICE_BUILD) {
+		sync_service_build_handler (nodeid, msg);
+	}
+}
+
+/*
+ * Multicast a barrier message stamped with my_ring_id to the "syncv2"
+ * group.  The result of the multicast call is not examined.
+ */
+static void barrier_message_transmit (void)
+{
+	struct req_exec_barrier_message barrier_msg;
+	struct iovec vec;
+
+	barrier_msg.header.id = MESSAGE_REQ_SYNC_BARRIER;
+	barrier_msg.header.size = sizeof (barrier_msg);
+	memcpy (&barrier_msg.ring_id, &my_ring_id, sizeof (my_ring_id));
+
+	vec.iov_len = sizeof (barrier_msg);
+	vec.iov_base = (char *)&barrier_msg;
+
+	(void)totempg_groups_mcast_joined (sync_group_handle,
+		&vec, 1, TOTEMPG_AGREED);
+}
+
+/*
+ * Fill in the header and ring id of a service build message and multicast
+ * the whole structure to the "syncv2" group.  The caller supplies
+ * service_list and service_list_entries; the result of the multicast call
+ * is not examined.
+ */
+static void service_build_message_transmit (struct req_exec_service_build_message *service_build_message)
+{
+	struct iovec vec;
+
+	service_build_message->header.id = MESSAGE_REQ_SYNC_SERVICE_BUILD;
+	service_build_message->header.size = sizeof (*service_build_message);
+	memcpy (&service_build_message->ring_id, &my_ring_id,
+		sizeof (my_ring_id));
+
+	vec.iov_len = sizeof (*service_build_message);
+	vec.iov_base = (void *)service_build_message;
+
+	(void)totempg_groups_mcast_joined (sync_group_handle,
+		&vec, 1, TOTEMPG_AGREED);
+}
+
+static void sync_barrier_enter (void) /* switch to SYNC_BARRIER and multicast this node's barrier message */
+{
+ my_state = SYNC_BARRIER;
+ barrier_message_transmit ();
+}
+
+/*
+ * Enter SYNC_PROCESS for the service at my_processing_idx.
+ *
+ * With an empty service list there is nothing to synchronize, so completion
+ * is reported right away.  Otherwise the per-processor received flags are
+ * cleared for the coming barrier and schedwrk_processor is scheduled.
+ */
+static void sync_process_enter (void)
+{
+	int i;
+
+	my_state = SYNC_PROCESS;
+
+	/*
+	 * No syncv2 services
+	 */
+	if (my_service_list_entries == 0) {
+		my_state = SYNC_SERVICELIST_BUILD;
+		sync_synchronization_completed ();
+		return;
+	}
+	for (i = 0; i < my_processor_list_entries; i++) {
+		my_processor_list[i].received = 0;
+	}
+	schedwrk_create (&my_schedwrk_handle,
+		schedwrk_processor,
+		NULL);
+}
+
+/*
+ * Enter SYNC_SERVICELIST_BUILD for a new configuration.
+ *
+ * Records the members as processors with their received flag cleared,
+ * resets the working service list to the local services collected by
+ * sync_v2_init() and multicasts the local service ids to the group.
+ *
+ * member_list / member_list_entries - nodes of the configuration.
+ * ring_id - not used here; sync_v2_start() has already copied it into
+ *	my_ring_id.
+ */
+static void sync_servicelist_build_enter (
+	const unsigned int *member_list,
+	size_t member_list_entries,
+	const struct memb_ring_id *ring_id)
+{
+	struct req_exec_service_build_message service_build;
+	const size_t processor_list_max =
+		sizeof (my_processor_list) / sizeof (my_processor_list[0]);
+	size_t member_idx;
+	int i;
+
+	my_state = SYNC_SERVICELIST_BUILD;
+
+	/*
+	 * my_processor_list[] has a fixed size; never write past its end
+	 */
+	if (member_list_entries > processor_list_max) {
+		log_printf (LOGSYS_LEVEL_ERROR,
+			"Member list has %lu entries, only the first %lu are tracked\n",
+			(unsigned long int) member_list_entries,
+			(unsigned long int) processor_list_max);
+		member_list_entries = processor_list_max;
+	}
+	for (member_idx = 0; member_idx < member_list_entries; member_idx++) {
+		my_processor_list[member_idx].nodeid = member_list[member_idx];
+		my_processor_list[member_idx].received = 0;
+	}
+	my_processor_list_entries = member_list_entries;
+
+	my_processing_idx = 0;
+
+	memcpy (my_service_list, my_initial_service_list,
+		sizeof (struct service_entry) *
+			my_initial_service_list_entries);
+	my_service_list_entries = my_initial_service_list_entries;
+
+	/*
+	 * The whole structure is multicast, so clear it first rather than
+	 * sending uninitialized stack contents in the unused slots
+	 */
+	memset (&service_build, 0, sizeof (service_build));
+
+	/*
+	 * Bounded by the number of local services, not by a service id
+	 */
+	for (i = 0; i < my_initial_service_list_entries; i++) {
+		service_build.service_list[i] =
+			my_initial_service_list[i].service_id;
+	}
+	service_build.service_list_entries = i;
+
+	service_build_message_transmit (&service_build);
+}
+
+/*
+ * Scheduled work callback driving the service at my_processing_idx
+ * through INIT -> PROCESS -> ACTIVATE.
+ *
+ * sync_init is called once, then sync_process on every invocation until
+ * it returns 0, the same "0 means finished" rule the V1 engine applies to
+ * this callback.  After sync_activate the barrier is entered.
+ *
+ * Returns -1 while sync_process has not finished (presumably asking
+ * schedwrk to call again - TODO confirm against schedwrk.c) and 0 once
+ * the barrier message has been sent.
+ */
+static int schedwrk_processor (const void *context)
+{
+	struct service_entry *service = &my_service_list[my_processing_idx];
+	int res;
+
+	if (service->state == INIT) {
+		service->state = PROCESS;
+		service->sync_init ();
+	}
+	if (service->state == PROCESS) {
+		res = service->sync_process ();
+		if (res == 0) {
+			service->state = ACTIVATE;
+		} else {
+			return (-1);
+		}
+	}
+	if (service->state == ACTIVATE) {
+		service->sync_activate ();
+		log_printf (LOGSYS_LEVEL_DEBUG, "Committing synchronization for %s\n",
+			service->name);
+		sync_barrier_enter();
+	}
+	return (0);
+}
+
+void sync_v2_start ( /* begin a V2 synchronization round for the given membership */
+ const unsigned int *member_list,
+ size_t member_list_entries,
+ const struct memb_ring_id *ring_id)
+{
+ memcpy (&my_ring_id, ring_id, sizeof (struct memb_ring_id)); /* incoming messages are matched against this ring id */
+
+ sync_servicelist_build_enter (member_list, member_list_entries, ring_id);
+}
+
+/*
+ * Abort a synchronization that is in the SYNC_PROCESS phase: the scheduled
+ * work item is destroyed and the current service's sync_abort callback is
+ * run.  In any other phase this does nothing.
+ */
+void sync_v2_abort (void)
+{
+	if (my_state != SYNC_PROCESS) {
+		return;
+	}
+	schedwrk_destroy (my_schedwrk_handle);
+	my_service_list[my_processing_idx].sync_abort ();
+}
diff --git a/exec/sync.h b/exec/syncv2.h
similarity index 70%
copy from exec/sync.h
copy to exec/syncv2.h
index cb6f44b1..13427229 100644
--- a/exec/sync.h
+++ b/exec/syncv2.h
@@ -1,67 +1,53 @@
/*
- * Copyright (c) 2002-2004 MontaVista Software, Inc.
+ * Copyright (c) 2009 Red Hat, Inc.
*
* All rights reserved.
*
* Author: Steven Dake (sdake@redhat.com)
*
* This software licensed under BSD license, the text of which follows:
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* - Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* - Neither the name of the MontaVista Software, Inc. nor the names of its
* contributors may be used to endorse or promote products derived from this
* software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
* THE POSSIBILITY OF SUCH DAMAGE.
*/
-#ifndef SYNC_H_DEFINED
-#define SYNC_H_DEFINED
+#ifndef SYNCV2_H_DEFINED
+#define SYNCV2_H_DEFINED
-#include <netinet/in.h>
-#include <corosync/totem/totempg.h>
-#include "totemsrp.h"
+#include "sync.h"
-struct sync_callbacks {
- void (*sync_init) (void);
- int (*sync_process) (void);
- void (*sync_activate) (void);
- void (*sync_abort) (void);
- const char *name;
-};
-
-struct corosync_api_v1;
-int sync_register (
- int (*sync_callbacks_retrieve) (int sync_id, struct sync_callbacks *callbacks),
+extern int sync_v2_init (
+ int (*sync_callbacks_retrieve) (
+ int service_id,
+ struct sync_callbacks *callbacks),
void (*synchronization_completed) (void));
-int sync_in_process (void);
-
-int sync_primary_designated (void);
+extern void sync_v2_start (
+ const unsigned int *member_list,
+ size_t member_list_entries,
+ const struct memb_ring_id *ring_id);
-/**
- * Execute synchronization upon request for the named service
- * @param name service handler name to synchronize
- *
- * @return int 0 OK, error code otherwise
- */
-extern int sync_request (const char *name);
+extern void sync_v2_abort (void);
-#endif /* SYNC_H_DEFINED */
+#endif /* SYNCV2_H_DEFINED */
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Sat, Nov 23, 7:20 AM (17 h, 57 s)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1018394
Default Alt Text
(69 KB)
Attached To
Mode
rC Corosync
Attached
Detach File
Event Timeline
Log In to Comment