Page Menu
Home
ClusterLabs Projects
Search
Configure Global Search
Log In
Files
F2824799
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Flag For Later
Award Token
Size
31 KB
Referenced Files
None
Subscribers
None
View Options
diff --git a/include/qb/qbipcc.h b/include/qb/qbipcc.h
index f873b59..39fa5a1 100644
--- a/include/qb/qbipcc.h
+++ b/include/qb/qbipcc.h
@@ -1,124 +1,130 @@
/*
* Copyright (c) 2002-2003 MontaVista Software, Inc.
* Copyright (c) 2006-2007, 2009 Red Hat, Inc.
*
* All rights reserved.
*
* Author: Steven Dake (sdake@redhat.com)
*
* This software licensed under BSD license, the text of which follows:
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* - Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* - Neither the name of the MontaVista Software, Inc. nor the names of its
* contributors may be used to endorse or promote products derived from this
* software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
* THE POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef QB_IPCC_H_DEFINED
#define QB_IPCC_H_DEFINED
#include <pthread.h>
#include <sys/poll.h>
#include <sys/socket.h>
#include <qb/qbhdb.h>
#include <qb/qbipc_common.h>
#ifdef __cplusplus
extern "C" {
#endif
/*
 * Connect to the IPC server listening on socket_name and create the shared
 * request, response and dispatch buffers of the given sizes.
 *
 * On success *handle identifies the connection for every other qb_ipcc_*
 * call. Returns 0 on success, otherwise an errno-style code (EAGAIN when
 * the connection could not be established, EBADE on other failures).
 */
extern int32_t
qb_ipcc_service_connect (
	const char *socket_name,
	unsigned int service,
	size_t request_size,
	size_t response_size,
	size_t dispatch_size,
	qb_hdb_handle_t *handle);
/*
 * Shut down and close the connection's socket and destroy the handle.
 * Returns 0, or the handle lookup error.
 */
extern int32_t
qb_ipcc_service_disconnect (
qb_hdb_handle_t handle);
/*
 * Store the connection's socket descriptor in *fd.
 * Returns 0, or the handle lookup error.
 */
extern int32_t
qb_ipcc_fd_get (
qb_hdb_handle_t handle,
int *fd);
/*
 * Poll the connection (timeout is handed to poll()) and, when a dispatch
 * message is pending, point *buf at it inside the dispatch buffer.
 * Returns 0 with *buf set, EAGAIN when nothing is available to dispatch,
 * or EBADE on error. After a 0 return the handle reference stays held
 * until qb_ipcc_dispatch_put() is called.
 */
extern int32_t
qb_ipcc_dispatch_get (
qb_hdb_handle_t handle,
void **buf,
int timeout);
/*
 * Consume the message returned by qb_ipcc_dispatch_get(): advance the
 * dispatch read index past it and release the handle references.
 */
extern int32_t
qb_ipcc_dispatch_put (
qb_hdb_handle_t handle);
/*
 * Store the connection's current flow control state (0 or 1) in
 * *flow_control_state.
 */
extern int32_t
qb_ipcc_dispatch_flow_control_get (
qb_hdb_handle_t handle,
unsigned int *flow_control_state);
+extern int32_t /* 0 on success, otherwise an errno-style code (EINVAL, EAGAIN, EBADE) */
+qb_ipcc_msg_send ( /* copy the iovec into the request buffer and signal the server; no reply is awaited */
+ qb_hdb_handle_t handle,
+ const struct iovec *iov,
+ unsigned int iov_len);
+
/*
 * Send the iovec as one request, wait for the reply and copy res_len bytes
 * of the response buffer into res_msg.
 * Returns 0 on success, EAGAIN when the call should be retried, EINVAL when
 * the request does not fit the request buffer, or EBADE on failure.
 */
extern int32_t
qb_ipcc_msg_send_reply_receive (
qb_hdb_handle_t handle,
const struct iovec *iov,
unsigned int iov_len,
void *res_msg,
size_t res_len);
/*
 * Like qb_ipcc_msg_send_reply_receive(), but instead of copying the reply,
 * *res_msg is pointed at the shared response buffer. The handle reference
 * taken here is released by qb_ipcc_msg_send_reply_receive_in_buf_put().
 */
extern int32_t
qb_ipcc_msg_send_reply_receive_in_buf_get (
qb_hdb_handle_t handle,
const struct iovec *iov,
unsigned int iov_len,
void **res_msg);
/*
 * Release the handle reference kept by
 * qb_ipcc_msg_send_reply_receive_in_buf_get().
 */
extern int32_t
qb_ipcc_msg_send_reply_receive_in_buf_put (
qb_hdb_handle_t handle);
/*
 * Map a zero-copy buffer of size + header_size bytes (plus an internal
 * struct qb_ipcs_zc_header), announce it to the server and return the
 * usable area in *buffer.
 */
extern int32_t
qb_ipcc_zcb_alloc (
qb_hdb_handle_t handle,
void **buffer,
size_t size,
size_t header_size);
/*
 * Tell the server to release a buffer obtained from qb_ipcc_zcb_alloc()
 * and unmap it locally.
 */
extern int32_t
qb_ipcc_zcb_free (
qb_hdb_handle_t handle,
void *buffer);
/*
 * Ask the server to execute the request stored in the zero-copy buffer msg
 * (obtained from qb_ipcc_zcb_alloc()) and copy res_len bytes of the reply
 * into res_msg.
 */
extern int32_t
qb_ipcc_zcb_msg_send_reply_receive (
qb_hdb_handle_t handle,
void *msg,
void *res_msg,
size_t res_len);
#ifdef __cplusplus
}
#endif
#endif /* QB_IPCC_H_DEFINED */
diff --git a/lib/ipcc.c b/lib/ipcc.c
index a21f3f0..bb79673 100644
--- a/lib/ipcc.c
+++ b/lib/ipcc.c
@@ -1,1203 +1,1231 @@
/*
* vi: set autoindent tabstop=4 shiftwidth=4 :
*
* Copyright (c) 2002-2006 MontaVista Software, Inc.
* Copyright (c) 2006-2009 Red Hat, Inc.
*
* All rights reserved.
*
* Author: Steven Dake (sdake@redhat.com)
*
* This software licensed under BSD license, the text of which follows:
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* - Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* - Neither the name of the MontaVista Software, Inc. nor the names of its
* contributors may be used to endorse or promote products derived from this
* software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
* THE POSSIBILITY OF SUCH DAMAGE.
*/
#include <config.h>
#include "os_base.h"
#include <sys/shm.h>
#include <sys/mman.h>
#include <qb/qbipcc.h>
#include <qb/qbhdb.h>
#include "ipc_int.h"
#if _POSIX_THREAD_PROCESS_SHARED > 0
#include <semaphore.h>
#else
#include <sys/sem.h>
#endif
/*
* Timeout, in seconds, added to time(NULL) to build the absolute deadline
* for each sem_timedwait() on the reply semaphore. Because the deadline has
* whole-second resolution, the effective wait is between n-1 and n seconds.
*/
#define IPC_SEMWAIT_TIMEOUT 2
/*
 * Per-connection client state, stored in the ipc_hdb handle database.
 */
