Page Menu
Home
ClusterLabs Projects
Search
Configure Global Search
Log In
Files
F4624415
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Flag For Later
Award Token
Size
66 KB
Referenced Files
None
Subscribers
None
View Options
diff --git a/lib/ipc_setup.c b/lib/ipc_setup.c
index f5dec9f..c144a5e 100644
--- a/lib/ipc_setup.c
+++ b/lib/ipc_setup.c
@@ -1,934 +1,934 @@
/*
* Copyright (C) 2010,2013 Red Hat, Inc.
*
* Author: Angus Salkeld <asalkeld@redhat.com>
*
* This file is part of libqb.
*
* libqb is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 2.1 of the License, or
* (at your option) any later version.
*
* libqb is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with libqb. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os_base.h"
#include <poll.h>
#if defined(HAVE_GETPEERUCRED)
#include <ucred.h>
#endif
#ifdef HAVE_SYS_UN_H
#include <sys/un.h>
#endif /* HAVE_SYS_UN_H */
#ifdef HAVE_SYS_STAT_H
#include <sys/stat.h>
#endif
#ifdef HAVE_SYS_MMAN_H
#include <sys/mman.h>
#endif
#include <qb/qbatomic.h>
#include <qb/qbipcs.h>
#include <qb/qbloop.h>
#include <qb/qbdefs.h>
#include "util_int.h"
#include "ipc_int.h"
/*
 * Credentials of the peer on a setup socket, as filled in by
 * qb_ipc_auth_creds(). Not every platform branch provides all three
 * values (the getpeereid() branch does not set pid).
 */
struct ipc_auth_ugp {
/* user id reported for the peer */
uid_t uid;
/* group id reported for the peer */
gid_t gid;
/* process id reported for the peer */
pid_t pid;
};
/*
 * State carried across the (possibly multi-call) receive of one
 * authentication message on a setup socket.
 */
struct ipc_auth_data {
/* socket the authentication message is read from */
int32_t sock;
/* owning service; referenced by the server side and dropped again in
 * destroy_ipc_auth_data(). Left NULL by init_ipc_auth_data(). */
struct qb_ipcs_service *s;
/* receive buffer: the server reads a request, the client a response */
union {
struct qb_ipc_connection_request req;
struct qb_ipc_connection_response res;
} msg;
/* recvmsg() descriptor; its single iovec is iov_recv */
struct msghdr msg_recv;
struct iovec iov_recv;
/* peer credentials extracted by qb_ipc_auth_creds() */
struct ipc_auth_ugp ugp;
/* bytes of msg received so far */
size_t processed;
/* total number of bytes expected in msg */
size_t len;
#ifdef SO_PASSCRED
/* heap buffer used as the recvmsg() control (ancillary data) area */
char *cmsg_cred;
#endif
};
static int32_t qb_ipcs_us_connection_acceptor(int fd, int revent, void *data);
/*
 * Send a complete buffer over the channel's unix socket.
 *
 * send() is repeated until all len bytes have been written. EAGAIN is
 * retried only once part of the message is already on the wire; with
 * nothing sent yet the error is handed back to the caller.
 *
 * Returns the number of bytes sent (len) or a negative errno value.
 */
ssize_t
qb_ipc_us_send(struct qb_ipc_one_way *one_way, const void *msg, size_t len)
{
	const char *rbuf = msg;
	size_t processed = 0;
	ssize_t result;
	int saved_errno;

	qb_sigpipe_ctl(QB_SIGPIPE_IGNORE);

	for (;;) {
		result = send(one_way->u.us.sock,
			      &rbuf[processed], len - processed, MSG_NOSIGNAL);
		if (result == -1) {
			if (errno == EAGAIN && processed > 0) {
				/* partially sent: keep pushing the rest */
				continue;
			}
			/*
			 * Capture errno before restoring the SIGPIPE
			 * disposition: that call may itself modify errno.
			 */
			saved_errno = errno;
			qb_sigpipe_ctl(QB_SIGPIPE_DEFAULT);
			return -saved_errno;
		}
		processed += (size_t)result;
		if (processed == len) {
			break;
		}
	}

	qb_sigpipe_ctl(QB_SIGPIPE_DEFAULT);
	return (ssize_t)processed;
}
/*
 * Receive the authentication message described by data using recvmsg(),
 * so that ancillary data lands in data->msg_recv.
 *
 * Progress is kept in data->processed, which lets the caller invoke the
 * function again after -EAGAIN and continue where it stopped.
 *
 * Returns data->len once the whole message has arrived, -ENOTCONN when
 * recvmsg() reports 0 bytes, or another negative errno value.
 */
static ssize_t
qb_ipc_us_recv_msghdr(struct ipc_auth_data *data)
{
	char *msg = (char *) &data->msg;
	ssize_t result;
	int saved_errno;

	qb_sigpipe_ctl(QB_SIGPIPE_IGNORE);

	do {
		/* point the iovec at the part of the message still missing */
		data->msg_recv.msg_iov->iov_base = &msg[data->processed];
		data->msg_recv.msg_iov->iov_len = data->len - data->processed;

		result = recvmsg(data->sock, &data->msg_recv,
				 MSG_NOSIGNAL | MSG_WAITALL);
		if (result == -1) {
			/*
			 * Capture errno before restoring the SIGPIPE
			 * disposition: that call may itself modify errno.
			 * This also covers EAGAIN, which is returned as
			 * -EAGAIN.
			 */
			saved_errno = errno;
			qb_sigpipe_ctl(QB_SIGPIPE_DEFAULT);
			return -saved_errno;
		}
		if (result == 0) {
			qb_sigpipe_ctl(QB_SIGPIPE_DEFAULT);
			qb_util_log(LOG_DEBUG,
				    "recv(fd %d) got 0 bytes assuming ENOTCONN", data->sock);
			return -ENOTCONN;
		}
		data->processed += (size_t)result;
	} while (data->processed != data->len);

	qb_sigpipe_ctl(QB_SIGPIPE_DEFAULT);
	return (ssize_t)data->processed;
}
/*
 * Classify a negative-errno style result.
 *
 * Returns QB_FALSE for non-negative values and for the error codes
 * listed below; every other negative value yields QB_TRUE.
 */
int32_t
qb_ipc_us_sock_error_is_disconnected(int err)
{
	/* error codes that do not count as a disconnect */
	static const int still_connected[] = {
		-EAGAIN,
		-ETIMEDOUT,
		-EINTR,
#ifdef EWOULDBLOCK
		-EWOULDBLOCK,
#endif
		-EMSGSIZE,
		-ENOMSG,
		-EINVAL,
	};
	size_t idx;

	if (err >= 0) {
		return QB_FALSE;
	}
	for (idx = 0; idx < sizeof(still_connected) / sizeof(still_connected[0]); idx++) {
		if (err == still_connected[idx]) {
			return QB_FALSE;
		}
	}
	return QB_TRUE;
}
/*
 * Wait up to ms_timeout milliseconds for `events` on ow_data's socket.
 * When ow_conn is given and differs from ow_data, its socket is polled
 * for POLLIN as well.
 *
 * Returns 0 when poll() reported activity without an error condition,
 * -EAGAIN on timeout or EINTR, -ENOTCONN when an inspected descriptor
 * shows POLLERR/POLLHUP/POLLNVAL or no events at all, otherwise -errno.
 *
 * NOTE(review): the inspection loop is bounded by poll()'s return value
 * (the number of ready descriptors), not by the number of descriptors
 * polled, so the entries inspected are always the first ones.
 */
int32_t
qb_ipc_us_ready(struct qb_ipc_one_way * ow_data,
		struct qb_ipc_one_way * ow_conn,
		int32_t ms_timeout, int32_t events)
{
	struct pollfd watched[2];
	int watched_count = 1;
	int32_t ready;
	int idx;

	watched[0].fd = ow_data->u.us.sock;
	watched[0].events = events;
	watched[0].revents = 0;

	if (ow_conn && ow_data != ow_conn) {
		watched[1].fd = ow_conn->u.us.sock;
		watched[1].events = POLLIN;
		watched[1].revents = 0;
		watched_count = 2;
	}

	ready = poll(watched, watched_count, ms_timeout);
	if (ready == 0) {
		return -EAGAIN;
	}
	if (ready == -1) {
		return (errno == EINTR) ? -EAGAIN : -errno;
	}

	for (idx = 0; idx < ready; idx++) {
		const struct pollfd *entry = &watched[idx];

		if (entry->revents & POLLERR) {
			qb_util_log(LOG_DEBUG, "poll(fd %d) got POLLERR",
				    entry->fd);
			return -ENOTCONN;
		}
		if (entry->revents & POLLHUP) {
			qb_util_log(LOG_DEBUG, "poll(fd %d) got POLLHUP",
				    entry->fd);
			return -ENOTCONN;
		}
		if (entry->revents & POLLNVAL) {
			qb_util_log(LOG_DEBUG, "poll(fd %d) got POLLNVAL",
				    entry->fd);
			return -ENOTCONN;
		}
		if (entry->revents == 0) {
			qb_util_log(LOG_DEBUG, "poll(fd %d) zero revents",
				    entry->fd);
			return -ENOTCONN;
		}
	}
	return 0;
}
/*
 * Receive exactly len bytes into msg, retrying until the whole message
 * has arrived.
 *
 * EAGAIN is waited out through qb_ipc_us_ready() when part of the
 * message was already read or when timeout is -1; otherwise it is
 * returned as an error. ECONNRESET, EPIPE and a zero-byte read are all
 * reported as -ENOTCONN.
 *
 * Returns the number of bytes received (len) or a negative errno value.
 */
ssize_t
qb_ipc_us_recv(struct qb_ipc_one_way * one_way,
	       void *msg, size_t len, int32_t timeout)
{
	char *cursor = msg;
	int32_t remaining = len;
	int32_t received = 0;
	int32_t got;
	int32_t rc = 0;

	qb_sigpipe_ctl(QB_SIGPIPE_IGNORE);

	for (;;) {
		got = recv(one_way->u.us.sock, &cursor[received], remaining,
			   MSG_NOSIGNAL | MSG_WAITALL);

		if (got == -1) {
			if (errno == EAGAIN && (received > 0 || timeout == -1)) {
				got = qb_ipc_us_ready(one_way, NULL, timeout, POLLIN);
				if (got == 0 || got == -EAGAIN) {
					continue;
				}
				rc = got;
			} else if (errno == ECONNRESET || errno == EPIPE) {
				rc = -ENOTCONN;
			} else {
				rc = -errno;
			}
			break;
		}

		if (got == 0) {
			rc = -ENOTCONN;
			break;
		}

		received += got;
		remaining -= got;
		if (received == len) {
			rc = received;
			break;
		}
	}

	qb_sigpipe_ctl(QB_SIGPIPE_DEFAULT);
	return rc;
}
/*
 * Create a non-blocking, close-on-exec AF_UNIX stream socket and connect
 * it to the server named socket_name.
 *
 * Depending on use_filesystem_sockets() the name is either written into
 * sun_path starting at offset 1 or used as a file below SOCKETDIR.
 *
 * On success *sock_pt holds the connected descriptor and 0 is returned;
 * on failure *sock_pt is -1 and a negative value is returned.
 */
static int32_t
qb_ipcc_stream_sock_connect(const char *socket_name, int32_t * sock_pt)
{
	struct sockaddr_un address;
	int32_t fd;
	int32_t rc;

	fd = socket(PF_UNIX, SOCK_STREAM, 0);
	if (fd == -1) {
		return -errno;
	}

	qb_socket_nosigpipe(fd);

	rc = qb_sys_fd_nonblock_cloexec_set(fd);
	if (rc >= 0) {
		memset(&address, 0, sizeof(struct sockaddr_un));
		address.sun_family = AF_UNIX;
#ifdef HAVE_STRUCT_SOCKADDR_UN_SUN_LEN
		address.sun_len = QB_SUN_LEN(&address);
#endif

		if (use_filesystem_sockets()) {
			snprintf(address.sun_path, sizeof(address.sun_path), "%s/%s", SOCKETDIR,
				 socket_name);
		} else {
			snprintf(address.sun_path + 1, UNIX_PATH_MAX - 1, "%s", socket_name);
		}

		if (connect(fd, (struct sockaddr *)&address,
			    QB_SUN_LEN(&address)) != -1) {
			*sock_pt = fd;
			return 0;
		}
		rc = -errno;
	}

	/* common failure exit: release the descriptor */
	close(fd);
	*sock_pt = -1;
	return rc;
}
/*
 * Shut down both directions of sock and then close it.
 * Failures of either call are ignored.
 */
void
qb_ipcc_us_sock_close(int32_t sock)
{
	(void)shutdown(sock, SHUT_RDWR);
	(void)close(sock);
}
/*
 * Fill data->ugp with the credentials of the peer on data->sock.
 *
 * Exactly one retrieval mechanism is compiled in: getpeerucred(),
 * getpeereid(), or the SCM_CREDENTIALS control message found in
 * data->msg_recv (which therefore must already hold a received
 * message). Without any of them the credentials are zeroed.
 *
 * Returns 0 on success, -EINVAL when no credentials message is present,
 * -ENOTSUP when no mechanism is available, otherwise -errno.
 */
