Page MenuHomeClusterLabs Projects

No OneTemporary

diff --git a/lib/ipc_setup.c b/lib/ipc_setup.c
index f5dec9f..c144a5e 100644
--- a/lib/ipc_setup.c
+++ b/lib/ipc_setup.c
@@ -1,934 +1,934 @@
/*
* Copyright (C) 2010,2013 Red Hat, Inc.
*
* Author: Angus Salkeld <asalkeld@redhat.com>
*
* This file is part of libqb.
*
* libqb is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 2.1 of the License, or
* (at your option) any later version.
*
* libqb is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with libqb. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os_base.h"
#include <poll.h>
#if defined(HAVE_GETPEERUCRED)
#include <ucred.h>
#endif
#ifdef HAVE_SYS_UN_H
#include <sys/un.h>
#endif /* HAVE_SYS_UN_H */
#ifdef HAVE_SYS_STAT_H
#include <sys/stat.h>
#endif
#ifdef HAVE_SYS_MMAN_H
#include <sys/mman.h>
#endif
#include <qb/qbatomic.h>
#include <qb/qbipcs.h>
#include <qb/qbloop.h>
#include <qb/qbdefs.h>
#include "util_int.h"
#include "ipc_int.h"
struct ipc_auth_ugp {
uid_t uid;
gid_t gid;
pid_t pid;
};
struct ipc_auth_data {
int32_t sock;
struct qb_ipcs_service *s;
union {
struct qb_ipc_connection_request req;
struct qb_ipc_connection_response res;
} msg;
struct msghdr msg_recv;
struct iovec iov_recv;
struct ipc_auth_ugp ugp;
size_t processed;
size_t len;
#ifdef SO_PASSCRED
char *cmsg_cred;
#endif
};
static int32_t qb_ipcs_us_connection_acceptor(int fd, int revent, void *data);
ssize_t
qb_ipc_us_send(struct qb_ipc_one_way *one_way, const void *msg, size_t len)
{
int32_t result;
int32_t processed = 0;
char *rbuf = (char *)msg;
qb_sigpipe_ctl(QB_SIGPIPE_IGNORE);
retry_send:
result = send(one_way->u.us.sock,
&rbuf[processed], len - processed, MSG_NOSIGNAL);
if (result == -1) {
if (errno == EAGAIN && processed > 0) {
goto retry_send;
} else {
qb_sigpipe_ctl(QB_SIGPIPE_DEFAULT);
return -errno;
}
}
processed += result;
if (processed != len) {
goto retry_send;
}
qb_sigpipe_ctl(QB_SIGPIPE_DEFAULT);
return processed;
}
static ssize_t
qb_ipc_us_recv_msghdr(struct ipc_auth_data *data)
{
char *msg = (char *) &data->msg;
int32_t result;
qb_sigpipe_ctl(QB_SIGPIPE_IGNORE);
retry_recv:
data->msg_recv.msg_iov->iov_base = &msg[data->processed];
data->msg_recv.msg_iov->iov_len = data->len - data->processed;
result = recvmsg(data->sock, &data->msg_recv, MSG_NOSIGNAL | MSG_WAITALL);
if (result == -1 && errno == EAGAIN) {
qb_sigpipe_ctl(QB_SIGPIPE_DEFAULT);
return -EAGAIN;
}
if (result == -1) {
qb_sigpipe_ctl(QB_SIGPIPE_DEFAULT);
return -errno;
}
if (result == 0) {
qb_sigpipe_ctl(QB_SIGPIPE_DEFAULT);
qb_util_log(LOG_DEBUG,
"recv(fd %d) got 0 bytes assuming ENOTCONN", data->sock);
return -ENOTCONN;
}
data->processed += result;
if (data->processed != data->len) {
goto retry_recv;
}
qb_sigpipe_ctl(QB_SIGPIPE_DEFAULT);
assert(data->processed == data->len);
return data->processed;
}
int32_t
qb_ipc_us_sock_error_is_disconnected(int err)
{
if (err >= 0) {
return QB_FALSE;
} else if (err == -EAGAIN ||
err == -ETIMEDOUT ||
err == -EINTR ||
#ifdef EWOULDBLOCK
err == -EWOULDBLOCK ||
#endif
err == -EMSGSIZE ||
err == -ENOMSG ||
err == -EINVAL) {
return QB_FALSE;
}
return QB_TRUE;
}
int32_t
qb_ipc_us_ready(struct qb_ipc_one_way * ow_data,
struct qb_ipc_one_way * ow_conn,
int32_t ms_timeout, int32_t events)
{
struct pollfd ufds[2];
int32_t poll_events;
int numfds = 1;
int i;
ufds[0].fd = ow_data->u.us.sock;
ufds[0].events = events;
ufds[0].revents = 0;
if (ow_conn && ow_data != ow_conn) {
numfds++;
ufds[1].fd = ow_conn->u.us.sock;
ufds[1].events = POLLIN;
ufds[1].revents = 0;
}
poll_events = poll(ufds, numfds, ms_timeout);
if ((poll_events == -1 && errno == EINTR) || poll_events == 0) {
return -EAGAIN;
} else if (poll_events == -1) {
return -errno;
}
for (i = 0; i < poll_events; i++) {
if (ufds[i].revents & POLLERR) {
qb_util_log(LOG_DEBUG, "poll(fd %d) got POLLERR",
ufds[i].fd);
return -ENOTCONN;
} else if (ufds[i].revents & POLLHUP) {
qb_util_log(LOG_DEBUG, "poll(fd %d) got POLLHUP",
ufds[i].fd);
return -ENOTCONN;
} else if (ufds[i].revents & POLLNVAL) {
qb_util_log(LOG_DEBUG, "poll(fd %d) got POLLNVAL",
ufds[i].fd);
return -ENOTCONN;
} else if (ufds[i].revents == 0) {
qb_util_log(LOG_DEBUG, "poll(fd %d) zero revents",
ufds[i].fd);
return -ENOTCONN;
}
}
return 0;
}
/*
* recv an entire message - and try hard to get all of it.
*/
ssize_t
qb_ipc_us_recv(struct qb_ipc_one_way * one_way,
void *msg, size_t len, int32_t timeout)
{
int32_t result;
int32_t final_rc = 0;
int32_t processed = 0;
int32_t to_recv = len;
char *data = msg;
qb_sigpipe_ctl(QB_SIGPIPE_IGNORE);
retry_recv:
result = recv(one_way->u.us.sock, &data[processed], to_recv,
MSG_NOSIGNAL | MSG_WAITALL);
if (result == -1) {
if (errno == EAGAIN && (processed > 0 || timeout == -1)) {
result = qb_ipc_us_ready(one_way, NULL, timeout, POLLIN);
if (result == 0 || result == -EAGAIN) {
goto retry_recv;
}
final_rc = result;
goto cleanup_sigpipe;
} else if (errno == ECONNRESET || errno == EPIPE) {
final_rc = -ENOTCONN;
goto cleanup_sigpipe;
} else {
final_rc = -errno;
goto cleanup_sigpipe;
}
}
if (result == 0) {
final_rc = -ENOTCONN;
goto cleanup_sigpipe;
}
processed += result;
to_recv -= result;
if (processed != len) {
goto retry_recv;
}
final_rc = processed;
cleanup_sigpipe:
qb_sigpipe_ctl(QB_SIGPIPE_DEFAULT);
return final_rc;
}
static int32_t
qb_ipcc_stream_sock_connect(const char *socket_name, int32_t * sock_pt)
{
int32_t request_fd;
struct sockaddr_un address;
int32_t res = 0;
request_fd = socket(PF_UNIX, SOCK_STREAM, 0);
if (request_fd == -1) {
return -errno;
}
qb_socket_nosigpipe(request_fd);
res = qb_sys_fd_nonblock_cloexec_set(request_fd);
if (res < 0) {
goto error_connect;
}
memset(&address, 0, sizeof(struct sockaddr_un));
address.sun_family = AF_UNIX;
#ifdef HAVE_STRUCT_SOCKADDR_UN_SUN_LEN
address.sun_len = QB_SUN_LEN(&address);
#endif
if (!use_filesystem_sockets()) {
snprintf(address.sun_path + 1, UNIX_PATH_MAX - 1, "%s", socket_name);
} else {
snprintf(address.sun_path, sizeof(address.sun_path), "%s/%s", SOCKETDIR,
socket_name);
}
if (connect(request_fd, (struct sockaddr *)&address,
QB_SUN_LEN(&address)) == -1) {
res = -errno;
goto error_connect;
}
*sock_pt = request_fd;
return 0;
error_connect:
close(request_fd);
*sock_pt = -1;
return res;
}
void
qb_ipcc_us_sock_close(int32_t sock)
{
shutdown(sock, SHUT_RDWR);
close(sock);
}
static int32_t
qb_ipc_auth_creds(struct ipc_auth_data *data)
{
int32_t res = 0;
/*
* currently support getpeerucred, getpeereid, and SO_PASSCRED credential
* retrieval mechanisms for various Platforms
*/
#ifdef HAVE_GETPEERUCRED
/*
* Solaris and some BSD systems
*/
{
ucred_t *uc = NULL;
if (getpeerucred(data->sock, &uc) == 0) {
res = 0;
data->ugp.uid = ucred_geteuid(uc);
data->ugp.gid = ucred_getegid(uc);
data->ugp.pid = ucred_getpid(uc);
ucred_free(uc);
} else {
res = -errno;
}
}
#elif defined(HAVE_GETPEEREID)
/*
* Usually MacOSX systems
*/
{
/*
* TODO get the peer's pid.
* c->pid = ?;
*/
if (getpeereid(data->sock, &data->ugp.uid, &data->ugp.gid) == 0) {
res = 0;
} else {
res = -errno;
}
}
#elif defined(SO_PASSCRED)
/*
* Usually Linux systems
*/
{
struct ucred cred;
struct cmsghdr *cmsg;
res = -EINVAL;
for (cmsg = CMSG_FIRSTHDR(&data->msg_recv); cmsg != NULL;
cmsg = CMSG_NXTHDR(&data->msg_recv, cmsg)) {
if (cmsg->cmsg_type != SCM_CREDENTIALS)
continue;
memcpy(&cred, CMSG_DATA(cmsg), sizeof(struct ucred));
res = 0;
data->ugp.pid = cred.pid;
data->ugp.uid = cred.uid;
data->ugp.gid = cred.gid;
break;
}
}
#else /* no credentials */
data->ugp.pid = 0;
data->ugp.uid = 0;
data->ugp.gid = 0;
res = -ENOTSUP;
#endif /* no credentials */
return res;
}
static void
destroy_ipc_auth_data(struct ipc_auth_data *data)
{
if (data->s) {
qb_ipcs_unref(data->s);
}
#ifdef SO_PASSCRED
free(data->cmsg_cred);
#endif
free(data);
}
static struct ipc_auth_data *
init_ipc_auth_data(int sock, size_t len)
{
struct ipc_auth_data *data = calloc(1, sizeof(struct ipc_auth_data));
if (data == NULL) {
return NULL;
}
data->msg_recv.msg_iov = &data->iov_recv;
data->msg_recv.msg_iovlen = 1;
data->msg_recv.msg_name = 0;
data->msg_recv.msg_namelen = 0;
#ifdef SO_PASSCRED
data->cmsg_cred = calloc(1, CMSG_SPACE(sizeof(struct ucred)));
if (data->cmsg_cred == NULL) {
destroy_ipc_auth_data(data);
return NULL;
}
data->msg_recv.msg_control = (void *)data->cmsg_cred;
data->msg_recv.msg_controllen = CMSG_SPACE(sizeof(struct ucred));
#endif
#ifdef QB_SOLARIS
data->msg_recv.msg_accrights = 0;
data->msg_recv.msg_accrightslen = 0;
#else
data->msg_recv.msg_flags = 0;
#endif /* QB_SOLARIS */
data->len = len;
data->iov_recv.iov_base = &data->msg;
data->iov_recv.iov_len = data->len;
data->sock = sock;
return data;
}
int32_t
qb_ipcc_us_setup_connect(struct qb_ipcc_connection *c,
struct qb_ipc_connection_response *r)
{
int32_t res;
struct qb_ipc_connection_request request;
struct ipc_auth_data *data;
#ifdef QB_LINUX
int off = 0;
int on = 1;
#endif
res = qb_ipcc_stream_sock_connect(c->name, &c->setup.u.us.sock);
if (res != 0) {
return res;
}
#ifdef QB_LINUX
setsockopt(c->setup.u.us.sock, SOL_SOCKET, SO_PASSCRED, &on,
sizeof(on));
#endif
memset(&request, 0, sizeof(request));
request.hdr.id = QB_IPC_MSG_AUTHENTICATE;
request.hdr.size = sizeof(request);
request.max_msg_size = c->setup.max_msg_size;
res = qb_ipc_us_send(&c->setup, &request, request.hdr.size);
if (res < 0) {
qb_ipcc_us_sock_close(c->setup.u.us.sock);
return res;
}
data = init_ipc_auth_data(c->setup.u.us.sock, sizeof(struct qb_ipc_connection_response));
if (data == NULL) {
qb_ipcc_us_sock_close(c->setup.u.us.sock);
return -ENOMEM;
}
qb_ipc_us_ready(&c->setup, NULL, -1, POLLIN);
res = qb_ipc_us_recv_msghdr(data);
#ifdef QB_LINUX
setsockopt(c->setup.u.us.sock, SOL_SOCKET, SO_PASSCRED, &off,
sizeof(off));
#endif
if (res != data->len) {
destroy_ipc_auth_data(data);
return res;
}
memcpy(r, &data->msg.res, sizeof(struct qb_ipc_connection_response));
qb_ipc_auth_creds(data);
c->egid = data->ugp.gid;
c->euid = data->ugp.uid;
c->server_pid = data->ugp.pid;
destroy_ipc_auth_data(data);
return r->hdr.error;
}
/*
**************************************************************************
* SERVER
*/
int32_t
qb_ipcs_us_publish(struct qb_ipcs_service * s)
{
struct sockaddr_un un_addr;
int32_t res;
#ifdef SO_PASSCRED
int on = 1;
#endif
/*
* Create socket for IPC clients, name socket, listen for connections
*/
s->server_sock = socket(PF_UNIX, SOCK_STREAM, 0);
if (s->server_sock == -1) {
res = -errno;
qb_util_perror(LOG_ERR, "Cannot create server socket");
return res;
}
res = qb_sys_fd_nonblock_cloexec_set(s->server_sock);
if (res < 0) {
goto error_close;
}
memset(&un_addr, 0, sizeof(struct sockaddr_un));
un_addr.sun_family = AF_UNIX;
#if defined(QB_BSD) || defined(QB_DARWIN)
un_addr.sun_len = SUN_LEN(&un_addr);
#endif
qb_util_log(LOG_INFO, "server name: %s", s->name);
if (!use_filesystem_sockets()) {
snprintf(un_addr.sun_path + 1, UNIX_PATH_MAX - 1, "%s", s->name);
}
else {
struct stat stat_out;
res = stat(SOCKETDIR, &stat_out);
if (res == -1 || (res == 0 && !S_ISDIR(stat_out.st_mode))) {
res = -errno;
qb_util_log(LOG_CRIT,
"Required directory not present %s",
SOCKETDIR);
goto error_close;
}
snprintf(un_addr.sun_path, sizeof(un_addr.sun_path), "%s/%s", SOCKETDIR,
s->name);
unlink(un_addr.sun_path);
}
res = bind(s->server_sock, (struct sockaddr *)&un_addr,
QB_SUN_LEN(&un_addr));
if (res) {
res = -errno;
qb_util_perror(LOG_ERR, "Could not bind AF_UNIX (%s)",
un_addr.sun_path);
goto error_close;
}
/*
* Allow everyone to write to the socket since the IPC layer handles
* security automatically
*/
if (use_filesystem_sockets()) {
- res = chmod(un_addr.sun_path, S_IRWXU | S_IRWXG | S_IRWXO);
+ (void)chmod(un_addr.sun_path, S_IRWXU | S_IRWXG | S_IRWXO);
}
#ifdef SO_PASSCRED
- setsockopt(s->server_sock, SOL_SOCKET, SO_PASSCRED, &on, sizeof(on));
+ (void)setsockopt(s->server_sock, SOL_SOCKET, SO_PASSCRED, &on, sizeof(on));
#endif
if (listen(s->server_sock, SERVER_BACKLOG) == -1) {
qb_util_perror(LOG_ERR, "socket listen failed");
}
res = s->poll_fns.dispatch_add(s->poll_priority, s->server_sock,
POLLIN | POLLPRI | POLLNVAL,
s, qb_ipcs_us_connection_acceptor);
return res;
error_close:
close(s->server_sock);
return res;
}
int32_t
qb_ipcs_us_withdraw(struct qb_ipcs_service * s)
{
qb_util_log(LOG_INFO, "withdrawing server sockets");
(void)s->poll_fns.dispatch_del(s->server_sock);
shutdown(s->server_sock, SHUT_RDWR);
if (use_filesystem_sockets()) {
struct sockaddr_un sockname;
socklen_t socklen = sizeof(sockname);
if ((getsockname(s->server_sock, (struct sockaddr *)&sockname, &socklen) == 0) &&
sockname.sun_family == AF_UNIX) {
#ifdef HAVE_STRUCT_SOCKADDR_UN_SUN_LEN
/*
* Terminating NUL on FreeBSD is not part of the sun_path.
* Add it to use sun_path as a parameter of unlink
*/
sockname.sun_path[sockname.sun_len - offsetof(struct sockaddr_un, sun_path)] = '\0';
#endif
unlink(sockname.sun_path);
}
}
close(s->server_sock);
s->server_sock = -1;
return 0;
}
static int32_t
handle_new_connection(struct qb_ipcs_service *s,
int32_t auth_result,
int32_t sock,
void *msg, size_t len, struct ipc_auth_ugp *ugp)
{
struct qb_ipcs_connection *c = NULL;
struct qb_ipc_connection_request *req = msg;
int32_t res = auth_result;
int32_t res2 = 0;
uint32_t max_buffer_size = QB_MAX(req->max_msg_size, s->max_buffer_size);
struct qb_ipc_connection_response response;
const char suffix[] = "/qb";
int desc_len;
c = qb_ipcs_connection_alloc(s);
if (c == NULL) {
qb_ipcc_us_sock_close(sock);
return -ENOMEM;
}
c->receive_buf = calloc(1, max_buffer_size);
if (c->receive_buf == NULL) {
free(c);
qb_ipcc_us_sock_close(sock);
return -ENOMEM;
}
c->setup.u.us.sock = sock;
c->request.max_msg_size = max_buffer_size;
c->response.max_msg_size = max_buffer_size;
c->event.max_msg_size = max_buffer_size;
c->pid = ugp->pid;
c->auth.uid = c->euid = ugp->uid;
c->auth.gid = c->egid = ugp->gid;
c->auth.mode = 0600;
c->stats.client_pid = ugp->pid;
memset(&response, 0, sizeof(response));
#if defined(QB_LINUX) || defined(QB_CYGWIN)
desc_len = snprintf(c->description, CONNECTION_DESCRIPTION - sizeof suffix,
"/dev/shm/qb-%d-%d-%d-XXXXXX", s->pid, ugp->pid, c->setup.u.us.sock);
if (desc_len < 0) {
res = -errno;
goto send_response;
}
if (desc_len >= CONNECTION_DESCRIPTION - sizeof suffix) {
res = -ENAMETOOLONG;
goto send_response;
}
if (mkdtemp(c->description) == NULL) {
res = -errno;
goto send_response;
}
if (chmod(c->description, 0770)) {
res = -errno;
goto send_response;
}
/* chown can fail because we might not be root */
(void)chown(c->description, c->auth.uid, c->auth.gid);
/* We can't pass just a directory spec to the clients */
memcpy(c->description + desc_len, suffix, sizeof suffix);
#else
desc_len = snprintf(c->description, CONNECTION_DESCRIPTION,
"%d-%d-%d", s->pid, ugp->pid, c->setup.u.us.sock);
if (desc_len < 0) {
res = -errno;
goto send_response;
}
if (desc_len >= CONNECTION_DESCRIPTION) {
res = -ENAMETOOLONG;
goto send_response;
}
#endif
if (auth_result == 0 && c->service->serv_fns.connection_accept) {
res = c->service->serv_fns.connection_accept(c,
c->euid, c->egid);
}
if (res != 0) {
goto send_response;
}
qb_util_log(LOG_DEBUG, "IPC credentials authenticated (%s)",
c->description);
if (s->funcs.connect) {
res = s->funcs.connect(s, c, &response);
if (res != 0) {
goto send_response;
}
}
/*
* The connection is good, add it to the active connection list
*/
c->state = QB_IPCS_CONNECTION_ACTIVE;
qb_list_add(&c->list, &s->connections);
send_response:
response.hdr.id = QB_IPC_MSG_AUTHENTICATE;
response.hdr.size = sizeof(response);
response.hdr.error = res;
if (res == 0) {
response.connection = (intptr_t) c;
response.connection_type = s->type;
response.max_msg_size = c->request.max_msg_size;
s->stats.active_connections++;
}
res2 = qb_ipc_us_send(&c->setup, &response, response.hdr.size);
if (res == 0 && res2 != response.hdr.size) {
res = res2;
}
if (res == 0) {
qb_ipcs_connection_ref(c);
if (s->serv_fns.connection_created) {
s->serv_fns.connection_created(c);
}
if (c->state == QB_IPCS_CONNECTION_ACTIVE) {
c->state = QB_IPCS_CONNECTION_ESTABLISHED;
}
qb_ipcs_connection_unref(c);
} else {
if (res == -EACCES) {
qb_util_log(LOG_ERR, "Invalid IPC credentials (%s).",
c->description);
} else if (res == -EAGAIN) {
qb_util_log(LOG_WARNING, "Denied connection, is not ready (%s)",
c->description);
} else {
errno = -res;
qb_util_perror(LOG_ERR,
"Error in connection setup (%s)",
c->description);
}
if (c->state == QB_IPCS_CONNECTION_INACTIVE) {
/* This removes the initial alloc ref */
qb_ipcs_connection_unref(c);
qb_ipcc_us_sock_close(sock);
} else {
qb_ipcs_disconnect(c);
}
}
return res;
}
static int32_t
process_auth(int32_t fd, int32_t revents, void *d)
{
struct ipc_auth_data *data = (struct ipc_auth_data *) d;
int32_t res = 0;
#ifdef SO_PASSCRED
int off = 0;
#endif
if (data->s->server_sock == -1) {
qb_util_log(LOG_DEBUG, "Closing fd (%d) for server shutdown", fd);
res = -ESHUTDOWN;
goto cleanup_and_return;
}
if (revents & POLLNVAL) {
qb_util_log(LOG_DEBUG, "NVAL conn fd (%d)", fd);
res = -EINVAL;
goto cleanup_and_return;
}
if (revents & POLLHUP) {
qb_util_log(LOG_DEBUG, "HUP conn fd (%d)", fd);
res = -ESHUTDOWN;
goto cleanup_and_return;
}
if ((revents & POLLIN) == 0) {
return 0;
}
res = qb_ipc_us_recv_msghdr(data);
if (res == -EAGAIN) {
/* yield to mainloop, Let mainloop call us again */
return 0;
}
if (res != data->len) {
res = -EIO;
goto cleanup_and_return;
}
res = qb_ipc_auth_creds(data);
cleanup_and_return:
#ifdef SO_PASSCRED
setsockopt(data->sock, SOL_SOCKET, SO_PASSCRED, &off, sizeof(off));
#endif
(void)data->s->poll_fns.dispatch_del(data->sock);
if (res < 0) {
close(data->sock);
} else if (data->msg.req.hdr.id == QB_IPC_MSG_AUTHENTICATE) {
(void)handle_new_connection(data->s, res, data->sock, &data->msg, data->len, &data->ugp);
} else {
close(data->sock);
}
destroy_ipc_auth_data(data);
return 1;
}
static void
qb_ipcs_uc_recv_and_auth(int32_t sock, struct qb_ipcs_service *s)
{
int res = 0;
struct ipc_auth_data *data = NULL;
#ifdef SO_PASSCRED
int on = 1;
#endif
data = init_ipc_auth_data(sock, sizeof(struct qb_ipc_connection_request));
if (data == NULL) {
close(sock);
/* -ENOMEM */
return;
}
data->s = s;
qb_ipcs_ref(data->s);
#ifdef SO_PASSCRED
setsockopt(sock, SOL_SOCKET, SO_PASSCRED, &on, sizeof(on));
#endif
res = s->poll_fns.dispatch_add(s->poll_priority,
data->sock,
POLLIN | POLLPRI | POLLNVAL,
data, process_auth);
if (res < 0) {
qb_util_log(LOG_DEBUG, "Failed to arrange for AUTH for fd (%d)",
data->sock);
close(sock);
destroy_ipc_auth_data(data);
}
}
static int32_t
qb_ipcs_us_connection_acceptor(int fd, int revent, void *data)
{
struct sockaddr_un un_addr;
int32_t new_fd;
struct qb_ipcs_service *s = (struct qb_ipcs_service *)data;
int32_t res;
socklen_t addrlen = sizeof(struct sockaddr_un);
if (revent & (POLLNVAL | POLLHUP | POLLERR)) {
/*
* handle shutdown more cleanly.
*/
return -1;
}
retry_accept:
errno = 0;
new_fd = accept(fd, (struct sockaddr *)&un_addr, &addrlen);
if (new_fd == -1 && errno == EINTR) {
goto retry_accept;
}
if (new_fd == -1 && errno == EBADF) {
qb_util_perror(LOG_ERR,
"Could not accept client connection from fd:%d",
fd);
return -1;
}
if (new_fd == -1) {
qb_util_perror(LOG_ERR, "Could not accept client connection");
/* This is an error, but -1 would indicate disconnect
* from the poll loop
*/
return 0;
}
res = qb_sys_fd_nonblock_cloexec_set(new_fd);
if (res < 0) {
close(new_fd);
/* This is an error, but -1 would indicate disconnect
* from the poll loop
*/
return 0;
}
qb_ipcs_uc_recv_and_auth(new_fd, s);
return 0;
}
void remove_tempdir(const char *name)
{
#if defined(QB_LINUX) || defined(QB_CYGWIN)
char dirname[PATH_MAX];
char *slash = strrchr(name, '/');
if (slash && slash - name < sizeof dirname) {
memcpy(dirname, name, slash - name);
dirname[slash - name] = '\0';
/* This gets called more than it needs to be really, so we don't check
* the return code. It's more of a desperate attempt to clean up after ourself
* in either the server or client.
*/
(void)rmdir(dirname);
}
#endif
}
diff --git a/lib/ipc_socket.c b/lib/ipc_socket.c
index 0a13ebf..178a634 100644
--- a/lib/ipc_socket.c
+++ b/lib/ipc_socket.c
@@ -1,921 +1,921 @@
/*
* Copyright (C) 2010,2013 Red Hat, Inc.
*
* Author: Angus Salkeld <asalkeld@redhat.com>
*
* This file is part of libqb.
*
* libqb is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 2.1 of the License, or
* (at your option) any later version.
*
* libqb is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with libqb. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os_base.h"
#include <poll.h>
#ifdef HAVE_SYS_UN_H
#include <sys/un.h>
#endif /* HAVE_SYS_UN_H */
#ifdef HAVE_SYS_MMAN_H
#include <sys/mman.h>
#endif
#include <qb/qbatomic.h>
#include <qb/qbipcs.h>
#include <qb/qbloop.h>
#include <qb/qbdefs.h>
#include "util_int.h"
#include "ipc_int.h"
struct ipc_us_control {
int32_t sent;
int32_t flow_control;
};
#define SHM_CONTROL_SIZE (3 * sizeof(struct ipc_us_control))
int use_filesystem_sockets(void)
{
static int need_init = 1;
static int filesystem_sockets = 0;
if (need_init) {
#if defined(QB_LINUX) || defined(QB_CYGWIN)
struct stat buf;
if (stat(FORCESOCKETSFILE, &buf) == 0) {
filesystem_sockets = 1;
}
#else
filesystem_sockets = 1;
#endif
need_init = 0;
}
return filesystem_sockets;
}
static void
set_sock_addr(struct sockaddr_un *address, const char *socket_name)
{
memset(address, 0, sizeof(struct sockaddr_un));
address->sun_family = AF_UNIX;
#ifdef HAVE_STRUCT_SOCKADDR_UN_SUN_LEN
address->sun_len = QB_SUN_LEN(address);
#endif
if (socket_name[0] == '/' || !use_filesystem_sockets()) {
snprintf(address->sun_path + 1, UNIX_PATH_MAX - 1, "%s", socket_name);
} else {
snprintf(address->sun_path, sizeof(address->sun_path), "%s/%s", SOCKETDIR,
socket_name);
}
}
static int32_t
qb_ipc_dgram_sock_setup(const char *base_name,
const char *service_name, int32_t * sock_pt,
gid_t gid)
{
int32_t request_fd;
struct sockaddr_un local_address;
int32_t res = 0;
char sock_path[PATH_MAX];
request_fd = socket(PF_UNIX, SOCK_DGRAM, 0);
if (request_fd == -1) {
return -errno;
}
qb_socket_nosigpipe(request_fd);
res = qb_sys_fd_nonblock_cloexec_set(request_fd);
if (res < 0) {
goto error_connect;
}
snprintf(sock_path, PATH_MAX, "%s-%s", base_name, service_name);
set_sock_addr(&local_address, sock_path);
if (use_filesystem_sockets()) {
- res = unlink(local_address.sun_path);
+ (void)unlink(local_address.sun_path);
}
res = bind(request_fd, (struct sockaddr *)&local_address,
sizeof(local_address));
if (use_filesystem_sockets()) {
(void)chmod(local_address.sun_path, 0660);
(void)chown(local_address.sun_path, -1, gid);
}
if (res < 0) {
goto error_connect;
}
*sock_pt = request_fd;
return 0;
error_connect:
close(request_fd);
*sock_pt = -1;
return res;
}
static int32_t
set_sock_size(int sockfd, size_t max_msg_size)
{
int32_t rc;
unsigned int optval;
socklen_t optlen = sizeof(optval);
rc = getsockopt(sockfd, SOL_SOCKET, SO_SNDBUF, &optval, &optlen);
qb_util_log(LOG_TRACE, "%d: getsockopt(%d, SO_SNDBUF, needed:%d) actual:%d",
rc, sockfd, max_msg_size, optval);
/* The optval <= max_msg_size check is weird...
* during testing it was discovered in some instances if the
* default optval is exactly equal to our max_msg_size, we couldn't
* actually send a message that large unless we explicitly set
* it using setsockopt... there is no good explaination for this. Most
* likely this is hitting some sort of "off by one" error in the kernel. */
if (rc == 0 && optval <= max_msg_size) {
optval = max_msg_size;
optlen = sizeof(optval);
rc = setsockopt(sockfd, SOL_SOCKET, SO_SNDBUF, &optval, optlen);
}
if (rc != 0) {
return -errno;
}
rc = getsockopt(sockfd, SOL_SOCKET, SO_RCVBUF, &optval, &optlen);
qb_util_log(LOG_TRACE, "%d: getsockopt(%d, SO_RCVBUF, needed:%d) actual:%d",
rc, sockfd, max_msg_size, optval);
/* Set the sockets receive buffer size to match the send buffer. On
* FreeBSD without this calls to sendto() will result in an ENOBUFS error
* if the message is larger than net.local.dgram.recvspace sysctl. */
if (rc == 0 && optval <= max_msg_size) {
optval = max_msg_size;
optlen = sizeof(optval);
rc = setsockopt(sockfd, SOL_SOCKET, SO_RCVBUF, &optval, optlen);
}
if (rc != 0) {
return -errno;
}
return rc;
}
static int32_t
dgram_verify_msg_size(size_t max_msg_size)
{
int32_t rc = -1;
int32_t sockets[2];
int32_t tries = 0;
int32_t write_passed = 0;
int32_t read_passed = 0;
char buf[max_msg_size];
if (socketpair(AF_UNIX, SOCK_DGRAM, 0, sockets) < 0) {
qb_util_perror(LOG_DEBUG, "error calling socketpair()");
goto cleanup_socks;
}
if (set_sock_size(sockets[0], max_msg_size) != 0) {
qb_util_log(LOG_DEBUG, "error set_sock_size(sockets[0],%#x)",
max_msg_size);
goto cleanup_socks;
}
if (set_sock_size(sockets[1], max_msg_size) != 0) {
qb_util_log(LOG_DEBUG, "error set_sock_size(sockets[1],%#x)",
max_msg_size);
goto cleanup_socks;
}
for (tries = 0; tries < 3; tries++) {
if (write_passed == 0) {
rc = write(sockets[1], buf, max_msg_size);
if (rc < 0 && (errno == EAGAIN || errno == EINTR)) {
continue;
} else if (rc == max_msg_size) {
write_passed = 1;
} else {
break;
}
}
if (read_passed == 0) {
rc = read(sockets[0], buf, max_msg_size);
if (rc < 0 && (errno == EAGAIN || errno == EINTR)) {
continue;
} else if (rc == max_msg_size) {
read_passed = 1;
} else {
break;
}
}
if (read_passed && write_passed) {
rc = 0;
break;
}
}
cleanup_socks:
close(sockets[0]);
close(sockets[1]);
return rc;
}
int32_t
qb_ipcc_verify_dgram_max_msg_size(size_t max_msg_size)
{
int32_t i;
int32_t last = -1;
int32_t inc = 2048;
if (dgram_verify_msg_size(max_msg_size) == 0) {
return max_msg_size;
}
for (i = inc; i < max_msg_size; i+=inc) {
if (dgram_verify_msg_size(i) == 0) {
last = i;
} else if (inc >= 512) {
i-=inc;
inc = inc/2;
} else {
break;
}
}
return last;
}
/*
* bind to "base_name-local_name"
* connect to "base_name-remote_name"
* output sock_pt
*/
static int32_t
qb_ipc_dgram_sock_connect(const char *base_name,
const char *local_name,
const char *remote_name,
int32_t max_msg_size, int32_t * sock_pt, gid_t gid)
{
char sock_path[PATH_MAX];
struct sockaddr_un remote_address;
int32_t res = qb_ipc_dgram_sock_setup(base_name, local_name,
sock_pt, gid);
if (res < 0) {
return res;
}
snprintf(sock_path, PATH_MAX, "%s-%s", base_name, remote_name);
set_sock_addr(&remote_address, sock_path);
if (connect(*sock_pt, (struct sockaddr *)&remote_address,
QB_SUN_LEN(&remote_address)) == -1) {
res = -errno;
goto error_connect;
}
return set_sock_size(*sock_pt, max_msg_size);
error_connect:
close(*sock_pt);
*sock_pt = -1;
return res;
}
static int32_t
_finish_connecting(struct qb_ipc_one_way *one_way)
{
struct sockaddr_un remote_address;
int res;
int error;
int retry = 0;
set_sock_addr(&remote_address, one_way->u.us.sock_name);
/* this retry loop is here to help connecting when trying to send
* an event right after connection setup.
*/
do {
errno = 0;
res = connect(one_way->u.us.sock,
(struct sockaddr *)&remote_address,
QB_SUN_LEN(&remote_address));
if (res == -1) {
error = -errno;
qb_util_perror(LOG_DEBUG, "error calling connect()");
retry++;
usleep(100000);
}
} while (res == -1 && retry < 10);
if (res == -1) {
return error;
}
/* Beside disposing no longer needed value, this also signals that
we are done with connect-on-send arrangement at the server side
(i.e. for response and event channels). */
free(one_way->u.us.sock_name);
one_way->u.us.sock_name = NULL;
return set_sock_size(one_way->u.us.sock, one_way->max_msg_size);
}
/*
* client functions
* --------------------------------------------------------
*/
static void
qb_ipcc_us_disconnect(struct qb_ipcc_connection *c)
{
munmap(c->request.u.us.shared_data, SHM_CONTROL_SIZE);
unlink(c->request.u.us.shared_file_name);
if (use_filesystem_sockets()) {
struct sockaddr_un un_addr;
socklen_t un_addr_len = sizeof(struct sockaddr_un);
char *base_name;
char sock_name[PATH_MAX];
size_t length;
if (getsockname(c->response.u.us.sock, (struct sockaddr *)&un_addr, &un_addr_len) == 0) {
length = strlen(un_addr.sun_path);
base_name = strndup(un_addr.sun_path,
length - /* strlen("-response") */ 9);
qb_util_log(LOG_DEBUG, "unlinking socket bound files with base_name=%s length=%d",base_name,length);
snprintf(sock_name,PATH_MAX,"%s-%s",base_name,"request");
qb_util_log(LOG_DEBUG, "unlink sock_name=%s",sock_name);
unlink(sock_name);
snprintf(sock_name,PATH_MAX,"%s-%s",base_name,"event");
qb_util_log(LOG_DEBUG, "unlink sock_name=%s",sock_name);
unlink(sock_name);
snprintf(sock_name,PATH_MAX,"%s-%s",base_name,"event-tx");
qb_util_log(LOG_DEBUG, "unlink sock_name=%s",sock_name);
unlink(sock_name);
snprintf(sock_name,PATH_MAX,"%s-%s",base_name,"response");
qb_util_log(LOG_DEBUG, "unlink sock_name=%s",sock_name);
unlink(sock_name);
free(base_name);
}
}
/* Last-ditch attempt to tidy up after ourself */
remove_tempdir(c->request.u.us.shared_file_name);
qb_ipcc_us_sock_close(c->event.u.us.sock);
qb_ipcc_us_sock_close(c->request.u.us.sock);
qb_ipcc_us_sock_close(c->setup.u.us.sock);
}
static ssize_t
qb_ipc_socket_send(struct qb_ipc_one_way *one_way,
const void *msg_ptr, size_t msg_len)
{
ssize_t rc = 0;
struct ipc_us_control *ctl;
ctl = (struct ipc_us_control *)one_way->u.us.shared_data;
if (one_way->u.us.sock_name) {
rc = _finish_connecting(one_way);
if (rc < 0) {
qb_util_log(LOG_ERR, "socket connect-on-send");
return rc;
}
}
qb_sigpipe_ctl(QB_SIGPIPE_IGNORE);
rc = send(one_way->u.us.sock, msg_ptr, msg_len, MSG_NOSIGNAL);
if (rc == -1) {
rc = -errno;
if (errno != EAGAIN && errno != ENOBUFS) {
qb_util_perror(LOG_DEBUG, "socket_send:send");
}
}
qb_sigpipe_ctl(QB_SIGPIPE_DEFAULT);
if (ctl && rc == msg_len) {
qb_atomic_int_inc(&ctl->sent);
}
return rc;
}
static ssize_t
qb_ipc_socket_sendv(struct qb_ipc_one_way *one_way, const struct iovec *iov,
size_t iov_len)
{
int32_t rc;
struct ipc_us_control *ctl;
ctl = (struct ipc_us_control *)one_way->u.us.shared_data;
qb_sigpipe_ctl(QB_SIGPIPE_IGNORE);
if (one_way->u.us.sock_name) {
rc = _finish_connecting(one_way);
if (rc < 0) {
qb_util_perror(LOG_ERR, "socket connect-on-sendv");
qb_sigpipe_ctl(QB_SIGPIPE_DEFAULT);
return rc;
}
}
rc = writev(one_way->u.us.sock, iov, iov_len);
if (rc == -1) {
rc = -errno;
if (errno != EAGAIN && errno != ENOBUFS) {
qb_util_perror(LOG_DEBUG, "socket_sendv:writev %d",
one_way->u.us.sock);
}
}
qb_sigpipe_ctl(QB_SIGPIPE_DEFAULT);
if (ctl && rc > 0) {
qb_atomic_int_inc(&ctl->sent);
}
return rc;
}
/*
* recv a message of unknown size.
*/
static ssize_t
qb_ipc_us_recv_at_most(struct qb_ipc_one_way *one_way,
void *msg, size_t len, int32_t timeout)
{
int32_t result;
int32_t final_rc = 0;
int32_t to_recv = 0;
char *data = msg;
struct ipc_us_control *ctl = NULL;
int32_t time_waited = 0;
int32_t time_to_wait = timeout;
if (timeout == -1) {
time_to_wait = 1000;
}
qb_sigpipe_ctl(QB_SIGPIPE_IGNORE);
retry_peek:
result = recv(one_way->u.us.sock, data,
sizeof(struct qb_ipc_request_header),
MSG_NOSIGNAL | MSG_PEEK);
if (result == -1) {
if (errno != EAGAIN) {
final_rc = -errno;
if (use_filesystem_sockets()) {
if (errno == ECONNRESET || errno == EPIPE) {
final_rc = -ENOTCONN;
}
}
goto cleanup_sigpipe;
}
/* check to see if we have enough time left to try again */
if (time_waited < timeout || timeout == -1) {
result = qb_ipc_us_ready(one_way, NULL, time_to_wait, POLLIN);
if (qb_ipc_us_sock_error_is_disconnected(result)) {
final_rc = result;
goto cleanup_sigpipe;
}
time_waited += time_to_wait;
goto retry_peek;
} else if (time_waited >= timeout) {
final_rc = -ETIMEDOUT;
goto cleanup_sigpipe;
}
}
if (result >= sizeof(struct qb_ipc_request_header)) {
struct qb_ipc_request_header *hdr = NULL;
hdr = (struct qb_ipc_request_header *)msg;
to_recv = hdr->size;
}
result = recv(one_way->u.us.sock, data, to_recv,
MSG_NOSIGNAL | MSG_WAITALL);
if (result == -1) {
final_rc = -errno;
goto cleanup_sigpipe;
} else if (result == 0) {
qb_util_log(LOG_DEBUG, "recv == 0 -> ENOTCONN");
final_rc = -ENOTCONN;
goto cleanup_sigpipe;
}
final_rc = result;
ctl = (struct ipc_us_control *)one_way->u.us.shared_data;
if (ctl) {
(void)qb_atomic_int_dec_and_test(&ctl->sent);
}
cleanup_sigpipe:
qb_sigpipe_ctl(QB_SIGPIPE_DEFAULT);
return final_rc;
}
static void
qb_ipc_us_fc_set(struct qb_ipc_one_way *one_way, int32_t fc_enable)
{
struct ipc_us_control *ctl =
(struct ipc_us_control *)one_way->u.us.shared_data;
qb_util_log(LOG_TRACE, "setting fc to %d", fc_enable);
qb_atomic_int_set(&ctl->flow_control, fc_enable);
}
static int32_t
qb_ipc_us_fc_get(struct qb_ipc_one_way *one_way)
{
struct ipc_us_control *ctl =
(struct ipc_us_control *)one_way->u.us.shared_data;
return qb_atomic_int_get(&ctl->flow_control);
}
static ssize_t
qb_ipc_us_q_len_get(struct qb_ipc_one_way *one_way)
{
struct ipc_us_control *ctl =
(struct ipc_us_control *)one_way->u.us.shared_data;
return qb_atomic_int_get(&ctl->sent);
}
int32_t
qb_ipcc_us_connect(struct qb_ipcc_connection * c,
struct qb_ipc_connection_response * r)
{
int32_t res;
char path[PATH_MAX];
int32_t fd_hdr;
char *shm_ptr;
qb_atomic_init();
c->needs_sock_for_poll = QB_FALSE;
c->funcs.send = qb_ipc_socket_send;
c->funcs.sendv = qb_ipc_socket_sendv;
c->funcs.recv = qb_ipc_us_recv_at_most;
c->funcs.fc_get = qb_ipc_us_fc_get;
c->funcs.disconnect = qb_ipcc_us_disconnect;
fd_hdr = qb_sys_mmap_file_open(path, r->request,
SHM_CONTROL_SIZE, O_RDWR);
if (fd_hdr < 0) {
res = fd_hdr;
errno = -fd_hdr;
qb_util_perror(LOG_ERR, "couldn't open file for mmap");
return res;
}
(void)strlcpy(c->request.u.us.shared_file_name, r->request, NAME_MAX);
shm_ptr = mmap(0, SHM_CONTROL_SIZE,
PROT_READ | PROT_WRITE, MAP_SHARED, fd_hdr, 0);
if (shm_ptr == MAP_FAILED) {
res = -errno;
qb_util_perror(LOG_ERR, "couldn't create mmap for header");
goto cleanup_hdr;
}
c->request.u.us.shared_data = shm_ptr;
c->response.u.us.shared_data = shm_ptr + sizeof(struct ipc_us_control);
c->event.u.us.shared_data = shm_ptr + (2 * sizeof(struct ipc_us_control));
close(fd_hdr);
fd_hdr = -1;
res = qb_ipc_dgram_sock_connect(r->response, "response", "request",
r->max_msg_size, &c->request.u.us.sock, c->egid);
if (res != 0) {
goto cleanup_hdr;
}
c->response.u.us.sock = c->request.u.us.sock;
res = qb_ipc_dgram_sock_connect(r->response, "event", "event-tx",
r->max_msg_size, &c->event.u.us.sock, c->egid);
if (res != 0) {
goto cleanup_hdr;
}
return 0;
cleanup_hdr:
if (fd_hdr >= 0) {
close(fd_hdr);
}
close(c->event.u.us.sock);
close(c->request.u.us.sock);
unlink(r->request);
munmap(c->request.u.us.shared_data, SHM_CONTROL_SIZE);
return res;
}
/*
* service functions
* --------------------------------------------------------
*/
static int32_t
_sock_connection_liveliness(int32_t fd, int32_t revents, void *data)
{
struct qb_ipcs_connection *c = (struct qb_ipcs_connection *)data;
qb_util_log(LOG_DEBUG, "LIVENESS: fd %d event %d conn (%s)",
fd, revents, c->description);
if (revents & POLLNVAL) {
qb_util_log(LOG_DEBUG, "NVAL conn (%s)", c->description);
qb_ipcs_disconnect(c);
return -EINVAL;
}
if (revents & POLLHUP) {
qb_util_log(LOG_DEBUG, "HUP conn (%s)", c->description);
qb_ipcs_disconnect(c);
return -ESHUTDOWN;
}
/* If we actually get POLLIN for some reason here, it most
* certainly means EOF. Do a recv on the fd to detect eof and
* then disconnect */
if (revents & POLLIN) {
char buf[10];
int res;
res = recv(fd, buf, sizeof(buf), MSG_DONTWAIT);
if (res < 0 && errno != EAGAIN && errno != EWOULDBLOCK) {
res = -errno;
} else if (res == 0) {
qb_util_log(LOG_DEBUG, "EOF conn (%s)", c->description);
res = -ESHUTDOWN;
}
if (res < 0) {
qb_ipcs_disconnect(c);
return res;
}
}
return 0;
}
static int32_t
_sock_add_to_mainloop(struct qb_ipcs_connection *c)
{
int res;
res = c->service->poll_fns.dispatch_add(c->service->poll_priority,
c->request.u.us.sock,
POLLIN | POLLPRI | POLLNVAL,
c,
qb_ipcs_dispatch_connection_request);
if (res < 0) {
qb_util_log(LOG_ERR,
"Error adding socket to mainloop (%s).",
c->description);
return res;
}
res = c->service->poll_fns.dispatch_add(c->service->poll_priority,
c->setup.u.us.sock,
POLLIN | POLLPRI | POLLNVAL,
c, _sock_connection_liveliness);
qb_util_log(LOG_DEBUG, "added %d to poll loop (liveness)",
c->setup.u.us.sock);
if (res < 0) {
qb_util_perror(LOG_ERR, "Error adding setupfd to mainloop");
(void)c->service->poll_fns.dispatch_del(c->request.u.us.sock);
return res;
}
return res;
}
static void
_sock_rm_from_mainloop(struct qb_ipcs_connection *c)
{
(void)c->service->poll_fns.dispatch_del(c->request.u.us.sock);
(void)c->service->poll_fns.dispatch_del(c->setup.u.us.sock);
}
static void
qb_ipcs_us_disconnect(struct qb_ipcs_connection *c)
{
qb_enter();
if (c->state == QB_IPCS_CONNECTION_ESTABLISHED ||
c->state == QB_IPCS_CONNECTION_ACTIVE) {
_sock_rm_from_mainloop(c);
/* Free the temporaries denoting which respective socket
name on the client's side to connect upon the first
send operation -- normally the variable is free'd once
the connection is established but there may have been
no chance for that. */
free(c->response.u.us.sock_name);
c->response.u.us.sock_name = NULL;
free(c->event.u.us.sock_name);
c->event.u.us.sock_name = NULL;
if (use_filesystem_sockets()) {
struct sockaddr_un un_addr;
socklen_t un_addr_len = sizeof(struct sockaddr_un);
char *base_name;
char sock_name[PATH_MAX];
size_t length;
if (getsockname(c->request.u.us.sock, (struct sockaddr *)&un_addr, &un_addr_len) == 0) {
length = strlen(un_addr.sun_path);
base_name = strndup(un_addr.sun_path,
length - /* strlen("-request") */ 8);
qb_util_log(LOG_DEBUG, "unlinking socket bound files with base_name=%s length=%d",base_name,length);
snprintf(sock_name,PATH_MAX,"%s-%s",base_name,"request");
qb_util_log(LOG_DEBUG, "unlink sock_name=%s",sock_name);
unlink(sock_name);
snprintf(sock_name,PATH_MAX,"%s-%s",base_name,"event");
qb_util_log(LOG_DEBUG, "unlink sock_name=%s",sock_name);
unlink(sock_name);
snprintf(sock_name,PATH_MAX,"%s-%s",base_name,"event-tx");
qb_util_log(LOG_DEBUG, "unlink sock_name=%s",sock_name);
unlink(sock_name);
snprintf(sock_name,PATH_MAX,"%s-%s",base_name,"response");
qb_util_log(LOG_DEBUG, "unlink sock_name=%s",sock_name);
unlink(sock_name);
free(base_name);
}
}
qb_ipcc_us_sock_close(c->setup.u.us.sock);
qb_ipcc_us_sock_close(c->request.u.us.sock);
qb_ipcc_us_sock_close(c->event.u.us.sock);
}
if (c->state == QB_IPCS_CONNECTION_SHUTTING_DOWN ||
c->state == QB_IPCS_CONNECTION_ACTIVE) {
munmap(c->request.u.us.shared_data, SHM_CONTROL_SIZE);
unlink(c->request.u.us.shared_file_name);
}
remove_tempdir(c->description);
}
static int32_t
qb_ipcs_us_connect(struct qb_ipcs_service *s,
struct qb_ipcs_connection *c,
struct qb_ipc_connection_response *r)
{
char path[PATH_MAX];
int32_t fd_hdr;
int32_t res = 0;
struct ipc_us_control *ctl;
char *shm_ptr;
qb_util_log(LOG_DEBUG, "connecting to client (%s)", c->description);
c->request.u.us.sock = c->setup.u.us.sock;
c->response.u.us.sock = c->setup.u.us.sock;
snprintf(r->request, NAME_MAX, "%s-control-%s",
c->description, s->name);
snprintf(r->response, NAME_MAX, "%s-%s", c->description, s->name);
fd_hdr = qb_sys_mmap_file_open(path, r->request,
SHM_CONTROL_SIZE,
O_CREAT | O_TRUNC | O_RDWR | O_EXCL);
if (fd_hdr < 0) {
res = fd_hdr;
errno = -fd_hdr;
qb_util_perror(LOG_ERR, "couldn't create file for mmap (%s)",
c->description);
return res;
}
(void)strlcpy(r->request, path, PATH_MAX);
(void)strlcpy(c->request.u.us.shared_file_name, r->request, NAME_MAX);
res = chown(r->request, c->auth.uid, c->auth.gid);
if (res != 0) {
/* ignore res, this is just for the compiler warnings.
*/
res = 0;
}
res = chmod(r->request, c->auth.mode);
if (res != 0) {
/* ignore res, this is just for the compiler warnings.
*/
res = 0;
}
shm_ptr = mmap(0, SHM_CONTROL_SIZE,
PROT_READ | PROT_WRITE, MAP_SHARED, fd_hdr, 0);
if (shm_ptr == MAP_FAILED) {
res = -errno;
qb_util_perror(LOG_ERR, "couldn't create mmap for header (%s)",
c->description);
goto cleanup_hdr;
}
c->request.u.us.shared_data = shm_ptr;
c->response.u.us.shared_data = shm_ptr + sizeof(struct ipc_us_control);
c->event.u.us.shared_data = shm_ptr + (2 * sizeof(struct ipc_us_control));
ctl = (struct ipc_us_control *)c->request.u.us.shared_data;
ctl->sent = 0;
ctl->flow_control = 0;
ctl = (struct ipc_us_control *)c->response.u.us.shared_data;
ctl->sent = 0;
ctl->flow_control = 0;
ctl = (struct ipc_us_control *)c->event.u.us.shared_data;
ctl->sent = 0;
ctl->flow_control = 0;
close(fd_hdr);
fd_hdr = -1;
/* request channel */
res = qb_ipc_dgram_sock_setup(r->response, "request",
&c->request.u.us.sock, c->egid);
if (res < 0) {
goto cleanup_hdr;
}
res = set_sock_size(c->request.u.us.sock, c->request.max_msg_size);
if (res != 0) {
goto cleanup_hdr;
}
c->setup.u.us.sock_name = NULL;
c->request.u.us.sock_name = NULL;
/* response channel */
c->response.u.us.sock = c->request.u.us.sock;
snprintf(path, PATH_MAX, "%s-%s", r->response, "response");
c->response.u.us.sock_name = strdup(path);
/* event channel */
res = qb_ipc_dgram_sock_setup(r->response, "event-tx",
&c->event.u.us.sock, c->egid);
if (res < 0) {
goto cleanup_hdr;
}
res = set_sock_size(c->event.u.us.sock, c->event.max_msg_size);
if (res != 0) {
goto cleanup_hdr;
}
snprintf(path, PATH_MAX, "%s-%s", r->response, "event");
c->event.u.us.sock_name = strdup(path);
res = _sock_add_to_mainloop(c);
if (res < 0) {
goto cleanup_hdr;
}
return res;
cleanup_hdr:
free(c->response.u.us.sock_name);
free(c->event.u.us.sock_name);
if (fd_hdr >= 0) {
close(fd_hdr);
}
unlink(r->request);
munmap(c->request.u.us.shared_data, SHM_CONTROL_SIZE);
return res;
}
void
qb_ipcs_us_init(struct qb_ipcs_service *s)
{
s->funcs.connect = qb_ipcs_us_connect;
s->funcs.disconnect = qb_ipcs_us_disconnect;
s->funcs.recv = qb_ipc_us_recv_at_most;
s->funcs.peek = NULL;
s->funcs.reclaim = NULL;
s->funcs.send = qb_ipc_socket_send;
s->funcs.sendv = qb_ipc_socket_sendv;
s->funcs.fc_set = qb_ipc_us_fc_set;
s->funcs.q_len_get = qb_ipc_us_q_len_get;
s->needs_sock_for_poll = QB_FALSE;
qb_atomic_init();
}
diff --git a/tests/check_loop.c b/tests/check_loop.c
index c017c2c..2bea159 100644
--- a/tests/check_loop.c
+++ b/tests/check_loop.c
@@ -1,782 +1,782 @@
/*
* Copyright (c) 2010 Red Hat, Inc.
*
* All rights reserved.
*
* Author: Angus Salkeld <asalkeld@redhat.com>
*
* This file is part of libqb.
*
* libqb is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 2.1 of the License, or
* (at your option) any later version.
*
* libqb is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with libqb. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os_base.h"
#include "check_common.h"
#include <qb/qbdefs.h>
#include <qb/qbutil.h>
#include <qb/qbloop.h>
#include <qb/qblog.h>
static int32_t job_1_run_count = 0;
static int32_t job_2_run_count = 0;
static int32_t job_3_run_count = 0;
static int32_t job_order_1 = 1;
static int32_t job_order_2 = 2;
static int32_t job_order_3 = 3;
static int32_t job_order_4 = 4;
static int32_t job_order_5 = 5;
static int32_t job_order_6 = 6;
static int32_t job_order_7 = 7;
static int32_t job_order_8 = 8;
static int32_t job_order_9 = 9;
static int32_t job_order_10 = 10;
static int32_t job_order_11 = 11;
static int32_t job_order_12 = 12;
static int32_t job_order_13 = 13;
static void job_1(void *data)
{
job_1_run_count++;
}
static void job_order_check(void *data)
{
int32_t * order = (int32_t *)data;
job_1_run_count++;
ck_assert_int_eq(job_1_run_count, *order);
if (job_1_run_count == 1) {
qb_loop_job_add(NULL, QB_LOOP_MED, &job_order_10, job_order_check);
qb_loop_job_add(NULL, QB_LOOP_MED, &job_order_11, job_order_check);
qb_loop_job_add(NULL, QB_LOOP_MED, &job_order_12, job_order_check);
qb_loop_job_add(NULL, QB_LOOP_MED, &job_order_13, job_order_check);
} else if (job_1_run_count >= 13) {
qb_loop_stop(NULL);
}
}
static void job_stop(void *data)
{
qb_loop_t *l = (qb_loop_t *)data;
job_3_run_count++;
qb_loop_stop(l);
}
static void job_2(void *data)
{
int32_t res;
qb_loop_t *l = (qb_loop_t *)data;
job_2_run_count++;
res = qb_loop_job_add(l, QB_LOOP_HIGH, data, job_stop);
ck_assert_int_eq(res, 0);
}
static void job_1_r(void *data)
{
int32_t res;
qb_loop_t *l = (qb_loop_t *)data;
job_1_run_count++;
res = qb_loop_job_add(l, QB_LOOP_MED, data, job_2);
ck_assert_int_eq(res, 0);
}
static void job_1_add_nuts(void *data)
{
int32_t res;
qb_loop_t *l = (qb_loop_t *)data;
job_1_run_count++;
res = qb_loop_job_add(l, QB_LOOP_HIGH, data, job_1);
ck_assert_int_eq(res, 0);
res = qb_loop_job_add(l, QB_LOOP_HIGH, data, job_1);
ck_assert_int_eq(res, 0);
res = qb_loop_job_add(l, QB_LOOP_HIGH, data, job_1);
ck_assert_int_eq(res, 0);
res = qb_loop_job_add(l, QB_LOOP_HIGH, data, job_1);
ck_assert_int_eq(res, 0);
res = qb_loop_job_add(l, QB_LOOP_HIGH, data, job_1);
ck_assert_int_eq(res, 0);
res = qb_loop_job_add(l, QB_LOOP_HIGH, data, job_1);
ck_assert_int_eq(res, 0);
res = qb_loop_job_add(l, QB_LOOP_HIGH, data, job_1);
ck_assert_int_eq(res, 0);
res = qb_loop_job_add(l, QB_LOOP_HIGH, data, job_1);
ck_assert_int_eq(res, 0);
res = qb_loop_job_add(l, QB_LOOP_MED, data, job_1);
ck_assert_int_eq(res, 0);
res = qb_loop_job_add(l, QB_LOOP_MED, data, job_1);
ck_assert_int_eq(res, 0);
res = qb_loop_job_add(l, QB_LOOP_MED, data, job_1);
ck_assert_int_eq(res, 0);
res = qb_loop_job_add(l, QB_LOOP_MED, data, job_1);
ck_assert_int_eq(res, 0);
res = qb_loop_job_add(l, QB_LOOP_LOW, data, job_1);
ck_assert_int_eq(res, 0);
res = qb_loop_job_add(l, QB_LOOP_LOW, data, job_1);
ck_assert_int_eq(res, 0);
if (job_1_run_count < 500) {
res = qb_loop_job_add(l, QB_LOOP_LOW, data, job_1_add_nuts);
ck_assert_int_eq(res, 0);
} else {
res = qb_loop_job_add(l, QB_LOOP_LOW, data, job_stop);
ck_assert_int_eq(res, 0);
}
ck_assert_int_eq(res, 0);
}
START_TEST(test_loop_job_input)
{
int32_t res;
qb_loop_t *l;
res = qb_loop_job_add(NULL, QB_LOOP_LOW, NULL, job_2);
ck_assert_int_eq(res, -EINVAL);
l = qb_loop_create();
ck_assert(l != NULL);
res = qb_loop_job_add(NULL, QB_LOOP_LOW, NULL, job_2);
ck_assert_int_eq(res, 0);
res = qb_loop_job_add(l, 89, NULL, job_2);
ck_assert_int_eq(res, -EINVAL);
res = qb_loop_job_add(l, QB_LOOP_LOW, NULL, NULL);
ck_assert_int_eq(res, -EINVAL);
qb_loop_destroy(l);
}
END_TEST
START_TEST(test_loop_job_1)
{
int32_t res;
qb_loop_t *l = qb_loop_create();
ck_assert(l != NULL);
res = qb_loop_job_add(l, QB_LOOP_LOW, NULL, job_1);
ck_assert_int_eq(res, 0);
res = qb_loop_job_add(l, QB_LOOP_LOW, l, job_stop);
ck_assert_int_eq(res, 0);
qb_loop_run(l);
ck_assert_int_eq(job_1_run_count, 1);
qb_loop_destroy(l);
}
END_TEST
START_TEST(test_loop_job_4)
{
int32_t res;
qb_loop_t *l = qb_loop_create();
ck_assert(l != NULL);
res = qb_loop_job_add(l, QB_LOOP_LOW, l, job_1_r);
ck_assert_int_eq(res, 0);
qb_loop_run(l);
ck_assert_int_eq(job_1_run_count, 1);
ck_assert_int_eq(job_2_run_count, 1);
ck_assert_int_eq(job_3_run_count, 1);
qb_loop_destroy(l);
}
END_TEST
START_TEST(test_loop_job_nuts)
{
int32_t res;
qb_loop_t *l = qb_loop_create();
ck_assert(l != NULL);
res = qb_loop_job_add(l, QB_LOOP_LOW, l, job_1_add_nuts);
ck_assert_int_eq(res, 0);
qb_loop_run(l);
ck_assert(job_1_run_count >= 500);
qb_loop_destroy(l);
}
END_TEST
START_TEST(test_loop_job_order)
{
int32_t res;
qb_loop_t *l = qb_loop_create();
ck_assert(l != NULL);
job_1_run_count = 0;
res = qb_loop_job_add(l, QB_LOOP_MED, &job_order_1, job_order_check);
ck_assert_int_eq(res, 0);
res = qb_loop_job_add(l, QB_LOOP_MED, &job_order_2, job_order_check);
ck_assert_int_eq(res, 0);
res = qb_loop_job_add(l, QB_LOOP_MED, &job_order_3, job_order_check);
ck_assert_int_eq(res, 0);
res = qb_loop_job_add(l, QB_LOOP_MED, &job_order_4, job_order_check);
ck_assert_int_eq(res, 0);
res = qb_loop_job_add(l, QB_LOOP_MED, &job_order_5, job_order_check);
ck_assert_int_eq(res, 0);
res = qb_loop_job_add(l, QB_LOOP_MED, &job_order_6, job_order_check);
ck_assert_int_eq(res, 0);
res = qb_loop_job_add(l, QB_LOOP_MED, &job_order_7, job_order_check);
ck_assert_int_eq(res, 0);
res = qb_loop_job_add(l, QB_LOOP_MED, &job_order_8, job_order_check);
ck_assert_int_eq(res, 0);
res = qb_loop_job_add(l, QB_LOOP_MED, &job_order_9, job_order_check);
ck_assert_int_eq(res, 0);
qb_loop_run(l);
qb_loop_destroy(l);
}
END_TEST
static qb_util_stopwatch_t *rl_sw;
#define RATE_LIMIT_RUNTIME_SEC 3
static void job_add_self(void *data)
{
int32_t res;
uint64_t elapsed1;
qb_loop_t *l = (qb_loop_t *)data;
job_1_run_count++;
qb_util_stopwatch_stop(rl_sw);
elapsed1 = qb_util_stopwatch_us_elapsed_get(rl_sw);
if (elapsed1 > (RATE_LIMIT_RUNTIME_SEC * QB_TIME_US_IN_SEC)) {
/* run for 3 seconds */
qb_loop_stop(l);
return;
}
res = qb_loop_job_add(l, QB_LOOP_MED, data, job_add_self);
ck_assert_int_eq(res, 0);
}
START_TEST(test_job_rate_limit)
{
int32_t res;
qb_loop_t *l = qb_loop_create();
ck_assert(l != NULL);
rl_sw = qb_util_stopwatch_create();
ck_assert(rl_sw != NULL);
qb_util_stopwatch_start(rl_sw);
res = qb_loop_job_add(l, QB_LOOP_MED, l, job_add_self);
ck_assert_int_eq(res, 0);
qb_loop_run(l);
/*
* the test is to confirm that a single job does not run away
* and cause cpu spin. We are going to say that a spin is more than
* one job per 50ms if there is only one job pending in the loop.
*/
_ck_assert_int(job_1_run_count, <, (RATE_LIMIT_RUNTIME_SEC * (QB_TIME_MS_IN_SEC/50)) + 10);
qb_loop_destroy(l);
qb_util_stopwatch_free(rl_sw);
}
END_TEST
static void job_stop_and_del_1(void *data)
{
int32_t res;
qb_loop_t *l = (qb_loop_t *)data;
job_3_run_count++;
res = qb_loop_job_del(l, QB_LOOP_MED, l, job_1);
ck_assert_int_eq(res, 0);
qb_loop_stop(l);
}
START_TEST(test_job_add_del)
{
int32_t res;
qb_loop_t *l = qb_loop_create();
ck_assert(l != NULL);
res = qb_loop_job_add(l, QB_LOOP_MED, l, job_1);
ck_assert_int_eq(res, 0);
res = qb_loop_job_del(l, QB_LOOP_MED, l, job_1);
ck_assert_int_eq(res, 0);
job_1_run_count = 0;
job_3_run_count = 0;
res = qb_loop_job_add(l, QB_LOOP_MED, l, job_1);
ck_assert_int_eq(res, 0);
res = qb_loop_job_add(l, QB_LOOP_HIGH, l, job_stop_and_del_1);
ck_assert_int_eq(res, 0);
qb_loop_run(l);
ck_assert_int_eq(job_1_run_count, 0);
ck_assert_int_eq(job_3_run_count, 1);
qb_loop_destroy(l);
}
END_TEST
static Suite *loop_job_suite(void)
{
TCase *tc;
Suite *s = suite_create("loop_job");
add_tcase(s, tc, test_loop_job_input);
add_tcase(s, tc, test_loop_job_1);
add_tcase(s, tc, test_loop_job_4);
add_tcase(s, tc, test_loop_job_nuts, 5);
add_tcase(s, tc, test_job_rate_limit, 5);
add_tcase(s, tc, test_job_add_del);
add_tcase(s, tc, test_loop_job_order);
return s;
}
/*
* -----------------------------------------------------------------------
* Timers
*/
static qb_loop_timer_handle test_th;
static qb_loop_timer_handle test_th2;
static void check_time_left(void *data)
{
qb_loop_t *l = (qb_loop_t *)data;
/* NOTE: We are checking the 'stop_loop' timer here, not our own */
uint64_t abs_time = qb_loop_timer_expire_time_get(l, test_th);
uint64_t rel_time = qb_loop_timer_expire_time_remaining(l, test_th);
ck_assert(abs_time > 0ULL);
ck_assert(rel_time > 0ULL);
ck_assert(abs_time > rel_time);
ck_assert(rel_time <= 60*QB_TIME_NS_IN_MSEC);
}
START_TEST(test_loop_timer_input)
{
int32_t res;
qb_loop_t *l;
res = qb_loop_timer_add(NULL, QB_LOOP_LOW, 5*QB_TIME_NS_IN_MSEC, NULL, job_2, &test_th);
ck_assert_int_eq(res, -EINVAL);
l = qb_loop_create();
ck_assert(l != NULL);
res = qb_loop_timer_add(NULL, QB_LOOP_LOW, 5*QB_TIME_NS_IN_MSEC, NULL, job_2, &test_th);
ck_assert_int_eq(res, 0);
res = qb_loop_timer_add(l, QB_LOOP_LOW, 5*QB_TIME_NS_IN_MSEC, l, NULL, &test_th);
ck_assert_int_eq(res, -EINVAL);
qb_loop_destroy(l);
}
END_TEST
static void one_shot_tmo(void * data)
{
static int32_t been_here = QB_FALSE;
ck_assert_int_eq(been_here, QB_FALSE);
been_here = QB_TRUE;
}
static qb_loop_timer_handle reset_th;
static int32_t reset_timer_step = 0;
static void reset_one_shot_tmo(void*data)
{
int32_t res;
qb_loop_t *l = data;
if (reset_timer_step == 0) {
res = qb_loop_timer_del(l, reset_th);
ck_assert_int_eq(res, -EINVAL);
res = qb_loop_timer_is_running(l, reset_th);
ck_assert_int_eq(res, QB_FALSE);
res = qb_loop_timer_add(l, QB_LOOP_LOW, 8*QB_TIME_NS_IN_MSEC, l, reset_one_shot_tmo, &reset_th);
ck_assert_int_eq(res, 0);
}
reset_timer_step++;
}
START_TEST(test_loop_timer_basic)
{
int32_t res;
qb_loop_t *l = qb_loop_create();
ck_assert(l != NULL);
res = qb_loop_timer_add(l, QB_LOOP_LOW, 5*QB_TIME_NS_IN_MSEC, l, one_shot_tmo, &test_th);
ck_assert_int_eq(res, 0);
res = qb_loop_timer_is_running(l, test_th);
ck_assert_int_eq(res, QB_TRUE);
res = qb_loop_timer_add(l, QB_LOOP_LOW, 7*QB_TIME_NS_IN_MSEC, l, reset_one_shot_tmo, &reset_th);
ck_assert_int_eq(res, 0);
res = qb_loop_timer_add(l, QB_LOOP_HIGH, 20*QB_TIME_NS_IN_MSEC, l, check_time_left, &test_th2);
ck_assert_int_eq(res, 0);
res = qb_loop_timer_add(l, QB_LOOP_LOW, 60*QB_TIME_NS_IN_MSEC, l, job_stop, &test_th);
ck_assert_int_eq(res, 0);
qb_loop_run(l);
ck_assert_int_eq(reset_timer_step, 2);
qb_loop_destroy(l);
}
END_TEST
struct qb_stop_watch {
uint64_t start;
uint64_t end;
qb_loop_t *l;
uint64_t ns_timer;
int64_t total;
int32_t count;
int32_t killer;
qb_loop_timer_handle th;
};
static void stop_watch_tmo(void*data)
{
struct qb_stop_watch *sw = (struct qb_stop_watch *)data;
float per;
int64_t diff;
sw->end = qb_util_nano_current_get();
diff = sw->end - sw->start;
if (diff < sw->ns_timer) {
printf("timer expired early! by %"PRIi64"\n", (int64_t)(sw->ns_timer - diff));
}
ck_assert(diff >= sw->ns_timer);
sw->total += diff;
sw->total -= sw->ns_timer;
sw->start = sw->end;
sw->count++;
if (sw->count < 50) {
qb_loop_timer_add(sw->l, QB_LOOP_LOW, sw->ns_timer, data, stop_watch_tmo, &sw->th);
} else {
per = ((sw->total * 100) / sw->count) / (float)sw->ns_timer;
printf("average error for %"PRIu64" ns timer is %"PRIi64" (ns) (%f)\n",
sw->ns_timer,
- sw->total/sw->count, per);
+ (int64_t)(sw->total/sw->count), per);
if (sw->killer) {
qb_loop_stop(sw->l);
}
}
}
static void start_timer(qb_loop_t *l, struct qb_stop_watch *sw, uint64_t timeout, int32_t killer)
{
int32_t res;
sw->l = l;
sw->count = 0;
sw->total = 0;
sw->killer = killer;
sw->ns_timer = timeout;
sw->start = qb_util_nano_current_get();
res = qb_loop_timer_add(sw->l, QB_LOOP_LOW, sw->ns_timer, sw, stop_watch_tmo, &sw->th);
ck_assert_int_eq(res, 0);
}
START_TEST(test_loop_timer_precision)
{
int32_t i;
uint64_t tmo;
struct qb_stop_watch sw[11];
qb_loop_t *l = qb_loop_create();
ck_assert(l != NULL);
for (i = 0; i < 10; i++) {
tmo = ((1 + i * 9) * QB_TIME_NS_IN_MSEC) + 500000;
start_timer(l, &sw[i], tmo, QB_FALSE);
}
start_timer(l, &sw[i], 100 * QB_TIME_NS_IN_MSEC, QB_TRUE);
qb_loop_run(l);
qb_loop_destroy(l);
}
END_TEST
static int expire_leak_counter = 0;
#define EXPIRE_NUM_RUNS 10
static int expire_leak_runs = 0;
static void empty_func_tmo(void*data)
{
expire_leak_counter++;
}
static void stop_func_tmo(void*data)
{
qb_loop_t *l = (qb_loop_t *)data;
qb_log(LOG_DEBUG, "expire_leak_counter:%d", expire_leak_counter);
qb_loop_stop(l);
}
static void next_func_tmo(void*data)
{
qb_loop_t *l = (qb_loop_t *)data;
int32_t i;
uint64_t tmo;
uint64_t max_tmo = 0;
qb_loop_timer_handle th;
qb_log(LOG_DEBUG, "expire_leak_counter:%d", expire_leak_counter);
for (i = 0; i < 300; i++) {
tmo = ((1 + i) * QB_TIME_NS_IN_MSEC) + 500000;
qb_loop_timer_add(l, QB_LOOP_LOW, tmo, NULL, empty_func_tmo, &th);
qb_loop_timer_add(l, QB_LOOP_MED, tmo, NULL, empty_func_tmo, &th);
qb_loop_timer_add(l, QB_LOOP_HIGH, tmo, NULL, empty_func_tmo, &th);
max_tmo = QB_MAX(max_tmo, tmo);
}
expire_leak_runs++;
if (expire_leak_runs == EXPIRE_NUM_RUNS) {
qb_loop_timer_add(l, QB_LOOP_LOW, max_tmo, l, stop_func_tmo, &th);
} else {
qb_loop_timer_add(l, QB_LOOP_LOW, max_tmo, l, next_func_tmo, &th);
}
}
/*
* make sure that file descriptors don't get leaked with no qb_loop_timer_del()
*/
START_TEST(test_loop_timer_expire_leak)
{
int32_t i;
uint64_t tmo;
uint64_t max_tmo = 0;
qb_loop_timer_handle th;
qb_loop_t *l = qb_loop_create();
ck_assert(l != NULL);
expire_leak_counter = 0;
for (i = 0; i < 300; i++) {
tmo = ((1 + i) * QB_TIME_NS_IN_MSEC) + 500000;
qb_loop_timer_add(l, QB_LOOP_LOW, tmo, NULL, empty_func_tmo, &th);
qb_loop_timer_add(l, QB_LOOP_MED, tmo, NULL, empty_func_tmo, &th);
qb_loop_timer_add(l, QB_LOOP_HIGH, tmo, NULL, empty_func_tmo, &th);
max_tmo = QB_MAX(max_tmo, tmo);
}
qb_loop_timer_add(l, QB_LOOP_LOW, max_tmo, l, next_func_tmo, &th);
expire_leak_runs = 1;
qb_loop_run(l);
ck_assert_int_eq(expire_leak_counter, 300*3* EXPIRE_NUM_RUNS);
qb_loop_destroy(l);
}
END_TEST
static int received_signum = 0;
static int received_sigs = 0;
static int32_t
sig_handler(int32_t rsignal, void *data)
{
qb_loop_t *l = (qb_loop_t *)data;
qb_log(LOG_DEBUG, "caught signal %d", rsignal);
received_signum = rsignal;
received_sigs++;
qb_loop_job_add(l, QB_LOOP_LOW, NULL, job_stop);
return 0;
}
START_TEST(test_loop_sig_handling)
{
qb_loop_signal_handle handle;
qb_loop_t *l = qb_loop_create();
ck_assert(l != NULL);
qb_loop_signal_add(l, QB_LOOP_HIGH, SIGINT,
l, sig_handler, &handle);
qb_loop_signal_add(l, QB_LOOP_HIGH, SIGTERM,
l, sig_handler, &handle);
qb_loop_signal_add(l, QB_LOOP_HIGH, SIGQUIT,
l, sig_handler, &handle);
kill(getpid(), SIGINT);
qb_loop_run(l);
ck_assert_int_eq(received_signum, SIGINT);
kill(getpid(), SIGQUIT);
qb_loop_run(l);
ck_assert_int_eq(received_signum, SIGQUIT);
qb_loop_destroy(l);
}
END_TEST
/* Globals for this test only */
static int our_signal_called = 0;
static qb_loop_t *this_l;
static void handle_nonqb_signal(int num)
{
our_signal_called = 1;
qb_loop_job_add(this_l, QB_LOOP_LOW, NULL, job_stop);
}
START_TEST(test_loop_dont_override_other_signals)
{
qb_loop_signal_handle handle;
this_l = qb_loop_create();
ck_assert(this_l != NULL);
signal(SIGUSR1, handle_nonqb_signal);
qb_loop_signal_add(this_l, QB_LOOP_HIGH, SIGINT,
this_l, sig_handler, &handle);
kill(getpid(), SIGUSR1);
qb_loop_run(this_l);
ck_assert_int_eq(our_signal_called, 1);
qb_loop_destroy(this_l);
}
END_TEST
START_TEST(test_loop_sig_only_get_one)
{
int res;
qb_loop_signal_handle handle;
qb_loop_t *l = qb_loop_create();
ck_assert(l != NULL);
/* make sure we only get one call to the handler
* don't assume we are going to exit the loop.
*/
received_sigs = 0;
qb_loop_signal_add(l, QB_LOOP_LOW, SIGINT,
l, sig_handler, &handle);
res = qb_loop_job_add(l, QB_LOOP_MED, NULL, job_1);
ck_assert_int_eq(res, 0);
res = qb_loop_job_add(l, QB_LOOP_HIGH, NULL, job_1);
ck_assert_int_eq(res, 0);
res = qb_loop_job_add(l, QB_LOOP_MED, NULL, job_1);
ck_assert_int_eq(res, 0);
res = qb_loop_job_add(l, QB_LOOP_HIGH, NULL, job_1);
ck_assert_int_eq(res, 0);
res = qb_loop_job_add(l, QB_LOOP_HIGH, NULL, job_1);
ck_assert_int_eq(res, 0);
res = qb_loop_job_add(l, QB_LOOP_MED, NULL, job_1);
ck_assert_int_eq(res, 0);
kill(getpid(), SIGINT);
qb_loop_run(l);
ck_assert_int_eq(received_signum, SIGINT);
ck_assert_int_eq(received_sigs, 1);
qb_loop_destroy(l);
}
END_TEST
static qb_loop_signal_handle sig_hdl;
static void
job_rm_sig_handler(void *data)
{
int res;
qb_loop_t *l = (qb_loop_t *)data;
res = qb_loop_signal_del(l, sig_hdl);
ck_assert_int_eq(res, 0);
res = qb_loop_job_add(l, QB_LOOP_LOW, NULL, job_stop);
ck_assert_int_eq(res, 0);
}
START_TEST(test_loop_sig_delete)
{
int res;
qb_loop_t *l = qb_loop_create();
ck_assert(l != NULL);
/* make sure we can remove a signal job from the job queue.
*/
received_sigs = 0;
received_signum = 0;
res = qb_loop_signal_add(l, QB_LOOP_MED, SIGINT,
l, sig_handler, &sig_hdl);
ck_assert_int_eq(res, 0);
res = qb_loop_job_add(l, QB_LOOP_HIGH, NULL,
job_rm_sig_handler);
ck_assert_int_eq(res, 0);
kill(getpid(), SIGINT);
qb_loop_run(l);
ck_assert_int_eq(received_sigs, 0);
ck_assert_int_eq(received_signum, 0);
qb_loop_destroy(l);
}
END_TEST
static Suite *
loop_timer_suite(void)
{
TCase *tc;
Suite *s = suite_create("loop_timers");
add_tcase(s, tc, test_loop_timer_input);
add_tcase(s, tc, test_loop_timer_basic, 30);
add_tcase(s, tc, test_loop_timer_precision, 30);
add_tcase(s, tc, test_loop_timer_expire_leak, 30);
return s;
}
static Suite *
loop_signal_suite(void)
{
TCase *tc;
Suite *s = suite_create("loop_signal_suite");
add_tcase(s, tc, test_loop_sig_handling, 10);
add_tcase(s, tc, test_loop_sig_only_get_one);
add_tcase(s, tc, test_loop_sig_delete);
add_tcase(s, tc, test_loop_dont_override_other_signals);
return s;
}
int32_t
main(void)
{
int32_t number_failed;
SRunner *sr = srunner_create(loop_job_suite());
srunner_add_suite (sr, loop_timer_suite());
srunner_add_suite (sr, loop_signal_suite());
qb_log_init("check", LOG_USER, LOG_EMERG);
atexit(qb_log_fini);
qb_log_ctl(QB_LOG_SYSLOG, QB_LOG_CONF_ENABLED, QB_FALSE);
qb_log_filter_ctl(QB_LOG_STDERR, QB_LOG_FILTER_ADD,
QB_LOG_FILTER_FILE, "*", LOG_INFO);
qb_log_ctl(QB_LOG_STDERR, QB_LOG_CONF_ENABLED, QB_TRUE);
srunner_run_all(sr, CK_VERBOSE);
number_failed = srunner_ntests_failed(sr);
srunner_free(sr);
return (number_failed == 0) ? EXIT_SUCCESS : EXIT_FAILURE;
}

File Metadata

Mime Type
text/x-diff
Expires
Tue, Jul 8, 6:21 PM (17 h, 15 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2002590
Default Alt Text
(66 KB)

Event Timeline