struct ipc_instance {
int fd; /* socket connected to the server */
#if _POSIX_THREAD_PROCESS_SHARED < 1
int semid; /* SysV semaphore set (3 semaphores) used when process-shared POSIX semaphores are unavailable */
#endif
int flow_control_state; /* 0 or 1, updated by qb_ipcc_dispatch_get() */
struct control_buffer *control_buffer; /* shared mapping; holds sem0..sem2 (POSIX build) and the dispatch read index */
char *request_buffer; /* shared mapping the request iovecs are copied into */
char *response_buffer; /* shared mapping replies are read from */
char *dispatch_buffer; /* shared mapping created by circular_memory_map(): dispatch_size bytes mapped twice back to back */
size_t control_size; /* sizes, in bytes, of the four mappings above */
size_t request_size;
size_t response_size;
size_t dispatch_size;
uid_t euid; /* effective uid last reported to the server (SysV semaphore build) */
pthread_mutex_t mutex; /* held while a request is being sent and, for the *_reply_receive calls, until its reply has been received */
};
/*
 * Handle database holding one struct ipc_instance per connection;
 * ipc_hdb_destructor is registered as its destructor callback.
 */
void ipc_hdb_destructor (void *context);
DECLARE_HDB_DATABASE(ipc_hdb,ipc_hdb_destructor);
/*
 * Address length passed to connect(): the full sockaddr_un size on Linux
 * and Solaris, SUN_LEN() elsewhere.
 */
#if defined(QB_LINUX) || defined(QB_SOLARIS)
#define QB_SUN_LEN(a) sizeof(*(a))
#else
#define QB_SUN_LEN(a) SUN_LEN(a)
#endif
#ifdef SO_NOSIGPIPE
/*
 * Set SO_NOSIGPIPE on socket s. Best effort: the setsockopt() result is
 * deliberately not checked.
 */
static void socket_nosigpipe(int s)
{
	int enable = 1;

	setsockopt(s, SOL_SOCKET, SO_NOSIGPIPE, (void *)&enable, sizeof(enable));
}
#endif
/*
 * Platforms without MSG_NOSIGNAL get a no-op flag so the sendmsg()/recvmsg()
 * calls below compile unchanged.
 */
#ifndef MSG_NOSIGNAL
#define MSG_NOSIGNAL 0
#endif
/*
 * Write exactly len bytes from msg to socket s.
 *
 * Short writes are continued and EAGAIN from sendmsg() is retried
 * immediately. Returns 0 once everything was sent, EAGAIN if the call was
 * interrupted by a signal (EINTR), or EBADE on any other error.
 */
static int32_t
socket_send (
	int s,
	void *msg,
	size_t len)
{
	struct msghdr msg_send;
	struct iovec iov_send;
	char *rbuf = msg;
	size_t processed = 0;
	ssize_t result;

	/* No peer address and no ancillary data on any platform. */
	memset (&msg_send, 0, sizeof (msg_send));
	msg_send.msg_iov = &iov_send;
	msg_send.msg_iovlen = 1;

	for (;;) {
		iov_send.iov_base = &rbuf[processed];
		iov_send.iov_len = len - processed;

		result = sendmsg (s, &msg_send, MSG_NOSIGNAL);
		if (result == -1) {
			if (errno == EAGAIN) {
				continue;
			}
			return ((errno == EINTR) ? EAGAIN : EBADE);
		}

		/* sendmsg() reports a byte count as ssize_t; keep the tally unsigned. */
		processed += (size_t)result;
		if (processed == len) {
			return (0);
		}
	}
}
/*
 * Read exactly len bytes from socket s into msg.
 *
 * Short reads are continued and EAGAIN from recvmsg() is retried
 * immediately. Returns 0 once len bytes were received, EAGAIN if the call
 * was interrupted by a signal (EINTR), or EBADE on any other error or when
 * the peer closed the connection before len bytes arrived.
 */
static int32_t
socket_recv (
	int s,
	void *msg,
	size_t len)
{
	struct msghdr msg_recv;
	struct iovec iov_recv;
	char *rbuf = msg;
	size_t processed = 0;
	ssize_t result;

	/* No peer address and no ancillary data on any platform. */
	memset (&msg_recv, 0, sizeof (msg_recv));
	msg_recv.msg_iov = &iov_recv;
	msg_recv.msg_iovlen = 1;

	for (;;) {
		iov_recv.iov_base = (void *)&rbuf[processed];
		iov_recv.iov_len = len - processed;

		result = recvmsg (s, &msg_recv, MSG_NOSIGNAL|MSG_WAITALL);
		if (result == -1) {
			if (errno == EAGAIN) {
				continue;
			}
			return ((errno == EINTR) ? EAGAIN : EBADE);
		}

		/*
		 * A zero-byte read while data is still expected means the peer
		 * closed the socket. This must be detected on every platform:
		 * nothing here polls for POLLHUP, so retrying would spin forever.
		 */
		if (result == 0 && processed < len) {
			return (EBADE);
		}

		processed += (size_t)result;
		if (processed == len) {
			return (0);
		}
	}
}
#if _POSIX_THREAD_PROCESS_SHARED < 1
/*
 * Tell the server that the client's effective uid/gid changed.
 *
 * Nothing is sent when the effective uid equals the one cached in the
 * instance. Returns 0 on success (or when nothing had to be sent) and -1
 * when either socket write failed; the cached euid is only updated after
 * both writes succeeded.
 */
