Page Menu
Home
ClusterLabs Projects
Search
Configure Global Search
Log In
Files
F4512216
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Flag For Later
Award Token
Size
100 KB
Referenced Files
None
Subscribers
None
View Options
diff --git a/exec/coroipcs.c b/exec/coroipcs.c
index afef48fa..f246c8f7 100644
--- a/exec/coroipcs.c
+++ b/exec/coroipcs.c
@@ -1,1721 +1,1733 @@
/*
* Copyright (c) 2006-2009 Red Hat, Inc.
*
* All rights reserved.
*
* Author: Steven Dake (sdake@redhat.com)
*
* This software licensed under BSD license, the text of which follows:
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* - Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* - Neither the name of the MontaVista Software, Inc. nor the names of its
* contributors may be used to endorse or promote products derived from this
* software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
* THE POSSIBILITY OF SUCH DAMAGE.
*/
#include <config.h>
#ifndef _GNU_SOURCE
#define _GNU_SOURCE 1
#endif
#include <pthread.h>
#include <limits.h>
#include <assert.h>
#include <pwd.h>
#include <grp.h>
#include <sys/types.h>
#include <sys/poll.h>
#include <sys/uio.h>
#include <sys/mman.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <sys/wait.h>
#include <sys/stat.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <signal.h>
#include <sched.h>
#include <time.h>
#if defined(HAVE_GETPEERUCRED)
#include <ucred.h>
#endif
#include <string.h>
#include <sys/shm.h>
#include <corosync/corotypes.h>
#include <corosync/list.h>
#include <corosync/coroipc_types.h>
#include <corosync/hdb.h>
#include <corosync/coroipcs.h>
#include <corosync/coroipc_ipc.h>
#define LOGSYS_UTILS_ONLY 1
#include <corosync/engine/logsys.h>
#if _POSIX_THREAD_PROCESS_SHARED > 0
#include <semaphore.h>
#else
#include <sys/sem.h>
#endif
+#include "util.h"
#ifndef MSG_NOSIGNAL
#define MSG_NOSIGNAL 0
#endif
#define SERVER_BACKLOG 5
#define MSG_SEND_LOCKED 0
#define MSG_SEND_UNLOCKED 1
#define POLL_STATE_IN 1
#define POLL_STATE_INOUT 2
/*
 * IPC configuration and callbacks supplied by the embedding daemon through
 * coroipcs_ipc_init() (v1, copied into a freshly allocated table) or
 * coroipcs_ipc_init_v2() (v2, used directly).
 */
static struct coroipcs_init_state_v2 *api = NULL;
/*
 * conn_info_list_head holds live connections; conn_info_destroy() moves a
 * connection onto conn_info_exit_list_head while its teardown is retried.
 */
DECLARE_LIST_INIT (conn_info_list_head);
DECLARE_LIST_INIT (conn_info_exit_list_head);
/*
 * A dispatch message that did not fit into the shared dispatch ring; it
 * waits on conn_info->outq_head until outq_flush() can copy it in.
 */
struct outq_item {
void *msg;
size_t mlen;
struct list_head list;
};
/*
 * One zero-copy buffer mapped by zcb_alloc(), tracked on
 * conn_info->zcb_mapped_list_head so it can be unmapped later.
 */
struct zcb_mapped {
struct list_head list;
void *addr;
size_t size;
};
/*
 * SysV semaphore fallback: without process-shared POSIX semaphores this
 * file uses semctl(), whose argument union must be declared by the caller
 * on platforms whose headers leave it undefined (_SEM_SEMUN_UNDEFINED).
 */
#if _POSIX_THREAD_PROCESS_SHARED < 1
#if defined(_SEM_SEMUN_UNDEFINED)
union semun {
int val;
struct semid_ds *buf;
unsigned short int *array;
struct seminfo *__buf;
};
#endif
#endif
/*
 * Lifecycle of a connection, as driven by ipc_disconnect() and
 * conn_info_destroy().
 */
enum conn_state {
CONN_STATE_THREAD_INACTIVE = 0, /* initial state set by conn_info_create() */
CONN_STATE_THREAD_ACTIVE = 1, /* consumer thread keeps running (ipc_thread_active() == 1) */
CONN_STATE_THREAD_REQUEST_EXIT = 2, /* set by ipc_disconnect(); conn_info_destroy() joins the thread */
CONN_STATE_THREAD_DESTROYED = 3, /* thread joined; service exit function still to be called */
CONN_STATE_LIB_EXIT_CALLED = 4, /* exit function succeeded; waiting for refcount to reach 0 */
CONN_STATE_DISCONNECT_INACTIVE = 5 /* ipc_disconnect() on an inactive connection */
};
/*
 * Per-client IPC connection state.
 */
struct conn_info {
int fd; /* client socket */
pthread_t thread; /* consumer thread; joined by conn_info_destroy() */
pid_t client_pid; /* peer pid from socket credentials, 0 if unknown */
pthread_attr_t thread_attr; /* NOTE(review): not used in this excerpt; presumably attributes for 'thread' */
unsigned int service; /* service id; SOCKET_SERVICE_INIT until changed elsewhere */
enum conn_state state; /* lifecycle state, see enum conn_state */
int refcount; /* conn_info_destroy() waits until this drops to 0 */
hdb_handle_t stats_handle; /* statistics handle, released by api->stats_destroy_connection() */
#if _POSIX_THREAD_PROCESS_SHARED < 1
key_t semkey; /* SysV semaphore key (no process-shared POSIX semaphores) */
#endif
unsigned int pending_semops; /* dispatch notifications whose wake-up byte could not be sent */
pthread_mutex_t mutex; /* guards state, refcount and outq_head */
struct control_buffer *control_buffer; /* shared control segment (semaphores, ring offsets) */
char *request_buffer; /* shared request segment */
char *response_buffer; /* shared response segment */
char *dispatch_buffer; /* shared dispatch ring (released with circular_memory_unmap()) */
size_t control_size; /* mapped length of control_buffer */
size_t request_size; /* mapped length of request_buffer */
size_t response_size; /* mapped length of response_buffer */
size_t dispatch_size; /* ring size; the mapping spans twice this */
struct list_head outq_head; /* struct outq_item queue for messages that did not fit the ring */
void *private_data; /* service data returned by coroipcs_private_data_get() */
struct list_head list; /* link on conn_info_list_head or conn_info_exit_list_head */
char setup_msg[sizeof (mar_req_setup_t)]; /* setup request accumulated by req_setup_recv() */
unsigned int setup_bytes_read; /* bytes of setup_msg received so far */
struct list_head zcb_mapped_list_head; /* struct zcb_mapped buffers of this connection */
char *sending_allowed_private_data[64]; /* scratch for api->sending_allowed()/sending_allowed_release() */
int poll_state; /* POLL_STATE_IN or POLL_STATE_INOUT (POLLOUT added by msg_send()) */
};
/*
 * Forward declarations for static functions defined later in this file.
 */
static int shared_mem_dispatch_bytes_left (const struct conn_info *conn_info);
static void outq_flush (struct conn_info *conn_info);
static int priv_change (struct conn_info *conn_info);
static void ipc_disconnect (struct conn_info *conn_info);
static void msg_send (void *conn, const struct iovec *iov, unsigned int iov_len,
int locked);
static void _corosync_ipc_init(void);
/*
 * Log through the v2 logsys callback (api->log_printf) when one is set,
 * otherwise through the v1 printf-style callback (api->old_log_printf).
 * NOTE(review): coroipcs_ipc_init_v2() sets old_log_printf to NULL, so a v2
 * caller that leaves log_printf NULL would make this call a NULL pointer.
 */
