diff --git a/lib/loop_poll.c b/lib/loop_poll.c index 49c9650..117a276 100644 --- a/lib/loop_poll.c +++ b/lib/loop_poll.c @@ -1,780 +1,780 @@ /* * Copyright (C) 2010 Red Hat, Inc. * * Author: Angus Salkeld * * This file is part of libqb. * * libqb is free software: you can redistribute it and/or modify * it under the terms of the GNU Lesser General Public License as published by * the Free Software Foundation, either version 2.1 of the License, or * (at your option) any later version. * * libqb is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public License * along with libqb. If not, see . */ #include "os_base.h" #ifdef HAVE_SYS_RESOURCE_H #include #endif #include #if defined(__DARWIN_NSIG) #define QB_MAX_NUM_SIGNALS __DARWIN_NSIG #else #if defined(NSIG) #define QB_MAX_NUM_SIGNALS NSIG - 1 #else #define QB_MAX_NUM_SIGNALS 31 #endif #endif #include "loop_poll_int.h" /* * Define this to log slow (>10ms) jobs. */ #undef DEBUG_DISPATCH_TIME /* logs, std(in|out|err), pipe */ #define POLL_FDS_USED_MISC 50 #ifdef HAVE_EPOLL #define USE_EPOLL 1 #else #ifdef HAVE_KQUEUE #define USE_KQUEUE 1 #else #define USE_POLL 1 #endif /* HAVE_KQUEUE */ #endif /* HAVE_EPOLL */ static int32_t _qb_signal_add_to_jobs_(struct qb_loop *l, struct qb_poll_entry *pe); static void _poll_entry_check_generate_(struct qb_poll_entry *pe) { int32_t i; for (i = 0; i < 200; i++) { pe->check = random(); if (pe->check != 0 && pe->check != 0xffffffff) { break; } } } static void _poll_entry_mark_deleted_(struct qb_poll_entry *pe) { pe->ufd.fd = -1; pe->state = QB_POLL_ENTRY_DELETED; pe->check = 0; } static void _poll_entry_empty_(struct qb_poll_entry *pe) { memset(pe, 0, sizeof(struct qb_poll_entry)); pe->ufd.fd = -1; } static void _poll_dispatch_and_take_back_(struct qb_loop_item *item, enum qb_loop_priority p) { struct qb_poll_entry *pe = (struct qb_poll_entry *)item; int32_t res; #ifdef DEBUG_DISPATCH_TIME uint64_t start; uint64_t stop; int32_t log_warn = QB_FALSE; start = qb_util_nano_current_get(); #endif /* DEBUG_DISPATCH_TIME */ assert(pe->state == QB_POLL_ENTRY_JOBLIST); assert(pe->item.type == QB_LOOP_FD); res = pe->poll_dispatch_fn(pe->ufd.fd, pe->ufd.revents, pe->item.user_data); if (res < 0) { _poll_entry_mark_deleted_(pe); - } else { + } else if (pe->state != QB_POLL_ENTRY_DELETED) { pe->state = QB_POLL_ENTRY_ACTIVE; pe->ufd.revents = 0; } #ifdef DEBUG_DISPATCH_TIME if (pe->state == QB_POLL_ENTRY_ACTIVE) { pe->runs++; if ((pe->runs % 50) == 0) { log_warn = QB_TRUE; } stop = qb_util_nano_current_get(); if ((stop - start) > (10 * QB_TIME_NS_IN_MSEC)) { log_warn = QB_TRUE; } if (log_warn && pe->item.type == QB_LOOP_FD) { qb_util_log(LOG_INFO, "[fd:%d] dispatch:%p runs:%d duration:%d ms", pe->ufd.fd, pe->poll_dispatch_fn, pe->runs, (int32_t) ((stop - start) / QB_TIME_NS_IN_MSEC)); } } #endif /* DEBUG_DISPATCH_TIME */ } void qb_poll_fds_usage_check_(struct qb_poll_source *s) { struct rlimit lim; static int32_t socks_limit = 0; int32_t send_event = QB_FALSE; int32_t socks_used = 0; int32_t socks_avail = 0; struct qb_poll_entry *pe; int32_t i; if (socks_limit == 0) { if (getrlimit(RLIMIT_NOFILE, &lim) == -1) { qb_util_perror(LOG_WARNING, "getrlimit"); return; } socks_limit = lim.rlim_cur; socks_limit -= POLL_FDS_USED_MISC; if (socks_limit < 0) { socks_limit = 0; } } for (i = 0; i < s->poll_entry_count; i++) { assert(qb_array_index(s->poll_entries, i, (void **)&pe) == 0); if ((pe->state == QB_POLL_ENTRY_ACTIVE || pe->state == QB_POLL_ENTRY_JOBLIST) && pe->ufd.fd != -1) { socks_used++; } if (pe->state == QB_POLL_ENTRY_DELETED) { _poll_entry_empty_(pe); } } socks_avail = socks_limit - socks_used; if (socks_avail < 0) { socks_avail = 0; } send_event = QB_FALSE; if (s->not_enough_fds) { if (socks_avail > 2) { s->not_enough_fds = QB_FALSE; send_event = QB_TRUE; } } else { if (socks_avail <= 1) { s->not_enough_fds = QB_TRUE; send_event = QB_TRUE; } } if (send_event && s->low_fds_event_fn) { s->low_fds_event_fn(s->not_enough_fds, socks_avail); } } struct qb_loop_source * qb_loop_poll_create(struct qb_loop *l) { struct qb_poll_source *s = malloc(sizeof(struct qb_poll_source)); if (s == NULL) { return NULL; } s->s.l = l; s->s.dispatch_and_take_back = _poll_dispatch_and_take_back_; s->poll_entries = qb_array_create_2(16, sizeof(struct qb_poll_entry), 16); s->poll_entry_count = 0; s->low_fds_event_fn = NULL; s->not_enough_fds = QB_FALSE; #ifdef USE_EPOLL (void)qb_epoll_init(s); #endif #ifdef USE_KQUEUE (void)qb_kqueue_init(s); #endif #ifdef USE_POLL (void)qb_poll_init(s); #endif /* USE_POLL */ return (struct qb_loop_source *)s; } void qb_loop_poll_destroy(struct qb_loop *l) { struct qb_poll_source *s = (struct qb_poll_source *)l->fd_source; qb_array_free(s->poll_entries); s->driver.fini(s); free(s); } int32_t qb_loop_poll_low_fds_event_set(struct qb_loop *l, qb_loop_poll_low_fds_event_fn fn) { struct qb_poll_source *s = (struct qb_poll_source *)l->fd_source; s->low_fds_event_fn = fn; return 0; } static int32_t _get_empty_array_position_(struct qb_poll_source *s) { int32_t found = QB_FALSE; uint32_t install_pos; int32_t res = 0; struct qb_poll_entry *pe; for (install_pos = 0; install_pos < s->poll_entry_count; install_pos++) { assert(qb_array_index (s->poll_entries, install_pos, (void **)&pe) == 0); if (pe->state == QB_POLL_ENTRY_EMPTY) { found = QB_TRUE; break; } } if (found == QB_FALSE) { #ifdef USE_POLL struct pollfd *ufds; int32_t new_size = (s->poll_entry_count + 1) * sizeof(struct pollfd); ufds = realloc(s->ufds, new_size); if (ufds == NULL) { return -ENOMEM; } s->ufds = ufds; #endif /* USE_POLL */ /* * Grow pollfd list */ res = qb_array_grow(s->poll_entries, s->poll_entry_count + 1); if (res != 0) { return res; } s->poll_entry_count += 1; install_pos = s->poll_entry_count - 1; } return install_pos; } static int32_t _poll_add_(struct qb_loop *l, enum qb_loop_priority p, int32_t fd, int32_t events, void *data, struct qb_poll_entry **pe_pt) { struct qb_poll_entry *pe; uint32_t install_pos; int32_t res = 0; struct qb_poll_source *s; if (l == NULL) { return -EINVAL; } s = (struct qb_poll_source *)l->fd_source; install_pos = _get_empty_array_position_(s); assert(qb_array_index(s->poll_entries, install_pos, (void **)&pe) == 0); pe->state = QB_POLL_ENTRY_ACTIVE; pe->install_pos = install_pos; _poll_entry_check_generate_(pe); pe->ufd.fd = fd; pe->ufd.events = events; pe->ufd.revents = 0; pe->item.user_data = data; pe->item.source = (struct qb_loop_source *)l->fd_source; pe->p = p; pe->runs = 0; res = s->driver.add(s, pe, fd, events); if (res == 0) { *pe_pt = pe; return 0; } else { pe->state = QB_POLL_ENTRY_EMPTY; return res; } } static int32_t _qb_poll_add_to_jobs_(struct qb_loop *l, struct qb_poll_entry *pe) { assert(pe->item.type == QB_LOOP_FD); qb_loop_level_item_add(&l->level[pe->p], &pe->item); pe->state = QB_POLL_ENTRY_JOBLIST; return 1; } int32_t qb_loop_poll_add(struct qb_loop * lp, enum qb_loop_priority p, int32_t fd, int32_t events, void *data, qb_loop_poll_dispatch_fn dispatch_fn) { struct qb_poll_entry *pe = NULL; int32_t size; int32_t new_size; int32_t res; struct qb_loop *l = lp; if (l == NULL) { l = qb_loop_default_get(); } size = ((struct qb_poll_source *)l->fd_source)->poll_entry_count; res = _poll_add_(l, p, fd, events, data, &pe); if (res != 0) { qb_util_perror(LOG_ERR, "couldn't add poll entryfor FD %d", fd); return res; } new_size = ((struct qb_poll_source *)l->fd_source)->poll_entry_count; pe->poll_dispatch_fn = dispatch_fn; pe->item.type = QB_LOOP_FD; pe->add_to_jobs = _qb_poll_add_to_jobs_; if (new_size > size) { qb_util_log(LOG_TRACE, "grown poll array to %d for FD %d", new_size, fd); } return res; } int32_t qb_loop_poll_mod(struct qb_loop * lp, enum qb_loop_priority p, int32_t fd, int32_t events, void *data, qb_loop_poll_dispatch_fn dispatch_fn) { uint32_t i; int32_t res = 0; struct qb_poll_entry *pe; struct qb_poll_source *s; struct qb_loop *l = lp; if (l == NULL) { l = qb_loop_default_get(); } s = (struct qb_poll_source *)l->fd_source; /* * Find file descriptor to modify events and dispatch function */ for (i = 0; i < s->poll_entry_count; i++) { assert(qb_array_index(s->poll_entries, i, (void **)&pe) == 0); if (pe->ufd.fd != fd) { continue; } if (pe->state == QB_POLL_ENTRY_DELETED || pe->check == 0) { qb_util_log(LOG_ERR, "poll_mod : can't modify entry already deleted"); return -EBADF; } pe->poll_dispatch_fn = dispatch_fn; pe->item.user_data = data; pe->p = p; if (pe->ufd.events != events) { res = s->driver.mod(s, pe, fd, events); pe->ufd.events = events; } return res; } return -EBADF; } int32_t qb_loop_poll_del(struct qb_loop * lp, int32_t fd) { int32_t i; int32_t res = 0; struct qb_poll_entry *pe; struct qb_poll_source *s; struct qb_loop *l = lp; if (l == NULL) { l = qb_loop_default_get(); } s = (struct qb_poll_source *)l->fd_source; for (i = 0; i < s->poll_entry_count; i++) { assert(qb_array_index(s->poll_entries, i, (void **)&pe) == 0); if (pe->ufd.fd != fd || pe->item.type != QB_LOOP_FD) { continue; } if (pe->state == QB_POLL_ENTRY_DELETED || pe->state == QB_POLL_ENTRY_EMPTY) { return 0; } if (pe->state == QB_POLL_ENTRY_JOBLIST) { qb_loop_level_item_del(&l->level[pe->p], &pe->item); } res = s->driver.del(s, pe, fd, i); _poll_entry_mark_deleted_(pe); return res; } return -EBADF; } static int32_t pipe_fds[2] = { -1, -1 }; struct qb_signal_source { struct qb_loop_source s; struct qb_list_head sig_head; sigset_t signal_superset; }; struct qb_loop_sig { struct qb_loop_item item; int32_t signal; enum qb_loop_priority p; qb_loop_signal_dispatch_fn dispatch_fn; struct qb_loop_sig *cloned_from; }; static void _handle_real_signal_(int signal_num, siginfo_t * si, void *context) { int32_t sig = signal_num; int32_t res = 0; if (pipe_fds[1] > 0) { try_again: res = write(pipe_fds[1], &sig, sizeof(int32_t)); if (res == -1 && errno == EAGAIN) { goto try_again; } else if (res != sizeof(int32_t)) { qb_util_log(LOG_ERR, "failed to write signal to pipe [%d]", res); } } qb_util_log(LOG_TRACE, "got real signal [%d] sent to pipe", sig); } static void _signal_dispatch_and_take_back_(struct qb_loop_item *item, enum qb_loop_priority p) { struct qb_loop_sig *sig = (struct qb_loop_sig *)item; int32_t res; res = sig->dispatch_fn(sig->signal, sig->item.user_data); if (res != 0) { (void)qb_loop_signal_del(sig->cloned_from->item.source->l, sig->cloned_from); } free(sig); } struct qb_loop_source * qb_loop_signals_create(struct qb_loop *l) { int32_t res = 0; struct qb_poll_entry *pe; struct qb_signal_source *s = calloc(1, sizeof(struct qb_signal_source)); if (s == NULL) { return NULL; } s->s.l = l; s->s.dispatch_and_take_back = _signal_dispatch_and_take_back_; s->s.poll = NULL; qb_list_init(&s->sig_head); sigemptyset(&s->signal_superset); if (pipe_fds[0] < 0) { res = pipe(pipe_fds); if (res == -1) { res = -errno; qb_util_perror(LOG_ERR, "Can't light pipe"); goto error_exit; } (void)qb_sys_fd_nonblock_cloexec_set(pipe_fds[0]); (void)qb_sys_fd_nonblock_cloexec_set(pipe_fds[1]); res = _poll_add_(l, QB_LOOP_HIGH, pipe_fds[0], POLLIN, NULL, &pe); if (res == 0) { pe->poll_dispatch_fn = NULL; pe->item.type = QB_LOOP_SIG; pe->add_to_jobs = _qb_signal_add_to_jobs_; } else { qb_util_perror(LOG_ERR, "Can't smoke pipe"); goto error_exit; } } return (struct qb_loop_source *)s; error_exit: errno = -res; free(s); if (pipe_fds[0] >= 0) { close(pipe_fds[0]); } if (pipe_fds[1] >= 0) { close(pipe_fds[1]); } return NULL; } void qb_loop_signals_destroy(struct qb_loop *l) { struct qb_signal_source *s = (struct qb_signal_source *)l->signal_source; struct qb_list_head *list; struct qb_list_head *n; struct qb_loop_item *item; close(pipe_fds[0]); pipe_fds[0] = -1; close(pipe_fds[1]); pipe_fds[1] = -1; qb_list_for_each_safe(list, n, &s->sig_head) { item = qb_list_entry(list, struct qb_loop_item, list); qb_list_del(&item->list); free(item); } free(l->signal_source); } static int32_t _qb_signal_add_to_jobs_(struct qb_loop *l, struct qb_poll_entry *pe) { struct qb_signal_source *s = (struct qb_signal_source *)l->signal_source; struct qb_list_head *list; struct qb_loop_sig *sig; struct qb_loop_item *item; struct qb_loop_sig *new_sig_job; int32_t the_signal; ssize_t res; int32_t jobs_added = 0; res = read(pipe_fds[0], &the_signal, sizeof(int32_t)); if (res != sizeof(int32_t)) { qb_util_perror(LOG_WARNING, "failed to read pipe"); return 0; } pe->ufd.revents = 0; qb_list_for_each(list, &s->sig_head) { item = qb_list_entry(list, struct qb_loop_item, list); sig = (struct qb_loop_sig *)item; if (sig->signal == the_signal) { new_sig_job = calloc(1, sizeof(struct qb_loop_sig)); if (new_sig_job == NULL) { return jobs_added; } memcpy(new_sig_job, sig, sizeof(struct qb_loop_sig)); qb_util_log(LOG_TRACE, "adding signal [%d] to job queue %p", the_signal, sig); new_sig_job->cloned_from = sig; qb_loop_level_item_add(&l->level[sig->p], &new_sig_job->item); jobs_added++; } } return jobs_added; } static void _adjust_sigactions_(struct qb_signal_source *s) { struct qb_loop_sig *sig; struct qb_loop_item *item; struct sigaction sa; int32_t i; int32_t needed; sa.sa_flags = SA_SIGINFO; sa.sa_sigaction = _handle_real_signal_; sigemptyset(&s->signal_superset); sigemptyset(&sa.sa_mask); /* re-set to default */ for (i = 0; i < QB_MAX_NUM_SIGNALS; i++) { needed = QB_FALSE; qb_list_for_each_entry(item, &s->sig_head, list) { sig = (struct qb_loop_sig *)item; if (i == sig->signal) { needed = QB_TRUE; break; } } if (needed) { sigaddset(&s->signal_superset, i); sigaction(i, &sa, NULL); } else { (void)signal(i, SIG_DFL); } } } int32_t qb_loop_signal_add(qb_loop_t * lp, enum qb_loop_priority p, int32_t the_sig, void *data, qb_loop_signal_dispatch_fn dispatch_fn, qb_loop_signal_handle * handle) { struct qb_loop_sig *sig; struct qb_signal_source *s; struct qb_loop *l = lp; if (l == NULL) { l = qb_loop_default_get(); } if (l == NULL || dispatch_fn == NULL) { return -EINVAL; } if (p < QB_LOOP_LOW || p > QB_LOOP_HIGH) { return -EINVAL; } s = (struct qb_signal_source *)l->signal_source; sig = calloc(1, sizeof(struct qb_loop_sig)); if (sig == NULL) { return -errno; } sig->dispatch_fn = dispatch_fn; sig->p = p; sig->signal = the_sig; sig->item.user_data = data; sig->item.source = l->signal_source; sig->item.type = QB_LOOP_SIG; qb_list_init(&sig->item.list); qb_list_add_tail(&sig->item.list, &s->sig_head); if (sigismember(&s->signal_superset, the_sig) != 1) { _adjust_sigactions_(s); } if (handle) { *handle = sig; } return 0; } int32_t qb_loop_signal_mod(qb_loop_t * lp, enum qb_loop_priority p, int32_t the_sig, void *data, qb_loop_signal_dispatch_fn dispatch_fn, qb_loop_signal_handle handle) { struct qb_signal_source *s; struct qb_loop_sig *sig = (struct qb_loop_sig *)handle; struct qb_loop *l = lp; if (l == NULL) { l = qb_loop_default_get(); } if (l == NULL || dispatch_fn == NULL || handle == NULL) { return -EINVAL; } if (p < QB_LOOP_LOW || p > QB_LOOP_HIGH) { return -EINVAL; } s = (struct qb_signal_source *)l->signal_source; sig->item.user_data = data; sig->item.type = QB_LOOP_SIG; sig->dispatch_fn = dispatch_fn; sig->p = p; if (sig->signal != the_sig) { sig->signal = the_sig; _adjust_sigactions_(s); } return 0; } int32_t qb_loop_signal_del(qb_loop_t * lp, qb_loop_signal_handle handle) { struct qb_signal_source *s; struct qb_loop_sig *sig = (struct qb_loop_sig *)handle; struct qb_loop_sig *sig_clone; struct qb_loop *l = lp; struct qb_loop_item *item; if (l == NULL) { l = qb_loop_default_get(); } if (l == NULL || handle == NULL) { return -EINVAL; } s = (struct qb_signal_source *)l->signal_source; qb_list_for_each_entry(item, &l->level[sig->p].wait_head, list) { if (item->type != QB_LOOP_SIG) { continue; } sig_clone = (struct qb_loop_sig *)item; if (sig_clone->cloned_from == sig) { qb_util_log(LOG_TRACE, "deleting sig in WAITLIST"); qb_list_del(&sig_clone->item.list); free(sig_clone); break; } } qb_list_for_each_entry(item, &l->level[sig->p].job_head, list) { if (item->type != QB_LOOP_SIG) { continue; } sig_clone = (struct qb_loop_sig *)item; if (sig_clone->cloned_from == sig) { qb_loop_level_item_del(&l->level[sig->p], item); qb_util_log(LOG_TRACE, "deleting sig in JOBLIST"); break; } } qb_list_del(&sig->item.list); free(sig); _adjust_sigactions_(s); return 0; } diff --git a/tests/check_ipc.c b/tests/check_ipc.c index ce9a7c0..79faa16 100644 --- a/tests/check_ipc.c +++ b/tests/check_ipc.c @@ -1,1549 +1,1645 @@ /* * Copyright (c) 2010 Red Hat, Inc. * * All rights reserved. * * Author: Angus Salkeld * * This file is part of libqb. * * libqb is free software: you can redistribute it and/or modify * it under the terms of the GNU Lesser General Public License as published by * the Free Software Foundation, either version 2.1 of the License, or * (at your option) any later version. * * libqb is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public License * along with libqb. If not, see . */ #include "os_base.h" #include #include #include #include #include #include #include #include static const char *ipc_name = "ipc_test"; #define DEFAULT_MAX_MSG_SIZE (8192*16) static int CALCULATED_DGRAM_MAX_MSG_SIZE = 0; #define DGRAM_MAX_MSG_SIZE \ (CALCULATED_DGRAM_MAX_MSG_SIZE == 0 ? \ CALCULATED_DGRAM_MAX_MSG_SIZE = qb_ipcc_verify_dgram_max_msg_size(DEFAULT_MAX_MSG_SIZE) : \ CALCULATED_DGRAM_MAX_MSG_SIZE) #define MAX_MSG_SIZE (ipc_type == QB_IPC_SOCKET ? DGRAM_MAX_MSG_SIZE : DEFAULT_MAX_MSG_SIZE) /* The size the giant msg's data field needs to be to make * this the largests msg we can successfully send. */ #define GIANT_MSG_DATA_SIZE MAX_MSG_SIZE - sizeof(struct qb_ipc_response_header) - 8 static int enforce_server_buffer=0; static qb_ipcc_connection_t *conn; static enum qb_ipc_type ipc_type; enum my_msg_ids { IPC_MSG_REQ_TX_RX, IPC_MSG_RES_TX_RX, IPC_MSG_REQ_DISPATCH, IPC_MSG_RES_DISPATCH, IPC_MSG_REQ_BULK_EVENTS, IPC_MSG_RES_BULK_EVENTS, IPC_MSG_REQ_STRESS_EVENT, IPC_MSG_RES_STRESS_EVENT, IPC_MSG_REQ_SERVER_FAIL, IPC_MSG_RES_SERVER_FAIL, IPC_MSG_REQ_SERVER_DISCONNECT, IPC_MSG_RES_SERVER_DISCONNECT, }; /* Test Cases * * 1) basic send & recv differnet message sizes * * 2) send message to start dispatch (confirm receipt) * * 3) flow control * * 4) authentication * * 5) thread safety * * 6) cleanup * * 7) service availabilty * * 8) multiple services */ static qb_loop_t *my_loop; static qb_ipcs_service_t* s1; static int32_t turn_on_fc = QB_FALSE; static int32_t fc_enabled = 89; static int32_t send_event_on_created = QB_FALSE; static int32_t disconnect_after_created = QB_FALSE; static int32_t num_bulk_events = 10; static int32_t num_stress_events = 30000; static int32_t reference_count_test = QB_FALSE; +static int32_t multiple_connections = QB_FALSE; static int32_t exit_handler(int32_t rsignal, void *data) { qb_log(LOG_DEBUG, "caught signal %d", rsignal); qb_ipcs_destroy(s1); return -1; } static int32_t s1_msg_process_fn(qb_ipcs_connection_t *c, void *data, size_t size) { struct qb_ipc_request_header *req_pt = (struct qb_ipc_request_header *)data; struct qb_ipc_response_header response = { 0, }; ssize_t res; if (req_pt->id == IPC_MSG_REQ_TX_RX) { response.size = sizeof(struct qb_ipc_response_header); response.id = IPC_MSG_RES_TX_RX; response.error = 0; res = qb_ipcs_response_send(c, &response, response.size); if (res < 0) { qb_perror(LOG_INFO, "qb_ipcs_response_send"); } else if (res != response.size) { qb_log(LOG_DEBUG, "qb_ipcs_response_send %zd != %d", res, response.size); } if (turn_on_fc) { qb_ipcs_request_rate_limit(s1, QB_IPCS_RATE_OFF); } } else if (req_pt->id == IPC_MSG_REQ_DISPATCH) { response.size = sizeof(struct qb_ipc_response_header); response.id = IPC_MSG_RES_DISPATCH; response.error = 0; res = qb_ipcs_event_send(c, &response, sizeof(response)); if (res < 0) { qb_perror(LOG_INFO, "qb_ipcs_event_send"); } } else if (req_pt->id == IPC_MSG_REQ_BULK_EVENTS) { int32_t m; int32_t num; struct qb_ipcs_connection_stats_2 *stats; uint32_t max_size = MAX_MSG_SIZE; response.size = sizeof(struct qb_ipc_response_header); response.error = 0; stats = qb_ipcs_connection_stats_get_2(c, QB_FALSE); num = stats->event_q_length; free(stats); /* crazy large message */ res = qb_ipcs_event_send(c, &response, max_size*10); ck_assert_int_eq(res, -EMSGSIZE); /* send one event before responding */ res = qb_ipcs_event_send(c, &response, sizeof(response)); ck_assert_int_eq(res, sizeof(response)); response.id++; /* There should be one more item in the event queue now. */ stats = qb_ipcs_connection_stats_get_2(c, QB_FALSE); ck_assert_int_eq(stats->event_q_length - num, 1); free(stats); /* send response */ response.id = IPC_MSG_RES_BULK_EVENTS; res = qb_ipcs_response_send(c, &response, response.size); ck_assert_int_eq(res, sizeof(response)); /* send the rest of the events after the response */ for (m = 1; m < num_bulk_events; m++) { res = qb_ipcs_event_send(c, &response, sizeof(response)); if (res == -EAGAIN || res == -ENOBUFS) { /* retry */ usleep(1000); m--; continue; } ck_assert_int_eq(res, sizeof(response)); response.id++; } } else if (req_pt->id == IPC_MSG_REQ_STRESS_EVENT) { struct { struct qb_ipc_response_header hdr __attribute__ ((aligned(8))); char data[GIANT_MSG_DATA_SIZE] __attribute__ ((aligned(8))); uint32_t sent_msgs __attribute__ ((aligned(8))); } __attribute__ ((aligned(8))) giant_event_send; int32_t m; response.size = sizeof(struct qb_ipc_response_header); response.error = 0; response.id = IPC_MSG_RES_STRESS_EVENT; res = qb_ipcs_response_send(c, &response, response.size); ck_assert_int_eq(res, sizeof(response)); giant_event_send.hdr.error = 0; giant_event_send.hdr.id = IPC_MSG_RES_STRESS_EVENT; for (m = 0; m < num_stress_events; m++) { size_t sent_len = sizeof(struct qb_ipc_response_header); if (((m+1) % 1000) == 0) { sent_len = sizeof(giant_event_send); giant_event_send.sent_msgs = m + 1; } giant_event_send.hdr.size = sent_len; res = qb_ipcs_event_send(c, &giant_event_send, sent_len); if (res < 0) { if (res == -EAGAIN || res == -ENOBUFS) { /* yield to the receive process */ usleep(1000); m--; continue; } else { qb_perror(LOG_DEBUG, "sending stress events"); ck_assert_int_eq(res, sent_len); } } else if (((m+1) % 1000) == 0) { qb_log(LOG_DEBUG, "SENT: %d stress events sent", m+1); } giant_event_send.hdr.id++; } } else if (req_pt->id == IPC_MSG_REQ_SERVER_FAIL) { exit(0); } else if (req_pt->id == IPC_MSG_REQ_SERVER_DISCONNECT) { + multiple_connections = QB_FALSE; qb_ipcs_disconnect(c); } return 0; } static int32_t my_job_add(enum qb_loop_priority p, void *data, qb_loop_job_dispatch_fn fn) { return qb_loop_job_add(my_loop, p, data, fn); } static int32_t my_dispatch_add(enum qb_loop_priority p, int32_t fd, int32_t events, void *data, qb_ipcs_dispatch_fn_t fn) { return qb_loop_poll_add(my_loop, p, fd, events, data, fn); } static int32_t my_dispatch_mod(enum qb_loop_priority p, int32_t fd, int32_t events, void *data, qb_ipcs_dispatch_fn_t fn) { return qb_loop_poll_mod(my_loop, p, fd, events, data, fn); } static int32_t my_dispatch_del(int32_t fd) { return qb_loop_poll_del(my_loop, fd); } static int32_t s1_connection_closed(qb_ipcs_connection_t *c) { + if (multiple_connections) { + return 0; + } qb_enter(); qb_leave(); return 0; } static void outq_flush (void *data) { static int i = 0; struct cs_ipcs_conn_context *cnx; cnx = qb_ipcs_context_get(data); qb_log(LOG_DEBUG,"iter %u\n", i); i++; if (i == 2) { qb_ipcs_destroy(s1); s1 = NULL; } /* is the reference counting is not working, this should fail * for i > 1. */ qb_ipcs_event_send(data, "test", 4); assert(memcmp(cnx, "test", 4) == 0); if (i < 5) { qb_loop_job_add(my_loop, QB_LOOP_HIGH, data, outq_flush); } else { /* this single unref should clean everything up. */ qb_ipcs_connection_unref(data); qb_log(LOG_INFO, "end of test, stopping loop"); qb_loop_stop(my_loop); } } static void s1_connection_destroyed(qb_ipcs_connection_t *c) { + if (multiple_connections) { + return; + } + qb_enter(); if (reference_count_test) { struct cs_ipcs_conn_context *cnx; cnx = qb_ipcs_context_get(c); free(cnx); } else { qb_loop_stop(my_loop); } qb_leave(); } static void s1_connection_created(qb_ipcs_connection_t *c) { uint32_t max = MAX_MSG_SIZE; + if (multiple_connections) { + return; + } if (send_event_on_created) { struct qb_ipc_response_header response; int32_t res; response.size = sizeof(struct qb_ipc_response_header); response.id = IPC_MSG_RES_DISPATCH; response.error = 0; res = qb_ipcs_event_send(c, &response, sizeof(response)); ck_assert_int_eq(res, response.size); } if (reference_count_test) { struct cs_ipcs_conn_context *context; qb_ipcs_connection_ref(c); qb_loop_job_add(my_loop, QB_LOOP_HIGH, c, outq_flush); context = calloc(1, 20); memcpy(context, "test", 4); qb_ipcs_context_set(c, context); } ck_assert_int_eq(max, qb_ipcs_connection_get_buffer_size(c)); } static void run_ipc_server(void) { int32_t res; qb_loop_signal_handle handle; struct qb_ipcs_service_handlers sh = { .connection_accept = NULL, .connection_created = s1_connection_created, .msg_process = s1_msg_process_fn, .connection_destroyed = s1_connection_destroyed, .connection_closed = s1_connection_closed, }; struct qb_ipcs_poll_handlers ph = { .job_add = my_job_add, .dispatch_add = my_dispatch_add, .dispatch_mod = my_dispatch_mod, .dispatch_del = my_dispatch_del, }; uint32_t max_size = MAX_MSG_SIZE; qb_loop_signal_add(my_loop, QB_LOOP_HIGH, SIGSTOP, NULL, exit_handler, &handle); qb_loop_signal_add(my_loop, QB_LOOP_HIGH, SIGTERM, NULL, exit_handler, &handle); my_loop = qb_loop_create(); s1 = qb_ipcs_create(ipc_name, 4, ipc_type, &sh); fail_if(s1 == 0); if (enforce_server_buffer) { qb_ipcs_enforce_buffer_size(s1, max_size); } qb_ipcs_poll_handlers_set(s1, &ph); res = qb_ipcs_run(s1); ck_assert_int_eq(res, 0); qb_loop_run(my_loop); qb_log(LOG_DEBUG, "loop finished - done ..."); } static int32_t run_function_in_new_process(void (*run_ipc_server_fn)(void)) { pid_t pid = fork (); if (pid == -1) { fprintf (stderr, "Can't fork\n"); return -1; } if (pid == 0) { run_ipc_server_fn(); return 0; } return pid; } static void request_server_exit(void) { struct qb_ipc_request_header req_header; struct qb_ipc_response_header res_header; struct iovec iov[1]; int32_t res; /* * tell the server to exit */ req_header.id = IPC_MSG_REQ_SERVER_FAIL; req_header.size = sizeof(struct qb_ipc_request_header); iov[0].iov_len = req_header.size; iov[0].iov_base = &req_header; ck_assert_int_eq(QB_TRUE, qb_ipcc_is_connected(conn)); res = qb_ipcc_sendv_recv(conn, iov, 1, &res_header, sizeof(struct qb_ipc_response_header), -1); /* * confirm we get -ENOTCONN or ECONNRESET */ if (res != -ECONNRESET && res != -ENOTCONN) { qb_log(LOG_ERR, "id:%d size:%d", res_header.id, res_header.size); ck_assert_int_eq(res, -ENOTCONN); } } static void kill_server(pid_t pid) { kill(pid, SIGTERM); waitpid(pid, NULL, 0); } static int32_t verify_graceful_stop(pid_t pid) { int wait_rc = 0; int status = 0; int rc = 0; int tries; /* We need the server to be able to exit by itself */ for (tries = 10; tries >= 0; tries--) { sleep(1); wait_rc = waitpid(pid, &status, WNOHANG); if (wait_rc > 0) { break; } } ck_assert_int_eq(wait_rc, pid); rc = WIFEXITED(status); if (rc) { rc = WEXITSTATUS(status); ck_assert_int_eq(rc, 0); } else { fail_if(rc == 0); } return 0; } struct my_req { struct qb_ipc_request_header hdr; char message[1024 * 1024]; }; static struct my_req request; static int32_t send_and_check(int32_t req_id, uint32_t size, int32_t ms_timeout, int32_t expect_perfection) { struct qb_ipc_response_header res_header; int32_t res; int32_t try_times = 0; uint32_t max_size = MAX_MSG_SIZE; request.hdr.id = req_id; request.hdr.size = sizeof(struct qb_ipc_request_header) + size; /* check that we can't send a message that is too big * and we get the right return code. */ res = qb_ipcc_send(conn, &request, max_size*2); ck_assert_int_eq(res, -EMSGSIZE); repeat_send: res = qb_ipcc_send(conn, &request, request.hdr.size); try_times++; if (res < 0) { if (res == -EAGAIN && try_times < 10) { goto repeat_send; } else { if (res == -EAGAIN && try_times >= 10) { fc_enabled = QB_TRUE; } errno = -res; qb_perror(LOG_INFO, "qb_ipcc_send"); return res; } } if (req_id == IPC_MSG_REQ_DISPATCH) { res = qb_ipcc_event_recv(conn, &res_header, sizeof(struct qb_ipc_response_header), ms_timeout); } else { res = qb_ipcc_recv(conn, &res_header, sizeof(struct qb_ipc_response_header), ms_timeout); } if (res == -EINTR) { return -1; } if (res == -EAGAIN || res == -ETIMEDOUT) { fc_enabled = QB_TRUE; qb_perror(LOG_DEBUG, "qb_ipcc_recv"); return res; } if (expect_perfection) { ck_assert_int_eq(res, sizeof(struct qb_ipc_response_header)); ck_assert_int_eq(res_header.id, req_id + 1); ck_assert_int_eq(res_header.size, sizeof(struct qb_ipc_response_header)); } return res; } static void test_ipc_txrx_timeout(void) { struct qb_ipc_request_header req_header; struct qb_ipc_response_header res_header; struct iovec iov[1]; int32_t res; int32_t c = 0; int32_t j = 0; pid_t pid; uint32_t max_size = MAX_MSG_SIZE; pid = run_function_in_new_process(run_ipc_server); fail_if(pid == -1); sleep(1); do { conn = qb_ipcc_connect(ipc_name, max_size); if (conn == NULL) { j = waitpid(pid, NULL, WNOHANG); ck_assert_int_eq(j, 0); sleep(1); c++; } } while (conn == NULL && c < 5); fail_if(conn == NULL); /* The dispatch response will only come over * the event channel, we want to verify the receive times * out when an event is returned with no response */ req_header.id = IPC_MSG_REQ_DISPATCH; req_header.size = sizeof(struct qb_ipc_request_header); iov[0].iov_len = req_header.size; iov[0].iov_base = &req_header; res = qb_ipcc_sendv_recv(conn, iov, 1, &res_header, sizeof(struct qb_ipc_response_header), 5000); ck_assert_int_eq(res, -ETIMEDOUT); request_server_exit(); verify_graceful_stop(pid); /* * wait a bit for the server to die. */ sleep(1); /* * this needs to free up the shared mem */ qb_ipcc_disconnect(conn); } static int32_t recv_timeout = -1; static void test_ipc_txrx(void) { int32_t j; int32_t c = 0; size_t size; pid_t pid; uint32_t max_size = MAX_MSG_SIZE; pid = run_function_in_new_process(run_ipc_server); fail_if(pid == -1); sleep(1); do { conn = qb_ipcc_connect(ipc_name, max_size); if (conn == NULL) { j = waitpid(pid, NULL, WNOHANG); ck_assert_int_eq(j, 0); sleep(1); c++; } } while (conn == NULL && c < 5); fail_if(conn == NULL); size = QB_MIN(sizeof(struct qb_ipc_request_header), 64); for (j = 1; j < 19; j++) { size *= 2; if (size >= max_size) break; if (send_and_check(IPC_MSG_REQ_TX_RX, size, recv_timeout, QB_TRUE) < 0) { break; } } if (turn_on_fc) { /* can't signal server to shutdown if flow control is on */ ck_assert_int_eq(fc_enabled, QB_TRUE); qb_ipcc_disconnect(conn); /* TODO - figure out why this sleep is necessary */ sleep(1); kill_server(pid); } else { request_server_exit(); qb_ipcc_disconnect(conn); verify_graceful_stop(pid); } } static void test_ipc_exit(void) { struct qb_ipc_request_header req_header; struct qb_ipc_response_header res_header; struct iovec iov[1]; int32_t res; int32_t c = 0; int32_t j = 0; pid_t pid; uint32_t max_size = MAX_MSG_SIZE; pid = run_function_in_new_process(run_ipc_server); fail_if(pid == -1); sleep(1); do { conn = qb_ipcc_connect(ipc_name, max_size); if (conn == NULL) { j = waitpid(pid, NULL, WNOHANG); ck_assert_int_eq(j, 0); sleep(1); c++; } } while (conn == NULL && c < 5); fail_if(conn == NULL); req_header.id = IPC_MSG_REQ_TX_RX; req_header.size = sizeof(struct qb_ipc_request_header); iov[0].iov_len = req_header.size; iov[0].iov_base = &req_header; res = qb_ipcc_sendv_recv(conn, iov, 1, &res_header, sizeof(struct qb_ipc_response_header), -1); ck_assert_int_eq(res, sizeof(struct qb_ipc_response_header)); request_server_exit(); verify_graceful_stop(pid); /* * wait a bit for the server to die. */ sleep(1); /* * this needs to free up the shared mem */ qb_ipcc_disconnect(conn); } START_TEST(test_ipc_exit_us) { qb_enter(); ipc_type = QB_IPC_SOCKET; ipc_name = __func__; recv_timeout = 5000; test_ipc_exit(); qb_leave(); } END_TEST START_TEST(test_ipc_exit_shm) { qb_enter(); ipc_type = QB_IPC_SHM; ipc_name = __func__; recv_timeout = 1000; test_ipc_exit(); qb_leave(); } END_TEST START_TEST(test_ipc_txrx_shm_timeout) { qb_enter(); ipc_type = QB_IPC_SHM; ipc_name = __func__; test_ipc_txrx_timeout(); qb_leave(); } END_TEST START_TEST(test_ipc_txrx_us_timeout) { qb_enter(); ipc_type = QB_IPC_SOCKET; ipc_name = __func__; test_ipc_txrx_timeout(); qb_leave(); } END_TEST START_TEST(test_ipc_txrx_shm_tmo) { qb_enter(); turn_on_fc = QB_FALSE; ipc_type = QB_IPC_SHM; ipc_name = __func__; recv_timeout = 1000; test_ipc_txrx(); qb_leave(); } END_TEST START_TEST(test_ipc_txrx_shm_block) { qb_enter(); turn_on_fc = QB_FALSE; ipc_type = QB_IPC_SHM; ipc_name = __func__; recv_timeout = -1; test_ipc_txrx(); qb_leave(); } END_TEST START_TEST(test_ipc_fc_shm) { qb_enter(); turn_on_fc = QB_TRUE; ipc_type = QB_IPC_SHM; recv_timeout = 500; ipc_name = __func__; test_ipc_txrx(); qb_leave(); } END_TEST START_TEST(test_ipc_txrx_us_block) { qb_enter(); turn_on_fc = QB_FALSE; ipc_type = QB_IPC_SOCKET; ipc_name = __func__; recv_timeout = -1; test_ipc_txrx(); qb_leave(); } END_TEST START_TEST(test_ipc_txrx_us_tmo) { qb_enter(); turn_on_fc = QB_FALSE; ipc_type = QB_IPC_SOCKET; ipc_name = __func__; recv_timeout = 1000; test_ipc_txrx(); qb_leave(); } END_TEST START_TEST(test_ipc_fc_us) { qb_enter(); turn_on_fc = QB_TRUE; ipc_type = QB_IPC_SOCKET; recv_timeout = 500; ipc_name = __func__; test_ipc_txrx(); qb_leave(); } END_TEST struct my_res { struct qb_ipc_response_header hdr; char message[1024 * 1024]; }; static void test_ipc_dispatch(void) { int32_t j; int32_t c = 0; pid_t pid; int32_t size; uint32_t max_size = MAX_MSG_SIZE; pid = run_function_in_new_process(run_ipc_server); fail_if(pid == -1); sleep(1); do { conn = qb_ipcc_connect(ipc_name, max_size); if (conn == NULL) { j = waitpid(pid, NULL, WNOHANG); ck_assert_int_eq(j, 0); sleep(1); c++; } } while (conn == NULL && c < 5); fail_if(conn == NULL); size = QB_MIN(sizeof(struct qb_ipc_request_header), 64); for (j = 1; j < 19; j++) { size *= 2; if (size >= max_size) break; if (send_and_check(IPC_MSG_REQ_DISPATCH, size, recv_timeout, QB_TRUE) < 0) { break; } } request_server_exit(); qb_ipcc_disconnect(conn); verify_graceful_stop(pid); } START_TEST(test_ipc_disp_us) { qb_enter(); ipc_type = QB_IPC_SOCKET; ipc_name = __func__; test_ipc_dispatch(); qb_leave(); } END_TEST static int32_t events_received; static int32_t count_stress_events(int32_t fd, int32_t revents, void *data) { struct { struct qb_ipc_response_header hdr __attribute__ ((aligned(8))); char data[GIANT_MSG_DATA_SIZE] __attribute__ ((aligned(8))); uint32_t sent_msgs __attribute__ ((aligned(8))); } __attribute__ ((aligned(8))) giant_event_recv; qb_loop_t *cl = (qb_loop_t*)data; int32_t res; res = qb_ipcc_event_recv(conn, &giant_event_recv, sizeof(giant_event_recv), -1); if (res > 0) { events_received++; if ((events_received % 1000) == 0) { qb_log(LOG_DEBUG, "RECV: %d stress events processed", events_received); if (res != sizeof(giant_event_recv)) { qb_log(LOG_DEBUG, "Unexpected recv size, expected %d got %d", res, sizeof(giant_event_recv)); ck_assert_int_eq(res, sizeof(giant_event_recv)); } else if (giant_event_recv.sent_msgs != events_received) { qb_log(LOG_DEBUG, "Server event mismatch. Server thinks we got %d msgs, but we only received %d", giant_event_recv.sent_msgs, events_received); /* This indicates that data corruption is occurring. Since the events * received is placed at the end of the giant msg, it is possible * that buffers were not allocated correctly resulting in us * reading/writing to uninitialized memeory at some point. */ ck_assert_int_eq(giant_event_recv.sent_msgs, events_received); } } } else if (res != -EAGAIN) { qb_perror(LOG_DEBUG, "count_stress_events"); qb_loop_stop(cl); return -1; } if (events_received >= num_stress_events) { qb_loop_stop(cl); return -1; } return 0; } static int32_t count_bulk_events(int32_t fd, int32_t revents, void *data) { qb_loop_t *cl = (qb_loop_t*)data; struct qb_ipc_response_header res_header; int32_t res; res = qb_ipcc_event_recv(conn, &res_header, sizeof(struct qb_ipc_response_header), -1); if (res > 0) { events_received++; } if (events_received >= num_bulk_events) { qb_loop_stop(cl); return -1; } return 0; } +static void +test_ipc_stress_connections(void) +{ + int32_t c = 0; + int32_t j = 0; + uint32_t max_size = MAX_MSG_SIZE; + int32_t connections = 0; + pid_t pid; + + multiple_connections = QB_TRUE; + + qb_log_filter_ctl(QB_LOG_STDERR, QB_LOG_FILTER_CLEAR_ALL, + QB_LOG_FILTER_FILE, "*", LOG_TRACE); + qb_log_filter_ctl(QB_LOG_STDERR, QB_LOG_FILTER_ADD, + QB_LOG_FILTER_FILE, "*", LOG_INFO); + qb_log_ctl(QB_LOG_STDERR, QB_LOG_CONF_ENABLED, QB_TRUE); + + pid = run_function_in_new_process(run_ipc_server); + fail_if(pid == -1); + sleep(1); + + for (connections = 1; connections < 70000; connections++) { + if (conn) { + qb_ipcc_disconnect(conn); + conn = NULL; + } + do { + conn = qb_ipcc_connect(ipc_name, max_size); + if (conn == NULL) { + j = waitpid(pid, NULL, WNOHANG); + ck_assert_int_eq(j, 0); + sleep(1); + c++; + } + } while (conn == NULL && c < 5); + fail_if(conn == NULL); + + if (((connections+1) % 1000) == 0) { + qb_log(LOG_INFO, "%d ipc connections made", connections+1); + } + } + multiple_connections = QB_FALSE; + + request_server_exit(); + verify_graceful_stop(pid); + qb_ipcc_disconnect(conn); + + qb_log_filter_ctl(QB_LOG_STDERR, QB_LOG_FILTER_CLEAR_ALL, + QB_LOG_FILTER_FILE, "*", LOG_TRACE); + qb_log_filter_ctl(QB_LOG_STDERR, QB_LOG_FILTER_ADD, + QB_LOG_FILTER_FILE, "*", LOG_TRACE); + qb_log_ctl(QB_LOG_STDERR, QB_LOG_CONF_ENABLED, QB_TRUE); +} + static void test_ipc_bulk_events(void) { int32_t c = 0; int32_t j = 0; pid_t pid; int32_t res; qb_loop_t *cl; int32_t fd; uint32_t max_size = MAX_MSG_SIZE; pid = run_function_in_new_process(run_ipc_server); fail_if(pid == -1); sleep(1); do { conn = qb_ipcc_connect(ipc_name, max_size); if (conn == NULL) { j = waitpid(pid, NULL, WNOHANG); ck_assert_int_eq(j, 0); sleep(1); c++; } } while (conn == NULL && c < 5); fail_if(conn == NULL); events_received = 0; cl = qb_loop_create(); res = qb_ipcc_fd_get(conn, &fd); ck_assert_int_eq(res, 0); res = qb_loop_poll_add(cl, QB_LOOP_MED, fd, POLLIN, cl, count_bulk_events); ck_assert_int_eq(res, 0); res = send_and_check(IPC_MSG_REQ_BULK_EVENTS, 0, recv_timeout, QB_TRUE); ck_assert_int_eq(res, sizeof(struct qb_ipc_response_header)); qb_loop_run(cl); ck_assert_int_eq(events_received, num_bulk_events); request_server_exit(); qb_ipcc_disconnect(conn); verify_graceful_stop(pid); } static void test_ipc_stress_test(void) { struct { struct qb_ipc_request_header hdr __attribute__ ((aligned(8))); char data[GIANT_MSG_DATA_SIZE] __attribute__ ((aligned(8))); uint32_t sent_msgs __attribute__ ((aligned(8))); } __attribute__ ((aligned(8))) giant_req; struct qb_ipc_response_header res_header; struct iovec iov[1]; int32_t c = 0; int32_t j = 0; pid_t pid; int32_t res; qb_loop_t *cl; int32_t fd; uint32_t max_size = MAX_MSG_SIZE; /* This looks strange, but it serves an important purpose. * This test forces the server to enforce the MAX_MSG_SIZE * limit from the server side, which overrides the client's * buffer limit. To verify this functionality is working * we set the client limit lower than what the server * is enforcing. */ int32_t client_buf_size = max_size - 1024; int32_t real_buf_size; enforce_server_buffer = 1; pid = run_function_in_new_process(run_ipc_server); enforce_server_buffer = 0; fail_if(pid == -1); sleep(1); do { conn = qb_ipcc_connect(ipc_name, client_buf_size); if (conn == NULL) { j = waitpid(pid, NULL, WNOHANG); ck_assert_int_eq(j, 0); sleep(1); c++; } } while (conn == NULL && c < 5); fail_if(conn == NULL); real_buf_size = qb_ipcc_get_buffer_size(conn); ck_assert_int_eq(real_buf_size, max_size); qb_log(LOG_DEBUG, "Testing %d iterations of EVENT msg passing.", num_stress_events); events_received = 0; cl = qb_loop_create(); res = qb_ipcc_fd_get(conn, &fd); ck_assert_int_eq(res, 0); res = qb_loop_poll_add(cl, QB_LOOP_MED, fd, POLLIN, cl, count_stress_events); ck_assert_int_eq(res, 0); res = send_and_check(IPC_MSG_REQ_STRESS_EVENT, 0, recv_timeout, QB_TRUE); qb_loop_run(cl); ck_assert_int_eq(events_received, num_stress_events); giant_req.hdr.id = IPC_MSG_REQ_SERVER_FAIL; giant_req.hdr.size = sizeof(giant_req); if (giant_req.hdr.size <= client_buf_size) { ck_assert_int_eq(1, 0); } iov[0].iov_len = giant_req.hdr.size; iov[0].iov_base = &giant_req; res = qb_ipcc_sendv_recv(conn, iov, 1, &res_header, sizeof(struct qb_ipc_response_header), -1); if (res != -ECONNRESET && res != -ENOTCONN) { qb_log(LOG_ERR, "id:%d size:%d", res_header.id, res_header.size); ck_assert_int_eq(res, -ENOTCONN); } qb_ipcc_disconnect(conn); verify_graceful_stop(pid); } START_TEST(test_ipc_stress_test_us) { qb_enter(); send_event_on_created = QB_FALSE; ipc_type = QB_IPC_SOCKET; ipc_name = __func__; test_ipc_stress_test(); qb_leave(); } END_TEST +START_TEST(test_ipc_stress_connections_us) +{ + qb_enter(); + ipc_type = QB_IPC_SOCKET; + ipc_name = __func__; + test_ipc_stress_connections(); + qb_leave(); +} +END_TEST + START_TEST(test_ipc_bulk_events_us) { qb_enter(); send_event_on_created = QB_FALSE; ipc_type = QB_IPC_SOCKET; ipc_name = __func__; test_ipc_bulk_events(); qb_leave(); } END_TEST static void test_ipc_event_on_created(void) { int32_t c = 0; int32_t j = 0; pid_t pid; int32_t res; qb_loop_t *cl; int32_t fd; uint32_t max_size = MAX_MSG_SIZE; num_bulk_events = 1; pid = run_function_in_new_process(run_ipc_server); fail_if(pid == -1); sleep(1); do { conn = qb_ipcc_connect(ipc_name, max_size); if (conn == NULL) { j = waitpid(pid, NULL, WNOHANG); ck_assert_int_eq(j, 0); sleep(1); c++; } } while (conn == NULL && c < 5); fail_if(conn == NULL); events_received = 0; cl = qb_loop_create(); res = qb_ipcc_fd_get(conn, &fd); ck_assert_int_eq(res, 0); res = qb_loop_poll_add(cl, QB_LOOP_MED, fd, POLLIN, cl, count_bulk_events); ck_assert_int_eq(res, 0); qb_loop_run(cl); ck_assert_int_eq(events_received, num_bulk_events); request_server_exit(); qb_ipcc_disconnect(conn); verify_graceful_stop(pid); } START_TEST(test_ipc_event_on_created_us) { qb_enter(); send_event_on_created = QB_TRUE; ipc_type = QB_IPC_SOCKET; ipc_name = __func__; test_ipc_event_on_created(); qb_leave(); } END_TEST static void test_ipc_disconnect_after_created(void) { struct qb_ipc_request_header req_header; struct qb_ipc_response_header res_header; struct iovec iov[1]; int32_t c = 0; int32_t j = 0; pid_t pid; int32_t res; uint32_t max_size = MAX_MSG_SIZE; pid = run_function_in_new_process(run_ipc_server); fail_if(pid == -1); sleep(1); do { conn = qb_ipcc_connect(ipc_name, max_size); if (conn == NULL) { j = waitpid(pid, NULL, WNOHANG); ck_assert_int_eq(j, 0); sleep(1); c++; } } while (conn == NULL && c < 5); fail_if(conn == NULL); ck_assert_int_eq(QB_TRUE, qb_ipcc_is_connected(conn)); req_header.id = IPC_MSG_REQ_SERVER_DISCONNECT; req_header.size = sizeof(struct qb_ipc_request_header); iov[0].iov_len = req_header.size; iov[0].iov_base = &req_header; res = qb_ipcc_sendv_recv(conn, iov, 1, &res_header, sizeof(struct qb_ipc_response_header), -1); /* * confirm we get -ENOTCONN or -ECONNRESET */ if (res != -ECONNRESET && res != -ENOTCONN) { qb_log(LOG_ERR, "id:%d size:%d", res_header.id, res_header.size); ck_assert_int_eq(res, -ENOTCONN); } ck_assert_int_eq(QB_FALSE, qb_ipcc_is_connected(conn)); qb_ipcc_disconnect(conn); kill_server(pid); } START_TEST(test_ipc_disconnect_after_created_us) { qb_enter(); disconnect_after_created = QB_TRUE; ipc_type = QB_IPC_SOCKET; ipc_name = __func__; test_ipc_disconnect_after_created(); qb_leave(); } END_TEST static void test_ipc_server_fail(void) { int32_t j; int32_t c = 0; pid_t pid; uint32_t max_size = MAX_MSG_SIZE; pid = run_function_in_new_process(run_ipc_server); fail_if(pid == -1); sleep(1); do { conn = qb_ipcc_connect(ipc_name, max_size); if (conn == NULL) { j = waitpid(pid, NULL, WNOHANG); ck_assert_int_eq(j, 0); sleep(1); c++; } } while (conn == NULL && c < 5); fail_if(conn == NULL); request_server_exit(); ck_assert_int_eq(QB_FALSE, qb_ipcc_is_connected(conn)); qb_ipcc_disconnect(conn); verify_graceful_stop(pid); } START_TEST(test_ipc_server_fail_soc) { qb_enter(); ipc_type = QB_IPC_SOCKET; ipc_name = __func__; test_ipc_server_fail(); qb_leave(); } END_TEST START_TEST(test_ipc_disp_shm) { qb_enter(); ipc_type = QB_IPC_SHM; ipc_name = __func__; test_ipc_dispatch(); qb_leave(); } END_TEST START_TEST(test_ipc_stress_test_shm) { qb_enter(); send_event_on_created = QB_FALSE; ipc_type = QB_IPC_SHM; ipc_name = __func__; test_ipc_stress_test(); qb_leave(); } END_TEST +START_TEST(test_ipc_stress_connections_shm) +{ + qb_enter(); + ipc_type = QB_IPC_SHM; + ipc_name = __func__; + test_ipc_stress_connections(); + qb_leave(); +} +END_TEST + START_TEST(test_ipc_bulk_events_shm) { qb_enter(); ipc_type = QB_IPC_SHM; ipc_name = __func__; test_ipc_bulk_events(); qb_leave(); } END_TEST START_TEST(test_ipc_event_on_created_shm) { qb_enter(); send_event_on_created = QB_TRUE; ipc_type = QB_IPC_SHM; ipc_name = __func__; test_ipc_event_on_created(); qb_leave(); } END_TEST START_TEST(test_ipc_server_fail_shm) { qb_enter(); ipc_type = QB_IPC_SHM; ipc_name = __func__; test_ipc_server_fail(); qb_leave(); } END_TEST static void test_ipc_service_ref_count(void) { int32_t c = 0; int32_t j = 0; pid_t pid; uint32_t max_size = MAX_MSG_SIZE; reference_count_test = QB_TRUE; pid = run_function_in_new_process(run_ipc_server); fail_if(pid == -1); sleep(1); do { conn = qb_ipcc_connect(ipc_name, max_size); if (conn == NULL) { j = waitpid(pid, NULL, WNOHANG); ck_assert_int_eq(j, 0); sleep(1); c++; } } while (conn == NULL && c < 5); fail_if(conn == NULL); sleep(5); kill_server(pid); } START_TEST(test_ipc_service_ref_count_shm) { qb_enter(); ipc_type = QB_IPC_SHM; ipc_name = __func__; test_ipc_service_ref_count(); qb_leave(); } END_TEST START_TEST(test_ipc_service_ref_count_us) { qb_enter(); ipc_type = QB_IPC_SOCKET; ipc_name = __func__; test_ipc_service_ref_count(); qb_leave(); } END_TEST static void test_max_dgram_size(void) { /* most implementations will not let you set a dgram buffer * of 1 million bytes. This test verifies that the we can detect * the max dgram buffersize regardless, and that the value we detect * is consistent. */ int32_t init; int32_t i; qb_log_filter_ctl(QB_LOG_STDERR, QB_LOG_FILTER_REMOVE, QB_LOG_FILTER_FILE, "*", LOG_TRACE); init = qb_ipcc_verify_dgram_max_msg_size(1000000); fail_if(init <= 0); for (i = 0; i < 100; i++) { int try = qb_ipcc_verify_dgram_max_msg_size(1000000); ck_assert_int_eq(init, try); } qb_log_filter_ctl(QB_LOG_STDERR, QB_LOG_FILTER_ADD, QB_LOG_FILTER_FILE, "*", LOG_TRACE); } START_TEST(test_ipc_max_dgram_size) { qb_enter(); test_max_dgram_size(); qb_leave(); } END_TEST static Suite * make_shm_suite(void) { TCase *tc; Suite *s = suite_create("shm"); tc = tcase_create("ipc_txrx_shm_timeout"); tcase_add_test(tc, test_ipc_txrx_shm_timeout); tcase_set_timeout(tc, 30); suite_add_tcase(s, tc); tc = tcase_create("ipc_server_fail_shm"); tcase_add_test(tc, test_ipc_server_fail_shm); tcase_set_timeout(tc, 8); suite_add_tcase(s, tc); tc = tcase_create("ipc_txrx_shm_block"); tcase_add_test(tc, test_ipc_txrx_shm_block); tcase_set_timeout(tc, 8); suite_add_tcase(s, tc); tc = tcase_create("ipc_txrx_shm_tmo"); tcase_add_test(tc, test_ipc_txrx_shm_tmo); tcase_set_timeout(tc, 8); suite_add_tcase(s, tc); tc = tcase_create("ipc_fc_shm"); tcase_add_test(tc, test_ipc_fc_shm); tcase_set_timeout(tc, 8); suite_add_tcase(s, tc); tc = tcase_create("ipc_dispatch_shm"); tcase_add_test(tc, test_ipc_disp_shm); tcase_set_timeout(tc, 16); suite_add_tcase(s, tc); tc = tcase_create("ipc_stress_test_shm"); tcase_add_test(tc, test_ipc_stress_test_shm); tcase_set_timeout(tc, 16); suite_add_tcase(s, tc); tc = tcase_create("ipc_bulk_events_shm"); tcase_add_test(tc, test_ipc_bulk_events_shm); tcase_set_timeout(tc, 16); suite_add_tcase(s, tc); tc = tcase_create("ipc_exit_shm"); tcase_add_test(tc, test_ipc_exit_shm); tcase_set_timeout(tc, 8); suite_add_tcase(s, tc); tc = tcase_create("ipc_event_on_created_shm"); tcase_add_test(tc, test_ipc_event_on_created_shm); tcase_set_timeout(tc, 10); suite_add_tcase(s, tc); tc = tcase_create("ipc_service_ref_count_shm"); tcase_add_test(tc, test_ipc_service_ref_count_shm); tcase_set_timeout(tc, 10); suite_add_tcase(s, tc); + tc = tcase_create("ipc_stress_connections"); + tcase_add_test(tc, test_ipc_stress_connections_shm); + tcase_set_timeout(tc, 200); + suite_add_tcase(s, tc); + return s; } static Suite * make_soc_suite(void) { Suite *s = suite_create("socket"); TCase *tc; tc = tcase_create("ipc_txrx_us_timeout"); tcase_add_test(tc, test_ipc_txrx_us_timeout); tcase_set_timeout(tc, 30); suite_add_tcase(s, tc); tc = tcase_create("ipc_max_dgram_size"); tcase_add_test(tc, test_ipc_max_dgram_size); tcase_set_timeout(tc, 30); suite_add_tcase(s, tc); tc = tcase_create("ipc_server_fail_soc"); tcase_add_test(tc, test_ipc_server_fail_soc); tcase_set_timeout(tc, 8); suite_add_tcase(s, tc); tc = tcase_create("ipc_txrx_us_block"); tcase_add_test(tc, test_ipc_txrx_us_block); tcase_set_timeout(tc, 8); suite_add_tcase(s, tc); tc = tcase_create("ipc_txrx_us_tmo"); tcase_add_test(tc, test_ipc_txrx_us_tmo); tcase_set_timeout(tc, 8); suite_add_tcase(s, tc); tc = tcase_create("ipc_fc_us"); tcase_add_test(tc, test_ipc_fc_us); tcase_set_timeout(tc, 8); suite_add_tcase(s, tc); tc = tcase_create("ipc_exit_us"); tcase_add_test(tc, test_ipc_exit_us); tcase_set_timeout(tc, 8); suite_add_tcase(s, tc); tc = tcase_create("ipc_dispatch_us"); tcase_add_test(tc, test_ipc_disp_us); tcase_set_timeout(tc, 16); suite_add_tcase(s, tc); tc = tcase_create("ipc_stress_test_us"); tcase_add_test(tc, test_ipc_stress_test_us); tcase_set_timeout(tc, 60); suite_add_tcase(s, tc); tc = tcase_create("ipc_bulk_events_us"); tcase_add_test(tc, test_ipc_bulk_events_us); tcase_set_timeout(tc, 16); suite_add_tcase(s, tc); tc = tcase_create("ipc_event_on_created_us"); tcase_add_test(tc, test_ipc_event_on_created_us); tcase_set_timeout(tc, 10); suite_add_tcase(s, tc); tc = tcase_create("ipc_disconnect_after_created_us"); tcase_add_test(tc, test_ipc_disconnect_after_created_us); tcase_set_timeout(tc, 10); suite_add_tcase(s, tc); tc = tcase_create("ipc_service_ref_count_us"); tcase_add_test(tc, test_ipc_service_ref_count_us); tcase_set_timeout(tc, 10); suite_add_tcase(s, tc); + tc = tcase_create("ipc_stress_connections"); + tcase_add_test(tc, test_ipc_stress_connections_us); + tcase_set_timeout(tc, 200); + suite_add_tcase(s, tc); + return s; } int32_t main(void) { int32_t number_failed; SRunner *sr; Suite *s; int32_t do_shm_tests = QB_TRUE; #ifdef DISABLE_IPC_SHM do_shm_tests = QB_FALSE; #endif /* DISABLE_IPC_SHM */ s = make_soc_suite(); sr = srunner_create(s); if (do_shm_tests) { srunner_add_suite(sr, make_shm_suite()); } qb_log_init("check", LOG_USER, LOG_EMERG); qb_log_ctl(QB_LOG_SYSLOG, QB_LOG_CONF_ENABLED, QB_FALSE); qb_log_filter_ctl(QB_LOG_STDERR, QB_LOG_FILTER_ADD, QB_LOG_FILTER_FILE, "*", LOG_TRACE); qb_log_ctl(QB_LOG_STDERR, QB_LOG_CONF_ENABLED, QB_TRUE); qb_log_format_set(QB_LOG_STDERR, "lib/%f|%l| %b"); srunner_run_all(sr, CK_VERBOSE); number_failed = srunner_ntests_failed(sr); srunner_free(sr); return (number_failed == 0) ? EXIT_SUCCESS : EXIT_FAILURE; }