static int
priv_change_send (struct ipc_instance *ipc_instance)
{
	char buf_req;
	mar_req_priv_change req_priv_change;
	int32_t res;

	req_priv_change.euid = geteuid();
	/*
	 * Don't resend request unless euid has changed
	 */
	if (ipc_instance->euid == req_priv_change.euid) {
		return (0);
	}
	req_priv_change.egid = getegid();

	buf_req = MESSAGE_REQ_CHANGE_EUID;
	/* socket_send() reports failure as a non-zero errno-style code, never -1. */
	res = socket_send (ipc_instance->fd, &buf_req, 1);
	if (res != 0) {
		return (-1);
	}

	res = socket_send (ipc_instance->fd, &req_priv_change,
		sizeof (req_priv_change));
	if (res != 0) {
		return (-1);
	}

	ipc_instance->euid = req_priv_change.euid;
	return (0);
}
#if defined(_SEM_SEMUN_UNDEFINED)
/*
 * Argument type for semctl(); declared here only when the system headers
 * report it as undefined (_SEM_SEMUN_UNDEFINED).
 */
union semun {
int val;
struct semid_ds *buf;
unsigned short int *array;
struct seminfo *__buf;
};
#endif
#endif
/*
 * Create a temporary file of `bytes` bytes and map it twice, back to back,
 * so that offsets in [bytes, 2*bytes) alias [0, bytes) ("wrapped" buffer).
 *
 * path:  output buffer receiving the name of the created file; the caller
 *        must provide enough room (callers in this file pass 128 bytes).
 * file:  mkstemp() template (ending in XXXXXX), tried under /dev/shm first
 *        and under LOCALSTATEDIR/run as a fallback.
 * buf:   receives the start of the 2*bytes mapping on success.
 *
 * Returns 0 on success, -1 on failure. On failure nothing stays mapped, the
 * descriptor is closed and the temporary file is removed.
 */
static int
circular_memory_map (char *path, const char *file, void **buf, size_t bytes)
{
	int fd;
	void *addr_orig;
	void *addr;

	sprintf (path, "/dev/shm/%s", file);
	fd = mkstemp (path);
	if (fd == -1) {
		sprintf (path, LOCALSTATEDIR "/run/%s", file);
		fd = mkstemp (path);
		if (fd == -1) {
			return (-1);
		}
	}

	if (ftruncate (fd, bytes) == -1) {
		goto error_close;
	}

	/* Reserve twice the size so both halves are guaranteed contiguous. */
	addr_orig = mmap (NULL, bytes << 1, PROT_NONE,
		MAP_ANONYMOUS | MAP_PRIVATE, -1, 0);
	if (addr_orig == MAP_FAILED) {
		goto error_close;
	}

	addr = mmap (addr_orig, bytes, PROT_READ | PROT_WRITE,
		MAP_FIXED | MAP_SHARED, fd, 0);
	if (addr != addr_orig) {
		goto error_unmap;
	}
#ifdef QB_BSD
	madvise(addr_orig, bytes, MADV_NOSYNC);
#endif

	addr = mmap (((char *)addr_orig) + bytes,
		bytes, PROT_READ | PROT_WRITE,
		MAP_FIXED | MAP_SHARED, fd, 0);
	if (addr != (void *)(((char *)addr_orig) + bytes)) {
		goto error_unmap;
	}
#ifdef QB_BSD
	madvise(((char *)addr_orig) + bytes, bytes, MADV_NOSYNC);
#endif

	if (close (fd) != 0) {
		/* The descriptor state is unspecified now; do not close it again. */
		fd = -1;
		goto error_unmap;
	}

	*buf = addr_orig;
	return (0);

error_unmap:
	munmap (addr_orig, bytes << 1);
error_close:
	if (fd != -1) {
		close (fd);
	}
	unlink (path);
	return (-1);
}
/*
 * Unmap `bytes` bytes starting at addr. The munmap() result is ignored.
 */
static void
memory_unmap (void *addr, size_t bytes)
{
	(void)munmap (addr, bytes);
}
/*
 * Handle database destructor: releases the four shared mappings owned by
 * the ipc_instance passed as context.
 */
void ipc_hdb_destructor (void *context)
{
	struct ipc_instance *inst = context;

	memory_unmap (inst->control_buffer, inst->control_size);
	memory_unmap (inst->request_buffer, inst->request_size);
	memory_unmap (inst->response_buffer, inst->response_size);
	/*
	 * The dispatch buffer is a wrapped (double) mapping, hence twice its
	 * nominal size.
	 */
	memory_unmap (inst->dispatch_buffer, (inst->dispatch_size) << 1);
}
/*
 * Create a temporary file of `bytes` bytes and map it shared, read/write.
 *
 * path:  output buffer receiving the name of the created file; the caller
 *        must provide enough room (callers in this file pass 128 bytes).
 * file:  mkstemp() template (ending in XXXXXX), tried under /dev/shm first
 *        and under LOCALSTATEDIR/run as a fallback.
 * buf:   receives the start of the mapping on success.
 *
 * Returns 0 on success, -1 on failure. On failure nothing stays mapped, the
 * descriptor is closed and the temporary file is removed.
 */
static int
memory_map (char *path, const char *file, void **buf, size_t bytes)
{
	int fd;
	void *addr_orig;
	void *addr;

	sprintf (path, "/dev/shm/%s", file);
	fd = mkstemp (path);
	if (fd == -1) {
		sprintf (path, LOCALSTATEDIR "/run/%s", file);
		fd = mkstemp (path);
		if (fd == -1) {
			return (-1);
		}
	}

	if (ftruncate (fd, bytes) == -1) {
		goto error_close;
	}

	addr_orig = mmap (NULL, bytes, PROT_NONE,
		MAP_ANONYMOUS | MAP_PRIVATE, -1, 0);
	if (addr_orig == MAP_FAILED) {
		goto error_close;
	}

	addr = mmap (addr_orig, bytes, PROT_READ | PROT_WRITE,
		MAP_FIXED | MAP_SHARED, fd, 0);
	if (addr != addr_orig) {
		goto error_unmap;
	}
#ifdef QB_BSD
	madvise(addr_orig, bytes, MADV_NOSYNC);
#endif

	if (close (fd) != 0) {
		/* The descriptor state is unspecified now; do not close it again. */
		fd = -1;
		goto error_unmap;
	}

	*buf = addr_orig;
	return (0);

error_unmap:
	munmap (addr_orig, bytes);
error_close:
	if (fd != -1) {
		close (fd);
	}
	unlink (path);
	return (-1);
}
/*
 * Gather the iovec into the shared request buffer and signal semaphore #0
 * so the server picks the request up.
 *
 * Returns 0 on success, EINVAL when the combined iovec does not fit into
 * the request buffer, EAGAIN when the SysV semop() was interrupted, or
 * EBADE when signalling the semaphore failed.
 */
