Page MenuHomeClusterLabs Projects

No OneTemporary

diff --git a/exec/coroipcs.c b/exec/coroipcs.c
index 82a42005..2e93a977 100644
--- a/exec/coroipcs.c
+++ b/exec/coroipcs.c
@@ -1,1790 +1,1790 @@
/*
* Copyright (c) 2006-2009 Red Hat, Inc.
*
* All rights reserved.
*
* Author: Steven Dake (sdake@redhat.com)
*
* This software licensed under BSD license, the text of which follows:
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* - Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* - Neither the name of the MontaVista Software, Inc. nor the names of its
* contributors may be used to endorse or promote products derived from this
* software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
* THE POSSIBILITY OF SUCH DAMAGE.
*/
#include <config.h>
#ifndef _GNU_SOURCE
#define _GNU_SOURCE 1
#endif
#include <pthread.h>
#include <limits.h>
#include <assert.h>
#include <pwd.h>
#include <grp.h>
#include <sys/types.h>
#include <sys/poll.h>
#include <sys/uio.h>
#include <sys/mman.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <sys/wait.h>
#include <sys/stat.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <signal.h>
#include <sched.h>
#include <time.h>
#if defined(HAVE_GETPEERUCRED)
#include <ucred.h>
#endif
#include <string.h>
#include <sys/shm.h>
#include <corosync/corotypes.h>
#include <corosync/list.h>
#include <corosync/coroipc_types.h>
#include <corosync/hdb.h>
#include <corosync/coroipcs.h>
#include <corosync/coroipc_ipc.h>
#define LOGSYS_UTILS_ONLY 1
#include <corosync/engine/logsys.h>
#if _POSIX_THREAD_PROCESS_SHARED > 0
#include <semaphore.h>
#else
#include <sys/sem.h>
#endif
#include "util.h"
#ifndef MSG_NOSIGNAL
#define MSG_NOSIGNAL 0
#endif
#define SERVER_BACKLOG 5
#define MSG_SEND_LOCKED 0
#define MSG_SEND_UNLOCKED 1
#define POLL_STATE_IN 1
#define POLL_STATE_INOUT 2
static struct coroipcs_init_state_v2 *api = NULL;
DECLARE_LIST_INIT (conn_info_list_head);
DECLARE_LIST_INIT (conn_info_exit_list_head);
struct outq_item {
void *msg;
size_t mlen;
struct list_head list;
};
struct zcb_mapped {
struct list_head list;
void *addr;
size_t size;
};
#if _POSIX_THREAD_PROCESS_SHARED < 1
#if defined(_SEM_SEMUN_UNDEFINED)
union semun {
int val;
struct semid_ds *buf;
unsigned short int *array;
struct seminfo *__buf;
};
#endif
#endif
enum conn_state {
CONN_STATE_THREAD_INACTIVE = 0,
CONN_STATE_THREAD_ACTIVE = 1,
CONN_STATE_THREAD_REQUEST_EXIT = 2,
CONN_STATE_THREAD_DESTROYED = 3,
CONN_STATE_LIB_EXIT_CALLED = 4,
CONN_STATE_DISCONNECT_INACTIVE = 5
};
struct conn_info {
int fd;
pthread_t thread;
pid_t client_pid;
pthread_attr_t thread_attr;
unsigned int service;
enum conn_state state;
int refcount;
hdb_handle_t stats_handle;
#if _POSIX_THREAD_PROCESS_SHARED < 1
key_t semkey;
#endif
unsigned int pending_semops;
pthread_mutex_t mutex;
struct control_buffer *control_buffer;
char *request_buffer;
char *response_buffer;
char *dispatch_buffer;
size_t control_size;
size_t request_size;
size_t response_size;
size_t dispatch_size;
struct list_head outq_head;
void *private_data;
struct list_head list;
char setup_msg[sizeof (mar_req_setup_t)];
unsigned int setup_bytes_read;
struct list_head zcb_mapped_list_head;
char *sending_allowed_private_data[64];
int poll_state;
};
static int shared_mem_dispatch_bytes_left (const struct conn_info *conn_info);
static void outq_flush (struct conn_info *conn_info);
static int priv_change (struct conn_info *conn_info);
static void ipc_disconnect (struct conn_info *conn_info);
static void msg_send (void *conn, const struct iovec *iov, unsigned int iov_len,
int locked);
static void _corosync_ipc_init(void);
#define log_printf(level, format, args...) \
do { \
if (api->log_printf) \
api->log_printf ( \
LOGSYS_ENCODE_RECID(level, \
api->log_subsys_id, \
LOGSYS_RECID_LOG), \
__FUNCTION__, __FILE__, __LINE__, \
(const char *)format, ##args); \
else \
api->old_log_printf ((const char *)format, ##args); \
} while (0)
static hdb_handle_t dummy_stats_create_connection (
const char *name,
pid_t pid,
int fd)
{
return (0ULL);
}
static void dummy_stats_destroy_connection (
hdb_handle_t handle)
{
}
static void dummy_stats_update_value (
hdb_handle_t handle,
const char *name,
const void *value,
size_t value_size)
{
}
static void dummy_stats_increment_value (
hdb_handle_t handle,
const char *name)
{
}
static int
memory_map (
const char *path,
size_t bytes,
void **buf)
{
int32_t fd;
void *addr_orig;
void *addr;
int32_t res;
fd = open (path, O_RDWR, 0600);
unlink (path);
if (fd == -1) {
return (-1);
}
res = ftruncate (fd, bytes);
if (res == -1) {
goto error_close_unlink;
}
addr_orig = mmap (NULL, bytes, PROT_NONE,
MAP_ANONYMOUS | MAP_PRIVATE, -1, 0);
if (addr_orig == MAP_FAILED) {
goto error_close_unlink;
}
addr = mmap (addr_orig, bytes, PROT_READ | PROT_WRITE,
MAP_FIXED | MAP_SHARED, fd, 0);
if (addr != addr_orig) {
munmap(addr_orig, bytes);
goto error_close_unlink;
}
#ifdef COROSYNC_BSD
madvise(addr, bytes, MADV_NOSYNC);
#endif
res = close (fd);
if (res) {
return (-1);
}
*buf = addr_orig;
return (0);
error_close_unlink:
close (fd);
unlink(path);
return -1;
}
static int
circular_memory_map (
const char *path,
size_t bytes,
void **buf)
{
int32_t fd;
void *addr_orig;
void *addr;
int32_t res;
fd = open (path, O_RDWR, 0600);
unlink (path);
if (fd == -1) {
return (-1);
}
res = ftruncate (fd, bytes);
if (res == -1) {
goto error_close_unlink;
}
addr_orig = mmap (NULL, bytes << 1, PROT_NONE,
MAP_ANONYMOUS | MAP_PRIVATE, -1, 0);
if (addr_orig == MAP_FAILED) {
munmap(addr_orig, bytes);
goto error_close_unlink;
}
addr = mmap (addr_orig, bytes, PROT_READ | PROT_WRITE,
MAP_FIXED | MAP_SHARED, fd, 0);
if (addr != addr_orig) {
munmap(addr_orig, bytes);
goto error_close_unlink;
}
#ifdef COROSYNC_BSD
madvise(addr_orig, bytes, MADV_NOSYNC);
#endif
addr = mmap (((char *)addr_orig) + bytes,
bytes, PROT_READ | PROT_WRITE,
MAP_FIXED | MAP_SHARED, fd, 0);
if (addr == MAP_FAILED) {
munmap(addr_orig, bytes);
munmap(addr, bytes);
goto error_close_unlink;
}
#ifdef COROSYNC_BSD
madvise(((char *)addr_orig) + bytes, bytes, MADV_NOSYNC);
#endif
res = close (fd);
if (res) {
munmap(addr_orig, bytes);
munmap(addr, bytes);
return (-1);
}
*buf = addr_orig;
return (0);
error_close_unlink:
close (fd);
unlink(path);
return (-1);
}
static inline int
circular_memory_unmap (void *buf, size_t bytes)
{
int res;
res = munmap (buf, bytes << 1);
return (res);
}
static int32_t flow_control_state_set (
struct conn_info *conn_info,
int flow_control_state)
{
if (conn_info->control_buffer->flow_control_enabled == flow_control_state) {
return 0;
}
if (flow_control_state == 0) {
log_printf (LOGSYS_LEVEL_DEBUG,
"Disabling flow control for %d\n",
conn_info->client_pid);
} else
if (flow_control_state == 1) {
log_printf (LOGSYS_LEVEL_DEBUG,
"Enabling flow control for %d\n",
conn_info->client_pid);
}
conn_info->control_buffer->flow_control_enabled = flow_control_state;
return 1;
}
static void flow_control_stats_update (
hdb_handle_t stats_handle,
int flow_control_state)
{
uint32_t fc_state = flow_control_state;
api->stats_update_value (stats_handle, "flow_control",
&fc_state, sizeof(fc_state));
api->stats_increment_value (stats_handle, "flow_control_count");
}
static inline int zcb_free (struct zcb_mapped *zcb_mapped)
{
unsigned int res;
res = munmap (zcb_mapped->addr, zcb_mapped->size);
list_del (&zcb_mapped->list);
free (zcb_mapped);
return (res);
}
static inline int zcb_by_addr_free (struct conn_info *conn_info, void *addr)
{
struct list_head *list;
struct zcb_mapped *zcb_mapped;
unsigned int res = 0;
for (list = conn_info->zcb_mapped_list_head.next;
list != &conn_info->zcb_mapped_list_head; list = list->next) {
zcb_mapped = list_entry (list, struct zcb_mapped, list);
if (zcb_mapped->addr == addr) {
res = zcb_free (zcb_mapped);
break;
}
}
return (res);
}
static inline int zcb_all_free (
struct conn_info *conn_info)
{
struct list_head *list;
struct zcb_mapped *zcb_mapped;
for (list = conn_info->zcb_mapped_list_head.next;
list != &conn_info->zcb_mapped_list_head;) {
zcb_mapped = list_entry (list, struct zcb_mapped, list);
list = list->next;
zcb_free (zcb_mapped);
}
return (0);
}
static inline int zcb_alloc (
struct conn_info *conn_info,
const char *path_to_file,
size_t size,
void **addr)
{
struct zcb_mapped *zcb_mapped;
unsigned int res;
zcb_mapped = malloc (sizeof (struct zcb_mapped));
if (zcb_mapped == NULL) {
return (-1);
}
res = memory_map (
path_to_file,
size,
addr);
if (res == -1) {
free (zcb_mapped);
return (-1);
}
list_init (&zcb_mapped->list);
zcb_mapped->addr = *addr;
zcb_mapped->size = size;
list_add_tail (&zcb_mapped->list, &conn_info->zcb_mapped_list_head);
return (0);
}
static int ipc_thread_active (void *conn)
{
struct conn_info *conn_info = (struct conn_info *)conn;
int retval = 0;
pthread_mutex_lock (&conn_info->mutex);
if (conn_info->state == CONN_STATE_THREAD_ACTIVE) {
retval = 1;
}
pthread_mutex_unlock (&conn_info->mutex);
return (retval);
}
static int ipc_thread_exiting (void *conn)
{
struct conn_info *conn_info = (struct conn_info *)conn;
int retval = 1;
pthread_mutex_lock (&conn_info->mutex);
if (conn_info->state == CONN_STATE_THREAD_INACTIVE) {
retval = 0;
} else
if (conn_info->state == CONN_STATE_THREAD_ACTIVE) {
retval = 0;
}
pthread_mutex_unlock (&conn_info->mutex);
return (retval);
}
/*
* returns 0 if should be called again, -1 if finished
*/
static inline int conn_info_destroy (struct conn_info *conn_info)
{
unsigned int res;
void *retval;
list_del (&conn_info->list);
list_init (&conn_info->list);
list_add (&conn_info->list, &conn_info_exit_list_head);
if (conn_info->state == CONN_STATE_THREAD_REQUEST_EXIT) {
res = pthread_join (conn_info->thread, &retval);
conn_info->state = CONN_STATE_THREAD_DESTROYED;
return (0);
}
if (conn_info->state == CONN_STATE_THREAD_INACTIVE ||
conn_info->state == CONN_STATE_DISCONNECT_INACTIVE) {
list_del (&conn_info->list);
close (conn_info->fd);
api->free (conn_info);
return (-1);
}
if (conn_info->state == CONN_STATE_THREAD_ACTIVE) {
ipc_sem_post (conn_info->control_buffer, SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT);
return (0);
}
/*
* Retry library exit function if busy
*/
if (conn_info->state == CONN_STATE_THREAD_DESTROYED) {
api->serialize_lock ();
res = api->exit_fn_get (conn_info->service) (conn_info);
api->serialize_unlock ();
api->stats_destroy_connection (conn_info->stats_handle);
if (res == -1) {
return (0);
} else {
conn_info->state = CONN_STATE_LIB_EXIT_CALLED;
}
}
pthread_mutex_lock (&conn_info->mutex);
if (conn_info->refcount > 0) {
pthread_mutex_unlock (&conn_info->mutex);
return (0);
}
list_del (&conn_info->list);
pthread_mutex_unlock (&conn_info->mutex);
/*
* Let library know, that connection is now closed
*/
conn_info->control_buffer->ipc_closed = 1;
ipc_sem_post (conn_info->control_buffer, SEMAPHORE_RESPONSE);
ipc_sem_post (conn_info->control_buffer, SEMAPHORE_DISPATCH);
#if _POSIX_THREAD_PROCESS_SHARED > 0
sem_destroy (&conn_info->control_buffer->sem_request_or_flush_or_exit);
sem_destroy (&conn_info->control_buffer->sem_request);
sem_destroy (&conn_info->control_buffer->sem_response);
sem_destroy (&conn_info->control_buffer->sem_dispatch);
#else
semctl (conn_info->control_buffer->semid, 0, IPC_RMID);
#endif
/*
* Destroy shared memory segment and semaphore
*/
res = munmap ((void *)conn_info->control_buffer, conn_info->control_size);
res = munmap ((void *)conn_info->request_buffer, conn_info->request_size);
res = munmap ((void *)conn_info->response_buffer, conn_info->response_size);
/*
* Free allocated data needed to retry exiting library IPC connection
*/
if (conn_info->private_data) {
api->free (conn_info->private_data);
}
close (conn_info->fd);
res = circular_memory_unmap (conn_info->dispatch_buffer, conn_info->dispatch_size);
zcb_all_free (conn_info);
api->free (conn_info);
return (-1);
}
union u {
uint64_t server_addr;
void *server_ptr;
};
static uint64_t void2serveraddr (void *server_ptr)
{
union u u;
u.server_ptr = server_ptr;
return (u.server_addr);
}
static void *serveraddr2void (uint64_t server_addr)
{
union u u;
u.server_addr = server_addr;
return (u.server_ptr);
};
static inline void zerocopy_operations_process (
struct conn_info *conn_info,
coroipc_request_header_t **header_out,
unsigned int *new_message)
{
coroipc_request_header_t *header;
header = (coroipc_request_header_t *)conn_info->request_buffer;
if (header->id == ZC_ALLOC_HEADER) {
mar_req_coroipcc_zc_alloc_t *hdr = (mar_req_coroipcc_zc_alloc_t *)header;
coroipc_response_header_t res_header;
void *addr = NULL;
struct coroipcs_zc_header *zc_header;
unsigned int res;
res = zcb_alloc (conn_info, hdr->path_to_file, hdr->map_size,
&addr);
zc_header = (struct coroipcs_zc_header *)addr;
zc_header->server_address = void2serveraddr(addr);
res_header.size = sizeof (coroipc_response_header_t);
res_header.id = 0;
coroipcs_response_send (
conn_info, &res_header,
res_header.size);
*new_message = 0;
return;
} else
if (header->id == ZC_FREE_HEADER) {
mar_req_coroipcc_zc_free_t *hdr = (mar_req_coroipcc_zc_free_t *)header;
coroipc_response_header_t res_header;
void *addr = NULL;
addr = serveraddr2void (hdr->server_address);
zcb_by_addr_free (conn_info, addr);
res_header.size = sizeof (coroipc_response_header_t);
res_header.id = 0;
coroipcs_response_send (
conn_info, &res_header,
res_header.size);
*new_message = 0;
return;
} else
if (header->id == ZC_EXECUTE_HEADER) {
mar_req_coroipcc_zc_execute_t *hdr = (mar_req_coroipcc_zc_execute_t *)header;
header = (coroipc_request_header_t *)(((char *)serveraddr2void(hdr->server_address) + sizeof (struct coroipcs_zc_header)));
}
*header_out = header;
*new_message = 1;
}
static void *pthread_ipc_consumer (void *conn)
{
struct conn_info *conn_info = (struct conn_info *)conn;
int res;
coroipc_request_header_t *header;
coroipc_response_header_t coroipc_response_header;
int send_ok;
unsigned int new_message;
int sem_value = 0;
#if defined(HAVE_PTHREAD_SETSCHEDPARAM) && defined(HAVE_SCHED_GET_PRIORITY_MAX)
if (api->sched_policy != 0) {
res = pthread_setschedparam (conn_info->thread,
api->sched_policy, api->sched_param);
}
#endif
for (;;) {
ipc_sem_wait (conn_info->control_buffer, SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT, IPC_SEMWAIT_NOFILE);
if (ipc_thread_active (conn_info) == 0) {
coroipcs_refcount_dec (conn_info);
pthread_exit (0);
}
outq_flush (conn_info);
ipc_sem_getvalue (conn_info->control_buffer, SEMAPHORE_REQUEST, &sem_value);
if (sem_value > 0) {
res = ipc_sem_wait (conn_info->control_buffer, SEMAPHORE_REQUEST, IPC_SEMWAIT_NOFILE);
} else {
continue;
}
zerocopy_operations_process (conn_info, &header, &new_message);
/*
* There is no new message to process, continue for loop
*/
if (new_message == 0) {
continue;
}
coroipcs_refcount_inc (conn);
send_ok = api->sending_allowed (conn_info->service,
header->id,
header,
conn_info->sending_allowed_private_data);
/*
* This happens when the message contains some kind of invalid
* parameter, such as an invalid size
*/
if (send_ok == -1) {
coroipc_response_header.size = sizeof (coroipc_response_header_t);
coroipc_response_header.id = 0;
coroipc_response_header.error = CS_ERR_INVALID_PARAM;
coroipcs_response_send (conn_info,
&coroipc_response_header,
sizeof (coroipc_response_header_t));
} else
if (send_ok) {
api->stats_increment_value (conn_info->stats_handle, "requests");
api->serialize_lock();
api->handler_fn_get (conn_info->service, header->id) (conn_info, header);
api->serialize_unlock();
} else {
/*
* Overload, tell library to retry
*/
coroipc_response_header.size = sizeof (coroipc_response_header_t);
coroipc_response_header.id = 0;
coroipc_response_header.error = CS_ERR_TRY_AGAIN;
coroipcs_response_send (conn_info,
&coroipc_response_header,
sizeof (coroipc_response_header_t));
}
api->sending_allowed_release (conn_info->sending_allowed_private_data);
coroipcs_refcount_dec (conn);
}
pthread_exit (0);
}
static int
req_setup_send (
struct conn_info *conn_info,
int error)
{
mar_res_setup_t res_setup;
unsigned int res;
memset (&res_setup, 0, sizeof (res_setup));
res_setup.error = error;
retry_send:
res = send (conn_info->fd, &res_setup, sizeof (mar_res_setup_t), MSG_WAITALL);
if (res == -1 && errno == EINTR) {
api->stats_increment_value (conn_info->stats_handle, "send_retry_count");
goto retry_send;
} else
if (res == -1 && errno == EAGAIN) {
api->stats_increment_value (conn_info->stats_handle, "send_retry_count");
goto retry_send;
}
return (0);
}
static cs_error_t
req_setup_recv (
struct conn_info *conn_info)
{
int res;
struct msghdr msg_recv;
struct iovec iov_recv;
cs_error_t auth_res = CS_ERR_LIBRARY;
#ifdef COROSYNC_LINUX
struct cmsghdr *cmsg;
char cmsg_cred[CMSG_SPACE (sizeof (struct ucred))];
int off = 0;
int on = 1;
struct ucred *cred;
#endif
msg_recv.msg_flags = 0;
msg_recv.msg_iov = &iov_recv;
msg_recv.msg_iovlen = 1;
msg_recv.msg_name = 0;
msg_recv.msg_namelen = 0;
#ifdef COROSYNC_LINUX
msg_recv.msg_control = (void *)cmsg_cred;
msg_recv.msg_controllen = sizeof (cmsg_cred);
#endif
#ifdef COROSYNC_SOLARIS
msg_recv.msg_accrights = 0;
msg_recv.msg_accrightslen = 0;
#endif /* COROSYNC_SOLARIS */
iov_recv.iov_base = &conn_info->setup_msg[conn_info->setup_bytes_read];
iov_recv.iov_len = sizeof (mar_req_setup_t) - conn_info->setup_bytes_read;
#ifdef COROSYNC_LINUX
setsockopt(conn_info->fd, SOL_SOCKET, SO_PASSCRED, &on, sizeof (on));
#endif
retry_recv:
res = recvmsg (conn_info->fd, &msg_recv, MSG_NOSIGNAL);
if (res == -1 && errno == EINTR) {
api->stats_increment_value (conn_info->stats_handle, "recv_retry_count");
goto retry_recv;
} else
if (res == -1 && errno != EAGAIN) {
return (CS_ERR_LIBRARY);
} else
if (res == 0) {
#if defined(COROSYNC_SOLARIS) || defined(COROSYNC_BSD) || defined(COROSYNC_DARWIN)
/* On many OS poll never return POLLHUP or POLLERR.
* EOF is detected when recvmsg return 0.
*/
ipc_disconnect (conn_info);
return (CS_ERR_LIBRARY);
#else
return (CS_ERR_SECURITY);
#endif
}
conn_info->setup_bytes_read += res;
/*
* currently support getpeerucred, getpeereid, and SO_PASSCRED credential
* retrieval mechanisms for various Platforms
*/
#ifdef HAVE_GETPEERUCRED
/*
* Solaris and some BSD systems
*/
{
ucred_t *uc = NULL;
uid_t euid = -1;
gid_t egid = -1;
if (getpeerucred (conn_info->fd, &uc) == 0) {
euid = ucred_geteuid (uc);
egid = ucred_getegid (uc);
conn_info->client_pid = ucred_getpid (uc);
if (api->security_valid (euid, egid)) {
auth_res = CS_OK;
} else {
auth_res = hdb_error_to_cs(errno);
}
ucred_free(uc);
}
}
#elif HAVE_GETPEEREID
/*
* Usually MacOSX systems
*/
{
uid_t euid;
gid_t egid;
/*
* TODO get the peer's pid.
* conn_info->client_pid = ?;
*/
euid = -1;
egid = -1;
if (getpeereid (conn_info->fd, &euid, &egid) == 0) {
if (api->security_valid (euid, egid)) {
auth_res = CS_OK;
} else {
auth_res = hdb_error_to_cs(errno);
}
}
}
#elif SO_PASSCRED
/*
* Usually Linux systems
*/
cmsg = CMSG_FIRSTHDR (&msg_recv);
assert (cmsg);
cred = (struct ucred *)CMSG_DATA (cmsg);
if (cred) {
conn_info->client_pid = cred->pid;
if (api->security_valid (cred->uid, cred->gid)) {
auth_res = CS_OK;
} else {
auth_res = hdb_error_to_cs(errno);
}
}
#else /* no credentials */
auth_res = CS_OK;
log_printf (LOGSYS_LEVEL_ERROR, "Platform does not support IPC authentication. Using no authentication\n");
#endif /* no credentials */
if (auth_res != CS_OK) {
ipc_disconnect (conn_info);
if (auth_res == CS_ERR_NO_RESOURCES) {
log_printf (LOGSYS_LEVEL_ERROR,
"Not enough file desciptors for IPC connection.\n");
} else {
log_printf (LOGSYS_LEVEL_ERROR, "Invalid IPC credentials.\n");
}
return auth_res;
}
if (conn_info->setup_bytes_read == sizeof (mar_req_setup_t)) {
#ifdef COROSYNC_LINUX
setsockopt(conn_info->fd, SOL_SOCKET, SO_PASSCRED,
&off, sizeof (off));
#endif
return (CS_OK);
}
return (CS_ERR_LIBRARY);
}
static void ipc_disconnect (struct conn_info *conn_info)
{
if (conn_info->state == CONN_STATE_THREAD_INACTIVE) {
conn_info->state = CONN_STATE_DISCONNECT_INACTIVE;
return;
}
if (conn_info->state != CONN_STATE_THREAD_ACTIVE) {
return;
}
pthread_mutex_lock (&conn_info->mutex);
conn_info->state = CONN_STATE_THREAD_REQUEST_EXIT;
pthread_mutex_unlock (&conn_info->mutex);
ipc_sem_post (conn_info->control_buffer, SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT);
}
static int conn_info_create (int fd)
{
struct conn_info *conn_info;
conn_info = api->malloc (sizeof (struct conn_info));
if (conn_info == NULL) {
return (-1);
}
memset (conn_info, 0, sizeof (struct conn_info));
conn_info->fd = fd;
conn_info->client_pid = 0;
conn_info->service = SOCKET_SERVICE_INIT;
conn_info->state = CONN_STATE_THREAD_INACTIVE;
conn_info->poll_state = POLL_STATE_IN;
list_init (&conn_info->outq_head);
list_init (&conn_info->list);
list_init (&conn_info->zcb_mapped_list_head);
list_add (&conn_info->list, &conn_info_list_head);
api->poll_dispatch_add (fd, conn_info);
return (0);
}
#if defined(COROSYNC_LINUX) || defined(COROSYNC_SOLARIS)
/* SUN_LEN is broken for abstract namespace
*/
#define COROSYNC_SUN_LEN(a) sizeof(*(a))
#else
#define COROSYNC_SUN_LEN(a) SUN_LEN(a)
#endif
/*
* Exported functions
*/
extern void coroipcs_ipc_init_v2 (
struct coroipcs_init_state_v2 *init_state_v2)
{
api = init_state_v2;
api->old_log_printf = NULL;
log_printf (LOGSYS_LEVEL_DEBUG, "you are using ipc api v2\n");
_corosync_ipc_init ();
}
extern void coroipcs_ipc_init (
struct coroipcs_init_state *init_state)
{
api = calloc (sizeof(struct coroipcs_init_state_v2), 1);
/* v2 api */
api->stats_create_connection = dummy_stats_create_connection;
api->stats_destroy_connection = dummy_stats_destroy_connection;
api->stats_update_value = dummy_stats_update_value;
api->stats_increment_value = dummy_stats_increment_value;
api->log_printf = NULL;
/* v1 api */
api->socket_name = init_state->socket_name;
api->sched_policy = init_state->sched_policy;
api->sched_param = init_state->sched_param;
api->malloc = init_state->malloc;
api->free = init_state->free;
api->old_log_printf = init_state->log_printf;
api->fatal_error = init_state->fatal_error;
api->security_valid = init_state->security_valid;
api->service_available = init_state->service_available;
api->private_data_size_get = init_state->private_data_size_get;
api->serialize_lock = init_state->serialize_lock;
api->serialize_unlock = init_state->serialize_unlock;
api->sending_allowed = init_state->sending_allowed;
api->sending_allowed_release = init_state->sending_allowed_release;
api->poll_accept_add = init_state->poll_accept_add;
api->poll_dispatch_add = init_state->poll_dispatch_add;
api->poll_dispatch_modify = init_state->poll_dispatch_modify;
api->init_fn_get = init_state->init_fn_get;
api->exit_fn_get = init_state->exit_fn_get;
api->handler_fn_get = init_state->handler_fn_get;
log_printf (LOGSYS_LEVEL_DEBUG, "you are using ipc api v1\n");
_corosync_ipc_init ();
}
static void _corosync_ipc_init(void)
{
int server_fd;
struct sockaddr_un un_addr;
int res;
/*
* Create socket for IPC clients, name socket, listen for connections
*/
#if defined(COROSYNC_SOLARIS)
server_fd = socket (PF_UNIX, SOCK_STREAM, 0);
#else
server_fd = socket (PF_LOCAL, SOCK_STREAM, 0);
#endif
if (server_fd == -1) {
log_printf (LOGSYS_LEVEL_CRIT, "Cannot create client connections socket.\n");
api->fatal_error ("Can't create library listen socket");
}
res = fcntl (server_fd, F_SETFL, O_NONBLOCK);
if (res == -1) {
char error_str[100];
strerror_r (errno, error_str, 100);
log_printf (LOGSYS_LEVEL_CRIT, "Could not set non-blocking operation on server socket: %s\n", error_str);
api->fatal_error ("Could not set non-blocking operation on server socket");
}
memset (&un_addr, 0, sizeof (struct sockaddr_un));
un_addr.sun_family = AF_UNIX;
#if defined(COROSYNC_BSD) || defined(COROSYNC_DARWIN)
un_addr.sun_len = SUN_LEN(&un_addr);
#endif
#if defined(COROSYNC_LINUX)
sprintf (un_addr.sun_path + 1, "%s", api->socket_name);
#else
{
struct stat stat_out;
res = stat (SOCKETDIR, &stat_out);
if (res == -1 || (res == 0 && !S_ISDIR(stat_out.st_mode))) {
log_printf (LOGSYS_LEVEL_CRIT, "Required directory not present %s\n", SOCKETDIR);
api->fatal_error ("Please create required directory.");
}
sprintf (un_addr.sun_path, "%s/%s", SOCKETDIR, api->socket_name);
unlink (un_addr.sun_path);
}
#endif
res = bind (server_fd, (struct sockaddr *)&un_addr, COROSYNC_SUN_LEN(&un_addr));
if (res) {
char error_str[100];
strerror_r (errno, error_str, 100);
log_printf (LOGSYS_LEVEL_CRIT, "Could not bind AF_UNIX (%s): %s.\n", un_addr.sun_path, error_str);
api->fatal_error ("Could not bind to AF_UNIX socket\n");
}
/*
* Allow eveyrone to write to the socket since the IPC layer handles
* security automatically
*/
#if !defined(COROSYNC_LINUX)
res = chmod (un_addr.sun_path, S_IRWXU|S_IRWXG|S_IRWXO);
#endif
listen (server_fd, SERVER_BACKLOG);
/*
* Setup connection dispatch routine
*/
api->poll_accept_add (server_fd);
}
void coroipcs_ipc_exit (void)
{
struct list_head *list;
struct conn_info *conn_info;
unsigned int res;
for (list = conn_info_list_head.next; list != &conn_info_list_head;
list = list->next) {
conn_info = list_entry (list, struct conn_info, list);
if (conn_info->state != CONN_STATE_THREAD_ACTIVE)
continue;
ipc_disconnect (conn_info);
#if _POSIX_THREAD_PROCESS_SHARED > 0
sem_destroy (&conn_info->control_buffer->sem_request_or_flush_or_exit);
sem_destroy (&conn_info->control_buffer->sem_request);
sem_destroy (&conn_info->control_buffer->sem_response);
sem_destroy (&conn_info->control_buffer->sem_dispatch);
#else
semctl (conn_info->control_buffer->semid, 0, IPC_RMID);
#endif
/*
* Unmap memory segments
*/
res = munmap ((void *)conn_info->control_buffer,
conn_info->control_size);
res = munmap ((void *)conn_info->request_buffer,
conn_info->request_size);
res = munmap ((void *)conn_info->response_buffer,
conn_info->response_size);
res = circular_memory_unmap (conn_info->dispatch_buffer,
conn_info->dispatch_size);
}
}
int coroipcs_ipc_service_exit (unsigned int service)
{
struct list_head *list, *list_next;
struct conn_info *conn_info;
for (list = conn_info_list_head.next; list != &conn_info_list_head;
list = list_next) {
list_next = list->next;
conn_info = list_entry (list, struct conn_info, list);
if (conn_info->service != service ||
(conn_info->state != CONN_STATE_THREAD_ACTIVE && conn_info->state != CONN_STATE_THREAD_REQUEST_EXIT)) {
continue;
}
ipc_disconnect (conn_info);
api->poll_dispatch_destroy (conn_info->fd, NULL);
while (conn_info_destroy (conn_info) != -1)
;
/*
* We will return to prevent token loss. Schedwrk will call us again.
*/
return (-1);
}
/*
* No conn info left in active list. We will traverse thru exit list. If there is any
* conn_info->service == service, we will wait to proper end -> return -1
*/
for (list = conn_info_exit_list_head.next; list != &conn_info_exit_list_head; list = list->next) {
conn_info = list_entry (list, struct conn_info, list);
if (conn_info->service == service) {
return (-1);
}
}
return (0);
}
/*
* Get the conn info private data
*/
void *coroipcs_private_data_get (void *conn)
{
struct conn_info *conn_info = (struct conn_info *)conn;
return (conn_info->private_data);
}
int coroipcs_response_send (void *conn, const void *msg, size_t mlen)
{
struct conn_info *conn_info = (struct conn_info *)conn;
memcpy (conn_info->response_buffer, msg, mlen);
ipc_sem_post (conn_info->control_buffer, SEMAPHORE_RESPONSE);
api->stats_increment_value (conn_info->stats_handle, "responses");
return (0);
}
int coroipcs_response_iov_send (void *conn, const struct iovec *iov, unsigned int iov_len)
{
struct conn_info *conn_info = (struct conn_info *)conn;
int write_idx = 0;
int i;
for (i = 0; i < iov_len; i++) {
memcpy (&conn_info->response_buffer[write_idx],
iov[i].iov_base, iov[i].iov_len);
write_idx += iov[i].iov_len;
}
ipc_sem_post (conn_info->control_buffer, SEMAPHORE_RESPONSE);
api->stats_increment_value (conn_info->stats_handle, "responses");
return (0);
}
static int shared_mem_dispatch_bytes_left (const struct conn_info *conn_info)
{
unsigned int n_read;
unsigned int n_write;
unsigned int bytes_left;
n_read = conn_info->control_buffer->read;
n_write = conn_info->control_buffer->write;
if (n_read <= n_write) {
bytes_left = conn_info->dispatch_size - n_write + n_read;
} else {
bytes_left = n_read - n_write;
}
if (bytes_left > 0) {
bytes_left--;
}
return (bytes_left);
}
static void memcpy_dwrap (struct conn_info *conn_info, void *msg, unsigned int len)
{
unsigned int write_idx;
write_idx = conn_info->control_buffer->write;
memcpy (&conn_info->dispatch_buffer[write_idx], msg, len);
- conn_info->control_buffer->write = ((write_idx + len + 7) & 0xFFFFFFFF8) % conn_info->dispatch_size;
+ conn_info->control_buffer->write = ((write_idx + len + 7) & 0xFFFFFFF8) % conn_info->dispatch_size;
}
static void msg_send (void *conn, const struct iovec *iov, unsigned int iov_len,
int locked)
{
struct conn_info *conn_info = (struct conn_info *)conn;
int res;
int i;
char buf;
for (i = 0; i < iov_len; i++) {
memcpy_dwrap (conn_info, iov[i].iov_base, iov[i].iov_len);
}
buf = list_empty (&conn_info->outq_head);
res = send (conn_info->fd, &buf, 1, MSG_NOSIGNAL);
if (res != 1) {
conn_info->pending_semops += 1;
if (conn_info->poll_state == POLL_STATE_IN) {
conn_info->poll_state = POLL_STATE_INOUT;
api->poll_dispatch_modify (conn_info->fd,
POLLIN|POLLOUT|POLLNVAL);
}
}
ipc_sem_post (conn_info->control_buffer, SEMAPHORE_DISPATCH);
}
static void outq_flush (struct conn_info *conn_info) {
struct list_head *list, *list_next;
struct outq_item *outq_item;
unsigned int bytes_left;
struct iovec iov;
int32_t q_size_dec = 0;
int32_t i;
int32_t fc_set;
pthread_mutex_lock (&conn_info->mutex);
if (list_empty (&conn_info->outq_head)) {
fc_set = flow_control_state_set (conn_info, 0);
pthread_mutex_unlock (&conn_info->mutex);
if (fc_set) {
flow_control_stats_update (conn_info->stats_handle, 0);
}
return;
}
for (list = conn_info->outq_head.next;
list != &conn_info->outq_head; list = list_next) {
list_next = list->next;
outq_item = list_entry (list, struct outq_item, list);
bytes_left = shared_mem_dispatch_bytes_left (conn_info);
if (bytes_left > outq_item->mlen) {
iov.iov_base = outq_item->msg;
iov.iov_len = outq_item->mlen;
msg_send (conn_info, &iov, 1, MSG_SEND_UNLOCKED);
list_del (list);
api->free (iov.iov_base);
api->free (outq_item);
q_size_dec++;
} else {
break;
}
}
pthread_mutex_unlock (&conn_info->mutex);
/*
* these need to be sent out of the conn_info->mutex
*/
for (i = 0; i < q_size_dec; i++) {
api->stats_decrement_value (conn_info->stats_handle, "queue_size");
api->stats_increment_value (conn_info->stats_handle, "dispatched");
}
}
static int priv_change (struct conn_info *conn_info)
{
mar_req_priv_change req_priv_change;
unsigned int res;
#if _POSIX_THREAD_PROCESS_SHARED < 1
union semun semun;
struct semid_ds ipc_set;
int i;
#endif
retry_recv:
res = recv (conn_info->fd, &req_priv_change,
sizeof (mar_req_priv_change),
MSG_NOSIGNAL);
if (res == -1 && errno == EINTR) {
api->stats_increment_value (conn_info->stats_handle, "recv_retry_count");
goto retry_recv;
}
if (res == -1 && errno == EAGAIN) {
api->stats_increment_value (conn_info->stats_handle, "recv_retry_count");
goto retry_recv;
}
if (res == -1 && errno != EAGAIN) {
return (-1);
}
#if defined(COROSYNC_SOLARIS) || defined(COROSYNC_BSD) || defined(COROSYNC_DARWIN)
/* Error on socket, EOF is detected when recv return 0
*/
if (res == 0) {
return (-1);
}
#endif
#if _POSIX_THREAD_PROCESS_SHARED < 1
ipc_set.sem_perm.uid = req_priv_change.euid;
ipc_set.sem_perm.gid = req_priv_change.egid;
ipc_set.sem_perm.mode = 0600;
semun.buf = &ipc_set;
for (i = 0; i < 3; i++) {
res = semctl (conn_info->control_buffer->semid, 0, IPC_SET, semun);
if (res == -1) {
return (-1);
}
}
#endif
return (0);
}
static void msg_send_or_queue (void *conn, const struct iovec *iov, unsigned int iov_len)
{
struct conn_info *conn_info = (struct conn_info *)conn;
unsigned int bytes_left;
unsigned int bytes_msg = 0;
int i;
struct outq_item *outq_item;
char *write_buf = 0;
/*
* Exit transmission if the connection is dead
*/
if (ipc_thread_active (conn) == 0) {
return;
}
bytes_left = shared_mem_dispatch_bytes_left (conn_info);
for (i = 0; i < iov_len; i++) {
bytes_msg += iov[i].iov_len;
}
if (bytes_left < bytes_msg || list_empty (&conn_info->outq_head) == 0) {
if (flow_control_state_set (conn_info, 1)) {
flow_control_stats_update(conn_info->stats_handle, 1);
}
outq_item = api->malloc (sizeof (struct outq_item));
if (outq_item == NULL) {
ipc_disconnect (conn);
return;
}
outq_item->msg = api->malloc (bytes_msg);
if (outq_item->msg == 0) {
api->free (outq_item);
ipc_disconnect (conn);
return;
}
write_buf = outq_item->msg;
for (i = 0; i < iov_len; i++) {
memcpy (write_buf, iov[i].iov_base, iov[i].iov_len);
write_buf += iov[i].iov_len;
}
outq_item->mlen = bytes_msg;
list_init (&outq_item->list);
pthread_mutex_lock (&conn_info->mutex);
list_add_tail (&outq_item->list, &conn_info->outq_head);
pthread_mutex_unlock (&conn_info->mutex);
api->stats_increment_value (conn_info->stats_handle, "queue_size");
return;
}
msg_send (conn, iov, iov_len, MSG_SEND_LOCKED);
api->stats_increment_value (conn_info->stats_handle, "dispatched");
}
void coroipcs_refcount_inc (void *conn)
{
struct conn_info *conn_info = (struct conn_info *)conn;
pthread_mutex_lock (&conn_info->mutex);
conn_info->refcount++;
pthread_mutex_unlock (&conn_info->mutex);
}
void coroipcs_refcount_dec (void *conn)
{
struct conn_info *conn_info = (struct conn_info *)conn;
pthread_mutex_lock (&conn_info->mutex);
conn_info->refcount--;
pthread_mutex_unlock (&conn_info->mutex);
}
int coroipcs_dispatch_send (void *conn, const void *msg, size_t mlen)
{
struct iovec iov;
iov.iov_base = (void *)msg;
iov.iov_len = mlen;
msg_send_or_queue (conn, &iov, 1);
return (0);
}
int coroipcs_dispatch_iov_send (void *conn, const struct iovec *iov, unsigned int iov_len)
{
msg_send_or_queue (conn, iov, iov_len);
return (0);
}
int coroipcs_handler_accept (
int fd,
int revent,
void *data)
{
socklen_t addrlen;
struct sockaddr_un un_addr;
int new_fd;
#ifdef COROSYNC_LINUX
int on = 1;
#endif
int res;
addrlen = sizeof (struct sockaddr_un);
retry_accept:
new_fd = accept (fd, (struct sockaddr *)&un_addr, &addrlen);
if (new_fd == -1 && errno == EINTR) {
goto retry_accept;
}
if (new_fd == -1) {
char error_str[100];
strerror_r (errno, error_str, 100);
log_printf (LOGSYS_LEVEL_ERROR,
"Could not accept Library connection: %s\n", error_str);
return (0); /* This is an error, but -1 would indicate disconnect from poll loop */
}
res = fcntl (new_fd, F_SETFL, O_NONBLOCK);
if (res == -1) {
char error_str[100];
strerror_r (errno, error_str, 100);
log_printf (LOGSYS_LEVEL_ERROR,
"Could not set non-blocking operation on library connection: %s\n",
error_str);
close (new_fd);
return (0); /* This is an error, but -1 would indicate disconnect from poll loop */
}
/*
* Valid accept
*/
/*
* Request credentials of sender provided by kernel
*/
#ifdef COROSYNC_LINUX
setsockopt(new_fd, SOL_SOCKET, SO_PASSCRED, &on, sizeof (on));
#endif
res = conn_info_create (new_fd);
if (res != 0) {
close (new_fd);
}
return (0);
}
static char * pid_to_name (pid_t pid, char *out_name, size_t name_len)
{
char *name;
char *rest;
FILE *fp;
char fname[32];
char buf[256];
snprintf (fname, 32, "/proc/%d/stat", pid);
fp = fopen (fname, "r");
if (!fp) {
return NULL;
}
if (fgets (buf, sizeof (buf), fp) == NULL) {
fclose (fp);
return NULL;
}
fclose (fp);
name = strrchr (buf, '(');
if (!name) {
return NULL;
}
/* move past the bracket */
name++;
rest = strrchr (buf, ')');
if (rest == NULL || rest[1] != ' ') {
return NULL;
}
*rest = '\0';
/* move past the NULL and space */
rest += 2;
/* copy the name */
strncpy (out_name, name, name_len);
out_name[name_len - 1] = '\0';
return out_name;
}
static void coroipcs_init_conn_stats (
struct conn_info *conn)
{
char conn_name[CS_MAX_NAME_LENGTH];
char proc_name[CS_MAX_NAME_LENGTH];
char int_str[4];
if (conn->client_pid > 0) {
if (pid_to_name (conn->client_pid, proc_name, sizeof(proc_name))) {
snprintf (conn_name, sizeof(conn_name),
"%s:%s:%d:%d", proc_name,
short_service_name_get(conn->service, int_str, 4),
conn->client_pid, conn->fd);
} else {
snprintf (conn_name, sizeof(conn_name),
"proc:%s:%d:%d",
short_service_name_get(conn->service, int_str, 4),
conn->client_pid,
conn->fd);
}
} else {
snprintf (conn_name, sizeof(conn_name),
"proc:%s:pid:%d",
short_service_name_get(conn->service, int_str, 4),
conn->fd);
}
conn->stats_handle = api->stats_create_connection (conn_name, conn->client_pid, conn->fd);
api->stats_update_value (conn->stats_handle, "service_id",
&conn->service, sizeof(conn->service));
}
int coroipcs_handler_dispatch (
int fd,
int revent,
void *context)
{
mar_req_setup_t *req_setup;
struct conn_info *conn_info = (struct conn_info *)context;
int res;
char buf;
if (ipc_thread_exiting (conn_info)) {
return conn_info_destroy (conn_info);
}
/*
* If an error occurs, request exit
*/
if (revent & (POLLERR|POLLHUP)) {
ipc_disconnect (conn_info);
return (0);
}
/*
* Read the header and process it
*/
if (conn_info->service == SOCKET_SERVICE_INIT && (revent & POLLIN)) {
/*
* Receive in a nonblocking fashion the request
* IF security invalid, send ERR_SECURITY, otherwise
* send OK
*/
res = req_setup_recv (conn_info);
if (res != CS_OK && res != CS_ERR_LIBRARY) {
req_setup_send (conn_info, res);
}
if (res != CS_OK) {
return (0);
}
pthread_mutex_init (&conn_info->mutex, NULL);
req_setup = (mar_req_setup_t *)conn_info->setup_msg;
/*
* Is the service registered ?
* Has service init function ?
*/
if (api->service_available (req_setup->service) == 0 ||
api->init_fn_get (req_setup->service) == NULL) {
req_setup_send (conn_info, CS_ERR_NOT_EXIST);
ipc_disconnect (conn_info);
return (0);
}
#if _POSIX_THREAD_PROCESS_SHARED < 1
conn_info->semkey = req_setup->semkey;
#endif
res = memory_map (
req_setup->control_file,
req_setup->control_size,
(void *)&conn_info->control_buffer);
if (res == -1) {
goto send_setup_response;
}
conn_info->control_size = req_setup->control_size;
res = memory_map (
req_setup->request_file,
req_setup->request_size,
(void *)&conn_info->request_buffer);
if (res == -1) {
goto send_setup_response;
}
conn_info->request_size = req_setup->request_size;
res = memory_map (
req_setup->response_file,
req_setup->response_size,
(void *)&conn_info->response_buffer);
if (res == -1) {
goto send_setup_response;
}
conn_info->response_size = req_setup->response_size;
res = circular_memory_map (
req_setup->dispatch_file,
req_setup->dispatch_size,
(void *)&conn_info->dispatch_buffer);
if (res == -1) {
goto send_setup_response;
}
conn_info->dispatch_size = req_setup->dispatch_size;
send_setup_response:
if (res == 0) {
req_setup_send (conn_info, CS_OK);
} else {
req_setup_send (conn_info, CS_ERR_LIBRARY);
ipc_disconnect (conn_info);
return (0);
}
conn_info->service = req_setup->service;
conn_info->refcount = 0;
conn_info->setup_bytes_read = 0;
#if _POSIX_THREAD_PROCESS_SHARED < 1
conn_info->control_buffer->semid = semget (conn_info->semkey, 3, 0600);
#endif
conn_info->pending_semops = 0;
/*
* ipc thread is the only reference at startup
*/
conn_info->refcount = 1;
conn_info->state = CONN_STATE_THREAD_ACTIVE;
conn_info->private_data = api->malloc (api->private_data_size_get (conn_info->service));
memset (conn_info->private_data, 0,
api->private_data_size_get (conn_info->service));
api->init_fn_get (conn_info->service) (conn_info);
/* create stats objects */
coroipcs_init_conn_stats (conn_info);
pthread_attr_init (&conn_info->thread_attr);
/*
* IA64 needs more stack space then other arches
*/
#if defined(__ia64__)
pthread_attr_setstacksize (&conn_info->thread_attr, 400000);
#else
pthread_attr_setstacksize (&conn_info->thread_attr, 200000);
#endif
pthread_attr_setdetachstate (&conn_info->thread_attr, PTHREAD_CREATE_JOINABLE);
res = pthread_create (&conn_info->thread,
&conn_info->thread_attr,
pthread_ipc_consumer,
conn_info);
/*
* Security check - disallow multiple configurations of
* the ipc connection
*/
if (conn_info->service == SOCKET_SERVICE_INIT) {
conn_info->service = -1;
}
} else
if (revent & POLLIN) {
coroipcs_refcount_inc (conn_info);
res = recv (fd, &buf, 1, MSG_NOSIGNAL);
if (res == 1) {
switch (buf) {
case MESSAGE_REQ_CHANGE_EUID:
if (priv_change (conn_info) == -1) {
ipc_disconnect (conn_info);
}
break;
default:
res = 0;
break;
}
}
#if defined(COROSYNC_SOLARIS) || defined(COROSYNC_BSD) || defined(COROSYNC_DARWIN)
/* On many OS poll never return POLLHUP or POLLERR.
* EOF is detected when recvmsg return 0.
*/
if (res == 0) {
ipc_disconnect (conn_info);
coroipcs_refcount_dec (conn_info);
return (0);
}
#endif
coroipcs_refcount_dec (conn_info);
}
if (revent & POLLOUT) {
int psop = conn_info->pending_semops;
int i;
assert (psop != 0);
for (i = 0; i < psop; i++) {
res = send (conn_info->fd, &buf, 1, MSG_NOSIGNAL);
if (res != 1) {
return (0);
} else {
conn_info->pending_semops -= 1;
}
}
if (conn_info->poll_state == POLL_STATE_INOUT) {
conn_info->poll_state = POLL_STATE_IN;
api->poll_dispatch_modify (conn_info->fd, POLLIN|POLLNVAL);
}
}
return (0);
}
diff --git a/exec/coropoll.c b/exec/coropoll.c
index 95978dfe..4fc30a2c 100644
--- a/exec/coropoll.c
+++ b/exec/coropoll.c
@@ -1,555 +1,559 @@
/*
* Copyright (c) 2003-2004 MontaVista Software, Inc.
* Copyright (c) 2006-2009 Red Hat, Inc.
*
* All rights reserved.
*
* Author: Steven Dake (sdake@redhat.com)
*
* This software licensed under BSD license, the text of which follows:
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* - Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* - Neither the name of the MontaVista Software, Inc. nor the names of its
* contributors may be used to endorse or promote products derived from this
* software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
* THE POSSIBILITY OF SUCH DAMAGE.
*/
#include <config.h>
#include <errno.h>
#include <pthread.h>
#include <sys/poll.h>
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <unistd.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <corosync/hdb.h>
#include <corosync/totem/coropoll.h>
#include <corosync/list.h>
#include "tlist.h"
typedef int (*dispatch_fn_t) (hdb_handle_t hdb_handle, int fd, int revents, void *data);
struct poll_entry {
struct pollfd ufd;
dispatch_fn_t dispatch_fn;
void *data;
};
struct poll_instance {
struct poll_entry *poll_entries;
struct pollfd *ufds;
int poll_entry_count;
struct timerlist timerlist;
int stop_requested;
int pipefds[2];
poll_low_fds_event_fn low_fds_event_fn;
int32_t not_enough_fds;
};
DECLARE_HDB_DATABASE (poll_instance_database,NULL);
static int dummy_dispatch_fn (hdb_handle_t handle, int fd, int revents, void *data) {
return (0);
}
hdb_handle_t poll_create (void)
{
hdb_handle_t handle;
struct poll_instance *poll_instance;
unsigned int res;
res = hdb_handle_create (&poll_instance_database,
sizeof (struct poll_instance), &handle);
if (res != 0) {
goto error_exit;
}
res = hdb_handle_get (&poll_instance_database, handle,
(void *)&poll_instance);
if (res != 0) {
goto error_destroy;
}
poll_instance->poll_entries = 0;
poll_instance->ufds = 0;
poll_instance->poll_entry_count = 0;
poll_instance->stop_requested = 0;
poll_instance->not_enough_fds = 0;
timerlist_init (&poll_instance->timerlist);
res = pipe (poll_instance->pipefds);
if (res != 0) {
goto error_destroy;
}
/*
* Allow changes in modify to propogate into new poll instance
*/
res = poll_dispatch_add (
handle,
poll_instance->pipefds[0],
POLLIN,
NULL,
dummy_dispatch_fn);
if (res != 0) {
goto error_destroy;
}
return (handle);
error_destroy:
hdb_handle_destroy (&poll_instance_database, handle);
error_exit:
return (-1);
}
int poll_destroy (hdb_handle_t handle)
{
struct poll_instance *poll_instance;
int res = 0;
res = hdb_handle_get (&poll_instance_database, handle,
(void *)&poll_instance);
if (res != 0) {
res = -ENOENT;
goto error_exit;
}
free (poll_instance->poll_entries);
free (poll_instance->ufds);
hdb_handle_destroy (&poll_instance_database, handle);
hdb_handle_put (&poll_instance_database, handle);
error_exit:
return (res);
}
int poll_dispatch_add (
hdb_handle_t handle,
int fd,
int events,
void *data,
int (*dispatch_fn) (
hdb_handle_t hdb_handle_t,
int fd,
int revents,
void *data))
{
struct poll_instance *poll_instance;
struct poll_entry *poll_entries;
struct pollfd *ufds;
int found = 0;
int install_pos;
int res = 0;
res = hdb_handle_get (&poll_instance_database, handle,
(void *)&poll_instance);
if (res != 0) {
res = -ENOENT;
goto error_exit;
}
for (found = 0, install_pos = 0; install_pos < poll_instance->poll_entry_count; install_pos++) {
if (poll_instance->poll_entries[install_pos].ufd.fd == -1) {
found = 1;
break;
}
}
if (found == 0) {
/*
* Grow pollfd list
*/
poll_entries = (struct poll_entry *)realloc (poll_instance->poll_entries,
(poll_instance->poll_entry_count + 1) *
sizeof (struct poll_entry));
if (poll_entries == NULL) {
res = -ENOMEM;
goto error_put;
}
poll_instance->poll_entries = poll_entries;
ufds = (struct pollfd *)realloc (poll_instance->ufds,
(poll_instance->poll_entry_count + 1) *
sizeof (struct pollfd));
if (ufds == NULL) {
res = -ENOMEM;
goto error_put;
}
poll_instance->ufds = ufds;
poll_instance->poll_entry_count += 1;
install_pos = poll_instance->poll_entry_count - 1;
}
/*
* Install new dispatch handler
*/
poll_instance->poll_entries[install_pos].ufd.fd = fd;
poll_instance->poll_entries[install_pos].ufd.events = events;
poll_instance->poll_entries[install_pos].ufd.revents = 0;
poll_instance->poll_entries[install_pos].dispatch_fn = dispatch_fn;
poll_instance->poll_entries[install_pos].data = data;
error_put:
hdb_handle_put (&poll_instance_database, handle);
error_exit:
return (res);
}
int poll_dispatch_modify (
hdb_handle_t handle,
int fd,
int events,
int (*dispatch_fn) (
hdb_handle_t hdb_handle_t,
int fd,
int revents,
void *data))
{
struct poll_instance *poll_instance;
int i;
int res = 0;
res = hdb_handle_get (&poll_instance_database, handle,
(void *)&poll_instance);
if (res != 0) {
res = -ENOENT;
goto error_exit;
}
/*
* Find file descriptor to modify events and dispatch function
*/
for (i = 0; i < poll_instance->poll_entry_count; i++) {
if (poll_instance->poll_entries[i].ufd.fd == fd) {
int change_notify = 0;
if (poll_instance->poll_entries[i].ufd.events != events) {
change_notify = 1;
}
poll_instance->poll_entries[i].ufd.events = events;
poll_instance->poll_entries[i].dispatch_fn = dispatch_fn;
if (change_notify) {
char buf = 1;
- write (poll_instance->pipefds[1], &buf, 1);
+retry_write:
+ if (write (poll_instance->pipefds[1], &buf, 1) < 0 && errno == EINTR )
+ goto retry_write;
}
goto error_put;
}
}
res = -EBADF;
error_put:
hdb_handle_put (&poll_instance_database, handle);
error_exit:
return (res);
}
int poll_dispatch_delete (
hdb_handle_t handle,
int fd)
{
struct poll_instance *poll_instance;
int i;
int res = 0;
res = hdb_handle_get (&poll_instance_database, handle,
(void *)&poll_instance);
if (res != 0) {
res = -ENOENT;
goto error_exit;
}
/*
* Find dispatch fd to delete
*/
res = -EBADF;
for (i = 0; i < poll_instance->poll_entry_count; i++) {
if (poll_instance->poll_entries[i].ufd.fd == fd) {
poll_instance->ufds[i].fd = -1;
poll_instance->poll_entries[i].ufd.fd = -1;
poll_instance->poll_entries[i].ufd.revents = 0;
res = 0;
break;
}
}
hdb_handle_put (&poll_instance_database, handle);
error_exit:
return (res);
}
int poll_timer_add (
hdb_handle_t handle,
int msec_duration, void *data,
void (*timer_fn) (void *data),
poll_timer_handle *timer_handle_out)
{
struct poll_instance *poll_instance;
int res = 0;
if (timer_handle_out == NULL) {
res = -ENOENT;
goto error_exit;
}
res = hdb_handle_get (&poll_instance_database, handle,
(void *)&poll_instance);
if (res != 0) {
res = -ENOENT;
goto error_exit;
}
timerlist_add_duration (&poll_instance->timerlist,
timer_fn, data, ((unsigned long long)msec_duration) * 1000000ULL, timer_handle_out);
hdb_handle_put (&poll_instance_database, handle);
error_exit:
return (res);
}
int poll_timer_delete (
hdb_handle_t handle,
poll_timer_handle th)
{
struct poll_instance *poll_instance;
int res = 0;
if (th == 0) {
return (0);
}
res = hdb_handle_get (&poll_instance_database, handle,
(void *)&poll_instance);
if (res != 0) {
res = -ENOENT;
goto error_exit;
}
timerlist_del (&poll_instance->timerlist, (void *)th);
hdb_handle_put (&poll_instance_database, handle);
error_exit:
return (res);
}
int poll_stop (
hdb_handle_t handle)
{
struct poll_instance *poll_instance;
unsigned int res;
res = hdb_handle_get (&poll_instance_database, handle,
(void *)&poll_instance);
if (res != 0) {
res = -ENOENT;
goto error_exit;
}
poll_instance->stop_requested = 1;
hdb_handle_put (&poll_instance_database, handle);
error_exit:
return (res);
}
int poll_low_fds_event_set(
hdb_handle_t handle,
poll_low_fds_event_fn fn)
{
struct poll_instance *poll_instance;
if (hdb_handle_get (&poll_instance_database, handle,
(void *)&poll_instance) != 0) {
return -ENOENT;
}
poll_instance->low_fds_event_fn = fn;
hdb_handle_put (&poll_instance_database, handle);
return 0;
}
/* logs, std(in|out|err), pipe */
#define POLL_FDS_USED_MISC 50
static void poll_fds_usage_check(struct poll_instance *poll_instance)
{
struct rlimit lim;
static int32_t socks_limit = 0;
int32_t send_event = 0;
int32_t socks_used = 0;
int32_t socks_avail = 0;
int32_t i;
if (socks_limit == 0) {
if (getrlimit(RLIMIT_NOFILE, &lim) == -1) {
char error_str[100];
strerror_r(errno, error_str, 100);
printf("getrlimit: %s\n", error_str);
return;
}
socks_limit = lim.rlim_cur;
socks_limit -= POLL_FDS_USED_MISC;
if (socks_limit < 0) {
socks_limit = 0;
}
}
for (i = 0; i < poll_instance->poll_entry_count; i++) {
if (poll_instance->poll_entries[i].ufd.fd != -1) {
socks_used++;
}
}
socks_avail = socks_limit - socks_used;
if (socks_avail < 0) {
socks_avail = 0;
}
send_event = 0;
if (poll_instance->not_enough_fds) {
if (socks_avail > 2) {
poll_instance->not_enough_fds = 0;
send_event = 1;
}
} else {
if (socks_avail <= 1) {
poll_instance->not_enough_fds = 1;
send_event = 1;
}
}
if (send_event) {
poll_instance->low_fds_event_fn(poll_instance->not_enough_fds,
socks_avail);
}
}
int poll_run (
hdb_handle_t handle)
{
struct poll_instance *poll_instance;
int i;
unsigned long long expire_timeout_msec = -1;
int res;
int poll_entry_count;
res = hdb_handle_get (&poll_instance_database, handle,
(void *)&poll_instance);
if (res != 0) {
goto error_exit;
}
for (;;) {
rebuild_poll:
for (i = 0; i < poll_instance->poll_entry_count; i++) {
memcpy (&poll_instance->ufds[i],
&poll_instance->poll_entries[i].ufd,
sizeof (struct pollfd));
}
poll_fds_usage_check(poll_instance);
expire_timeout_msec = timerlist_msec_duration_to_expire (&poll_instance->timerlist);
if (expire_timeout_msec != -1 && expire_timeout_msec > 0xFFFFFFFF) {
expire_timeout_msec = 0xFFFFFFFE;
}
retry_poll:
res = poll (poll_instance->ufds,
poll_instance->poll_entry_count, expire_timeout_msec);
if (poll_instance->stop_requested) {
return (0);
}
if (errno == EINTR && res == -1) {
goto retry_poll;
} else
if (res == -1) {
goto error_exit;
}
if (poll_instance->ufds[0].revents) {
char buf;
- read (poll_instance->ufds[0].fd, &buf, 1);
+retry_read:
+ if (read (poll_instance->ufds[0].fd, &buf, 1) < 0 && errno == EINTR)
+ goto retry_read;
goto rebuild_poll;
}
poll_entry_count = poll_instance->poll_entry_count;
for (i = 0; i < poll_entry_count; i++) {
if (poll_instance->ufds[i].fd != -1 &&
poll_instance->ufds[i].revents) {
res = poll_instance->poll_entries[i].dispatch_fn (handle,
poll_instance->ufds[i].fd,
poll_instance->ufds[i].revents,
poll_instance->poll_entries[i].data);
/*
* Remove dispatch functions that return -1
*/
if (res == -1) {
poll_instance->poll_entries[i].ufd.fd = -1; /* empty entry */
}
}
}
timerlist_expire (&poll_instance->timerlist);
} /* for (;;) */
hdb_handle_put (&poll_instance_database, handle);
error_exit:
return (-1);
}
#ifdef COMPILE_OUT
void poll_print_state (
hdb_handle_t handle,
int fd)
{
struct poll_instance *poll_instance;
int i;
int res = 0;
res = hdb_handle_get (&poll_instance_database, handle,
(void *)&poll_instance);
if (res != 0) {
res = -ENOENT;
exit (1);
}
for (i = 0; i < poll_instance->poll_entry_count; i++) {
if (poll_instance->poll_entries[i].ufd.fd == fd) {
printf ("fd %d\n", poll_instance->poll_entries[i].ufd.fd);
printf ("events %d\n", poll_instance->poll_entries[i].ufd.events);
printf ("dispatch_fn %p\n", poll_instance->poll_entries[i].dispatch_fn);
}
}
}
#endif
diff --git a/exec/main.c b/exec/main.c
index 774d618f..5f0edaa3 100644
--- a/exec/main.c
+++ b/exec/main.c
@@ -1,1841 +1,1852 @@
/*
* Copyright (c) 2002-2006 MontaVista Software, Inc.
* Copyright (c) 2006-2009 Red Hat, Inc.
*
* All rights reserved.
*
* Author: Steven Dake (sdake@redhat.com)
*
* This software licensed under BSD license, the text of which follows:
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* - Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* - Neither the name of the MontaVista Software, Inc. nor the names of its
* contributors may be used to endorse or promote products derived from this
* software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
* THE POSSIBILITY OF SUCH DAMAGE.
*/
#include <config.h>
#include <pthread.h>
#include <assert.h>
#include <sys/types.h>
#include <sys/file.h>
#include <sys/poll.h>
#include <sys/uio.h>
#include <sys/mman.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <sys/stat.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <signal.h>
#include <sched.h>
#include <time.h>
#include <semaphore.h>
#include <corosync/swab.h>
#include <corosync/corotypes.h>
#include <corosync/coroipc_types.h>
#include <corosync/corodefs.h>
#include <corosync/list.h>
#include <corosync/lcr/lcr_ifact.h>
#include <corosync/totem/coropoll.h>
#include <corosync/totem/totempg.h>
#include <corosync/engine/objdb.h>
#include <corosync/engine/config.h>
#include <corosync/engine/logsys.h>
#include <corosync/coroipcs.h>
#include "quorum.h"
#include "totemsrp.h"
#include "mainconfig.h"
#include "totemconfig.h"
#include "main.h"
#include "sync.h"
#include "syncv2.h"
#include "tlist.h"
#include "timer.h"
#include "util.h"
#include "apidef.h"
#include "service.h"
#include "schedwrk.h"
#include "evil.h"
LOGSYS_DECLARE_SYSTEM ("corosync",
LOGSYS_MODE_OUTPUT_STDERR | LOGSYS_MODE_THREADED | LOGSYS_MODE_FORK,
0,
NULL,
LOG_INFO,
LOG_DAEMON,
LOG_INFO,
NULL,
1000000);
LOGSYS_DECLARE_SUBSYS ("MAIN");
#define SERVER_BACKLOG 5
static int sched_priority = 0;
static unsigned int service_count = 32;
#if defined(HAVE_PTHREAD_SPIN_LOCK)
static pthread_spinlock_t serialize_spin;
#else
static pthread_mutex_t serialize_mutex = PTHREAD_MUTEX_INITIALIZER;
#endif
static struct totem_logging_configuration totem_logging_configuration;
static int num_config_modules;
static struct config_iface_ver0 *config_modules[MAX_DYNAMIC_SERVICES];
static struct objdb_iface_ver0 *objdb = NULL;
static struct corosync_api_v1 *api = NULL;
static enum cs_sync_mode minimum_sync_mode;
static int sync_in_process = 1;
static hdb_handle_t corosync_poll_handle;
struct sched_param global_sched_param;
static hdb_handle_t object_connection_handle;
static hdb_handle_t object_memb_handle;
static corosync_timer_handle_t corosync_stats_timer_handle;
static pthread_t corosync_exit_thread;
static sem_t corosync_exit_sem;
static const char *corosync_lock_file = LOCALSTATEDIR"/run/corosync.pid";
static int32_t corosync_not_enough_fds_left = 0;
static void serialize_unlock (void);
hdb_handle_t corosync_poll_handle_get (void)
{
return (corosync_poll_handle);
}
void corosync_state_dump (void)
{
int i;
for (i = 0; i < SERVICE_HANDLER_MAXIMUM_COUNT; i++) {
if (ais_service[i] && ais_service[i]->exec_dump_fn) {
ais_service[i]->exec_dump_fn ();
}
}
}
static void unlink_all_completed (void)
{
/*
* The schedwrk_do API takes the global serializer lock
* but doesn't release it because this exit callback is called
* before it finishes. Since we know we are exiting, we unlock it
* here
*/
serialize_unlock ();
api->timer_delete (corosync_stats_timer_handle);
poll_stop (corosync_poll_handle);
totempg_finalize ();
/*
* Remove pid lock file
*/
unlink (corosync_lock_file);
corosync_exit_error (AIS_DONE_EXIT);
}
void corosync_shutdown_request (void)
{
static int called = 0;
if (called) {
return;
}
if (called == 0) {
called = 1;
}
sem_post (&corosync_exit_sem);
}
static void *corosync_exit_thread_handler (void *arg)
{
totempg_stats_t * stats;
sem_wait (&corosync_exit_sem);
stats = api->totem_get_stats();
if (stats->mrp->srp->continuous_gather > MAX_NO_CONT_GATHER ||
stats->mrp->srp->operational_entered == 0) {
unlink_all_completed ();
/* NOTREACHED */
}
corosync_service_unlink_all (api, unlink_all_completed);
return arg;
}
static void sigusr2_handler (int num)
{
corosync_state_dump ();
logsys_log_rec_store (LOCALSTATEDIR "/lib/corosync/fdata");
}
static void sigterm_handler (int num)
{
corosync_shutdown_request ();
}
static void sigquit_handler (int num)
{
corosync_shutdown_request ();
}
static void sigintr_handler (int num)
{
corosync_shutdown_request ();
}
static void sigsegv_handler (int num)
{
(void)signal (SIGSEGV, SIG_DFL);
logsys_atexit();
logsys_log_rec_store (LOCALSTATEDIR "/lib/corosync/fdata");
raise (SIGSEGV);
}
static void sigabrt_handler (int num)
{
(void)signal (SIGABRT, SIG_DFL);
logsys_atexit();
logsys_log_rec_store (LOCALSTATEDIR "/lib/corosync/fdata");
raise (SIGABRT);
}
#define LOCALHOST_IP inet_addr("127.0.0.1")
static hdb_handle_t corosync_group_handle;
static struct totempg_group corosync_group = {
.group = "a",
.group_len = 1
};
#if defined(HAVE_PTHREAD_SPIN_LOCK)
static void serialize_lock (void)
{
pthread_spin_lock (&serialize_spin);
}
static void serialize_unlock (void)
{
pthread_spin_unlock (&serialize_spin);
}
#else
static void serialize_lock (void)
{
pthread_mutex_lock (&serialize_mutex);
}
static void serialize_unlock (void)
{
pthread_mutex_unlock (&serialize_mutex);
}
#endif
static void corosync_sync_completed (void)
{
log_printf (LOGSYS_LEVEL_NOTICE,
"Completed service synchronization, ready to provide service.\n");
sync_in_process = 0;
}
static int corosync_sync_callbacks_retrieve (int sync_id,
struct sync_callbacks *callbacks)
{
unsigned int ais_service_index;
int res;
for (ais_service_index = 0;
ais_service_index < SERVICE_HANDLER_MAXIMUM_COUNT;
ais_service_index++) {
if (ais_service[ais_service_index] != NULL
&& (ais_service[ais_service_index]->sync_mode == CS_SYNC_V1
|| ais_service[ais_service_index]->sync_mode == CS_SYNC_V1_APIV2)) {
if (ais_service_index == sync_id) {
break;
}
}
}
/*
* Try to load backwards compat sync engines
*/
if (ais_service_index == SERVICE_HANDLER_MAXIMUM_COUNT) {
res = evil_callbacks_load (sync_id, callbacks);
return (res);
}
callbacks->name = ais_service[ais_service_index]->name;
callbacks->sync_init_api.sync_init_v1 = ais_service[ais_service_index]->sync_init;
callbacks->api_version = 1;
if (ais_service[ais_service_index]->sync_mode == CS_SYNC_V1_APIV2) {
callbacks->api_version = 2;
}
callbacks->sync_process = ais_service[ais_service_index]->sync_process;
callbacks->sync_activate = ais_service[ais_service_index]->sync_activate;
callbacks->sync_abort = ais_service[ais_service_index]->sync_abort;
return (0);
}
static int corosync_sync_v2_callbacks_retrieve (
int service_id,
struct sync_callbacks *callbacks)
{
int res;
if (minimum_sync_mode == CS_SYNC_V2 && service_id == CLM_SERVICE && ais_service[CLM_SERVICE] == NULL) {
res = evil_callbacks_load (service_id, callbacks);
return (res);
}
if (minimum_sync_mode == CS_SYNC_V2 && service_id == EVT_SERVICE && ais_service[EVT_SERVICE] == NULL) {
res = evil_callbacks_load (service_id, callbacks);
return (res);
}
if (ais_service[service_id] == NULL) {
return (-1);
}
if (minimum_sync_mode == CS_SYNC_V1 && ais_service[service_id]->sync_mode != CS_SYNC_V2) {
return (-1);
}
callbacks->name = ais_service[service_id]->name;
callbacks->api_version = 1;
if (ais_service[service_id]->sync_mode == CS_SYNC_V1_APIV2) {
callbacks->api_version = 2;
}
callbacks->sync_init_api.sync_init_v1 = ais_service[service_id]->sync_init;
callbacks->sync_process = ais_service[service_id]->sync_process;
callbacks->sync_activate = ais_service[service_id]->sync_activate;
callbacks->sync_abort = ais_service[service_id]->sync_abort;
return (0);
}
static struct memb_ring_id corosync_ring_id;
static void member_object_joined (unsigned int nodeid)
{
hdb_handle_t object_find_handle;
hdb_handle_t object_node_handle;
char * nodeint_str;
char nodeid_str[64];
unsigned int key_incr_dummy;
snprintf (nodeid_str, 64, "%d", nodeid);
objdb->object_find_create (
object_memb_handle,
nodeid_str,
strlen (nodeid_str),
&object_find_handle);
if (objdb->object_find_next (object_find_handle,
&object_node_handle) == 0) {
objdb->object_key_increment (object_node_handle,
"join_count", strlen("join_count"),
&key_incr_dummy);
objdb->object_key_replace (object_node_handle,
"status", strlen("status"),
"joined", strlen("joined"));
} else {
nodeint_str = (char*)api->totem_ifaces_print (nodeid);
objdb->object_create (object_memb_handle,
&object_node_handle,
nodeid_str, strlen (nodeid_str));
objdb->object_key_create_typed (object_node_handle,
"ip",
nodeint_str, strlen(nodeint_str),
OBJDB_VALUETYPE_STRING);
key_incr_dummy = 1;
objdb->object_key_create_typed (object_node_handle,
"join_count",
&key_incr_dummy, sizeof (key_incr_dummy),
OBJDB_VALUETYPE_UINT32);
objdb->object_key_create_typed (object_node_handle,
"status",
"joined", strlen("joined"),
OBJDB_VALUETYPE_STRING);
}
}
static void member_object_left (unsigned int nodeid)
{
hdb_handle_t object_find_handle;
hdb_handle_t object_node_handle;
char nodeid_str[64];
snprintf (nodeid_str, 64, "%u", nodeid);
objdb->object_find_create (
object_memb_handle,
nodeid_str,
strlen (nodeid_str),
&object_find_handle);
if (objdb->object_find_next (object_find_handle,
&object_node_handle) == 0) {
objdb->object_key_replace (object_node_handle,
"status", strlen("status"),
"left", strlen("left"));
}
}
static void confchg_fn (
enum totem_configuration_type configuration_type,
const unsigned int *member_list, size_t member_list_entries,
const unsigned int *left_list, size_t left_list_entries,
const unsigned int *joined_list, size_t joined_list_entries,
const struct memb_ring_id *ring_id)
{
int i;
int abort_activate = 0;
if (sync_in_process == 1) {
abort_activate = 1;
}
sync_in_process = 1;
serialize_lock ();
memcpy (&corosync_ring_id, ring_id, sizeof (struct memb_ring_id));
for (i = 0; i < left_list_entries; i++) {
member_object_left (left_list[i]);
}
for (i = 0; i < joined_list_entries; i++) {
member_object_joined (joined_list[i]);
}
/*
* Call configuration change for all services
*/
for (i = 0; i < service_count; i++) {
if (ais_service[i] && ais_service[i]->confchg_fn) {
ais_service[i]->confchg_fn (configuration_type,
member_list, member_list_entries,
left_list, left_list_entries,
joined_list, joined_list_entries, ring_id);
}
}
serialize_unlock ();
if (abort_activate) {
sync_v2_abort ();
}
if (minimum_sync_mode == CS_SYNC_V2 && configuration_type == TOTEM_CONFIGURATION_TRANSITIONAL) {
sync_v2_save_transitional (member_list, member_list_entries, ring_id);
}
if (minimum_sync_mode == CS_SYNC_V2 && configuration_type == TOTEM_CONFIGURATION_REGULAR) {
sync_v2_start (member_list, member_list_entries, ring_id);
}
}
static void priv_drop (void)
{
return; /* TODO: we are still not dropping privs */
}
static void corosync_tty_detach (void)
{
+ FILE *r;
+
/*
* Disconnect from TTY if this is not a debug run
*/
switch (fork ()) {
case -1:
corosync_exit_error (AIS_DONE_FORK);
break;
case 0:
/*
* child which is disconnected, run this process
*/
break;
default:
exit (0);
break;
}
/* Create new session */
(void)setsid();
/*
* Map stdin/out/err to /dev/null.
*/
- freopen("/dev/null", "r", stdin);
- freopen("/dev/null", "a", stderr);
- freopen("/dev/null", "a", stdout);
+ r = freopen("/dev/null", "r", stdin);
+ if (r == NULL) {
+ corosync_exit_error (AIS_DONE_STD_TO_NULL_REDIR);
+ }
+ r = freopen("/dev/null", "a", stderr);
+ if (r == NULL) {
+ corosync_exit_error (AIS_DONE_STD_TO_NULL_REDIR);
+ }
+ r = freopen("/dev/null", "a", stdout);
+ if (r == NULL) {
+ corosync_exit_error (AIS_DONE_STD_TO_NULL_REDIR);
+ }
}
static void corosync_mlockall (void)
{
#if !defined(COROSYNC_BSD) || defined(COROSYNC_FREEBSD_GE_8)
int res;
#endif
struct rlimit rlimit;
rlimit.rlim_cur = RLIM_INFINITY;
rlimit.rlim_max = RLIM_INFINITY;
#ifndef COROSYNC_SOLARIS
setrlimit (RLIMIT_MEMLOCK, &rlimit);
#else
setrlimit (RLIMIT_VMEM, &rlimit);
#endif
#if defined(COROSYNC_BSD) && !defined(COROSYNC_FREEBSD_GE_8)
/* under FreeBSD < 8 a process with locked page cannot call dlopen
* code disabled until FreeBSD bug i386/93396 was solved
*/
log_printf (LOGSYS_LEVEL_WARNING, "Could not lock memory of service to avoid page faults\n");
#else
res = mlockall (MCL_CURRENT | MCL_FUTURE);
if (res == -1) {
char error_str[100];
strerror_r (errno, error_str, 100);
log_printf (LOGSYS_LEVEL_WARNING,
"Could not lock memory of service to avoid page faults: %s\n",
error_str);
};
#endif
}
static void corosync_totem_stats_updater (void *data)
{
totempg_stats_t * stats;
uint32_t mtt_rx_token;
uint32_t total_mtt_rx_token;
uint32_t avg_backlog_calc;
uint32_t total_backlog_calc;
uint32_t avg_token_holdtime;
uint32_t total_token_holdtime;
int t, prev;
int32_t token_count;
uint32_t firewall_enabled_or_nic_failure;
stats = api->totem_get_stats();
objdb->object_key_replace (stats->mrp->srp->hdr.handle,
"orf_token_tx", strlen("orf_token_tx"),
&stats->mrp->srp->orf_token_tx, sizeof (stats->mrp->srp->orf_token_tx));
objdb->object_key_replace (stats->mrp->srp->hdr.handle,
"orf_token_rx", strlen("orf_token_rx"),
&stats->mrp->srp->orf_token_rx, sizeof (stats->mrp->srp->orf_token_rx));
objdb->object_key_replace (stats->mrp->srp->hdr.handle,
"memb_merge_detect_tx", strlen("memb_merge_detect_tx"),
&stats->mrp->srp->memb_merge_detect_tx, sizeof (stats->mrp->srp->memb_merge_detect_tx));
objdb->object_key_replace (stats->mrp->srp->hdr.handle,
"memb_merge_detect_rx", strlen("memb_merge_detect_rx"),
&stats->mrp->srp->memb_merge_detect_rx, sizeof (stats->mrp->srp->memb_merge_detect_rx));
objdb->object_key_replace (stats->mrp->srp->hdr.handle,
"memb_join_tx", strlen("memb_join_tx"),
&stats->mrp->srp->memb_join_tx, sizeof (stats->mrp->srp->memb_join_tx));
objdb->object_key_replace (stats->mrp->srp->hdr.handle,
"memb_join_rx", strlen("memb_join_rx"),
&stats->mrp->srp->memb_join_rx, sizeof (stats->mrp->srp->memb_join_rx));
objdb->object_key_replace (stats->mrp->srp->hdr.handle,
"mcast_tx", strlen("mcast_tx"),
&stats->mrp->srp->mcast_tx, sizeof (stats->mrp->srp->mcast_tx));
objdb->object_key_replace (stats->mrp->srp->hdr.handle,
"mcast_retx", strlen("mcast_retx"),
&stats->mrp->srp->mcast_retx, sizeof (stats->mrp->srp->mcast_retx));
objdb->object_key_replace (stats->mrp->srp->hdr.handle,
"mcast_rx", strlen("mcast_rx"),
&stats->mrp->srp->mcast_rx, sizeof (stats->mrp->srp->mcast_rx));
objdb->object_key_replace (stats->mrp->srp->hdr.handle,
"memb_commit_token_tx", strlen("memb_commit_token_tx"),
&stats->mrp->srp->memb_commit_token_tx, sizeof (stats->mrp->srp->memb_commit_token_tx));
objdb->object_key_replace (stats->mrp->srp->hdr.handle,
"memb_commit_token_rx", strlen("memb_commit_token_rx"),
&stats->mrp->srp->memb_commit_token_rx, sizeof (stats->mrp->srp->memb_commit_token_rx));
objdb->object_key_replace (stats->mrp->srp->hdr.handle,
"token_hold_cancel_tx", strlen("token_hold_cancel_tx"),
&stats->mrp->srp->token_hold_cancel_tx, sizeof (stats->mrp->srp->token_hold_cancel_tx));
objdb->object_key_replace (stats->mrp->srp->hdr.handle,
"token_hold_cancel_rx", strlen("token_hold_cancel_rx"),
&stats->mrp->srp->token_hold_cancel_rx, sizeof (stats->mrp->srp->token_hold_cancel_rx));
objdb->object_key_replace (stats->mrp->srp->hdr.handle,
"operational_entered", strlen("operational_entered"),
&stats->mrp->srp->operational_entered, sizeof (stats->mrp->srp->operational_entered));
objdb->object_key_replace (stats->mrp->srp->hdr.handle,
"operational_token_lost", strlen("operational_token_lost"),
&stats->mrp->srp->operational_token_lost, sizeof (stats->mrp->srp->operational_token_lost));
objdb->object_key_replace (stats->mrp->srp->hdr.handle,
"gather_entered", strlen("gather_entered"),
&stats->mrp->srp->gather_entered, sizeof (stats->mrp->srp->gather_entered));
objdb->object_key_replace (stats->mrp->srp->hdr.handle,
"gather_token_lost", strlen("gather_token_lost"),
&stats->mrp->srp->gather_token_lost, sizeof (stats->mrp->srp->gather_token_lost));
objdb->object_key_replace (stats->mrp->srp->hdr.handle,
"commit_entered", strlen("commit_entered"),
&stats->mrp->srp->commit_entered, sizeof (stats->mrp->srp->commit_entered));
objdb->object_key_replace (stats->mrp->srp->hdr.handle,
"commit_token_lost", strlen("commit_token_lost"),
&stats->mrp->srp->commit_token_lost, sizeof (stats->mrp->srp->commit_token_lost));
objdb->object_key_replace (stats->mrp->srp->hdr.handle,
"recovery_entered", strlen("recovery_entered"),
&stats->mrp->srp->recovery_entered, sizeof (stats->mrp->srp->recovery_entered));
objdb->object_key_replace (stats->mrp->srp->hdr.handle,
"recovery_token_lost", strlen("recovery_token_lost"),
&stats->mrp->srp->recovery_token_lost, sizeof (stats->mrp->srp->recovery_token_lost));
objdb->object_key_replace (stats->mrp->srp->hdr.handle,
"consensus_timeouts", strlen("consensus_timeouts"),
&stats->mrp->srp->consensus_timeouts, sizeof (stats->mrp->srp->consensus_timeouts));
objdb->object_key_replace (stats->mrp->srp->hdr.handle,
"rx_msg_dropped", strlen("rx_msg_dropped"),
&stats->mrp->srp->rx_msg_dropped, sizeof (stats->mrp->srp->rx_msg_dropped));
objdb->object_key_replace (stats->mrp->srp->hdr.handle,
"continuous_gather", strlen("continuous_gather"),
&stats->mrp->srp->continuous_gather, sizeof (stats->mrp->srp->continuous_gather));
firewall_enabled_or_nic_failure = (stats->mrp->srp->continuous_gather > MAX_NO_CONT_GATHER ? 1 : 0);
objdb->object_key_replace (stats->mrp->srp->hdr.handle,
"firewall_enabled_or_nic_failure", strlen("firewall_enabled_or_nic_failure"),
&firewall_enabled_or_nic_failure, sizeof (firewall_enabled_or_nic_failure));
total_mtt_rx_token = 0;
total_token_holdtime = 0;
total_backlog_calc = 0;
token_count = 0;
t = stats->mrp->srp->latest_token;
while (1) {
if (t == 0)
prev = TOTEM_TOKEN_STATS_MAX - 1;
else
prev = t - 1;
if (prev == stats->mrp->srp->earliest_token)
break;
/* if tx == 0, then dropped token (not ours) */
if (stats->mrp->srp->token[t].tx != 0 ||
(stats->mrp->srp->token[t].rx - stats->mrp->srp->token[prev].rx) > 0 ) {
total_mtt_rx_token += (stats->mrp->srp->token[t].rx - stats->mrp->srp->token[prev].rx);
total_token_holdtime += (stats->mrp->srp->token[t].tx - stats->mrp->srp->token[t].rx);
total_backlog_calc += stats->mrp->srp->token[t].backlog_calc;
token_count++;
}
t = prev;
}
if (token_count) {
mtt_rx_token = (total_mtt_rx_token / token_count);
avg_backlog_calc = (total_backlog_calc / token_count);
avg_token_holdtime = (total_token_holdtime / token_count);
objdb->object_key_replace (stats->mrp->srp->hdr.handle,
"mtt_rx_token", strlen("mtt_rx_token"),
&mtt_rx_token, sizeof (mtt_rx_token));
objdb->object_key_replace (stats->mrp->srp->hdr.handle,
"avg_token_workload", strlen("avg_token_workload"),
&avg_token_holdtime, sizeof (avg_token_holdtime));
objdb->object_key_replace (stats->mrp->srp->hdr.handle,
"avg_backlog_calc", strlen("avg_backlog_calc"),
&avg_backlog_calc, sizeof (avg_backlog_calc));
}
api->timer_add_duration (1500 * MILLI_2_NANO_SECONDS, NULL,
corosync_totem_stats_updater,
&corosync_stats_timer_handle);
}
static void corosync_totem_stats_init (void)
{
totempg_stats_t * stats;
hdb_handle_t object_find_handle;
hdb_handle_t object_runtime_handle;
hdb_handle_t object_totem_handle;
uint32_t zero_32 = 0;
uint64_t zero_64 = 0;
stats = api->totem_get_stats();
objdb->object_find_create (
OBJECT_PARENT_HANDLE,
"runtime",
strlen ("runtime"),
&object_find_handle);
if (objdb->object_find_next (object_find_handle,
&object_runtime_handle) == 0) {
objdb->object_create (object_runtime_handle,
&object_totem_handle,
"totem", strlen ("totem"));
objdb->object_create (object_totem_handle,
&stats->hdr.handle,
"pg", strlen ("pg"));
objdb->object_create (stats->hdr.handle,
&stats->mrp->hdr.handle,
"mrp", strlen ("mrp"));
objdb->object_create (stats->mrp->hdr.handle,
&stats->mrp->srp->hdr.handle,
"srp", strlen ("srp"));
/* Members object */
objdb->object_create (stats->mrp->srp->hdr.handle,
&object_memb_handle,
"members", strlen ("members"));
objdb->object_key_create_typed (stats->mrp->srp->hdr.handle,
"orf_token_tx", &stats->mrp->srp->orf_token_tx,
sizeof (stats->mrp->srp->orf_token_tx), OBJDB_VALUETYPE_UINT64);
objdb->object_key_create_typed (stats->mrp->srp->hdr.handle,
"orf_token_rx", &stats->mrp->srp->orf_token_rx,
sizeof (stats->mrp->srp->orf_token_rx), OBJDB_VALUETYPE_UINT64);
objdb->object_key_create_typed (stats->mrp->srp->hdr.handle,
"memb_merge_detect_tx", &stats->mrp->srp->memb_merge_detect_tx,
sizeof (stats->mrp->srp->memb_merge_detect_tx), OBJDB_VALUETYPE_UINT64);
objdb->object_key_create_typed (stats->mrp->srp->hdr.handle,
"memb_merge_detect_rx", &stats->mrp->srp->memb_merge_detect_rx,
sizeof (stats->mrp->srp->memb_merge_detect_rx), OBJDB_VALUETYPE_UINT64);
objdb->object_key_create_typed (stats->mrp->srp->hdr.handle,
"memb_join_tx", &stats->mrp->srp->memb_join_tx,
sizeof (stats->mrp->srp->memb_join_tx), OBJDB_VALUETYPE_UINT64);
objdb->object_key_create_typed (stats->mrp->srp->hdr.handle,
"memb_join_rx", &stats->mrp->srp->memb_join_rx,
sizeof (stats->mrp->srp->memb_join_rx), OBJDB_VALUETYPE_UINT64);
objdb->object_key_create_typed (stats->mrp->srp->hdr.handle,
"mcast_tx", &stats->mrp->srp->mcast_tx,
sizeof (stats->mrp->srp->mcast_tx), OBJDB_VALUETYPE_UINT64);
objdb->object_key_create_typed (stats->mrp->srp->hdr.handle,
"mcast_retx", &stats->mrp->srp->mcast_retx,
sizeof (stats->mrp->srp->mcast_retx), OBJDB_VALUETYPE_UINT64);
objdb->object_key_create_typed (stats->mrp->srp->hdr.handle,
"mcast_rx", &stats->mrp->srp->mcast_rx,
sizeof (stats->mrp->srp->mcast_rx), OBJDB_VALUETYPE_UINT64);
objdb->object_key_create_typed (stats->mrp->srp->hdr.handle,
"memb_commit_token_tx", &stats->mrp->srp->memb_commit_token_tx,
sizeof (stats->mrp->srp->memb_commit_token_tx), OBJDB_VALUETYPE_UINT64);
objdb->object_key_create_typed (stats->mrp->srp->hdr.handle,
"memb_commit_token_rx", &stats->mrp->srp->memb_commit_token_rx,
sizeof (stats->mrp->srp->memb_commit_token_rx), OBJDB_VALUETYPE_UINT64);
objdb->object_key_create_typed (stats->mrp->srp->hdr.handle,
"token_hold_cancel_tx", &stats->mrp->srp->token_hold_cancel_tx,
sizeof (stats->mrp->srp->token_hold_cancel_tx), OBJDB_VALUETYPE_UINT64);
objdb->object_key_create_typed (stats->mrp->srp->hdr.handle,
"token_hold_cancel_rx", &stats->mrp->srp->token_hold_cancel_rx,
sizeof (stats->mrp->srp->token_hold_cancel_rx), OBJDB_VALUETYPE_UINT64);
objdb->object_key_create_typed (stats->mrp->srp->hdr.handle,
"operational_entered", &stats->mrp->srp->operational_entered,
sizeof (stats->mrp->srp->operational_entered), OBJDB_VALUETYPE_UINT64);
objdb->object_key_create_typed (stats->mrp->srp->hdr.handle,
"operational_token_lost", &stats->mrp->srp->operational_token_lost,
sizeof (stats->mrp->srp->operational_token_lost), OBJDB_VALUETYPE_UINT64);
objdb->object_key_create_typed (stats->mrp->srp->hdr.handle,
"gather_entered", &stats->mrp->srp->gather_entered,
sizeof (stats->mrp->srp->gather_entered), OBJDB_VALUETYPE_UINT64);
objdb->object_key_create_typed (stats->mrp->srp->hdr.handle,
"gather_token_lost", &stats->mrp->srp->gather_token_lost,
sizeof (stats->mrp->srp->gather_token_lost), OBJDB_VALUETYPE_UINT64);
objdb->object_key_create_typed (stats->mrp->srp->hdr.handle,
"commit_entered", &stats->mrp->srp->commit_entered,
sizeof (stats->mrp->srp->commit_entered), OBJDB_VALUETYPE_UINT64);
objdb->object_key_create_typed (stats->mrp->srp->hdr.handle,
"commit_token_lost", &stats->mrp->srp->commit_token_lost,
sizeof (stats->mrp->srp->commit_token_lost), OBJDB_VALUETYPE_UINT64);
objdb->object_key_create_typed (stats->mrp->srp->hdr.handle,
"recovery_entered", &stats->mrp->srp->recovery_entered,
sizeof (stats->mrp->srp->recovery_entered), OBJDB_VALUETYPE_UINT64);
objdb->object_key_create_typed (stats->mrp->srp->hdr.handle,
"recovery_token_lost", &stats->mrp->srp->recovery_token_lost,
sizeof (stats->mrp->srp->recovery_token_lost), OBJDB_VALUETYPE_UINT64);
objdb->object_key_create_typed (stats->mrp->srp->hdr.handle,
"consensus_timeouts", &stats->mrp->srp->consensus_timeouts,
sizeof (stats->mrp->srp->consensus_timeouts), OBJDB_VALUETYPE_UINT64);
objdb->object_key_create_typed (stats->mrp->srp->hdr.handle,
"mtt_rx_token", &zero_32,
sizeof (zero_32), OBJDB_VALUETYPE_UINT32);
objdb->object_key_create_typed (stats->mrp->srp->hdr.handle,
"avg_token_workload", &zero_32,
sizeof (zero_32), OBJDB_VALUETYPE_UINT32);
objdb->object_key_create_typed (stats->mrp->srp->hdr.handle,
"avg_backlog_calc", &zero_32,
sizeof (zero_32), OBJDB_VALUETYPE_UINT32);
objdb->object_key_create_typed (stats->mrp->srp->hdr.handle,
"rx_msg_dropped", &zero_64,
sizeof (zero_64), OBJDB_VALUETYPE_UINT64);
objdb->object_key_create_typed (stats->mrp->srp->hdr.handle,
"continuous_gather", &zero_32,
sizeof (zero_32), OBJDB_VALUETYPE_UINT32);
objdb->object_key_create_typed (stats->mrp->srp->hdr.handle,
"firewall_enabled_or_nic_failure", &zero_32,
sizeof (zero_32), OBJDB_VALUETYPE_UINT32);
}
/* start stats timer */
api->timer_add_duration (1500 * MILLI_2_NANO_SECONDS, NULL,
corosync_totem_stats_updater,
&corosync_stats_timer_handle);
}
static void deliver_fn (
unsigned int nodeid,
const void *msg,
unsigned int msg_len,
int endian_conversion_required)
{
const coroipc_request_header_t *header;
int service;
int fn_id;
unsigned int id;
unsigned int size;
unsigned int key_incr_dummy;
header = msg;
if (endian_conversion_required) {
id = swab32 (header->id);
size = swab32 (header->size);
} else {
id = header->id;
size = header->size;
}
/*
* Call the proper executive handler
*/
service = id >> 16;
fn_id = id & 0xffff;
serialize_lock();
if (ais_service[service] == NULL && service == EVT_SERVICE) {
evil_deliver_fn (nodeid, service, fn_id, msg,
endian_conversion_required);
}
if (!ais_service[service]) {
serialize_unlock();
return;
}
if (fn_id >= ais_service[service]->exec_engine_count) {
log_printf(LOGSYS_LEVEL_WARNING, "discarded unknown message %d for service %d (max id %d)",
fn_id, service, ais_service[service]->exec_engine_count);
serialize_unlock();
return;
}
objdb->object_key_increment (service_stats_handle[service][fn_id],
"rx", strlen("rx"),
&key_incr_dummy);
if (endian_conversion_required) {
assert(ais_service[service]->exec_engine[fn_id].exec_endian_convert_fn != NULL);
ais_service[service]->exec_engine[fn_id].exec_endian_convert_fn
((void *)msg);
}
ais_service[service]->exec_engine[fn_id].exec_handler_fn
(msg, nodeid);
serialize_unlock();
}
void main_get_config_modules(struct config_iface_ver0 ***modules, int *num)
{
*modules = config_modules;
*num = num_config_modules;
}
int main_mcast (
const struct iovec *iovec,
unsigned int iov_len,
unsigned int guarantee)
{
const coroipc_request_header_t *req = iovec->iov_base;
int service;
int fn_id;
unsigned int key_incr_dummy;
service = req->id >> 16;
fn_id = req->id & 0xffff;
if (ais_service[service]) {
objdb->object_key_increment (service_stats_handle[service][fn_id],
"tx", strlen("tx"), &key_incr_dummy);
}
return (totempg_groups_mcast_joined (corosync_group_handle, iovec, iov_len, guarantee));
}
int message_source_is_local (const mar_message_source_t *source)
{
int ret = 0;
assert (source != NULL);
if (source->nodeid == totempg_my_nodeid_get ()) {
ret = 1;
}
return ret;
}
void message_source_set (
mar_message_source_t *source,
void *conn)
{
assert ((source != NULL) && (conn != NULL));
memset (source, 0, sizeof (mar_message_source_t));
source->nodeid = totempg_my_nodeid_get ();
source->conn = conn;
}
/*
* Provides the glue from corosync to the IPC Service
*/
static int corosync_private_data_size_get (unsigned int service)
{
return (ais_service[service]->private_data_size);
}
static coroipcs_init_fn_lvalue corosync_init_fn_get (unsigned int service)
{
return (ais_service[service]->lib_init_fn);
}
static coroipcs_exit_fn_lvalue corosync_exit_fn_get (unsigned int service)
{
return (ais_service[service]->lib_exit_fn);
}
static coroipcs_handler_fn_lvalue corosync_handler_fn_get (unsigned int service, unsigned int id)
{
return (ais_service[service]->lib_engine[id].lib_handler_fn);
}
static int corosync_security_valid (int euid, int egid)
{
struct list_head *iter;
if (corosync_not_enough_fds_left) {
errno = EMFILE;
return 0;
}
if (euid == 0 || egid == 0) {
return (1);
}
for (iter = uidgid_list_head.next; iter != &uidgid_list_head;
iter = iter->next) {
struct uidgid_item *ugi = list_entry (iter, struct uidgid_item,
list);
if (euid == ugi->uid || egid == ugi->gid)
return (1);
}
errno = EACCES;
return (0);
}
static int corosync_service_available (unsigned int service)
{
return (ais_service[service] != NULL && !ais_service_exiting[service]);
}
struct sending_allowed_private_data_struct {
int reserved_msgs;
};
static int corosync_sending_allowed (
unsigned int service,
unsigned int id,
const void *msg,
void *sending_allowed_private_data)
{
struct sending_allowed_private_data_struct *pd =
(struct sending_allowed_private_data_struct *)sending_allowed_private_data;
struct iovec reserve_iovec;
coroipc_request_header_t *header = (coroipc_request_header_t *)msg;
int sending_allowed;
reserve_iovec.iov_base = (char *)header;
reserve_iovec.iov_len = header->size;
pd->reserved_msgs = totempg_groups_joined_reserve (
corosync_group_handle,
&reserve_iovec, 1);
if (pd->reserved_msgs == -1) {
return (-1);
}
sending_allowed =
(corosync_quorum_is_quorate() == 1 ||
ais_service[service]->allow_inquorate == CS_LIB_ALLOW_INQUORATE) &&
((ais_service[service]->lib_engine[id].flow_control == CS_LIB_FLOW_CONTROL_NOT_REQUIRED) ||
((ais_service[service]->lib_engine[id].flow_control == CS_LIB_FLOW_CONTROL_REQUIRED) &&
(pd->reserved_msgs) &&
(sync_in_process == 0)));
return (sending_allowed);
}
static void corosync_sending_allowed_release (void *sending_allowed_private_data)
{
struct sending_allowed_private_data_struct *pd =
(struct sending_allowed_private_data_struct *)sending_allowed_private_data;
if (pd->reserved_msgs == -1) {
return;
}
totempg_groups_joined_release (pd->reserved_msgs);
}
static int ipc_subsys_id = -1;
static void ipc_fatal_error(const char *error_msg) __attribute__((noreturn));
static void ipc_fatal_error(const char *error_msg) {
_logsys_log_printf (
LOGSYS_ENCODE_RECID(LOGSYS_LEVEL_ERROR,
ipc_subsys_id,
LOGSYS_RECID_LOG),
__FUNCTION__, __FILE__, __LINE__,
"%s", error_msg);
exit(EXIT_FAILURE);
}
static int corosync_poll_handler_accept (
hdb_handle_t handle,
int fd,
int revent,
void *context)
{
return (coroipcs_handler_accept (fd, revent, context));
}
static int corosync_poll_handler_dispatch (
hdb_handle_t handle,
int fd,
int revent,
void *context)
{
return (coroipcs_handler_dispatch (fd, revent, context));
}
static void corosync_poll_accept_add (
int fd)
{
poll_dispatch_add (corosync_poll_handle, fd, POLLIN|POLLNVAL, 0,
corosync_poll_handler_accept);
}
static void corosync_poll_dispatch_add (
int fd,
void *context)
{
poll_dispatch_add (corosync_poll_handle, fd, POLLIN|POLLNVAL, context,
corosync_poll_handler_dispatch);
}
static void corosync_poll_dispatch_modify (
int fd,
int events)
{
poll_dispatch_modify (corosync_poll_handle, fd, events,
corosync_poll_handler_dispatch);
}
static void corosync_poll_dispatch_destroy (
int fd,
void *context)
{
poll_dispatch_delete (corosync_poll_handle, fd);
}
static hdb_handle_t corosync_stats_create_connection (const char* name,
const pid_t pid, const int fd)
{
uint32_t zero_32 = 0;
uint64_t zero_64 = 0;
unsigned int key_incr_dummy;
hdb_handle_t object_handle;
objdb->object_key_increment (object_connection_handle,
"active", strlen("active"),
&key_incr_dummy);
objdb->object_create (object_connection_handle,
&object_handle,
name,
strlen (name));
objdb->object_key_create_typed (object_handle,
"service_id",
&zero_32, sizeof (zero_32),
OBJDB_VALUETYPE_UINT32);
objdb->object_key_create_typed (object_handle,
"client_pid",
&pid, sizeof (pid),
OBJDB_VALUETYPE_INT32);
objdb->object_key_create_typed (object_handle,
"responses",
&zero_64, sizeof (zero_64),
OBJDB_VALUETYPE_UINT64);
objdb->object_key_create_typed (object_handle,
"dispatched",
&zero_64, sizeof (zero_64),
OBJDB_VALUETYPE_UINT64);
objdb->object_key_create_typed (object_handle,
"requests",
&zero_64, sizeof (zero_64),
OBJDB_VALUETYPE_INT64);
objdb->object_key_create_typed (object_handle,
"sem_retry_count",
&zero_64, sizeof (zero_64),
OBJDB_VALUETYPE_UINT64);
objdb->object_key_create_typed (object_handle,
"send_retry_count",
&zero_64, sizeof (zero_64),
OBJDB_VALUETYPE_UINT64);
objdb->object_key_create_typed (object_handle,
"recv_retry_count",
&zero_64, sizeof (zero_64),
OBJDB_VALUETYPE_UINT64);
objdb->object_key_create_typed (object_handle,
"flow_control",
&zero_32, sizeof (zero_32),
OBJDB_VALUETYPE_UINT32);
objdb->object_key_create_typed (object_handle,
"flow_control_count",
&zero_64, sizeof (zero_64),
OBJDB_VALUETYPE_UINT64);
objdb->object_key_create_typed (object_handle,
"queue_size",
&zero_64, sizeof (zero_64),
OBJDB_VALUETYPE_UINT64);
return object_handle;
}
static void corosync_stats_destroy_connection (hdb_handle_t handle)
{
unsigned int key_incr_dummy;
objdb->object_destroy (handle);
objdb->object_key_increment (object_connection_handle,
"closed", strlen("closed"),
&key_incr_dummy);
objdb->object_key_decrement (object_connection_handle,
"active", strlen("active"),
&key_incr_dummy);
}
static void corosync_stats_update_value (hdb_handle_t handle,
const char *name, const void *value,
size_t value_len)
{
objdb->object_key_replace (handle,
name, strlen(name),
value, value_len);
}
static void corosync_stats_increment_value (hdb_handle_t handle,
const char* name)
{
unsigned int key_incr_dummy;
objdb->object_key_increment (handle,
name, strlen(name),
&key_incr_dummy);
}
static void corosync_stats_decrement_value (hdb_handle_t handle,
const char* name)
{
unsigned int key_incr_dummy;
objdb->object_key_decrement (handle,
name, strlen(name),
&key_incr_dummy);
}
static struct coroipcs_init_state_v2 ipc_init_state_v2 = {
.socket_name = COROSYNC_SOCKET_NAME,
.sched_policy = SCHED_OTHER,
.sched_param = &global_sched_param,
.malloc = malloc,
.free = free,
.log_printf = _logsys_log_printf,
.fatal_error = ipc_fatal_error,
.security_valid = corosync_security_valid,
.service_available = corosync_service_available,
.private_data_size_get = corosync_private_data_size_get,
.serialize_lock = serialize_lock,
.serialize_unlock = serialize_unlock,
.sending_allowed = corosync_sending_allowed,
.sending_allowed_release = corosync_sending_allowed_release,
.poll_accept_add = corosync_poll_accept_add,
.poll_dispatch_add = corosync_poll_dispatch_add,
.poll_dispatch_modify = corosync_poll_dispatch_modify,
.poll_dispatch_destroy = corosync_poll_dispatch_destroy,
.init_fn_get = corosync_init_fn_get,
.exit_fn_get = corosync_exit_fn_get,
.handler_fn_get = corosync_handler_fn_get,
.stats_create_connection = corosync_stats_create_connection,
.stats_destroy_connection = corosync_stats_destroy_connection,
.stats_update_value = corosync_stats_update_value,
.stats_increment_value = corosync_stats_increment_value,
.stats_decrement_value = corosync_stats_decrement_value,
};
static void corosync_setscheduler (void)
{
#if defined(HAVE_PTHREAD_SETSCHEDPARAM) && defined(HAVE_SCHED_GET_PRIORITY_MAX) && defined(HAVE_SCHED_SETSCHEDULER)
int res;
sched_priority = sched_get_priority_max (SCHED_RR);
if (sched_priority != -1) {
global_sched_param.sched_priority = sched_priority;
res = sched_setscheduler (0, SCHED_RR, &global_sched_param);
if (res == -1) {
char error_str[100];
strerror_r (errno, error_str, 100);
global_sched_param.sched_priority = 0;
log_printf (LOGSYS_LEVEL_WARNING, "Could not set SCHED_RR at priority %d: %s\n",
global_sched_param.sched_priority, error_str);
logsys_thread_priority_set (SCHED_OTHER, NULL, 1);
} else {
/*
* Turn on SCHED_RR in ipc system
*/
ipc_init_state_v2.sched_policy = SCHED_RR;
/*
* Turn on SCHED_RR in logsys system
*/
res = logsys_thread_priority_set (SCHED_RR, &global_sched_param, 10);
if (res == -1) {
log_printf (LOGSYS_LEVEL_ERROR,
"Could not set logsys thread priority."
" Can't continue because of priority inversions.");
corosync_exit_error (AIS_DONE_LOGSETUP);
}
}
} else {
char error_str[100];
strerror_r (errno, error_str, 100);
log_printf (LOGSYS_LEVEL_WARNING,
"Could not get maximum scheduler priority: %s\n",
error_str);
sched_priority = 0;
}
#else
log_printf(LOGSYS_LEVEL_WARNING,
"The Platform is missing process priority setting features. Leaving at default.");
#endif
}
static void fplay_key_change_notify_fn (
object_change_type_t change_type,
hdb_handle_t parent_object_handle,
hdb_handle_t object_handle,
const void *object_name_pt, size_t object_name_len,
const void *key_name_pt, size_t key_len,
const void *key_value_pt, size_t key_value_len,
void *priv_data_pt)
{
if (key_len == strlen ("dump_flight_data") &&
memcmp ("dump_flight_data", key_name_pt, key_len) == 0) {
logsys_log_rec_store (LOCALSTATEDIR "/lib/corosync/fdata");
}
if (key_len == strlen ("dump_state") &&
memcmp ("dump_state", key_name_pt, key_len) == 0) {
corosync_state_dump ();
}
}
static void corosync_fplay_control_init (void)
{
hdb_handle_t object_find_handle;
hdb_handle_t object_runtime_handle;
hdb_handle_t object_blackbox_handle;
objdb->object_find_create (OBJECT_PARENT_HANDLE,
"runtime", strlen ("runtime"),
&object_find_handle);
if (objdb->object_find_next (object_find_handle,
&object_runtime_handle) != 0) {
return;
}
objdb->object_create (object_runtime_handle,
&object_blackbox_handle,
"blackbox", strlen ("blackbox"));
objdb->object_key_create_typed (object_blackbox_handle,
"dump_flight_data", "no", strlen("no"),
OBJDB_VALUETYPE_STRING);
objdb->object_key_create_typed (object_blackbox_handle,
"dump_state", "no", strlen("no"),
OBJDB_VALUETYPE_STRING);
objdb->object_track_start (object_blackbox_handle,
OBJECT_TRACK_DEPTH_RECURSIVE,
fplay_key_change_notify_fn,
NULL, NULL, NULL, NULL);
}
static void corosync_stats_init (void)
{
hdb_handle_t object_find_handle;
hdb_handle_t object_runtime_handle;
uint64_t zero_64 = 0;
objdb->object_find_create (OBJECT_PARENT_HANDLE,
"runtime", strlen ("runtime"),
&object_find_handle);
if (objdb->object_find_next (object_find_handle,
&object_runtime_handle) != 0) {
return;
}
/* Connection objects */
objdb->object_create (object_runtime_handle,
&object_connection_handle,
"connections", strlen ("connections"));
objdb->object_key_create_typed (object_connection_handle,
"active", &zero_64, sizeof (zero_64),
OBJDB_VALUETYPE_UINT64);
objdb->object_key_create_typed (object_connection_handle,
"closed", &zero_64, sizeof (zero_64),
OBJDB_VALUETYPE_UINT64);
}
static void main_low_fds_event(int32_t not_enough, int32_t fds_available)
{
corosync_not_enough_fds_left = not_enough;
if (not_enough) {
log_printf(LOGSYS_LEVEL_WARNING, "refusing new connections (fds_available:%d)\n",
fds_available);
} else {
log_printf(LOGSYS_LEVEL_NOTICE, "allowing new connections (fds_available:%d)\n",
fds_available);
}
}
static void main_service_ready (void)
{
int res;
/*
* This must occur after totempg is initialized because "this_ip" must be set
*/
res = corosync_service_defaults_link_and_init (api);
if (res == -1) {
log_printf (LOGSYS_LEVEL_ERROR, "Could not initialize default services\n");
corosync_exit_error (AIS_DONE_INIT_SERVICES);
}
evil_init (api);
corosync_stats_init ();
corosync_totem_stats_init ();
corosync_fplay_control_init ();
if (minimum_sync_mode == CS_SYNC_V2) {
log_printf (LOGSYS_LEVEL_NOTICE, "Compatibility mode set to none. Using V2 of the synchronization engine.\n");
sync_v2_init (
corosync_sync_v2_callbacks_retrieve,
corosync_sync_completed);
} else
if (minimum_sync_mode == CS_SYNC_V1) {
log_printf (LOGSYS_LEVEL_NOTICE, "Compatibility mode set to whitetank. Using V1 and V2 of the synchronization engine.\n");
sync_register (
corosync_sync_callbacks_retrieve,
sync_v2_memb_list_determine,
sync_v2_memb_list_abort,
sync_v2_start);
sync_v2_init (
corosync_sync_v2_callbacks_retrieve,
corosync_sync_completed);
}
}
static enum e_ais_done corosync_flock (const char *lockfile, pid_t pid)
{
struct flock lock;
enum e_ais_done err;
char pid_s[17];
int fd_flag;
int lf;
err = AIS_DONE_EXIT;
lf = open (lockfile, O_WRONLY | O_CREAT, 0640);
if (lf == -1) {
log_printf (LOGSYS_LEVEL_ERROR, "Corosync Executive couldn't create lock file.\n");
return (AIS_DONE_AQUIRE_LOCK);
}
retry_fcntl:
lock.l_type = F_WRLCK;
lock.l_start = 0;
lock.l_whence = SEEK_SET;
lock.l_len = 0;
if (fcntl (lf, F_SETLK, &lock) == -1) {
switch (errno) {
case EINTR:
goto retry_fcntl;
break;
case EAGAIN:
case EACCES:
log_printf (LOGSYS_LEVEL_ERROR, "Another Corosync instance is already running.\n");
err = AIS_DONE_ALREADY_RUNNING;
goto error_close;
break;
default:
log_printf (LOGSYS_LEVEL_ERROR, "Corosync Executive couldn't aquire lock. Error was %s\n",
strerror(errno));
err = AIS_DONE_AQUIRE_LOCK;
goto error_close;
break;
}
}
if (ftruncate (lf, 0) == -1) {
log_printf (LOGSYS_LEVEL_ERROR, "Corosync Executive couldn't truncate lock file. Error was %s\n",
strerror (errno));
err = AIS_DONE_AQUIRE_LOCK;
goto error_close_unlink;
}
memset (pid_s, 0, sizeof (pid_s));
snprintf (pid_s, sizeof (pid_s) - 1, "%u\n", pid);
retry_write:
if (write (lf, pid_s, strlen (pid_s)) != strlen (pid_s)) {
if (errno == EINTR) {
goto retry_write;
} else {
log_printf (LOGSYS_LEVEL_ERROR, "Corosync Executive couldn't write pid to lock file. "
"Error was %s\n", strerror (errno));
err = AIS_DONE_AQUIRE_LOCK;
goto error_close_unlink;
}
}
if ((fd_flag = fcntl (lf, F_GETFD, 0)) == -1) {
log_printf (LOGSYS_LEVEL_ERROR, "Corosync Executive couldn't get close-on-exec flag from lock file. "
"Error was %s\n", strerror (errno));
err = AIS_DONE_AQUIRE_LOCK;
goto error_close_unlink;
}
fd_flag |= FD_CLOEXEC;
if (fcntl (lf, F_SETFD, fd_flag) == -1) {
log_printf (LOGSYS_LEVEL_ERROR, "Corosync Executive couldn't set close-on-exec flag to lock file. "
"Error was %s\n", strerror (errno));
err = AIS_DONE_AQUIRE_LOCK;
goto error_close_unlink;
}
return (err);
error_close_unlink:
unlink (lockfile);
error_close:
close (lf);
return (err);
}
int main (int argc, char **argv, char **envp)
{
const char *error_string;
struct totem_config totem_config;
hdb_handle_t objdb_handle;
hdb_handle_t config_handle;
unsigned int config_version = 0;
void *objdb_p;
struct config_iface_ver0 *config;
void *config_p;
const char *config_iface_init;
char *config_iface;
char *iface;
char *strtok_save_pt;
int res, ch;
int background, setprio;
struct stat stat_out;
char corosync_lib_dir[PATH_MAX];
hdb_handle_t object_runtime_handle;
enum e_ais_done flock_err;
#if defined(HAVE_PTHREAD_SPIN_LOCK)
pthread_spin_init (&serialize_spin, 0);
#endif
/* default configuration
*/
background = 1;
setprio = 1;
while ((ch = getopt (argc, argv, "fpv")) != EOF) {
switch (ch) {
case 'f':
background = 0;
logsys_config_mode_set (NULL, LOGSYS_MODE_OUTPUT_STDERR|LOGSYS_MODE_THREADED|LOGSYS_MODE_FORK);
break;
case 'p':
setprio = 0;
break;
case 'v':
printf ("Corosync Cluster Engine, version '%s'\n", VERSION);
printf ("Copyright (c) 2006-2009 Red Hat, Inc.\n");
return EXIT_SUCCESS;
break;
default:
fprintf(stderr, \
"usage:\n"\
" -f : Start application in foreground.\n"\
" -p : Do not set process priority. \n"\
" -v : Display version and SVN revision of Corosync and exit.\n");
return EXIT_FAILURE;
}
}
/*
* Set round robin realtime scheduling with priority 99
* Lock all memory to avoid page faults which may interrupt
* application healthchecking
*/
if (setprio) {
corosync_setscheduler ();
}
corosync_mlockall ();
log_printf (LOGSYS_LEVEL_NOTICE, "Corosync Cluster Engine ('%s'): started and ready to provide service.\n", VERSION);
log_printf (LOGSYS_LEVEL_INFO, "Corosync built-in features:" PACKAGE_FEATURES "\n");
(void)signal (SIGINT, sigintr_handler);
(void)signal (SIGUSR2, sigusr2_handler);
(void)signal (SIGSEGV, sigsegv_handler);
(void)signal (SIGABRT, sigabrt_handler);
(void)signal (SIGQUIT, sigquit_handler);
(void)signal (SIGTERM, sigterm_handler);
#if MSG_NOSIGNAL != 0
(void)signal (SIGPIPE, SIG_IGN);
#endif
/*
* Load the object database interface
*/
res = lcr_ifact_reference (
&objdb_handle,
"objdb",
0,
&objdb_p,
0);
if (res == -1) {
log_printf (LOGSYS_LEVEL_ERROR, "Corosync Executive couldn't open configuration object database component.\n");
corosync_exit_error (AIS_DONE_OBJDB);
}
objdb = (struct objdb_iface_ver0 *)objdb_p;
objdb->objdb_init ();
/*
* Initialize the corosync_api_v1 definition
*/
apidef_init (objdb);
api = apidef_get ();
num_config_modules = 0;
/*
* Bootstrap in the default configuration parser or use
* the corosync default built in parser if the configuration parser
* isn't overridden
*/
config_iface_init = getenv("COROSYNC_DEFAULT_CONFIG_IFACE");
if (!config_iface_init) {
config_iface_init = "corosync_parser";
}
/* Make a copy so we can deface it with strtok */
if ((config_iface = strdup(config_iface_init)) == NULL) {
log_printf (LOGSYS_LEVEL_ERROR, "exhausted virtual memory");
corosync_exit_error (AIS_DONE_OBJDB);
}
iface = strtok_r(config_iface, ":", &strtok_save_pt);
while (iface)
{
res = lcr_ifact_reference (
&config_handle,
iface,
config_version,
&config_p,
0);
config = (struct config_iface_ver0 *)config_p;
if (res == -1) {
log_printf (LOGSYS_LEVEL_ERROR, "Corosync Executive couldn't open configuration component '%s'\n", iface);
corosync_exit_error (AIS_DONE_MAINCONFIGREAD);
}
res = config->config_readconfig(objdb, &error_string);
if (res == -1) {
log_printf (LOGSYS_LEVEL_ERROR, "%s", error_string);
corosync_exit_error (AIS_DONE_MAINCONFIGREAD);
}
log_printf (LOGSYS_LEVEL_NOTICE, "%s", error_string);
config_modules[num_config_modules++] = config;
iface = strtok_r(NULL, ":", &strtok_save_pt);
}
free(config_iface);
res = corosync_main_config_read (objdb, &error_string);
if (res == -1) {
/*
* if we are here, we _must_ flush the logsys queue
* and try to inform that we couldn't read the config.
* this is a desperate attempt before certain death
* and there is no guarantee that we can print to stderr
* nor that logsys is sending the messages where we expect.
*/
log_printf (LOGSYS_LEVEL_ERROR, "%s", error_string);
fprintf(stderr, "%s", error_string);
syslog (LOGSYS_LEVEL_ERROR, "%s", error_string);
corosync_exit_error (AIS_DONE_MAINCONFIGREAD);
}
/*
* Make sure required directory is present
*/
sprintf (corosync_lib_dir, "%s/lib/corosync", LOCALSTATEDIR);
res = stat (corosync_lib_dir, &stat_out);
if ((res == -1) || (res == 0 && !S_ISDIR(stat_out.st_mode))) {
log_printf (LOGSYS_LEVEL_ERROR, "Required directory not present %s. Please create it.\n", corosync_lib_dir);
corosync_exit_error (AIS_DONE_DIR_NOT_PRESENT);
}
res = totem_config_read (objdb, &totem_config, &error_string);
if (res == -1) {
log_printf (LOGSYS_LEVEL_ERROR, "%s", error_string);
corosync_exit_error (AIS_DONE_MAINCONFIGREAD);
}
res = totem_config_keyread (objdb, &totem_config, &error_string);
if (res == -1) {
log_printf (LOGSYS_LEVEL_ERROR, "%s", error_string);
corosync_exit_error (AIS_DONE_MAINCONFIGREAD);
}
res = totem_config_validate (&totem_config, &error_string);
if (res == -1) {
log_printf (LOGSYS_LEVEL_ERROR, "%s", error_string);
corosync_exit_error (AIS_DONE_MAINCONFIGREAD);
}
totem_config.totem_logging_configuration = totem_logging_configuration;
totem_config.totem_logging_configuration.log_subsys_id =
_logsys_subsys_create ("TOTEM");
if (totem_config.totem_logging_configuration.log_subsys_id < 0) {
log_printf (LOGSYS_LEVEL_ERROR,
"Unable to initialize TOTEM logging subsystem\n");
corosync_exit_error (AIS_DONE_MAINCONFIGREAD);
}
totem_config.totem_logging_configuration.log_level_security = LOGSYS_LEVEL_WARNING;
totem_config.totem_logging_configuration.log_level_error = LOGSYS_LEVEL_ERROR;
totem_config.totem_logging_configuration.log_level_warning = LOGSYS_LEVEL_WARNING;
totem_config.totem_logging_configuration.log_level_notice = LOGSYS_LEVEL_NOTICE;
totem_config.totem_logging_configuration.log_level_debug = LOGSYS_LEVEL_DEBUG;
totem_config.totem_logging_configuration.log_printf = _logsys_log_printf;
res = corosync_main_config_compatibility_read (objdb,
&minimum_sync_mode,
&error_string);
if (res == -1) {
log_printf (LOGSYS_LEVEL_ERROR, "%s", error_string);
corosync_exit_error (AIS_DONE_MAINCONFIGREAD);
}
res = corosync_main_config_compatibility_read (objdb,
&minimum_sync_mode,
&error_string);
if (res == -1) {
log_printf (LOGSYS_LEVEL_ERROR, "%s", error_string);
corosync_exit_error (AIS_DONE_MAINCONFIGREAD);
}
/* create the main runtime object */
objdb->object_create (OBJECT_PARENT_HANDLE,
&object_runtime_handle,
"runtime", strlen ("runtime"));
/*
* Now we are fully initialized.
*/
if (background) {
corosync_tty_detach ();
}
logsys_fork_completed();
if ((flock_err = corosync_flock (corosync_lock_file, getpid ())) != AIS_DONE_EXIT) {
corosync_exit_error (flock_err);
}
corosync_timer_init (
serialize_lock,
serialize_unlock,
sched_priority);
corosync_poll_handle = poll_create ();
poll_low_fds_event_set(corosync_poll_handle, main_low_fds_event);
/*
* Sleep for a while to let other nodes in the cluster
* understand that this node has been away (if it was
* an corosync restart).
*/
// TODO what is this hack for? usleep(totem_config.token_timeout * 2000);
/*
* Create semaphore and start "exit" thread
*/
res = sem_init (&corosync_exit_sem, 0, 0);
if (res != 0) {
log_printf (LOGSYS_LEVEL_ERROR, "Corosync Executive couldn't create exit thread.\n");
corosync_exit_error (AIS_DONE_FATAL_ERR);
}
res = pthread_create (&corosync_exit_thread, NULL, corosync_exit_thread_handler, NULL);
if (res != 0) {
log_printf (LOGSYS_LEVEL_ERROR, "Corosync Executive couldn't create exit thread.\n");
corosync_exit_error (AIS_DONE_FATAL_ERR);
}
/*
* if totempg_initialize doesn't have root priveleges, it cannot
* bind to a specific interface. This only matters if
* there is more then one interface in a system, so
* in this case, only a warning is printed
*/
/*
* Join multicast group and setup delivery
* and configuration change functions
*/
totempg_initialize (
corosync_poll_handle,
&totem_config);
totempg_service_ready_register (
main_service_ready);
totempg_groups_initialize (
&corosync_group_handle,
deliver_fn,
confchg_fn);
totempg_groups_join (
corosync_group_handle,
&corosync_group,
1);
/*
* Drop root privleges to user 'ais'
* TODO: Don't really need full root capabilities;
* needed capabilities are:
* CAP_NET_RAW (bindtodevice)
* CAP_SYS_NICE (setscheduler)
* CAP_IPC_LOCK (mlockall)
*/
priv_drop ();
schedwrk_init (
serialize_lock,
serialize_unlock);
ipc_subsys_id = _logsys_subsys_create ("IPC");
if (ipc_subsys_id < 0) {
log_printf (LOGSYS_LEVEL_ERROR,
"Could not initialize IPC logging subsystem\n");
corosync_exit_error (AIS_DONE_INIT_SERVICES);
}
ipc_init_state_v2.log_subsys_id = ipc_subsys_id;
coroipcs_ipc_init_v2 (&ipc_init_state_v2);
/*
* Start main processing loop
*/
poll_run (corosync_poll_handle);
return EXIT_SUCCESS;
}
diff --git a/exec/totemconfig.c b/exec/totemconfig.c
index 0b9732b4..45d4a30a 100644
--- a/exec/totemconfig.c
+++ b/exec/totemconfig.c
@@ -1,948 +1,948 @@
/*
* Copyright (c) 2002-2005 MontaVista Software, Inc.
* Copyright (c) 2006-2010 Red Hat, Inc.
*
* All rights reserved.
*
* Author: Steven Dake (sdake@redhat.com)
*
* This software licensed under BSD license, the text of which follows:
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* - Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* - Neither the name of the MontaVista Software, Inc. nor the names of its
* contributors may be used to endorse or promote products derived from this
* software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
* THE POSSIBILITY OF SUCH DAMAGE.
*/
#include <config.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <errno.h>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/param.h>
#include <corosync/swab.h>
#include <corosync/list.h>
#include <corosync/totem/totem.h>
#include <corosync/engine/objdb.h>
#include <corosync/engine/config.h>
#include <corosync/engine/logsys.h>
#ifdef HAVE_LIBNSS
#include <nss.h>
#include <pk11pub.h>
#include <pkcs11.h>
#include <prerror.h>
#endif
#include "util.h"
#include "totemconfig.h"
#include "tlist.h" /* for HZ */
#define TOKEN_RETRANSMITS_BEFORE_LOSS_CONST 4
#define TOKEN_TIMEOUT 1000
#define TOKEN_RETRANSMIT_TIMEOUT (int)(TOKEN_TIMEOUT / (TOKEN_RETRANSMITS_BEFORE_LOSS_CONST + 0.2))
#define TOKEN_HOLD_TIMEOUT (int)(TOKEN_RETRANSMIT_TIMEOUT * 0.8 - (1000/(int)HZ))
#define JOIN_TIMEOUT 50
#define MERGE_TIMEOUT 200
#define DOWNCHECK_TIMEOUT 1000
#define FAIL_TO_RECV_CONST 50
#define SEQNO_UNCHANGED_CONST 30
#define MINIMUM_TIMEOUT (int)(1000/HZ)*3
#define MAX_NETWORK_DELAY 50
#define WINDOW_SIZE 50
#define MAX_MESSAGES 17
#define MISS_COUNT_CONST 5
#define RRP_PROBLEM_COUNT_TIMEOUT 2000
#define RRP_PROBLEM_COUNT_THRESHOLD_DEFAULT 10
#define RRP_PROBLEM_COUNT_THRESHOLD_MIN 5
static char error_string_response[512];
static struct objdb_iface_ver0 *global_objdb;
static void add_totem_config_notification(
struct objdb_iface_ver0 *objdb,
struct totem_config *totem_config,
hdb_handle_t totem_object_handle);
/* These just makes the code below a little neater */
static inline int objdb_get_string (
const struct objdb_iface_ver0 *objdb,
hdb_handle_t object_service_handle,
const char *key, const char **value)
{
int res;
*value = NULL;
if ( !(res = objdb->object_key_get (object_service_handle,
key,
strlen (key),
(void *)value,
NULL))) {
if (*value) {
return 0;
}
}
return -1;
}
static inline void objdb_get_int (
const struct objdb_iface_ver0 *objdb,
hdb_handle_t object_service_handle,
const char *key, unsigned int *intvalue)
{
char *value = NULL;
if (!objdb->object_key_get (object_service_handle,
key,
strlen (key),
(void *)&value,
NULL)) {
if (value) {
*intvalue = atoi(value);
}
}
}
static unsigned int totem_handle_find (
struct objdb_iface_ver0 *objdb,
hdb_handle_t *totem_find_handle) {
hdb_handle_t object_find_handle;
unsigned int res;
/*
* Find a network section
*/
objdb->object_find_create (
OBJECT_PARENT_HANDLE,
"network",
strlen ("network"),
&object_find_handle);
res = objdb->object_find_next (
object_find_handle,
totem_find_handle);
objdb->object_find_destroy (object_find_handle);
/*
* Network section not found in configuration, checking for totem
*/
if (res == -1) {
objdb->object_find_create (
OBJECT_PARENT_HANDLE,
"totem",
strlen ("totem"),
&object_find_handle);
res = objdb->object_find_next (
object_find_handle,
totem_find_handle);
objdb->object_find_destroy (object_find_handle);
}
if (res == -1) {
return (-1);
}
return (0);
}
static void totem_volatile_config_read (
struct objdb_iface_ver0 *objdb,
struct totem_config *totem_config,
hdb_handle_t object_totem_handle)
{
objdb_get_int (objdb,object_totem_handle, "token", &totem_config->token_timeout);
objdb_get_int (objdb,object_totem_handle, "token_retransmit", &totem_config->token_retransmit_timeout);
objdb_get_int (objdb,object_totem_handle, "hold", &totem_config->token_hold_timeout);
objdb_get_int (objdb,object_totem_handle, "token_retransmits_before_loss_const", &totem_config->token_retransmits_before_loss_const);
objdb_get_int (objdb,object_totem_handle, "join", &totem_config->join_timeout);
objdb_get_int (objdb,object_totem_handle, "send_join", &totem_config->send_join_timeout);
objdb_get_int (objdb,object_totem_handle, "consensus", &totem_config->consensus_timeout);
objdb_get_int (objdb,object_totem_handle, "merge", &totem_config->merge_timeout);
objdb_get_int (objdb,object_totem_handle, "downcheck", &totem_config->downcheck_timeout);
objdb_get_int (objdb,object_totem_handle, "fail_recv_const", &totem_config->fail_to_recv_const);
objdb_get_int (objdb,object_totem_handle, "seqno_unchanged_const", &totem_config->seqno_unchanged_const);
objdb_get_int (objdb,object_totem_handle, "rrp_token_expired_timeout", &totem_config->rrp_token_expired_timeout);
objdb_get_int (objdb,object_totem_handle, "rrp_problem_count_timeout", &totem_config->rrp_problem_count_timeout);
objdb_get_int (objdb,object_totem_handle, "rrp_problem_count_threshold", &totem_config->rrp_problem_count_threshold);
objdb_get_int (objdb,object_totem_handle, "heartbeat_failures_allowed", &totem_config->heartbeat_failures_allowed);
objdb_get_int (objdb,object_totem_handle, "max_network_delay", &totem_config->max_network_delay);
objdb_get_int (objdb,object_totem_handle, "window_size", &totem_config->window_size);
objdb_get_string (objdb, object_totem_handle, "vsftype", &totem_config->vsf_type);
objdb_get_int (objdb,object_totem_handle, "max_messages", &totem_config->max_messages);
objdb_get_int (objdb,object_totem_handle, "miss_count_const", &totem_config->miss_count_const);
}
static void totem_get_crypto_type(
const struct objdb_iface_ver0 *objdb,
hdb_handle_t object_totem_handle,
struct totem_config *totem_config)
{
const char *str;
totem_config->crypto_accept = TOTEM_CRYPTO_ACCEPT_OLD;
if (!objdb_get_string (objdb, object_totem_handle, "crypto_accept", &str)) {
if (strcmp(str, "new") == 0) {
totem_config->crypto_accept = TOTEM_CRYPTO_ACCEPT_NEW;
}
}
totem_config->crypto_type = TOTEM_CRYPTO_SOBER;
#ifdef HAVE_LIBNSS
/*
* We must set these even if the key does not exist.
* Encryption type can be set on-the-fly using CFG
*/
totem_config->crypto_crypt_type = CKM_AES_CBC_PAD;
totem_config->crypto_sign_type = CKM_SHA256_RSA_PKCS;
#endif
if (!objdb_get_string (objdb, object_totem_handle, "crypto_type", &str)) {
if (strcmp(str, "sober") == 0) {
return;
}
#ifdef HAVE_LIBNSS
if (strcmp(str, "nss") == 0) {
totem_config->crypto_type = TOTEM_CRYPTO_NSS;
}
#endif
}
}
extern int totem_config_read (
struct objdb_iface_ver0 *objdb,
struct totem_config *totem_config,
const char **error_string)
{
int res = 0;
hdb_handle_t object_totem_handle;
hdb_handle_t object_interface_handle;
hdb_handle_t object_member_handle;
const char *str;
unsigned int ringnumber = 0;
hdb_handle_t object_find_interface_handle;
hdb_handle_t object_find_member_handle;
const char *transport_type;
int member_count = 0;
res = totem_handle_find (objdb, &object_totem_handle);
if (res == -1) {
printf ("couldn't find totem handle\n");
return (-1);
}
memset (totem_config, 0, sizeof (struct totem_config));
totem_config->interfaces = malloc (sizeof (struct totem_interface) * INTERFACE_MAX);
if (totem_config->interfaces == 0) {
*error_string = "Out of memory trying to allocate ethernet interface storage area";
return -1;
}
memset (totem_config->interfaces, 0,
sizeof (struct totem_interface) * INTERFACE_MAX);
totem_config->secauth = 1;
strcpy (totem_config->rrp_mode, "none");
if (!objdb_get_string (objdb, object_totem_handle, "version", &str)) {
if (strcmp (str, "2") == 0) {
totem_config->version = 2;
}
}
if (!objdb_get_string (objdb, object_totem_handle, "secauth", &str)) {
if (strcmp (str, "on") == 0) {
totem_config->secauth = 1;
}
if (strcmp (str, "off") == 0) {
totem_config->secauth = 0;
}
}
if (totem_config->secauth == 1) {
totem_get_crypto_type(objdb, object_totem_handle, totem_config);
}
if (!objdb_get_string (objdb, object_totem_handle, "rrp_mode", &str)) {
strcpy (totem_config->rrp_mode, str);
}
/*
* Get interface node id
*/
objdb_get_int (objdb, object_totem_handle, "nodeid", &totem_config->node_id);
totem_config->clear_node_high_bit = 0;
if (!objdb_get_string (objdb,object_totem_handle, "clear_node_high_bit", &str)) {
if (strcmp (str, "yes") == 0) {
totem_config->clear_node_high_bit = 1;
}
}
objdb_get_int (objdb,object_totem_handle, "threads", &totem_config->threads);
objdb_get_int (objdb,object_totem_handle, "netmtu", &totem_config->net_mtu);
/*
* Get things that might change in the future
*/
totem_volatile_config_read (objdb, totem_config, object_totem_handle);
objdb->object_find_create (
object_totem_handle,
"interface",
strlen ("interface"),
&object_find_interface_handle);
while (objdb->object_find_next (
object_find_interface_handle,
&object_interface_handle) == 0) {
member_count = 0;
objdb_get_int (objdb, object_interface_handle, "ringnumber", &ringnumber);
/*
* Get interface multicast address
*/
if (!objdb_get_string (objdb, object_interface_handle, "mcastaddr", &str)) {
res = totemip_parse (&totem_config->interfaces[ringnumber].mcast_addr, str, 0);
}
totem_config->broadcast_use = 0;
if (!objdb_get_string (objdb, object_interface_handle, "broadcast", &str)) {
if (strcmp (str, "yes") == 0) {
totem_config->broadcast_use = 1;
totemip_parse (
&totem_config->interfaces[ringnumber].mcast_addr,
"255.255.255.255", 0);
}
}
/*
* Get mcast port
*/
if (!objdb_get_string (objdb, object_interface_handle, "mcastport", &str)) {
totem_config->interfaces[ringnumber].ip_port = atoi (str);
}
/*
* Get the bind net address
*/
if (!objdb_get_string (objdb, object_interface_handle, "bindnetaddr", &str)) {
res = totemip_parse (&totem_config->interfaces[ringnumber].bindnet, str,
totem_config->interfaces[ringnumber].mcast_addr.family);
}
/*
* Get the TTL
*/
totem_config->interfaces[ringnumber].ttl = 1;
if (!objdb_get_string (objdb, object_interface_handle, "ttl", &str)) {
totem_config->interfaces[ringnumber].ttl = atoi (str);
}
objdb->object_find_create (
object_interface_handle,
"member",
strlen ("member"),
&object_find_member_handle);
while (objdb->object_find_next (
object_find_member_handle,
&object_member_handle) == 0) {
if (!objdb_get_string (objdb, object_member_handle, "memberaddr", &str)) {
res = totemip_parse (&totem_config->interfaces[ringnumber].member_list[member_count++], str, 0);
}
}
totem_config->interfaces[ringnumber].member_count = member_count;
totem_config->interface_count++;
}
objdb->object_find_destroy (object_find_interface_handle);
add_totem_config_notification(objdb, totem_config, object_totem_handle);
totem_config->transport_number = TOTEM_TRANSPORT_UDP;
objdb_get_string (objdb, object_totem_handle, "transport", &transport_type);
if (transport_type) {
if (strcmp (transport_type, "udpu") == 0) {
totem_config->transport_number = TOTEM_TRANSPORT_UDPU;
}
}
if (transport_type) {
if (strcmp (transport_type, "iba") == 0) {
totem_config->transport_number = TOTEM_TRANSPORT_RDMA;
}
}
return 0;
}
int totem_config_validate (
struct totem_config *totem_config,
const char **error_string)
{
static char local_error_reason[512];
char parse_error[512];
const char *error_reason = local_error_reason;
int i;
unsigned int interface_max = INTERFACE_MAX;
if (totem_config->interface_count == 0) {
error_reason = "No interfaces defined";
goto parse_error;
}
for (i = 0; i < totem_config->interface_count; i++) {
/*
* Some error checking of parsed data to make sure its valid
*/
struct totem_ip_address null_addr;
memset (&null_addr, 0, sizeof (struct totem_ip_address));
if ((totem_config->transport_number == 0) &&
memcmp (&totem_config->interfaces[i].mcast_addr, &null_addr,
sizeof (struct totem_ip_address)) == 0) {
error_reason = "No multicast address specified";
goto parse_error;
}
if (totem_config->interfaces[i].ip_port == 0) {
error_reason = "No multicast port specified";
goto parse_error;
}
- if (totem_config->interfaces[i].ttl > 255 || totem_config->interfaces[i].ttl < 0) {
+ if (totem_config->interfaces[i].ttl > 255) {
error_reason = "Invalid TTL (should be 0..255)";
goto parse_error;
}
if (totem_config->transport_number != TOTEM_TRANSPORT_UDP &&
totem_config->interfaces[i].ttl != 1) {
error_reason = "Can only set ttl on multicast transport types";
goto parse_error;
}
if (totem_config->interfaces[i].mcast_addr.family == AF_INET6 &&
totem_config->node_id == 0) {
error_reason = "An IPV6 network requires that a node ID be specified.";
goto parse_error;
}
if (totem_config->broadcast_use == 0 && totem_config->transport_number == 0) {
if (totem_config->interfaces[i].mcast_addr.family != totem_config->interfaces[i].bindnet.family) {
error_reason = "Multicast address family does not match bind address family";
goto parse_error;
}
if (totem_config->interfaces[i].mcast_addr.family != totem_config->interfaces[i].bindnet.family) {
error_reason = "Not all bind address belong to the same IP family";
goto parse_error;
}
if (totemip_is_mcast (&totem_config->interfaces[i].mcast_addr) != 0) {
error_reason = "mcastaddr is not a correct multicast address.";
goto parse_error;
}
}
}
if (totem_config->version != 2) {
error_reason = "This totem parser can only parse version 2 configurations.";
goto parse_error;
}
if (totem_config->token_retransmits_before_loss_const == 0) {
totem_config->token_retransmits_before_loss_const =
TOKEN_RETRANSMITS_BEFORE_LOSS_CONST;
}
/*
* Setup timeout values that are not setup by user
*/
if (totem_config->token_timeout == 0) {
totem_config->token_timeout = TOKEN_TIMEOUT;
if (totem_config->token_retransmits_before_loss_const == 0) {
totem_config->token_retransmits_before_loss_const = TOKEN_RETRANSMITS_BEFORE_LOSS_CONST;
}
if (totem_config->token_retransmit_timeout == 0) {
totem_config->token_retransmit_timeout =
(int)(totem_config->token_timeout /
(totem_config->token_retransmits_before_loss_const + 0.2));
}
if (totem_config->token_hold_timeout == 0) {
totem_config->token_hold_timeout =
(int)(totem_config->token_retransmit_timeout * 0.8 -
(1000/HZ));
}
}
if (totem_config->max_network_delay == 0) {
totem_config->max_network_delay = MAX_NETWORK_DELAY;
}
if (totem_config->max_network_delay < MINIMUM_TIMEOUT) {
snprintf (local_error_reason, sizeof(local_error_reason),
"The max_network_delay parameter (%d ms) may not be less then (%d ms).",
totem_config->max_network_delay, MINIMUM_TIMEOUT);
goto parse_error;
}
if (totem_config->window_size == 0) {
totem_config->window_size = WINDOW_SIZE;
}
if (totem_config->max_messages == 0) {
totem_config->max_messages = MAX_MESSAGES;
}
if (totem_config->miss_count_const == 0) {
totem_config->miss_count_const = MISS_COUNT_CONST;
}
if (totem_config->token_timeout < MINIMUM_TIMEOUT) {
snprintf (local_error_reason, sizeof(local_error_reason),
"The token timeout parameter (%d ms) may not be less then (%d ms).",
totem_config->token_timeout, MINIMUM_TIMEOUT);
goto parse_error;
}
if (totem_config->token_retransmit_timeout == 0) {
totem_config->token_retransmit_timeout =
(int)(totem_config->token_timeout /
(totem_config->token_retransmits_before_loss_const + 0.2));
}
if (totem_config->token_hold_timeout == 0) {
totem_config->token_hold_timeout =
(int)(totem_config->token_retransmit_timeout * 0.8 -
(1000/HZ));
}
if (totem_config->token_retransmit_timeout < MINIMUM_TIMEOUT) {
snprintf (local_error_reason, sizeof(local_error_reason),
"The token retransmit timeout parameter (%d ms) may not be less then (%d ms).",
totem_config->token_retransmit_timeout, MINIMUM_TIMEOUT);
goto parse_error;
}
if (totem_config->token_hold_timeout == 0) {
totem_config->token_hold_timeout = TOKEN_HOLD_TIMEOUT;
}
if (totem_config->token_hold_timeout < MINIMUM_TIMEOUT) {
snprintf (local_error_reason, sizeof(local_error_reason),
"The token hold timeout parameter (%d ms) may not be less then (%d ms).",
totem_config->token_hold_timeout, MINIMUM_TIMEOUT);
goto parse_error;
}
if (totem_config->join_timeout == 0) {
totem_config->join_timeout = JOIN_TIMEOUT;
}
if (totem_config->join_timeout < MINIMUM_TIMEOUT) {
snprintf (local_error_reason, sizeof(local_error_reason),
"The join timeout parameter (%d ms) may not be less then (%d ms).",
totem_config->join_timeout, MINIMUM_TIMEOUT);
goto parse_error;
}
if (totem_config->consensus_timeout == 0) {
totem_config->consensus_timeout = (int)(float)(1.2 * totem_config->token_timeout);
}
if (totem_config->consensus_timeout < MINIMUM_TIMEOUT) {
snprintf (local_error_reason, sizeof(local_error_reason),
"The consensus timeout parameter (%d ms) may not be less then (%d ms).",
totem_config->consensus_timeout, MINIMUM_TIMEOUT);
goto parse_error;
}
if (totem_config->merge_timeout == 0) {
totem_config->merge_timeout = MERGE_TIMEOUT;
}
if (totem_config->merge_timeout < MINIMUM_TIMEOUT) {
snprintf (local_error_reason, sizeof(local_error_reason),
"The merge timeout parameter (%d ms) may not be less then (%d ms).",
totem_config->merge_timeout, MINIMUM_TIMEOUT);
goto parse_error;
}
if (totem_config->downcheck_timeout == 0) {
totem_config->downcheck_timeout = DOWNCHECK_TIMEOUT;
}
if (totem_config->downcheck_timeout < MINIMUM_TIMEOUT) {
snprintf (local_error_reason, sizeof(local_error_reason),
"The downcheck timeout parameter (%d ms) may not be less then (%d ms).",
totem_config->downcheck_timeout, MINIMUM_TIMEOUT);
goto parse_error;
}
/*
* RRP values validation
*/
if (strcmp (totem_config->rrp_mode, "none") &&
strcmp (totem_config->rrp_mode, "active") &&
strcmp (totem_config->rrp_mode, "passive")) {
snprintf (local_error_reason, sizeof(local_error_reason),
"The RRP mode \"%s\" specified is invalid. It must be none, active, or passive.\n", totem_config->rrp_mode);
goto parse_error;
}
if (totem_config->rrp_problem_count_timeout == 0) {
totem_config->rrp_problem_count_timeout = RRP_PROBLEM_COUNT_TIMEOUT;
}
if (totem_config->rrp_problem_count_timeout < MINIMUM_TIMEOUT) {
snprintf (local_error_reason, sizeof(local_error_reason),
"The RRP problem count timeout parameter (%d ms) may not be less then (%d ms).",
totem_config->rrp_problem_count_timeout, MINIMUM_TIMEOUT);
goto parse_error;
}
if (totem_config->rrp_problem_count_threshold == 0) {
totem_config->rrp_problem_count_threshold = RRP_PROBLEM_COUNT_THRESHOLD_DEFAULT;
}
if (totem_config->rrp_problem_count_threshold < RRP_PROBLEM_COUNT_THRESHOLD_MIN) {
snprintf (local_error_reason, sizeof(local_error_reason),
"The RRP problem count threshold (%d problem count) may not be less then (%d problem count).",
totem_config->rrp_problem_count_threshold, RRP_PROBLEM_COUNT_THRESHOLD_MIN);
goto parse_error;
}
if (totem_config->rrp_token_expired_timeout == 0) {
totem_config->rrp_token_expired_timeout =
totem_config->token_retransmit_timeout;
}
if (totem_config->rrp_token_expired_timeout < MINIMUM_TIMEOUT) {
snprintf (local_error_reason, sizeof(local_error_reason),
"The RRP token expired timeout parameter (%d ms) may not be less then (%d ms).",
totem_config->rrp_token_expired_timeout, MINIMUM_TIMEOUT);
goto parse_error;
}
if (strcmp (totem_config->rrp_mode, "none") == 0) {
interface_max = 1;
}
if (interface_max < totem_config->interface_count) {
snprintf (parse_error, sizeof(parse_error),
"%d is too many configured interfaces for the rrp_mode setting %s.",
totem_config->interface_count,
totem_config->rrp_mode);
error_reason = parse_error;
goto parse_error;
}
if (totem_config->fail_to_recv_const == 0) {
totem_config->fail_to_recv_const = FAIL_TO_RECV_CONST;
}
if (totem_config->seqno_unchanged_const == 0) {
totem_config->seqno_unchanged_const = SEQNO_UNCHANGED_CONST;
}
if (totem_config->net_mtu == 0) {
totem_config->net_mtu = 1500;
}
if ((MESSAGE_QUEUE_MAX) < totem_config->max_messages) {
snprintf (local_error_reason, sizeof(local_error_reason),
"The max_messages parameter (%d messages) may not be greater then (%d messages).",
totem_config->max_messages, MESSAGE_QUEUE_MAX);
goto parse_error;
}
if (totem_config->threads > SEND_THREADS_MAX) {
totem_config->threads = SEND_THREADS_MAX;
}
if (totem_config->secauth == 0) {
totem_config->threads = 0;
}
if (totem_config->net_mtu > FRAME_SIZE_MAX) {
error_reason = "This net_mtu parameter is greater then the maximum frame size";
goto parse_error;
}
if (totem_config->vsf_type == NULL) {
totem_config->vsf_type = "none";
}
return (0);
parse_error:
snprintf (error_string_response, sizeof(error_string_response),
"parse error in config: %s\n", error_reason);
*error_string = error_string_response;
return (-1);
}
static int read_keyfile (
const char *key_location,
struct totem_config *totem_config,
const char **error_string)
{
int fd;
int res;
ssize_t expected_key_len = sizeof (totem_config->private_key);
int saved_errno;
char error_str[100];
fd = open (key_location, O_RDONLY);
if (fd == -1) {
strerror_r (errno, error_str, 100);
snprintf (error_string_response, sizeof(error_string_response),
"Could not open %s: %s\n",
key_location, error_str);
goto parse_error;
}
res = read (fd, totem_config->private_key, expected_key_len);
saved_errno = errno;
close (fd);
if (res == -1) {
strerror_r (errno, error_str, 100);
snprintf (error_string_response, sizeof(error_string_response),
"Could not read %s: %s\n",
key_location, error_str);
goto parse_error;
}
totem_config->private_key_len = expected_key_len;
if (res != expected_key_len) {
snprintf (error_string_response, sizeof(error_string_response),
"Could only read %d bits of 1024 bits from %s.\n",
res * 8, key_location);
goto parse_error;
}
return 0;
parse_error:
*error_string = error_string_response;
return (-1);
}
int totem_config_keyread (
struct objdb_iface_ver0 *objdb,
struct totem_config *totem_config,
const char **error_string)
{
int got_key = 0;
const char *key_location = NULL;
hdb_handle_t object_totem_handle;
int res;
memset (totem_config->private_key, 0, 128);
totem_config->private_key_len = 128;
if (totem_config->secauth == 0) {
return (0);
}
res = totem_handle_find (objdb, &object_totem_handle);
if (res == -1) {
return (-1);
}
/* objdb may store the location of the key file */
if (!objdb_get_string (objdb,object_totem_handle, "keyfile", &key_location)
&& key_location) {
res = read_keyfile(key_location, totem_config, error_string);
if (res) {
goto key_error;
}
got_key = 1;
} else { /* Or the key itself may be in the objdb */
char *key = NULL;
size_t key_len;
res = objdb->object_key_get (object_totem_handle,
"key",
strlen ("key"),
(void *)&key,
&key_len);
if (res == 0 && key) {
if (key_len > sizeof (totem_config->private_key)) {
goto key_error;
}
memcpy(totem_config->private_key, key, key_len);
totem_config->private_key_len = key_len;
got_key = 1;
}
}
/* In desperation we read the default filename */
if (!got_key) {
const char *filename = getenv("COROSYNC_TOTEM_AUTHKEY_FILE");
if (!filename)
filename = COROSYSCONFDIR "/authkey";
res = read_keyfile(filename, totem_config, error_string);
if (res)
goto key_error;
}
return (0);
key_error:
*error_string = error_string_response;
return (-1);
}
static void totem_key_change_notify(object_change_type_t change_type,
hdb_handle_t parent_object_handle,
hdb_handle_t object_handle,
const void *object_name_pt, size_t object_name_len,
const void *key_name_pt, size_t key_len,
const void *key_value_pt, size_t key_value_len,
void *priv_data_pt)
{
struct totem_config *totem_config = priv_data_pt;
if (memcmp(object_name_pt, "totem", object_name_len) == 0)
totem_volatile_config_read(global_objdb,
totem_config,
object_handle); // CHECK
}
static void totem_objdb_reload_notify(objdb_reload_notify_type_t type, int flush,
void *priv_data_pt)
{
struct totem_config *totem_config = priv_data_pt;
hdb_handle_t totem_object_handle;
if (totem_config == NULL)
return;
/*
* A new totem {} key might exist, cancel the
* existing notification at the start of reload,
* and start a new one on the new object when
* it's all settled.
*/
if (type == OBJDB_RELOAD_NOTIFY_START) {
global_objdb->object_track_stop(
totem_key_change_notify,
NULL,
NULL,
NULL,
totem_config);
}
if (type == OBJDB_RELOAD_NOTIFY_END ||
type == OBJDB_RELOAD_NOTIFY_FAILED) {
if (!totem_handle_find(global_objdb,
&totem_object_handle)) {
global_objdb->object_track_start(totem_object_handle,
1,
totem_key_change_notify,
NULL, // object_create_notify,
NULL, // object_destroy_notify,
NULL, // object_reload_notify
totem_config); // priv_data
/*
* Reload the configuration
*/
totem_volatile_config_read(global_objdb,
totem_config,
totem_object_handle);
}
else {
log_printf(LOGSYS_LEVEL_ERROR, "totem objdb tracking stopped, cannot find totem{} handle on objdb\n");
}
}
}
static void add_totem_config_notification(
struct objdb_iface_ver0 *objdb,
struct totem_config *totem_config,
hdb_handle_t totem_object_handle)
{
global_objdb = objdb;
objdb->object_track_start(totem_object_handle,
1,
totem_key_change_notify,
NULL, // object_create_notify,
NULL, // object_destroy_notify,
NULL, // object_reload_notify
totem_config); // priv_data
/*
* Reload notify must be on the parent object
*/
objdb->object_track_start(OBJECT_PARENT_HANDLE,
1,
NULL, // key_change_notify,
NULL, // object_create_notify,
NULL, // object_destroy_notify,
totem_objdb_reload_notify, // object_reload_notify
totem_config); // priv_data
}
diff --git a/exec/util.h b/exec/util.h
index b5f3abef..263919bb 100644
--- a/exec/util.h
+++ b/exec/util.h
@@ -1,106 +1,107 @@
/*
* Copyright (c) 2002-2004 MontaVista Software, Inc.
* Copyright (c) 2004 Open Source Development Lab
*
* All rights reserved.
*
* Author: Steven Dake (sdake@redhat.com), Mark Haverkamp (markh@osdl.org)
*
* This software licensed under BSD license, the text of which follows:
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* - Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* - Neither the name of the MontaVista Software, Inc. nor the names of its
* contributors may be used to endorse or promote products derived from this
* software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
* THE POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef UTIL_H_DEFINED
#define UTIL_H_DEFINED
#include <sys/time.h>
#include <corosync/corotypes.h>
/*
* Get the time of day and convert to nanoseconds
*/
extern cs_time_t clust_time_now(void);
enum e_ais_done {
AIS_DONE_EXIT = 0,
AIS_DONE_UID_DETERMINE = 1,
AIS_DONE_GID_DETERMINE = 2,
AIS_DONE_MEMPOOL_INIT = 3,
AIS_DONE_FORK = 4,
AIS_DONE_LIBAIS_SOCKET = 5,
AIS_DONE_LIBAIS_BIND = 6,
AIS_DONE_READKEY = 7,
AIS_DONE_MAINCONFIGREAD = 8,
AIS_DONE_LOGSETUP = 9,
AIS_DONE_AMFCONFIGREAD = 10,
AIS_DONE_DYNAMICLOAD = 11,
AIS_DONE_OBJDB = 12,
AIS_DONE_INIT_SERVICES = 13,
AIS_DONE_OUT_OF_MEMORY = 14,
AIS_DONE_FATAL_ERR = 15,
AIS_DONE_DIR_NOT_PRESENT = 16,
AIS_DONE_AQUIRE_LOCK = 17,
AIS_DONE_ALREADY_RUNNING = 18,
+ AIS_DONE_STD_TO_NULL_REDIR = 19,
};
static inline cs_error_t hdb_error_to_cs (int res) \
{ \
if (res == 0) { \
return (CS_OK); \
} else { \
if (errno == EBADF) { \
return (CS_ERR_BAD_HANDLE); \
} else \
if (errno == ENOMEM) { \
return (CS_ERR_NO_MEMORY); \
} else \
if (errno == EMFILE) { \
return (CS_ERR_NO_RESOURCES); \
} else \
if (errno == EACCES) { \
return (CS_ERR_SECURITY); \
} \
return (CS_ERR_LIBRARY); \
} \
}
/*
* Compare two names. returns non-zero on match.
*/
extern int name_match(cs_name_t *name1, cs_name_t *name2);
#define corosync_exit_error(err) _corosync_exit_error ((err), __FILE__, __LINE__)
extern void _corosync_exit_error (enum e_ais_done err, const char *file,
unsigned int line) __attribute__((noreturn));
void _corosync_out_of_memory_error (void) __attribute__((noreturn));
extern char *getcs_name_t (cs_name_t *name);
extern void setcs_name_t (cs_name_t *name, char *str);
extern int cs_name_tisEqual (cs_name_t *str1, char *str2);
/**
* Get the short name of a service from the service_id.
*/
const char * short_service_name_get(uint32_t service_id,
char *buf, size_t buf_size);
#endif /* UTIL_H_DEFINED */
diff --git a/tools/corosync-notifyd.c b/tools/corosync-notifyd.c
index ee2f7a06..5ac2b834 100644
--- a/tools/corosync-notifyd.c
+++ b/tools/corosync-notifyd.c
@@ -1,1081 +1,1085 @@
/*
* Copyright (c) 2011 Red Hat
*
* All rights reserved.
*
* Author: Angus Salkeld <asalkeld@redhat.com>
*
* This software licensed under BSD license, the text of which follows:
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* - Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* - Neither the name of the MontaVista Software, Inc. nor the names of its
* contributors may be used to endorse or promote products derived from this
* software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
* THE POSSIBILITY OF SUCH DAMAGE.
*/
#include <config.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/types.h>
#include <netdb.h>
#include <arpa/inet.h>
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <unistd.h>
#include <string.h>
#include <ctype.h>
#include <poll.h>
#include <signal.h>
#include <syslog.h>
#include <corosync/corotypes.h>
#include <corosync/totem/coropoll.h>
#include <corosync/confdb.h>
#include <corosync/cfg.h>
#include <corosync/quorum.h>
/*
* generic declarations
*/
enum {
CS_NTF_LOG,
CS_NTF_STDOUT,
CS_NTF_SNMP,
CS_NTF_DBUS,
CS_NTF_FG,
CS_NTF_MAX,
};
static int conf[CS_NTF_MAX];
static int32_t _cs_is_quorate = 0;
typedef void (*node_membership_fn_t)(char *nodename, uint32_t nodeid, char *state, char* ip);
typedef void (*node_quorum_fn_t)(char *nodename, uint32_t nodeid, const char *state);
typedef void (*application_connection_fn_t)(char *nodename, uint32_t nodeid, char *app_name, const char *state);
struct notify_callbacks {
node_membership_fn_t node_membership_fn;
node_quorum_fn_t node_quorum_fn;
application_connection_fn_t application_connection_fn;
};
#define MAX_NOTIFIERS 5
static int num_notifiers = 0;
static struct notify_callbacks notifiers[MAX_NOTIFIERS];
static uint32_t local_nodeid = 0;
static char local_nodename[CS_MAX_NAME_LENGTH];
static hdb_handle_t poll_handle;
static quorum_handle_t quorum_handle;
static void _cs_node_membership_event(char *nodename, uint32_t nodeid, char *state, char* ip);
static void _cs_node_quorum_event(const char *state);
static void _cs_application_connection_event(char *app_name, const char *state);
#ifdef HAVE_DBUS
#include <dbus/dbus.h>
/*
* dbus
*/
#define DBUS_CS_NAME "org.corosync"
#define DBUS_CS_IFACE "org.corosync"
#define DBUS_CS_PATH "/org/corosync"
static DBusConnection *db = NULL;
static char _err[512];
static int err_set = 0;
static void _cs_dbus_init(void);
#endif /* HAVE_DBUS */
#ifdef ENABLE_SNMP
#include <net-snmp/net-snmp-config.h>
#include <net-snmp/snmpv3_api.h>
#include <net-snmp/agent/agent_trap.h>
#include <net-snmp/library/mib.h>
#include <net-snmp/library/snmp_api.h>
#include <net-snmp/library/snmp_client.h>
#include <net-snmp/library/snmp_debug.h>
enum snmp_node_status {
SNMP_NODE_STATUS_UNKNOWN = 0,
SNMP_NODE_STATUS_JOINED = 1,
SNMP_NODE_STATUS_LEFT = 2
};
#define SNMP_OID_COROSYNC "1.3.6.1.4.1.35488"
#define SNMP_OID_OBJECT_ROOT SNMP_OID_COROSYNC ".1"
#define SNMP_OID_OBJECT_NODE_NAME SNMP_OID_OBJECT_ROOT ".1"
#define SNMP_OID_OBJECT_NODE_ID SNMP_OID_OBJECT_ROOT ".2"
#define SNMP_OID_OBJECT_NODE_STATUS SNMP_OID_OBJECT_ROOT ".3"
#define SNMP_OID_OBJECT_NODE_ADDR SNMP_OID_OBJECT_ROOT ".4"
#define SNMP_OID_OBJECT_RINGSEQ SNMP_OID_OBJECT_ROOT ".20"
#define SNMP_OID_OBJECT_QUORUM SNMP_OID_OBJECT_ROOT ".21"
#define SNMP_OID_OBJECT_APP_NAME SNMP_OID_OBJECT_ROOT ".40"
#define SNMP_OID_OBJECT_APP_STATUS SNMP_OID_OBJECT_ROOT ".41"
#define SNMP_OID_TRAPS_ROOT SNMP_OID_COROSYNC ".0"
#define SNMP_OID_TRAPS_NODE SNMP_OID_TRAPS_ROOT ".1"
#define SNMP_OID_TRAPS_QUORUM SNMP_OID_TRAPS_ROOT ".2"
#define SNMP_OID_TRAPS_APP SNMP_OID_TRAPS_ROOT ".3"
#define CS_TIMESTAMP_STR_LEN 20
static const char *local_host = "localhost";
#endif /* ENABLE_SNMP */
static char snmp_manager_buf[CS_MAX_NAME_LENGTH];
static char *snmp_manager = NULL;
/*
* confdb
*/
#define SEPERATOR_STR "."
static confdb_handle_t confdb_handle;
static void _cs_confdb_key_changed(confdb_handle_t handle,
confdb_change_type_t change_type,
hdb_handle_t parent_object_handle,
hdb_handle_t object_handle,
const void *object_name, size_t object_name_len,
const void *key_name, size_t key_name_len,
const void *key_value, size_t key_value_len);
static void _cs_confdb_object_created(confdb_handle_t handle,
hdb_handle_t parent_object_handle,
hdb_handle_t object_handle,
const void *name_pt, size_t name_len);
static void _cs_confdb_object_deleted(confdb_handle_t handle,
hdb_handle_t parent_object_handle,
const void *name_pt, size_t name_len);
static confdb_callbacks_t callbacks = {
.confdb_key_change_notify_fn = _cs_confdb_key_changed,
.confdb_object_create_change_notify_fn = _cs_confdb_object_created,
.confdb_object_delete_change_notify_fn = _cs_confdb_object_deleted,
};
static int32_t _cs_ip_to_hostname(char* ip, char* name_out)
{
struct sockaddr_in sa;
int rc;
if (strchr(ip, ':') == NULL) {
sa.sin_family = AF_INET;
} else {
sa.sin_family = AF_INET6;
}
rc = inet_pton(sa.sin_family, ip, &sa.sin_addr);
if (rc == 0) {
return -EINVAL;
}
rc = getnameinfo((struct sockaddr*)&sa, sizeof(sa),
name_out, CS_MAX_NAME_LENGTH, NULL, 0, 0);
if (rc != 0) {
syslog (LOG_ERR, "error looking up %s : %s\n", ip, gai_strerror(rc));
return -EINVAL;
}
return 0;
}
static void
_cs_confdb_key_changed(confdb_handle_t handle,
confdb_change_type_t change_type,
hdb_handle_t parent_object_handle,
hdb_handle_t object_handle,
const void *object_name_pt, size_t object_name_len,
const void *key_name_pt, size_t key_name_len,
const void *key_value_pt, size_t key_value_len)
{
char parent_name[CS_MAX_NAME_LENGTH];
size_t len = 0;
hdb_handle_t real_parent_object_handle;
cs_error_t rc = CS_OK;
char nodename[CS_MAX_NAME_LENGTH];
char nodeid_str[CS_MAX_NAME_LENGTH];
uint32_t nodeid;
char status[CS_MAX_NAME_LENGTH];
char ip[CS_MAX_NAME_LENGTH];
size_t ip_len;
confdb_value_types_t type;
char* open_bracket = NULL;
char* close_bracket = NULL;
rc = confdb_object_parent_get (handle,
parent_object_handle, &real_parent_object_handle);
assert(rc == CS_OK);
rc = confdb_object_name_get (handle,
real_parent_object_handle,
parent_name,
&len);
parent_name[len] = '\0';
assert(rc == CS_OK);
if (strcmp(parent_name, "members") == 0) {
if (strncmp(key_name_pt, "status", strlen("status")) == 0) {
memcpy(nodeid_str, object_name_pt, object_name_len);
nodeid_str[object_name_len] = '\0';
nodeid = atoi(nodeid_str);
memcpy(status, key_value_pt, key_value_len);
status[key_value_len] = '\0';
rc = confdb_key_get_typed(handle, parent_object_handle,
"ip", ip, &ip_len, &type);
assert(rc == CS_OK);
ip[ip_len-1] = '\0';
/*
* We want the ip out of: "r(0) ip(192.168.100.92)"
*/
open_bracket = strrchr(ip, '(');
open_bracket++;
close_bracket = strrchr(open_bracket, ')');
*close_bracket = '\0';
_cs_ip_to_hostname(open_bracket, nodename);
_cs_node_membership_event(nodename, nodeid, status, open_bracket);
}
}
}
static void
_cs_confdb_object_created(confdb_handle_t handle,
hdb_handle_t parent_object_handle,
hdb_handle_t object_handle,
const void *name_pt,
size_t name_len)
{
char parent_name[CS_MAX_NAME_LENGTH];
size_t len = 0;
char obj_name[CS_MAX_NAME_LENGTH];
cs_error_t rc = CS_OK;
memcpy(obj_name, name_pt, name_len);
obj_name[name_len] = '\0';
rc = confdb_object_name_get (handle,
object_handle, parent_name, &len);
parent_name[len] = '\0';
if (rc != CS_OK) {
return;
}
if (strcmp(parent_name, "connections") == 0) {
_cs_application_connection_event(obj_name, "connected");
}
}
static void
_cs_confdb_object_deleted(confdb_handle_t handle,
hdb_handle_t parent_object_handle,
const void *name_pt,
size_t name_len)
{
char obj_name[CS_MAX_NAME_LENGTH];
char parent_name[CS_MAX_NAME_LENGTH];
size_t len = 0;
cs_error_t rc;
memcpy(obj_name, name_pt, name_len);
obj_name[name_len] = '\0';
rc = confdb_object_name_get (handle,
parent_object_handle, parent_name, &len);
parent_name[len] = '\0';
assert(rc == CS_OK);
if (strcmp(parent_name, "connections") == 0) {
_cs_application_connection_event(obj_name, "disconnected");
}
}
static cs_error_t
_cs_confdb_find_object (confdb_handle_t handle,
const char * name_pt,
hdb_handle_t * out_handle)
{
char * obj_name_pt;
char * save_pt;
hdb_handle_t obj_handle;
confdb_handle_t parent_object_handle = OBJECT_PARENT_HANDLE;
char tmp_name[CS_MAX_NAME_LENGTH];
cs_error_t res = CS_OK;
strncpy (tmp_name, name_pt, CS_MAX_NAME_LENGTH);
obj_name_pt = strtok_r(tmp_name, SEPERATOR_STR, &save_pt);
while (obj_name_pt != NULL) {
res = confdb_object_find_start(handle, parent_object_handle);
if (res != CS_OK) {
syslog (LOG_ERR, "Could not start object_find %d\n", res);
exit (EXIT_FAILURE);
}
res = confdb_object_find(handle, parent_object_handle,
obj_name_pt, strlen (obj_name_pt), &obj_handle);
if (res != CS_OK) {
return res;
}
parent_object_handle = obj_handle;
obj_name_pt = strtok_r (NULL, SEPERATOR_STR, &save_pt);
}
*out_handle = parent_object_handle;
return res;
}
static int
_cs_confdb_dispatch(hdb_handle_t handle,
int fd, int revents, void *data)
{
confdb_dispatch(confdb_handle, CS_DISPATCH_ONE);
return 0;
}
static void _cs_quorum_notification(quorum_handle_t handle,
uint32_t quorate, uint64_t ring_seq,
uint32_t view_list_entries, uint32_t *view_list)
{
if (_cs_is_quorate == quorate) {
return;
}
_cs_is_quorate = quorate;
if (quorate) {
_cs_node_quorum_event("quorate");
} else {
_cs_node_quorum_event("not quorate");
}
}
static int
_cs_quorum_dispatch(hdb_handle_t handle,
int fd, int revents, void *data)
{
quorum_dispatch(quorum_handle, CS_DISPATCH_ONE);
return 0;
}
static void
_cs_quorum_init(void)
{
cs_error_t rc;
int fd;
quorum_callbacks_t quorum_callbacks = {
.quorum_notify_fn = _cs_quorum_notification,
};
rc = quorum_initialize (&quorum_handle, &quorum_callbacks);
if (rc != CS_OK) {
syslog(LOG_ERR, "Could not connect to corosync(quorum)");
return;
}
quorum_fd_get(quorum_handle, &fd);
poll_dispatch_add (poll_handle, fd, POLLIN|POLLNVAL, NULL,
_cs_quorum_dispatch);
quorum_trackstart(quorum_handle, CS_TRACK_CHANGES);
}
static void
_cs_quorum_finalize(void)
{
quorum_finalize (quorum_handle);
}
#ifdef HAVE_DBUS
/*
* dbus notifications
*/
static void
_cs_dbus_auto_flush(void)
{
dbus_connection_ref(db);
dbus_connection_read_write(db, 500);
dbus_connection_unref(db);
}
static void
_cs_dbus_release(void)
{
DBusError err;
if (!db)
return;
dbus_error_init(&err);
dbus_bus_release_name(db, DBUS_CS_NAME, &err);
dbus_error_free(&err);
dbus_connection_unref(db);
db = NULL;
}
static void
_cs_dbus_node_quorum_event(char *nodename, uint32_t nodeid, const char *state)
{
DBusMessage *msg = NULL;
int ret = -1;
if (err_set) {
syslog (LOG_ERR, "%s\n", _err);
err_set = 0;
}
if (!db) {
goto out_free;
}
if (dbus_connection_get_is_connected(db) != TRUE) {
err_set = 1;
snprintf(_err, sizeof(_err), "DBus connection lost");
_cs_dbus_release();
goto out_unlock;
}
_cs_dbus_auto_flush();
if (!(msg = dbus_message_new_signal(DBUS_CS_PATH,
DBUS_CS_IFACE,
"QuorumStateChange"))) {
syslog (LOG_ERR, "%s(%d) error\n", __func__, __LINE__);
goto out_unlock;
}
if (!dbus_message_append_args(msg,
DBUS_TYPE_STRING, &nodename,
DBUS_TYPE_UINT32, &nodeid,
DBUS_TYPE_STRING, &state,
DBUS_TYPE_INVALID)) {
syslog (LOG_ERR, "%s(%d) error\n", __func__, __LINE__);
goto out_unlock;
}
dbus_connection_send(db, msg, NULL);
ret = 0;
out_unlock:
if (ret == -1) {
syslog (LOG_ERR, "%s() error\n", __func__);
}
if (msg)
dbus_message_unref(msg);
out_free:
return;
}
static void
_cs_dbus_node_membership_event(char *nodename, uint32_t nodeid, char *state, char* ip)
{
DBusMessage *msg = NULL;
int ret = -1;
if (err_set) {
syslog (LOG_ERR, "%s\n", _err);
err_set = 0;
}
if (!db) {
goto out_free;
}
if (dbus_connection_get_is_connected(db) != TRUE) {
err_set = 1;
snprintf(_err, sizeof(_err), "DBus connection lost");
_cs_dbus_release();
goto out_unlock;
}
_cs_dbus_auto_flush();
if (!(msg = dbus_message_new_signal(DBUS_CS_PATH,
DBUS_CS_IFACE,
"NodeStateChange"))) {
syslog (LOG_ERR, "%s(%d) error\n", __func__, __LINE__);
goto out_unlock;
}
if (!dbus_message_append_args(msg,
DBUS_TYPE_STRING, &nodename,
DBUS_TYPE_UINT32, &nodeid,
DBUS_TYPE_STRING, &ip,
DBUS_TYPE_STRING, &state,
DBUS_TYPE_INVALID)) {
syslog (LOG_ERR, "%s(%d) error\n", __func__, __LINE__);
goto out_unlock;
}
dbus_connection_send(db, msg, NULL);
ret = 0;
out_unlock:
if (ret == -1) {
syslog (LOG_ERR, "%s() error\n", __func__);
}
if (msg)
dbus_message_unref(msg);
out_free:
return;
}
static void
_cs_dbus_application_connection_event(char *nodename, uint32_t nodeid, char *app_name, const char *state)
{
DBusMessage *msg = NULL;
int ret = -1;
if (err_set) {
syslog (LOG_ERR, "%s\n", _err);
err_set = 0;
}
if (!db) {
goto out_free;
}
if (dbus_connection_get_is_connected(db) != TRUE) {
err_set = 1;
snprintf(_err, sizeof(_err), "DBus connection lost");
_cs_dbus_release();
goto out_unlock;
}
_cs_dbus_auto_flush();
if (!(msg = dbus_message_new_signal(DBUS_CS_PATH,
DBUS_CS_IFACE,
"ConnectionStateChange"))) {
syslog (LOG_ERR, "%s(%d) error\n", __func__, __LINE__);
goto out_unlock;
}
if (!dbus_message_append_args(msg,
DBUS_TYPE_STRING, &nodename,
DBUS_TYPE_UINT32, &nodeid,
DBUS_TYPE_STRING, &app_name,
DBUS_TYPE_STRING, &state,
DBUS_TYPE_INVALID)) {
syslog (LOG_ERR, "%s(%d) error\n", __func__, __LINE__);
goto out_unlock;
}
dbus_connection_send(db, msg, NULL);
ret = 0;
out_unlock:
if (msg)
dbus_message_unref(msg);
out_free:
return;
}
static void
_cs_dbus_init(void)
{
DBusConnection *dbc = NULL;
DBusError err;
dbus_error_init(&err);
dbc = dbus_bus_get(DBUS_BUS_SYSTEM, &err);
if (!dbc) {
snprintf(_err, sizeof(_err),
"dbus_bus_get: %s", err.message);
err_set = 1;
dbus_error_free(&err);
return;
}
dbus_connection_set_exit_on_disconnect(dbc, FALSE);
db = dbc;
notifiers[num_notifiers].node_membership_fn =
_cs_dbus_node_membership_event;
notifiers[num_notifiers].node_quorum_fn =
_cs_dbus_node_quorum_event;
notifiers[num_notifiers].application_connection_fn =
_cs_dbus_application_connection_event;
num_notifiers++;
}
#endif /* HAVE_DBUS */
#ifdef ENABLE_SNMP
static netsnmp_session *snmp_init (const char *target)
{
static netsnmp_session *session = NULL;
#ifndef NETSNMPV54
char default_port[128];
snprintf (default_port, sizeof (default_port), "%s:162", target);
#endif
if (session) {
return (session);
}
if (target == NULL) {
return NULL;
}
session = malloc (sizeof (netsnmp_session));
snmp_sess_init (session);
session->version = SNMP_VERSION_2c;
session->callback = NULL;
session->callback_magic = NULL;
session = snmp_add(session,
#ifdef NETSNMPV54
netsnmp_transport_open_client ("snmptrap", target),
#else
netsnmp_tdomain_transport (default_port, 0, "udp"),
#endif
NULL, NULL);
if (session == NULL) {
syslog(LOG_ERR, "Could not create snmp transport");
}
return (session);
}
static inline void add_field (
netsnmp_pdu *trap_pdu,
u_char asn_type,
const char *prefix,
void *value,
size_t value_size)
{
oid _oid[MAX_OID_LEN];
size_t _oid_len = MAX_OID_LEN;
if (snmp_parse_oid(prefix, _oid, &_oid_len)) {
snmp_pdu_add_variable (trap_pdu, _oid, _oid_len, asn_type, (u_char *) value, value_size);
}
}
static void
_cs_snmp_node_membership_event(char *nodename, uint32_t nodeid, char *state, char* ip)
{
int ret;
char csysuptime[CS_TIMESTAMP_STR_LEN];
static oid snmptrap_oid[] = { 1,3,6,1,6,3,1,1,4,1,0 };
static oid sysuptime_oid[] = { 1,3,6,1,2,1,1,3,0 };
time_t now = time (NULL);
netsnmp_pdu *trap_pdu;
netsnmp_session *session = snmp_init (snmp_manager);
if (session == NULL) {
syslog (LOG_NOTICE, "Failed to init SNMP session.\n");
return ;
}
trap_pdu = snmp_pdu_create (SNMP_MSG_TRAP2);
if (!trap_pdu) {
syslog (LOG_NOTICE, "Failed to create SNMP notification.\n");
return ;
}
/* send uptime */
snprintf (csysuptime, CS_TIMESTAMP_STR_LEN, "%ld", now);
snmp_add_var (trap_pdu, sysuptime_oid, sizeof (sysuptime_oid) / sizeof (oid), 't', csysuptime);
snmp_add_var (trap_pdu, snmptrap_oid, sizeof (snmptrap_oid) / sizeof (oid), 'o', SNMP_OID_TRAPS_NODE);
/* Add extries to the trap */
add_field (trap_pdu, ASN_OCTET_STR, SNMP_OID_OBJECT_NODE_NAME, (void*)nodename, strlen (nodename));
add_field (trap_pdu, ASN_INTEGER, SNMP_OID_OBJECT_NODE_ID, (void*)&nodeid, sizeof (nodeid));
add_field (trap_pdu, ASN_OCTET_STR, SNMP_OID_OBJECT_NODE_ADDR, (void*)ip, strlen (ip));
add_field (trap_pdu, ASN_OCTET_STR, SNMP_OID_OBJECT_NODE_STATUS, (void*)state, strlen (state));
/* Send and cleanup */
ret = snmp_send (session, trap_pdu);
if (ret == 0) {
/* error */
syslog (LOG_ERR, "Could not send SNMP trap");
snmp_free_pdu (trap_pdu);
}
}
static void
_cs_snmp_node_quorum_event(char *nodename, uint32_t nodeid,
const char *state)
{
int ret;
char csysuptime[20];
static oid snmptrap_oid[] = { 1,3,6,1,6,3,1,1,4,1,0 };
static oid sysuptime_oid[] = { 1,3,6,1,2,1,1,3,0 };
time_t now = time (NULL);
netsnmp_pdu *trap_pdu;
netsnmp_session *session = snmp_init (snmp_manager);
if (session == NULL) {
syslog (LOG_NOTICE, "Failed to init SNMP session.\n");
return ;
}
trap_pdu = snmp_pdu_create (SNMP_MSG_TRAP2);
if (!trap_pdu) {
syslog (LOG_NOTICE, "Failed to create SNMP notification.\n");
return ;
}
/* send uptime */
sprintf (csysuptime, "%ld", now);
snmp_add_var (trap_pdu, sysuptime_oid, sizeof (sysuptime_oid) / sizeof (oid), 't', csysuptime);
snmp_add_var (trap_pdu, snmptrap_oid, sizeof (snmptrap_oid) / sizeof (oid), 'o', SNMP_OID_TRAPS_NODE);
/* Add extries to the trap */
add_field (trap_pdu, ASN_OCTET_STR, SNMP_OID_OBJECT_NODE_NAME, (void*)nodename, strlen (nodename));
add_field (trap_pdu, ASN_INTEGER, SNMP_OID_OBJECT_NODE_ID, (void*)&nodeid, sizeof (nodeid));
add_field (trap_pdu, ASN_OCTET_STR, SNMP_OID_OBJECT_QUORUM, (void*)state, strlen (state));
/* Send and cleanup */
ret = snmp_send (session, trap_pdu);
if (ret == 0) {
/* error */
syslog (LOG_ERR, "Could not send SNMP trap");
snmp_free_pdu (trap_pdu);
}
}
static void
_cs_snmp_init(void)
{
if (snmp_manager == NULL) {
snmp_manager = (char*)local_host;
}
notifiers[num_notifiers].node_membership_fn =
_cs_snmp_node_membership_event;
notifiers[num_notifiers].node_quorum_fn =
_cs_snmp_node_quorum_event;
notifiers[num_notifiers].application_connection_fn = NULL;
num_notifiers++;
}
#endif /* ENABLE_SNMP */
static void
_cs_syslog_node_membership_event(char *nodename, uint32_t nodeid, char *state, char* ip)
{
syslog (LOG_NOTICE, "%s[%d] ip:%s %s\n", nodename, nodeid, ip, state);
}
static void
_cs_syslog_node_quorum_event(char *nodename, uint32_t nodeid, const char *state)
{
if (strcmp(state, "quorate") == 0) {
syslog (LOG_NOTICE, "%s[%d] is now %s\n", nodename, nodeid, state);
} else {
syslog (LOG_NOTICE, "%s[%d] has lost quorum\n", nodename, nodeid);
}
}
static void
_cs_syslog_application_connection_event(char *nodename, uint32_t nodeid, char* app_name, const char *state)
{
if (strcmp(state, "connected") == 0) {
syslog (LOG_ERR, "%s[%d] %s is now %s to corosync\n", nodename, nodeid, app_name, state);
} else {
syslog (LOG_ERR, "%s[%d] %s is now %s from corosync\n", nodename, nodeid, app_name, state);
}
}
static void
_cs_node_membership_event(char *nodename, uint32_t nodeid, char *state, char* ip)
{
int i;
for (i = 0; i < num_notifiers; i++) {
if (notifiers[i].node_membership_fn) {
notifiers[i].node_membership_fn(nodename, nodeid, state, ip);
}
}
}
static void
_cs_local_node_info_get(char **nodename, uint32_t *nodeid)
{
cs_error_t rc;
corosync_cfg_handle_t cfg_handle;
if (local_nodeid == 0) {
corosync_cfg_initialize(&cfg_handle, NULL);
rc = corosync_cfg_local_get (cfg_handle, &local_nodeid);
corosync_cfg_finalize(cfg_handle);
if (rc != CS_OK) {
local_nodeid = 0;
strncpy(local_nodename, "localhost", CS_MAX_NAME_LENGTH);
} else {
gethostname(local_nodename, CS_MAX_NAME_LENGTH);
}
}
*nodeid = local_nodeid;
*nodename = local_nodename;
}
static void
_cs_node_quorum_event(const char *state)
{
int i;
char *nodename;
uint32_t nodeid;
_cs_local_node_info_get(&nodename, &nodeid);
for (i = 0; i < num_notifiers; i++) {
if (notifiers[i].node_quorum_fn) {
notifiers[i].node_quorum_fn(nodename, nodeid, state);
}
}
}
static void
_cs_application_connection_event(char *app_name, const char *state)
{
int i;
char *nodename;
uint32_t nodeid;
_cs_local_node_info_get(&nodename, &nodeid);
for (i = 0; i < num_notifiers; i++) {
if (notifiers[i].application_connection_fn) {
notifiers[i].application_connection_fn(nodename, nodeid, app_name, state);
}
}
}
static void
sig_exit_handler (int num)
{
poll_stop(poll_handle);
}
static void
_cs_confdb_init(void)
{
hdb_handle_t obj_handle;
cs_error_t rc;
int conf_fd = 0;
rc = confdb_initialize (&confdb_handle, &callbacks);
if (rc != CS_OK) {
syslog (LOG_ERR, "Failed to initialize the objdb API. Error %d\n", rc);
exit (EXIT_FAILURE);
}
confdb_fd_get(confdb_handle, &conf_fd);
poll_dispatch_add (poll_handle, conf_fd, POLLIN|POLLNVAL, NULL,
_cs_confdb_dispatch);
rc = _cs_confdb_find_object (confdb_handle, "runtime.connections.",
&obj_handle);
if (rc != CS_OK) {
syslog (LOG_ERR,
"Failed to find the connections object. Error %d\n", rc);
exit (EXIT_FAILURE);
}
rc = confdb_track_changes (confdb_handle, obj_handle,
CONFDB_TRACK_DEPTH_ONE);
if (rc != CS_OK) {
syslog (LOG_ERR,
"Failed to track the connections object. Error %d\n", rc);
exit (EXIT_FAILURE);
}
rc = _cs_confdb_find_object(confdb_handle,
"runtime.totem.pg.mrp.srp.members.", &obj_handle);
if (rc != CS_OK) {
syslog (LOG_ERR, "Failed to find the object. Error %d\n", rc);
exit (EXIT_FAILURE);
}
rc = confdb_track_changes(confdb_handle,
obj_handle, CONFDB_TRACK_DEPTH_RECURSIVE);
if (rc != CS_OK) {
syslog (LOG_ERR,
"Failed to track the object. Error %d\n", rc);
exit (EXIT_FAILURE);
}
}
static void
_cs_confdb_finalize(void)
{
confdb_stop_track_changes (confdb_handle);
confdb_finalize (confdb_handle);
}
static void
_cs_check_config(void)
{
if (conf[CS_NTF_LOG] == 0 &&
conf[CS_NTF_STDOUT] == 0 &&
conf[CS_NTF_SNMP] == 0 &&
conf[CS_NTF_DBUS] == 0) {
syslog(LOG_ERR, "no event type enabled, see corosync-notifyd -h, exiting.");
exit(EXIT_FAILURE);
}
#ifndef ENABLE_SNMP
if (conf[CS_NTF_SNMP]) {
syslog(LOG_ERR, "Not compiled with SNMP support enabled, exiting.");
exit(EXIT_FAILURE);
}
#endif
#ifndef HAVE_DBUS
if (conf[CS_NTF_DBUS]) {
syslog(LOG_ERR, "Not compiled with DBus support enabled, exiting.");
exit(EXIT_FAILURE);
}
#endif
if (conf[CS_NTF_STDOUT] && !conf[CS_NTF_FG]) {
syslog(LOG_ERR, "configured to print to stdout and run in the background, exiting");
exit(EXIT_FAILURE);
}
if (conf[CS_NTF_SNMP] && conf[CS_NTF_DBUS]) {
syslog(LOG_ERR, "configured to send snmp traps and dbus signals - are you sure?.");
}
}
static void
_cs_usage(void)
{
fprintf(stderr, "usage:\n"\
" -f : Start application in foreground.\n"\
" -l : Log all events.\n"\
" -o : Print events to stdout (turns on -l).\n"\
" -s : Send SNMP traps on all events.\n"\
" -m : SNMP Manager IP address (defaults to localhost).\n"\
" -d : Send DBUS signals on all events.\n"\
" -h : Print this help\n\n");
}
int
main(int argc, char *argv[])
{
int ch;
conf[CS_NTF_FG] = 0;
conf[CS_NTF_LOG] = 0;
conf[CS_NTF_STDOUT] = 0;
conf[CS_NTF_SNMP] = 0;
conf[CS_NTF_DBUS] = 0;
while ((ch = getopt (argc, argv, "floshdm:")) != EOF) {
switch (ch) {
case 'f':
conf[CS_NTF_FG] = 1;
break;
case 'l':
conf[CS_NTF_LOG] = 1;
break;
case 'm':
conf[CS_NTF_SNMP] = 1;
strncpy(snmp_manager_buf, optarg, CS_MAX_NAME_LENGTH);
snmp_manager = snmp_manager_buf;
break;
case 'o':
conf[CS_NTF_LOG] = 1;
conf[CS_NTF_STDOUT] = 1;
break;
case 's':
conf[CS_NTF_SNMP] = 1;
break;
case 'd':
conf[CS_NTF_DBUS] = 1;
break;
case 'h':
default:
_cs_usage();
return EXIT_FAILURE;
}
}
if (conf[CS_NTF_STDOUT]) {
openlog(NULL, LOG_PID|LOG_PERROR, LOG_DAEMON);
} else {
openlog(NULL, LOG_PID, LOG_DAEMON);
}
_cs_check_config();
if (!conf[CS_NTF_FG]) {
- daemon(0, 0);
+ if (daemon(0, 0) < 0)
+ {
+ perror("daemon() failed");
+ return EXIT_FAILURE;
+ }
}
num_notifiers = 0;
if (conf[CS_NTF_LOG]) {
notifiers[num_notifiers].node_membership_fn =
_cs_syslog_node_membership_event;
notifiers[num_notifiers].node_quorum_fn =
_cs_syslog_node_quorum_event;
notifiers[num_notifiers].application_connection_fn =
_cs_syslog_application_connection_event;
num_notifiers++;
}
poll_handle = poll_create();
_cs_confdb_init();
_cs_quorum_init();
#ifdef HAVE_DBUS
if (conf[CS_NTF_DBUS]) {
_cs_dbus_init();
}
#endif /* HAVE_DBUS */
#ifdef ENABLE_SNMP
if (conf[CS_NTF_SNMP]) {
_cs_snmp_init();
}
#endif /* ENABLE_SNMP */
(void)signal (SIGINT, sig_exit_handler);
(void)signal (SIGQUIT, sig_exit_handler);
(void)signal (SIGTERM, sig_exit_handler);
poll_run(poll_handle);
#ifdef HAVE_DBUS
if (conf[CS_NTF_DBUS]) {
_cs_dbus_release();
}
#endif /* HAVE_DBUS */
_cs_quorum_finalize();
_cs_confdb_finalize();
return 0;
}

File Metadata

Mime Type
text/x-diff
Expires
Mon, Dec 23, 11:59 AM (1 d, 1 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1128293
Default Alt Text
(173 KB)

Event Timeline