diff --git a/include/qb/qbipcs.h b/include/qb/qbipcs.h index baf16e5..2adcd17 100644 --- a/include/qb/qbipcs.h +++ b/include/qb/qbipcs.h @@ -1,433 +1,453 @@ /* * Copyright (C) 2006-2009 Red Hat, Inc. * * Author: Steven Dake , * Angus Salkeld * * This file is part of libqb. * * libqb is free software: you can redistribute it and/or modify * it under the terms of the GNU Lesser General Public License as published by * the Free Software Foundation, either version 2.1 of the License, or * (at your option) any later version. * * libqb is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public License * along with libqb. If not, see . */ #ifndef QB_IPCS_H_DEFINED #define QB_IPCS_H_DEFINED /* *INDENT-OFF* */ #ifdef __cplusplus extern "C" { #endif /* *INDENT-ON* */ #include #include #include #include #include /** * @file qbipcs.h * * Server IPC API. * * @example ipcserver.c */ enum qb_ipcs_rate_limit { QB_IPCS_RATE_FAST, QB_IPCS_RATE_NORMAL, QB_IPCS_RATE_SLOW, QB_IPCS_RATE_OFF, QB_IPCS_RATE_OFF_2, }; struct qb_ipcs_connection; typedef struct qb_ipcs_connection qb_ipcs_connection_t; struct qb_ipcs_service; typedef struct qb_ipcs_service qb_ipcs_service_t; struct qb_ipcs_stats { uint32_t active_connections; uint32_t closed_connections; }; struct qb_ipcs_connection_stats { int32_t client_pid; uint64_t requests; uint64_t responses; uint64_t events; uint64_t send_retries; uint64_t recv_retries; int32_t flow_control_state; uint64_t flow_control_count; }; struct qb_ipcs_connection_stats_2 { int32_t client_pid; uint64_t requests; uint64_t responses; uint64_t events; uint64_t send_retries; uint64_t recv_retries; int32_t flow_control_state; uint64_t flow_control_count; uint32_t event_q_length; }; typedef int32_t (*qb_ipcs_dispatch_fn_t) (int32_t fd, int32_t revents, void *data); typedef int32_t (*qb_ipcs_dispatch_add_fn)(enum qb_loop_priority p, int32_t fd, int32_t events, void *data, qb_ipcs_dispatch_fn_t fn); typedef int32_t (*qb_ipcs_dispatch_mod_fn)(enum qb_loop_priority p, int32_t fd, int32_t events, void *data, qb_ipcs_dispatch_fn_t fn); typedef int32_t (*qb_ipcs_dispatch_del_fn)(int32_t fd); typedef int32_t (*qb_ipcs_job_add_fn)(enum qb_loop_priority p, void *data, qb_loop_job_dispatch_fn dispatch_fn); struct qb_ipcs_poll_handlers { qb_ipcs_job_add_fn job_add; qb_ipcs_dispatch_add_fn dispatch_add; qb_ipcs_dispatch_mod_fn dispatch_mod; qb_ipcs_dispatch_del_fn dispatch_del; }; /** * This callback is to check wether you want to accept a new connection. * * The type of checks you should do are authentication, service availabilty * or process resource constraints. * @return 0 to accept or -errno to indicate a failure (sent back to the client) * * @note you can call qb_ipcs_connection_auth_set() within this function. */ typedef int32_t (*qb_ipcs_connection_accept_fn) (qb_ipcs_connection_t *c, uid_t uid, gid_t gid); /** * This is called after a new connection has been created. */ typedef void (*qb_ipcs_connection_created_fn) (qb_ipcs_connection_t *c); /** * This is called after a connection has been disconnected. * * @note if you return anything but 0 this function will be * repeativily called (until 0 is returned). */ typedef int32_t (*qb_ipcs_connection_closed_fn) (qb_ipcs_connection_t *c); /** * This is called just before a connection is freed. */ typedef void (*qb_ipcs_connection_destroyed_fn) (qb_ipcs_connection_t *c); /** * This is the message processing calback. * It is called with the message data. */ typedef int32_t (*qb_ipcs_msg_process_fn) (qb_ipcs_connection_t *c, void *data, size_t size); struct qb_ipcs_service_handlers { qb_ipcs_connection_accept_fn connection_accept; qb_ipcs_connection_created_fn connection_created; qb_ipcs_msg_process_fn msg_process; qb_ipcs_connection_closed_fn connection_closed; qb_ipcs_connection_destroyed_fn connection_destroyed; }; /** * Create a new IPC server. * * @param name for clients to connect to. * @param service_id an integer to associate with the service * @param type transport type. * @param handlers callbacks. * @return the new service instance. */ qb_ipcs_service_t* qb_ipcs_create(const char *name, int32_t service_id, enum qb_ipc_type type, struct qb_ipcs_service_handlers *handlers); /** * Increase the reference counter on the service object. * * @param s service instance */ void qb_ipcs_ref(qb_ipcs_service_t *s); /** * Decrease the reference counter on the service object. * * @param s service instance */ void qb_ipcs_unref(qb_ipcs_service_t *s); /** * Set your poll callbacks. * * @param s service instance * @param handlers the handlers that you want ipcs to use. */ void qb_ipcs_poll_handlers_set(qb_ipcs_service_t* s, struct qb_ipcs_poll_handlers *handlers); /** * Associate a "user" pointer with this service. * * @param s service instance * @param context the pointer to associate with this service. * @see qb_ipcs_service_context_get() */ void qb_ipcs_service_context_set(qb_ipcs_service_t* s, void *context); /** * Get the context (set previously) * * @param s service instance * @return the context * @see qb_ipcs_service_context_set() */ void *qb_ipcs_service_context_get(qb_ipcs_service_t* s); /** * run the new IPC server. * @param s service instance * @return 0 == ok; -errno to indicate a failure. Service is destroyed on failure. */ int32_t qb_ipcs_run(qb_ipcs_service_t* s); /** * Destroy the IPC server. * * @param s service instance to destroy */ void qb_ipcs_destroy(qb_ipcs_service_t* s); /** * Limit the incomming request rate. * @param s service instance * @param rl the new rate */ void qb_ipcs_request_rate_limit(qb_ipcs_service_t* s, enum qb_ipcs_rate_limit rl); /** * Send a response to a incomming request. * * @param c connection instance * @param data the message to send * @param size the size of the message * @return size sent or -errno for errors * * @note the data must include a qb_ipc_response_header at * the top of the message. The client will read the size field * to determine how much to recv. */ ssize_t qb_ipcs_response_send(qb_ipcs_connection_t *c, const void *data, size_t size); /** * Send a response to a incomming request. * * @param c connection instance * @param iov the iovec struct that points to the message to send * @param iov_len the number of iovecs. * @return size sent or -errno for errors * * @note the iov[0] must be a qb_ipc_response_header. The client will * read the size field to determine how much to recv. + * + * @note When send returns -EMSGSIZE, this means the msg is too + * large and will never succeed. To determine the max msg size + * a client can be sent, use qb_ipcs_connection_get_buffer_size() */ ssize_t qb_ipcs_response_sendv(qb_ipcs_connection_t *c, const struct iovec * iov, size_t iov_len); /** * Send an asyncronous event message to the client. * * @param c connection instance * @param data the message to send * @param size the size of the message * @return size sent or -errno for errors * * @note the data must include a qb_ipc_response_header at * the top of the message. The client will read the size field * to determine how much to recv. + * + * @note When send returns -EMSGSIZE, this means the msg is too + * large and will never succeed. To determine the max msg size + * a client can be sent, use qb_ipcs_connection_get_buffer_size() */ ssize_t qb_ipcs_event_send(qb_ipcs_connection_t *c, const void *data, size_t size); /** * Send an asyncronous event message to the client. * * @param c connection instance * @param iov the iovec struct that points to the message to send * @param iov_len the number of iovecs. * @return size sent or -errno for errors * * @note the iov[0] must be a qb_ipc_response_header. The client will * read the size field to determine how much to recv. + * + * @note When send returns -EMSGSIZE, this means the msg is too + * large and will never succeed. To determine the max msg size + * a client can be sent, use qb_ipcs_connection_get_buffer_size() */ ssize_t qb_ipcs_event_sendv(qb_ipcs_connection_t *c, const struct iovec * iov, size_t iov_len); /** * Increment the connection's reference counter. * * @param c connection instance */ void qb_ipcs_connection_ref(qb_ipcs_connection_t *c); /** * Decrement the connection's reference counter. * * @param c connection instance */ void qb_ipcs_connection_unref(qb_ipcs_connection_t *c); /** * Disconnect from this client. * * @param c connection instance */ void qb_ipcs_disconnect(qb_ipcs_connection_t *c); /** * Get the service id related to this connection's service. * (as passed into qb_ipcs_create() * * @return service id. */ int32_t qb_ipcs_service_id_get(qb_ipcs_connection_t *c); /** * Associate a "user" pointer with this connection. * * @param context the point to associate with this connection. * @param c connection instance * @see qb_ipcs_context_get() */ void qb_ipcs_context_set(qb_ipcs_connection_t *c, void *context); /** * Get the context (set previously) * * @param c connection instance * @return the context * @see qb_ipcs_context_set() */ void *qb_ipcs_context_get(qb_ipcs_connection_t *c); /** * Get the context previously set on the service backing this connection * * @param c connection instance * @return the context * @see qb_ipcs_service_context_set */ void *qb_ipcs_connection_service_context_get(qb_ipcs_connection_t *c); /** * Get the connection statistics. * * @deprecated from v0.13.0 onwards, use qb_ipcs_connection_stats_get_2 * @param stats (out) the statistics structure * @param clear_after_read clear stats after copying them into stats * @param c connection instance * @return 0 == ok; -errno to indicate a failure */ int32_t qb_ipcs_connection_stats_get(qb_ipcs_connection_t *c, struct qb_ipcs_connection_stats* stats, int32_t clear_after_read); /** * Get (and allocate) the connection statistics. * * @param clear_after_read clear stats after copying them into stats * @param c connection instance * @retval NULL if no memory or invalid connection * @retval allocated statistics structure (user must free it). */ struct qb_ipcs_connection_stats_2* qb_ipcs_connection_stats_get_2(qb_ipcs_connection_t *c, int32_t clear_after_read); /** * Get the service statistics. * * @param stats (out) the statistics structure * @param clear_after_read clear stats after copying them into stats * @param pt service instance * @return 0 == ok; -errno to indicate a failure */ int32_t qb_ipcs_stats_get(qb_ipcs_service_t* pt, struct qb_ipcs_stats* stats, int32_t clear_after_read); /** * Get the first connection. * * @note call qb_ipcs_connection_unref() after using the connection. * * @param pt service instance * @return first connection */ qb_ipcs_connection_t * qb_ipcs_connection_first_get(qb_ipcs_service_t* pt); /** * Get the next connection. * * @note call qb_ipcs_connection_unref() after using the connection. * * @param pt service instance * @param current current connection * @return next connection */ qb_ipcs_connection_t * qb_ipcs_connection_next_get(qb_ipcs_service_t* pt, qb_ipcs_connection_t *current); /** * Set the permissions on and shared memory files so that both processes can * read and write to them. * * @param conn connection instance * @param uid the user id to set. * @param gid the group id to set. * @param mode the mode to set. * * @see chmod() chown() * @note this must be called within the qb_ipcs_connection_accept_fn() * callback. */ void qb_ipcs_connection_auth_set(qb_ipcs_connection_t *conn, uid_t uid, gid_t gid, mode_t mode); +/** + * Retrieve the connection ipc buffer size. This reflects the + * largest size msg that can be sent or received. + * + * @param conn connection instance + * @return msg size in bytes, negative value on error. + */ +int32_t qb_ipcs_connection_get_buffer_size(qb_ipcs_connection_t *conn); /* *INDENT-OFF* */ #ifdef __cplusplus } #endif /* *INDENT-ON* */ #endif /* QB_IPCS_H_DEFINED */ diff --git a/include/qb/qbrb.h b/include/qb/qbrb.h index fd37626..972d660 100644 --- a/include/qb/qbrb.h +++ b/include/qb/qbrb.h @@ -1,303 +1,304 @@ /* * Copyright (C) 2010 Red Hat, Inc. * * Author: Angus Salkeld * * This file is part of libqb. * * libqb is free software: you can redistribute it and/or modify * it under the terms of the GNU Lesser General Public License as published by * the Free Software Foundation, either version 2.1 of the License, or * (at your option) any later version. * * libqb is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public License * along with libqb. If not, see . */ #ifndef QB_RB_H_DEFINED #define QB_RB_H_DEFINED /* *INDENT-OFF* */ #ifdef __cplusplus extern "C" { #endif /* *INDENT-ON* */ #include #include /** * @file qbrb.h * This implements a ring buffer that works in "chunks" not bytes. * So you write/read a complete chunk or not at all. * There are two types of ring buffer normal and overwrite. * Overwrite will reclaim the oldest chunks inorder to make way for new ones, * the normal version will refuse to write a new chunk if the ring buffer * is full. * * This implementation is capable of working across processes, but one process * must only write and the other prrocess read. * * The read process will do the following: * @code * rb = qb_rb_open("test2", 2000, QB_RB_FLAG_SHARED_PROCESS|QB_RB_FLAG_CREATE); * for (i = 0; i < 200; i++) { * try_read_again: * l = qb_rb_chunk_read(rb, (void *)out, 32, 1000); * if (l < 0) { * goto try_read_again; * } * } * ... * qb_rb_close(rb); * * @endcode * * The write process will do the following: * @code * rb = qb_rb_open("test2", 2000, QB_RB_FLAG_SHARED_PROCESS); * for (i = 0; i < 200; i++) { * try_write_again: * l = qb_rb_chunk_write(rb, &v, sizeof(v)); * if (l < sizeof(v)) { * goto try_write_again; * } * } * ... * qb_rb_close(rb); * @endcode * * @author Angus Salkeld */ /** * create a ring buffer (rather than open and existing one) * @see qb_rb_open() */ #define QB_RB_FLAG_CREATE 0x01 /** * New calls to qb_rb_chunk_write() will call qb_rb_chunk_reclaim() * if there is not enough space. * If this is not set then new writes will be refused. * @see qb_rb_open() */ #define QB_RB_FLAG_OVERWRITE 0x02 /** * The ringbuffer will be shared between pthreads not processes. * This effects the type of locks/semaphores that are used. * @see qb_rb_open() */ #define QB_RB_FLAG_SHARED_THREAD 0x04 /** * The ringbuffer will be shared between processes. * This effects the type of locks/semaphores that are used. * @see qb_rb_open() */ #define QB_RB_FLAG_SHARED_PROCESS 0x08 /** * Don't use semaphores, only atomic ops. * This mean that the timeout passed into qb_rb_chunk_read() * will be ignored. */ #define QB_RB_FLAG_NO_SEMAPHORE 0x10 struct qb_ringbuffer_s; typedef struct qb_ringbuffer_s qb_ringbuffer_t; /** * Create the ring buffer with the given type. * * This creates allocates a ring buffer in shared memory. * * @param name the unique name of this ringbuffer. * @param size the requested size. * @param flags or'ed flags * @param shared_user_data_size size for a shared data area. * @note the actual size will be rounded up to the next page size. * @return a new ring buffer or NULL if there was a problem. * @see QB_RB_FLAG_CREATE, QB_RB_FLAG_OVERWRITE, QB_RB_FLAG_SHARED_THREAD, QB_RB_FLAG_SHARED_PROCESS */ qb_ringbuffer_t *qb_rb_open(const char *name, size_t size, uint32_t flags, size_t shared_user_data_size); /** * Dereference the ringbuffer and if we are the last user destroy it. * * All files, mmaped memory, semaphores and locks will be destroyed. * * @param rb ringbuffer instance */ void qb_rb_close(qb_ringbuffer_t * rb); /** * Get the name of the ringbuffer. * @param rb ringbuffer instance * @return name. */ char *qb_rb_name_get(qb_ringbuffer_t * rb); /** * Get a point to user shared data area. * * @note this is of size "shared_user_data_size" passed into qb_rb_open() * * @param rb ringbuffer instance * @return pointer to shared data. */ void *qb_rb_shared_user_data_get(qb_ringbuffer_t * rb); /** * Write a chunk to the ring buffer. * * This simply calls qb_rb_chunk_alloc() and then * qb_rb_chunk_commit(). * * @param rb ringbuffer instance * @param data (in) the data to write * @param len (in) the size of the chunk. * @return the amount of bytes actually buffered (either len or -1). * * @see qb_rb_chunk_alloc() * @see qb_rb_chunk_commit() */ ssize_t qb_rb_chunk_write(qb_ringbuffer_t * rb, const void *data, size_t len); /** * Allocate space for a chunk of the given size. * - * If type == QB_RB_FLAG_OVERWRITE then this will always return non-null - * but if it's type is QB_RB_NORMAL then when there is not enough space then - * it will return NULL. + * If type == QB_RB_FLAG_OVERWRITE and NULL is returned, memory corruption of + * the memory file has occured. The ringbuffer should be destroyed. + * If type == QB_RB_NORMAL then when there is not enough space it will + * return NULL. * * @param rb ringbuffer instance * @param len (in) the size to allocate. * @return pointer to chunk to write to, or NULL (if no space). * * @see qb_rb_chunk_alloc() */ void *qb_rb_chunk_alloc(qb_ringbuffer_t * rb, size_t len); /** * finalize the chunk. * @param rb ringbuffer instance * @param len (in) the size of the chunk. */ int32_t qb_rb_chunk_commit(qb_ringbuffer_t * rb, size_t len); /** * Read (without reclaiming) the last chunk. * * This function is a way of accessing the next chunk without a memcpy(). * You can read the chunk data in place. * * @note This function will not "pop" the chunk, you will need to call * qb_rb_chunk_reclaim(). * @param rb ringbuffer instance * @param data_out (out) a pointer to the next chunk to read (not copied). * @param ms_timeout (in) time to wait for new data. * * @return the size of the chunk (0 if buffer empty). */ ssize_t qb_rb_chunk_peek(qb_ringbuffer_t * rb, void **data_out, int32_t ms_timeout); /** * Reclaim the oldest chunk. * You will need to call this if using qb_rb_chunk_peek(). * @param rb ringbuffer instance */ void qb_rb_chunk_reclaim(qb_ringbuffer_t * rb); /** * Read the oldest chunk into data_out. * * This is the same as qb_rb_chunk_peek() memcpy() and qb_rb_chunk_reclaim(). * * @param rb ringbuffer instance * @param data_out (in/out) the chunk will be memcpy'ed into this. * @param len (in) the size of data_out. * @param ms_timeout the amount od time to wait for new data. * @return the size of the chunk, or error. */ ssize_t qb_rb_chunk_read(qb_ringbuffer_t * rb, void *data_out, size_t len, int32_t ms_timeout); /** * Get the reference count. * * @param rb ringbuffer instance * @return the number of references */ int32_t qb_rb_refcount_get(qb_ringbuffer_t * rb); /** * The amount of free space in the ring buffer. * * @note Some of this space will be consumed by the chunk headers. * @param rb ringbuffer instance */ ssize_t qb_rb_space_free(qb_ringbuffer_t * rb); /** * The total amount of data in the buffer. * * @note This includes the chunk headers (8 bytes per chunk). * @param rb ringbuffer instance */ ssize_t qb_rb_space_used(qb_ringbuffer_t * rb); /** * The total number of chunks in the buffer. * * @param rb ringbuffer instance */ ssize_t qb_rb_chunks_used(qb_ringbuffer_t * rb); /** * Write the contents of the Ring Buffer to file. * @param fd open file to write the ringbuffer data to. * @param rb ringbuffer instance * @see qb_rb_create_from_file() */ ssize_t qb_rb_write_to_file(qb_ringbuffer_t * rb, int32_t fd); /** * Load the saved ring buffer from file into tempory memory. * @param fd file with saved ringbuffer data. * @param flags same flags as passed into qb_rb_open() * @return new ringbuffer instance * @see qb_rb_write_to_file() */ qb_ringbuffer_t *qb_rb_create_from_file(int32_t fd, uint32_t flags); /** * Like 'chown' it changes the owner and group of the ringbuffers * resources. * @param owner uid of the owner to change to * @param group gid of the group to change to * @param rb ringbuffer instance * @return status (0 = ok, -errno for error) */ int32_t qb_rb_chown(qb_ringbuffer_t * rb, uid_t owner, gid_t group); /** * Like 'chmod' it changes the mode of the ringbuffers resources. * @param mode mode to change to * @param rb ringbuffer instance * @retval 0 == ok * @retval -errno for error */ int32_t qb_rb_chmod(qb_ringbuffer_t * rb, mode_t mode); /* *INDENT-OFF* */ #ifdef __cplusplus } #endif /* __cplusplus */ /* *INDENT-ON* */ #endif /* QB_RB_H_DEFINED */ diff --git a/lib/ipcs.c b/lib/ipcs.c index d3baaab..5a54983 100644 --- a/lib/ipcs.c +++ b/lib/ipcs.c @@ -1,943 +1,956 @@ /* * Copyright (C) 2010 Red Hat, Inc. * * Author: Angus Salkeld * * This file is part of libqb. * * libqb is free software: you can redistribute it and/or modify * it under the terms of the GNU Lesser General Public License as published by * the Free Software Foundation, either version 2.1 of the License, or * (at your option) any later version. * * libqb is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public License * along with libqb. If not, see . */ #include "os_base.h" #include "util_int.h" #include "ipc_int.h" #include #include #include static void qb_ipcs_flowcontrol_set(struct qb_ipcs_connection *c, int32_t fc_enable); static int32_t new_event_notification(struct qb_ipcs_connection * c); static QB_LIST_DECLARE(qb_ipc_services); qb_ipcs_service_t * qb_ipcs_create(const char *name, int32_t service_id, enum qb_ipc_type type, struct qb_ipcs_service_handlers *handlers) { struct qb_ipcs_service *s; s = calloc(1, sizeof(struct qb_ipcs_service)); if (s == NULL) { return NULL; } if (type == QB_IPC_NATIVE) { #ifdef DISABLE_IPC_SHM s->type = QB_IPC_SOCKET; #else s->type = QB_IPC_SHM; #endif /* DISABLE_IPC_SHM */ } else { s->type = type; } s->pid = getpid(); s->needs_sock_for_poll = QB_FALSE; s->poll_priority = QB_LOOP_MED; /* Initial alloc ref */ qb_ipcs_ref(s); s->service_id = service_id; (void)strlcpy(s->name, name, NAME_MAX); s->serv_fns.connection_accept = handlers->connection_accept; s->serv_fns.connection_created = handlers->connection_created; s->serv_fns.msg_process = handlers->msg_process; s->serv_fns.connection_closed = handlers->connection_closed; s->serv_fns.connection_destroyed = handlers->connection_destroyed; qb_list_init(&s->connections); qb_list_init(&s->list); qb_list_add(&s->list, &qb_ipc_services); return s; } void qb_ipcs_poll_handlers_set(struct qb_ipcs_service *s, struct qb_ipcs_poll_handlers *handlers) { s->poll_fns.job_add = handlers->job_add; s->poll_fns.dispatch_add = handlers->dispatch_add; s->poll_fns.dispatch_mod = handlers->dispatch_mod; s->poll_fns.dispatch_del = handlers->dispatch_del; } void qb_ipcs_service_context_set(qb_ipcs_service_t* s, void *context) { s->context = context; } void * qb_ipcs_service_context_get(qb_ipcs_service_t* s) { return s->context; } int32_t qb_ipcs_run(struct qb_ipcs_service *s) { int32_t res = 0; if (s->poll_fns.dispatch_add == NULL || s->poll_fns.dispatch_mod == NULL || s->poll_fns.dispatch_del == NULL) { res = -EINVAL; goto run_cleanup; } switch (s->type) { case QB_IPC_SOCKET: qb_ipcs_us_init((struct qb_ipcs_service *)s); break; case QB_IPC_SHM: #ifdef DISABLE_IPC_SHM res = -ENOTSUP; #else qb_ipcs_shm_init((struct qb_ipcs_service *)s); #endif /* DISABLE_IPC_SHM */ break; case QB_IPC_POSIX_MQ: case QB_IPC_SYSV_MQ: res = -ENOTSUP; break; default: res = -EINVAL; break; } if (res == 0) { res = qb_ipcs_us_publish(s); if (res < 0) { (void)qb_ipcs_us_withdraw(s); goto run_cleanup; } } run_cleanup: if (res < 0) { /* Failed to run services, removing initial alloc reference. */ qb_ipcs_unref(s); } return res; } static int32_t _modify_dispatch_descriptor_(struct qb_ipcs_connection *c) { qb_ipcs_dispatch_mod_fn disp_mod = c->service->poll_fns.dispatch_mod; if (c->service->type == QB_IPC_SOCKET) { return disp_mod(c->service->poll_priority, c->event.u.us.sock, c->poll_events, c, qb_ipcs_dispatch_connection_request); } else { return disp_mod(c->service->poll_priority, c->setup.u.us.sock, c->poll_events, c, qb_ipcs_dispatch_connection_request); } return -EINVAL; } void qb_ipcs_request_rate_limit(struct qb_ipcs_service *s, enum qb_ipcs_rate_limit rl) { struct qb_ipcs_connection *c; enum qb_loop_priority old_p = s->poll_priority; struct qb_list_head *pos; struct qb_list_head *n; switch (rl) { case QB_IPCS_RATE_FAST: s->poll_priority = QB_LOOP_HIGH; break; case QB_IPCS_RATE_SLOW: case QB_IPCS_RATE_OFF: case QB_IPCS_RATE_OFF_2: s->poll_priority = QB_LOOP_LOW; break; default: case QB_IPCS_RATE_NORMAL: s->poll_priority = QB_LOOP_MED; break; } qb_list_for_each_safe(pos, n, &s->connections) { c = qb_list_entry(pos, struct qb_ipcs_connection, list); qb_ipcs_connection_ref(c); if (rl == QB_IPCS_RATE_OFF) { qb_ipcs_flowcontrol_set(c, 1); } else if (rl == QB_IPCS_RATE_OFF_2) { qb_ipcs_flowcontrol_set(c, 2); } else { qb_ipcs_flowcontrol_set(c, QB_FALSE); } if (old_p != s->poll_priority) { (void)_modify_dispatch_descriptor_(c); } qb_ipcs_connection_unref(c); } } void qb_ipcs_ref(struct qb_ipcs_service *s) { qb_atomic_int_inc(&s->ref_count); } void qb_ipcs_unref(struct qb_ipcs_service *s) { int32_t free_it; assert(s->ref_count > 0); free_it = qb_atomic_int_dec_and_test(&s->ref_count); if (free_it) { qb_util_log(LOG_DEBUG, "%s() - destroying", __func__); free(s); } } void qb_ipcs_destroy(struct qb_ipcs_service *s) { struct qb_ipcs_connection *c = NULL; struct qb_list_head *pos; struct qb_list_head *n; if (s == NULL) { return; } qb_list_for_each_safe(pos, n, &s->connections) { c = qb_list_entry(pos, struct qb_ipcs_connection, list); if (c == NULL) { continue; } qb_ipcs_disconnect(c); } (void)qb_ipcs_us_withdraw(s); /* service destroyed, remove initial alloc ref */ qb_ipcs_unref(s); } /* * connection API */ static struct qb_ipc_one_way * _event_sock_one_way_get(struct qb_ipcs_connection * c) { if (c->service->needs_sock_for_poll) { return &c->setup; } if (c->event.type == QB_IPC_SOCKET) { return &c->event; } return NULL; } static struct qb_ipc_one_way * _response_sock_one_way_get(struct qb_ipcs_connection * c) { if (c->service->needs_sock_for_poll) { return &c->setup; } if (c->response.type == QB_IPC_SOCKET) { return &c->response; } return NULL; } ssize_t qb_ipcs_response_send(struct qb_ipcs_connection *c, const void *data, size_t size) { ssize_t res; if (c == NULL) { return -EINVAL; } qb_ipcs_connection_ref(c); res = c->service->funcs.send(&c->response, data, size); if (res == size) { c->stats.responses++; } else if (res == -EAGAIN || res == -ETIMEDOUT) { struct qb_ipc_one_way *ow = _response_sock_one_way_get(c); if (ow) { ssize_t res2 = qb_ipc_us_ready(ow, &c->setup, 0, POLLOUT); if (res2 < 0) { res = res2; } } c->stats.send_retries++; } qb_ipcs_connection_unref(c); return res; } ssize_t qb_ipcs_response_sendv(struct qb_ipcs_connection * c, const struct iovec * iov, size_t iov_len) { ssize_t res; if (c == NULL) { return -EINVAL; } qb_ipcs_connection_ref(c); res = c->service->funcs.sendv(&c->response, iov, iov_len); if (res > 0) { c->stats.responses++; } else if (res == -EAGAIN || res == -ETIMEDOUT) { struct qb_ipc_one_way *ow = _response_sock_one_way_get(c); if (ow) { ssize_t res2 = qb_ipc_us_ready(ow, &c->setup, 0, POLLOUT); if (res2 < 0) { res = res2; } } c->stats.send_retries++; } qb_ipcs_connection_unref(c); return res; } static int32_t resend_event_notifications(struct qb_ipcs_connection *c) { ssize_t res = 0; if (!c->service->needs_sock_for_poll) { return res; } if (c->outstanding_notifiers > 0) { res = qb_ipc_us_send(&c->setup, c->receive_buf, c->outstanding_notifiers); } if (res > 0) { c->outstanding_notifiers -= res; } assert(c->outstanding_notifiers >= 0); if (c->outstanding_notifiers == 0) { c->poll_events = POLLIN | POLLPRI | POLLNVAL; (void)_modify_dispatch_descriptor_(c); } return res; } static int32_t new_event_notification(struct qb_ipcs_connection * c) { ssize_t res = 0; if (!c->service->needs_sock_for_poll) { return res; } assert(c->outstanding_notifiers >= 0); if (c->outstanding_notifiers > 0) { c->outstanding_notifiers++; res = resend_event_notifications(c); } else { res = qb_ipc_us_send(&c->setup, &c->outstanding_notifiers, 1); if (res == -EAGAIN) { /* * notify the client later, when we can. */ c->outstanding_notifiers++; c->poll_events = POLLOUT | POLLIN | POLLPRI | POLLNVAL; (void)_modify_dispatch_descriptor_(c); } } return res; } ssize_t qb_ipcs_event_send(struct qb_ipcs_connection * c, const void *data, size_t size) { ssize_t res; ssize_t resn; if (c == NULL) { return -EINVAL; } else if (size > c->event.max_msg_size) { return -EMSGSIZE; } qb_ipcs_connection_ref(c); res = c->service->funcs.send(&c->event, data, size); if (res == size) { c->stats.events++; resn = new_event_notification(c); if (resn < 0 && resn != -EAGAIN && resn != -ENOBUFS) { errno = -resn; qb_util_perror(LOG_WARNING, "new_event_notification (%s)", c->description); res = resn; } } else if (res == -EAGAIN || res == -ETIMEDOUT) { struct qb_ipc_one_way *ow = _event_sock_one_way_get(c); if (c->outstanding_notifiers > 0) { resn = resend_event_notifications(c); } if (ow) { resn = qb_ipc_us_ready(ow, &c->setup, 0, POLLOUT); if (resn < 0) { res = resn; } } c->stats.send_retries++; } qb_ipcs_connection_unref(c); return res; } ssize_t qb_ipcs_event_sendv(struct qb_ipcs_connection * c, const struct iovec * iov, size_t iov_len) { ssize_t res; ssize_t resn; if (c == NULL) { return -EINVAL; } qb_ipcs_connection_ref(c); res = c->service->funcs.sendv(&c->event, iov, iov_len); if (res > 0) { c->stats.events++; resn = new_event_notification(c); if (resn < 0 && resn != -EAGAIN) { errno = -resn; qb_util_perror(LOG_WARNING, "new_event_notification (%s)", c->description); res = resn; } } else if (res == -EAGAIN || res == -ETIMEDOUT) { struct qb_ipc_one_way *ow = _event_sock_one_way_get(c); if (c->outstanding_notifiers > 0) { resn = resend_event_notifications(c); } if (ow) { resn = qb_ipc_us_ready(ow, &c->setup, 0, POLLOUT); if (resn < 0) { res = resn; } } c->stats.send_retries++; } qb_ipcs_connection_unref(c); return res; } qb_ipcs_connection_t * qb_ipcs_connection_first_get(struct qb_ipcs_service * s) { struct qb_ipcs_connection *c; if (qb_list_empty(&s->connections)) { return NULL; } c = qb_list_first_entry(&s->connections, struct qb_ipcs_connection, list); qb_ipcs_connection_ref(c); return c; } qb_ipcs_connection_t * qb_ipcs_connection_next_get(struct qb_ipcs_service * s, struct qb_ipcs_connection * current) { struct qb_ipcs_connection *c; if (current == NULL || qb_list_is_last(¤t->list, &s->connections)) { return NULL; } c = qb_list_first_entry(¤t->list, struct qb_ipcs_connection, list); qb_ipcs_connection_ref(c); return c; } int32_t qb_ipcs_service_id_get(struct qb_ipcs_connection * c) { if (c == NULL) { return -EINVAL; } return c->service->service_id; } struct qb_ipcs_connection * qb_ipcs_connection_alloc(struct qb_ipcs_service *s) { struct qb_ipcs_connection *c = calloc(1, sizeof(struct qb_ipcs_connection)); if (c == NULL) { return NULL; } c->pid = 0; c->euid = -1; c->egid = -1; c->receive_buf = NULL; c->context = NULL; c->fc_enabled = QB_FALSE; c->state = QB_IPCS_CONNECTION_INACTIVE; c->poll_events = POLLIN | POLLPRI | POLLNVAL; c->setup.type = s->type; c->request.type = s->type; c->response.type = s->type; c->event.type = s->type; (void)strlcpy(c->description, "not set yet", CONNECTION_DESCRIPTION); /* initial alloc ref */ qb_ipcs_connection_ref(c); /* * The connection makes use of the service object. Give the connection * a reference to the service so we know the service can never be destroyed * until the connection is done with it. */ qb_ipcs_ref(s); c->service = s; qb_list_init(&c->list); return c; } void qb_ipcs_connection_ref(struct qb_ipcs_connection *c) { if (c) { qb_atomic_int_inc(&c->refcount); } } void qb_ipcs_connection_unref(struct qb_ipcs_connection *c) { int32_t free_it; if (c == NULL) { return; } if (c->refcount < 1) { qb_util_log(LOG_ERR, "ref:%d state:%d (%s)", c->refcount, c->state, c->description); assert(0); } free_it = qb_atomic_int_dec_and_test(&c->refcount); if (free_it) { qb_list_del(&c->list); if (c->service->serv_fns.connection_destroyed) { c->service->serv_fns.connection_destroyed(c); } c->service->funcs.disconnect(c); /* Let go of the connection's reference to the service */ qb_ipcs_unref(c->service); free(c->receive_buf); free(c); } } void qb_ipcs_disconnect(struct qb_ipcs_connection *c) { int32_t res = 0; qb_loop_job_dispatch_fn rerun_job; if (c == NULL) { return; } qb_util_log(LOG_DEBUG, "%s(%s) state:%d", __func__, c->description, c->state); if (c->state == QB_IPCS_CONNECTION_ACTIVE) { c->service->funcs.disconnect(c); c->state = QB_IPCS_CONNECTION_INACTIVE; c->service->stats.closed_connections++; /* return early as it's an incomplete connection. */ return; } if (c->state == QB_IPCS_CONNECTION_ESTABLISHED) { c->service->funcs.disconnect(c); c->state = QB_IPCS_CONNECTION_SHUTTING_DOWN; c->service->stats.active_connections--; c->service->stats.closed_connections++; } if (c->state == QB_IPCS_CONNECTION_SHUTTING_DOWN) { int scheduled_retry = 0; res = 0; if (c->service->serv_fns.connection_closed) { res = c->service->serv_fns.connection_closed(c); } if (res != 0) { /* OK, so they want the connection_closed * function re-run */ rerun_job = (qb_loop_job_dispatch_fn) qb_ipcs_disconnect; res = c->service->poll_fns.job_add(QB_LOOP_LOW, c, rerun_job); if (res == 0) { /* this function is going to be called again. * so hold off on the unref */ scheduled_retry = 1; } } if (scheduled_retry == 0) { /* This removes the initial alloc ref */ qb_ipcs_connection_unref(c); } } } static void qb_ipcs_flowcontrol_set(struct qb_ipcs_connection *c, int32_t fc_enable) { if (c == NULL) { return; } if (c->fc_enabled != fc_enable) { c->service->funcs.fc_set(&c->request, fc_enable); c->fc_enabled = fc_enable; c->stats.flow_control_state = fc_enable; c->stats.flow_control_count++; } } static int32_t _process_request_(struct qb_ipcs_connection *c, int32_t ms_timeout) { int32_t res = 0; ssize_t size; struct qb_ipc_request_header *hdr; qb_ipcs_connection_ref(c); if (c->service->funcs.peek && c->service->funcs.reclaim) { size = c->service->funcs.peek(&c->request, (void **)&hdr, ms_timeout); } else { hdr = c->receive_buf; size = c->service->funcs.recv(&c->request, hdr, c->request.max_msg_size, ms_timeout); } if (size < 0) { if (size != -EAGAIN && size != -ETIMEDOUT) { qb_util_perror(LOG_DEBUG, "recv from client connection failed (%s)", c->description); } else { c->stats.recv_retries++; } res = size; goto cleanup; } else if (size == 0 || hdr->id == QB_IPC_MSG_DISCONNECT) { qb_util_log(LOG_DEBUG, "client requesting a disconnect (%s)", c->description); res = -ESHUTDOWN; goto cleanup; } else { c->stats.requests++; res = c->service->serv_fns.msg_process(c, hdr, hdr->size); /* 0 == good, negative == backoff */ if (res < 0) { res = -ENOBUFS; } else { res = size; } } if (c && c->service->funcs.peek && c->service->funcs.reclaim) { c->service->funcs.reclaim(&c->request); } cleanup: qb_ipcs_connection_unref(c); return res; } #define IPC_REQUEST_TIMEOUT 10 #define MAX_RECV_MSGS 50 static ssize_t _request_q_len_get(struct qb_ipcs_connection *c) { ssize_t q_len; if (c->service->funcs.q_len_get) { q_len = c->service->funcs.q_len_get(&c->request); if (q_len <= 0) { return q_len; } if (c->service->poll_priority == QB_LOOP_MED) { q_len = QB_MIN(q_len, 5); } else if (c->service->poll_priority == QB_LOOP_LOW) { q_len = 1; } else { q_len = QB_MIN(q_len, MAX_RECV_MSGS); } } else { q_len = 1; } return q_len; } int32_t qb_ipcs_dispatch_connection_request(int32_t fd, int32_t revents, void *data) { struct qb_ipcs_connection *c = (struct qb_ipcs_connection *)data; char bytes[MAX_RECV_MSGS]; int32_t res = 0; int32_t res2; int32_t recvd = 0; ssize_t avail; if (revents & POLLNVAL) { qb_util_log(LOG_DEBUG, "NVAL conn (%s)", c->description); res = -EINVAL; goto dispatch_cleanup; } if (revents & POLLHUP) { qb_util_log(LOG_DEBUG, "HUP conn (%s)", c->description); res = -ESHUTDOWN; goto dispatch_cleanup; } if (revents & POLLOUT) { /* try resend events now that fd can write */ res = resend_event_notifications(c); if (res < 0 && res != -EAGAIN) { errno = -res; qb_util_perror(LOG_WARNING, "resend_event_notifications (%s)", c->description); } /* nothing to read */ if ((revents & POLLIN) == 0) { res = 0; goto dispatch_cleanup; } } if (c->fc_enabled) { res = 0; goto dispatch_cleanup; } avail = _request_q_len_get(c); if (c->service->needs_sock_for_poll && avail == 0) { res2 = qb_ipc_us_recv(&c->setup, bytes, 1, 0); if (qb_ipc_us_sock_error_is_disconnected(res2)) { errno = -res2; qb_util_perror(LOG_WARNING, "conn (%s) disconnected", c->description); res = -ESHUTDOWN; goto dispatch_cleanup; } else { qb_util_log(LOG_WARNING, "conn (%s) Nothing in q but got POLLIN on fd:%d (res2:%d)", c->description, fd, res2); res = 0; goto dispatch_cleanup; } } do { res = _process_request_(c, IPC_REQUEST_TIMEOUT); if (res == -ESHUTDOWN) { goto dispatch_cleanup; } if (res > 0 || res == -ENOBUFS || res == -EINVAL) { recvd++; } if (res > 0) { avail--; } } while (avail > 0 && res > 0 && !c->fc_enabled); if (c->service->needs_sock_for_poll && recvd > 0) { res2 = qb_ipc_us_recv(&c->setup, bytes, recvd, -1); if (qb_ipc_us_sock_error_is_disconnected(res2)) { errno = -res2; qb_util_perror(LOG_ERR, "error receiving from setup sock (%s)", c->description); res = -ESHUTDOWN; goto dispatch_cleanup; } } res = QB_MIN(0, res); if (res == -EAGAIN || res == -ETIMEDOUT || res == -ENOBUFS) { res = 0; } if (res != 0) { if (res != -ENOTCONN) { /* * Abnormal state (ENOTCONN is normal shutdown). */ errno = -res; qb_util_perror(LOG_ERR, "request returned error (%s)", c->description); } } dispatch_cleanup: if (res != 0) { qb_ipcs_disconnect(c); } return res; } void qb_ipcs_context_set(struct qb_ipcs_connection *c, void *context) { if (c == NULL) { return; } c->context = context; } void * qb_ipcs_context_get(struct qb_ipcs_connection *c) { if (c == NULL) { return NULL; } return c->context; } void * qb_ipcs_connection_service_context_get(qb_ipcs_connection_t *c) { if (c == NULL || c->service == NULL) { return NULL; } return c->service->context; } int32_t qb_ipcs_connection_stats_get(qb_ipcs_connection_t * c, struct qb_ipcs_connection_stats * stats, int32_t clear_after_read) { if (c == NULL) { return -EINVAL; } memcpy(stats, &c->stats, sizeof(struct qb_ipcs_connection_stats)); if (clear_after_read) { memset(&c->stats, 0, sizeof(struct qb_ipcs_connection_stats_2)); c->stats.client_pid = c->pid; } return 0; } struct qb_ipcs_connection_stats_2* qb_ipcs_connection_stats_get_2(qb_ipcs_connection_t *c, int32_t clear_after_read) { struct qb_ipcs_connection_stats_2 * stats; if (c == NULL) { errno = EINVAL; return NULL; } stats = calloc(1, sizeof(struct qb_ipcs_connection_stats_2)); if (stats == NULL) { return NULL; } memcpy(stats, &c->stats, sizeof(struct qb_ipcs_connection_stats_2)); if (c->service->funcs.q_len_get) { stats->event_q_length = c->service->funcs.q_len_get(&c->event); } else { stats->event_q_length = 0; } if (clear_after_read) { memset(&c->stats, 0, sizeof(struct qb_ipcs_connection_stats_2)); c->stats.client_pid = c->pid; } return stats; } int32_t qb_ipcs_stats_get(struct qb_ipcs_service * s, struct qb_ipcs_stats * stats, int32_t clear_after_read) { if (s == NULL) { return -EINVAL; } memcpy(stats, &s->stats, sizeof(struct qb_ipcs_stats)); if (clear_after_read) { memset(&s->stats, 0, sizeof(struct qb_ipcs_stats)); } return 0; } void qb_ipcs_connection_auth_set(qb_ipcs_connection_t *c, uid_t uid, gid_t gid, mode_t mode) { if (c) { c->auth.uid = uid; c->auth.gid = gid; c->auth.mode = mode; } } + +int32_t +qb_ipcs_connection_get_buffer_size(qb_ipcs_connection_t *c) +{ + if (c == NULL) { + return -EINVAL; + } + + /* request, response, and event shoud all have the same + * buffer size allocated. It doesn't matter which we return + * here. */ + return c->response.max_msg_size; +} diff --git a/lib/log_blackbox.c b/lib/log_blackbox.c index 8f42457..cac6787 100644 --- a/lib/log_blackbox.c +++ b/lib/log_blackbox.c @@ -1,289 +1,297 @@ /* * Copyright (C) 2011 Red Hat, Inc. * * All rights reserved. * * Author: Angus Salkeld * * libqb is free software: you can redistribute it and/or modify * it under the terms of the GNU Lesser General Public License as published by * the Free Software Foundation, either version 2.1 of the License, or * (at your option) any later version. * * libqb is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public License * along with libqb. If not, see . */ #include "os_base.h" #include #include "util_int.h" #include "log_int.h" #define BB_MIN_ENTRY_SIZE (4 * sizeof(uint32_t) +\ sizeof(uint8_t) +\ 2 * sizeof(char) + sizeof(time_t)) static void _blackbox_reload(int32_t target) { struct qb_log_target *t = qb_log_target_get(target); if (t->instance == NULL) { return; } qb_rb_close(t->instance); t->instance = qb_rb_open(t->filename, t->size, QB_RB_FLAG_CREATE | QB_RB_FLAG_OVERWRITE, 0); } /* file lineno * tags * priority * function name length * function name * buffer length * buffer */ static void _blackbox_vlogger(int32_t target, struct qb_log_callsite *cs, time_t timestamp, va_list ap) { size_t max_size; size_t actual_size; uint32_t fn_size; char *chunk; char *msg_len_pt; uint32_t msg_len; struct qb_log_target *t = qb_log_target_get(target); if (t->instance == NULL) { return; } fn_size = strlen(cs->function) + 1; actual_size = 4 * sizeof(uint32_t) + sizeof(uint8_t) + fn_size + sizeof(time_t); max_size = actual_size + QB_LOG_MAX_LEN; chunk = qb_rb_chunk_alloc(t->instance, max_size); + if (chunk == NULL) { + /* something bad has happened. abort blackbox logging */ + qb_util_perror(LOG_ERR, "Blackbox allocation error, aborting blackbox log %s", t->filename); + qb_rb_close(t->instance); + t->instance = NULL; + return; + } + /* line number */ memcpy(chunk, &cs->lineno, sizeof(uint32_t)); chunk += sizeof(uint32_t); /* tags */ memcpy(chunk, &cs->tags, sizeof(uint32_t)); chunk += sizeof(uint32_t); /* log level/priority */ memcpy(chunk, &cs->priority, sizeof(uint8_t)); chunk += sizeof(uint8_t); /* function name */ memcpy(chunk, &fn_size, sizeof(uint32_t)); chunk += sizeof(uint32_t); memcpy(chunk, cs->function, fn_size); chunk += fn_size; /* timestamp */ memcpy(chunk, ×tamp, sizeof(time_t)); chunk += sizeof(time_t); /* log message length */ msg_len_pt = chunk; chunk += sizeof(uint32_t); /* log message */ msg_len = qb_vsnprintf_serialize(chunk, QB_LOG_MAX_LEN, cs->format, ap); if (msg_len >= QB_LOG_MAX_LEN) { chunk = msg_len_pt + sizeof(uint32_t); /* Reset */ msg_len = qb_vsnprintf_serialize(chunk, QB_LOG_MAX_LEN, "Log message too long to be stored in the blackbox. "\ "Maximum is QB_LOG_MAX_LEN" , ap); actual_size += msg_len; } actual_size += msg_len; /* now that we know the length, write it */ memcpy(msg_len_pt, &msg_len, sizeof(uint32_t)); (void)qb_rb_chunk_commit(t->instance, actual_size); } static void _blackbox_close(int32_t target) { struct qb_log_target *t = qb_log_target_get(target); if (t->instance) { qb_rb_close(t->instance); t->instance = NULL; } } int32_t qb_log_blackbox_open(struct qb_log_target *t) { if (t->size < 1024) { return -EINVAL; } snprintf(t->filename, PATH_MAX, "%s-%d-blackbox", t->name, getpid()); t->instance = qb_rb_open(t->filename, t->size, QB_RB_FLAG_CREATE | QB_RB_FLAG_OVERWRITE, 0); if (t->instance == NULL) { return -errno; } t->logger = NULL; t->vlogger = _blackbox_vlogger; t->reload = _blackbox_reload; t->close = _blackbox_close; return 0; } ssize_t qb_log_blackbox_write_to_file(const char *filename) { ssize_t written_size = 0; struct qb_log_target *t; int fd = open(filename, O_CREAT | O_RDWR, 0700); if (fd < 0) { return -errno; } t = qb_log_target_get(QB_LOG_BLACKBOX); if (t->instance) { written_size = qb_rb_write_to_file(t->instance, fd); } else { written_size = -ENOENT; } close(fd); return written_size; } void qb_log_blackbox_print_from_file(const char *bb_filename) { qb_ringbuffer_t *instance; ssize_t bytes_read; int max_size = 2 * QB_LOG_MAX_LEN; char *chunk; int fd; char time_buf[64]; fd = open(bb_filename, 0); if (fd < 0) { qb_util_perror(LOG_ERR, "qb_log_blackbox_print_from_file"); return; } instance = qb_rb_create_from_file(fd, 0); close(fd); if (instance == NULL) { return; } chunk = malloc(max_size); do { char *ptr; uint32_t lineno; uint32_t tags; uint8_t priority; uint32_t fn_size; char *function; uint32_t len; time_t timestamp; uint32_t msg_len; struct tm *tm; char message[QB_LOG_MAX_LEN]; bytes_read = qb_rb_chunk_read(instance, chunk, max_size, 0); if (bytes_read >= 0 && bytes_read < BB_MIN_ENTRY_SIZE) { printf("ERROR Corrupt file: blackbox header too small.\n"); goto cleanup; } else if (bytes_read < 0) { errno = -bytes_read; perror("ERROR: qb_rb_chunk_read failed"); goto cleanup; } ptr = chunk; /* lineno */ memcpy(&lineno, ptr, sizeof(uint32_t)); ptr += sizeof(uint32_t); /* tags */ memcpy(&tags, ptr, sizeof(uint32_t)); ptr += sizeof(uint32_t); /* priority */ memcpy(&priority, ptr, sizeof(uint8_t)); ptr += sizeof(uint8_t); /* function size & name */ memcpy(&fn_size, ptr, sizeof(uint32_t)); if ((fn_size + BB_MIN_ENTRY_SIZE) > bytes_read) { printf("ERROR Corrupt file: fn_size way too big %d\n", fn_size); goto cleanup; } if (fn_size <= 0) { printf("ERROR Corrupt file: fn_size negative %d\n", fn_size); goto cleanup; } ptr += sizeof(uint32_t); function = ptr; ptr += fn_size; /* timestamp size & content */ memcpy(×tamp, ptr, sizeof(time_t)); ptr += sizeof(time_t); tm = localtime(×tamp); if (tm) { (void)strftime(time_buf, sizeof(time_buf), "%b %d %T", tm); } else { snprintf(time_buf, sizeof(time_buf), "%ld", (long int)timestamp); } /* message length */ memcpy(&msg_len, ptr, sizeof(uint32_t)); if (msg_len > QB_LOG_MAX_LEN || msg_len <= 0) { printf("ERROR Corrupt file: msg_len out of bounds %d\n", msg_len); goto cleanup; } ptr += sizeof(uint32_t); /* message content */ len = qb_vsnprintf_deserialize(message, QB_LOG_MAX_LEN, ptr); assert(len > 0); message[len] = '\0'; len--; while (len > 0 && (message[len] == '\n' || message[len] == '\0')) { message[len] = '\0'; len--; } printf("%-7s %s %s(%u):%u: %s\n", qb_log_priority2str(priority), time_buf, function, lineno, tags, message); } while (bytes_read > BB_MIN_ENTRY_SIZE); cleanup: qb_rb_close(instance); free(chunk); } diff --git a/lib/ringbuffer.c b/lib/ringbuffer.c index 9bb56a0..110aee8 100644 --- a/lib/ringbuffer.c +++ b/lib/ringbuffer.c @@ -1,954 +1,956 @@ /* * Copyright (C) 2010-2011 Red Hat, Inc. * * Author: Angus Salkeld * * This file is part of libqb. * * libqb is free software: you can redistribute it and/or modify * it under the terms of the GNU Lesser General Public License as published by * the Free Software Foundation, either version 2.1 of the License, or * (at your option) any later version. * * libqb is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public License * along with libqb. If not, see . */ #include "ringbuffer_int.h" #include #include "atomic_int.h" #define QB_RB_FILE_HEADER_VERSION 1 /* * #define CRAZY_DEBUG_PRINTFS 1 */ #ifdef CRAZY_DEBUG_PRINTFS #define DEBUG_PRINTF(format, args...) \ do { \ printf(format, ##args); \ } while(0) #else #define DEBUG_PRINTF(format, args...) #endif /* CRAZY_DEBUG_PRINTFS */ /* * move the write pointer to the next 128 byte boundary * write_pt goes in 4 bytes (sizeof(uint32_t)) * #define USE_CACHE_LINE_ALIGNMENT 1 */ #ifdef USE_CACHE_LINE_ALIGNMENT #define QB_CACHE_LINE_SIZE 128 #define QB_CACHE_LINE_WORDS (QB_CACHE_LINE_SIZE/sizeof(uint32_t)) #define idx_cache_line_step(idx) \ do { \ if (idx % QB_CACHE_LINE_WORDS) { \ idx += (QB_CACHE_LINE_WORDS - (idx % QB_CACHE_LINE_WORDS)); \ } \ if (idx > (rb->shared_hdr->word_size - 1)) { \ idx = ((idx) % (rb->shared_hdr->word_size)); \ } \ } while (0) #else #define QB_CACHE_LINE_SIZE 0 #define QB_CACHE_LINE_WORDS 0 #define idx_cache_line_step(idx) \ do { \ if (idx > (rb->shared_hdr->word_size - 1)) { \ idx = ((idx) % (rb->shared_hdr->word_size)); \ } \ } while (0) #endif /* the chunk header is two words * 1) the chunk data size * 2) the magic number */ #define QB_RB_CHUNK_HEADER_WORDS 2 #define QB_RB_CHUNK_HEADER_SIZE (sizeof(uint32_t) * QB_RB_CHUNK_HEADER_WORDS) /* * margin is the gap we leave when checking to see if we have enough * space for a new chunk. * So: * qb_rb_space_free() >= QB_RB_CHUNK_MARGIN + new data chunk * The extra word size is to allow for non word sized data chunks. * QB_CACHE_LINE_WORDS is to make sure we have space to align the * chunk. */ #define QB_RB_WORD_ALIGN 1 #define QB_RB_CHUNK_MARGIN (sizeof(uint32_t) * (QB_RB_CHUNK_HEADER_WORDS +\ QB_RB_WORD_ALIGN +\ QB_CACHE_LINE_WORDS)) #define QB_RB_CHUNK_MAGIC 0xA1A1A1A1 #define QB_RB_CHUNK_MAGIC_DEAD 0xD0D0D0D0 #define QB_RB_CHUNK_MAGIC_ALLOC 0xA110CED0 #define QB_RB_CHUNK_SIZE_GET(rb, pointer) rb->shared_data[pointer] #define QB_RB_CHUNK_MAGIC_GET(rb, pointer) \ qb_atomic_int_get_ex((int32_t*)&rb->shared_data[(pointer + 1) % rb->shared_hdr->word_size], \ QB_ATOMIC_ACQUIRE) #define QB_RB_CHUNK_MAGIC_SET(rb, pointer, new_val) \ qb_atomic_int_set_ex((int32_t*)&rb->shared_data[(pointer + 1) % rb->shared_hdr->word_size], \ new_val, QB_ATOMIC_RELEASE) #define QB_RB_CHUNK_DATA_GET(rb, pointer) \ &rb->shared_data[(pointer + QB_RB_CHUNK_HEADER_WORDS) % rb->shared_hdr->word_size] #define QB_MAGIC_ASSERT(_ptr_) \ do { \ uint32_t chunk_magic = QB_RB_CHUNK_MAGIC_GET(rb, _ptr_); \ if (chunk_magic != QB_RB_CHUNK_MAGIC) print_header(rb); \ assert(chunk_magic == QB_RB_CHUNK_MAGIC); \ } while (0) #define idx_step(idx) \ do { \ if (idx > (rb->shared_hdr->word_size - 1)) { \ idx = ((idx) % (rb->shared_hdr->word_size)); \ } \ } while (0) static void print_header(struct qb_ringbuffer_s * rb); static int _rb_chunk_reclaim(struct qb_ringbuffer_s * rb); qb_ringbuffer_t * qb_rb_open(const char *name, size_t size, uint32_t flags, size_t shared_user_data_size) { return qb_rb_open_2(name, size, flags, shared_user_data_size, NULL); } qb_ringbuffer_t * qb_rb_open_2(const char *name, size_t size, uint32_t flags, size_t shared_user_data_size, struct qb_rb_notifier *notifiers) { struct qb_ringbuffer_s *rb; size_t real_size; size_t shared_size; char path[PATH_MAX]; int32_t fd_hdr; int32_t fd_data; uint32_t file_flags = O_RDWR; char filename[PATH_MAX]; int32_t error = 0; void *shm_addr; long page_size = sysconf(_SC_PAGESIZE); #ifdef QB_FORCE_SHM_ALIGN page_size = QB_MAX(page_size, 16 * 1024); #endif /* QB_FORCE_SHM_ALIGN */ /* The user of this api expects the 'size' parameter passed into this function * to be reflective of the max size single write we can do to the * ringbuffer. This means we have to add both the 'margin' space used * to calculate if there is enough space for a new chunk as well as the '+1' that * prevents overlap of the read/write pointers */ size += QB_RB_CHUNK_MARGIN + 1; real_size = QB_ROUNDUP(size, page_size); shared_size = sizeof(struct qb_ringbuffer_shared_s) + shared_user_data_size; if (flags & QB_RB_FLAG_CREATE) { file_flags |= O_CREAT | O_TRUNC; } rb = calloc(1, sizeof(struct qb_ringbuffer_s)); if (rb == NULL) { return NULL; } /* * Create a shared_hdr memory segment for the header. */ snprintf(filename, PATH_MAX, "qb-%s-header", name); fd_hdr = qb_sys_mmap_file_open(path, filename, shared_size, file_flags); if (fd_hdr < 0) { error = fd_hdr; qb_util_log(LOG_ERR, "couldn't create file for mmap"); goto cleanup_hdr; } rb->shared_hdr = mmap(0, shared_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd_hdr, 0); if (rb->shared_hdr == MAP_FAILED) { error = -errno; qb_util_log(LOG_ERR, "couldn't create mmap for header"); goto cleanup_hdr; } qb_atomic_init(); rb->flags = flags; /* * create the semaphore */ if (flags & QB_RB_FLAG_CREATE) { rb->shared_data = NULL; /* rb->shared_hdr->word_size tracks data by ints and not bytes/chars. */ rb->shared_hdr->word_size = real_size / sizeof(uint32_t); rb->shared_hdr->write_pt = 0; rb->shared_hdr->read_pt = 0; (void)strlcpy(rb->shared_hdr->hdr_path, path, PATH_MAX); } if (notifiers && notifiers->post_fn) { error = 0; memcpy(&rb->notifier, notifiers, sizeof(struct qb_rb_notifier)); } else { error = qb_rb_sem_create(rb, flags); } if (error < 0) { errno = -error; qb_util_perror(LOG_ERR, "couldn't create a semaphore"); goto cleanup_hdr; } /* Create the shared_data memory segment for the actual ringbuffer. * They have to be separate. */ if (flags & QB_RB_FLAG_CREATE) { snprintf(filename, PATH_MAX, "qb-%s-data", name); fd_data = qb_sys_mmap_file_open(path, filename, real_size, file_flags); (void)strlcpy(rb->shared_hdr->data_path, path, PATH_MAX); } else { fd_data = qb_sys_mmap_file_open(path, rb->shared_hdr->data_path, real_size, file_flags); } if (fd_data < 0) { error = fd_data; qb_util_log(LOG_ERR, "couldn't create file for mmap"); goto cleanup_hdr; } qb_util_log(LOG_DEBUG, "shm size:%zd; real_size:%zd; rb->word_size:%d", size, real_size, rb->shared_hdr->word_size); /* this function closes fd_data */ error = qb_sys_circular_mmap(fd_data, &shm_addr, real_size); rb->shared_data = shm_addr; if (error != 0) { qb_util_log(LOG_ERR, "couldn't create circular mmap on %s", rb->shared_hdr->data_path); goto cleanup_data; } if (flags & QB_RB_FLAG_CREATE) { memset(rb->shared_data, 0, real_size); rb->shared_data[rb->shared_hdr->word_size] = 5; rb->shared_hdr->ref_count = 1; } else { qb_atomic_int_inc(&rb->shared_hdr->ref_count); } close(fd_hdr); return rb; cleanup_data: if (flags & QB_RB_FLAG_CREATE) { unlink(rb->shared_hdr->data_path); } cleanup_hdr: if (fd_hdr >= 0) { close(fd_hdr); } if (rb && (flags & QB_RB_FLAG_CREATE)) { unlink(rb->shared_hdr->hdr_path); if (rb->notifier.destroy_fn) { (void)rb->notifier.destroy_fn(rb->notifier.instance); } } if (rb && (rb->shared_hdr != MAP_FAILED && rb->shared_hdr != NULL)) { munmap(rb->shared_hdr, sizeof(struct qb_ringbuffer_shared_s)); } free(rb); errno = -error; return NULL; } void qb_rb_close(struct qb_ringbuffer_s * rb) { if (rb == NULL) { return; } qb_enter(); (void)qb_atomic_int_dec_and_test(&rb->shared_hdr->ref_count); if (rb->flags & QB_RB_FLAG_CREATE) { if (rb->notifier.destroy_fn) { (void)rb->notifier.destroy_fn(rb->notifier.instance); } unlink(rb->shared_hdr->data_path); unlink(rb->shared_hdr->hdr_path); qb_util_log(LOG_DEBUG, "Free'ing ringbuffer: %s", rb->shared_hdr->hdr_path); } else { qb_util_log(LOG_DEBUG, "Closing ringbuffer: %s", rb->shared_hdr->hdr_path); } munmap(rb->shared_data, (rb->shared_hdr->word_size * sizeof(uint32_t)) << 1); munmap(rb->shared_hdr, sizeof(struct qb_ringbuffer_shared_s)); free(rb); } void qb_rb_force_close(struct qb_ringbuffer_s * rb) { if (rb == NULL) { return; } qb_enter(); if (rb->notifier.destroy_fn) { (void)rb->notifier.destroy_fn(rb->notifier.instance); } errno = 0; unlink(rb->shared_hdr->data_path); qb_util_perror(LOG_DEBUG, "Force free'ing ringbuffer: %s", rb->shared_hdr->data_path); errno = 0; unlink(rb->shared_hdr->hdr_path); qb_util_perror(LOG_DEBUG, "Force free'ing ringbuffer: %s", rb->shared_hdr->hdr_path); munmap(rb->shared_data, (rb->shared_hdr->word_size * sizeof(uint32_t)) << 1); munmap(rb->shared_hdr, sizeof(struct qb_ringbuffer_shared_s)); free(rb); } char * qb_rb_name_get(struct qb_ringbuffer_s * rb) { if (rb == NULL) { return NULL; } return rb->shared_hdr->hdr_path; } void * qb_rb_shared_user_data_get(struct qb_ringbuffer_s * rb) { if (rb == NULL) { return NULL; } return rb->shared_hdr->user_data; } int32_t qb_rb_refcount_get(struct qb_ringbuffer_s * rb) { if (rb == NULL) { return -EINVAL; } return qb_atomic_int_get(&rb->shared_hdr->ref_count); } ssize_t qb_rb_space_free(struct qb_ringbuffer_s * rb) { uint32_t write_size; uint32_t read_size; size_t space_free = 0; if (rb == NULL) { return -EINVAL; } if (rb->notifier.space_used_fn) { return (rb->shared_hdr->word_size * sizeof(uint32_t)) - rb->notifier.space_used_fn(rb->notifier.instance); } write_size = rb->shared_hdr->write_pt; read_size = rb->shared_hdr->read_pt; if (write_size > read_size) { space_free = (read_size - write_size + rb->shared_hdr->word_size) - 1; } else if (write_size < read_size) { space_free = (read_size - write_size) - 1; } else { if (rb->notifier.q_len_fn && rb->notifier.q_len_fn(rb->notifier.instance) > 0) { space_free = 0; } else { space_free = rb->shared_hdr->word_size; } } /* word -> bytes */ return (space_free * sizeof(uint32_t)); } ssize_t qb_rb_space_used(struct qb_ringbuffer_s * rb) { uint32_t write_size; uint32_t read_size; size_t space_used; if (rb == NULL) { return -EINVAL; } if (rb->notifier.space_used_fn) { return rb->notifier.space_used_fn(rb->notifier.instance); } write_size = rb->shared_hdr->write_pt; read_size = rb->shared_hdr->read_pt; if (write_size > read_size) { space_used = write_size - read_size; } else if (write_size < read_size) { space_used = (write_size - read_size + rb->shared_hdr->word_size) - 1; } else { space_used = 0; } /* word -> bytes */ return (space_used * sizeof(uint32_t)); } ssize_t qb_rb_chunks_used(struct qb_ringbuffer_s *rb) { if (rb == NULL) { return -EINVAL; } if (rb->notifier.q_len_fn) { return rb->notifier.q_len_fn(rb->notifier.instance); } return -ENOTSUP; } void * qb_rb_chunk_alloc(struct qb_ringbuffer_s * rb, size_t len) { uint32_t write_pt; if (rb == NULL) { errno = EINVAL; return NULL; } /* * Reclaim data if we are over writing and we need space */ if (rb->flags & QB_RB_FLAG_OVERWRITE) { while (qb_rb_space_free(rb) < (len + QB_RB_CHUNK_MARGIN)) { int rc = _rb_chunk_reclaim(rb); - /* reclaim failure during overwrite results in an abort. */ - assert(rc == 0); + if (rc != 0) { + errno = rc; + return NULL; + } } } else { if (qb_rb_space_free(rb) < (len + QB_RB_CHUNK_MARGIN)) { errno = EAGAIN; return NULL; } } write_pt = rb->shared_hdr->write_pt; /* * insert the chunk header */ rb->shared_data[write_pt] = 0; QB_RB_CHUNK_MAGIC_SET(rb, write_pt, QB_RB_CHUNK_MAGIC_ALLOC); /* * return a pointer to the beginning of the chunk data */ return (void *)QB_RB_CHUNK_DATA_GET(rb, write_pt); } static uint32_t qb_rb_chunk_step(struct qb_ringbuffer_s * rb, uint32_t pointer) { uint32_t chunk_size = QB_RB_CHUNK_SIZE_GET(rb, pointer); /* * skip over the chunk header */ pointer += QB_RB_CHUNK_HEADER_WORDS; /* * skip over the user's data. */ pointer += (chunk_size / sizeof(uint32_t)); /* make allowance for non-word sizes */ if ((chunk_size % (sizeof(uint32_t) * QB_RB_WORD_ALIGN)) != 0) { pointer++; } idx_cache_line_step(pointer); return pointer; } int32_t qb_rb_chunk_commit(struct qb_ringbuffer_s * rb, size_t len) { uint32_t old_write_pt; if (rb == NULL) { return -EINVAL; } /* * commit the magic & chunk_size */ old_write_pt = rb->shared_hdr->write_pt; rb->shared_data[old_write_pt] = len; /* * commit the new write pointer */ rb->shared_hdr->write_pt = qb_rb_chunk_step(rb, old_write_pt); QB_RB_CHUNK_MAGIC_SET(rb, old_write_pt, QB_RB_CHUNK_MAGIC); DEBUG_PRINTF("commit [%zd] read: %u, write: %u -> %u (%u)\n", (rb->notifier.q_len_fn ? rb->notifier.q_len_fn(rb->notifier.instance) : 0), rb->shared_hdr->read_pt, old_write_pt, rb->shared_hdr->write_pt, rb->shared_hdr->word_size); /* * post the notification to the reader */ if (rb->notifier.post_fn) { return rb->notifier.post_fn(rb->notifier.instance, len); } return 0; } ssize_t qb_rb_chunk_write(struct qb_ringbuffer_s * rb, const void *data, size_t len) { char *dest = qb_rb_chunk_alloc(rb, len); int32_t res = 0; if (rb == NULL) { return -EINVAL; } if (dest == NULL) { return -errno; } memcpy(dest, data, len); res = qb_rb_chunk_commit(rb, len); if (res < 0) { return res; } return len; } static int _rb_chunk_reclaim(struct qb_ringbuffer_s * rb) { uint32_t old_read_pt; uint32_t new_read_pt; uint32_t old_chunk_size; uint32_t chunk_magic; int rc = 0; old_read_pt = rb->shared_hdr->read_pt; chunk_magic = QB_RB_CHUNK_MAGIC_GET(rb, old_read_pt); if (chunk_magic != QB_RB_CHUNK_MAGIC) { return -EINVAL; } old_chunk_size = QB_RB_CHUNK_SIZE_GET(rb, old_read_pt); new_read_pt = qb_rb_chunk_step(rb, old_read_pt); /* * clear the header */ rb->shared_data[old_read_pt] = 0; QB_RB_CHUNK_MAGIC_SET(rb, old_read_pt, QB_RB_CHUNK_MAGIC_DEAD); /* * set the new read pointer after clearing the header * to prevent a situation where a fast writer will write their * new chunk between setting the new read pointer and clearing the * header. */ rb->shared_hdr->read_pt = new_read_pt; if (rb->notifier.reclaim_fn) { rc = rb->notifier.reclaim_fn(rb->notifier.instance, old_chunk_size); if (rc < 0) { errno = -rc; qb_util_perror(LOG_WARNING, "reclaim_fn"); } } DEBUG_PRINTF("reclaim [%zd]: read: %u -> %u, write: %u\n", (rb->notifier.q_len_fn ? rb->notifier.q_len_fn(rb->notifier.instance) : 0), old_read_pt, rb->shared_hdr->read_pt, rb->shared_hdr->write_pt); return rc; } void qb_rb_chunk_reclaim(struct qb_ringbuffer_s * rb) { if (rb == NULL) { return; } _rb_chunk_reclaim(rb); } ssize_t qb_rb_chunk_peek(struct qb_ringbuffer_s * rb, void **data_out, int32_t timeout) { uint32_t read_pt; uint32_t chunk_size; uint32_t chunk_magic; int32_t res = 0; if (rb == NULL) { return -EINVAL; } if (rb->notifier.timedwait_fn) { res = rb->notifier.timedwait_fn(rb->notifier.instance, timeout); } if (res < 0 && res != -EIDRM) { if (res == -ETIMEDOUT) { return 0; } else { errno = -res; qb_util_perror(LOG_ERR, "sem_timedwait"); } return res; } read_pt = rb->shared_hdr->read_pt; chunk_magic = QB_RB_CHUNK_MAGIC_GET(rb, read_pt); if (chunk_magic != QB_RB_CHUNK_MAGIC) { if (rb->notifier.post_fn) { (void)rb->notifier.post_fn(rb->notifier.instance, res); } return 0; } chunk_size = QB_RB_CHUNK_SIZE_GET(rb, read_pt); *data_out = QB_RB_CHUNK_DATA_GET(rb, read_pt); return chunk_size; } ssize_t qb_rb_chunk_read(struct qb_ringbuffer_s * rb, void *data_out, size_t len, int32_t timeout) { uint32_t read_pt; uint32_t chunk_size; uint32_t chunk_magic; int32_t res = 0; if (rb == NULL) { return -EINVAL; } if (rb->notifier.timedwait_fn) { res = rb->notifier.timedwait_fn(rb->notifier.instance, timeout); } if (res < 0 && res != -EIDRM) { if (res != -ETIMEDOUT) { errno = -res; qb_util_perror(LOG_ERR, "sem_timedwait"); } return res; } read_pt = rb->shared_hdr->read_pt; chunk_magic = QB_RB_CHUNK_MAGIC_GET(rb, read_pt); if (chunk_magic != QB_RB_CHUNK_MAGIC) { if (rb->notifier.timedwait_fn == NULL) { return -ETIMEDOUT; } else { (void)rb->notifier.post_fn(rb->notifier.instance, res); #ifdef EBADMSG return -EBADMSG; #else return -EINVAL; #endif } } chunk_size = QB_RB_CHUNK_SIZE_GET(rb, read_pt); if (len < chunk_size) { qb_util_log(LOG_ERR, "trying to recv chunk of size %d but %d available", len, chunk_size); if (rb->notifier.post_fn) { (void)rb->notifier.post_fn(rb->notifier.instance, chunk_size); } return -ENOBUFS; } memcpy(data_out, QB_RB_CHUNK_DATA_GET(rb, read_pt), chunk_size); _rb_chunk_reclaim(rb); return chunk_size; } static void print_header(struct qb_ringbuffer_s * rb) { printf("Ringbuffer: \n"); if (rb->flags & QB_RB_FLAG_OVERWRITE) { printf(" ->OVERWRITE\n"); } else { printf(" ->NORMAL\n"); } printf(" ->write_pt [%d]\n", rb->shared_hdr->write_pt); printf(" ->read_pt [%d]\n", rb->shared_hdr->read_pt); printf(" ->size [%d words]\n", rb->shared_hdr->word_size); #ifndef S_SPLINT_S printf(" =>free [%zu bytes]\n", qb_rb_space_free(rb)); printf(" =>used [%zu bytes]\n", qb_rb_space_used(rb)); #endif /* S_SPLINT_S */ } /* * FILE HEADER ORDER * 1. word_size * 2. write_pt * 3. read_pt * 4. version * 5. header_hash * * 6. data */ ssize_t qb_rb_write_to_file(struct qb_ringbuffer_s * rb, int32_t fd) { ssize_t result; ssize_t written_size = 0; uint32_t hash = 0; uint32_t version = QB_RB_FILE_HEADER_VERSION; if (rb == NULL) { return -EINVAL; } print_header(rb); /* * 1. word_size */ result = write(fd, &rb->shared_hdr->word_size, sizeof(uint32_t)); if (result != sizeof(uint32_t)) { return -errno; } written_size += result; /* * 2. 3. store the read & write pointers */ result = write(fd, (void *)&rb->shared_hdr->write_pt, sizeof(uint32_t)); if (result != sizeof(uint32_t)) { return -errno; } written_size += result; result = write(fd, (void *)&rb->shared_hdr->read_pt, sizeof(uint32_t)); if (result != sizeof(uint32_t)) { return -errno; } written_size += result; /* * 4. version used */ result = write(fd, &version, sizeof(uint32_t)); if (result != sizeof(uint32_t)) { return -errno; } written_size += result; /* * 5. hash helps us verify header is not corrupted on file read */ hash = rb->shared_hdr->word_size + rb->shared_hdr->write_pt + rb->shared_hdr->read_pt + QB_RB_FILE_HEADER_VERSION; result = write(fd, &hash, sizeof(uint32_t)); if (result != sizeof(uint32_t)) { return -errno; } written_size += result; result = write(fd, rb->shared_data, rb->shared_hdr->word_size * sizeof(uint32_t)); if (result != rb->shared_hdr->word_size * sizeof(uint32_t)) { return -errno; } written_size += result; qb_util_log(LOG_DEBUG, " writing total of: %zd\n", written_size); return written_size; } qb_ringbuffer_t * qb_rb_create_from_file(int32_t fd, uint32_t flags) { ssize_t n_read; size_t n_required; size_t total_read = 0; uint32_t read_pt; uint32_t write_pt; struct qb_ringbuffer_s *rb; uint32_t word_size = 0; uint32_t version = 0; uint32_t hash = 0; uint32_t calculated_hash = 0; if (fd < 0) { return NULL; } /* * 1. word size */ n_required = sizeof(uint32_t); n_read = read(fd, &word_size, n_required); if (n_read != n_required) { qb_util_perror(LOG_ERR, "Unable to read blackbox file header"); return NULL; } total_read += n_read; /* * 2. 3. read & write pointers */ n_read = read(fd, &write_pt, sizeof(uint32_t)); assert(n_read == sizeof(uint32_t)); total_read += n_read; n_read = read(fd, &read_pt, sizeof(uint32_t)); assert(n_read == sizeof(uint32_t)); total_read += n_read; /* * 4. version */ n_required = sizeof(uint32_t); n_read = read(fd, &version, n_required); if (n_read != n_required) { qb_util_perror(LOG_ERR, "Unable to read blackbox file header"); return NULL; } total_read += n_read; /* * 5. Hash */ n_required = sizeof(uint32_t); n_read = read(fd, &hash, n_required); if (n_read != n_required) { qb_util_perror(LOG_ERR, "Unable to read blackbox file header"); return NULL; } total_read += n_read; calculated_hash = word_size + write_pt + read_pt + version; if (hash != calculated_hash) { qb_util_log(LOG_ERR, "Corrupt blackbox: File header hash (%d) does not match calculated hash (%d)", hash, calculated_hash); return NULL; } else if (version != QB_RB_FILE_HEADER_VERSION) { qb_util_log(LOG_ERR, "Wrong file header version. Expected %d got %d", QB_RB_FILE_HEADER_VERSION, version); return NULL; } /* * 6. data */ n_required = (word_size * sizeof(uint32_t)); rb = qb_rb_open("create_from_file", n_required, QB_RB_FLAG_CREATE | QB_RB_FLAG_NO_SEMAPHORE, 0); if (rb == NULL) { return NULL; } rb->shared_hdr->read_pt = read_pt; rb->shared_hdr->write_pt = write_pt; n_read = read(fd, rb->shared_data, n_required); if (n_read < 0) { qb_util_perror(LOG_ERR, "Unable to read blackbox file data"); goto cleanup_fail; } total_read += n_read; if (n_read != n_required) { qb_util_log(LOG_WARNING, "read %zd bytes, but expected %zu", n_read, n_required); goto cleanup_fail; } qb_util_log(LOG_DEBUG, "read total of: %zd", total_read); print_header(rb); return rb; cleanup_fail: qb_rb_close(rb); return NULL; } int32_t qb_rb_chown(struct qb_ringbuffer_s * rb, uid_t owner, gid_t group) { int32_t res; if (rb == NULL) { return -EINVAL; } res = chown(rb->shared_hdr->data_path, owner, group); if (res < 0 && errno != EPERM) { return -errno; } res = chown(rb->shared_hdr->hdr_path, owner, group); if (res < 0 && errno != EPERM) { return -errno; } return 0; } int32_t qb_rb_chmod(qb_ringbuffer_t * rb, mode_t mode) { int32_t res; if (rb == NULL) { return -EINVAL; } res = chmod(rb->shared_hdr->data_path, mode); if (res < 0) { return -errno; } res = chmod(rb->shared_hdr->hdr_path, mode); if (res < 0) { return -errno; } return 0; } diff --git a/tests/check_ipc.c b/tests/check_ipc.c index ad4888f..93d0a83 100644 --- a/tests/check_ipc.c +++ b/tests/check_ipc.c @@ -1,1384 +1,1390 @@ /* * Copyright (c) 2010 Red Hat, Inc. * * All rights reserved. * * Author: Angus Salkeld * * This file is part of libqb. * * libqb is free software: you can redistribute it and/or modify * it under the terms of the GNU Lesser General Public License as published by * the Free Software Foundation, either version 2.1 of the License, or * (at your option) any later version. * * libqb is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public License * along with libqb. If not, see . */ #include "os_base.h" #include #include #include #include #include #include #include #include static const char *ipc_name = "ipc_test"; #define DEFAULT_MAX_MSG_SIZE (8192*16) static int CALCULATED_DGRAM_MAX_MSG_SIZE = 0; #define DGRAM_MAX_MSG_SIZE \ (CALCULATED_DGRAM_MAX_MSG_SIZE == 0 ? \ CALCULATED_DGRAM_MAX_MSG_SIZE = qb_ipcc_verify_dgram_max_msg_size(DEFAULT_MAX_MSG_SIZE) : \ CALCULATED_DGRAM_MAX_MSG_SIZE) #define MAX_MSG_SIZE (ipc_type == QB_IPC_SOCKET ? DGRAM_MAX_MSG_SIZE : DEFAULT_MAX_MSG_SIZE) /* The size the giant msg's data field needs to be to make * this the largests msg we can successfully send. */ #define GIANT_MSG_DATA_SIZE MAX_MSG_SIZE - sizeof(struct qb_ipc_response_header) - 8 static qb_ipcc_connection_t *conn; static enum qb_ipc_type ipc_type; enum my_msg_ids { IPC_MSG_REQ_TX_RX, IPC_MSG_RES_TX_RX, IPC_MSG_REQ_DISPATCH, IPC_MSG_RES_DISPATCH, IPC_MSG_REQ_BULK_EVENTS, IPC_MSG_RES_BULK_EVENTS, IPC_MSG_REQ_STRESS_EVENT, IPC_MSG_RES_STRESS_EVENT, IPC_MSG_REQ_SERVER_FAIL, IPC_MSG_RES_SERVER_FAIL, IPC_MSG_REQ_SERVER_DISCONNECT, IPC_MSG_RES_SERVER_DISCONNECT, }; /* Test Cases * * 1) basic send & recv differnet message sizes * * 2) send message to start dispatch (confirm receipt) * * 3) flow control * * 4) authentication * * 5) thread safety * * 6) cleanup * * 7) service availabilty * * 8) multiple services */ static qb_loop_t *my_loop; static qb_ipcs_service_t* s1; static int32_t turn_on_fc = QB_FALSE; static int32_t fc_enabled = 89; static int32_t send_event_on_created = QB_FALSE; static int32_t disconnect_after_created = QB_FALSE; static int32_t num_bulk_events = 10; static int32_t num_stress_events = 30000; static int32_t reference_count_test = QB_FALSE; static int32_t exit_handler(int32_t rsignal, void *data) { qb_log(LOG_DEBUG, "caught signal %d", rsignal); qb_ipcs_destroy(s1); return -1; } static int32_t s1_msg_process_fn(qb_ipcs_connection_t *c, void *data, size_t size) { struct qb_ipc_request_header *req_pt = (struct qb_ipc_request_header *)data; struct qb_ipc_response_header response = { 0, }; ssize_t res; if (req_pt->id == IPC_MSG_REQ_TX_RX) { response.size = sizeof(struct qb_ipc_response_header); response.id = IPC_MSG_RES_TX_RX; response.error = 0; res = qb_ipcs_response_send(c, &response, response.size); if (res < 0) { qb_perror(LOG_INFO, "qb_ipcs_response_send"); } else if (res != response.size) { qb_log(LOG_DEBUG, "qb_ipcs_response_send %zd != %d", res, response.size); } if (turn_on_fc) { qb_ipcs_request_rate_limit(s1, QB_IPCS_RATE_OFF); } } else if (req_pt->id == IPC_MSG_REQ_DISPATCH) { response.size = sizeof(struct qb_ipc_response_header); response.id = IPC_MSG_RES_DISPATCH; response.error = 0; res = qb_ipcs_event_send(c, &response, sizeof(response)); if (res < 0) { qb_perror(LOG_INFO, "qb_ipcs_event_send"); } } else if (req_pt->id == IPC_MSG_REQ_BULK_EVENTS) { int32_t m; int32_t num; struct qb_ipcs_connection_stats_2 *stats; response.size = sizeof(struct qb_ipc_response_header); response.error = 0; stats = qb_ipcs_connection_stats_get_2(c, QB_FALSE); num = stats->event_q_length; free(stats); /* crazy large message */ res = qb_ipcs_event_send(c, &response, MAX_MSG_SIZE*10); ck_assert_int_eq(res, -EMSGSIZE); /* send one event before responding */ res = qb_ipcs_event_send(c, &response, sizeof(response)); ck_assert_int_eq(res, sizeof(response)); response.id++; /* send response */ response.id = IPC_MSG_RES_BULK_EVENTS; res = qb_ipcs_response_send(c, &response, response.size); ck_assert_int_eq(res, sizeof(response)); /* send the rest of the events after the response */ for (m = 1; m < num_bulk_events; m++) { res = qb_ipcs_event_send(c, &response, sizeof(response)); if (res == -EAGAIN || res == -ENOBUFS) { /* retry */ usleep(1000); m--; continue; } ck_assert_int_eq(res, sizeof(response)); response.id++; } stats = qb_ipcs_connection_stats_get_2(c, QB_FALSE); ck_assert_int_eq(stats->event_q_length - num, num_bulk_events); free(stats); } else if (req_pt->id == IPC_MSG_REQ_STRESS_EVENT) { struct { struct qb_ipc_response_header hdr __attribute__ ((aligned(8))); char data[GIANT_MSG_DATA_SIZE] __attribute__ ((aligned(8))); uint32_t sent_msgs __attribute__ ((aligned(8))); } __attribute__ ((aligned(8))) giant_event_send; int32_t m; response.size = sizeof(struct qb_ipc_response_header); response.error = 0; response.id = IPC_MSG_RES_STRESS_EVENT; res = qb_ipcs_response_send(c, &response, response.size); ck_assert_int_eq(res, sizeof(response)); giant_event_send.hdr.error = 0; giant_event_send.hdr.id = IPC_MSG_RES_STRESS_EVENT; for (m = 0; m < num_stress_events; m++) { size_t sent_len = sizeof(struct qb_ipc_response_header); if (((m+1) % 1000) == 0) { sent_len = sizeof(giant_event_send); giant_event_send.sent_msgs = m + 1; } giant_event_send.hdr.size = sent_len; res = qb_ipcs_event_send(c, &giant_event_send, sent_len); if (res < 0) { if (res == -EAGAIN || res == -ENOBUFS) { /* yield to the receive process */ usleep(1000); m--; continue; } else { qb_perror(LOG_DEBUG, "sending stress events"); ck_assert_int_eq(res, sent_len); } } else if (((m+1) % 1000) == 0) { qb_log(LOG_DEBUG, "SENT: %d stress events sent", m+1); } giant_event_send.hdr.id++; } } else if (req_pt->id == IPC_MSG_REQ_SERVER_FAIL) { exit(0); } else if (req_pt->id == IPC_MSG_REQ_SERVER_DISCONNECT) { qb_ipcs_disconnect(c); } return 0; } static int32_t my_job_add(enum qb_loop_priority p, void *data, qb_loop_job_dispatch_fn fn) { return qb_loop_job_add(my_loop, p, data, fn); } static int32_t my_dispatch_add(enum qb_loop_priority p, int32_t fd, int32_t events, void *data, qb_ipcs_dispatch_fn_t fn) { return qb_loop_poll_add(my_loop, p, fd, events, data, fn); } static int32_t my_dispatch_mod(enum qb_loop_priority p, int32_t fd, int32_t events, void *data, qb_ipcs_dispatch_fn_t fn) { return qb_loop_poll_mod(my_loop, p, fd, events, data, fn); } static int32_t my_dispatch_del(int32_t fd) { return qb_loop_poll_del(my_loop, fd); } static int32_t s1_connection_closed(qb_ipcs_connection_t *c) { qb_enter(); qb_leave(); return 0; } static void outq_flush (void *data) { static int i = 0; struct cs_ipcs_conn_context *cnx; cnx = qb_ipcs_context_get(data); qb_log(LOG_DEBUG,"iter %u\n", i); i++; if (i == 2) { qb_ipcs_destroy(s1); s1 = NULL; } /* is the reference counting is not working, this should fail * for i > 1. */ qb_ipcs_event_send(data, "test", 4); assert(memcmp(cnx, "test", 4) == 0); if (i < 5) { qb_loop_job_add(my_loop, QB_LOOP_HIGH, data, outq_flush); } else { /* this single unref should clean everything up. */ qb_ipcs_connection_unref(data); qb_log(LOG_INFO, "end of test, stopping loop"); qb_loop_stop(my_loop); } } static void s1_connection_destroyed(qb_ipcs_connection_t *c) { qb_enter(); if (reference_count_test) { struct cs_ipcs_conn_context *cnx; cnx = qb_ipcs_context_get(c); free(cnx); } else { qb_loop_stop(my_loop); } qb_leave(); } static void s1_connection_created(qb_ipcs_connection_t *c) { + int32_t max = MAX_MSG_SIZE; + if (send_event_on_created) { struct qb_ipc_response_header response; int32_t res; response.size = sizeof(struct qb_ipc_response_header); response.id = IPC_MSG_RES_DISPATCH; response.error = 0; res = qb_ipcs_event_send(c, &response, sizeof(response)); ck_assert_int_eq(res, response.size); } if (reference_count_test) { struct cs_ipcs_conn_context *context; qb_ipcs_connection_ref(c); qb_loop_job_add(my_loop, QB_LOOP_HIGH, c, outq_flush); context = calloc(1, 20); memcpy(context, "test", 4); qb_ipcs_context_set(c, context); } + + + ck_assert_int_eq(max, qb_ipcs_connection_get_buffer_size(c)); + } static void run_ipc_server(void) { int32_t res; qb_loop_signal_handle handle; struct qb_ipcs_service_handlers sh = { .connection_accept = NULL, .connection_created = s1_connection_created, .msg_process = s1_msg_process_fn, .connection_destroyed = s1_connection_destroyed, .connection_closed = s1_connection_closed, }; struct qb_ipcs_poll_handlers ph = { .job_add = my_job_add, .dispatch_add = my_dispatch_add, .dispatch_mod = my_dispatch_mod, .dispatch_del = my_dispatch_del, }; qb_loop_signal_add(my_loop, QB_LOOP_HIGH, SIGSTOP, NULL, exit_handler, &handle); qb_loop_signal_add(my_loop, QB_LOOP_HIGH, SIGTERM, NULL, exit_handler, &handle); my_loop = qb_loop_create(); s1 = qb_ipcs_create(ipc_name, 4, ipc_type, &sh); fail_if(s1 == 0); qb_ipcs_poll_handlers_set(s1, &ph); res = qb_ipcs_run(s1); ck_assert_int_eq(res, 0); qb_loop_run(my_loop); qb_log(LOG_DEBUG, "loop finished - done ..."); } static int32_t run_function_in_new_process(void (*run_ipc_server_fn)(void)) { pid_t pid = fork (); if (pid == -1) { fprintf (stderr, "Can't fork\n"); return -1; } if (pid == 0) { run_ipc_server_fn(); return 0; } return pid; } static int32_t stop_process(pid_t pid) { /* wait a bit for the server to shutdown by it's self */ usleep(100000); kill(pid, SIGTERM); waitpid(pid, NULL, 0); return 0; } struct my_req { struct qb_ipc_request_header hdr; char message[1024 * 1024]; }; static struct my_req request; static int32_t send_and_check(int32_t req_id, uint32_t size, int32_t ms_timeout, int32_t expect_perfection) { struct qb_ipc_response_header res_header; int32_t res; int32_t try_times = 0; request.hdr.id = req_id; request.hdr.size = sizeof(struct qb_ipc_request_header) + size; /* check that we can't send a message that is too big * and we get the right return code. */ res = qb_ipcc_send(conn, &request, MAX_MSG_SIZE*2); ck_assert_int_eq(res, -EMSGSIZE); repeat_send: res = qb_ipcc_send(conn, &request, request.hdr.size); try_times++; if (res < 0) { if (res == -EAGAIN && try_times < 10) { goto repeat_send; } else { if (res == -EAGAIN && try_times >= 10) { fc_enabled = QB_TRUE; } errno = -res; qb_perror(LOG_INFO, "qb_ipcc_send"); return res; } } if (req_id == IPC_MSG_REQ_DISPATCH) { res = qb_ipcc_event_recv(conn, &res_header, sizeof(struct qb_ipc_response_header), ms_timeout); } else { res = qb_ipcc_recv(conn, &res_header, sizeof(struct qb_ipc_response_header), ms_timeout); } if (res == -EINTR) { return -1; } if (res == -EAGAIN || res == -ETIMEDOUT) { fc_enabled = QB_TRUE; qb_perror(LOG_DEBUG, "qb_ipcc_recv"); return res; } if (expect_perfection) { ck_assert_int_eq(res, sizeof(struct qb_ipc_response_header)); ck_assert_int_eq(res_header.id, req_id + 1); ck_assert_int_eq(res_header.size, sizeof(struct qb_ipc_response_header)); } return res; } static int32_t recv_timeout = -1; static void test_ipc_txrx(void) { int32_t j; int32_t c = 0; size_t size; pid_t pid; pid = run_function_in_new_process(run_ipc_server); fail_if(pid == -1); sleep(1); do { conn = qb_ipcc_connect(ipc_name, MAX_MSG_SIZE); if (conn == NULL) { j = waitpid(pid, NULL, WNOHANG); ck_assert_int_eq(j, 0); sleep(1); c++; } } while (conn == NULL && c < 5); fail_if(conn == NULL); size = QB_MIN(sizeof(struct qb_ipc_request_header), 64); for (j = 1; j < 19; j++) { size *= 2; if (size >= MAX_MSG_SIZE) break; if (send_and_check(IPC_MSG_REQ_TX_RX, size, recv_timeout, QB_TRUE) < 0) { break; } } if (turn_on_fc) { ck_assert_int_eq(fc_enabled, QB_TRUE); } qb_ipcc_disconnect(conn); stop_process(pid); } static void test_ipc_exit(void) { struct qb_ipc_request_header req_header; struct qb_ipc_response_header res_header; struct iovec iov[1]; int32_t res; int32_t c = 0; int32_t j = 0; pid_t pid; pid = run_function_in_new_process(run_ipc_server); fail_if(pid == -1); sleep(1); do { conn = qb_ipcc_connect(ipc_name, MAX_MSG_SIZE); if (conn == NULL) { j = waitpid(pid, NULL, WNOHANG); ck_assert_int_eq(j, 0); sleep(1); c++; } } while (conn == NULL && c < 5); fail_if(conn == NULL); req_header.id = IPC_MSG_REQ_TX_RX; req_header.size = sizeof(struct qb_ipc_request_header); iov[0].iov_len = req_header.size; iov[0].iov_base = &req_header; res = qb_ipcc_sendv_recv(conn, iov, 1, &res_header, sizeof(struct qb_ipc_response_header), -1); ck_assert_int_eq(res, sizeof(struct qb_ipc_response_header)); /* kill the server */ stop_process(pid); /* * wait a bit for the server to die. */ sleep(1); /* * this needs to free up the shared mem */ qb_ipcc_disconnect(conn); } START_TEST(test_ipc_exit_us) { qb_enter(); ipc_type = QB_IPC_SOCKET; ipc_name = __func__; recv_timeout = 5000; test_ipc_exit(); qb_leave(); } END_TEST START_TEST(test_ipc_exit_shm) { qb_enter(); ipc_type = QB_IPC_SHM; ipc_name = __func__; recv_timeout = 1000; test_ipc_exit(); qb_leave(); } END_TEST START_TEST(test_ipc_txrx_shm_tmo) { qb_enter(); turn_on_fc = QB_FALSE; ipc_type = QB_IPC_SHM; ipc_name = __func__; recv_timeout = 1000; test_ipc_txrx(); qb_leave(); } END_TEST START_TEST(test_ipc_txrx_shm_block) { qb_enter(); turn_on_fc = QB_FALSE; ipc_type = QB_IPC_SHM; ipc_name = __func__; recv_timeout = -1; test_ipc_txrx(); qb_leave(); } END_TEST START_TEST(test_ipc_fc_shm) { qb_enter(); turn_on_fc = QB_TRUE; ipc_type = QB_IPC_SHM; recv_timeout = 500; ipc_name = __func__; test_ipc_txrx(); qb_leave(); } END_TEST START_TEST(test_ipc_txrx_us_block) { qb_enter(); turn_on_fc = QB_FALSE; ipc_type = QB_IPC_SOCKET; ipc_name = __func__; recv_timeout = -1; test_ipc_txrx(); qb_leave(); } END_TEST START_TEST(test_ipc_txrx_us_tmo) { qb_enter(); turn_on_fc = QB_FALSE; ipc_type = QB_IPC_SOCKET; ipc_name = __func__; recv_timeout = 1000; test_ipc_txrx(); qb_leave(); } END_TEST START_TEST(test_ipc_fc_us) { qb_enter(); turn_on_fc = QB_TRUE; ipc_type = QB_IPC_SOCKET; recv_timeout = 500; ipc_name = __func__; test_ipc_txrx(); qb_leave(); } END_TEST struct my_res { struct qb_ipc_response_header hdr; char message[1024 * 1024]; }; static void test_ipc_dispatch(void) { int32_t j; int32_t c = 0; pid_t pid; int32_t size; pid = run_function_in_new_process(run_ipc_server); fail_if(pid == -1); sleep(1); do { conn = qb_ipcc_connect(ipc_name, MAX_MSG_SIZE); if (conn == NULL) { j = waitpid(pid, NULL, WNOHANG); ck_assert_int_eq(j, 0); sleep(1); c++; } } while (conn == NULL && c < 5); fail_if(conn == NULL); size = QB_MIN(sizeof(struct qb_ipc_request_header), 64); for (j = 1; j < 19; j++) { size *= 2; if (size >= MAX_MSG_SIZE) break; if (send_and_check(IPC_MSG_REQ_DISPATCH, size, recv_timeout, QB_TRUE) < 0) { break; } } qb_ipcc_disconnect(conn); stop_process(pid); } START_TEST(test_ipc_disp_us) { qb_enter(); ipc_type = QB_IPC_SOCKET; ipc_name = __func__; test_ipc_dispatch(); qb_leave(); } END_TEST static int32_t events_received; static int32_t count_stress_events(int32_t fd, int32_t revents, void *data) { struct { struct qb_ipc_response_header hdr __attribute__ ((aligned(8))); char data[GIANT_MSG_DATA_SIZE] __attribute__ ((aligned(8))); uint32_t sent_msgs __attribute__ ((aligned(8))); } __attribute__ ((aligned(8))) giant_event_recv; qb_loop_t *cl = (qb_loop_t*)data; int32_t res; res = qb_ipcc_event_recv(conn, &giant_event_recv, sizeof(giant_event_recv), -1); if (res > 0) { events_received++; if ((events_received % 1000) == 0) { qb_log(LOG_DEBUG, "RECV: %d stress events processed", events_received); if (res != sizeof(giant_event_recv)) { qb_log(LOG_DEBUG, "Unexpected recv size, expected %d got %d", res, sizeof(giant_event_recv)); } else if (giant_event_recv.sent_msgs != events_received) { qb_log(LOG_DEBUG, "Server event mismatch. Server thinks we got %d msgs, but we only received %d", giant_event_recv.sent_msgs, events_received); } } } else if (res != -EAGAIN) { qb_perror(LOG_DEBUG, "count_stress_events"); qb_loop_stop(cl); return -1; } if (events_received >= num_stress_events) { qb_loop_stop(cl); return -1; } return 0; } static int32_t count_bulk_events(int32_t fd, int32_t revents, void *data) { qb_loop_t *cl = (qb_loop_t*)data; struct qb_ipc_response_header res_header; int32_t res; res = qb_ipcc_event_recv(conn, &res_header, sizeof(struct qb_ipc_response_header), -1); if (res > 0) { events_received++; } if (events_received >= num_bulk_events) { qb_loop_stop(cl); return -1; } return 0; } static void test_ipc_bulk_events(void) { struct qb_ipc_request_header req_header; struct qb_ipc_response_header res_header; struct iovec iov[1]; int32_t c = 0; int32_t j = 0; pid_t pid; int32_t res; qb_loop_t *cl; int32_t fd; pid = run_function_in_new_process(run_ipc_server); fail_if(pid == -1); sleep(1); do { conn = qb_ipcc_connect(ipc_name, MAX_MSG_SIZE); if (conn == NULL) { j = waitpid(pid, NULL, WNOHANG); ck_assert_int_eq(j, 0); sleep(1); c++; } } while (conn == NULL && c < 5); fail_if(conn == NULL); events_received = 0; cl = qb_loop_create(); res = qb_ipcc_fd_get(conn, &fd); ck_assert_int_eq(res, 0); res = qb_loop_poll_add(cl, QB_LOOP_MED, fd, POLLIN, cl, count_bulk_events); ck_assert_int_eq(res, 0); res = send_and_check(IPC_MSG_REQ_BULK_EVENTS, 0, recv_timeout, QB_TRUE); ck_assert_int_eq(res, sizeof(struct qb_ipc_response_header)); qb_loop_run(cl); ck_assert_int_eq(events_received, num_bulk_events); req_header.id = IPC_MSG_REQ_SERVER_FAIL; req_header.size = sizeof(struct qb_ipc_request_header); iov[0].iov_len = req_header.size; iov[0].iov_base = &req_header; res = qb_ipcc_sendv_recv(conn, iov, 1, &res_header, sizeof(struct qb_ipc_response_header), -1); if (res != -ECONNRESET && res != -ENOTCONN) { qb_log(LOG_ERR, "id:%d size:%d", res_header.id, res_header.size); ck_assert_int_eq(res, -ENOTCONN); } qb_ipcc_disconnect(conn); stop_process(pid); } static void test_ipc_stress_test(void) { struct qb_ipc_request_header req_header; struct qb_ipc_response_header res_header; struct iovec iov[1]; int32_t c = 0; int32_t j = 0; pid_t pid; int32_t res; qb_loop_t *cl; int32_t fd; pid = run_function_in_new_process(run_ipc_server); fail_if(pid == -1); sleep(1); do { conn = qb_ipcc_connect(ipc_name, MAX_MSG_SIZE); if (conn == NULL) { j = waitpid(pid, NULL, WNOHANG); ck_assert_int_eq(j, 0); sleep(1); c++; } } while (conn == NULL && c < 5); fail_if(conn == NULL); qb_log(LOG_DEBUG, "Testing %d iterations of EVENT msg passing.", num_stress_events); events_received = 0; cl = qb_loop_create(); res = qb_ipcc_fd_get(conn, &fd); ck_assert_int_eq(res, 0); res = qb_loop_poll_add(cl, QB_LOOP_MED, fd, POLLIN, cl, count_stress_events); ck_assert_int_eq(res, 0); res = send_and_check(IPC_MSG_REQ_STRESS_EVENT, 0, recv_timeout, QB_TRUE); qb_loop_run(cl); ck_assert_int_eq(events_received, num_stress_events); req_header.id = IPC_MSG_REQ_SERVER_FAIL; req_header.size = sizeof(struct qb_ipc_request_header); iov[0].iov_len = req_header.size; iov[0].iov_base = &req_header; res = qb_ipcc_sendv_recv(conn, iov, 1, &res_header, sizeof(struct qb_ipc_response_header), -1); if (res != -ECONNRESET && res != -ENOTCONN) { qb_log(LOG_ERR, "id:%d size:%d", res_header.id, res_header.size); ck_assert_int_eq(res, -ENOTCONN); } qb_ipcc_disconnect(conn); stop_process(pid); } START_TEST(test_ipc_stress_test_us) { qb_enter(); send_event_on_created = QB_FALSE; ipc_type = QB_IPC_SOCKET; ipc_name = __func__; test_ipc_stress_test(); qb_leave(); } END_TEST START_TEST(test_ipc_bulk_events_us) { qb_enter(); send_event_on_created = QB_FALSE; ipc_type = QB_IPC_SOCKET; ipc_name = __func__; test_ipc_bulk_events(); qb_leave(); } END_TEST static void test_ipc_event_on_created(void) { int32_t c = 0; int32_t j = 0; pid_t pid; int32_t res; qb_loop_t *cl; int32_t fd; num_bulk_events = 1; pid = run_function_in_new_process(run_ipc_server); fail_if(pid == -1); sleep(1); do { conn = qb_ipcc_connect(ipc_name, MAX_MSG_SIZE); if (conn == NULL) { j = waitpid(pid, NULL, WNOHANG); ck_assert_int_eq(j, 0); sleep(1); c++; } } while (conn == NULL && c < 5); fail_if(conn == NULL); events_received = 0; cl = qb_loop_create(); res = qb_ipcc_fd_get(conn, &fd); ck_assert_int_eq(res, 0); res = qb_loop_poll_add(cl, QB_LOOP_MED, fd, POLLIN, cl, count_bulk_events); ck_assert_int_eq(res, 0); qb_loop_run(cl); ck_assert_int_eq(events_received, num_bulk_events); qb_ipcc_disconnect(conn); stop_process(pid); } START_TEST(test_ipc_event_on_created_us) { qb_enter(); send_event_on_created = QB_TRUE; ipc_type = QB_IPC_SOCKET; ipc_name = __func__; test_ipc_event_on_created(); qb_leave(); } END_TEST static void test_ipc_disconnect_after_created(void) { struct qb_ipc_request_header req_header; struct qb_ipc_response_header res_header; struct iovec iov[1]; int32_t c = 0; int32_t j = 0; pid_t pid; int32_t res; pid = run_function_in_new_process(run_ipc_server); fail_if(pid == -1); sleep(1); do { conn = qb_ipcc_connect(ipc_name, MAX_MSG_SIZE); if (conn == NULL) { j = waitpid(pid, NULL, WNOHANG); ck_assert_int_eq(j, 0); sleep(1); c++; } } while (conn == NULL && c < 5); fail_if(conn == NULL); ck_assert_int_eq(QB_TRUE, qb_ipcc_is_connected(conn)); req_header.id = IPC_MSG_REQ_SERVER_DISCONNECT; req_header.size = sizeof(struct qb_ipc_request_header); iov[0].iov_len = req_header.size; iov[0].iov_base = &req_header; res = qb_ipcc_sendv_recv(conn, iov, 1, &res_header, sizeof(struct qb_ipc_response_header), -1); /* * confirm we get -ENOTCONN or -ECONNRESET */ if (res != -ECONNRESET && res != -ENOTCONN) { qb_log(LOG_ERR, "id:%d size:%d", res_header.id, res_header.size); ck_assert_int_eq(res, -ENOTCONN); } ck_assert_int_eq(QB_FALSE, qb_ipcc_is_connected(conn)); qb_ipcc_disconnect(conn); stop_process(pid); } START_TEST(test_ipc_disconnect_after_created_us) { qb_enter(); disconnect_after_created = QB_TRUE; ipc_type = QB_IPC_SOCKET; ipc_name = __func__; test_ipc_disconnect_after_created(); qb_leave(); } END_TEST static void test_ipc_server_fail(void) { struct qb_ipc_request_header req_header; struct qb_ipc_response_header res_header; struct iovec iov[1]; int32_t res; int32_t j; int32_t c = 0; pid_t pid; pid = run_function_in_new_process(run_ipc_server); fail_if(pid == -1); sleep(1); do { conn = qb_ipcc_connect(ipc_name, MAX_MSG_SIZE); if (conn == NULL) { j = waitpid(pid, NULL, WNOHANG); ck_assert_int_eq(j, 0); sleep(1); c++; } } while (conn == NULL && c < 5); fail_if(conn == NULL); /* * tell the server to exit */ req_header.id = IPC_MSG_REQ_SERVER_FAIL; req_header.size = sizeof(struct qb_ipc_request_header); iov[0].iov_len = req_header.size; iov[0].iov_base = &req_header; ck_assert_int_eq(QB_TRUE, qb_ipcc_is_connected(conn)); res = qb_ipcc_sendv_recv(conn, iov, 1, &res_header, sizeof(struct qb_ipc_response_header), -1); /* * confirm we get -ENOTCONN or ECONNRESET */ if (res != -ECONNRESET && res != -ENOTCONN) { qb_log(LOG_ERR, "id:%d size:%d", res_header.id, res_header.size); ck_assert_int_eq(res, -ENOTCONN); } ck_assert_int_eq(QB_FALSE, qb_ipcc_is_connected(conn)); qb_ipcc_disconnect(conn); stop_process(pid); } START_TEST(test_ipc_server_fail_soc) { qb_enter(); ipc_type = QB_IPC_SOCKET; ipc_name = __func__; test_ipc_server_fail(); qb_leave(); } END_TEST START_TEST(test_ipc_disp_shm) { qb_enter(); ipc_type = QB_IPC_SHM; ipc_name = __func__; test_ipc_dispatch(); qb_leave(); } END_TEST START_TEST(test_ipc_stress_test_shm) { qb_enter(); send_event_on_created = QB_FALSE; ipc_type = QB_IPC_SHM; ipc_name = __func__; test_ipc_stress_test(); qb_leave(); } END_TEST START_TEST(test_ipc_bulk_events_shm) { qb_enter(); ipc_type = QB_IPC_SHM; ipc_name = __func__; test_ipc_bulk_events(); qb_leave(); } END_TEST START_TEST(test_ipc_event_on_created_shm) { qb_enter(); send_event_on_created = QB_TRUE; ipc_type = QB_IPC_SHM; ipc_name = __func__; test_ipc_event_on_created(); qb_leave(); } END_TEST START_TEST(test_ipc_server_fail_shm) { qb_enter(); ipc_type = QB_IPC_SHM; ipc_name = __func__; test_ipc_server_fail(); qb_leave(); } END_TEST static void test_ipc_service_ref_count(void) { int32_t c = 0; int32_t j = 0; pid_t pid; reference_count_test = QB_TRUE; pid = run_function_in_new_process(run_ipc_server); fail_if(pid == -1); sleep(1); do { conn = qb_ipcc_connect(ipc_name, MAX_MSG_SIZE); if (conn == NULL) { j = waitpid(pid, NULL, WNOHANG); ck_assert_int_eq(j, 0); sleep(1); c++; } } while (conn == NULL && c < 5); fail_if(conn == NULL); sleep(5); qb_ipcc_disconnect(conn); stop_process(pid); } START_TEST(test_ipc_service_ref_count_shm) { qb_enter(); ipc_type = QB_IPC_SHM; ipc_name = __func__; test_ipc_service_ref_count(); qb_leave(); } END_TEST START_TEST(test_ipc_service_ref_count_us) { qb_enter(); ipc_type = QB_IPC_SOCKET; ipc_name = __func__; test_ipc_service_ref_count(); qb_leave(); } END_TEST static void test_max_dgram_size(void) { /* most implementations will not let you set a dgram buffer * of 1 million bytes. This test verifies that the we can detect * the max dgram buffersize regardless, and that the value we detect * is consistent. */ int32_t init; int32_t i; qb_log_filter_ctl(QB_LOG_STDERR, QB_LOG_FILTER_REMOVE, QB_LOG_FILTER_FILE, "*", LOG_TRACE); init = qb_ipcc_verify_dgram_max_msg_size(1000000); fail_if(init <= 0); for (i = 0; i < 100; i++) { int try = qb_ipcc_verify_dgram_max_msg_size(1000000); ck_assert_int_eq(init, try); } qb_log_filter_ctl(QB_LOG_STDERR, QB_LOG_FILTER_ADD, QB_LOG_FILTER_FILE, "*", LOG_TRACE); } START_TEST(test_ipc_max_dgram_size) { qb_enter(); test_max_dgram_size(); qb_leave(); } END_TEST static Suite * make_shm_suite(void) { TCase *tc; Suite *s = suite_create("shm"); tc = tcase_create("ipc_server_fail_shm"); tcase_add_test(tc, test_ipc_server_fail_shm); tcase_set_timeout(tc, 6); suite_add_tcase(s, tc); tc = tcase_create("ipc_txrx_shm_block"); tcase_add_test(tc, test_ipc_txrx_shm_block); tcase_set_timeout(tc, 6); suite_add_tcase(s, tc); tc = tcase_create("ipc_txrx_shm_tmo"); tcase_add_test(tc, test_ipc_txrx_shm_tmo); tcase_set_timeout(tc, 6); suite_add_tcase(s, tc); tc = tcase_create("ipc_fc_shm"); tcase_add_test(tc, test_ipc_fc_shm); tcase_set_timeout(tc, 6); suite_add_tcase(s, tc); tc = tcase_create("ipc_dispatch_shm"); tcase_add_test(tc, test_ipc_disp_shm); tcase_set_timeout(tc, 16); suite_add_tcase(s, tc); tc = tcase_create("ipc_stress_test_shm"); tcase_add_test(tc, test_ipc_stress_test_shm); tcase_set_timeout(tc, 16); suite_add_tcase(s, tc); tc = tcase_create("ipc_bulk_events_shm"); tcase_add_test(tc, test_ipc_bulk_events_shm); tcase_set_timeout(tc, 16); suite_add_tcase(s, tc); tc = tcase_create("ipc_exit_shm"); tcase_add_test(tc, test_ipc_exit_shm); tcase_set_timeout(tc, 3); suite_add_tcase(s, tc); tc = tcase_create("ipc_event_on_created_shm"); tcase_add_test(tc, test_ipc_event_on_created_shm); suite_add_tcase(s, tc); tc = tcase_create("ipc_service_ref_count_shm"); tcase_add_test(tc, test_ipc_service_ref_count_shm); tcase_set_timeout(tc, 10); suite_add_tcase(s, tc); return s; } static Suite * make_soc_suite(void) { Suite *s = suite_create("socket"); TCase *tc; tc = tcase_create("ipc_max_dgram_size"); tcase_add_test(tc, test_ipc_max_dgram_size); tcase_set_timeout(tc, 30); suite_add_tcase(s, tc); tc = tcase_create("ipc_server_fail_soc"); tcase_add_test(tc, test_ipc_server_fail_soc); tcase_set_timeout(tc, 6); suite_add_tcase(s, tc); tc = tcase_create("ipc_txrx_us_block"); tcase_add_test(tc, test_ipc_txrx_us_block); tcase_set_timeout(tc, 6); suite_add_tcase(s, tc); tc = tcase_create("ipc_txrx_us_tmo"); tcase_add_test(tc, test_ipc_txrx_us_tmo); tcase_set_timeout(tc, 6); suite_add_tcase(s, tc); tc = tcase_create("ipc_fc_us"); tcase_add_test(tc, test_ipc_fc_us); tcase_set_timeout(tc, 6); suite_add_tcase(s, tc); tc = tcase_create("ipc_exit_us"); tcase_add_test(tc, test_ipc_exit_us); tcase_set_timeout(tc, 6); suite_add_tcase(s, tc); tc = tcase_create("ipc_dispatch_us"); tcase_add_test(tc, test_ipc_disp_us); tcase_set_timeout(tc, 16); suite_add_tcase(s, tc); tc = tcase_create("ipc_stress_test_us"); tcase_add_test(tc, test_ipc_stress_test_us); tcase_set_timeout(tc, 60); suite_add_tcase(s, tc); tc = tcase_create("ipc_bulk_events_us"); tcase_add_test(tc, test_ipc_bulk_events_us); tcase_set_timeout(tc, 16); suite_add_tcase(s, tc); tc = tcase_create("ipc_event_on_created_us"); tcase_add_test(tc, test_ipc_event_on_created_us); suite_add_tcase(s, tc); tc = tcase_create("ipc_disconnect_after_created_us"); tcase_add_test(tc, test_ipc_disconnect_after_created_us); suite_add_tcase(s, tc); tc = tcase_create("ipc_service_ref_count_us"); tcase_add_test(tc, test_ipc_service_ref_count_us); tcase_set_timeout(tc, 10); suite_add_tcase(s, tc); return s; } int32_t main(void) { int32_t number_failed; SRunner *sr; Suite *s; int32_t do_shm_tests = QB_TRUE; #ifdef DISABLE_IPC_SHM do_shm_tests = QB_FALSE; #endif /* DISABLE_IPC_SHM */ s = make_soc_suite(); sr = srunner_create(s); if (do_shm_tests) { srunner_add_suite(sr, make_shm_suite()); } qb_log_init("check", LOG_USER, LOG_EMERG); qb_log_ctl(QB_LOG_SYSLOG, QB_LOG_CONF_ENABLED, QB_FALSE); qb_log_filter_ctl(QB_LOG_STDERR, QB_LOG_FILTER_ADD, QB_LOG_FILTER_FILE, "*", LOG_TRACE); qb_log_ctl(QB_LOG_STDERR, QB_LOG_CONF_ENABLED, QB_TRUE); qb_log_format_set(QB_LOG_STDERR, "lib/%f|%l| %b"); srunner_run_all(sr, CK_VERBOSE); number_failed = srunner_ntests_failed(sr); srunner_free(sr); return (number_failed == 0) ? EXIT_SUCCESS : EXIT_FAILURE; }