static int32_t
msg_send (
	struct ipc_instance *ipc_instance,
	const struct iovec *iov,
	unsigned int iov_len)
{
#if _POSIX_THREAD_PROCESS_SHARED < 1
	struct sembuf sop;
#endif
	unsigned int i;
	int res;
	size_t req_buffer_idx = 0;

	for (i = 0; i < iov_len; i++) {
		/*
		 * Compare against the space left rather than summing first: a
		 * huge iov_len must not wrap the addition and slip past the
		 * bound. req_buffer_idx never exceeds request_size.
		 */
		if (iov[i].iov_len >
			ipc_instance->request_size - req_buffer_idx) {
			return (EINVAL);
		}
		memcpy (&ipc_instance->request_buffer[req_buffer_idx],
			iov[i].iov_base,
			iov[i].iov_len);
		req_buffer_idx += iov[i].iov_len;
	}

#if _POSIX_THREAD_PROCESS_SHARED > 0
	res = sem_post (&ipc_instance->control_buffer->sem0);
	if (res == -1) {
		return EBADE;
	}
#else
	/*
	 * Signal semaphore #0 indicating a new message from client
	 * to server request queue
	 */
	sop.sem_num = 0;
	sop.sem_op = 1;
	sop.sem_flg = 0;

retry_semop:
	res = semop (ipc_instance->semid, &sop, 1);
	if (res == -1 && errno == EINTR) {
		return (EAGAIN);
	} else
	if (res == -1 && errno == EACCES) {
		/* The euid may have changed; report it and try again. */
		priv_change_send (ipc_instance);
		goto retry_semop;
	} else
	if (res == -1) {
		return (EBADE);
	}
#endif
	return 0;
}
/*
 * Wait until the server signals semaphore #1, i.e. a reply is available in
 * the response buffer.
 *
 * POSIX semaphore build: waits in IPC_SEMWAIT_TIMEOUT slices and, after each
 * timeout, checks the socket so a vanished server is noticed. Signal
 * interruptions are retried.
 * SysV semaphore build: a signal interruption returns EAGAIN; EACCES
 * triggers a privilege change notification followed by a retry.
 *
 * Returns 0 when a reply is available, EAGAIN or EBADE otherwise.
 */
static int32_t
reply_wait (struct ipc_instance *ipc_instance)
{
#if _POSIX_THREAD_PROCESS_SHARED < 1
	struct sembuf sop;
#else
	struct timespec timeout;
	struct pollfd pfd;
#endif
	int res;

#if _POSIX_THREAD_PROCESS_SHARED > 0
retry_semwait:
	timeout.tv_sec = time(NULL) + IPC_SEMWAIT_TIMEOUT;
	timeout.tv_nsec = 0;

	res = sem_timedwait (&ipc_instance->control_buffer->sem1, &timeout);
	if (res == -1 && errno == ETIMEDOUT) {
		pfd.fd = ipc_instance->fd;
		pfd.events = 0;
		/* Defined value even if poll() itself fails. */
		pfd.revents = 0;

		poll (&pfd, 1, 0);
		/* revents is a bit mask; several bits may be set at once. */
		if (pfd.revents & (POLLERR|POLLHUP)) {
			return (EBADE);
		}
		goto retry_semwait;
	}
	if (res == -1 && errno == EINTR) {
		goto retry_semwait;
	}
	if (res == -1) {
		/* No reply was signalled; do not read the response buffer. */
		return (EBADE);
	}
#else
	/*
	 * Wait for semaphore #1 indicating a new message from server
	 * to client in the response queue
	 */
	sop.sem_num = 1;
	sop.sem_op = -1;
	sop.sem_flg = 0;

retry_semop:
	res = semop (ipc_instance->semid, &sop, 1);
	if (res == -1 && errno == EINTR) {
		return (EAGAIN);
	} else
	if (res == -1 && errno == EACCES) {
		priv_change_send (ipc_instance);
		goto retry_semop;
	} else
	if (res == -1) {
		return (EBADE);
	}
#endif
	return (0);
}

/*
 * Wait for the reply to the last request and copy res_len bytes of the
 * response buffer into res_msg.
 *
 * Returns 0 on success, EAGAIN when the wait was interrupted or the
 * response header carries EAGAIN, or EBADE on failure.
 */
static int32_t
reply_receive (
	struct ipc_instance *ipc_instance,
	void *res_msg,
	size_t res_len)
{
	qb_ipc_response_header_t *response_header;
	int32_t res;

	res = reply_wait (ipc_instance);
	if (res != 0) {
		return (res);
	}

	response_header = (qb_ipc_response_header_t *)ipc_instance->response_buffer;
	if (response_header->error == EAGAIN) {
		return EAGAIN;
	}

	memcpy (res_msg, ipc_instance->response_buffer, res_len);
	return 0;
}

/*
 * Wait for the reply to the last request and hand out a pointer to the
 * shared response buffer instead of copying it.
 *
 * Returns 0 on success with *res_msg set, EAGAIN or EBADE otherwise.
 */
static int32_t
reply_receive_in_buf (
	struct ipc_instance *ipc_instance,
	void **res_msg)
{
	int32_t res;

	res = reply_wait (ipc_instance);
	if (res != 0) {
		return (res);
	}

	*res_msg = (char *)ipc_instance->response_buffer;
	return 0;
}
/*
* External API
*/
/*
 * Connect to the IPC server listening on socket_name.
 *
 * Creates a handle, connects the UNIX socket, maps the control, request,
 * response and (circular) dispatch buffers, sets up the semaphores and
 * exchanges the setup message with the server.
 *
 * Returns the error reported by the server in the setup response (0 on
 * success). Local failures return EAGAIN when connect() failed, EINVAL
 * when socket_name does not fit into the socket address, or EBADE. On every
 * local failure the handle is destroyed again.
 *
 * NOTE(review): destroying the handle on the error path may also run
 * ipc_hdb_destructor on an instance whose size fields were never set;
 * confirm the handle database zero-fills new instances.
 */