static int32_t
qb_ipc_auth_creds(struct ipc_auth_data *data)
{
	int32_t res = 0;

#ifdef HAVE_GETPEERUCRED
	/*
	 * Solaris and some BSD systems
	 */
	{
		ucred_t *peer = NULL;

		if (getpeerucred(data->sock, &peer) != 0) {
			res = -errno;
		} else {
			data->ugp.uid = ucred_geteuid(peer);
			data->ugp.gid = ucred_getegid(peer);
			data->ugp.pid = ucred_getpid(peer);
			ucred_free(peer);
			res = 0;
		}
	}
#elif defined(HAVE_GETPEEREID)
	/*
	 * Usually MacOSX systems
	 */
	{
		/*
		 * TODO get the peer's pid.
		 * c->pid = ?;
		 */
		if (getpeereid(data->sock, &data->ugp.uid, &data->ugp.gid) != 0) {
			res = -errno;
		} else {
			res = 0;
		}
	}
#elif defined(SO_PASSCRED)
	/*
	 * Usually Linux systems
	 */
	{
		struct cmsghdr *hdr = CMSG_FIRSTHDR(&data->msg_recv);

		res = -EINVAL;
		while (hdr != NULL) {
			if (hdr->cmsg_type == SCM_CREDENTIALS) {
				struct ucred peer;

				memcpy(&peer, CMSG_DATA(hdr), sizeof(struct ucred));
				data->ugp.pid = peer.pid;
				data->ugp.uid = peer.uid;
				data->ugp.gid = peer.gid;
				res = 0;
				break;
			}
			hdr = CMSG_NXTHDR(&data->msg_recv, hdr);
		}
	}
#else /* no credentials */
	data->ugp.pid = 0;
	data->ugp.uid = 0;
	data->ugp.gid = 0;
	res = -ENOTSUP;
#endif /* no credentials */

	return res;
}
/*
 * Release an ipc_auth_data: drop the service reference if one is held,
 * then free the control-message buffer (where present) and the
 * structure itself. The socket is not closed here.
 */
static void
destroy_ipc_auth_data(struct ipc_auth_data *data)
{
	struct qb_ipcs_service *service = data->s;

	if (service != NULL) {
		qb_ipcs_unref(service);
	}

#ifdef SO_PASSCRED
	free(data->cmsg_cred);
#endif
	free(data);
}
/*
 * Allocate and prepare the state needed to receive one authentication
 * message of len bytes from sock with recvmsg().
 *
 * The msghdr is wired to the embedded iovec and, where SO_PASSCRED
 * exists, to a freshly allocated control buffer sized for one
 * struct ucred.
 *
 * Returns NULL when memory cannot be allocated. The result must be
 * released with destroy_ipc_auth_data().
 */
static struct ipc_auth_data *
init_ipc_auth_data(int sock, size_t len)
{
	struct ipc_auth_data *auth;

	auth = calloc(1, sizeof(struct ipc_auth_data));
	if (auth == NULL) {
		return NULL;
	}

	auth->msg_recv.msg_name = 0;
	auth->msg_recv.msg_namelen = 0;
	auth->msg_recv.msg_iov = &auth->iov_recv;
	auth->msg_recv.msg_iovlen = 1;

#ifdef SO_PASSCRED
	auth->cmsg_cred = calloc(1, CMSG_SPACE(sizeof(struct ucred)));
	if (auth->cmsg_cred == NULL) {
		destroy_ipc_auth_data(auth);
		return NULL;
	}
	auth->msg_recv.msg_controllen = CMSG_SPACE(sizeof(struct ucred));
	auth->msg_recv.msg_control = (void *)auth->cmsg_cred;
#endif
#ifdef QB_SOLARIS
	auth->msg_recv.msg_accrights = 0;
	auth->msg_recv.msg_accrightslen = 0;
#else
	auth->msg_recv.msg_flags = 0;
#endif /* QB_SOLARIS */

	auth->sock = sock;
	auth->len = len;
	auth->iov_recv.iov_len = auth->len;
	auth->iov_recv.iov_base = &auth->msg;

	return auth;
}
int32_t
qb_ipcc_us_setup_connect(struct qb_ipcc_connection *c,
struct qb_ipc_connection_response *r)
{
int32_t res;
struct qb_ipc_connection_request request;
struct ipc_auth_data *data;
#ifdef QB_LINUX
int off = 0;
int on = 1;
#endif
res = qb_ipcc_stream_sock_connect(c->name, &c->setup.u.us.sock);
if (res != 0) {
return res;
}
#ifdef QB_LINUX
setsockopt(c->setup.u.us.sock, SOL_SOCKET, SO_PASSCRED, &on,
sizeof(on));
#endif
memset(&request, 0, sizeof(request));
request.hdr.id = QB_IPC_MSG_AUTHENTICATE;
request.hdr.size = sizeof(request);
request.max_msg_size = c->setup.max_msg_size;
res = qb_ipc_us_send(&c->setup, &request, request.hdr.size);
if (res < 0) {
qb_ipcc_us_sock_close(c->setup.u.us.sock);
return res;
}
data = init_ipc_auth_data(c->setup.u.us.sock, sizeof(struct qb_ipc_connection_response));
if (data == NULL) {
qb_ipcc_us_sock_close(c->setup.u.us.sock);
return -ENOMEM;
}
qb_ipc_us_ready(&c->setup, NULL, -1, POLLIN);
res = qb_ipc_us_recv_msghdr(data);
#ifdef QB_LINUX
setsockopt(c->setup.u.us.sock, SOL_SOCKET, SO_PASSCRED, &off,
sizeof(off));
#endif
if (res != data->len) {
destroy_ipc_auth_data(data);
return res;
}
memcpy(r, &data->msg.res, sizeof(struct qb_ipc_connection_response));
qb_ipc_auth_creds(data);
c->egid = data->ugp.gid;
c->euid = data->ugp.uid;
c->server_pid = data->ugp.pid;
destroy_ipc_auth_data(data);
return r->hdr.error;
}
/*
**************************************************************************
* SERVER
*/
/*
 * Create the service's listening AF_UNIX stream socket, bind it to
 * s->name and register it with the service's poll loop, using
 * qb_ipcs_us_connection_acceptor as the callback.
 *
 * Returns the result of dispatch_add(), or a negative value when the
 * socket could not be created, configured or bound (the socket is
 * closed again in the latter cases).
 */
int32_t
qb_ipcs_us_publish(struct qb_ipcs_service * s)
{
struct sockaddr_un un_addr;
int32_t res;
#ifdef SO_PASSCRED
int on = 1;
#endif
/*
* Create socket for IPC clients, name socket, listen for connections
*/
s->server_sock = socket(PF_UNIX, SOCK_STREAM, 0);
if (s->server_sock == -1) {
res = -errno;
qb_util_perror(LOG_ERR, "Cannot create server socket");
return res;
}
res = qb_sys_fd_nonblock_cloexec_set(s->server_sock);
if (res < 0) {
goto error_close;
}
memset(&un_addr, 0, sizeof(struct sockaddr_un));
un_addr.sun_family = AF_UNIX;
#if defined(QB_BSD) || defined(QB_DARWIN)
un_addr.sun_len = SUN_LEN(&un_addr);
#endif
qb_util_log(LOG_INFO, "server name: %s", s->name);
/*
* Either place the name at offset 1 of sun_path (first byte stays 0)
* or use a file below SOCKETDIR, removing any stale file first.
*/
if (!use_filesystem_sockets()) {
snprintf(un_addr.sun_path + 1, UNIX_PATH_MAX - 1, "%s", s->name);
}
else {
struct stat stat_out;
res = stat(SOCKETDIR, &stat_out);
/*
* NOTE(review): when stat() succeeds but SOCKETDIR is not a
* directory, errno was not set by stat(), so -errno below may be
* stale or 0.
*/
if (res == -1 || (res == 0 && !S_ISDIR(stat_out.st_mode))) {
res = -errno;
qb_util_log(LOG_CRIT,
"Required directory not present %s",
SOCKETDIR);
goto error_close;
}
snprintf(un_addr.sun_path, sizeof(un_addr.sun_path), "%s/%s", SOCKETDIR,
s->name);
unlink(un_addr.sun_path);
}
res = bind(s->server_sock, (struct sockaddr *)&un_addr,
QB_SUN_LEN(&un_addr));
if (res) {
res = -errno;
qb_util_perror(LOG_ERR, "Could not bind AF_UNIX (%s)",
un_addr.sun_path);
goto error_close;
}
/*
* Allow everyone to write to the socket since the IPC layer handles
* security automatically
*/
if (use_filesystem_sockets()) {
- res = chmod(un_addr.sun_path, S_IRWXU | S_IRWXG | S_IRWXO);
+ (void)chmod(un_addr.sun_path, S_IRWXU | S_IRWXG | S_IRWXO);
}
#ifdef SO_PASSCRED
- setsockopt(s->server_sock, SOL_SOCKET, SO_PASSCRED, &on, sizeof(on));
+ (void)setsockopt(s->server_sock, SOL_SOCKET, SO_PASSCRED, &on, sizeof(on));
#endif
/* a listen() failure is only logged; registration still proceeds */
if (listen(s->server_sock, SERVER_BACKLOG) == -1) {
qb_util_perror(LOG_ERR, "socket listen failed");
}
res = s->poll_fns.dispatch_add(s->poll_priority, s->server_sock,
POLLIN | POLLPRI | POLLNVAL,
s, qb_ipcs_us_connection_acceptor);
return res;
error_close:
/* NOTE(review): s->server_sock keeps the closed descriptor value here */
close(s->server_sock);
return res;
}
/*
 * Stop accepting clients: unregister the listening socket from the poll
 * loop, shut it down, remove its filesystem entry when filesystem
 * sockets are in use, and close it. s->server_sock is set to -1.
 *
 * Always returns 0.
 */
int32_t
qb_ipcs_us_withdraw(struct qb_ipcs_service * s)
{
	qb_util_log(LOG_INFO, "withdrawing server sockets");

	(void)s->poll_fns.dispatch_del(s->server_sock);
	shutdown(s->server_sock, SHUT_RDWR);

	if (use_filesystem_sockets()) {
		struct sockaddr_un bound;
		socklen_t bound_len = sizeof(bound);
		int rc;

		rc = getsockname(s->server_sock, (struct sockaddr *)&bound, &bound_len);
		if (rc == 0 && bound.sun_family == AF_UNIX) {
#ifdef HAVE_STRUCT_SOCKADDR_UN_SUN_LEN
			/*
			 * Terminating NUL on FreeBSD is not part of the sun_path.
			 * Add it to use sun_path as a parameter of unlink
			 */
			bound.sun_path[bound.sun_len - offsetof(struct sockaddr_un, sun_path)] = '\0';
#endif
			unlink(bound.sun_path);
		}
	}

	close(s->server_sock);
	s->server_sock = -1;

	return 0;
}
/*
 * Build the server-side connection object for a client whose
 * authentication request has been received on sock, run the service's
 * accept/connect hooks and send the authentication response.
 *
 * auth_result is the outcome of the credential lookup; msg is the
 * received request, ugp the peer credentials. (len is not used in the
 * body.)
 *
 * Returns 0 when the connection was set up and the response sent,
 * otherwise a negative value; -ENOMEM is returned without sending a
 * response when allocation fails.
 */
