
diff --git a/include/qb/qbipcc.h b/include/qb/qbipcc.h
index de96c72..867ba04 100644
--- a/include/qb/qbipcc.h
+++ b/include/qb/qbipcc.h
@@ -1,264 +1,294 @@
/*
* Copyright (C) 2010-2020 Red Hat, Inc.
*
* Author: Steven Dake <sdake@redhat.com>
*
* This file is part of libqb.
*
* libqb is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 2.1 of the License, or
* (at your option) any later version.
*
* libqb is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with libqb. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef QB_IPCC_H_DEFINED
#define QB_IPCC_H_DEFINED
/* *INDENT-OFF* */
#ifdef __cplusplus
extern "C" {
#endif
/* *INDENT-ON* */
#include <sys/types.h> /* size_t, ssize_t */
#include <sys/uio.h> /* iovec */
#include <qb/qbipc_common.h>
/**
* @file qbipcc.h
*
* Client IPC API.
*
* @par Lifecycle of an IPC connection.
* An IPC connection is made to the server with qb_ipcc_connect(). This function
* connects to the server and requests channels be created for communication.
* To disconnect, the client either exits or executes the function qb_ipcc_disconnect().
*
* @par Synchronous communication
* The function qb_ipcc_sendv_recv() sends an iovector request and receives a response.
*
* @par Asynchronous requests from the client
* The function qb_ipcc_sendv() sends an iovector request.
* The function qb_ipcc_send() sends a message buffer request.
*
* @par Asynchronous events from the server
* The qb_ipcc_event_recv() function receives an out-of-band asynchronous message.
* The asynchronous messages are queued and can provide very high out-of-band performance.
* To determine when to call qb_ipcc_event_recv() the qb_ipcc_fd_get() call is
* used to obtain a file descriptor used in the poll() or select() system calls.
*
* @example ipcclient.c
* This is an example of how to use the client.
*/
typedef struct qb_ipcc_connection qb_ipcc_connection_t;
/**
* Create a connection to an IPC service.
*
* @param name name of the service.
* @param max_msg_size biggest msg size.
* @return NULL (error: see errno) or a connection object.
*
* @note It is recommended to do a one time check on the
* max_msg_size value using qb_ipcc_verify_dgram_max_msg_size
* _BEFORE_ calling the connect function when IPC_SOCKET is in use.
* Some distributions will allow large message buffers to be
* set on the socket, but not actually honor them because of
* kernel state values. The qb_ipcc_verify_dgram_max_msg_size
* function both sets the socket buffer size and verifies it by
* doing a send/recv.
*/
qb_ipcc_connection_t*
qb_ipcc_connect(const char *name, size_t max_msg_size);
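/*
 * Usage sketch (editorial illustration, not part of this header): a one-time
 * qb_ipcc_verify_dgram_max_msg_size() check before connecting, as recommended
 * above when IPC_SOCKET is in use. The service name "my_service" and the
 * requested size are assumptions made for the example.
 *
 *   size_t wanted = 1024 * 1024;
 *   int32_t usable = qb_ipcc_verify_dgram_max_msg_size(wanted);
 *   qb_ipcc_connection_t *conn;
 *
 *   if (usable < 0) {
 *       usable = 8192;                   // could not detect, fall back
 *   }
 *   conn = qb_ipcc_connect("my_service", usable);
 *   if (conn == NULL) {
 *       perror("qb_ipcc_connect");       // errno is set on failure
 *   }
 */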
+/**
+ * Asynchronously connect to an IPC service
+ * @param name name of the service.
+ * @param max_msg_size biggest msg size.
+ * @param connect_fd return FD to continue connection with
+ * @return NULL (error: see errno) or a connection object.
+ *
+ * qb_ipcc_connect_async() returns a connection FD which
+ * should be added to the application's mainloop - when it becomes
+ * active, qb_ipcc_connect_continue() should be called to
+ * finalise the connection.
+ * NOTE: This is NOT the same FD that is used for normal application
+ * polling. qb_ipcc_fd_get() must still be called once the connection
+ * is established.
+ */
+qb_ipcc_connection_t *
+qb_ipcc_connect_async(const char *name, size_t max_msg_size, int *connect_fd);
+
+/**
+ * Finish up an asynchronous IPC connection
+ * @param c connection handle as returned from qb_ipcc_connect_async()
+ * @return 0 or -errno.
+ *
+ * Finishes up a connection that was initiated by qb_ipcc_connect_async().
+ * This should only be called when the fd returned by qb_ipcc_connect_async()
+ * becomes active, usually from a callback in the application's main loop.
+ */
+int
+qb_ipcc_connect_continue(struct qb_ipcc_connection * c);
+
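/*
 * Usage sketch (editorial illustration, not part of this patch): driving the
 * asynchronous connect from a qb_loop mainloop. The service name "my_service",
 * the buffer size and the mainloop wiring (an already-created qb_loop_t
 * "mainloop") are assumptions made for the example; any mainloop that can
 * watch a file descriptor will do.
 *
 *   static int32_t connect_ready(int32_t fd, int32_t revents, void *data)
 *   {
 *       qb_ipcc_connection_t *conn = data;
 *       int32_t poll_fd;
 *
 *       if (qb_ipcc_connect_continue(conn) != 0) {
 *           return -1;                   // connect failed, stop watching fd
 *       }
 *       // connection established; from now on poll the FD from qb_ipcc_fd_get()
 *       qb_ipcc_fd_get(conn, &poll_fd);
 *       return 0;
 *   }
 *
 *   int connect_fd;
 *   qb_ipcc_connection_t *conn =
 *       qb_ipcc_connect_async("my_service", 8192, &connect_fd);
 *   if (conn != NULL) {
 *       qb_loop_poll_add(mainloop, QB_LOOP_MED, connect_fd, POLLIN,
 *                        conn, connect_ready);
 *   }
 */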
/**
* Test kernel dgram socket buffers to verify the largest single msg,
* up to the max_msg_size value, that can be sent. Rounds down to the
* nearest 1k.
*
* @param max_msg_size biggest msg size.
* @return -1 if max size can not be detected, positive value
* representing the largest single msg up to max_msg_size
* that can successfully be sent over a unix dgram socket.
*/
int32_t
qb_ipcc_verify_dgram_max_msg_size(size_t max_msg_size);
/**
* Disconnect an IPC connection.
*
* @param c connection instance
*/
void qb_ipcc_disconnect(qb_ipcc_connection_t* c);
/**
* Get the file descriptor to poll.
*
* @param c connection instance
* @param fd (out) file descriptor to poll
*/
int32_t qb_ipcc_fd_get(qb_ipcc_connection_t* c, int32_t * fd);
/**
* Get the credentials of the server process
*
*
* @param c connection instance
* @param pid PID of the server we are connected to
* @param uid UID of the server we are connected to
* @param gid GID of the server we are connected to
*/
int32_t qb_ipcc_auth_get(qb_ipcc_connection_t* c, pid_t *pid, uid_t *uid, gid_t *gid);
/**
* Set the maximum allowable flowcontrol value.
*
* @note the default is 1
*
* @param c connection instance
* @param max the max allowable flowcontrol value (1 or 2)
*/
int32_t qb_ipcc_fc_enable_max_set(qb_ipcc_connection_t * c, uint32_t max);
/**
* Send a message.
*
* @param c connection instance
* @param msg_ptr pointer to a message to send
* @param msg_len the size of the message
* @return (size sent, -errno == error)
*
* @note the msg_ptr must include a qb_ipc_request_header at
* the top of the message. The server will read the size field
* to determine how much to recv.
*/
ssize_t qb_ipcc_send(qb_ipcc_connection_t* c, const void *msg_ptr,
size_t msg_len);
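/*
 * Usage sketch (editorial illustration): a contiguous request whose first
 * member is the qb_ipc_request_header, as required above. The message id
 * MY_MSG_ID and the payload field are assumptions made for the example.
 *
 *   struct my_req {
 *       struct qb_ipc_request_header hdr;
 *       char payload[32];
 *   } req;
 *
 *   req.hdr.id = MY_MSG_ID;
 *   req.hdr.size = sizeof(req);          // server reads this size field
 *   snprintf(req.payload, sizeof(req.payload), "hello");
 *   (void)qb_ipcc_send(conn, &req, req.hdr.size);
 */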
/**
* Send a message (iovec).
*
* @param c connection instance
* @param iov pointer to an iovec struct to send
* @param iov_len the number of iovecs used
* @return (size sent, -errno == error)
*
* @note the iov[0] must be a qb_ipc_request_header. The server will
* read the size field to determine how much to recv.
*/
ssize_t qb_ipcc_sendv(qb_ipcc_connection_t* c, const struct iovec* iov,
size_t iov_len);
/**
* Receive a response.
*
* @param c connection instance
* @param msg_ptr pointer to a message buffer to receive into
* @param msg_len the size of the buffer
* @param ms_timeout max time to wait for a response
* @return (size recv'ed, -errno == error)
*
* @note that msg_ptr will include a qb_ipc_response_header at
* the top of the message.
*/
ssize_t qb_ipcc_recv(qb_ipcc_connection_t* c, void *msg_ptr,
size_t msg_len, int32_t ms_timeout);
/**
* This is a convenience function that simply sends and then recvs.
*
* @param c connection instance
* @param iov pointer to an iovec struct to send
* @param iov_len the number of iovecs used
* @param msg_ptr pointer to a message buffer to receive into
* @param msg_len the size of the buffer
* @param ms_timeout max time to wait for a response
*
* @note the iov[0] must include a qb_ipc_request_header at
* the top of the message. The server will read the size field
* to determine how much to recv.
* @note that msg_ptr will include a qb_ipc_response_header at
* the top of the message.
*
* @see qb_ipcc_sendv() qb_ipcc_recv()
*/
ssize_t qb_ipcc_sendv_recv(qb_ipcc_connection_t *c,
const struct iovec *iov, uint32_t iov_len,
void *msg_ptr, size_t msg_len,
int32_t ms_timeout);
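/*
 * Usage sketch (editorial illustration): header in iov[0], payload in iov[1],
 * response received into a buffer that starts with a qb_ipc_response_header.
 * The message id MY_MSG_ID and the 1000 ms timeout are assumptions made for
 * the example.
 *
 *   struct qb_ipc_request_header hdr;
 *   struct iovec iov[2];
 *   char payload[] = "hello";
 *   char reply[256];
 *   ssize_t rc;
 *
 *   hdr.id = MY_MSG_ID;
 *   hdr.size = sizeof(hdr) + sizeof(payload);
 *   iov[0].iov_base = &hdr;
 *   iov[0].iov_len = sizeof(hdr);
 *   iov[1].iov_base = payload;
 *   iov[1].iov_len = sizeof(payload);
 *
 *   rc = qb_ipcc_sendv_recv(conn, iov, 2, reply, sizeof(reply), 1000);
 *   if (rc < 0) {
 *       errno = -rc;                     // negative return is -errno
 *       perror("qb_ipcc_sendv_recv");
 *   }
 */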
/**
* Receive an event.
*
* @param c connection instance
* @param msg_ptr pointer to a message buffer to receive into
* @param msg_len the size of the buffer
* @param ms_timeout time in milliseconds to wait for a message
* 0 == no wait, negative == block, positive == wait X ms.
* @return size of the message or error (-errno)
*
* @note that msg_ptr will include a qb_ipc_response_header at
* the top of the message.
*/
ssize_t qb_ipcc_event_recv(qb_ipcc_connection_t* c, void *msg_ptr,
size_t msg_len, int32_t ms_timeout);
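/*
 * Usage sketch (editorial illustration): wait for activity with poll() on the
 * descriptor from qb_ipcc_fd_get(), then drain queued events without blocking
 * (ms_timeout of 0). The 8192-byte buffer is an assumption made for the
 * example.
 *
 *   struct pollfd pfd;
 *   char buf[8192];                      // starts with qb_ipc_response_header
 *   int32_t fd;
 *   ssize_t rc;
 *
 *   qb_ipcc_fd_get(conn, &fd);
 *   pfd.fd = fd;
 *   pfd.events = POLLIN;
 *   if (poll(&pfd, 1, -1) > 0 && (pfd.revents & POLLIN)) {
 *       do {
 *           rc = qb_ipcc_event_recv(conn, buf, sizeof(buf), 0);
 *       } while (rc >= 0);               // stops on -EAGAIN (queue empty)
 *   }
 */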
/**
* Associate a "user" pointer with this connection.
*
* @param context the pointer to associate with this connection.
* @param c connection instance
* @see qb_ipcc_context_get()
*/
void qb_ipcc_context_set(qb_ipcc_connection_t *c, void *context);
/**
* Get the context (set previously)
*
* @param c connection instance
* @return the context
* @see qb_ipcc_context_set()
*/
void *qb_ipcc_context_get(qb_ipcc_connection_t *c);
/**
* Is the connection connected?
*
* @param c connection instance
* @retval QB_TRUE when connected
* @retval QB_FALSE when not connected
*/
int32_t qb_ipcc_is_connected(qb_ipcc_connection_t *c);
/**
* What is the actual buffer size used after the connection.
*
* @note The buffer size is guaranteed to be at least the size
* of the value given in qb_ipcc_connect, but it is possible
* the server will enforce a larger size depending on the
* implementation. If the server side is known to enforce
* a buffer size, use this function after the client connection
* is established to retrieve the buffer size in use. It is
* important for the client side to know the buffer size in use
* so the client can successfully retrieve large server events.
*
* @param c connection instance
* @retval connection size in bytes or -error code
*/
int32_t qb_ipcc_get_buffer_size(qb_ipcc_connection_t * c);
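/*
 * Usage sketch (editorial illustration): size the client's event buffer from
 * the negotiated value rather than the size requested at connect time, so
 * large server events can be received.
 *
 *   int32_t buf_size = qb_ipcc_get_buffer_size(conn);
 *   void *event_buf = (buf_size > 0) ? malloc((size_t)buf_size) : NULL;
 */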
/* *INDENT-OFF* */
#ifdef __cplusplus
}
#endif
/* *INDENT-ON* */
#endif /* QB_IPCC_H_DEFINED */
diff --git a/lib/ipc_int.h b/lib/ipc_int.h
index 03c5dab..87f1de1 100644
--- a/lib/ipc_int.h
+++ b/lib/ipc_int.h
@@ -1,214 +1,215 @@
/*
* Copyright (C) 2009 Red Hat, Inc.
*
* Author: Steven Dake <sdake@redhat.com>
* Angus Salkeld <asalkeld@redhat.com>
*
* This file is part of libqb.
*
* libqb is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 2.1 of the License, or
* (at your option) any later version.
*
* libqb is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with libqb. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef QB_IPC_INT_H_DEFINED
#define QB_IPC_INT_H_DEFINED
#include "os_base.h"
#include <dirent.h>
#include <qb/qblist.h>
#include <qb/qbloop.h>
#include <qb/qbipcc.h>
#include <qb/qbipcs.h>
#include <qb/qbipc_common.h>
#include <qb/qbrb.h>
#define QB_IPC_MAX_WAIT_MS 2000
/*
Client Server
SEND CONN REQ ->
ACCEPT & CREATE queues
or DENY
<- SEND ACCEPT(with details)/DENY
*/
struct qb_ipc_connection_request {
struct qb_ipc_request_header hdr;
uint32_t max_msg_size;
} __attribute__ ((aligned(8)));
struct qb_ipc_event_connection_request {
struct qb_ipc_request_header hdr;
intptr_t connection;
} __attribute__ ((aligned(8)));
struct qb_ipc_connection_response {
struct qb_ipc_response_header hdr;
int32_t connection_type;
uint32_t max_msg_size;
intptr_t connection;
char request[PATH_MAX];
char response[PATH_MAX];
char event[PATH_MAX];
} __attribute__ ((aligned(8)));
struct qb_ipcc_connection;
struct qb_ipc_one_way {
size_t max_msg_size;
enum qb_ipc_type type;
union {
struct {
int32_t sock;
char *sock_name;
void* shared_data;
char shared_file_name[NAME_MAX];
} us;
struct {
qb_ringbuffer_t *rb;
} shm;
} u;
};
struct qb_ipcc_funcs {
ssize_t (*recv)(struct qb_ipc_one_way *one_way, void *buf, size_t buf_size, int32_t timeout);
ssize_t (*send)(struct qb_ipc_one_way *one_way, const void *data, size_t size);
ssize_t (*sendv)(struct qb_ipc_one_way *one_way, const struct iovec *iov, size_t iov_len);
void (*disconnect)(struct qb_ipcc_connection* c);
int32_t (*fc_get)(struct qb_ipc_one_way *one_way);
};
struct qb_ipcc_connection {
char name[NAME_MAX];
int32_t needs_sock_for_poll;
gid_t egid;
pid_t server_pid;
struct qb_ipc_one_way setup;
struct qb_ipc_one_way request;
struct qb_ipc_one_way response;
struct qb_ipc_one_way event;
struct qb_ipcc_funcs funcs;
struct qb_ipc_request_header *receive_buf;
uint32_t fc_enable_max;
int32_t is_connected;
void * context;
uid_t euid;
};
int32_t qb_ipcc_us_setup_connect(struct qb_ipcc_connection *c,
- struct qb_ipc_connection_response *r);
+ struct qb_ipc_connection_response *r);
+int qb_ipcc_setup_connect_continue(struct qb_ipcc_connection *c, struct qb_ipc_connection_response *response);
ssize_t qb_ipc_us_send(struct qb_ipc_one_way *one_way, const void *msg, size_t len);
ssize_t qb_ipc_us_recv(struct qb_ipc_one_way *one_way, void *msg, size_t len, int32_t timeout);
int32_t qb_ipc_us_ready(struct qb_ipc_one_way *ow_data, struct qb_ipc_one_way *ow_conn,
int32_t ms_timeout, int32_t events);
void qb_ipcc_us_sock_close(int32_t sock);
int32_t qb_ipcc_us_connect(struct qb_ipcc_connection *c, struct qb_ipc_connection_response * response);
int32_t qb_ipcc_shm_connect(struct qb_ipcc_connection *c, struct qb_ipc_connection_response * response);
struct qb_ipcs_service;
struct qb_ipcs_connection;
struct qb_ipcs_funcs {
int32_t (*connect)(struct qb_ipcs_service *s, struct qb_ipcs_connection *c,
struct qb_ipc_connection_response *r);
void (*disconnect)(struct qb_ipcs_connection *c);
ssize_t (*recv)(struct qb_ipc_one_way *one_way, void *buf, size_t buf_size, int32_t timeout);
ssize_t (*peek)(struct qb_ipc_one_way *one_way, void **data_out, int32_t timeout);
void (*reclaim)(struct qb_ipc_one_way *one_way);
ssize_t (*send)(struct qb_ipc_one_way *one_way, const void *data, size_t size);
ssize_t (*sendv)(struct qb_ipc_one_way *one_way, const struct iovec* iov, size_t iov_len);
void (*fc_set)(struct qb_ipc_one_way *one_way, int32_t fc_enable);
ssize_t (*q_len_get)(struct qb_ipc_one_way *one_way);
};
struct qb_ipcs_service {
enum qb_ipc_type type;
char name[NAME_MAX];
uint32_t max_buffer_size;
int32_t service_id;
int32_t ref_count;
pid_t pid;
int32_t needs_sock_for_poll;
int32_t server_sock;
struct qb_ipcs_service_handlers serv_fns;
struct qb_ipcs_poll_handlers poll_fns;
struct qb_ipcs_funcs funcs;
enum qb_loop_priority poll_priority;
struct qb_list_head connections;
struct qb_list_head list;
struct qb_ipcs_stats stats;
void *context;
};
enum qb_ipcs_connection_state {
QB_IPCS_CONNECTION_INACTIVE,
QB_IPCS_CONNECTION_ACTIVE,
QB_IPCS_CONNECTION_ESTABLISHED,
QB_IPCS_CONNECTION_SHUTTING_DOWN,
};
#define CONNECTION_DESCRIPTION NAME_MAX
struct qb_ipcs_connection_auth {
uid_t uid;
gid_t gid;
mode_t mode;
};
struct qb_ipcs_connection {
enum qb_ipcs_connection_state state;
int32_t refcount;
pid_t pid;
uid_t euid;
gid_t egid;
struct qb_ipcs_connection_auth auth;
struct qb_ipc_one_way setup;
struct qb_ipc_one_way request;
struct qb_ipc_one_way response;
struct qb_ipc_one_way event;
struct qb_ipcs_service *service;
struct qb_list_head list;
struct qb_ipc_request_header *receive_buf;
void *context;
int32_t fc_enabled;
int32_t poll_events;
int32_t outstanding_notifiers;
char description[CONNECTION_DESCRIPTION];
struct qb_ipcs_connection_stats_2 stats;
};
void qb_ipcs_us_init(struct qb_ipcs_service *s);
void qb_ipcs_shm_init(struct qb_ipcs_service *s);
int32_t qb_ipcs_us_publish(struct qb_ipcs_service *s);
int32_t qb_ipcs_us_withdraw(struct qb_ipcs_service *s);
int32_t qb_ipcc_us_sock_connect(const char *socket_name, int32_t * sock_pt);
int32_t qb_ipcs_dispatch_connection_request(int32_t fd, int32_t revents, void *data);
struct qb_ipcs_connection* qb_ipcs_connection_alloc(struct qb_ipcs_service *s);
int32_t qb_ipcs_process_request(struct qb_ipcs_service *s,
struct qb_ipc_request_header *hdr);
int32_t qb_ipc_us_sock_error_is_disconnected(int err);
int use_filesystem_sockets(void);
void remove_tempdir(const char *name);
#endif /* QB_IPC_INT_H_DEFINED */
diff --git a/lib/ipc_setup.c b/lib/ipc_setup.c
index c144a5e..0ef9bb6 100644
--- a/lib/ipc_setup.c
+++ b/lib/ipc_setup.c
@@ -1,934 +1,944 @@
/*
* Copyright (C) 2010,2013 Red Hat, Inc.
*
* Author: Angus Salkeld <asalkeld@redhat.com>
*
* This file is part of libqb.
*
* libqb is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 2.1 of the License, or
* (at your option) any later version.
*
* libqb is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with libqb. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os_base.h"
#include <poll.h>
#if defined(HAVE_GETPEERUCRED)
#include <ucred.h>
#endif
#ifdef HAVE_SYS_UN_H
#include <sys/un.h>
#endif /* HAVE_SYS_UN_H */
#ifdef HAVE_SYS_STAT_H
#include <sys/stat.h>
#endif
#ifdef HAVE_SYS_MMAN_H
#include <sys/mman.h>
#endif
#include <qb/qbatomic.h>
#include <qb/qbipcs.h>
#include <qb/qbloop.h>
#include <qb/qbdefs.h>
#include "util_int.h"
#include "ipc_int.h"
struct ipc_auth_ugp {
uid_t uid;
gid_t gid;
pid_t pid;
};
struct ipc_auth_data {
int32_t sock;
struct qb_ipcs_service *s;
union {
struct qb_ipc_connection_request req;
struct qb_ipc_connection_response res;
} msg;
struct msghdr msg_recv;
struct iovec iov_recv;
struct ipc_auth_ugp ugp;
size_t processed;
size_t len;
#ifdef SO_PASSCRED
char *cmsg_cred;
#endif
};
static int32_t qb_ipcs_us_connection_acceptor(int fd, int revent, void *data);
ssize_t
qb_ipc_us_send(struct qb_ipc_one_way *one_way, const void *msg, size_t len)
{
int32_t result;
int32_t processed = 0;
char *rbuf = (char *)msg;
qb_sigpipe_ctl(QB_SIGPIPE_IGNORE);
retry_send:
result = send(one_way->u.us.sock,
&rbuf[processed], len - processed, MSG_NOSIGNAL);
if (result == -1) {
if (errno == EAGAIN && processed > 0) {
goto retry_send;
} else {
qb_sigpipe_ctl(QB_SIGPIPE_DEFAULT);
return -errno;
}
}
processed += result;
if (processed != len) {
goto retry_send;
}
qb_sigpipe_ctl(QB_SIGPIPE_DEFAULT);
return processed;
}
static ssize_t
qb_ipc_us_recv_msghdr(struct ipc_auth_data *data)
{
char *msg = (char *) &data->msg;
int32_t result;
qb_sigpipe_ctl(QB_SIGPIPE_IGNORE);
retry_recv:
data->msg_recv.msg_iov->iov_base = &msg[data->processed];
data->msg_recv.msg_iov->iov_len = data->len - data->processed;
result = recvmsg(data->sock, &data->msg_recv, MSG_NOSIGNAL | MSG_WAITALL);
if (result == -1 && errno == EAGAIN) {
qb_sigpipe_ctl(QB_SIGPIPE_DEFAULT);
return -EAGAIN;
}
if (result == -1) {
qb_sigpipe_ctl(QB_SIGPIPE_DEFAULT);
return -errno;
}
if (result == 0) {
qb_sigpipe_ctl(QB_SIGPIPE_DEFAULT);
qb_util_log(LOG_DEBUG,
"recv(fd %d) got 0 bytes assuming ENOTCONN", data->sock);
return -ENOTCONN;
}
data->processed += result;
if (data->processed != data->len) {
goto retry_recv;
}
qb_sigpipe_ctl(QB_SIGPIPE_DEFAULT);
assert(data->processed == data->len);
return data->processed;
}
int32_t
qb_ipc_us_sock_error_is_disconnected(int err)
{
if (err >= 0) {
return QB_FALSE;
} else if (err == -EAGAIN ||
err == -ETIMEDOUT ||
err == -EINTR ||
#ifdef EWOULDBLOCK
err == -EWOULDBLOCK ||
#endif
err == -EMSGSIZE ||
err == -ENOMSG ||
err == -EINVAL) {
return QB_FALSE;
}
return QB_TRUE;
}
int32_t
qb_ipc_us_ready(struct qb_ipc_one_way * ow_data,
struct qb_ipc_one_way * ow_conn,
int32_t ms_timeout, int32_t events)
{
struct pollfd ufds[2];
int32_t poll_events;
int numfds = 1;
int i;
ufds[0].fd = ow_data->u.us.sock;
ufds[0].events = events;
ufds[0].revents = 0;
if (ow_conn && ow_data != ow_conn) {
numfds++;
ufds[1].fd = ow_conn->u.us.sock;
ufds[1].events = POLLIN;
ufds[1].revents = 0;
}
poll_events = poll(ufds, numfds, ms_timeout);
if ((poll_events == -1 && errno == EINTR) || poll_events == 0) {
return -EAGAIN;
} else if (poll_events == -1) {
return -errno;
}
for (i = 0; i < poll_events; i++) {
if (ufds[i].revents & POLLERR) {
qb_util_log(LOG_DEBUG, "poll(fd %d) got POLLERR",
ufds[i].fd);
return -ENOTCONN;
} else if (ufds[i].revents & POLLHUP) {
qb_util_log(LOG_DEBUG, "poll(fd %d) got POLLHUP",
ufds[i].fd);
return -ENOTCONN;
} else if (ufds[i].revents & POLLNVAL) {
qb_util_log(LOG_DEBUG, "poll(fd %d) got POLLNVAL",
ufds[i].fd);
return -ENOTCONN;
} else if (ufds[i].revents == 0) {
qb_util_log(LOG_DEBUG, "poll(fd %d) zero revents",
ufds[i].fd);
return -ENOTCONN;
}
}
return 0;
}
/*
* recv an entire message - and try hard to get all of it.
*/
ssize_t
qb_ipc_us_recv(struct qb_ipc_one_way * one_way,
void *msg, size_t len, int32_t timeout)
{
int32_t result;
int32_t final_rc = 0;
int32_t processed = 0;
int32_t to_recv = len;
char *data = msg;
qb_sigpipe_ctl(QB_SIGPIPE_IGNORE);
retry_recv:
result = recv(one_way->u.us.sock, &data[processed], to_recv,
MSG_NOSIGNAL | MSG_WAITALL);
if (result == -1) {
if (errno == EAGAIN && (processed > 0 || timeout == -1)) {
result = qb_ipc_us_ready(one_way, NULL, timeout, POLLIN);
if (result == 0 || result == -EAGAIN) {
goto retry_recv;
}
final_rc = result;
goto cleanup_sigpipe;
} else if (errno == ECONNRESET || errno == EPIPE) {
final_rc = -ENOTCONN;
goto cleanup_sigpipe;
} else {
final_rc = -errno;
goto cleanup_sigpipe;
}
}
if (result == 0) {
final_rc = -ENOTCONN;
goto cleanup_sigpipe;
}
processed += result;
to_recv -= result;
if (processed != len) {
goto retry_recv;
}
final_rc = processed;
cleanup_sigpipe:
qb_sigpipe_ctl(QB_SIGPIPE_DEFAULT);
return final_rc;
}
static int32_t
qb_ipcc_stream_sock_connect(const char *socket_name, int32_t * sock_pt)
{
int32_t request_fd;
struct sockaddr_un address;
int32_t res = 0;
request_fd = socket(PF_UNIX, SOCK_STREAM, 0);
if (request_fd == -1) {
return -errno;
}
qb_socket_nosigpipe(request_fd);
res = qb_sys_fd_nonblock_cloexec_set(request_fd);
if (res < 0) {
goto error_connect;
}
memset(&address, 0, sizeof(struct sockaddr_un));
address.sun_family = AF_UNIX;
#ifdef HAVE_STRUCT_SOCKADDR_UN_SUN_LEN
address.sun_len = QB_SUN_LEN(&address);
#endif
if (!use_filesystem_sockets()) {
snprintf(address.sun_path + 1, UNIX_PATH_MAX - 1, "%s", socket_name);
} else {
snprintf(address.sun_path, sizeof(address.sun_path), "%s/%s", SOCKETDIR,
socket_name);
}
if (connect(request_fd, (struct sockaddr *)&address,
QB_SUN_LEN(&address)) == -1) {
res = -errno;
goto error_connect;
}
*sock_pt = request_fd;
return 0;
error_connect:
close(request_fd);
*sock_pt = -1;
return res;
}
void
qb_ipcc_us_sock_close(int32_t sock)
{
shutdown(sock, SHUT_RDWR);
close(sock);
}
static int32_t
qb_ipc_auth_creds(struct ipc_auth_data *data)
{
int32_t res = 0;
/*
* currently support getpeerucred, getpeereid, and SO_PASSCRED credential
* retrieval mechanisms for various Platforms
*/
#ifdef HAVE_GETPEERUCRED
/*
* Solaris and some BSD systems
*/
{
ucred_t *uc = NULL;
if (getpeerucred(data->sock, &uc) == 0) {
res = 0;
data->ugp.uid = ucred_geteuid(uc);
data->ugp.gid = ucred_getegid(uc);
data->ugp.pid = ucred_getpid(uc);
ucred_free(uc);
} else {
res = -errno;
}
}
#elif defined(HAVE_GETPEEREID)
/*
* Usually MacOSX systems
*/
{
/*
* TODO get the peer's pid.
* c->pid = ?;
*/
if (getpeereid(data->sock, &data->ugp.uid, &data->ugp.gid) == 0) {
res = 0;
} else {
res = -errno;
}
}
#elif defined(SO_PASSCRED)
/*
* Usually Linux systems
*/
{
struct ucred cred;
struct cmsghdr *cmsg;
res = -EINVAL;
for (cmsg = CMSG_FIRSTHDR(&data->msg_recv); cmsg != NULL;
cmsg = CMSG_NXTHDR(&data->msg_recv, cmsg)) {
if (cmsg->cmsg_type != SCM_CREDENTIALS)
continue;
memcpy(&cred, CMSG_DATA(cmsg), sizeof(struct ucred));
res = 0;
data->ugp.pid = cred.pid;
data->ugp.uid = cred.uid;
data->ugp.gid = cred.gid;
break;
}
}
#else /* no credentials */
data->ugp.pid = 0;
data->ugp.uid = 0;
data->ugp.gid = 0;
res = -ENOTSUP;
#endif /* no credentials */
return res;
}
static void
destroy_ipc_auth_data(struct ipc_auth_data *data)
{
if (data->s) {
qb_ipcs_unref(data->s);
}
#ifdef SO_PASSCRED
free(data->cmsg_cred);
#endif
free(data);
}
static struct ipc_auth_data *
init_ipc_auth_data(int sock, size_t len)
{
struct ipc_auth_data *data = calloc(1, sizeof(struct ipc_auth_data));
if (data == NULL) {
return NULL;
}
data->msg_recv.msg_iov = &data->iov_recv;
data->msg_recv.msg_iovlen = 1;
data->msg_recv.msg_name = 0;
data->msg_recv.msg_namelen = 0;
#ifdef SO_PASSCRED
data->cmsg_cred = calloc(1, CMSG_SPACE(sizeof(struct ucred)));
if (data->cmsg_cred == NULL) {
destroy_ipc_auth_data(data);
return NULL;
}
data->msg_recv.msg_control = (void *)data->cmsg_cred;
data->msg_recv.msg_controllen = CMSG_SPACE(sizeof(struct ucred));
#endif
#ifdef QB_SOLARIS
data->msg_recv.msg_accrights = 0;
data->msg_recv.msg_accrightslen = 0;
#else
data->msg_recv.msg_flags = 0;
#endif /* QB_SOLARIS */
data->len = len;
data->iov_recv.iov_base = &data->msg;
data->iov_recv.iov_len = data->len;
data->sock = sock;
return data;
}
int32_t
qb_ipcc_us_setup_connect(struct qb_ipcc_connection *c,
struct qb_ipc_connection_response *r)
{
int32_t res;
struct qb_ipc_connection_request request;
- struct ipc_auth_data *data;
#ifdef QB_LINUX
- int off = 0;
int on = 1;
#endif
res = qb_ipcc_stream_sock_connect(c->name, &c->setup.u.us.sock);
if (res != 0) {
return res;
}
#ifdef QB_LINUX
setsockopt(c->setup.u.us.sock, SOL_SOCKET, SO_PASSCRED, &on,
sizeof(on));
#endif
memset(&request, 0, sizeof(request));
request.hdr.id = QB_IPC_MSG_AUTHENTICATE;
request.hdr.size = sizeof(request);
request.max_msg_size = c->setup.max_msg_size;
res = qb_ipc_us_send(&c->setup, &request, request.hdr.size);
if (res < 0) {
qb_ipcc_us_sock_close(c->setup.u.us.sock);
return res;
}
+ /* ... To be continued ... (when the FD is active) */
+ return 0;
+}
+
+/* Called from qb_ipcc_connect_continue() when the async connect socket is active */
+int qb_ipcc_setup_connect_continue(struct qb_ipcc_connection *c, struct qb_ipc_connection_response *r)
+{
+ struct ipc_auth_data *data;
+ int32_t res;
+#ifdef QB_LINUX
+ int off = 0;
+#endif
data = init_ipc_auth_data(c->setup.u.us.sock, sizeof(struct qb_ipc_connection_response));
if (data == NULL) {
qb_ipcc_us_sock_close(c->setup.u.us.sock);
return -ENOMEM;
}
- qb_ipc_us_ready(&c->setup, NULL, -1, POLLIN);
res = qb_ipc_us_recv_msghdr(data);
#ifdef QB_LINUX
setsockopt(c->setup.u.us.sock, SOL_SOCKET, SO_PASSCRED, &off,
sizeof(off));
#endif
if (res != data->len) {
destroy_ipc_auth_data(data);
return res;
}
memcpy(r, &data->msg.res, sizeof(struct qb_ipc_connection_response));
qb_ipc_auth_creds(data);
c->egid = data->ugp.gid;
c->euid = data->ugp.uid;
c->server_pid = data->ugp.pid;
destroy_ipc_auth_data(data);
+
return r->hdr.error;
}
/*
**************************************************************************
* SERVER
*/
int32_t
qb_ipcs_us_publish(struct qb_ipcs_service * s)
{
struct sockaddr_un un_addr;
int32_t res;
#ifdef SO_PASSCRED
int on = 1;
#endif
/*
* Create socket for IPC clients, name socket, listen for connections
*/
s->server_sock = socket(PF_UNIX, SOCK_STREAM, 0);
if (s->server_sock == -1) {
res = -errno;
qb_util_perror(LOG_ERR, "Cannot create server socket");
return res;
}
res = qb_sys_fd_nonblock_cloexec_set(s->server_sock);
if (res < 0) {
goto error_close;
}
memset(&un_addr, 0, sizeof(struct sockaddr_un));
un_addr.sun_family = AF_UNIX;
#if defined(QB_BSD) || defined(QB_DARWIN)
un_addr.sun_len = SUN_LEN(&un_addr);
#endif
qb_util_log(LOG_INFO, "server name: %s", s->name);
if (!use_filesystem_sockets()) {
snprintf(un_addr.sun_path + 1, UNIX_PATH_MAX - 1, "%s", s->name);
}
else {
struct stat stat_out;
res = stat(SOCKETDIR, &stat_out);
if (res == -1 || (res == 0 && !S_ISDIR(stat_out.st_mode))) {
res = -errno;
qb_util_log(LOG_CRIT,
"Required directory not present %s",
SOCKETDIR);
goto error_close;
}
snprintf(un_addr.sun_path, sizeof(un_addr.sun_path), "%s/%s", SOCKETDIR,
s->name);
unlink(un_addr.sun_path);
}
res = bind(s->server_sock, (struct sockaddr *)&un_addr,
QB_SUN_LEN(&un_addr));
if (res) {
res = -errno;
qb_util_perror(LOG_ERR, "Could not bind AF_UNIX (%s)",
un_addr.sun_path);
goto error_close;
}
/*
* Allow everyone to write to the socket since the IPC layer handles
* security automatically
*/
if (use_filesystem_sockets()) {
(void)chmod(un_addr.sun_path, S_IRWXU | S_IRWXG | S_IRWXO);
}
#ifdef SO_PASSCRED
(void)setsockopt(s->server_sock, SOL_SOCKET, SO_PASSCRED, &on, sizeof(on));
#endif
if (listen(s->server_sock, SERVER_BACKLOG) == -1) {
qb_util_perror(LOG_ERR, "socket listen failed");
}
res = s->poll_fns.dispatch_add(s->poll_priority, s->server_sock,
POLLIN | POLLPRI | POLLNVAL,
s, qb_ipcs_us_connection_acceptor);
return res;
error_close:
close(s->server_sock);
return res;
}
int32_t
qb_ipcs_us_withdraw(struct qb_ipcs_service * s)
{
qb_util_log(LOG_INFO, "withdrawing server sockets");
(void)s->poll_fns.dispatch_del(s->server_sock);
shutdown(s->server_sock, SHUT_RDWR);
if (use_filesystem_sockets()) {
struct sockaddr_un sockname;
socklen_t socklen = sizeof(sockname);
if ((getsockname(s->server_sock, (struct sockaddr *)&sockname, &socklen) == 0) &&
sockname.sun_family == AF_UNIX) {
#ifdef HAVE_STRUCT_SOCKADDR_UN_SUN_LEN
/*
* Terminating NUL on FreeBSD is not part of the sun_path.
* Add it to use sun_path as a parameter of unlink
*/
sockname.sun_path[sockname.sun_len - offsetof(struct sockaddr_un, sun_path)] = '\0';
#endif
unlink(sockname.sun_path);
}
}
close(s->server_sock);
s->server_sock = -1;
return 0;
}
static int32_t
handle_new_connection(struct qb_ipcs_service *s,
int32_t auth_result,
int32_t sock,
void *msg, size_t len, struct ipc_auth_ugp *ugp)
{
struct qb_ipcs_connection *c = NULL;
struct qb_ipc_connection_request *req = msg;
int32_t res = auth_result;
int32_t res2 = 0;
uint32_t max_buffer_size = QB_MAX(req->max_msg_size, s->max_buffer_size);
struct qb_ipc_connection_response response;
const char suffix[] = "/qb";
int desc_len;
c = qb_ipcs_connection_alloc(s);
if (c == NULL) {
qb_ipcc_us_sock_close(sock);
return -ENOMEM;
}
c->receive_buf = calloc(1, max_buffer_size);
if (c->receive_buf == NULL) {
free(c);
qb_ipcc_us_sock_close(sock);
return -ENOMEM;
}
c->setup.u.us.sock = sock;
c->request.max_msg_size = max_buffer_size;
c->response.max_msg_size = max_buffer_size;
c->event.max_msg_size = max_buffer_size;
c->pid = ugp->pid;
c->auth.uid = c->euid = ugp->uid;
c->auth.gid = c->egid = ugp->gid;
c->auth.mode = 0600;
c->stats.client_pid = ugp->pid;
memset(&response, 0, sizeof(response));
#if defined(QB_LINUX) || defined(QB_CYGWIN)
desc_len = snprintf(c->description, CONNECTION_DESCRIPTION - sizeof suffix,
"/dev/shm/qb-%d-%d-%d-XXXXXX", s->pid, ugp->pid, c->setup.u.us.sock);
if (desc_len < 0) {
res = -errno;
goto send_response;
}
if (desc_len >= CONNECTION_DESCRIPTION - sizeof suffix) {
res = -ENAMETOOLONG;
goto send_response;
}
if (mkdtemp(c->description) == NULL) {
res = -errno;
goto send_response;
}
if (chmod(c->description, 0770)) {
res = -errno;
goto send_response;
}
/* chown can fail because we might not be root */
(void)chown(c->description, c->auth.uid, c->auth.gid);
/* We can't pass just a directory spec to the clients */
memcpy(c->description + desc_len, suffix, sizeof suffix);
#else
desc_len = snprintf(c->description, CONNECTION_DESCRIPTION,
"%d-%d-%d", s->pid, ugp->pid, c->setup.u.us.sock);
if (desc_len < 0) {
res = -errno;
goto send_response;
}
if (desc_len >= CONNECTION_DESCRIPTION) {
res = -ENAMETOOLONG;
goto send_response;
}
#endif
if (auth_result == 0 && c->service->serv_fns.connection_accept) {
res = c->service->serv_fns.connection_accept(c,
c->euid, c->egid);
}
if (res != 0) {
goto send_response;
}
qb_util_log(LOG_DEBUG, "IPC credentials authenticated (%s)",
c->description);
if (s->funcs.connect) {
res = s->funcs.connect(s, c, &response);
if (res != 0) {
goto send_response;
}
}
/*
* The connection is good, add it to the active connection list
*/
c->state = QB_IPCS_CONNECTION_ACTIVE;
qb_list_add(&c->list, &s->connections);
send_response:
response.hdr.id = QB_IPC_MSG_AUTHENTICATE;
response.hdr.size = sizeof(response);
response.hdr.error = res;
if (res == 0) {
response.connection = (intptr_t) c;
response.connection_type = s->type;
response.max_msg_size = c->request.max_msg_size;
s->stats.active_connections++;
}
res2 = qb_ipc_us_send(&c->setup, &response, response.hdr.size);
if (res == 0 && res2 != response.hdr.size) {
res = res2;
}
if (res == 0) {
qb_ipcs_connection_ref(c);
if (s->serv_fns.connection_created) {
s->serv_fns.connection_created(c);
}
if (c->state == QB_IPCS_CONNECTION_ACTIVE) {
c->state = QB_IPCS_CONNECTION_ESTABLISHED;
}
qb_ipcs_connection_unref(c);
} else {
if (res == -EACCES) {
qb_util_log(LOG_ERR, "Invalid IPC credentials (%s).",
c->description);
} else if (res == -EAGAIN) {
qb_util_log(LOG_WARNING, "Denied connection, is not ready (%s)",
c->description);
} else {
errno = -res;
qb_util_perror(LOG_ERR,
"Error in connection setup (%s)",
c->description);
}
if (c->state == QB_IPCS_CONNECTION_INACTIVE) {
/* This removes the initial alloc ref */
qb_ipcs_connection_unref(c);
qb_ipcc_us_sock_close(sock);
} else {
qb_ipcs_disconnect(c);
}
}
return res;
}
static int32_t
process_auth(int32_t fd, int32_t revents, void *d)
{
struct ipc_auth_data *data = (struct ipc_auth_data *) d;
int32_t res = 0;
#ifdef SO_PASSCRED
int off = 0;
#endif
if (data->s->server_sock == -1) {
qb_util_log(LOG_DEBUG, "Closing fd (%d) for server shutdown", fd);
res = -ESHUTDOWN;
goto cleanup_and_return;
}
if (revents & POLLNVAL) {
qb_util_log(LOG_DEBUG, "NVAL conn fd (%d)", fd);
res = -EINVAL;
goto cleanup_and_return;
}
if (revents & POLLHUP) {
qb_util_log(LOG_DEBUG, "HUP conn fd (%d)", fd);
res = -ESHUTDOWN;
goto cleanup_and_return;
}
if ((revents & POLLIN) == 0) {
return 0;
}
res = qb_ipc_us_recv_msghdr(data);
if (res == -EAGAIN) {
/* yield to mainloop, let mainloop call us again */
return 0;
}
if (res != data->len) {
res = -EIO;
goto cleanup_and_return;
}
res = qb_ipc_auth_creds(data);
cleanup_and_return:
#ifdef SO_PASSCRED
setsockopt(data->sock, SOL_SOCKET, SO_PASSCRED, &off, sizeof(off));
#endif
(void)data->s->poll_fns.dispatch_del(data->sock);
if (res < 0) {
close(data->sock);
} else if (data->msg.req.hdr.id == QB_IPC_MSG_AUTHENTICATE) {
(void)handle_new_connection(data->s, res, data->sock, &data->msg, data->len, &data->ugp);
} else {
close(data->sock);
}
destroy_ipc_auth_data(data);
return 1;
}
static void
qb_ipcs_uc_recv_and_auth(int32_t sock, struct qb_ipcs_service *s)
{
int res = 0;
struct ipc_auth_data *data = NULL;
#ifdef SO_PASSCRED
int on = 1;
#endif
data = init_ipc_auth_data(sock, sizeof(struct qb_ipc_connection_request));
if (data == NULL) {
close(sock);
/* -ENOMEM */
return;
}
data->s = s;
qb_ipcs_ref(data->s);
#ifdef SO_PASSCRED
setsockopt(sock, SOL_SOCKET, SO_PASSCRED, &on, sizeof(on));
#endif
res = s->poll_fns.dispatch_add(s->poll_priority,
data->sock,
POLLIN | POLLPRI | POLLNVAL,
data, process_auth);
if (res < 0) {
qb_util_log(LOG_DEBUG, "Failed to arrange for AUTH for fd (%d)",
data->sock);
close(sock);
destroy_ipc_auth_data(data);
}
}
static int32_t
qb_ipcs_us_connection_acceptor(int fd, int revent, void *data)
{
struct sockaddr_un un_addr;
int32_t new_fd;
struct qb_ipcs_service *s = (struct qb_ipcs_service *)data;
int32_t res;
socklen_t addrlen = sizeof(struct sockaddr_un);
if (revent & (POLLNVAL | POLLHUP | POLLERR)) {
/*
* handle shutdown more cleanly.
*/
return -1;
}
retry_accept:
errno = 0;
new_fd = accept(fd, (struct sockaddr *)&un_addr, &addrlen);
if (new_fd == -1 && errno == EINTR) {
goto retry_accept;
}
if (new_fd == -1 && errno == EBADF) {
qb_util_perror(LOG_ERR,
"Could not accept client connection from fd:%d",
fd);
return -1;
}
if (new_fd == -1) {
qb_util_perror(LOG_ERR, "Could not accept client connection");
/* This is an error, but -1 would indicate disconnect
* from the poll loop
*/
return 0;
}
res = qb_sys_fd_nonblock_cloexec_set(new_fd);
if (res < 0) {
close(new_fd);
/* This is an error, but -1 would indicate disconnect
* from the poll loop
*/
return 0;
}
qb_ipcs_uc_recv_and_auth(new_fd, s);
return 0;
}
void remove_tempdir(const char *name)
{
#if defined(QB_LINUX) || defined(QB_CYGWIN)
char dirname[PATH_MAX];
char *slash = strrchr(name, '/');
if (slash && slash - name < sizeof dirname) {
memcpy(dirname, name, slash - name);
dirname[slash - name] = '\0';
/* This gets called more than it needs to be really, so we don't check
* the return code. It's more of a desperate attempt to clean up after ourself
* in either the server or client.
*/
(void)rmdir(dirname);
}
#endif
}
diff --git a/lib/ipcc.c b/lib/ipcc.c
index a6cf409..c744ea1 100644
--- a/lib/ipcc.c
+++ b/lib/ipcc.c
@@ -1,474 +1,539 @@
/*
* Copyright (C) 2010 Red Hat, Inc.
*
* Author: Angus Salkeld <asalkeld@redhat.com>
*
* This file is part of libqb.
*
* libqb is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 2.1 of the License, or
* (at your option) any later version.
*
* libqb is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with libqb. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os_base.h"
#include <poll.h>
#include "ipc_int.h"
#include "util_int.h"
#include <qb/qbdefs.h>
#include <qb/qbipcc.h>
qb_ipcc_connection_t *
qb_ipcc_connect(const char *name, size_t max_msg_size)
{
int32_t res;
qb_ipcc_connection_t *c = NULL;
struct qb_ipc_connection_response response;
c = calloc(1, sizeof(struct qb_ipcc_connection));
if (c == NULL) {
return NULL;
}
c->setup.max_msg_size = QB_MAX(max_msg_size,
sizeof(struct qb_ipc_connection_response));
(void)strlcpy(c->name, name, NAME_MAX);
res = qb_ipcc_us_setup_connect(c, &response);
if (res < 0) {
goto disconnect_and_cleanup;
}
+ qb_ipc_us_ready(&c->setup, NULL, -1, POLLIN);
+ res = qb_ipcc_connect_continue(c);
+ if (res != 0) {
+ /* qb_ipcc_connect_continue() has cleaned up for us */
+ errno = -res;
+ return NULL;
+ }
+
+ return c;
+
+disconnect_and_cleanup:
+ if (c->setup.u.us.sock >= 0) {
+ qb_ipcc_us_sock_close(c->setup.u.us.sock);
+ }
+ free(c->receive_buf);
+ free(c);
+ errno = -res;
+ return NULL;
+}
+
+qb_ipcc_connection_t *
+qb_ipcc_connect_async(const char *name, size_t max_msg_size, int *connect_fd)
+{
+ int32_t res;
+ qb_ipcc_connection_t *c = NULL;
+ struct qb_ipc_connection_response response;
+
+ c = calloc(1, sizeof(struct qb_ipcc_connection));
+ if (c == NULL) {
+ return NULL;
+ }
+
+ c->setup.max_msg_size = QB_MAX(max_msg_size,
+ sizeof(struct qb_ipc_connection_response));
+ (void)strlcpy(c->name, name, NAME_MAX);
+ res = qb_ipcc_us_setup_connect(c, &response);
+ if (res < 0) {
+ goto disconnect_and_cleanup;
+ }
+
+ *connect_fd = c->setup.u.us.sock;
+ return c;
+
+disconnect_and_cleanup:
+ if (c->setup.u.us.sock >= 0) {
+ qb_ipcc_us_sock_close(c->setup.u.us.sock);
+ }
+ free(c->receive_buf);
+ free(c);
+ errno = -res;
+ return NULL;
+}
+
+int qb_ipcc_connect_continue(struct qb_ipcc_connection * c)
+{
+ struct qb_ipc_connection_response response;
+ int32_t res;
+
+ /* Finish up the authentication part */
+ res = qb_ipcc_setup_connect_continue(c, &response);
+ if (res != 0) {
+ goto disconnect_and_cleanup;
+ }
+
c->response.type = response.connection_type;
c->request.type = response.connection_type;
c->event.type = response.connection_type;
c->setup.type = response.connection_type;
c->response.max_msg_size = response.max_msg_size;
c->request.max_msg_size = response.max_msg_size;
c->event.max_msg_size = response.max_msg_size;
c->receive_buf = calloc(1, response.max_msg_size);
c->fc_enable_max = 1;
if (c->receive_buf == NULL) {
res = -ENOMEM;
goto disconnect_and_cleanup;
}
switch (c->request.type) {
case QB_IPC_SHM:
res = qb_ipcc_shm_connect(c, &response);
break;
case QB_IPC_SOCKET:
res = qb_ipcc_us_connect(c, &response);
break;
case QB_IPC_POSIX_MQ:
case QB_IPC_SYSV_MQ:
res = -ENOTSUP;
break;
default:
res = -EINVAL;
break;
}
if (res != 0) {
goto disconnect_and_cleanup;
}
c->is_connected = QB_TRUE;
- return c;
+ return 0;
disconnect_and_cleanup:
if (c->setup.u.us.sock >= 0) {
qb_ipcc_us_sock_close(c->setup.u.us.sock);
}
free(c->receive_buf);
free(c);
errno = -res;
- return NULL;
+ return -res;
+
}
static int32_t
_check_connection_state_with(struct qb_ipcc_connection * c, int32_t res,
struct qb_ipc_one_way * one_way,
int32_t ms_timeout, int32_t events)
{
if (res >= 0) return res;
if (qb_ipc_us_sock_error_is_disconnected(res)) {
errno = -res;
qb_util_perror(LOG_DEBUG,
"interpreting result %d as a disconnect",
res);
c->is_connected = QB_FALSE;
}
if (res == -EAGAIN || res == -ETIMEDOUT) {
int32_t res2;
int32_t poll_ms = ms_timeout;
if (res == -ETIMEDOUT) {
poll_ms = 0;
}
res2 = qb_ipc_us_ready(one_way, &c->setup, poll_ms, events);
if (qb_ipc_us_sock_error_is_disconnected(res2)) {
errno = -res2;
qb_util_perror(LOG_DEBUG,
"%s %d %s",
"interpreting result",
res2,
"(from socket) as a disconnect");
c->is_connected = QB_FALSE;
res = res2;
} else if (res != -ETIMEDOUT) {
/* if the result we're checking against is a TIMEOUT error.
* don't override that result with another error that does
* not imply a disconnect */
res = res2;
}
}
return res;
}
static int32_t
_check_connection_state(struct qb_ipcc_connection * c, int32_t res)
{
if (res >= 0) return res;
if (qb_ipc_us_sock_error_is_disconnected(res)) {
errno = -res;
qb_util_perror(LOG_DEBUG,
"interpreting result %d as a disconnect",
res);
c->is_connected = QB_FALSE;
}
return res;
}
static struct qb_ipc_one_way *
_event_sock_one_way_get(struct qb_ipcc_connection * c)
{
if (c->needs_sock_for_poll) {
return &c->setup;
}
return &c->event;
}
static struct qb_ipc_one_way *
_response_sock_one_way_get(struct qb_ipcc_connection * c)
{
if (c->needs_sock_for_poll) {
return &c->setup;
}
return &c->response;
}
ssize_t
qb_ipcc_send(struct qb_ipcc_connection * c, const void *msg_ptr, size_t msg_len)
{
ssize_t res;
ssize_t res2;
if (c == NULL) {
return -EINVAL;
}
if (msg_len > c->request.max_msg_size) {
return -EMSGSIZE;
}
if (c->funcs.fc_get) {
res = c->funcs.fc_get(&c->request);
if (res < 0) {
return res;
} else if (res > 0 && res <= c->fc_enable_max) {
return -EAGAIN;
} else {
/*
* we can transmit
*/
}
}
res = c->funcs.send(&c->request, msg_ptr, msg_len);
if (res == msg_len && c->needs_sock_for_poll) {
do {
res2 = qb_ipc_us_send(&c->setup, msg_ptr, 1);
} while (res2 == -EAGAIN);
if (res2 == -EPIPE) {
res2 = -ENOTCONN;
}
if (res2 != 1) {
res = res2;
}
}
return _check_connection_state(c, res);
}
int32_t
qb_ipcc_fc_enable_max_set(struct qb_ipcc_connection * c, uint32_t max)
{
if (c == NULL || max > 2) {
return -EINVAL;
}
c->fc_enable_max = max;
return 0;
}
ssize_t
qb_ipcc_sendv(struct qb_ipcc_connection * c, const struct iovec * iov,
size_t iov_len)
{
int32_t total_size = 0;
int32_t i;
int32_t res;
int32_t res2;
for (i = 0; i < iov_len; i++) {
total_size += iov[i].iov_len;
}
if (c == NULL) {
return -EINVAL;
}
if (total_size > c->request.max_msg_size) {
return -EMSGSIZE;
}
if (c->funcs.fc_get) {
res = c->funcs.fc_get(&c->request);
if (res < 0) {
return res;
} else if (res > 0 && res <= c->fc_enable_max) {
return -EAGAIN;
} else {
/*
* we can transmit
*/
}
}
res = c->funcs.sendv(&c->request, iov, iov_len);
if (res > 0 && c->needs_sock_for_poll) {
do {
res2 = qb_ipc_us_send(&c->setup, &res, 1);
} while (res2 == -EAGAIN);
if (res2 == -EPIPE) {
res2 = -ENOTCONN;
}
if (res2 != 1) {
res = res2;
}
}
return _check_connection_state(c, res);
}
ssize_t
qb_ipcc_recv(struct qb_ipcc_connection * c, void *msg_ptr,
size_t msg_len, int32_t ms_timeout)
{
int32_t res = 0;
int32_t connect_res = 0;
if (c == NULL) {
return -EINVAL;
}
res = c->funcs.recv(&c->response, msg_ptr, msg_len, ms_timeout);
if (res >= 0) {
return res;
}
/* if we didn't get a msg, check connection state */
connect_res = _check_connection_state_with(c, res,
_response_sock_one_way_get(c),
ms_timeout, POLLIN);
/* only report the connection state check result if an error is returned. */
if (connect_res < 0) {
return connect_res;
}
return res;
}
ssize_t
qb_ipcc_sendv_recv(qb_ipcc_connection_t * c,
const struct iovec * iov, uint32_t iov_len,
void *res_msg, size_t res_len, int32_t ms_timeout)
{
ssize_t res = 0;
int32_t timeout_now;
int32_t timeout_rem = ms_timeout;
if (c == NULL) {
return -EINVAL;
}
if (c->funcs.fc_get) {
res = c->funcs.fc_get(&c->request);
if (res < 0) {
return res;
} else if (res > 0 && res <= c->fc_enable_max) {
return -EAGAIN;
} else {
/*
* we can transmit
*/
}
}
res = qb_ipcc_sendv(c, iov, iov_len);
if (res < 0) {
return res;
}
do {
/* following is a liveness-driven interleaving
(for cases the server side failed/exited) */
if (timeout_rem > QB_IPC_MAX_WAIT_MS || ms_timeout == -1) {
timeout_now = QB_IPC_MAX_WAIT_MS;
} else {
timeout_now = timeout_rem;
}
res = qb_ipcc_recv(c, res_msg, res_len, timeout_now);
if (res == -ETIMEDOUT) {
if (ms_timeout < 0) {
res = -EAGAIN;
} else {
timeout_rem -= timeout_now;
if (timeout_rem > 0) {
res = -EAGAIN;
}
}
} else if (res < 0 && res != -EAGAIN) {
errno = -res;
qb_util_perror(LOG_DEBUG,
"qb_ipcc_recv %d timeout:(%d/%d)",
res, timeout_now, timeout_rem);
}
} while (res == -EAGAIN && c->is_connected);
return res;
}
int32_t
qb_ipcc_fd_get(struct qb_ipcc_connection * c, int32_t * fd)
{
if (c == NULL) {
return -EINVAL;
}
if (c->event.type == QB_IPC_SOCKET) {
*fd = c->event.u.us.sock;
} else {
*fd = c->setup.u.us.sock;
}
return 0;
}
int32_t
qb_ipcc_auth_get(struct qb_ipcc_connection * c, pid_t *pid, uid_t *uid, gid_t *gid)
{
if (c == NULL) {
return -EINVAL;
}
if (pid) {
*pid = c->server_pid;
}
if (uid) {
*uid = c->euid;
}
if (gid) {
*gid = c->egid;
}
return 0;
}
ssize_t
qb_ipcc_event_recv(struct qb_ipcc_connection * c, void *msg_pt,
size_t msg_len, int32_t ms_timeout)
{
char one_byte = 1;
int32_t res;
ssize_t size;
if (c == NULL) {
return -EINVAL;
}
res = _check_connection_state_with(c, -EAGAIN, _event_sock_one_way_get(c),
ms_timeout, POLLIN);
if (res < 0) {
return res;
}
size = c->funcs.recv(&c->event, msg_pt, msg_len, ms_timeout);
if (size > 0 && c->needs_sock_for_poll) {
res = qb_ipc_us_recv(&c->setup, &one_byte, 1, -1);
if (res != 1) {
size = res;
}
}
return _check_connection_state(c, size);
}
void
qb_ipcc_disconnect(struct qb_ipcc_connection *c)
{
struct qb_ipc_one_way *ow = NULL;
qb_util_log(LOG_DEBUG, "%s()", __func__);
if (c == NULL) {
return;
}
ow = _event_sock_one_way_get(c);
(void)_check_connection_state_with(c, -EAGAIN, ow, 0, POLLIN);
if (c->funcs.disconnect) {
c->funcs.disconnect(c);
}
free(c->receive_buf);
free(c);
}
void
qb_ipcc_context_set(struct qb_ipcc_connection *c, void *context)
{
if (c == NULL) {
return;
}
c->context = context;
}
void *qb_ipcc_context_get(struct qb_ipcc_connection *c)
{
if (c == NULL) {
return NULL;
}
return c->context;
}
int32_t
qb_ipcc_is_connected(qb_ipcc_connection_t *c)
{
struct qb_ipc_one_way *ow;
if (c == NULL) {
return QB_FALSE;
}
ow = _response_sock_one_way_get(c);
(void)_check_connection_state_with(c, -EAGAIN, ow, 0, POLLIN);
return c->is_connected;
}
int32_t
qb_ipcc_get_buffer_size(qb_ipcc_connection_t * c)
{
if (c == NULL) {
return -EINVAL;
}
return c->event.max_msg_size;
}
diff --git a/tests/check_ipc.c b/tests/check_ipc.c
index e8f81f3..6090354 100644
--- a/tests/check_ipc.c
+++ b/tests/check_ipc.c
@@ -1,2370 +1,2450 @@
/*
* Copyright (c) 2010 Red Hat, Inc.
*
* All rights reserved.
*
* Author: Angus Salkeld <asalkeld@redhat.com>
*
* This file is part of libqb.
*
* libqb is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 2.1 of the License, or
* (at your option) any later version.
*
* libqb is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with libqb. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os_base.h"
#include <sys/wait.h>
#include <sys/un.h>
#include <signal.h>
#include <stdbool.h>
#include <fcntl.h>
#ifdef HAVE_GLIB
#include <glib.h>
#endif
#include "check_common.h"
#include <qb/qbdefs.h>
#include <qb/qblog.h>
#include <qb/qbipcc.h>
#include <qb/qbipcs.h>
#include <qb/qbloop.h>
#ifdef HAVE_FAILURE_INJECTION
#include "_failure_injection.h"
#endif
#define NUM_STRESS_CONNECTIONS 5000
static char ipc_name[256];
#define DEFAULT_MAX_MSG_SIZE (8192*16)
#ifndef __clang__
static int CALCULATED_DGRAM_MAX_MSG_SIZE = 0;
#define DGRAM_MAX_MSG_SIZE \
(CALCULATED_DGRAM_MAX_MSG_SIZE == 0 ? \
CALCULATED_DGRAM_MAX_MSG_SIZE = qb_ipcc_verify_dgram_max_msg_size(DEFAULT_MAX_MSG_SIZE) : \
CALCULATED_DGRAM_MAX_MSG_SIZE)
#define MAX_MSG_SIZE (ipc_type == QB_IPC_SOCKET ? DGRAM_MAX_MSG_SIZE : DEFAULT_MAX_MSG_SIZE)
#else
/* because clang's
'variable length array in structure' extension will never be supported;
assign default for SHM as we'll skip the test that would use the run-time
established value (via qb_ipcc_verify_dgram_max_msg_size), anyway */
static const int MAX_MSG_SIZE = DEFAULT_MAX_MSG_SIZE;
#endif
/* The size the giant msg's data field needs to be to make
* this the largest msg we can successfully send. */
#define GIANT_MSG_DATA_SIZE MAX_MSG_SIZE - sizeof(struct qb_ipc_response_header) - 8
static int enforce_server_buffer;
static qb_ipcc_connection_t *conn;
static enum qb_ipc_type ipc_type;
static enum qb_loop_priority global_loop_prio = QB_LOOP_MED;
static bool global_use_glib;
static int global_pipefd[2];
enum my_msg_ids {
IPC_MSG_REQ_TX_RX,
IPC_MSG_RES_TX_RX,
IPC_MSG_REQ_DISPATCH,
IPC_MSG_RES_DISPATCH,
IPC_MSG_REQ_BULK_EVENTS,
IPC_MSG_RES_BULK_EVENTS,
IPC_MSG_REQ_STRESS_EVENT,
IPC_MSG_RES_STRESS_EVENT,
IPC_MSG_REQ_SELF_FEED,
IPC_MSG_RES_SELF_FEED,
IPC_MSG_REQ_SERVER_FAIL,
IPC_MSG_RES_SERVER_FAIL,
IPC_MSG_REQ_SERVER_DISCONNECT,
IPC_MSG_RES_SERVER_DISCONNECT,
};
/* these 2 functions from pacemaker code */
static enum qb_ipcs_rate_limit
conv_libqb_prio2ratelimit(enum qb_loop_priority prio)
{
/* this is an inversion of what libqb's qb_ipcs_request_rate_limit does */
enum qb_ipcs_rate_limit ret = QB_IPCS_RATE_NORMAL;
switch (prio) {
case QB_LOOP_LOW:
ret = QB_IPCS_RATE_SLOW;
break;
case QB_LOOP_HIGH:
ret = QB_IPCS_RATE_FAST;
break;
default:
qb_log(LOG_DEBUG, "Invalid libqb's loop priority %d,"
" assuming QB_LOOP_MED", prio);
/* fall-through */
case QB_LOOP_MED:
break;
}
return ret;
}
#ifdef HAVE_GLIB
static gint
conv_prio_libqb2glib(enum qb_loop_priority prio)
{
gint ret = G_PRIORITY_DEFAULT;
switch (prio) {
case QB_LOOP_LOW:
ret = G_PRIORITY_LOW;
break;
case QB_LOOP_HIGH:
ret = G_PRIORITY_HIGH;
break;
default:
qb_log(LOG_DEBUG, "Invalid libqb's loop priority %d,"
" assuming QB_LOOP_MED", prio);
/* fall-through */
case QB_LOOP_MED:
break;
}
return ret;
}
/* these 3 glue functions inspired from pacemaker, too */
static gboolean
gio_source_prepare(GSource *source, gint *timeout)
{
qb_enter();
*timeout = 500;
return FALSE;
}
static gboolean
gio_source_check(GSource *source)
{
qb_enter();
return TRUE;
}
static gboolean
gio_source_dispatch(GSource *source, GSourceFunc callback, gpointer user_data)
{
gboolean ret = G_SOURCE_CONTINUE;
qb_enter();
if (callback) {
ret = callback(user_data);
}
return ret;
}
static GSourceFuncs gio_source_funcs = {
.prepare = gio_source_prepare,
.check = gio_source_check,
.dispatch = gio_source_dispatch,
};
#endif
/* Test Cases
*
* 1) basic send & recv different message sizes
*
* 2) send message to start dispatch (confirm receipt)
*
* 3) flow control
*
* 4) authentication
*
* 5) thread safety
*
* 6) cleanup
*
* 7) service availability
*
* 8) multiple services
*
* 9) setting perms on the sockets
*/
static qb_loop_t *my_loop;
static qb_ipcs_service_t* s1;
static int32_t turn_on_fc = QB_FALSE;
static int32_t fc_enabled = 89;
static int32_t send_event_on_created = QB_FALSE;
static int32_t disconnect_after_created = QB_FALSE;
static int32_t num_bulk_events = 10;
static int32_t num_stress_events = 30000;
static int32_t reference_count_test = QB_FALSE;
static int32_t multiple_connections = QB_FALSE;
static int32_t set_perms_on_socket = QB_FALSE;
static int32_t
exit_handler(int32_t rsignal, void *data)
{
qb_log(LOG_DEBUG, "caught signal %d", rsignal);
qb_ipcs_destroy(s1);
exit(0);
}
static void
set_ipc_name(const char *prefix)
{
FILE *f;
char process_name[256];
/* The process-unique part of the IPC name has already been decided
* and stored in the file ipc-test-name.
*/
f = fopen("ipc-test-name", "r");
if (f) {
fgets(process_name, sizeof(process_name), f);
fclose(f);
snprintf(ipc_name, sizeof(ipc_name), "%.44s%s", prefix, process_name);
} else {
/* This is the old code, use only as a fallback */
static char t_sec[3] = "";
if (t_sec[0] == '\0') {
const char *const found = strrchr(__TIME__, ':');
strncpy(t_sec, found ? found + 1 : "-", sizeof(t_sec) - 1);
t_sec[sizeof(t_sec) - 1] = '\0';
}
snprintf(ipc_name, sizeof(ipc_name), "%.44s%s%lX%.4x", prefix, t_sec,
(unsigned long)getpid(), (unsigned) ((long) time(NULL) % (0x10000)));
}
}
static int
pipe_writer(int fd, int revents, void *data) {
qb_enter();
static const char buf[8] = { 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h' };
ssize_t wbytes = 0, wbytes_sum = 0;
//for (size_t i = 0; i < SIZE_MAX; i++) {
for (size_t i = 0; i < 4096; i++) {
wbytes_sum += wbytes;
if ((wbytes = write(fd, buf, sizeof(buf))) == -1) {
if (errno != EAGAIN) {
perror("write");
exit(-1);
}
break;
}
}
if (wbytes_sum > 0) {
qb_log(LOG_DEBUG, "written %zd bytes", wbytes_sum);
}
qb_leave();
return 1;
}
static int
pipe_reader(int fd, int revents, void *data) {
qb_enter();
ssize_t rbytes, rbytes_sum = 0;
size_t cnt = SIZE_MAX;
char buf[4096] = { '\0' };
while ((rbytes = read(fd, buf, sizeof(buf))) > 0 && rbytes < cnt) {
cnt -= rbytes;
rbytes_sum += rbytes;
}
if (rbytes_sum > 0) {
ck_assert(buf[0] != '\0'); /* avoid dead store elimination */
qb_log(LOG_DEBUG, "read %zd bytes", rbytes_sum);
sleep(1);
}
qb_leave();
return 1;
}
#if HAVE_GLIB
static gboolean
gio_pipe_reader(void *data) {
return (pipe_reader(*((int *) data), 0, NULL) > 0);
}
static gboolean
gio_pipe_writer(void *data) {
return (pipe_writer(*((int *) data), 0, NULL) > 0);
}
#endif
static int32_t
s1_msg_process_fn(qb_ipcs_connection_t *c,
void *data, size_t size)
{
struct qb_ipc_request_header *req_pt = (struct qb_ipc_request_header *)data;
struct qb_ipc_response_header response = { 0, };
ssize_t res;
if (req_pt->id == IPC_MSG_REQ_TX_RX) {
response.size = sizeof(struct qb_ipc_response_header);
response.id = IPC_MSG_RES_TX_RX;
response.error = 0;
res = qb_ipcs_response_send(c, &response, response.size);
if (res < 0) {
qb_perror(LOG_INFO, "qb_ipcs_response_send");
} else if (res != response.size) {
qb_log(LOG_DEBUG, "qb_ipcs_response_send %zd != %d",
res, response.size);
}
if (turn_on_fc) {
qb_ipcs_request_rate_limit(s1, QB_IPCS_RATE_OFF);
}
} else if (req_pt->id == IPC_MSG_REQ_DISPATCH) {
response.size = sizeof(struct qb_ipc_response_header);
response.id = IPC_MSG_RES_DISPATCH;
response.error = 0;
res = qb_ipcs_event_send(c, &response,
sizeof(response));
if (res < 0) {
qb_perror(LOG_INFO, "qb_ipcs_event_send");
}
} else if (req_pt->id == IPC_MSG_REQ_BULK_EVENTS) {
int32_t m;
int32_t num;
struct qb_ipcs_connection_stats_2 *stats;
uint32_t max_size = MAX_MSG_SIZE;
response.size = sizeof(struct qb_ipc_response_header);
response.error = 0;
stats = qb_ipcs_connection_stats_get_2(c, QB_FALSE);
num = stats->event_q_length;
free(stats);
/* crazy large message */
res = qb_ipcs_event_send(c, &response, max_size*10);
ck_assert_int_eq(res, -EMSGSIZE);
/* send one event before responding */
res = qb_ipcs_event_send(c, &response, sizeof(response));
ck_assert_int_eq(res, sizeof(response));
response.id++;
/* There should be one more item in the event queue now. */
stats = qb_ipcs_connection_stats_get_2(c, QB_FALSE);
ck_assert_int_eq(stats->event_q_length - num, 1);
free(stats);
/* send response */
response.id = IPC_MSG_RES_BULK_EVENTS;
res = qb_ipcs_response_send(c, &response, response.size);
ck_assert_int_eq(res, sizeof(response));
/* send the rest of the events after the response */
for (m = 1; m < num_bulk_events; m++) {
res = qb_ipcs_event_send(c, &response, sizeof(response));
if (res == -EAGAIN || res == -ENOBUFS) {
/* retry */
usleep(1000);
m--;
continue;
}
ck_assert_int_eq(res, sizeof(response));
response.id++;
}
} else if (req_pt->id == IPC_MSG_REQ_STRESS_EVENT) {
struct {
struct qb_ipc_response_header hdr __attribute__ ((aligned(8)));
char data[GIANT_MSG_DATA_SIZE] __attribute__ ((aligned(8)));
uint32_t sent_msgs __attribute__ ((aligned(8)));
} __attribute__ ((aligned(8))) giant_event_send;
int32_t m;
response.size = sizeof(struct qb_ipc_response_header);
response.error = 0;
response.id = IPC_MSG_RES_STRESS_EVENT;
res = qb_ipcs_response_send(c, &response, response.size);
ck_assert_int_eq(res, sizeof(response));
giant_event_send.hdr.error = 0;
giant_event_send.hdr.id = IPC_MSG_RES_STRESS_EVENT;
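/* Every 1000th event is sent full-size and carries the running count, so
 * the receiver can cross-check how many events it has seen. */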
for (m = 0; m < num_stress_events; m++) {
size_t sent_len = sizeof(struct qb_ipc_response_header);
if (((m+1) % 1000) == 0) {
sent_len = sizeof(giant_event_send);
giant_event_send.sent_msgs = m + 1;
}
giant_event_send.hdr.size = sent_len;
res = qb_ipcs_event_send(c, &giant_event_send, sent_len);
if (res < 0) {
if (res == -EAGAIN || res == -ENOBUFS) {
/* yield to the receive process */
usleep(1000);
m--;
continue;
} else {
qb_perror(LOG_DEBUG, "sending stress events");
ck_assert_int_eq(res, sent_len);
}
} else if (((m+1) % 1000) == 0) {
qb_log(LOG_DEBUG, "SENT: %d stress events sent", m+1);
}
giant_event_send.hdr.id++;
}
} else if (req_pt->id == IPC_MSG_REQ_SELF_FEED) {
if (pipe(global_pipefd) != 0) {
perror("pipefd");
ck_assert(0);
}
fcntl(global_pipefd[0], F_SETFL, O_NONBLOCK);
fcntl(global_pipefd[1], F_SETFL, O_NONBLOCK);
if (global_use_glib) {
#ifdef HAVE_GLIB
GSource *source_r, *source_w;
source_r = g_source_new(&gio_source_funcs, sizeof(GSource));
source_w = g_source_new(&gio_source_funcs, sizeof(GSource));
ck_assert(source_r != NULL && source_w != NULL);
g_source_set_priority(source_r, conv_prio_libqb2glib(QB_LOOP_HIGH));
g_source_set_priority(source_w, conv_prio_libqb2glib(QB_LOOP_HIGH));
g_source_set_can_recurse(source_r, FALSE);
g_source_set_can_recurse(source_w, FALSE);
g_source_set_callback(source_r, gio_pipe_reader, &global_pipefd[0], NULL);
g_source_set_callback(source_w, gio_pipe_writer, &global_pipefd[1], NULL);
g_source_add_unix_fd(source_r, global_pipefd[0], G_IO_IN);
g_source_add_unix_fd(source_w, global_pipefd[1], G_IO_OUT);
g_source_attach(source_r, NULL);
g_source_attach(source_w, NULL);
#else
ck_assert(0);
#endif
} else {
qb_loop_poll_add(my_loop, QB_LOOP_HIGH, global_pipefd[1],
POLLOUT|POLLERR, NULL, pipe_writer);
qb_loop_poll_add(my_loop, QB_LOOP_HIGH, global_pipefd[0],
POLLIN|POLLERR, NULL, pipe_reader);
}
} else if (req_pt->id == IPC_MSG_REQ_SERVER_FAIL) {
exit(0);
} else if (req_pt->id == IPC_MSG_REQ_SERVER_DISCONNECT) {
multiple_connections = QB_FALSE;
qb_ipcs_disconnect(c);
}
return 0;
}
static int32_t
my_job_add(enum qb_loop_priority p,
void *data,
qb_loop_job_dispatch_fn fn)
{
return qb_loop_job_add(my_loop, p, data, fn);
}
static int32_t
my_dispatch_add(enum qb_loop_priority p, int32_t fd, int32_t events,
void *data, qb_ipcs_dispatch_fn_t fn)
{
return qb_loop_poll_add(my_loop, p, fd, events, data, fn);
}
static int32_t
my_dispatch_mod(enum qb_loop_priority p, int32_t fd, int32_t events,
void *data, qb_ipcs_dispatch_fn_t fn)
{
return qb_loop_poll_mod(my_loop, p, fd, events, data, fn);
}
static int32_t
my_dispatch_del(int32_t fd)
{
return qb_loop_poll_del(my_loop, fd);
}
/* taken from examples/ipcserver.c, with s/my_g/gio/ */
#ifdef HAVE_GLIB
#include <qb/qbarray.h>
static qb_array_t *gio_map;
static GMainLoop *glib_loop;
struct gio_to_qb_poll {
int32_t is_used;
int32_t events;
int32_t source;
int32_t fd;
void *data;
qb_ipcs_dispatch_fn_t fn;
enum qb_loop_priority p;
};
static gboolean
gio_read_socket(GIOChannel * gio, GIOCondition condition, gpointer data)
{
struct gio_to_qb_poll *adaptor = (struct gio_to_qb_poll *)data;
gint fd = g_io_channel_unix_get_fd(gio);
qb_enter();
return (adaptor->fn(fd, condition, adaptor->data) == 0);
}
static void
gio_poll_destroy(gpointer data)
{
struct gio_to_qb_poll *adaptor = (struct gio_to_qb_poll *)data;
adaptor->is_used--;
if (adaptor->is_used == 0) {
qb_log(LOG_DEBUG, "fd %d adaptor destroyed\n", adaptor->fd);
adaptor->fd = 0;
adaptor->source = 0;
}
}
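/* Create (or refresh, for _mod) the GIOChannel watch that maps a libqb
 * dispatch registration onto the GLib main loop. */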
static int32_t
gio_dispatch_update(enum qb_loop_priority p, int32_t fd, int32_t evts,
void *data, qb_ipcs_dispatch_fn_t fn, gboolean is_new)
{
struct gio_to_qb_poll *adaptor;
GIOChannel *channel;
int32_t res = 0;
qb_enter();
res = qb_array_index(gio_map, fd, (void **)&adaptor);
if (res < 0) {
return res;
}
if (adaptor->is_used && adaptor->source) {
if (is_new) {
return -EEXIST;
}
g_source_remove(adaptor->source);
adaptor->source = 0;
}
channel = g_io_channel_unix_new(fd);
if (!channel) {
return -ENOMEM;
}
adaptor->fn = fn;
adaptor->events = evts;
adaptor->data = data;
adaptor->p = p;
adaptor->is_used++;
adaptor->fd = fd;
adaptor->source = g_io_add_watch_full(channel, conv_prio_libqb2glib(p),
evts, gio_read_socket, adaptor,
gio_poll_destroy);
/* we are handing the channel off to be managed by mainloop now.
* remove our reference. */
g_io_channel_unref(channel);
return 0;
}
static int32_t
gio_dispatch_add(enum qb_loop_priority p, int32_t fd, int32_t evts,
void *data, qb_ipcs_dispatch_fn_t fn)
{
return gio_dispatch_update(p, fd, evts, data, fn, TRUE);
}
static int32_t
gio_dispatch_mod(enum qb_loop_priority p, int32_t fd, int32_t evts,
void *data, qb_ipcs_dispatch_fn_t fn)
{
return gio_dispatch_update(p, fd, evts, data, fn, FALSE);
}
static int32_t
gio_dispatch_del(int32_t fd)
{
struct gio_to_qb_poll *adaptor;
if (qb_array_index(gio_map, fd, (void **)&adaptor) == 0) {
g_source_remove(adaptor->source);
adaptor->source = 0;
}
return 0;
}
#endif /* HAVE_GLIB */
static int32_t
s1_connection_closed(qb_ipcs_connection_t *c)
{
if (multiple_connections) {
return 0;
}
/* Stop the connection being freed when we call qb_ipcs_disconnect()
in the callback */
if (disconnect_after_created == QB_TRUE) {
disconnect_after_created = QB_FALSE;
return 1;
}
qb_enter();
qb_leave();
return 0;
}
static void
outq_flush (void *data)
{
static int i = 0;
struct cs_ipcs_conn_context *cnx;
cnx = qb_ipcs_context_get(data);
qb_log(LOG_DEBUG,"iter %u\n", i);
i++;
if (i == 2) {
qb_ipcs_destroy(s1);
s1 = NULL;
}
/* if the reference counting is not working, this should fail
* for i > 1.
*/
qb_ipcs_event_send(data, "test", 4);
assert(memcmp(cnx, "test", 4) == 0);
if (i < 5) {
qb_loop_job_add(my_loop, QB_LOOP_HIGH, data, outq_flush);
} else {
/* this single unref should clean everything up.
*/
qb_ipcs_connection_unref(data);
qb_log(LOG_INFO, "end of test, stopping loop");
qb_loop_stop(my_loop);
}
}
static void
s1_connection_destroyed(qb_ipcs_connection_t *c)
{
if (multiple_connections) {
return;
}
qb_enter();
if (reference_count_test) {
struct cs_ipcs_conn_context *cnx;
cnx = qb_ipcs_context_get(c);
free(cnx);
} else {
qb_loop_stop(my_loop);
}
qb_leave();
}
static int32_t
s1_connection_accept(qb_ipcs_connection_t *c, uid_t uid, gid_t gid)
{
if (set_perms_on_socket) {
qb_ipcs_connection_auth_set(c, 555, 741, S_IRWXU|S_IRWXG|S_IROTH|S_IWOTH);
}
return 0;
}
static void
s1_connection_created(qb_ipcs_connection_t *c)
{
uint32_t max = MAX_MSG_SIZE;
if (multiple_connections) {
return;
}
if (send_event_on_created) {
struct qb_ipc_response_header response;
int32_t res;
response.size = sizeof(struct qb_ipc_response_header);
response.id = IPC_MSG_RES_DISPATCH;
response.error = 0;
res = qb_ipcs_event_send(c, &response,
sizeof(response));
ck_assert_int_eq(res, response.size);
}
if (reference_count_test) {
struct cs_ipcs_conn_context *context;
qb_ipcs_connection_ref(c);
qb_loop_job_add(my_loop, QB_LOOP_HIGH, c, outq_flush);
context = calloc(1, 20);
memcpy(context, "test", 4);
qb_ipcs_context_set(c, context);
}
ck_assert_int_eq(max, qb_ipcs_connection_get_buffer_size(c));
}
static volatile sig_atomic_t usr1_bit;
static void usr1_bit_setter(int signal) {
if (signal == SIGUSR1) {
usr1_bit = 1;
}
}
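/* Helpers that let a forked server or client signal the test parent
 * (via SIGUSR1) once it is up and ready. */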
#define READY_SIGNALLER(name, data_arg) void (name)(void *data_arg)
typedef READY_SIGNALLER(ready_signaller_fn, );
static
READY_SIGNALLER(usr1_signaller, parent_target)
{
kill(*((pid_t *) parent_target), SIGUSR1);
}
#define NEW_PROCESS_RUNNER(name, ready_signaller_arg, signaller_data_arg, data_arg) \
void (name)(ready_signaller_fn ready_signaller_arg, \
void *signaller_data_arg, void *data_arg)
typedef NEW_PROCESS_RUNNER(new_process_runner_fn, , , );
static
NEW_PROCESS_RUNNER(run_ipc_server, ready_signaller, signaller_data, data)
{
int32_t res;
qb_loop_signal_handle handle;
struct qb_ipcs_service_handlers sh = {
.connection_accept = s1_connection_accept,
.connection_created = s1_connection_created,
.msg_process = s1_msg_process_fn,
.connection_destroyed = s1_connection_destroyed,
.connection_closed = s1_connection_closed,
};
struct qb_ipcs_poll_handlers ph;
uint32_t max_size = MAX_MSG_SIZE;
my_loop = qb_loop_create();
qb_loop_signal_add(my_loop, QB_LOOP_HIGH, SIGTERM,
NULL, exit_handler, &handle);
s1 = qb_ipcs_create(ipc_name, 4, ipc_type, &sh);
ck_assert(s1 != 0);
if (global_loop_prio != QB_LOOP_MED) {
qb_ipcs_request_rate_limit(s1,
conv_libqb_prio2ratelimit(global_loop_prio));
}
if (global_use_glib) {
#ifdef HAVE_GLIB
ph = (struct qb_ipcs_poll_handlers) {
.job_add = NULL,
.dispatch_add = gio_dispatch_add,
.dispatch_mod = gio_dispatch_mod,
.dispatch_del = gio_dispatch_del,
};
glib_loop = g_main_loop_new(NULL, FALSE);
gio_map = qb_array_create_2(16, sizeof(struct gio_to_qb_poll), 1);
ck_assert(gio_map != NULL);
#else
ck_assert(0);
#endif
} else {
ph = (struct qb_ipcs_poll_handlers) {
.job_add = my_job_add,
.dispatch_add = my_dispatch_add,
.dispatch_mod = my_dispatch_mod,
.dispatch_del = my_dispatch_del,
};
}
if (enforce_server_buffer) {
qb_ipcs_enforce_buffer_size(s1, max_size);
}
qb_ipcs_poll_handlers_set(s1, &ph);
res = qb_ipcs_run(s1);
ck_assert_int_eq(res, 0);
if (ready_signaller != NULL) {
ready_signaller(signaller_data);
}
if (global_use_glib) {
#ifdef HAVE_GLIB
g_main_loop_run(glib_loop);
#endif
} else {
qb_loop_run(my_loop);
}
qb_log(LOG_DEBUG, "loop finished - done ...");
}
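/* Fork twice (so the worker is reaped promptly), run new_process_runner in
 * the grandchild and wait for its readiness signal; returns the pid of the
 * intermediate child. */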
static pid_t
run_function_in_new_process(const char *role,
new_process_runner_fn new_process_runner,
void *data)
{
char formatbuf[1024];
pid_t parent_target, pid1, pid2;
struct sigaction orig_sa, purpose_sa;
sigset_t orig_mask, purpose_mask, purpose_clear_mask;
sigemptyset(&purpose_mask);
sigaddset(&purpose_mask, SIGUSR1);
sigprocmask(SIG_BLOCK, &purpose_mask, &orig_mask);
purpose_clear_mask = orig_mask;
sigdelset(&purpose_clear_mask, SIGUSR1);
purpose_sa.sa_handler = usr1_bit_setter;
purpose_sa.sa_mask = purpose_mask;
purpose_sa.sa_flags = SA_RESTART;
/* Double-fork so the servers can be reaped in a timely manner */
parent_target = getpid();
pid1 = fork();
if (pid1 == 0) {
pid2 = fork();
if (pid2 == -1) {
fprintf (stderr, "Can't fork twice\n");
exit(0);
}
if (pid2 == 0) {
sigprocmask(SIG_SETMASK, &orig_mask, NULL);
if (role == NULL) {
qb_log_format_set(QB_LOG_STDERR, "lib/%f|%l[%P] %b");
} else {
snprintf(formatbuf, sizeof(formatbuf),
"lib/%%f|%%l|%s[%%P] %%b", role);
qb_log_format_set(QB_LOG_STDERR, formatbuf);
}
new_process_runner(usr1_signaller, &parent_target, data);
exit(0);
} else {
waitpid(pid2, NULL, 0);
exit(0);
}
}
usr1_bit = 0;
/* XXX assume never fails */
sigaction(SIGUSR1, &purpose_sa, &orig_sa);
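/* Wait for the grandchild's SIGUSR1 readiness signal; sigsuspend() with
 * SIGUSR1 unblocked avoids the usual signal/wait race. */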
do {
/* XXX assume never fails with EFAULT */
sigsuspend(&purpose_clear_mask);
} while (usr1_bit != 1);
usr1_bit = 0;
sigprocmask(SIG_SETMASK, &orig_mask, NULL);
/* give children a slight/non-strict scheduling advantage */
sched_yield();
return pid1;
}
static void
request_server_exit(void)
{
struct qb_ipc_request_header req_header;
struct qb_ipc_response_header res_header;
struct iovec iov[1];
int32_t res;
/*
* tell the server to exit
*/
req_header.id = IPC_MSG_REQ_SERVER_FAIL;
req_header.size = sizeof(struct qb_ipc_request_header);
iov[0].iov_len = req_header.size;
iov[0].iov_base = &req_header;
ck_assert_int_eq(QB_TRUE, qb_ipcc_is_connected(conn));
res = qb_ipcc_sendv_recv(conn, iov, 1,
&res_header,
sizeof(struct qb_ipc_response_header), -1);
/*
* confirm we get -ENOTCONN or -ECONNRESET
*/
if (res != -ECONNRESET && res != -ENOTCONN) {
qb_log(LOG_ERR, "id:%d size:%d", res_header.id, res_header.size);
ck_assert_int_eq(res, -ENOTCONN);
}
}
static void
kill_server(pid_t pid)
{
kill(pid, SIGTERM);
waitpid(pid, NULL, 0);
}
static int32_t
verify_graceful_stop(pid_t pid)
{
int wait_rc = 0;
int status = 0;
int rc = 0;
int tries;
/* We need the server to be able to exit by itself */
for (tries = 10; tries >= 0; tries--) {
sleep(1);
wait_rc = waitpid(pid, &status, WNOHANG);
if (wait_rc > 0) {
break;
}
}
ck_assert_int_eq(wait_rc, pid);
rc = WIFEXITED(status);
if (rc) {
rc = WEXITSTATUS(status);
ck_assert_int_eq(rc, 0);
} else {
ck_assert(rc != 0);
}
return 0;
}
struct my_req {
struct qb_ipc_request_header hdr;
char message[1024 * 1024];
};
static struct my_req request;
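/* Send a request of the given size, retrying up to 10 times on -EAGAIN
 * (flow control), then receive the matching response (or event, for
 * IPC_MSG_REQ_DISPATCH). */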
static int32_t
send_and_check(int32_t req_id, uint32_t size,
int32_t ms_timeout, int32_t expect_perfection)
{
struct qb_ipc_response_header res_header;
int32_t res;
int32_t try_times = 0;
uint32_t max_size = MAX_MSG_SIZE;
request.hdr.id = req_id;
request.hdr.size = sizeof(struct qb_ipc_request_header) + size;
/* check that we can't send a message that is too big
* and we get the right return code.
*/
res = qb_ipcc_send(conn, &request, max_size*2);
ck_assert_int_eq(res, -EMSGSIZE);
repeat_send:
res = qb_ipcc_send(conn, &request, request.hdr.size);
try_times++;
if (res < 0) {
if (res == -EAGAIN && try_times < 10) {
goto repeat_send;
} else {
if (res == -EAGAIN && try_times >= 10) {
fc_enabled = QB_TRUE;
}
errno = -res;
qb_perror(LOG_INFO, "qb_ipcc_send");
return res;
}
}
if (req_id == IPC_MSG_REQ_DISPATCH) {
res = qb_ipcc_event_recv(conn, &res_header,
sizeof(struct qb_ipc_response_header),
ms_timeout);
} else {
res = qb_ipcc_recv(conn, &res_header,
sizeof(struct qb_ipc_response_header),
ms_timeout);
}
if (res == -EINTR) {
return -1;
}
if (res == -EAGAIN || res == -ETIMEDOUT) {
fc_enabled = QB_TRUE;
qb_perror(LOG_DEBUG, "qb_ipcc_recv");
return res;
}
if (expect_perfection) {
ck_assert_int_eq(res, sizeof(struct qb_ipc_response_header));
ck_assert_int_eq(res_header.id, req_id + 1);
ck_assert_int_eq(res_header.size, sizeof(struct qb_ipc_response_header));
}
return res;
}
+
+static int32_t
+process_async_connect(int32_t fd, int32_t revents, void *data)
+{
+ qb_loop_t *cl = (qb_loop_t *)data;
+ int res;
+
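+ /* connect_fd is now active: complete the connection handshake */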
+ res = qb_ipcc_connect_continue(conn);
+ ck_assert_int_eq(res, 0);
+ qb_loop_stop(cl);
+ return 0;
+}
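+/* Exercise the two-step asynchronous connect: register the fd returned by
+ * qb_ipcc_connect_async() with a loop, finish the connection from the poll
+ * callback via qb_ipcc_connect_continue(), then do a normal TX/RX round trip. */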
+static void test_ipc_connect_async(void)
+{
+ struct qb_ipc_request_header req_header;
+ struct qb_ipc_response_header res_header;
+ int32_t res;
+ pid_t pid;
+ uint32_t max_size = MAX_MSG_SIZE;
+ int connect_fd;
+ struct iovec iov[1];
+ static qb_loop_t *cl;
+
+ pid = run_function_in_new_process("server", run_ipc_server, NULL);
+ ck_assert(pid != -1);
+
+ conn = qb_ipcc_connect_async(ipc_name, max_size, &connect_fd);
+ ck_assert(conn != NULL);
+
+ cl = qb_loop_create();
+ res = qb_loop_poll_add(cl, QB_LOOP_MED,
+ connect_fd, POLLIN,
+ cl, process_async_connect);
+ ck_assert_int_eq(res, 0);
+ qb_loop_run(cl);
+
+ /* Send some data */
+ req_header.id = IPC_MSG_REQ_TX_RX;
+ req_header.size = sizeof(struct qb_ipc_request_header);
+
+ iov[0].iov_len = req_header.size;
+ iov[0].iov_base = &req_header;
+
+ res = qb_ipcc_sendv_recv(conn, iov, 1,
+ &res_header,
+ sizeof(struct qb_ipc_response_header), 5000);
+
+ ck_assert_int_ge(res, 0);
+
+ request_server_exit();
+ verify_graceful_stop(pid);
+
+ qb_ipcc_disconnect(conn);
+}
+
static void
test_ipc_txrx_timeout(void)
{
struct qb_ipc_request_header req_header;
struct qb_ipc_response_header res_header;
struct iovec iov[1];
int32_t res;
int32_t c = 0;
int32_t j = 0;
pid_t pid;
uint32_t max_size = MAX_MSG_SIZE;
pid = run_function_in_new_process("server", run_ipc_server, NULL);
ck_assert(pid != -1);
do {
conn = qb_ipcc_connect(ipc_name, max_size);
if (conn == NULL) {
j = waitpid(pid, NULL, WNOHANG);
ck_assert_int_eq(j, 0);
poll(NULL, 0, 400);
c++;
}
} while (conn == NULL && c < 5);
ck_assert(conn != NULL);
/* The dispatch response will only come over
* the event channel, we want to verify the receive times
* out when an event is returned with no response */
req_header.id = IPC_MSG_REQ_DISPATCH;
req_header.size = sizeof(struct qb_ipc_request_header);
iov[0].iov_len = req_header.size;
iov[0].iov_base = &req_header;
res = qb_ipcc_sendv_recv(conn, iov, 1,
&res_header,
sizeof(struct qb_ipc_response_header), 5000);
ck_assert_int_eq(res, -ETIMEDOUT);
request_server_exit();
verify_graceful_stop(pid);
/*
* this needs to free up the shared mem
*/
qb_ipcc_disconnect(conn);
}
static int32_t recv_timeout = -1;
static void
test_ipc_txrx(void)
{
int32_t j;
int32_t c = 0;
size_t size;
pid_t pid;
uint32_t max_size = MAX_MSG_SIZE;
pid = run_function_in_new_process("server", run_ipc_server, NULL);
ck_assert(pid != -1);
do {
conn = qb_ipcc_connect(ipc_name, max_size);
if (conn == NULL) {
j = waitpid(pid, NULL, WNOHANG);
ck_assert_int_eq(j, 0);
poll(NULL, 0, 400);
c++;
}
} while (conn == NULL && c < 5);
ck_assert(conn != NULL);
size = QB_MIN(sizeof(struct qb_ipc_request_header), 64);
for (j = 1; j < 19; j++) {
size *= 2;
if (size >= max_size)
break;
if (send_and_check(IPC_MSG_REQ_TX_RX, size,
recv_timeout, QB_TRUE) < 0) {
break;
}
}
if (turn_on_fc) {
/* can't signal server to shutdown if flow control is on */
ck_assert_int_eq(fc_enabled, QB_TRUE);
qb_ipcc_disconnect(conn);
/* TODO - figure out why this sleep is necessary */
sleep(1);
kill_server(pid);
} else {
request_server_exit();
qb_ipcc_disconnect(conn);
verify_graceful_stop(pid);
}
}
static void
test_ipc_getauth(void)
{
int32_t j;
int32_t c = 0;
pid_t pid;
pid_t spid;
uid_t suid;
gid_t sgid;
int res;
uint32_t max_size = MAX_MSG_SIZE;
pid = run_function_in_new_process("server", run_ipc_server, NULL);
ck_assert(pid != -1);
do {
conn = qb_ipcc_connect(ipc_name, max_size);
if (conn == NULL) {
j = waitpid(pid, NULL, WNOHANG);
ck_assert_int_eq(j, 0);
poll(NULL, 0, 400);
c++;
}
} while (conn == NULL && c < 5);
ck_assert(conn != NULL);
res = qb_ipcc_auth_get(NULL, NULL, NULL, NULL);
ck_assert(res == -EINVAL);
res = qb_ipcc_auth_get(conn, &spid, &suid, &sgid);
ck_assert(res == 0);
#ifndef HAVE_GETPEEREID
/* GETPEEREID doesn't return a PID */
ck_assert(spid != 0);
#endif
ck_assert(suid == getuid());
ck_assert(sgid == getgid());
request_server_exit();
qb_ipcc_disconnect(conn);
verify_graceful_stop(pid);
}
static void
test_ipc_exit(void)
{
struct qb_ipc_request_header req_header;
struct qb_ipc_response_header res_header;
struct iovec iov[1];
int32_t res;
int32_t c = 0;
int32_t j = 0;
pid_t pid;
uint32_t max_size = MAX_MSG_SIZE;
pid = run_function_in_new_process("server", run_ipc_server, NULL);
ck_assert(pid != -1);
do {
conn = qb_ipcc_connect(ipc_name, max_size);
if (conn == NULL) {
j = waitpid(pid, NULL, WNOHANG);
ck_assert_int_eq(j, 0);
poll(NULL, 0, 400);
c++;
}
} while (conn == NULL && c < 5);
ck_assert(conn != NULL);
req_header.id = IPC_MSG_REQ_TX_RX;
req_header.size = sizeof(struct qb_ipc_request_header);
iov[0].iov_len = req_header.size;
iov[0].iov_base = &req_header;
res = qb_ipcc_sendv_recv(conn, iov, 1,
&res_header,
sizeof(struct qb_ipc_response_header), -1);
ck_assert_int_eq(res, sizeof(struct qb_ipc_response_header));
request_server_exit();
verify_graceful_stop(pid);
/*
* this needs to free up the shared mem
*/
qb_ipcc_disconnect(conn);
}
START_TEST(test_ipc_exit_us)
{
qb_enter();
ipc_type = QB_IPC_SOCKET;
set_ipc_name(__func__);
recv_timeout = 5000;
test_ipc_exit();
qb_leave();
}
END_TEST
START_TEST(test_ipc_exit_shm)
{
qb_enter();
ipc_type = QB_IPC_SHM;
set_ipc_name(__func__);
recv_timeout = 1000;
test_ipc_exit();
qb_leave();
}
END_TEST
START_TEST(test_ipc_txrx_shm_timeout)
{
qb_enter();
ipc_type = QB_IPC_SHM;
set_ipc_name(__func__);
test_ipc_txrx_timeout();
qb_leave();
}
END_TEST
+
START_TEST(test_ipc_txrx_us_timeout)
{
qb_enter();
ipc_type = QB_IPC_SOCKET;
set_ipc_name(__func__);
test_ipc_txrx_timeout();
qb_leave();
}
END_TEST
+START_TEST(test_ipc_shm_connect_async)
+{
+ qb_enter();
+ ipc_type = QB_IPC_SHM;
+ set_ipc_name(__func__);
+ test_ipc_connect_async();
+ qb_leave();
+}
+END_TEST
+
+START_TEST(test_ipc_us_connect_async)
+{
+ qb_enter();
+ ipc_type = QB_IPC_SOCKET;
+ set_ipc_name(__func__);
+ test_ipc_connect_async();
+ qb_leave();
+}
+END_TEST
START_TEST(test_ipc_txrx_shm_getauth)
{
qb_enter();
ipc_type = QB_IPC_SHM;
set_ipc_name(__func__);
test_ipc_getauth();
qb_leave();
}
END_TEST
START_TEST(test_ipc_txrx_us_getauth)
{
qb_enter();
ipc_type = QB_IPC_SOCKET;
set_ipc_name(__func__);
test_ipc_getauth();
qb_leave();
}
END_TEST
START_TEST(test_ipc_txrx_shm_tmo)
{
qb_enter();
turn_on_fc = QB_FALSE;
ipc_type = QB_IPC_SHM;
set_ipc_name(__func__);
recv_timeout = 1000;
test_ipc_txrx();
qb_leave();
}
END_TEST
START_TEST(test_ipc_txrx_shm_block)
{
qb_enter();
turn_on_fc = QB_FALSE;
ipc_type = QB_IPC_SHM;
set_ipc_name(__func__);
recv_timeout = -1;
test_ipc_txrx();
qb_leave();
}
END_TEST
START_TEST(test_ipc_fc_shm)
{
qb_enter();
turn_on_fc = QB_TRUE;
ipc_type = QB_IPC_SHM;
recv_timeout = 500;
set_ipc_name(__func__);
test_ipc_txrx();
qb_leave();
}
END_TEST
START_TEST(test_ipc_txrx_us_block)
{
qb_enter();
turn_on_fc = QB_FALSE;
ipc_type = QB_IPC_SOCKET;
set_ipc_name(__func__);
recv_timeout = -1;
test_ipc_txrx();
qb_leave();
}
END_TEST
START_TEST(test_ipc_txrx_us_tmo)
{
qb_enter();
turn_on_fc = QB_FALSE;
ipc_type = QB_IPC_SOCKET;
set_ipc_name(__func__);
recv_timeout = 1000;
test_ipc_txrx();
qb_leave();
}
END_TEST
START_TEST(test_ipc_fc_us)
{
qb_enter();
turn_on_fc = QB_TRUE;
ipc_type = QB_IPC_SOCKET;
recv_timeout = 500;
set_ipc_name(__func__);
test_ipc_txrx();
qb_leave();
}
END_TEST
struct my_res {
struct qb_ipc_response_header hdr;
char message[1024 * 1024];
};
struct dispatch_data {
pid_t server_pid;
enum my_msg_ids msg_type;
uint32_t repetitions;
};
static inline
NEW_PROCESS_RUNNER(client_dispatch, ready_signaller, signaller_data, data)
{
uint32_t max_size = MAX_MSG_SIZE;
int32_t size;
int32_t c = 0;
int32_t j;
pid_t server_pid = ((struct dispatch_data *) data)->server_pid;
enum my_msg_ids msg_type = ((struct dispatch_data *) data)->msg_type;
do {
conn = qb_ipcc_connect(ipc_name, max_size);
if (conn == NULL) {
j = waitpid(server_pid, NULL, WNOHANG);
ck_assert_int_eq(j, 0);
poll(NULL, 0, 400);
c++;
}
} while (conn == NULL && c < 5);
ck_assert(conn != NULL);
if (ready_signaller != NULL) {
ready_signaller(signaller_data);
}
size = QB_MIN(sizeof(struct qb_ipc_request_header), 64);
for (uint32_t r = ((struct dispatch_data *) data)->repetitions;
r > 0; r--) {
for (j = 1; j < 19; j++) {
size *= 2;
if (size >= max_size)
break;
if (send_and_check(msg_type, size,
recv_timeout, QB_TRUE) < 0) {
break;
}
}
}
}
static void
test_ipc_dispatch(void)
{
pid_t pid;
struct dispatch_data data;
pid = run_function_in_new_process(NULL, run_ipc_server, NULL);
ck_assert(pid != -1);
data = (struct dispatch_data){.server_pid = pid,
.msg_type = IPC_MSG_REQ_DISPATCH,
.repetitions = 1};
client_dispatch(NULL, NULL, (void *) &data);
request_server_exit();
qb_ipcc_disconnect(conn);
verify_graceful_stop(pid);
}
START_TEST(test_ipc_dispatch_us)
{
qb_enter();
ipc_type = QB_IPC_SOCKET;
set_ipc_name(__func__);
test_ipc_dispatch();
qb_leave();
}
END_TEST
static int32_t events_received;
static int32_t
count_stress_events(int32_t fd, int32_t revents, void *data)
{
struct {
struct qb_ipc_response_header hdr __attribute__ ((aligned(8)));
char data[GIANT_MSG_DATA_SIZE] __attribute__ ((aligned(8)));
uint32_t sent_msgs __attribute__ ((aligned(8)));
} __attribute__ ((aligned(8))) giant_event_recv;
qb_loop_t *cl = (qb_loop_t*)data;
int32_t res;
res = qb_ipcc_event_recv(conn, &giant_event_recv,
sizeof(giant_event_recv),
-1);
if (res > 0) {
events_received++;
if ((events_received % 1000) == 0) {
qb_log(LOG_DEBUG, "RECV: %d stress events processed", events_received);
if (res != sizeof(giant_event_recv)) {
qb_log(LOG_DEBUG, "Unexpected recv size, expected %d got %d",
res, sizeof(giant_event_recv));
ck_assert_int_eq(res, sizeof(giant_event_recv));
} else if (giant_event_recv.sent_msgs != events_received) {
qb_log(LOG_DEBUG, "Server event mismatch. Server thinks we got %d msgs, but we only received %d",
giant_event_recv.sent_msgs, events_received);
/* This indicates that data corruption is occurring. Since the events
* received is placed at the end of the giant msg, it is possible
* that buffers were not allocated correctly resulting in us
* reading/writing to uninitialized memory at some point. */
ck_assert_int_eq(giant_event_recv.sent_msgs, events_received);
}
}
} else if (res != -EAGAIN) {
qb_perror(LOG_DEBUG, "count_stress_events");
qb_loop_stop(cl);
return -1;
}
if (events_received >= num_stress_events) {
qb_loop_stop(cl);
return -1;
}
return 0;
}
static int32_t
count_bulk_events(int32_t fd, int32_t revents, void *data)
{
qb_loop_t *cl = (qb_loop_t*)data;
struct qb_ipc_response_header res_header;
int32_t res;
res = qb_ipcc_event_recv(conn, &res_header,
sizeof(struct qb_ipc_response_header),
-1);
if (res > 0) {
events_received++;
}
if (events_received >= num_bulk_events) {
qb_loop_stop(cl);
return -1;
}
return 0;
}
static void
test_ipc_stress_connections(void)
{
int32_t c = 0;
int32_t j = 0;
uint32_t max_size = MAX_MSG_SIZE;
int32_t connections = 0;
pid_t pid;
multiple_connections = QB_TRUE;
qb_log_filter_ctl(QB_LOG_STDERR, QB_LOG_FILTER_CLEAR_ALL,
QB_LOG_FILTER_FILE, "*", LOG_TRACE);
qb_log_filter_ctl(QB_LOG_STDERR, QB_LOG_FILTER_ADD,
QB_LOG_FILTER_FILE, "*", LOG_INFO);
qb_log_ctl(QB_LOG_STDERR, QB_LOG_CONF_ENABLED, QB_TRUE);
pid = run_function_in_new_process("server", run_ipc_server, NULL);
ck_assert(pid != -1);
for (connections = 1; connections < NUM_STRESS_CONNECTIONS; connections++) {
if (conn) {
qb_ipcc_disconnect(conn);
conn = NULL;
}
do {
conn = qb_ipcc_connect(ipc_name, max_size);
if (conn == NULL) {
j = waitpid(pid, NULL, WNOHANG);
ck_assert_int_eq(j, 0);
sleep(1);
c++;
}
} while (conn == NULL && c < 5);
ck_assert(conn != NULL);
if (((connections+1) % 1000) == 0) {
qb_log(LOG_INFO, "%d ipc connections made", connections+1);
}
}
multiple_connections = QB_FALSE;
request_server_exit();
verify_graceful_stop(pid);
qb_ipcc_disconnect(conn);
qb_log_filter_ctl(QB_LOG_STDERR, QB_LOG_FILTER_CLEAR_ALL,
QB_LOG_FILTER_FILE, "*", LOG_TRACE);
qb_log_filter_ctl(QB_LOG_STDERR, QB_LOG_FILTER_ADD,
QB_LOG_FILTER_FILE, "*", LOG_TRACE);
qb_log_ctl(QB_LOG_STDERR, QB_LOG_CONF_ENABLED, QB_TRUE);
}
static void
test_ipc_bulk_events(void)
{
int32_t c = 0;
int32_t j = 0;
pid_t pid;
int32_t res;
qb_loop_t *cl;
int32_t fd;
uint32_t max_size = MAX_MSG_SIZE;
pid = run_function_in_new_process("server", run_ipc_server, NULL);
ck_assert(pid != -1);
do {
conn = qb_ipcc_connect(ipc_name, max_size);
if (conn == NULL) {
j = waitpid(pid, NULL, WNOHANG);
ck_assert_int_eq(j, 0);
poll(NULL, 0, 400);
c++;
}
} while (conn == NULL && c < 5);
ck_assert(conn != NULL);
events_received = 0;
cl = qb_loop_create();
res = qb_ipcc_fd_get(conn, &fd);
ck_assert_int_eq(res, 0);
res = qb_loop_poll_add(cl, QB_LOOP_MED,
fd, POLLIN,
cl, count_bulk_events);
ck_assert_int_eq(res, 0);
res = send_and_check(IPC_MSG_REQ_BULK_EVENTS,
0,
recv_timeout, QB_TRUE);
ck_assert_int_eq(res, sizeof(struct qb_ipc_response_header));
qb_loop_run(cl);
ck_assert_int_eq(events_received, num_bulk_events);
request_server_exit();
qb_ipcc_disconnect(conn);
verify_graceful_stop(pid);
}
static void
test_ipc_stress_test(void)
{
struct {
struct qb_ipc_request_header hdr __attribute__ ((aligned(8)));
char data[GIANT_MSG_DATA_SIZE] __attribute__ ((aligned(8)));
uint32_t sent_msgs __attribute__ ((aligned(8)));
} __attribute__ ((aligned(8))) giant_req;
struct qb_ipc_response_header res_header;
struct iovec iov[1];
int32_t c = 0;
int32_t j = 0;
pid_t pid;
int32_t res;
qb_loop_t *cl;
int32_t fd;
uint32_t max_size = MAX_MSG_SIZE;
/* This looks strange, but it serves an important purpose.
* This test forces the server to enforce the MAX_MSG_SIZE
* limit from the server side, which overrides the client's
* buffer limit. To verify this functionality is working
* we set the client limit lower than what the server
* is enforcing. */
int32_t client_buf_size = max_size - 1024;
int32_t real_buf_size;
enforce_server_buffer = 1;
pid = run_function_in_new_process("server", run_ipc_server, NULL);
enforce_server_buffer = 0;
ck_assert(pid != -1);
do {
conn = qb_ipcc_connect(ipc_name, client_buf_size);
if (conn == NULL) {
j = waitpid(pid, NULL, WNOHANG);
ck_assert_int_eq(j, 0);
poll(NULL, 0, 400);
c++;
}
} while (conn == NULL && c < 5);
ck_assert(conn != NULL);
real_buf_size = qb_ipcc_get_buffer_size(conn);
ck_assert_int_eq(real_buf_size, max_size);
qb_log(LOG_DEBUG, "Testing %d iterations of EVENT msg passing.", num_stress_events);
events_received = 0;
cl = qb_loop_create();
res = qb_ipcc_fd_get(conn, &fd);
ck_assert_int_eq(res, 0);
res = qb_loop_poll_add(cl, QB_LOOP_MED,
fd, POLLIN,
cl, count_stress_events);
ck_assert_int_eq(res, 0);
res = send_and_check(IPC_MSG_REQ_STRESS_EVENT, 0, recv_timeout, QB_TRUE);
qb_loop_run(cl);
ck_assert_int_eq(events_received, num_stress_events);
giant_req.hdr.id = IPC_MSG_REQ_SERVER_FAIL;
giant_req.hdr.size = sizeof(giant_req);
if (giant_req.hdr.size <= client_buf_size) {
ck_assert_int_eq(1, 0);
}
iov[0].iov_len = giant_req.hdr.size;
iov[0].iov_base = &giant_req;
res = qb_ipcc_sendv_recv(conn, iov, 1,
&res_header,
sizeof(struct qb_ipc_response_header), -1);
if (res != -ECONNRESET && res != -ENOTCONN) {
qb_log(LOG_ERR, "id:%d size:%d", res_header.id, res_header.size);
ck_assert_int_eq(res, -ENOTCONN);
}
qb_ipcc_disconnect(conn);
verify_graceful_stop(pid);
}
#ifndef __clang__ /* see 'variable length array in structure' at the top */
START_TEST(test_ipc_stress_test_us)
{
qb_enter();
send_event_on_created = QB_FALSE;
ipc_type = QB_IPC_SOCKET;
set_ipc_name(__func__);
test_ipc_stress_test();
qb_leave();
}
END_TEST
#endif
START_TEST(test_ipc_stress_connections_us)
{
qb_enter();
ipc_type = QB_IPC_SOCKET;
set_ipc_name(__func__);
test_ipc_stress_connections();
qb_leave();
}
END_TEST
START_TEST(test_ipc_bulk_events_us)
{
qb_enter();
ipc_type = QB_IPC_SOCKET;
set_ipc_name(__func__);
test_ipc_bulk_events();
qb_leave();
}
END_TEST
static
READY_SIGNALLER(connected_signaller, _)
{
request_server_exit();
}
START_TEST(test_ipc_dispatch_us_native_prio_dlock)
{
pid_t server_pid, alphaclient_pid;
struct dispatch_data data;
qb_enter();
ipc_type = QB_IPC_SOCKET;
set_ipc_name(__func__);
/* this is to demonstrate that native event loop can deal even
with "extreme" priority disproportions */
global_loop_prio = QB_LOOP_LOW;
multiple_connections = QB_TRUE;
recv_timeout = -1;
server_pid = run_function_in_new_process("server", run_ipc_server,
NULL);
ck_assert(server_pid != -1);
data = (struct dispatch_data){.server_pid = server_pid,
.msg_type = IPC_MSG_REQ_SELF_FEED,
.repetitions = 1};
alphaclient_pid = run_function_in_new_process("alphaclient",
client_dispatch,
(void *) &data);
ck_assert(alphaclient_pid != -1);
//sleep(1);
sched_yield();
data.repetitions = 0;
client_dispatch(connected_signaller, NULL, (void *) &data);
verify_graceful_stop(server_pid);
multiple_connections = QB_FALSE;
qb_leave();
}
END_TEST
#if HAVE_GLIB
START_TEST(test_ipc_dispatch_us_glib_prio_dlock)
{
pid_t server_pid, alphaclient_pid;
struct dispatch_data data;
qb_enter();
ipc_type = QB_IPC_SOCKET;
set_ipc_name(__func__);
global_use_glib = QB_TRUE;
/* this is to make the test pass at all, since GLib is strict
on priorities -- QB_LOOP_MED or lower would fail for sure */
global_loop_prio = QB_LOOP_HIGH;
multiple_connections = QB_TRUE;
recv_timeout = -1;
server_pid = run_function_in_new_process("server", run_ipc_server,
NULL);
ck_assert(server_pid != -1);
data = (struct dispatch_data){.server_pid = server_pid,
.msg_type = IPC_MSG_REQ_SELF_FEED,
.repetitions = 1};
alphaclient_pid = run_function_in_new_process("alphaclient",
client_dispatch,
(void *) &data);
ck_assert(alphaclient_pid != -1);
//sleep(1);
sched_yield();
data.repetitions = 0;
client_dispatch(connected_signaller, NULL, (void *) &data);
verify_graceful_stop(server_pid);
multiple_connections = QB_FALSE;
global_loop_prio = QB_LOOP_MED;
global_use_glib = QB_FALSE;
qb_leave();
}
END_TEST
#endif
static void
test_ipc_event_on_created(void)
{
int32_t c = 0;
int32_t j = 0;
pid_t pid;
int32_t res;
qb_loop_t *cl;
int32_t fd;
uint32_t max_size = MAX_MSG_SIZE;
num_bulk_events = 1;
pid = run_function_in_new_process("server", run_ipc_server, NULL);
ck_assert(pid != -1);
do {
conn = qb_ipcc_connect(ipc_name, max_size);
if (conn == NULL) {
j = waitpid(pid, NULL, WNOHANG);
ck_assert_int_eq(j, 0);
poll(NULL, 0, 400);
c++;
}
} while (conn == NULL && c < 5);
ck_assert(conn != NULL);
events_received = 0;
cl = qb_loop_create();
res = qb_ipcc_fd_get(conn, &fd);
ck_assert_int_eq(res, 0);
res = qb_loop_poll_add(cl, QB_LOOP_MED,
fd, POLLIN,
cl, count_bulk_events);
ck_assert_int_eq(res, 0);
qb_loop_run(cl);
ck_assert_int_eq(events_received, num_bulk_events);
request_server_exit();
qb_ipcc_disconnect(conn);
verify_graceful_stop(pid);
}
START_TEST(test_ipc_event_on_created_us)
{
qb_enter();
send_event_on_created = QB_TRUE;
ipc_type = QB_IPC_SOCKET;
set_ipc_name(__func__);
test_ipc_event_on_created();
qb_leave();
}
END_TEST
static void
test_ipc_disconnect_after_created(void)
{
struct qb_ipc_request_header req_header;
struct qb_ipc_response_header res_header;
struct iovec iov[1];
int32_t c = 0;
int32_t j = 0;
pid_t pid;
int32_t res;
uint32_t max_size = MAX_MSG_SIZE;
pid = run_function_in_new_process("server", run_ipc_server, NULL);
ck_assert(pid != -1);
do {
conn = qb_ipcc_connect(ipc_name, max_size);
if (conn == NULL) {
j = waitpid(pid, NULL, WNOHANG);
ck_assert_int_eq(j, 0);
poll(NULL, 0, 400);
c++;
}
} while (conn == NULL && c < 5);
ck_assert(conn != NULL);
ck_assert_int_eq(QB_TRUE, qb_ipcc_is_connected(conn));
req_header.id = IPC_MSG_REQ_SERVER_DISCONNECT;
req_header.size = sizeof(struct qb_ipc_request_header);
iov[0].iov_len = req_header.size;
iov[0].iov_base = &req_header;
res = qb_ipcc_sendv_recv(conn, iov, 1,
&res_header,
sizeof(struct qb_ipc_response_header), -1);
/*
* confirm we get -ENOTCONN or -ECONNRESET
*/
if (res != -ECONNRESET && res != -ENOTCONN) {
qb_log(LOG_ERR, "id:%d size:%d", res_header.id, res_header.size);
ck_assert_int_eq(res, -ENOTCONN);
}
ck_assert_int_eq(QB_FALSE, qb_ipcc_is_connected(conn));
qb_ipcc_disconnect(conn);
kill_server(pid);
}
START_TEST(test_ipc_disconnect_after_created_us)
{
qb_enter();
disconnect_after_created = QB_TRUE;
ipc_type = QB_IPC_SOCKET;
set_ipc_name(__func__);
test_ipc_disconnect_after_created();
qb_leave();
}
END_TEST
static void
test_ipc_server_fail(void)
{
int32_t j;
int32_t c = 0;
pid_t pid;
uint32_t max_size = MAX_MSG_SIZE;
pid = run_function_in_new_process("server", run_ipc_server, NULL);
ck_assert(pid != -1);
do {
conn = qb_ipcc_connect(ipc_name, max_size);
if (conn == NULL) {
j = waitpid(pid, NULL, WNOHANG);
ck_assert_int_eq(j, 0);
poll(NULL, 0, 400);
c++;
}
} while (conn == NULL && c < 5);
ck_assert(conn != NULL);
request_server_exit();
if (_fi_unlink_inject_failure == QB_TRUE) {
_fi_truncate_called = _fi_openat_called = 0;
}
ck_assert_int_eq(QB_FALSE, qb_ipcc_is_connected(conn));
qb_ipcc_disconnect(conn);
if (_fi_unlink_inject_failure == QB_TRUE) {
ck_assert_int_ne(_fi_truncate_called + _fi_openat_called, 0);
}
verify_graceful_stop(pid);
}
START_TEST(test_ipc_server_fail_soc)
{
qb_enter();
ipc_type = QB_IPC_SOCKET;
set_ipc_name(__func__);
test_ipc_server_fail();
qb_leave();
}
END_TEST
START_TEST(test_ipc_dispatch_shm)
{
qb_enter();
ipc_type = QB_IPC_SHM;
set_ipc_name(__func__);
test_ipc_dispatch();
qb_leave();
}
END_TEST
START_TEST(test_ipc_stress_test_shm)
{
qb_enter();
send_event_on_created = QB_FALSE;
ipc_type = QB_IPC_SHM;
set_ipc_name(__func__);
test_ipc_stress_test();
qb_leave();
}
END_TEST
START_TEST(test_ipc_stress_connections_shm)
{
qb_enter();
ipc_type = QB_IPC_SHM;
set_ipc_name(__func__);
test_ipc_stress_connections();
qb_leave();
}
END_TEST
// Check perms uses illegal access to libqb internals
// DO NOT try this at home.
#include "../lib/ipc_int.h"
#include "../lib/ringbuffer_int.h"
START_TEST(test_ipc_server_perms)
{
pid_t pid;
struct stat st;
int j;
uint32_t max_size;
int res;
int c = 0;
// Can only test this if we are root
if (getuid() != 0) {
return;
}
ipc_type = QB_IPC_SHM;
set_perms_on_socket = QB_TRUE;
max_size = MAX_MSG_SIZE;
pid = run_function_in_new_process("server", run_ipc_server, NULL);
ck_assert(pid != -1);
do {
conn = qb_ipcc_connect(ipc_name, max_size);
if (conn == NULL) {
j = waitpid(pid, NULL, WNOHANG);
ck_assert_int_eq(j, 0);
poll(NULL, 0, 400);
c++;
}
} while (conn == NULL && c < 5);
ck_assert(conn != NULL);
/* Check perms - uses illegal access to libqb internals */
/* BSD uses /var/run for sockets so we can't alter the perms on the
directory */
#ifdef __linux__
char sockdir[PATH_MAX];
strcpy(sockdir, conn->request.u.shm.rb->shared_hdr->hdr_path);
*strrchr(sockdir, '/') = 0;
res = stat(sockdir, &st);
ck_assert_int_eq(res, 0);
ck_assert(st.st_mode & S_IRWXG);
ck_assert_int_eq(st.st_uid, 555);
ck_assert_int_eq(st.st_gid, 741);
#endif
res = stat(conn->request.u.shm.rb->shared_hdr->hdr_path, &st);
ck_assert_int_eq(res, 0);
ck_assert_int_eq(st.st_uid, 555);
ck_assert_int_eq(st.st_gid, 741);
qb_ipcc_disconnect(conn);
verify_graceful_stop(pid);
}
END_TEST
START_TEST(test_ipc_dispatch_shm_native_prio_dlock)
{
pid_t server_pid, alphaclient_pid;
struct dispatch_data data;
qb_enter();
ipc_type = QB_IPC_SHM;
set_ipc_name(__func__);
/* this is to demonstrate that native event loop can deal even
with "extreme" priority disproportions */
global_loop_prio = QB_LOOP_LOW;
multiple_connections = QB_TRUE;
recv_timeout = -1;
server_pid = run_function_in_new_process("server", run_ipc_server,
NULL);
ck_assert(server_pid != -1);
data = (struct dispatch_data){.server_pid = server_pid,
.msg_type = IPC_MSG_REQ_SELF_FEED,
.repetitions = 1};
alphaclient_pid = run_function_in_new_process("alphaclient",
client_dispatch,
(void *) &data);
ck_assert(alphaclient_pid != -1);
//sleep(1);
sched_yield();
data.repetitions = 0;
client_dispatch(connected_signaller, NULL, (void *) &data);
verify_graceful_stop(server_pid);
multiple_connections = QB_FALSE;
qb_leave();
}
END_TEST
#if HAVE_GLIB
START_TEST(test_ipc_dispatch_shm_glib_prio_dlock)
{
pid_t server_pid, alphaclient_pid;
struct dispatch_data data;
qb_enter();
ipc_type = QB_IPC_SOCKET;
set_ipc_name(__func__);
global_use_glib = QB_TRUE;
/* this is to make the test pass at all, since GLib is strict
on priorities -- QB_LOOP_MED or lower would fail for sure */
global_loop_prio = QB_LOOP_HIGH;
multiple_connections = QB_TRUE;
recv_timeout = -1;
server_pid = run_function_in_new_process("server", run_ipc_server,
NULL);
ck_assert(server_pid != -1);
data = (struct dispatch_data){.server_pid = server_pid,
.msg_type = IPC_MSG_REQ_SELF_FEED,
.repetitions = 1};
alphaclient_pid = run_function_in_new_process("alphaclient",
client_dispatch,
(void *) &data);
ck_assert(alphaclient_pid != -1);
//sleep(1);
sched_yield();
data.repetitions = 0;
client_dispatch(connected_signaller, NULL, (void *) &data);
verify_graceful_stop(server_pid);
multiple_connections = QB_FALSE;
global_loop_prio = QB_LOOP_MED;
global_use_glib = QB_FALSE;
qb_leave();
}
END_TEST
#endif
START_TEST(test_ipc_bulk_events_shm)
{
qb_enter();
ipc_type = QB_IPC_SHM;
set_ipc_name(__func__);
test_ipc_bulk_events();
qb_leave();
}
END_TEST
START_TEST(test_ipc_event_on_created_shm)
{
qb_enter();
send_event_on_created = QB_TRUE;
ipc_type = QB_IPC_SHM;
set_ipc_name(__func__);
test_ipc_event_on_created();
qb_leave();
}
END_TEST
START_TEST(test_ipc_server_fail_shm)
{
qb_enter();
ipc_type = QB_IPC_SHM;
set_ipc_name(__func__);
test_ipc_server_fail();
qb_leave();
}
END_TEST
#ifdef HAVE_FAILURE_INJECTION
START_TEST(test_ipcc_truncate_when_unlink_fails_shm)
{
char sock_file[PATH_MAX];
struct sockaddr_un socka;
qb_enter();
ipc_type = QB_IPC_SHM;
set_ipc_name(__func__);
sprintf(sock_file, "%s/%s", SOCKETDIR, ipc_name);
sock_file[sizeof(socka.sun_path)] = '\0';
/* If there's an old socket left from a previous run this test will fail
unexpectedly, so try to remove it first */
unlink(sock_file);
_fi_unlink_inject_failure = QB_TRUE;
test_ipc_server_fail();
_fi_unlink_inject_failure = QB_FALSE;
unlink(sock_file);
qb_leave();
}
END_TEST
#endif
static void
test_ipc_service_ref_count(void)
{
int32_t c = 0;
int32_t j = 0;
pid_t pid;
uint32_t max_size = MAX_MSG_SIZE;
reference_count_test = QB_TRUE;
pid = run_function_in_new_process("server", run_ipc_server, NULL);
ck_assert(pid != -1);
do {
conn = qb_ipcc_connect(ipc_name, max_size);
if (conn == NULL) {
j = waitpid(pid, NULL, WNOHANG);
ck_assert_int_eq(j, 0);
(void)poll(NULL, 0, 400);
c++;
}
} while (conn == NULL && c < 5);
ck_assert(conn != NULL);
sleep(5);
kill_server(pid);
}
START_TEST(test_ipc_service_ref_count_shm)
{
qb_enter();
ipc_type = QB_IPC_SHM;
set_ipc_name(__func__);
test_ipc_service_ref_count();
qb_leave();
}
END_TEST
START_TEST(test_ipc_service_ref_count_us)
{
qb_enter();
ipc_type = QB_IPC_SOCKET;
set_ipc_name(__func__);
test_ipc_service_ref_count();
qb_leave();
}
END_TEST
#if 0
static void test_max_dgram_size(void)
{
/* most implementations will not let you set a dgram buffer
* of 1 million bytes. This test verifies that we can detect
* the max dgram buffer size regardless, and that the value we detect
* is consistent. */
int32_t init;
int32_t i;
qb_log_filter_ctl(QB_LOG_STDERR, QB_LOG_FILTER_REMOVE,
QB_LOG_FILTER_FILE, "*", LOG_TRACE);
init = qb_ipcc_verify_dgram_max_msg_size(1000000);
ck_assert(init > 0);
for (i = 0; i < 100; i++) {
int try = qb_ipcc_verify_dgram_max_msg_size(1000000);
#if 0
ck_assert_int_eq(init, try);
#else
/* extra troubleshooting, report also on i and errno variables;
related: https://github.com/ClusterLabs/libqb/issues/234 */
if (init != try) {
#ifdef ci_dump_shm_usage
system("df -h | grep -e /shm >/tmp/_shm_usage");
#endif
ck_abort_msg("Assertion 'init==try' failed:"
" init==%#x, try==%#x, i=%d, errno=%d",
init, try, i, errno);
}
#endif
}
qb_log_filter_ctl(QB_LOG_STDERR, QB_LOG_FILTER_ADD,
QB_LOG_FILTER_FILE, "*", LOG_TRACE);
}
START_TEST(test_ipc_max_dgram_size)
{
qb_enter();
test_max_dgram_size();
qb_leave();
}
END_TEST
#endif
static Suite *
make_shm_suite(void)
{
TCase *tc;
Suite *s = suite_create("shm");
+ add_tcase(s, tc, test_ipc_shm_connect_async, 7);
+
add_tcase(s, tc, test_ipc_txrx_shm_getauth, 7);
add_tcase(s, tc, test_ipc_txrx_shm_timeout, 28);
add_tcase(s, tc, test_ipc_server_fail_shm, 7);
add_tcase(s, tc, test_ipc_txrx_shm_block, 7);
add_tcase(s, tc, test_ipc_txrx_shm_tmo, 7);
add_tcase(s, tc, test_ipc_fc_shm, 7);
add_tcase(s, tc, test_ipc_dispatch_shm, 15);
add_tcase(s, tc, test_ipc_stress_test_shm, 15);
add_tcase(s, tc, test_ipc_bulk_events_shm, 15);
add_tcase(s, tc, test_ipc_exit_shm, 6);
add_tcase(s, tc, test_ipc_event_on_created_shm, 9);
add_tcase(s, tc, test_ipc_service_ref_count_shm, 9);
add_tcase(s, tc, test_ipc_server_perms, 7);
add_tcase(s, tc, test_ipc_stress_connections_shm, 3600 /* ? */);
add_tcase(s, tc, test_ipc_dispatch_shm_native_prio_dlock, 15);
#if HAVE_GLIB
add_tcase(s, tc, test_ipc_dispatch_shm_glib_prio_dlock, 15);
#endif
#ifdef HAVE_FAILURE_INJECTION
add_tcase(s, tc, test_ipcc_truncate_when_unlink_fails_shm, 8);
#endif
return s;
}
static Suite *
make_soc_suite(void)
{
Suite *s = suite_create("socket");
TCase *tc;
+ add_tcase(s, tc, test_ipc_us_connect_async, 7);
+
add_tcase(s, tc, test_ipc_txrx_us_getauth, 7);
add_tcase(s, tc, test_ipc_txrx_us_timeout, 28);
/* Commented out for the moment as space in /dev/shm on the CI machines
causes random failures */
/* add_tcase(s, tc, test_ipc_max_dgram_size, 30); */
add_tcase(s, tc, test_ipc_server_fail_soc, 7);
add_tcase(s, tc, test_ipc_txrx_us_block, 7);
add_tcase(s, tc, test_ipc_txrx_us_tmo, 7);
add_tcase(s, tc, test_ipc_fc_us, 7);
add_tcase(s, tc, test_ipc_exit_us, 6);
add_tcase(s, tc, test_ipc_dispatch_us, 15);
#ifndef __clang__ /* see 'variable length array in structure' at the top */
add_tcase(s, tc, test_ipc_stress_test_us, 58);
#endif
add_tcase(s, tc, test_ipc_bulk_events_us, 15);
add_tcase(s, tc, test_ipc_event_on_created_us, 9);
add_tcase(s, tc, test_ipc_disconnect_after_created_us, 9);
add_tcase(s, tc, test_ipc_service_ref_count_us, 9);
add_tcase(s, tc, test_ipc_stress_connections_us, 3600 /* ? */);
add_tcase(s, tc, test_ipc_dispatch_us_native_prio_dlock, 15);
#if HAVE_GLIB
add_tcase(s, tc, test_ipc_dispatch_us_glib_prio_dlock, 15);
#endif
return s;
}
int32_t
main(void)
{
int32_t number_failed;
SRunner *sr;
Suite *s;
int32_t do_shm_tests = QB_TRUE;
set_ipc_name("ipc_test");
#ifdef DISABLE_IPC_SHM
do_shm_tests = QB_FALSE;
#endif /* DISABLE_IPC_SHM */
s = make_soc_suite();
sr = srunner_create(s);
if (do_shm_tests) {
srunner_add_suite(sr, make_shm_suite());
}
qb_log_init("check", LOG_USER, LOG_EMERG);
atexit(qb_log_fini);
qb_log_ctl(QB_LOG_SYSLOG, QB_LOG_CONF_ENABLED, QB_FALSE);
qb_log_filter_ctl(QB_LOG_STDERR, QB_LOG_FILTER_ADD,
QB_LOG_FILTER_FILE, "*", LOG_TRACE);
qb_log_ctl(QB_LOG_STDERR, QB_LOG_CONF_ENABLED, QB_TRUE);
qb_log_format_set(QB_LOG_STDERR, "lib/%f|%l| %b");
srunner_run_all(sr, CK_VERBOSE);
number_failed = srunner_ntests_failed(sr);
srunner_free(sr);
return (number_failed == 0) ? EXIT_SUCCESS : EXIT_FAILURE;
}