int32_t
qb_ipcc_service_connect (
	const char *socket_name,
	unsigned int service,
	size_t request_size,
	size_t response_size,
	size_t dispatch_size,
	qb_hdb_handle_t *handle)
{
	int request_fd;
	struct sockaddr_un address;
	int32_t res;
	struct ipc_instance *ipc_instance;
#if _POSIX_THREAD_PROCESS_SHARED < 1
	key_t semkey = 0;
	union semun semun;
#endif
	int sys_res;
	int path_len;
	mar_req_setup_t req_setup;
	mar_res_setup_t res_setup;
	char control_map_path[128];
	char request_map_path[128];
	char response_map_path[128];
	char dispatch_map_path[128];

	res = qb_hdb_handle_create (&ipc_hdb,
		sizeof (struct ipc_instance), handle);
	if (res != 0) {
		return (res);
	}

	res = qb_hdb_handle_get (&ipc_hdb, *handle, (void **)&ipc_instance);
	if (res != 0) {
		/* Do not leave the freshly created handle behind. */
		qb_hdb_handle_destroy (&ipc_hdb, *handle);
		return (res);
	}

	res_setup.error = EBADE;

#if defined(QB_SOLARIS)
	request_fd = socket (PF_UNIX, SOCK_STREAM, 0);
#else
	request_fd = socket (PF_LOCAL, SOCK_STREAM, 0);
#endif
	if (request_fd == -1) {
		res = EBADE;
		goto error_handle;
	}
#ifdef SO_NOSIGPIPE
	socket_nosigpipe (request_fd);
#endif

	memset (&address, 0, sizeof (struct sockaddr_un));
	address.sun_family = AF_UNIX;
#if defined(QB_LINUX)
	/* The name is written after a leading NUL byte in sun_path. */
	path_len = snprintf (address.sun_path + 1,
		sizeof (address.sun_path) - 1, "%s", socket_name);
	if (path_len < 0 ||
		(size_t)path_len >= sizeof (address.sun_path) - 1) {
		res = EINVAL;
		goto error_connect;
	}
#else
	path_len = snprintf (address.sun_path, sizeof (address.sun_path),
		"%s/%s", SOCKETDIR, socket_name);
	if (path_len < 0 || (size_t)path_len >= sizeof (address.sun_path)) {
		res = EINVAL;
		goto error_connect;
	}
#endif
#if defined(QB_BSD) || defined(QB_DARWIN)
	/* Must be computed after sun_path has been filled in. */
	address.sun_len = SUN_LEN(&address);
#endif

	sys_res = connect (request_fd, (struct sockaddr *)&address,
		QB_SUN_LEN(&address));
	if (sys_res == -1) {
		res = EAGAIN;
		goto error_connect;
	}

	res = memory_map (
		control_map_path,
		"control_buffer-XXXXXX",
		(void *)&ipc_instance->control_buffer,
		8192);
	if (res == -1) {
		res = EBADE;
		goto error_connect;
	}

	res = memory_map (
		request_map_path,
		"request_buffer-XXXXXX",
		(void *)&ipc_instance->request_buffer,
		request_size);
	if (res == -1) {
		res = EBADE;
		goto error_request_buffer;
	}

	res = memory_map (
		response_map_path,
		"response_buffer-XXXXXX",
		(void *)&ipc_instance->response_buffer,
		response_size);
	if (res == -1) {
		res = EBADE;
		goto error_response_buffer;
	}

	res = circular_memory_map (
		dispatch_map_path,
		"dispatch_buffer-XXXXXX",
		(void *)&ipc_instance->dispatch_buffer,
		dispatch_size);
	if (res == -1) {
		res = EBADE;
		goto error_dispatch_buffer;
	}

#if _POSIX_THREAD_PROCESS_SHARED > 0
	sem_init (&ipc_instance->control_buffer->sem0, 1, 0);
	sem_init (&ipc_instance->control_buffer->sem1, 1, 0);
	sem_init (&ipc_instance->control_buffer->sem2, 1, 0);
#else
	/*
	 * Allocate a semaphore segment
	 */
	while (1) {
		semkey = random();
		ipc_instance->euid = geteuid ();
		if ((ipc_instance->semid
			= semget (semkey, 3, IPC_CREAT|IPC_EXCL|0600)) != -1) {
			break;
		}
		/*
		 * EACCES can be returned as non root user when opening a different
		 * user's semaphore.
		 *
		 * EEXIST can happen when we are a root or nonroot user opening
		 * an existing shared memory segment for which we have access
		 */
		if (errno != EEXIST && errno != EACCES) {
			/* res is still 0 here; make sure the failure is reported. */
			res = EBADE;
			goto error_exit;
		}
	}

	semun.val = 0;
	res = semctl (ipc_instance->semid, 0, SETVAL, semun);
	if (res != 0) {
		res = EBADE;
		goto error_exit;
	}

	res = semctl (ipc_instance->semid, 1, SETVAL, semun);
	if (res != 0) {
		res = EBADE;
		goto error_exit;
	}
#endif

	/*
	 * Initialize IPC setup message
	 */
	req_setup.service = service;
	strcpy (req_setup.control_file, control_map_path);
	strcpy (req_setup.request_file, request_map_path);
	strcpy (req_setup.response_file, response_map_path);
	strcpy (req_setup.dispatch_file, dispatch_map_path);
	req_setup.control_size = 8192;
	req_setup.request_size = request_size;
	req_setup.response_size = response_size;
	req_setup.dispatch_size = dispatch_size;
#if _POSIX_THREAD_PROCESS_SHARED < 1
	req_setup.semkey = semkey;
#endif

	res = socket_send (request_fd, &req_setup, sizeof (mar_req_setup_t));
	if (res != 0) {
		goto error_exit;
	}
	res = socket_recv (request_fd, &res_setup, sizeof (mar_res_setup_t));
	if (res != 0) {
		goto error_exit;
	}

	ipc_instance->fd = request_fd;
	ipc_instance->flow_control_state = 0;

	if (res_setup.error == EAGAIN) {
		res = res_setup.error;
		goto error_exit;
	}

	ipc_instance->control_size = 8192;
	ipc_instance->request_size = request_size;
	ipc_instance->response_size = response_size;
	ipc_instance->dispatch_size = dispatch_size;

	pthread_mutex_init (&ipc_instance->mutex, NULL);

	qb_hdb_handle_put (&ipc_hdb, *handle);

	return (res_setup.error);

error_exit:
#if _POSIX_THREAD_PROCESS_SHARED < 1
	/* semget() returns -1 on failure; 0 is a valid identifier. */
	if (ipc_instance->semid != -1)
		semctl (ipc_instance->semid, 0, IPC_RMID);
#endif
	/* The dispatch buffer is mapped twice back to back. */
	memory_unmap (ipc_instance->dispatch_buffer, dispatch_size << 1);
error_dispatch_buffer:
	memory_unmap (ipc_instance->response_buffer, response_size);
error_response_buffer:
	memory_unmap (ipc_instance->request_buffer, request_size);
error_request_buffer:
	memory_unmap (ipc_instance->control_buffer, 8192);
error_connect:
	close (request_fd);
error_handle:
	qb_hdb_handle_destroy (&ipc_hdb, *handle);
	qb_hdb_handle_put (&ipc_hdb, *handle);

	return (res);
}
/*
 * Tear down a connection: shut down and close its socket, then destroy the
 * handle. Returns 0, or the handle lookup error.
 */