static int32_t
handle_new_connection(struct qb_ipcs_service *s,
int32_t auth_result,
int32_t sock,
void *msg, size_t len, struct ipc_auth_ugp *ugp)
{
struct qb_ipcs_connection *c = NULL;
struct qb_ipc_connection_request *req = msg;
int32_t res = auth_result;
int32_t res2 = 0;
/* use the larger of the client's requested size and the service's */
uint32_t max_buffer_size = QB_MAX(req->max_msg_size, s->max_buffer_size);
struct qb_ipc_connection_response response;
const char suffix[] = "/qb";
int desc_len;
c = qb_ipcs_connection_alloc(s);
if (c == NULL) {
qb_ipcc_us_sock_close(sock);
return -ENOMEM;
}
c->receive_buf = calloc(1, max_buffer_size);
if (c->receive_buf == NULL) {
/* NOTE(review): c is released with free() rather than an unref -
 * confirm nothing else acquired by the alloc needs releasing */
free(c);
qb_ipcc_us_sock_close(sock);
return -ENOMEM;
}
c->setup.u.us.sock = sock;
c->request.max_msg_size = max_buffer_size;
c->response.max_msg_size = max_buffer_size;
c->event.max_msg_size = max_buffer_size;
c->pid = ugp->pid;
c->auth.uid = c->euid = ugp->uid;
c->auth.gid = c->egid = ugp->gid;
c->auth.mode = 0600;
c->stats.client_pid = ugp->pid;
memset(&response, 0, sizeof(response));
#if defined(QB_LINUX) || defined(QB_CYGWIN)
/*
 * The description is a per-connection directory created with
 * mkdtemp(); room for the "/qb" suffix is reserved up front.
 */
desc_len = snprintf(c->description, CONNECTION_DESCRIPTION - sizeof suffix,
"/dev/shm/qb-%d-%d-%d-XXXXXX", s->pid, ugp->pid, c->setup.u.us.sock);
if (desc_len < 0) {
res = -errno;
goto send_response;
}
if (desc_len >= CONNECTION_DESCRIPTION - sizeof suffix) {
res = -ENAMETOOLONG;
goto send_response;
}
if (mkdtemp(c->description) == NULL) {
res = -errno;
goto send_response;
}
if (chmod(c->description, 0770)) {
res = -errno;
goto send_response;
}
/* chown can fail because we might not be root */
(void)chown(c->description, c->auth.uid, c->auth.gid);
/* We can't pass just a directory spec to the clients */
memcpy(c->description + desc_len, suffix, sizeof suffix);
#else
desc_len = snprintf(c->description, CONNECTION_DESCRIPTION,
"%d-%d-%d", s->pid, ugp->pid, c->setup.u.us.sock);
if (desc_len < 0) {
res = -errno;
goto send_response;
}
if (desc_len >= CONNECTION_DESCRIPTION) {
res = -ENAMETOOLONG;
goto send_response;
}
#endif
/* the service's accept hook is consulted only if credentials were obtained */
if (auth_result == 0 && c->service->serv_fns.connection_accept) {
res = c->service->serv_fns.connection_accept(c,
c->euid, c->egid);
}
if (res != 0) {
goto send_response;
}
qb_util_log(LOG_DEBUG, "IPC credentials authenticated (%s)",
c->description);
if (s->funcs.connect) {
res = s->funcs.connect(s, c, &response);
if (res != 0) {
goto send_response;
}
}
/*
* The connection is good, add it to the active connection list
*/
c->state = QB_IPCS_CONNECTION_ACTIVE;
qb_list_add(&c->list, &s->connections);
send_response:
/* reached on success and on every failure after allocation: the
 * client always receives a response carrying res */
response.hdr.id = QB_IPC_MSG_AUTHENTICATE;
response.hdr.size = sizeof(response);
response.hdr.error = res;
if (res == 0) {
response.connection = (intptr_t) c;
response.connection_type = s->type;
response.max_msg_size = c->request.max_msg_size;
s->stats.active_connections++;
}
res2 = qb_ipc_us_send(&c->setup, &response, response.hdr.size);
/* a failed send turns an otherwise successful setup into a failure */
if (res == 0 && res2 != response.hdr.size) {
res = res2;
}
if (res == 0) {
/* hold a reference while the connection_created callback runs */
qb_ipcs_connection_ref(c);
if (s->serv_fns.connection_created) {
s->serv_fns.connection_created(c);
}
if (c->state == QB_IPCS_CONNECTION_ACTIVE) {
c->state = QB_IPCS_CONNECTION_ESTABLISHED;
}
qb_ipcs_connection_unref(c);
} else {
if (res == -EACCES) {
qb_util_log(LOG_ERR, "Invalid IPC credentials (%s).",
c->description);
} else if (res == -EAGAIN) {
qb_util_log(LOG_WARNING, "Denied connection, is not ready (%s)",
c->description);
} else {
errno = -res;
qb_util_perror(LOG_ERR,
"Error in connection setup (%s)",
c->description);
}
/* INACTIVE here presumably means the connection never reached the
 * ACTIVE assignment above - TODO confirm the state set by the alloc */
if (c->state == QB_IPCS_CONNECTION_INACTIVE) {
/* This removes the initial alloc ref */
qb_ipcs_connection_unref(c);
qb_ipcc_us_sock_close(sock);
} else {
qb_ipcs_disconnect(c);
}
}
return res;
}
/*
 * Poll-loop callback for a freshly accepted client socket: receive the
 * authentication request and, once complete, hand the socket over to
 * handle_new_connection().
 *
 * Returns 0 while the request is still incomplete (the callback stays
 * registered) and 1 once the socket has been removed from the poll loop
 * and the auth data destroyed, whether authentication succeeded or not.
 */
static int32_t
process_auth(int32_t fd, int32_t revents, void *d)
{
	struct ipc_auth_data *auth = (struct ipc_auth_data *) d;
	int32_t rc = 0;
#ifdef SO_PASSCRED
	int passcred_off = 0;
#endif

	if (auth->s->server_sock == -1) {
		qb_util_log(LOG_DEBUG, "Closing fd (%d) for server shutdown", fd);
		rc = -ESHUTDOWN;
	} else if (revents & POLLNVAL) {
		qb_util_log(LOG_DEBUG, "NVAL conn fd (%d)", fd);
		rc = -EINVAL;
	} else if (revents & POLLHUP) {
		qb_util_log(LOG_DEBUG, "HUP conn fd (%d)", fd);
		rc = -ESHUTDOWN;
	} else if ((revents & POLLIN) == 0) {
		/* nothing to read yet */
		return 0;
	} else {
		rc = qb_ipc_us_recv_msghdr(auth);
		if (rc == -EAGAIN) {
			/* yield to mainloop, Let mainloop call us again */
			return 0;
		}
		if (rc != auth->len) {
			rc = -EIO;
		} else {
			rc = qb_ipc_auth_creds(auth);
		}
	}

	/* from here on the socket leaves the authentication phase */
#ifdef SO_PASSCRED
	setsockopt(auth->sock, SOL_SOCKET, SO_PASSCRED, &passcred_off, sizeof(passcred_off));
#endif
	(void)auth->s->poll_fns.dispatch_del(auth->sock);

	if (rc >= 0 && auth->msg.req.hdr.id == QB_IPC_MSG_AUTHENTICATE) {
		(void)handle_new_connection(auth->s, rc, auth->sock, &auth->msg, auth->len, &auth->ugp);
	} else {
		close(auth->sock);
	}

	destroy_ipc_auth_data(auth);
	return 1;
}
/*
 * Start authenticating a newly accepted client socket: allocate the
 * receive state, take a reference on the service, enable credential
 * passing where available and register process_auth() with the poll
 * loop. On any failure the socket is closed.
 */
static void
qb_ipcs_uc_recv_and_auth(int32_t sock, struct qb_ipcs_service *s)
{
	struct ipc_auth_data *auth;
	int rc;
#ifdef SO_PASSCRED
	int passcred_on = 1;
#endif

	auth = init_ipc_auth_data(sock, sizeof(struct qb_ipc_connection_request));
	if (auth == NULL) {
		/* -ENOMEM */
		close(sock);
		return;
	}

	auth->s = s;
	qb_ipcs_ref(auth->s);

#ifdef SO_PASSCRED
	setsockopt(sock, SOL_SOCKET, SO_PASSCRED, &passcred_on, sizeof(passcred_on));
#endif

	rc = s->poll_fns.dispatch_add(s->poll_priority,
				      auth->sock,
				      POLLIN | POLLPRI | POLLNVAL,
				      auth, process_auth);
	if (rc >= 0) {
		return;
	}

	qb_util_log(LOG_DEBUG, "Failed to arrange for AUTH for fd (%d)",
		    auth->sock);
	close(sock);
	destroy_ipc_auth_data(auth);
}
/*
 * Poll-loop callback for the listening socket: accept one client, make
 * its descriptor non-blocking/close-on-exec and start authentication.
 *
 * Returns -1 when the listening socket reports an error condition or
 * accept() fails with EBADF; returns 0 in every other case, including
 * accept failures that should not remove the listener from the loop.
 */
static int32_t
qb_ipcs_us_connection_acceptor(int fd, int revent, void *data)
{
	struct qb_ipcs_service *service = (struct qb_ipcs_service *)data;
	struct sockaddr_un peer_addr;
	socklen_t peer_len = sizeof(struct sockaddr_un);
	int32_t client_fd;
	int32_t rc;

	if (revent & (POLLNVAL | POLLHUP | POLLERR)) {
		/*
		 * handle shutdown more cleanly.
		 */
		return -1;
	}

	/* retry for as long as accept() is interrupted by a signal */
	do {
		errno = 0;
		client_fd = accept(fd, (struct sockaddr *)&peer_addr, &peer_len);
	} while (client_fd == -1 && errno == EINTR);

	if (client_fd == -1) {
		if (errno == EBADF) {
			qb_util_perror(LOG_ERR,
				       "Could not accept client connection from fd:%d",
				       fd);
			return -1;
		}
		qb_util_perror(LOG_ERR, "Could not accept client connection");
		/* This is an error, but -1 would indicate disconnect
		 * from the poll loop
		 */
		return 0;
	}

	rc = qb_sys_fd_nonblock_cloexec_set(client_fd);
	if (rc < 0) {
		close(client_fd);
		/* This is an error, but -1 would indicate disconnect
		 * from the poll loop
		 */
		return 0;
	}

	qb_ipcs_uc_recv_and_auth(client_fd, service);
	return 0;
}
/*
 * Try to remove the directory that contains `name` (everything before
 * the last '/'). Only active on Linux and Cygwin builds; elsewhere the
 * function does nothing.
 */
void remove_tempdir(const char *name)
{
#if defined(QB_LINUX) || defined(QB_CYGWIN)
	char parent[PATH_MAX];
	char *last_sep = strrchr(name, '/');

	if (last_sep == NULL) {
		return;
	}
	if (!(last_sep - name < sizeof parent)) {
		return;
	}

	memcpy(parent, name, last_sep - name);
	parent[last_sep - name] = '\0';

	/* This gets called more than it needs to be really, so we don't check
	 * the return code. It's more of a desperate attempt to clean up after ourself
	 * in either the server or client.
	 */
	(void)rmdir(parent);
#endif
}
diff --git a/lib/ipc_socket.c b/lib/ipc_socket.c
index 0a13ebf..178a634 100644
--- a/lib/ipc_socket.c
+++ b/lib/ipc_socket.c
@@ -1,921 +1,921 @@
/*
* Copyright (C) 2010,2013 Red Hat, Inc.
*
* Author: Angus Salkeld <asalkeld@redhat.com>
*
* This file is part of libqb.
*
* libqb is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 2.1 of the License, or
* (at your option) any later version.
*
* libqb is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with libqb. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os_base.h"
#include <poll.h>
#ifdef HAVE_SYS_UN_H
#include <sys/un.h>
#endif /* HAVE_SYS_UN_H */
#ifdef HAVE_SYS_MMAN_H
#include <sys/mman.h>
#endif
#include <qb/qbatomic.h>
#include <qb/qbipcs.h>
#include <qb/qbloop.h>
#include <qb/qbdefs.h>
#include "util_int.h"
#include "ipc_int.h"
/*
 * Per-channel bookkeeping accessed through a channel's
 * u.us.shared_data pointer.
 */
struct ipc_us_control {
/* incremented after a successful send, decremented after a receive */
int32_t sent;
/* flow-control value written by qb_ipc_us_fc_set() */
int32_t flow_control;
};
/* size of the mapped control area: room for three control records
 * (presumably one per channel - TODO confirm) */
#define SHM_CONTROL_SIZE (3 * sizeof(struct ipc_us_control))
/*
 * Report whether unix sockets are bound to filesystem paths.
 *
 * On Linux and Cygwin this is true only when FORCESOCKETSFILE exists;
 * on every other platform it is always true. The answer is computed on
 * the first call and cached for all later calls.
 */
int use_filesystem_sockets(void)
{
	static int decided = 0;
	static int use_fs = 0;

	if (decided) {
		return use_fs;
	}

#if defined(QB_LINUX) || defined(QB_CYGWIN)
	{
		struct stat marker;

		if (stat(FORCESOCKETSFILE, &marker) == 0) {
			use_fs = 1;
		}
	}
#else
	use_fs = 1;
#endif
	decided = 1;

	return use_fs;
}
/*
 * Fill *address with an AF_UNIX address for socket_name.
 *
 * Names that start with '/', and every name when filesystem sockets are
 * not in use, are copied into sun_path starting at offset 1 (the first
 * byte stays 0). All other names become "SOCKETDIR/socket_name".
 */
static void
set_sock_addr(struct sockaddr_un *address, const char *socket_name)
{
	int skip_first_byte;

	memset(address, 0, sizeof(struct sockaddr_un));
	address->sun_family = AF_UNIX;
#ifdef HAVE_STRUCT_SOCKADDR_UN_SUN_LEN
	address->sun_len = QB_SUN_LEN(address);
#endif

	skip_first_byte = (socket_name[0] == '/') || !use_filesystem_sockets();

	if (!skip_first_byte) {
		snprintf(address->sun_path, sizeof(address->sun_path), "%s/%s", SOCKETDIR,
			 socket_name);
		return;
	}
	snprintf(address->sun_path + 1, UNIX_PATH_MAX - 1, "%s", socket_name);
}
/*
 * Create a non-blocking, close-on-exec AF_UNIX datagram socket and bind
 * it to the name "base_name-service_name".
 *
 * With filesystem sockets any existing file of that name is removed
 * before binding, and afterwards its mode is set to 0660 and its group
 * to gid (both best effort).
 *
 * On success *sock_pt holds the socket and 0 is returned; on failure
 * *sock_pt is -1 and a negative value is returned.
 */
static int32_t
qb_ipc_dgram_sock_setup(const char *base_name,
const char *service_name, int32_t * sock_pt,
gid_t gid)
{
int32_t request_fd;
struct sockaddr_un local_address;
int32_t res = 0;
char sock_path[PATH_MAX];
request_fd = socket(PF_UNIX, SOCK_DGRAM, 0);
if (request_fd == -1) {
return -errno;
}
qb_socket_nosigpipe(request_fd);
res = qb_sys_fd_nonblock_cloexec_set(request_fd);
if (res < 0) {
goto error_connect;
}
snprintf(sock_path, PATH_MAX, "%s-%s", base_name, service_name);
set_sock_addr(&local_address, sock_path);
if (use_filesystem_sockets()) {
- res = unlink(local_address.sun_path);
+ (void)unlink(local_address.sun_path);
}
res = bind(request_fd, (struct sockaddr *)&local_address,
sizeof(local_address));
if (use_filesystem_sockets()) {
(void)chmod(local_address.sun_path, 0660);
(void)chown(local_address.sun_path, -1, gid);
}
/* NOTE(review): a bind() failure is reported as its raw -1 result,
 * not as -errno like the other failure paths */
if (res < 0) {
goto error_connect;
}
*sock_pt = request_fd;
return 0;
error_connect:
close(request_fd);
*sock_pt = -1;
return res;
}
/*
 * Make sure the send and receive buffers of sockfd can hold a message
 * of max_msg_size bytes, enlarging them with setsockopt() when the
 * current size is not strictly larger.
 *
 * Returns 0 on success or a negative errno value.
 */
