diff --git a/lib/ipc_socket.c b/lib/ipc_socket.c index 47ad0c8..6a22e75 100644 --- a/lib/ipc_socket.c +++ b/lib/ipc_socket.c @@ -1,645 +1,645 @@ /* * Copyright (C) 2010,2013 Red Hat, Inc. * * Author: Angus Salkeld * * This file is part of libqb. * * libqb is free software: you can redistribute it and/or modify * it under the terms of the GNU Lesser General Public License as published by * the Free Software Foundation, either version 2.1 of the License, or * (at your option) any later version. * * libqb is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public License * along with libqb. If not, see . */ #include "os_base.h" #ifdef HAVE_SYS_UN_H #include #endif /* HAVE_SYS_UN_H */ #ifdef HAVE_SYS_MMAN_H #include #endif #include #include #include #include #include "util_int.h" #include "ipc_int.h" struct ipc_us_control { int32_t sent; int32_t flow_control; }; #define SHM_CONTROL_SIZE (3 * sizeof(struct ipc_us_control)) static void set_sock_addr(struct sockaddr_un *address, const char *socket_name) { memset(address, 0, sizeof(struct sockaddr_un)); address->sun_family = AF_UNIX; #ifdef HAVE_STRUCT_SOCKADDR_UN_SUN_LEN address->sun_len = QB_SUN_LEN(address); #endif #if defined(QB_LINUX) || defined(QB_CYGWIN) snprintf(address->sun_path + 1, UNIX_PATH_MAX - 1, "%s", socket_name); #else snprintf(address->sun_path, UNIX_PATH_MAX, "%s/%s", SOCKETDIR, socket_name); #endif } static int32_t qb_ipc_dgram_sock_setup(const char *base_name, const char *service_name, int32_t * sock_pt) { int32_t request_fd; struct sockaddr_un local_address; int32_t res = 0; char sock_path[PATH_MAX]; request_fd = socket(PF_UNIX, SOCK_DGRAM, 0); if (request_fd == -1) { return -errno; } qb_socket_nosigpipe(request_fd); res = qb_sys_fd_nonblock_cloexec_set(request_fd); if (res < 0) { goto error_connect; } snprintf(sock_path, PATH_MAX, "%s-%s", base_name, service_name); set_sock_addr(&local_address, sock_path); res = bind(request_fd, (struct sockaddr *)&local_address, sizeof(local_address)); if (res < 0) { goto error_connect; } *sock_pt = request_fd; return 0; error_connect: close(request_fd); *sock_pt = -1; return res; } static int32_t set_sock_size(int sockfd, size_t max_msg_size) { int32_t rc; unsigned int optval; socklen_t optlen = sizeof(optval); rc = getsockopt(sockfd, SOL_SOCKET, SO_SNDBUF, &optval, &optlen); qb_util_log(LOG_DEBUG, "%d: getsockopt(%d, needed:%d) actual:%d", rc, sockfd, max_msg_size, optval); if (rc == 0 && optval < max_msg_size) { optval = max_msg_size; optlen = sizeof(optval); rc = setsockopt(sockfd, SOL_SOCKET, SO_SNDBUF, &optval, optlen); } return rc; } /* * bind to "base_name-local_name" * connect to "base_name-remote_name" * output sock_pt */ static int32_t qb_ipc_dgram_sock_connect(const char *base_name, const char *local_name, const char *remote_name, int32_t max_msg_size, int32_t * sock_pt) { char sock_path[PATH_MAX]; struct sockaddr_un remote_address; int32_t res = qb_ipc_dgram_sock_setup(base_name, local_name, sock_pt); if (res < 0) { return res; } snprintf(sock_path, PATH_MAX, "%s-%s", base_name, remote_name); set_sock_addr(&remote_address, sock_path); if (connect(*sock_pt, (struct sockaddr *)&remote_address, QB_SUN_LEN(&remote_address)) == -1) { res = -errno; goto error_connect; } return set_sock_size(*sock_pt, max_msg_size); error_connect: close(*sock_pt); *sock_pt = -1; return res; } static int32_t _finish_connecting(struct qb_ipc_one_way *one_way) { struct sockaddr_un remote_address; int res; int error; int retry = 0; set_sock_addr(&remote_address, one_way->u.us.sock_name); /* this retry loop is here to help connecting when trying to send * an event right after connection setup. */ do { errno = 0; res = connect(one_way->u.us.sock, (struct sockaddr *)&remote_address, QB_SUN_LEN(&remote_address)); if (res == -1) { error = -errno; qb_util_perror(LOG_DEBUG, "error calling connect()"); retry++; usleep(100000); } } while (res == -1 && retry < 10); if (res == -1) { return error; } free(one_way->u.us.sock_name); one_way->u.us.sock_name = NULL; return set_sock_size(one_way->u.us.sock, one_way->max_msg_size); } /* * client functions * -------------------------------------------------------- */ static void qb_ipcc_us_disconnect(struct qb_ipcc_connection *c) { munmap(c->request.u.us.shared_data, SHM_CONTROL_SIZE); unlink(c->request.u.us.shared_file_name); close(c->request.u.us.sock); close(c->event.u.us.sock); } static ssize_t qb_ipc_socket_send(struct qb_ipc_one_way *one_way, const void *msg_ptr, size_t msg_len) { ssize_t rc = 0; struct ipc_us_control *ctl; ctl = (struct ipc_us_control *)one_way->u.us.shared_data; if (one_way->u.us.sock_name) { rc = _finish_connecting(one_way); if (rc < 0) { qb_util_log(LOG_ERR, "socket connect-on-send"); return rc; } } qb_sigpipe_ctl(QB_SIGPIPE_IGNORE); rc = send(one_way->u.us.sock, msg_ptr, msg_len, MSG_NOSIGNAL); if (rc == -1) { rc = -errno; - if (errno != EAGAIN) { + if (errno != EAGAIN && errno != ENOBUFS) { qb_util_perror(LOG_ERR, "socket_send:send"); } } qb_sigpipe_ctl(QB_SIGPIPE_DEFAULT); if (ctl && rc == msg_len) { qb_atomic_int_inc(&ctl->sent); } return rc; } static ssize_t qb_ipc_socket_sendv(struct qb_ipc_one_way *one_way, const struct iovec *iov, size_t iov_len) { int32_t rc; struct ipc_us_control *ctl; ctl = (struct ipc_us_control *)one_way->u.us.shared_data; qb_sigpipe_ctl(QB_SIGPIPE_IGNORE); if (one_way->u.us.sock_name) { rc = _finish_connecting(one_way); if (rc < 0) { qb_util_perror(LOG_ERR, "socket connect-on-sendv"); return rc; } } rc = writev(one_way->u.us.sock, iov, iov_len); if (rc == -1) { rc = -errno; - if (errno != EAGAIN) { + if (errno != EAGAIN && errno != ENOBUFS) { qb_util_perror(LOG_ERR, "socket_sendv:writev %d", one_way->u.us.sock); } } qb_sigpipe_ctl(QB_SIGPIPE_DEFAULT); if (ctl && rc > 0) { qb_atomic_int_inc(&ctl->sent); } return rc; } /* * recv a message of unknown size. */ static ssize_t qb_ipc_us_recv_at_most(struct qb_ipc_one_way *one_way, void *msg, size_t len, int32_t timeout) { int32_t result; int32_t final_rc = 0; int32_t to_recv = 0; char *data = msg; struct ipc_us_control *ctl = NULL; int32_t time_waited = 0; int32_t time_to_wait = timeout; if (timeout == -1) { time_to_wait = 1000; } qb_sigpipe_ctl(QB_SIGPIPE_IGNORE); retry_peek: result = recv(one_way->u.us.sock, data, sizeof(struct qb_ipc_request_header), MSG_NOSIGNAL | MSG_PEEK); if (result == -1) { if (errno == EAGAIN && (time_waited < timeout || timeout == -1)) { result = qb_ipc_us_ready(one_way, NULL, time_to_wait, POLLIN); time_waited += time_to_wait; goto retry_peek; } else { return -errno; } } if (result >= sizeof(struct qb_ipc_request_header)) { struct qb_ipc_request_header *hdr = NULL; hdr = (struct qb_ipc_request_header *)msg; to_recv = hdr->size; } result = recv(one_way->u.us.sock, data, to_recv, MSG_NOSIGNAL | MSG_WAITALL); if (result == -1) { final_rc = -errno; goto cleanup_sigpipe; } else if (result == 0) { qb_util_log(LOG_DEBUG, "recv == 0 -> ENOTCONN"); final_rc = -ENOTCONN; goto cleanup_sigpipe; } final_rc = result; ctl = (struct ipc_us_control *)one_way->u.us.shared_data; if (ctl) { (void)qb_atomic_int_dec_and_test(&ctl->sent); } cleanup_sigpipe: qb_sigpipe_ctl(QB_SIGPIPE_DEFAULT); return final_rc; } static void qb_ipc_us_fc_set(struct qb_ipc_one_way *one_way, int32_t fc_enable) { struct ipc_us_control *ctl = (struct ipc_us_control *)one_way->u.us.shared_data; qb_util_log(LOG_TRACE, "setting fc to %d", fc_enable); qb_atomic_int_set(&ctl->flow_control, fc_enable); } static int32_t qb_ipc_us_fc_get(struct qb_ipc_one_way *one_way) { struct ipc_us_control *ctl = (struct ipc_us_control *)one_way->u.us.shared_data; return qb_atomic_int_get(&ctl->flow_control); } static ssize_t qb_ipc_us_q_len_get(struct qb_ipc_one_way *one_way) { struct ipc_us_control *ctl = (struct ipc_us_control *)one_way->u.us.shared_data; return qb_atomic_int_get(&ctl->sent); } int32_t qb_ipcc_us_connect(struct qb_ipcc_connection * c, struct qb_ipc_connection_response * r) { int32_t res; char path[PATH_MAX]; int32_t fd_hdr; char *shm_ptr; qb_atomic_init(); c->needs_sock_for_poll = QB_FALSE; c->funcs.send = qb_ipc_socket_send; c->funcs.sendv = qb_ipc_socket_sendv; c->funcs.recv = qb_ipc_us_recv_at_most; c->funcs.fc_get = qb_ipc_us_fc_get; c->funcs.disconnect = qb_ipcc_us_disconnect; fd_hdr = qb_sys_mmap_file_open(path, r->request, SHM_CONTROL_SIZE, O_RDWR); if (fd_hdr < 0) { res = fd_hdr; errno = -fd_hdr; qb_util_perror(LOG_ERR, "couldn't open file for mmap"); return res; } (void)strlcpy(c->request.u.us.shared_file_name, r->request, NAME_MAX); shm_ptr = mmap(0, SHM_CONTROL_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, fd_hdr, 0); if (shm_ptr == MAP_FAILED) { res = -errno; qb_util_perror(LOG_ERR, "couldn't create mmap for header"); goto cleanup_hdr; } c->request.u.us.shared_data = shm_ptr; c->response.u.us.shared_data = shm_ptr + sizeof(struct ipc_us_control); c->event.u.us.shared_data = shm_ptr + (2 * sizeof(struct ipc_us_control)); close(fd_hdr); res = qb_ipc_dgram_sock_connect(r->response, "response", "request", r->max_msg_size, &c->request.u.us.sock); if (res != 0) { goto cleanup_hdr; } c->response.u.us.sock = c->request.u.us.sock; res = qb_ipc_dgram_sock_connect(r->response, "event", "event-tx", r->max_msg_size, &c->event.u.us.sock); if (res != 0) { goto cleanup_hdr; } return 0; cleanup_hdr: close(fd_hdr); close(c->event.u.us.sock); close(c->request.u.us.sock); unlink(r->request); munmap(c->request.u.us.shared_data, SHM_CONTROL_SIZE); return res; } /* * service functions * -------------------------------------------------------- */ static int32_t _sock_connection_liveliness(int32_t fd, int32_t revents, void *data) { struct qb_ipcs_connection *c = (struct qb_ipcs_connection *)data; qb_util_log(LOG_DEBUG, "LIVENESS: fd %d event %d conn (%s)", fd, revents, c->description); if (revents & POLLNVAL) { qb_util_log(LOG_DEBUG, "NVAL conn (%s)", c->description); qb_ipcs_disconnect(c); return -EINVAL; } if (revents & POLLHUP) { qb_util_log(LOG_DEBUG, "HUP conn (%s)", c->description); qb_ipcs_disconnect(c); return -ESHUTDOWN; } return 0; } static int32_t _sock_add_to_mainloop(struct qb_ipcs_connection *c) { int res; res = c->service->poll_fns.dispatch_add(c->service->poll_priority, c->request.u.us.sock, POLLIN | POLLPRI | POLLNVAL, c, qb_ipcs_dispatch_connection_request); if (res < 0) { qb_util_log(LOG_ERR, "Error adding socket to mainloop (%s).", c->description); return res; } qb_ipcs_connection_ref(c); res = c->service->poll_fns.dispatch_add(c->service->poll_priority, c->setup.u.us.sock, POLLIN | POLLPRI | POLLNVAL, c, _sock_connection_liveliness); qb_util_log(LOG_DEBUG, "added %d to poll loop (liveness)", c->setup.u.us.sock); if (res < 0) { qb_util_perror(LOG_ERR, "Error adding setupfd to mainloop"); (void)c->service->poll_fns.dispatch_del(c->request.u.us.sock); return res; } qb_ipcs_connection_ref(c); return res; } static void _sock_rm_from_mainloop(struct qb_ipcs_connection *c) { (void)c->service->poll_fns.dispatch_del(c->request.u.us.sock); qb_ipcs_connection_unref(c); (void)c->service->poll_fns.dispatch_del(c->setup.u.us.sock); qb_ipcs_connection_unref(c); } static void qb_ipcs_us_disconnect(struct qb_ipcs_connection *c) { qb_enter(); if (c->state == QB_IPCS_CONNECTION_ESTABLISHED || c->state == QB_IPCS_CONNECTION_ACTIVE) { _sock_rm_from_mainloop(c); qb_ipcc_us_sock_close(c->setup.u.us.sock); qb_ipcc_us_sock_close(c->request.u.us.sock); qb_ipcc_us_sock_close(c->event.u.us.sock); } if (c->state == QB_IPCS_CONNECTION_SHUTTING_DOWN || c->state == QB_IPCS_CONNECTION_ACTIVE) { munmap(c->request.u.us.shared_data, SHM_CONTROL_SIZE); unlink(c->request.u.us.shared_file_name); } } static int32_t qb_ipcs_us_connect(struct qb_ipcs_service *s, struct qb_ipcs_connection *c, struct qb_ipc_connection_response *r) { char path[PATH_MAX]; int32_t fd_hdr; int32_t res = 0; struct ipc_us_control *ctl; char *shm_ptr; qb_util_log(LOG_DEBUG, "connecting to client (%s)", c->description); c->request.u.us.sock = c->setup.u.us.sock; c->response.u.us.sock = c->setup.u.us.sock; snprintf(r->request, NAME_MAX, "qb-%s-control-%s", s->name, c->description); snprintf(r->response, NAME_MAX, "qb-%s-%s", s->name, c->description); fd_hdr = qb_sys_mmap_file_open(path, r->request, SHM_CONTROL_SIZE, O_CREAT | O_TRUNC | O_RDWR); if (fd_hdr < 0) { res = fd_hdr; errno = -fd_hdr; qb_util_perror(LOG_ERR, "couldn't create file for mmap (%s)", c->description); return res; } (void)strlcpy(r->request, path, PATH_MAX); (void)strlcpy(c->request.u.us.shared_file_name, r->request, NAME_MAX); res = chown(r->request, c->auth.uid, c->auth.gid); if (res != 0) { /* ignore res, this is just for the compiler warnings. */ res = 0; } res = chmod(r->request, c->auth.mode); if (res != 0) { /* ignore res, this is just for the compiler warnings. */ res = 0; } shm_ptr = mmap(0, SHM_CONTROL_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, fd_hdr, 0); if (shm_ptr == MAP_FAILED) { res = -errno; qb_util_perror(LOG_ERR, "couldn't create mmap for header (%s)", c->description); goto cleanup_hdr; } c->request.u.us.shared_data = shm_ptr; c->response.u.us.shared_data = shm_ptr + sizeof(struct ipc_us_control); c->event.u.us.shared_data = shm_ptr + (2 * sizeof(struct ipc_us_control)); ctl = (struct ipc_us_control *)c->request.u.us.shared_data; ctl->sent = 0; ctl->flow_control = 0; ctl = (struct ipc_us_control *)c->response.u.us.shared_data; ctl->sent = 0; ctl->flow_control = 0; ctl = (struct ipc_us_control *)c->event.u.us.shared_data; ctl->sent = 0; ctl->flow_control = 0; close(fd_hdr); /* request channel */ res = qb_ipc_dgram_sock_setup(r->response, "request", &c->request.u.us.sock); if (res < 0) { goto cleanup_hdr; } c->setup.u.us.sock_name = NULL; c->request.u.us.sock_name = NULL; /* response channel */ c->response.u.us.sock = c->request.u.us.sock; snprintf(path, PATH_MAX, "%s-%s", r->response, "response"); c->response.u.us.sock_name = strdup(path); /* event channel */ res = qb_ipc_dgram_sock_setup(r->response, "event-tx", &c->event.u.us.sock); if (res < 0) { goto cleanup_hdr; } snprintf(path, PATH_MAX, "%s-%s", r->response, "event"); c->event.u.us.sock_name = strdup(path); res = _sock_add_to_mainloop(c); if (res < 0) { goto cleanup_hdr; } return res; cleanup_hdr: free(c->response.u.us.sock_name); free(c->event.u.us.sock_name); close(fd_hdr); unlink(r->request); munmap(c->request.u.us.shared_data, SHM_CONTROL_SIZE); return res; } void qb_ipcs_us_init(struct qb_ipcs_service *s) { s->funcs.connect = qb_ipcs_us_connect; s->funcs.disconnect = qb_ipcs_us_disconnect; s->funcs.recv = qb_ipc_us_recv_at_most; s->funcs.peek = NULL; s->funcs.reclaim = NULL; s->funcs.send = qb_ipc_socket_send; s->funcs.sendv = qb_ipc_socket_sendv; s->funcs.fc_set = qb_ipc_us_fc_set; s->funcs.q_len_get = qb_ipc_us_q_len_get; s->needs_sock_for_poll = QB_FALSE; qb_atomic_init(); } diff --git a/lib/ipcs.c b/lib/ipcs.c index 038cd12..38188f3 100644 --- a/lib/ipcs.c +++ b/lib/ipcs.c @@ -1,936 +1,937 @@ /* * Copyright (C) 2010 Red Hat, Inc. * * Author: Angus Salkeld * * This file is part of libqb. * * libqb is free software: you can redistribute it and/or modify * it under the terms of the GNU Lesser General Public License as published by * the Free Software Foundation, either version 2.1 of the License, or * (at your option) any later version. * * libqb is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public License * along with libqb. If not, see . */ #include "os_base.h" #include "util_int.h" #include "ipc_int.h" #include #include #include static void qb_ipcs_flowcontrol_set(struct qb_ipcs_connection *c, int32_t fc_enable); static int32_t new_event_notification(struct qb_ipcs_connection * c); static QB_LIST_DECLARE(qb_ipc_services); qb_ipcs_service_t * qb_ipcs_create(const char *name, int32_t service_id, enum qb_ipc_type type, struct qb_ipcs_service_handlers *handlers) { struct qb_ipcs_service *s; s = calloc(1, sizeof(struct qb_ipcs_service)); if (s == NULL) { return NULL; } if (type == QB_IPC_NATIVE) { #ifdef DISABLE_IPC_SHM s->type = QB_IPC_SOCKET; #else s->type = QB_IPC_SHM; #endif /* DISABLE_IPC_SHM */ } else { s->type = type; } s->pid = getpid(); s->needs_sock_for_poll = QB_FALSE; s->poll_priority = QB_LOOP_MED; /* Initial alloc ref */ qb_ipcs_ref(s); s->service_id = service_id; (void)strlcpy(s->name, name, NAME_MAX); s->serv_fns.connection_accept = handlers->connection_accept; s->serv_fns.connection_created = handlers->connection_created; s->serv_fns.msg_process = handlers->msg_process; s->serv_fns.connection_closed = handlers->connection_closed; s->serv_fns.connection_destroyed = handlers->connection_destroyed; qb_list_init(&s->connections); qb_list_init(&s->list); qb_list_add(&s->list, &qb_ipc_services); return s; } void qb_ipcs_poll_handlers_set(struct qb_ipcs_service *s, struct qb_ipcs_poll_handlers *handlers) { s->poll_fns.job_add = handlers->job_add; s->poll_fns.dispatch_add = handlers->dispatch_add; s->poll_fns.dispatch_mod = handlers->dispatch_mod; s->poll_fns.dispatch_del = handlers->dispatch_del; } void qb_ipcs_service_context_set(qb_ipcs_service_t* s, void *context) { s->context = context; } void * qb_ipcs_service_context_get(qb_ipcs_service_t* s) { return s->context; } int32_t qb_ipcs_run(struct qb_ipcs_service *s) { int32_t res = 0; if (s->poll_fns.dispatch_add == NULL || s->poll_fns.dispatch_mod == NULL || s->poll_fns.dispatch_del == NULL) { res = -EINVAL; goto run_cleanup; } switch (s->type) { case QB_IPC_SOCKET: qb_ipcs_us_init((struct qb_ipcs_service *)s); break; case QB_IPC_SHM: #ifdef DISABLE_IPC_SHM res = -ENOTSUP; #else qb_ipcs_shm_init((struct qb_ipcs_service *)s); #endif /* DISABLE_IPC_SHM */ break; case QB_IPC_POSIX_MQ: case QB_IPC_SYSV_MQ: res = -ENOTSUP; break; default: res = -EINVAL; break; } if (res == 0) { res = qb_ipcs_us_publish(s); if (res < 0) { (void)qb_ipcs_us_withdraw(s); goto run_cleanup; } } run_cleanup: if (res < 0) { /* Failed to run services, removing initial alloc reference. */ qb_ipcs_unref(s); } return res; } static int32_t _modify_dispatch_descriptor_(struct qb_ipcs_connection *c) { qb_ipcs_dispatch_mod_fn disp_mod = c->service->poll_fns.dispatch_mod; if (c->service->type == QB_IPC_SOCKET) { return disp_mod(c->service->poll_priority, c->event.u.us.sock, c->poll_events, c, qb_ipcs_dispatch_connection_request); } else { return disp_mod(c->service->poll_priority, c->setup.u.us.sock, c->poll_events, c, qb_ipcs_dispatch_connection_request); } return -EINVAL; } void qb_ipcs_request_rate_limit(struct qb_ipcs_service *s, enum qb_ipcs_rate_limit rl) { struct qb_ipcs_connection *c; enum qb_loop_priority old_p = s->poll_priority; struct qb_list_head *pos; struct qb_list_head *n; switch (rl) { case QB_IPCS_RATE_FAST: s->poll_priority = QB_LOOP_HIGH; break; case QB_IPCS_RATE_SLOW: case QB_IPCS_RATE_OFF: case QB_IPCS_RATE_OFF_2: s->poll_priority = QB_LOOP_LOW; break; default: case QB_IPCS_RATE_NORMAL: s->poll_priority = QB_LOOP_MED; break; } qb_list_for_each_safe(pos, n, &s->connections) { c = qb_list_entry(pos, struct qb_ipcs_connection, list); qb_ipcs_connection_ref(c); if (rl == QB_IPCS_RATE_OFF) { qb_ipcs_flowcontrol_set(c, 1); } else if (rl == QB_IPCS_RATE_OFF_2) { qb_ipcs_flowcontrol_set(c, 2); } else { qb_ipcs_flowcontrol_set(c, QB_FALSE); } if (old_p != s->poll_priority) { (void)_modify_dispatch_descriptor_(c); } qb_ipcs_connection_unref(c); } } void qb_ipcs_ref(struct qb_ipcs_service *s) { qb_atomic_int_inc(&s->ref_count); } void qb_ipcs_unref(struct qb_ipcs_service *s) { int32_t free_it; assert(s->ref_count > 0); free_it = qb_atomic_int_dec_and_test(&s->ref_count); if (free_it) { qb_util_log(LOG_DEBUG, "%s() - destroying", __func__); free(s); } } void qb_ipcs_destroy(struct qb_ipcs_service *s) { struct qb_ipcs_connection *c = NULL; struct qb_list_head *pos; struct qb_list_head *n; if (s == NULL) { return; } qb_list_for_each_safe(pos, n, &s->connections) { c = qb_list_entry(pos, struct qb_ipcs_connection, list); if (c == NULL) { continue; } qb_ipcs_disconnect(c); } (void)qb_ipcs_us_withdraw(s); /* service destroyed, remove initial alloc ref */ qb_ipcs_unref(s); } /* * connection API */ static struct qb_ipc_one_way * _event_sock_one_way_get(struct qb_ipcs_connection * c) { if (c->service->needs_sock_for_poll) { return &c->setup; } if (c->event.type == QB_IPC_SOCKET) { return &c->event; } return NULL; } static struct qb_ipc_one_way * _response_sock_one_way_get(struct qb_ipcs_connection * c) { if (c->service->needs_sock_for_poll) { return &c->setup; } if (c->response.type == QB_IPC_SOCKET) { return &c->response; } return NULL; } ssize_t qb_ipcs_response_send(struct qb_ipcs_connection *c, const void *data, size_t size) { ssize_t res; if (c == NULL) { return -EINVAL; } qb_ipcs_connection_ref(c); res = c->service->funcs.send(&c->response, data, size); if (res == size) { c->stats.responses++; } else if (res == -EAGAIN || res == -ETIMEDOUT) { struct qb_ipc_one_way *ow = _response_sock_one_way_get(c); if (ow) { ssize_t res2 = qb_ipc_us_ready(ow, &c->setup, 0, POLLOUT); if (res2 < 0) { res = res2; } } c->stats.send_retries++; } qb_ipcs_connection_unref(c); return res; } ssize_t qb_ipcs_response_sendv(struct qb_ipcs_connection * c, const struct iovec * iov, size_t iov_len) { ssize_t res; if (c == NULL) { return -EINVAL; } qb_ipcs_connection_ref(c); res = c->service->funcs.sendv(&c->response, iov, iov_len); if (res > 0) { c->stats.responses++; } else if (res == -EAGAIN || res == -ETIMEDOUT) { struct qb_ipc_one_way *ow = _response_sock_one_way_get(c); if (ow) { ssize_t res2 = qb_ipc_us_ready(ow, &c->setup, 0, POLLOUT); if (res2 < 0) { res = res2; } } c->stats.send_retries++; } qb_ipcs_connection_unref(c); return res; } static int32_t resend_event_notifications(struct qb_ipcs_connection *c) { ssize_t res = 0; if (c->outstanding_notifiers > 0) { res = qb_ipc_us_send(&c->setup, c->receive_buf, c->outstanding_notifiers); } if (res > 0) { c->outstanding_notifiers -= res; } assert(c->outstanding_notifiers >= 0); if (c->outstanding_notifiers == 0) { c->poll_events = POLLIN | POLLPRI | POLLNVAL; (void)_modify_dispatch_descriptor_(c); } return res; } static int32_t new_event_notification(struct qb_ipcs_connection * c) { ssize_t res = 0; if (!c->service->needs_sock_for_poll) { return res; } assert(c->outstanding_notifiers >= 0); if (c->outstanding_notifiers > 0) { c->outstanding_notifiers++; + resend_event_notifications(c); } else { res = qb_ipc_us_send(&c->setup, &c->outstanding_notifiers, 1); if (res == -EAGAIN) { /* * notify the client later, when we can. */ c->outstanding_notifiers++; c->poll_events = POLLOUT | POLLIN | POLLPRI | POLLNVAL; (void)_modify_dispatch_descriptor_(c); } } return res; } ssize_t qb_ipcs_event_send(struct qb_ipcs_connection * c, const void *data, size_t size) { ssize_t res; ssize_t resn; if (c == NULL) { return -EINVAL; } else if (size > c->event.max_msg_size) { return -EMSGSIZE; } qb_ipcs_connection_ref(c); res = c->service->funcs.send(&c->event, data, size); if (res == size) { c->stats.events++; resn = new_event_notification(c); if (resn < 0 && resn != -EAGAIN) { errno = -resn; qb_util_perror(LOG_WARNING, "new_event_notification (%s)", c->description); res = resn; } } else if (res == -EAGAIN || res == -ETIMEDOUT) { struct qb_ipc_one_way *ow = _event_sock_one_way_get(c); if (ow) { resn = qb_ipc_us_ready(ow, &c->setup, 0, POLLOUT); if (resn < 0) { res = resn; } } c->stats.send_retries++; } qb_ipcs_connection_unref(c); return res; } ssize_t qb_ipcs_event_sendv(struct qb_ipcs_connection * c, const struct iovec * iov, size_t iov_len) { ssize_t res; ssize_t resn; if (c == NULL) { return -EINVAL; } qb_ipcs_connection_ref(c); res = c->service->funcs.sendv(&c->event, iov, iov_len); if (res > 0) { c->stats.events++; resn = new_event_notification(c); if (resn < 0 && resn != -EAGAIN) { errno = -resn; qb_util_perror(LOG_WARNING, "new_event_notification (%s)", c->description); res = resn; } } else if (res == -EAGAIN || res == -ETIMEDOUT) { struct qb_ipc_one_way *ow = _event_sock_one_way_get(c); if (ow) { resn = qb_ipc_us_ready(ow, &c->setup, 0, POLLOUT); if (resn < 0) { res = resn; } } c->stats.send_retries++; } qb_ipcs_connection_unref(c); return res; } qb_ipcs_connection_t * qb_ipcs_connection_first_get(struct qb_ipcs_service * s) { struct qb_ipcs_connection *c; if (qb_list_empty(&s->connections)) { return NULL; } c = qb_list_first_entry(&s->connections, struct qb_ipcs_connection, list); qb_ipcs_connection_ref(c); return c; } qb_ipcs_connection_t * qb_ipcs_connection_next_get(struct qb_ipcs_service * s, struct qb_ipcs_connection * current) { struct qb_ipcs_connection *c; if (current == NULL || qb_list_is_last(¤t->list, &s->connections)) { return NULL; } c = qb_list_first_entry(¤t->list, struct qb_ipcs_connection, list); qb_ipcs_connection_ref(c); return c; } int32_t qb_ipcs_service_id_get(struct qb_ipcs_connection * c) { if (c == NULL) { return -EINVAL; } return c->service->service_id; } struct qb_ipcs_connection * qb_ipcs_connection_alloc(struct qb_ipcs_service *s) { struct qb_ipcs_connection *c = calloc(1, sizeof(struct qb_ipcs_connection)); if (c == NULL) { return NULL; } c->pid = 0; c->euid = -1; c->egid = -1; c->receive_buf = NULL; c->context = NULL; c->fc_enabled = QB_FALSE; c->state = QB_IPCS_CONNECTION_INACTIVE; c->poll_events = POLLIN | POLLPRI | POLLNVAL; c->setup.type = s->type; c->request.type = s->type; c->response.type = s->type; c->event.type = s->type; (void)strlcpy(c->description, "not set yet", CONNECTION_DESCRIPTION); /* initial alloc ref */ qb_ipcs_connection_ref(c); /* * The connection makes use of the service object. Give the connection * a reference to the service so we know the service can never be destroyed * until the connection is done with it. */ qb_ipcs_ref(s); c->service = s; qb_list_init(&c->list); return c; } void qb_ipcs_connection_ref(struct qb_ipcs_connection *c) { if (c) { qb_atomic_int_inc(&c->refcount); } } void qb_ipcs_connection_unref(struct qb_ipcs_connection *c) { int32_t free_it; if (c == NULL) { return; } if (c->refcount < 1) { qb_util_log(LOG_ERR, "ref:%d state:%d (%s)", c->refcount, c->state, c->description); assert(0); } free_it = qb_atomic_int_dec_and_test(&c->refcount); if (free_it) { qb_list_del(&c->list); if (c->service->serv_fns.connection_destroyed) { c->service->serv_fns.connection_destroyed(c); } c->service->funcs.disconnect(c); /* Let go of the connection's reference to the service */ qb_ipcs_unref(c->service); free(c->receive_buf); free(c); } } void qb_ipcs_disconnect(struct qb_ipcs_connection *c) { int32_t res = 0; qb_loop_job_dispatch_fn rerun_job; if (c == NULL) { return; } qb_util_log(LOG_DEBUG, "%s(%s) state:%d", __func__, c->description, c->state); if (c->state == QB_IPCS_CONNECTION_ACTIVE) { c->service->funcs.disconnect(c); c->state = QB_IPCS_CONNECTION_INACTIVE; c->service->stats.closed_connections++; /* return early as it's an incomplete connection. */ return; } if (c->state == QB_IPCS_CONNECTION_ESTABLISHED) { c->service->funcs.disconnect(c); c->state = QB_IPCS_CONNECTION_SHUTTING_DOWN; c->service->stats.active_connections--; c->service->stats.closed_connections++; } if (c->state == QB_IPCS_CONNECTION_SHUTTING_DOWN) { int scheduled_retry = 0; res = 0; if (c->service->serv_fns.connection_closed) { res = c->service->serv_fns.connection_closed(c); } if (res != 0) { /* OK, so they want the connection_closed * function re-run */ rerun_job = (qb_loop_job_dispatch_fn) qb_ipcs_disconnect; res = c->service->poll_fns.job_add(QB_LOOP_LOW, c, rerun_job); if (res == 0) { /* this function is going to be called again. * so hold off on the unref */ scheduled_retry = 1; } } if (scheduled_retry == 0) { /* This removes the initial alloc ref */ qb_ipcs_connection_unref(c); } } } static void qb_ipcs_flowcontrol_set(struct qb_ipcs_connection *c, int32_t fc_enable) { if (c == NULL) { return; } if (c->fc_enabled != fc_enable) { c->service->funcs.fc_set(&c->request, fc_enable); c->fc_enabled = fc_enable; c->stats.flow_control_state = fc_enable; c->stats.flow_control_count++; } } static int32_t _process_request_(struct qb_ipcs_connection *c, int32_t ms_timeout) { int32_t res = 0; ssize_t size; struct qb_ipc_request_header *hdr; qb_ipcs_connection_ref(c); if (c->service->funcs.peek && c->service->funcs.reclaim) { size = c->service->funcs.peek(&c->request, (void **)&hdr, ms_timeout); } else { hdr = c->receive_buf; size = c->service->funcs.recv(&c->request, hdr, c->request.max_msg_size, ms_timeout); } if (size < 0) { if (size != -EAGAIN && size != -ETIMEDOUT) { qb_util_perror(LOG_DEBUG, "recv from client connection failed (%s)", c->description); } else { c->stats.recv_retries++; } res = size; goto cleanup; } else if (size == 0 || hdr->id == QB_IPC_MSG_DISCONNECT) { qb_util_log(LOG_DEBUG, "client requesting a disconnect (%s)", c->description); qb_ipcs_disconnect(c); c = NULL; res = -ESHUTDOWN; } else { c->stats.requests++; res = c->service->serv_fns.msg_process(c, hdr, hdr->size); /* 0 == good, negative == backoff */ if (res < 0) { res = -ENOBUFS; } else { res = size; } } if (c && c->service->funcs.peek && c->service->funcs.reclaim) { c->service->funcs.reclaim(&c->request); } cleanup: qb_ipcs_connection_unref(c); return res; } #define IPC_REQUEST_TIMEOUT 10 #define MAX_RECV_MSGS 50 int32_t qb_ipcs_dispatch_service_request(int32_t fd, int32_t revents, void *data) { int32_t res = _process_request_((struct qb_ipcs_connection *)data, IPC_REQUEST_TIMEOUT); if (res > 0) { return 0; } return res; } static ssize_t _request_q_len_get(struct qb_ipcs_connection *c) { ssize_t q_len; if (c->service->funcs.q_len_get) { q_len = c->service->funcs.q_len_get(&c->request); if (q_len <= 0) { return q_len; } if (c->service->poll_priority == QB_LOOP_MED) { q_len = QB_MIN(q_len, 5); } else if (c->service->poll_priority == QB_LOOP_LOW) { q_len = 1; } else { q_len = QB_MIN(q_len, MAX_RECV_MSGS); } } else { q_len = 1; } return q_len; } int32_t qb_ipcs_dispatch_connection_request(int32_t fd, int32_t revents, void *data) { struct qb_ipcs_connection *c = (struct qb_ipcs_connection *)data; char bytes[MAX_RECV_MSGS]; int32_t res = 0; int32_t res2; int32_t recvd = 0; ssize_t avail; if (revents & POLLNVAL) { qb_util_log(LOG_DEBUG, "NVAL conn (%s)", c->description); res = -EINVAL; goto dispatch_cleanup; } if (revents & POLLHUP) { qb_util_log(LOG_DEBUG, "HUP conn (%s)", c->description); res = -ESHUTDOWN; goto dispatch_cleanup; } if (revents & POLLOUT) { /* try resend events now that fd can write */ res = resend_event_notifications(c); if (res < 0 && res != -EAGAIN) { errno = -res; qb_util_perror(LOG_WARNING, "resend_event_notifications (%s)", c->description); } /* nothing to read */ if ((revents & POLLIN) == 0) { res = 0; goto dispatch_cleanup; } } if (c->fc_enabled) { res = 0; goto dispatch_cleanup; } avail = _request_q_len_get(c); if (c->service->needs_sock_for_poll && avail == 0) { res2 = qb_ipc_us_recv(&c->setup, bytes, 1, 0); if (qb_ipc_us_sock_error_is_disconnected(res2)) { errno = -res2; qb_util_perror(LOG_WARNING, "conn (%s) disconnected", c->description); res = -ESHUTDOWN; goto dispatch_cleanup; } else { qb_util_log(LOG_WARNING, "conn (%s) Nothing in q but got POLLIN on fd:%d (res2:%d)", c->description, fd, res2); res = 0; goto dispatch_cleanup; } } do { res = _process_request_(c, IPC_REQUEST_TIMEOUT); if (res > 0 || res == -ENOBUFS || res == -EINVAL) { recvd++; } if (res > 0) { avail--; } } while (avail > 0 && res > 0 && !c->fc_enabled); if (c->service->needs_sock_for_poll && recvd > 0) { res2 = qb_ipc_us_recv(&c->setup, bytes, recvd, -1); if (qb_ipc_us_sock_error_is_disconnected(res2)) { errno = -res2; qb_util_perror(LOG_ERR, "error receiving from setup sock (%s)", c->description); res = -ESHUTDOWN; goto dispatch_cleanup; } } res = QB_MIN(0, res); if (res == -EAGAIN || res == -ETIMEDOUT || res == -ENOBUFS) { res = 0; } if (res != 0) { if (res != -ENOTCONN) { /* * Abnormal state (ENOTCONN is normal shutdown). */ errno = -res; qb_util_perror(LOG_ERR, "request returned error (%s)", c->description); } } dispatch_cleanup: if (res != 0) { qb_ipcs_disconnect(c); } return res; } void qb_ipcs_context_set(struct qb_ipcs_connection *c, void *context) { if (c == NULL) { return; } c->context = context; } void * qb_ipcs_context_get(struct qb_ipcs_connection *c) { if (c == NULL) { return NULL; } return c->context; } void * qb_ipcs_connection_service_context_get(qb_ipcs_connection_t *c) { if (c == NULL || c->service == NULL) { return NULL; } return c->service->context; } int32_t qb_ipcs_connection_stats_get(qb_ipcs_connection_t * c, struct qb_ipcs_connection_stats * stats, int32_t clear_after_read) { if (c == NULL) { return -EINVAL; } memcpy(stats, &c->stats, sizeof(struct qb_ipcs_connection_stats)); if (clear_after_read) { memset(&c->stats, 0, sizeof(struct qb_ipcs_connection_stats_2)); c->stats.client_pid = c->pid; } return 0; } struct qb_ipcs_connection_stats_2* qb_ipcs_connection_stats_get_2(qb_ipcs_connection_t *c, int32_t clear_after_read) { struct qb_ipcs_connection_stats_2 * stats; if (c == NULL) { errno = EINVAL; return NULL; } stats = calloc(1, sizeof(struct qb_ipcs_connection_stats_2)); if (stats == NULL) { return NULL; } memcpy(stats, &c->stats, sizeof(struct qb_ipcs_connection_stats_2)); if (c->service->funcs.q_len_get) { stats->event_q_length = c->service->funcs.q_len_get(&c->event); } else { stats->event_q_length = 0; } if (clear_after_read) { memset(&c->stats, 0, sizeof(struct qb_ipcs_connection_stats_2)); c->stats.client_pid = c->pid; } return stats; } int32_t qb_ipcs_stats_get(struct qb_ipcs_service * s, struct qb_ipcs_stats * stats, int32_t clear_after_read) { if (s == NULL) { return -EINVAL; } memcpy(stats, &s->stats, sizeof(struct qb_ipcs_stats)); if (clear_after_read) { memset(&s->stats, 0, sizeof(struct qb_ipcs_stats)); } return 0; } void qb_ipcs_connection_auth_set(qb_ipcs_connection_t *c, uid_t uid, gid_t gid, mode_t mode) { if (c) { c->auth.uid = uid; c->auth.gid = gid; c->auth.mode = mode; } } diff --git a/lib/loop_poll_kqueue.c b/lib/loop_poll_kqueue.c index 3149640..871e4b0 100644 --- a/lib/loop_poll_kqueue.c +++ b/lib/loop_poll_kqueue.c @@ -1,210 +1,212 @@ /* * Copyright (C) 2012 Red Hat, Inc. * * Author: Angus Salkeld * * This file is part of libqb. * * libqb is free software: you can redistribute it and/or modify * it under the terms of the GNU Lesser General Public License as published by * the Free Software Foundation, either version 2.1 of the License, or * (at your option) any later version. * * libqb is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public License * along with libqb. If not, see . */ #include "os_base.h" #include "loop_poll_int.h" #ifdef HAVE_SYS_EVENT_H #include #endif /* HAVE_SYS_EVENT_H */ #define MAX_EVENTS 12 static int32_t _poll_to_filter_(int32_t event) { int32_t out = 0; if (event & POLLIN) out |= EVFILT_READ; if (event & POLLOUT) out |= EVFILT_WRITE; return out; } static void _fini(struct qb_poll_source *s) { if (s->epollfd != -1) { close(s->epollfd); s->epollfd = -1; } } static int32_t _add(struct qb_poll_source *s, struct qb_poll_entry *pe, int32_t fd, int32_t events) { int32_t res = 0; struct kevent ke; short filters = _poll_to_filter_(events); - EV_SET(&ke, fd, filters, EV_ADD, 0, 0, (intptr_t)pe); + EV_SET(&ke, fd, filters, EV_ADD | EV_ENABLE, 0, 0, (intptr_t)pe); res = kevent(s->epollfd, &ke, 1, NULL, 0, NULL); if (res == -1) { res = -errno; qb_util_perror(LOG_ERR, "kevent(add)"); } return res; } static int32_t _mod(struct qb_poll_source *s, struct qb_poll_entry *pe, int32_t fd, int32_t events) { int32_t res = 0; struct kevent ke[2]; short new_filters = _poll_to_filter_(events); short old_filters = _poll_to_filter_(pe->ufd.events); EV_SET(&ke[0], fd, old_filters, EV_DELETE, 0, 0, (intptr_t)pe); - EV_SET(&ke[1], fd, new_filters, EV_ADD, 0, 0, (intptr_t)pe); + EV_SET(&ke[1], fd, new_filters, EV_ADD | EV_ENABLE, 0, 0, (intptr_t)pe); res = kevent(s->epollfd, ke, 2, NULL, 0, NULL); if (res == -1) { res = -errno; qb_util_perror(LOG_ERR, "kevent(mod)"); } return res; } static int32_t _del(struct qb_poll_source *s, struct qb_poll_entry *pe, int32_t fd, int32_t events) { int32_t res = 0; struct kevent ke; short filters = _poll_to_filter_(events); EV_SET(&ke, fd, filters, EV_DELETE, 0, 0, (intptr_t)pe); res = kevent(s->epollfd, &ke, 1, NULL, 0, NULL); if (res == -1 && errno == ENOENT) { /* * Not a problem already cleaned up. */ res = 0; } else if (res == -1) { res = -errno; qb_util_perror(LOG_ERR, "kevent(del)"); } return res; } static int32_t _poll_and_add_to_jobs_(struct qb_loop_source *src, int32_t ms_timeout) { int32_t i; int32_t event_count; int32_t new_jobs = 0; int32_t revents = 0; struct qb_poll_entry *pe = NULL; struct qb_poll_source *s = (struct qb_poll_source *)src; struct kevent events[MAX_EVENTS]; struct timespec timeout = { 0, 0 }; struct timespec *timeout_pt = &timeout; if (ms_timeout > 0) { qb_timespec_add_ms(&timeout, ms_timeout); } else if (ms_timeout < 0) { timeout_pt = NULL; } qb_poll_fds_usage_check_(s); retry_poll: event_count = kevent(s->epollfd, NULL, 0, events, MAX_EVENTS, timeout_pt); if (errno == EINTR && event_count == -1) { goto retry_poll; } else if (event_count == -1) { qb_util_perror(LOG_ERR, "kevent(poll)"); return -errno; } for (i = 0; i < event_count; i++) { revents = 0; pe = (struct qb_poll_entry *)events[i].udata; +#if 0 if (events[i].flags) { qb_util_log(LOG_TRACE, "got flags %d on fd %d.", events[i].flags, pe->ufd.fd); } +#endif if (events[i].flags & EV_ERROR) { qb_util_log(LOG_WARNING, "got EV_ERROR on fd %d.", pe->ufd.fd); revents |= POLLERR; } if (events[i].flags & EV_EOF) { qb_util_log(LOG_INFO, "got EV_EOF on fd %d.", pe->ufd.fd); revents |= POLLHUP; } if (events[i].filter == EVFILT_READ) { revents |= POLLIN; } if (events[i].filter == EVFILT_WRITE) { revents |= POLLOUT; } if (pe->ufd.fd == -1 || pe->state == QB_POLL_ENTRY_DELETED) { qb_util_log(LOG_WARNING, "can't post new event to a deleted entry."); /* * empty/deleted */ EV_SET(&events[i], events[i].ident, events[i].filter, EV_DELETE, 0, 0, (intptr_t)pe); (void)kevent(s->epollfd, &events[i], 1, NULL, 0, NULL); continue; } if (pe->ufd.fd != events[i].ident) { qb_util_log(LOG_WARNING, "can't find poll entry for new event."); continue; } if (revents == pe->ufd.revents || pe->state == QB_POLL_ENTRY_JOBLIST) { /* * entry already in the job queue. */ continue; } pe->ufd.revents = revents; new_jobs += pe->add_to_jobs(src->l, pe); } return new_jobs; } int32_t qb_kqueue_init(struct qb_poll_source *s) { s->epollfd = kqueue(); if (s->epollfd < 0) { qb_util_perror(LOG_ERR, "kqueue()"); return -errno; } s->driver.fini = _fini; s->driver.add = _add; s->driver.mod = _mod; s->driver.del = _del; s->s.poll = _poll_and_add_to_jobs_; return 0; } diff --git a/tests/check_ipc.c b/tests/check_ipc.c index 2de1d85..4502481 100644 --- a/tests/check_ipc.c +++ b/tests/check_ipc.c @@ -1,1143 +1,1312 @@ /* * Copyright (c) 2010 Red Hat, Inc. * * All rights reserved. * * Author: Angus Salkeld * * This file is part of libqb. * * libqb is free software: you can redistribute it and/or modify * it under the terms of the GNU Lesser General Public License as published by * the Free Software Foundation, either version 2.1 of the License, or * (at your option) any later version. * * libqb is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public License * along with libqb. If not, see . */ #include "os_base.h" #include #include #include #include #include #include #include #include static const char *ipc_name = "ipc_test"; #define MAX_MSG_SIZE (8192*16) static qb_ipcc_connection_t *conn; static enum qb_ipc_type ipc_type; enum my_msg_ids { IPC_MSG_REQ_TX_RX, IPC_MSG_RES_TX_RX, IPC_MSG_REQ_DISPATCH, IPC_MSG_RES_DISPATCH, IPC_MSG_REQ_BULK_EVENTS, IPC_MSG_RES_BULK_EVENTS, + IPC_MSG_REQ_STRESS_EVENT, + IPC_MSG_RES_STRESS_EVENT, IPC_MSG_REQ_SERVER_FAIL, IPC_MSG_RES_SERVER_FAIL, IPC_MSG_REQ_SERVER_DISCONNECT, IPC_MSG_RES_SERVER_DISCONNECT, }; /* Test Cases * * 1) basic send & recv differnet message sizes * * 2) send message to start dispatch (confirm receipt) * * 3) flow control * * 4) authentication * * 5) thread safety * * 6) cleanup * * 7) service availabilty * * 8) multiple services */ static qb_loop_t *my_loop; static qb_ipcs_service_t* s1; static int32_t turn_on_fc = QB_FALSE; static int32_t fc_enabled = 89; static int32_t send_event_on_created = QB_FALSE; static int32_t disconnect_after_created = QB_FALSE; static int32_t num_bulk_events = 10; +static int32_t num_stress_events = 30000; static int32_t reference_count_test = QB_FALSE; static int32_t exit_handler(int32_t rsignal, void *data) { qb_log(LOG_DEBUG, "caught signal %d", rsignal); qb_ipcs_destroy(s1); return -1; } static int32_t s1_msg_process_fn(qb_ipcs_connection_t *c, void *data, size_t size) { struct qb_ipc_request_header *req_pt = (struct qb_ipc_request_header *)data; - struct qb_ipc_response_header response; + struct qb_ipc_response_header response = { 0, }; ssize_t res; if (req_pt->id == IPC_MSG_REQ_TX_RX) { response.size = sizeof(struct qb_ipc_response_header); response.id = IPC_MSG_RES_TX_RX; response.error = 0; res = qb_ipcs_response_send(c, &response, response.size); if (res < 0) { qb_perror(LOG_INFO, "qb_ipcs_response_send"); } else if (res != response.size) { qb_log(LOG_DEBUG, "qb_ipcs_response_send %zd != %d", res, response.size); } if (turn_on_fc) { qb_ipcs_request_rate_limit(s1, QB_IPCS_RATE_OFF); } } else if (req_pt->id == IPC_MSG_REQ_DISPATCH) { response.size = sizeof(struct qb_ipc_response_header); response.id = IPC_MSG_RES_DISPATCH; response.error = 0; res = qb_ipcs_event_send(c, &response, sizeof(response)); if (res < 0) { qb_perror(LOG_INFO, "qb_ipcs_event_send"); } } else if (req_pt->id == IPC_MSG_REQ_BULK_EVENTS) { int32_t m; int32_t num; struct qb_ipcs_connection_stats_2 *stats; response.size = sizeof(struct qb_ipc_response_header); response.error = 0; stats = qb_ipcs_connection_stats_get_2(c, QB_FALSE); num = stats->event_q_length; free(stats); /* crazy large message */ res = qb_ipcs_event_send(c, &response, MAX_MSG_SIZE*10); ck_assert_int_eq(res, -EMSGSIZE); - for (m = 0; m < num_bulk_events; m++) { - res = qb_ipcs_event_send(c, &response, - sizeof(response)); + /* send one event before responding */ + res = qb_ipcs_event_send(c, &response, sizeof(response)); + ck_assert_int_eq(res, sizeof(response)); + response.id++; + + /* send response */ + response.id = IPC_MSG_RES_BULK_EVENTS; + res = qb_ipcs_response_send(c, &response, response.size); + ck_assert_int_eq(res, sizeof(response)); + + /* send the rest of the events after the response */ + for (m = 1; m < num_bulk_events; m++) { + res = qb_ipcs_event_send(c, &response, sizeof(response)); + + if (res == -EAGAIN || res == -ENOBUFS) { + /* retry */ + usleep(1000); + m--; + continue; + } ck_assert_int_eq(res, sizeof(response)); response.id++; } stats = qb_ipcs_connection_stats_get_2(c, QB_FALSE); ck_assert_int_eq(stats->event_q_length - num, num_bulk_events); free(stats); - response.id = IPC_MSG_RES_BULK_EVENTS; + } else if (req_pt->id == IPC_MSG_REQ_STRESS_EVENT) { + int32_t m; + + response.size = sizeof(struct qb_ipc_response_header); + response.error = 0; + + response.id = IPC_MSG_RES_STRESS_EVENT; res = qb_ipcs_response_send(c, &response, response.size); ck_assert_int_eq(res, sizeof(response)); + response.id=IPC_MSG_RES_STRESS_EVENT; + for (m = 0; m < num_stress_events; m++) { + res = qb_ipcs_event_send(c, &response, + sizeof(response)); + if (res < 0) { + if (res == -EAGAIN || res == -ENOBUFS) { + /* yield to the receive process */ + usleep(1000); + m--; + continue; + } else { + qb_perror(LOG_DEBUG, "sending stress events"); + ck_assert_int_eq(res, sizeof(response)); + } + } + response.id++; + + if ((m % 1000) == 0) { + qb_log(LOG_DEBUG, "SENT: %d stress events sent", m); + } + } + } else if (req_pt->id == IPC_MSG_REQ_SERVER_FAIL) { exit(0); } else if (req_pt->id == IPC_MSG_REQ_SERVER_DISCONNECT) { qb_ipcs_disconnect(c); } return 0; } static int32_t my_job_add(enum qb_loop_priority p, void *data, qb_loop_job_dispatch_fn fn) { return qb_loop_job_add(my_loop, p, data, fn); } static int32_t my_dispatch_add(enum qb_loop_priority p, int32_t fd, int32_t events, void *data, qb_ipcs_dispatch_fn_t fn) { return qb_loop_poll_add(my_loop, p, fd, events, data, fn); } static int32_t my_dispatch_mod(enum qb_loop_priority p, int32_t fd, int32_t events, void *data, qb_ipcs_dispatch_fn_t fn) { return qb_loop_poll_mod(my_loop, p, fd, events, data, fn); } static int32_t my_dispatch_del(int32_t fd) { return qb_loop_poll_del(my_loop, fd); } static int32_t s1_connection_closed(qb_ipcs_connection_t *c) { qb_enter(); qb_leave(); return 0; } static void outq_flush (void *data) { static int i = 0; struct cs_ipcs_conn_context *cnx; cnx = qb_ipcs_context_get(data); qb_log(LOG_DEBUG,"iter %u\n", i); i++; if (i == 2) { qb_ipcs_destroy(s1); s1 = NULL; } /* is the reference counting is not working, this should fail * for i > 1. */ qb_ipcs_event_send(data, "test", 4); assert(memcmp(cnx, "test", 4) == 0); if (i < 5) { qb_loop_job_add(my_loop, QB_LOOP_HIGH, data, outq_flush); } else { /* this single unref should clean everything up. */ qb_ipcs_connection_unref(data); qb_log(LOG_INFO, "end of test, stopping loop"); qb_loop_stop(my_loop); } } static void s1_connection_destroyed(qb_ipcs_connection_t *c) { qb_enter(); if (reference_count_test) { struct cs_ipcs_conn_context *cnx; cnx = qb_ipcs_context_get(c); free(cnx); } else { qb_loop_stop(my_loop); } qb_leave(); } static void s1_connection_created(qb_ipcs_connection_t *c) { if (send_event_on_created) { struct qb_ipc_response_header response; int32_t res; response.size = sizeof(struct qb_ipc_response_header); response.id = IPC_MSG_RES_DISPATCH; response.error = 0; res = qb_ipcs_event_send(c, &response, sizeof(response)); ck_assert_int_eq(res, response.size); } if (reference_count_test) { struct cs_ipcs_conn_context *context; qb_ipcs_connection_ref(c); qb_loop_job_add(my_loop, QB_LOOP_HIGH, c, outq_flush); context = calloc(1, 20); memcpy(context, "test", 4); qb_ipcs_context_set(c, context); } } static void run_ipc_server(void) { int32_t res; qb_loop_signal_handle handle; struct qb_ipcs_service_handlers sh = { .connection_accept = NULL, .connection_created = s1_connection_created, .msg_process = s1_msg_process_fn, .connection_destroyed = s1_connection_destroyed, .connection_closed = s1_connection_closed, }; struct qb_ipcs_poll_handlers ph = { .job_add = my_job_add, .dispatch_add = my_dispatch_add, .dispatch_mod = my_dispatch_mod, .dispatch_del = my_dispatch_del, }; qb_loop_signal_add(my_loop, QB_LOOP_HIGH, SIGSTOP, NULL, exit_handler, &handle); qb_loop_signal_add(my_loop, QB_LOOP_HIGH, SIGTERM, NULL, exit_handler, &handle); my_loop = qb_loop_create(); s1 = qb_ipcs_create(ipc_name, 4, ipc_type, &sh); fail_if(s1 == 0); qb_ipcs_poll_handlers_set(s1, &ph); res = qb_ipcs_run(s1); ck_assert_int_eq(res, 0); qb_loop_run(my_loop); qb_log(LOG_DEBUG, "loop finished - done ..."); } static int32_t run_function_in_new_process(void (*run_ipc_server_fn)(void)) { pid_t pid = fork (); if (pid == -1) { fprintf (stderr, "Can't fork\n"); return -1; } if (pid == 0) { run_ipc_server_fn(); return 0; } return pid; } static int32_t stop_process(pid_t pid) { /* wait a bit for the server to shutdown by it's self */ usleep(100000); kill(pid, SIGTERM); waitpid(pid, NULL, 0); return 0; } struct my_req { struct qb_ipc_request_header hdr; char message[1024 * 1024]; }; static struct my_req request; static int32_t send_and_check(int32_t req_id, uint32_t size, int32_t ms_timeout, int32_t expect_perfection) { struct qb_ipc_response_header res_header; int32_t res; int32_t try_times = 0; request.hdr.id = req_id; request.hdr.size = sizeof(struct qb_ipc_request_header) + size; /* check that we can't send a message that is too big * and we get the right return code. */ res = qb_ipcc_send(conn, &request, MAX_MSG_SIZE*2); ck_assert_int_eq(res, -EMSGSIZE); repeat_send: res = qb_ipcc_send(conn, &request, request.hdr.size); try_times++; if (res < 0) { if (res == -EAGAIN && try_times < 10) { goto repeat_send; } else { if (res == -EAGAIN && try_times >= 10) { fc_enabled = QB_TRUE; } errno = -res; qb_perror(LOG_INFO, "qb_ipcc_send"); return res; } } if (req_id == IPC_MSG_REQ_DISPATCH) { res = qb_ipcc_event_recv(conn, &res_header, sizeof(struct qb_ipc_response_header), ms_timeout); } else { res = qb_ipcc_recv(conn, &res_header, sizeof(struct qb_ipc_response_header), ms_timeout); } if (res == -EINTR) { return -1; } if (res == -EAGAIN || res == -ETIMEDOUT) { fc_enabled = QB_TRUE; qb_perror(LOG_DEBUG, "qb_ipcc_recv"); return res; } if (expect_perfection) { ck_assert_int_eq(res, sizeof(struct qb_ipc_response_header)); ck_assert_int_eq(res_header.id, req_id + 1); ck_assert_int_eq(res_header.size, sizeof(struct qb_ipc_response_header)); } return res; } static int32_t recv_timeout = -1; static void test_ipc_txrx(void) { int32_t j; int32_t c = 0; size_t size; pid_t pid; pid = run_function_in_new_process(run_ipc_server); fail_if(pid == -1); sleep(1); do { conn = qb_ipcc_connect(ipc_name, MAX_MSG_SIZE); if (conn == NULL) { j = waitpid(pid, NULL, WNOHANG); ck_assert_int_eq(j, 0); sleep(1); c++; } } while (conn == NULL && c < 5); fail_if(conn == NULL); size = QB_MIN(sizeof(struct qb_ipc_request_header), 64); for (j = 1; j < 19; j++) { size *= 2; if (size >= MAX_MSG_SIZE) break; if (send_and_check(IPC_MSG_REQ_TX_RX, size, recv_timeout, QB_TRUE) < 0) { break; } } if (turn_on_fc) { ck_assert_int_eq(fc_enabled, QB_TRUE); } qb_ipcc_disconnect(conn); stop_process(pid); } static void test_ipc_exit(void) { struct qb_ipc_request_header req_header; struct qb_ipc_response_header res_header; struct iovec iov[1]; int32_t res; int32_t c = 0; int32_t j = 0; pid_t pid; pid = run_function_in_new_process(run_ipc_server); fail_if(pid == -1); sleep(1); do { conn = qb_ipcc_connect(ipc_name, MAX_MSG_SIZE); if (conn == NULL) { j = waitpid(pid, NULL, WNOHANG); ck_assert_int_eq(j, 0); sleep(1); c++; } } while (conn == NULL && c < 5); fail_if(conn == NULL); req_header.id = IPC_MSG_REQ_TX_RX; req_header.size = sizeof(struct qb_ipc_request_header); iov[0].iov_len = req_header.size; iov[0].iov_base = &req_header; res = qb_ipcc_sendv_recv(conn, iov, 1, &res_header, sizeof(struct qb_ipc_response_header), -1); ck_assert_int_eq(res, sizeof(struct qb_ipc_response_header)); /* kill the server */ stop_process(pid); /* * wait a bit for the server to die. */ sleep(1); /* * this needs to free up the shared mem */ qb_ipcc_disconnect(conn); } START_TEST(test_ipc_exit_us) { qb_enter(); ipc_type = QB_IPC_SOCKET; ipc_name = __func__; recv_timeout = 5000; test_ipc_exit(); qb_leave(); } END_TEST START_TEST(test_ipc_exit_shm) { qb_enter(); ipc_type = QB_IPC_SHM; ipc_name = __func__; recv_timeout = 1000; test_ipc_exit(); qb_leave(); } END_TEST START_TEST(test_ipc_txrx_shm_tmo) { qb_enter(); turn_on_fc = QB_FALSE; ipc_type = QB_IPC_SHM; ipc_name = __func__; recv_timeout = 1000; test_ipc_txrx(); qb_leave(); } END_TEST START_TEST(test_ipc_txrx_shm_block) { qb_enter(); turn_on_fc = QB_FALSE; ipc_type = QB_IPC_SHM; ipc_name = __func__; recv_timeout = -1; test_ipc_txrx(); qb_leave(); } END_TEST START_TEST(test_ipc_fc_shm) { qb_enter(); turn_on_fc = QB_TRUE; ipc_type = QB_IPC_SHM; recv_timeout = 500; ipc_name = __func__; test_ipc_txrx(); qb_leave(); } END_TEST START_TEST(test_ipc_txrx_us_block) { qb_enter(); turn_on_fc = QB_FALSE; ipc_type = QB_IPC_SOCKET; ipc_name = __func__; recv_timeout = -1; test_ipc_txrx(); qb_leave(); } END_TEST START_TEST(test_ipc_txrx_us_tmo) { qb_enter(); turn_on_fc = QB_FALSE; ipc_type = QB_IPC_SOCKET; ipc_name = __func__; recv_timeout = 1000; test_ipc_txrx(); qb_leave(); } END_TEST START_TEST(test_ipc_fc_us) { qb_enter(); turn_on_fc = QB_TRUE; ipc_type = QB_IPC_SOCKET; recv_timeout = 500; ipc_name = __func__; test_ipc_txrx(); qb_leave(); } END_TEST struct my_res { struct qb_ipc_response_header hdr; char message[1024 * 1024]; }; static void test_ipc_dispatch(void) { int32_t j; int32_t c = 0; pid_t pid; int32_t size; pid = run_function_in_new_process(run_ipc_server); fail_if(pid == -1); sleep(1); do { conn = qb_ipcc_connect(ipc_name, MAX_MSG_SIZE); if (conn == NULL) { j = waitpid(pid, NULL, WNOHANG); ck_assert_int_eq(j, 0); sleep(1); c++; } } while (conn == NULL && c < 5); fail_if(conn == NULL); size = QB_MIN(sizeof(struct qb_ipc_request_header), 64); for (j = 1; j < 19; j++) { size *= 2; if (size >= MAX_MSG_SIZE) break; if (send_and_check(IPC_MSG_REQ_DISPATCH, size, recv_timeout, QB_TRUE) < 0) { break; } } qb_ipcc_disconnect(conn); stop_process(pid); } START_TEST(test_ipc_disp_us) { qb_enter(); ipc_type = QB_IPC_SOCKET; ipc_name = __func__; test_ipc_dispatch(); qb_leave(); } END_TEST static int32_t events_received; +static int32_t +count_stress_events(int32_t fd, int32_t revents, void *data) +{ + qb_loop_t *cl = (qb_loop_t*)data; + struct qb_ipc_response_header res_header = { 0, }; + int32_t res; + + res = qb_ipcc_event_recv(conn, &res_header, + sizeof(struct qb_ipc_response_header), + -1); + if (res > 0) { + events_received++; + + if ((events_received % 1000) == 0) { + qb_log(LOG_DEBUG, "RECV: %d stress events processed", events_received); + } + } else if (res != -EAGAIN) { + qb_perror(LOG_DEBUG, "count_stress_events"); + qb_loop_stop(cl); + return -1; + } + + if (events_received >= num_stress_events) { + qb_loop_stop(cl); + return -1; + } + return 0; +} static int32_t count_bulk_events(int32_t fd, int32_t revents, void *data) { qb_loop_t *cl = (qb_loop_t*)data; struct qb_ipc_response_header res_header; int32_t res; res = qb_ipcc_event_recv(conn, &res_header, sizeof(struct qb_ipc_response_header), -1); if (res > 0) { events_received++; } if (events_received >= num_bulk_events) { qb_loop_stop(cl); return -1; } return 0; } static void test_ipc_bulk_events(void) { struct qb_ipc_request_header req_header; struct qb_ipc_response_header res_header; struct iovec iov[1]; int32_t c = 0; int32_t j = 0; pid_t pid; int32_t res; qb_loop_t *cl; int32_t fd; pid = run_function_in_new_process(run_ipc_server); fail_if(pid == -1); sleep(1); do { conn = qb_ipcc_connect(ipc_name, MAX_MSG_SIZE); if (conn == NULL) { j = waitpid(pid, NULL, WNOHANG); ck_assert_int_eq(j, 0); sleep(1); c++; } } while (conn == NULL && c < 5); fail_if(conn == NULL); events_received = 0; cl = qb_loop_create(); res = qb_ipcc_fd_get(conn, &fd); ck_assert_int_eq(res, 0); res = qb_loop_poll_add(cl, QB_LOOP_MED, fd, POLLIN, cl, count_bulk_events); ck_assert_int_eq(res, 0); res = send_and_check(IPC_MSG_REQ_BULK_EVENTS, 0, recv_timeout, QB_TRUE); ck_assert_int_eq(res, sizeof(struct qb_ipc_response_header)); qb_loop_run(cl); ck_assert_int_eq(events_received, num_bulk_events); req_header.id = IPC_MSG_REQ_SERVER_FAIL; req_header.size = sizeof(struct qb_ipc_request_header); iov[0].iov_len = req_header.size; iov[0].iov_base = &req_header; res = qb_ipcc_sendv_recv(conn, iov, 1, &res_header, sizeof(struct qb_ipc_response_header), -1); if (res != -ECONNRESET && res != -ENOTCONN) { qb_log(LOG_ERR, "id:%d size:%d", res_header.id, res_header.size); ck_assert_int_eq(res, -ENOTCONN); } qb_ipcc_disconnect(conn); stop_process(pid); } +static void +test_ipc_stress_test(void) +{ + struct qb_ipc_request_header req_header; + struct qb_ipc_response_header res_header; + struct iovec iov[1]; + int32_t c = 0; + int32_t j = 0; + pid_t pid; + int32_t res; + qb_loop_t *cl; + int32_t fd; + + pid = run_function_in_new_process(run_ipc_server); + fail_if(pid == -1); + sleep(1); + + do { + conn = qb_ipcc_connect(ipc_name, MAX_MSG_SIZE); + if (conn == NULL) { + j = waitpid(pid, NULL, WNOHANG); + ck_assert_int_eq(j, 0); + sleep(1); + c++; + } + } while (conn == NULL && c < 5); + fail_if(conn == NULL); + + qb_log(LOG_DEBUG, "Testing %d iterations of EVENT msg passing.", num_stress_events); + + events_received = 0; + cl = qb_loop_create(); + res = qb_ipcc_fd_get(conn, &fd); + ck_assert_int_eq(res, 0); + res = qb_loop_poll_add(cl, QB_LOOP_MED, + fd, POLLIN, + cl, count_stress_events); + ck_assert_int_eq(res, 0); + + res = send_and_check(IPC_MSG_REQ_STRESS_EVENT, 0, recv_timeout, QB_TRUE); + + qb_loop_run(cl); + ck_assert_int_eq(events_received, num_stress_events); + + req_header.id = IPC_MSG_REQ_SERVER_FAIL; + req_header.size = sizeof(struct qb_ipc_request_header); + + iov[0].iov_len = req_header.size; + iov[0].iov_base = &req_header; + res = qb_ipcc_sendv_recv(conn, iov, 1, + &res_header, + sizeof(struct qb_ipc_response_header), -1); + if (res != -ECONNRESET && res != -ENOTCONN) { + qb_log(LOG_ERR, "id:%d size:%d", res_header.id, res_header.size); + ck_assert_int_eq(res, -ENOTCONN); + } + + qb_ipcc_disconnect(conn); + stop_process(pid); +} + +START_TEST(test_ipc_stress_test_us) +{ + qb_enter(); + send_event_on_created = QB_FALSE; + ipc_type = QB_IPC_SOCKET; + ipc_name = __func__; + test_ipc_stress_test(); + qb_leave(); +} +END_TEST + START_TEST(test_ipc_bulk_events_us) { qb_enter(); send_event_on_created = QB_FALSE; ipc_type = QB_IPC_SOCKET; ipc_name = __func__; test_ipc_bulk_events(); qb_leave(); } END_TEST static void test_ipc_event_on_created(void) { int32_t c = 0; int32_t j = 0; pid_t pid; int32_t res; qb_loop_t *cl; int32_t fd; num_bulk_events = 1; pid = run_function_in_new_process(run_ipc_server); fail_if(pid == -1); sleep(1); do { conn = qb_ipcc_connect(ipc_name, MAX_MSG_SIZE); if (conn == NULL) { j = waitpid(pid, NULL, WNOHANG); ck_assert_int_eq(j, 0); sleep(1); c++; } } while (conn == NULL && c < 5); fail_if(conn == NULL); events_received = 0; cl = qb_loop_create(); res = qb_ipcc_fd_get(conn, &fd); ck_assert_int_eq(res, 0); res = qb_loop_poll_add(cl, QB_LOOP_MED, fd, POLLIN, cl, count_bulk_events); ck_assert_int_eq(res, 0); qb_loop_run(cl); ck_assert_int_eq(events_received, num_bulk_events); qb_ipcc_disconnect(conn); stop_process(pid); } START_TEST(test_ipc_event_on_created_us) { qb_enter(); send_event_on_created = QB_TRUE; ipc_type = QB_IPC_SOCKET; ipc_name = __func__; test_ipc_event_on_created(); qb_leave(); } END_TEST static void test_ipc_disconnect_after_created(void) { struct qb_ipc_request_header req_header; struct qb_ipc_response_header res_header; struct iovec iov[1]; int32_t c = 0; int32_t j = 0; pid_t pid; int32_t res; pid = run_function_in_new_process(run_ipc_server); fail_if(pid == -1); sleep(1); do { conn = qb_ipcc_connect(ipc_name, MAX_MSG_SIZE); if (conn == NULL) { j = waitpid(pid, NULL, WNOHANG); ck_assert_int_eq(j, 0); sleep(1); c++; } } while (conn == NULL && c < 5); fail_if(conn == NULL); ck_assert_int_eq(QB_TRUE, qb_ipcc_is_connected(conn)); req_header.id = IPC_MSG_REQ_SERVER_DISCONNECT; req_header.size = sizeof(struct qb_ipc_request_header); iov[0].iov_len = req_header.size; iov[0].iov_base = &req_header; res = qb_ipcc_sendv_recv(conn, iov, 1, &res_header, sizeof(struct qb_ipc_response_header), -1); /* * confirm we get -ENOTCONN or -ECONNRESET */ if (res != -ECONNRESET && res != -ENOTCONN) { qb_log(LOG_ERR, "id:%d size:%d", res_header.id, res_header.size); ck_assert_int_eq(res, -ENOTCONN); } ck_assert_int_eq(QB_FALSE, qb_ipcc_is_connected(conn)); qb_ipcc_disconnect(conn); stop_process(pid); } START_TEST(test_ipc_disconnect_after_created_us) { qb_enter(); disconnect_after_created = QB_TRUE; ipc_type = QB_IPC_SOCKET; ipc_name = __func__; test_ipc_disconnect_after_created(); qb_leave(); } END_TEST static void test_ipc_server_fail(void) { struct qb_ipc_request_header req_header; struct qb_ipc_response_header res_header; struct iovec iov[1]; int32_t res; int32_t j; int32_t c = 0; pid_t pid; pid = run_function_in_new_process(run_ipc_server); fail_if(pid == -1); sleep(1); do { conn = qb_ipcc_connect(ipc_name, MAX_MSG_SIZE); if (conn == NULL) { j = waitpid(pid, NULL, WNOHANG); ck_assert_int_eq(j, 0); sleep(1); c++; } } while (conn == NULL && c < 5); fail_if(conn == NULL); /* * tell the server to exit */ req_header.id = IPC_MSG_REQ_SERVER_FAIL; req_header.size = sizeof(struct qb_ipc_request_header); iov[0].iov_len = req_header.size; iov[0].iov_base = &req_header; ck_assert_int_eq(QB_TRUE, qb_ipcc_is_connected(conn)); res = qb_ipcc_sendv_recv(conn, iov, 1, &res_header, sizeof(struct qb_ipc_response_header), -1); /* * confirm we get -ENOTCONN or ECONNRESET */ if (res != -ECONNRESET && res != -ENOTCONN) { qb_log(LOG_ERR, "id:%d size:%d", res_header.id, res_header.size); ck_assert_int_eq(res, -ENOTCONN); } ck_assert_int_eq(QB_FALSE, qb_ipcc_is_connected(conn)); qb_ipcc_disconnect(conn); stop_process(pid); } START_TEST(test_ipc_server_fail_soc) { qb_enter(); ipc_type = QB_IPC_SOCKET; ipc_name = __func__; test_ipc_server_fail(); qb_leave(); } END_TEST START_TEST(test_ipc_disp_shm) { qb_enter(); ipc_type = QB_IPC_SHM; ipc_name = __func__; test_ipc_dispatch(); qb_leave(); } END_TEST +START_TEST(test_ipc_stress_test_shm) +{ + qb_enter(); + send_event_on_created = QB_FALSE; + ipc_type = QB_IPC_SHM; + ipc_name = __func__; + test_ipc_stress_test(); + qb_leave(); +} +END_TEST + START_TEST(test_ipc_bulk_events_shm) { qb_enter(); ipc_type = QB_IPC_SHM; ipc_name = __func__; test_ipc_bulk_events(); qb_leave(); } END_TEST START_TEST(test_ipc_event_on_created_shm) { qb_enter(); send_event_on_created = QB_TRUE; ipc_type = QB_IPC_SHM; ipc_name = __func__; test_ipc_event_on_created(); qb_leave(); } END_TEST START_TEST(test_ipc_server_fail_shm) { qb_enter(); ipc_type = QB_IPC_SHM; ipc_name = __func__; test_ipc_server_fail(); qb_leave(); } END_TEST static void test_ipc_service_ref_count(void) { int32_t c = 0; int32_t j = 0; pid_t pid; reference_count_test = QB_TRUE; pid = run_function_in_new_process(run_ipc_server); fail_if(pid == -1); sleep(1); do { conn = qb_ipcc_connect(ipc_name, MAX_MSG_SIZE); if (conn == NULL) { j = waitpid(pid, NULL, WNOHANG); ck_assert_int_eq(j, 0); sleep(1); c++; } } while (conn == NULL && c < 5); fail_if(conn == NULL); sleep(5); qb_ipcc_disconnect(conn); stop_process(pid); } START_TEST(test_ipc_service_ref_count_shm) { qb_enter(); ipc_type = QB_IPC_SHM; ipc_name = __func__; test_ipc_service_ref_count(); qb_leave(); } END_TEST START_TEST(test_ipc_service_ref_count_us) { qb_enter(); ipc_type = QB_IPC_SOCKET; ipc_name = __func__; test_ipc_service_ref_count(); qb_leave(); } END_TEST static Suite * make_shm_suite(void) { TCase *tc; Suite *s = suite_create("shm"); tc = tcase_create("ipc_server_fail_shm"); tcase_add_test(tc, test_ipc_server_fail_shm); tcase_set_timeout(tc, 6); suite_add_tcase(s, tc); tc = tcase_create("ipc_txrx_shm_block"); tcase_add_test(tc, test_ipc_txrx_shm_block); tcase_set_timeout(tc, 6); suite_add_tcase(s, tc); tc = tcase_create("ipc_txrx_shm_tmo"); tcase_add_test(tc, test_ipc_txrx_shm_tmo); tcase_set_timeout(tc, 6); suite_add_tcase(s, tc); tc = tcase_create("ipc_fc_shm"); tcase_add_test(tc, test_ipc_fc_shm); tcase_set_timeout(tc, 6); suite_add_tcase(s, tc); tc = tcase_create("ipc_dispatch_shm"); tcase_add_test(tc, test_ipc_disp_shm); tcase_set_timeout(tc, 16); suite_add_tcase(s, tc); + tc = tcase_create("ipc_stress_test_shm"); + tcase_add_test(tc, test_ipc_stress_test_shm); + tcase_set_timeout(tc, 16); + suite_add_tcase(s, tc); + tc = tcase_create("ipc_bulk_events_shm"); tcase_add_test(tc, test_ipc_bulk_events_shm); tcase_set_timeout(tc, 16); suite_add_tcase(s, tc); tc = tcase_create("ipc_exit_shm"); tcase_add_test(tc, test_ipc_exit_shm); tcase_set_timeout(tc, 3); suite_add_tcase(s, tc); tc = tcase_create("ipc_event_on_created_shm"); tcase_add_test(tc, test_ipc_event_on_created_shm); suite_add_tcase(s, tc); tc = tcase_create("ipc_service_ref_count_shm"); tcase_add_test(tc, test_ipc_service_ref_count_shm); tcase_set_timeout(tc, 10); suite_add_tcase(s, tc); return s; } static Suite * make_soc_suite(void) { Suite *s = suite_create("socket"); TCase *tc; tc = tcase_create("ipc_server_fail_soc"); tcase_add_test(tc, test_ipc_server_fail_soc); tcase_set_timeout(tc, 6); suite_add_tcase(s, tc); tc = tcase_create("ipc_txrx_us_block"); tcase_add_test(tc, test_ipc_txrx_us_block); tcase_set_timeout(tc, 6); suite_add_tcase(s, tc); tc = tcase_create("ipc_txrx_us_tmo"); tcase_add_test(tc, test_ipc_txrx_us_tmo); tcase_set_timeout(tc, 6); suite_add_tcase(s, tc); tc = tcase_create("ipc_fc_us"); tcase_add_test(tc, test_ipc_fc_us); tcase_set_timeout(tc, 6); suite_add_tcase(s, tc); tc = tcase_create("ipc_exit_us"); tcase_add_test(tc, test_ipc_exit_us); tcase_set_timeout(tc, 6); suite_add_tcase(s, tc); tc = tcase_create("ipc_dispatch_us"); tcase_add_test(tc, test_ipc_disp_us); tcase_set_timeout(tc, 16); suite_add_tcase(s, tc); + tc = tcase_create("ipc_stress_test_us"); + tcase_add_test(tc, test_ipc_stress_test_us); + tcase_set_timeout(tc, 60); + suite_add_tcase(s, tc); + tc = tcase_create("ipc_bulk_events_us"); tcase_add_test(tc, test_ipc_bulk_events_us); tcase_set_timeout(tc, 16); suite_add_tcase(s, tc); tc = tcase_create("ipc_event_on_created_us"); tcase_add_test(tc, test_ipc_event_on_created_us); suite_add_tcase(s, tc); tc = tcase_create("ipc_disconnect_after_created_us"); tcase_add_test(tc, test_ipc_disconnect_after_created_us); suite_add_tcase(s, tc); tc = tcase_create("ipc_service_ref_count_us"); tcase_add_test(tc, test_ipc_service_ref_count_us); tcase_set_timeout(tc, 10); suite_add_tcase(s, tc); return s; } int32_t main(void) { int32_t number_failed; SRunner *sr; Suite *s; int32_t do_shm_tests = QB_TRUE; #ifdef DISABLE_IPC_SHM do_shm_tests = QB_FALSE; #endif /* DISABLE_IPC_SHM */ s = make_soc_suite(); sr = srunner_create(s); if (do_shm_tests) { srunner_add_suite(sr, make_shm_suite()); } qb_log_init("check", LOG_USER, LOG_EMERG); qb_log_ctl(QB_LOG_SYSLOG, QB_LOG_CONF_ENABLED, QB_FALSE); qb_log_filter_ctl(QB_LOG_STDERR, QB_LOG_FILTER_ADD, QB_LOG_FILTER_FILE, "*", LOG_TRACE); qb_log_ctl(QB_LOG_STDERR, QB_LOG_CONF_ENABLED, QB_TRUE); qb_log_format_set(QB_LOG_STDERR, "lib/%f|%l| %b"); srunner_run_all(sr, CK_VERBOSE); number_failed = srunner_ntests_failed(sr); srunner_free(sr); return (number_failed == 0) ? EXIT_SUCCESS : EXIT_FAILURE; }