int32_t
qb_ipcc_service_disconnect (
	qb_hdb_handle_t handle)
{
	struct ipc_instance *inst;
	int32_t lookup;

	lookup = qb_hdb_handle_get (&ipc_hdb, handle, (void **)&inst);
	if (lookup == 0) {
		shutdown (inst->fd, SHUT_RDWR);
		close (inst->fd);

		qb_hdb_handle_destroy (&ipc_hdb, handle);
		/* Drop the reference taken by the lookup above. */
		qb_hdb_handle_put (&ipc_hdb, handle);
	}
	return (lookup);
}
/*
 * Report the connection's flow control state through *flow_control_state.
 * Returns 0, or the handle lookup error (in which case nothing is stored).
 */
int32_t
qb_ipcc_dispatch_flow_control_get (
	qb_hdb_handle_t handle,
	unsigned int *flow_control_state)
{
	struct ipc_instance *inst;
	int32_t lookup;

	lookup = qb_hdb_handle_get (&ipc_hdb, handle, (void **)&inst);
	if (lookup == 0) {
		*flow_control_state = inst->flow_control_state;
		qb_hdb_handle_put (&ipc_hdb, handle);
	}
	return (lookup);
}
/*
 * Report the connection's socket descriptor through *fd.
 * Returns 0, or the handle lookup error (in which case nothing is stored).
 */
int32_t
qb_ipcc_fd_get (
	qb_hdb_handle_t handle,
	int *fd)
{
	struct ipc_instance *inst;
	int32_t lookup;

	lookup = qb_hdb_handle_get (&ipc_hdb, handle, (void **)&inst);
	if (lookup == 0) {
		*fd = inst->fd;
		qb_hdb_handle_put (&ipc_hdb, handle);
	}
	return (lookup);
}
/*
 * Wait (via poll() with the given timeout) for a dispatch notification byte
 * from the server and, when a message is pending, return a pointer to it.
 *
 * On a 0 return *data points into the dispatch buffer at the current read
 * index and the handle reference is intentionally kept; it is released by
 * qb_ipcc_dispatch_put(). On every other return *data is NULL (once the
 * handle lookup succeeded) and the reference has been released.
 *
 * Returns EAGAIN when poll()/recv() was interrupted, the timeout expired or
 * the byte was only a flow control / flush notification; EBADE on socket
 * errors or when the peer closed the connection.
 */
int32_t
qb_ipcc_dispatch_get (
qb_hdb_handle_t handle,
void **data,
int timeout)
{
struct pollfd ufds;
int poll_events;
char buf;
struct ipc_instance *ipc_instance;
int res;
char buf_two = 1;
char *data_addr;
int32_t error = 0;
error = qb_hdb_handle_get (&ipc_hdb, handle, (void **)&ipc_instance);
if (error != 0) {
return (error);
}
*data = NULL;
ufds.fd = ipc_instance->fd;
ufds.events = POLLIN;
ufds.revents = 0;
poll_events = poll (&ufds, 1, timeout);
if (poll_events == -1 && errno == EINTR) {
error = EAGAIN;
goto error_put;
} else
if (poll_events == -1) {
error = EBADE;
goto error_put;
} else
if (poll_events == 0) {
/* Timeout: nothing to dispatch. */
error = EAGAIN;
goto error_put;
}
if (poll_events == 1 && (ufds.revents & (POLLERR|POLLHUP))) {
error = EBADE;
goto error_put;
}
/* Read the single notification byte sent by the server. */
res = recv (ipc_instance->fd, &buf, 1, 0);
if (res == -1 && errno == EINTR) {
error = EAGAIN;
goto error_put;
} else
if (res == -1) {
error = EBADE;
goto error_put;
} else
if (res == 0) {
/* Means that the peer closed the socket cleanly. However, it should
* happen only on BSD and Darwin systems since poll() returns a
* POLLHUP event on other systems.
*/
error = EBADE;
goto error_put;
}
ipc_instance->flow_control_state = 0;
if (buf == MESSAGE_RES_OUTQ_NOT_EMPTY || buf == MESSAGE_RES_ENABLE_FLOWCONTROL) {
ipc_instance->flow_control_state = 1;
}
/*
* Notify executive to flush any pending dispatch messages
*/
if (ipc_instance->flow_control_state) {
buf_two = MESSAGE_REQ_OUTQ_FLUSH;
res = socket_send (ipc_instance->fd, &buf_two, 1);
/* NOTE(review): a failed send aborts in debug builds and is ignored with NDEBUG. */
assert (res == 0); /* TODO */
}
/*
* This is just a notification of flow control starting at the addition
* of a new pending message, not a message to dispatch
*/
if (buf == MESSAGE_RES_ENABLE_FLOWCONTROL) {
error = EAGAIN;
goto error_put;
}
if (buf == MESSAGE_RES_OUTQ_FLUSH_NR) {
error = EAGAIN;
goto error_put;
}
/* Hand out the message at the current read index of the dispatch buffer. */
data_addr = ipc_instance->dispatch_buffer;
data_addr = &data_addr[ipc_instance->control_buffer->read];
*data = (void *)data_addr;
/* Success: the handle reference stays held until qb_ipcc_dispatch_put(). */
return (0);
error_put:
qb_hdb_handle_put (&ipc_hdb, handle);
return (error);
}
/*
 * Finish processing the message handed out by qb_ipcc_dispatch_get():
 * take semaphore #2, advance the dispatch read index past the message
 * (wrapping at dispatch_size) and release both handle references.
 *
 * Returns 0 on success, the handle lookup error, or (SysV semaphore build)
 * EAGAIN when interrupted and EBADE on semaphore failure.
 */