static int32_t
set_sock_size(int sockfd, size_t max_msg_size)
{
	int32_t rc;
	int saved_errno;
	unsigned int optval = 0;
	socklen_t optlen = sizeof(optval);

	rc = getsockopt(sockfd, SOL_SOCKET, SO_SNDBUF, &optval, &optlen);
	/* keep the syscall's errno: the logging below may overwrite it */
	saved_errno = errno;

	/* arguments are cast so that they match the %d conversions */
	qb_util_log(LOG_TRACE, "%d: getsockopt(%d, SO_SNDBUF, needed:%d) actual:%d",
		    rc, sockfd, (int)max_msg_size, (int)optval);

	/* The optval <= max_msg_size check is weird...
	 * during testing it was discovered in some instances if the
	 * default optval is exactly equal to our max_msg_size, we couldn't
	 * actually send a message that large unless we explicitly set
	 * it using setsockopt... there is no good explanation for this. Most
	 * likely this is hitting some sort of "off by one" error in the kernel. */
	if (rc == 0 && optval <= max_msg_size) {
		optval = max_msg_size;
		optlen = sizeof(optval);
		rc = setsockopt(sockfd, SOL_SOCKET, SO_SNDBUF, &optval, optlen);
		saved_errno = errno;
	}

	if (rc != 0) {
		return -saved_errno;
	}

	rc = getsockopt(sockfd, SOL_SOCKET, SO_RCVBUF, &optval, &optlen);
	saved_errno = errno;

	qb_util_log(LOG_TRACE, "%d: getsockopt(%d, SO_RCVBUF, needed:%d) actual:%d",
		    rc, sockfd, (int)max_msg_size, (int)optval);

	/* Set the sockets receive buffer size to match the send buffer. On
	 * FreeBSD without this calls to sendto() will result in an ENOBUFS error
	 * if the message is larger than net.local.dgram.recvspace sysctl. */
	if (rc == 0 && optval <= max_msg_size) {
		optval = max_msg_size;
		optlen = sizeof(optval);
		rc = setsockopt(sockfd, SOL_SOCKET, SO_RCVBUF, &optval, optlen);
		saved_errno = errno;
	}

	if (rc != 0) {
		return -saved_errno;
	}

	return rc;
}
/*
 * Check whether a datagram of max_msg_size bytes can be passed through
 * an AF_UNIX datagram socket pair after set_sock_size() has been
 * applied to both ends. Up to three attempts are made.
 *
 * Returns 0 when a message of that size was written and read back,
 * non-zero otherwise.
 */
static int32_t
dgram_verify_msg_size(size_t max_msg_size)
{
	int32_t rc = -1;
	/* -1 marks "not open" so the cleanup never closes a stray fd */
	int32_t sockets[2] = { -1, -1 };
	int32_t tries = 0;
	int32_t write_passed = 0;
	int32_t read_passed = 0;
	char *buf;

	/*
	 * Heap buffer instead of a variable-length array: the size comes
	 * from the caller and may be too large for the stack. It is zeroed
	 * so no uninitialised memory is written to the socket.
	 */
	buf = calloc(1, max_msg_size > 0 ? max_msg_size : 1);
	if (buf == NULL) {
		return rc;
	}

	if (socketpair(AF_UNIX, SOCK_DGRAM, 0, sockets) < 0) {
		qb_util_perror(LOG_DEBUG, "error calling socketpair()");
		/* the array contents are not guaranteed after a failure */
		sockets[0] = -1;
		sockets[1] = -1;
		goto cleanup_socks;
	}

	if (set_sock_size(sockets[0], max_msg_size) != 0) {
		qb_util_log(LOG_DEBUG, "error set_sock_size(sockets[0],%#x)",
			    (unsigned int)max_msg_size);
		goto cleanup_socks;
	}
	if (set_sock_size(sockets[1], max_msg_size) != 0) {
		qb_util_log(LOG_DEBUG, "error set_sock_size(sockets[1],%#x)",
			    (unsigned int)max_msg_size);
		goto cleanup_socks;
	}

	for (tries = 0; tries < 3; tries++) {

		if (write_passed == 0) {
			rc = write(sockets[1], buf, max_msg_size);

			if (rc < 0 && (errno == EAGAIN || errno == EINTR)) {
				continue;
			} else if (rc == max_msg_size) {
				write_passed = 1;
			} else {
				break;
			}
		}

		if (read_passed == 0) {
			rc = read(sockets[0], buf, max_msg_size);

			if (rc < 0 && (errno == EAGAIN || errno == EINTR)) {
				continue;
			} else if (rc == max_msg_size) {
				read_passed = 1;
			} else {
				break;
			}
		}

		if (read_passed && write_passed) {
			rc = 0;
			break;
		}
	}

cleanup_socks:
	if (sockets[0] >= 0) {
		close(sockets[0]);
	}
	if (sockets[1] >= 0) {
		close(sockets[1]);
	}
	free(buf);
	return rc;
}
/*
 * Find a datagram size that can actually be transferred.
 *
 * If max_msg_size itself verifies, it is returned. Otherwise candidate
 * sizes are probed upwards from 2048; each failed probe steps back and
 * halves the step, and the search stops once a probe fails with a step
 * below 512 or the candidate reaches max_msg_size.
 *
 * Returns the largest size that verified, or -1 if none did.
 */
int32_t
qb_ipcc_verify_dgram_max_msg_size(size_t max_msg_size)
{
	int32_t step = 2048;
	int32_t best = -1;
	int32_t candidate;

	if (dgram_verify_msg_size(max_msg_size) == 0) {
		return max_msg_size;
	}

	candidate = step;
	while (candidate < max_msg_size) {
		if (dgram_verify_msg_size(candidate) == 0) {
			best = candidate;
		} else if (step < 512) {
			break;
		} else {
			/* undo the last advance and continue with a finer step */
			candidate -= step;
			step = step / 2;
		}
		candidate += step;
	}

	return best;
}
/*
 * Create a datagram socket bound to "base_name-local_name" and connect
 * it to "base_name-remote_name".
 *
 * The socket is returned through sock_pt. The return value is that of
 * set_sock_size() after a successful connect, otherwise a negative
 * value; when connect() fails the socket is closed and *sock_pt is -1.
 */
static int32_t
qb_ipc_dgram_sock_connect(const char *base_name,
			  const char *local_name,
			  const char *remote_name,
			  int32_t max_msg_size, int32_t * sock_pt, gid_t gid)
{
	struct sockaddr_un peer;
	char peer_path[PATH_MAX];
	int32_t rc;

	rc = qb_ipc_dgram_sock_setup(base_name, local_name, sock_pt, gid);
	if (rc < 0) {
		return rc;
	}

	snprintf(peer_path, PATH_MAX, "%s-%s", base_name, remote_name);
	set_sock_addr(&peer, peer_path);

	if (connect(*sock_pt, (struct sockaddr *)&peer,
		    QB_SUN_LEN(&peer)) != -1) {
		return set_sock_size(*sock_pt, max_msg_size);
	}

	rc = -errno;
	close(*sock_pt);
	*sock_pt = -1;
	return rc;
}
/*
 * Complete a deferred connect: connect one_way's socket to the address
 * named by one_way->u.us.sock_name, trying up to ten times with a
 * 100 ms pause after each failure.
 *
 * On success sock_name is freed and cleared and the result of
 * set_sock_size() is returned; otherwise the negative errno of the last
 * connect() attempt is returned.
 */
static int32_t
_finish_connecting(struct qb_ipc_one_way *one_way)
{
	struct sockaddr_un peer;
	int attempt;
	int rc = -1;
	int last_error = 0;

	set_sock_addr(&peer, one_way->u.us.sock_name);

	/* this retry loop is here to help connecting when trying to send
	 * an event right after connection setup.
	 */
	for (attempt = 0; attempt < 10; attempt++) {
		errno = 0;
		rc = connect(one_way->u.us.sock,
			     (struct sockaddr *)&peer,
			     QB_SUN_LEN(&peer));
		if (rc != -1) {
			break;
		}
		last_error = -errno;
		qb_util_perror(LOG_DEBUG, "error calling connect()");
		usleep(100000);
	}
	if (rc == -1) {
		return last_error;
	}

	/* Beside disposing no longer needed value, this also signals that
	   we are done with connect-on-send arrangement at the server side
	   (i.e. for response and event channels). */
	free(one_way->u.us.sock_name);
	one_way->u.us.sock_name = NULL;

	return set_sock_size(one_way->u.us.sock, one_way->max_msg_size);
}
/*
* client functions
* --------------------------------------------------------
*/
/*
 * Tear down the client side of a socket-based connection: unmap and
 * unlink the shared control file, unlink the socket files bound for
 * this connection (filesystem sockets only), try to remove the
 * temporary directory and close the event, request and setup sockets.
 */
static void
qb_ipcc_us_disconnect(struct qb_ipcc_connection *c)
{
	munmap(c->request.u.us.shared_data, SHM_CONTROL_SIZE);
	unlink(c->request.u.us.shared_file_name);

	if (use_filesystem_sockets()) {
		/* channel suffixes whose socket files are removed, in order */
		static const char *const suffixes[] = {
			"request", "event", "event-tx", "response"
		};
		const size_t tail_len = 9; /* strlen("-response") */
		struct sockaddr_un un_addr;
		socklen_t un_addr_len = sizeof(struct sockaddr_un);
		char *base_name;
		char sock_name[PATH_MAX];
		size_t length;
		size_t base_len;
		size_t idx;

		if (getsockname(c->response.u.us.sock, (struct sockaddr *)&un_addr, &un_addr_len) == 0) {
			length = strlen(un_addr.sun_path);
			/*
			 * Strip the "-response" tail to get the common base
			 * name; a path shorter than that is used whole
			 * instead of letting the subtraction wrap around.
			 */
			base_len = (length >= tail_len) ? length - tail_len : length;
			base_name = strndup(un_addr.sun_path, base_len);

			/* without memory for the name there is nothing to unlink */
			if (base_name != NULL) {
				qb_util_log(LOG_DEBUG, "unlinking socket bound files with base_name=%s length=%d",base_name,(int)length);
				for (idx = 0; idx < sizeof(suffixes) / sizeof(suffixes[0]); idx++) {
					snprintf(sock_name,PATH_MAX,"%s-%s",base_name,suffixes[idx]);
					qb_util_log(LOG_DEBUG, "unlink sock_name=%s",sock_name);
					unlink(sock_name);
				}
				free(base_name);
			}
		}
	}

	/* Last-ditch attempt to tidy up after ourself */
	remove_tempdir(c->request.u.us.shared_file_name);

	qb_ipcc_us_sock_close(c->event.u.us.sock);
	qb_ipcc_us_sock_close(c->request.u.us.sock);
	qb_ipcc_us_sock_close(c->setup.u.us.sock);
}
static ssize_t
qb_ipc_socket_send(struct qb_ipc_one_way *one_way,
const void *msg_ptr, size_t msg_len)
{
ssize_t rc = 0;
struct ipc_us_control *ctl;
ctl = (struct ipc_us_control *)one_way->u.us.shared_data;
if (one_way->u.us.sock_name) {
rc = _finish_connecting(one_way);
if (rc < 0) {
qb_util_log(LOG_ERR, "socket connect-on-send");
return rc;
}
}
qb_sigpipe_ctl(QB_SIGPIPE_IGNORE);
rc = send(one_way->u.us.sock, msg_ptr, msg_len, MSG_NOSIGNAL);
if (rc == -1) {
rc = -errno;
if (errno != EAGAIN && errno != ENOBUFS) {
qb_util_perror(LOG_DEBUG, "socket_send:send");
}
}
qb_sigpipe_ctl(QB_SIGPIPE_DEFAULT);
if (ctl && rc == msg_len) {
qb_atomic_int_inc(&ctl->sent);
}
return rc;
}
static ssize_t
qb_ipc_socket_sendv(struct qb_ipc_one_way *one_way, const struct iovec *iov,
size_t iov_len)
{
int32_t rc;
struct ipc_us_control *ctl;
ctl = (struct ipc_us_control *)one_way->u.us.shared_data;
qb_sigpipe_ctl(QB_SIGPIPE_IGNORE);
if (one_way->u.us.sock_name) {
rc = _finish_connecting(one_way);
if (rc < 0) {
qb_util_perror(LOG_ERR, "socket connect-on-sendv");
qb_sigpipe_ctl(QB_SIGPIPE_DEFAULT);
return rc;
}
}
rc = writev(one_way->u.us.sock, iov, iov_len);
if (rc == -1) {
rc = -errno;
if (errno != EAGAIN && errno != ENOBUFS) {
qb_util_perror(LOG_DEBUG, "socket_sendv:writev %d",
one_way->u.us.sock);
}
}
qb_sigpipe_ctl(QB_SIGPIPE_DEFAULT);
if (ctl && rc > 0) {
qb_atomic_int_inc(&ctl->sent);
}
return rc;
}
/*
 * recv a message of unknown size.
 *
 * The message header is peeked first to learn the message size, then
 * the message is read into msg. timeout is in milliseconds; -1 waits
 * indefinitely (polling in one-second slices).
 *
 * msg must have room for len bytes and len must be at least the size
 * of a request header, otherwise -EINVAL is returned. A message whose
 * header announces a size that is negative or larger than len is
 * consumed with the read capped at len bytes and reported as
 * -EMSGSIZE, so the buffer is never overrun by a peer-supplied size.
 * (The sockets are presumably datagram sockets, where the capped read
 * discards the rest of the message - confirm against the callers.)
 *
 * Returns the number of bytes received, -ETIMEDOUT, -ENOTCONN, or
 * another negative errno value.
 */
