diff --git a/lib/ipc_int.h b/lib/ipc_int.h index a38ec98..50714c5 100644 --- a/lib/ipc_int.h +++ b/lib/ipc_int.h @@ -1,204 +1,205 @@ /* * Copyright (C) 2009 Red Hat, Inc. * * Author: Steven Dake * Angus Salkeld * * This file is part of libqb. * * libqb is free software: you can redistribute it and/or modify * it under the terms of the GNU Lesser General Public License as published by * the Free Software Foundation, either version 2.1 of the License, or * (at your option) any later version. * * libqb is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public License * along with libqb. If not, see . */ #ifndef QB_IPC_INT_H_DEFINED #define QB_IPC_INT_H_DEFINED #include "os_base.h" #include #include #include #include #include #include #include #define QB_IPC_MAX_WAIT_MS 2000 /* Client Server SEND CONN REQ -> ACCEPT & CREATE queues or DENY <- SEND ACCEPT(with details)/DENY */ struct qb_ipc_connection_request { struct qb_ipc_request_header hdr; uint32_t max_msg_size; } __attribute__ ((aligned(8))); struct qb_ipc_event_connection_request { struct qb_ipc_request_header hdr; intptr_t connection; } __attribute__ ((aligned(8))); struct qb_ipc_connection_response { struct qb_ipc_response_header hdr; int32_t connection_type; uint32_t max_msg_size; intptr_t connection; char request[PATH_MAX]; char response[PATH_MAX]; char event[PATH_MAX]; } __attribute__ ((aligned(8))); struct qb_ipcc_connection; struct qb_ipc_one_way { size_t max_msg_size; enum qb_ipc_type type; union { struct { int32_t sock; + char *sock_name; void* shared_data; char shared_file_name[NAME_MAX]; } us; struct { qb_ringbuffer_t *rb; } shm; } u; }; struct qb_ipcc_funcs { ssize_t (*recv)(struct qb_ipc_one_way *one_way, void *buf, size_t buf_size, int32_t timeout); ssize_t (*send)(struct qb_ipc_one_way *one_way, const void *data, size_t size); ssize_t (*sendv)(struct qb_ipc_one_way *one_way, const struct iovec *iov, size_t iov_len); void (*disconnect)(struct qb_ipcc_connection* c); int32_t (*fc_get)(struct qb_ipc_one_way *one_way); }; struct qb_ipcc_connection { char name[NAME_MAX]; int32_t needs_sock_for_poll; struct qb_ipc_one_way setup; struct qb_ipc_one_way request; struct qb_ipc_one_way response; struct qb_ipc_one_way event; struct qb_ipcc_funcs funcs; struct qb_ipc_request_header *receive_buf; uint32_t fc_enable_max; int32_t is_connected; void * context; }; int32_t qb_ipcc_us_setup_connect(struct qb_ipcc_connection *c, struct qb_ipc_connection_response *r); ssize_t qb_ipc_us_send(struct qb_ipc_one_way *one_way, const void *msg, size_t len); ssize_t qb_ipc_us_recv(struct qb_ipc_one_way *one_way, void *msg, size_t len, int32_t timeout); -int32_t qb_ipc_us_ready(struct qb_ipc_one_way *one_way, int32_t ms_timeout, int32_t events); +int32_t qb_ipc_us_ready(struct qb_ipc_one_way *ow_data, struct qb_ipc_one_way *ow_conn, + int32_t ms_timeout, int32_t events); void qb_ipcc_us_sock_close(int32_t sock); int32_t qb_ipcc_us_connect(struct qb_ipcc_connection *c, struct qb_ipc_connection_response * response); int32_t qb_ipcc_shm_connect(struct qb_ipcc_connection *c, struct qb_ipc_connection_response * response); struct qb_ipcs_service; struct qb_ipcs_connection; struct qb_ipcs_funcs { int32_t (*connect)(struct qb_ipcs_service *s, struct qb_ipcs_connection *c, struct qb_ipc_connection_response *r); void (*disconnect)(struct qb_ipcs_connection *c); ssize_t (*recv)(struct qb_ipc_one_way *one_way, void *buf, size_t buf_size, int32_t timeout); ssize_t (*peek)(struct qb_ipc_one_way *one_way, void **data_out, int32_t timeout); void (*reclaim)(struct qb_ipc_one_way *one_way); ssize_t (*send)(struct qb_ipc_one_way *one_way, const void *data, size_t size); ssize_t (*sendv)(struct qb_ipc_one_way *one_way, const struct iovec* iov, size_t iov_len); void (*fc_set)(struct qb_ipc_one_way *one_way, int32_t fc_enable); ssize_t (*q_len_get)(struct qb_ipc_one_way *one_way); }; struct qb_ipcs_service { enum qb_ipc_type type; char name[NAME_MAX]; int32_t service_id; int32_t ref_count; pid_t pid; int32_t needs_sock_for_poll; int32_t server_sock; struct qb_ipcs_service_handlers serv_fns; struct qb_ipcs_poll_handlers poll_fns; struct qb_ipcs_funcs funcs; enum qb_loop_priority poll_priority; struct qb_list_head connections; struct qb_list_head list; struct qb_ipcs_stats stats; }; enum qb_ipcs_connection_state { QB_IPCS_CONNECTION_INACTIVE, QB_IPCS_CONNECTION_ACTIVE, QB_IPCS_CONNECTION_ESTABLISHED, QB_IPCS_CONNECTION_SHUTTING_DOWN, }; #define CONNECTION_DESCRIPTION (16) struct qb_ipcs_connection_auth { uid_t uid; gid_t gid; mode_t mode; }; struct qb_ipcs_connection { enum qb_ipcs_connection_state state; int32_t refcount; pid_t pid; uid_t euid; gid_t egid; struct qb_ipcs_connection_auth auth; struct qb_ipc_one_way setup; struct qb_ipc_one_way request; struct qb_ipc_one_way response; struct qb_ipc_one_way event; struct qb_ipcs_service *service; struct qb_list_head list; struct qb_ipc_request_header *receive_buf; void *context; int32_t fc_enabled; int32_t poll_events; int32_t outstanding_notifiers; char description[CONNECTION_DESCRIPTION]; struct qb_ipcs_connection_stats_2 stats; }; void qb_ipcs_us_init(struct qb_ipcs_service *s); void qb_ipcs_shm_init(struct qb_ipcs_service *s); int32_t qb_ipcs_us_publish(struct qb_ipcs_service *s); int32_t qb_ipcs_us_withdraw(struct qb_ipcs_service *s); int32_t qb_ipcc_us_sock_connect(const char *socket_name, int32_t * sock_pt); int32_t qb_ipcs_dispatch_connection_request(int32_t fd, int32_t revents, void *data); int32_t qb_ipcs_dispatch_service_request(int32_t fd, int32_t revents, void *data); struct qb_ipcs_connection* qb_ipcs_connection_alloc(struct qb_ipcs_service *s); -void qb_ipcs_sockets_disconnect(struct qb_ipcs_connection *c); int32_t qb_ipcs_process_request(struct qb_ipcs_service *s, struct qb_ipc_request_header *hdr); int32_t qb_ipc_us_sock_error_is_disconnected(int err); #endif /* QB_IPC_INT_H_DEFINED */ diff --git a/lib/ipc_setup.c b/lib/ipc_setup.c index deee409..d1b0831 100644 --- a/lib/ipc_setup.c +++ b/lib/ipc_setup.c @@ -1,785 +1,719 @@ /* * Copyright (C) 2010,2013 Red Hat, Inc. * * Author: Angus Salkeld * * This file is part of libqb. * * libqb is free software: you can redistribute it and/or modify * it under the terms of the GNU Lesser General Public License as published by * the Free Software Foundation, either version 2.1 of the License, or * (at your option) any later version. * * libqb is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public License * along with libqb. If not, see . */ #include "os_base.h" #if defined(HAVE_GETPEERUCRED) #include #endif #ifdef HAVE_SYS_UN_H #include #endif /* HAVE_SYS_UN_H */ #ifdef HAVE_SYS_STAT_H #include #endif #ifdef HAVE_SYS_MMAN_H #include #endif #include #include #include #include #include "util_int.h" #include "ipc_int.h" struct ipc_auth_ugp { uid_t uid; gid_t gid; pid_t pid; }; static int32_t qb_ipcs_us_connection_acceptor(int fd, int revent, void *data); - #ifndef MSG_NOSIGNAL #define MSG_NOSIGNAL 0 #endif ssize_t qb_ipc_us_send(struct qb_ipc_one_way *one_way, const void *msg, size_t len) { int32_t result; int32_t processed = 0; char *rbuf = (char *)msg; qb_sigpipe_ctl(QB_SIGPIPE_IGNORE); retry_send: result = send(one_way->u.us.sock, - &rbuf[processed], - len - processed, - MSG_NOSIGNAL); + &rbuf[processed], len - processed, MSG_NOSIGNAL); if (result == -1) { if (errno == EAGAIN && processed > 0) { goto retry_send; } else { qb_sigpipe_ctl(QB_SIGPIPE_DEFAULT); return -errno; } } processed += result; if (processed != len) { goto retry_send; } qb_sigpipe_ctl(QB_SIGPIPE_DEFAULT); return processed; } static ssize_t qb_ipc_us_recv_msghdr(int32_t s, struct msghdr *hdr, char *msg, size_t len) { int32_t result; int32_t processed = 0; qb_sigpipe_ctl(QB_SIGPIPE_IGNORE); retry_recv: hdr->msg_iov->iov_base = &msg[processed]; hdr->msg_iov->iov_len = len - processed; result = recvmsg(s, hdr, MSG_NOSIGNAL | MSG_WAITALL); if (result == -1 && errno == EAGAIN) { goto retry_recv; } if (result == -1) { qb_sigpipe_ctl(QB_SIGPIPE_DEFAULT); return -errno; } if (result == 0) { qb_sigpipe_ctl(QB_SIGPIPE_DEFAULT); qb_util_log(LOG_DEBUG, "recv(fd %d) got 0 bytes assuming ENOTCONN", s); return -ENOTCONN; } processed += result; if (processed != len) { goto retry_recv; } qb_sigpipe_ctl(QB_SIGPIPE_DEFAULT); assert(processed == len); return processed; } int32_t qb_ipc_us_sock_error_is_disconnected(int err) { if (err == -EAGAIN || err == -ETIMEDOUT || err == -EINTR || #ifdef EWOULDBLOCK err == -EWOULDBLOCK || #endif err == -EMSGSIZE || err == -ENOMSG || err == -EINVAL) { return QB_FALSE; } return QB_TRUE; } int32_t -qb_ipc_us_ready(struct qb_ipc_one_way * one_way, +qb_ipc_us_ready(struct qb_ipc_one_way * ow_data, + struct qb_ipc_one_way * ow_conn, int32_t ms_timeout, int32_t events) { - struct pollfd ufds; + struct pollfd ufds[2]; int32_t poll_events; + int numfds = 1; + int i; - ufds.fd = one_way->u.us.sock; - ufds.events = events; - ufds.revents = 0; + ufds[0].fd = ow_data->u.us.sock; + ufds[0].events = events; + ufds[0].revents = 0; - poll_events = poll(&ufds, 1, ms_timeout); + if (ow_conn && ow_data != ow_conn) { + numfds++; + ufds[1].fd = ow_conn->u.us.sock; + ufds[1].events = POLLIN; + ufds[1].revents = 0; + } + poll_events = poll(ufds, numfds, ms_timeout); if ((poll_events == -1 && errno == EINTR) || poll_events == 0) { return -EAGAIN; } else if (poll_events == -1) { return -errno; - } else if (poll_events == 1 && (ufds.revents & POLLERR)) { - qb_util_log(LOG_DEBUG, "poll(fd %d) got POLLERR", one_way->u.us.sock); - return -ENOTCONN; - } else if (poll_events == 1 && (ufds.revents & POLLHUP)) { - qb_util_log(LOG_DEBUG, "poll(fd %d) got POLLHUP", one_way->u.us.sock); - return -ENOTCONN; - } else if (poll_events == 1 && (ufds.revents & POLLNVAL)) { - qb_util_log(LOG_DEBUG, "poll(fd %d) got POLLNVAL", one_way->u.us.sock); - return -ENOTCONN; + } + for (i = 0; i < poll_events; i++) { + if (ufds[i].revents & POLLERR) { + qb_util_log(LOG_DEBUG, "poll(fd %d) got POLLERR", + ufds[i].fd); + return -ENOTCONN; + } else if (ufds[i].revents & POLLHUP) { + qb_util_log(LOG_DEBUG, "poll(fd %d) got POLLHUP", + ufds[i].fd); + return -ENOTCONN; + } else if (ufds[i].revents & POLLNVAL) { + qb_util_log(LOG_DEBUG, "poll(fd %d) got POLLNVAL", + ufds[i].fd); + return -ENOTCONN; + } else if (ufds[i].revents == 0) { + qb_util_log(LOG_DEBUG, "poll(fd %d) zero revents", + ufds[i].fd); + return -ENOTCONN; + } } return 0; } /* * recv an entire message - and try hard to get all of it. */ ssize_t qb_ipc_us_recv(struct qb_ipc_one_way * one_way, void *msg, size_t len, int32_t timeout) { int32_t result; int32_t final_rc = 0; int32_t processed = 0; int32_t to_recv = len; char *data = msg; qb_sigpipe_ctl(QB_SIGPIPE_IGNORE); retry_recv: result = recv(one_way->u.us.sock, &data[processed], to_recv, MSG_NOSIGNAL | MSG_WAITALL); if (result == -1) { - if (errno == EAGAIN && - (processed > 0 || timeout == -1)) { - result = qb_ipc_us_ready(one_way, timeout, - POLLIN); + if (errno == EAGAIN && (processed > 0 || timeout == -1)) { + result = qb_ipc_us_ready(one_way, NULL, timeout, POLLIN); if (result == 0 || result == -EAGAIN) { goto retry_recv; } final_rc = result; goto cleanup_sigpipe; } else if (errno == ECONNRESET || errno == EPIPE) { final_rc = -ENOTCONN; goto cleanup_sigpipe; } else { final_rc = -errno; goto cleanup_sigpipe; } } if (result == 0) { final_rc = -ENOTCONN; goto cleanup_sigpipe; } processed += result; to_recv -= result; if (processed != len) { goto retry_recv; } final_rc = processed; - cleanup_sigpipe: +cleanup_sigpipe: qb_sigpipe_ctl(QB_SIGPIPE_DEFAULT); return final_rc; } -int32_t -qb_ipcc_us_sock_connect(const char *socket_name, int32_t * sock_pt) +static int32_t +qb_ipcc_stream_sock_connect(const char *socket_name, int32_t * sock_pt) { int32_t request_fd; struct sockaddr_un address; int32_t res = 0; request_fd = socket(PF_UNIX, SOCK_STREAM, 0); if (request_fd == -1) { return -errno; } qb_socket_nosigpipe(request_fd); res = qb_sys_fd_nonblock_cloexec_set(request_fd); if (res < 0) { goto error_connect; } memset(&address, 0, sizeof(struct sockaddr_un)); address.sun_family = AF_UNIX; #ifdef HAVE_STRUCT_SOCKADDR_UN_SUN_LEN address.sun_len = QB_SUN_LEN(&address); #endif #if defined(QB_LINUX) || defined(QB_CYGWIN) snprintf(address.sun_path + 1, UNIX_PATH_MAX - 1, "%s", socket_name); #else snprintf(address.sun_path, UNIX_PATH_MAX, "%s/%s", SOCKETDIR, socket_name); #endif if (connect(request_fd, (struct sockaddr *)&address, QB_SUN_LEN(&address)) == -1) { res = -errno; goto error_connect; } *sock_pt = request_fd; return 0; error_connect: close(request_fd); *sock_pt = -1; return res; } void qb_ipcc_us_sock_close(int32_t sock) { shutdown(sock, SHUT_RDWR); close(sock); } int32_t qb_ipcc_us_setup_connect(struct qb_ipcc_connection *c, struct qb_ipc_connection_response *r) { int32_t res; struct qb_ipc_connection_request request; #ifdef QB_LINUX int off = 0; int on = 1; #endif - res = qb_ipcc_us_sock_connect(c->name, &c->setup.u.us.sock); + res = qb_ipcc_stream_sock_connect(c->name, &c->setup.u.us.sock); if (res != 0) { return res; } - #ifdef QB_LINUX - setsockopt(c->setup.u.us.sock, SOL_SOCKET, SO_PASSCRED, &on, sizeof(on)); + setsockopt(c->setup.u.us.sock, SOL_SOCKET, SO_PASSCRED, &on, + sizeof(on)); #endif memset(&request, 0, sizeof(request)); request.hdr.id = QB_IPC_MSG_AUTHENTICATE; request.hdr.size = sizeof(request); request.max_msg_size = c->setup.max_msg_size; res = qb_ipc_us_send(&c->setup, &request, request.hdr.size); if (res < 0) { qb_ipcc_us_sock_close(c->setup.u.us.sock); return res; } - #ifdef QB_LINUX - setsockopt(c->setup.u.us.sock, SOL_SOCKET, SO_PASSCRED, &off, sizeof(off)); + setsockopt(c->setup.u.us.sock, SOL_SOCKET, SO_PASSCRED, &off, + sizeof(off)); #endif res = qb_ipc_us_recv(&c->setup, r, sizeof(struct qb_ipc_connection_response), -1); if (res < 0) { return res; } if (r->hdr.error != 0) { return r->hdr.error; } return 0; } /* ************************************************************************** * SERVER */ int32_t qb_ipcs_us_publish(struct qb_ipcs_service * s) { struct sockaddr_un un_addr; int32_t res; /* * Create socket for IPC clients, name socket, listen for connections */ s->server_sock = socket(PF_UNIX, SOCK_STREAM, 0); if (s->server_sock == -1) { res = -errno; qb_util_perror(LOG_ERR, "Cannot create server socket"); return res; } res = qb_sys_fd_nonblock_cloexec_set(s->server_sock); if (res < 0) { goto error_close; } memset(&un_addr, 0, sizeof(struct sockaddr_un)); un_addr.sun_family = AF_UNIX; #if defined(QB_BSD) || defined(QB_DARWIN) un_addr.sun_len = SUN_LEN(&un_addr); #endif qb_util_log(LOG_INFO, "server name: %s", s->name); #if defined(QB_LINUX) || defined(QB_CYGWIN) snprintf(un_addr.sun_path + 1, UNIX_PATH_MAX - 1, "%s", s->name); #else { struct stat stat_out; res = stat(SOCKETDIR, &stat_out); if (res == -1 || (res == 0 && !S_ISDIR(stat_out.st_mode))) { res = -errno; qb_util_log(LOG_CRIT, "Required directory not present %s", SOCKETDIR); goto error_close; } snprintf(un_addr.sun_path, UNIX_PATH_MAX, "%s/%s", SOCKETDIR, s->name); unlink(un_addr.sun_path); } #endif res = bind(s->server_sock, (struct sockaddr *)&un_addr, QB_SUN_LEN(&un_addr)); if (res) { res = -errno; qb_util_perror(LOG_ERR, "Could not bind AF_UNIX (%s)", un_addr.sun_path); goto error_close; } /* * Allow everyone to write to the socket since the IPC layer handles * security automatically */ #if !defined(QB_LINUX) && !defined(QB_CYGWIN) res = chmod(un_addr.sun_path, S_IRWXU | S_IRWXG | S_IRWXO); #endif if (listen(s->server_sock, SERVER_BACKLOG) == -1) { qb_util_perror(LOG_ERR, "socket listen failed"); } res = s->poll_fns.dispatch_add(s->poll_priority, s->server_sock, POLLIN | POLLPRI | POLLNVAL, s, qb_ipcs_us_connection_acceptor); return res; error_close: close(s->server_sock); return res; } int32_t qb_ipcs_us_withdraw(struct qb_ipcs_service * s) { qb_util_log(LOG_INFO, "withdrawing server sockets"); shutdown(s->server_sock, SHUT_RDWR); close(s->server_sock); return 0; } static int32_t handle_new_connection(struct qb_ipcs_service *s, int32_t auth_result, int32_t sock, void *msg, size_t len, struct ipc_auth_ugp *ugp) { struct qb_ipcs_connection *c = NULL; struct qb_ipc_connection_request *req = msg; int32_t res = auth_result; int32_t res2 = 0; struct qb_ipc_connection_response response; c = qb_ipcs_connection_alloc(s); if (c == NULL) { qb_ipcc_us_sock_close(sock); return -ENOMEM; } c->receive_buf = calloc(1, req->max_msg_size); if (c->receive_buf == NULL) { free(c); qb_ipcc_us_sock_close(sock); return -ENOMEM; } c->setup.u.us.sock = sock; c->request.max_msg_size = req->max_msg_size; c->response.max_msg_size = req->max_msg_size; c->event.max_msg_size = req->max_msg_size; c->pid = ugp->pid; c->auth.uid = c->euid = ugp->uid; c->auth.gid = c->egid = ugp->gid; c->auth.mode = 0600; c->stats.client_pid = ugp->pid; snprintf(c->description, CONNECTION_DESCRIPTION, - "%d-%d-%d", s->pid, ugp->pid, - c->setup.u.us.sock); + "%d-%d-%d", s->pid, ugp->pid, c->setup.u.us.sock); if (auth_result == 0 && c->service->serv_fns.connection_accept) { res = c->service->serv_fns.connection_accept(c, c->euid, c->egid); } if (res != 0) { goto send_response; } qb_util_log(LOG_DEBUG, "IPC credentials authenticated (%s)", c->description); memset(&response, 0, sizeof(response)); if (s->funcs.connect) { res = s->funcs.connect(s, c, &response); if (res != 0) { goto send_response; } } /* * The connection is good, add it to the active connection list */ c->state = QB_IPCS_CONNECTION_ACTIVE; qb_list_add(&c->list, &s->connections); - if (s->needs_sock_for_poll) { - qb_ipcs_connection_ref(c); - res = s->poll_fns.dispatch_add(s->poll_priority, - c->setup.u.us.sock, - POLLIN | POLLPRI | POLLNVAL, - c, - qb_ipcs_dispatch_connection_request); - if (res < 0) { - qb_util_log(LOG_ERR, - "Error adding socket to mainloop (%s).", - c->description); - } - } - if (s->type == QB_IPC_SOCKET) { - c->request.u.us.sock = c->setup.u.us.sock; - c->response.u.us.sock = c->setup.u.us.sock; - } - send_response: response.hdr.id = QB_IPC_MSG_AUTHENTICATE; response.hdr.size = sizeof(response); response.hdr.error = res; if (res == 0) { response.connection = (intptr_t) c; response.connection_type = s->type; response.max_msg_size = c->request.max_msg_size; s->stats.active_connections++; } res2 = qb_ipc_us_send(&c->setup, &response, response.hdr.size); if (res == 0 && res2 != response.hdr.size) { res = res2; } if (res == 0) { - if (s->type != QB_IPC_SOCKET) { - qb_ipcs_connection_ref(c); - if (s->serv_fns.connection_created) { - s->serv_fns.connection_created(c); - } - if (c->state == QB_IPCS_CONNECTION_ACTIVE) { - c->state = QB_IPCS_CONNECTION_ESTABLISHED; - } - qb_ipcs_connection_unref(c); + qb_ipcs_connection_ref(c); + if (s->serv_fns.connection_created) { + s->serv_fns.connection_created(c); + } + if (c->state == QB_IPCS_CONNECTION_ACTIVE) { + c->state = QB_IPCS_CONNECTION_ESTABLISHED; } + qb_ipcs_connection_unref(c); } else { if (res == -EACCES) { qb_util_log(LOG_ERR, "Invalid IPC credentials (%s).", c->description); } else { errno = -res; - qb_util_perror(LOG_ERR, "Error in connection setup (%s)", + qb_util_perror(LOG_ERR, + "Error in connection setup (%s)", c->description); } qb_ipcs_disconnect(c); } return res; } -static void -handle_connection_new_sock(struct qb_ipcs_service *s, int32_t sock, void *msg) -{ - struct qb_ipcs_connection *c = NULL; - struct qb_ipc_event_connection_request *req = msg; - - c = (struct qb_ipcs_connection *)req->connection; - qb_ipcs_connection_ref(c); - c->event.u.us.sock = sock; - if (c->state == QB_IPCS_CONNECTION_ACTIVE) { - c->state = QB_IPCS_CONNECTION_ESTABLISHED; - } - if (s->serv_fns.connection_created) { - s->serv_fns.connection_created(c); - } - - if (c->state == QB_IPCS_CONNECTION_ESTABLISHED && - s->type == QB_IPC_SOCKET) { - int32_t res; - qb_ipcs_connection_ref(c); - res = s->poll_fns.dispatch_add(s->poll_priority, - c->request.u.us.sock, - POLLIN | POLLPRI | POLLNVAL, - c, - qb_ipcs_dispatch_connection_request); - if (res < 0) { - qb_util_log(LOG_ERR, - "Error adding socket to mainloop (%s).", - c->description); - } - } - - qb_ipcs_connection_unref(c); -} - static int32_t qb_ipcs_uc_recv_and_auth(int32_t sock, void *msg, size_t len, struct ipc_auth_ugp *ugp) { int32_t res = 0; struct msghdr msg_recv; struct iovec iov_recv; #ifdef SO_PASSCRED char cmsg_cred[CMSG_SPACE(sizeof(struct ucred))]; int off = 0; int on = 1; #endif msg_recv.msg_iov = &iov_recv; msg_recv.msg_iovlen = 1; msg_recv.msg_name = 0; msg_recv.msg_namelen = 0; #ifdef SO_PASSCRED msg_recv.msg_control = (void *)cmsg_cred; msg_recv.msg_controllen = sizeof(cmsg_cred); #endif #ifdef QB_SOLARIS msg_recv.msg_accrights = 0; msg_recv.msg_accrightslen = 0; #else msg_recv.msg_flags = 0; #endif /* QB_SOLARIS */ iov_recv.iov_base = msg; iov_recv.iov_len = len; #ifdef SO_PASSCRED setsockopt(sock, SOL_SOCKET, SO_PASSCRED, &on, sizeof(on)); #endif res = qb_ipc_us_recv_msghdr(sock, &msg_recv, msg, len); if (res < 0) { goto cleanup_and_return; } if (res != len) { res = -EIO; goto cleanup_and_return; } /* * currently support getpeerucred, getpeereid, and SO_PASSCRED credential * retrieval mechanisms for various Platforms */ #ifdef HAVE_GETPEERUCRED /* * Solaris and some BSD systems */ { ucred_t *uc = NULL; if (getpeerucred(sock, &uc) == 0) { res = 0; ugp->uid = ucred_geteuid(uc); ugp->gid = ucred_getegid(uc); ugp->pid = ucred_getpid(uc); ucred_free(uc); } else { res = -errno; } } #elif HAVE_GETPEEREID /* * Usually MacOSX systems */ { /* * TODO get the peer's pid. * c->pid = ?; */ if (getpeereid(sock, &ugp->uid, &ugp->gid) == 0) { res = 0; } else { res = -errno; } } #elif SO_PASSCRED /* * Usually Linux systems */ { struct ucred cred; struct cmsghdr *cmsg; res = -EINVAL; - for (cmsg = CMSG_FIRSTHDR(&msg_recv); cmsg != NULL; cmsg = CMSG_NXTHDR(&msg_recv, cmsg)) { + for (cmsg = CMSG_FIRSTHDR(&msg_recv); cmsg != NULL; + cmsg = CMSG_NXTHDR(&msg_recv, cmsg)) { if (cmsg->cmsg_type != SCM_CREDENTIALS) continue; memcpy(&cred, CMSG_DATA(cmsg), sizeof(struct ucred)); res = 0; ugp->pid = cred.pid; ugp->uid = cred.uid; ugp->gid = cred.gid; break; } } #else /* no credentials */ ugp->pid = 0; ugp->uid = 0; ugp->gid = 0; res = -ENOTSUP; #endif /* no credentials */ cleanup_and_return: #ifdef SO_PASSCRED setsockopt(sock, SOL_SOCKET, SO_PASSCRED, &off, sizeof(off)); #endif return res; } static int32_t qb_ipcs_us_connection_acceptor(int fd, int revent, void *data) { struct sockaddr_un un_addr; int32_t new_fd; struct qb_ipcs_service *s = (struct qb_ipcs_service *)data; int32_t res; struct qb_ipc_connection_request setup_msg; struct ipc_auth_ugp ugp; socklen_t addrlen = sizeof(struct sockaddr_un); - if (revent & (POLLNVAL|POLLHUP|POLLERR)) { + if (revent & (POLLNVAL | POLLHUP | POLLERR)) { /* * handle shutdown more cleanly. */ return -1; } retry_accept: errno = 0; new_fd = accept(fd, (struct sockaddr *)&un_addr, &addrlen); if (new_fd == -1 && errno == EINTR) { goto retry_accept; } if (new_fd == -1 && errno == EBADF) { qb_util_perror(LOG_ERR, "Could not accept client connection from fd:%d", fd); return -1; } if (new_fd == -1) { qb_util_perror(LOG_ERR, "Could not accept client connection"); /* This is an error, but -1 would indicate disconnect * from the poll loop */ return 0; } res = qb_sys_fd_nonblock_cloexec_set(new_fd); if (res < 0) { close(new_fd); /* This is an error, but -1 would indicate disconnect * from the poll loop */ return 0; } res = qb_ipcs_uc_recv_and_auth(new_fd, &setup_msg, sizeof(setup_msg), &ugp); if (res < 0) { close(new_fd); /* This is an error, but -1 would indicate disconnect * from the poll loop */ return 0; } if (setup_msg.hdr.id == QB_IPC_MSG_AUTHENTICATE) { (void)handle_new_connection(s, res, new_fd, &setup_msg, sizeof(setup_msg), &ugp); - } else if (setup_msg.hdr.id == QB_IPC_MSG_NEW_EVENT_SOCK) { - if (res == 0) { - handle_connection_new_sock(s, new_fd, &setup_msg); - } else { - close(new_fd); - } } else { close(new_fd); } return 0; } - -void -qb_ipcs_sockets_disconnect(struct qb_ipcs_connection *c) -{ - int sock = -1; - - qb_enter(); - if (c->service->needs_sock_for_poll && c->setup.u.us.sock > 0) { - sock = c->setup.u.us.sock; - qb_ipcc_us_sock_close(sock); - c->setup.u.us.sock = -1; - } - if (c->request.type == QB_IPC_SOCKET) { - sock = c->request.u.us.sock; - } - if (sock > 0) { - (void)c->service->poll_fns.dispatch_del(sock); - qb_ipcs_connection_unref(c); - } -} diff --git a/lib/ipc_shm.c b/lib/ipc_shm.c index f719b6b..79fbf80 100644 --- a/lib/ipc_shm.c +++ b/lib/ipc_shm.c @@ -1,342 +1,367 @@ /* * Copyright (C) 2010 Red Hat, Inc. * * Author: Angus Salkeld * * This file is part of libqb. * * libqb is free software: you can redistribute it and/or modify * it under the terms of the GNU Lesser General Public License as published by * the Free Software Foundation, either version 2.1 of the License, or * (at your option) any later version. * * libqb is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public License * along with libqb. If not, see . */ #include "os_base.h" #include "ipc_int.h" #include "util_int.h" #include "ringbuffer_int.h" #include #include #include #include /* * utility functions * -------------------------------------------------------- */ /* * client functions * -------------------------------------------------------- */ static void qb_ipcc_shm_disconnect(struct qb_ipcc_connection *c) { if (c->is_connected) { qb_rb_close(c->request.u.shm.rb); qb_rb_close(c->response.u.shm.rb); qb_rb_close(c->event.u.shm.rb); } else { qb_rb_force_close(c->request.u.shm.rb); qb_rb_force_close(c->response.u.shm.rb); qb_rb_force_close(c->event.u.shm.rb); } } static ssize_t qb_ipc_shm_send(struct qb_ipc_one_way *one_way, const void *msg_ptr, size_t msg_len) { return qb_rb_chunk_write(one_way->u.shm.rb, msg_ptr, msg_len); } static ssize_t qb_ipc_shm_sendv(struct qb_ipc_one_way *one_way, const struct iovec *iov, size_t iov_len) { char *dest; int32_t res = 0; int32_t total_size = 0; int32_t i; char *pt = NULL; if (one_way->u.shm.rb == NULL) { return -ENOTCONN; } for (i = 0; i < iov_len; i++) { total_size += iov[i].iov_len; } dest = qb_rb_chunk_alloc(one_way->u.shm.rb, total_size); if (dest == NULL) { return -errno; } pt = dest; for (i = 0; i < iov_len; i++) { memcpy(pt, iov[i].iov_base, iov[i].iov_len); pt += iov[i].iov_len; } res = qb_rb_chunk_commit(one_way->u.shm.rb, total_size); if (res < 0) { return res; } return total_size; } static ssize_t qb_ipc_shm_recv(struct qb_ipc_one_way *one_way, void *msg_ptr, size_t msg_len, int32_t ms_timeout) { if (one_way->u.shm.rb == NULL) { return -ENOTCONN; } return qb_rb_chunk_read(one_way->u.shm.rb, (void *)msg_ptr, msg_len, ms_timeout); } static ssize_t qb_ipc_shm_peek(struct qb_ipc_one_way *one_way, void **data_out, int32_t ms_timeout) { ssize_t rc; if (one_way->u.shm.rb == NULL) { return -ENOTCONN; } rc = qb_rb_chunk_peek(one_way->u.shm.rb, data_out, ms_timeout); if (rc == 0) { return -EAGAIN; } return rc; } static void qb_ipc_shm_reclaim(struct qb_ipc_one_way *one_way) { if (one_way->u.shm.rb != NULL) { qb_rb_chunk_reclaim(one_way->u.shm.rb); } } static void qb_ipc_shm_fc_set(struct qb_ipc_one_way *one_way, int32_t fc_enable) { int32_t *fc; fc = qb_rb_shared_user_data_get(one_way->u.shm.rb); qb_util_log(LOG_TRACE, "setting fc to %d", fc_enable); qb_atomic_int_set(fc, fc_enable); } static int32_t qb_ipc_shm_fc_get(struct qb_ipc_one_way *one_way) { int32_t *fc; int32_t rc = qb_rb_refcount_get(one_way->u.shm.rb); if (rc != 2) { return -ENOTCONN; } fc = qb_rb_shared_user_data_get(one_way->u.shm.rb); return qb_atomic_int_get(fc); } static ssize_t qb_ipc_shm_q_len_get(struct qb_ipc_one_way *one_way) { if (one_way->u.shm.rb == NULL) { return -ENOTCONN; } return qb_rb_chunks_used(one_way->u.shm.rb); } int32_t qb_ipcc_shm_connect(struct qb_ipcc_connection * c, struct qb_ipc_connection_response * response) { int32_t res = 0; c->funcs.send = qb_ipc_shm_send; c->funcs.sendv = qb_ipc_shm_sendv; c->funcs.recv = qb_ipc_shm_recv; c->funcs.fc_get = qb_ipc_shm_fc_get; c->funcs.disconnect = qb_ipcc_shm_disconnect; c->needs_sock_for_poll = QB_TRUE; if (strlen(c->name) > (NAME_MAX - 20)) { errno = EINVAL; return -errno; } c->request.u.shm.rb = qb_rb_open(response->request, c->request.max_msg_size, QB_RB_FLAG_SHARED_PROCESS, sizeof(int32_t)); if (c->request.u.shm.rb == NULL) { res = -errno; qb_util_perror(LOG_ERR, "qb_rb_open:REQUEST"); goto return_error; } c->response.u.shm.rb = qb_rb_open(response->response, c->response.max_msg_size, QB_RB_FLAG_SHARED_PROCESS, 0); if (c->response.u.shm.rb == NULL) { res = -errno; qb_util_perror(LOG_ERR, "qb_rb_open:RESPONSE"); goto cleanup_request; } c->event.u.shm.rb = qb_rb_open(response->event, c->response.max_msg_size, QB_RB_FLAG_SHARED_PROCESS, 0); if (c->event.u.shm.rb == NULL) { res = -errno; qb_util_perror(LOG_ERR, "qb_rb_open:EVENT"); goto cleanup_request_response; } return 0; cleanup_request_response: qb_rb_close(c->response.u.shm.rb); cleanup_request: qb_rb_close(c->request.u.shm.rb); return_error: errno = -res; qb_util_perror(LOG_ERR, "connection failed"); return res; } /* * service functions * -------------------------------------------------------- */ static void qb_ipcs_shm_disconnect(struct qb_ipcs_connection *c) { - if (c->response.u.shm.rb) { - qb_rb_close(c->response.u.shm.rb); - c->response.u.shm.rb = NULL; - } - if (c->event.u.shm.rb) { - qb_rb_close(c->event.u.shm.rb); - c->event.u.shm.rb = NULL; + if (c->state == QB_IPCS_CONNECTION_ESTABLISHED || + c->state == QB_IPCS_CONNECTION_ACTIVE) { + if (c->setup.u.us.sock > 0) { + qb_ipcc_us_sock_close(c->setup.u.us.sock); + (void)c->service->poll_fns.dispatch_del(c->setup.u.us.sock); + qb_ipcs_connection_unref(c); + c->setup.u.us.sock = -1; + } } - if (c->request.u.shm.rb) { - qb_rb_close(c->request.u.shm.rb); - c->request.u.shm.rb = NULL; + if (c->state == QB_IPCS_CONNECTION_SHUTTING_DOWN || + c->state == QB_IPCS_CONNECTION_ACTIVE) { + if (c->response.u.shm.rb) { + qb_rb_close(c->response.u.shm.rb); + c->response.u.shm.rb = NULL; + } + if (c->event.u.shm.rb) { + qb_rb_close(c->event.u.shm.rb); + c->event.u.shm.rb = NULL; + } + if (c->request.u.shm.rb) { + qb_rb_close(c->request.u.shm.rb); + c->request.u.shm.rb = NULL; + } } } static int32_t qb_ipcs_shm_rb_open(struct qb_ipcs_connection *c, struct qb_ipc_one_way *ow, const char *rb_name) { int32_t res = 0; ow->u.shm.rb = qb_rb_open(rb_name, ow->max_msg_size, QB_RB_FLAG_CREATE | QB_RB_FLAG_SHARED_PROCESS, sizeof(int32_t)); if (ow->u.shm.rb == NULL) { res = -errno; qb_util_perror(LOG_ERR, "qb_rb_open:%s", rb_name); return res; } res = qb_rb_chown(ow->u.shm.rb, c->auth.uid, c->auth.gid); if (res != 0) { qb_util_perror(LOG_ERR, "qb_rb_chown:%s", rb_name); goto cleanup; } res = qb_rb_chmod(ow->u.shm.rb, c->auth.mode); if (res != 0) { qb_util_perror(LOG_ERR, "qb_rb_chmod:%s", rb_name); goto cleanup; } return res; cleanup: qb_rb_close(ow->u.shm.rb); return res; } static int32_t qb_ipcs_shm_connect(struct qb_ipcs_service *s, struct qb_ipcs_connection *c, struct qb_ipc_connection_response *r) { int32_t res; qb_util_log(LOG_DEBUG, "connecting to client [%d]", c->pid); snprintf(r->request, NAME_MAX, "%s-request-%s", s->name, c->description); snprintf(r->response, NAME_MAX, "%s-response-%s", s->name, c->description); snprintf(r->event, NAME_MAX, "%s-event-%s", s->name, c->description); res = qb_ipcs_shm_rb_open(c, &c->request, r->request); if (res != 0) { goto cleanup; } res = qb_ipcs_shm_rb_open(c, &c->response, r->response); if (res != 0) { goto cleanup_request; } res = qb_ipcs_shm_rb_open(c, &c->event, r->event); if (res != 0) { goto cleanup_request_response; } + res = s->poll_fns.dispatch_add(s->poll_priority, + c->setup.u.us.sock, + POLLIN | POLLPRI | POLLNVAL, + c, qb_ipcs_dispatch_connection_request); + if (res == 0) { + qb_ipcs_connection_ref(c); + } else { + qb_util_log(LOG_ERR, + "Error adding socket to mainloop (%s).", + c->description); + goto cleanup_request_response; + } + r->hdr.error = 0; return 0; cleanup_request_response: qb_rb_close(c->request.u.shm.rb); cleanup_request: qb_rb_close(c->response.u.shm.rb); cleanup: r->hdr.error = res; errno = -res; qb_util_perror(LOG_ERR, "shm connection FAILED"); return res; } void qb_ipcs_shm_init(struct qb_ipcs_service *s) { s->funcs.connect = qb_ipcs_shm_connect; s->funcs.disconnect = qb_ipcs_shm_disconnect; s->funcs.recv = qb_ipc_shm_recv; s->funcs.peek = qb_ipc_shm_peek; s->funcs.reclaim = qb_ipc_shm_reclaim; s->funcs.send = qb_ipc_shm_send; s->funcs.sendv = qb_ipc_shm_sendv; s->funcs.fc_set = qb_ipc_shm_fc_set; s->funcs.q_len_get = qb_ipc_shm_q_len_get; s->needs_sock_for_poll = QB_TRUE; } diff --git a/lib/ipc_socket.c b/lib/ipc_socket.c index b366039..10a694e 100644 --- a/lib/ipc_socket.c +++ b/lib/ipc_socket.c @@ -1,412 +1,644 @@ /* * Copyright (C) 2010,2013 Red Hat, Inc. * * Author: Angus Salkeld * * This file is part of libqb. * * libqb is free software: you can redistribute it and/or modify * it under the terms of the GNU Lesser General Public License as published by * the Free Software Foundation, either version 2.1 of the License, or * (at your option) any later version. * * libqb is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public License * along with libqb. If not, see . */ #include "os_base.h" #ifdef HAVE_SYS_UN_H #include #endif /* HAVE_SYS_UN_H */ #ifdef HAVE_SYS_MMAN_H #include #endif #include #include #include #include #include "util_int.h" #include "ipc_int.h" struct ipc_us_control { int32_t sent; int32_t flow_control; }; #define SHM_CONTROL_SIZE (3 * sizeof(struct ipc_us_control)) +static void +set_sock_addr(struct sockaddr_un *address, const char *socket_name) +{ + memset(address, 0, sizeof(struct sockaddr_un)); + address->sun_family = AF_UNIX; +#ifdef HAVE_STRUCT_SOCKADDR_UN_SUN_LEN + address->sun_len = QB_SUN_LEN(&address); +#endif + +#if defined(QB_LINUX) || defined(QB_CYGWIN) + snprintf(address->sun_path + 1, UNIX_PATH_MAX - 1, "%s", socket_name); +#else + snprintf(address->sun_path, UNIX_PATH_MAX, "%s/%s", SOCKETDIR, + socket_name); +#endif +} + +static int32_t +qb_ipc_dgram_sock_setup(const char *base_name, + const char *service_name, int32_t * sock_pt) +{ + int32_t request_fd; + struct sockaddr_un local_address; + int32_t res = 0; + char sock_path[PATH_MAX]; + + request_fd = socket(PF_UNIX, SOCK_DGRAM, 0); + if (request_fd == -1) { + return -errno; + } + + qb_socket_nosigpipe(request_fd); + res = qb_sys_fd_nonblock_cloexec_set(request_fd); + if (res < 0) { + goto error_connect; + } + snprintf(sock_path, PATH_MAX, "%s-%s", base_name, service_name); + set_sock_addr(&local_address, sock_path); + res = bind(request_fd, (struct sockaddr *)&local_address, + sizeof(local_address)); + if (res < 0) { + goto error_connect; + } + + *sock_pt = request_fd; + return 0; + +error_connect: + close(request_fd); + *sock_pt = -1; + + return res; +} + +static int32_t +set_sock_size(int sockfd, size_t max_msg_size) +{ + int32_t rc; + unsigned int optval; + socklen_t optlen = sizeof(optval); + + rc = getsockopt(sockfd, SOL_SOCKET, SO_SNDBUF, &optval, &optlen); + + qb_util_log(LOG_DEBUG, "%d: getsockopt(%d, needed:%d) actual:%d", + rc, sockfd, max_msg_size, optval); + + if (rc == 0 && optval < max_msg_size) { + optval = max_msg_size; + optlen = sizeof(optval); + rc = setsockopt(sockfd, SOL_SOCKET, SO_SNDBUF, &optval, optlen); + } + return rc; +} + +/* + * bind to "base_name-local_name" + * connect to "base_name-remote_name" + * output sock_pt + */ +static int32_t +qb_ipc_dgram_sock_connect(const char *base_name, + const char *local_name, + const char *remote_name, + int32_t max_msg_size, int32_t * sock_pt) +{ + char sock_path[PATH_MAX]; + struct sockaddr_un remote_address; + int32_t res = qb_ipc_dgram_sock_setup(base_name, local_name, + sock_pt); + if (res < 0) { + return res; + } + + snprintf(sock_path, PATH_MAX, "%s-%s", base_name, remote_name); + set_sock_addr(&remote_address, sock_path); + if (connect(*sock_pt, (struct sockaddr *)&remote_address, + QB_SUN_LEN(&remote_address)) == -1) { + res = -errno; + goto error_connect; + } + + return set_sock_size(*sock_pt, max_msg_size); + +error_connect: + close(*sock_pt); + *sock_pt = -1; + + return res; +} + +static int32_t +_finish_connecting(struct qb_ipc_one_way *one_way) +{ + struct sockaddr_un remote_address; + int res; + int error; + int retry = 0; + + set_sock_addr(&remote_address, one_way->u.us.sock_name); + + /* this retry loop is here to help connecting when trying to send + * an event right after connection setup. + */ + do { + errno = 0; + res = connect(one_way->u.us.sock, + (struct sockaddr *)&remote_address, + QB_SUN_LEN(&remote_address)); + if (res == -1) { + error = -errno; + qb_util_perror(LOG_DEBUG, "error calling connect()"); + retry++; + usleep(100000); + } + } while (res == -1 && retry < 10); + if (res == -1) { + return error; + } + + free(one_way->u.us.sock_name); + one_way->u.us.sock_name = NULL; + + return set_sock_size(one_way->u.us.sock, one_way->max_msg_size); +} /* * client functions * -------------------------------------------------------- */ static void qb_ipcc_us_disconnect(struct qb_ipcc_connection *c) { munmap(c->request.u.us.shared_data, SHM_CONTROL_SIZE); unlink(c->request.u.us.shared_file_name); close(c->request.u.us.sock); close(c->event.u.us.sock); } static ssize_t qb_ipc_socket_send(struct qb_ipc_one_way *one_way, - const void *msg_ptr, size_t msg_len) + const void *msg_ptr, size_t msg_len) { ssize_t rc = 0; - struct ipc_us_control *ctl = NULL; - + struct ipc_us_control *ctl; ctl = (struct ipc_us_control *)one_way->u.us.shared_data; - rc = qb_ipc_us_send(one_way, msg_ptr, msg_len); + if (one_way->u.us.sock_name) { + rc = _finish_connecting(one_way); + if (rc < 0) { + qb_util_log(LOG_ERR, "socket connect-on-send"); + return rc; + } + } + + qb_sigpipe_ctl(QB_SIGPIPE_IGNORE); + rc = send(one_way->u.us.sock, msg_ptr, msg_len, MSG_NOSIGNAL); + if (rc == -1) { + rc = -errno; + if (errno != EAGAIN) { + qb_util_perror(LOG_ERR, "socket_send:send"); + } + } + qb_sigpipe_ctl(QB_SIGPIPE_DEFAULT); - if (rc == msg_len && ctl) { + if (ctl && rc == msg_len) { qb_atomic_int_inc(&ctl->sent); } return rc; } static ssize_t qb_ipc_socket_sendv(struct qb_ipc_one_way *one_way, const struct iovec *iov, size_t iov_len) { - int32_t result; - int32_t processed = 0; - int32_t total_processed = 0; - int32_t iov_p = 0; + int32_t rc; struct ipc_us_control *ctl; - char *rbuf = (char *)iov[iov_p].iov_base; - ctl = (struct ipc_us_control *)one_way->u.us.shared_data; qb_sigpipe_ctl(QB_SIGPIPE_IGNORE); -retry_send: - result = send(one_way->u.us.sock, - &rbuf[processed], - iov[iov_p].iov_len - processed, - MSG_NOSIGNAL); - - if (result == -1) { - if (errno == EAGAIN && - (processed > 0 || iov_p > 0)) { - goto retry_send; - } else { - qb_sigpipe_ctl(QB_SIGPIPE_DEFAULT); - return -errno; + if (one_way->u.us.sock_name) { + rc = _finish_connecting(one_way); + if (rc < 0) { + qb_util_perror(LOG_ERR, "socket connect-on-sendv"); + return rc; } } - processed += result; - if (processed == iov[iov_p].iov_len) { - iov_p++; - total_processed += processed; - if (iov_p < iov_len) { - processed = 0; - rbuf = (char *)iov[iov_p].iov_base; - goto retry_send; + rc = writev(one_way->u.us.sock, iov, iov_len); + + if (rc == -1) { + rc = -errno; + if (errno != EAGAIN) { + qb_util_perror(LOG_ERR, "socket_sendv:writev %d", + one_way->u.us.sock); } - } else { - goto retry_send; } qb_sigpipe_ctl(QB_SIGPIPE_DEFAULT); - if (total_processed > 0 && ctl) { + if (ctl && rc > 0) { qb_atomic_int_inc(&ctl->sent); } - return total_processed; + return rc; } - /* * recv a message of unknown size. */ static ssize_t -qb_ipc_us_recv_at_most(struct qb_ipc_one_way * one_way, - void *msg, size_t len, int32_t timeout) +qb_ipc_us_recv_at_most(struct qb_ipc_one_way *one_way, + void *msg, size_t len, int32_t timeout) { int32_t result; int32_t final_rc = 0; - int32_t processed = 0; - int32_t to_recv = sizeof(struct qb_ipc_request_header); + int32_t to_recv = 0; char *data = msg; struct ipc_us_control *ctl = NULL; - struct qb_ipc_request_header *hdr = NULL; + int32_t time_waited = 0; + int32_t time_to_wait = timeout; + + if (timeout == -1) { + time_to_wait = 1000; + } qb_sigpipe_ctl(QB_SIGPIPE_IGNORE); -retry_recv: - result = recv(one_way->u.us.sock, &data[processed], to_recv, - MSG_NOSIGNAL | MSG_WAITALL); +retry_peek: + result = recv(one_way->u.us.sock, data, + sizeof(struct qb_ipc_request_header), + MSG_NOSIGNAL | MSG_PEEK); + if (result == -1) { - if (errno == EAGAIN && - (processed > 0 || timeout == -1)) { - /* - * Don't spin too hard else we can consume too - * much cpu. - */ - result = qb_ipc_us_ready(one_way, - 100, - POLLIN); - if (result == 0 || result == -EAGAIN) { - goto retry_recv; - } - final_rc = result; - goto cleanup_sigpipe; + if (errno == EAGAIN && (time_waited < timeout || timeout == -1)) { + result = qb_ipc_us_ready(one_way, NULL, + time_to_wait, POLLIN); + time_waited += time_to_wait; + goto retry_peek; } else { - final_rc = -errno; - goto cleanup_sigpipe; + return -errno; } + } + if (result >= sizeof(struct qb_ipc_request_header)) { + struct qb_ipc_request_header *hdr = NULL; + hdr = (struct qb_ipc_request_header *)msg; + to_recv = hdr->size; + } + + result = recv(one_way->u.us.sock, data, to_recv, + MSG_NOSIGNAL | MSG_WAITALL); + if (result == -1) { + final_rc = -errno; + goto cleanup_sigpipe; } else if (result == 0) { + qb_util_log(LOG_DEBUG, "recv == 0 -> ENOTCONN"); + final_rc = -ENOTCONN; goto cleanup_sigpipe; } - processed += result; - if (processed >= sizeof(struct qb_ipc_request_header) && hdr == NULL) { - hdr = (struct qb_ipc_request_header*)msg; - } - if (hdr) { - to_recv = hdr->size - processed; - } else { - to_recv = len - processed; - } - if (to_recv > 0) { - goto retry_recv; - } - final_rc = processed; + final_rc = result; ctl = (struct ipc_us_control *)one_way->u.us.shared_data; if (ctl) { (void)qb_atomic_int_dec_and_test(&ctl->sent); } - cleanup_sigpipe: +cleanup_sigpipe: qb_sigpipe_ctl(QB_SIGPIPE_DEFAULT); return final_rc; } - static void qb_ipc_us_fc_set(struct qb_ipc_one_way *one_way, int32_t fc_enable) { struct ipc_us_control *ctl = (struct ipc_us_control *)one_way->u.us.shared_data; - qb_util_log(LOG_TRACE, "setting fc to %d", - fc_enable); + qb_util_log(LOG_TRACE, "setting fc to %d", fc_enable); qb_atomic_int_set(&ctl->flow_control, fc_enable); } static int32_t qb_ipc_us_fc_get(struct qb_ipc_one_way *one_way) { struct ipc_us_control *ctl = (struct ipc_us_control *)one_way->u.us.shared_data; return qb_atomic_int_get(&ctl->flow_control); } static ssize_t qb_ipc_us_q_len_get(struct qb_ipc_one_way *one_way) { struct ipc_us_control *ctl = (struct ipc_us_control *)one_way->u.us.shared_data; return qb_atomic_int_get(&ctl->sent); } -/* - * setup: - * send -> server - * recv <- server - * call us, we connect to the dgram sockets - */ int32_t -qb_ipcc_us_connect(struct qb_ipcc_connection *c, - struct qb_ipc_connection_response *r) +qb_ipcc_us_connect(struct qb_ipcc_connection * c, + struct qb_ipc_connection_response * r) { int32_t res; - struct qb_ipc_event_connection_request request; char path[PATH_MAX]; int32_t fd_hdr; - char * shm_ptr; + char *shm_ptr; qb_atomic_init(); c->needs_sock_for_poll = QB_FALSE; c->funcs.send = qb_ipc_socket_send; c->funcs.sendv = qb_ipc_socket_sendv; c->funcs.recv = qb_ipc_us_recv_at_most; c->funcs.fc_get = qb_ipc_us_fc_get; c->funcs.disconnect = qb_ipcc_us_disconnect; - c->request.u.us.sock = c->setup.u.us.sock; - c->response.u.us.sock = c->setup.u.us.sock; - c->setup.u.us.sock = -1; - fd_hdr = qb_sys_mmap_file_open(path, r->request, SHM_CONTROL_SIZE, O_RDWR); if (fd_hdr < 0) { res = fd_hdr; errno = -fd_hdr; qb_util_perror(LOG_ERR, "couldn't open file for mmap"); return res; } (void)strlcpy(c->request.u.us.shared_file_name, r->request, NAME_MAX); shm_ptr = mmap(0, SHM_CONTROL_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, fd_hdr, 0); if (shm_ptr == MAP_FAILED) { res = -errno; qb_util_perror(LOG_ERR, "couldn't create mmap for header"); goto cleanup_hdr; } c->request.u.us.shared_data = shm_ptr; c->response.u.us.shared_data = shm_ptr + sizeof(struct ipc_us_control); c->event.u.us.shared_data = shm_ptr + (2 * sizeof(struct ipc_us_control)); close(fd_hdr); - res = qb_ipcc_us_sock_connect(c->name, &c->event.u.us.sock); + res = qb_ipc_dgram_sock_connect(r->response, "response", "request", + r->max_msg_size, &c->request.u.us.sock); if (res != 0) { goto cleanup_hdr; } + c->response.u.us.sock = c->request.u.us.sock; - memset(&request, 0, sizeof(request)); - request.hdr.id = QB_IPC_MSG_NEW_EVENT_SOCK; - request.hdr.size = sizeof(request); - request.connection = r->connection; - res = qb_ipc_us_send(&c->event, &request, request.hdr.size); - if (res < 0) { - qb_ipcc_us_sock_close(c->event.u.us.sock); + res = qb_ipc_dgram_sock_connect(r->response, "event", "event-tx", + r->max_msg_size, &c->event.u.us.sock); + if (res != 0) { goto cleanup_hdr; } return 0; cleanup_hdr: close(fd_hdr); + close(c->event.u.us.sock); + close(c->request.u.us.sock); unlink(r->request); munmap(c->request.u.us.shared_data, SHM_CONTROL_SIZE); return res; } - /* * service functions * -------------------------------------------------------- */ +static int32_t +_sock_connection_liveliness(int32_t fd, int32_t revents, void *data) +{ + struct qb_ipcs_connection *c = (struct qb_ipcs_connection *)data; + + qb_util_log(LOG_DEBUG, "LIVENESS: fd %d event %d conn (%s)", + fd, revents, c->description); + if (revents & POLLNVAL) { + qb_util_log(LOG_DEBUG, "NVAL conn (%s)", c->description); + return -EINVAL; + } + if (revents & POLLHUP) { + qb_util_log(LOG_DEBUG, "HUP conn (%s)", c->description); + qb_ipcs_disconnect(c); + return -ESHUTDOWN; + } + return 0; +} + +static int32_t +_sock_add_to_mainloop(struct qb_ipcs_connection *c) +{ + int res; + + res = c->service->poll_fns.dispatch_add(c->service->poll_priority, + c->request.u.us.sock, + POLLIN | POLLPRI | POLLNVAL, + c, + qb_ipcs_dispatch_connection_request); + + if (res < 0) { + qb_util_log(LOG_ERR, + "Error adding socket to mainloop (%s).", + c->description); + return res; + } + qb_ipcs_connection_ref(c); + + res = c->service->poll_fns.dispatch_add(c->service->poll_priority, + c->setup.u.us.sock, + POLLIN | POLLPRI | POLLNVAL, + c, _sock_connection_liveliness); + qb_util_log(LOG_DEBUG, "added %d to poll loop (liveness)", + c->setup.u.us.sock); + if (res < 0) { + qb_util_perror(LOG_ERR, "Error adding setupfd to mainloop"); + (void)c->service->poll_fns.dispatch_del(c->request.u.us.sock); + return res; + } + qb_ipcs_connection_ref(c); + return res; +} + +static void +_sock_rm_from_mainloop(struct qb_ipcs_connection *c) +{ + (void)c->service->poll_fns.dispatch_del(c->request.u.us.sock); + qb_ipcs_connection_unref(c); + + (void)c->service->poll_fns.dispatch_del(c->setup.u.us.sock); + qb_ipcs_connection_unref(c); +} + static void qb_ipcs_us_disconnect(struct qb_ipcs_connection *c) { qb_enter(); - munmap(c->request.u.us.shared_data, SHM_CONTROL_SIZE); - unlink(c->request.u.us.shared_file_name); - qb_ipcc_us_sock_close(c->request.u.us.sock); - qb_ipcc_us_sock_close(c->event.u.us.sock); + if (c->state == QB_IPCS_CONNECTION_ESTABLISHED || + c->state == QB_IPCS_CONNECTION_ACTIVE) { + _sock_rm_from_mainloop(c); + + qb_ipcc_us_sock_close(c->setup.u.us.sock); + qb_ipcc_us_sock_close(c->request.u.us.sock); + qb_ipcc_us_sock_close(c->event.u.us.sock); + } + if (c->state == QB_IPCS_CONNECTION_SHUTTING_DOWN || + c->state == QB_IPCS_CONNECTION_ACTIVE) { + munmap(c->request.u.us.shared_data, SHM_CONTROL_SIZE); + unlink(c->request.u.us.shared_file_name); + } } static int32_t qb_ipcs_us_connect(struct qb_ipcs_service *s, struct qb_ipcs_connection *c, struct qb_ipc_connection_response *r) { char path[PATH_MAX]; int32_t fd_hdr; int32_t res = 0; struct ipc_us_control *ctl; - char * shm_ptr; + char *shm_ptr; + + qb_util_log(LOG_DEBUG, "connecting to client (%s)", c->description); - qb_util_log(LOG_DEBUG, "connecting to client (%s)", - c->description); + c->request.u.us.sock = c->setup.u.us.sock; + c->response.u.us.sock = c->setup.u.us.sock; snprintf(r->request, NAME_MAX, "qb-%s-control-%s", s->name, c->description); + snprintf(r->response, NAME_MAX, "qb-%s-%s", s->name, c->description); fd_hdr = qb_sys_mmap_file_open(path, r->request, SHM_CONTROL_SIZE, O_CREAT | O_TRUNC | O_RDWR); if (fd_hdr < 0) { res = fd_hdr; errno = -fd_hdr; qb_util_perror(LOG_ERR, "couldn't create file for mmap (%s)", c->description); return res; } (void)strlcpy(r->request, path, PATH_MAX); (void)strlcpy(c->request.u.us.shared_file_name, r->request, NAME_MAX); res = chown(r->request, c->auth.uid, c->auth.gid); if (res != 0) { /* ignore res, this is just for the compiler warnings. */ res = 0; } res = chmod(r->request, c->auth.mode); if (res != 0) { /* ignore res, this is just for the compiler warnings. - */ + */ res = 0; } shm_ptr = mmap(0, SHM_CONTROL_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, fd_hdr, 0); if (shm_ptr == MAP_FAILED) { res = -errno; qb_util_perror(LOG_ERR, "couldn't create mmap for header (%s)", c->description); goto cleanup_hdr; } c->request.u.us.shared_data = shm_ptr; c->response.u.us.shared_data = shm_ptr + sizeof(struct ipc_us_control); c->event.u.us.shared_data = shm_ptr + (2 * sizeof(struct ipc_us_control)); ctl = (struct ipc_us_control *)c->request.u.us.shared_data; ctl->sent = 0; ctl->flow_control = 0; ctl = (struct ipc_us_control *)c->response.u.us.shared_data; ctl->sent = 0; ctl->flow_control = 0; ctl = (struct ipc_us_control *)c->event.u.us.shared_data; ctl->sent = 0; ctl->flow_control = 0; close(fd_hdr); + + /* request channel */ + res = qb_ipc_dgram_sock_setup(r->response, "request", + &c->request.u.us.sock); + if (res < 0) { + goto cleanup_hdr; + } + c->setup.u.us.sock_name = NULL; + c->request.u.us.sock_name = NULL; + + /* response channel */ + c->response.u.us.sock = c->request.u.us.sock; + snprintf(path, PATH_MAX, "%s-%s", r->response, "response"); + c->response.u.us.sock_name = strdup(path); + + /* event channel */ + res = qb_ipc_dgram_sock_setup(r->response, "event-tx", + &c->event.u.us.sock); + if (res < 0) { + goto cleanup_hdr; + } + snprintf(path, PATH_MAX, "%s-%s", r->response, "event"); + c->event.u.us.sock_name = strdup(path); + + res = _sock_add_to_mainloop(c); + if (res < 0) { + goto cleanup_hdr; + } + return res; cleanup_hdr: + free(c->response.u.us.sock_name); + free(c->event.u.us.sock_name); + close(fd_hdr); unlink(r->request); munmap(c->request.u.us.shared_data, SHM_CONTROL_SIZE); return res; } - void qb_ipcs_us_init(struct qb_ipcs_service *s) { s->funcs.connect = qb_ipcs_us_connect; s->funcs.disconnect = qb_ipcs_us_disconnect; s->funcs.recv = qb_ipc_us_recv_at_most; s->funcs.peek = NULL; s->funcs.reclaim = NULL; s->funcs.send = qb_ipc_socket_send; s->funcs.sendv = qb_ipc_socket_sendv; s->funcs.fc_set = qb_ipc_us_fc_set; s->funcs.q_len_get = qb_ipc_us_q_len_get; s->needs_sock_for_poll = QB_FALSE; qb_atomic_init(); } diff --git a/lib/ipcc.c b/lib/ipcc.c index 1ec1fc0..1f0d551 100644 --- a/lib/ipcc.c +++ b/lib/ipcc.c @@ -1,421 +1,421 @@ /* * Copyright (C) 2010 Red Hat, Inc. * * Author: Angus Salkeld * * This file is part of libqb. * * libqb is free software: you can redistribute it and/or modify * it under the terms of the GNU Lesser General Public License as published by * the Free Software Foundation, either version 2.1 of the License, or * (at your option) any later version. * * libqb is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public License * along with libqb. If not, see . */ #include "os_base.h" #include "ipc_int.h" #include "util_int.h" #include #include qb_ipcc_connection_t * qb_ipcc_connect(const char *name, size_t max_msg_size) { int32_t res; qb_ipcc_connection_t *c = NULL; struct qb_ipc_connection_response response; c = calloc(1, sizeof(struct qb_ipcc_connection)); if (c == NULL) { return NULL; } c->setup.max_msg_size = QB_MAX(max_msg_size, sizeof(struct qb_ipc_connection_response)); (void)strlcpy(c->name, name, NAME_MAX); res = qb_ipcc_us_setup_connect(c, &response); if (res < 0) { goto disconnect_and_cleanup; } c->response.type = response.connection_type; c->request.type = response.connection_type; c->event.type = response.connection_type; c->setup.type = response.connection_type; c->response.max_msg_size = response.max_msg_size; c->request.max_msg_size = response.max_msg_size; c->event.max_msg_size = response.max_msg_size; c->receive_buf = calloc(1, response.max_msg_size); c->fc_enable_max = 1; if (c->receive_buf == NULL) { res = -ENOMEM; goto disconnect_and_cleanup; } switch (c->request.type) { case QB_IPC_SHM: res = qb_ipcc_shm_connect(c, &response); break; case QB_IPC_SOCKET: res = qb_ipcc_us_connect(c, &response); break; case QB_IPC_POSIX_MQ: case QB_IPC_SYSV_MQ: res = -ENOTSUP; break; default: res = -EINVAL; break; } if (res != 0) { goto disconnect_and_cleanup; } c->is_connected = QB_TRUE; return c; disconnect_and_cleanup: qb_ipcc_us_sock_close(c->setup.u.us.sock); free(c->receive_buf); free(c); errno = -res; return NULL; } static void _check_connection_state(struct qb_ipcc_connection * c, int32_t res) { if (res >= 0) return; if (qb_ipc_us_sock_error_is_disconnected(res)) { errno = -res; qb_util_perror(LOG_DEBUG, "interpreting result %d as a disconnect", res); c->is_connected = QB_FALSE; } } static struct qb_ipc_one_way * _event_sock_one_way_get(struct qb_ipcc_connection * c) { if (c->needs_sock_for_poll) { return &c->setup; } if (c->event.type == QB_IPC_SOCKET) { return &c->event; } return NULL; } static struct qb_ipc_one_way * _response_sock_one_way_get(struct qb_ipcc_connection * c) { if (c->needs_sock_for_poll) { return &c->setup; } if (c->response.type == QB_IPC_SOCKET) { return &c->response; } return NULL; } ssize_t qb_ipcc_send(struct qb_ipcc_connection * c, const void *msg_ptr, size_t msg_len) { ssize_t res; ssize_t res2; if (c == NULL) { return -EINVAL; } if (msg_len > c->request.max_msg_size) { return -EMSGSIZE; } if (c->funcs.fc_get) { res = c->funcs.fc_get(&c->request); if (res < 0) { return res; } else if (res > 0 && res <= c->fc_enable_max) { return -EAGAIN; } else { /* * we can transmit */ } } res = c->funcs.send(&c->request, msg_ptr, msg_len); if (res == msg_len && c->needs_sock_for_poll) { do { res2 = qb_ipc_us_send(&c->setup, msg_ptr, 1); } while (res2 == -EAGAIN); if (res2 == -EPIPE) { res2 = -ENOTCONN; } if (res2 != 1) { res = res2; } } _check_connection_state(c, res); return res; } int32_t qb_ipcc_fc_enable_max_set(struct qb_ipcc_connection * c, uint32_t max) { if (c == NULL || max > 2) { return -EINVAL; } c->fc_enable_max = max; return 0; } ssize_t qb_ipcc_sendv(struct qb_ipcc_connection * c, const struct iovec * iov, size_t iov_len) { int32_t total_size = 0; int32_t i; int32_t res; int32_t res2; for (i = 0; i < iov_len; i++) { total_size += iov[i].iov_len; } if (c == NULL) { return -EINVAL; } if (total_size > c->request.max_msg_size) { return -EMSGSIZE; } if (c->funcs.fc_get) { res = c->funcs.fc_get(&c->request); if (res < 0) { return res; } else if (res > 0 && res <= c->fc_enable_max) { return -EAGAIN; } else { /* * we can transmit */ } } res = c->funcs.sendv(&c->request, iov, iov_len); if (res > 0 && c->needs_sock_for_poll) { do { res2 = qb_ipc_us_send(&c->setup, &res, 1); } while (res2 == -EAGAIN); if (res2 == -EPIPE) { res2 = -ENOTCONN; } if (res2 != 1) { res = res2; } } _check_connection_state(c, res); return res; } ssize_t qb_ipcc_recv(struct qb_ipcc_connection * c, void *msg_ptr, size_t msg_len, int32_t ms_timeout) { int32_t res = 0; int32_t res2 = 0; int32_t poll_ms = 0; if (c == NULL) { return -EINVAL; } res = c->funcs.recv(&c->response, msg_ptr, msg_len, ms_timeout); if (res == -EAGAIN || res == -ETIMEDOUT) { struct qb_ipc_one_way *ow = _response_sock_one_way_get(c); - if (ow == NULL) return res; if (res == -EAGAIN) poll_ms = ms_timeout; - res2 = qb_ipc_us_ready(ow, poll_ms, POLLIN); + res2 = qb_ipc_us_ready(ow, &c->setup, poll_ms, POLLIN); if (res2 < 0) { res = res2; } } _check_connection_state(c, res); return res; } ssize_t qb_ipcc_sendv_recv(qb_ipcc_connection_t * c, const struct iovec * iov, uint32_t iov_len, void *res_msg, size_t res_len, int32_t ms_timeout) { ssize_t res = 0; int32_t timeout_now; int32_t timeout_rem = ms_timeout; if (c == NULL) { return -EINVAL; } if (c->funcs.fc_get) { res = c->funcs.fc_get(&c->request); if (res < 0) { return res; } else if (res > 0 && res <= c->fc_enable_max) { return -EAGAIN; } else { /* * we can transmit */ } } res = qb_ipcc_sendv(c, iov, iov_len); if (res < 0) { return res; } do { if (timeout_rem > QB_IPC_MAX_WAIT_MS || ms_timeout == -1) { timeout_now = QB_IPC_MAX_WAIT_MS; } else { timeout_now = timeout_rem; } res = qb_ipcc_recv(c, res_msg, res_len, timeout_now); if (res == -ETIMEDOUT) { if (ms_timeout < 0) { res = -EAGAIN; } else { timeout_rem -= timeout_now; if (timeout_rem > 0) { res = -EAGAIN; } } } else if (res < 0 && res != -EAGAIN) { errno = -res; qb_util_perror(LOG_DEBUG, "qb_ipcc_recv %d timeout:(%d/%d)", res, timeout_now, timeout_rem); } } while (res == -EAGAIN && c->is_connected); return res; } int32_t qb_ipcc_fd_get(struct qb_ipcc_connection * c, int32_t * fd) { if (c == NULL) { return -EINVAL; } if (c->event.type == QB_IPC_SOCKET) { *fd = c->event.u.us.sock; } else { *fd = c->setup.u.us.sock; } return 0; } ssize_t qb_ipcc_event_recv(struct qb_ipcc_connection * c, void *msg_pt, size_t msg_len, int32_t ms_timeout) { char one_byte = 1; int32_t res; ssize_t size; struct qb_ipc_one_way *ow = NULL; if (c == NULL) { return -EINVAL; } ow = _event_sock_one_way_get(c); if (ow) { - res = qb_ipc_us_ready(ow, ms_timeout, POLLIN); + res = qb_ipc_us_ready(ow, &c->setup, + ms_timeout, POLLIN); if (res < 0) { _check_connection_state(c, res); return res; } } size = c->funcs.recv(&c->event, msg_pt, msg_len, ms_timeout); if (size < 0) { _check_connection_state(c, size); return size; } if (c->needs_sock_for_poll) { res = qb_ipc_us_recv(&c->setup, &one_byte, 1, -1); if (res < 0) { _check_connection_state(c, res); return res; } } return size; } void qb_ipcc_disconnect(struct qb_ipcc_connection *c) { struct qb_ipc_one_way *ow = NULL; int32_t res = 0; qb_util_log(LOG_DEBUG, "%s()", __func__); if (c == NULL) { return; } ow = _event_sock_one_way_get(c); if (ow) { - res = qb_ipc_us_ready(ow, 0, POLLIN); + res = qb_ipc_us_ready(ow, &c->setup, 0, POLLIN); _check_connection_state(c, res); qb_ipcc_us_sock_close(ow->u.us.sock); } if (c->funcs.disconnect) { c->funcs.disconnect(c); } free(c->receive_buf); free(c); } void qb_ipcc_context_set(struct qb_ipcc_connection *c, void *context) { if (c == NULL) { return; } c->context = context; } void *qb_ipcc_context_get(struct qb_ipcc_connection *c) { if (c == NULL) { return NULL; } return c->context; } int32_t qb_ipcc_is_connected(qb_ipcc_connection_t *c) { struct qb_ipc_one_way *ow; if (c == NULL) { return QB_FALSE; } ow = _response_sock_one_way_get(c); if (ow) { - _check_connection_state(c, qb_ipc_us_ready(ow, 0, POLLIN)); + _check_connection_state(c, qb_ipc_us_ready(ow, &c->setup, 0, POLLIN)); } return c->is_connected; } diff --git a/lib/ipcs.c b/lib/ipcs.c index 0fd8c59..e18f271 100644 --- a/lib/ipcs.c +++ b/lib/ipcs.c @@ -1,878 +1,876 @@ /* * Copyright (C) 2010 Red Hat, Inc. * * Author: Angus Salkeld * * This file is part of libqb. * * libqb is free software: you can redistribute it and/or modify * it under the terms of the GNU Lesser General Public License as published by * the Free Software Foundation, either version 2.1 of the License, or * (at your option) any later version. * * libqb is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public License * along with libqb. If not, see . */ #include "os_base.h" #include "util_int.h" #include "ipc_int.h" #include #include #include static void qb_ipcs_flowcontrol_set(struct qb_ipcs_connection *c, int32_t fc_enable); static int32_t new_event_notification(struct qb_ipcs_connection * c); static QB_LIST_DECLARE(qb_ipc_services); qb_ipcs_service_t * qb_ipcs_create(const char *name, int32_t service_id, enum qb_ipc_type type, struct qb_ipcs_service_handlers *handlers) { struct qb_ipcs_service *s; s = calloc(1, sizeof(struct qb_ipcs_service)); if (s == NULL) { return NULL; } if (type == QB_IPC_NATIVE) { #ifdef DISABLE_IPC_SHM s->type = QB_IPC_SOCKET; #else s->type = QB_IPC_SHM; #endif /* DISABLE_IPC_SHM */ } else { s->type = type; } s->pid = getpid(); s->needs_sock_for_poll = QB_FALSE; s->poll_priority = QB_LOOP_MED; s->ref_count = 1; s->service_id = service_id; (void)strlcpy(s->name, name, NAME_MAX); s->serv_fns.connection_accept = handlers->connection_accept; s->serv_fns.connection_created = handlers->connection_created; s->serv_fns.msg_process = handlers->msg_process; s->serv_fns.connection_closed = handlers->connection_closed; s->serv_fns.connection_destroyed = handlers->connection_destroyed; qb_list_init(&s->connections); qb_list_init(&s->list); qb_list_add(&s->list, &qb_ipc_services); return s; } void qb_ipcs_poll_handlers_set(struct qb_ipcs_service *s, struct qb_ipcs_poll_handlers *handlers) { s->poll_fns.job_add = handlers->job_add; s->poll_fns.dispatch_add = handlers->dispatch_add; s->poll_fns.dispatch_mod = handlers->dispatch_mod; s->poll_fns.dispatch_del = handlers->dispatch_del; } int32_t qb_ipcs_run(struct qb_ipcs_service *s) { int32_t res = 0; if (s->poll_fns.dispatch_add == NULL || s->poll_fns.dispatch_mod == NULL || s->poll_fns.dispatch_del == NULL) { return -EINVAL; } switch (s->type) { case QB_IPC_SOCKET: qb_ipcs_us_init((struct qb_ipcs_service *)s); break; case QB_IPC_SHM: #ifdef DISABLE_IPC_SHM res = -ENOTSUP; #else qb_ipcs_shm_init((struct qb_ipcs_service *)s); #endif /* DISABLE_IPC_SHM */ break; case QB_IPC_POSIX_MQ: case QB_IPC_SYSV_MQ: res = -ENOTSUP; break; default: res = -EINVAL; break; } if (res < 0) { qb_ipcs_unref(s); return res; } res = qb_ipcs_us_publish(s); if (res < 0) { (void)qb_ipcs_us_withdraw(s); qb_ipcs_unref(s); return res; } return res; } static int32_t _modify_dispatch_descriptor_(struct qb_ipcs_connection *c) { qb_ipcs_dispatch_mod_fn disp_mod = c->service->poll_fns.dispatch_mod; if (c->service->type == QB_IPC_SOCKET) { return disp_mod(c->service->poll_priority, c->event.u.us.sock, c->poll_events, c, qb_ipcs_dispatch_connection_request); } else { return disp_mod(c->service->poll_priority, c->setup.u.us.sock, c->poll_events, c, qb_ipcs_dispatch_connection_request); } return -EINVAL; } void qb_ipcs_request_rate_limit(struct qb_ipcs_service *s, enum qb_ipcs_rate_limit rl) { struct qb_ipcs_connection *c; enum qb_loop_priority old_p = s->poll_priority; struct qb_list_head *pos; struct qb_list_head *n; switch (rl) { case QB_IPCS_RATE_FAST: s->poll_priority = QB_LOOP_HIGH; break; case QB_IPCS_RATE_SLOW: case QB_IPCS_RATE_OFF: case QB_IPCS_RATE_OFF_2: s->poll_priority = QB_LOOP_LOW; break; default: case QB_IPCS_RATE_NORMAL: s->poll_priority = QB_LOOP_MED; break; } qb_list_for_each_safe(pos, n, &s->connections) { c = qb_list_entry(pos, struct qb_ipcs_connection, list); qb_ipcs_connection_ref(c); if (rl == QB_IPCS_RATE_OFF) { qb_ipcs_flowcontrol_set(c, 1); } else if (rl == QB_IPCS_RATE_OFF_2) { qb_ipcs_flowcontrol_set(c, 2); } else { qb_ipcs_flowcontrol_set(c, QB_FALSE); } if (old_p != s->poll_priority) { (void)_modify_dispatch_descriptor_(c); } qb_ipcs_connection_unref(c); } } void qb_ipcs_ref(struct qb_ipcs_service *s) { qb_atomic_int_inc(&s->ref_count); } void qb_ipcs_unref(struct qb_ipcs_service *s) { int32_t free_it; struct qb_ipcs_connection *c = NULL; struct qb_list_head *pos; struct qb_list_head *n; assert(s->ref_count > 0); free_it = qb_atomic_int_dec_and_test(&s->ref_count); if (free_it) { qb_util_log(LOG_DEBUG, "%s() - destroying", __func__); qb_list_for_each_safe(pos, n, &s->connections) { c = qb_list_entry(pos, struct qb_ipcs_connection, list); if (c == NULL) { continue; } qb_ipcs_disconnect(c); } (void)qb_ipcs_us_withdraw(s); free(s); } } void qb_ipcs_destroy(struct qb_ipcs_service *s) { qb_ipcs_unref(s); } /* * connection API */ static struct qb_ipc_one_way * _event_sock_one_way_get(struct qb_ipcs_connection * c) { if (c->service->needs_sock_for_poll) { return &c->setup; } if (c->event.type == QB_IPC_SOCKET) { return &c->event; } return NULL; } static struct qb_ipc_one_way * _response_sock_one_way_get(struct qb_ipcs_connection * c) { if (c->service->needs_sock_for_poll) { return &c->setup; } if (c->response.type == QB_IPC_SOCKET) { return &c->response; } return NULL; } ssize_t qb_ipcs_response_send(struct qb_ipcs_connection *c, const void *data, size_t size) { ssize_t res; if (c == NULL) { return -EINVAL; } qb_ipcs_connection_ref(c); res = c->service->funcs.send(&c->response, data, size); if (res == size) { c->stats.responses++; } else if (res == -EAGAIN || res == -ETIMEDOUT) { struct qb_ipc_one_way *ow = _response_sock_one_way_get(c); if (ow) { - ssize_t res2 = qb_ipc_us_ready(ow, 0, POLLOUT); + ssize_t res2 = qb_ipc_us_ready(ow, &c->setup, 0, POLLOUT); if (res2 < 0) { res = res2; } } c->stats.send_retries++; } qb_ipcs_connection_unref(c); return res; } ssize_t qb_ipcs_response_sendv(struct qb_ipcs_connection * c, const struct iovec * iov, size_t iov_len) { ssize_t res; if (c == NULL) { return -EINVAL; } qb_ipcs_connection_ref(c); res = c->service->funcs.sendv(&c->response, iov, iov_len); if (res > 0) { c->stats.responses++; } else if (res == -EAGAIN || res == -ETIMEDOUT) { struct qb_ipc_one_way *ow = _response_sock_one_way_get(c); if (ow) { - ssize_t res2 = qb_ipc_us_ready(ow, 0, POLLOUT); + ssize_t res2 = qb_ipc_us_ready(ow, &c->setup, 0, POLLOUT); if (res2 < 0) { res = res2; } } c->stats.send_retries++; } qb_ipcs_connection_unref(c); return res; } static int32_t resend_event_notifications(struct qb_ipcs_connection *c) { ssize_t res = 0; if (c->outstanding_notifiers > 0) { res = qb_ipc_us_send(&c->setup, c->receive_buf, c->outstanding_notifiers); } if (res > 0) { c->outstanding_notifiers -= res; } assert(c->outstanding_notifiers >= 0); if (c->outstanding_notifiers == 0) { c->poll_events = POLLIN | POLLPRI | POLLNVAL; (void)_modify_dispatch_descriptor_(c); } return res; } static int32_t new_event_notification(struct qb_ipcs_connection * c) { ssize_t res = 0; if (!c->service->needs_sock_for_poll) { return res; } assert(c->outstanding_notifiers >= 0); if (c->outstanding_notifiers > 0) { c->outstanding_notifiers++; } else { res = qb_ipc_us_send(&c->setup, &c->outstanding_notifiers, 1); if (res == -EAGAIN) { /* * notify the client later, when we can. */ c->outstanding_notifiers++; c->poll_events = POLLOUT | POLLIN | POLLPRI | POLLNVAL; (void)_modify_dispatch_descriptor_(c); } } return res; } ssize_t qb_ipcs_event_send(struct qb_ipcs_connection * c, const void *data, size_t size) { ssize_t res; ssize_t resn; if (c == NULL) { return -EINVAL; } qb_ipcs_connection_ref(c); if (size > c->event.max_msg_size) { return -EMSGSIZE; } res = c->service->funcs.send(&c->event, data, size); if (res == size) { c->stats.events++; resn = new_event_notification(c); if (resn < 0 && resn != -EAGAIN) { errno = -resn; qb_util_perror(LOG_WARNING, "new_event_notification (%s)", c->description); res = resn; } } else if (res == -EAGAIN || res == -ETIMEDOUT) { struct qb_ipc_one_way *ow = _event_sock_one_way_get(c); if (ow) { - resn = qb_ipc_us_ready(ow, 0, POLLOUT); + resn = qb_ipc_us_ready(ow, &c->setup, 0, POLLOUT); if (resn < 0) { res = resn; } } c->stats.send_retries++; } qb_ipcs_connection_unref(c); return res; } ssize_t qb_ipcs_event_sendv(struct qb_ipcs_connection * c, const struct iovec * iov, size_t iov_len) { ssize_t res; ssize_t resn; if (c == NULL) { return -EINVAL; } qb_ipcs_connection_ref(c); res = c->service->funcs.sendv(&c->event, iov, iov_len); if (res > 0) { c->stats.events++; resn = new_event_notification(c); if (resn < 0 && resn != -EAGAIN) { errno = -resn; qb_util_perror(LOG_WARNING, "new_event_notification (%s)", c->description); res = resn; } } else if (res == -EAGAIN || res == -ETIMEDOUT) { struct qb_ipc_one_way *ow = _event_sock_one_way_get(c); if (ow) { - resn = qb_ipc_us_ready(ow, 0, POLLOUT); + resn = qb_ipc_us_ready(ow, &c->setup, 0, POLLOUT); if (resn < 0) { res = resn; } } c->stats.send_retries++; } qb_ipcs_connection_unref(c); return res; } qb_ipcs_connection_t * qb_ipcs_connection_first_get(struct qb_ipcs_service * s) { struct qb_ipcs_connection *c; if (qb_list_empty(&s->connections)) { return NULL; } - c = qb_list_first_entry(&s->connections, struct qb_ipcs_connection, list); + c = qb_list_first_entry(&s->connections, struct qb_ipcs_connection, + list); qb_ipcs_connection_ref(c); return c; } qb_ipcs_connection_t * qb_ipcs_connection_next_get(struct qb_ipcs_service * s, struct qb_ipcs_connection * current) { struct qb_ipcs_connection *c; if (current == NULL || qb_list_is_last(¤t->list, &s->connections)) { return NULL; } - c = qb_list_first_entry(¤t->list, struct qb_ipcs_connection, list); + c = qb_list_first_entry(¤t->list, struct qb_ipcs_connection, + list); qb_ipcs_connection_ref(c); return c; } int32_t qb_ipcs_service_id_get(struct qb_ipcs_connection * c) { if (c == NULL) { return -EINVAL; } return c->service->service_id; } struct qb_ipcs_connection * qb_ipcs_connection_alloc(struct qb_ipcs_service *s) { struct qb_ipcs_connection *c = calloc(1, sizeof(struct qb_ipcs_connection)); if (c == NULL) { return NULL; } c->refcount = 1; c->service = s; c->pid = 0; c->euid = -1; c->egid = -1; qb_list_init(&c->list); c->receive_buf = NULL; c->context = NULL; c->fc_enabled = QB_FALSE; c->state = QB_IPCS_CONNECTION_INACTIVE; c->poll_events = POLLIN | POLLPRI | POLLNVAL; c->setup.type = s->type; c->request.type = s->type; c->response.type = s->type; c->event.type = s->type; (void)strlcpy(c->description, "not set yet", CONNECTION_DESCRIPTION); return c; } void qb_ipcs_connection_ref(struct qb_ipcs_connection *c) { if (c) { qb_atomic_int_inc(&c->refcount); } } void qb_ipcs_connection_unref(struct qb_ipcs_connection *c) { int32_t free_it; if (c == NULL) { return; } if (c->refcount < 1) { qb_util_log(LOG_ERR, "ref:%d state:%d (%s)", - c->refcount, c->state, - c->description); + c->refcount, c->state, c->description); assert(0); } free_it = qb_atomic_int_dec_and_test(&c->refcount); if (free_it) { qb_list_del(&c->list); if (c->service->serv_fns.connection_destroyed) { c->service->serv_fns.connection_destroyed(c); } c->service->funcs.disconnect(c); free(c->receive_buf); free(c); } } void qb_ipcs_disconnect(struct qb_ipcs_connection *c) { int32_t res = 0; qb_loop_job_dispatch_fn rerun_job; if (c == NULL) { return; } qb_util_log(LOG_DEBUG, "%s(%s) state:%d", __func__, c->description, c->state); if (c->state == QB_IPCS_CONNECTION_ACTIVE) { + c->service->funcs.disconnect(c); c->state = QB_IPCS_CONNECTION_INACTIVE; c->service->stats.closed_connections++; - - qb_ipcs_sockets_disconnect(c); /* return early as it's an incomplete connection. */ return; } if (c->state == QB_IPCS_CONNECTION_ESTABLISHED) { + c->service->funcs.disconnect(c); c->state = QB_IPCS_CONNECTION_SHUTTING_DOWN; c->service->stats.active_connections--; c->service->stats.closed_connections++; - - qb_ipcs_sockets_disconnect(c); } if (c->state == QB_IPCS_CONNECTION_SHUTTING_DOWN) { res = 0; if (c->service->serv_fns.connection_closed) { res = c->service->serv_fns.connection_closed(c); } if (res == 0) { qb_ipcs_connection_unref(c); } else { /* OK, so they want the connection_closed * function re-run */ rerun_job = (qb_loop_job_dispatch_fn) qb_ipcs_disconnect; res = c->service->poll_fns.job_add(QB_LOOP_LOW, - c, - rerun_job); + c, rerun_job); if (res != 0) { /* last ditch attempt to cleanup */ qb_ipcs_connection_unref(c); } } } } static void qb_ipcs_flowcontrol_set(struct qb_ipcs_connection *c, int32_t fc_enable) { if (c == NULL) { return; } if (c->fc_enabled != fc_enable) { c->service->funcs.fc_set(&c->request, fc_enable); c->fc_enabled = fc_enable; c->stats.flow_control_state = fc_enable; c->stats.flow_control_count++; } } static int32_t _process_request_(struct qb_ipcs_connection *c, int32_t ms_timeout) { int32_t res = 0; ssize_t size; struct qb_ipc_request_header *hdr; qb_ipcs_connection_ref(c); if (c->service->funcs.peek && c->service->funcs.reclaim) { size = c->service->funcs.peek(&c->request, (void **)&hdr, ms_timeout); } else { hdr = c->receive_buf; size = c->service->funcs.recv(&c->request, hdr, c->request.max_msg_size, ms_timeout); } if (size < 0) { if (size != -EAGAIN && size != -ETIMEDOUT) { qb_util_perror(LOG_DEBUG, "recv from client connection failed (%s)", c->description); } else { c->stats.recv_retries++; } res = size; goto cleanup; } else if (size == 0 || hdr->id == QB_IPC_MSG_DISCONNECT) { qb_util_log(LOG_DEBUG, "client requesting a disconnect (%s)", c->description); qb_ipcs_disconnect(c); c = NULL; res = -ESHUTDOWN; } else { c->stats.requests++; res = c->service->serv_fns.msg_process(c, hdr, hdr->size); /* 0 == good, negative == backoff */ if (res < 0) { res = -ENOBUFS; } else { res = size; } } if (c && c->service->funcs.peek && c->service->funcs.reclaim) { c->service->funcs.reclaim(&c->request); } cleanup: qb_ipcs_connection_unref(c); return res; } #define IPC_REQUEST_TIMEOUT 10 #define MAX_RECV_MSGS 50 int32_t qb_ipcs_dispatch_service_request(int32_t fd, int32_t revents, void *data) { int32_t res = _process_request_((struct qb_ipcs_connection *)data, IPC_REQUEST_TIMEOUT); if (res > 0) { return 0; } return res; } static ssize_t _request_q_len_get(struct qb_ipcs_connection *c) { ssize_t q_len; if (c->service->funcs.q_len_get) { q_len = c->service->funcs.q_len_get(&c->request); if (q_len <= 0) { return q_len; } if (c->service->poll_priority == QB_LOOP_MED) { q_len = QB_MIN(q_len, 5); } else if (c->service->poll_priority == QB_LOOP_LOW) { q_len = 1; } else { q_len = QB_MIN(q_len, MAX_RECV_MSGS); } } else { q_len = 1; } return q_len; } int32_t qb_ipcs_dispatch_connection_request(int32_t fd, int32_t revents, void *data) { struct qb_ipcs_connection *c = (struct qb_ipcs_connection *)data; char bytes[MAX_RECV_MSGS]; int32_t res; int32_t res2; int32_t recvd = 0; ssize_t avail; if (revents & POLLNVAL) { qb_util_log(LOG_DEBUG, "NVAL conn (%s)", c->description); return -EINVAL; } if (revents & POLLHUP) { qb_util_log(LOG_DEBUG, "HUP conn (%s)", c->description); qb_ipcs_disconnect(c); return -ESHUTDOWN; } if (revents & POLLOUT) { res = resend_event_notifications(c); if (res < 0 && res != -EAGAIN) { errno = -res; qb_util_perror(LOG_WARNING, "resend_event_notifications (%s)", c->description); } if ((revents & POLLIN) == 0) { return 0; } } if (c->fc_enabled) { return 0; } avail = _request_q_len_get(c); if (c->service->needs_sock_for_poll && avail == 0) { res2 = qb_ipc_us_recv(&c->setup, bytes, 1, 0); if (qb_ipc_us_sock_error_is_disconnected(res2)) { errno = -res2; qb_util_perror(LOG_WARNING, "conn (%s) disconnected", c->description); qb_ipcs_disconnect(c); return -ESHUTDOWN; } else { qb_util_log(LOG_WARNING, "conn (%s) Nothing in q but got POLLIN on fd:%d (res2:%d)", c->description, fd, res2); return 0; } } do { res = _process_request_(c, IPC_REQUEST_TIMEOUT); if (res > 0 || res == -ENOBUFS || res == -EINVAL) { recvd++; } if (res > 0) { avail--; } } while (avail > 0 && res > 0 && !c->fc_enabled); if (c->service->needs_sock_for_poll && recvd > 0) { res2 = qb_ipc_us_recv(&c->setup, bytes, recvd, -1); if (res2 < 0) { errno = -res2; qb_util_perror(LOG_ERR, "error receiving from setup sock (%s)", c->description); } } res = QB_MIN(0, res); if (res == -EAGAIN || res == -ETIMEDOUT || res == -ENOBUFS) { res = 0; } if (res != 0) { if (res != -ENOTCONN) { /* * Abnormal state (ENOTCONN is normal shutdown). */ errno = -res; qb_util_perror(LOG_ERR, "request returned error (%s)", c->description); } qb_ipcs_connection_unref(c); } return res; } void qb_ipcs_context_set(struct qb_ipcs_connection *c, void *context) { if (c == NULL) { return; } c->context = context; } void * qb_ipcs_context_get(struct qb_ipcs_connection *c) { if (c == NULL) { return NULL; } return c->context; } int32_t qb_ipcs_connection_stats_get(qb_ipcs_connection_t * c, struct qb_ipcs_connection_stats * stats, int32_t clear_after_read) { if (c == NULL) { return -EINVAL; } memcpy(stats, &c->stats, sizeof(struct qb_ipcs_connection_stats)); if (clear_after_read) { memset(&c->stats, 0, sizeof(struct qb_ipcs_connection_stats_2)); c->stats.client_pid = c->pid; } return 0; } struct qb_ipcs_connection_stats_2* qb_ipcs_connection_stats_get_2(qb_ipcs_connection_t *c, int32_t clear_after_read) { - struct qb_ipcs_connection_stats_2* stats; + struct qb_ipcs_connection_stats_2 * stats; if (c == NULL) { errno = EINVAL; return NULL; } stats = calloc(1, sizeof(struct qb_ipcs_connection_stats_2)); if (stats == NULL) { return NULL; } memcpy(stats, &c->stats, sizeof(struct qb_ipcs_connection_stats_2)); if (c->service->funcs.q_len_get) { stats->event_q_length = c->service->funcs.q_len_get(&c->event); } else { stats->event_q_length = 0; } if (clear_after_read) { memset(&c->stats, 0, sizeof(struct qb_ipcs_connection_stats_2)); c->stats.client_pid = c->pid; } return stats; } int32_t qb_ipcs_stats_get(struct qb_ipcs_service * s, struct qb_ipcs_stats * stats, int32_t clear_after_read) { if (s == NULL) { return -EINVAL; } memcpy(stats, &s->stats, sizeof(struct qb_ipcs_stats)); if (clear_after_read) { memset(&s->stats, 0, sizeof(struct qb_ipcs_stats)); } return 0; } void qb_ipcs_connection_auth_set(qb_ipcs_connection_t *c, uid_t uid, gid_t gid, mode_t mode) { if (c) { c->auth.uid = uid; c->auth.gid = gid; c->auth.mode = mode; } } diff --git a/tests/check_ipc.c b/tests/check_ipc.c index c26e0f1..0624ae5 100644 --- a/tests/check_ipc.c +++ b/tests/check_ipc.c @@ -1,1022 +1,1026 @@ /* * Copyright (c) 2010 Red Hat, Inc. * * All rights reserved. * * Author: Angus Salkeld * * This file is part of libqb. * * libqb is free software: you can redistribute it and/or modify * it under the terms of the GNU Lesser General Public License as published by * the Free Software Foundation, either version 2.1 of the License, or * (at your option) any later version. * * libqb is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public License * along with libqb. If not, see . */ #include "os_base.h" #include #include #include #include #include #include #include #include static const char *ipc_name = "ipc_test"; #define MAX_MSG_SIZE (8192*16) static qb_ipcc_connection_t *conn; static enum qb_ipc_type ipc_type; enum my_msg_ids { IPC_MSG_REQ_TX_RX, IPC_MSG_RES_TX_RX, IPC_MSG_REQ_DISPATCH, IPC_MSG_RES_DISPATCH, IPC_MSG_REQ_BULK_EVENTS, IPC_MSG_RES_BULK_EVENTS, IPC_MSG_REQ_SERVER_FAIL, IPC_MSG_RES_SERVER_FAIL, IPC_MSG_REQ_SERVER_DISCONNECT, IPC_MSG_RES_SERVER_DISCONNECT, }; /* Test Cases * * 1) basic send & recv differnet message sizes * * 2) send message to start dispatch (confirm receipt) * * 3) flow control * * 4) authentication * * 5) thread safety * * 6) cleanup * * 7) service availabilty * * 8) multiple services */ static qb_loop_t *my_loop; static qb_ipcs_service_t* s1; static int32_t turn_on_fc = QB_FALSE; static int32_t fc_enabled = 89; static int32_t send_event_on_created = QB_FALSE; static int32_t disconnect_after_created = QB_FALSE; static int32_t num_bulk_events = 10; static int32_t exit_handler(int32_t rsignal, void *data) { qb_log(LOG_DEBUG, "caught signal %d", rsignal); qb_ipcs_destroy(s1); return -1; } static int32_t s1_msg_process_fn(qb_ipcs_connection_t *c, void *data, size_t size) { struct qb_ipc_request_header *req_pt = (struct qb_ipc_request_header *)data; struct qb_ipc_response_header response; ssize_t res; if (req_pt->id == IPC_MSG_REQ_TX_RX) { response.size = sizeof(struct qb_ipc_response_header); response.id = IPC_MSG_RES_TX_RX; response.error = 0; res = qb_ipcs_response_send(c, &response, response.size); if (res < 0) { qb_perror(LOG_INFO, "qb_ipcs_response_send"); } else if (res != response.size) { qb_log(LOG_DEBUG, "qb_ipcs_response_send %zd != %d", res, response.size); } if (turn_on_fc) { qb_ipcs_request_rate_limit(s1, QB_IPCS_RATE_OFF); } } else if (req_pt->id == IPC_MSG_REQ_DISPATCH) { response.size = sizeof(struct qb_ipc_response_header); response.id = IPC_MSG_RES_DISPATCH; response.error = 0; res = qb_ipcs_event_send(c, &response, sizeof(response)); if (res < 0) { qb_perror(LOG_INFO, "qb_ipcs_event_send"); } } else if (req_pt->id == IPC_MSG_REQ_BULK_EVENTS) { int32_t m; int32_t num; struct qb_ipcs_connection_stats_2 *stats; response.size = sizeof(struct qb_ipc_response_header); response.error = 0; stats = qb_ipcs_connection_stats_get_2(c, QB_FALSE); num = stats->event_q_length; free(stats); /* crazy large message */ res = qb_ipcs_event_send(c, &response, MAX_MSG_SIZE*10); ck_assert_int_eq(res, -EMSGSIZE); for (m = 0; m < num_bulk_events; m++) { res = qb_ipcs_event_send(c, &response, sizeof(response)); ck_assert_int_eq(res, sizeof(response)); response.id++; } stats = qb_ipcs_connection_stats_get_2(c, QB_FALSE); ck_assert_int_eq(stats->event_q_length - num, num_bulk_events); free(stats); response.id = IPC_MSG_RES_BULK_EVENTS; res = qb_ipcs_response_send(c, &response, response.size); ck_assert_int_eq(res, sizeof(response)); } else if (req_pt->id == IPC_MSG_REQ_SERVER_FAIL) { exit(0); } else if (req_pt->id == IPC_MSG_REQ_SERVER_DISCONNECT) { qb_ipcs_disconnect(c); } return 0; } static int32_t my_job_add(enum qb_loop_priority p, void *data, qb_loop_job_dispatch_fn fn) { return qb_loop_job_add(my_loop, p, data, fn); } static int32_t my_dispatch_add(enum qb_loop_priority p, int32_t fd, int32_t events, void *data, qb_ipcs_dispatch_fn_t fn) { return qb_loop_poll_add(my_loop, p, fd, events, data, fn); } static int32_t my_dispatch_mod(enum qb_loop_priority p, int32_t fd, int32_t events, void *data, qb_ipcs_dispatch_fn_t fn) { return qb_loop_poll_mod(my_loop, p, fd, events, data, fn); } static int32_t my_dispatch_del(int32_t fd) { return qb_loop_poll_del(my_loop, fd); } static int32_t s1_connection_closed(qb_ipcs_connection_t *c) { qb_enter(); qb_leave(); return 0; } static void s1_connection_destroyed(qb_ipcs_connection_t *c) { qb_enter(); qb_loop_stop(my_loop); qb_leave(); } static void s1_connection_created(qb_ipcs_connection_t *c) { if (send_event_on_created) { struct qb_ipc_response_header response; int32_t res; response.size = sizeof(struct qb_ipc_response_header); response.id = IPC_MSG_RES_DISPATCH; response.error = 0; res = qb_ipcs_event_send(c, &response, sizeof(response)); ck_assert_int_eq(res, response.size); } } static void run_ipc_server(void) { int32_t res; qb_loop_signal_handle handle; struct qb_ipcs_service_handlers sh = { .connection_accept = NULL, .connection_created = s1_connection_created, .msg_process = s1_msg_process_fn, .connection_destroyed = s1_connection_destroyed, .connection_closed = s1_connection_closed, }; struct qb_ipcs_poll_handlers ph = { .job_add = my_job_add, .dispatch_add = my_dispatch_add, .dispatch_mod = my_dispatch_mod, .dispatch_del = my_dispatch_del, }; qb_loop_signal_add(my_loop, QB_LOOP_HIGH, SIGSTOP, NULL, exit_handler, &handle); qb_loop_signal_add(my_loop, QB_LOOP_HIGH, SIGTERM, NULL, exit_handler, &handle); my_loop = qb_loop_create(); s1 = qb_ipcs_create(ipc_name, 4, ipc_type, &sh); fail_if(s1 == 0); qb_ipcs_poll_handlers_set(s1, &ph); res = qb_ipcs_run(s1); ck_assert_int_eq(res, 0); qb_loop_run(my_loop); } static int32_t run_function_in_new_process(void (*run_ipc_server_fn)(void)) { pid_t pid = fork (); if (pid == -1) { fprintf (stderr, "Can't fork\n"); return -1; } if (pid == 0) { run_ipc_server_fn(); return 0; } return pid; } static int32_t stop_process(pid_t pid) { /* wait a bit for the server to shutdown by it's self */ usleep(100000); kill(pid, SIGTERM); waitpid(pid, NULL, 0); return 0; } struct my_req { struct qb_ipc_request_header hdr; char message[1024 * 1024]; }; static struct my_req request; static int32_t send_and_check(int32_t req_id, uint32_t size, int32_t ms_timeout, int32_t expect_perfection) { struct qb_ipc_response_header res_header; int32_t res; int32_t try_times = 0; request.hdr.id = req_id; request.hdr.size = sizeof(struct qb_ipc_request_header) + size; /* check that we can't send a message that is too big * and we get the right return code. */ res = qb_ipcc_send(conn, &request, MAX_MSG_SIZE*2); ck_assert_int_eq(res, -EMSGSIZE); repeat_send: - res = qb_ipcc_send(conn, &request, request.hdr.size); try_times++; if (res < 0) { if (res == -EAGAIN && try_times < 10) { goto repeat_send; } else { if (res == -EAGAIN && try_times >= 10) { fc_enabled = QB_TRUE; } errno = -res; qb_perror(LOG_INFO, "qb_ipcc_send"); return res; } } if (req_id == IPC_MSG_REQ_DISPATCH) { res = qb_ipcc_event_recv(conn, &res_header, sizeof(struct qb_ipc_response_header), ms_timeout); } else { res = qb_ipcc_recv(conn, &res_header, sizeof(struct qb_ipc_response_header), ms_timeout); } if (res == -EINTR) { return -1; } if (res == -EAGAIN || res == -ETIMEDOUT) { fc_enabled = QB_TRUE; qb_perror(LOG_DEBUG, "qb_ipcc_recv"); return res; } if (expect_perfection) { ck_assert_int_eq(res, sizeof(struct qb_ipc_response_header)); ck_assert_int_eq(res_header.id, req_id + 1); ck_assert_int_eq(res_header.size, sizeof(struct qb_ipc_response_header)); } return res; } static int32_t recv_timeout = -1; static void test_ipc_txrx(void) { int32_t j; int32_t c = 0; size_t size; pid_t pid; pid = run_function_in_new_process(run_ipc_server); fail_if(pid == -1); sleep(1); do { conn = qb_ipcc_connect(ipc_name, MAX_MSG_SIZE); if (conn == NULL) { j = waitpid(pid, NULL, WNOHANG); ck_assert_int_eq(j, 0); sleep(1); c++; } } while (conn == NULL && c < 5); fail_if(conn == NULL); size = QB_MIN(sizeof(struct qb_ipc_request_header), 64); for (j = 1; j < 19; j++) { size *= 2; if (size >= MAX_MSG_SIZE) break; if (send_and_check(IPC_MSG_REQ_TX_RX, size, recv_timeout, QB_TRUE) < 0) { break; } } if (turn_on_fc) { ck_assert_int_eq(fc_enabled, QB_TRUE); } qb_ipcc_disconnect(conn); stop_process(pid); } static void test_ipc_exit(void) { struct qb_ipc_request_header req_header; struct qb_ipc_response_header res_header; struct iovec iov[1]; int32_t res; int32_t c = 0; int32_t j = 0; pid_t pid; pid = run_function_in_new_process(run_ipc_server); fail_if(pid == -1); sleep(1); do { conn = qb_ipcc_connect(ipc_name, MAX_MSG_SIZE); if (conn == NULL) { j = waitpid(pid, NULL, WNOHANG); ck_assert_int_eq(j, 0); sleep(1); c++; } } while (conn == NULL && c < 5); fail_if(conn == NULL); req_header.id = IPC_MSG_REQ_TX_RX; req_header.size = sizeof(struct qb_ipc_request_header); iov[0].iov_len = req_header.size; iov[0].iov_base = &req_header; res = qb_ipcc_sendv_recv(conn, iov, 1, &res_header, sizeof(struct qb_ipc_response_header), -1); ck_assert_int_eq(res, sizeof(struct qb_ipc_response_header)); /* kill the server */ stop_process(pid); /* * wait a bit for the server to die. */ sleep(1); /* * this needs to free up the shared mem */ qb_ipcc_disconnect(conn); } START_TEST(test_ipc_exit_us) { qb_enter(); ipc_type = QB_IPC_SOCKET; ipc_name = __func__; recv_timeout = 5000; test_ipc_exit(); qb_leave(); } END_TEST START_TEST(test_ipc_exit_shm) { qb_enter(); ipc_type = QB_IPC_SHM; ipc_name = __func__; recv_timeout = 1000; test_ipc_exit(); qb_leave(); } END_TEST START_TEST(test_ipc_txrx_shm_tmo) { qb_enter(); turn_on_fc = QB_FALSE; ipc_type = QB_IPC_SHM; ipc_name = __func__; recv_timeout = 1000; test_ipc_txrx(); qb_leave(); } END_TEST START_TEST(test_ipc_txrx_shm_block) { qb_enter(); turn_on_fc = QB_FALSE; ipc_type = QB_IPC_SHM; ipc_name = __func__; recv_timeout = -1; test_ipc_txrx(); qb_leave(); } END_TEST START_TEST(test_ipc_fc_shm) { qb_enter(); turn_on_fc = QB_TRUE; ipc_type = QB_IPC_SHM; recv_timeout = 500; ipc_name = __func__; test_ipc_txrx(); qb_leave(); } END_TEST START_TEST(test_ipc_txrx_us_block) { qb_enter(); turn_on_fc = QB_FALSE; ipc_type = QB_IPC_SOCKET; ipc_name = __func__; recv_timeout = -1; test_ipc_txrx(); qb_leave(); } END_TEST START_TEST(test_ipc_txrx_us_tmo) { qb_enter(); turn_on_fc = QB_FALSE; ipc_type = QB_IPC_SOCKET; ipc_name = __func__; recv_timeout = 1000; test_ipc_txrx(); qb_leave(); } END_TEST START_TEST(test_ipc_fc_us) { qb_enter(); turn_on_fc = QB_TRUE; ipc_type = QB_IPC_SOCKET; recv_timeout = 500; ipc_name = __func__; test_ipc_txrx(); qb_leave(); } END_TEST struct my_res { struct qb_ipc_response_header hdr; char message[1024 * 1024]; }; static void test_ipc_dispatch(void) { int32_t j; int32_t c = 0; pid_t pid; int32_t size; pid = run_function_in_new_process(run_ipc_server); fail_if(pid == -1); sleep(1); do { conn = qb_ipcc_connect(ipc_name, MAX_MSG_SIZE); if (conn == NULL) { j = waitpid(pid, NULL, WNOHANG); ck_assert_int_eq(j, 0); sleep(1); c++; } } while (conn == NULL && c < 5); fail_if(conn == NULL); size = QB_MIN(sizeof(struct qb_ipc_request_header), 64); for (j = 1; j < 19; j++) { size *= 2; if (size >= MAX_MSG_SIZE) break; if (send_and_check(IPC_MSG_REQ_DISPATCH, size, recv_timeout, QB_TRUE) < 0) { break; } } qb_ipcc_disconnect(conn); stop_process(pid); } START_TEST(test_ipc_disp_us) { qb_enter(); ipc_type = QB_IPC_SOCKET; ipc_name = __func__; test_ipc_dispatch(); qb_leave(); } END_TEST static int32_t events_received; static int32_t count_bulk_events(int32_t fd, int32_t revents, void *data) { qb_loop_t *cl = (qb_loop_t*)data; + struct qb_ipc_response_header res_header; + int32_t res; events_received++; + res = qb_ipcc_event_recv(conn, &res_header, + sizeof(struct qb_ipc_response_header), + -1); if (events_received >= num_bulk_events) { qb_loop_stop(cl); return -1; } return 0; } static void test_ipc_bulk_events(void) { struct qb_ipc_request_header req_header; struct qb_ipc_response_header res_header; struct iovec iov[1]; int32_t c = 0; int32_t j = 0; pid_t pid; int32_t res; qb_loop_t *cl; int32_t fd; pid = run_function_in_new_process(run_ipc_server); fail_if(pid == -1); sleep(1); do { conn = qb_ipcc_connect(ipc_name, MAX_MSG_SIZE); if (conn == NULL) { j = waitpid(pid, NULL, WNOHANG); ck_assert_int_eq(j, 0); sleep(1); c++; } } while (conn == NULL && c < 5); fail_if(conn == NULL); events_received = 0; cl = qb_loop_create(); res = qb_ipcc_fd_get(conn, &fd); ck_assert_int_eq(res, 0); res = qb_loop_poll_add(cl, QB_LOOP_MED, fd, POLLIN, cl, count_bulk_events); ck_assert_int_eq(res, 0); res = send_and_check(IPC_MSG_REQ_BULK_EVENTS, 0, recv_timeout, QB_TRUE); ck_assert_int_eq(res, sizeof(struct qb_ipc_response_header)); qb_loop_run(cl); ck_assert_int_eq(events_received, num_bulk_events); req_header.id = IPC_MSG_REQ_SERVER_FAIL; req_header.size = sizeof(struct qb_ipc_request_header); iov[0].iov_len = req_header.size; iov[0].iov_base = &req_header; res = qb_ipcc_sendv_recv(conn, iov, 1, &res_header, sizeof(struct qb_ipc_response_header), -1); if (res != -ECONNRESET && res != -ENOTCONN) { qb_log(LOG_ERR, "id:%d size:%d", res_header.id, res_header.size); ck_assert_int_eq(res, -ENOTCONN); } qb_ipcc_disconnect(conn); stop_process(pid); } START_TEST(test_ipc_bulk_events_us) { qb_enter(); send_event_on_created = QB_FALSE; ipc_type = QB_IPC_SOCKET; ipc_name = __func__; test_ipc_bulk_events(); qb_leave(); } END_TEST static void test_ipc_event_on_created(void) { int32_t c = 0; int32_t j = 0; pid_t pid; int32_t res; qb_loop_t *cl; int32_t fd; num_bulk_events = 1; pid = run_function_in_new_process(run_ipc_server); fail_if(pid == -1); sleep(1); do { conn = qb_ipcc_connect(ipc_name, MAX_MSG_SIZE); if (conn == NULL) { j = waitpid(pid, NULL, WNOHANG); ck_assert_int_eq(j, 0); sleep(1); c++; } } while (conn == NULL && c < 5); fail_if(conn == NULL); events_received = 0; cl = qb_loop_create(); res = qb_ipcc_fd_get(conn, &fd); ck_assert_int_eq(res, 0); res = qb_loop_poll_add(cl, QB_LOOP_MED, fd, POLLIN, cl, count_bulk_events); ck_assert_int_eq(res, 0); qb_loop_run(cl); ck_assert_int_eq(events_received, num_bulk_events); qb_ipcc_disconnect(conn); stop_process(pid); } START_TEST(test_ipc_event_on_created_us) { qb_enter(); send_event_on_created = QB_TRUE; ipc_type = QB_IPC_SOCKET; ipc_name = __func__; test_ipc_event_on_created(); qb_leave(); } END_TEST static void test_ipc_disconnect_after_created(void) { struct qb_ipc_request_header req_header; struct qb_ipc_response_header res_header; struct iovec iov[1]; int32_t c = 0; int32_t j = 0; pid_t pid; int32_t res; pid = run_function_in_new_process(run_ipc_server); fail_if(pid == -1); sleep(1); do { conn = qb_ipcc_connect(ipc_name, MAX_MSG_SIZE); if (conn == NULL) { j = waitpid(pid, NULL, WNOHANG); ck_assert_int_eq(j, 0); sleep(1); c++; } } while (conn == NULL && c < 5); fail_if(conn == NULL); ck_assert_int_eq(QB_TRUE, qb_ipcc_is_connected(conn)); req_header.id = IPC_MSG_REQ_SERVER_DISCONNECT; req_header.size = sizeof(struct qb_ipc_request_header); iov[0].iov_len = req_header.size; iov[0].iov_base = &req_header; res = qb_ipcc_sendv_recv(conn, iov, 1, &res_header, sizeof(struct qb_ipc_response_header), -1); /* * confirm we get -ENOTCONN */ ck_assert_int_eq(res, -ENOTCONN); ck_assert_int_eq(QB_FALSE, qb_ipcc_is_connected(conn)); qb_ipcc_disconnect(conn); stop_process(pid); } START_TEST(test_ipc_disconnect_after_created_us) { qb_enter(); disconnect_after_created = QB_TRUE; ipc_type = QB_IPC_SOCKET; ipc_name = __func__; test_ipc_disconnect_after_created(); qb_leave(); } END_TEST static void test_ipc_server_fail(void) { struct qb_ipc_request_header req_header; struct qb_ipc_response_header res_header; struct iovec iov[1]; int32_t res; int32_t j; int32_t c = 0; pid_t pid; pid = run_function_in_new_process(run_ipc_server); fail_if(pid == -1); sleep(1); do { conn = qb_ipcc_connect(ipc_name, MAX_MSG_SIZE); if (conn == NULL) { j = waitpid(pid, NULL, WNOHANG); ck_assert_int_eq(j, 0); sleep(1); c++; } } while (conn == NULL && c < 5); fail_if(conn == NULL); /* * tell the server to exit */ req_header.id = IPC_MSG_REQ_SERVER_FAIL; req_header.size = sizeof(struct qb_ipc_request_header); iov[0].iov_len = req_header.size; iov[0].iov_base = &req_header; ck_assert_int_eq(QB_TRUE, qb_ipcc_is_connected(conn)); res = qb_ipcc_sendv_recv(conn, iov, 1, &res_header, sizeof(struct qb_ipc_response_header), -1); /* * confirm we get -ENOTCONN */ ck_assert_int_eq(res, -ENOTCONN); ck_assert_int_eq(QB_FALSE, qb_ipcc_is_connected(conn)); qb_ipcc_disconnect(conn); stop_process(pid); } START_TEST(test_ipc_server_fail_soc) { qb_enter(); ipc_type = QB_IPC_SOCKET; ipc_name = __func__; test_ipc_server_fail(); qb_leave(); } END_TEST START_TEST(test_ipc_disp_shm) { qb_enter(); ipc_type = QB_IPC_SHM; ipc_name = __func__; test_ipc_dispatch(); qb_leave(); } END_TEST START_TEST(test_ipc_bulk_events_shm) { qb_enter(); ipc_type = QB_IPC_SHM; ipc_name = __func__; test_ipc_bulk_events(); qb_leave(); } END_TEST START_TEST(test_ipc_event_on_created_shm) { qb_enter(); send_event_on_created = QB_TRUE; ipc_type = QB_IPC_SHM; ipc_name = __func__; test_ipc_event_on_created(); qb_leave(); } END_TEST START_TEST(test_ipc_server_fail_shm) { qb_enter(); ipc_type = QB_IPC_SHM; ipc_name = __func__; test_ipc_server_fail(); qb_leave(); } END_TEST static Suite * make_shm_suite(void) { TCase *tc; Suite *s = suite_create("shm"); tc = tcase_create("ipc_server_fail_shm"); tcase_add_test(tc, test_ipc_server_fail_shm); tcase_set_timeout(tc, 6); suite_add_tcase(s, tc); tc = tcase_create("ipc_txrx_shm_block"); tcase_add_test(tc, test_ipc_txrx_shm_block); tcase_set_timeout(tc, 6); suite_add_tcase(s, tc); tc = tcase_create("ipc_txrx_shm_tmo"); tcase_add_test(tc, test_ipc_txrx_shm_tmo); tcase_set_timeout(tc, 6); suite_add_tcase(s, tc); tc = tcase_create("ipc_fc_shm"); tcase_add_test(tc, test_ipc_fc_shm); tcase_set_timeout(tc, 6); suite_add_tcase(s, tc); tc = tcase_create("ipc_dispatch_shm"); tcase_add_test(tc, test_ipc_disp_shm); tcase_set_timeout(tc, 16); suite_add_tcase(s, tc); tc = tcase_create("ipc_bulk_events_shm"); tcase_add_test(tc, test_ipc_bulk_events_shm); tcase_set_timeout(tc, 16); suite_add_tcase(s, tc); tc = tcase_create("ipc_exit_shm"); tcase_add_test(tc, test_ipc_exit_shm); tcase_set_timeout(tc, 3); suite_add_tcase(s, tc); tc = tcase_create("ipc_event_on_created_shm"); tcase_add_test(tc, test_ipc_event_on_created_shm); suite_add_tcase(s, tc); return s; } static Suite * make_soc_suite(void) { Suite *s = suite_create("socket"); TCase *tc; tc = tcase_create("ipc_server_fail_soc"); tcase_add_test(tc, test_ipc_server_fail_soc); tcase_set_timeout(tc, 6); suite_add_tcase(s, tc); tc = tcase_create("ipc_txrx_us_block"); tcase_add_test(tc, test_ipc_txrx_us_block); tcase_set_timeout(tc, 6); suite_add_tcase(s, tc); tc = tcase_create("ipc_txrx_us_tmo"); tcase_add_test(tc, test_ipc_txrx_us_tmo); tcase_set_timeout(tc, 6); suite_add_tcase(s, tc); tc = tcase_create("ipc_fc_us"); tcase_add_test(tc, test_ipc_fc_us); tcase_set_timeout(tc, 6); suite_add_tcase(s, tc); tc = tcase_create("ipc_exit_us"); tcase_add_test(tc, test_ipc_exit_us); tcase_set_timeout(tc, 6); suite_add_tcase(s, tc); tc = tcase_create("ipc_dispatch_us"); tcase_add_test(tc, test_ipc_disp_us); tcase_set_timeout(tc, 16); suite_add_tcase(s, tc); tc = tcase_create("ipc_bulk_events_us"); tcase_add_test(tc, test_ipc_bulk_events_us); tcase_set_timeout(tc, 16); suite_add_tcase(s, tc); tc = tcase_create("ipc_event_on_created_us"); tcase_add_test(tc, test_ipc_event_on_created_us); suite_add_tcase(s, tc); tc = tcase_create("ipc_disconnect_after_created_us"); tcase_add_test(tc, test_ipc_disconnect_after_created_us); suite_add_tcase(s, tc); return s; } int32_t main(void) { int32_t number_failed; SRunner *sr; Suite *s; int32_t do_shm_tests = QB_TRUE; #ifdef DISABLE_IPC_SHM do_shm_tests = QB_FALSE; #endif /* DISABLE_IPC_SHM */ s = make_soc_suite(); sr = srunner_create(s); if (do_shm_tests) { srunner_add_suite(sr, make_shm_suite()); } qb_log_init("check", LOG_USER, LOG_EMERG); qb_log_ctl(QB_LOG_SYSLOG, QB_LOG_CONF_ENABLED, QB_FALSE); qb_log_filter_ctl(QB_LOG_STDERR, QB_LOG_FILTER_ADD, QB_LOG_FILTER_FILE, "*", LOG_TRACE); qb_log_ctl(QB_LOG_STDERR, QB_LOG_CONF_ENABLED, QB_TRUE); qb_log_format_set(QB_LOG_STDERR, "lib/%f|%l| %b"); srunner_run_all(sr, CK_VERBOSE); number_failed = srunner_ntests_failed(sr); srunner_free(sr); return (number_failed == 0) ? EXIT_SUCCESS : EXIT_FAILURE; }