int32_t
qb_ipcc_dispatch_put (qb_hdb_handle_t handle)
{
#if _POSIX_THREAD_PROCESS_SHARED < 1
	struct sembuf sop;
#endif
	struct ipc_instance *ipc_instance;
	qb_ipc_response_header_t *header;
	unsigned int read_idx;
	char *addr;
	int res;

	res = qb_hdb_handle_get_always (&ipc_hdb, handle, (void **)&ipc_instance);
	if (res != 0) {
		return (res);
	}

#if _POSIX_THREAD_PROCESS_SHARED > 0
	/* Retry for as long as the wait is interrupted by a signal. */
	do {
		res = sem_wait (&ipc_instance->control_buffer->sem2);
	} while (res == -1 && errno == EINTR);
#else
	sop.sem_num = 2;
	sop.sem_op = -1;
	sop.sem_flg = 0;

	for (;;) {
		res = semop (ipc_instance->semid, &sop, 1);
		if (res != -1) {
			break;
		}
		if (errno == EINTR) {
			res = EAGAIN;
			goto error_exit;
		}
		if (errno != EACCES) {
			res = EBADE;
			goto error_exit;
		}
		/* EACCES: report the changed euid, then try again. */
		priv_change_send (ipc_instance);
	}
#endif

	read_idx = ipc_instance->control_buffer->read;
	addr = ipc_instance->dispatch_buffer;
	header = (qb_ipc_response_header_t *) &addr[read_idx];
	ipc_instance->control_buffer->read =
		(read_idx + header->size) % ipc_instance->dispatch_size;

	res = 0;

#if _POSIX_THREAD_PROCESS_SHARED < 1
error_exit:
#endif
	/*
	 * One put for the reference kept by qb_ipcc_dispatch_get(), one for
	 * the lookup done in this call.
	 */
	qb_hdb_handle_put (&ipc_hdb, handle);
	qb_hdb_handle_put (&ipc_hdb, handle);

	return (res);
}
+/*
+ * Send a request without waiting for a reply: the iovec is copied into the
+ * request buffer and the server is signalled.
+ *
+ * Returns 0 on success, the handle lookup error, or the msg_send() error
+ * (EINVAL, EAGAIN or EBADE).
+ */
+int32_t
+qb_ipcc_msg_send (
+	qb_hdb_handle_t handle,
+	const struct iovec *iov,
+	unsigned int iov_len)
+{
+	int32_t res;
+	struct ipc_instance *ipc_instance;
+
+	res = qb_hdb_handle_get (&ipc_hdb, handle, (void **)&ipc_instance);
+	if (res != 0) {
+		return (res);
+	}
+
+	pthread_mutex_lock (&ipc_instance->mutex);
+	res = msg_send (ipc_instance, iov, iov_len);
+	/*
+	 * Unlock before dropping the reference: the mutex lives inside the
+	 * instance, which must not be touched once the reference is gone.
+	 */
+	pthread_mutex_unlock (&ipc_instance->mutex);
+
+	qb_hdb_handle_put (&ipc_hdb, handle);
+
+	return (res);
+}
+
/*
 * Send a request and wait for its reply, copying res_len bytes of the
 * response buffer into res_msg. The instance mutex is held across the whole
 * exchange.
 *
 * Returns 0 on success, the handle lookup error, or the error reported by
 * msg_send() / reply_receive() (EINVAL, EAGAIN or EBADE).
 */
int32_t
qb_ipcc_msg_send_reply_receive (
	qb_hdb_handle_t handle,
	const struct iovec *iov,
	unsigned int iov_len,
	void *res_msg,
	size_t res_len)
{
	int32_t res;
	struct ipc_instance *ipc_instance;

	res = qb_hdb_handle_get (&ipc_hdb, handle, (void **)&ipc_instance);
	if (res != 0) {
		return (res);
	}

	pthread_mutex_lock (&ipc_instance->mutex);

	res = msg_send (ipc_instance, iov, iov_len);
	if (res == 0) {
		res = reply_receive (ipc_instance, res_msg, res_len);
	}

	/*
	 * Unlock before dropping the reference: the mutex lives inside the
	 * instance, which must not be touched once the reference is gone.
	 */
	pthread_mutex_unlock (&ipc_instance->mutex);
	qb_hdb_handle_put (&ipc_hdb, handle);

	return (res);
}
/*
 * Send a request, wait for its reply and point *res_msg at the shared
 * response buffer (no copy).
 *
 * Once the handle lookup has succeeded, this function does not release the
 * reference on any path; qb_ipcc_msg_send_reply_receive_in_buf_put()
 * releases it.
 * NOTE(review): confirm callers also call the _put function after a failed
 * _get, otherwise the reference is leaked on error.
 */
int32_t
qb_ipcc_msg_send_reply_receive_in_buf_get (
	qb_hdb_handle_t handle,
	const struct iovec *iov,
	unsigned int iov_len,
	void **res_msg)
{
	struct ipc_instance *inst;
	unsigned int status;

	status = qb_hdb_handle_get (&ipc_hdb, handle, (void **)&inst);
	if (status != 0) {
		return (status);
	}

	pthread_mutex_lock (&inst->mutex);

	status = msg_send (inst, iov, iov_len);
	if (status == 0) {
		status = reply_receive_in_buf (inst, res_msg);
	}

	pthread_mutex_unlock (&inst->mutex);

	return (status);
}
/*
 * Release the response buffer obtained from
 * qb_ipcc_msg_send_reply_receive_in_buf_get(). Returns 0, or the handle
 * lookup error.
 */
int32_t
qb_ipcc_msg_send_reply_receive_in_buf_put (
	qb_hdb_handle_t handle)
{
	struct ipc_instance *inst;
	unsigned int status;

	status = qb_hdb_handle_get (&ipc_hdb, handle, (void **)&inst);
	if (status == 0) {
		/*
		 * One put for the reference kept by the _get call, one for the
		 * lookup just above.
		 */
		qb_hdb_handle_put (&ipc_hdb, handle);
		qb_hdb_handle_put (&ipc_hdb, handle);
	}
	return (status);
}
/*
 * Allocate a zero-copy buffer shared with the server.
 *
 * Maps a new temporary file of size + header_size bytes plus a leading
 * struct qb_ipcs_zc_header, announces it to the server with a
 * ZC_ALLOC_HEADER request and, on success, stores the address just past the
 * zc header in *buffer.
 *
 * Returns 0 on success, the handle lookup error, EBADE when the mapping
 * could not be created, or the error of the server request. On failure
 * *buffer is left untouched and the local mapping is released.
 * NOTE(review): the temporary file itself is not removed here on failure;
 * confirm whether the server is responsible for unlinking it.
 */