static ssize_t
qb_ipc_us_recv_at_most(struct qb_ipc_one_way *one_way,
		       void *msg, size_t len, int32_t timeout)
{
	int32_t result;
	int32_t final_rc = 0;
	size_t to_recv = 0;
	int32_t oversized = 0;
	char *data = msg;
	struct ipc_us_control *ctl = NULL;
	int32_t time_waited = 0;
	int32_t time_to_wait = timeout;

	/* the header peek below writes a full header into msg */
	if (len < sizeof(struct qb_ipc_request_header)) {
		return -EINVAL;
	}

	if (timeout == -1) {
		time_to_wait = 1000;
	}

	qb_sigpipe_ctl(QB_SIGPIPE_IGNORE);

retry_peek:
	result = recv(one_way->u.us.sock, data,
		      sizeof(struct qb_ipc_request_header),
		      MSG_NOSIGNAL | MSG_PEEK);

	if (result == -1) {

		if (errno != EAGAIN) {
			final_rc = -errno;
			if (use_filesystem_sockets()) {
				if (errno == ECONNRESET || errno == EPIPE) {
					final_rc = -ENOTCONN;
				}
			}
			goto cleanup_sigpipe;
		}

		/* check to see if we have enough time left to try again */
		if (time_waited < timeout || timeout == -1) {
			result = qb_ipc_us_ready(one_way, NULL, time_to_wait, POLLIN);
			if (qb_ipc_us_sock_error_is_disconnected(result)) {
				final_rc = result;
				goto cleanup_sigpipe;
			}
			time_waited += time_to_wait;
			goto retry_peek;
		} else if (time_waited >= timeout) {
			final_rc = -ETIMEDOUT;
			goto cleanup_sigpipe;
		}
	}
	if (result >= sizeof(struct qb_ipc_request_header)) {
		struct qb_ipc_request_header *hdr = NULL;
		int32_t announced;

		hdr = (struct qb_ipc_request_header *)msg;
		announced = hdr->size;
		/* never trust the peer's size beyond the caller's buffer */
		if (announced < 0 || (size_t)announced > len) {
			oversized = 1;
			to_recv = len;
		} else {
			to_recv = (size_t)announced;
		}
	}

	result = recv(one_way->u.us.sock, data, to_recv,
		      MSG_NOSIGNAL | MSG_WAITALL);
	if (result == -1) {
		final_rc = -errno;
		goto cleanup_sigpipe;
	} else if (result == 0) {
		qb_util_log(LOG_DEBUG, "recv == 0 -> ENOTCONN");

		final_rc = -ENOTCONN;
		goto cleanup_sigpipe;
	}

	final_rc = oversized ? -EMSGSIZE : result;

	/* the message left the queue either way, so account for it */
	ctl = (struct ipc_us_control *)one_way->u.us.shared_data;
	if (ctl) {
		(void)qb_atomic_int_dec_and_test(&ctl->sent);
	}

cleanup_sigpipe:
	qb_sigpipe_ctl(QB_SIGPIPE_DEFAULT);
	return final_rc;
}
/*
 * Store the flow-control flag fc_enable in the control block that
 * one_way's shared_data points at.
 */
static void
qb_ipc_us_fc_set(struct qb_ipc_one_way *one_way, int32_t fc_enable)
{
	struct ipc_us_control *control;

	control = (struct ipc_us_control *)one_way->u.us.shared_data;

	qb_util_log(LOG_TRACE, "setting fc to %d", fc_enable);
	qb_atomic_int_set(&control->flow_control, fc_enable);
}
/*
 * Read the flow-control flag from the control block that one_way's
 * shared_data points at.
 */
static int32_t
qb_ipc_us_fc_get(struct qb_ipc_one_way *one_way)
{
	struct ipc_us_control *control;

	control = (struct ipc_us_control *)one_way->u.us.shared_data;
	return qb_atomic_int_get(&control->flow_control);
}
/*
 * Report the current value of the "sent" counter kept in the control
 * block that one_way's shared_data points at.
 */
static ssize_t
qb_ipc_us_q_len_get(struct qb_ipc_one_way *one_way)
{
	struct ipc_us_control *control;

	control = (struct ipc_us_control *)one_way->u.us.shared_data;
	return qb_atomic_int_get(&control->sent);
}
/*
 * Client side: attach to the shared control file named in r->request and
 * connect the request/response and event datagram sockets.
 *
 * c - client connection to fill in (function table, shared_data
 *     pointers and sockets are written here)
 * r - connection response; r->request names the control file,
 *     r->response is the base name of the sockets
 *
 * Returns 0 on success or a negative errno-style value on failure.
 */
int32_t
qb_ipcc_us_connect(struct qb_ipcc_connection * c,
		   struct qb_ipc_connection_response * r)
{
	int32_t res;
	char path[PATH_MAX];
	int32_t fd_hdr;
	char *shm_ptr;

	qb_atomic_init();

	c->needs_sock_for_poll = QB_FALSE;
	c->funcs.send = qb_ipc_socket_send;
	c->funcs.sendv = qb_ipc_socket_sendv;
	c->funcs.recv = qb_ipc_us_recv_at_most;
	c->funcs.fc_get = qb_ipc_us_fc_get;
	c->funcs.disconnect = qb_ipcc_us_disconnect;

	fd_hdr = qb_sys_mmap_file_open(path, r->request,
				       SHM_CONTROL_SIZE, O_RDWR);
	if (fd_hdr < 0) {
		res = fd_hdr;
		errno = -fd_hdr;
		qb_util_perror(LOG_ERR, "couldn't open file for mmap");
		return res;
	}
	(void)strlcpy(c->request.u.us.shared_file_name, r->request, NAME_MAX);

	shm_ptr = mmap(0, SHM_CONTROL_SIZE,
		       PROT_READ | PROT_WRITE, MAP_SHARED, fd_hdr, 0);
	if (shm_ptr == MAP_FAILED) {
		res = -errno;
		qb_util_perror(LOG_ERR, "couldn't create mmap for header");
		goto cleanup_hdr;
	}

	/* three consecutive control blocks: request, response, event */
	c->request.u.us.shared_data = shm_ptr;
	c->response.u.us.shared_data = shm_ptr + sizeof(struct ipc_us_control);
	c->event.u.us.shared_data = shm_ptr + (2 * sizeof(struct ipc_us_control));

	/* the mapping stays valid without the descriptor */
	close(fd_hdr);
	fd_hdr = -1;

	res = qb_ipc_dgram_sock_connect(r->response, "response", "request",
					r->max_msg_size, &c->request.u.us.sock, c->egid);
	if (res != 0) {
		goto cleanup_hdr;
	}
	/* request and response share one socket */
	c->response.u.us.sock = c->request.u.us.sock;

	res = qb_ipc_dgram_sock_connect(r->response, "event", "event-tx",
					r->max_msg_size, &c->event.u.us.sock, c->egid);
	if (res != 0) {
		goto cleanup_hdr;
	}

	return 0;

cleanup_hdr:
	if (fd_hdr >= 0) {
		close(fd_hdr);
	}
	/* NOTE(review): these sockets may not have been opened yet on the
	 * earlier failure paths -- confirm their initial value is harmless
	 * to close(). */
	close(c->event.u.us.sock);
	close(c->request.u.us.sock);
	unlink(r->request);
	/* only unmap what was actually mapped: when mmap() itself failed
	 * shared_data was never assigned by this function */
	if (shm_ptr != MAP_FAILED) {
		munmap(shm_ptr, SHM_CONTROL_SIZE);
	}
	return res;
}
/*
* service functions
* --------------------------------------------------------
*/
/*
 * Poll callback for the setup socket, used to notice a vanished client.
 *
 * fd      - the descriptor that triggered
 * revents - poll events reported for fd
 * data    - the struct qb_ipcs_connection the descriptor belongs to
 *
 * Disconnects the connection and returns a negative errno-style value on
 * POLLNVAL (-EINVAL), POLLHUP (-ESHUTDOWN), EOF (-ESHUTDOWN) or a recv()
 * error; returns 0 when the connection still looks alive.
 */
static int32_t
_sock_connection_liveliness(int32_t fd, int32_t revents, void *data)
{
	struct qb_ipcs_connection *c = (struct qb_ipcs_connection *)data;

	qb_util_log(LOG_DEBUG, "LIVENESS: fd %d event %d conn (%s)",
		    fd, revents, c->description);

	if (revents & POLLNVAL) {
		qb_util_log(LOG_DEBUG, "NVAL conn (%s)", c->description);
		qb_ipcs_disconnect(c);
		return -EINVAL;
	}
	if (revents & POLLHUP) {
		qb_util_log(LOG_DEBUG, "HUP conn (%s)", c->description);
		qb_ipcs_disconnect(c);
		return -ESHUTDOWN;
	}

	/* If we actually get POLLIN for some reason here, it most
	 * certainly means EOF. Do a recv on the fd to detect eof and
	 * then disconnect */
	if (revents & POLLIN) {
		char buf[10];
		int res;

		res = recv(fd, buf, sizeof(buf), MSG_DONTWAIT);
		if (res < 0) {
			if (errno == EAGAIN || errno == EWOULDBLOCK) {
				/* nothing to read after all: not an error,
				 * so do not fall into the disconnect below
				 * with the raw -1 from recv() */
				res = 0;
			} else {
				res = -errno;
			}
		} else if (res == 0) {
			qb_util_log(LOG_DEBUG, "EOF conn (%s)", c->description);
			res = -ESHUTDOWN;
		}

		if (res < 0) {
			qb_ipcs_disconnect(c);
			return res;
		}
	}

	return 0;
}
/*
 * Register both descriptors of connection c with the service's poll
 * functions: the request socket is dispatched to
 * qb_ipcs_dispatch_connection_request, the setup socket to
 * _sock_connection_liveliness.
 *
 * Returns the result of the last dispatch_add() call; a negative value
 * means failure.  If the second registration fails, the first one is
 * removed again.
 */
static int32_t
_sock_add_to_mainloop(struct qb_ipcs_connection *c)
{
	int rc;

	rc = c->service->poll_fns.dispatch_add(c->service->poll_priority,
					       c->request.u.us.sock,
					       POLLIN | POLLPRI | POLLNVAL,
					       c,
					       qb_ipcs_dispatch_connection_request);
	if (rc < 0) {
		qb_util_log(LOG_ERR,
			    "Error adding socket to mainloop (%s).",
			    c->description);
		return rc;
	}

	rc = c->service->poll_fns.dispatch_add(c->service->poll_priority,
					       c->setup.u.us.sock,
					       POLLIN | POLLPRI | POLLNVAL,
					       c, _sock_connection_liveliness);
	qb_util_log(LOG_DEBUG, "added %d to poll loop (liveness)",
		    c->setup.u.us.sock);
	if (rc >= 0) {
		return rc;
	}

	/* roll back the request socket registration */
	qb_util_perror(LOG_ERR, "Error adding setupfd to mainloop");
	(void)c->service->poll_fns.dispatch_del(c->request.u.us.sock);
	return rc;
}
/*
 * Remove the request and the setup socket of connection c from the
 * service's poll functions.  Results of dispatch_del() are ignored.
 */
static void
_sock_rm_from_mainloop(struct qb_ipcs_connection *c)
{
	const int32_t request_fd = c->request.u.us.sock;
	const int32_t setup_fd = c->setup.u.us.sock;

	(void)c->service->poll_fns.dispatch_del(request_fd);
	(void)c->service->poll_fns.dispatch_del(setup_fd);
}
/*
 * Server side teardown of connection c.
 *
 * For an established/active connection: removes the sockets from the
 * main loop, frees the pending socket names, unlinks the socket files
 * (only when filesystem sockets are in use) and closes the sockets.
 * For a shutting-down/active connection: unmaps and unlinks the shared
 * control file.  Finally removes the temporary directory derived from
 * c->description.
 */
