diff --git a/exec/coroipcs.c b/exec/coroipcs.c index f60fc4dd..7495f4cf 100644 --- a/exec/coroipcs.c +++ b/exec/coroipcs.c @@ -1,1097 +1,1097 @@ /* * Copyright (c) 2006-2009 Red Hat, Inc. * * All rights reserved. * * Author: Steven Dake (sdake@redhat.com) * * This software licensed under BSD license, the text of which follows: * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * - Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * - Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * - Neither the name of the MontaVista Software, Inc. nor the names of its * contributors may be used to endorse or promote products derived from this * software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF * THE POSSIBILITY OF SUCH DAMAGE. */ #include #ifndef _GNU_SOURCE #define _GNU_SOURCE 1 #endif #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #if defined(HAVE_GETPEERUCRED) #include #endif #include #include #include #include #include "coroipcs.h" #include #ifndef MSG_NOSIGNAL #define MSG_NOSIGNAL 0 #endif #define SERVER_BACKLOG 5 #define MSG_SEND_LOCKED 0 #define MSG_SEND_UNLOCKED 1 static struct coroipcs_init_state *api; DECLARE_LIST_INIT (conn_info_list_head); struct outq_item { void *msg; size_t mlen; struct list_head list; }; #if defined(_SEM_SEMUN_UNDEFINED) union semun { int val; struct semid_ds *buf; unsigned short int *array; struct seminfo *__buf; }; #endif enum conn_state { CONN_STATE_THREAD_INACTIVE = 0, CONN_STATE_THREAD_ACTIVE = 1, CONN_STATE_THREAD_REQUEST_EXIT = 2, CONN_STATE_THREAD_DESTROYED = 3, CONN_STATE_LIB_EXIT_CALLED = 4, CONN_STATE_DISCONNECT_INACTIVE = 5 }; struct conn_info { int fd; pthread_t thread; pthread_attr_t thread_attr; unsigned int service; enum conn_state state; int notify_flow_control_enabled; int refcount; key_t shmkey; key_t semkey; int shmid; int semid; unsigned int pending_semops; pthread_mutex_t mutex; struct shared_memory *mem; struct list_head outq_head; void *private_data; struct list_head list; char setup_msg[sizeof (mar_req_setup_t)]; unsigned int setup_bytes_read; char *sending_allowed_private_data[64]; }; static int shared_mem_dispatch_bytes_left (struct conn_info *conn_info); static void outq_flush (struct conn_info *conn_info); static int priv_change (struct conn_info *conn_info); static void ipc_disconnect (struct conn_info *conn_info); static void msg_send (void *conn, const struct iovec *iov, unsigned int iov_len, int locked); static int memcpy_dwrap (struct conn_info *conn_info, void *msg, int len); static int ipc_thread_active (void *conn) { struct conn_info *conn_info = (struct conn_info *)conn; int retval = 0; pthread_mutex_lock (&conn_info->mutex); if (conn_info->state == CONN_STATE_THREAD_ACTIVE) { retval = 1; } pthread_mutex_unlock (&conn_info->mutex); return (retval); } static int ipc_thread_exiting (void *conn) { struct conn_info *conn_info = (struct conn_info *)conn; int retval = 1; pthread_mutex_lock (&conn_info->mutex); if (conn_info->state == CONN_STATE_THREAD_INACTIVE) { retval = 0; } else if (conn_info->state == CONN_STATE_THREAD_ACTIVE) { retval = 0; } pthread_mutex_unlock (&conn_info->mutex); return (retval); } /* * returns 0 if should be called again, -1 if finished */ static inline int conn_info_destroy (struct conn_info *conn_info) { unsigned int res; void *retval; list_del (&conn_info->list); list_init (&conn_info->list); if (conn_info->state == CONN_STATE_THREAD_REQUEST_EXIT) { res = pthread_join (conn_info->thread, &retval); conn_info->state = CONN_STATE_THREAD_DESTROYED; return (0); } if (conn_info->state == CONN_STATE_THREAD_INACTIVE || conn_info->state == CONN_STATE_DISCONNECT_INACTIVE) { list_del (&conn_info->list); close (conn_info->fd); api->free (conn_info); return (-1); } if (conn_info->state == CONN_STATE_THREAD_ACTIVE) { pthread_kill (conn_info->thread, SIGUSR1); return (0); } api->serialize_lock (); /* * Retry library exit function if busy */ if (conn_info->state == CONN_STATE_THREAD_DESTROYED) { res = api->exit_fn_get (conn_info->service) (conn_info); if (res == -1) { api->serialize_unlock (); return (0); } else { conn_info->state = CONN_STATE_LIB_EXIT_CALLED; } } pthread_mutex_lock (&conn_info->mutex); if (conn_info->refcount > 0) { pthread_mutex_unlock (&conn_info->mutex); api->serialize_unlock (); return (0); } list_del (&conn_info->list); pthread_mutex_unlock (&conn_info->mutex); /* * Destroy shared memory segment and semaphore */ shmdt (conn_info->mem); res = shmctl (conn_info->shmid, IPC_RMID, NULL); semctl (conn_info->semid, 0, IPC_RMID); /* * Free allocated data needed to retry exiting library IPC connection */ if (conn_info->private_data) { api->free (conn_info->private_data); } close (conn_info->fd); api->free (conn_info); api->serialize_unlock (); return (-1); } struct res_overlay { mar_res_header_t header __attribute__((aligned(8))); char buf[4096]; }; static void *pthread_ipc_consumer (void *conn) { struct conn_info *conn_info = (struct conn_info *)conn; struct sembuf sop; int res; mar_req_header_t *header; struct res_overlay res_overlay; int send_ok; if (api->sched_priority != 0) { struct sched_param sched_param; sched_param.sched_priority = api->sched_priority; res = pthread_setschedparam (conn_info->thread, SCHED_RR, &sched_param); } for (;;) { sop.sem_num = 0; sop.sem_op = -1; sop.sem_flg = 0; retry_semop: if (ipc_thread_active (conn_info) == 0) { coroipcs_refcount_dec (conn_info); pthread_exit (0); } res = semop (conn_info->semid, &sop, 1); if ((res == -1) && (errno == EINTR || errno == EAGAIN)) { goto retry_semop; } else if ((res == -1) && (errno == EINVAL || errno == EIDRM)) { coroipcs_refcount_dec (conn_info); pthread_exit (0); } coroipcs_refcount_inc (conn_info); header = (mar_req_header_t *)conn_info->mem->req_buffer; send_ok = api->sending_allowed (conn_info->service, header->id, header, conn_info->sending_allowed_private_data); if (send_ok) { api->serialize_lock(); api->handler_fn_get (conn_info->service, header->id) (conn_info, header); api->serialize_unlock(); } else { /* * Overload, tell library to retry */ res_overlay.header.size = api->response_size_get (conn_info->service, header->id); res_overlay.header.id = api->response_id_get (conn_info->service, header->id); res_overlay.header.error = CS_ERR_TRY_AGAIN; coroipcs_response_send (conn_info, &res_overlay, res_overlay.header.size); } api->sending_allowed_release (conn_info->sending_allowed_private_data); coroipcs_refcount_dec (conn); } pthread_exit (0); } static int req_setup_send ( struct conn_info *conn_info, int error) { mar_res_setup_t res_setup; unsigned int res; res_setup.error = error; retry_send: res = send (conn_info->fd, &res_setup, sizeof (mar_res_setup_t), MSG_WAITALL); if (res == -1 && errno == EINTR) { goto retry_send; } else if (res == -1 && errno == EAGAIN) { goto retry_send; } return (0); } static int req_setup_recv ( struct conn_info *conn_info) { int res; struct msghdr msg_recv; struct iovec iov_recv; #ifdef COROSYNC_LINUX struct cmsghdr *cmsg; char cmsg_cred[CMSG_SPACE (sizeof (struct ucred))]; struct ucred *cred; int off = 0; int on = 1; #endif msg_recv.msg_iov = &iov_recv; msg_recv.msg_iovlen = 1; msg_recv.msg_name = 0; msg_recv.msg_namelen = 0; #ifdef COROSYNC_LINUX msg_recv.msg_control = (void *)cmsg_cred; msg_recv.msg_controllen = sizeof (cmsg_cred); #endif #ifdef PORTABILITY_WORK_TODO #ifdef COROSYNC_SOLARIS msg_recv.msg_flags = 0; uid_t euid; gid_t egid; euid = -1; egid = -1; if (getpeereid(conn_info->fd, &euid, &egid) != -1 && (api->security_valid (euid, egid)) { if (conn_info->state == CONN_IO_STATE_INITIALIZING) { api->log_printf ("Invalid security authentication\n"); return (-1); } } msg_recv.msg_accrights = 0; msg_recv.msg_accrightslen = 0; #else /* COROSYNC_SOLARIS */ #ifdef HAVE_GETPEERUCRED ucred_t *uc; uid_t euid = -1; gid_t egid = -1; if (getpeerucred (conn_info->fd, &uc) == 0) { euid = ucred_geteuid (uc); egid = ucred_getegid (uc); if (api->security_valid (euid, egid) { conn_info->authenticated = 1; } ucred_free(uc); } if (conn_info->authenticated == 0) { api->log_printf ("Invalid security authentication\n"); } #else /* HAVE_GETPEERUCRED */ api->log_printf (LOG_LEVEL_SECURITY, "Connection not authenticated " "because platform does not support " "authentication with sockets, continuing " "with a fake authentication\n"); #endif /* HAVE_GETPEERUCRED */ #endif /* COROSYNC_SOLARIS */ #endif iov_recv.iov_base = &conn_info->setup_msg[conn_info->setup_bytes_read]; iov_recv.iov_len = sizeof (mar_req_setup_t) - conn_info->setup_bytes_read; #ifdef COROSYNC_LINUX setsockopt(conn_info->fd, SOL_SOCKET, SO_PASSCRED, &on, sizeof (on)); #endif retry_recv: res = recvmsg (conn_info->fd, &msg_recv, MSG_NOSIGNAL); if (res == -1 && errno == EINTR) { goto retry_recv; } else if (res == -1 && errno != EAGAIN) { return (0); } else if (res == 0) { #if defined(COROSYNC_SOLARIS) || defined(COROSYNC_BSD) || defined(COROSYNC_DARWIN) /* On many OS poll never return POLLHUP or POLLERR. * EOF is detected when recvmsg return 0. */ ipc_disconnect (conn_info); #endif return (-1); } conn_info->setup_bytes_read += res; #ifdef COROSYNC_LINUX cmsg = CMSG_FIRSTHDR (&msg_recv); assert (cmsg); cred = (struct ucred *)CMSG_DATA (cmsg); if (cred) { if (api->security_valid (cred->uid, cred->gid)) { } else { ipc_disconnect (conn_info); api->log_printf ("Invalid security authentication\n"); return (-1); } } #endif if (conn_info->setup_bytes_read == sizeof (mar_req_setup_t)) { #ifdef COROSYNC_LINUX setsockopt(conn_info->fd, SOL_SOCKET, SO_PASSCRED, &off, sizeof (off)); #endif return (1); } return (0); } static void ipc_disconnect (struct conn_info *conn_info) { if (conn_info->state == CONN_STATE_THREAD_INACTIVE) { conn_info->state = CONN_STATE_DISCONNECT_INACTIVE; return; } if (conn_info->state != CONN_STATE_THREAD_ACTIVE) { return; } pthread_mutex_lock (&conn_info->mutex); conn_info->state = CONN_STATE_THREAD_REQUEST_EXIT; pthread_mutex_unlock (&conn_info->mutex); pthread_kill (conn_info->thread, SIGUSR1); } static int conn_info_create (int fd) { struct conn_info *conn_info; conn_info = api->malloc (sizeof (struct conn_info)); if (conn_info == NULL) { return (-1); } memset (conn_info, 0, sizeof (struct conn_info)); conn_info->fd = fd; conn_info->service = SOCKET_SERVICE_INIT; conn_info->state = CONN_STATE_THREAD_INACTIVE; list_init (&conn_info->outq_head); list_init (&conn_info->list); list_add (&conn_info->list, &conn_info_list_head); api->poll_dispatch_add (fd, conn_info); return (0); } #if defined(COROSYNC_LINUX) || defined(COROSYNC_SOLARIS) /* SUN_LEN is broken for abstract namespace */ #define COROSYNC_SUN_LEN(a) sizeof(*(a)) #else #define COROSYNC_SUN_LEN(a) SUN_LEN(a) #endif /* * Exported functions */ extern void coroipcs_ipc_init ( struct coroipcs_init_state *init_state) { int server_fd; struct sockaddr_un un_addr; int res; api = init_state; /* * Create socket for IPC clients, name socket, listen for connections */ server_fd = socket (PF_UNIX, SOCK_STREAM, 0); if (server_fd == -1) { api->log_printf ("Cannot create client connections socket.\n"); api->fatal_error ("Can't create library listen socket"); }; res = fcntl (server_fd, F_SETFL, O_NONBLOCK); if (res == -1) { api->log_printf ("Could not set non-blocking operation on server socket: %s\n", strerror (errno)); api->fatal_error ("Could not set non-blocking operation on server socket"); } memset (&un_addr, 0, sizeof (struct sockaddr_un)); un_addr.sun_family = AF_UNIX; #if defined(COROSYNC_BSD) || defined(COROSYNC_DARWIN) un_addr.sun_len = sizeof(struct sockaddr_un); #endif #if defined(COROSYNC_LINUX) sprintf (un_addr.sun_path + 1, "%s", api->socket_name); #else sprintf (un_addr.sun_path, "%s/%s", SOCKETDIR, api->socket_name); unlink (un_addr.sun_path); #endif res = bind (server_fd, (struct sockaddr *)&un_addr, COROSYNC_SUN_LEN(&un_addr)); if (res) { api->log_printf ("Could not bind AF_UNIX: %s.\n", strerror (errno)); api->fatal_error ("Could not bind to AF_UNIX socket\n"); } listen (server_fd, SERVER_BACKLOG); /* * Setup connection dispatch routine */ api->poll_accept_add (server_fd); } void coroipcs_ipc_exit (void) { struct list_head *list; struct conn_info *conn_info; for (list = conn_info_list_head.next; list != &conn_info_list_head; list = list->next) { conn_info = list_entry (list, struct conn_info, list); shmdt (conn_info->mem); shmctl (conn_info->shmid, IPC_RMID, NULL); semctl (conn_info->semid, 0, IPC_RMID); pthread_kill (conn_info->thread, SIGUSR1); } } /* * Get the conn info private data */ void *coroipcs_private_data_get (void *conn) { struct conn_info *conn_info = (struct conn_info *)conn; return (conn_info->private_data); } -int coroipcs_response_send (void *conn, const void *msg, int mlen) +int coroipcs_response_send (void *conn, const void *msg, size_t mlen) { struct conn_info *conn_info = (struct conn_info *)conn; struct sembuf sop; int res; memcpy (conn_info->mem->res_buffer, msg, mlen); sop.sem_num = 1; sop.sem_op = 1; sop.sem_flg = 0; retry_semop: res = semop (conn_info->semid, &sop, 1); if ((res == -1) && (errno == EINTR || errno == EAGAIN)) { goto retry_semop; } else if ((res == -1) && (errno == EINVAL || errno == EIDRM)) { return (0); } return (0); } int coroipcs_response_iov_send (void *conn, const struct iovec *iov, unsigned int iov_len) { struct conn_info *conn_info = (struct conn_info *)conn; struct sembuf sop; int res; int write_idx = 0; int i; for (i = 0; i < iov_len; i++) { memcpy (&conn_info->mem->res_buffer[write_idx], iov[i].iov_base, iov[i].iov_len); write_idx += iov[i].iov_len; } sop.sem_num = 1; sop.sem_op = 1; sop.sem_flg = 0; retry_semop: res = semop (conn_info->semid, &sop, 1); if ((res == -1) && (errno == EINTR || errno == EAGAIN)) { goto retry_semop; } else if ((res == -1) && (errno == EINVAL || errno == EIDRM)) { return (0); } return (0); } static int shared_mem_dispatch_bytes_left (struct conn_info *conn_info) { unsigned int read; unsigned int write; unsigned int bytes_left; read = conn_info->mem->read; write = conn_info->mem->write; if (read <= write) { bytes_left = DISPATCH_SIZE - write + read; } else { bytes_left = read - write; } return (bytes_left); } static int memcpy_dwrap (struct conn_info *conn_info, void *msg, int len) { char *dest_char = (char *)conn_info->mem->dispatch_buffer; char *src_char = msg; unsigned int first_write; unsigned int second_write; first_write = len; second_write = 0; if (len + conn_info->mem->write >= DISPATCH_SIZE) { first_write = DISPATCH_SIZE - conn_info->mem->write; second_write = len - first_write; } memcpy (&dest_char[conn_info->mem->write], src_char, first_write); if (second_write) { memcpy (dest_char, &src_char[first_write], second_write); } conn_info->mem->write = (conn_info->mem->write + len) % DISPATCH_SIZE; return (0); } static void msg_send (void *conn, const struct iovec *iov, unsigned int iov_len, int locked) { struct conn_info *conn_info = (struct conn_info *)conn; struct sembuf sop; int res; int i; char buf; for (i = 0; i < iov_len; i++) { memcpy_dwrap (conn_info, iov[i].iov_base, iov[i].iov_len); } buf = !list_empty (&conn_info->outq_head); res = send (conn_info->fd, &buf, 1, MSG_NOSIGNAL); if (res == -1 && errno == EAGAIN) { if (locked == 0) { pthread_mutex_lock (&conn_info->mutex); } conn_info->pending_semops += 1; if (locked == 0) { pthread_mutex_unlock (&conn_info->mutex); } api->poll_dispatch_modify (conn_info->fd, POLLIN|POLLOUT|POLLNVAL); } else if (res == -1) { ipc_disconnect (conn_info); } sop.sem_num = 2; sop.sem_op = 1; sop.sem_flg = 0; retry_semop: res = semop (conn_info->semid, &sop, 1); if ((res == -1) && (errno == EINTR || errno == EAGAIN)) { goto retry_semop; } else if ((res == -1) && (errno == EINVAL || errno == EIDRM)) { return; } } static void outq_flush (struct conn_info *conn_info) { struct list_head *list, *list_next; struct outq_item *outq_item; unsigned int bytes_left; struct iovec iov; char buf; int res; pthread_mutex_lock (&conn_info->mutex); if (list_empty (&conn_info->outq_head)) { buf = 3; res = send (conn_info->fd, &buf, 1, MSG_NOSIGNAL); pthread_mutex_unlock (&conn_info->mutex); return; } for (list = conn_info->outq_head.next; list != &conn_info->outq_head; list = list_next) { list_next = list->next; outq_item = list_entry (list, struct outq_item, list); bytes_left = shared_mem_dispatch_bytes_left (conn_info); if (bytes_left > outq_item->mlen) { iov.iov_base = outq_item->msg; iov.iov_len = outq_item->mlen; msg_send (conn_info, &iov, 1, MSG_SEND_UNLOCKED); list_del (list); api->free (iov.iov_base); api->free (outq_item); } else { break; } } pthread_mutex_unlock (&conn_info->mutex); } static int priv_change (struct conn_info *conn_info) { mar_req_priv_change req_priv_change; unsigned int res; union semun semun; struct semid_ds ipc_set; int i; retry_recv: res = recv (conn_info->fd, &req_priv_change, sizeof (mar_req_priv_change), MSG_NOSIGNAL); if (res == -1 && errno == EINTR) { goto retry_recv; } if (res == -1 && errno == EAGAIN) { goto retry_recv; } if (res == -1 && errno != EAGAIN) { return (-1); } #if defined(COROSYNC_SOLARIS) || defined(COROSYNC_BSD) || defined(COROSYNC_DARWIN) /* Error on socket, EOF is detected when recv return 0 */ if (res == 0) { return (-1); } #endif ipc_set.sem_perm.uid = req_priv_change.euid; ipc_set.sem_perm.gid = req_priv_change.egid; ipc_set.sem_perm.mode = 0600; semun.buf = &ipc_set; for (i = 0; i < 3; i++) { res = semctl (conn_info->semid, 0, IPC_SET, semun); if (res == -1) { return (-1); } } return (0); } static void msg_send_or_queue (void *conn, const struct iovec *iov, unsigned int iov_len) { struct conn_info *conn_info = (struct conn_info *)conn; unsigned int bytes_left; unsigned int bytes_msg = 0; int i; struct outq_item *outq_item; char *write_buf = 0; /* * Exit transmission if the connection is dead */ if (ipc_thread_active (conn) == 0) { return; } bytes_left = shared_mem_dispatch_bytes_left (conn_info); for (i = 0; i < iov_len; i++) { bytes_msg += iov[i].iov_len; } if (bytes_left < bytes_msg || list_empty (&conn_info->outq_head) == 0) { outq_item = api->malloc (sizeof (struct outq_item)); if (outq_item == NULL) { ipc_disconnect (conn); return; } outq_item->msg = api->malloc (bytes_msg); if (outq_item->msg == 0) { api->free (outq_item); ipc_disconnect (conn); return; } write_buf = outq_item->msg; for (i = 0; i < iov_len; i++) { memcpy (write_buf, iov[i].iov_base, iov[i].iov_len); write_buf += iov[i].iov_len; } outq_item->mlen = bytes_msg; list_init (&outq_item->list); pthread_mutex_lock (&conn_info->mutex); if (list_empty (&conn_info->outq_head)) { conn_info->notify_flow_control_enabled = 1; api->poll_dispatch_modify (conn_info->fd, POLLIN|POLLOUT|POLLNVAL); } list_add_tail (&outq_item->list, &conn_info->outq_head); pthread_mutex_unlock (&conn_info->mutex); return; } msg_send (conn, iov, iov_len, MSG_SEND_LOCKED); } void coroipcs_refcount_inc (void *conn) { struct conn_info *conn_info = (struct conn_info *)conn; pthread_mutex_lock (&conn_info->mutex); conn_info->refcount++; pthread_mutex_unlock (&conn_info->mutex); } void coroipcs_refcount_dec (void *conn) { struct conn_info *conn_info = (struct conn_info *)conn; pthread_mutex_lock (&conn_info->mutex); conn_info->refcount--; pthread_mutex_unlock (&conn_info->mutex); } -int coroipcs_dispatch_send (void *conn, const void *msg, int mlen) +int coroipcs_dispatch_send (void *conn, const void *msg, size_t mlen) { struct iovec iov; iov.iov_base = msg; iov.iov_len = mlen; msg_send_or_queue (conn, &iov, 1); return (0); } int coroipcs_dispatch_iov_send (void *conn, const struct iovec *iov, unsigned int iov_len) { msg_send_or_queue (conn, iov, iov_len); return (0); } int coroipcs_handler_accept ( int fd, int revent, void *data) { socklen_t addrlen; struct sockaddr_un un_addr; int new_fd; #ifdef COROSYNC_LINUX int on = 1; #endif int res; addrlen = sizeof (struct sockaddr_un); retry_accept: new_fd = accept (fd, (struct sockaddr *)&un_addr, &addrlen); if (new_fd == -1 && errno == EINTR) { goto retry_accept; } if (new_fd == -1) { api->log_printf ("Could not accept Library connection: %s\n", strerror (errno)); return (0); /* This is an error, but -1 would indicate disconnect from poll loop */ } res = fcntl (new_fd, F_SETFL, O_NONBLOCK); if (res == -1) { api->log_printf ("Could not set non-blocking operation on library connection: %s\n", strerror (errno)); close (new_fd); return (0); /* This is an error, but -1 would indicate disconnect from poll loop */ } /* * Valid accept */ /* * Request credentials of sender provided by kernel */ #ifdef COROSYNC_LINUX setsockopt(new_fd, SOL_SOCKET, SO_PASSCRED, &on, sizeof (on)); #endif res = conn_info_create (new_fd); if (res != 0) { close (new_fd); } return (0); } int coroipcs_handler_dispatch ( int fd, int revent, void *context) { mar_req_setup_t *req_setup; struct conn_info *conn_info = (struct conn_info *)context; int res; char buf; if (ipc_thread_exiting (conn_info)) { return conn_info_destroy (conn_info); } /* * If an error occurs, request exit */ if (revent & (POLLERR|POLLHUP)) { ipc_disconnect (conn_info); return (0); } /* * Read the header and process it */ if (conn_info->service == SOCKET_SERVICE_INIT && (revent & POLLIN)) { /* * Receive in a nonblocking fashion the request * IF security invalid, send TRY_AGAIN, otherwise * send OK */ res = req_setup_recv (conn_info); if (res == -1) { req_setup_send (conn_info, CS_ERR_TRY_AGAIN); } if (res != 1) { return (0); } req_setup_send (conn_info, CS_OK); pthread_mutex_init (&conn_info->mutex, NULL); req_setup = (mar_req_setup_t *)conn_info->setup_msg; /* * Is the service registered ? */ if (api->service_available (req_setup->service) == 0) { ipc_disconnect (conn_info); return (0); } conn_info->shmkey = req_setup->shmkey; conn_info->semkey = req_setup->semkey; conn_info->service = req_setup->service; conn_info->refcount = 0; conn_info->notify_flow_control_enabled = 0; conn_info->setup_bytes_read = 0; conn_info->shmid = shmget (conn_info->shmkey, sizeof (struct shared_memory), 0600); conn_info->mem = shmat (conn_info->shmid, NULL, 0); conn_info->semid = semget (conn_info->semkey, 3, 0600); conn_info->pending_semops = 0; /* * ipc thread is the only reference at startup */ conn_info->refcount = 1; conn_info->state = CONN_STATE_THREAD_ACTIVE; conn_info->private_data = api->malloc (api->private_data_size_get (conn_info->service)); memset (conn_info->private_data, 0, api->private_data_size_get (conn_info->service)); api->init_fn_get (conn_info->service) (conn_info); pthread_attr_init (&conn_info->thread_attr); /* * IA64 needs more stack space then other arches */ #if defined(__ia64__) pthread_attr_setstacksize (&conn_info->thread_attr, 400000); #else pthread_attr_setstacksize (&conn_info->thread_attr, 200000); #endif pthread_attr_setdetachstate (&conn_info->thread_attr, PTHREAD_CREATE_JOINABLE); res = pthread_create (&conn_info->thread, &conn_info->thread_attr, pthread_ipc_consumer, conn_info); /* * Security check - disallow multiple configurations of * the ipc connection */ if (conn_info->service == SOCKET_SERVICE_INIT) { conn_info->service = -1; } } else if (revent & POLLIN) { coroipcs_refcount_inc (conn_info); res = recv (fd, &buf, 1, MSG_NOSIGNAL); if (res == 1) { switch (buf) { case MESSAGE_REQ_OUTQ_FLUSH: outq_flush (conn_info); break; case MESSAGE_REQ_CHANGE_EUID: if (priv_change (conn_info) == -1) { ipc_disconnect (conn_info); } break; default: res = 0; break; } coroipcs_refcount_dec (conn_info); } #if defined(COROSYNC_SOLARIS) || defined(COROSYNC_BSD) || defined(COROSYNC_DARWIN) /* On many OS poll never return POLLHUP or POLLERR. * EOF is detected when recvmsg return 0. */ if (res == 0) { ipc_disconnect (conn_info); return (0); } #endif } coroipcs_refcount_inc (conn_info); pthread_mutex_lock (&conn_info->mutex); if ((conn_info->state == CONN_STATE_THREAD_ACTIVE) && (revent & POLLOUT)) { buf = !list_empty (&conn_info->outq_head); for (; conn_info->pending_semops;) { res = send (conn_info->fd, &buf, 1, MSG_NOSIGNAL); if (res == 1) { conn_info->pending_semops--; } else { break; } } if (conn_info->notify_flow_control_enabled) { buf = 2; res = send (conn_info->fd, &buf, 1, MSG_NOSIGNAL); if (res == 1) { conn_info->notify_flow_control_enabled = 0; } } if (conn_info->notify_flow_control_enabled == 0 && conn_info->pending_semops == 0) { api->poll_dispatch_modify (conn_info->fd, POLLIN|POLLNVAL); } } pthread_mutex_unlock (&conn_info->mutex); coroipcs_refcount_dec (conn_info); return (0); } diff --git a/exec/coroipcs.h b/exec/coroipcs.h index c59870d3..60b63d26 100644 --- a/exec/coroipcs.h +++ b/exec/coroipcs.h @@ -1,100 +1,100 @@ /* * Copyright (c) 2006-2009 Red Hat, Inc. * * All rights reserved. * * Author: Steven Dake (sdake@redhat.com) * * This software licensed under BSD license, the text of which follows: - * + * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * - Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * - Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * - Neither the name of the MontaVista Software, Inc. nor the names of its * contributors may be used to endorse or promote products derived from this * software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF * THE POSSIBILITY OF SUCH DAMAGE. */ #ifndef COROIPCS_H_DEFINED #define COROIPCS_H_DEFINED #include #define SOCKET_SERVICE_INIT 0xFFFFFFFF struct iovec; typedef int (*coroipcs_init_fn_lvalue) (void *conn); typedef int (*coroipcs_exit_fn_lvalue) (void *conn); typedef void (*coroipcs_handler_fn_lvalue) (void *conn, void *msg); struct coroipcs_init_state { const char *socket_name; int sched_priority; void *(*malloc) (size_t size); void (*free) (void *ptr); void (*log_printf) ( const char *format, ...) __attribute__((format(printf, 1, 2))); int (*service_available)(unsigned int service); int (*private_data_size_get)(unsigned int service); int (*security_valid)(int uid, int gid); void (*serialize_lock)(void); void (*serialize_unlock)(void); int (*response_size_get)(unsigned int service, unsigned int id); int (*response_id_get)(unsigned int service, unsigned int id); int (*sending_allowed)(unsigned int service, unsigned int id, void *msg, void *sending_allowed_private_data); void (*sending_allowed_release)(void *sending_allowed_private_data); void (*poll_accept_add)(int fd); void (*poll_dispatch_add)(int fd, void *context); void (*poll_dispatch_modify)(int fd, int events); void (*fatal_error)(const char *error_msg); coroipcs_init_fn_lvalue (*init_fn_get)(unsigned int service); coroipcs_exit_fn_lvalue (*exit_fn_get)(unsigned int service); coroipcs_handler_fn_lvalue (*handler_fn_get)(unsigned int service, unsigned int id); }; extern void coroipcs_ipc_init ( struct coroipcs_init_state *init_state); extern void *coroipcs_private_data_get (void *conn); -extern int coroipcs_response_send (void *conn, const void *msg, int mlen); +extern int coroipcs_response_send (void *conn, const void *msg, size_t mlen); extern int coroipcs_response_iov_send (void *conn, const struct iovec *iov, unsigned int iov_len); -extern int coroipcs_dispatch_send (void *conn, const void *msg, int mlen); +extern int coroipcs_dispatch_send (void *conn, const void *msg, size_t mlen); extern int coroipcs_dispatch_iov_send (void *conn, const struct iovec *iov, unsigned int iov_len); extern void coroipcs_refcount_inc (void *conn); extern void coroipcs_refcount_dec (void *conn); extern void coroipcs_ipc_exit (void); extern int coroipcs_handler_accept (int fd, int revent, void *context); extern int coroipcs_handler_dispatch (int fd, int revent, void *context); #endif /* COROIPCS_H_DEFINED */ diff --git a/include/corosync/cpg.h b/include/corosync/cpg.h index 49299ffd..6e849d39 100644 --- a/include/corosync/cpg.h +++ b/include/corosync/cpg.h @@ -1,201 +1,201 @@ /* * Copyright (c) 2006-2009 Red Hat, Inc. * * All rights reserved. * * Author: Christine Caulfield (ccaulfi@redhat.com) * * This software licensed under BSD license, the text of which follows: * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * - Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * - Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * - Neither the name of the MontaVista Software, Inc. nor the names of its * contributors may be used to endorse or promote products derived from this * software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF * THE POSSIBILITY OF SUCH DAMAGE. */ #ifndef COROSYNC_CPG_H_DEFINED #define COROSYNC_CPG_H_DEFINED #include #include /** * @addtogroup cpg_corosync * * @{ */ typedef uint64_t cpg_handle_t; typedef enum { CPG_TYPE_UNORDERED, /* not implemented */ CPG_TYPE_FIFO, /* same as agreed */ CPG_TYPE_AGREED, CPG_TYPE_SAFE /* not implemented */ } cpg_guarantee_t; typedef enum { CPG_FLOW_CONTROL_DISABLED, /* flow control is disabled - new messages may be sent */ CPG_FLOW_CONTROL_ENABLED /* flow control is enabled - new messages should not be sent */ } cpg_flow_control_state_t; typedef enum { CPG_REASON_JOIN = 1, CPG_REASON_LEAVE = 2, CPG_REASON_NODEDOWN = 3, CPG_REASON_NODEUP = 4, CPG_REASON_PROCDOWN = 5 } cpg_reason_t; struct cpg_address { uint32_t nodeid; uint32_t pid; uint32_t reason; }; #define CPG_MAX_NAME_LENGTH 128 struct cpg_name { uint32_t length; char value[CPG_MAX_NAME_LENGTH]; }; #define CPG_MEMBERS_MAX 128 typedef void (*cpg_deliver_fn_t) ( cpg_handle_t handle, - struct cpg_name *group_name, + const struct cpg_name *group_name, uint32_t nodeid, uint32_t pid, const void *msg, size_t msg_len); typedef void (*cpg_confchg_fn_t) ( cpg_handle_t handle, - struct cpg_name *group_name, + const struct cpg_name *group_name, const struct cpg_address *member_list, size_t member_list_entries, const struct cpg_address *left_list, size_t left_list_entries, const struct cpg_address *joined_list, size_t joined_list_entries); typedef void (*cpg_groups_get_fn_t) ( cpg_handle_t handle, uint32_t group_num, uint32_t group_total, const struct cpg_name *group_name, const struct cpg_address *member_list, size_t member_list_entries); typedef struct { cpg_deliver_fn_t cpg_deliver_fn; cpg_confchg_fn_t cpg_confchg_fn; cpg_groups_get_fn_t cpg_groups_get_fn; } cpg_callbacks_t; /** @} */ /* * Create a new cpg connection */ cs_error_t cpg_initialize ( cpg_handle_t *handle, cpg_callbacks_t *callbacks); /* * Close the cpg handle */ cs_error_t cpg_finalize ( cpg_handle_t handle); /* * Get a file descriptor on which to poll. cpg_handle_t is NOT a * file descriptor and may not be used directly. */ cs_error_t cpg_fd_get ( cpg_handle_t handle, int *fd); /* * Get and set contexts for a CPG handle */ cs_error_t cpg_context_get ( cpg_handle_t handle, void **context); cs_error_t cpg_context_set ( cpg_handle_t handle, void *context); /* * Dispatch messages and configuration changes */ cs_error_t cpg_dispatch ( cpg_handle_t handle, cs_dispatch_flags_t dispatch_types); /* * Join one or more groups. * messages multicasted with cpg_mcast_joined will be sent to every * group that has been joined on handle handle. Any message multicasted * to a group that has been previously joined will be delivered in cpg_dispatch */ cs_error_t cpg_join ( cpg_handle_t handle, struct cpg_name *group); /* * Leave one or more groups */ cs_error_t cpg_leave ( cpg_handle_t handle, struct cpg_name *group); /* * Multicast to groups joined with cpg_join. * The iovec described by iovec will be multicasted to all groups joined with * the cpg_join interface for handle. */ cs_error_t cpg_mcast_joined ( cpg_handle_t handle, cpg_guarantee_t guarantee, struct iovec *iovec, unsigned int iov_len); /* * Get membership information from cpg */ cs_error_t cpg_membership_get ( cpg_handle_t handle, struct cpg_name *groupName, struct cpg_address *member_list, int *member_list_entries); cs_error_t cpg_local_get ( cpg_handle_t handle, unsigned int *local_nodeid); cs_error_t cpg_groups_get ( cpg_handle_t handle, unsigned int *num_groups); cs_error_t cpg_flow_control_state_get ( cpg_handle_t handle, cpg_flow_control_state_t *flow_control_enabled); #endif /* COROSYNC_CPG_H_DEFINED */ diff --git a/test/cpgbench.c b/test/cpgbench.c index 23f15d74..aff3acf2 100644 --- a/test/cpgbench.c +++ b/test/cpgbench.c @@ -1,190 +1,190 @@ #include /* * Copyright (c) 2006, 2009 Red Hat, Inc. * * All rights reserved. * * Author: Steven Dake (sdake@redhat.com) * * This software licensed under BSD license, the text of which follows: * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * - Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * - Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * - Neither the name of the MontaVista Software, Inc. nor the names of its * contributors may be used to endorse or promote products derived from this * software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF * THE POSSIBILITY OF SUCH DAMAGE. */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #ifdef COROSYNC_SOLARIS #define timersub(a, b, result) \ do { \ (result)->tv_sec = (a)->tv_sec - (b)->tv_sec; \ (result)->tv_usec = (a)->tv_usec - (b)->tv_usec; \ if ((result)->tv_usec < 0) { \ --(result)->tv_sec; \ (result)->tv_usec += 1000000; \ } \ } while (0) #endif static int alarm_notice; static void cpg_bm_confchg_fn ( cpg_handle_t handle, - struct cpg_name *group_name, - struct cpg_address *member_list, int member_list_entries, - struct cpg_address *left_list, int left_list_entries, - struct cpg_address *joined_list, int joined_list_entries) + const struct cpg_name *group_name, + const struct cpg_address *member_list, size_t member_list_entries, + const struct cpg_address *left_list, size_t left_list_entries, + const struct cpg_address *joined_list, size_t joined_list_entries) { } static unsigned int write_count; static void cpg_bm_deliver_fn ( cpg_handle_t handle, - struct cpg_name *group_name, + const struct cpg_name *group_name, uint32_t nodeid, uint32_t pid, - void *msg, - int msg_len) + const void *msg, + size_t msg_len) { write_count++; } static cpg_callbacks_t callbacks = { .cpg_deliver_fn = cpg_bm_deliver_fn, .cpg_confchg_fn = cpg_bm_confchg_fn }; static char data[500000]; static void cpg_benchmark ( cpg_handle_t handle, int write_size) { struct timeval tv1, tv2, tv_elapsed; struct iovec iov; unsigned int res; cpg_flow_control_state_t flow_control_state; alarm_notice = 0; iov.iov_base = data; iov.iov_len = write_size; write_count = 0; alarm (10); gettimeofday (&tv1, NULL); do { /* * Test checkpoint write */ cpg_flow_control_state_get (handle, &flow_control_state); if (flow_control_state == CPG_FLOW_CONTROL_DISABLED) { retry: res = cpg_mcast_joined (handle, CPG_TYPE_AGREED, &iov, 1); if (res == CS_ERR_TRY_AGAIN) { goto retry; } } res = cpg_dispatch (handle, CS_DISPATCH_ALL); if (res != CS_OK) { printf ("cpg dispatch returned error %d\n", res); exit (1); } } while (alarm_notice == 0); gettimeofday (&tv2, NULL); timersub (&tv2, &tv1, &tv_elapsed); printf ("%5d messages received ", write_count); printf ("%5d bytes per write ", write_size); printf ("%7.3f Seconds runtime ", (tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0))); printf ("%9.3f TP/s ", ((float)write_count) / (tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0))); printf ("%7.3f MB/s.\n", ((float)write_count) * ((float)write_size) / ((tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0)) * 1000000.0)); } static void sigalrm_handler (int num) { alarm_notice = 1; } static struct cpg_name group_name = { .value = "cpg_bm", .length = 6 }; int main (void) { cpg_handle_t handle; unsigned int size; int i; unsigned int res; size = 1000; signal (SIGALRM, sigalrm_handler); res = cpg_initialize (&handle, &callbacks); if (res != CS_OK) { printf ("cpg_initialize failed with result %d\n", res); exit (1); } res = cpg_join (handle, &group_name); if (res != CS_OK) { printf ("cpg_join failed with result %d\n", res); exit (1); } for (i = 0; i < 50; i++) { /* number of repetitions - up to 50k */ cpg_benchmark (handle, size); size += 1000; } res = cpg_finalize (handle); if (res != CS_OK) { printf ("cpg_join failed with result %d\n", res); exit (1); } return (0); } diff --git a/test/testcpg.c b/test/testcpg.c index 2ab68c94..8e86248e 100644 --- a/test/testcpg.c +++ b/test/testcpg.c @@ -1,235 +1,239 @@ /* * Copyright (c) 2006-2009 Red Hat Inc * * All rights reserved. * * Author: Christine Caulfield * * This software licensed under BSD license, the text of which follows: * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * - Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * - Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * - Neither the name of the MontaVista Software, Inc. nor the names of its * contributors may be used to endorse or promote products derived from this * software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF * THE POSSIBILITY OF SUCH DAMAGE. */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include static int quit = 0; static int show_ip = 0; -static void print_cpgname (struct cpg_name *name) +static void print_cpgname (const struct cpg_name *name) { int i; for (i = 0; i < name->length; i++) { printf ("%c", name->value[i]); } } static void DeliverCallback ( cpg_handle_t handle, - struct cpg_name *groupName, + const struct cpg_name *groupName, uint32_t nodeid, uint32_t pid, - void *msg, - int msg_len) + const void *msg, + size_t msg_len) { if (show_ip) { struct in_addr saddr; saddr.s_addr = nodeid; - printf("DeliverCallback: message (len=%d)from node/pid %s/%d: '%s'\n", - msg_len, inet_ntoa(saddr), pid, (char *)msg); + printf("DeliverCallback: message (len=%lu)from node/pid %s/%d: '%s'\n", + (unsigned long int) msg_len, + inet_ntoa(saddr), pid, (const char *)msg); } else { - printf("DeliverCallback: message (len=%d)from node/pid %d/%d: '%s'\n", msg_len, nodeid, pid, (char *)msg); + printf("DeliverCallback: message (len=%lu)from node/pid %d/%d: '%s'\n", + (unsigned long int) msg_len, nodeid, pid, + (const char *)msg); } } static void ConfchgCallback ( cpg_handle_t handle, - struct cpg_name *groupName, - struct cpg_address *member_list, int member_list_entries, - struct cpg_address *left_list, int left_list_entries, - struct cpg_address *joined_list, int joined_list_entries) + const struct cpg_name *groupName, + const struct cpg_address *member_list, size_t member_list_entries, + const struct cpg_address *left_list, size_t left_list_entries, + const struct cpg_address *joined_list, size_t joined_list_entries) { int i; struct in_addr saddr; printf("\nConfchgCallback: group '"); print_cpgname(groupName); printf("'\n"); for (i=0; i optind) { strcpy(group_name.value, argv[optind]); group_name.length = strlen(argv[optind])+1; } else { strcpy(group_name.value, "GROUP"); group_name.length = 6; } result = cpg_initialize (&handle, &callbacks); if (result != CS_OK) { printf ("Could not initialize Cluster Process Group API instance error %d\n", result); exit (1); } result = cpg_local_get (handle, &nodeid); if (result != CS_OK) { printf ("Could not get local node id\n"); exit (1); } printf ("Local node id is %x\n", nodeid); result = cpg_join(handle, &group_name); if (result != CS_OK) { printf ("Could not join process group, error %d\n", result); exit (1); } FD_ZERO (&read_fds); cpg_fd_get(handle, &select_fd); printf ("Type EXIT to finish\n"); do { FD_SET (select_fd, &read_fds); FD_SET (STDIN_FILENO, &read_fds); result = select (select_fd + 1, &read_fds, 0, 0, 0); if (result == -1) { perror ("select\n"); } if (FD_ISSET (STDIN_FILENO, &read_fds)) { char inbuf[132]; struct iovec iov; fgets(inbuf, sizeof(inbuf), stdin); if (strncmp(inbuf, "EXIT", 4) == 0) { cpg_leave(handle, &group_name); } else { iov.iov_base = inbuf; iov.iov_len = strlen(inbuf)+1; cpg_mcast_joined(handle, CPG_TYPE_AGREED, &iov, 1); } } if (FD_ISSET (select_fd, &read_fds)) { if (cpg_dispatch (handle, CS_DISPATCH_ALL) != CS_OK) exit(1); } } while (result && !quit); result = cpg_finalize (handle); printf ("Finalize result is %d (should be 1)\n", result); return (0); } diff --git a/test/testcpg2.c b/test/testcpg2.c index ee0336ff..2e0ac6c1 100644 --- a/test/testcpg2.c +++ b/test/testcpg2.c @@ -1,90 +1,90 @@ /* * Copyright (c) 2007, 2009 Red Hat, Inc. * * All rights reserved. * * Author: Alan Conway * * This software licensed under BSD license, the text of which follows: * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * - Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * - Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * - Neither the name of the MontaVista Software, Inc. nor the names of its * contributors may be used to endorse or promote products derived from this * software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF * THE POSSIBILITY OF SUCH DAMAGE. */ #include #include #include #include #include #include #include #include static void deliver( cpg_handle_t handle, - struct cpg_name *group_name, + const struct cpg_name *group_name, uint32_t nodeid, uint32_t pid, - void *msg, - int msg_len) + const void *msg, + size_t msg_len) { printf("self delivered nodeid: %x\n", nodeid); } static void confch( cpg_handle_t handle, - struct cpg_name *group_name, - struct cpg_address *member_list, int member_list_entries, - struct cpg_address *left_list, int left_list_entries, - struct cpg_address *joined_list, int joined_list_entries) + const struct cpg_name *group_name, + const struct cpg_address *member_list, size_t member_list_entries, + const struct cpg_address *left_list, size_t left_list_entries, + const struct cpg_address *joined_list, size_t joined_list_entries) { printf("confchg nodeid %x\n", member_list[0].nodeid); } int main(int argc, char** argv) { cpg_handle_t handle=0; cpg_callbacks_t cb={&deliver,&confch}; unsigned int nodeid=0; struct cpg_name group={3,"foo"}; struct pollfd pfd; int fd; printf ("All of the nodeids should match on a single node configuration\n for the test to pass."); assert(CS_OK==cpg_initialize(&handle, &cb)); assert(CS_OK==cpg_local_get(handle,&nodeid)); printf("local_get: %x\n", nodeid); assert(CS_OK==cpg_join(handle, &group)); struct iovec msg={(void *)"hello", 5}; /* discard const */ assert(CS_OK==cpg_mcast_joined(handle,CPG_TYPE_AGREED,&msg,1)); cpg_fd_get (handle, &fd); pfd.fd = fd; pfd.events = POLLIN; poll (&pfd, 1, 1000); cpg_dispatch(handle, CS_DISPATCH_ALL); return (0); }