int32_t
qb_ipcc_zcb_alloc (
	qb_hdb_handle_t handle,
	void **buffer,
	size_t size,
	size_t header_size)
{
	struct ipc_instance *ipc_instance;
	void *buf = NULL;
	char path[128];
	unsigned int res;
	mar_req_qb_ipcc_zc_alloc_t req_qb_ipcc_zc_alloc;
	qb_ipc_response_header_t res_qb_ipcs_zc_alloc;
	size_t map_size;
	struct iovec iovec;
	struct qb_ipcs_zc_header *hdr;

	res = qb_hdb_handle_get (&ipc_hdb, handle, (void **)&ipc_instance);
	if (res != 0) {
		return (res);
	}

	map_size = size + header_size + sizeof (struct qb_ipcs_zc_header);
	/* Report a failed mapping instead of relying on assert(). */
	if (memory_map (path, "qb_zerocopy-XXXXXX", &buf, map_size) == -1) {
		qb_hdb_handle_put (&ipc_hdb, handle);
		return (EBADE);
	}

	req_qb_ipcc_zc_alloc.header.size = sizeof (mar_req_qb_ipcc_zc_alloc_t);
	req_qb_ipcc_zc_alloc.header.id = ZC_ALLOC_HEADER;
	req_qb_ipcc_zc_alloc.map_size = map_size;
	strcpy (req_qb_ipcc_zc_alloc.path_to_file, path);

	iovec.iov_base = (void *)&req_qb_ipcc_zc_alloc;
	iovec.iov_len = sizeof (mar_req_qb_ipcc_zc_alloc_t);

	res = qb_ipcc_msg_send_reply_receive (
		handle,
		&iovec,
		1,
		&res_qb_ipcs_zc_alloc,
		sizeof (qb_ipc_response_header_t));
	if (res != 0) {
		/* The buffer is never handed out, so release the mapping. */
		munmap (buf, map_size);
		qb_hdb_handle_put (&ipc_hdb, handle);
		return (res);
	}

	hdr = (struct qb_ipcs_zc_header *)buf;
	hdr->map_size = map_size;
	*buffer = ((char *)buf) + sizeof (struct qb_ipcs_zc_header);

	qb_hdb_handle_put (&ipc_hdb, handle);
	return (res);
}
/*
 * Release a zero-copy buffer obtained from qb_ipcc_zcb_alloc(): send a
 * ZC_FREE_HEADER request built from the zc header that precedes buffer,
 * then unmap the local mapping.
 *
 * Returns the handle lookup error, or the result of the server request.
 */
int32_t
qb_ipcc_zcb_free (
	qb_hdb_handle_t handle,
	void *buffer)
{
	struct ipc_instance *inst;
	struct qb_ipcs_zc_header *zc_hdr;
	mar_req_qb_ipcc_zc_free_t request;
	qb_ipc_response_header_t reply;
	struct iovec request_iov;
	unsigned int status;

	/* The zc header sits immediately in front of the user-visible buffer. */
	zc_hdr = (struct qb_ipcs_zc_header *)((char *)buffer - sizeof (struct qb_ipcs_zc_header));

	status = qb_hdb_handle_get (&ipc_hdb, handle, (void **)&inst);
	if (status != 0) {
		return (status);
	}

	request.header.size = sizeof (mar_req_qb_ipcc_zc_free_t);
	request.header.id = ZC_FREE_HEADER;
	request.map_size = zc_hdr->map_size;
	request.server_address = zc_hdr->server_address;

	request_iov.iov_base = (void *)&request;
	request_iov.iov_len = sizeof (mar_req_qb_ipcc_zc_free_t);

	status = qb_ipcc_msg_send_reply_receive (
		handle,
		&request_iov,
		1,
		&reply,
		sizeof (qb_ipc_response_header_t));

	/* The mapping is released regardless of the request's outcome. */
	munmap ((void *)zc_hdr, zc_hdr->map_size);

	qb_hdb_handle_put (&ipc_hdb, handle);
	return (status);
}
/*
 * Ask the server to process the request stored in the zero-copy buffer msg
 * and copy res_len bytes of the reply into res_msg.
 *
 * Only a small ZC_EXECUTE_HEADER request carrying the server address
 * recorded in the zc header in front of msg goes through the request buffer.
 *
 * Returns the handle lookup error, or the result of the exchange.
 */
int32_t
qb_ipcc_zcb_msg_send_reply_receive (
	qb_hdb_handle_t handle,
	void *msg,
	void *res_msg,
	size_t res_len)
{
	struct ipc_instance *inst;
	struct qb_ipcs_zc_header *zc_hdr;
	mar_req_qb_ipcc_zc_execute_t request;
	struct iovec request_iov;
	int32_t status;

	status = qb_hdb_handle_get (&ipc_hdb, handle, (void **)&inst);
	if (status != 0) {
		return (status);
	}

	/* The zc header sits immediately in front of the user-visible buffer. */
	zc_hdr = (struct qb_ipcs_zc_header *)(((char *)msg) - sizeof (struct qb_ipcs_zc_header));

	request.header.size = sizeof (mar_req_qb_ipcc_zc_execute_t);
	request.header.id = ZC_EXECUTE_HEADER;
	request.server_address = zc_hdr->server_address;

	request_iov.iov_base = (void *)&request;
	request_iov.iov_len = sizeof (mar_req_qb_ipcc_zc_execute_t);

	status = qb_ipcc_msg_send_reply_receive (
		handle,
		&request_iov,
		1,
		res_msg,
		res_len);

	qb_hdb_handle_put (&ipc_hdb, handle);
	return (status);
}
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Sat, Jan 25, 10:48 AM (1 d, 2 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1322189
Default Alt Text
(31 KB)
Attached To
Mode
rQ LibQB
Attached
Detach File
Event Timeline
Log In to Comment