static void
qb_ipcs_us_disconnect(struct qb_ipcs_connection *c)
{
	qb_enter();

	if (c->state == QB_IPCS_CONNECTION_ESTABLISHED ||
	    c->state == QB_IPCS_CONNECTION_ACTIVE) {
		_sock_rm_from_mainloop(c);

		/* Free the temporaries denoting which respective socket
		   name on the client's side to connect upon the first
		   send operation -- normally the variable is free'd once
		   the connection is established but there may have been
		   no chance for that. */
		free(c->response.u.us.sock_name);
		c->response.u.us.sock_name = NULL;

		free(c->event.u.us.sock_name);
		c->event.u.us.sock_name = NULL;

		if (use_filesystem_sockets()) {
			/* unlink order matches the historical sequence */
			static const char *const suffixes[] = {
				"request", "event", "event-tx", "response"
			};
			const size_t request_suffix_len = 8; /* strlen("-request") */
			struct sockaddr_un un_addr;
			socklen_t un_addr_len = sizeof(struct sockaddr_un);
			char *base_name = NULL;
			char sock_name[PATH_MAX];
			size_t length;
			size_t i;

			if (getsockname(c->request.u.us.sock, (struct sockaddr *)&un_addr, &un_addr_len) == 0) {
				length = strlen(un_addr.sun_path);
				/* a path shorter than the suffix cannot be
				 * stripped: length - 8 would wrap around */
				if (length >= request_suffix_len) {
					base_name = strndup(un_addr.sun_path,
							    length - request_suffix_len);
				}
			}
			/* skip the unlinks when no base name could be built
			 * (getsockname/strndup failure or too-short path) */
			if (base_name != NULL) {
				/* length is size_t; cast to match the %d conversion */
				qb_util_log(LOG_DEBUG, "unlinking socket bound files with base_name=%s length=%d",base_name,(int)length);
				for (i = 0; i < sizeof(suffixes) / sizeof(suffixes[0]); i++) {
					snprintf(sock_name,PATH_MAX,"%s-%s",base_name,suffixes[i]);
					qb_util_log(LOG_DEBUG, "unlink sock_name=%s",sock_name);
					unlink(sock_name);
				}
				free(base_name);
			}
		}
		qb_ipcc_us_sock_close(c->setup.u.us.sock);
		qb_ipcc_us_sock_close(c->request.u.us.sock);
		qb_ipcc_us_sock_close(c->event.u.us.sock);
	}
	if (c->state == QB_IPCS_CONNECTION_SHUTTING_DOWN ||
	    c->state == QB_IPCS_CONNECTION_ACTIVE) {
		munmap(c->request.u.us.shared_data, SHM_CONTROL_SIZE);
		unlink(c->request.u.us.shared_file_name);
	}
	remove_tempdir(c->description);
}
static int32_t
qb_ipcs_us_connect(struct qb_ipcs_service *s,
struct qb_ipcs_connection *c,
struct qb_ipc_connection_response *r)
{
char path[PATH_MAX];
int32_t fd_hdr;
int32_t res = 0;
struct ipc_us_control *ctl;
char *shm_ptr;
qb_util_log(LOG_DEBUG, "connecting to client (%s)", c->description);
c->request.u.us.sock = c->setup.u.us.sock;
c->response.u.us.sock = c->setup.u.us.sock;
snprintf(r->request, NAME_MAX, "%s-control-%s",
c->description, s->name);
snprintf(r->response, NAME_MAX, "%s-%s", c->description, s->name);
fd_hdr = qb_sys_mmap_file_open(path, r->request,
SHM_CONTROL_SIZE,
O_CREAT | O_TRUNC | O_RDWR | O_EXCL);
if (fd_hdr < 0) {
res = fd_hdr;
errno = -fd_hdr;
qb_util_perror(LOG_ERR, "couldn't create file for mmap (%s)",
c->description);
return res;
}
(void)strlcpy(r->request, path, PATH_MAX);
(void)strlcpy(c->request.u.us.shared_file_name, r->request, NAME_MAX);
res = chown(r->request, c->auth.uid, c->auth.gid);
if (res != 0) {
/* ignore res, this is just for the compiler warnings.
*/
res = 0;
}
res = chmod(r->request, c->auth.mode);
if (res != 0) {
/* ignore res, this is just for the compiler warnings.
*/
res = 0;
}
shm_ptr = mmap(0, SHM_CONTROL_SIZE,
PROT_READ | PROT_WRITE, MAP_SHARED, fd_hdr, 0);
if (shm_ptr == MAP_FAILED) {
res = -errno;
qb_util_perror(LOG_ERR, "couldn't create mmap for header (%s)",
c->description);
goto cleanup_hdr;
}
c->request.u.us.shared_data = shm_ptr;
c->response.u.us.shared_data = shm_ptr + sizeof(struct ipc_us_control);
c->event.u.us.shared_data = shm_ptr + (2 * sizeof(struct ipc_us_control));
ctl = (struct ipc_us_control *)c->request.u.us.shared_data;
ctl->sent = 0;
ctl->flow_control = 0;
ctl = (struct ipc_us_control *)c->response.u.us.shared_data;
ctl->sent = 0;
ctl->flow_control = 0;
ctl = (struct ipc_us_control *)c->event.u.us.shared_data;
ctl->sent = 0;
ctl->flow_control = 0;
close(fd_hdr);
fd_hdr = -1;
/* request channel */
res = qb_ipc_dgram_sock_setup(r->response, "request",
&c->request.u.us.sock, c->egid);
if (res < 0) {
goto cleanup_hdr;
}
res = set_sock_size(c->request.u.us.sock, c->request.max_msg_size);
if (res != 0) {
goto cleanup_hdr;
}
c->setup.u.us.sock_name = NULL;
c->request.u.us.sock_name = NULL;
/* response channel */
c->response.u.us.sock = c->request.u.us.sock;
snprintf(path, PATH_MAX, "%s-%s", r->response, "response");
c->response.u.us.sock_name = strdup(path);
/* event channel */
res = qb_ipc_dgram_sock_setup(r->response, "event-tx",
&c->event.u.us.sock, c->egid);
if (res < 0) {
goto cleanup_hdr;
}
res = set_sock_size(c->event.u.us.sock, c->event.max_msg_size);
if (res != 0) {
goto cleanup_hdr;
}
snprintf(path, PATH_MAX, "%s-%s", r->response, "event");
c->event.u.us.sock_name = strdup(path);
res = _sock_add_to_mainloop(c);
if (res < 0) {
goto cleanup_hdr;
}
return res;
cleanup_hdr:
free(c->response.u.us.sock_name);
free(c->event.u.us.sock_name);
if (fd_hdr >= 0) {
close(fd_hdr);
}
unlink(r->request);
munmap(c->request.u.us.shared_data, SHM_CONTROL_SIZE);
return res;
}
/*
 * Fill in the function table of service s with the unix-socket
 * implementation.  peek and reclaim are left NULL, and the service is
 * marked as not needing a separate socket for polling.
 */
void
qb_ipcs_us_init(struct qb_ipcs_service *s)
{
	/* connection life cycle */
	s->funcs.connect = qb_ipcs_us_connect;
	s->funcs.disconnect = qb_ipcs_us_disconnect;

	/* data path */
	s->funcs.send = qb_ipc_socket_send;
	s->funcs.sendv = qb_ipc_socket_sendv;
	s->funcs.recv = qb_ipc_us_recv_at_most;

	/* not provided by this transport */
	s->funcs.peek = NULL;
	s->funcs.reclaim = NULL;

	/* flow control and queue length */
	s->funcs.fc_set = qb_ipc_us_fc_set;
	s->funcs.q_len_get = qb_ipc_us_q_len_get;

	s->needs_sock_for_poll = QB_FALSE;

	qb_atomic_init();
}
diff --git a/tests/check_loop.c b/tests/check_loop.c
index c017c2c..2bea159 100644
--- a/tests/check_loop.c
+++ b/tests/check_loop.c
@@ -1,782 +1,782 @@
/*
* Copyright (c) 2010 Red Hat, Inc.
*
* All rights reserved.
*
* Author: Angus Salkeld <asalkeld@redhat.com>
*
* This file is part of libqb.
*
* libqb is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 2.1 of the License, or
* (at your option) any later version.
*
* libqb is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with libqb. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os_base.h"
#include "check_common.h"
#include <qb/qbdefs.h>
#include <qb/qbutil.h>
#include <qb/qbloop.h>
#include <qb/qblog.h>
/* Invocation counters incremented by the job callbacks of this file. */
static int32_t job_1_run_count = 0;
static int32_t job_2_run_count = 0;
static int32_t job_3_run_count = 0;
/* Expected run positions; their addresses are handed to job_order_check()
 * as callback data. */
static int32_t job_order_1 = 1;
static int32_t job_order_2 = 2;
static int32_t job_order_3 = 3;
static int32_t job_order_4 = 4;
static int32_t job_order_5 = 5;
static int32_t job_order_6 = 6;
static int32_t job_order_7 = 7;
static int32_t job_order_8 = 8;
static int32_t job_order_9 = 9;
static int32_t job_order_10 = 10;
static int32_t job_order_11 = 11;
static int32_t job_order_12 = 12;
static int32_t job_order_13 = 13;
/* Job callback that only counts its invocations; data is unused. */
static void job_1(void *data)
{
	(void)data;
	job_1_run_count += 1;
}
/*
 * Job callback verifying execution order: data points at the position
 * this invocation is expected to have.  The first invocation queues four
 * more checks (positions 10..13) on the NULL loop handle; from the 13th
 * invocation on the loop is stopped.
 */
static void job_order_check(void *data)
{
	const int32_t *expected_pos = (int32_t *)data;

	job_1_run_count++;
	ck_assert_int_eq(job_1_run_count, *expected_pos);

	if (job_1_run_count >= 13) {
		qb_loop_stop(NULL);
		return;
	}
	if (job_1_run_count != 1) {
		return;
	}

	qb_loop_job_add(NULL, QB_LOOP_MED, &job_order_10, job_order_check);
	qb_loop_job_add(NULL, QB_LOOP_MED, &job_order_11, job_order_check);
	qb_loop_job_add(NULL, QB_LOOP_MED, &job_order_12, job_order_check);
	qb_loop_job_add(NULL, QB_LOOP_MED, &job_order_13, job_order_check);
}
/* Job callback that counts its invocation and stops the loop passed as data. */
static void job_stop(void *data)
{
	qb_loop_t *loop = data;

	job_3_run_count += 1;
	qb_loop_stop(loop);
}
/*
 * Job callback that counts its invocation and queues job_stop at high
 * priority on the loop passed as data.
 */
static void job_2(void *data)
{
	qb_loop_t *loop = (qb_loop_t *)data;
	int32_t rc;

	job_2_run_count += 1;

	rc = qb_loop_job_add(loop, QB_LOOP_HIGH, data, job_stop);
	ck_assert_int_eq(rc, 0);
}
/*
 * Job callback that counts its invocation and queues job_2 at medium
 * priority on the loop passed as data.
 */
static void job_1_r(void *data)
{
	qb_loop_t *loop = (qb_loop_t *)data;
	int32_t rc;

	job_1_run_count += 1;

	rc = qb_loop_job_add(loop, QB_LOOP_MED, data, job_2);
	ck_assert_int_eq(rc, 0);
}
/*
 * Job callback that floods the loop passed as data: it queues job_1
 * eight times at high, four times at medium and twice at low priority,
 * then re-queues itself at low priority until job_1_run_count reaches
 * 500, at which point job_stop is queued instead.
 */
static void job_1_add_nuts(void *data)
{
	qb_loop_t *loop = (qb_loop_t *)data;
	void (*follow_up)(void *);
	int32_t rc;
	int i;

	job_1_run_count++;

	for (i = 0; i < 8; i++) {
		rc = qb_loop_job_add(loop, QB_LOOP_HIGH, data, job_1);
		ck_assert_int_eq(rc, 0);
	}
	for (i = 0; i < 4; i++) {
		rc = qb_loop_job_add(loop, QB_LOOP_MED, data, job_1);
		ck_assert_int_eq(rc, 0);
	}
	for (i = 0; i < 2; i++) {
		rc = qb_loop_job_add(loop, QB_LOOP_LOW, data, job_1);
		ck_assert_int_eq(rc, 0);
	}

	/* keep going until enough jobs have run, then wind down */
	follow_up = (job_1_run_count < 500) ? job_1_add_nuts : job_stop;
	rc = qb_loop_job_add(loop, QB_LOOP_LOW, data, follow_up);
	ck_assert_int_eq(rc, 0);
}
/*
 * Argument validation of qb_loop_job_add(): a NULL loop handle is
 * expected to be rejected before any loop was created and accepted
 * afterwards; an out-of-range priority and a NULL callback are expected
 * to be rejected.
 */
START_TEST(test_loop_job_input)
{
	qb_loop_t *loop;
	int32_t rc;

	/* no loop exists yet */
	rc = qb_loop_job_add(NULL, QB_LOOP_LOW, NULL, job_2);
	ck_assert_int_eq(rc, -EINVAL);

	loop = qb_loop_create();
	ck_assert(loop != NULL);

	/* same call, now that a loop has been created */
	rc = qb_loop_job_add(NULL, QB_LOOP_LOW, NULL, job_2);
	ck_assert_int_eq(rc, 0);

	/* invalid priority */
	rc = qb_loop_job_add(loop, 89, NULL, job_2);
	ck_assert_int_eq(rc, -EINVAL);

	/* missing callback */
	rc = qb_loop_job_add(loop, QB_LOOP_LOW, NULL, NULL);
	ck_assert_int_eq(rc, -EINVAL);

	qb_loop_destroy(loop);
}
END_TEST
/*
 * Queue job_1 followed by job_stop at low priority and check that job_1
 * ran exactly once by the time the loop returns.
 */
START_TEST(test_loop_job_1)
{
	qb_loop_t *loop = qb_loop_create();
	int32_t rc;

	ck_assert(loop != NULL);

	rc = qb_loop_job_add(loop, QB_LOOP_LOW, NULL, job_1);
	ck_assert_int_eq(rc, 0);
	rc = qb_loop_job_add(loop, QB_LOOP_LOW, loop, job_stop);
	ck_assert_int_eq(rc, 0);

	qb_loop_run(loop);

	ck_assert_int_eq(job_1_run_count, 1);
	qb_loop_destroy(loop);
}
END_TEST
/*
 * Chain of jobs: job_1_r queues job_2, which queues job_stop.  Each of
 * the three counters is expected to be 1 once the loop returns.
 */
START_TEST(test_loop_job_4)
{
	qb_loop_t *loop = qb_loop_create();
	int32_t rc;

	ck_assert(loop != NULL);

	rc = qb_loop_job_add(loop, QB_LOOP_LOW, loop, job_1_r);
	ck_assert_int_eq(rc, 0);

	qb_loop_run(loop);

	ck_assert_int_eq(job_1_run_count, 1);
	ck_assert_int_eq(job_2_run_count, 1);
	ck_assert_int_eq(job_3_run_count, 1);
	qb_loop_destroy(loop);
}
END_TEST
/*
 * Stress the job queue with job_1_add_nuts and check that at least 500
 * job invocations were counted when the loop returns.
 */
START_TEST(test_loop_job_nuts)
{
	qb_loop_t *loop = qb_loop_create();
	int32_t rc;

	ck_assert(loop != NULL);

	rc = qb_loop_job_add(loop, QB_LOOP_LOW, loop, job_1_add_nuts);
	ck_assert_int_eq(rc, 0);

	qb_loop_run(loop);

	ck_assert(job_1_run_count >= 500);
	qb_loop_destroy(loop);
}
END_TEST
/*
 * Queue nine medium-priority job_order_check jobs carrying positions
 * 1..9; the callback itself asserts that they run in that order.
 */