#define log_printf(level, format, args...) \
do { \
if (api->log_printf) \
api->log_printf ( \
LOGSYS_ENCODE_RECID(level, \
api->log_subsys_id, \
LOGSYS_RECID_LOG), \
__FUNCTION__, __FILE__, __LINE__, \
(const char *)format, ##args); \
else \
api->old_log_printf ((const char *)format, ##args); \
} while (0)
/*
 * Statistics stub installed by coroipcs_ipc_init() for v1 callers, which
 * have no statistics backend.  Always hands back handle 0.
 */
static hdb_handle_t dummy_stats_create_connection (
	const char *name,
	pid_t pid,
	int fd)
{
	hdb_handle_t no_handle = 0ULL;

	(void)name;
	(void)pid;
	(void)fd;
	return no_handle;
}
/*
 * No-op statistics stub used for v1 callers.
 */
static void dummy_stats_destroy_connection (
	hdb_handle_t handle)
{
	(void)handle;
}
/*
 * No-op statistics stub used for v1 callers; the value is discarded.
 */
static void dummy_stats_update_value (
	hdb_handle_t handle,
	const char *name,
	const void *value,
	size_t value_size)
{
	(void)handle;
	(void)name;
	(void)value;
	(void)value_size;
}
/*
 * No-op statistics stub used for v1 callers.
 */
static void dummy_stats_increment_value (
	hdb_handle_t handle,
	const char *name)
{
	(void)handle;
	(void)name;
}
/*
 * Map the existing file PATH read/write and shared, sized to BYTES, and
 * store the mapping address in *BUF.
 *
 * PATH is unlinked as soon as it has been opened; the open descriptor and
 * then the mapping keep the file alive.
 *
 * Returns 0 on success.  Returns -1 on failure, in which case no mapping
 * is left behind and *BUF is not written.
 */
static int
memory_map (
	const char *path,
	size_t bytes,
	void **buf)
{
	int32_t fd;
	void *addr_orig;
	void *addr;
	int32_t res;

	fd = open (path, O_RDWR, 0600);
	unlink (path);
	if (fd == -1) {
		return (-1);
	}

	res = ftruncate (fd, bytes);
	if (res == -1) {
		goto error_close;
	}

	/* Reserve an address range first, then map the file over it. */
	addr_orig = mmap (NULL, bytes, PROT_NONE,
		MAP_ANONYMOUS | MAP_PRIVATE, -1, 0);
	if (addr_orig == MAP_FAILED) {
		goto error_close;
	}

	addr = mmap (addr_orig, bytes, PROT_READ | PROT_WRITE,
		MAP_FIXED | MAP_SHARED, fd, 0);
	if (addr != addr_orig) {
		goto error_unmap;
	}
#ifdef COROSYNC_BSD
	madvise(addr, bytes, MADV_NOSYNC);
#endif

	res = close (fd);
	if (res) {
		/* Do not leak the mapping when reporting failure. */
		munmap (addr_orig, bytes);
		return (-1);
	}
	*buf = addr_orig;
	return (0);

error_unmap:
	munmap (addr_orig, bytes);
error_close:
	/*
	 * PATH was already unlinked right after open(); unlinking it again
	 * could remove an unrelated file created there in the meantime.
	 */
	close (fd);
	return (-1);
}
/*
 * Map the existing file PATH (truncated to BYTES) twice, back to back,
 * in one 2*BYTES address range and store its start in *BUF.  A copy that
 * runs past the end of the first view continues in the second view, which
 * shows the same file bytes from offset 0.
 *
 * PATH is unlinked as soon as it has been opened.
 *
 * Returns 0 on success.  Returns -1 on failure, in which case the whole
 * reserved range has been released and *BUF is not written.
 * Release a successful mapping with circular_memory_unmap().
 */
static int
circular_memory_map (
	const char *path,
	size_t bytes,
	void **buf)
{
	int32_t fd;
	void *addr_orig;
	void *addr;
	int32_t res;

	fd = open (path, O_RDWR, 0600);
	unlink (path);
	if (fd == -1) {
		return (-1);
	}

	/* The reservation below is 2 * BYTES; refuse sizes that overflow. */
	if (bytes > ((size_t)-1) / 2) {
		goto error_close;
	}

	res = ftruncate (fd, bytes);
	if (res == -1) {
		goto error_close;
	}

	addr_orig = mmap (NULL, bytes << 1, PROT_NONE,
		MAP_ANONYMOUS | MAP_PRIVATE, -1, 0);
	if (addr_orig == MAP_FAILED) {
		/* Nothing was mapped: there is nothing to munmap(). */
		goto error_close;
	}

	/* First view: file offset 0 at the start of the reservation. */
	addr = mmap (addr_orig, bytes, PROT_READ | PROT_WRITE,
		MAP_FIXED | MAP_SHARED, fd, 0);
	if (addr != addr_orig) {
		goto error_unmap;
	}
#ifdef COROSYNC_BSD
	madvise(addr_orig, bytes, MADV_NOSYNC);
#endif

	/* Second view: the same file offset 0 immediately after the first. */
	addr = mmap (((char *)addr_orig) + bytes,
		bytes, PROT_READ | PROT_WRITE,
		MAP_FIXED | MAP_SHARED, fd, 0);
	if (addr != ((char *)addr_orig) + bytes) {
		goto error_unmap;
	}
#ifdef COROSYNC_BSD
	madvise(((char *)addr_orig) + bytes, bytes, MADV_NOSYNC);
#endif

	res = close (fd);
	if (res) {
		munmap (addr_orig, bytes << 1);
		return (-1);
	}
	*buf = addr_orig;
	return (0);

error_unmap:
	/* Release the whole 2 * BYTES reservation, both views included. */
	munmap (addr_orig, bytes << 1);
error_close:
	/* PATH was already unlinked right after open(). */
	close (fd);
	return (-1);
}
/*
 * Release a ring created by circular_memory_map(), i.e. both views:
 * 2 * BYTES of address space starting at BUF.
 * Returns munmap()'s result (0 on success, -1 on failure).
 */
static inline int
circular_memory_unmap (void *buf, size_t bytes)
{
	return munmap (buf, bytes * 2);
}
static void flow_control_state_set (
struct conn_info *conn_info,
int flow_control_state)
{
if (conn_info->control_buffer->flow_control_enabled == flow_control_state) {
return;
}
if (flow_control_state == 0) {
log_printf (LOGSYS_LEVEL_DEBUG,
"Disabling flow control for %d\n",
conn_info->client_pid);
} else
if (flow_control_state == 1) {
log_printf (LOGSYS_LEVEL_DEBUG,
"Enabling flow control for %d\n",
conn_info->client_pid);
}
conn_info->control_buffer->flow_control_enabled = flow_control_state;
api->stats_update_value (conn_info->stats_handle,
"flow_control",
&flow_control_state,
sizeof(flow_control_state));
api->stats_increment_value (conn_info->stats_handle,
"flow_control_count");
}
/*
 * Unmap one zero-copy buffer, unlink it from its connection's list and
 * free the bookkeeping record (allocated with malloc() by zcb_alloc()).
 * Returns munmap()'s result, 0 or -1; the record is released either way.
 * The result is kept in an int: storing munmap()'s -1 in an unsigned and
 * converting it back was implementation-defined.
 */
static inline int zcb_free (struct zcb_mapped *zcb_mapped)
{
	int res;

	res = munmap (zcb_mapped->addr, zcb_mapped->size);
	list_del (&zcb_mapped->list);
	free (zcb_mapped);
	return (res);
}
/*
 * Release the zero-copy buffer of CONN_INFO whose mapping starts at ADDR.
 * Returns zcb_free()'s result (0 or -1) for the matching buffer, or 0 when
 * no buffer of this connection starts at ADDR.
 */
static inline int zcb_by_addr_free (struct conn_info *conn_info, void *addr)
{
	struct list_head *list;
	struct zcb_mapped *zcb_mapped;

	for (list = conn_info->zcb_mapped_list_head.next;
		list != &conn_info->zcb_mapped_list_head; list = list->next) {

		zcb_mapped = list_entry (list, struct zcb_mapped, list);
		if (zcb_mapped->addr == addr) {
			/* zcb_free() unlinks the entry, so stop iterating here. */
			return (zcb_free (zcb_mapped));
		}
	}
	return (0);
}
/*
 * Release every zero-copy buffer still mapped for CONN_INFO.
 * Always returns 0.
 */
static inline int zcb_all_free (
	struct conn_info *conn_info)
{
	struct list_head *head = &conn_info->zcb_mapped_list_head;
	struct list_head *cursor = head->next;

	while (cursor != head) {
		struct zcb_mapped *victim = list_entry (cursor, struct zcb_mapped, list);

		/* Step past the entry before zcb_free() unlinks and frees it. */
		cursor = cursor->next;
		zcb_free (victim);
	}
	return (0);
}
/*
 * Map the file named by PATH_TO_FILE (SIZE bytes) with memory_map(),
 * record the mapping on CONN_INFO's zero-copy list and return its address
 * in *ADDR.
 *
 * Returns 0 on success.  Returns -1 if the record cannot be allocated or
 * the mapping fails; *ADDR is then not written.
 */
static inline int zcb_alloc (
	struct conn_info *conn_info,
	const char *path_to_file,
	size_t size,
	void **addr)
{
	struct zcb_mapped *zcb_mapped;
	int res;

	zcb_mapped = malloc (sizeof (*zcb_mapped));
	if (zcb_mapped == NULL) {
		return (-1);
	}

	/* Signed result: memory_map() reports failure as -1. */
	res = memory_map (path_to_file, size, addr);
	if (res == -1) {
		free (zcb_mapped);
		return (-1);
	}

	list_init (&zcb_mapped->list);
	zcb_mapped->addr = *addr;
	zcb_mapped->size = size;
	list_add_tail (&zcb_mapped->list, &conn_info->zcb_mapped_list_head);
	return (0);
}
static int ipc_thread_active (void *conn)
{
struct conn_info *conn_info = (struct conn_info *)conn;
int retval = 0;
pthread_mutex_lock (&conn_info->mutex);
if (conn_info->state == CONN_STATE_THREAD_ACTIVE) {
retval = 1;
}
pthread_mutex_unlock (&conn_info->mutex);
return (retval);
}
static int ipc_thread_exiting (void *conn)
{
struct conn_info *conn_info = (struct conn_info *)conn;
int retval = 1;
pthread_mutex_lock (&conn_info->mutex);
if (conn_info->state == CONN_STATE_THREAD_INACTIVE) {
retval = 0;
} else
if (conn_info->state == CONN_STATE_THREAD_ACTIVE) {
retval = 0;
}
pthread_mutex_unlock (&conn_info->mutex);
return (retval);
}
/*
 * Drive the teardown of CONN_INFO one step further.
 *
 * The connection is first moved onto conn_info_exit_list_head.  Depending
 * on conn_info->state this then joins the consumer thread, wakes it, calls
 * the service exit function, or — once that has succeeded and refcount is
 * 0 — destroys the semaphores, unmaps the shared memory, closes the socket
 * and frees CONN_INFO.
 *
 * returns 0 if should be called again, -1 if finished
 * (after -1, CONN_INFO has been freed and must not be used again).
 */
static inline int conn_info_destroy (struct conn_info *conn_info)
{
unsigned int res;
void *retval;
/* (Re)link onto the exit list; harmless when already there. */
list_del (&conn_info->list);
list_init (&conn_info->list);
list_add (&conn_info->list, &conn_info_exit_list_head);
/* Exit was requested: reap the consumer thread, then call again. */
if (conn_info->state == CONN_STATE_THREAD_REQUEST_EXIT) {
res = pthread_join (conn_info->thread, &retval);
conn_info->state = CONN_STATE_THREAD_DESTROYED;
return (0);
}
/* Inactive connections: this branch releases only the socket and the struct. */
if (conn_info->state == CONN_STATE_THREAD_INACTIVE ||
conn_info->state == CONN_STATE_DISCONNECT_INACTIVE) {
list_del (&conn_info->list);
close (conn_info->fd);
api->free (conn_info);
return (-1);
}
/* Consumer thread still active: post its wake-up semaphore, call again later. */
if (conn_info->state == CONN_STATE_THREAD_ACTIVE) {
ipc_sem_post (conn_info->control_buffer, SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT);
return (0);
}
api->serialize_lock ();
/*
* Retry library exit function if busy
* NOTE(review): stats_destroy_connection() is called again on every retry
* while the exit function keeps returning -1.
*/
if (conn_info->state == CONN_STATE_THREAD_DESTROYED) {
api->stats_destroy_connection (conn_info->stats_handle);
res = api->exit_fn_get (conn_info->service) (conn_info);
if (res == -1) {
api->serialize_unlock ();
return (0);
} else {
conn_info->state = CONN_STATE_LIB_EXIT_CALLED;
}
}
/* Someone still holds a reference: try again later. */
pthread_mutex_lock (&conn_info->mutex);
if (conn_info->refcount > 0) {
pthread_mutex_unlock (&conn_info->mutex);
api->serialize_unlock ();
return (0);
}
list_del (&conn_info->list);
pthread_mutex_unlock (&conn_info->mutex);
#if _POSIX_THREAD_PROCESS_SHARED > 0
sem_destroy (&conn_info->control_buffer->sem_request_or_flush_or_exit);
sem_destroy (&conn_info->control_buffer->sem_request);
sem_destroy (&conn_info->control_buffer->sem_response);
sem_destroy (&conn_info->control_buffer->sem_dispatch);
#else
semctl (conn_info->control_buffer->semid, 0, IPC_RMID);
#endif
/*
* Destroy shared memory segment and semaphore
*/
res = munmap ((void *)conn_info->control_buffer, conn_info->control_size);
res = munmap ((void *)conn_info->request_buffer, conn_info->request_size);
res = munmap ((void *)conn_info->response_buffer, conn_info->response_size);
/*
* Free allocated data needed to retry exiting library IPC connection
*/
if (conn_info->private_data) {
api->free (conn_info->private_data);
}
close (conn_info->fd);
res = circular_memory_unmap (conn_info->dispatch_buffer, conn_info->dispatch_size);
zcb_all_free (conn_info);
api->free (conn_info);
api->serialize_unlock ();
return (-1);
}
/*
 * Historically used to pun pointers to and from the 64-bit "server
 * address" carried in zero-copy headers.  Kept so any remaining references
 * still compile; the conversions below go through uintptr_t instead.
 */
union u {
	uint64_t server_addr;
	void *server_ptr;
};

/*
 * Encode a server-side pointer as the opaque 64-bit value stored in a
 * zero-copy header and later echoed back by the client.  Converting
 * through uintptr_t defines every bit of the result.  Writing a 32-bit
 * pointer into the union and reading the 64-bit member left the upper
 * bytes uninitialized on 32-bit platforms.
 */
static uint64_t void2serveraddr (void *server_ptr)
{
	return ((uint64_t)(uintptr_t)server_ptr);
}

/*
 * Inverse of void2serveraddr().  The value comes from the client, so the
 * resulting pointer must be validated before it is dereferenced.
 */
static void *serveraddr2void (uint64_t server_addr)
{
	return ((void *)(uintptr_t)server_addr);
}
static inline void zerocopy_operations_process (
struct conn_info *conn_info,
coroipc_request_header_t **header_out,
unsigned int *new_message)
{
coroipc_request_header_t *header;
header = (coroipc_request_header_t *)conn_info->request_buffer;
if (header->id == ZC_ALLOC_HEADER) {
mar_req_coroipcc_zc_alloc_t *hdr = (mar_req_coroipcc_zc_alloc_t *)header;
coroipc_response_header_t res_header;
void *addr = NULL;
struct coroipcs_zc_header *zc_header;
unsigned int res;
res = zcb_alloc (conn_info, hdr->path_to_file, hdr->map_size,
&addr);
zc_header = (struct coroipcs_zc_header *)addr;
zc_header->server_address = void2serveraddr(addr);
res_header.size = sizeof (coroipc_response_header_t);
res_header.id = 0;
coroipcs_response_send (
conn_info, &res_header,
res_header.size);
*new_message = 0;
return;
} else
if (header->id == ZC_FREE_HEADER) {
mar_req_coroipcc_zc_free_t *hdr = (mar_req_coroipcc_zc_free_t *)header;
coroipc_response_header_t res_header;
void *addr = NULL;
addr = serveraddr2void (hdr->server_address);
zcb_by_addr_free (conn_info, addr);
res_header.size = sizeof (coroipc_response_header_t);
res_header.id = 0;
coroipcs_response_send (
conn_info, &res_header,
res_header.size);
*new_message = 0;
return;
} else
if (header->id == ZC_EXECUTE_HEADER) {
mar_req_coroipcc_zc_execute_t *hdr = (mar_req_coroipcc_zc_execute_t *)header;
header = (coroipc_request_header_t *)(((char *)serveraddr2void(hdr->server_address) + sizeof (struct coroipcs_zc_header)));
}
*header_out = header;
*new_message = 1;
}
/*
 * Body of a connection's consumer thread.
 *
 * Waits on SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT in a loop.  After each
 * wake-up it:
 *  - drops its reference and exits if the connection is no longer active;
 *  - flushes the dispatch overflow queue;
 *  - if a request is pending on SEMAPHORE_REQUEST, processes it.
 * Zero-copy control requests are handled by zerocopy_operations_process().
 * Any other request goes to the service handler under the serialize lock
 * when api->sending_allowed() permits it.  Otherwise it is answered with
 * CS_ERR_INVALID_PARAM (-1) or CS_ERR_TRY_AGAIN (0).
 */
static void *pthread_ipc_consumer (void *conn)
{
	struct conn_info *conn_info = (struct conn_info *)conn;
	int res;
	coroipc_request_header_t *header;
	coroipc_response_header_t coroipc_response_header;
	int send_ok;
	unsigned int new_message;
	int sem_value = 0;

#if defined(HAVE_PTHREAD_SETSCHEDPARAM) && defined(HAVE_SCHED_GET_PRIORITY_MAX)
	if (api->sched_policy != 0) {
		/*
		 * Use pthread_self().  conn_info->thread is filled in by
		 * pthread_create() in the creating thread and is not
		 * guaranteed to be set yet when this thread starts running.
		 */
		res = pthread_setschedparam (pthread_self (),
			api->sched_policy, api->sched_param);
	}
#endif

	for (;;) {
		ipc_sem_wait (conn_info->control_buffer, SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT, IPC_SEMWAIT_NOFILE);
		if (ipc_thread_active (conn_info) == 0) {
			coroipcs_refcount_dec (conn_info);
			pthread_exit (0);
		}

		outq_flush (conn_info);

		/* The wake-up may only have been a flush request. */
		ipc_sem_getvalue (conn_info->control_buffer, SEMAPHORE_REQUEST, &sem_value);
		if (sem_value > 0) {
			res = ipc_sem_wait (conn_info->control_buffer, SEMAPHORE_REQUEST, IPC_SEMWAIT_NOFILE);
		} else {
			continue;
		}

		zerocopy_operations_process (conn_info, &header, &new_message);
		/*
		 * There is no new message to process, continue for loop
		 */
		if (new_message == 0) {
			continue;
		}

		coroipcs_refcount_inc (conn);

		send_ok = api->sending_allowed (conn_info->service,
			header->id,
			header,
			conn_info->sending_allowed_private_data);

		/*
		 * This happens when the message contains some kind of invalid
		 * parameter, such as an invalid size
		 */
		if (send_ok == -1) {
			coroipc_response_header.size = sizeof (coroipc_response_header_t);
			coroipc_response_header.id = 0;
			coroipc_response_header.error = CS_ERR_INVALID_PARAM;
			coroipcs_response_send (conn_info,
				&coroipc_response_header,
				sizeof (coroipc_response_header_t));
		} else
		if (send_ok) {
			api->serialize_lock();
			api->stats_increment_value (conn_info->stats_handle, "requests");
			api->handler_fn_get (conn_info->service, header->id) (conn_info, header);
			api->serialize_unlock();
		} else {
			/*
			 * Overload, tell library to retry
			 */
			coroipc_response_header.size = sizeof (coroipc_response_header_t);
			coroipc_response_header.id = 0;
			coroipc_response_header.error = CS_ERR_TRY_AGAIN;
			coroipcs_response_send (conn_info,
				&coroipc_response_header,
				sizeof (coroipc_response_header_t));
		}

		api->sending_allowed_release (conn_info->sending_allowed_private_data);
		coroipcs_refcount_dec (conn);
	}
	pthread_exit (0);
}
/*
 * Send the setup reply (a zeroed mar_res_setup_t carrying ERROR) to the
 * client on conn_info->fd.  It retries on EINTR/EAGAIN and counts each
 * retry in "send_retry_count".  Other send failures are ignored.
 * Always returns 0.
 */
static int
req_setup_send (
	struct conn_info *conn_info,
	int error)
{
	mar_res_setup_t res_setup;
	ssize_t res;

	memset (&res_setup, 0, sizeof (res_setup));
	res_setup.error = error;

	for (;;) {
		/*
		 * MSG_NOSIGNAL: a client that has already hung up must not
		 * raise SIGPIPE in the server.  (MSG_WAITALL, used before, is
		 * a receive-only flag and has no meaning for send().)
		 */
		res = send (conn_info->fd, &res_setup, sizeof (mar_res_setup_t),
			MSG_NOSIGNAL);
		if (res != -1 || (errno != EINTR && errno != EAGAIN)) {
			break;
		}
		api->stats_increment_value (conn_info->stats_handle, "send_retry_count");
	}
	return (0);
}
-static int
+static cs_error_t
req_setup_recv (
struct conn_info *conn_info)
{
int res;
struct msghdr msg_recv;
struct iovec iov_recv;
- int authenticated = 0;
+ cs_error_t auth_res = CS_ERR_LIBRARY;
#ifdef COROSYNC_LINUX
struct cmsghdr *cmsg;
char cmsg_cred[CMSG_SPACE (sizeof (struct ucred))];
int off = 0;
int on = 1;
struct ucred *cred;
#endif
msg_recv.msg_flags = 0;
msg_recv.msg_iov = &iov_recv;
msg_recv.msg_iovlen = 1;
msg_recv.msg_name = 0;
msg_recv.msg_namelen = 0;
#ifdef COROSYNC_LINUX
msg_recv.msg_control = (void *)cmsg_cred;
msg_recv.msg_controllen = sizeof (cmsg_cred);
#endif
#ifdef COROSYNC_SOLARIS
msg_recv.msg_accrights = 0;
msg_recv.msg_accrightslen = 0;
#endif /* COROSYNC_SOLARIS */
iov_recv.iov_base = &conn_info->setup_msg[conn_info->setup_bytes_read];
iov_recv.iov_len = sizeof (mar_req_setup_t) - conn_info->setup_bytes_read;
#ifdef COROSYNC_LINUX
setsockopt(conn_info->fd, SOL_SOCKET, SO_PASSCRED, &on, sizeof (on));
#endif
retry_recv:
res = recvmsg (conn_info->fd, &msg_recv, MSG_NOSIGNAL);
if (res == -1 && errno == EINTR) {
api->stats_increment_value (conn_info->stats_handle, "recv_retry_count");
goto retry_recv;
} else
if (res == -1 && errno != EAGAIN) {
- return (0);
+ return (CS_ERR_LIBRARY);
} else
if (res == 0) {
#if defined(COROSYNC_SOLARIS) || defined(COROSYNC_BSD) || defined(COROSYNC_DARWIN)
/* On many OS poll never return POLLHUP or POLLERR.
* EOF is detected when recvmsg return 0.
*/
ipc_disconnect (conn_info);
- return 0;
+ return (CS_ERR_LIBRARY);
#else
- return (-1);
+ return (CS_ERR_SECURITY);
#endif
}
conn_info->setup_bytes_read += res;
/*
* currently support getpeerucred, getpeereid, and SO_PASSCRED credential
* retrieval mechanisms for various Platforms
*/
#ifdef HAVE_GETPEERUCRED
/*
* Solaris and some BSD systems
*/
{
ucred_t *uc = NULL;
uid_t euid = -1;
gid_t egid = -1;
if (getpeerucred (conn_info->fd, &uc) == 0) {
euid = ucred_geteuid (uc);
egid = ucred_getegid (uc);
conn_info->client_pid = ucred_getpid (uc);
if (api->security_valid (euid, egid)) {
- authenticated = 1;
+ auth_res = CS_OK;
+ } else {
+ auth_res = hdb_error_to_cs(errno);
}
ucred_free(uc);
}
}
#elif HAVE_GETPEEREID
/*
* Usually MacOSX systems
*/
{
uid_t euid;
gid_t egid;
/*
* TODO get the peer's pid.
* conn_info->client_pid = ?;
*/
euid = -1;
egid = -1;
if (getpeereid (conn_info->fd, &euid, &egid) == 0) {
if (api->security_valid (euid, egid)) {
- authenticated = 1;
+ auth_res = CS_OK;
+ } else {
+ auth_res = hdb_error_to_cs(errno);
}
}
}
#elif SO_PASSCRED
/*
* Usually Linux systems
*/
cmsg = CMSG_FIRSTHDR (&msg_recv);
assert (cmsg);
cred = (struct ucred *)CMSG_DATA (cmsg);
if (cred) {
conn_info->client_pid = cred->pid;
if (api->security_valid (cred->uid, cred->gid)) {
- authenticated = 1;
+ auth_res = CS_OK;
+ } else {
+ auth_res = hdb_error_to_cs(errno);
}
}
#else /* no credentials */
- authenticated = 1;
- log_printf (LOGSYS_LEVEL_ERROR, "Platform does not support IPC authentication. Using no authentication\n");
+ auth_res = CS_OK;
+ log_printf (LOGSYS_LEVEL_ERROR, "Platform does not support IPC authentication. Using no authentication\n");
#endif /* no credentials */
- if (authenticated == 0) {
- log_printf (LOGSYS_LEVEL_ERROR, "Invalid IPC credentials.\n");
+ if (auth_res != CS_OK) {
ipc_disconnect (conn_info);
- return (-1);
- }
+ if (auth_res == CS_ERR_NO_RESOURCES) {
+ log_printf (LOGSYS_LEVEL_ERROR,
+ "Not enough file desciptors for IPC connection.\n");
+ } else {
+ log_printf (LOGSYS_LEVEL_ERROR, "Invalid IPC credentials.\n");
+ }
+ return auth_res;
+ }
if (conn_info->setup_bytes_read == sizeof (mar_req_setup_t)) {
#ifdef COROSYNC_LINUX
setsockopt(conn_info->fd, SOL_SOCKET, SO_PASSCRED,
&off, sizeof (off));
#endif
- return (1);
+ return (CS_OK);
}
- return (0);
+ return (CS_ERR_LIBRARY);
}
/*
 * Begin disconnecting CONN_INFO.
 *  - An inactive connection is only marked CONN_STATE_DISCONNECT_INACTIVE.
 *  - An active one is switched to CONN_STATE_THREAD_REQUEST_EXIT under the
 *    connection mutex, and its consumer thread is woken so that it sees
 *    the connection is no longer active and exits.
 * Connections in any other state are left untouched.
 */
static void ipc_disconnect (struct conn_info *conn_info)
{
	switch (conn_info->state) {
	case CONN_STATE_THREAD_INACTIVE:
		conn_info->state = CONN_STATE_DISCONNECT_INACTIVE;
		break;

	case CONN_STATE_THREAD_ACTIVE:
		pthread_mutex_lock (&conn_info->mutex);
		conn_info->state = CONN_STATE_THREAD_REQUEST_EXIT;
		pthread_mutex_unlock (&conn_info->mutex);

		ipc_sem_post (conn_info->control_buffer,
			SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT);
		break;

	default:
		break;
	}
}
/*
 * Allocate and register the state for client socket FD.  The new entry
 * is zeroed, in CONN_STATE_THREAD_INACTIVE, bound to SOCKET_SERVICE_INIT,
 * polled for input only, and added to conn_info_list_head and the poll
 * loop.  Returns 0 on success, -1 if allocation fails.
 */
static int conn_info_create (int fd)
{
	struct conn_info *ci;

	ci = api->malloc (sizeof (*ci));
	if (ci == NULL) {
		return (-1);
	}
	memset (ci, 0, sizeof (*ci));

	ci->fd = fd;
	ci->client_pid = 0;
	ci->service = SOCKET_SERVICE_INIT;
	ci->state = CONN_STATE_THREAD_INACTIVE;
	ci->poll_state = POLL_STATE_IN;

	list_init (&ci->outq_head);
	list_init (&ci->list);
	list_init (&ci->zcb_mapped_list_head);
	list_add (&ci->list, &conn_info_list_head);

	api->poll_dispatch_add (fd, ci);
	return (0);
}
#if defined(COROSYNC_LINUX) || defined(COROSYNC_SOLARIS)
/* SUN_LEN is broken for abstract namespace:
 * on Linux the listening name is written after a leading NUL in sun_path,
 * so a strlen()-based length would be wrong and the whole structure size
 * is passed to bind() instead.  NOTE(review): Solaris uses a filesystem
 * path here but also takes the full size; confirm the reason.
 */
#define COROSYNC_SUN_LEN(a) sizeof(*(a))
#else
#define COROSYNC_SUN_LEN(a) SUN_LEN(a)
#endif
/*
* Exported functions
*/
extern void coroipcs_ipc_init_v2 (
struct coroipcs_init_state_v2 *init_state_v2)
{
api = init_state_v2;
api->old_log_printf = NULL;
log_printf (LOGSYS_LEVEL_DEBUG, "you are using ipc api v2\n");
_corosync_ipc_init ();
}
extern void coroipcs_ipc_init (
struct coroipcs_init_state *init_state)
{
api = calloc (sizeof(struct coroipcs_init_state_v2), 1);
/* v2 api */
api->stats_create_connection = dummy_stats_create_connection;
api->stats_destroy_connection = dummy_stats_destroy_connection;
api->stats_update_value = dummy_stats_update_value;
api->stats_increment_value = dummy_stats_increment_value;
api->log_printf = NULL;
/* v1 api */
api->socket_name = init_state->socket_name;
api->sched_policy = init_state->sched_policy;
api->sched_param = init_state->sched_param;
api->malloc = init_state->malloc;
api->free = init_state->free;
api->old_log_printf = init_state->log_printf;
api->fatal_error = init_state->fatal_error;
api->security_valid = init_state->security_valid;
api->service_available = init_state->service_available;
api->private_data_size_get = init_state->private_data_size_get;
api->serialize_lock = init_state->serialize_lock;
api->serialize_unlock = init_state->serialize_unlock;
api->sending_allowed = init_state->sending_allowed;
api->sending_allowed_release = init_state->sending_allowed_release;
api->poll_accept_add = init_state->poll_accept_add;
api->poll_dispatch_add = init_state->poll_dispatch_add;
api->poll_dispatch_modify = init_state->poll_dispatch_modify;
api->init_fn_get = init_state->init_fn_get;
api->exit_fn_get = init_state->exit_fn_get;
api->handler_fn_get = init_state->handler_fn_get;
log_printf (LOGSYS_LEVEL_DEBUG, "you are using ipc api v1\n");
_corosync_ipc_init ();
}
/*
 * Return a printable message for ERRNUM, using BUF (LEN bytes) if needed.
 * With _GNU_SOURCE on glibc, strerror_r() is the GNU variant, which may
 * return a static string and leave BUF untouched.  Elsewhere it is the XSI
 * variant, which fills BUF and returns 0 on success.
 */
static const char *ipc_strerror (int errnum, char *buf, size_t len)
{
#if defined(__GLIBC__) && defined(_GNU_SOURCE)
	return (strerror_r (errnum, buf, len));
#else
	if (strerror_r (errnum, buf, len) != 0) {
		snprintf (buf, len, "error %d", errnum);
	}
	return (buf);
#endif
}

/*
 * Create, bind and listen on the IPC server socket named by
 * api->socket_name, then pass it to api->poll_accept_add().
 *
 * On Linux the name is placed in the abstract namespace (sun_path[0] is
 * NUL).  Elsewhere it is a file under SOCKETDIR, made accessible to
 * everyone because the IPC layer authenticates each connection.  Failures
 * are reported through api->fatal_error().
 */
static void _corosync_ipc_init(void)
{
	int server_fd;
	struct sockaddr_un un_addr;
	int res;
	char error_str[100];

	/*
	 * Create socket for IPC clients, name socket, listen for connections
	 */
#if defined(COROSYNC_SOLARIS)
	server_fd = socket (PF_UNIX, SOCK_STREAM, 0);
#else
	server_fd = socket (PF_LOCAL, SOCK_STREAM, 0);
#endif
	if (server_fd == -1) {
		log_printf (LOGSYS_LEVEL_CRIT, "Cannot create client connections socket.\n");
		api->fatal_error ("Can't create library listen socket");
	}

	res = fcntl (server_fd, F_SETFL, O_NONBLOCK);
	if (res == -1) {
		log_printf (LOGSYS_LEVEL_CRIT, "Could not set non-blocking operation on server socket: %s\n",
			ipc_strerror (errno, error_str, sizeof (error_str)));
		api->fatal_error ("Could not set non-blocking operation on server socket");
	}

	memset (&un_addr, 0, sizeof (struct sockaddr_un));
	un_addr.sun_family = AF_UNIX;

#if defined(COROSYNC_LINUX)
	/* Abstract namespace: sun_path[0] stays NUL; bounded to fit sun_path. */
	res = snprintf (un_addr.sun_path + 1, sizeof (un_addr.sun_path) - 1,
		"%s", api->socket_name);
	if (res < 0 || (size_t)res >= sizeof (un_addr.sun_path) - 1) {
		log_printf (LOGSYS_LEVEL_CRIT, "IPC socket name too long: %s\n", api->socket_name);
		api->fatal_error ("IPC socket name too long");
	}
#else
	{
		struct stat stat_out;

		res = stat (SOCKETDIR, &stat_out);
		if (res == -1 || (res == 0 && !S_ISDIR(stat_out.st_mode))) {
			log_printf (LOGSYS_LEVEL_CRIT, "Required directory not present %s\n", SOCKETDIR);
			api->fatal_error ("Please create required directory.");
		}
		res = snprintf (un_addr.sun_path, sizeof (un_addr.sun_path),
			"%s/%s", SOCKETDIR, api->socket_name);
		if (res < 0 || (size_t)res >= sizeof (un_addr.sun_path)) {
			log_printf (LOGSYS_LEVEL_CRIT, "IPC socket path too long: %s/%s\n",
				SOCKETDIR, api->socket_name);
			api->fatal_error ("IPC socket path too long");
		}
		unlink (un_addr.sun_path);
	}
#endif
#if defined(COROSYNC_BSD) || defined(COROSYNC_DARWIN)
	/* SUN_LEN measures sun_path, so compute it only once the path is set. */
	un_addr.sun_len = SUN_LEN(&un_addr);
#endif

	res = bind (server_fd, (struct sockaddr *)&un_addr, COROSYNC_SUN_LEN(&un_addr));
	if (res) {
		log_printf (LOGSYS_LEVEL_CRIT, "Could not bind AF_UNIX (%s): %s.\n", un_addr.sun_path,
			ipc_strerror (errno, error_str, sizeof (error_str)));
		api->fatal_error ("Could not bind to AF_UNIX socket\n");
	}

	/*
	 * Allow everyone to write to the socket since the IPC layer handles
	 * security automatically
	 */
#if !defined(COROSYNC_LINUX)
	res = chmod (un_addr.sun_path, S_IRWXU|S_IRWXG|S_IRWXO);
#endif

	res = listen (server_fd, SERVER_BACKLOG);
	if (res == -1) {
		log_printf (LOGSYS_LEVEL_CRIT, "Could not listen on AF_UNIX socket: %s\n",
			ipc_strerror (errno, error_str, sizeof (error_str)));
		api->fatal_error ("Could not listen on AF_UNIX socket");
	}

	/*
	 * Setup connection dispatch routine
	 */
	api->poll_accept_add (server_fd);
}
/*
 * Shutdown-time cleanup of the IPC layer.
 *
 * For every connection still in CONN_STATE_THREAD_ACTIVE, this requests a
 * disconnect, destroys the connection's semaphores and unmaps its control,
 * request, response and dispatch segments.  Connections in any other
 * state are skipped.
 *
 * NOTE(review): ipc_disconnect() only wakes the consumer thread.  The
 * thread is not joined before its semaphores are destroyed and
 * control_buffer is unmapped, yet pthread_ipc_consumer() reads
 * control_buffer when it wakes.  The socket, zero-copy buffers and
 * conn_info are not released either.  This is presumably acceptable only
 * because the process exits right after; confirm with the caller.
 */
void coroipcs_ipc_exit (void)
{
struct list_head *list;
struct conn_info *conn_info;
unsigned int res;
for (list = conn_info_list_head.next; list != &conn_info_list_head;
list = list->next) {
conn_info = list_entry (list, struct conn_info, list);
if (conn_info->state != CONN_STATE_THREAD_ACTIVE)
continue;
ipc_disconnect (conn_info);
#if _POSIX_THREAD_PROCESS_SHARED > 0
sem_destroy (&conn_info->control_buffer->sem_request_or_flush_or_exit);
sem_destroy (&conn_info->control_buffer->sem_request);
sem_destroy (&conn_info->control_buffer->sem_response);
sem_destroy (&conn_info->control_buffer->sem_dispatch);
#else
semctl (conn_info->control_buffer->semid, 0, IPC_RMID);
#endif
/*
* Unmap memory segments
*/
res = munmap ((void *)conn_info->control_buffer,
conn_info->control_size);
res = munmap ((void *)conn_info->request_buffer,
conn_info->request_size);
res = munmap ((void *)conn_info->response_buffer,
conn_info->response_size);
res = circular_memory_unmap (conn_info->dispatch_buffer,
conn_info->dispatch_size);
}
}
/*
 * Disconnect the connections bound to SERVICE, at most one per call.
 *
 * If a connection of SERVICE is active (or has already been asked to
 * exit), it is disconnected, removed from polling and torn down
 * completely, and -1 is returned at once so the caller runs again later.
 * Otherwise -1 is returned while the exit list still holds a connection of
 * SERVICE.  0 means no connection of SERVICE is left.
 *
 * NOTE(review): the conn_info_destroy() loop spins until teardown
 * completes.  It does not return while the service exit function keeps
 * failing or refcount stays above 0.
 */
int coroipcs_ipc_service_exit (unsigned int service)
{
struct list_head *list, *list_next;
struct conn_info *conn_info;
for (list = conn_info_list_head.next; list != &conn_info_list_head;
list = list_next) {
/* Saved first: conn_info_destroy() moves the entry to the exit list. */
list_next = list->next;
conn_info = list_entry (list, struct conn_info, list);
if (conn_info->service != service ||
(conn_info->state != CONN_STATE_THREAD_ACTIVE && conn_info->state != CONN_STATE_THREAD_REQUEST_EXIT)) {
continue;
}
ipc_disconnect (conn_info);
api->poll_dispatch_destroy (conn_info->fd, NULL);
while (conn_info_destroy (conn_info) != -1)
;
/*
* We will return to prevent token loss. Schedwrk will call us again.
*/
return (-1);
}
/*
* No conn info left in active list. We will traverse through the exit list. If there is any
* conn_info->service == service, we will wait for it to end properly -> return -1
*/
for (list = conn_info_exit_list_head.next; list != &conn_info_exit_list_head; list = list->next) {
conn_info = list_entry (list, struct conn_info, list);
if (conn_info->service == service) {
return (-1);
}
}
return (0);
}
/*
* Get the conn info private data
*/
void *coroipcs_private_data_get (void *conn)
{
struct conn_info *conn_info = (struct conn_info *)conn;
return (conn_info->private_data);
}
/*
 * Copy the MLEN-byte response MSG into CONN's shared response buffer,
 * post SEMAPHORE_RESPONSE and count it in the "responses" statistic.
 * MLEN is not checked against conn_info->response_size, so callers must
 * keep responses within it.  Always returns 0.
 */
int coroipcs_response_send (void *conn, const void *msg, size_t mlen)
{
	struct conn_info *ci = (struct conn_info *)conn;
	char *dest = ci->response_buffer;

	memcpy (dest, msg, mlen);
	ipc_sem_post (ci->control_buffer, SEMAPHORE_RESPONSE);
	api->stats_increment_value (ci->stats_handle, "responses");

	return (0);
}
/*
 * Copy the IOV_LEN buffers of IOV back to back into CONN's shared response
 * buffer, then post SEMAPHORE_RESPONSE and count it in "responses".
 * The total length is not checked against conn_info->response_size, so
 * callers must keep responses within it.  Always returns 0.
 */
int coroipcs_response_iov_send (void *conn, const struct iovec *iov, unsigned int iov_len)
{
	struct conn_info *conn_info = (struct conn_info *)conn;
	/* size_t offset: iov_len fields are size_t and must not overflow an int. */
	size_t write_idx = 0;
	unsigned int i;

	for (i = 0; i < iov_len; i++) {
		memcpy (&conn_info->response_buffer[write_idx],
			iov[i].iov_base, iov[i].iov_len);
		write_idx += iov[i].iov_len;
	}

	ipc_sem_post (conn_info->control_buffer, SEMAPHORE_RESPONSE);
	api->stats_increment_value (conn_info->stats_handle, "responses");
	return (0);
}
/*
 * Free space in CONN_INFO's dispatch ring, computed from the read and
 * write offsets in the control buffer.  One byte is held back whenever any
 * space exists; presumably this keeps a full ring distinguishable from an
 * empty one (read == write).
 */
static int shared_mem_dispatch_bytes_left (const struct conn_info *conn_info)
{
	unsigned int rd = conn_info->control_buffer->read;
	unsigned int wr = conn_info->control_buffer->write;
	unsigned int avail;

	if (rd > wr) {
		avail = rd - wr;
	} else {
		avail = conn_info->dispatch_size - wr + rd;
	}

	return (avail != 0 ? avail - 1 : avail);
}
/*
 * Append LEN bytes from MSG at the dispatch ring's write offset, then
 * advance the offset modulo dispatch_size.  The memcpy() itself never
 * wraps.  This assumes dispatch_buffer came from circular_memory_map()
 * (it is released with circular_memory_unmap()), whose second view makes
 * bytes written past the end show up at the ring's start.
 */
static void memcpy_dwrap (struct conn_info *conn_info, void *msg, unsigned int len)
{
	unsigned int start = conn_info->control_buffer->write;
	char *dest = conn_info->dispatch_buffer + start;

	memcpy (dest, msg, len);
	conn_info->control_buffer->write = (start + len) % conn_info->dispatch_size;
}
/*
 * Copy the message described by IOV into CONN's dispatch ring, then notify
 * the client with one byte on the socket (its value is whether the
 * overflow queue is empty).
 *
 * If that byte cannot be sent, the notification is counted in
 * pending_semops and POLLOUT is added to the connection's poll set,
 * presumably so the notification can be retried once the socket becomes
 * writable.  Finally posts SEMAPHORE_DISPATCH and bumps "dispatched".
 * LOCKED is not used.
 */
static void msg_send (void *conn, const struct iovec *iov, unsigned int iov_len,
	int locked)
{
	struct conn_info *ci = (struct conn_info *)conn;
	const struct iovec *vec;
	const struct iovec *end = iov + iov_len;
	char queue_empty;
	int sent;

	(void)locked;

	for (vec = iov; vec != end; vec++) {
		memcpy_dwrap (ci, vec->iov_base, vec->iov_len);
	}

	queue_empty = list_empty (&ci->outq_head);
	sent = send (ci->fd, &queue_empty, 1, MSG_NOSIGNAL);
	if (sent != 1) {
		ci->pending_semops += 1;
		if (ci->poll_state == POLL_STATE_IN) {
			ci->poll_state = POLL_STATE_INOUT;
			api->poll_dispatch_modify (ci->fd,
				POLLIN|POLLOUT|POLLNVAL);
		}
	}

	ipc_sem_post (ci->control_buffer, SEMAPHORE_DISPATCH);
	api->stats_increment_value (ci->stats_handle, "dispatched");
}
/*
 * Move queued dispatch messages from CONN_INFO's overflow queue into the
 * dispatch ring, oldest first.  It stops at the first message whose length
 * is not strictly below the ring's free space.  With an empty queue, flow
 * control is switched off instead.  Runs entirely under conn_info->mutex.
 */
static void outq_flush (struct conn_info *conn_info)
{
	struct list_head *node;
	struct list_head *next;
	struct outq_item *item;
	unsigned int room;
	struct iovec iov;

	pthread_mutex_lock (&conn_info->mutex);

	if (list_empty (&conn_info->outq_head)) {
		flow_control_state_set (conn_info, 0);
	} else {
		for (node = conn_info->outq_head.next;
			node != &conn_info->outq_head; node = next) {

			next = node->next;
			item = list_entry (node, struct outq_item, list);

			room = shared_mem_dispatch_bytes_left (conn_info);
			if (room <= item->mlen) {
				break;
			}

			iov.iov_base = item->msg;
			iov.iov_len = item->mlen;
			msg_send (conn_info, &iov, 1, MSG_SEND_UNLOCKED);

			list_del (node);
			api->free (iov.iov_base);
			api->free (item);
			api->stats_decrement_value (conn_info->stats_handle, "queue_size");
		}
	}

	pthread_mutex_unlock (&conn_info->mutex);
}
/*
 * Receive a mar_req_priv_change request from CONN_INFO's socket.
 *
 * With SysV semaphores (no process-shared POSIX semaphores), the
 * connection's semaphore set is handed to the requested euid/egid with
 * mode 0600.  With POSIX semaphores the request is only consumed.
 *
 * Returns 0 on success.  Returns -1 on a receive error, end of file, a
 * short read (the request would be partly uninitialized), or semctl()
 * failure.
 */
static int priv_change (struct conn_info *conn_info)
{
	mar_req_priv_change req_priv_change;
	ssize_t res;
#if _POSIX_THREAD_PROCESS_SHARED < 1
	union semun semun;
	struct semid_ds ipc_set;
#endif

	for (;;) {
		res = recv (conn_info->fd, &req_priv_change,
			sizeof (mar_req_priv_change),
			MSG_NOSIGNAL);
		if (res != -1 || (errno != EINTR && errno != EAGAIN)) {
			break;
		}
		api->stats_increment_value (conn_info->stats_handle, "recv_retry_count");
	}
	if (res == -1) {
		return (-1);
	}
	/*
	 * EOF (0) or a short read: req_priv_change is not fully initialized,
	 * so never act on it.  (Some OSes report EOF only this way.)
	 */
	if ((size_t)res != sizeof (mar_req_priv_change)) {
		return (-1);
	}

#if _POSIX_THREAD_PROCESS_SHARED < 1
	/* IPC_SET applies to the whole set and reads only uid, gid and mode. */
	memset (&ipc_set, 0, sizeof (ipc_set));
	ipc_set.sem_perm.uid = req_priv_change.euid;
	ipc_set.sem_perm.gid = req_priv_change.egid;
	ipc_set.sem_perm.mode = 0600;
	semun.buf = &ipc_set;
	if (semctl (conn_info->control_buffer->semid, 0, IPC_SET, semun) == -1) {
		return (-1);
	}
#endif
	return (0);
}
/*
 * Deliver an iovec array to the client's dispatch ring.
 *
 * The message is sent immediately only when the ring has room and nothing
 * is already queued; this preserves message ordering.  Otherwise flow
 * control is switched on and a contiguous copy of the message is appended
 * to conn_info->outq_head, to be drained later by outq_flush().  If an
 * allocation fails, the client is disconnected.  Nothing is done once the
 * IPC thread is no longer active.
 */
static void msg_send_or_queue (void *conn, const struct iovec *iov, unsigned int iov_len)
{
	struct conn_info *conn_info = (struct conn_info *)conn;
	unsigned int bytes_left;
	unsigned int total = 0;
	unsigned int n;
	struct outq_item *item;
	char *dst;

	/* Exit transmission if the connection is dead */
	if (ipc_thread_active (conn) == 0) {
		return;
	}

	bytes_left = shared_mem_dispatch_bytes_left (conn_info);
	for (n = 0; n < iov_len; n++) {
		total += iov[n].iov_len;
	}

	/* fast path: enough room and nothing waiting ahead of this message */
	if (bytes_left >= total && list_empty (&conn_info->outq_head)) {
		msg_send (conn, iov, iov_len, MSG_SEND_LOCKED);
		return;
	}

	flow_control_state_set (conn_info, 1);

	item = api->malloc (sizeof (struct outq_item));
	if (item == NULL) {
		ipc_disconnect (conn);
		return;
	}
	item->msg = api->malloc (total);
	if (item->msg == 0) {
		api->free (item);
		ipc_disconnect (conn);
		return;
	}

	dst = item->msg;
	for (n = 0; n < iov_len; n++) {
		memcpy (dst, iov[n].iov_base, iov[n].iov_len);
		dst += iov[n].iov_len;
	}
	item->mlen = total;
	list_init (&item->list);

	pthread_mutex_lock (&conn_info->mutex);
	list_add_tail (&item->list, &conn_info->outq_head);
	pthread_mutex_unlock (&conn_info->mutex);

	api->stats_increment_value (conn_info->stats_handle, "queue_size");
}
/*
 * Take one reference on a connection.  The counter is updated under
 * conn_info->mutex.
 */
void coroipcs_refcount_inc (void *conn)
{
	struct conn_info *ci = conn;

	pthread_mutex_lock (&ci->mutex);
	ci->refcount += 1;
	pthread_mutex_unlock (&ci->mutex);
}
/*
 * Drop one reference on a connection.  The counter is updated under
 * conn_info->mutex.  No cleanup is triggered here when it reaches zero.
 */
void coroipcs_refcount_dec (void *conn)
{
	struct conn_info *ci = conn;

	pthread_mutex_lock (&ci->mutex);
	ci->refcount -= 1;
	pthread_mutex_unlock (&ci->mutex);
}
/*
 * Send (or queue, when the ring is full) one contiguous message on the
 * connection's dispatch channel.  Always returns 0; delivery problems
 * are handled inside msg_send_or_queue().
 */
int coroipcs_dispatch_send (void *conn, const void *msg, size_t mlen)
{
	struct iovec single = {
		.iov_base = (void *)msg,
		.iov_len = mlen
	};

	msg_send_or_queue (conn, &single, 1);
	return (0);
}
/*
 * Scatter/gather variant of coroipcs_dispatch_send(): forwards the iovec
 * array unchanged to msg_send_or_queue().  Always returns 0.
 */
int coroipcs_dispatch_iov_send (void *conn, const struct iovec *iov, unsigned int iov_len)
{
	msg_send_or_queue (conn, iov, iov_len);

	return 0;
}
/*
 * Poll callback for the listening socket.
 *
 * Accepts one library connection (retrying on EINTR) and makes it
 * non-blocking.  On Linux it also asks the kernel to pass sender
 * credentials (SO_PASSCRED).  The descriptor is then handed to
 * conn_info_create(), and closed again if that fails.
 *
 * Always returns 0.  Errors are logged rather than returned, because -1
 * would make the poll loop drop the listener.
 */
int coroipcs_handler_accept (
	int fd,
	int revent,
	void *data)
{
	struct sockaddr_un un_addr;
	socklen_t addrlen = sizeof (struct sockaddr_un);
	int new_fd;
#ifdef COROSYNC_LINUX
	int on = 1;
#endif
	char error_str[100];

	do {
		new_fd = accept (fd, (struct sockaddr *)&un_addr, &addrlen);
	} while (new_fd == -1 && errno == EINTR);

	if (new_fd == -1) {
		strerror_r (errno, error_str, 100);
		log_printf (LOGSYS_LEVEL_ERROR,
			"Could not accept Library connection: %s\n", error_str);
		return (0);
	}

	if (fcntl (new_fd, F_SETFL, O_NONBLOCK) == -1) {
		strerror_r (errno, error_str, 100);
		log_printf (LOGSYS_LEVEL_ERROR,
			"Could not set non-blocking operation on library connection: %s\n",
			error_str);
		close (new_fd);
		return (0);
	}

#ifdef COROSYNC_LINUX
	/* Request credentials of sender provided by kernel */
	setsockopt(new_fd, SOL_SOCKET, SO_PASSCRED, &on, sizeof (on));
#endif

	if (conn_info_create (new_fd) != 0) {
		close (new_fd);
	}
	return (0);
}
/*
 * Extract the command name from a /proc/<pid>/stat line of the form
 * "pid (comm) state ...".  comm may itself contain '(' , ')' or spaces.
 * It is therefore delimited by the FIRST '(' and the LAST ')', and the
 * ')' must be followed by a space.  The name is copied into out_name,
 * truncated to name_len - 1 characters and always NUL-terminated.
 *
 * Returns out_name, or NULL when the line is malformed or name_len is 0.
 */
static char *stat_line_comm (const char *line, char *out_name, size_t name_len)
{
	const char *open_paren;
	const char *close_paren;
	size_t comm_len;

	if (name_len == 0) {
		return NULL;
	}

	open_paren = strchr (line, '(');
	close_paren = strrchr (line, ')');
	if (open_paren == NULL || close_paren == NULL ||
	    close_paren < open_paren || close_paren[1] != ' ') {
		return NULL;
	}

	/* move past the opening bracket */
	open_paren++;
	comm_len = (size_t)(close_paren - open_paren);
	if (comm_len > name_len - 1) {
		comm_len = name_len - 1;
	}
	memcpy (out_name, open_paren, comm_len);
	out_name[comm_len] = '\0';
	return out_name;
}

/*
 * Look up the command name of process 'pid' by reading /proc/<pid>/stat.
 * This is Linux /proc specific; elsewhere the open fails and NULL is
 * returned.
 *
 * Returns out_name filled with the NUL-terminated, possibly truncated
 * name.  Returns NULL if name_len is 0, or if the file cannot be opened,
 * read or parsed.
 */
static char * pid_to_name (pid_t pid, char *out_name, size_t name_len)
{
	FILE *fp;
	char fname[32];
	char buf[256];
	char *line;

	if (out_name == NULL || name_len == 0) {
		return NULL;
	}

	snprintf (fname, sizeof (fname), "/proc/%d/stat", (int)pid);
	fp = fopen (fname, "r");
	if (fp == NULL) {
		return NULL;
	}
	line = fgets (buf, sizeof (buf), fp);
	fclose (fp);
	if (line == NULL) {
		return NULL;
	}

	return stat_line_comm (buf, out_name, name_len);
}
/*
 * Create the statistics object for a newly set-up connection.  The object
 * is named:
 *   "<procname>:<pid>:<fd>" when the client's process name is available,
 *   "<pid>:<fd>"            when only a positive pid is known,
 *   "<fd>"                  otherwise.
 * The connection's service id is then stored under "service_id".
 */
static void coroipcs_init_conn_stats (
	struct conn_info *conn)
{
	char conn_name[42];
	char proc_name[32];

	if (conn->client_pid <= 0) {
		snprintf (conn_name, sizeof(conn_name), "%d", conn->fd);
	} else if (pid_to_name (conn->client_pid, proc_name, sizeof(proc_name)) != NULL) {
		snprintf (conn_name, sizeof(conn_name), "%s:%d:%d", proc_name, conn->client_pid, conn->fd);
	} else {
		snprintf (conn_name, sizeof(conn_name), "%d:%d", conn->client_pid, conn->fd);
	}

	conn->stats_handle = api->stats_create_connection (conn_name, conn->client_pid, conn->fd);
	api->stats_update_value (conn->stats_handle, "service_id",
		&conn->service, sizeof(conn->service));
}
/*
* Poll callback for an accepted library connection socket.
*
* - If the IPC thread is exiting, the connection is destroyed and the
*   result of conn_info_destroy() is returned.
* - POLLERR/POLLHUP request a disconnect.
* - While conn_info->service is still SOCKET_SERVICE_INIT, POLLIN carries
*   the setup request.  The request is received, the requested service is
*   checked, and the control/request/response/dispatch shared-memory areas
*   are mapped.  The service's per-connection state is then initialised
*   and the per-connection IPC consumer thread (pthread_ipc_consumer) is
*   started.
* - After setup, POLLIN reads a one-byte control message from the socket.
*   Only MESSAGE_REQ_CHANGE_EUID is acted on.
* - POLLOUT resends dispatch notifications that msg_send() could not
*   deliver (counted in pending_semops).  Once all are sent, POLLOUT
*   interest is dropped.
*
* Every other path returns 0.
*/
int coroipcs_handler_dispatch (
int fd,
int revent,
void *context)
{
mar_req_setup_t *req_setup;
struct conn_info *conn_info = (struct conn_info *)context;
int res;
char buf;
if (ipc_thread_exiting (conn_info)) {
return conn_info_destroy (conn_info);
}
/*
* If an error occurs, request exit
*/
if (revent & (POLLERR|POLLHUP)) {
ipc_disconnect (conn_info);
return (0);
}
/*
* Read the header and process it
*/
if (conn_info->service == SOCKET_SERVICE_INIT && (revent & POLLIN)) {
/*
* Receive in a nonblocking fashion the request
* IF security invalid, send ERR_SECURITY, otherwise
* send OK
*/
/*
* req_setup_recv() yields a CS_* code.  Codes other than CS_OK and
* CS_ERR_LIBRARY are reported back to the client with req_setup_send().
* Any code other than CS_OK ends the processing of this event.
* NOTE(review): CS_ERR_LIBRARY presumably means "setup request not yet
* completely received" - confirm against req_setup_recv().
*/
res = req_setup_recv (conn_info);
- if (res == -1) {
- req_setup_send (conn_info, CS_ERR_SECURITY);
+ if (res != CS_OK && res != CS_ERR_LIBRARY) {
+ req_setup_send (conn_info, res);
}
- if (res != 1) {
+ if (res != CS_OK) {
return (0);
}
/* conn_info->mutex guards refcount and the outbound queue from here on */
pthread_mutex_init (&conn_info->mutex, NULL);
req_setup = (mar_req_setup_t *)conn_info->setup_msg;
/*
* Is the service registered ?
*/
if (api->service_available (req_setup->service) == 0) {
req_setup_send (conn_info, CS_ERR_NOT_EXIST);
ipc_disconnect (conn_info);
return (0);
}
req_setup_send (conn_info, CS_OK);
#if _POSIX_THREAD_PROCESS_SHARED < 1
conn_info->semkey = req_setup->semkey;
#endif
/*
* Map the client-supplied shared-memory files.
* NOTE(review): each mapping's result is stored in res and then
* overwritten without being checked, so a failed mapping is not
* detected here.
*/
res = memory_map (
req_setup->control_file,
req_setup->control_size,
(void *)&conn_info->control_buffer);
conn_info->control_size = req_setup->control_size;
res = memory_map (
req_setup->request_file,
req_setup->request_size,
(void *)&conn_info->request_buffer);
conn_info->request_size = req_setup->request_size;
res = memory_map (
req_setup->response_file,
req_setup->response_size,
(void *)&conn_info->response_buffer);
conn_info->response_size = req_setup->response_size;
res = circular_memory_map (
req_setup->dispatch_file,
req_setup->dispatch_size,
(void *)&conn_info->dispatch_buffer);
conn_info->dispatch_size = req_setup->dispatch_size;
conn_info->service = req_setup->service;
conn_info->refcount = 0;
conn_info->setup_bytes_read = 0;
#if _POSIX_THREAD_PROCESS_SHARED < 1
conn_info->control_buffer->semid = semget (conn_info->semkey, 3, 0600);
#endif
conn_info->pending_semops = 0;
/*
* ipc thread is the only reference at startup
*/
conn_info->refcount = 1;
conn_info->state = CONN_STATE_THREAD_ACTIVE;
/* NOTE(review): the api->malloc() result is used without a NULL check */
conn_info->private_data = api->malloc (api->private_data_size_get (conn_info->service));
memset (conn_info->private_data, 0,
api->private_data_size_get (conn_info->service));
api->init_fn_get (conn_info->service) (conn_info);
/* create stats objects */
coroipcs_init_conn_stats (conn_info);
pthread_attr_init (&conn_info->thread_attr);
/*
* IA64 needs more stack space then other arches
*/
#if defined(__ia64__)
pthread_attr_setstacksize (&conn_info->thread_attr, 400000);
#else
pthread_attr_setstacksize (&conn_info->thread_attr, 200000);
#endif
pthread_attr_setdetachstate (&conn_info->thread_attr, PTHREAD_CREATE_JOINABLE);
/* NOTE(review): the pthread_create() result is not checked */
res = pthread_create (&conn_info->thread,
&conn_info->thread_attr,
pthread_ipc_consumer,
conn_info);
/*
* Security check - disallow multiple configurations of
* the ipc connection
*/
if (conn_info->service == SOCKET_SERVICE_INIT) {
conn_info->service = -1;
}
} else
if (revent & POLLIN) {
/* hold a reference while the one-byte control message is handled */
coroipcs_refcount_inc (conn_info);
res = recv (fd, &buf, 1, MSG_NOSIGNAL);
if (res == 1) {
switch (buf) {
case MESSAGE_REQ_CHANGE_EUID:
if (priv_change (conn_info) == -1) {
ipc_disconnect (conn_info);
}
break;
default:
res = 0;
break;
}
}
#if defined(COROSYNC_SOLARIS) || defined(COROSYNC_BSD) || defined(COROSYNC_DARWIN)
/* On many OS poll never return POLLHUP or POLLERR.
* EOF is detected when recvmsg return 0.
*/
if (res == 0) {
ipc_disconnect (conn_info);
coroipcs_refcount_dec (conn_info);
return (0);
}
#endif
coroipcs_refcount_dec (conn_info);
}
/*
* Resend notifications that msg_send() could not deliver.  Stop at the
* first send that does not complete; POLLOUT stays armed until all are
* sent.
* NOTE(review): buf is assigned only in the POLLIN branch above, so when
* this runs alone the byte sent is uninitialised.  Presumably its value
* is ignored by the client - confirm.
*/
if (revent & POLLOUT) {
int psop = conn_info->pending_semops;
int i;
assert (psop != 0);
for (i = 0; i < psop; i++) {
res = send (conn_info->fd, &buf, 1, MSG_NOSIGNAL);
if (res != 1) {
return (0);
} else {
conn_info->pending_semops -= 1;
}
}
if (conn_info->poll_state == POLL_STATE_INOUT) {
conn_info->poll_state = POLL_STATE_IN;
api->poll_dispatch_modify (conn_info->fd, POLLIN|POLLNVAL);
}
}
return (0);
}
diff --git a/exec/main.c b/exec/main.c
index a3a2f00d..9d7c21b7 100644
--- a/exec/main.c
+++ b/exec/main.c
@@ -1,1682 +1,1684 @@
/*
* Copyright (c) 2002-2006 MontaVista Software, Inc.
* Copyright (c) 2006-2009 Red Hat, Inc.
*
* All rights reserved.
*
* Author: Steven Dake (sdake@redhat.com)
*
* This software licensed under BSD license, the text of which follows:
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* - Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* - Neither the name of the MontaVista Software, Inc. nor the names of its
* contributors may be used to endorse or promote products derived from this
* software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
* THE POSSIBILITY OF SUCH DAMAGE.
*/
#include <config.h>
#include <pthread.h>
#include <assert.h>
#include <sys/types.h>
#include <sys/file.h>
#include <sys/poll.h>
#include <sys/uio.h>
#include <sys/mman.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <sys/stat.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <signal.h>
#include <sched.h>
#include <time.h>
#include <semaphore.h>
#include <corosync/swab.h>
#include <corosync/corotypes.h>
#include <corosync/coroipc_types.h>
#include <corosync/corodefs.h>
#include <corosync/list.h>
#include <corosync/lcr/lcr_ifact.h>
#include <corosync/totem/coropoll.h>
#include <corosync/totem/totempg.h>
#include <corosync/engine/objdb.h>
#include <corosync/engine/config.h>
#include <corosync/engine/logsys.h>
#include <corosync/coroipcs.h>
#include "quorum.h"
#include "totemsrp.h"
#include "mainconfig.h"
#include "totemconfig.h"
#include "main.h"
#include "sync.h"
#include "syncv2.h"
#include "tlist.h"
#include "timer.h"
#include "util.h"
#include "apidef.h"
#include "service.h"
#include "schedwrk.h"
#include "evil.h"
/*
* Logging setup for the "corosync" system.  See corosync/engine/logsys.h
* for the meaning of each positional argument.
*/
LOGSYS_DECLARE_SYSTEM ("corosync",
LOGSYS_MODE_OUTPUT_STDERR | LOGSYS_MODE_THREADED | LOGSYS_MODE_FORK,
0,
NULL,
LOG_INFO,
LOG_DAEMON,
LOG_INFO,
NULL,
1000000);
/* Log messages from this file are tagged with the "MAIN" subsystem. */
LOGSYS_DECLARE_SUBSYS ("MAIN");
#define SERVER_BACKLOG 5
static int sched_priority = 0;
/* Number of ais_service[] slots visited by confchg_fn(). */
static unsigned int service_count = 32;
/* Global serializer lock taken by serialize_lock()/serialize_unlock(). */
#if defined(HAVE_PTHREAD_SPIN_LOCK)
static pthread_spinlock_t serialize_spin;
#else
static pthread_mutex_t serialize_mutex = PTHREAD_MUTEX_INITIALIZER;
#endif
static struct totem_logging_configuration totem_logging_configuration;
/* Loaded configuration modules, exposed through main_get_config_modules(). */
static int num_config_modules;
static struct config_iface_ver0 *config_modules[MAX_DYNAMIC_SERVICES];
static struct objdb_iface_ver0 *objdb = NULL;
static struct corosync_api_v1 *api = NULL;
/* Selects between the v1 and v2 synchronisation paths. */
static enum cs_sync_mode minimum_sync_mode;
/* Set by confchg_fn(), cleared by corosync_sync_completed(). */
static int sync_in_process = 1;
/* Main poll loop handle, returned by corosync_poll_handle_get(). */
static hdb_handle_t corosync_poll_handle;
struct sched_param global_sched_param;
static hdb_handle_t object_connection_handle;
/* Re-armed every 1500 ms by corosync_totem_stats_updater(). */
static corosync_timer_handle_t corosync_stats_timer_handle;
/* Shutdown machinery: corosync_shutdown_request() posts the semaphore
* that corosync_exit_thread_handler() waits on. */
static pthread_t corosync_exit_thread;
static sem_t corosync_exit_sem;
/* Pid lock file, removed by unlink_all_completed(). */
static const char *corosync_lock_file = LOCALSTATEDIR"/run/corosync.pid";
static int32_t corosync_not_enough_fds_left = 0;
/* Forward declaration: unlink_all_completed() needs it before its definition. */
static void serialize_unlock (void);
/* Accessor: returns the handle of the main poll loop. */
hdb_handle_t corosync_poll_handle_get (void)
{
	hdb_handle_t handle = corosync_poll_handle;

	return handle;
}
void corosync_state_dump (void)
{
int i;
for (i = 0; i < SERVICE_HANDLER_MAXIMUM_COUNT; i++) {
if (ais_service[i] && ais_service[i]->exec_dump_fn) {
ais_service[i]->exec_dump_fn ();
}
}
}
/*
* Completion callback passed to corosync_service_unlink_all() by
* corosync_exit_thread_handler().
*
* It performs the final teardown in this order:
*   1. release the serializer lock;
*   2. delete the stats timer;
*   3. stop the poll loop;
*   4. finalize totempg;
*   5. remove the pid lock file;
*   6. call corosync_exit_error (AIS_DONE_EXIT).
* The order is deliberate and should not be changed casually.
*/
static void unlink_all_completed (void)
{
/*
* The schedwrk_do API takes the global serializer lock
* but doesn't release it because this exit callback is called
* before it finishes. Since we know we are exiting, we unlock it
* here
*/
serialize_unlock ();
api->timer_delete (corosync_stats_timer_handle);
poll_stop (corosync_poll_handle);
totempg_finalize ();
/*
* Remove pid lock file
*/
unlink (corosync_lock_file);
corosync_exit_error (AIS_DONE_EXIT);
}
/*
 * Request an orderly shutdown.  Only the first call has any effect: it
 * posts corosync_exit_sem, which wakes corosync_exit_thread_handler() to
 * unlink all services.
 *
 * This function is called from the SIGTERM/SIGQUIT/SIGINT handlers.  The
 * one-shot flag is therefore a volatile sig_atomic_t, the only object type
 * a signal handler may portably modify (C11 7.14.1.1).  sem_post() is
 * async-signal-safe per POSIX.
 */
void corosync_shutdown_request (void)
{
	static volatile sig_atomic_t called = 0;

	if (called) {
		return;
	}
	called = 1;

	sem_post (&corosync_exit_sem);
}
/*
 * Body of the exit thread.  It blocks until corosync_exit_sem is posted
 * (see corosync_shutdown_request()), then starts unlinking all services
 * with unlink_all_completed() as the completion callback.  Returns its
 * argument unchanged.
 */
static void *corosync_exit_thread_handler (void *arg)
{
	void *result = arg;

	sem_wait (&corosync_exit_sem);

	corosync_service_unlink_all (api, unlink_all_completed);

	return result;
}
/* SIGUSR2: dump per-service state via corosync_state_dump(). */
static void sigusr2_handler (int num)
{
	(void)num;

	/*
	 * TODO remove this from sigusr2 handler and access via cfg service
	 * engine api - corosync-cfgtool
	 */
	corosync_state_dump ();
}
/* SIGTERM: ask for an orderly shutdown. */
static void sigterm_handler (int num)
{
	(void)num;
	corosync_shutdown_request ();
}
/* SIGQUIT: ask for an orderly shutdown. */
static void sigquit_handler (int num)
{
	(void)num;
	corosync_shutdown_request ();
}
/* SIGINT: ask for an orderly shutdown. */
static void sigintr_handler (int num)
{
	(void)num;
	corosync_shutdown_request ();
}
/*
* SIGSEGV handler.  It:
*   - restores the default disposition;
*   - calls logsys_atexit();
*   - stores the logsys record buffer to LOCALSTATEDIR/lib/corosync/fdata;
*   - re-raises SIGSEGV so the default action (termination) follows.
* NOTE(review): the logsys calls are presumably not async-signal-safe;
* they are accepted here on a best-effort basis while crashing.
*/
static void sigsegv_handler (int num)
{
(void)signal (SIGSEGV, SIG_DFL);
logsys_atexit();
logsys_log_rec_store (LOCALSTATEDIR "/lib/corosync/fdata");
raise (SIGSEGV);
}
/*
* SIGABRT handler.  It:
*   - restores the default disposition;
*   - calls logsys_atexit();
*   - stores the logsys record buffer to LOCALSTATEDIR/lib/corosync/fdata;
*   - re-raises SIGABRT so the default action follows.
* NOTE(review): the logsys calls are presumably not async-signal-safe.
*/
static void sigabrt_handler (int num)
{
(void)signal (SIGABRT, SIG_DFL);
logsys_atexit();
logsys_log_rec_store (LOCALSTATEDIR "/lib/corosync/fdata");
raise (SIGABRT);
}
/* IPv4 loopback address as returned by inet_addr() (network byte order). */
#define LOCALHOST_IP inet_addr("127.0.0.1")
/* Handle of the totempg group that main_mcast() multicasts on. */
static hdb_handle_t corosync_group_handle;
/* Process group descriptor: a group named "a", name length 1. */
static struct totempg_group corosync_group = {
.group = "a",
.group_len = 1
};
/*
 * Acquire the global serializer.  It is a spinlock when the platform
 * provides pthread spinlocks (HAVE_PTHREAD_SPIN_LOCK), otherwise a mutex.
 */
static void serialize_lock (void)
{
#if defined(HAVE_PTHREAD_SPIN_LOCK)
	pthread_spin_lock (&serialize_spin);
#else
	pthread_mutex_lock (&serialize_mutex);
#endif
}

/* Release the global serializer taken by serialize_lock(). */
static void serialize_unlock (void)
{
#if defined(HAVE_PTHREAD_SPIN_LOCK)
	pthread_spin_unlock (&serialize_spin);
#else
	pthread_mutex_unlock (&serialize_mutex);
#endif
}
/*
 * Log that service synchronization has finished and clear
 * sync_in_process.
 */
static void corosync_sync_completed (void)
{
	log_printf (LOGSYS_LEVEL_NOTICE,
		"Completed service synchronization, ready to provide service.\n");

	sync_in_process = 0;
}
static int corosync_sync_callbacks_retrieve (int sync_id,
struct sync_callbacks *callbacks)
{
unsigned int ais_service_index;
int res;
for (ais_service_index = 0;
ais_service_index < SERVICE_HANDLER_MAXIMUM_COUNT;
ais_service_index++) {
if (ais_service[ais_service_index] != NULL
&& (ais_service[ais_service_index]->sync_mode == CS_SYNC_V1
|| ais_service[ais_service_index]->sync_mode == CS_SYNC_V1_APIV2)) {
if (ais_service_index == sync_id) {
break;
}
}
}
/*
* Try to load backwards compat sync engines
*/
if (ais_service_index == SERVICE_HANDLER_MAXIMUM_COUNT) {
res = evil_callbacks_load (sync_id, callbacks);
return (res);
}
callbacks->name = ais_service[ais_service_index]->name;
callbacks->sync_init_api.sync_init_v1 = ais_service[ais_service_index]->sync_init;
callbacks->api_version = 1;
if (ais_service[ais_service_index]->sync_mode == CS_SYNC_V1_APIV2) {
callbacks->api_version = 2;
}
callbacks->sync_process = ais_service[ais_service_index]->sync_process;
callbacks->sync_activate = ais_service[ais_service_index]->sync_activate;
callbacks->sync_abort = ais_service[ais_service_index]->sync_abort;
return (0);
}
/*
 * Fill 'callbacks' for the v2 sync engine.
 *
 * In CS_SYNC_V2 mode, CLM_SERVICE and EVT_SERVICE fall back to
 * evil_callbacks_load() when no native service is loaded.  Otherwise the
 * service must be loaded, and in CS_SYNC_V1 mode it must itself use
 * CS_SYNC_V2.
 *
 * Returns 0 on success, -1 when the service is not handled here, or the
 * result of evil_callbacks_load().
 */
static int corosync_sync_v2_callbacks_retrieve (
	int service_id,
	struct sync_callbacks *callbacks)
{
	if (minimum_sync_mode == CS_SYNC_V2 &&
	    (service_id == CLM_SERVICE || service_id == EVT_SERVICE) &&
	    ais_service[service_id] == NULL) {
		return (evil_callbacks_load (service_id, callbacks));
	}

	if (ais_service[service_id] == NULL) {
		return (-1);
	}
	if (minimum_sync_mode == CS_SYNC_V1 &&
	    ais_service[service_id]->sync_mode != CS_SYNC_V2) {
		return (-1);
	}

	callbacks->name = ais_service[service_id]->name;
	callbacks->api_version =
		(ais_service[service_id]->sync_mode == CS_SYNC_V1_APIV2) ? 2 : 1;
	callbacks->sync_init_api.sync_init_v1 = ais_service[service_id]->sync_init;
	callbacks->sync_process = ais_service[service_id]->sync_process;
	callbacks->sync_activate = ais_service[service_id]->sync_activate;
	callbacks->sync_abort = ais_service[service_id]->sync_abort;
	return (0);
}
/* Ring id of the most recent configuration change (written by confchg_fn). */
static struct memb_ring_id corosync_ring_id;

/*
 * Totem configuration-change callback.
 *
 * The new ring id is recorded and the change is forwarded to every loaded
 * service's confchg_fn, both under the serializer lock.  If a sync was
 * already in progress, it is then aborted with sync_v2_abort().  In
 * CS_SYNC_V2 mode, a transitional configuration is saved with
 * sync_v2_save_transitional() and a regular configuration starts a new
 * v2 sync.
 */
static void confchg_fn (
	enum totem_configuration_type configuration_type,
	const unsigned int *member_list, size_t member_list_entries,
	const unsigned int *left_list, size_t left_list_entries,
	const unsigned int *joined_list, size_t joined_list_entries,
	const struct memb_ring_id *ring_id)
{
	int svc;
	int abort_activate = (sync_in_process == 1);

	sync_in_process = 1;

	serialize_lock ();
	memcpy (&corosync_ring_id, ring_id, sizeof (struct memb_ring_id));

	/* Call configuration change for all services */
	for (svc = 0; svc < service_count; svc++) {
		if (ais_service[svc] == NULL || ais_service[svc]->confchg_fn == NULL) {
			continue;
		}
		ais_service[svc]->confchg_fn (configuration_type,
			member_list, member_list_entries,
			left_list, left_list_entries,
			joined_list, joined_list_entries, ring_id);
	}
	serialize_unlock ();

	if (abort_activate) {
		sync_v2_abort ();
	}

	if (minimum_sync_mode == CS_SYNC_V2) {
		if (configuration_type == TOTEM_CONFIGURATION_TRANSITIONAL) {
			sync_v2_save_transitional (member_list, member_list_entries, ring_id);
		} else if (configuration_type == TOTEM_CONFIGURATION_REGULAR) {
			sync_v2_start (member_list, member_list_entries, ring_id);
		}
	}
}
/* Placeholder for dropping root privileges; currently does nothing. */
static void priv_drop (void)
{
	/* TODO: we are still not dropping privs */
}
/*
 * Detach from the controlling terminal (non-debug runs).
 *
 * The process forks and the parent exits with status 0.  The child starts
 * a new session and reopens stdin, stderr and stdout on /dev/null.  A
 * failed fork is reported through corosync_exit_error (AIS_DONE_FORK).
 */
static void corosync_tty_detach (void)
{
	pid_t child = fork ();

	if (child == -1) {
		corosync_exit_error (AIS_DONE_FORK);
	} else if (child != 0) {
		/* parent: the disconnected child keeps running */
		exit (0);
	}

	/* Create new session */
	(void)setsid();

	/* Map stdin/out/err to /dev/null. */
	freopen("/dev/null", "r", stdin);
	freopen("/dev/null", "a", stderr);
	freopen("/dev/null", "a", stdout);
}
/*
 * Lift the memory-lock limit and lock all current and future pages, so
 * the daemon avoids page faults.
 *
 * The limit raised is RLIMIT_MEMLOCK, or RLIMIT_VMEM on Solaris.  On BSD
 * builds without COROSYNC_FREEBSD_GE_8 locking is skipped and a warning is
 * logged.  Failure of mlockall() is logged, not fatal.
 */
static void corosync_mlockall (void)
{
	struct rlimit unlimited = {
		.rlim_cur = RLIM_INFINITY,
		.rlim_max = RLIM_INFINITY
	};

#ifndef COROSYNC_SOLARIS
	setrlimit (RLIMIT_MEMLOCK, &unlimited);
#else
	setrlimit (RLIMIT_VMEM, &unlimited);
#endif

#if defined(COROSYNC_BSD) && !defined(COROSYNC_FREEBSD_GE_8)
	/* under FreeBSD < 8 a process with locked page cannot call dlopen
	 * code disabled until FreeBSD bug i386/93396 was solved
	 */
	log_printf (LOGSYS_LEVEL_WARNING, "Could not lock memory of service to avoid page faults\n");
#else
	if (mlockall (MCL_CURRENT | MCL_FUTURE) == -1) {
		char error_str[100];

		strerror_r (errno, error_str, 100);
		log_printf (LOGSYS_LEVEL_WARNING,
			"Could not lock memory of service to avoid page faults: %s\n",
			error_str);
	}
#endif
}
/*
 * Timer callback that publishes totem srp statistics into the object
 * database, then re-arms itself for 1500 ms later.
 *
 * Each srp counter is written under a key spelled exactly like its struct
 * field.  The token history is walked backwards from latest_token, and
 * the loop stops when the previous slot is earliest_token.  Tokens with
 * tx != 0 or a positive rx delta are averaged into mtt_rx_token,
 * avg_token_workload and avg_backlog_calc.  Those three keys are updated
 * only if at least one token qualified.
 */
static void corosync_totem_stats_updater (void *data)
{
	totempg_stats_t * stats;
	uint32_t mtt_rx_token;
	uint32_t total_mtt_rx_token = 0;
	uint32_t avg_backlog_calc;
	uint32_t total_backlog_calc = 0;
	uint32_t avg_token_holdtime;
	uint32_t total_token_holdtime = 0;
	int cur;
	int prev;
	int32_t token_count = 0;

	stats = api->totem_get_stats();

/* Replace the objdb key named after an srp stats field with its value. */
#define SRP_STAT_REPLACE(field)						\
	objdb->object_key_replace (stats->mrp->srp->hdr.handle,		\
		#field, strlen (#field),				\
		&stats->mrp->srp->field, sizeof (stats->mrp->srp->field))

	SRP_STAT_REPLACE (orf_token_tx);
	SRP_STAT_REPLACE (orf_token_rx);
	SRP_STAT_REPLACE (memb_merge_detect_tx);
	SRP_STAT_REPLACE (memb_merge_detect_rx);
	SRP_STAT_REPLACE (memb_join_tx);
	SRP_STAT_REPLACE (memb_join_rx);
	SRP_STAT_REPLACE (mcast_tx);
	SRP_STAT_REPLACE (mcast_retx);
	SRP_STAT_REPLACE (mcast_rx);
	SRP_STAT_REPLACE (memb_commit_token_tx);
	SRP_STAT_REPLACE (memb_commit_token_rx);
	SRP_STAT_REPLACE (token_hold_cancel_tx);
	SRP_STAT_REPLACE (token_hold_cancel_rx);
	SRP_STAT_REPLACE (operational_entered);
	SRP_STAT_REPLACE (operational_token_lost);
	SRP_STAT_REPLACE (gather_entered);
	SRP_STAT_REPLACE (gather_token_lost);
	SRP_STAT_REPLACE (commit_entered);
	SRP_STAT_REPLACE (commit_token_lost);
	SRP_STAT_REPLACE (recovery_entered);
	SRP_STAT_REPLACE (recovery_token_lost);
	SRP_STAT_REPLACE (consensus_timeouts);
	SRP_STAT_REPLACE (rx_msg_dropped);

#undef SRP_STAT_REPLACE

	cur = stats->mrp->srp->latest_token;
	for (;;) {
		prev = (cur == 0) ? TOTEM_TOKEN_STATS_MAX - 1 : cur - 1;
		if (prev == stats->mrp->srp->earliest_token) {
			break;
		}
		/* if tx == 0, then dropped token (not ours) */
		if (stats->mrp->srp->token[cur].tx != 0 ||
		    (stats->mrp->srp->token[cur].rx - stats->mrp->srp->token[prev].rx) > 0 ) {
			total_mtt_rx_token += (stats->mrp->srp->token[cur].rx - stats->mrp->srp->token[prev].rx);
			total_token_holdtime += (stats->mrp->srp->token[cur].tx - stats->mrp->srp->token[cur].rx);
			total_backlog_calc += stats->mrp->srp->token[cur].backlog_calc;
			token_count++;
		}
		cur = prev;
	}

	if (token_count != 0) {
		mtt_rx_token = (total_mtt_rx_token / token_count);
		avg_backlog_calc = (total_backlog_calc / token_count);
		avg_token_holdtime = (total_token_holdtime / token_count);

		objdb->object_key_replace (stats->mrp->srp->hdr.handle,
			"mtt_rx_token", strlen("mtt_rx_token"),
			&mtt_rx_token, sizeof (mtt_rx_token));
		objdb->object_key_replace (stats->mrp->srp->hdr.handle,
			"avg_token_workload", strlen("avg_token_workload"),
			&avg_token_holdtime, sizeof (avg_token_holdtime));
		objdb->object_key_replace (stats->mrp->srp->hdr.handle,
			"avg_backlog_calc", strlen("avg_backlog_calc"),
			&avg_backlog_calc, sizeof (avg_backlog_calc));
	}

	api->timer_add_duration (1500 * MILLI_2_NANO_SECONDS, NULL,
		corosync_totem_stats_updater,
		&corosync_stats_timer_handle);
}
/*
 * Create the runtime.totem.pg.mrp.srp object hierarchy in the object
 * database, provided a "runtime" object is found.  The srp counters are
 * then registered as UINT64 keys named after their struct fields.
 * mtt_rx_token, avg_token_workload and avg_backlog_calc start as UINT32
 * zero, and rx_msg_dropped as UINT64 zero.  Finally the 1500 ms stats
 * timer (corosync_totem_stats_updater) is started; this happens even when
 * "runtime" was not found.
 */
static void corosync_totem_stats_init (void)
{
	totempg_stats_t * stats;
	hdb_handle_t object_find_handle;
	hdb_handle_t object_runtime_handle;
	hdb_handle_t object_totem_handle;
	uint32_t zero_32 = 0;
	uint64_t zero_64 = 0;

	stats = api->totem_get_stats();

	objdb->object_find_create (
		OBJECT_PARENT_HANDLE,
		"runtime",
		strlen ("runtime"),
		&object_find_handle);

	if (objdb->object_find_next (object_find_handle,
		&object_runtime_handle) == 0) {

		objdb->object_create (object_runtime_handle,
			&object_totem_handle,
			"totem", strlen ("totem"));
		objdb->object_create (object_totem_handle,
			&stats->hdr.handle,
			"pg", strlen ("pg"));
		objdb->object_create (stats->hdr.handle,
			&stats->mrp->hdr.handle,
			"mrp", strlen ("mrp"));
		objdb->object_create (stats->mrp->hdr.handle,
			&stats->mrp->srp->hdr.handle,
			"srp", strlen ("srp"));

/* Create a UINT64 key named after an srp stats field, bound to that field. */
#define SRP_STAT_CREATE_U64(field)					\
		objdb->object_key_create_typed (stats->mrp->srp->hdr.handle, \
			#field, &stats->mrp->srp->field,			\
			sizeof (stats->mrp->srp->field), OBJDB_VALUETYPE_UINT64)

		SRP_STAT_CREATE_U64 (orf_token_tx);
		SRP_STAT_CREATE_U64 (orf_token_rx);
		SRP_STAT_CREATE_U64 (memb_merge_detect_tx);
		SRP_STAT_CREATE_U64 (memb_merge_detect_rx);
		SRP_STAT_CREATE_U64 (memb_join_tx);
		SRP_STAT_CREATE_U64 (memb_join_rx);
		SRP_STAT_CREATE_U64 (mcast_tx);
		SRP_STAT_CREATE_U64 (mcast_retx);
		SRP_STAT_CREATE_U64 (mcast_rx);
		SRP_STAT_CREATE_U64 (memb_commit_token_tx);
		SRP_STAT_CREATE_U64 (memb_commit_token_rx);
		SRP_STAT_CREATE_U64 (token_hold_cancel_tx);
		SRP_STAT_CREATE_U64 (token_hold_cancel_rx);
		SRP_STAT_CREATE_U64 (operational_entered);
		SRP_STAT_CREATE_U64 (operational_token_lost);
		SRP_STAT_CREATE_U64 (gather_entered);
		SRP_STAT_CREATE_U64 (gather_token_lost);
		SRP_STAT_CREATE_U64 (commit_entered);
		SRP_STAT_CREATE_U64 (commit_token_lost);
		SRP_STAT_CREATE_U64 (recovery_entered);
		SRP_STAT_CREATE_U64 (recovery_token_lost);
		SRP_STAT_CREATE_U64 (consensus_timeouts);

#undef SRP_STAT_CREATE_U64

		/* derived values, filled in later by the stats updater */
		objdb->object_key_create_typed (stats->mrp->srp->hdr.handle,
			"mtt_rx_token", &zero_32,
			sizeof (zero_32), OBJDB_VALUETYPE_UINT32);
		objdb->object_key_create_typed (stats->mrp->srp->hdr.handle,
			"avg_token_workload", &zero_32,
			sizeof (zero_32), OBJDB_VALUETYPE_UINT32);
		objdb->object_key_create_typed (stats->mrp->srp->hdr.handle,
			"avg_backlog_calc", &zero_32,
			sizeof (zero_32), OBJDB_VALUETYPE_UINT32);
		objdb->object_key_create_typed (stats->mrp->srp->hdr.handle,
			"rx_msg_dropped", &zero_64,
			sizeof (zero_64), OBJDB_VALUETYPE_UINT64);
	}

	/* start stats timer */
	api->timer_add_duration (1500 * MILLI_2_NANO_SECONDS, NULL,
		corosync_totem_stats_updater,
		&corosync_stats_timer_handle);
}
/*
 * deliver_fn - totempg delivery callback for the corosync process group
 * (registered with totempg_groups_initialize() in main()).
 *
 * The message starts with a coroipc_request_header_t.  Its id is split
 * into a service index (upper 16 bits) and an executive handler index
 * (lower 16 bits).  The per-handler "rx" statistics key is incremented and
 * the service's exec_handler_fn is invoked with the message and @nodeid.
 * All dispatch work runs under serialize_lock(), which is released on
 * every return path.
 *
 * @nodeid: originating node id, forwarded to the handler
 * @msg: message buffer, header first
 * @msg_len: not used by this function
 * @endian_conversion_required: non-zero when header fields must be
 *	byte-swapped; the service's exec_endian_convert_fn is then applied
 *	to the message (cast to non-const, presumably converting in place)
 *	before the handler runs.
 */
static void deliver_fn (
unsigned int nodeid,
const void *msg,
unsigned int msg_len,
int endian_conversion_required)
{
const coroipc_request_header_t *header;
int service;
int fn_id;
unsigned int id;
unsigned int size;
unsigned int key_incr_dummy;
header = msg;
/* NOTE(review): size is decoded here but never used below. */
if (endian_conversion_required) {
id = swab32 (header->id);
size = swab32 (header->size);
} else {
id = header->id;
size = header->size;
}
/*
 * Call the proper executive handler
 */
/*
 * NOTE(review): service comes straight from the wire and indexes
 * ais_service[] with no upper-bound check; confirm the array covers
 * every value of id >> 16 or add an explicit bound.
 */
service = id >> 16;
fn_id = id & 0xffff;
serialize_lock();
/*
 * EVT messages with no EVT service loaded go to evil_deliver_fn()
 * (set up by evil_init()); the NULL check below then returns.
 */
if (ais_service[service] == NULL && service == EVT_SERVICE) {
evil_deliver_fn (nodeid, service, fn_id, msg,
endian_conversion_required);
}
if (!ais_service[service]) {
serialize_unlock();
return;
}
/* Reject handler ids beyond the service's executive engine table. */
if (fn_id >= ais_service[service]->exec_engine_count) {
log_printf(LOGSYS_LEVEL_WARNING, "discarded unknown message %d for service %d (max id %d)",
fn_id, service, ais_service[service]->exec_engine_count);
serialize_unlock();
return;
}
objdb->object_key_increment (service_stats_handle[service][fn_id],
"rx", strlen("rx"),
&key_incr_dummy);
if (endian_conversion_required) {
assert(ais_service[service]->exec_engine[fn_id].exec_endian_convert_fn != NULL);
ais_service[service]->exec_engine[fn_id].exec_endian_convert_fn
((void *)msg);
}
ais_service[service]->exec_engine[fn_id].exec_handler_fn
(msg, nodeid);
serialize_unlock();
}
/*
 * Expose the configuration interfaces loaded by main().
 *
 * @modules: receives this file's config_modules table (not a copy)
 * @num: receives the number of entries stored in that table
 */
void main_get_config_modules(struct config_iface_ver0 ***modules, int *num)
{
	*num = num_config_modules;
	*modules = config_modules;
}
/*
 * main_mcast - multicast a message to the corosync group via totempg.
 *
 * The first iovec entry must begin with a coroipc_request_header_t whose
 * id encodes the service (upper 16 bits) and the executive handler
 * (lower 16 bits).  When that service is loaded and the handler id lies
 * within its exec_engine table, the per-handler "tx" statistics key is
 * incremented.
 *
 * @iovec: message fragments, header first
 * @iov_len: number of entries in @iovec
 * @guarantee: delivery guarantee forwarded to totempg
 *
 * Returns the result of totempg_groups_mcast_joined().
 */
int main_mcast (
	const struct iovec *iovec,
	unsigned int iov_len,
	unsigned int guarantee)
{
	const coroipc_request_header_t *req = iovec->iov_base;
	int service;
	int fn_id;
	unsigned int key_incr_dummy;

	service = req->id >> 16;
	fn_id = req->id & 0xffff;

	/*
	 * Apply the same handler-id bound as deliver_fn() before touching
	 * service_stats_handle[service][fn_id]. An unknown fn_id could
	 * otherwise index past the handles that exist for the service.
	 */
	if (ais_service[service] &&
	    fn_id < ais_service[service]->exec_engine_count) {
		objdb->object_key_increment (service_stats_handle[service][fn_id],
			"tx", strlen("tx"), &key_incr_dummy);
	}

	return (totempg_groups_mcast_joined (corosync_group_handle, iovec, iov_len, guarantee));
}
/*
 * Tell whether a message source refers to this node.
 *
 * @source: must not be NULL (asserted)
 *
 * Returns 1 when source->nodeid equals the local totem node id, 0 otherwise.
 */
int message_source_is_local (const mar_message_source_t *source)
{
	assert (source != NULL);

	return (source->nodeid == totempg_my_nodeid_get ()) ? 1 : 0;
}
/*
 * Fill in a message source descriptor for the local node.
 *
 * The descriptor is zeroed first. Then the IPC connection pointer and
 * the local totem node id are recorded. Both arguments must be
 * non-NULL (asserted).
 */
void message_source_set (
	mar_message_source_t *source,
	void *conn)
{
	assert ((source != NULL) && (conn != NULL));

	memset (source, 0, sizeof (*source));
	source->conn = conn;
	source->nodeid = totempg_my_nodeid_get ();
}
/*
* Provides the glue from corosync to the IPC Service
*/
/* Report the private_data_size declared by the service engine at @service. */
static int corosync_private_data_size_get (unsigned int service)
{
	int size = ais_service[service]->private_data_size;

	return size;
}
/* Look up the lib_init_fn registered by the service engine at @service. */
static coroipcs_init_fn_lvalue corosync_init_fn_get (unsigned int service)
{
	coroipcs_init_fn_lvalue init_fn = ais_service[service]->lib_init_fn;

	return init_fn;
}
/* Look up the lib_exit_fn registered by the service engine at @service. */
static coroipcs_exit_fn_lvalue corosync_exit_fn_get (unsigned int service)
{
	coroipcs_exit_fn_lvalue exit_fn = ais_service[service]->lib_exit_fn;

	return exit_fn;
}
/*
 * Look up the library handler for request @id of the service engine at
 * @service. No bound check is done on @id here.
 */
static coroipcs_handler_fn_lvalue corosync_handler_fn_get (unsigned int service, unsigned int id)
{
	coroipcs_handler_fn_lvalue handler_fn =
		ais_service[service]->lib_engine[id].lib_handler_fn;

	return handler_fn;
}
/*
 * corosync_security_valid - decide whether an IPC client may connect.
 *
 * Returns 1 to accept and 0 to reject:
 *   - reject while main_low_fds_event() has set
 *     corosync_not_enough_fds_left;
 *   - accept root (euid 0 or egid 0);
 *   - accept when euid or egid matches an entry on uidgid_list_head;
 *   - reject otherwise.
 * The lines added by this hunk set errno on rejection: EMFILE for the
 * fd shortage and EACCES for an unknown uid/gid. Presumably this lets
 * hdb_error_to_cs() map them to CS_ERR_NO_RESOURCES / CS_ERR_SECURITY.
 */
static int corosync_security_valid (int euid, int egid)
{
struct list_head *iter;
if (corosync_not_enough_fds_left) {
- return 0;
+ errno = EMFILE;
+ return (0);
}
if (euid == 0 || egid == 0) {
return (1);
}
/* Walk the configured uid/gid allow list. */
for (iter = uidgid_list_head.next; iter != &uidgid_list_head;
iter = iter->next) {
struct uidgid_item *ugi = list_entry (iter, struct uidgid_item,
list);
if (euid == ugi->uid || egid == ugi->gid)
return (1);
}
+ errno = EACCES;
return (0);
}
/*
 * A service is offered to IPC clients when its engine is loaded and it
 * is not flagged in ais_service_exiting[]. Returns 1 or 0.
 */
static int corosync_service_available (unsigned int service)
{
	if (ais_service[service] == NULL) {
		return (0);
	}

	return (!ais_service_exiting[service]);
}
/*
 * Per-request scratch state filled by corosync_sending_allowed() and
 * consumed by corosync_sending_allowed_release().
 */
struct sending_allowed_private_data_struct {
/*
 * Return value of totempg_groups_joined_reserve(). A value of -1 makes
 * corosync_sending_allowed() return -1 and turns the release into a no-op.
 */
int reserved_msgs;
};
/*
 * corosync_sending_allowed - admission check for an IPC library request.
 *
 * Reserves totem queue space for the request (header->size bytes) and
 * stores the reservation in @sending_allowed_private_data, so that
 * corosync_sending_allowed_release() can return it afterwards.
 *
 * Returns -1 if the reservation call returned -1. Otherwise returns 1 only
 * when both of the following hold, and 0 when either does not:
 *   - the cluster is quorate, or the service allows inquorate operation;
 *   - the handler needs no flow control, or it needs flow control and
 *     a non-zero number of messages was reserved and no synchronization
 *     is in progress.
 */
static int corosync_sending_allowed (
	unsigned int service,
	unsigned int id,
	const void *msg,
	void *sending_allowed_private_data)
{
	struct sending_allowed_private_data_struct *priv = sending_allowed_private_data;
	coroipc_request_header_t *req_header = (coroipc_request_header_t *)msg;
	struct iovec iov;
	int quorum_ok;
	int flow_ok;

	iov.iov_base = (char *)req_header;
	iov.iov_len = req_header->size;

	priv->reserved_msgs = totempg_groups_joined_reserve (
		corosync_group_handle, &iov, 1);
	if (priv->reserved_msgs == -1) {
		return (-1);
	}

	quorum_ok = corosync_quorum_is_quorate () == 1 ||
		ais_service[service]->allow_inquorate == CS_LIB_ALLOW_INQUORATE;
	if (!quorum_ok) {
		return (0);
	}

	flow_ok = (ais_service[service]->lib_engine[id].flow_control ==
			CS_LIB_FLOW_CONTROL_NOT_REQUIRED) ||
		(ais_service[service]->lib_engine[id].flow_control ==
			CS_LIB_FLOW_CONTROL_REQUIRED &&
		 priv->reserved_msgs != 0 &&
		 sync_in_process == 0);

	return (flow_ok);
}
static void corosync_sending_allowed_release (void *sending_allowed_private_data)
{
struct sending_allowed_private_data_struct *pd =
(struct sending_allowed_private_data_struct *)sending_allowed_private_data;
if (pd->reserved_msgs == -1) {
return;
}
totempg_groups_joined_release (pd->reserved_msgs);
}
/*
 * logsys subsystem id used for IPC log records. It stays -1 until main()
 * assigns it from _logsys_subsys_create ("IPC").
 */
static int ipc_subsys_id = -1;
/*
 * Fatal-error callback installed as ipc_init_state_v2.fatal_error.
 * Logs @error_msg at error level under the IPC subsystem and then
 * terminates the process with EXIT_FAILURE. Never returns.
 */
static void ipc_fatal_error(const char *error_msg) __attribute__((noreturn));
static void ipc_fatal_error(const char *error_msg) {
_logsys_log_printf (
LOGSYS_ENCODE_RECID(LOGSYS_LEVEL_ERROR,
ipc_subsys_id,
LOGSYS_RECID_LOG),
__FUNCTION__, __FILE__, __LINE__,
"%s", error_msg);
exit(EXIT_FAILURE);
}
/*
 * Poll callback registered by corosync_poll_accept_add(). Forwards the
 * event to coroipcs_handler_accept() and returns its result. @handle
 * is ignored.
 */
static int corosync_poll_handler_accept (
	hdb_handle_t handle,
	int fd,
	int revent,
	void *context)
{
	int res;

	res = coroipcs_handler_accept (fd, revent, context);

	return (res);
}
/*
 * Poll callback registered by corosync_poll_dispatch_add() and
 * corosync_poll_dispatch_modify(). Forwards the event to
 * coroipcs_handler_dispatch() and returns its result. @handle is ignored.
 */
static int corosync_poll_handler_dispatch (
	hdb_handle_t handle,
	int fd,
	int revent,
	void *context)
{
	int res;

	res = coroipcs_handler_dispatch (fd, revent, context);

	return (res);
}
/*
 * Watch @fd on the main poll handle for POLLIN/POLLNVAL. Events are
 * routed to corosync_poll_handler_accept() with no context pointer.
 */
static void corosync_poll_accept_add (
	int fd)
{
	const int events = POLLIN | POLLNVAL;

	poll_dispatch_add (corosync_poll_handle, fd, events, NULL,
		corosync_poll_handler_accept);
}
/*
 * Watch a connection @fd on the main poll handle for POLLIN/POLLNVAL.
 * Events are routed to corosync_poll_handler_dispatch() together with
 * @context.
 */
static void corosync_poll_dispatch_add (
	int fd,
	void *context)
{
	const int events = POLLIN | POLLNVAL;

	poll_dispatch_add (corosync_poll_handle, fd, events, context,
		corosync_poll_handler_dispatch);
}
/*
 * Change the poll event mask of an already registered @fd. The callback
 * stays corosync_poll_handler_dispatch().
 */
static void corosync_poll_dispatch_modify (
	int fd,
	int events)
{
	poll_dispatch_modify (
		corosync_poll_handle,
		fd,
		events,
		corosync_poll_handler_dispatch);
}
/*
 * Stop watching @fd on the main poll handle. @context is accepted for
 * interface compatibility but is not used.
 */
static void corosync_poll_dispatch_destroy (
	int fd,
	void *context)
{
	(void)context;

	poll_dispatch_delete (corosync_poll_handle, fd);
}
/*
 * corosync_stats_create_connection - create the runtime statistics object
 * for a new IPC connection.
 *
 * Increments the aggregate "active" counter under object_connection_handle.
 * It then creates a child object named @name and populates it with zeroed
 * counters plus the client's pid. Every 64-bit counter, including
 * "requests", is created as OBJDB_VALUETYPE_UINT64 to match the uint64_t
 * it is initialized from.
 *
 * @name: object name for the connection
 * @pid: client process id, stored as "client_pid" (INT32)
 * @fd: not used by this function
 *
 * Returns the handle of the new object.
 * NOTE(review): the object_create() result is not checked; if it fails
 * the returned handle is not meaningful.
 */
static hdb_handle_t corosync_stats_create_connection (const char* name,
	const pid_t pid, const int fd)
{
	/* 64-bit counters created, in this order, after "client_pid". */
	static const char * const counter_keys[] = {
		"responses",
		"dispatched",
		"requests",
		"sem_retry_count",
		"send_retry_count",
		"recv_retry_count",
	};
	uint32_t zero_32 = 0;
	uint64_t zero_64 = 0;
	unsigned int key_incr_dummy;
	hdb_handle_t object_handle;
	size_t i;

	objdb->object_key_increment (object_connection_handle,
		"active", strlen("active"),
		&key_incr_dummy);

	objdb->object_create (object_connection_handle,
		&object_handle,
		name,
		strlen (name));

	objdb->object_key_create_typed (object_handle,
		"service_id",
		&zero_32, sizeof (zero_32),
		OBJDB_VALUETYPE_UINT32);

	objdb->object_key_create_typed (object_handle,
		"client_pid",
		&pid, sizeof (pid),
		OBJDB_VALUETYPE_INT32);

	for (i = 0; i < sizeof (counter_keys) / sizeof (counter_keys[0]); i++) {
		objdb->object_key_create_typed (object_handle,
			counter_keys[i],
			&zero_64, sizeof (zero_64),
			OBJDB_VALUETYPE_UINT64);
	}

	objdb->object_key_create_typed (object_handle,
		"flow_control",
		&zero_32, sizeof (zero_32),
		OBJDB_VALUETYPE_UINT32);

	objdb->object_key_create_typed (object_handle,
		"flow_control_count",
		&zero_64, sizeof (zero_64),
		OBJDB_VALUETYPE_UINT64);

	objdb->object_key_create_typed (object_handle,
		"queue_size",
		&zero_64, sizeof (zero_64),
		OBJDB_VALUETYPE_UINT64);

	return object_handle;
}
/*
 * Remove a connection's statistics object. Afterwards the aggregate
 * "closed" counter is incremented and "active" is decremented under
 * object_connection_handle.
 */
static void corosync_stats_destroy_connection (hdb_handle_t handle)
{
	unsigned int unused_result;

	objdb->object_destroy (handle);

	objdb->object_key_increment (object_connection_handle,
		"closed", strlen ("closed"), &unused_result);
	objdb->object_key_decrement (object_connection_handle,
		"active", strlen ("active"), &unused_result);
}
/* Overwrite key @name of object @handle with @value_len bytes from @value. */
static void corosync_stats_update_value (hdb_handle_t handle,
	const char *name, const void *value,
	size_t value_len)
{
	const size_t name_len = strlen (name);

	objdb->object_key_replace (handle, name, name_len, value, value_len);
}
/* Increment counter key @name of object @handle; the result is discarded. */
static void corosync_stats_increment_value (hdb_handle_t handle,
	const char* name)
{
	const size_t name_len = strlen (name);
	unsigned int unused_result;

	objdb->object_key_increment (handle, name, name_len, &unused_result);
}
/* Decrement counter key @name of object @handle; the result is discarded. */
static void corosync_stats_decrement_value (hdb_handle_t handle,
	const char* name)
{
	const size_t name_len = strlen (name);
	unsigned int unused_result;

	objdb->object_key_decrement (handle, name, name_len, &unused_result);
}
/*
 * Callback table and settings handed to coroipcs_ipc_init_v2() in main().
 * corosync_setscheduler() may switch sched_policy to SCHED_RR, and main()
 * fills in log_subsys_id before the table is used.
 */
static struct coroipcs_init_state_v2 ipc_init_state_v2 = {
.socket_name = COROSYNC_SOCKET_NAME,
.sched_policy = SCHED_OTHER,
.sched_param = &global_sched_param,
.malloc = malloc,
.free = free,
.log_printf = _logsys_log_printf,
.fatal_error = ipc_fatal_error,
.security_valid = corosync_security_valid,
.service_available = corosync_service_available,
.private_data_size_get = corosync_private_data_size_get,
.serialize_lock = serialize_lock,
.serialize_unlock = serialize_unlock,
.sending_allowed = corosync_sending_allowed,
.sending_allowed_release = corosync_sending_allowed_release,
.poll_accept_add = corosync_poll_accept_add,
.poll_dispatch_add = corosync_poll_dispatch_add,
.poll_dispatch_modify = corosync_poll_dispatch_modify,
.poll_dispatch_destroy = corosync_poll_dispatch_destroy,
.init_fn_get = corosync_init_fn_get,
.exit_fn_get = corosync_exit_fn_get,
.handler_fn_get = corosync_handler_fn_get,
/* Per-connection statistics hooks backed by the objdb "runtime" tree. */
.stats_create_connection = corosync_stats_create_connection,
.stats_destroy_connection = corosync_stats_destroy_connection,
.stats_update_value = corosync_stats_update_value,
.stats_increment_value = corosync_stats_increment_value,
.stats_decrement_value = corosync_stats_decrement_value,
};
/*
 * corosync_setscheduler - try to run corosync under SCHED_RR at the
 * maximum SCHED_RR priority.
 *
 * On success, the IPC layer (ipc_init_state_v2.sched_policy) and the
 * logsys thread are switched to SCHED_RR too. Failing to raise the logsys
 * thread priority is fatal (AIS_DONE_LOGSETUP) because of priority
 * inversion. If sched_setscheduler() fails, a warning naming the attempted
 * priority is logged, the global priority is reset to 0 and the logsys
 * thread is set to SCHED_OTHER. If the maximum priority cannot be
 * determined, sched_priority becomes 0. Platforms without the required
 * APIs only get a warning.
 *
 * NOTE(review): with the GNU strerror_r() variant the message may be
 * returned via the return value instead of error_str; confirm which
 * variant this build selects.
 */
static void corosync_setscheduler (void)
{
#if defined(HAVE_PTHREAD_SETSCHEDPARAM) && defined(HAVE_SCHED_GET_PRIORITY_MAX) && defined(HAVE_SCHED_SETSCHEDULER)
	int res;

	sched_priority = sched_get_priority_max (SCHED_RR);
	if (sched_priority != -1) {
		global_sched_param.sched_priority = sched_priority;
		res = sched_setscheduler (0, SCHED_RR, &global_sched_param);
		if (res == -1) {
			char error_str[100];
			strerror_r (errno, error_str, 100);
			/*
			 * Report the priority that was attempted, then clear it.
			 * Resetting first would make the message always say 0.
			 */
			log_printf (LOGSYS_LEVEL_WARNING, "Could not set SCHED_RR at priority %d: %s\n",
				global_sched_param.sched_priority, error_str);
			global_sched_param.sched_priority = 0;

			logsys_thread_priority_set (SCHED_OTHER, NULL, 1);
		} else {

			/*
			 * Turn on SCHED_RR in ipc system
			 */
			ipc_init_state_v2.sched_policy = SCHED_RR;

			/*
			 * Turn on SCHED_RR in logsys system
			 */
			res = logsys_thread_priority_set (SCHED_RR, &global_sched_param, 10);
			if (res == -1) {
				log_printf (LOGSYS_LEVEL_ERROR,
					"Could not set logsys thread priority."
					" Can't continue because of priority inversions.");
				corosync_exit_error (AIS_DONE_LOGSETUP);
			}
		}
	} else {
		char error_str[100];
		strerror_r (errno, error_str, 100);
		log_printf (LOGSYS_LEVEL_WARNING,
			"Could not get maximum scheduler priority: %s\n",
			error_str);
		sched_priority = 0;
	}
#else
	log_printf(LOGSYS_LEVEL_WARNING,
		"The Platform is missing process priority setting features. Leaving at default.");
#endif
}
/*
 * corosync_stats_init - create the "connections" statistics object.
 *
 * Looks up the top-level "runtime" object (created by main()) and creates
 * "connections" beneath it, holding UINT64 "active" and "closed" counters
 * that start at zero. Nothing is created if the lookup fails. The
 * temporary find handle is released once the lookup is done.
 */
static void corosync_stats_init (void)
{
	hdb_handle_t object_find_handle;
	hdb_handle_t object_runtime_handle;
	uint64_t zero_64 = 0;
	int res;

	if (objdb->object_find_create (OBJECT_PARENT_HANDLE,
		"runtime", strlen ("runtime"),
		&object_find_handle) != 0) {
		return;
	}

	res = objdb->object_find_next (object_find_handle,
		&object_runtime_handle);
	/* The find handle is only needed for the single lookup above. */
	objdb->object_find_destroy (object_find_handle);
	if (res != 0) {
		return;
	}

	/* Connection objects */
	objdb->object_create (object_runtime_handle,
		&object_connection_handle,
		"connections", strlen ("connections"));

	objdb->object_key_create_typed (object_connection_handle,
		"active", &zero_64, sizeof (zero_64),
		OBJDB_VALUETYPE_UINT64);
	objdb->object_key_create_typed (object_connection_handle,
		"closed", &zero_64, sizeof (zero_64),
		OBJDB_VALUETYPE_UINT64);
}
/*
 * Low file-descriptor callback registered with poll_low_fds_event_set().
 * Records the state in corosync_not_enough_fds_left, which
 * corosync_security_valid() consults, and logs the change.
 */
static void main_low_fds_event(int32_t not_enough, int32_t fds_available)
{
	corosync_not_enough_fds_left = not_enough;

	if (!not_enough) {
		log_printf(LOGSYS_LEVEL_NOTICE, "allowing new connections (fds_available:%d)\n",
			fds_available);
		return;
	}

	log_printf(LOGSYS_LEVEL_WARNING, "refusing new connections (fds_available:%d)\n",
		fds_available);
}
/*
 * Callback registered with totempg_service_ready_register().
 *
 * Links and initializes the default services; failure is fatal
 * (AIS_DONE_INIT_SERVICES). It then runs evil_init(), creates the
 * connection and totem statistics objects, and starts the synchronization
 * engine(s) selected by minimum_sync_mode:
 *   CS_SYNC_V2 - sync v2 only;
 *   CS_SYNC_V1 - v1 registration plus sync v2.
 */
static void main_service_ready (void)
{
	/*
	 * This must occur after totempg is initialized because "this_ip" must be set
	 */
	if (corosync_service_defaults_link_and_init (api) == -1) {
		log_printf (LOGSYS_LEVEL_ERROR, "Could not initialize default services\n");
		corosync_exit_error (AIS_DONE_INIT_SERVICES);
	}

	evil_init (api);
	corosync_stats_init ();
	corosync_totem_stats_init ();

	switch (minimum_sync_mode) {
	case CS_SYNC_V2:
		log_printf (LOGSYS_LEVEL_NOTICE, "Compatibility mode set to none.  Using V2 of the synchronization engine.\n");
		sync_v2_init (
			corosync_sync_v2_callbacks_retrieve,
			corosync_sync_completed);
		break;
	case CS_SYNC_V1:
		log_printf (LOGSYS_LEVEL_NOTICE, "Compatibility mode set to whitetank.  Using V1 and V2 of the synchronization engine.\n");
		sync_register (
			corosync_sync_callbacks_retrieve,
			sync_v2_memb_list_determine,
			sync_v2_memb_list_abort,
			sync_v2_start);
		sync_v2_init (
			corosync_sync_v2_callbacks_retrieve,
			corosync_sync_completed);
		break;
	default:
		break;
	}
}
/*
 * corosync_flock - take the single-instance lock file and record @pid.
 *
 * Opens (creating if needed) @lockfile and takes a non-blocking write
 * lock on the whole file. It then truncates the file, writes "<pid>\n"
 * and marks the descriptor close-on-exec. On success the descriptor is
 * intentionally left open so the lock is held for the process lifetime,
 * and AIS_DONE_EXIT is returned.
 *
 * Returns AIS_DONE_ALREADY_RUNNING if another process holds the lock,
 * AIS_DONE_AQUIRE_LOCK on any other failure. If a failure happens after
 * the lock was taken, the lock file is unlinked.
 */
static enum e_ais_done corosync_flock (const char *lockfile, pid_t pid)
{
	struct flock lock;
	enum e_ais_done err;
	char pid_s[17];
	size_t pid_len;
	ssize_t written;
	int fd_flag;
	int lf;

	err = AIS_DONE_EXIT;

	lf = open (lockfile, O_WRONLY | O_CREAT, 0640);
	if (lf == -1) {
		log_printf (LOGSYS_LEVEL_ERROR, "Corosync Executive couldn't create lock file.\n");
		return (AIS_DONE_AQUIRE_LOCK);
	}

retry_fcntl:
	lock.l_type = F_WRLCK;
	lock.l_start = 0;
	lock.l_whence = SEEK_SET;
	lock.l_len = 0;
	if (fcntl (lf, F_SETLK, &lock) == -1) {
		switch (errno) {
		case EINTR:
			goto retry_fcntl;
		case EAGAIN:
		case EACCES:
			log_printf (LOGSYS_LEVEL_ERROR, "Another Corosync instance is already running.\n");
			err = AIS_DONE_ALREADY_RUNNING;
			goto error_close;
		default:
			log_printf (LOGSYS_LEVEL_ERROR, "Corosync Executive couldn't aquire lock. Error was %s\n",
				strerror(errno));
			err = AIS_DONE_AQUIRE_LOCK;
			goto error_close;
		}
	}

	if (ftruncate (lf, 0) == -1) {
		log_printf (LOGSYS_LEVEL_ERROR, "Corosync Executive couldn't truncate lock file. Error was %s\n",
			strerror (errno));
		err = AIS_DONE_AQUIRE_LOCK;
		goto error_close_unlink;
	}

	memset (pid_s, 0, sizeof (pid_s));
	/* pid_t is signed and may be wider than int; print it as long. */
	snprintf (pid_s, sizeof (pid_s) - 1, "%ld\n", (long)pid);
	pid_len = strlen (pid_s);

retry_write:
	written = write (lf, pid_s, pid_len);
	/* Only retry on EINTR when write() actually failed; errno is stale otherwise. */
	if (written == -1 && errno == EINTR) {
		goto retry_write;
	}
	if (written == -1 || (size_t)written != pid_len) {
		log_printf (LOGSYS_LEVEL_ERROR, "Corosync Executive couldn't write pid to lock file. "
			"Error was %s\n", (written == -1) ? strerror (errno) : "short write");
		err = AIS_DONE_AQUIRE_LOCK;
		goto error_close_unlink;
	}

	if ((fd_flag = fcntl (lf, F_GETFD, 0)) == -1) {
		log_printf (LOGSYS_LEVEL_ERROR, "Corosync Executive couldn't get close-on-exec flag from lock file. "
			"Error was %s\n", strerror (errno));
		err = AIS_DONE_AQUIRE_LOCK;
		goto error_close_unlink;
	}
	fd_flag |= FD_CLOEXEC;
	if (fcntl (lf, F_SETFD, fd_flag) == -1) {
		log_printf (LOGSYS_LEVEL_ERROR, "Corosync Executive couldn't set close-on-exec flag to lock file. "
			"Error was %s\n", strerror (errno));
		err = AIS_DONE_AQUIRE_LOCK;
		goto error_close_unlink;
	}

	return (err);

error_close_unlink:
	unlink (lockfile);
error_close:
	close (lf);

	return (err);
}
/*
 * main - corosync executive entry point.
 *
 * Startup sequence:
 *   1. parse options: -f foreground, -p keep default priority, -v version;
 *   2. optionally switch to realtime scheduling and lock memory;
 *   3. load the object database and the configuration interfaces listed in
 *      COROSYNC_DEFAULT_CONFIG_IFACE (':'-separated, default
 *      "corosync_parser"), then read and validate the main and totem
 *      configuration;
 *   4. detach from the terminal unless -f was given and take the lock file;
 *   5. start the exit thread, totem, the IPC layer and the poll loop.
 * Every fatal setup error terminates through corosync_exit_error().
 */
int main (int argc, char **argv, char **envp)
{
	const char *error_string;
	struct totem_config totem_config;
	hdb_handle_t objdb_handle;
	hdb_handle_t config_handle;
	unsigned int config_version = 0;
	void *objdb_p;
	struct config_iface_ver0 *config;
	void *config_p;
	const char *config_iface_init;
	char *config_iface;
	char *iface;
	char *strtok_save_pt;
	int res, ch;
	int background, setprio;
	struct stat stat_out;
	char corosync_lib_dir[PATH_MAX];
	hdb_handle_t object_runtime_handle;
	enum e_ais_done flock_err;

#if defined(HAVE_PTHREAD_SPIN_LOCK)
	pthread_spin_init (&serialize_spin, 0);
#endif

	/* default configuration
	 */
	background = 1;
	setprio = 1;

	while ((ch = getopt (argc, argv, "fpv")) != EOF) {

		switch (ch) {
		case 'f':
			background = 0;
			logsys_config_mode_set (NULL, LOGSYS_MODE_OUTPUT_STDERR|LOGSYS_MODE_THREADED|LOGSYS_MODE_FORK);
			break;
		case 'p':
			setprio = 0;
			break;
		case 'v':
			printf ("Corosync Cluster Engine, version '%s' SVN revision '%s'\n", VERSION, SVN_REVISION);
			printf ("Copyright (c) 2006-2009 Red Hat, Inc.\n");
			return EXIT_SUCCESS;
		default:
			fprintf(stderr,
				"usage:\n"
				" -f : Start application in foreground.\n"
				" -p : Do not set process priority. \n"
				" -v : Display version and SVN revision of Corosync and exit.\n");
			return EXIT_FAILURE;
		}
	}

	/*
	 * Set round robin realtime scheduling with priority 99
	 * Lock all memory to avoid page faults which may interrupt
	 * application healthchecking
	 */
	if (setprio) {
		corosync_setscheduler ();
	}

	corosync_mlockall ();

	log_printf (LOGSYS_LEVEL_NOTICE, "Corosync Cluster Engine ('%s'): started and ready to provide service.\n", VERSION);
	log_printf (LOGSYS_LEVEL_INFO, "Corosync built-in features:" PACKAGE_FEATURES "\n");

	(void)signal (SIGINT, sigintr_handler);
	(void)signal (SIGUSR2, sigusr2_handler);
	(void)signal (SIGSEGV, sigsegv_handler);
	(void)signal (SIGABRT, sigabrt_handler);
	(void)signal (SIGQUIT, sigquit_handler);
	(void)signal (SIGTERM, sigterm_handler);
#if MSG_NOSIGNAL != 0
	(void)signal (SIGPIPE, SIG_IGN);
#endif

	/*
	 * Load the object database interface
	 */
	res = lcr_ifact_reference (
		&objdb_handle,
		"objdb",
		0,
		&objdb_p,
		0);
	if (res == -1) {
		log_printf (LOGSYS_LEVEL_ERROR, "Corosync Executive couldn't open configuration object database component.\n");
		corosync_exit_error (AIS_DONE_OBJDB);
	}

	objdb = (struct objdb_iface_ver0 *)objdb_p;

	objdb->objdb_init ();

	/*
	 * Initialize the corosync_api_v1 definition
	 */
	apidef_init (objdb);
	api = apidef_get ();

	num_config_modules = 0;

	/*
	 * Bootstrap in the default configuration parser or use
	 * the corosync default built in parser if the configuration parser
	 * isn't overridden
	 */
	config_iface_init = getenv("COROSYNC_DEFAULT_CONFIG_IFACE");
	if (!config_iface_init) {
		config_iface_init = "corosync_parser";
	}

	/* Make a copy so we can deface it with strtok */
	if ((config_iface = strdup(config_iface_init)) == NULL) {
		log_printf (LOGSYS_LEVEL_ERROR, "exhausted virtual memory");
		corosync_exit_error (AIS_DONE_OBJDB);
	}

	iface = strtok_r(config_iface, ":", &strtok_save_pt);
	while (iface)
	{
		res = lcr_ifact_reference (
			&config_handle,
			iface,
			config_version,
			&config_p,
			0);

		config = (struct config_iface_ver0 *)config_p;
		if (res == -1) {
			log_printf (LOGSYS_LEVEL_ERROR, "Corosync Executive couldn't open configuration component '%s'\n", iface);
			corosync_exit_error (AIS_DONE_MAINCONFIGREAD);
		}

		res = config->config_readconfig(objdb, &error_string);
		if (res == -1) {
			log_printf (LOGSYS_LEVEL_ERROR, "%s", error_string);
			corosync_exit_error (AIS_DONE_MAINCONFIGREAD);
		}
		log_printf (LOGSYS_LEVEL_NOTICE, "%s", error_string);
		/*
		 * NOTE(review): config_modules is indexed without a bound check;
		 * confirm its capacity covers every interface that may be listed.
		 */
		config_modules[num_config_modules++] = config;

		iface = strtok_r(NULL, ":", &strtok_save_pt);
	}
	free(config_iface);

	res = corosync_main_config_read (objdb, &error_string);
	if (res == -1) {
		/*
		 * if we are here, we _must_ flush the logsys queue
		 * and try to inform that we couldn't read the config.
		 * this is a desperate attempt before certain death
		 * and there is no guarantee that we can print to stderr
		 * nor that logsys is sending the messages where we expect.
		 */
		log_printf (LOGSYS_LEVEL_ERROR, "%s", error_string);
		fprintf(stderr, "%s", error_string);
		syslog (LOGSYS_LEVEL_ERROR, "%s", error_string);
		corosync_exit_error (AIS_DONE_MAINCONFIGREAD);
	}

	/*
	 * Make sure required directory is present
	 */
	snprintf (corosync_lib_dir, sizeof (corosync_lib_dir), "%s/lib/corosync", LOCALSTATEDIR);
	res = stat (corosync_lib_dir, &stat_out);
	if ((res == -1) || (res == 0 && !S_ISDIR(stat_out.st_mode))) {
		log_printf (LOGSYS_LEVEL_ERROR, "Required directory not present %s.  Please create it.\n", corosync_lib_dir);
		corosync_exit_error (AIS_DONE_DIR_NOT_PRESENT);
	}

	res = totem_config_read (objdb, &totem_config, &error_string);
	if (res == -1) {
		log_printf (LOGSYS_LEVEL_ERROR, "%s", error_string);
		corosync_exit_error (AIS_DONE_MAINCONFIGREAD);
	}

	res = totem_config_keyread (objdb, &totem_config, &error_string);
	if (res == -1) {
		log_printf (LOGSYS_LEVEL_ERROR, "%s", error_string);
		corosync_exit_error (AIS_DONE_MAINCONFIGREAD);
	}

	res = totem_config_validate (&totem_config, &error_string);
	if (res == -1) {
		log_printf (LOGSYS_LEVEL_ERROR, "%s", error_string);
		corosync_exit_error (AIS_DONE_MAINCONFIGREAD);
	}

	totem_config.totem_logging_configuration = totem_logging_configuration;
	totem_config.totem_logging_configuration.log_subsys_id =
		_logsys_subsys_create ("TOTEM");

	if (totem_config.totem_logging_configuration.log_subsys_id < 0) {
		log_printf (LOGSYS_LEVEL_ERROR,
			"Unable to initialize TOTEM logging subsystem\n");
		corosync_exit_error (AIS_DONE_MAINCONFIGREAD);
	}

	totem_config.totem_logging_configuration.log_level_security = LOGSYS_LEVEL_WARNING;
	totem_config.totem_logging_configuration.log_level_error = LOGSYS_LEVEL_ERROR;
	totem_config.totem_logging_configuration.log_level_warning = LOGSYS_LEVEL_WARNING;
	totem_config.totem_logging_configuration.log_level_notice = LOGSYS_LEVEL_NOTICE;
	totem_config.totem_logging_configuration.log_level_debug = LOGSYS_LEVEL_DEBUG;
	totem_config.totem_logging_configuration.log_printf = _logsys_log_printf;

	/* Read the sync compatibility mode once; it sets minimum_sync_mode. */
	res = corosync_main_config_compatibility_read (objdb,
		&minimum_sync_mode,
		&error_string);
	if (res == -1) {
		log_printf (LOGSYS_LEVEL_ERROR, "%s", error_string);
		corosync_exit_error (AIS_DONE_MAINCONFIGREAD);
	}

	/* create the main runtime object */
	objdb->object_create (OBJECT_PARENT_HANDLE,
		&object_runtime_handle,
		"runtime", strlen ("runtime"));

	/*
	 * Now we are fully initialized.
	 */
	if (background) {
		corosync_tty_detach ();
	}
	logsys_fork_completed();

	if ((flock_err = corosync_flock (corosync_lock_file, getpid ())) != AIS_DONE_EXIT) {
		corosync_exit_error (flock_err);
	}

	corosync_timer_init (
		serialize_lock,
		serialize_unlock,
		sched_priority);

	corosync_poll_handle = poll_create ();

	poll_low_fds_event_set(corosync_poll_handle, main_low_fds_event);

	/*
	 * Sleep for a while to let other nodes in the cluster
	 * understand that this node has been away (if it was
	 * an corosync restart).
	 */

// TODO what is this hack for?	usleep(totem_config.token_timeout * 2000);

	/*
	 * Create semaphore and start "exit" thread
	 */
	res = sem_init (&corosync_exit_sem, 0, 0);
	if (res != 0) {
		log_printf (LOGSYS_LEVEL_ERROR, "Corosync Executive couldn't create exit thread.\n");
		corosync_exit_error (AIS_DONE_FATAL_ERR);
	}

	res = pthread_create (&corosync_exit_thread, NULL, corosync_exit_thread_handler, NULL);
	if (res != 0) {
		log_printf (LOGSYS_LEVEL_ERROR, "Corosync Executive couldn't create exit thread.\n");
		corosync_exit_error (AIS_DONE_FATAL_ERR);
	}

	/*
	 * if totempg_initialize doesn't have root priveleges, it cannot
	 * bind to a specific interface.  This only matters if
	 * there is more then one interface in a system, so
	 * in this case, only a warning is printed
	 */
	/*
	 * Join multicast group and setup delivery
	 *  and configuration change functions
	 */
	totempg_initialize (
		corosync_poll_handle,
		&totem_config);

	totempg_service_ready_register (
		main_service_ready);

	totempg_groups_initialize (
		&corosync_group_handle,
		deliver_fn,
		confchg_fn);

	totempg_groups_join (
		corosync_group_handle,
		&corosync_group,
		1);

	/*
	 * Drop root privleges to user 'ais'
	 * TODO: Don't really need full root capabilities;
	 *       needed capabilities are:
	 * CAP_NET_RAW (bindtodevice)
	 * CAP_SYS_NICE (setscheduler)
	 * CAP_IPC_LOCK (mlockall)
	 */
	priv_drop ();

	schedwrk_init (
		serialize_lock,
		serialize_unlock);

	ipc_subsys_id = _logsys_subsys_create ("IPC");
	if (ipc_subsys_id < 0) {
		log_printf (LOGSYS_LEVEL_ERROR,
			"Could not initialize IPC logging subsystem\n");
		corosync_exit_error (AIS_DONE_INIT_SERVICES);
	}

	ipc_init_state_v2.log_subsys_id = ipc_subsys_id;
	coroipcs_ipc_init_v2 (&ipc_init_state_v2);

	/*
	 * Start main processing loop
	 */
	poll_run (corosync_poll_handle);

	return EXIT_SUCCESS;
}
diff --git a/exec/util.h b/exec/util.h
index ed3529ca..f04794e1 100644
--- a/exec/util.h
+++ b/exec/util.h
@@ -1,79 +1,100 @@
/*
* Copyright (c) 2002-2004 MontaVista Software, Inc.
* Copyright (c) 2004 Open Source Development Lab
*
* All rights reserved.
*
* Author: Steven Dake (sdake@redhat.com), Mark Haverkamp (markh@osdl.org)
*
* This software licensed under BSD license, the text of which follows:
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* - Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* - Neither the name of the MontaVista Software, Inc. nor the names of its
* contributors may be used to endorse or promote products derived from this
* software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
* THE POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef UTIL_H_DEFINED
#define UTIL_H_DEFINED
#include <sys/time.h>
#include <corosync/corotypes.h>
/*
 * Get the time of day and convert to nanoseconds.
 * The value is returned as a cs_time_t; the definition lives outside
 * this header.
 */
extern cs_time_t clust_time_now(void);
/*
 * Termination / result codes used with corosync_exit_error().
 * AIS_DONE_EXIT (0) also serves as the success value of corosync_flock().
 * The explicit numbering keeps the values stable; do not renumber.
 * (AIS_DONE_AQUIRE_LOCK keeps its historical spelling.)
 */
enum e_ais_done {
AIS_DONE_EXIT = 0,
AIS_DONE_UID_DETERMINE = 1,
AIS_DONE_GID_DETERMINE = 2,
AIS_DONE_MEMPOOL_INIT = 3,
AIS_DONE_FORK = 4,
AIS_DONE_LIBAIS_SOCKET = 5,
AIS_DONE_LIBAIS_BIND = 6,
AIS_DONE_READKEY = 7,
AIS_DONE_MAINCONFIGREAD = 8,
AIS_DONE_LOGSETUP = 9,
AIS_DONE_AMFCONFIGREAD = 10,
AIS_DONE_DYNAMICLOAD = 11,
AIS_DONE_OBJDB = 12,
AIS_DONE_INIT_SERVICES = 13,
AIS_DONE_OUT_OF_MEMORY = 14,
AIS_DONE_FATAL_ERR = 15,
AIS_DONE_DIR_NOT_PRESENT = 16,
AIS_DONE_AQUIRE_LOCK = 17,
AIS_DONE_ALREADY_RUNNING = 18,
};
/*
 * hdb_error_to_cs - translate a 0 / non-zero result plus errno into a
 * cs_error_t:
 *   res == 0 -> CS_OK; otherwise by errno:
 *   EBADF -> CS_ERR_BAD_HANDLE, ENOMEM -> CS_ERR_NO_MEMORY,
 *   EMFILE -> CS_ERR_NO_RESOURCES, EACCES -> CS_ERR_SECURITY,
 *   anything else -> CS_ERR_LIBRARY.
 * This is the same mapping as the helper of the same name in lib/util.h.
 *
 * NOTE(review): this reads errno, but this header directly includes only
 * <sys/time.h> and <corosync/corotypes.h>. Confirm <errno.h> is always
 * visible to includers; lib/util.h includes it explicitly.
 * The trailing backslashes are plain line continuations and have no
 * effect inside a function body.
 */
+static inline cs_error_t hdb_error_to_cs (int res) \
+{ \
+ if (res == 0) { \
+ return (CS_OK); \
+ } else { \
+ if (errno == EBADF) { \
+ return (CS_ERR_BAD_HANDLE); \
+ } else \
+ if (errno == ENOMEM) { \
+ return (CS_ERR_NO_MEMORY); \
+ } else \
+ if (errno == EMFILE) { \
+ return (CS_ERR_NO_RESOURCES); \
+ } else \
+ if (errno == EACCES) { \
+ return (CS_ERR_SECURITY); \
+ } \
+ return (CS_ERR_LIBRARY); \
+ } \
+}
+
/*
 * Compare two names. returns non-zero on match.
 */
extern int name_match(cs_name_t *name1, cs_name_t *name2);
/*
 * Terminate with @err, passing the caller's __FILE__ and __LINE__ to
 * _corosync_exit_error(), which never returns.
 */
#define corosync_exit_error(err) _corosync_exit_error ((err), __FILE__, __LINE__)
extern void _corosync_exit_error (enum e_ais_done err, const char *file,
unsigned int line) __attribute__((noreturn));
/* Declared noreturn; presumably reports allocation failure and exits. */
void _corosync_out_of_memory_error (void) __attribute__((noreturn));
/*
 * cs_name_t <-> C string helpers. Semantics (buffer ownership, length
 * handling) are defined elsewhere; confirm there before relying on them.
 */
extern char *getcs_name_t (cs_name_t *name);
extern void setcs_name_t (cs_name_t *name, char *str);
extern int cs_name_tisEqual (cs_name_t *str1, char *str2);
#endif /* UTIL_H_DEFINED */
diff --git a/lib/util.h b/lib/util.h
index 4a44bba5..c228b423 100644
--- a/lib/util.h
+++ b/lib/util.h
@@ -1,66 +1,72 @@
/*
* Copyright (c) 2002-2003 MontaVista Software, Inc.
*
* All rights reserved.
*
* Author: Steven Dake (sdake@redhat.com)
*
* This software licensed under BSD license, the text of which follows:
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* - Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* - Neither the name of the MontaVista Software, Inc. nor the names of its
* contributors may be used to endorse or promote products derived from this
* software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
* THE POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef AIS_UTIL_H_DEFINED
#define AIS_UTIL_H_DEFINED
#include <errno.h>
/*
 * hdb_error_to_cs - translate a 0 / non-zero result plus errno into a
 * cs_error_t:
 *   res == 0 -> CS_OK; otherwise by errno:
 *   EBADF -> CS_ERR_BAD_HANDLE, ENOMEM -> CS_ERR_NO_MEMORY,
 *   EMFILE -> CS_ERR_NO_RESOURCES (added by this hunk),
 *   EACCES -> CS_ERR_SECURITY (added by this hunk),
 *   anything else -> CS_ERR_LIBRARY.
 * errno is only consulted when res is non-zero. The trailing backslashes
 * are plain line continuations with no effect inside a function body.
 */
static inline cs_error_t hdb_error_to_cs (int res) \
{ \
if (res == 0) { \
return (CS_OK); \
} else { \
if (errno == EBADF) { \
return (CS_ERR_BAD_HANDLE); \
} else \
if (errno == ENOMEM) { \
return (CS_ERR_NO_MEMORY); \
+ } else \
+ if (errno == EMFILE) { \
+ return (CS_ERR_NO_RESOURCES); \
+ } else \
+ if (errno == EACCES) { \
+ return (CS_ERR_SECURITY); \
} \
return (CS_ERR_LIBRARY); \
} \
}
/*
 * Sizes of the IPC request, response and dispatch areas (presumably in
 * bytes). A smaller footprint is used when HAVE_SMALL_MEMORY_FOOTPRINT is
 * defined. The expansions are parenthesized so that expressions such as
 * "len / IPC_REQUEST_SIZE" or "n % IPC_DISPATCH_SIZE" evaluate as intended.
 */
#ifdef HAVE_SMALL_MEMORY_FOOTPRINT
#define IPC_REQUEST_SIZE	(1024 * 64)
#define IPC_RESPONSE_SIZE	(1024 * 64)
#define IPC_DISPATCH_SIZE	(1024 * 64)
#else
#define IPC_REQUEST_SIZE	(8192 * 128)
#define IPC_RESPONSE_SIZE	(8192 * 128)
#define IPC_DISPATCH_SIZE	(8192 * 128)
#endif /* HAVE_SMALL_MEMORY_FOOTPRINT */
#endif /* AIS_UTIL_H_DEFINED */
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Wed, Jun 25, 4:38 AM (1 d, 14 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1952167
Default Alt Text
(100 KB)
Attached To
Mode
rC Corosync
Attached
Detach File
Event Timeline
Log In to Comment