
diff --git a/include/qb/qbipcc.h b/include/qb/qbipcc.h
index bdbfe03..77631e0 100644
--- a/include/qb/qbipcc.h
+++ b/include/qb/qbipcc.h
@@ -1,240 +1,257 @@
/*
* Copyright (C) 2006-2007, 2009 Red Hat, Inc.
*
* Author: Steven Dake <sdake@redhat.com>
*
* This file is part of libqb.
*
* libqb is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 2.1 of the License, or
* (at your option) any later version.
*
* libqb is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with libqb. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef QB_IPCC_H_DEFINED
#define QB_IPCC_H_DEFINED
/* *INDENT-OFF* */
#ifdef __cplusplus
extern "C" {
#endif
/* *INDENT-ON* */
#include <qb/qbconfig.h>
#include <pthread.h>
#include <sys/poll.h>
#include <sys/socket.h>
#include <qb/qbhdb.h>
#include <qb/qbipc_common.h>
/**
* @file qbipcc.h
*
* Client IPC API.
*
* @par Lifecycle of an IPC connection.
* An IPC connection is made to the server with qb_ipcc_connect(). This function
* connects to the server and requests that channels be created for communication.
* To disconnect, the client either exits or calls qb_ipcc_disconnect().
*
* @par Synchronous communication
* The function qb_ipcc_sendv_recv() sends an iovector request and receives a response.
*
* @par Asynchronous requests from the client
* The function qb_ipcc_sendv() sends an iovector request.
* The function qb_ipcc_send() sends a message buffer request.
*
* @par Asynchronous events from the server
* The qb_ipcc_event_recv() function receives an out-of-band asynchronous message.
* Asynchronous messages are queued, so they can provide high out-of-band throughput.
* To determine when to call qb_ipcc_event_recv(), use qb_ipcc_fd_get() to obtain
* a file descriptor suitable for the poll() or select() system calls.
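*
* A minimal event-polling sketch; the connection object 'conn' and the
* buffer size are illustrative assumptions, not part of this API:
* @code
* int32_t fd;
* char buf[8192];
* struct pollfd pfd;
*
* qb_ipcc_fd_get(conn, &fd);
* pfd.fd = fd;
* pfd.events = POLLIN;
* pfd.revents = 0;
* if (poll(&pfd, 1, -1) > 0) {
*         (void)qb_ipcc_event_recv(conn, buf, sizeof(buf), 0);
* }
* @endcode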
*
* @example ipcclient.c
* This is an example of how to use the client.
*/
typedef struct qb_ipcc_connection qb_ipcc_connection_t;
/**
* Create a connection to an IPC service.
*
* @param name name of the service.
* @param max_msg_size biggest msg size.
* @return NULL (error: see errno) or a connection object.
*
* @note It is recommended to do a one-time check on the
* max_msg_size value using qb_ipcc_verify_dgram_max_msg_size
* _BEFORE_ calling the connect function when IPC_SOCKET is in use.
* Some distributions will allow large message buffers to be
* set on the socket, but not actually honor them because of
* kernel state values. The qb_ipcc_verify_dgram_max_msg_size
* function both sets the socket buffer size and verifies it by
* doing a send/recv.
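*
* A hedged usage sketch for the IPC_SOCKET case; the service name
* "my_service" and the 1MiB requested size are assumptions:
* @code
* int32_t usable = qb_ipcc_verify_dgram_max_msg_size(1024 * 1024);
* qb_ipcc_connection_t *conn = NULL;
*
* if (usable > 0) {
*         conn = qb_ipcc_connect("my_service", usable);
* }
* @endcode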
*/
qb_ipcc_connection_t*
qb_ipcc_connect(const char *name, size_t max_msg_size);
/**
* Test the kernel's dgram socket buffers to determine the largest single
* message, up to the max_msg_size value, that can be sent. The result is
* rounded down to the nearest 1k.
*
* @param max_msg_size biggest msg size.
* @return -1 if the max size cannot be detected; otherwise a positive value
* representing the largest single msg up to max_msg_size
* that can successfully be sent over a unix dgram socket.
*/
int32_t
qb_ipcc_verify_dgram_max_msg_size(size_t max_msg_size);
/**
* Disconnect an IPC connection.
*
* @param c connection instance
*/
void qb_ipcc_disconnect(qb_ipcc_connection_t* c);
/**
* Get the file descriptor to poll.
*
* @param c connection instance
* @param fd (out) file descriptor to poll
*/
int32_t qb_ipcc_fd_get(qb_ipcc_connection_t* c, int32_t * fd);
/**
* Set the maximum allowable flowcontrol value.
*
* @note the default is 1
*
* @param c connection instance
* @param max the max allowable flowcontrol value (1 or 2)
*/
int32_t qb_ipcc_fc_enable_max_set(qb_ipcc_connection_t * c, uint32_t max);
/**
* Send a message.
*
* @param c connection instance
* @param msg_ptr pointer to a message to send
* @param msg_len the size of the message
* @return (size sent, -errno == error)
*
* @note the msg_ptr must include a qb_ipc_request_header at
* the top of the message. The server will read the size field
* to determine how much to recv.
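*
* A minimal sketch of a request laid out behind the required header;
* the connection 'conn', payload struct, and message id are hypothetical:
* @code
* struct my_req {
*         struct qb_ipc_request_header hdr;
*         char payload[256];
* };
* struct my_req req;
*
* req.hdr.id = 1;              // service-defined message id
* req.hdr.size = sizeof(req);  // the server reads this to size its recv
* qb_ipcc_send(conn, &req, req.hdr.size);
* @endcode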
*/
ssize_t qb_ipcc_send(qb_ipcc_connection_t* c, const void *msg_ptr,
size_t msg_len);
/**
* Send a message (iovec).
*
* @param c connection instance
* @param iov pointer to an iovec struct to send
* @param iov_len the number of iovecs used
* @return (size sent, -errno == error)
*
* @note the iov[0] must be a qb_ipc_request_header. The server will
* read the size field to determine how much to recv.
*/
ssize_t qb_ipcc_sendv(qb_ipcc_connection_t* c, const struct iovec* iov,
size_t iov_len);
/**
* Receive a response.
*
* @param c connection instance
* @param msg_ptr pointer to a message buffer to receive into
* @param msg_len the size of the buffer
* @param ms_timeout max time to wait for a response
* @return (size recv'ed, -errno == error)
*
* @note that msg_ptr will include a qb_ipc_response_header at
* the top of the message.
*/
ssize_t qb_ipcc_recv(qb_ipcc_connection_t* c, void *msg_ptr,
size_t msg_len, int32_t ms_timeout);
/**
* This is a convenience function that simply sends and then recvs.
*
* @param c connection instance
* @param iov pointer to an iovec struct to send
* @param iov_len the number of iovecs used
* @param msg_ptr pointer to a message buffer to receive into
* @param msg_len the size of the buffer
* @param ms_timeout max time to wait for a response
*
* @note the iov[0] must include a qb_ipc_request_header at
* the top of the message. The server will read the size field
* to determine how much to recv.
* @note that msg_ptr will include a qb_ipc_response_header at
* the top of the message.
*
* @see qb_ipcc_sendv() qb_ipcc_recv()
*/
ssize_t qb_ipcc_sendv_recv(qb_ipcc_connection_t *c,
const struct iovec *iov, uint32_t iov_len,
void *msg_ptr, size_t msg_len,
int32_t ms_timeout);
/**
* Receive an event.
*
* @param c connection instance
* @param msg_ptr pointer to a message buffer to receive into
* @param msg_len the size of the buffer
* @param ms_timeout time in milliseconds to wait for a message:
* 0 == no wait, negative == block, positive == wait X ms.
* @return size of the message or error (-errno)
*
* @note that msg_ptr will include a qb_ipc_response_header at
* the top of the message.
*/
ssize_t qb_ipcc_event_recv(qb_ipcc_connection_t* c, void *msg_ptr,
size_t msg_len, int32_t ms_timeout);
/**
* Associate a "user" pointer with this connection.
*
* @param context the pointer to associate with this connection.
* @param c connection instance
* @see qb_ipcc_context_get()
*/
void qb_ipcc_context_set(qb_ipcc_connection_t *c, void *context);
/**
* Get the context (set previously)
*
* @param c connection instance
* @return the context
* @see qb_ipcc_context_set()
*/
void *qb_ipcc_context_get(qb_ipcc_connection_t *c);
/**
* Is the connection connected?
*
* @param c connection instance
* @retval QB_TRUE when connected
* @retval QB_FALSE when not connected
*/
int32_t qb_ipcc_is_connected(qb_ipcc_connection_t *c);
+/**
+ * Get the actual buffer size in use once the connection is established.
+ *
+ * @note The buffer size is guaranteed to be at least the size
+ * of the value given in qb_ipcc_connect, but it is possible
+ * the server will enforce a larger size depending on the
+ * implementation. If the server side is known to enforce
+ * a buffer size, use this function after the client connection
+ * is established to retrieve the buffer size in use. It is
+ * important for the client side to know the buffer size in use
+ * so the client can successfully retrieve large server events.
+ *
+ * @param c connection instance
+ * @retval the connection's buffer size in bytes, or -errno on error
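+ *
+ * A hedged sketch of sizing an event buffer from the negotiated value,
+ * assuming an established connection 'conn' (error handling elided):
+ * @code
+ * int32_t bufsize = qb_ipcc_get_buffer_size(conn);
+ * char *buf = malloc(bufsize);
+ *
+ * (void)qb_ipcc_event_recv(conn, buf, bufsize, -1);
+ * @endcode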
+ */
+int32_t qb_ipcc_get_buffer_size(qb_ipcc_connection_t * c);
+
/* *INDENT-OFF* */
#ifdef __cplusplus
}
#endif
/* *INDENT-ON* */
#endif /* QB_IPCC_H_DEFINED */
diff --git a/include/qb/qbipcs.h b/include/qb/qbipcs.h
index 2adcd17..b9883d5 100644
--- a/include/qb/qbipcs.h
+++ b/include/qb/qbipcs.h
@@ -1,453 +1,465 @@
/*
* Copyright (C) 2006-2009 Red Hat, Inc.
*
* Author: Steven Dake <sdake@redhat.com>,
* Angus Salkeld <asalkeld@redhat.com>
*
* This file is part of libqb.
*
* libqb is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 2.1 of the License, or
* (at your option) any later version.
*
* libqb is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with libqb. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef QB_IPCS_H_DEFINED
#define QB_IPCS_H_DEFINED
/* *INDENT-OFF* */
#ifdef __cplusplus
extern "C" {
#endif
/* *INDENT-ON* */
#include <stdlib.h>
#include <sys/uio.h>
#include <qb/qbipc_common.h>
#include <qb/qbhdb.h>
#include <qb/qbloop.h>
/**
* @file qbipcs.h
*
* Server IPC API.
*
* @example ipcserver.c
*/
enum qb_ipcs_rate_limit {
QB_IPCS_RATE_FAST,
QB_IPCS_RATE_NORMAL,
QB_IPCS_RATE_SLOW,
QB_IPCS_RATE_OFF,
QB_IPCS_RATE_OFF_2,
};
struct qb_ipcs_connection;
typedef struct qb_ipcs_connection qb_ipcs_connection_t;
struct qb_ipcs_service;
typedef struct qb_ipcs_service qb_ipcs_service_t;
struct qb_ipcs_stats {
uint32_t active_connections;
uint32_t closed_connections;
};
struct qb_ipcs_connection_stats {
int32_t client_pid;
uint64_t requests;
uint64_t responses;
uint64_t events;
uint64_t send_retries;
uint64_t recv_retries;
int32_t flow_control_state;
uint64_t flow_control_count;
};
struct qb_ipcs_connection_stats_2 {
int32_t client_pid;
uint64_t requests;
uint64_t responses;
uint64_t events;
uint64_t send_retries;
uint64_t recv_retries;
int32_t flow_control_state;
uint64_t flow_control_count;
uint32_t event_q_length;
};
typedef int32_t (*qb_ipcs_dispatch_fn_t) (int32_t fd, int32_t revents,
void *data);
typedef int32_t (*qb_ipcs_dispatch_add_fn)(enum qb_loop_priority p,
int32_t fd,
int32_t events,
void *data,
qb_ipcs_dispatch_fn_t fn);
typedef int32_t (*qb_ipcs_dispatch_mod_fn)(enum qb_loop_priority p,
int32_t fd,
int32_t events,
void *data,
qb_ipcs_dispatch_fn_t fn);
typedef int32_t (*qb_ipcs_dispatch_del_fn)(int32_t fd);
typedef int32_t (*qb_ipcs_job_add_fn)(enum qb_loop_priority p,
void *data,
qb_loop_job_dispatch_fn dispatch_fn);
struct qb_ipcs_poll_handlers {
qb_ipcs_job_add_fn job_add;
qb_ipcs_dispatch_add_fn dispatch_add;
qb_ipcs_dispatch_mod_fn dispatch_mod;
qb_ipcs_dispatch_del_fn dispatch_del;
};
/**
* This callback is to check whether you want to accept a new connection.
*
* The type of checks you should do are authentication, service availability,
* or process resource constraints.
* @return 0 to accept or -errno to indicate a failure (sent back to the client)
*
* @note you can call qb_ipcs_connection_auth_set() within this function.
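*
* A minimal sketch of an accept callback that only admits root and then
* relaxes the file mode; the policy shown is purely illustrative:
* @code
* static int32_t my_accept(qb_ipcs_connection_t *c, uid_t uid, gid_t gid)
* {
*         if (uid != 0) {
*                 return -EACCES;
*         }
*         qb_ipcs_connection_auth_set(c, uid, gid, 0600);
*         return 0;
* }
* @endcode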
*/
typedef int32_t (*qb_ipcs_connection_accept_fn) (qb_ipcs_connection_t *c,
uid_t uid, gid_t gid);
/**
* This is called after a new connection has been created.
*/
typedef void (*qb_ipcs_connection_created_fn) (qb_ipcs_connection_t *c);
/**
* This is called after a connection has been disconnected.
*
* @note if you return anything but 0, this function will be
* called repeatedly (until 0 is returned).
*/
typedef int32_t (*qb_ipcs_connection_closed_fn) (qb_ipcs_connection_t *c);
/**
* This is called just before a connection is freed.
*/
typedef void (*qb_ipcs_connection_destroyed_fn) (qb_ipcs_connection_t *c);
/**
* This is the message processing callback.
* It is called with the message data.
*/
typedef int32_t (*qb_ipcs_msg_process_fn) (qb_ipcs_connection_t *c,
void *data, size_t size);
struct qb_ipcs_service_handlers {
qb_ipcs_connection_accept_fn connection_accept;
qb_ipcs_connection_created_fn connection_created;
qb_ipcs_msg_process_fn msg_process;
qb_ipcs_connection_closed_fn connection_closed;
qb_ipcs_connection_destroyed_fn connection_destroyed;
};
/**
* Create a new IPC server.
*
* @param name for clients to connect to.
* @param service_id an integer to associate with the service
* @param type transport type.
* @param handlers callbacks.
* @return the new service instance.
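*
* A hedged creation sketch; the name "my_service", service id 1, and the
* my_* handler functions are assumptions:
* @code
* struct qb_ipcs_service_handlers handlers = {
*         .connection_accept = my_accept,
*         .connection_created = NULL,
*         .msg_process = my_msg_process,
*         .connection_closed = NULL,
*         .connection_destroyed = NULL,
* };
* qb_ipcs_service_t *srv = qb_ipcs_create("my_service", 1,
*                                         QB_IPC_NATIVE, &handlers);
* @endcode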
*/
qb_ipcs_service_t* qb_ipcs_create(const char *name,
int32_t service_id,
enum qb_ipc_type type,
struct qb_ipcs_service_handlers *handlers);
/**
* Increase the reference counter on the service object.
*
* @param s service instance
*/
void qb_ipcs_ref(qb_ipcs_service_t *s);
/**
* Decrease the reference counter on the service object.
*
* @param s service instance
*/
void qb_ipcs_unref(qb_ipcs_service_t *s);
/**
* Set your poll callbacks.
*
* @param s service instance
* @param handlers the handlers that you want ipcs to use.
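*
* A minimal wiring sketch against a qb_loop main loop; the global 'loop'
* pointer, the service 'srv', and the my_* wrapper names are assumptions
* (dispatch_mod and dispatch_del would wrap qb_loop_poll_mod() and
* qb_loop_poll_del() in the same way):
* @code
* static qb_loop_t *loop;
*
* static int32_t my_dispatch_add(enum qb_loop_priority p, int32_t fd,
*                                int32_t events, void *data,
*                                qb_ipcs_dispatch_fn_t fn)
* {
*         return qb_loop_poll_add(loop, p, fd, events, data, fn);
* }
*
* struct qb_ipcs_poll_handlers ph = {
*         .job_add = NULL,
*         .dispatch_add = my_dispatch_add,
*         .dispatch_mod = my_dispatch_mod,
*         .dispatch_del = my_dispatch_del,
* };
* qb_ipcs_poll_handlers_set(srv, &ph);
* @endcode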
*/
void qb_ipcs_poll_handlers_set(qb_ipcs_service_t* s,
struct qb_ipcs_poll_handlers *handlers);
/**
* Associate a "user" pointer with this service.
*
* @param s service instance
* @param context the pointer to associate with this service.
* @see qb_ipcs_service_context_get()
*/
void qb_ipcs_service_context_set(qb_ipcs_service_t* s,
void *context);
/**
* Get the context (set previously)
*
* @param s service instance
* @return the context
* @see qb_ipcs_service_context_set()
*/
void *qb_ipcs_service_context_get(qb_ipcs_service_t* s);
/**
* Run the new IPC server.
* @param s service instance
* @return 0 == ok; -errno to indicate a failure. Service is destroyed on failure.
*/
int32_t qb_ipcs_run(qb_ipcs_service_t* s);
/**
* Destroy the IPC server.
*
* @param s service instance to destroy
*/
void qb_ipcs_destroy(qb_ipcs_service_t* s);
/**
* Limit the incoming request rate.
* @param s service instance
* @param rl the new rate
*/
void qb_ipcs_request_rate_limit(qb_ipcs_service_t* s,
enum qb_ipcs_rate_limit rl);
/**
* Send a response to an incoming request.
*
* @param c connection instance
* @param data the message to send
* @param size the size of the message
* @return size sent or -errno for errors
*
* @note the data must include a qb_ipc_response_header at
* the top of the message. The client will read the size field
* to determine how much to recv.
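*
* A hedged sketch of replying from within a msg_process handler; the
* header-only reply and its id are hypothetical:
* @code
* struct qb_ipc_response_header reply;
*
* reply.id = 1;               // service-defined reply id
* reply.size = sizeof(reply);
* reply.error = 0;
* (void)qb_ipcs_response_send(c, &reply, reply.size);
* @endcode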
*/
ssize_t qb_ipcs_response_send(qb_ipcs_connection_t *c, const void *data,
size_t size);
/**
* Send a response to an incoming request.
*
* @param c connection instance
* @param iov the iovec struct that points to the message to send
* @param iov_len the number of iovecs.
* @return size sent or -errno for errors
*
* @note the iov[0] must be a qb_ipc_response_header. The client will
* read the size field to determine how much to recv.
*
* @note When send returns -EMSGSIZE, this means the msg is too
* large and will never succeed. To determine the max msg size
* a client can be sent, use qb_ipcs_connection_get_buffer_size()
*/
ssize_t qb_ipcs_response_sendv(qb_ipcs_connection_t *c,
const struct iovec * iov, size_t iov_len);
/**
* Send an asynchronous event message to the client.
*
* @param c connection instance
* @param data the message to send
* @param size the size of the message
* @return size sent or -errno for errors
*
* @note the data must include a qb_ipc_response_header at
* the top of the message. The client will read the size field
* to determine how much to recv.
*
* @note When send returns -EMSGSIZE, this means the msg is too
* large and will never succeed. To determine the max msg size
* a client can be sent, use qb_ipcs_connection_get_buffer_size()
*/
ssize_t qb_ipcs_event_send(qb_ipcs_connection_t *c, const void *data,
size_t size);
/**
* Send an asynchronous event message to the client.
*
* @param c connection instance
* @param iov the iovec struct that points to the message to send
* @param iov_len the number of iovecs.
* @return size sent or -errno for errors
*
* @note the iov[0] must be a qb_ipc_response_header. The client will
* read the size field to determine how much to recv.
*
* @note When send returns -EMSGSIZE, this means the msg is too
* large and will never succeed. To determine the max msg size
* a client can be sent, use qb_ipcs_connection_get_buffer_size()
*/
ssize_t qb_ipcs_event_sendv(qb_ipcs_connection_t *c, const struct iovec * iov,
size_t iov_len);
/**
* Increment the connection's reference counter.
*
* @param c connection instance
*/
void qb_ipcs_connection_ref(qb_ipcs_connection_t *c);
/**
* Decrement the connection's reference counter.
*
* @param c connection instance
*/
void qb_ipcs_connection_unref(qb_ipcs_connection_t *c);
/**
* Disconnect from this client.
*
* @param c connection instance
*/
void qb_ipcs_disconnect(qb_ipcs_connection_t *c);
/**
* Get the service id related to this connection's service
* (as passed into qb_ipcs_create()).
*
* @return service id.
*/
int32_t qb_ipcs_service_id_get(qb_ipcs_connection_t *c);
/**
* Associate a "user" pointer with this connection.
*
* @param context the pointer to associate with this connection.
* @param c connection instance
* @see qb_ipcs_context_get()
*/
void qb_ipcs_context_set(qb_ipcs_connection_t *c, void *context);
/**
* Get the context (set previously)
*
* @param c connection instance
* @return the context
* @see qb_ipcs_context_set()
*/
void *qb_ipcs_context_get(qb_ipcs_connection_t *c);
/**
* Get the context previously set on the service backing this connection
*
* @param c connection instance
* @return the context
* @see qb_ipcs_service_context_set
*/
void *qb_ipcs_connection_service_context_get(qb_ipcs_connection_t *c);
/**
* Get the connection statistics.
*
* @deprecated from v0.13.0 onwards, use qb_ipcs_connection_stats_get_2
* @param stats (out) the statistics structure
* @param clear_after_read clear stats after copying them into stats
* @param c connection instance
* @return 0 == ok; -errno to indicate a failure
*/
int32_t qb_ipcs_connection_stats_get(qb_ipcs_connection_t *c,
struct qb_ipcs_connection_stats* stats,
int32_t clear_after_read);
/**
* Get (and allocate) the connection statistics.
*
* @param clear_after_read clear stats after copying them into stats
* @param c connection instance
* @retval NULL if no memory or invalid connection
* @retval allocated statistics structure (user must free it).
*/
struct qb_ipcs_connection_stats_2*
qb_ipcs_connection_stats_get_2(qb_ipcs_connection_t *c,
int32_t clear_after_read);
/**
* Get the service statistics.
*
* @param stats (out) the statistics structure
* @param clear_after_read clear stats after copying them into stats
* @param pt service instance
* @return 0 == ok; -errno to indicate a failure
*/
int32_t qb_ipcs_stats_get(qb_ipcs_service_t* pt,
struct qb_ipcs_stats* stats,
int32_t clear_after_read);
/**
* Get the first connection.
*
* @note call qb_ipcs_connection_unref() after using the connection.
*
* @param pt service instance
* @return first connection
*/
qb_ipcs_connection_t * qb_ipcs_connection_first_get(qb_ipcs_service_t* pt);
/**
* Get the next connection.
*
* @note call qb_ipcs_connection_unref() after using the connection.
*
* @param pt service instance
* @param current current connection
* @return next connection
*/
qb_ipcs_connection_t * qb_ipcs_connection_next_get(qb_ipcs_service_t* pt,
qb_ipcs_connection_t *current);
/**
* Set the permissions on the sockets and shared memory files so that both
* processes can read and write to them.
*
* @param conn connection instance
* @param uid the user id to set.
* @param gid the group id to set.
* @param mode the mode to set.
*
* @see chmod() chown()
* @note this must be called within the qb_ipcs_connection_accept_fn()
* callback.
*/
void qb_ipcs_connection_auth_set(qb_ipcs_connection_t *conn, uid_t uid,
gid_t gid, mode_t mode);
/**
* Retrieve the connection ipc buffer size. This reflects the
* largest size msg that can be sent or received.
*
* @param conn connection instance
* @return msg size in bytes, negative value on error.
*/
int32_t qb_ipcs_connection_get_buffer_size(qb_ipcs_connection_t *conn);
+/**
+ * Enforce the buffer size clients must use from the server side.
+ *
+ * @note Setting this forces client connections to use at least
+ * 'max_buf_size' bytes as their buffer size. If this value is not set
+ * on the server, clients enforce their own buffer sizes.
+ *
+ * @param s ipc server instance
+ * @param max_buf_size buffer size in bytes
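+ *
+ * A minimal sketch, assuming the value is set on the service 'srv' before
+ * qb_ipcs_run() and that 1MiB is merely an illustrative size:
+ * @code
+ * qb_ipcs_enforce_buffer_size(srv, 1024 * 1024);
+ * (void)qb_ipcs_run(srv);
+ * @endcode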
+ */
+void qb_ipcs_enforce_buffer_size(qb_ipcs_service_t *s, uint32_t max_buf_size);
+
/* *INDENT-OFF* */
#ifdef __cplusplus
}
#endif
/* *INDENT-ON* */
#endif /* QB_IPCS_H_DEFINED */
diff --git a/lib/ipc_int.h b/lib/ipc_int.h
index 6c0fd22..c22417b 100644
--- a/lib/ipc_int.h
+++ b/lib/ipc_int.h
@@ -1,206 +1,207 @@
/*
* Copyright (C) 2009 Red Hat, Inc.
*
* Author: Steven Dake <sdake@redhat.com>
* Angus Salkeld <asalkeld@redhat.com>
*
* This file is part of libqb.
*
* libqb is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 2.1 of the License, or
* (at your option) any later version.
*
* libqb is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with libqb. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef QB_IPC_INT_H_DEFINED
#define QB_IPC_INT_H_DEFINED
#include "os_base.h"
#include <dirent.h>
#include <qb/qblist.h>
#include <qb/qbloop.h>
#include <qb/qbipcc.h>
#include <qb/qbipcs.h>
#include <qb/qbipc_common.h>
#include <qb/qbrb.h>
#define QB_IPC_MAX_WAIT_MS 2000
/*
Client Server
SEND CONN REQ ->
ACCEPT & CREATE queues
or DENY
<- SEND ACCEPT(with details)/DENY
*/
struct qb_ipc_connection_request {
struct qb_ipc_request_header hdr;
uint32_t max_msg_size;
} __attribute__ ((aligned(8)));
struct qb_ipc_event_connection_request {
struct qb_ipc_request_header hdr;
intptr_t connection;
} __attribute__ ((aligned(8)));
struct qb_ipc_connection_response {
struct qb_ipc_response_header hdr;
int32_t connection_type;
uint32_t max_msg_size;
intptr_t connection;
char request[PATH_MAX];
char response[PATH_MAX];
char event[PATH_MAX];
} __attribute__ ((aligned(8)));
struct qb_ipcc_connection;
struct qb_ipc_one_way {
size_t max_msg_size;
enum qb_ipc_type type;
union {
struct {
int32_t sock;
char *sock_name;
void* shared_data;
char shared_file_name[NAME_MAX];
} us;
struct {
qb_ringbuffer_t *rb;
} shm;
} u;
};
struct qb_ipcc_funcs {
ssize_t (*recv)(struct qb_ipc_one_way *one_way, void *buf, size_t buf_size, int32_t timeout);
ssize_t (*send)(struct qb_ipc_one_way *one_way, const void *data, size_t size);
ssize_t (*sendv)(struct qb_ipc_one_way *one_way, const struct iovec *iov, size_t iov_len);
void (*disconnect)(struct qb_ipcc_connection* c);
int32_t (*fc_get)(struct qb_ipc_one_way *one_way);
};
struct qb_ipcc_connection {
char name[NAME_MAX];
int32_t needs_sock_for_poll;
struct qb_ipc_one_way setup;
struct qb_ipc_one_way request;
struct qb_ipc_one_way response;
struct qb_ipc_one_way event;
struct qb_ipcc_funcs funcs;
struct qb_ipc_request_header *receive_buf;
uint32_t fc_enable_max;
int32_t is_connected;
void * context;
};
int32_t qb_ipcc_us_setup_connect(struct qb_ipcc_connection *c,
struct qb_ipc_connection_response *r);
ssize_t qb_ipc_us_send(struct qb_ipc_one_way *one_way, const void *msg, size_t len);
ssize_t qb_ipc_us_recv(struct qb_ipc_one_way *one_way, void *msg, size_t len, int32_t timeout);
int32_t qb_ipc_us_ready(struct qb_ipc_one_way *ow_data, struct qb_ipc_one_way *ow_conn,
int32_t ms_timeout, int32_t events);
void qb_ipcc_us_sock_close(int32_t sock);
int32_t qb_ipcc_us_connect(struct qb_ipcc_connection *c, struct qb_ipc_connection_response * response);
int32_t qb_ipcc_shm_connect(struct qb_ipcc_connection *c, struct qb_ipc_connection_response * response);
struct qb_ipcs_service;
struct qb_ipcs_connection;
struct qb_ipcs_funcs {
int32_t (*connect)(struct qb_ipcs_service *s, struct qb_ipcs_connection *c,
struct qb_ipc_connection_response *r);
void (*disconnect)(struct qb_ipcs_connection *c);
ssize_t (*recv)(struct qb_ipc_one_way *one_way, void *buf, size_t buf_size, int32_t timeout);
ssize_t (*peek)(struct qb_ipc_one_way *one_way, void **data_out, int32_t timeout);
void (*reclaim)(struct qb_ipc_one_way *one_way);
ssize_t (*send)(struct qb_ipc_one_way *one_way, const void *data, size_t size);
ssize_t (*sendv)(struct qb_ipc_one_way *one_way, const struct iovec* iov, size_t iov_len);
void (*fc_set)(struct qb_ipc_one_way *one_way, int32_t fc_enable);
ssize_t (*q_len_get)(struct qb_ipc_one_way *one_way);
};
struct qb_ipcs_service {
enum qb_ipc_type type;
char name[NAME_MAX];
+ uint32_t max_buffer_size;
int32_t service_id;
int32_t ref_count;
pid_t pid;
int32_t needs_sock_for_poll;
int32_t server_sock;
struct qb_ipcs_service_handlers serv_fns;
struct qb_ipcs_poll_handlers poll_fns;
struct qb_ipcs_funcs funcs;
enum qb_loop_priority poll_priority;
struct qb_list_head connections;
struct qb_list_head list;
struct qb_ipcs_stats stats;
void *context;
};
enum qb_ipcs_connection_state {
QB_IPCS_CONNECTION_INACTIVE,
QB_IPCS_CONNECTION_ACTIVE,
QB_IPCS_CONNECTION_ESTABLISHED,
QB_IPCS_CONNECTION_SHUTTING_DOWN,
};
#define CONNECTION_DESCRIPTION (16)
struct qb_ipcs_connection_auth {
uid_t uid;
gid_t gid;
mode_t mode;
};
struct qb_ipcs_connection {
enum qb_ipcs_connection_state state;
int32_t refcount;
pid_t pid;
uid_t euid;
gid_t egid;
struct qb_ipcs_connection_auth auth;
struct qb_ipc_one_way setup;
struct qb_ipc_one_way request;
struct qb_ipc_one_way response;
struct qb_ipc_one_way event;
struct qb_ipcs_service *service;
struct qb_list_head list;
struct qb_ipc_request_header *receive_buf;
void *context;
int32_t fc_enabled;
int32_t poll_events;
int32_t outstanding_notifiers;
char description[CONNECTION_DESCRIPTION];
struct qb_ipcs_connection_stats_2 stats;
};
void qb_ipcs_us_init(struct qb_ipcs_service *s);
void qb_ipcs_shm_init(struct qb_ipcs_service *s);
int32_t qb_ipcs_us_publish(struct qb_ipcs_service *s);
int32_t qb_ipcs_us_withdraw(struct qb_ipcs_service *s);
int32_t qb_ipcc_us_sock_connect(const char *socket_name, int32_t * sock_pt);
int32_t qb_ipcs_dispatch_connection_request(int32_t fd, int32_t revents, void *data);
struct qb_ipcs_connection* qb_ipcs_connection_alloc(struct qb_ipcs_service *s);
int32_t qb_ipcs_process_request(struct qb_ipcs_service *s,
struct qb_ipc_request_header *hdr);
int32_t qb_ipc_us_sock_error_is_disconnected(int err);
#endif /* QB_IPC_INT_H_DEFINED */
diff --git a/lib/ipc_setup.c b/lib/ipc_setup.c
index b6beb6a..3c22e6f 100644
--- a/lib/ipc_setup.c
+++ b/lib/ipc_setup.c
@@ -1,723 +1,723 @@
/*
* Copyright (C) 2010,2013 Red Hat, Inc.
*
* Author: Angus Salkeld <asalkeld@redhat.com>
*
* This file is part of libqb.
*
* libqb is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 2.1 of the License, or
* (at your option) any later version.
*
* libqb is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with libqb. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os_base.h"
#if defined(HAVE_GETPEERUCRED)
#include <ucred.h>
#endif
#ifdef HAVE_SYS_UN_H
#include <sys/un.h>
#endif /* HAVE_SYS_UN_H */
#ifdef HAVE_SYS_STAT_H
#include <sys/stat.h>
#endif
#ifdef HAVE_SYS_MMAN_H
#include <sys/mman.h>
#endif
#include <qb/qbatomic.h>
#include <qb/qbipcs.h>
#include <qb/qbloop.h>
#include <qb/qbdefs.h>
#include "util_int.h"
#include "ipc_int.h"
struct ipc_auth_ugp {
uid_t uid;
gid_t gid;
pid_t pid;
};
static int32_t qb_ipcs_us_connection_acceptor(int fd, int revent, void *data);
ssize_t
qb_ipc_us_send(struct qb_ipc_one_way *one_way, const void *msg, size_t len)
{
int32_t result;
int32_t processed = 0;
char *rbuf = (char *)msg;
qb_sigpipe_ctl(QB_SIGPIPE_IGNORE);
retry_send:
result = send(one_way->u.us.sock,
&rbuf[processed], len - processed, MSG_NOSIGNAL);
if (result == -1) {
if (errno == EAGAIN && processed > 0) {
goto retry_send;
} else {
qb_sigpipe_ctl(QB_SIGPIPE_DEFAULT);
return -errno;
}
}
processed += result;
if (processed != len) {
goto retry_send;
}
qb_sigpipe_ctl(QB_SIGPIPE_DEFAULT);
return processed;
}
static ssize_t
qb_ipc_us_recv_msghdr(int32_t s, struct msghdr *hdr, char *msg, size_t len)
{
int32_t result;
int32_t processed = 0;
qb_sigpipe_ctl(QB_SIGPIPE_IGNORE);
retry_recv:
hdr->msg_iov->iov_base = &msg[processed];
hdr->msg_iov->iov_len = len - processed;
result = recvmsg(s, hdr, MSG_NOSIGNAL | MSG_WAITALL);
if (result == -1 && errno == EAGAIN) {
goto retry_recv;
}
if (result == -1) {
qb_sigpipe_ctl(QB_SIGPIPE_DEFAULT);
return -errno;
}
if (result == 0) {
qb_sigpipe_ctl(QB_SIGPIPE_DEFAULT);
qb_util_log(LOG_DEBUG,
"recv(fd %d) got 0 bytes assuming ENOTCONN", s);
return -ENOTCONN;
}
processed += result;
if (processed != len) {
goto retry_recv;
}
qb_sigpipe_ctl(QB_SIGPIPE_DEFAULT);
assert(processed == len);
return processed;
}
int32_t
qb_ipc_us_sock_error_is_disconnected(int err)
{
if (err >= 0) {
return QB_FALSE;
} else if (err == -EAGAIN ||
err == -ETIMEDOUT ||
err == -EINTR ||
#ifdef EWOULDBLOCK
err == -EWOULDBLOCK ||
#endif
err == -EMSGSIZE ||
err == -ENOMSG ||
err == -EINVAL) {
return QB_FALSE;
}
return QB_TRUE;
}
int32_t
qb_ipc_us_ready(struct qb_ipc_one_way * ow_data,
struct qb_ipc_one_way * ow_conn,
int32_t ms_timeout, int32_t events)
{
struct pollfd ufds[2];
int32_t poll_events;
int numfds = 1;
int i;
ufds[0].fd = ow_data->u.us.sock;
ufds[0].events = events;
ufds[0].revents = 0;
if (ow_conn && ow_data != ow_conn) {
numfds++;
ufds[1].fd = ow_conn->u.us.sock;
ufds[1].events = POLLIN;
ufds[1].revents = 0;
}
poll_events = poll(ufds, numfds, ms_timeout);
if ((poll_events == -1 && errno == EINTR) || poll_events == 0) {
return -EAGAIN;
} else if (poll_events == -1) {
return -errno;
}
for (i = 0; i < poll_events; i++) {
if (ufds[i].revents & POLLERR) {
qb_util_log(LOG_DEBUG, "poll(fd %d) got POLLERR",
ufds[i].fd);
return -ENOTCONN;
} else if (ufds[i].revents & POLLHUP) {
qb_util_log(LOG_DEBUG, "poll(fd %d) got POLLHUP",
ufds[i].fd);
return -ENOTCONN;
} else if (ufds[i].revents & POLLNVAL) {
qb_util_log(LOG_DEBUG, "poll(fd %d) got POLLNVAL",
ufds[i].fd);
return -ENOTCONN;
} else if (ufds[i].revents == 0) {
qb_util_log(LOG_DEBUG, "poll(fd %d) zero revents",
ufds[i].fd);
return -ENOTCONN;
}
}
return 0;
}
/*
* recv an entire message - and try hard to get all of it.
*/
ssize_t
qb_ipc_us_recv(struct qb_ipc_one_way * one_way,
void *msg, size_t len, int32_t timeout)
{
int32_t result;
int32_t final_rc = 0;
int32_t processed = 0;
int32_t to_recv = len;
char *data = msg;
qb_sigpipe_ctl(QB_SIGPIPE_IGNORE);
retry_recv:
result = recv(one_way->u.us.sock, &data[processed], to_recv,
MSG_NOSIGNAL | MSG_WAITALL);
if (result == -1) {
if (errno == EAGAIN && (processed > 0 || timeout == -1)) {
result = qb_ipc_us_ready(one_way, NULL, timeout, POLLIN);
if (result == 0 || result == -EAGAIN) {
goto retry_recv;
}
final_rc = result;
goto cleanup_sigpipe;
} else if (errno == ECONNRESET || errno == EPIPE) {
final_rc = -ENOTCONN;
goto cleanup_sigpipe;
} else {
final_rc = -errno;
goto cleanup_sigpipe;
}
}
if (result == 0) {
final_rc = -ENOTCONN;
goto cleanup_sigpipe;
}
processed += result;
to_recv -= result;
if (processed != len) {
goto retry_recv;
}
final_rc = processed;
cleanup_sigpipe:
qb_sigpipe_ctl(QB_SIGPIPE_DEFAULT);
return final_rc;
}
static int32_t
qb_ipcc_stream_sock_connect(const char *socket_name, int32_t * sock_pt)
{
int32_t request_fd;
struct sockaddr_un address;
int32_t res = 0;
request_fd = socket(PF_UNIX, SOCK_STREAM, 0);
if (request_fd == -1) {
return -errno;
}
qb_socket_nosigpipe(request_fd);
res = qb_sys_fd_nonblock_cloexec_set(request_fd);
if (res < 0) {
goto error_connect;
}
memset(&address, 0, sizeof(struct sockaddr_un));
address.sun_family = AF_UNIX;
#ifdef HAVE_STRUCT_SOCKADDR_UN_SUN_LEN
address.sun_len = QB_SUN_LEN(&address);
#endif
#if defined(QB_LINUX) || defined(QB_CYGWIN)
snprintf(address.sun_path + 1, UNIX_PATH_MAX - 1, "%s", socket_name);
#else
snprintf(address.sun_path, sizeof(address.sun_path), "%s/%s", SOCKETDIR,
socket_name);
#endif
if (connect(request_fd, (struct sockaddr *)&address,
QB_SUN_LEN(&address)) == -1) {
res = -errno;
goto error_connect;
}
*sock_pt = request_fd;
return 0;
error_connect:
close(request_fd);
*sock_pt = -1;
return res;
}
void
qb_ipcc_us_sock_close(int32_t sock)
{
shutdown(sock, SHUT_RDWR);
close(sock);
}
int32_t
qb_ipcc_us_setup_connect(struct qb_ipcc_connection *c,
struct qb_ipc_connection_response *r)
{
int32_t res;
struct qb_ipc_connection_request request;
#ifdef QB_LINUX
int off = 0;
int on = 1;
#endif
res = qb_ipcc_stream_sock_connect(c->name, &c->setup.u.us.sock);
if (res != 0) {
return res;
}
#ifdef QB_LINUX
setsockopt(c->setup.u.us.sock, SOL_SOCKET, SO_PASSCRED, &on,
sizeof(on));
#endif
memset(&request, 0, sizeof(request));
request.hdr.id = QB_IPC_MSG_AUTHENTICATE;
request.hdr.size = sizeof(request);
request.max_msg_size = c->setup.max_msg_size;
res = qb_ipc_us_send(&c->setup, &request, request.hdr.size);
if (res < 0) {
qb_ipcc_us_sock_close(c->setup.u.us.sock);
return res;
}
#ifdef QB_LINUX
setsockopt(c->setup.u.us.sock, SOL_SOCKET, SO_PASSCRED, &off,
sizeof(off));
#endif
res =
qb_ipc_us_recv(&c->setup, r,
sizeof(struct qb_ipc_connection_response), -1);
if (res < 0) {
return res;
}
if (r->hdr.error != 0) {
return r->hdr.error;
}
return 0;
}
/*
**************************************************************************
* SERVER
*/
int32_t
qb_ipcs_us_publish(struct qb_ipcs_service * s)
{
struct sockaddr_un un_addr;
int32_t res;
#ifdef SO_PASSCRED
int on = 1;
#endif
/*
* Create socket for IPC clients, name socket, listen for connections
*/
s->server_sock = socket(PF_UNIX, SOCK_STREAM, 0);
if (s->server_sock == -1) {
res = -errno;
qb_util_perror(LOG_ERR, "Cannot create server socket");
return res;
}
res = qb_sys_fd_nonblock_cloexec_set(s->server_sock);
if (res < 0) {
goto error_close;
}
memset(&un_addr, 0, sizeof(struct sockaddr_un));
un_addr.sun_family = AF_UNIX;
#if defined(QB_BSD) || defined(QB_DARWIN)
un_addr.sun_len = SUN_LEN(&un_addr);
#endif
qb_util_log(LOG_INFO, "server name: %s", s->name);
#if defined(QB_LINUX) || defined(QB_CYGWIN)
snprintf(un_addr.sun_path + 1, UNIX_PATH_MAX - 1, "%s", s->name);
#else
{
struct stat stat_out;
res = stat(SOCKETDIR, &stat_out);
if (res == -1 || (res == 0 && !S_ISDIR(stat_out.st_mode))) {
res = -errno;
qb_util_log(LOG_CRIT,
"Required directory not present %s",
SOCKETDIR);
goto error_close;
}
snprintf(un_addr.sun_path, sizeof(un_addr.sun_path), "%s/%s", SOCKETDIR,
s->name);
unlink(un_addr.sun_path);
}
#endif
res = bind(s->server_sock, (struct sockaddr *)&un_addr,
QB_SUN_LEN(&un_addr));
if (res) {
res = -errno;
qb_util_perror(LOG_ERR, "Could not bind AF_UNIX (%s)",
un_addr.sun_path);
goto error_close;
}
/*
* Allow everyone to write to the socket since the IPC layer handles
* security automatically
*/
#if !defined(QB_LINUX) && !defined(QB_CYGWIN)
res = chmod(un_addr.sun_path, S_IRWXU | S_IRWXG | S_IRWXO);
#endif
#ifdef SO_PASSCRED
setsockopt(s->server_sock, SOL_SOCKET, SO_PASSCRED, &on, sizeof(on));
#endif
if (listen(s->server_sock, SERVER_BACKLOG) == -1) {
qb_util_perror(LOG_ERR, "socket listen failed");
}
res = s->poll_fns.dispatch_add(s->poll_priority, s->server_sock,
POLLIN | POLLPRI | POLLNVAL,
s, qb_ipcs_us_connection_acceptor);
return res;
error_close:
close(s->server_sock);
return res;
}
int32_t
qb_ipcs_us_withdraw(struct qb_ipcs_service * s)
{
qb_util_log(LOG_INFO, "withdrawing server sockets");
shutdown(s->server_sock, SHUT_RDWR);
close(s->server_sock);
return 0;
}
static int32_t
handle_new_connection(struct qb_ipcs_service *s,
int32_t auth_result,
int32_t sock,
void *msg, size_t len, struct ipc_auth_ugp *ugp)
{
struct qb_ipcs_connection *c = NULL;
struct qb_ipc_connection_request *req = msg;
int32_t res = auth_result;
int32_t res2 = 0;
struct qb_ipc_connection_response response;
c = qb_ipcs_connection_alloc(s);
if (c == NULL) {
qb_ipcc_us_sock_close(sock);
return -ENOMEM;
}
c->receive_buf = calloc(1, req->max_msg_size);
if (c->receive_buf == NULL) {
free(c);
qb_ipcc_us_sock_close(sock);
return -ENOMEM;
}
c->setup.u.us.sock = sock;
- c->request.max_msg_size = req->max_msg_size;
- c->response.max_msg_size = req->max_msg_size;
- c->event.max_msg_size = req->max_msg_size;
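+ /* Honor the larger of the client's requested size and any
+  * server-enforced minimum, so large server events always fit. */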
+ c->request.max_msg_size = QB_MAX(req->max_msg_size, s->max_buffer_size);
+ c->response.max_msg_size = QB_MAX(req->max_msg_size, s->max_buffer_size);
+ c->event.max_msg_size = QB_MAX(req->max_msg_size, s->max_buffer_size);
c->pid = ugp->pid;
c->auth.uid = c->euid = ugp->uid;
c->auth.gid = c->egid = ugp->gid;
c->auth.mode = 0600;
c->stats.client_pid = ugp->pid;
snprintf(c->description, CONNECTION_DESCRIPTION,
"%d-%d-%d", s->pid, ugp->pid, c->setup.u.us.sock);
if (auth_result == 0 && c->service->serv_fns.connection_accept) {
res = c->service->serv_fns.connection_accept(c,
c->euid, c->egid);
}
if (res != 0) {
goto send_response;
}
qb_util_log(LOG_DEBUG, "IPC credentials authenticated (%s)",
c->description);
memset(&response, 0, sizeof(response));
if (s->funcs.connect) {
res = s->funcs.connect(s, c, &response);
if (res != 0) {
goto send_response;
}
}
/*
* The connection is good, add it to the active connection list
*/
c->state = QB_IPCS_CONNECTION_ACTIVE;
qb_list_add(&c->list, &s->connections);
send_response:
response.hdr.id = QB_IPC_MSG_AUTHENTICATE;
response.hdr.size = sizeof(response);
response.hdr.error = res;
if (res == 0) {
response.connection = (intptr_t) c;
response.connection_type = s->type;
response.max_msg_size = c->request.max_msg_size;
s->stats.active_connections++;
}
res2 = qb_ipc_us_send(&c->setup, &response, response.hdr.size);
if (res == 0 && res2 != response.hdr.size) {
res = res2;
}
if (res == 0) {
qb_ipcs_connection_ref(c);
if (s->serv_fns.connection_created) {
s->serv_fns.connection_created(c);
}
if (c->state == QB_IPCS_CONNECTION_ACTIVE) {
c->state = QB_IPCS_CONNECTION_ESTABLISHED;
}
qb_ipcs_connection_unref(c);
} else {
if (res == -EACCES) {
qb_util_log(LOG_ERR, "Invalid IPC credentials (%s).",
c->description);
} else {
errno = -res;
qb_util_perror(LOG_ERR,
"Error in connection setup (%s)",
c->description);
}
qb_ipcs_disconnect(c);
}
return res;
}
static int32_t
qb_ipcs_uc_recv_and_auth(int32_t sock, void *msg, size_t len,
struct ipc_auth_ugp *ugp)
{
int32_t res = 0;
struct msghdr msg_recv;
struct iovec iov_recv;
#ifdef SO_PASSCRED
char cmsg_cred[CMSG_SPACE(sizeof(struct ucred))];
int off = 0;
int on = 1;
#endif
msg_recv.msg_iov = &iov_recv;
msg_recv.msg_iovlen = 1;
msg_recv.msg_name = 0;
msg_recv.msg_namelen = 0;
#ifdef SO_PASSCRED
msg_recv.msg_control = (void *)cmsg_cred;
msg_recv.msg_controllen = sizeof(cmsg_cred);
#endif
#ifdef QB_SOLARIS
msg_recv.msg_accrights = 0;
msg_recv.msg_accrightslen = 0;
#else
msg_recv.msg_flags = 0;
#endif /* QB_SOLARIS */
iov_recv.iov_base = msg;
iov_recv.iov_len = len;
#ifdef SO_PASSCRED
setsockopt(sock, SOL_SOCKET, SO_PASSCRED, &on, sizeof(on));
#endif
res = qb_ipc_us_recv_msghdr(sock, &msg_recv, msg, len);
if (res < 0) {
goto cleanup_and_return;
}
if (res != len) {
res = -EIO;
goto cleanup_and_return;
}
/*
* currently support getpeerucred, getpeereid, and SO_PASSCRED credential
* retrieval mechanisms for various platforms
*/
#ifdef HAVE_GETPEERUCRED
/*
* Solaris and some BSD systems
*/
{
ucred_t *uc = NULL;
if (getpeerucred(sock, &uc) == 0) {
res = 0;
ugp->uid = ucred_geteuid(uc);
ugp->gid = ucred_getegid(uc);
ugp->pid = ucred_getpid(uc);
ucred_free(uc);
} else {
res = -errno;
}
}
#elif HAVE_GETPEEREID
/*
* Usually MacOSX systems
*/
{
/*
* TODO get the peer's pid.
* c->pid = ?;
*/
if (getpeereid(sock, &ugp->uid, &ugp->gid) == 0) {
res = 0;
} else {
res = -errno;
}
}
#elif SO_PASSCRED
/*
* Usually Linux systems
*/
{
struct ucred cred;
struct cmsghdr *cmsg;
res = -EINVAL;
for (cmsg = CMSG_FIRSTHDR(&msg_recv); cmsg != NULL;
cmsg = CMSG_NXTHDR(&msg_recv, cmsg)) {
if (cmsg->cmsg_type != SCM_CREDENTIALS)
continue;
memcpy(&cred, CMSG_DATA(cmsg), sizeof(struct ucred));
res = 0;
ugp->pid = cred.pid;
ugp->uid = cred.uid;
ugp->gid = cred.gid;
break;
}
}
#else /* no credentials */
ugp->pid = 0;
ugp->uid = 0;
ugp->gid = 0;
res = -ENOTSUP;
#endif /* no credentials */
cleanup_and_return:
#ifdef SO_PASSCRED
setsockopt(sock, SOL_SOCKET, SO_PASSCRED, &off, sizeof(off));
#endif
return res;
}
static int32_t
qb_ipcs_us_connection_acceptor(int fd, int revent, void *data)
{
struct sockaddr_un un_addr;
int32_t new_fd;
struct qb_ipcs_service *s = (struct qb_ipcs_service *)data;
int32_t res;
struct qb_ipc_connection_request setup_msg;
struct ipc_auth_ugp ugp;
socklen_t addrlen = sizeof(struct sockaddr_un);
if (revent & (POLLNVAL | POLLHUP | POLLERR)) {
/*
* handle shutdown more cleanly.
*/
return -1;
}
retry_accept:
errno = 0;
new_fd = accept(fd, (struct sockaddr *)&un_addr, &addrlen);
if (new_fd == -1 && errno == EINTR) {
goto retry_accept;
}
if (new_fd == -1 && errno == EBADF) {
qb_util_perror(LOG_ERR,
"Could not accept client connection from fd:%d",
fd);
return -1;
}
if (new_fd == -1) {
qb_util_perror(LOG_ERR, "Could not accept client connection");
/* This is an error, but -1 would indicate disconnect
* from the poll loop
*/
return 0;
}
res = qb_sys_fd_nonblock_cloexec_set(new_fd);
if (res < 0) {
close(new_fd);
/* This is an error, but -1 would indicate disconnect
* from the poll loop
*/
return 0;
}
res = qb_ipcs_uc_recv_and_auth(new_fd, &setup_msg, sizeof(setup_msg),
&ugp);
if (res < 0) {
close(new_fd);
/* This is an error, but -1 would indicate disconnect
* from the poll loop
*/
return 0;
}
if (setup_msg.hdr.id == QB_IPC_MSG_AUTHENTICATE) {
(void)handle_new_connection(s, res, new_fd, &setup_msg,
sizeof(setup_msg), &ugp);
} else {
close(new_fd);
}
return 0;
}
diff --git a/lib/ipcc.c b/lib/ipcc.c
index 782cc53..061cb1c 100644
--- a/lib/ipcc.c
+++ b/lib/ipcc.c
@@ -1,424 +1,434 @@
/*
* Copyright (C) 2010 Red Hat, Inc.
*
* Author: Angus Salkeld <asalkeld@redhat.com>
*
* This file is part of libqb.
*
* libqb is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 2.1 of the License, or
* (at your option) any later version.
*
* libqb is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with libqb. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os_base.h"
#include "ipc_int.h"
#include "util_int.h"
#include <qb/qbdefs.h>
#include <qb/qbipcc.h>
qb_ipcc_connection_t *
qb_ipcc_connect(const char *name, size_t max_msg_size)
{
int32_t res;
qb_ipcc_connection_t *c = NULL;
struct qb_ipc_connection_response response;
c = calloc(1, sizeof(struct qb_ipcc_connection));
if (c == NULL) {
return NULL;
}
c->setup.max_msg_size = QB_MAX(max_msg_size,
sizeof(struct qb_ipc_connection_response));
(void)strlcpy(c->name, name, NAME_MAX);
res = qb_ipcc_us_setup_connect(c, &response);
if (res < 0) {
goto disconnect_and_cleanup;
}
c->response.type = response.connection_type;
c->request.type = response.connection_type;
c->event.type = response.connection_type;
c->setup.type = response.connection_type;
c->response.max_msg_size = response.max_msg_size;
c->request.max_msg_size = response.max_msg_size;
c->event.max_msg_size = response.max_msg_size;
c->receive_buf = calloc(1, response.max_msg_size);
c->fc_enable_max = 1;
if (c->receive_buf == NULL) {
res = -ENOMEM;
goto disconnect_and_cleanup;
}
switch (c->request.type) {
case QB_IPC_SHM:
res = qb_ipcc_shm_connect(c, &response);
break;
case QB_IPC_SOCKET:
res = qb_ipcc_us_connect(c, &response);
break;
case QB_IPC_POSIX_MQ:
case QB_IPC_SYSV_MQ:
res = -ENOTSUP;
break;
default:
res = -EINVAL;
break;
}
if (res != 0) {
goto disconnect_and_cleanup;
}
c->is_connected = QB_TRUE;
return c;
disconnect_and_cleanup:
qb_ipcc_us_sock_close(c->setup.u.us.sock);
free(c->receive_buf);
free(c);
errno = -res;
return NULL;
}
static int32_t
_check_connection_state_with(struct qb_ipcc_connection * c, int32_t res,
struct qb_ipc_one_way * one_way,
int32_t ms_timeout, int32_t events)
{
if (res >= 0) return res;
if (qb_ipc_us_sock_error_is_disconnected(res)) {
errno = -res;
qb_util_perror(LOG_DEBUG,
"interpreting result %d as a disconnect",
res);
c->is_connected = QB_FALSE;
}
if (res == -EAGAIN || res == -ETIMEDOUT) {
int32_t res2;
int32_t poll_ms = ms_timeout;
if (res == -ETIMEDOUT) {
poll_ms = 0;
}
res2 = qb_ipc_us_ready(one_way, &c->setup, poll_ms, events);
if (qb_ipc_us_sock_error_is_disconnected(res2)) {
errno = -res2;
qb_util_perror(LOG_DEBUG,
"%s %d %s",
"interpreting result",
res2,
"(from socket) as a disconnect");
c->is_connected = QB_FALSE;
}
res = res2;
}
return res;
}
static int32_t
_check_connection_state(struct qb_ipcc_connection * c, int32_t res)
{
if (res >= 0) return res;
if (qb_ipc_us_sock_error_is_disconnected(res)) {
errno = -res;
qb_util_perror(LOG_DEBUG,
"interpreting result %d as a disconnect",
res);
c->is_connected = QB_FALSE;
}
return res;
}
static struct qb_ipc_one_way *
_event_sock_one_way_get(struct qb_ipcc_connection * c)
{
if (c->needs_sock_for_poll) {
return &c->setup;
}
return &c->event;
}
static struct qb_ipc_one_way *
_response_sock_one_way_get(struct qb_ipcc_connection * c)
{
if (c->needs_sock_for_poll) {
return &c->setup;
}
return &c->response;
}
ssize_t
qb_ipcc_send(struct qb_ipcc_connection * c, const void *msg_ptr, size_t msg_len)
{
ssize_t res;
ssize_t res2;
if (c == NULL) {
return -EINVAL;
}
if (msg_len > c->request.max_msg_size) {
return -EMSGSIZE;
}
if (c->funcs.fc_get) {
res = c->funcs.fc_get(&c->request);
if (res < 0) {
return res;
} else if (res > 0 && res <= c->fc_enable_max) {
return -EAGAIN;
} else {
/*
* we can transmit
*/
}
}
res = c->funcs.send(&c->request, msg_ptr, msg_len);
if (res == msg_len && c->needs_sock_for_poll) {
do {
res2 = qb_ipc_us_send(&c->setup, msg_ptr, 1);
} while (res2 == -EAGAIN);
if (res2 == -EPIPE) {
res2 = -ENOTCONN;
}
if (res2 != 1) {
res = res2;
}
}
return _check_connection_state(c, res);
}
int32_t
qb_ipcc_fc_enable_max_set(struct qb_ipcc_connection * c, uint32_t max)
{
if (c == NULL || max > 2) {
return -EINVAL;
}
c->fc_enable_max = max;
return 0;
}
ssize_t
qb_ipcc_sendv(struct qb_ipcc_connection * c, const struct iovec * iov,
size_t iov_len)
{
int32_t total_size = 0;
int32_t i;
int32_t res;
int32_t res2;
for (i = 0; i < iov_len; i++) {
total_size += iov[i].iov_len;
}
if (c == NULL) {
return -EINVAL;
}
if (total_size > c->request.max_msg_size) {
return -EMSGSIZE;
}
if (c->funcs.fc_get) {
res = c->funcs.fc_get(&c->request);
if (res < 0) {
return res;
} else if (res > 0 && res <= c->fc_enable_max) {
return -EAGAIN;
} else {
/*
* we can transmit
*/
}
}
res = c->funcs.sendv(&c->request, iov, iov_len);
if (res > 0 && c->needs_sock_for_poll) {
do {
res2 = qb_ipc_us_send(&c->setup, &res, 1);
} while (res2 == -EAGAIN);
if (res2 == -EPIPE) {
res2 = -ENOTCONN;
}
if (res2 != 1) {
res = res2;
}
}
return _check_connection_state(c, res);
}
ssize_t
qb_ipcc_recv(struct qb_ipcc_connection * c, void *msg_ptr,
size_t msg_len, int32_t ms_timeout)
{
int32_t res = 0;
if (c == NULL) {
return -EINVAL;
}
res = c->funcs.recv(&c->response, msg_ptr, msg_len, ms_timeout);
return _check_connection_state_with(c, res,
_response_sock_one_way_get(c),
ms_timeout, POLLIN);
}
ssize_t
qb_ipcc_sendv_recv(qb_ipcc_connection_t * c,
const struct iovec * iov, uint32_t iov_len,
void *res_msg, size_t res_len, int32_t ms_timeout)
{
ssize_t res = 0;
int32_t timeout_now;
int32_t timeout_rem = ms_timeout;
if (c == NULL) {
return -EINVAL;
}
if (c->funcs.fc_get) {
res = c->funcs.fc_get(&c->request);
if (res < 0) {
return res;
} else if (res > 0 && res <= c->fc_enable_max) {
return -EAGAIN;
} else {
/*
* we can transmit
*/
}
}
res = qb_ipcc_sendv(c, iov, iov_len);
if (res < 0) {
return res;
}
do {
if (timeout_rem > QB_IPC_MAX_WAIT_MS || ms_timeout == -1) {
timeout_now = QB_IPC_MAX_WAIT_MS;
} else {
timeout_now = timeout_rem;
}
res = qb_ipcc_recv(c, res_msg, res_len, timeout_now);
if (res == -ETIMEDOUT) {
if (ms_timeout < 0) {
res = -EAGAIN;
} else {
timeout_rem -= timeout_now;
if (timeout_rem > 0) {
res = -EAGAIN;
}
}
} else if (res < 0 && res != -EAGAIN) {
errno = -res;
qb_util_perror(LOG_DEBUG,
"qb_ipcc_recv %d timeout:(%d/%d)",
res, timeout_now, timeout_rem);
}
} while (res == -EAGAIN && c->is_connected);
return res;
}
int32_t
qb_ipcc_fd_get(struct qb_ipcc_connection * c, int32_t * fd)
{
if (c == NULL) {
return -EINVAL;
}
if (c->event.type == QB_IPC_SOCKET) {
*fd = c->event.u.us.sock;
} else {
*fd = c->setup.u.us.sock;
}
return 0;
}
ssize_t
qb_ipcc_event_recv(struct qb_ipcc_connection * c, void *msg_pt,
size_t msg_len, int32_t ms_timeout)
{
char one_byte = 1;
int32_t res;
ssize_t size;
if (c == NULL) {
return -EINVAL;
}
res = _check_connection_state_with(c, -EAGAIN, _event_sock_one_way_get(c),
ms_timeout, POLLIN);
if (res < 0) {
return res;
}
size = c->funcs.recv(&c->event, msg_pt, msg_len, ms_timeout);
if (size > 0 && c->needs_sock_for_poll) {
res = qb_ipc_us_recv(&c->setup, &one_byte, 1, -1);
if (res != 1) {
size = res;
}
}
return _check_connection_state(c, size);
}
void
qb_ipcc_disconnect(struct qb_ipcc_connection *c)
{
struct qb_ipc_one_way *ow = NULL;
qb_util_log(LOG_DEBUG, "%s()", __func__);
if (c == NULL) {
return;
}
ow = _event_sock_one_way_get(c);
(void)_check_connection_state_with(c, -EAGAIN, ow, 0, POLLIN);
if (c->funcs.disconnect) {
c->funcs.disconnect(c);
}
free(c->receive_buf);
free(c);
}
void
qb_ipcc_context_set(struct qb_ipcc_connection *c, void *context)
{
if (c == NULL) {
return;
}
c->context = context;
}
void *qb_ipcc_context_get(struct qb_ipcc_connection *c)
{
if (c == NULL) {
return NULL;
}
return c->context;
}
int32_t
qb_ipcc_is_connected(qb_ipcc_connection_t *c)
{
struct qb_ipc_one_way *ow;
if (c == NULL) {
return QB_FALSE;
}
ow = _response_sock_one_way_get(c);
(void)_check_connection_state_with(c, -EAGAIN, ow, 0, POLLIN);
return c->is_connected;
}
+
+int32_t
+qb_ipcc_get_buffer_size(qb_ipcc_connection_t * c)
+{
+ if (c == NULL) {
+ return -EINVAL;
+ }
+
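+ /* All of the one-way channels share the negotiated size; the event
+  * channel is the one that must hold large server events. */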
+ return c->event.max_msg_size;
+}
diff --git a/lib/ipcs.c b/lib/ipcs.c
index 5a54983..f21d093 100644
--- a/lib/ipcs.c
+++ b/lib/ipcs.c
@@ -1,956 +1,964 @@
/*
* Copyright (C) 2010 Red Hat, Inc.
*
* Author: Angus Salkeld <asalkeld@redhat.com>
*
* This file is part of libqb.
*
* libqb is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 2.1 of the License, or
* (at your option) any later version.
*
* libqb is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with libqb. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os_base.h"
#include "util_int.h"
#include "ipc_int.h"
#include <qb/qbdefs.h>
#include <qb/qbatomic.h>
#include <qb/qbipcs.h>
static void qb_ipcs_flowcontrol_set(struct qb_ipcs_connection *c,
int32_t fc_enable);
static int32_t
new_event_notification(struct qb_ipcs_connection * c);
static QB_LIST_DECLARE(qb_ipc_services);
qb_ipcs_service_t *
qb_ipcs_create(const char *name,
int32_t service_id,
enum qb_ipc_type type, struct qb_ipcs_service_handlers *handlers)
{
struct qb_ipcs_service *s;
s = calloc(1, sizeof(struct qb_ipcs_service));
if (s == NULL) {
return NULL;
}
if (type == QB_IPC_NATIVE) {
#ifdef DISABLE_IPC_SHM
s->type = QB_IPC_SOCKET;
#else
s->type = QB_IPC_SHM;
#endif /* DISABLE_IPC_SHM */
} else {
s->type = type;
}
s->pid = getpid();
s->needs_sock_for_poll = QB_FALSE;
s->poll_priority = QB_LOOP_MED;
/* Initial alloc ref */
qb_ipcs_ref(s);
s->service_id = service_id;
(void)strlcpy(s->name, name, NAME_MAX);
s->serv_fns.connection_accept = handlers->connection_accept;
s->serv_fns.connection_created = handlers->connection_created;
s->serv_fns.msg_process = handlers->msg_process;
s->serv_fns.connection_closed = handlers->connection_closed;
s->serv_fns.connection_destroyed = handlers->connection_destroyed;
qb_list_init(&s->connections);
qb_list_init(&s->list);
qb_list_add(&s->list, &qb_ipc_services);
return s;
}
void
qb_ipcs_poll_handlers_set(struct qb_ipcs_service *s,
struct qb_ipcs_poll_handlers *handlers)
{
s->poll_fns.job_add = handlers->job_add;
s->poll_fns.dispatch_add = handlers->dispatch_add;
s->poll_fns.dispatch_mod = handlers->dispatch_mod;
s->poll_fns.dispatch_del = handlers->dispatch_del;
}
void
qb_ipcs_service_context_set(qb_ipcs_service_t* s,
void *context)
{
s->context = context;
}
void *
qb_ipcs_service_context_get(qb_ipcs_service_t* s)
{
return s->context;
}
int32_t
qb_ipcs_run(struct qb_ipcs_service *s)
{
int32_t res = 0;
if (s->poll_fns.dispatch_add == NULL ||
s->poll_fns.dispatch_mod == NULL ||
s->poll_fns.dispatch_del == NULL) {
res = -EINVAL;
goto run_cleanup;
}
switch (s->type) {
case QB_IPC_SOCKET:
qb_ipcs_us_init((struct qb_ipcs_service *)s);
break;
case QB_IPC_SHM:
#ifdef DISABLE_IPC_SHM
res = -ENOTSUP;
#else
qb_ipcs_shm_init((struct qb_ipcs_service *)s);
#endif /* DISABLE_IPC_SHM */
break;
case QB_IPC_POSIX_MQ:
case QB_IPC_SYSV_MQ:
res = -ENOTSUP;
break;
default:
res = -EINVAL;
break;
}
if (res == 0) {
res = qb_ipcs_us_publish(s);
if (res < 0) {
(void)qb_ipcs_us_withdraw(s);
goto run_cleanup;
}
}
run_cleanup:
if (res < 0) {
/* Failed to run services, removing initial alloc reference. */
qb_ipcs_unref(s);
}
return res;
}
static int32_t
_modify_dispatch_descriptor_(struct qb_ipcs_connection *c)
{
qb_ipcs_dispatch_mod_fn disp_mod = c->service->poll_fns.dispatch_mod;
if (c->service->type == QB_IPC_SOCKET) {
return disp_mod(c->service->poll_priority,
c->event.u.us.sock,
c->poll_events, c,
qb_ipcs_dispatch_connection_request);
} else {
return disp_mod(c->service->poll_priority,
c->setup.u.us.sock,
c->poll_events, c,
qb_ipcs_dispatch_connection_request);
}
return -EINVAL;
}
void
qb_ipcs_request_rate_limit(struct qb_ipcs_service *s,
enum qb_ipcs_rate_limit rl)
{
struct qb_ipcs_connection *c;
enum qb_loop_priority old_p = s->poll_priority;
struct qb_list_head *pos;
struct qb_list_head *n;
switch (rl) {
case QB_IPCS_RATE_FAST:
s->poll_priority = QB_LOOP_HIGH;
break;
case QB_IPCS_RATE_SLOW:
case QB_IPCS_RATE_OFF:
case QB_IPCS_RATE_OFF_2:
s->poll_priority = QB_LOOP_LOW;
break;
default:
case QB_IPCS_RATE_NORMAL:
s->poll_priority = QB_LOOP_MED;
break;
}
qb_list_for_each_safe(pos, n, &s->connections) {
c = qb_list_entry(pos, struct qb_ipcs_connection, list);
qb_ipcs_connection_ref(c);
if (rl == QB_IPCS_RATE_OFF) {
qb_ipcs_flowcontrol_set(c, 1);
} else if (rl == QB_IPCS_RATE_OFF_2) {
qb_ipcs_flowcontrol_set(c, 2);
} else {
qb_ipcs_flowcontrol_set(c, QB_FALSE);
}
if (old_p != s->poll_priority) {
(void)_modify_dispatch_descriptor_(c);
}
qb_ipcs_connection_unref(c);
}
}
void
qb_ipcs_ref(struct qb_ipcs_service *s)
{
qb_atomic_int_inc(&s->ref_count);
}
void
qb_ipcs_unref(struct qb_ipcs_service *s)
{
int32_t free_it;
assert(s->ref_count > 0);
free_it = qb_atomic_int_dec_and_test(&s->ref_count);
if (free_it) {
qb_util_log(LOG_DEBUG, "%s() - destroying", __func__);
free(s);
}
}
void
qb_ipcs_destroy(struct qb_ipcs_service *s)
{
struct qb_ipcs_connection *c = NULL;
struct qb_list_head *pos;
struct qb_list_head *n;
if (s == NULL) {
return;
}
qb_list_for_each_safe(pos, n, &s->connections) {
c = qb_list_entry(pos, struct qb_ipcs_connection, list);
if (c == NULL) {
continue;
}
qb_ipcs_disconnect(c);
}
(void)qb_ipcs_us_withdraw(s);
/* service destroyed, remove initial alloc ref */
qb_ipcs_unref(s);
}
/*
* connection API
*/
static struct qb_ipc_one_way *
_event_sock_one_way_get(struct qb_ipcs_connection * c)
{
if (c->service->needs_sock_for_poll) {
return &c->setup;
}
if (c->event.type == QB_IPC_SOCKET) {
return &c->event;
}
return NULL;
}
static struct qb_ipc_one_way *
_response_sock_one_way_get(struct qb_ipcs_connection * c)
{
if (c->service->needs_sock_for_poll) {
return &c->setup;
}
if (c->response.type == QB_IPC_SOCKET) {
return &c->response;
}
return NULL;
}
ssize_t
qb_ipcs_response_send(struct qb_ipcs_connection *c, const void *data,
size_t size)
{
ssize_t res;
if (c == NULL) {
return -EINVAL;
}
qb_ipcs_connection_ref(c);
res = c->service->funcs.send(&c->response, data, size);
if (res == size) {
c->stats.responses++;
} else if (res == -EAGAIN || res == -ETIMEDOUT) {
struct qb_ipc_one_way *ow = _response_sock_one_way_get(c);
if (ow) {
ssize_t res2 = qb_ipc_us_ready(ow, &c->setup, 0, POLLOUT);
if (res2 < 0) {
res = res2;
}
}
c->stats.send_retries++;
}
qb_ipcs_connection_unref(c);
return res;
}
ssize_t
qb_ipcs_response_sendv(struct qb_ipcs_connection * c, const struct iovec * iov,
size_t iov_len)
{
ssize_t res;
if (c == NULL) {
return -EINVAL;
}
qb_ipcs_connection_ref(c);
res = c->service->funcs.sendv(&c->response, iov, iov_len);
if (res > 0) {
c->stats.responses++;
} else if (res == -EAGAIN || res == -ETIMEDOUT) {
struct qb_ipc_one_way *ow = _response_sock_one_way_get(c);
if (ow) {
ssize_t res2 = qb_ipc_us_ready(ow, &c->setup, 0, POLLOUT);
if (res2 < 0) {
res = res2;
}
}
c->stats.send_retries++;
}
qb_ipcs_connection_unref(c);
return res;
}
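/*
 * Transports that need the setup socket for polling pair each event
 * with a one-byte notification on that socket so the client's poll()
 * wakes up. outstanding_notifiers counts the bytes still owed to the
 * client; once the socket is writable again they are flushed and
 * POLLOUT is dropped from the events of interest.
 */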
static int32_t
resend_event_notifications(struct qb_ipcs_connection *c)
{
ssize_t res = 0;
if (!c->service->needs_sock_for_poll) {
return res;
}
if (c->outstanding_notifiers > 0) {
res = qb_ipc_us_send(&c->setup, c->receive_buf,
c->outstanding_notifiers);
}
if (res > 0) {
c->outstanding_notifiers -= res;
}
assert(c->outstanding_notifiers >= 0);
if (c->outstanding_notifiers == 0) {
c->poll_events = POLLIN | POLLPRI | POLLNVAL;
(void)_modify_dispatch_descriptor_(c);
}
return res;
}
static int32_t
new_event_notification(struct qb_ipcs_connection * c)
{
ssize_t res = 0;
if (!c->service->needs_sock_for_poll) {
return res;
}
assert(c->outstanding_notifiers >= 0);
if (c->outstanding_notifiers > 0) {
c->outstanding_notifiers++;
res = resend_event_notifications(c);
} else {
res = qb_ipc_us_send(&c->setup, &c->outstanding_notifiers, 1);
if (res == -EAGAIN) {
/*
* notify the client later, when we can.
*/
c->outstanding_notifiers++;
c->poll_events = POLLOUT | POLLIN | POLLPRI | POLLNVAL;
(void)_modify_dispatch_descriptor_(c);
}
}
return res;
}
ssize_t
qb_ipcs_event_send(struct qb_ipcs_connection * c, const void *data, size_t size)
{
ssize_t res;
ssize_t resn;
if (c == NULL) {
return -EINVAL;
} else if (size > c->event.max_msg_size) {
return -EMSGSIZE;
}
qb_ipcs_connection_ref(c);
res = c->service->funcs.send(&c->event, data, size);
if (res == size) {
c->stats.events++;
resn = new_event_notification(c);
if (resn < 0 && resn != -EAGAIN && resn != -ENOBUFS) {
errno = -resn;
qb_util_perror(LOG_WARNING,
"new_event_notification (%s)",
c->description);
res = resn;
}
} else if (res == -EAGAIN || res == -ETIMEDOUT) {
struct qb_ipc_one_way *ow = _event_sock_one_way_get(c);
if (c->outstanding_notifiers > 0) {
resn = resend_event_notifications(c);
}
if (ow) {
resn = qb_ipc_us_ready(ow, &c->setup, 0, POLLOUT);
if (resn < 0) {
res = resn;
}
}
c->stats.send_retries++;
}
qb_ipcs_connection_unref(c);
return res;
}
ssize_t
qb_ipcs_event_sendv(struct qb_ipcs_connection * c,
const struct iovec * iov, size_t iov_len)
{
ssize_t res;
ssize_t resn;
if (c == NULL) {
return -EINVAL;
}
qb_ipcs_connection_ref(c);
res = c->service->funcs.sendv(&c->event, iov, iov_len);
if (res > 0) {
c->stats.events++;
resn = new_event_notification(c);
if (resn < 0 && resn != -EAGAIN) {
errno = -resn;
qb_util_perror(LOG_WARNING,
"new_event_notification (%s)",
c->description);
res = resn;
}
} else if (res == -EAGAIN || res == -ETIMEDOUT) {
struct qb_ipc_one_way *ow = _event_sock_one_way_get(c);
if (c->outstanding_notifiers > 0) {
resn = resend_event_notifications(c);
}
if (ow) {
resn = qb_ipc_us_ready(ow, &c->setup, 0, POLLOUT);
if (resn < 0) {
res = resn;
}
}
c->stats.send_retries++;
}
qb_ipcs_connection_unref(c);
return res;
}
qb_ipcs_connection_t *
qb_ipcs_connection_first_get(struct qb_ipcs_service * s)
{
struct qb_ipcs_connection *c;
if (qb_list_empty(&s->connections)) {
return NULL;
}
c = qb_list_first_entry(&s->connections, struct qb_ipcs_connection,
list);
qb_ipcs_connection_ref(c);
return c;
}
qb_ipcs_connection_t *
qb_ipcs_connection_next_get(struct qb_ipcs_service * s,
struct qb_ipcs_connection * current)
{
struct qb_ipcs_connection *c;
if (current == NULL ||
qb_list_is_last(&current->list, &s->connections)) {
return NULL;
}
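/* qb_list_first_entry() relative to current's own node yields the
 * entry immediately after it in the service's connection list. */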
c = qb_list_first_entry(&current->list, struct qb_ipcs_connection,
list);
qb_ipcs_connection_ref(c);
return c;
}
int32_t
qb_ipcs_service_id_get(struct qb_ipcs_connection * c)
{
if (c == NULL) {
return -EINVAL;
}
return c->service->service_id;
}
struct qb_ipcs_connection *
qb_ipcs_connection_alloc(struct qb_ipcs_service *s)
{
struct qb_ipcs_connection *c =
calloc(1, sizeof(struct qb_ipcs_connection));
if (c == NULL) {
return NULL;
}
c->pid = 0;
c->euid = -1;
c->egid = -1;
c->receive_buf = NULL;
c->context = NULL;
c->fc_enabled = QB_FALSE;
c->state = QB_IPCS_CONNECTION_INACTIVE;
c->poll_events = POLLIN | POLLPRI | POLLNVAL;
c->setup.type = s->type;
c->request.type = s->type;
c->response.type = s->type;
c->event.type = s->type;
(void)strlcpy(c->description, "not set yet", CONNECTION_DESCRIPTION);
/* initial alloc ref */
qb_ipcs_connection_ref(c);
/*
* The connection makes use of the service object. Give the connection
* a reference to the service so we know the service can never be destroyed
* until the connection is done with it.
*/
qb_ipcs_ref(s);
c->service = s;
qb_list_init(&c->list);
return c;
}
void
qb_ipcs_connection_ref(struct qb_ipcs_connection *c)
{
if (c) {
qb_atomic_int_inc(&c->refcount);
}
}
void
qb_ipcs_connection_unref(struct qb_ipcs_connection *c)
{
int32_t free_it;
if (c == NULL) {
return;
}
if (c->refcount < 1) {
qb_util_log(LOG_ERR, "ref:%d state:%d (%s)",
c->refcount, c->state, c->description);
assert(0);
}
free_it = qb_atomic_int_dec_and_test(&c->refcount);
if (free_it) {
qb_list_del(&c->list);
if (c->service->serv_fns.connection_destroyed) {
c->service->serv_fns.connection_destroyed(c);
}
c->service->funcs.disconnect(c);
/* Let go of the connection's reference to the service */
qb_ipcs_unref(c->service);
free(c->receive_buf);
free(c);
}
}
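/*
 * Tear down a connection according to its state: ACTIVE (accepted but
 * never fully established) connections are dropped immediately;
 * ESTABLISHED ones move to SHUTTING_DOWN, where connection_closed()
 * may request a re-run (by returning non-zero) as a loop job before
 * the initial allocation reference is released.
 */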
void
qb_ipcs_disconnect(struct qb_ipcs_connection *c)
{
int32_t res = 0;
qb_loop_job_dispatch_fn rerun_job;
if (c == NULL) {
return;
}
qb_util_log(LOG_DEBUG, "%s(%s) state:%d",
__func__, c->description, c->state);
if (c->state == QB_IPCS_CONNECTION_ACTIVE) {
c->service->funcs.disconnect(c);
c->state = QB_IPCS_CONNECTION_INACTIVE;
c->service->stats.closed_connections++;
/* return early as it's an incomplete connection.
*/
return;
}
if (c->state == QB_IPCS_CONNECTION_ESTABLISHED) {
c->service->funcs.disconnect(c);
c->state = QB_IPCS_CONNECTION_SHUTTING_DOWN;
c->service->stats.active_connections--;
c->service->stats.closed_connections++;
}
if (c->state == QB_IPCS_CONNECTION_SHUTTING_DOWN) {
int scheduled_retry = 0;
res = 0;
if (c->service->serv_fns.connection_closed) {
res = c->service->serv_fns.connection_closed(c);
}
if (res != 0) {
/* OK, so they want the connection_closed
* function re-run */
rerun_job =
(qb_loop_job_dispatch_fn) qb_ipcs_disconnect;
res = c->service->poll_fns.job_add(QB_LOOP_LOW,
c, rerun_job);
if (res == 0) {
/* this function is going to be called again,
 * so hold off on the unref */
scheduled_retry = 1;
}
}
if (scheduled_retry == 0) {
/* This removes the initial alloc ref */
qb_ipcs_connection_unref(c);
}
}
}
static void
qb_ipcs_flowcontrol_set(struct qb_ipcs_connection *c, int32_t fc_enable)
{
if (c == NULL) {
return;
}
if (c->fc_enabled != fc_enable) {
c->service->funcs.fc_set(&c->request, fc_enable);
c->fc_enabled = fc_enable;
c->stats.flow_control_state = fc_enable;
c->stats.flow_control_count++;
}
}
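/*
 * Pull one request off the connection. Transports implementing
 * peek/reclaim hand back a zero-copy pointer into their buffer;
 * otherwise the message is copied into the connection's receive_buf.
 */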
static int32_t
_process_request_(struct qb_ipcs_connection *c, int32_t ms_timeout)
{
int32_t res = 0;
ssize_t size;
struct qb_ipc_request_header *hdr;
qb_ipcs_connection_ref(c);
if (c->service->funcs.peek && c->service->funcs.reclaim) {
size = c->service->funcs.peek(&c->request, (void **)&hdr,
ms_timeout);
} else {
hdr = c->receive_buf;
size = c->service->funcs.recv(&c->request,
hdr,
c->request.max_msg_size,
ms_timeout);
}
if (size < 0) {
if (size != -EAGAIN && size != -ETIMEDOUT) {
qb_util_perror(LOG_DEBUG,
"recv from client connection failed (%s)",
c->description);
} else {
c->stats.recv_retries++;
}
res = size;
goto cleanup;
} else if (size == 0 || hdr->id == QB_IPC_MSG_DISCONNECT) {
qb_util_log(LOG_DEBUG, "client requesting a disconnect (%s)",
c->description);
res = -ESHUTDOWN;
goto cleanup;
} else {
c->stats.requests++;
res = c->service->serv_fns.msg_process(c, hdr, hdr->size);
/* 0 == good, negative == backoff */
if (res < 0) {
res = -ENOBUFS;
} else {
res = size;
}
}
if (c && c->service->funcs.peek && c->service->funcs.reclaim) {
c->service->funcs.reclaim(&c->request);
}
cleanup:
qb_ipcs_connection_unref(c);
return res;
}
#define IPC_REQUEST_TIMEOUT 10
#define MAX_RECV_MSGS 50
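/*
 * Number of queued requests to service in one dispatch pass: capped
 * at 5 for QB_LOOP_MED and 1 for QB_LOOP_LOW (so slower services
 * don't monopolize the loop), up to MAX_RECV_MSGS otherwise.
 */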
static ssize_t
_request_q_len_get(struct qb_ipcs_connection *c)
{
ssize_t q_len;
if (c->service->funcs.q_len_get) {
q_len = c->service->funcs.q_len_get(&c->request);
if (q_len <= 0) {
return q_len;
}
if (c->service->poll_priority == QB_LOOP_MED) {
q_len = QB_MIN(q_len, 5);
} else if (c->service->poll_priority == QB_LOOP_LOW) {
q_len = 1;
} else {
q_len = QB_MIN(q_len, MAX_RECV_MSGS);
}
} else {
q_len = 1;
}
return q_len;
}
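/*
 * Per-connection dispatch entry point: treat POLLNVAL/POLLHUP as
 * disconnects, flush deferred event notifications on POLLOUT, skip
 * reading while flow control is enabled, then drain up to
 * _request_q_len_get() requests and consume the matching notification
 * bytes from the setup socket.
 */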
int32_t
qb_ipcs_dispatch_connection_request(int32_t fd, int32_t revents, void *data)
{
struct qb_ipcs_connection *c = (struct qb_ipcs_connection *)data;
char bytes[MAX_RECV_MSGS];
int32_t res = 0;
int32_t res2;
int32_t recvd = 0;
ssize_t avail;
if (revents & POLLNVAL) {
qb_util_log(LOG_DEBUG, "NVAL conn (%s)", c->description);
res = -EINVAL;
goto dispatch_cleanup;
}
if (revents & POLLHUP) {
qb_util_log(LOG_DEBUG, "HUP conn (%s)", c->description);
res = -ESHUTDOWN;
goto dispatch_cleanup;
}
if (revents & POLLOUT) {
/* try to resend events now that the fd can write */
res = resend_event_notifications(c);
if (res < 0 && res != -EAGAIN) {
errno = -res;
qb_util_perror(LOG_WARNING,
"resend_event_notifications (%s)",
c->description);
}
/* nothing to read */
if ((revents & POLLIN) == 0) {
res = 0;
goto dispatch_cleanup;
}
}
if (c->fc_enabled) {
res = 0;
goto dispatch_cleanup;
}
avail = _request_q_len_get(c);
if (c->service->needs_sock_for_poll && avail == 0) {
res2 = qb_ipc_us_recv(&c->setup, bytes, 1, 0);
if (qb_ipc_us_sock_error_is_disconnected(res2)) {
errno = -res2;
qb_util_perror(LOG_WARNING, "conn (%s) disconnected",
c->description);
res = -ESHUTDOWN;
goto dispatch_cleanup;
} else {
qb_util_log(LOG_WARNING,
"conn (%s) Nothing in q but got POLLIN on fd:%d (res2:%d)",
c->description, fd, res2);
res = 0;
goto dispatch_cleanup;
}
}
do {
res = _process_request_(c, IPC_REQUEST_TIMEOUT);
if (res == -ESHUTDOWN) {
goto dispatch_cleanup;
}
if (res > 0 || res == -ENOBUFS || res == -EINVAL) {
recvd++;
}
if (res > 0) {
avail--;
}
} while (avail > 0 && res > 0 && !c->fc_enabled);
if (c->service->needs_sock_for_poll && recvd > 0) {
res2 = qb_ipc_us_recv(&c->setup, bytes, recvd, -1);
if (qb_ipc_us_sock_error_is_disconnected(res2)) {
errno = -res2;
qb_util_perror(LOG_ERR, "error receiving from setup sock (%s)", c->description);
res = -ESHUTDOWN;
goto dispatch_cleanup;
}
}
res = QB_MIN(0, res);
if (res == -EAGAIN || res == -ETIMEDOUT || res == -ENOBUFS) {
res = 0;
}
if (res != 0) {
if (res != -ENOTCONN) {
/*
* Abnormal state (ENOTCONN is normal shutdown).
*/
errno = -res;
qb_util_perror(LOG_ERR, "request returned error (%s)",
c->description);
}
}
dispatch_cleanup:
if (res != 0) {
qb_ipcs_disconnect(c);
}
return res;
}
void
qb_ipcs_context_set(struct qb_ipcs_connection *c, void *context)
{
if (c == NULL) {
return;
}
c->context = context;
}
void *
qb_ipcs_context_get(struct qb_ipcs_connection *c)
{
if (c == NULL) {
return NULL;
}
return c->context;
}
void *
qb_ipcs_connection_service_context_get(qb_ipcs_connection_t *c)
{
if (c == NULL || c->service == NULL) {
return NULL;
}
return c->service->context;
}
int32_t
qb_ipcs_connection_stats_get(qb_ipcs_connection_t * c,
struct qb_ipcs_connection_stats * stats,
int32_t clear_after_read)
{
if (c == NULL) {
return -EINVAL;
}
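/* c->stats is the extended stats_2 struct internally; copy out only
 * the legacy subset, but clear the whole internal struct. */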
memcpy(stats, &c->stats, sizeof(struct qb_ipcs_connection_stats));
if (clear_after_read) {
memset(&c->stats, 0, sizeof(struct qb_ipcs_connection_stats_2));
c->stats.client_pid = c->pid;
}
return 0;
}
struct qb_ipcs_connection_stats_2*
qb_ipcs_connection_stats_get_2(qb_ipcs_connection_t *c,
int32_t clear_after_read)
{
struct qb_ipcs_connection_stats_2 * stats;
if (c == NULL) {
errno = EINVAL;
return NULL;
}
stats = calloc(1, sizeof(struct qb_ipcs_connection_stats_2));
if (stats == NULL) {
return NULL;
}
memcpy(stats, &c->stats, sizeof(struct qb_ipcs_connection_stats_2));
if (c->service->funcs.q_len_get) {
stats->event_q_length = c->service->funcs.q_len_get(&c->event);
} else {
stats->event_q_length = 0;
}
if (clear_after_read) {
memset(&c->stats, 0, sizeof(struct qb_ipcs_connection_stats_2));
c->stats.client_pid = c->pid;
}
return stats;
}
int32_t
qb_ipcs_stats_get(struct qb_ipcs_service * s,
struct qb_ipcs_stats * stats, int32_t clear_after_read)
{
if (s == NULL) {
return -EINVAL;
}
memcpy(stats, &s->stats, sizeof(struct qb_ipcs_stats));
if (clear_after_read) {
memset(&s->stats, 0, sizeof(struct qb_ipcs_stats));
}
return 0;
}
void
qb_ipcs_connection_auth_set(qb_ipcs_connection_t *c, uid_t uid,
gid_t gid, mode_t mode)
{
if (c) {
c->auth.uid = uid;
c->auth.gid = gid;
c->auth.mode = mode;
}
}
int32_t
qb_ipcs_connection_get_buffer_size(qb_ipcs_connection_t *c)
{
if (c == NULL) {
return -EINVAL;
}
/* request, response, and event should all have the same
* buffer size allocated. It doesn't matter which we return
* here. */
return c->response.max_msg_size;
}
+
+void qb_ipcs_enforce_buffer_size(qb_ipcs_service_t *s, uint32_t buf_size)
+{
+ if (s == NULL) {
+ return;
+ }
+ s->max_buffer_size = buf_size;
+}
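+
+/* Usage sketch (illustrative only, not part of this change): a server
+ * that wants to pin the buffer size used for all connections,
+ * regardless of what clients request, would call this between
+ * qb_ipcs_create() and qb_ipcs_run(), e.g.:
+ *
+ *   s = qb_ipcs_create("my_svc", 0, QB_IPC_SHM, &handlers);
+ *   qb_ipcs_enforce_buffer_size(s, 1024 * 1024);
+ *   qb_ipcs_run(s);
+ */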
diff --git a/tests/check_ipc.c b/tests/check_ipc.c
index 93d0a83..6a80fec 100644
--- a/tests/check_ipc.c
+++ b/tests/check_ipc.c
@@ -1,1390 +1,1415 @@
/*
* Copyright (c) 2010 Red Hat, Inc.
*
* All rights reserved.
*
* Author: Angus Salkeld <asalkeld@redhat.com>
*
* This file is part of libqb.
*
* libqb is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 2.1 of the License, or
* (at your option) any later version.
*
* libqb is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with libqb. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os_base.h"
#include <sys/wait.h>
#include <signal.h>
#include <check.h>
#include <qb/qbdefs.h>
#include <qb/qblog.h>
#include <qb/qbipcc.h>
#include <qb/qbipcs.h>
#include <qb/qbloop.h>
static const char *ipc_name = "ipc_test";
#define DEFAULT_MAX_MSG_SIZE (8192*16)
static int CALCULATED_DGRAM_MAX_MSG_SIZE = 0;
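/* DGRAM_MAX_MSG_SIZE lazily computes the verified dgram maximum on
 * first use and caches it for subsequent tests. */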
#define DGRAM_MAX_MSG_SIZE \
(CALCULATED_DGRAM_MAX_MSG_SIZE == 0 ? \
CALCULATED_DGRAM_MAX_MSG_SIZE = qb_ipcc_verify_dgram_max_msg_size(DEFAULT_MAX_MSG_SIZE) : \
CALCULATED_DGRAM_MAX_MSG_SIZE)
#define MAX_MSG_SIZE (ipc_type == QB_IPC_SOCKET ? DGRAM_MAX_MSG_SIZE : DEFAULT_MAX_MSG_SIZE)
/* The size the giant msg's data field needs to be to make
* this the largest msg we can successfully send. */
#define GIANT_MSG_DATA_SIZE MAX_MSG_SIZE - sizeof(struct qb_ipc_response_header) - 8
+static int enforce_server_buffer = 0;
static qb_ipcc_connection_t *conn;
static enum qb_ipc_type ipc_type;
enum my_msg_ids {
IPC_MSG_REQ_TX_RX,
IPC_MSG_RES_TX_RX,
IPC_MSG_REQ_DISPATCH,
IPC_MSG_RES_DISPATCH,
IPC_MSG_REQ_BULK_EVENTS,
IPC_MSG_RES_BULK_EVENTS,
IPC_MSG_REQ_STRESS_EVENT,
IPC_MSG_RES_STRESS_EVENT,
IPC_MSG_REQ_SERVER_FAIL,
IPC_MSG_RES_SERVER_FAIL,
IPC_MSG_REQ_SERVER_DISCONNECT,
IPC_MSG_RES_SERVER_DISCONNECT,
};
/* Test Cases
*
* 1) basic send & recv different message sizes
*
* 2) send message to start dispatch (confirm receipt)
*
* 3) flow control
*
* 4) authentication
*
* 5) thread safety
*
* 6) cleanup
*
* 7) service availability
*
* 8) multiple services
*/
static qb_loop_t *my_loop;
static qb_ipcs_service_t* s1;
static int32_t turn_on_fc = QB_FALSE;
static int32_t fc_enabled = 89;
static int32_t send_event_on_created = QB_FALSE;
static int32_t disconnect_after_created = QB_FALSE;
static int32_t num_bulk_events = 10;
static int32_t num_stress_events = 30000;
static int32_t reference_count_test = QB_FALSE;
static int32_t
exit_handler(int32_t rsignal, void *data)
{
qb_log(LOG_DEBUG, "caught signal %d", rsignal);
qb_ipcs_destroy(s1);
return -1;
}
static int32_t
s1_msg_process_fn(qb_ipcs_connection_t *c,
void *data, size_t size)
{
struct qb_ipc_request_header *req_pt = (struct qb_ipc_request_header *)data;
struct qb_ipc_response_header response = { 0, };
ssize_t res;
if (req_pt->id == IPC_MSG_REQ_TX_RX) {
response.size = sizeof(struct qb_ipc_response_header);
response.id = IPC_MSG_RES_TX_RX;
response.error = 0;
res = qb_ipcs_response_send(c, &response, response.size);
if (res < 0) {
qb_perror(LOG_INFO, "qb_ipcs_response_send");
} else if (res != response.size) {
qb_log(LOG_DEBUG, "qb_ipcs_response_send %zd != %d",
res, response.size);
}
if (turn_on_fc) {
qb_ipcs_request_rate_limit(s1, QB_IPCS_RATE_OFF);
}
} else if (req_pt->id == IPC_MSG_REQ_DISPATCH) {
response.size = sizeof(struct qb_ipc_response_header);
response.id = IPC_MSG_RES_DISPATCH;
response.error = 0;
res = qb_ipcs_event_send(c, &response,
sizeof(response));
if (res < 0) {
qb_perror(LOG_INFO, "qb_ipcs_event_send");
}
} else if (req_pt->id == IPC_MSG_REQ_BULK_EVENTS) {
int32_t m;
int32_t num;
struct qb_ipcs_connection_stats_2 *stats;
response.size = sizeof(struct qb_ipc_response_header);
response.error = 0;
stats = qb_ipcs_connection_stats_get_2(c, QB_FALSE);
num = stats->event_q_length;
free(stats);
/* crazy large message */
res = qb_ipcs_event_send(c, &response,
MAX_MSG_SIZE*10);
ck_assert_int_eq(res, -EMSGSIZE);
/* send one event before responding */
res = qb_ipcs_event_send(c, &response, sizeof(response));
ck_assert_int_eq(res, sizeof(response));
/* send response */
response.id = IPC_MSG_RES_BULK_EVENTS;
res = qb_ipcs_response_send(c, &response, response.size);
ck_assert_int_eq(res, sizeof(response));
/* send the rest of the events after the response */
for (m = 1; m < num_bulk_events; m++) {
res = qb_ipcs_event_send(c, &response, sizeof(response));
if (res == -EAGAIN || res == -ENOBUFS) {
/* retry */
usleep(1000);
m--;
continue;
}
ck_assert_int_eq(res, sizeof(response));
response.id++;
}
stats = qb_ipcs_connection_stats_get_2(c, QB_FALSE);
ck_assert_int_eq(stats->event_q_length - num, num_bulk_events);
free(stats);
} else if (req_pt->id == IPC_MSG_REQ_STRESS_EVENT) {
struct {
struct qb_ipc_response_header hdr __attribute__ ((aligned(8)));
char data[GIANT_MSG_DATA_SIZE] __attribute__ ((aligned(8)));
uint32_t sent_msgs __attribute__ ((aligned(8)));
} __attribute__ ((aligned(8))) giant_event_send;
int32_t m;
response.size = sizeof(struct qb_ipc_response_header);
response.error = 0;
response.id = IPC_MSG_RES_STRESS_EVENT;
res = qb_ipcs_response_send(c, &response, response.size);
ck_assert_int_eq(res, sizeof(response));
giant_event_send.hdr.error = 0;
giant_event_send.hdr.id = IPC_MSG_RES_STRESS_EVENT;
for (m = 0; m < num_stress_events; m++) {
size_t sent_len = sizeof(struct qb_ipc_response_header);
if (((m+1) % 1000) == 0) {
sent_len = sizeof(giant_event_send);
giant_event_send.sent_msgs = m + 1;
}
giant_event_send.hdr.size = sent_len;
res = qb_ipcs_event_send(c, &giant_event_send, sent_len);
if (res < 0) {
if (res == -EAGAIN || res == -ENOBUFS) {
/* yield to the receive process */
usleep(1000);
m--;
continue;
} else {
qb_perror(LOG_DEBUG, "sending stress events");
ck_assert_int_eq(res, sent_len);
}
} else if (((m+1) % 1000) == 0) {
qb_log(LOG_DEBUG, "SENT: %d stress events sent", m+1);
}
giant_event_send.hdr.id++;
}
} else if (req_pt->id == IPC_MSG_REQ_SERVER_FAIL) {
exit(0);
} else if (req_pt->id == IPC_MSG_REQ_SERVER_DISCONNECT) {
qb_ipcs_disconnect(c);
}
return 0;
}
static int32_t
my_job_add(enum qb_loop_priority p,
void *data,
qb_loop_job_dispatch_fn fn)
{
return qb_loop_job_add(my_loop, p, data, fn);
}
static int32_t
my_dispatch_add(enum qb_loop_priority p, int32_t fd, int32_t events,
void *data, qb_ipcs_dispatch_fn_t fn)
{
return qb_loop_poll_add(my_loop, p, fd, events, data, fn);
}
static int32_t
my_dispatch_mod(enum qb_loop_priority p, int32_t fd, int32_t events,
void *data, qb_ipcs_dispatch_fn_t fn)
{
return qb_loop_poll_mod(my_loop, p, fd, events, data, fn);
}
static int32_t
my_dispatch_del(int32_t fd)
{
return qb_loop_poll_del(my_loop, fd);
}
static int32_t
s1_connection_closed(qb_ipcs_connection_t *c)
{
qb_enter();
qb_leave();
return 0;
}
static void
outq_flush (void *data)
{
static int i = 0;
struct cs_ipcs_conn_context *cnx;
cnx = qb_ipcs_context_get(data);
qb_log(LOG_DEBUG, "iter %d\n", i);
i++;
if (i == 2) {
qb_ipcs_destroy(s1);
s1 = NULL;
}
/* If the reference counting is not working, this should fail
* for i > 1.
*/
qb_ipcs_event_send(data, "test", 4);
assert(memcmp(cnx, "test", 4) == 0);
if (i < 5) {
qb_loop_job_add(my_loop, QB_LOOP_HIGH, data, outq_flush);
} else {
/* this single unref should clean everything up.
*/
qb_ipcs_connection_unref(data);
qb_log(LOG_INFO, "end of test, stopping loop");
qb_loop_stop(my_loop);
}
}
static void
s1_connection_destroyed(qb_ipcs_connection_t *c)
{
qb_enter();
if (reference_count_test) {
struct cs_ipcs_conn_context *cnx;
cnx = qb_ipcs_context_get(c);
free(cnx);
} else {
qb_loop_stop(my_loop);
}
qb_leave();
}
static void
s1_connection_created(qb_ipcs_connection_t *c)
{
int32_t max = MAX_MSG_SIZE;
if (send_event_on_created) {
struct qb_ipc_response_header response;
int32_t res;
response.size = sizeof(struct qb_ipc_response_header);
response.id = IPC_MSG_RES_DISPATCH;
response.error = 0;
res = qb_ipcs_event_send(c, &response,
sizeof(response));
ck_assert_int_eq(res, response.size);
}
if (reference_count_test) {
struct cs_ipcs_conn_context *context;
qb_ipcs_connection_ref(c);
qb_loop_job_add(my_loop, QB_LOOP_HIGH, c, outq_flush);
context = calloc(1, 20);
memcpy(context, "test", 4);
qb_ipcs_context_set(c, context);
}
ck_assert_int_eq(max, qb_ipcs_connection_get_buffer_size(c));
}
static void
run_ipc_server(void)
{
int32_t res;
qb_loop_signal_handle handle;
struct qb_ipcs_service_handlers sh = {
.connection_accept = NULL,
.connection_created = s1_connection_created,
.msg_process = s1_msg_process_fn,
.connection_destroyed = s1_connection_destroyed,
.connection_closed = s1_connection_closed,
};
struct qb_ipcs_poll_handlers ph = {
.job_add = my_job_add,
.dispatch_add = my_dispatch_add,
.dispatch_mod = my_dispatch_mod,
.dispatch_del = my_dispatch_del,
};
my_loop = qb_loop_create();
qb_loop_signal_add(my_loop, QB_LOOP_HIGH, SIGSTOP,
NULL, exit_handler, &handle);
qb_loop_signal_add(my_loop, QB_LOOP_HIGH, SIGTERM,
NULL, exit_handler, &handle);
s1 = qb_ipcs_create(ipc_name, 4, ipc_type, &sh);
fail_if(s1 == 0);
+ if (enforce_server_buffer) {
+ qb_ipcs_enforce_buffer_size(s1, MAX_MSG_SIZE);
+ }
qb_ipcs_poll_handlers_set(s1, &ph);
res = qb_ipcs_run(s1);
ck_assert_int_eq(res, 0);
qb_loop_run(my_loop);
qb_log(LOG_DEBUG, "loop finished - done ...");
}
static int32_t
run_function_in_new_process(void (*run_ipc_server_fn)(void))
{
pid_t pid = fork ();
if (pid == -1) {
fprintf (stderr, "Can't fork\n");
return -1;
}
if (pid == 0) {
run_ipc_server_fn();
return 0;
}
return pid;
}
static int32_t
stop_process(pid_t pid)
{
/* wait a bit for the server to shut down by itself */
usleep(100000);
kill(pid, SIGTERM);
waitpid(pid, NULL, 0);
return 0;
}
struct my_req {
struct qb_ipc_request_header hdr;
char message[1024 * 1024];
};
static struct my_req request;
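/*
 * Send a request (retrying up to 10 times on -EAGAIN, which signals
 * flow control), then receive the matching event or response and,
 * when expect_perfection is set, assert an exact round trip.
 */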
static int32_t
send_and_check(int32_t req_id, uint32_t size,
int32_t ms_timeout, int32_t expect_perfection)
{
struct qb_ipc_response_header res_header;
int32_t res;
int32_t try_times = 0;
request.hdr.id = req_id;
request.hdr.size = sizeof(struct qb_ipc_request_header) + size;
/* check that we can't send a message that is too big
* and we get the right return code.
*/
res = qb_ipcc_send(conn, &request, MAX_MSG_SIZE*2);
ck_assert_int_eq(res, -EMSGSIZE);
-
repeat_send:
res = qb_ipcc_send(conn, &request, request.hdr.size);
try_times++;
if (res < 0) {
if (res == -EAGAIN && try_times < 10) {
goto repeat_send;
} else {
if (res == -EAGAIN && try_times >= 10) {
fc_enabled = QB_TRUE;
}
errno = -res;
qb_perror(LOG_INFO, "qb_ipcc_send");
return res;
}
}
if (req_id == IPC_MSG_REQ_DISPATCH) {
res = qb_ipcc_event_recv(conn, &res_header,
sizeof(struct qb_ipc_response_header),
ms_timeout);
} else {
res = qb_ipcc_recv(conn, &res_header,
sizeof(struct qb_ipc_response_header),
ms_timeout);
}
if (res == -EINTR) {
return -1;
}
if (res == -EAGAIN || res == -ETIMEDOUT) {
fc_enabled = QB_TRUE;
qb_perror(LOG_DEBUG, "qb_ipcc_recv");
return res;
}
if (expect_perfection) {
ck_assert_int_eq(res, sizeof(struct qb_ipc_response_header));
ck_assert_int_eq(res_header.id, req_id + 1);
ck_assert_int_eq(res_header.size, sizeof(struct qb_ipc_response_header));
}
return res;
}
static int32_t recv_timeout = -1;
static void
test_ipc_txrx(void)
{
int32_t j;
int32_t c = 0;
size_t size;
pid_t pid;
pid = run_function_in_new_process(run_ipc_server);
fail_if(pid == -1);
sleep(1);
do {
conn = qb_ipcc_connect(ipc_name, MAX_MSG_SIZE);
if (conn == NULL) {
j = waitpid(pid, NULL, WNOHANG);
ck_assert_int_eq(j, 0);
sleep(1);
c++;
}
} while (conn == NULL && c < 5);
fail_if(conn == NULL);
size = QB_MIN(sizeof(struct qb_ipc_request_header), 64);
for (j = 1; j < 19; j++) {
size *= 2;
if (size >= MAX_MSG_SIZE)
break;
if (send_and_check(IPC_MSG_REQ_TX_RX, size,
recv_timeout, QB_TRUE) < 0) {
break;
}
}
if (turn_on_fc) {
ck_assert_int_eq(fc_enabled, QB_TRUE);
}
qb_ipcc_disconnect(conn);
stop_process(pid);
}
static void
test_ipc_exit(void)
{
struct qb_ipc_request_header req_header;
struct qb_ipc_response_header res_header;
struct iovec iov[1];
int32_t res;
int32_t c = 0;
int32_t j = 0;
pid_t pid;
pid = run_function_in_new_process(run_ipc_server);
fail_if(pid == -1);
sleep(1);
do {
conn = qb_ipcc_connect(ipc_name, MAX_MSG_SIZE);
if (conn == NULL) {
j = waitpid(pid, NULL, WNOHANG);
ck_assert_int_eq(j, 0);
sleep(1);
c++;
}
} while (conn == NULL && c < 5);
fail_if(conn == NULL);
req_header.id = IPC_MSG_REQ_TX_RX;
req_header.size = sizeof(struct qb_ipc_request_header);
iov[0].iov_len = req_header.size;
iov[0].iov_base = &req_header;
res = qb_ipcc_sendv_recv(conn, iov, 1,
&res_header,
sizeof(struct qb_ipc_response_header), -1);
ck_assert_int_eq(res, sizeof(struct qb_ipc_response_header));
/* kill the server */
stop_process(pid);
/*
* wait a bit for the server to die.
*/
sleep(1);
/*
* this needs to free up the shared mem
*/
qb_ipcc_disconnect(conn);
}
START_TEST(test_ipc_exit_us)
{
qb_enter();
ipc_type = QB_IPC_SOCKET;
ipc_name = __func__;
recv_timeout = 5000;
test_ipc_exit();
qb_leave();
}
END_TEST
START_TEST(test_ipc_exit_shm)
{
qb_enter();
ipc_type = QB_IPC_SHM;
ipc_name = __func__;
recv_timeout = 1000;
test_ipc_exit();
qb_leave();
}
END_TEST
START_TEST(test_ipc_txrx_shm_tmo)
{
qb_enter();
turn_on_fc = QB_FALSE;
ipc_type = QB_IPC_SHM;
ipc_name = __func__;
recv_timeout = 1000;
test_ipc_txrx();
qb_leave();
}
END_TEST
START_TEST(test_ipc_txrx_shm_block)
{
qb_enter();
turn_on_fc = QB_FALSE;
ipc_type = QB_IPC_SHM;
ipc_name = __func__;
recv_timeout = -1;
test_ipc_txrx();
qb_leave();
}
END_TEST
START_TEST(test_ipc_fc_shm)
{
qb_enter();
turn_on_fc = QB_TRUE;
ipc_type = QB_IPC_SHM;
recv_timeout = 500;
ipc_name = __func__;
test_ipc_txrx();
qb_leave();
}
END_TEST
START_TEST(test_ipc_txrx_us_block)
{
qb_enter();
turn_on_fc = QB_FALSE;
ipc_type = QB_IPC_SOCKET;
ipc_name = __func__;
recv_timeout = -1;
test_ipc_txrx();
qb_leave();
}
END_TEST
START_TEST(test_ipc_txrx_us_tmo)
{
qb_enter();
turn_on_fc = QB_FALSE;
ipc_type = QB_IPC_SOCKET;
ipc_name = __func__;
recv_timeout = 1000;
test_ipc_txrx();
qb_leave();
}
END_TEST
START_TEST(test_ipc_fc_us)
{
qb_enter();
turn_on_fc = QB_TRUE;
ipc_type = QB_IPC_SOCKET;
recv_timeout = 500;
ipc_name = __func__;
test_ipc_txrx();
qb_leave();
}
END_TEST
struct my_res {
struct qb_ipc_response_header hdr;
char message[1024 * 1024];
};
static void
test_ipc_dispatch(void)
{
int32_t j;
int32_t c = 0;
pid_t pid;
int32_t size;
pid = run_function_in_new_process(run_ipc_server);
fail_if(pid == -1);
sleep(1);
do {
conn = qb_ipcc_connect(ipc_name, MAX_MSG_SIZE);
if (conn == NULL) {
j = waitpid(pid, NULL, WNOHANG);
ck_assert_int_eq(j, 0);
sleep(1);
c++;
}
} while (conn == NULL && c < 5);
fail_if(conn == NULL);
size = QB_MIN(sizeof(struct qb_ipc_request_header), 64);
for (j = 1; j < 19; j++) {
size *= 2;
if (size >= MAX_MSG_SIZE)
break;
if (send_and_check(IPC_MSG_REQ_DISPATCH, size,
recv_timeout, QB_TRUE) < 0) {
break;
}
}
qb_ipcc_disconnect(conn);
stop_process(pid);
}
START_TEST(test_ipc_disp_us)
{
qb_enter();
ipc_type = QB_IPC_SOCKET;
ipc_name = __func__;
test_ipc_dispatch();
qb_leave();
}
END_TEST
static int32_t events_received;
static int32_t
count_stress_events(int32_t fd, int32_t revents, void *data)
{
struct {
struct qb_ipc_response_header hdr __attribute__ ((aligned(8)));
char data[GIANT_MSG_DATA_SIZE] __attribute__ ((aligned(8)));
uint32_t sent_msgs __attribute__ ((aligned(8)));
} __attribute__ ((aligned(8))) giant_event_recv;
qb_loop_t *cl = (qb_loop_t*)data;
int32_t res;
res = qb_ipcc_event_recv(conn, &giant_event_recv,
sizeof(giant_event_recv),
-1);
if (res > 0) {
events_received++;
if ((events_received % 1000) == 0) {
qb_log(LOG_DEBUG, "RECV: %d stress events processed", events_received);
if (res != sizeof(giant_event_recv)) {
qb_log(LOG_DEBUG, "Unexpected recv size, expected %zu got %d",
sizeof(giant_event_recv), res);
} else if (giant_event_recv.sent_msgs != events_received) {
qb_log(LOG_DEBUG, "Server event mismatch. Server thinks we got %d msgs, but we only received %d",
giant_event_recv.sent_msgs, events_received);
}
}
} else if (res != -EAGAIN) {
qb_perror(LOG_DEBUG, "count_stress_events");
qb_loop_stop(cl);
return -1;
}
if (events_received >= num_stress_events) {
qb_loop_stop(cl);
return -1;
}
return 0;
}
static int32_t
count_bulk_events(int32_t fd, int32_t revents, void *data)
{
qb_loop_t *cl = (qb_loop_t*)data;
struct qb_ipc_response_header res_header;
int32_t res;
res = qb_ipcc_event_recv(conn, &res_header,
sizeof(struct qb_ipc_response_header),
-1);
if (res > 0) {
events_received++;
}
if (events_received >= num_bulk_events) {
qb_loop_stop(cl);
return -1;
}
return 0;
}
static void
test_ipc_bulk_events(void)
{
struct qb_ipc_request_header req_header;
struct qb_ipc_response_header res_header;
struct iovec iov[1];
int32_t c = 0;
int32_t j = 0;
pid_t pid;
int32_t res;
qb_loop_t *cl;
int32_t fd;
pid = run_function_in_new_process(run_ipc_server);
fail_if(pid == -1);
sleep(1);
do {
conn = qb_ipcc_connect(ipc_name, MAX_MSG_SIZE);
if (conn == NULL) {
j = waitpid(pid, NULL, WNOHANG);
ck_assert_int_eq(j, 0);
sleep(1);
c++;
}
} while (conn == NULL && c < 5);
fail_if(conn == NULL);
events_received = 0;
cl = qb_loop_create();
res = qb_ipcc_fd_get(conn, &fd);
ck_assert_int_eq(res, 0);
res = qb_loop_poll_add(cl, QB_LOOP_MED,
fd, POLLIN,
cl, count_bulk_events);
ck_assert_int_eq(res, 0);
res = send_and_check(IPC_MSG_REQ_BULK_EVENTS,
0,
recv_timeout, QB_TRUE);
ck_assert_int_eq(res, sizeof(struct qb_ipc_response_header));
qb_loop_run(cl);
ck_assert_int_eq(events_received, num_bulk_events);
req_header.id = IPC_MSG_REQ_SERVER_FAIL;
req_header.size = sizeof(struct qb_ipc_request_header);
iov[0].iov_len = req_header.size;
iov[0].iov_base = &req_header;
res = qb_ipcc_sendv_recv(conn, iov, 1,
&res_header,
sizeof(struct qb_ipc_response_header), -1);
if (res != -ECONNRESET && res != -ENOTCONN) {
qb_log(LOG_ERR, "id:%d size:%d", res_header.id, res_header.size);
ck_assert_int_eq(res, -ENOTCONN);
}
qb_ipcc_disconnect(conn);
stop_process(pid);
}
static void
test_ipc_stress_test(void)
{
- struct qb_ipc_request_header req_header;
+ struct {
+ struct qb_ipc_request_header hdr __attribute__ ((aligned(8)));
+ char data[GIANT_MSG_DATA_SIZE] __attribute__ ((aligned(8)));
+ uint32_t sent_msgs __attribute__ ((aligned(8)));
+ } __attribute__ ((aligned(8))) giant_req;
+
struct qb_ipc_response_header res_header;
struct iovec iov[1];
int32_t c = 0;
int32_t j = 0;
pid_t pid;
int32_t res;
qb_loop_t *cl;
int32_t fd;
-
+ /* This looks strange, but it serves an important purpose.
+ * This test forces the server to enforce the MAX_MSG_SIZE
+ * limit from the server side, which overrides the client's
+ * buffer limit. To verify this functionality is working
+ * we set the client limit lower than what the server
+ * is enforcing. */
+ int32_t client_buf_size = MAX_MSG_SIZE - 1024;
+ int32_t real_buf_size;
+
+ enforce_server_buffer = 1;
pid = run_function_in_new_process(run_ipc_server);
+ enforce_server_buffer = 0;
fail_if(pid == -1);
sleep(1);
do {
- conn = qb_ipcc_connect(ipc_name, MAX_MSG_SIZE);
+ conn = qb_ipcc_connect(ipc_name, client_buf_size);
if (conn == NULL) {
j = waitpid(pid, NULL, WNOHANG);
ck_assert_int_eq(j, 0);
sleep(1);
c++;
}
} while (conn == NULL && c < 5);
fail_if(conn == NULL);
+ real_buf_size = qb_ipcc_get_buffer_size(conn);
+ ck_assert_int_eq(real_buf_size, MAX_MSG_SIZE);
+
qb_log(LOG_DEBUG, "Testing %d iterations of EVENT msg passing.", num_stress_events);
events_received = 0;
cl = qb_loop_create();
res = qb_ipcc_fd_get(conn, &fd);
ck_assert_int_eq(res, 0);
res = qb_loop_poll_add(cl, QB_LOOP_MED,
fd, POLLIN,
cl, count_stress_events);
ck_assert_int_eq(res, 0);
res = send_and_check(IPC_MSG_REQ_STRESS_EVENT, 0, recv_timeout, QB_TRUE);
qb_loop_run(cl);
ck_assert_int_eq(events_received, num_stress_events);
- req_header.id = IPC_MSG_REQ_SERVER_FAIL;
- req_header.size = sizeof(struct qb_ipc_request_header);
+ giant_req.hdr.id = IPC_MSG_REQ_SERVER_FAIL;
+ giant_req.hdr.size = sizeof(giant_req);
- iov[0].iov_len = req_header.size;
- iov[0].iov_base = &req_header;
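+	/* the request must exceed the client's requested buffer size,
+	 * otherwise server-side enforcement was never really exercised */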
+ if (giant_req.hdr.size <= client_buf_size) {
+ ck_assert_int_eq(1, 0);
+ }
+
+ iov[0].iov_len = giant_req.hdr.size;
+ iov[0].iov_base = &giant_req;
res = qb_ipcc_sendv_recv(conn, iov, 1,
&res_header,
sizeof(struct qb_ipc_response_header), -1);
if (res != -ECONNRESET && res != -ENOTCONN) {
qb_log(LOG_ERR, "id:%d size:%d", res_header.id, res_header.size);
ck_assert_int_eq(res, -ENOTCONN);
}
qb_ipcc_disconnect(conn);
stop_process(pid);
}
START_TEST(test_ipc_stress_test_us)
{
qb_enter();
send_event_on_created = QB_FALSE;
ipc_type = QB_IPC_SOCKET;
ipc_name = __func__;
test_ipc_stress_test();
qb_leave();
}
END_TEST
START_TEST(test_ipc_bulk_events_us)
{
qb_enter();
send_event_on_created = QB_FALSE;
ipc_type = QB_IPC_SOCKET;
ipc_name = __func__;
test_ipc_bulk_events();
qb_leave();
}
END_TEST
static void
test_ipc_event_on_created(void)
{
int32_t c = 0;
int32_t j = 0;
pid_t pid;
int32_t res;
qb_loop_t *cl;
int32_t fd;
num_bulk_events = 1;
pid = run_function_in_new_process(run_ipc_server);
fail_if(pid == -1);
sleep(1);
do {
conn = qb_ipcc_connect(ipc_name, MAX_MSG_SIZE);
if (conn == NULL) {
j = waitpid(pid, NULL, WNOHANG);
ck_assert_int_eq(j, 0);
sleep(1);
c++;
}
} while (conn == NULL && c < 5);
fail_if(conn == NULL);
events_received = 0;
cl = qb_loop_create();
res = qb_ipcc_fd_get(conn, &fd);
ck_assert_int_eq(res, 0);
res = qb_loop_poll_add(cl, QB_LOOP_MED,
fd, POLLIN,
cl, count_bulk_events);
ck_assert_int_eq(res, 0);
qb_loop_run(cl);
ck_assert_int_eq(events_received, num_bulk_events);
qb_ipcc_disconnect(conn);
stop_process(pid);
}
START_TEST(test_ipc_event_on_created_us)
{
qb_enter();
send_event_on_created = QB_TRUE;
ipc_type = QB_IPC_SOCKET;
ipc_name = __func__;
test_ipc_event_on_created();
qb_leave();
}
END_TEST
static void
test_ipc_disconnect_after_created(void)
{
struct qb_ipc_request_header req_header;
struct qb_ipc_response_header res_header;
struct iovec iov[1];
int32_t c = 0;
int32_t j = 0;
pid_t pid;
int32_t res;
pid = run_function_in_new_process(run_ipc_server);
fail_if(pid == -1);
sleep(1);
do {
conn = qb_ipcc_connect(ipc_name, MAX_MSG_SIZE);
if (conn == NULL) {
j = waitpid(pid, NULL, WNOHANG);
ck_assert_int_eq(j, 0);
sleep(1);
c++;
}
} while (conn == NULL && c < 5);
fail_if(conn == NULL);
ck_assert_int_eq(QB_TRUE, qb_ipcc_is_connected(conn));
req_header.id = IPC_MSG_REQ_SERVER_DISCONNECT;
req_header.size = sizeof(struct qb_ipc_request_header);
iov[0].iov_len = req_header.size;
iov[0].iov_base = &req_header;
res = qb_ipcc_sendv_recv(conn, iov, 1,
&res_header,
sizeof(struct qb_ipc_response_header), -1);
/*
* confirm we get -ENOTCONN or -ECONNRESET
*/
if (res != -ECONNRESET && res != -ENOTCONN) {
qb_log(LOG_ERR, "id:%d size:%d", res_header.id, res_header.size);
ck_assert_int_eq(res, -ENOTCONN);
}
ck_assert_int_eq(QB_FALSE, qb_ipcc_is_connected(conn));
qb_ipcc_disconnect(conn);
stop_process(pid);
}
START_TEST(test_ipc_disconnect_after_created_us)
{
qb_enter();
disconnect_after_created = QB_TRUE;
ipc_type = QB_IPC_SOCKET;
ipc_name = __func__;
test_ipc_disconnect_after_created();
qb_leave();
}
END_TEST
static void
test_ipc_server_fail(void)
{
struct qb_ipc_request_header req_header;
struct qb_ipc_response_header res_header;
struct iovec iov[1];
int32_t res;
int32_t j;
int32_t c = 0;
pid_t pid;
pid = run_function_in_new_process(run_ipc_server);
fail_if(pid == -1);
sleep(1);
do {
conn = qb_ipcc_connect(ipc_name, MAX_MSG_SIZE);
if (conn == NULL) {
j = waitpid(pid, NULL, WNOHANG);
ck_assert_int_eq(j, 0);
sleep(1);
c++;
}
} while (conn == NULL && c < 5);
fail_if(conn == NULL);
/*
* tell the server to exit
*/
req_header.id = IPC_MSG_REQ_SERVER_FAIL;
req_header.size = sizeof(struct qb_ipc_request_header);
iov[0].iov_len = req_header.size;
iov[0].iov_base = &req_header;
ck_assert_int_eq(QB_TRUE, qb_ipcc_is_connected(conn));
res = qb_ipcc_sendv_recv(conn, iov, 1,
&res_header,
sizeof(struct qb_ipc_response_header), -1);
/*
* confirm we get -ENOTCONN or ECONNRESET
*/
if (res != -ECONNRESET && res != -ENOTCONN) {
qb_log(LOG_ERR, "id:%d size:%d", res_header.id, res_header.size);
ck_assert_int_eq(res, -ENOTCONN);
}
ck_assert_int_eq(QB_FALSE, qb_ipcc_is_connected(conn));
qb_ipcc_disconnect(conn);
stop_process(pid);
}
START_TEST(test_ipc_server_fail_soc)
{
qb_enter();
ipc_type = QB_IPC_SOCKET;
ipc_name = __func__;
test_ipc_server_fail();
qb_leave();
}
END_TEST
START_TEST(test_ipc_disp_shm)
{
qb_enter();
ipc_type = QB_IPC_SHM;
ipc_name = __func__;
test_ipc_dispatch();
qb_leave();
}
END_TEST
START_TEST(test_ipc_stress_test_shm)
{
qb_enter();
send_event_on_created = QB_FALSE;
ipc_type = QB_IPC_SHM;
ipc_name = __func__;
test_ipc_stress_test();
qb_leave();
}
END_TEST
START_TEST(test_ipc_bulk_events_shm)
{
qb_enter();
ipc_type = QB_IPC_SHM;
ipc_name = __func__;
test_ipc_bulk_events();
qb_leave();
}
END_TEST
START_TEST(test_ipc_event_on_created_shm)
{
qb_enter();
send_event_on_created = QB_TRUE;
ipc_type = QB_IPC_SHM;
ipc_name = __func__;
test_ipc_event_on_created();
qb_leave();
}
END_TEST
START_TEST(test_ipc_server_fail_shm)
{
qb_enter();
ipc_type = QB_IPC_SHM;
ipc_name = __func__;
test_ipc_server_fail();
qb_leave();
}
END_TEST
static void
test_ipc_service_ref_count(void)
{
int32_t c = 0;
int32_t j = 0;
pid_t pid;
reference_count_test = QB_TRUE;
pid = run_function_in_new_process(run_ipc_server);
fail_if(pid == -1);
sleep(1);
do {
conn = qb_ipcc_connect(ipc_name, MAX_MSG_SIZE);
if (conn == NULL) {
j = waitpid(pid, NULL, WNOHANG);
ck_assert_int_eq(j, 0);
sleep(1);
c++;
}
} while (conn == NULL && c < 5);
fail_if(conn == NULL);
sleep(5);
qb_ipcc_disconnect(conn);
stop_process(pid);
}
START_TEST(test_ipc_service_ref_count_shm)
{
qb_enter();
ipc_type = QB_IPC_SHM;
ipc_name = __func__;
test_ipc_service_ref_count();
qb_leave();
}
END_TEST
START_TEST(test_ipc_service_ref_count_us)
{
qb_enter();
ipc_type = QB_IPC_SOCKET;
ipc_name = __func__;
test_ipc_service_ref_count();
qb_leave();
}
END_TEST
static void test_max_dgram_size(void)
{
/* most implementations will not let you set a dgram buffer
 * of 1 million bytes. This test verifies that we can detect
 * the max dgram buffer size regardless, and that the value we
 * detect is consistent. */
int32_t init;
int32_t i;
qb_log_filter_ctl(QB_LOG_STDERR, QB_LOG_FILTER_REMOVE,
QB_LOG_FILTER_FILE, "*", LOG_TRACE);
init = qb_ipcc_verify_dgram_max_msg_size(1000000);
fail_if(init <= 0);
for (i = 0; i < 100; i++) {
int try = qb_ipcc_verify_dgram_max_msg_size(1000000);
ck_assert_int_eq(init, try);
}
qb_log_filter_ctl(QB_LOG_STDERR, QB_LOG_FILTER_ADD,
QB_LOG_FILTER_FILE, "*", LOG_TRACE);
}
START_TEST(test_ipc_max_dgram_size)
{
qb_enter();
test_max_dgram_size();
qb_leave();
}
END_TEST
static Suite *
make_shm_suite(void)
{
TCase *tc;
Suite *s = suite_create("shm");
tc = tcase_create("ipc_server_fail_shm");
tcase_add_test(tc, test_ipc_server_fail_shm);
tcase_set_timeout(tc, 6);
suite_add_tcase(s, tc);
tc = tcase_create("ipc_txrx_shm_block");
tcase_add_test(tc, test_ipc_txrx_shm_block);
tcase_set_timeout(tc, 6);
suite_add_tcase(s, tc);
tc = tcase_create("ipc_txrx_shm_tmo");
tcase_add_test(tc, test_ipc_txrx_shm_tmo);
tcase_set_timeout(tc, 6);
suite_add_tcase(s, tc);
tc = tcase_create("ipc_fc_shm");
tcase_add_test(tc, test_ipc_fc_shm);
tcase_set_timeout(tc, 6);
suite_add_tcase(s, tc);
tc = tcase_create("ipc_dispatch_shm");
tcase_add_test(tc, test_ipc_disp_shm);
tcase_set_timeout(tc, 16);
suite_add_tcase(s, tc);
tc = tcase_create("ipc_stress_test_shm");
tcase_add_test(tc, test_ipc_stress_test_shm);
tcase_set_timeout(tc, 16);
suite_add_tcase(s, tc);
tc = tcase_create("ipc_bulk_events_shm");
tcase_add_test(tc, test_ipc_bulk_events_shm);
tcase_set_timeout(tc, 16);
suite_add_tcase(s, tc);
tc = tcase_create("ipc_exit_shm");
tcase_add_test(tc, test_ipc_exit_shm);
tcase_set_timeout(tc, 3);
suite_add_tcase(s, tc);
tc = tcase_create("ipc_event_on_created_shm");
tcase_add_test(tc, test_ipc_event_on_created_shm);
suite_add_tcase(s, tc);
tc = tcase_create("ipc_service_ref_count_shm");
tcase_add_test(tc, test_ipc_service_ref_count_shm);
tcase_set_timeout(tc, 10);
suite_add_tcase(s, tc);
return s;
}
static Suite *
make_soc_suite(void)
{
Suite *s = suite_create("socket");
TCase *tc;
tc = tcase_create("ipc_max_dgram_size");
tcase_add_test(tc, test_ipc_max_dgram_size);
tcase_set_timeout(tc, 30);
suite_add_tcase(s, tc);
tc = tcase_create("ipc_server_fail_soc");
tcase_add_test(tc, test_ipc_server_fail_soc);
tcase_set_timeout(tc, 6);
suite_add_tcase(s, tc);
tc = tcase_create("ipc_txrx_us_block");
tcase_add_test(tc, test_ipc_txrx_us_block);
tcase_set_timeout(tc, 6);
suite_add_tcase(s, tc);
tc = tcase_create("ipc_txrx_us_tmo");
tcase_add_test(tc, test_ipc_txrx_us_tmo);
tcase_set_timeout(tc, 6);
suite_add_tcase(s, tc);
tc = tcase_create("ipc_fc_us");
tcase_add_test(tc, test_ipc_fc_us);
tcase_set_timeout(tc, 6);
suite_add_tcase(s, tc);
tc = tcase_create("ipc_exit_us");
tcase_add_test(tc, test_ipc_exit_us);
tcase_set_timeout(tc, 6);
suite_add_tcase(s, tc);
tc = tcase_create("ipc_dispatch_us");
tcase_add_test(tc, test_ipc_disp_us);
tcase_set_timeout(tc, 16);
suite_add_tcase(s, tc);
tc = tcase_create("ipc_stress_test_us");
tcase_add_test(tc, test_ipc_stress_test_us);
tcase_set_timeout(tc, 60);
suite_add_tcase(s, tc);
tc = tcase_create("ipc_bulk_events_us");
tcase_add_test(tc, test_ipc_bulk_events_us);
tcase_set_timeout(tc, 16);
suite_add_tcase(s, tc);
tc = tcase_create("ipc_event_on_created_us");
tcase_add_test(tc, test_ipc_event_on_created_us);
suite_add_tcase(s, tc);
tc = tcase_create("ipc_disconnect_after_created_us");
tcase_add_test(tc, test_ipc_disconnect_after_created_us);
suite_add_tcase(s, tc);
tc = tcase_create("ipc_service_ref_count_us");
tcase_add_test(tc, test_ipc_service_ref_count_us);
tcase_set_timeout(tc, 10);
suite_add_tcase(s, tc);
return s;
}
int32_t
main(void)
{
int32_t number_failed;
SRunner *sr;
Suite *s;
int32_t do_shm_tests = QB_TRUE;
#ifdef DISABLE_IPC_SHM
do_shm_tests = QB_FALSE;
#endif /* DISABLE_IPC_SHM */
s = make_soc_suite();
sr = srunner_create(s);
if (do_shm_tests) {
srunner_add_suite(sr, make_shm_suite());
}
qb_log_init("check", LOG_USER, LOG_EMERG);
qb_log_ctl(QB_LOG_SYSLOG, QB_LOG_CONF_ENABLED, QB_FALSE);
qb_log_filter_ctl(QB_LOG_STDERR, QB_LOG_FILTER_ADD,
QB_LOG_FILTER_FILE, "*", LOG_TRACE);
qb_log_ctl(QB_LOG_STDERR, QB_LOG_CONF_ENABLED, QB_TRUE);
qb_log_format_set(QB_LOG_STDERR, "lib/%f|%l| %b");
srunner_run_all(sr, CK_VERBOSE);
number_failed = srunner_ntests_failed(sr);
srunner_free(sr);
return (number_failed == 0) ? EXIT_SUCCESS : EXIT_FAILURE;
}