START_TEST(test_loop_job_order)
{
	int32_t *const positions[] = {
		&job_order_1, &job_order_2, &job_order_3,
		&job_order_4, &job_order_5, &job_order_6,
		&job_order_7, &job_order_8, &job_order_9
	};
	qb_loop_t *loop = qb_loop_create();
	int32_t rc;
	size_t idx;

	ck_assert(loop != NULL);

	job_1_run_count = 0;

	for (idx = 0; idx < sizeof(positions) / sizeof(positions[0]); idx++) {
		rc = qb_loop_job_add(loop, QB_LOOP_MED, positions[idx],
				     job_order_check);
		ck_assert_int_eq(rc, 0);
	}

	qb_loop_run(loop);
	qb_loop_destroy(loop);
}
END_TEST
/* Stopwatch shared between test_job_rate_limit and job_add_self. */
static qb_util_stopwatch_t *rl_sw;
/* How long (in seconds) the rate-limit test keeps re-queueing its job. */
#define RATE_LIMIT_RUNTIME_SEC 3
/*
 * Job callback that re-queues itself at medium priority on the loop
 * passed as data until the rl_sw stopwatch reports more than
 * RATE_LIMIT_RUNTIME_SEC seconds; then it stops the loop.
 */
static void job_add_self(void *data)
{
	qb_loop_t *loop = (qb_loop_t *)data;
	uint64_t elapsed_us;
	int32_t rc;

	job_1_run_count++;

	qb_util_stopwatch_stop(rl_sw);
	elapsed_us = qb_util_stopwatch_us_elapsed_get(rl_sw);

	if (elapsed_us <= (RATE_LIMIT_RUNTIME_SEC * QB_TIME_US_IN_SEC)) {
		/* still within the time budget: go around again */
		rc = qb_loop_job_add(loop, QB_LOOP_MED, data, job_add_self);
		ck_assert_int_eq(rc, 0);
		return;
	}

	/* run for 3 seconds */
	qb_loop_stop(loop);
}
/*
 * Run a single self-re-queueing job for RATE_LIMIT_RUNTIME_SEC seconds
 * and check that the number of invocations stays below the spin
 * threshold.
 */
START_TEST(test_job_rate_limit)
{
	qb_loop_t *loop = qb_loop_create();
	int32_t rc;

	ck_assert(loop != NULL);

	rl_sw = qb_util_stopwatch_create();
	ck_assert(rl_sw != NULL);
	qb_util_stopwatch_start(rl_sw);

	rc = qb_loop_job_add(loop, QB_LOOP_MED, loop, job_add_self);
	ck_assert_int_eq(rc, 0);

	qb_loop_run(loop);

	/*
	 * the test is to confirm that a single job does not run away
	 * and cause cpu spin. We are going to say that a spin is more than
	 * one job per 50ms if there is only one job pending in the loop.
	 */
	_ck_assert_int(job_1_run_count, <, (RATE_LIMIT_RUNTIME_SEC * (QB_TIME_MS_IN_SEC/50)) + 10);

	qb_loop_destroy(loop);
	qb_util_stopwatch_free(rl_sw);
}
END_TEST
/*
 * Job callback that counts its invocation, removes the pending
 * medium-priority job_1 (queued with the loop as data) and stops the
 * loop passed as data.
 */
static void job_stop_and_del_1(void *data)
{
	qb_loop_t *loop = (qb_loop_t *)data;
	int32_t rc;

	job_3_run_count += 1;

	rc = qb_loop_job_del(loop, QB_LOOP_MED, loop, job_1);
	ck_assert_int_eq(rc, 0);

	qb_loop_stop(loop);
}
/*
 * Check that a queued job can be deleted: once directly after adding
 * it, and once from within a higher-priority job before it had a chance
 * to run (job_1 must then never be counted).
 */
START_TEST(test_job_add_del)
{
	qb_loop_t *loop = qb_loop_create();
	int32_t rc;

	ck_assert(loop != NULL);

	/* add and immediately delete */
	rc = qb_loop_job_add(loop, QB_LOOP_MED, loop, job_1);
	ck_assert_int_eq(rc, 0);
	rc = qb_loop_job_del(loop, QB_LOOP_MED, loop, job_1);
	ck_assert_int_eq(rc, 0);

	job_1_run_count = 0;
	job_3_run_count = 0;

	/* add again; the high-priority job deletes it and stops the loop */
	rc = qb_loop_job_add(loop, QB_LOOP_MED, loop, job_1);
	ck_assert_int_eq(rc, 0);
	rc = qb_loop_job_add(loop, QB_LOOP_HIGH, loop, job_stop_and_del_1);
	ck_assert_int_eq(rc, 0);

	qb_loop_run(loop);

	ck_assert_int_eq(job_1_run_count, 0);
	ck_assert_int_eq(job_3_run_count, 1);
	qb_loop_destroy(loop);
}
END_TEST
/*
 * Build the "loop_job" suite.  The optional last argument of add_tcase
 * is passed for the two longer-running cases.
 */
static Suite *loop_job_suite(void)
{
	Suite *job_suite = suite_create("loop_job");
	TCase *job_tc;

	add_tcase(job_suite, job_tc, test_loop_job_input);
	add_tcase(job_suite, job_tc, test_loop_job_1);
	add_tcase(job_suite, job_tc, test_loop_job_4);
	add_tcase(job_suite, job_tc, test_loop_job_nuts, 5);
	add_tcase(job_suite, job_tc, test_job_rate_limit, 5);
	add_tcase(job_suite, job_tc, test_job_add_del);
	add_tcase(job_suite, job_tc, test_loop_job_order);

	return job_suite;
}
/*
* -----------------------------------------------------------------------
* Timers
*/
/* Timer handles shared by the timer tests; test_th is also inspected by
 * check_time_left(). */
static qb_loop_timer_handle test_th;
static qb_loop_timer_handle test_th2;
/*
 * Timer callback that checks the expiry information of the test_th
 * timer on the loop passed as data: both the absolute expiry time and
 * the remaining time must be non-zero, the absolute time must be the
 * larger one, and no more than 60 ms may remain.
 */
static void check_time_left(void *data)
{
	qb_loop_t *loop = (qb_loop_t *)data;
	uint64_t expires_at;
	uint64_t remaining;

	/* NOTE: We are checking the 'stop_loop' timer here, not our own */
	expires_at = qb_loop_timer_expire_time_get(loop, test_th);
	remaining = qb_loop_timer_expire_time_remaining(loop, test_th);

	ck_assert(expires_at > 0ULL);
	ck_assert(remaining > 0ULL);
	ck_assert(expires_at > remaining);
	ck_assert(remaining <= 60*QB_TIME_NS_IN_MSEC);
}
/*
 * Argument validation of qb_loop_timer_add(): a NULL loop handle is
 * expected to be rejected before any loop was created and accepted
 * afterwards; a NULL callback is expected to be rejected.
 */
START_TEST(test_loop_timer_input)
{
	qb_loop_t *loop;
	int32_t rc;

	/* no loop exists yet */
	rc = qb_loop_timer_add(NULL, QB_LOOP_LOW, 5*QB_TIME_NS_IN_MSEC, NULL, job_2, &test_th);
	ck_assert_int_eq(rc, -EINVAL);

	loop = qb_loop_create();
	ck_assert(loop != NULL);

	/* same call, now that a loop has been created */
	rc = qb_loop_timer_add(NULL, QB_LOOP_LOW, 5*QB_TIME_NS_IN_MSEC, NULL, job_2, &test_th);
	ck_assert_int_eq(rc, 0);

	/* missing callback */
	rc = qb_loop_timer_add(loop, QB_LOOP_LOW, 5*QB_TIME_NS_IN_MSEC, loop, NULL, &test_th);
	ck_assert_int_eq(rc, -EINVAL);

	qb_loop_destroy(loop);
}
END_TEST
/* Timer callback asserting that it is invoked at most once; data is unused. */
static void one_shot_tmo(void * data)
{
	static int32_t already_fired = QB_FALSE;

	(void)data;
	ck_assert_int_eq(already_fired, QB_FALSE);
	already_fired = QB_TRUE;
}
/* Handle of the timer that reset_one_shot_tmo() re-arms. */
static qb_loop_timer_handle reset_th;
/* Number of times reset_one_shot_tmo() has run. */
static int32_t reset_timer_step = 0;
/*
 * Timer callback that, on its first invocation, checks that its own
 * (already expired) timer can no longer be deleted and is not reported
 * as running, then arms the timer again with an 8 ms timeout.  Every
 * invocation advances reset_timer_step.
 */
static void reset_one_shot_tmo(void*data)
{
	qb_loop_t *loop = data;
	int32_t rc;

	if (reset_timer_step == 0) {
		/* the expired timer is gone from the loop's point of view */
		rc = qb_loop_timer_del(loop, reset_th);
		ck_assert_int_eq(rc, -EINVAL);
		rc = qb_loop_timer_is_running(loop, reset_th);
		ck_assert_int_eq(rc, QB_FALSE);

		/* re-arm using the same handle variable */
		rc = qb_loop_timer_add(loop, QB_LOOP_LOW, 8*QB_TIME_NS_IN_MSEC, loop, reset_one_shot_tmo, &reset_th);
		ck_assert_int_eq(rc, 0);
	}

	reset_timer_step += 1;
}
/*
 * Basic timer behaviour: a one-shot timer, a timer that re-arms itself
 * once, a timer that inspects the remaining time of the stop timer, and
 * a 60 ms timer that stops the loop.  The re-arming callback must have
 * run exactly twice.
 */
START_TEST(test_loop_timer_basic)
{
	qb_loop_t *loop = qb_loop_create();
	int32_t rc;

	ck_assert(loop != NULL);

	rc = qb_loop_timer_add(loop, QB_LOOP_LOW, 5*QB_TIME_NS_IN_MSEC, loop, one_shot_tmo, &test_th);
	ck_assert_int_eq(rc, 0);

	rc = qb_loop_timer_is_running(loop, test_th);
	ck_assert_int_eq(rc, QB_TRUE);

	rc = qb_loop_timer_add(loop, QB_LOOP_LOW, 7*QB_TIME_NS_IN_MSEC, loop, reset_one_shot_tmo, &reset_th);
	ck_assert_int_eq(rc, 0);

	rc = qb_loop_timer_add(loop, QB_LOOP_HIGH, 20*QB_TIME_NS_IN_MSEC, loop, check_time_left, &test_th2);
	ck_assert_int_eq(rc, 0);

	/* test_th is reused for the timer that ends the run */
	rc = qb_loop_timer_add(loop, QB_LOOP_LOW, 60*QB_TIME_NS_IN_MSEC, loop, job_stop, &test_th);
	ck_assert_int_eq(rc, 0);

	qb_loop_run(loop);

	ck_assert_int_eq(reset_timer_step, 2);
	qb_loop_destroy(loop);
}
END_TEST
/* Per-timer bookkeeping used by the timer precision test. */
struct qb_stop_watch {
/* qb_util_nano_current_get() value when the current period began */
uint64_t start;
/* qb_util_nano_current_get() value at the latest expiry */
uint64_t end;
/* loop the timer is registered with */
qb_loop_t *l;
/* timeout handed to qb_loop_timer_add() */
uint64_t ns_timer;
/* accumulated (measured period - ns_timer) over all expiries */
int64_t total;
/* number of expiries seen so far */
int32_t count;
/* when non-zero, the loop is stopped after the final expiry */
int32_t killer;
/* handle written by qb_loop_timer_add() */
qb_loop_timer_handle th;
};
/*
 * Timer callback for the precision test; data is a struct qb_stop_watch.
 * Measures the time since the previous expiry, asserts that the timer
 * did not fire early, accumulates the overshoot and re-arms the timer
 * until 50 expiries were seen.  After the 50th expiry the average error
 * is printed and, for a "killer" stop watch, the loop is stopped.
 */
static void stop_watch_tmo(void*data)
{
struct qb_stop_watch *sw = (struct qb_stop_watch *)data;
float per;
int64_t diff;
sw->end = qb_util_nano_current_get();
diff = sw->end - sw->start;
if (diff < sw->ns_timer) {
printf("timer expired early! by %"PRIi64"\n", (int64_t)(sw->ns_timer - diff));
}
ck_assert(diff >= sw->ns_timer);
/* keep only the overshoot beyond the requested timeout */
sw->total += diff;
sw->total -= sw->ns_timer;
/* the next period starts at this expiry */
sw->start = sw->end;
sw->count++;
if (sw->count < 50) {
qb_loop_timer_add(sw->l, QB_LOOP_LOW, sw->ns_timer, data, stop_watch_tmo, &sw->th);
} else {
per = ((sw->total * 100) / sw->count) / (float)sw->ns_timer;
printf("average error for %"PRIu64" ns timer is %"PRIi64" (ns) (%f)\n",
sw->ns_timer,
/* NOTE(review): the next two lines are the removed ('-') and added
 * ('+') sides of the diff hunk this listing was taken from */
- sw->total/sw->count, per);
+ (int64_t)(sw->total/sw->count), per);
if (sw->killer) {
qb_loop_stop(sw->l);
}
}
}
/*
 * Initialise the stop watch sw and arm its first low-priority timer on
 * loop l.
 *
 * timeout - timer interval handed to qb_loop_timer_add()
 * killer  - stored in sw->killer; read by stop_watch_tmo()
 */
