diff --git a/include/qb/qbipcs.h b/include/qb/qbipcs.h index baf16e5..2adcd17 100644 --- a/include/qb/qbipcs.h +++ b/include/qb/qbipcs.h @@ -1,433 +1,453 @@ /* * Copyright (C) 2006-2009 Red Hat, Inc. * * Author: Steven Dake , * Angus Salkeld * * This file is part of libqb. * * libqb is free software: you can redistribute it and/or modify * it under the terms of the GNU Lesser General Public License as published by * the Free Software Foundation, either version 2.1 of the License, or * (at your option) any later version. * * libqb is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public License * along with libqb. If not, see . */ #ifndef QB_IPCS_H_DEFINED #define QB_IPCS_H_DEFINED /* *INDENT-OFF* */ #ifdef __cplusplus extern "C" { #endif /* *INDENT-ON* */ #include #include #include #include #include /** * @file qbipcs.h * * Server IPC API. * * @example ipcserver.c */ enum qb_ipcs_rate_limit { QB_IPCS_RATE_FAST, QB_IPCS_RATE_NORMAL, QB_IPCS_RATE_SLOW, QB_IPCS_RATE_OFF, QB_IPCS_RATE_OFF_2, }; struct qb_ipcs_connection; typedef struct qb_ipcs_connection qb_ipcs_connection_t; struct qb_ipcs_service; typedef struct qb_ipcs_service qb_ipcs_service_t; struct qb_ipcs_stats { uint32_t active_connections; uint32_t closed_connections; }; struct qb_ipcs_connection_stats { int32_t client_pid; uint64_t requests; uint64_t responses; uint64_t events; uint64_t send_retries; uint64_t recv_retries; int32_t flow_control_state; uint64_t flow_control_count; }; struct qb_ipcs_connection_stats_2 { int32_t client_pid; uint64_t requests; uint64_t responses; uint64_t events; uint64_t send_retries; uint64_t recv_retries; int32_t flow_control_state; uint64_t flow_control_count; uint32_t event_q_length; }; typedef int32_t (*qb_ipcs_dispatch_fn_t) (int32_t fd, int32_t revents, void *data); typedef int32_t (*qb_ipcs_dispatch_add_fn)(enum qb_loop_priority p, int32_t fd, int32_t events, void *data, qb_ipcs_dispatch_fn_t fn); typedef int32_t (*qb_ipcs_dispatch_mod_fn)(enum qb_loop_priority p, int32_t fd, int32_t events, void *data, qb_ipcs_dispatch_fn_t fn); typedef int32_t (*qb_ipcs_dispatch_del_fn)(int32_t fd); typedef int32_t (*qb_ipcs_job_add_fn)(enum qb_loop_priority p, void *data, qb_loop_job_dispatch_fn dispatch_fn); struct qb_ipcs_poll_handlers { qb_ipcs_job_add_fn job_add; qb_ipcs_dispatch_add_fn dispatch_add; qb_ipcs_dispatch_mod_fn dispatch_mod; qb_ipcs_dispatch_del_fn dispatch_del; }; /** * This callback is to check wether you want to accept a new connection. * * The type of checks you should do are authentication, service availabilty * or process resource constraints. * @return 0 to accept or -errno to indicate a failure (sent back to the client) * * @note you can call qb_ipcs_connection_auth_set() within this function. */ typedef int32_t (*qb_ipcs_connection_accept_fn) (qb_ipcs_connection_t *c, uid_t uid, gid_t gid); /** * This is called after a new connection has been created. */ typedef void (*qb_ipcs_connection_created_fn) (qb_ipcs_connection_t *c); /** * This is called after a connection has been disconnected. * * @note if you return anything but 0 this function will be * repeativily called (until 0 is returned). */ typedef int32_t (*qb_ipcs_connection_closed_fn) (qb_ipcs_connection_t *c); /** * This is called just before a connection is freed. */ typedef void (*qb_ipcs_connection_destroyed_fn) (qb_ipcs_connection_t *c); /** * This is the message processing calback. * It is called with the message data. */ typedef int32_t (*qb_ipcs_msg_process_fn) (qb_ipcs_connection_t *c, void *data, size_t size); struct qb_ipcs_service_handlers { qb_ipcs_connection_accept_fn connection_accept; qb_ipcs_connection_created_fn connection_created; qb_ipcs_msg_process_fn msg_process; qb_ipcs_connection_closed_fn connection_closed; qb_ipcs_connection_destroyed_fn connection_destroyed; }; /** * Create a new IPC server. * * @param name for clients to connect to. * @param service_id an integer to associate with the service * @param type transport type. * @param handlers callbacks. * @return the new service instance. */ qb_ipcs_service_t* qb_ipcs_create(const char *name, int32_t service_id, enum qb_ipc_type type, struct qb_ipcs_service_handlers *handlers); /** * Increase the reference counter on the service object. * * @param s service instance */ void qb_ipcs_ref(qb_ipcs_service_t *s); /** * Decrease the reference counter on the service object. * * @param s service instance */ void qb_ipcs_unref(qb_ipcs_service_t *s); /** * Set your poll callbacks. * * @param s service instance * @param handlers the handlers that you want ipcs to use. */ void qb_ipcs_poll_handlers_set(qb_ipcs_service_t* s, struct qb_ipcs_poll_handlers *handlers); /** * Associate a "user" pointer with this service. * * @param s service instance * @param context the pointer to associate with this service. * @see qb_ipcs_service_context_get() */ void qb_ipcs_service_context_set(qb_ipcs_service_t* s, void *context); /** * Get the context (set previously) * * @param s service instance * @return the context * @see qb_ipcs_service_context_set() */ void *qb_ipcs_service_context_get(qb_ipcs_service_t* s); /** * run the new IPC server. * @param s service instance * @return 0 == ok; -errno to indicate a failure. Service is destroyed on failure. */ int32_t qb_ipcs_run(qb_ipcs_service_t* s); /** * Destroy the IPC server. * * @param s service instance to destroy */ void qb_ipcs_destroy(qb_ipcs_service_t* s); /** * Limit the incomming request rate. * @param s service instance * @param rl the new rate */ void qb_ipcs_request_rate_limit(qb_ipcs_service_t* s, enum qb_ipcs_rate_limit rl); /** * Send a response to a incomming request. * * @param c connection instance * @param data the message to send * @param size the size of the message * @return size sent or -errno for errors * * @note the data must include a qb_ipc_response_header at * the top of the message. The client will read the size field * to determine how much to recv. */ ssize_t qb_ipcs_response_send(qb_ipcs_connection_t *c, const void *data, size_t size); /** * Send a response to a incomming request. * * @param c connection instance * @param iov the iovec struct that points to the message to send * @param iov_len the number of iovecs. * @return size sent or -errno for errors * * @note the iov[0] must be a qb_ipc_response_header. The client will * read the size field to determine how much to recv. + * + * @note When send returns -EMSGSIZE, this means the msg is too + * large and will never succeed. To determine the max msg size + * a client can be sent, use qb_ipcs_connection_get_buffer_size() */ ssize_t qb_ipcs_response_sendv(qb_ipcs_connection_t *c, const struct iovec * iov, size_t iov_len); /** * Send an asyncronous event message to the client. * * @param c connection instance * @param data the message to send * @param size the size of the message * @return size sent or -errno for errors * * @note the data must include a qb_ipc_response_header at * the top of the message. The client will read the size field * to determine how much to recv. + * + * @note When send returns -EMSGSIZE, this means the msg is too + * large and will never succeed. To determine the max msg size + * a client can be sent, use qb_ipcs_connection_get_buffer_size() */ ssize_t qb_ipcs_event_send(qb_ipcs_connection_t *c, const void *data, size_t size); /** * Send an asyncronous event message to the client. * * @param c connection instance * @param iov the iovec struct that points to the message to send * @param iov_len the number of iovecs. * @return size sent or -errno for errors * * @note the iov[0] must be a qb_ipc_response_header. The client will * read the size field to determine how much to recv. + * + * @note When send returns -EMSGSIZE, this means the msg is too + * large and will never succeed. To determine the max msg size + * a client can be sent, use qb_ipcs_connection_get_buffer_size() */ ssize_t qb_ipcs_event_sendv(qb_ipcs_connection_t *c, const struct iovec * iov, size_t iov_len); /** * Increment the connection's reference counter. * * @param c connection instance */ void qb_ipcs_connection_ref(qb_ipcs_connection_t *c); /** * Decrement the connection's reference counter. * * @param c connection instance */ void qb_ipcs_connection_unref(qb_ipcs_connection_t *c); /** * Disconnect from this client. * * @param c connection instance */ void qb_ipcs_disconnect(qb_ipcs_connection_t *c); /** * Get the service id related to this connection's service. * (as passed into qb_ipcs_create() * * @return service id. */ int32_t qb_ipcs_service_id_get(qb_ipcs_connection_t *c); /** * Associate a "user" pointer with this connection. * * @param context the point to associate with this connection. * @param c connection instance * @see qb_ipcs_context_get() */ void qb_ipcs_context_set(qb_ipcs_connection_t *c, void *context); /** * Get the context (set previously) * * @param c connection instance * @return the context * @see qb_ipcs_context_set() */ void *qb_ipcs_context_get(qb_ipcs_connection_t *c); /** * Get the context previously set on the service backing this connection * * @param c connection instance * @return the context * @see qb_ipcs_service_context_set */ void *qb_ipcs_connection_service_context_get(qb_ipcs_connection_t *c); /** * Get the connection statistics. * * @deprecated from v0.13.0 onwards, use qb_ipcs_connection_stats_get_2 * @param stats (out) the statistics structure * @param clear_after_read clear stats after copying them into stats * @param c connection instance * @return 0 == ok; -errno to indicate a failure */ int32_t qb_ipcs_connection_stats_get(qb_ipcs_connection_t *c, struct qb_ipcs_connection_stats* stats, int32_t clear_after_read); /** * Get (and allocate) the connection statistics. * * @param clear_after_read clear stats after copying them into stats * @param c connection instance * @retval NULL if no memory or invalid connection * @retval allocated statistics structure (user must free it). */ struct qb_ipcs_connection_stats_2* qb_ipcs_connection_stats_get_2(qb_ipcs_connection_t *c, int32_t clear_after_read); /** * Get the service statistics. * * @param stats (out) the statistics structure * @param clear_after_read clear stats after copying them into stats * @param pt service instance * @return 0 == ok; -errno to indicate a failure */ int32_t qb_ipcs_stats_get(qb_ipcs_service_t* pt, struct qb_ipcs_stats* stats, int32_t clear_after_read); /** * Get the first connection. * * @note call qb_ipcs_connection_unref() after using the connection. * * @param pt service instance * @return first connection */ qb_ipcs_connection_t * qb_ipcs_connection_first_get(qb_ipcs_service_t* pt); /** * Get the next connection. * * @note call qb_ipcs_connection_unref() after using the connection. * * @param pt service instance * @param current current connection * @return next connection */ qb_ipcs_connection_t * qb_ipcs_connection_next_get(qb_ipcs_service_t* pt, qb_ipcs_connection_t *current); /** * Set the permissions on and shared memory files so that both processes can * read and write to them. * * @param conn connection instance * @param uid the user id to set. * @param gid the group id to set. * @param mode the mode to set. * * @see chmod() chown() * @note this must be called within the qb_ipcs_connection_accept_fn() * callback. */ void qb_ipcs_connection_auth_set(qb_ipcs_connection_t *conn, uid_t uid, gid_t gid, mode_t mode); +/** + * Retrieve the connection ipc buffer size. This reflects the + * largest size msg that can be sent or received. + * + * @param conn connection instance + * @return msg size in bytes, negative value on error. + */ +int32_t qb_ipcs_connection_get_buffer_size(qb_ipcs_connection_t *conn); /* *INDENT-OFF* */ #ifdef __cplusplus } #endif /* *INDENT-ON* */ #endif /* QB_IPCS_H_DEFINED */ diff --git a/lib/ipcs.c b/lib/ipcs.c index d3baaab..5a54983 100644 --- a/lib/ipcs.c +++ b/lib/ipcs.c @@ -1,943 +1,956 @@ /* * Copyright (C) 2010 Red Hat, Inc. * * Author: Angus Salkeld * * This file is part of libqb. * * libqb is free software: you can redistribute it and/or modify * it under the terms of the GNU Lesser General Public License as published by * the Free Software Foundation, either version 2.1 of the License, or * (at your option) any later version. * * libqb is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public License * along with libqb. If not, see . */ #include "os_base.h" #include "util_int.h" #include "ipc_int.h" #include #include #include static void qb_ipcs_flowcontrol_set(struct qb_ipcs_connection *c, int32_t fc_enable); static int32_t new_event_notification(struct qb_ipcs_connection * c); static QB_LIST_DECLARE(qb_ipc_services); qb_ipcs_service_t * qb_ipcs_create(const char *name, int32_t service_id, enum qb_ipc_type type, struct qb_ipcs_service_handlers *handlers) { struct qb_ipcs_service *s; s = calloc(1, sizeof(struct qb_ipcs_service)); if (s == NULL) { return NULL; } if (type == QB_IPC_NATIVE) { #ifdef DISABLE_IPC_SHM s->type = QB_IPC_SOCKET; #else s->type = QB_IPC_SHM; #endif /* DISABLE_IPC_SHM */ } else { s->type = type; } s->pid = getpid(); s->needs_sock_for_poll = QB_FALSE; s->poll_priority = QB_LOOP_MED; /* Initial alloc ref */ qb_ipcs_ref(s); s->service_id = service_id; (void)strlcpy(s->name, name, NAME_MAX); s->serv_fns.connection_accept = handlers->connection_accept; s->serv_fns.connection_created = handlers->connection_created; s->serv_fns.msg_process = handlers->msg_process; s->serv_fns.connection_closed = handlers->connection_closed; s->serv_fns.connection_destroyed = handlers->connection_destroyed; qb_list_init(&s->connections); qb_list_init(&s->list); qb_list_add(&s->list, &qb_ipc_services); return s; } void qb_ipcs_poll_handlers_set(struct qb_ipcs_service *s, struct qb_ipcs_poll_handlers *handlers) { s->poll_fns.job_add = handlers->job_add; s->poll_fns.dispatch_add = handlers->dispatch_add; s->poll_fns.dispatch_mod = handlers->dispatch_mod; s->poll_fns.dispatch_del = handlers->dispatch_del; } void qb_ipcs_service_context_set(qb_ipcs_service_t* s, void *context) { s->context = context; } void * qb_ipcs_service_context_get(qb_ipcs_service_t* s) { return s->context; } int32_t qb_ipcs_run(struct qb_ipcs_service *s) { int32_t res = 0; if (s->poll_fns.dispatch_add == NULL || s->poll_fns.dispatch_mod == NULL || s->poll_fns.dispatch_del == NULL) { res = -EINVAL; goto run_cleanup; } switch (s->type) { case QB_IPC_SOCKET: qb_ipcs_us_init((struct qb_ipcs_service *)s); break; case QB_IPC_SHM: #ifdef DISABLE_IPC_SHM res = -ENOTSUP; #else qb_ipcs_shm_init((struct qb_ipcs_service *)s); #endif /* DISABLE_IPC_SHM */ break; case QB_IPC_POSIX_MQ: case QB_IPC_SYSV_MQ: res = -ENOTSUP; break; default: res = -EINVAL; break; } if (res == 0) { res = qb_ipcs_us_publish(s); if (res < 0) { (void)qb_ipcs_us_withdraw(s); goto run_cleanup; } } run_cleanup: if (res < 0) { /* Failed to run services, removing initial alloc reference. */ qb_ipcs_unref(s); } return res; } static int32_t _modify_dispatch_descriptor_(struct qb_ipcs_connection *c) { qb_ipcs_dispatch_mod_fn disp_mod = c->service->poll_fns.dispatch_mod; if (c->service->type == QB_IPC_SOCKET) { return disp_mod(c->service->poll_priority, c->event.u.us.sock, c->poll_events, c, qb_ipcs_dispatch_connection_request); } else { return disp_mod(c->service->poll_priority, c->setup.u.us.sock, c->poll_events, c, qb_ipcs_dispatch_connection_request); } return -EINVAL; } void qb_ipcs_request_rate_limit(struct qb_ipcs_service *s, enum qb_ipcs_rate_limit rl) { struct qb_ipcs_connection *c; enum qb_loop_priority old_p = s->poll_priority; struct qb_list_head *pos; struct qb_list_head *n; switch (rl) { case QB_IPCS_RATE_FAST: s->poll_priority = QB_LOOP_HIGH; break; case QB_IPCS_RATE_SLOW: case QB_IPCS_RATE_OFF: case QB_IPCS_RATE_OFF_2: s->poll_priority = QB_LOOP_LOW; break; default: case QB_IPCS_RATE_NORMAL: s->poll_priority = QB_LOOP_MED; break; } qb_list_for_each_safe(pos, n, &s->connections) { c = qb_list_entry(pos, struct qb_ipcs_connection, list); qb_ipcs_connection_ref(c); if (rl == QB_IPCS_RATE_OFF) { qb_ipcs_flowcontrol_set(c, 1); } else if (rl == QB_IPCS_RATE_OFF_2) { qb_ipcs_flowcontrol_set(c, 2); } else { qb_ipcs_flowcontrol_set(c, QB_FALSE); } if (old_p != s->poll_priority) { (void)_modify_dispatch_descriptor_(c); } qb_ipcs_connection_unref(c); } } void qb_ipcs_ref(struct qb_ipcs_service *s) { qb_atomic_int_inc(&s->ref_count); } void qb_ipcs_unref(struct qb_ipcs_service *s) { int32_t free_it; assert(s->ref_count > 0); free_it = qb_atomic_int_dec_and_test(&s->ref_count); if (free_it) { qb_util_log(LOG_DEBUG, "%s() - destroying", __func__); free(s); } } void qb_ipcs_destroy(struct qb_ipcs_service *s) { struct qb_ipcs_connection *c = NULL; struct qb_list_head *pos; struct qb_list_head *n; if (s == NULL) { return; } qb_list_for_each_safe(pos, n, &s->connections) { c = qb_list_entry(pos, struct qb_ipcs_connection, list); if (c == NULL) { continue; } qb_ipcs_disconnect(c); } (void)qb_ipcs_us_withdraw(s); /* service destroyed, remove initial alloc ref */ qb_ipcs_unref(s); } /* * connection API */ static struct qb_ipc_one_way * _event_sock_one_way_get(struct qb_ipcs_connection * c) { if (c->service->needs_sock_for_poll) { return &c->setup; } if (c->event.type == QB_IPC_SOCKET) { return &c->event; } return NULL; } static struct qb_ipc_one_way * _response_sock_one_way_get(struct qb_ipcs_connection * c) { if (c->service->needs_sock_for_poll) { return &c->setup; } if (c->response.type == QB_IPC_SOCKET) { return &c->response; } return NULL; } ssize_t qb_ipcs_response_send(struct qb_ipcs_connection *c, const void *data, size_t size) { ssize_t res; if (c == NULL) { return -EINVAL; } qb_ipcs_connection_ref(c); res = c->service->funcs.send(&c->response, data, size); if (res == size) { c->stats.responses++; } else if (res == -EAGAIN || res == -ETIMEDOUT) { struct qb_ipc_one_way *ow = _response_sock_one_way_get(c); if (ow) { ssize_t res2 = qb_ipc_us_ready(ow, &c->setup, 0, POLLOUT); if (res2 < 0) { res = res2; } } c->stats.send_retries++; } qb_ipcs_connection_unref(c); return res; } ssize_t qb_ipcs_response_sendv(struct qb_ipcs_connection * c, const struct iovec * iov, size_t iov_len) { ssize_t res; if (c == NULL) { return -EINVAL; } qb_ipcs_connection_ref(c); res = c->service->funcs.sendv(&c->response, iov, iov_len); if (res > 0) { c->stats.responses++; } else if (res == -EAGAIN || res == -ETIMEDOUT) { struct qb_ipc_one_way *ow = _response_sock_one_way_get(c); if (ow) { ssize_t res2 = qb_ipc_us_ready(ow, &c->setup, 0, POLLOUT); if (res2 < 0) { res = res2; } } c->stats.send_retries++; } qb_ipcs_connection_unref(c); return res; } static int32_t resend_event_notifications(struct qb_ipcs_connection *c) { ssize_t res = 0; if (!c->service->needs_sock_for_poll) { return res; } if (c->outstanding_notifiers > 0) { res = qb_ipc_us_send(&c->setup, c->receive_buf, c->outstanding_notifiers); } if (res > 0) { c->outstanding_notifiers -= res; } assert(c->outstanding_notifiers >= 0); if (c->outstanding_notifiers == 0) { c->poll_events = POLLIN | POLLPRI | POLLNVAL; (void)_modify_dispatch_descriptor_(c); } return res; } static int32_t new_event_notification(struct qb_ipcs_connection * c) { ssize_t res = 0; if (!c->service->needs_sock_for_poll) { return res; } assert(c->outstanding_notifiers >= 0); if (c->outstanding_notifiers > 0) { c->outstanding_notifiers++; res = resend_event_notifications(c); } else { res = qb_ipc_us_send(&c->setup, &c->outstanding_notifiers, 1); if (res == -EAGAIN) { /* * notify the client later, when we can. */ c->outstanding_notifiers++; c->poll_events = POLLOUT | POLLIN | POLLPRI | POLLNVAL; (void)_modify_dispatch_descriptor_(c); } } return res; } ssize_t qb_ipcs_event_send(struct qb_ipcs_connection * c, const void *data, size_t size) { ssize_t res; ssize_t resn; if (c == NULL) { return -EINVAL; } else if (size > c->event.max_msg_size) { return -EMSGSIZE; } qb_ipcs_connection_ref(c); res = c->service->funcs.send(&c->event, data, size); if (res == size) { c->stats.events++; resn = new_event_notification(c); if (resn < 0 && resn != -EAGAIN && resn != -ENOBUFS) { errno = -resn; qb_util_perror(LOG_WARNING, "new_event_notification (%s)", c->description); res = resn; } } else if (res == -EAGAIN || res == -ETIMEDOUT) { struct qb_ipc_one_way *ow = _event_sock_one_way_get(c); if (c->outstanding_notifiers > 0) { resn = resend_event_notifications(c); } if (ow) { resn = qb_ipc_us_ready(ow, &c->setup, 0, POLLOUT); if (resn < 0) { res = resn; } } c->stats.send_retries++; } qb_ipcs_connection_unref(c); return res; } ssize_t qb_ipcs_event_sendv(struct qb_ipcs_connection * c, const struct iovec * iov, size_t iov_len) { ssize_t res; ssize_t resn; if (c == NULL) { return -EINVAL; } qb_ipcs_connection_ref(c); res = c->service->funcs.sendv(&c->event, iov, iov_len); if (res > 0) { c->stats.events++; resn = new_event_notification(c); if (resn < 0 && resn != -EAGAIN) { errno = -resn; qb_util_perror(LOG_WARNING, "new_event_notification (%s)", c->description); res = resn; } } else if (res == -EAGAIN || res == -ETIMEDOUT) { struct qb_ipc_one_way *ow = _event_sock_one_way_get(c); if (c->outstanding_notifiers > 0) { resn = resend_event_notifications(c); } if (ow) { resn = qb_ipc_us_ready(ow, &c->setup, 0, POLLOUT); if (resn < 0) { res = resn; } } c->stats.send_retries++; } qb_ipcs_connection_unref(c); return res; } qb_ipcs_connection_t * qb_ipcs_connection_first_get(struct qb_ipcs_service * s) { struct qb_ipcs_connection *c; if (qb_list_empty(&s->connections)) { return NULL; } c = qb_list_first_entry(&s->connections, struct qb_ipcs_connection, list); qb_ipcs_connection_ref(c); return c; } qb_ipcs_connection_t * qb_ipcs_connection_next_get(struct qb_ipcs_service * s, struct qb_ipcs_connection * current) { struct qb_ipcs_connection *c; if (current == NULL || qb_list_is_last(¤t->list, &s->connections)) { return NULL; } c = qb_list_first_entry(¤t->list, struct qb_ipcs_connection, list); qb_ipcs_connection_ref(c); return c; } int32_t qb_ipcs_service_id_get(struct qb_ipcs_connection * c) { if (c == NULL) { return -EINVAL; } return c->service->service_id; } struct qb_ipcs_connection * qb_ipcs_connection_alloc(struct qb_ipcs_service *s) { struct qb_ipcs_connection *c = calloc(1, sizeof(struct qb_ipcs_connection)); if (c == NULL) { return NULL; } c->pid = 0; c->euid = -1; c->egid = -1; c->receive_buf = NULL; c->context = NULL; c->fc_enabled = QB_FALSE; c->state = QB_IPCS_CONNECTION_INACTIVE; c->poll_events = POLLIN | POLLPRI | POLLNVAL; c->setup.type = s->type; c->request.type = s->type; c->response.type = s->type; c->event.type = s->type; (void)strlcpy(c->description, "not set yet", CONNECTION_DESCRIPTION); /* initial alloc ref */ qb_ipcs_connection_ref(c); /* * The connection makes use of the service object. Give the connection * a reference to the service so we know the service can never be destroyed * until the connection is done with it. */ qb_ipcs_ref(s); c->service = s; qb_list_init(&c->list); return c; } void qb_ipcs_connection_ref(struct qb_ipcs_connection *c) { if (c) { qb_atomic_int_inc(&c->refcount); } } void qb_ipcs_connection_unref(struct qb_ipcs_connection *c) { int32_t free_it; if (c == NULL) { return; } if (c->refcount < 1) { qb_util_log(LOG_ERR, "ref:%d state:%d (%s)", c->refcount, c->state, c->description); assert(0); } free_it = qb_atomic_int_dec_and_test(&c->refcount); if (free_it) { qb_list_del(&c->list); if (c->service->serv_fns.connection_destroyed) { c->service->serv_fns.connection_destroyed(c); } c->service->funcs.disconnect(c); /* Let go of the connection's reference to the service */ qb_ipcs_unref(c->service); free(c->receive_buf); free(c); } } void qb_ipcs_disconnect(struct qb_ipcs_connection *c) { int32_t res = 0; qb_loop_job_dispatch_fn rerun_job; if (c == NULL) { return; } qb_util_log(LOG_DEBUG, "%s(%s) state:%d", __func__, c->description, c->state); if (c->state == QB_IPCS_CONNECTION_ACTIVE) { c->service->funcs.disconnect(c); c->state = QB_IPCS_CONNECTION_INACTIVE; c->service->stats.closed_connections++; /* return early as it's an incomplete connection. */ return; } if (c->state == QB_IPCS_CONNECTION_ESTABLISHED) { c->service->funcs.disconnect(c); c->state = QB_IPCS_CONNECTION_SHUTTING_DOWN; c->service->stats.active_connections--; c->service->stats.closed_connections++; } if (c->state == QB_IPCS_CONNECTION_SHUTTING_DOWN) { int scheduled_retry = 0; res = 0; if (c->service->serv_fns.connection_closed) { res = c->service->serv_fns.connection_closed(c); } if (res != 0) { /* OK, so they want the connection_closed * function re-run */ rerun_job = (qb_loop_job_dispatch_fn) qb_ipcs_disconnect; res = c->service->poll_fns.job_add(QB_LOOP_LOW, c, rerun_job); if (res == 0) { /* this function is going to be called again. * so hold off on the unref */ scheduled_retry = 1; } } if (scheduled_retry == 0) { /* This removes the initial alloc ref */ qb_ipcs_connection_unref(c); } } } static void qb_ipcs_flowcontrol_set(struct qb_ipcs_connection *c, int32_t fc_enable) { if (c == NULL) { return; } if (c->fc_enabled != fc_enable) { c->service->funcs.fc_set(&c->request, fc_enable); c->fc_enabled = fc_enable; c->stats.flow_control_state = fc_enable; c->stats.flow_control_count++; } } static int32_t _process_request_(struct qb_ipcs_connection *c, int32_t ms_timeout) { int32_t res = 0; ssize_t size; struct qb_ipc_request_header *hdr; qb_ipcs_connection_ref(c); if (c->service->funcs.peek && c->service->funcs.reclaim) { size = c->service->funcs.peek(&c->request, (void **)&hdr, ms_timeout); } else { hdr = c->receive_buf; size = c->service->funcs.recv(&c->request, hdr, c->request.max_msg_size, ms_timeout); } if (size < 0) { if (size != -EAGAIN && size != -ETIMEDOUT) { qb_util_perror(LOG_DEBUG, "recv from client connection failed (%s)", c->description); } else { c->stats.recv_retries++; } res = size; goto cleanup; } else if (size == 0 || hdr->id == QB_IPC_MSG_DISCONNECT) { qb_util_log(LOG_DEBUG, "client requesting a disconnect (%s)", c->description); res = -ESHUTDOWN; goto cleanup; } else { c->stats.requests++; res = c->service->serv_fns.msg_process(c, hdr, hdr->size); /* 0 == good, negative == backoff */ if (res < 0) { res = -ENOBUFS; } else { res = size; } } if (c && c->service->funcs.peek && c->service->funcs.reclaim) { c->service->funcs.reclaim(&c->request); } cleanup: qb_ipcs_connection_unref(c); return res; } #define IPC_REQUEST_TIMEOUT 10 #define MAX_RECV_MSGS 50 static ssize_t _request_q_len_get(struct qb_ipcs_connection *c) { ssize_t q_len; if (c->service->funcs.q_len_get) { q_len = c->service->funcs.q_len_get(&c->request); if (q_len <= 0) { return q_len; } if (c->service->poll_priority == QB_LOOP_MED) { q_len = QB_MIN(q_len, 5); } else if (c->service->poll_priority == QB_LOOP_LOW) { q_len = 1; } else { q_len = QB_MIN(q_len, MAX_RECV_MSGS); } } else { q_len = 1; } return q_len; } int32_t qb_ipcs_dispatch_connection_request(int32_t fd, int32_t revents, void *data) { struct qb_ipcs_connection *c = (struct qb_ipcs_connection *)data; char bytes[MAX_RECV_MSGS]; int32_t res = 0; int32_t res2; int32_t recvd = 0; ssize_t avail; if (revents & POLLNVAL) { qb_util_log(LOG_DEBUG, "NVAL conn (%s)", c->description); res = -EINVAL; goto dispatch_cleanup; } if (revents & POLLHUP) { qb_util_log(LOG_DEBUG, "HUP conn (%s)", c->description); res = -ESHUTDOWN; goto dispatch_cleanup; } if (revents & POLLOUT) { /* try resend events now that fd can write */ res = resend_event_notifications(c); if (res < 0 && res != -EAGAIN) { errno = -res; qb_util_perror(LOG_WARNING, "resend_event_notifications (%s)", c->description); } /* nothing to read */ if ((revents & POLLIN) == 0) { res = 0; goto dispatch_cleanup; } } if (c->fc_enabled) { res = 0; goto dispatch_cleanup; } avail = _request_q_len_get(c); if (c->service->needs_sock_for_poll && avail == 0) { res2 = qb_ipc_us_recv(&c->setup, bytes, 1, 0); if (qb_ipc_us_sock_error_is_disconnected(res2)) { errno = -res2; qb_util_perror(LOG_WARNING, "conn (%s) disconnected", c->description); res = -ESHUTDOWN; goto dispatch_cleanup; } else { qb_util_log(LOG_WARNING, "conn (%s) Nothing in q but got POLLIN on fd:%d (res2:%d)", c->description, fd, res2); res = 0; goto dispatch_cleanup; } } do { res = _process_request_(c, IPC_REQUEST_TIMEOUT); if (res == -ESHUTDOWN) { goto dispatch_cleanup; } if (res > 0 || res == -ENOBUFS || res == -EINVAL) { recvd++; } if (res > 0) { avail--; } } while (avail > 0 && res > 0 && !c->fc_enabled); if (c->service->needs_sock_for_poll && recvd > 0) { res2 = qb_ipc_us_recv(&c->setup, bytes, recvd, -1); if (qb_ipc_us_sock_error_is_disconnected(res2)) { errno = -res2; qb_util_perror(LOG_ERR, "error receiving from setup sock (%s)", c->description); res = -ESHUTDOWN; goto dispatch_cleanup; } } res = QB_MIN(0, res); if (res == -EAGAIN || res == -ETIMEDOUT || res == -ENOBUFS) { res = 0; } if (res != 0) { if (res != -ENOTCONN) { /* * Abnormal state (ENOTCONN is normal shutdown). */ errno = -res; qb_util_perror(LOG_ERR, "request returned error (%s)", c->description); } } dispatch_cleanup: if (res != 0) { qb_ipcs_disconnect(c); } return res; } void qb_ipcs_context_set(struct qb_ipcs_connection *c, void *context) { if (c == NULL) { return; } c->context = context; } void * qb_ipcs_context_get(struct qb_ipcs_connection *c) { if (c == NULL) { return NULL; } return c->context; } void * qb_ipcs_connection_service_context_get(qb_ipcs_connection_t *c) { if (c == NULL || c->service == NULL) { return NULL; } return c->service->context; } int32_t qb_ipcs_connection_stats_get(qb_ipcs_connection_t * c, struct qb_ipcs_connection_stats * stats, int32_t clear_after_read) { if (c == NULL) { return -EINVAL; } memcpy(stats, &c->stats, sizeof(struct qb_ipcs_connection_stats)); if (clear_after_read) { memset(&c->stats, 0, sizeof(struct qb_ipcs_connection_stats_2)); c->stats.client_pid = c->pid; } return 0; } struct qb_ipcs_connection_stats_2* qb_ipcs_connection_stats_get_2(qb_ipcs_connection_t *c, int32_t clear_after_read) { struct qb_ipcs_connection_stats_2 * stats; if (c == NULL) { errno = EINVAL; return NULL; } stats = calloc(1, sizeof(struct qb_ipcs_connection_stats_2)); if (stats == NULL) { return NULL; } memcpy(stats, &c->stats, sizeof(struct qb_ipcs_connection_stats_2)); if (c->service->funcs.q_len_get) { stats->event_q_length = c->service->funcs.q_len_get(&c->event); } else { stats->event_q_length = 0; } if (clear_after_read) { memset(&c->stats, 0, sizeof(struct qb_ipcs_connection_stats_2)); c->stats.client_pid = c->pid; } return stats; } int32_t qb_ipcs_stats_get(struct qb_ipcs_service * s, struct qb_ipcs_stats * stats, int32_t clear_after_read) { if (s == NULL) { return -EINVAL; } memcpy(stats, &s->stats, sizeof(struct qb_ipcs_stats)); if (clear_after_read) { memset(&s->stats, 0, sizeof(struct qb_ipcs_stats)); } return 0; } void qb_ipcs_connection_auth_set(qb_ipcs_connection_t *c, uid_t uid, gid_t gid, mode_t mode) { if (c) { c->auth.uid = uid; c->auth.gid = gid; c->auth.mode = mode; } } + +int32_t +qb_ipcs_connection_get_buffer_size(qb_ipcs_connection_t *c) +{ + if (c == NULL) { + return -EINVAL; + } + + /* request, response, and event shoud all have the same + * buffer size allocated. It doesn't matter which we return + * here. */ + return c->response.max_msg_size; +} diff --git a/tests/check_ipc.c b/tests/check_ipc.c index ad4888f..93d0a83 100644 --- a/tests/check_ipc.c +++ b/tests/check_ipc.c @@ -1,1384 +1,1390 @@ /* * Copyright (c) 2010 Red Hat, Inc. * * All rights reserved. * * Author: Angus Salkeld * * This file is part of libqb. * * libqb is free software: you can redistribute it and/or modify * it under the terms of the GNU Lesser General Public License as published by * the Free Software Foundation, either version 2.1 of the License, or * (at your option) any later version. * * libqb is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public License * along with libqb. If not, see . */ #include "os_base.h" #include #include #include #include #include #include #include #include static const char *ipc_name = "ipc_test"; #define DEFAULT_MAX_MSG_SIZE (8192*16) static int CALCULATED_DGRAM_MAX_MSG_SIZE = 0; #define DGRAM_MAX_MSG_SIZE \ (CALCULATED_DGRAM_MAX_MSG_SIZE == 0 ? \ CALCULATED_DGRAM_MAX_MSG_SIZE = qb_ipcc_verify_dgram_max_msg_size(DEFAULT_MAX_MSG_SIZE) : \ CALCULATED_DGRAM_MAX_MSG_SIZE) #define MAX_MSG_SIZE (ipc_type == QB_IPC_SOCKET ? DGRAM_MAX_MSG_SIZE : DEFAULT_MAX_MSG_SIZE) /* The size the giant msg's data field needs to be to make * this the largests msg we can successfully send. */ #define GIANT_MSG_DATA_SIZE MAX_MSG_SIZE - sizeof(struct qb_ipc_response_header) - 8 static qb_ipcc_connection_t *conn; static enum qb_ipc_type ipc_type; enum my_msg_ids { IPC_MSG_REQ_TX_RX, IPC_MSG_RES_TX_RX, IPC_MSG_REQ_DISPATCH, IPC_MSG_RES_DISPATCH, IPC_MSG_REQ_BULK_EVENTS, IPC_MSG_RES_BULK_EVENTS, IPC_MSG_REQ_STRESS_EVENT, IPC_MSG_RES_STRESS_EVENT, IPC_MSG_REQ_SERVER_FAIL, IPC_MSG_RES_SERVER_FAIL, IPC_MSG_REQ_SERVER_DISCONNECT, IPC_MSG_RES_SERVER_DISCONNECT, }; /* Test Cases * * 1) basic send & recv differnet message sizes * * 2) send message to start dispatch (confirm receipt) * * 3) flow control * * 4) authentication * * 5) thread safety * * 6) cleanup * * 7) service availabilty * * 8) multiple services */ static qb_loop_t *my_loop; static qb_ipcs_service_t* s1; static int32_t turn_on_fc = QB_FALSE; static int32_t fc_enabled = 89; static int32_t send_event_on_created = QB_FALSE; static int32_t disconnect_after_created = QB_FALSE; static int32_t num_bulk_events = 10; static int32_t num_stress_events = 30000; static int32_t reference_count_test = QB_FALSE; static int32_t exit_handler(int32_t rsignal, void *data) { qb_log(LOG_DEBUG, "caught signal %d", rsignal); qb_ipcs_destroy(s1); return -1; } static int32_t s1_msg_process_fn(qb_ipcs_connection_t *c, void *data, size_t size) { struct qb_ipc_request_header *req_pt = (struct qb_ipc_request_header *)data; struct qb_ipc_response_header response = { 0, }; ssize_t res; if (req_pt->id == IPC_MSG_REQ_TX_RX) { response.size = sizeof(struct qb_ipc_response_header); response.id = IPC_MSG_RES_TX_RX; response.error = 0; res = qb_ipcs_response_send(c, &response, response.size); if (res < 0) { qb_perror(LOG_INFO, "qb_ipcs_response_send"); } else if (res != response.size) { qb_log(LOG_DEBUG, "qb_ipcs_response_send %zd != %d", res, response.size); } if (turn_on_fc) { qb_ipcs_request_rate_limit(s1, QB_IPCS_RATE_OFF); } } else if (req_pt->id == IPC_MSG_REQ_DISPATCH) { response.size = sizeof(struct qb_ipc_response_header); response.id = IPC_MSG_RES_DISPATCH; response.error = 0; res = qb_ipcs_event_send(c, &response, sizeof(response)); if (res < 0) { qb_perror(LOG_INFO, "qb_ipcs_event_send"); } } else if (req_pt->id == IPC_MSG_REQ_BULK_EVENTS) { int32_t m; int32_t num; struct qb_ipcs_connection_stats_2 *stats; response.size = sizeof(struct qb_ipc_response_header); response.error = 0; stats = qb_ipcs_connection_stats_get_2(c, QB_FALSE); num = stats->event_q_length; free(stats); /* crazy large message */ res = qb_ipcs_event_send(c, &response, MAX_MSG_SIZE*10); ck_assert_int_eq(res, -EMSGSIZE); /* send one event before responding */ res = qb_ipcs_event_send(c, &response, sizeof(response)); ck_assert_int_eq(res, sizeof(response)); response.id++; /* send response */ response.id = IPC_MSG_RES_BULK_EVENTS; res = qb_ipcs_response_send(c, &response, response.size); ck_assert_int_eq(res, sizeof(response)); /* send the rest of the events after the response */ for (m = 1; m < num_bulk_events; m++) { res = qb_ipcs_event_send(c, &response, sizeof(response)); if (res == -EAGAIN || res == -ENOBUFS) { /* retry */ usleep(1000); m--; continue; } ck_assert_int_eq(res, sizeof(response)); response.id++; } stats = qb_ipcs_connection_stats_get_2(c, QB_FALSE); ck_assert_int_eq(stats->event_q_length - num, num_bulk_events); free(stats); } else if (req_pt->id == IPC_MSG_REQ_STRESS_EVENT) { struct { struct qb_ipc_response_header hdr __attribute__ ((aligned(8))); char data[GIANT_MSG_DATA_SIZE] __attribute__ ((aligned(8))); uint32_t sent_msgs __attribute__ ((aligned(8))); } __attribute__ ((aligned(8))) giant_event_send; int32_t m; response.size = sizeof(struct qb_ipc_response_header); response.error = 0; response.id = IPC_MSG_RES_STRESS_EVENT; res = qb_ipcs_response_send(c, &response, response.size); ck_assert_int_eq(res, sizeof(response)); giant_event_send.hdr.error = 0; giant_event_send.hdr.id = IPC_MSG_RES_STRESS_EVENT; for (m = 0; m < num_stress_events; m++) { size_t sent_len = sizeof(struct qb_ipc_response_header); if (((m+1) % 1000) == 0) { sent_len = sizeof(giant_event_send); giant_event_send.sent_msgs = m + 1; } giant_event_send.hdr.size = sent_len; res = qb_ipcs_event_send(c, &giant_event_send, sent_len); if (res < 0) { if (res == -EAGAIN || res == -ENOBUFS) { /* yield to the receive process */ usleep(1000); m--; continue; } else { qb_perror(LOG_DEBUG, "sending stress events"); ck_assert_int_eq(res, sent_len); } } else if (((m+1) % 1000) == 0) { qb_log(LOG_DEBUG, "SENT: %d stress events sent", m+1); } giant_event_send.hdr.id++; } } else if (req_pt->id == IPC_MSG_REQ_SERVER_FAIL) { exit(0); } else if (req_pt->id == IPC_MSG_REQ_SERVER_DISCONNECT) { qb_ipcs_disconnect(c); } return 0; } static int32_t my_job_add(enum qb_loop_priority p, void *data, qb_loop_job_dispatch_fn fn) { return qb_loop_job_add(my_loop, p, data, fn); } static int32_t my_dispatch_add(enum qb_loop_priority p, int32_t fd, int32_t events, void *data, qb_ipcs_dispatch_fn_t fn) { return qb_loop_poll_add(my_loop, p, fd, events, data, fn); } static int32_t my_dispatch_mod(enum qb_loop_priority p, int32_t fd, int32_t events, void *data, qb_ipcs_dispatch_fn_t fn) { return qb_loop_poll_mod(my_loop, p, fd, events, data, fn); } static int32_t my_dispatch_del(int32_t fd) { return qb_loop_poll_del(my_loop, fd); } static int32_t s1_connection_closed(qb_ipcs_connection_t *c) { qb_enter(); qb_leave(); return 0; } static void outq_flush (void *data) { static int i = 0; struct cs_ipcs_conn_context *cnx; cnx = qb_ipcs_context_get(data); qb_log(LOG_DEBUG,"iter %u\n", i); i++; if (i == 2) { qb_ipcs_destroy(s1); s1 = NULL; } /* is the reference counting is not working, this should fail * for i > 1. */ qb_ipcs_event_send(data, "test", 4); assert(memcmp(cnx, "test", 4) == 0); if (i < 5) { qb_loop_job_add(my_loop, QB_LOOP_HIGH, data, outq_flush); } else { /* this single unref should clean everything up. */ qb_ipcs_connection_unref(data); qb_log(LOG_INFO, "end of test, stopping loop"); qb_loop_stop(my_loop); } } static void s1_connection_destroyed(qb_ipcs_connection_t *c) { qb_enter(); if (reference_count_test) { struct cs_ipcs_conn_context *cnx; cnx = qb_ipcs_context_get(c); free(cnx); } else { qb_loop_stop(my_loop); } qb_leave(); } static void s1_connection_created(qb_ipcs_connection_t *c) { + int32_t max = MAX_MSG_SIZE; + if (send_event_on_created) { struct qb_ipc_response_header response; int32_t res; response.size = sizeof(struct qb_ipc_response_header); response.id = IPC_MSG_RES_DISPATCH; response.error = 0; res = qb_ipcs_event_send(c, &response, sizeof(response)); ck_assert_int_eq(res, response.size); } if (reference_count_test) { struct cs_ipcs_conn_context *context; qb_ipcs_connection_ref(c); qb_loop_job_add(my_loop, QB_LOOP_HIGH, c, outq_flush); context = calloc(1, 20); memcpy(context, "test", 4); qb_ipcs_context_set(c, context); } + + + ck_assert_int_eq(max, qb_ipcs_connection_get_buffer_size(c)); + } static void run_ipc_server(void) { int32_t res; qb_loop_signal_handle handle; struct qb_ipcs_service_handlers sh = { .connection_accept = NULL, .connection_created = s1_connection_created, .msg_process = s1_msg_process_fn, .connection_destroyed = s1_connection_destroyed, .connection_closed = s1_connection_closed, }; struct qb_ipcs_poll_handlers ph = { .job_add = my_job_add, .dispatch_add = my_dispatch_add, .dispatch_mod = my_dispatch_mod, .dispatch_del = my_dispatch_del, }; qb_loop_signal_add(my_loop, QB_LOOP_HIGH, SIGSTOP, NULL, exit_handler, &handle); qb_loop_signal_add(my_loop, QB_LOOP_HIGH, SIGTERM, NULL, exit_handler, &handle); my_loop = qb_loop_create(); s1 = qb_ipcs_create(ipc_name, 4, ipc_type, &sh); fail_if(s1 == 0); qb_ipcs_poll_handlers_set(s1, &ph); res = qb_ipcs_run(s1); ck_assert_int_eq(res, 0); qb_loop_run(my_loop); qb_log(LOG_DEBUG, "loop finished - done ..."); } static int32_t run_function_in_new_process(void (*run_ipc_server_fn)(void)) { pid_t pid = fork (); if (pid == -1) { fprintf (stderr, "Can't fork\n"); return -1; } if (pid == 0) { run_ipc_server_fn(); return 0; } return pid; } static int32_t stop_process(pid_t pid) { /* wait a bit for the server to shutdown by it's self */ usleep(100000); kill(pid, SIGTERM); waitpid(pid, NULL, 0); return 0; } struct my_req { struct qb_ipc_request_header hdr; char message[1024 * 1024]; }; static struct my_req request; static int32_t send_and_check(int32_t req_id, uint32_t size, int32_t ms_timeout, int32_t expect_perfection) { struct qb_ipc_response_header res_header; int32_t res; int32_t try_times = 0; request.hdr.id = req_id; request.hdr.size = sizeof(struct qb_ipc_request_header) + size; /* check that we can't send a message that is too big * and we get the right return code. */ res = qb_ipcc_send(conn, &request, MAX_MSG_SIZE*2); ck_assert_int_eq(res, -EMSGSIZE); repeat_send: res = qb_ipcc_send(conn, &request, request.hdr.size); try_times++; if (res < 0) { if (res == -EAGAIN && try_times < 10) { goto repeat_send; } else { if (res == -EAGAIN && try_times >= 10) { fc_enabled = QB_TRUE; } errno = -res; qb_perror(LOG_INFO, "qb_ipcc_send"); return res; } } if (req_id == IPC_MSG_REQ_DISPATCH) { res = qb_ipcc_event_recv(conn, &res_header, sizeof(struct qb_ipc_response_header), ms_timeout); } else { res = qb_ipcc_recv(conn, &res_header, sizeof(struct qb_ipc_response_header), ms_timeout); } if (res == -EINTR) { return -1; } if (res == -EAGAIN || res == -ETIMEDOUT) { fc_enabled = QB_TRUE; qb_perror(LOG_DEBUG, "qb_ipcc_recv"); return res; } if (expect_perfection) { ck_assert_int_eq(res, sizeof(struct qb_ipc_response_header)); ck_assert_int_eq(res_header.id, req_id + 1); ck_assert_int_eq(res_header.size, sizeof(struct qb_ipc_response_header)); } return res; } static int32_t recv_timeout = -1; static void test_ipc_txrx(void) { int32_t j; int32_t c = 0; size_t size; pid_t pid; pid = run_function_in_new_process(run_ipc_server); fail_if(pid == -1); sleep(1); do { conn = qb_ipcc_connect(ipc_name, MAX_MSG_SIZE); if (conn == NULL) { j = waitpid(pid, NULL, WNOHANG); ck_assert_int_eq(j, 0); sleep(1); c++; } } while (conn == NULL && c < 5); fail_if(conn == NULL); size = QB_MIN(sizeof(struct qb_ipc_request_header), 64); for (j = 1; j < 19; j++) { size *= 2; if (size >= MAX_MSG_SIZE) break; if (send_and_check(IPC_MSG_REQ_TX_RX, size, recv_timeout, QB_TRUE) < 0) { break; } } if (turn_on_fc) { ck_assert_int_eq(fc_enabled, QB_TRUE); } qb_ipcc_disconnect(conn); stop_process(pid); } static void test_ipc_exit(void) { struct qb_ipc_request_header req_header; struct qb_ipc_response_header res_header; struct iovec iov[1]; int32_t res; int32_t c = 0; int32_t j = 0; pid_t pid; pid = run_function_in_new_process(run_ipc_server); fail_if(pid == -1); sleep(1); do { conn = qb_ipcc_connect(ipc_name, MAX_MSG_SIZE); if (conn == NULL) { j = waitpid(pid, NULL, WNOHANG); ck_assert_int_eq(j, 0); sleep(1); c++; } } while (conn == NULL && c < 5); fail_if(conn == NULL); req_header.id = IPC_MSG_REQ_TX_RX; req_header.size = sizeof(struct qb_ipc_request_header); iov[0].iov_len = req_header.size; iov[0].iov_base = &req_header; res = qb_ipcc_sendv_recv(conn, iov, 1, &res_header, sizeof(struct qb_ipc_response_header), -1); ck_assert_int_eq(res, sizeof(struct qb_ipc_response_header)); /* kill the server */ stop_process(pid); /* * wait a bit for the server to die. */ sleep(1); /* * this needs to free up the shared mem */ qb_ipcc_disconnect(conn); } START_TEST(test_ipc_exit_us) { qb_enter(); ipc_type = QB_IPC_SOCKET; ipc_name = __func__; recv_timeout = 5000; test_ipc_exit(); qb_leave(); } END_TEST START_TEST(test_ipc_exit_shm) { qb_enter(); ipc_type = QB_IPC_SHM; ipc_name = __func__; recv_timeout = 1000; test_ipc_exit(); qb_leave(); } END_TEST START_TEST(test_ipc_txrx_shm_tmo) { qb_enter(); turn_on_fc = QB_FALSE; ipc_type = QB_IPC_SHM; ipc_name = __func__; recv_timeout = 1000; test_ipc_txrx(); qb_leave(); } END_TEST START_TEST(test_ipc_txrx_shm_block) { qb_enter(); turn_on_fc = QB_FALSE; ipc_type = QB_IPC_SHM; ipc_name = __func__; recv_timeout = -1; test_ipc_txrx(); qb_leave(); } END_TEST START_TEST(test_ipc_fc_shm) { qb_enter(); turn_on_fc = QB_TRUE; ipc_type = QB_IPC_SHM; recv_timeout = 500; ipc_name = __func__; test_ipc_txrx(); qb_leave(); } END_TEST START_TEST(test_ipc_txrx_us_block) { qb_enter(); turn_on_fc = QB_FALSE; ipc_type = QB_IPC_SOCKET; ipc_name = __func__; recv_timeout = -1; test_ipc_txrx(); qb_leave(); } END_TEST START_TEST(test_ipc_txrx_us_tmo) { qb_enter(); turn_on_fc = QB_FALSE; ipc_type = QB_IPC_SOCKET; ipc_name = __func__; recv_timeout = 1000; test_ipc_txrx(); qb_leave(); } END_TEST START_TEST(test_ipc_fc_us) { qb_enter(); turn_on_fc = QB_TRUE; ipc_type = QB_IPC_SOCKET; recv_timeout = 500; ipc_name = __func__; test_ipc_txrx(); qb_leave(); } END_TEST struct my_res { struct qb_ipc_response_header hdr; char message[1024 * 1024]; }; static void test_ipc_dispatch(void) { int32_t j; int32_t c = 0; pid_t pid; int32_t size; pid = run_function_in_new_process(run_ipc_server); fail_if(pid == -1); sleep(1); do { conn = qb_ipcc_connect(ipc_name, MAX_MSG_SIZE); if (conn == NULL) { j = waitpid(pid, NULL, WNOHANG); ck_assert_int_eq(j, 0); sleep(1); c++; } } while (conn == NULL && c < 5); fail_if(conn == NULL); size = QB_MIN(sizeof(struct qb_ipc_request_header), 64); for (j = 1; j < 19; j++) { size *= 2; if (size >= MAX_MSG_SIZE) break; if (send_and_check(IPC_MSG_REQ_DISPATCH, size, recv_timeout, QB_TRUE) < 0) { break; } } qb_ipcc_disconnect(conn); stop_process(pid); } START_TEST(test_ipc_disp_us) { qb_enter(); ipc_type = QB_IPC_SOCKET; ipc_name = __func__; test_ipc_dispatch(); qb_leave(); } END_TEST static int32_t events_received; static int32_t count_stress_events(int32_t fd, int32_t revents, void *data) { struct { struct qb_ipc_response_header hdr __attribute__ ((aligned(8))); char data[GIANT_MSG_DATA_SIZE] __attribute__ ((aligned(8))); uint32_t sent_msgs __attribute__ ((aligned(8))); } __attribute__ ((aligned(8))) giant_event_recv; qb_loop_t *cl = (qb_loop_t*)data; int32_t res; res = qb_ipcc_event_recv(conn, &giant_event_recv, sizeof(giant_event_recv), -1); if (res > 0) { events_received++; if ((events_received % 1000) == 0) { qb_log(LOG_DEBUG, "RECV: %d stress events processed", events_received); if (res != sizeof(giant_event_recv)) { qb_log(LOG_DEBUG, "Unexpected recv size, expected %d got %d", res, sizeof(giant_event_recv)); } else if (giant_event_recv.sent_msgs != events_received) { qb_log(LOG_DEBUG, "Server event mismatch. Server thinks we got %d msgs, but we only received %d", giant_event_recv.sent_msgs, events_received); } } } else if (res != -EAGAIN) { qb_perror(LOG_DEBUG, "count_stress_events"); qb_loop_stop(cl); return -1; } if (events_received >= num_stress_events) { qb_loop_stop(cl); return -1; } return 0; } static int32_t count_bulk_events(int32_t fd, int32_t revents, void *data) { qb_loop_t *cl = (qb_loop_t*)data; struct qb_ipc_response_header res_header; int32_t res; res = qb_ipcc_event_recv(conn, &res_header, sizeof(struct qb_ipc_response_header), -1); if (res > 0) { events_received++; } if (events_received >= num_bulk_events) { qb_loop_stop(cl); return -1; } return 0; } static void test_ipc_bulk_events(void) { struct qb_ipc_request_header req_header; struct qb_ipc_response_header res_header; struct iovec iov[1]; int32_t c = 0; int32_t j = 0; pid_t pid; int32_t res; qb_loop_t *cl; int32_t fd; pid = run_function_in_new_process(run_ipc_server); fail_if(pid == -1); sleep(1); do { conn = qb_ipcc_connect(ipc_name, MAX_MSG_SIZE); if (conn == NULL) { j = waitpid(pid, NULL, WNOHANG); ck_assert_int_eq(j, 0); sleep(1); c++; } } while (conn == NULL && c < 5); fail_if(conn == NULL); events_received = 0; cl = qb_loop_create(); res = qb_ipcc_fd_get(conn, &fd); ck_assert_int_eq(res, 0); res = qb_loop_poll_add(cl, QB_LOOP_MED, fd, POLLIN, cl, count_bulk_events); ck_assert_int_eq(res, 0); res = send_and_check(IPC_MSG_REQ_BULK_EVENTS, 0, recv_timeout, QB_TRUE); ck_assert_int_eq(res, sizeof(struct qb_ipc_response_header)); qb_loop_run(cl); ck_assert_int_eq(events_received, num_bulk_events); req_header.id = IPC_MSG_REQ_SERVER_FAIL; req_header.size = sizeof(struct qb_ipc_request_header); iov[0].iov_len = req_header.size; iov[0].iov_base = &req_header; res = qb_ipcc_sendv_recv(conn, iov, 1, &res_header, sizeof(struct qb_ipc_response_header), -1); if (res != -ECONNRESET && res != -ENOTCONN) { qb_log(LOG_ERR, "id:%d size:%d", res_header.id, res_header.size); ck_assert_int_eq(res, -ENOTCONN); } qb_ipcc_disconnect(conn); stop_process(pid); } static void test_ipc_stress_test(void) { struct qb_ipc_request_header req_header; struct qb_ipc_response_header res_header; struct iovec iov[1]; int32_t c = 0; int32_t j = 0; pid_t pid; int32_t res; qb_loop_t *cl; int32_t fd; pid = run_function_in_new_process(run_ipc_server); fail_if(pid == -1); sleep(1); do { conn = qb_ipcc_connect(ipc_name, MAX_MSG_SIZE); if (conn == NULL) { j = waitpid(pid, NULL, WNOHANG); ck_assert_int_eq(j, 0); sleep(1); c++; } } while (conn == NULL && c < 5); fail_if(conn == NULL); qb_log(LOG_DEBUG, "Testing %d iterations of EVENT msg passing.", num_stress_events); events_received = 0; cl = qb_loop_create(); res = qb_ipcc_fd_get(conn, &fd); ck_assert_int_eq(res, 0); res = qb_loop_poll_add(cl, QB_LOOP_MED, fd, POLLIN, cl, count_stress_events); ck_assert_int_eq(res, 0); res = send_and_check(IPC_MSG_REQ_STRESS_EVENT, 0, recv_timeout, QB_TRUE); qb_loop_run(cl); ck_assert_int_eq(events_received, num_stress_events); req_header.id = IPC_MSG_REQ_SERVER_FAIL; req_header.size = sizeof(struct qb_ipc_request_header); iov[0].iov_len = req_header.size; iov[0].iov_base = &req_header; res = qb_ipcc_sendv_recv(conn, iov, 1, &res_header, sizeof(struct qb_ipc_response_header), -1); if (res != -ECONNRESET && res != -ENOTCONN) { qb_log(LOG_ERR, "id:%d size:%d", res_header.id, res_header.size); ck_assert_int_eq(res, -ENOTCONN); } qb_ipcc_disconnect(conn); stop_process(pid); } START_TEST(test_ipc_stress_test_us) { qb_enter(); send_event_on_created = QB_FALSE; ipc_type = QB_IPC_SOCKET; ipc_name = __func__; test_ipc_stress_test(); qb_leave(); } END_TEST START_TEST(test_ipc_bulk_events_us) { qb_enter(); send_event_on_created = QB_FALSE; ipc_type = QB_IPC_SOCKET; ipc_name = __func__; test_ipc_bulk_events(); qb_leave(); } END_TEST static void test_ipc_event_on_created(void) { int32_t c = 0; int32_t j = 0; pid_t pid; int32_t res; qb_loop_t *cl; int32_t fd; num_bulk_events = 1; pid = run_function_in_new_process(run_ipc_server); fail_if(pid == -1); sleep(1); do { conn = qb_ipcc_connect(ipc_name, MAX_MSG_SIZE); if (conn == NULL) { j = waitpid(pid, NULL, WNOHANG); ck_assert_int_eq(j, 0); sleep(1); c++; } } while (conn == NULL && c < 5); fail_if(conn == NULL); events_received = 0; cl = qb_loop_create(); res = qb_ipcc_fd_get(conn, &fd); ck_assert_int_eq(res, 0); res = qb_loop_poll_add(cl, QB_LOOP_MED, fd, POLLIN, cl, count_bulk_events); ck_assert_int_eq(res, 0); qb_loop_run(cl); ck_assert_int_eq(events_received, num_bulk_events); qb_ipcc_disconnect(conn); stop_process(pid); } START_TEST(test_ipc_event_on_created_us) { qb_enter(); send_event_on_created = QB_TRUE; ipc_type = QB_IPC_SOCKET; ipc_name = __func__; test_ipc_event_on_created(); qb_leave(); } END_TEST static void test_ipc_disconnect_after_created(void) { struct qb_ipc_request_header req_header; struct qb_ipc_response_header res_header; struct iovec iov[1]; int32_t c = 0; int32_t j = 0; pid_t pid; int32_t res; pid = run_function_in_new_process(run_ipc_server); fail_if(pid == -1); sleep(1); do { conn = qb_ipcc_connect(ipc_name, MAX_MSG_SIZE); if (conn == NULL) { j = waitpid(pid, NULL, WNOHANG); ck_assert_int_eq(j, 0); sleep(1); c++; } } while (conn == NULL && c < 5); fail_if(conn == NULL); ck_assert_int_eq(QB_TRUE, qb_ipcc_is_connected(conn)); req_header.id = IPC_MSG_REQ_SERVER_DISCONNECT; req_header.size = sizeof(struct qb_ipc_request_header); iov[0].iov_len = req_header.size; iov[0].iov_base = &req_header; res = qb_ipcc_sendv_recv(conn, iov, 1, &res_header, sizeof(struct qb_ipc_response_header), -1); /* * confirm we get -ENOTCONN or -ECONNRESET */ if (res != -ECONNRESET && res != -ENOTCONN) { qb_log(LOG_ERR, "id:%d size:%d", res_header.id, res_header.size); ck_assert_int_eq(res, -ENOTCONN); } ck_assert_int_eq(QB_FALSE, qb_ipcc_is_connected(conn)); qb_ipcc_disconnect(conn); stop_process(pid); } START_TEST(test_ipc_disconnect_after_created_us) { qb_enter(); disconnect_after_created = QB_TRUE; ipc_type = QB_IPC_SOCKET; ipc_name = __func__; test_ipc_disconnect_after_created(); qb_leave(); } END_TEST static void test_ipc_server_fail(void) { struct qb_ipc_request_header req_header; struct qb_ipc_response_header res_header; struct iovec iov[1]; int32_t res; int32_t j; int32_t c = 0; pid_t pid; pid = run_function_in_new_process(run_ipc_server); fail_if(pid == -1); sleep(1); do { conn = qb_ipcc_connect(ipc_name, MAX_MSG_SIZE); if (conn == NULL) { j = waitpid(pid, NULL, WNOHANG); ck_assert_int_eq(j, 0); sleep(1); c++; } } while (conn == NULL && c < 5); fail_if(conn == NULL); /* * tell the server to exit */ req_header.id = IPC_MSG_REQ_SERVER_FAIL; req_header.size = sizeof(struct qb_ipc_request_header); iov[0].iov_len = req_header.size; iov[0].iov_base = &req_header; ck_assert_int_eq(QB_TRUE, qb_ipcc_is_connected(conn)); res = qb_ipcc_sendv_recv(conn, iov, 1, &res_header, sizeof(struct qb_ipc_response_header), -1); /* * confirm we get -ENOTCONN or ECONNRESET */ if (res != -ECONNRESET && res != -ENOTCONN) { qb_log(LOG_ERR, "id:%d size:%d", res_header.id, res_header.size); ck_assert_int_eq(res, -ENOTCONN); } ck_assert_int_eq(QB_FALSE, qb_ipcc_is_connected(conn)); qb_ipcc_disconnect(conn); stop_process(pid); } START_TEST(test_ipc_server_fail_soc) { qb_enter(); ipc_type = QB_IPC_SOCKET; ipc_name = __func__; test_ipc_server_fail(); qb_leave(); } END_TEST START_TEST(test_ipc_disp_shm) { qb_enter(); ipc_type = QB_IPC_SHM; ipc_name = __func__; test_ipc_dispatch(); qb_leave(); } END_TEST START_TEST(test_ipc_stress_test_shm) { qb_enter(); send_event_on_created = QB_FALSE; ipc_type = QB_IPC_SHM; ipc_name = __func__; test_ipc_stress_test(); qb_leave(); } END_TEST START_TEST(test_ipc_bulk_events_shm) { qb_enter(); ipc_type = QB_IPC_SHM; ipc_name = __func__; test_ipc_bulk_events(); qb_leave(); } END_TEST START_TEST(test_ipc_event_on_created_shm) { qb_enter(); send_event_on_created = QB_TRUE; ipc_type = QB_IPC_SHM; ipc_name = __func__; test_ipc_event_on_created(); qb_leave(); } END_TEST START_TEST(test_ipc_server_fail_shm) { qb_enter(); ipc_type = QB_IPC_SHM; ipc_name = __func__; test_ipc_server_fail(); qb_leave(); } END_TEST static void test_ipc_service_ref_count(void) { int32_t c = 0; int32_t j = 0; pid_t pid; reference_count_test = QB_TRUE; pid = run_function_in_new_process(run_ipc_server); fail_if(pid == -1); sleep(1); do { conn = qb_ipcc_connect(ipc_name, MAX_MSG_SIZE); if (conn == NULL) { j = waitpid(pid, NULL, WNOHANG); ck_assert_int_eq(j, 0); sleep(1); c++; } } while (conn == NULL && c < 5); fail_if(conn == NULL); sleep(5); qb_ipcc_disconnect(conn); stop_process(pid); } START_TEST(test_ipc_service_ref_count_shm) { qb_enter(); ipc_type = QB_IPC_SHM; ipc_name = __func__; test_ipc_service_ref_count(); qb_leave(); } END_TEST START_TEST(test_ipc_service_ref_count_us) { qb_enter(); ipc_type = QB_IPC_SOCKET; ipc_name = __func__; test_ipc_service_ref_count(); qb_leave(); } END_TEST static void test_max_dgram_size(void) { /* most implementations will not let you set a dgram buffer * of 1 million bytes. This test verifies that the we can detect * the max dgram buffersize regardless, and that the value we detect * is consistent. */ int32_t init; int32_t i; qb_log_filter_ctl(QB_LOG_STDERR, QB_LOG_FILTER_REMOVE, QB_LOG_FILTER_FILE, "*", LOG_TRACE); init = qb_ipcc_verify_dgram_max_msg_size(1000000); fail_if(init <= 0); for (i = 0; i < 100; i++) { int try = qb_ipcc_verify_dgram_max_msg_size(1000000); ck_assert_int_eq(init, try); } qb_log_filter_ctl(QB_LOG_STDERR, QB_LOG_FILTER_ADD, QB_LOG_FILTER_FILE, "*", LOG_TRACE); } START_TEST(test_ipc_max_dgram_size) { qb_enter(); test_max_dgram_size(); qb_leave(); } END_TEST static Suite * make_shm_suite(void) { TCase *tc; Suite *s = suite_create("shm"); tc = tcase_create("ipc_server_fail_shm"); tcase_add_test(tc, test_ipc_server_fail_shm); tcase_set_timeout(tc, 6); suite_add_tcase(s, tc); tc = tcase_create("ipc_txrx_shm_block"); tcase_add_test(tc, test_ipc_txrx_shm_block); tcase_set_timeout(tc, 6); suite_add_tcase(s, tc); tc = tcase_create("ipc_txrx_shm_tmo"); tcase_add_test(tc, test_ipc_txrx_shm_tmo); tcase_set_timeout(tc, 6); suite_add_tcase(s, tc); tc = tcase_create("ipc_fc_shm"); tcase_add_test(tc, test_ipc_fc_shm); tcase_set_timeout(tc, 6); suite_add_tcase(s, tc); tc = tcase_create("ipc_dispatch_shm"); tcase_add_test(tc, test_ipc_disp_shm); tcase_set_timeout(tc, 16); suite_add_tcase(s, tc); tc = tcase_create("ipc_stress_test_shm"); tcase_add_test(tc, test_ipc_stress_test_shm); tcase_set_timeout(tc, 16); suite_add_tcase(s, tc); tc = tcase_create("ipc_bulk_events_shm"); tcase_add_test(tc, test_ipc_bulk_events_shm); tcase_set_timeout(tc, 16); suite_add_tcase(s, tc); tc = tcase_create("ipc_exit_shm"); tcase_add_test(tc, test_ipc_exit_shm); tcase_set_timeout(tc, 3); suite_add_tcase(s, tc); tc = tcase_create("ipc_event_on_created_shm"); tcase_add_test(tc, test_ipc_event_on_created_shm); suite_add_tcase(s, tc); tc = tcase_create("ipc_service_ref_count_shm"); tcase_add_test(tc, test_ipc_service_ref_count_shm); tcase_set_timeout(tc, 10); suite_add_tcase(s, tc); return s; } static Suite * make_soc_suite(void) { Suite *s = suite_create("socket"); TCase *tc; tc = tcase_create("ipc_max_dgram_size"); tcase_add_test(tc, test_ipc_max_dgram_size); tcase_set_timeout(tc, 30); suite_add_tcase(s, tc); tc = tcase_create("ipc_server_fail_soc"); tcase_add_test(tc, test_ipc_server_fail_soc); tcase_set_timeout(tc, 6); suite_add_tcase(s, tc); tc = tcase_create("ipc_txrx_us_block"); tcase_add_test(tc, test_ipc_txrx_us_block); tcase_set_timeout(tc, 6); suite_add_tcase(s, tc); tc = tcase_create("ipc_txrx_us_tmo"); tcase_add_test(tc, test_ipc_txrx_us_tmo); tcase_set_timeout(tc, 6); suite_add_tcase(s, tc); tc = tcase_create("ipc_fc_us"); tcase_add_test(tc, test_ipc_fc_us); tcase_set_timeout(tc, 6); suite_add_tcase(s, tc); tc = tcase_create("ipc_exit_us"); tcase_add_test(tc, test_ipc_exit_us); tcase_set_timeout(tc, 6); suite_add_tcase(s, tc); tc = tcase_create("ipc_dispatch_us"); tcase_add_test(tc, test_ipc_disp_us); tcase_set_timeout(tc, 16); suite_add_tcase(s, tc); tc = tcase_create("ipc_stress_test_us"); tcase_add_test(tc, test_ipc_stress_test_us); tcase_set_timeout(tc, 60); suite_add_tcase(s, tc); tc = tcase_create("ipc_bulk_events_us"); tcase_add_test(tc, test_ipc_bulk_events_us); tcase_set_timeout(tc, 16); suite_add_tcase(s, tc); tc = tcase_create("ipc_event_on_created_us"); tcase_add_test(tc, test_ipc_event_on_created_us); suite_add_tcase(s, tc); tc = tcase_create("ipc_disconnect_after_created_us"); tcase_add_test(tc, test_ipc_disconnect_after_created_us); suite_add_tcase(s, tc); tc = tcase_create("ipc_service_ref_count_us"); tcase_add_test(tc, test_ipc_service_ref_count_us); tcase_set_timeout(tc, 10); suite_add_tcase(s, tc); return s; } int32_t main(void) { int32_t number_failed; SRunner *sr; Suite *s; int32_t do_shm_tests = QB_TRUE; #ifdef DISABLE_IPC_SHM do_shm_tests = QB_FALSE; #endif /* DISABLE_IPC_SHM */ s = make_soc_suite(); sr = srunner_create(s); if (do_shm_tests) { srunner_add_suite(sr, make_shm_suite()); } qb_log_init("check", LOG_USER, LOG_EMERG); qb_log_ctl(QB_LOG_SYSLOG, QB_LOG_CONF_ENABLED, QB_FALSE); qb_log_filter_ctl(QB_LOG_STDERR, QB_LOG_FILTER_ADD, QB_LOG_FILTER_FILE, "*", LOG_TRACE); qb_log_ctl(QB_LOG_STDERR, QB_LOG_CONF_ENABLED, QB_TRUE); qb_log_format_set(QB_LOG_STDERR, "lib/%f|%l| %b"); srunner_run_all(sr, CK_VERBOSE); number_failed = srunner_ntests_failed(sr); srunner_free(sr); return (number_failed == 0) ? EXIT_SUCCESS : EXIT_FAILURE; }