static void start_timer(qb_loop_t *l, struct qb_stop_watch *sw, uint64_t timeout, int32_t killer)
{
	int32_t rc;

	sw->ns_timer = timeout;
	sw->killer = killer;
	sw->total = 0;
	sw->count = 0;
	sw->l = l;
	sw->start = qb_util_nano_current_get();

	rc = qb_loop_timer_add(l, QB_LOOP_LOW, timeout, sw, stop_watch_tmo, &sw->th);
	ck_assert_int_eq(rc, 0);
}
/*
 * Run ten repeating timers with different intervals plus an eleventh
 * 100 ms "killer" timer that ends the loop; the per-expiry checks live
 * in stop_watch_tmo().
 */
START_TEST(test_loop_timer_precision)
{
	struct qb_stop_watch watches[11];
	qb_loop_t *loop = qb_loop_create();
	uint64_t interval;
	int32_t idx;

	ck_assert(loop != NULL);

	for (idx = 0; idx < 10; idx++) {
		interval = ((1 + idx * 9) * QB_TIME_NS_IN_MSEC) + 500000;
		start_timer(loop, &watches[idx], interval, QB_FALSE);
	}
	/* idx is 10 here: the last slot holds the timer that stops the loop */
	start_timer(loop, &watches[idx], 100 * QB_TIME_NS_IN_MSEC, QB_TRUE);

	qb_loop_run(loop);
	qb_loop_destroy(loop);
}
END_TEST
/* Number of empty_func_tmo() invocations. */
static int expire_leak_counter = 0;
/* Number of timer batches the expire-leak test schedules in total. */
#define EXPIRE_NUM_RUNS 10
/* Number of batches scheduled so far. */
static int expire_leak_runs = 0;
/* Timer callback that only counts its invocations; data is unused. */
static void empty_func_tmo(void*data)
{
	(void)data;
	expire_leak_counter += 1;
}
/* Timer callback that logs the expiry counter and stops the loop passed as data. */
static void stop_func_tmo(void*data)
{
	qb_loop_t *loop = data;

	qb_log(LOG_DEBUG, "expire_leak_counter:%d", expire_leak_counter);
	qb_loop_stop(loop);
}
/*
 * Timer callback that schedules another batch of 300 x 3 empty timers
 * (one per priority) on the loop passed as data.  After the longest
 * timeout of the batch either this callback runs again or, once
 * EXPIRE_NUM_RUNS batches were scheduled, stop_func_tmo ends the loop.
 */
static void next_func_tmo(void*data)
{
	qb_loop_t *loop = (qb_loop_t *)data;
	qb_loop_timer_handle handle;
	uint64_t longest = 0;
	uint64_t delay;
	int32_t n;

	qb_log(LOG_DEBUG, "expire_leak_counter:%d", expire_leak_counter);

	for (n = 0; n < 300; n++) {
		delay = ((1 + n) * QB_TIME_NS_IN_MSEC) + 500000;

		qb_loop_timer_add(loop, QB_LOOP_LOW, delay, NULL, empty_func_tmo, &handle);
		qb_loop_timer_add(loop, QB_LOOP_MED, delay, NULL, empty_func_tmo, &handle);
		qb_loop_timer_add(loop, QB_LOOP_HIGH, delay, NULL, empty_func_tmo, &handle);
		longest = QB_MAX(longest, delay);
	}

	expire_leak_runs++;
	if (expire_leak_runs != EXPIRE_NUM_RUNS) {
		qb_loop_timer_add(loop, QB_LOOP_LOW, longest, loop, next_func_tmo, &handle);
	} else {
		qb_loop_timer_add(loop, QB_LOOP_LOW, longest, loop, stop_func_tmo, &handle);
	}
}
/*
 * make sure that file descriptors don't get leaked with no qb_loop_timer_del()
 *
 * Schedules the first batch of 300 x 3 timers here; next_func_tmo()
 * schedules the remaining batches.  Every one of the
 * 300 * 3 * EXPIRE_NUM_RUNS timers must have fired by the end.
 */
START_TEST(test_loop_timer_expire_leak)
{
	qb_loop_t *loop = qb_loop_create();
	qb_loop_timer_handle handle;
	uint64_t longest = 0;
	uint64_t delay;
	int32_t n;

	ck_assert(loop != NULL);

	expire_leak_counter = 0;

	for (n = 0; n < 300; n++) {
		delay = ((1 + n) * QB_TIME_NS_IN_MSEC) + 500000;

		qb_loop_timer_add(loop, QB_LOOP_LOW, delay, NULL, empty_func_tmo, &handle);
		qb_loop_timer_add(loop, QB_LOOP_MED, delay, NULL, empty_func_tmo, &handle);
		qb_loop_timer_add(loop, QB_LOOP_HIGH, delay, NULL, empty_func_tmo, &handle);
		longest = QB_MAX(longest, delay);
	}
	qb_loop_timer_add(loop, QB_LOOP_LOW, longest, loop, next_func_tmo, &handle);

	/* this function scheduled batch number one */
	expire_leak_runs = 1;

	qb_loop_run(loop);

	ck_assert_int_eq(expire_leak_counter, 300*3* EXPIRE_NUM_RUNS);
	qb_loop_destroy(loop);
}
END_TEST
/* Number of the signal most recently seen by sig_handler(). */
static int received_signum = 0;
/* How many times sig_handler() has been called. */
static int received_sigs = 0;
/*
 * Signal callback: records the signal number, counts the call and
 * queues job_stop (with NULL data) at low priority on the loop passed
 * as data.  Always returns 0.
 */
static int32_t
sig_handler(int32_t rsignal, void *data)
{
	qb_loop_t *loop = data;

	qb_log(LOG_DEBUG, "caught signal %d", rsignal);

	received_sigs += 1;
	received_signum = rsignal;

	qb_loop_job_add(loop, QB_LOOP_LOW, NULL, job_stop);
	return 0;
}
/*
 * Register sig_handler for SIGINT, SIGTERM and SIGQUIT, then raise
 * SIGINT and SIGQUIT in turn and check that each run of the loop
 * recorded the signal that was sent.
 */
START_TEST(test_loop_sig_handling)
{
	static const int watched[] = { SIGINT, SIGTERM, SIGQUIT };
	static const int raised[] = { SIGINT, SIGQUIT };
	qb_loop_signal_handle handle;
	qb_loop_t *loop = qb_loop_create();
	size_t idx;

	ck_assert(loop != NULL);

	for (idx = 0; idx < sizeof(watched) / sizeof(watched[0]); idx++) {
		qb_loop_signal_add(loop, QB_LOOP_HIGH, watched[idx],
				   loop, sig_handler, &handle);
	}

	for (idx = 0; idx < sizeof(raised) / sizeof(raised[0]); idx++) {
		kill(getpid(), raised[idx]);
		qb_loop_run(loop);
		ck_assert_int_eq(received_signum, raised[idx]);
	}

	qb_loop_destroy(loop);
}
END_TEST
/* Globals for this test only */
/* Set to 1 by handle_nonqb_signal(). */
static int our_signal_called = 0;
/* Loop used by test_loop_dont_override_other_signals and its handler. */
static qb_loop_t *this_l;
/*
 * Plain signal() handler: flags that it was called and queues job_stop
 * (with NULL data) at low priority on this_l.  num is unused.
 */
static void handle_nonqb_signal(int num)
{
	(void)num;
	our_signal_called = 1;
	qb_loop_job_add(this_l, QB_LOOP_LOW, NULL, job_stop);
}
START_TEST(test_loop_dont_override_other_signals)
{
qb_loop_signal_handle handle;
this_l = qb_loop_create();
ck_assert(this_l != NULL);
signal(SIGUSR1, handle_nonqb_signal);
qb_loop_signal_add(this_l, QB_LOOP_HIGH, SIGINT,
this_l, sig_handler, &handle);
kill(getpid(), SIGUSR1);
qb_loop_run(this_l);
ck_assert_int_eq(our_signal_called, 1);
qb_loop_destroy(this_l);
}
END_TEST
START_TEST(test_loop_sig_only_get_one)
{
int res;
qb_loop_signal_handle handle;
qb_loop_t *l = qb_loop_create();
ck_assert(l != NULL);
/* make sure we only get one call to the handler
* don't assume we are going to exit the loop.
*/
received_sigs = 0;
qb_loop_signal_add(l, QB_LOOP_LOW, SIGINT,
l, sig_handler, &handle);
res = qb_loop_job_add(l, QB_LOOP_MED, NULL, job_1);
ck_assert_int_eq(res, 0);
res = qb_loop_job_add(l, QB_LOOP_HIGH, NULL, job_1);
ck_assert_int_eq(res, 0);
res = qb_loop_job_add(l, QB_LOOP_MED, NULL, job_1);
ck_assert_int_eq(res, 0);
res = qb_loop_job_add(l, QB_LOOP_HIGH, NULL, job_1);
ck_assert_int_eq(res, 0);
res = qb_loop_job_add(l, QB_LOOP_HIGH, NULL, job_1);
ck_assert_int_eq(res, 0);
res = qb_loop_job_add(l, QB_LOOP_MED, NULL, job_1);
ck_assert_int_eq(res, 0);
kill(getpid(), SIGINT);
qb_loop_run(l);
ck_assert_int_eq(received_signum, SIGINT);
ck_assert_int_eq(received_sigs, 1);
qb_loop_destroy(l);
}
END_TEST
/* Signal handle registered by test_loop_sig_delete and removed by
 * job_rm_sig_handler(). */
static qb_loop_signal_handle sig_hdl;
/*
 * Job callback that removes the sig_hdl signal handler from the loop
 * passed as data and then queues job_stop (with NULL data) at low
 * priority.
 */
static void
job_rm_sig_handler(void *data)
{
	qb_loop_t *loop = (qb_loop_t *)data;
	int rc;

	rc = qb_loop_signal_del(loop, sig_hdl);
	ck_assert_int_eq(rc, 0);

	rc = qb_loop_job_add(loop, QB_LOOP_LOW, NULL, job_stop);
	ck_assert_int_eq(rc, 0);
}
/*
 * Register a medium-priority SIGINT handler, queue a high-priority job
 * that removes it, raise SIGINT and check that the handler was never
 * called.
 */
START_TEST(test_loop_sig_delete)
{
	qb_loop_t *loop = qb_loop_create();
	int rc;

	ck_assert(loop != NULL);

	/* make sure we can remove a signal job from the job queue.
	 */
	received_signum = 0;
	received_sigs = 0;

	rc = qb_loop_signal_add(loop, QB_LOOP_MED, SIGINT,
				loop, sig_handler, &sig_hdl);
	ck_assert_int_eq(rc, 0);

	rc = qb_loop_job_add(loop, QB_LOOP_HIGH, NULL,
			     job_rm_sig_handler);
	ck_assert_int_eq(rc, 0);

	kill(getpid(), SIGINT);
	qb_loop_run(loop);

	ck_assert_int_eq(received_sigs, 0);
	ck_assert_int_eq(received_signum, 0);
	qb_loop_destroy(loop);
}
END_TEST
/*
 * Build the "loop_timers" suite.  The optional last argument of
 * add_tcase is passed for the three longer-running cases.
 */
static Suite *
loop_timer_suite(void)
{
	Suite *timer_suite = suite_create("loop_timers");
	TCase *timer_tc;

	add_tcase(timer_suite, timer_tc, test_loop_timer_input);
	add_tcase(timer_suite, timer_tc, test_loop_timer_basic, 30);
	add_tcase(timer_suite, timer_tc, test_loop_timer_precision, 30);
	add_tcase(timer_suite, timer_tc, test_loop_timer_expire_leak, 30);

	return timer_suite;
}
/*
 * Build the "loop_signal_suite" suite.  The optional last argument of
 * add_tcase is passed for the signal handling case.
 */
static Suite *
loop_signal_suite(void)
{
	Suite *signal_suite = suite_create("loop_signal_suite");
	TCase *signal_tc;

	add_tcase(signal_suite, signal_tc, test_loop_sig_handling, 10);
	add_tcase(signal_suite, signal_tc, test_loop_sig_only_get_one);
	add_tcase(signal_suite, signal_tc, test_loop_sig_delete);
	add_tcase(signal_suite, signal_tc, test_loop_dont_override_other_signals);

	return signal_suite;
}
/*
 * Test driver: runs the job, timer and signal suites with logging sent
 * to stderr instead of syslog.  Exits with EXIT_SUCCESS only when no
 * test failed.
 */
int32_t
main(void)
{
	SRunner *runner = srunner_create(loop_job_suite());
	int32_t failures;

	srunner_add_suite (runner, loop_timer_suite());
	srunner_add_suite (runner, loop_signal_suite());

	/* logging: syslog off, everything matching the filter to stderr */
	qb_log_init("check", LOG_USER, LOG_EMERG);
	atexit(qb_log_fini);
	qb_log_ctl(QB_LOG_SYSLOG, QB_LOG_CONF_ENABLED, QB_FALSE);
	qb_log_filter_ctl(QB_LOG_STDERR, QB_LOG_FILTER_ADD,
			  QB_LOG_FILTER_FILE, "*", LOG_INFO);
	qb_log_ctl(QB_LOG_STDERR, QB_LOG_CONF_ENABLED, QB_TRUE);

	srunner_run_all(runner, CK_VERBOSE);
	failures = srunner_ntests_failed(runner);
	srunner_free(runner);

	if (failures == 0) {
		return EXIT_SUCCESS;
	}
	return EXIT_FAILURE;
}
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Tue, Jul 8, 6:21 PM (11 h, 58 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2002590
Default Alt Text
(66 KB)
Attached To
Mode
rQ LibQB
Attached
Detach File
Event Timeline
Log In to Comment