Page Menu
Home
ClusterLabs Projects
Search
Configure Global Search
Log In
Files
F7632315
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Flag For Later
Award Token
Size
50 KB
Referenced Files
None
Subscribers
None
View Options
diff --git a/lib/ipc_int.h b/lib/ipc_int.h
index 500315e..f63e053 100644
--- a/lib/ipc_int.h
+++ b/lib/ipc_int.h
@@ -1,207 +1,208 @@
/*
* Copyright (C) 2009 Red Hat, Inc.
*
* Author: Steven Dake <sdake@redhat.com>
* Angus Salkeld <asalkeld@redhat.com>
*
* This file is part of libqb.
*
* libqb is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 2.1 of the License, or
* (at your option) any later version.
*
* libqb is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with libqb. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef QB_IPC_INT_H_DEFINED
#define QB_IPC_INT_H_DEFINED
#include "os_base.h"
#include <dirent.h>
#include <qb/qblist.h>
#include <qb/qbloop.h>
#include <qb/qbipcc.h>
#include <qb/qbipcs.h>
#include <qb/qbipc_common.h>
#include <qb/qbrb.h>
#define QB_IPC_MAX_WAIT_MS 2000
/*
Client Server
SEND CONN REQ ->
ACCEPT & CREATE queues
or DENY
<- SEND ACCEPT(with details)/DENY
*/
/*
 * First message a client sends on the setup socket: a request header
 * followed by the maximum message size the client asks for.
 */
struct qb_ipc_connection_request {
struct qb_ipc_request_header hdr;
uint32_t max_msg_size;
} __attribute__ ((aligned(8)));
/*
 * A request header followed by a connection handle stored as an integer.
 * NOTE(review): presumably used to attach an event channel to an already
 * established connection - confirm against the users of this type.
 */
struct qb_ipc_event_connection_request {
struct qb_ipc_request_header hdr;
intptr_t connection;
} __attribute__ ((aligned(8)));
/*
 * Server reply to a qb_ipc_connection_request. hdr.error carries the result
 * of the connection attempt; the server only fills in connection,
 * connection_type and max_msg_size when that result is 0.
 */
struct qb_ipc_connection_response {
struct qb_ipc_response_header hdr;
/* the server's enum qb_ipc_type */
int32_t connection_type;
/* message size the server settled on */
uint32_t max_msg_size;
/* server-side connection pointer, passed around as an integer */
intptr_t connection;
/*
 * NOTE(review): presumably the names of the three data channels, filled in
 * by the transport's connect hook - confirm.
 */
char request[PATH_MAX];
char response[PATH_MAX];
char event[PATH_MAX];
} __attribute__ ((aligned(8)));
struct qb_ipcc_connection;
/*
 * State of a single channel (setup, request, response or event).
 * The union holds the transport specific part: 'us' for the unix-socket
 * transport, 'shm' for the ring-buffer transport.
 */
struct qb_ipc_one_way {
size_t max_msg_size;
enum qb_ipc_type type;
union {
struct {
/* socket descriptor used by the send/recv/poll helpers */
int32_t sock;
/* remote socket name; freed once _finish_connecting() has connected */
char *sock_name;
void* shared_data;
char shared_file_name[NAME_MAX];
} us;
struct {
qb_ringbuffer_t *rb;
} shm;
} u;
};
/*
 * Client-side transport operations. Each hook works on one channel
 * (struct qb_ipc_one_way), except disconnect which takes the whole connection.
 */
struct qb_ipcc_funcs {
ssize_t (*recv)(struct qb_ipc_one_way *one_way, void *buf, size_t buf_size, int32_t timeout);
ssize_t (*send)(struct qb_ipc_one_way *one_way, const void *data, size_t size);
ssize_t (*sendv)(struct qb_ipc_one_way *one_way, const struct iovec *iov, size_t iov_len);
void (*disconnect)(struct qb_ipcc_connection* c);
/* NOTE(review): "fc" presumably stands for flow control - confirm */
int32_t (*fc_get)(struct qb_ipc_one_way *one_way);
};
/*
 * Client-side view of one IPC connection: the service name, the four
 * channels and the transport hooks used to drive them.
 */
struct qb_ipcc_connection {
/* service name; used as the setup socket name when connecting */
char name[NAME_MAX];
int32_t needs_sock_for_poll;
/* group id of the server, taken from the peer credentials during setup */
+ gid_t egid;
struct qb_ipc_one_way setup;
struct qb_ipc_one_way request;
struct qb_ipc_one_way response;
struct qb_ipc_one_way event;
struct qb_ipcc_funcs funcs;
struct qb_ipc_request_header *receive_buf;
uint32_t fc_enable_max;
int32_t is_connected;
void * context;
};
/*
 * Client-side helpers shared by the transports: setup-socket handshake,
 * blocking send/recv on a unix socket, readiness polling and the
 * per-transport connect entry points.
 */
int32_t qb_ipcc_us_setup_connect(struct qb_ipcc_connection *c,
struct qb_ipc_connection_response *r);
ssize_t qb_ipc_us_send(struct qb_ipc_one_way *one_way, const void *msg, size_t len);
ssize_t qb_ipc_us_recv(struct qb_ipc_one_way *one_way, void *msg, size_t len, int32_t timeout);
int32_t qb_ipc_us_ready(struct qb_ipc_one_way *ow_data, struct qb_ipc_one_way *ow_conn,
int32_t ms_timeout, int32_t events);
/* shuts the socket down in both directions, then closes it */
void qb_ipcc_us_sock_close(int32_t sock);
int32_t qb_ipcc_us_connect(struct qb_ipcc_connection *c, struct qb_ipc_connection_response * response);
int32_t qb_ipcc_shm_connect(struct qb_ipcc_connection *c, struct qb_ipc_connection_response * response);
struct qb_ipcs_service;
struct qb_ipcs_connection;
/*
 * Server-side transport operations. connect/disconnect act on a whole
 * connection; the remaining hooks act on a single channel.
 */
struct qb_ipcs_funcs {
/* called while accepting a client; may fill in the response sent back to it */
int32_t (*connect)(struct qb_ipcs_service *s, struct qb_ipcs_connection *c,
struct qb_ipc_connection_response *r);
void (*disconnect)(struct qb_ipcs_connection *c);
ssize_t (*recv)(struct qb_ipc_one_way *one_way, void *buf, size_t buf_size, int32_t timeout);
ssize_t (*peek)(struct qb_ipc_one_way *one_way, void **data_out, int32_t timeout);
void (*reclaim)(struct qb_ipc_one_way *one_way);
ssize_t (*send)(struct qb_ipc_one_way *one_way, const void *data, size_t size);
ssize_t (*sendv)(struct qb_ipc_one_way *one_way, const struct iovec* iov, size_t iov_len);
/* NOTE(review): "fc" presumably stands for flow control - confirm */
void (*fc_set)(struct qb_ipc_one_way *one_way, int32_t fc_enable);
ssize_t (*q_len_get)(struct qb_ipc_one_way *one_way);
};
/*
 * One published IPC service: its name, listening socket, callbacks and the
 * list of connections accepted so far.
 */
struct qb_ipcs_service {
enum qb_ipc_type type;
/* service name; also the name of the listening unix socket */
char name[NAME_MAX];
/* lower bound for the per-connection buffer size */
uint32_t max_buffer_size;
int32_t service_id;
int32_t ref_count;
pid_t pid;
int32_t needs_sock_for_poll;
/* listening socket; set to -1 once the service has been withdrawn */
int32_t server_sock;
struct qb_ipcs_service_handlers serv_fns;
struct qb_ipcs_poll_handlers poll_fns;
struct qb_ipcs_funcs funcs;
enum qb_loop_priority poll_priority;
/* accepted connections are linked here through qb_ipcs_connection.list */
struct qb_list_head connections;
struct qb_list_head list;
struct qb_ipcs_stats stats;
void *context;
};
/*
 * Life cycle of a server-side connection. A connection becomes ACTIVE when
 * it is added to the service's connection list and ESTABLISHED after the
 * connection_created callback has run.
 */
enum qb_ipcs_connection_state {
QB_IPCS_CONNECTION_INACTIVE,
QB_IPCS_CONNECTION_ACTIVE,
QB_IPCS_CONNECTION_ESTABLISHED,
QB_IPCS_CONNECTION_SHUTTING_DOWN,
};
#define CONNECTION_DESCRIPTION (34) /* INT_MAX length + 3 */
/*
 * Ownership and mode associated with a connection. Initialised from the
 * client's credentials with mode 0600 when the connection is accepted.
 * NOTE(review): presumably applied to the connection's IPC files - confirm.
 */
struct qb_ipcs_connection_auth {
uid_t uid;
gid_t gid;
mode_t mode;
};
/*
 * Server-side state of one client connection.
 */
struct qb_ipcs_connection {
enum qb_ipcs_connection_state state;
int32_t refcount;
/* client credentials obtained while authenticating the setup socket */
pid_t pid;
uid_t euid;
gid_t egid;
struct qb_ipcs_connection_auth auth;
struct qb_ipc_one_way setup;
struct qb_ipc_one_way request;
struct qb_ipc_one_way response;
struct qb_ipc_one_way event;
struct qb_ipcs_service *service;
/* links this connection into qb_ipcs_service.connections */
struct qb_list_head list;
struct qb_ipc_request_header *receive_buf;
void *context;
int32_t fc_enabled;
int32_t poll_events;
int32_t outstanding_notifiers;
/* "<server pid>-<client pid>-<setup socket fd>" */
char description[CONNECTION_DESCRIPTION];
struct qb_ipcs_connection_stats_2 stats;
};
/*
 * Server-side entry points: transport initialisation, publishing and
 * withdrawing the listening socket, and connection/request handling.
 */
void qb_ipcs_us_init(struct qb_ipcs_service *s);
void qb_ipcs_shm_init(struct qb_ipcs_service *s);
int32_t qb_ipcs_us_publish(struct qb_ipcs_service *s);
int32_t qb_ipcs_us_withdraw(struct qb_ipcs_service *s);
int32_t qb_ipcc_us_sock_connect(const char *socket_name, int32_t * sock_pt);
int32_t qb_ipcs_dispatch_connection_request(int32_t fd, int32_t revents, void *data);
struct qb_ipcs_connection* qb_ipcs_connection_alloc(struct qb_ipcs_service *s);
int32_t qb_ipcs_process_request(struct qb_ipcs_service *s,
struct qb_ipc_request_header *hdr);
/* QB_TRUE when a negative error code means the peer has gone away */
int32_t qb_ipc_us_sock_error_is_disconnected(int err);
#endif /* QB_IPC_INT_H_DEFINED */
diff --git a/lib/ipc_setup.c b/lib/ipc_setup.c
index 06257c1..700de94 100644
--- a/lib/ipc_setup.c
+++ b/lib/ipc_setup.c
@@ -1,820 +1,855 @@
/*
* Copyright (C) 2010,2013 Red Hat, Inc.
*
* Author: Angus Salkeld <asalkeld@redhat.com>
*
* This file is part of libqb.
*
* libqb is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 2.1 of the License, or
* (at your option) any later version.
*
* libqb is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with libqb. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os_base.h"
#if defined(HAVE_GETPEERUCRED)
#include <ucred.h>
#endif
#ifdef HAVE_SYS_UN_H
#include <sys/un.h>
#endif /* HAVE_SYS_UN_H */
#ifdef HAVE_SYS_STAT_H
#include <sys/stat.h>
#endif
#ifdef HAVE_SYS_MMAN_H
#include <sys/mman.h>
#endif
#include <qb/qbatomic.h>
#include <qb/qbipcs.h>
#include <qb/qbloop.h>
#include <qb/qbdefs.h>
#include "util_int.h"
#include "ipc_int.h"
/*
 * Peer credentials (user id, group id, process id) retrieved for a socket.
 */
struct ipc_auth_ugp {
uid_t uid;
gid_t gid;
pid_t pid;
};
/*
 * State for receiving one authentication message with recvmsg() and
 * extracting the sender's credentials from it.
 */
struct ipc_auth_data {
int32_t sock;
/* owning service (server side only); a reference is held while set */
struct qb_ipcs_service *s;
/* receive buffer: the server reads a request, the client reads a response */
- struct qb_ipc_connection_request msg;
+ union {
+ struct qb_ipc_connection_request req;
+ struct qb_ipc_connection_response res;
+ } msg;
struct msghdr msg_recv;
struct iovec iov_recv;
struct ipc_auth_ugp ugp;
/* bytes of msg received so far, and the total expected */
size_t processed;
size_t len;
#ifdef SO_PASSCRED
/* control-message buffer sized for one struct ucred */
char *cmsg_cred;
#endif
};
static int32_t qb_ipcs_us_connection_acceptor(int fd, int revent, void *data);
/*
 * Send an entire message on the unix socket of @one_way.
 *
 * Keeps calling send() until all @len bytes have been written. EAGAIN is
 * retried only once part of the message is already out, so a message is
 * never left half sent; EAGAIN before the first byte is reported to the
 * caller.
 *
 * Returns the number of bytes sent (== @len) or a negative errno value.
 */
ssize_t
qb_ipc_us_send(struct qb_ipc_one_way *one_way, const void *msg, size_t len)
{
	const char *buf = msg;
	size_t processed = 0;
	ssize_t result;
	int saved_errno;

	qb_sigpipe_ctl(QB_SIGPIPE_IGNORE);

	do {
		result = send(one_way->u.us.sock,
			      &buf[processed], len - processed, MSG_NOSIGNAL);
		if (result == -1) {
			if (errno == EAGAIN && processed > 0) {
				continue;
			}
			/*
			 * Capture errno before restoring the SIGPIPE
			 * disposition, which may itself modify errno.
			 */
			saved_errno = errno;
			qb_sigpipe_ctl(QB_SIGPIPE_DEFAULT);
			return -saved_errno;
		}
		processed += (size_t)result;
	} while (processed != len);

	qb_sigpipe_ctl(QB_SIGPIPE_DEFAULT);
	return (ssize_t)processed;
}
/*
 * Receive the message described by @data with recvmsg(), so that any
 * control data (credentials) lands in data->msg_recv.
 *
 * data->processed is kept across calls: when -EAGAIN is returned the caller
 * can call again later and reception resumes where it stopped.
 *
 * Returns data->len once the whole message has arrived, -ENOTCONN when the
 * peer closed the socket, or another negative errno value.
 */
static ssize_t
qb_ipc_us_recv_msghdr(struct ipc_auth_data *data)
{
	char *msg = (char *) &data->msg;
	ssize_t result;
	int saved_errno;

	qb_sigpipe_ctl(QB_SIGPIPE_IGNORE);

	do {
		data->msg_recv.msg_iov->iov_base = &msg[data->processed];
		data->msg_recv.msg_iov->iov_len = data->len - data->processed;

		result = recvmsg(data->sock, &data->msg_recv,
				 MSG_NOSIGNAL | MSG_WAITALL);
		if (result == -1) {
			/*
			 * Capture errno before restoring the SIGPIPE
			 * disposition, which may itself modify errno.
			 * This also covers EAGAIN.
			 */
			saved_errno = errno;
			qb_sigpipe_ctl(QB_SIGPIPE_DEFAULT);
			return -saved_errno;
		}
		if (result == 0) {
			qb_sigpipe_ctl(QB_SIGPIPE_DEFAULT);
			qb_util_log(LOG_DEBUG,
				    "recv(fd %d) got 0 bytes assuming ENOTCONN", data->sock);
			return -ENOTCONN;
		}
		data->processed += result;
	} while (data->processed != data->len);

	qb_sigpipe_ctl(QB_SIGPIPE_DEFAULT);
	assert(data->processed == data->len);
	return data->processed;
}
/*
 * Decide whether a return code from the socket helpers means that the peer
 * is gone.
 *
 * Non-negative values and the listed recoverable errors yield QB_FALSE;
 * every other negative value yields QB_TRUE.
 */
int32_t
qb_ipc_us_sock_error_is_disconnected(int err)
{
	int recoverable;

	if (err >= 0) {
		return QB_FALSE;
	}

	recoverable = (err == -EAGAIN)
		|| (err == -ETIMEDOUT)
		|| (err == -EINTR)
#ifdef EWOULDBLOCK
		|| (err == -EWOULDBLOCK)
#endif
		|| (err == -EMSGSIZE)
		|| (err == -ENOMSG)
		|| (err == -EINVAL);

	if (recoverable) {
		return QB_FALSE;
	}
	return QB_TRUE;
}
/*
 * Poll the socket of @ow_data for @events, for at most @ms_timeout ms.
 * When @ow_conn is given and differs from @ow_data its socket is polled for
 * POLLIN as well.
 *
 * Returns 0 when poll() reported activity without an error condition,
 * -EAGAIN on timeout or EINTR, -ENOTCONN when an inspected descriptor shows
 * POLLERR, POLLHUP, POLLNVAL or no events at all, or -errno when poll()
 * fails otherwise.
 *
 * Only the first <poll() return value> entries of the array are inspected.
 */
int32_t
qb_ipc_us_ready(struct qb_ipc_one_way * ow_data,
		struct qb_ipc_one_way * ow_conn,
		int32_t ms_timeout, int32_t events)
{
	struct pollfd ufds[2];
	int32_t ready;
	int nfds = 1;
	int idx;

	ufds[0].fd = ow_data->u.us.sock;
	ufds[0].events = events;
	ufds[0].revents = 0;

	if (ow_conn && ow_data != ow_conn) {
		ufds[nfds].fd = ow_conn->u.us.sock;
		ufds[nfds].events = POLLIN;
		ufds[nfds].revents = 0;
		nfds++;
	}

	ready = poll(ufds, nfds, ms_timeout);
	if (ready == 0) {
		return -EAGAIN;
	}
	if (ready == -1) {
		return (errno == EINTR) ? -EAGAIN : -errno;
	}

	for (idx = 0; idx < ready; idx++) {
		if (ufds[idx].revents & POLLERR) {
			qb_util_log(LOG_DEBUG, "poll(fd %d) got POLLERR",
				    ufds[idx].fd);
			return -ENOTCONN;
		}
		if (ufds[idx].revents & POLLHUP) {
			qb_util_log(LOG_DEBUG, "poll(fd %d) got POLLHUP",
				    ufds[idx].fd);
			return -ENOTCONN;
		}
		if (ufds[idx].revents & POLLNVAL) {
			qb_util_log(LOG_DEBUG, "poll(fd %d) got POLLNVAL",
				    ufds[idx].fd);
			return -ENOTCONN;
		}
		if (ufds[idx].revents == 0) {
			qb_util_log(LOG_DEBUG, "poll(fd %d) zero revents",
				    ufds[idx].fd);
			return -ENOTCONN;
		}
	}
	return 0;
}
/*
 * Receive exactly @len bytes from the unix socket of @one_way.
 *
 * When recv() reports EAGAIN and either part of the message has already
 * arrived or @timeout is -1, the socket is polled with qb_ipc_us_ready()
 * and reception is retried. EAGAIN before the first byte with any other
 * timeout is returned to the caller as -EAGAIN.
 *
 * Returns @len on success, -ENOTCONN when the peer closed or reset the
 * connection, or another negative errno value.
 */
ssize_t
qb_ipc_us_recv(struct qb_ipc_one_way * one_way,
	       void *msg, size_t len, int32_t timeout)
{
	char *cursor = msg;
	int32_t received = 0;
	int32_t remaining = len;
	int32_t rc;
	int32_t outcome = 0;

	qb_sigpipe_ctl(QB_SIGPIPE_IGNORE);

	for (;;) {
		rc = recv(one_way->u.us.sock, &cursor[received], remaining,
			  MSG_NOSIGNAL | MSG_WAITALL);

		if (rc == -1) {
			if (errno == EAGAIN && (received > 0 || timeout == -1)) {
				/* wait for more data, then try again */
				rc = qb_ipc_us_ready(one_way, NULL, timeout, POLLIN);
				if (rc == 0 || rc == -EAGAIN) {
					continue;
				}
				outcome = rc;
			} else if (errno == ECONNRESET || errno == EPIPE) {
				outcome = -ENOTCONN;
			} else {
				outcome = -errno;
			}
			break;
		}

		if (rc == 0) {
			/* orderly shutdown by the peer */
			outcome = -ENOTCONN;
			break;
		}

		received += rc;
		remaining -= rc;
		if (received == len) {
			outcome = received;
			break;
		}
	}

	qb_sigpipe_ctl(QB_SIGPIPE_DEFAULT);
	return outcome;
}
/*
 * Create a non-blocking, close-on-exec unix stream socket and connect it to
 * the server socket called @socket_name.
 *
 * On success 0 is returned and *@sock_pt holds the connected descriptor.
 * On failure a negative errno value is returned; if the socket had already
 * been created it is closed and *@sock_pt is set to -1.
 */
static int32_t
qb_ipcc_stream_sock_connect(const char *socket_name, int32_t * sock_pt)
{
	struct sockaddr_un address;
	int32_t res = 0;
	int32_t fd = socket(PF_UNIX, SOCK_STREAM, 0);

	if (fd == -1) {
		return -errno;
	}

	qb_socket_nosigpipe(fd);

	res = qb_sys_fd_nonblock_cloexec_set(fd);
	if (res >= 0) {
		memset(&address, 0, sizeof(struct sockaddr_un));
		address.sun_family = AF_UNIX;
#ifdef HAVE_STRUCT_SOCKADDR_UN_SUN_LEN
		address.sun_len = QB_SUN_LEN(&address);
#endif

#if defined(QB_LINUX) || defined(QB_CYGWIN)
		/* the name is written one byte into sun_path, leaving a leading NUL */
		snprintf(address.sun_path + 1, UNIX_PATH_MAX - 1, "%s", socket_name);
#else
		snprintf(address.sun_path, sizeof(address.sun_path), "%s/%s", SOCKETDIR,
			 socket_name);
#endif

		if (connect(fd, (struct sockaddr *)&address,
			    QB_SUN_LEN(&address)) != -1) {
			*sock_pt = fd;
			return 0;
		}
		res = -errno;
	}

	/* failure: release the descriptor and report it as invalid */
	close(fd);
	*sock_pt = -1;
	return res;
}
/*
 * Shut the socket down in both directions, then release the descriptor.
 * Errors from either call are ignored.
 */
void
qb_ipcc_us_sock_close(int32_t sock)
{
	(void)shutdown(sock, SHUT_RDWR);
	(void)close(sock);
}
/*
 * Fill data->ugp with the credentials of the peer of data->sock, using
 * whichever mechanism was selected at build time.
 *
 * With SO_PASSCRED the credentials are read from the control data left in
 * data->msg_recv by a previous recvmsg(); -EINVAL is returned when no
 * SCM_CREDENTIALS message is present.
 *
 * Returns 0 on success, a negative errno value otherwise (-ENOTSUP when no
 * mechanism is available).
 */
+static int32_t
+qb_ipc_auth_creds(struct ipc_auth_data *data)
+{
+ int32_t res = 0;
+
+ /*
+ * currently support getpeerucred, getpeereid, and SO_PASSCRED credential
+ * retrieval mechanisms for various Platforms
+ */
+#ifdef HAVE_GETPEERUCRED
+ /*
+ * Solaris and some BSD systems
+ */
+ {
+ ucred_t *uc = NULL;
+
+ if (getpeerucred(data->sock, &uc) == 0) {
+ res = 0;
+ data->ugp.uid = ucred_geteuid(uc);
+ data->ugp.gid = ucred_getegid(uc);
+ data->ugp.pid = ucred_getpid(uc);
+ ucred_free(uc);
+ } else {
+ res = -errno;
+ }
+ }
+#elif defined(HAVE_GETPEEREID)
+ /*
+ * Usually MacOSX systems
+ */
+ {
+ /*
+ * TODO get the peer's pid.
+ * c->pid = ?;
+ */
+ if (getpeereid(data->sock, &data->ugp.uid, &data->ugp.gid) == 0) {
+ res = 0;
+ } else {
+ res = -errno;
+ }
+ }
+
+#elif defined(SO_PASSCRED)
+ /*
+ * Usually Linux systems
+ */
+ {
+ struct ucred cred;
+ struct cmsghdr *cmsg;
+
+ res = -EINVAL;
/* NOTE(review): only cmsg_type is checked, not cmsg_level - confirm this is sufficient */
+ for (cmsg = CMSG_FIRSTHDR(&data->msg_recv); cmsg != NULL;
+ cmsg = CMSG_NXTHDR(&data->msg_recv, cmsg)) {
+ if (cmsg->cmsg_type != SCM_CREDENTIALS)
+ continue;
+
+ memcpy(&cred, CMSG_DATA(cmsg), sizeof(struct ucred));
+ res = 0;
+ data->ugp.pid = cred.pid;
+ data->ugp.uid = cred.uid;
+ data->ugp.gid = cred.gid;
+ break;
+ }
+ }
+#else /* no credentials */
+ data->ugp.pid = 0;
+ data->ugp.uid = 0;
+ data->ugp.gid = 0;
+ res = -ENOTSUP;
+#endif /* no credentials */
+
+ return res;
+}
+
/*
 * Release an ipc_auth_data: drop the service reference if one is held,
 * free the credential control buffer and the structure itself.
 * The socket (data->sock) is not closed here.
 */
+static void
+destroy_ipc_auth_data(struct ipc_auth_data *data)
+{
+ if (data->s) {
+ qb_ipcs_unref(data->s);
+ }
+
+#ifdef SO_PASSCRED
+ free(data->cmsg_cred);
+#endif
+ free(data);
+}
+
/*
 * Allocate an ipc_auth_data prepared to receive a @len byte message from
 * @sock with recvmsg(): one iovec pointing at data->msg and, where
 * SO_PASSCRED exists, a control buffer large enough for one struct ucred.
 *
 * data->s is left NULL; the caller sets it if needed.
 * Returns NULL when memory cannot be allocated.
 */
+static struct ipc_auth_data *
+init_ipc_auth_data(int sock, size_t len)
+{
+ struct ipc_auth_data *data = calloc(1, sizeof(struct ipc_auth_data));
+
+ if (data == NULL) {
+ return NULL;
+ }
+
+ data->msg_recv.msg_iov = &data->iov_recv;
+ data->msg_recv.msg_iovlen = 1;
+ data->msg_recv.msg_name = 0;
+ data->msg_recv.msg_namelen = 0;
+
+#ifdef SO_PASSCRED
+ data->cmsg_cred = calloc(1, CMSG_SPACE(sizeof(struct ucred)));
+ if (data->cmsg_cred == NULL) {
/* data->s is still NULL here, so this only frees the structure */
+ destroy_ipc_auth_data(data);
+ return NULL;
+ }
+ data->msg_recv.msg_control = (void *)data->cmsg_cred;
+ data->msg_recv.msg_controllen = CMSG_SPACE(sizeof(struct ucred));
+#endif
+#ifdef QB_SOLARIS
+ data->msg_recv.msg_accrights = 0;
+ data->msg_recv.msg_accrightslen = 0;
+#else
+ data->msg_recv.msg_flags = 0;
+#endif /* QB_SOLARIS */
+
+ data->len = len;
+ data->iov_recv.iov_base = &data->msg;
+ data->iov_recv.iov_len = data->len;
+ data->sock = sock;
+
+ return data;
+}
+
/*
 * Client side of the connection handshake: connect to the server's setup
 * socket named c->name, send an authentication request carrying the wanted
 * maximum message size, and receive the server's response into @r.
 *
 * With this change the response is read with recvmsg() so that the
 * server's credentials can be retrieved; the server's group id is stored in
 * c->egid.
 *
 * Returns a negative errno value when connecting, sending or receiving
 * fails, otherwise the error code from the server's response (0 on success).
 */
int32_t
qb_ipcc_us_setup_connect(struct qb_ipcc_connection *c,
struct qb_ipc_connection_response *r)
{
int32_t res;
struct qb_ipc_connection_request request;
+ struct ipc_auth_data *data;
#ifdef QB_LINUX
int off = 0;
int on = 1;
#endif
res = qb_ipcc_stream_sock_connect(c->name, &c->setup.u.us.sock);
if (res != 0) {
return res;
}
#ifdef QB_LINUX
/* ask the kernel to attach the sender's credentials to received messages */
setsockopt(c->setup.u.us.sock, SOL_SOCKET, SO_PASSCRED, &on,
sizeof(on));
#endif
memset(&request, 0, sizeof(request));
request.hdr.id = QB_IPC_MSG_AUTHENTICATE;
request.hdr.size = sizeof(request);
request.max_msg_size = c->setup.max_msg_size;
res = qb_ipc_us_send(&c->setup, &request, request.hdr.size);
if (res < 0) {
qb_ipcc_us_sock_close(c->setup.u.us.sock);
return res;
}
+
+ data = init_ipc_auth_data(c->setup.u.us.sock, sizeof(struct qb_ipc_connection_response));
+ if (data == NULL) {
+ qb_ipcc_us_sock_close(c->setup.u.us.sock);
+ return -ENOMEM;
+ }
+
/*
 * NOTE(review): the result of qb_ipc_us_ready() is ignored; if it returns
 * early (it maps EINTR to -EAGAIN) the receive below can fail with -EAGAIN
 * on the non-blocking socket - confirm this is acceptable.
 */
+ qb_ipc_us_ready(&c->setup, NULL, -1, POLLIN);
+ res = qb_ipc_us_recv_msghdr(data);
+
#ifdef QB_LINUX
setsockopt(c->setup.u.us.sock, SOL_SOCKET, SO_PASSCRED, &off,
sizeof(off));
#endif
- res =
- qb_ipc_us_recv(&c->setup, r,
- sizeof(struct qb_ipc_connection_response), -1);
- if (res < 0) {
/* anything but a complete response is an error; the setup socket is left open on this path */
+ if (res != data->len) {
+ destroy_ipc_auth_data(data);
return res;
}
- if (r->hdr.error != 0) {
- return r->hdr.error;
- }
- return 0;
+ memcpy(r, &data->msg.res, sizeof(struct qb_ipc_connection_response));
+
/*
 * NOTE(review): the return value is not checked; when no credentials are
 * found data->ugp.gid keeps the 0 it got from calloc(), so c->egid becomes
 * 0 - confirm that is the intended fallback.
 */
+ qb_ipc_auth_creds(data);
+ c->egid = data->ugp.gid;
+
+ destroy_ipc_auth_data(data);
+ return r->hdr.error;
}
/*
**************************************************************************
* SERVER
*/
/*
 * Create the listening unix stream socket for service @s, bind it to the
 * service name and register it with the service's poll loop so that
 * qb_ipcs_us_connection_acceptor() runs for every incoming connection.
 *
 * On Linux and Cygwin the name is written one byte into sun_path (leading
 * NUL); elsewhere the socket is created as a file under SOCKETDIR.
 *
 * Returns the result of dispatch_add() on success, or a negative errno
 * value after closing the socket when setup fails.
 */
int32_t
qb_ipcs_us_publish(struct qb_ipcs_service * s)
{
	struct sockaddr_un un_addr;
	int32_t res;
#ifdef SO_PASSCRED
	int on = 1;
#endif

	/*
	 * Create socket for IPC clients, name socket, listen for connections
	 */
	s->server_sock = socket(PF_UNIX, SOCK_STREAM, 0);
	if (s->server_sock == -1) {
		res = -errno;
		qb_util_perror(LOG_ERR, "Cannot create server socket");
		return res;
	}

	res = qb_sys_fd_nonblock_cloexec_set(s->server_sock);
	if (res < 0) {
		goto error_close;
	}

	memset(&un_addr, 0, sizeof(struct sockaddr_un));
	un_addr.sun_family = AF_UNIX;
#if defined(QB_BSD) || defined(QB_DARWIN)
	un_addr.sun_len = SUN_LEN(&un_addr);
#endif

	qb_util_log(LOG_INFO, "server name: %s", s->name);

#if defined(QB_LINUX) || defined(QB_CYGWIN)
	snprintf(un_addr.sun_path + 1, UNIX_PATH_MAX - 1, "%s", s->name);
#else
	{
		struct stat stat_out;

		/*
		 * errno is only meaningful when stat() itself fails; a path
		 * that exists but is not a directory must not be reported
		 * with a stale (possibly zero) errno.
		 */
		if (stat(SOCKETDIR, &stat_out) == -1) {
			res = -errno;
		} else if (!S_ISDIR(stat_out.st_mode)) {
			res = -ENOTDIR;
		} else {
			res = 0;
		}
		if (res < 0) {
			qb_util_log(LOG_CRIT,
				    "Required directory not present %s",
				    SOCKETDIR);
			goto error_close;
		}
		snprintf(un_addr.sun_path, sizeof(un_addr.sun_path), "%s/%s", SOCKETDIR,
			 s->name);
		/* remove a stale socket file left by a previous run */
		unlink(un_addr.sun_path);
	}
#endif

	res = bind(s->server_sock, (struct sockaddr *)&un_addr,
		   QB_SUN_LEN(&un_addr));
	if (res) {
		res = -errno;
		qb_util_perror(LOG_ERR, "Could not bind AF_UNIX (%s)",
			       un_addr.sun_path);
		goto error_close;
	}

	/*
	 * Allow everyone to write to the socket since the IPC layer handles
	 * security automatically
	 */
#if !defined(QB_LINUX) && !defined(QB_CYGWIN)
	res = chmod(un_addr.sun_path, S_IRWXU | S_IRWXG | S_IRWXO);
#endif
#ifdef SO_PASSCRED
	setsockopt(s->server_sock, SOL_SOCKET, SO_PASSCRED, &on, sizeof(on));
#endif

	/* a listen() failure is only logged; registration still proceeds */
	if (listen(s->server_sock, SERVER_BACKLOG) == -1) {
		qb_util_perror(LOG_ERR, "socket listen failed");
	}

	res = s->poll_fns.dispatch_add(s->poll_priority, s->server_sock,
				       POLLIN | POLLPRI | POLLNVAL,
				       s, qb_ipcs_us_connection_acceptor);
	return res;

error_close:
	close(s->server_sock);
	return res;
}
/*
 * Stop accepting connections for service @s: unregister the listening
 * socket from the poll loop, shut it down, close it and mark it as gone
 * (server_sock = -1). Always returns 0.
 */
int32_t
qb_ipcs_us_withdraw(struct qb_ipcs_service * s)
{
	int32_t listen_fd;

	qb_util_log(LOG_INFO, "withdrawing server sockets");

	listen_fd = s->server_sock;
	(void)s->poll_fns.dispatch_del(listen_fd);
	shutdown(listen_fd, SHUT_RDWR);
	close(listen_fd);

	s->server_sock = -1;
	return 0;
}
/*
 * Build the server-side connection for a client whose authentication
 * request has been received on @sock, and send the response back.
 *
 * @s            service the client connected to
 * @auth_result  result of the credential lookup (0 on success)
 * @sock         the client's setup socket
 * @msg          the received struct qb_ipc_connection_request
 * @len          size of @msg (currently unused)
 * @ugp          credentials of the client
 *
 * The service's connection_accept callback may veto the connection, and
 * the transport's connect hook may fail; in both cases the error is sent to
 * the client in response.hdr.error.
 *
 * Returns 0 when the connection was set up, a negative errno otherwise.
 */
static int32_t
handle_new_connection(struct qb_ipcs_service *s,
		      int32_t auth_result,
		      int32_t sock,
		      void *msg, size_t len, struct ipc_auth_ugp *ugp)
{
	struct qb_ipcs_connection *c = NULL;
	struct qb_ipc_connection_request *req = msg;
	int32_t res = auth_result;
	int32_t res2 = 0;
	uint32_t max_buffer_size = QB_MAX(req->max_msg_size, s->max_buffer_size);
	struct qb_ipc_connection_response response;

	/*
	 * Zero the response up front: the whole structure is sent to the
	 * client on every path through send_response, including the ones
	 * taken before the transport had a chance to fill it in. Without
	 * this a rejected client would receive uninitialized stack memory.
	 */
	memset(&response, 0, sizeof(response));

	c = qb_ipcs_connection_alloc(s);
	if (c == NULL) {
		qb_ipcc_us_sock_close(sock);
		return -ENOMEM;
	}

	c->receive_buf = calloc(1, max_buffer_size);
	if (c->receive_buf == NULL) {
		free(c);
		qb_ipcc_us_sock_close(sock);
		return -ENOMEM;
	}
	c->setup.u.us.sock = sock;
	c->request.max_msg_size = max_buffer_size;
	c->response.max_msg_size = max_buffer_size;
	c->event.max_msg_size = max_buffer_size;
	c->pid = ugp->pid;
	c->auth.uid = c->euid = ugp->uid;
	c->auth.gid = c->egid = ugp->gid;
	c->auth.mode = 0600;
	c->stats.client_pid = ugp->pid;
	snprintf(c->description, CONNECTION_DESCRIPTION,
		 "%d-%d-%d", s->pid, ugp->pid, c->setup.u.us.sock);

	/* let the service decide whether this uid/gid may connect */
	if (auth_result == 0 && c->service->serv_fns.connection_accept) {
		res = c->service->serv_fns.connection_accept(c,
							     c->euid, c->egid);
	}
	if (res != 0) {
		goto send_response;
	}

	qb_util_log(LOG_DEBUG, "IPC credentials authenticated (%s)",
		    c->description);

	if (s->funcs.connect) {
		res = s->funcs.connect(s, c, &response);
		if (res != 0) {
			goto send_response;
		}
	}
	/*
	 * The connection is good, add it to the active connection list
	 */
	c->state = QB_IPCS_CONNECTION_ACTIVE;
	qb_list_add(&c->list, &s->connections);

send_response:
	response.hdr.id = QB_IPC_MSG_AUTHENTICATE;
	response.hdr.size = sizeof(response);
	response.hdr.error = res;
	if (res == 0) {
		response.connection = (intptr_t) c;
		response.connection_type = s->type;
		response.max_msg_size = c->request.max_msg_size;
		s->stats.active_connections++;
	}

	res2 = qb_ipc_us_send(&c->setup, &response, response.hdr.size);
	if (res == 0 && res2 != response.hdr.size) {
		res = res2;
	}

	if (res == 0) {
		/* hold a reference while the created callback runs */
		qb_ipcs_connection_ref(c);
		if (s->serv_fns.connection_created) {
			s->serv_fns.connection_created(c);
		}
		if (c->state == QB_IPCS_CONNECTION_ACTIVE) {
			c->state = QB_IPCS_CONNECTION_ESTABLISHED;
		}
		qb_ipcs_connection_unref(c);
	} else {
		if (res == -EACCES) {
			qb_util_log(LOG_ERR, "Invalid IPC credentials (%s).",
				    c->description);
		} else if (res == -EAGAIN) {
			qb_util_log(LOG_WARNING, "Denied connection, is not ready (%s)",
				    c->description);
		} else {
			errno = -res;
			qb_util_perror(LOG_ERR,
				       "Error in connection setup (%s)",
				       c->description);
		}

		if (c->state == QB_IPCS_CONNECTION_INACTIVE) {
			/* This removes the initial alloc ref */
			qb_ipcs_connection_unref(c);
			qb_ipcc_us_sock_close(sock);
		} else {
			qb_ipcs_disconnect(c);
		}
	}
	return res;
}
-static void
-destroy_ipc_auth_data(struct ipc_auth_data *data)
-{
- if (data->s) {
- qb_ipcs_unref(data->s);
- }
-
-#ifdef SO_PASSCRED
- free(data->cmsg_cred);
-#endif
- free(data);
-}
-
/*
 * Poll-loop callback for a freshly accepted client socket: receive the
 * authentication request, look up the client's credentials and hand the
 * connection to handle_new_connection().
 *
 * @d is the struct ipc_auth_data created by qb_ipcs_uc_recv_and_auth().
 *
 * Returns 0 while more data is awaited (the callback stays registered).
 * Otherwise the socket is removed from the poll loop, @d is destroyed and
 * 1 is returned; the socket is closed here unless it was passed on to
 * handle_new_connection().
 */
static int32_t
process_auth(int32_t fd, int32_t revents, void *d)
{
struct ipc_auth_data *data = (struct ipc_auth_data *) d;
int32_t res = 0;
#ifdef SO_PASSCRED
int off = 0;
#endif
/* the service was withdrawn while this client was still authenticating */
if (data->s->server_sock == -1) {
qb_util_log(LOG_DEBUG, "Closing fd (%d) for server shutdown", fd);
res = -ESHUTDOWN;
goto cleanup_and_return;
}
if (revents & POLLNVAL) {
qb_util_log(LOG_DEBUG, "NVAL conn fd (%d)", fd);
res = -EINVAL;
goto cleanup_and_return;
}
if (revents & POLLHUP) {
qb_util_log(LOG_DEBUG, "HUP conn fd (%d)", fd);
res = -ESHUTDOWN;
goto cleanup_and_return;
}
if ((revents & POLLIN) == 0) {
return 0;
}
res = qb_ipc_us_recv_msghdr(data);
if (res == -EAGAIN) {
/* yield to mainloop, Let mainloop call us again */
return 0;
}
if (res != data->len) {
res = -EIO;
goto cleanup_and_return;
}
- /*
- * currently support getpeerucred, getpeereid, and SO_PASSCRED credential
- * retrieval mechanisms for various Platforms
- */
-#ifdef HAVE_GETPEERUCRED
- /*
- * Solaris and some BSD systems
- */
- {
- ucred_t *uc = NULL;
-
- if (getpeerucred(data->sock, &uc) == 0) {
- res = 0;
- data->ugp.uid = ucred_geteuid(uc);
- data->ugp.gid = ucred_getegid(uc);
- data->ugp.pid = ucred_getpid(uc);
- ucred_free(uc);
- } else {
- res = -errno;
- }
- }
-#elif HAVE_GETPEEREID
- /*
- * Usually MacOSX systems
- */
- {
- /*
- * TODO get the peer's pid.
- * c->pid = ?;
- */
- if (getpeereid(data->sock, &data->ugp.uid, &data->ugp.gid) == 0) {
- res = 0;
- } else {
- res = -errno;
- }
- }
-
-#elif SO_PASSCRED
- /*
- * Usually Linux systems
- */
- {
- struct ucred cred;
- struct cmsghdr *cmsg;
-
- res = -EINVAL;
- for (cmsg = CMSG_FIRSTHDR(&data->msg_recv); cmsg != NULL;
- cmsg = CMSG_NXTHDR(&data->msg_recv, cmsg)) {
- if (cmsg->cmsg_type != SCM_CREDENTIALS)
- continue;
-
- memcpy(&cred, CMSG_DATA(cmsg), sizeof(struct ucred));
- res = 0;
- data->ugp.pid = cred.pid;
- data->ugp.uid = cred.uid;
- data->ugp.gid = cred.gid;
- break;
- }
- }
-#else /* no credentials */
- data->ugp.pid = 0;
- data->ugp.uid = 0;
- data->ugp.gid = 0;
- res = -ENOTSUP;
-#endif /* no credentials */
/* the credential lookup now lives in a helper shared with the client side */
+ res = qb_ipc_auth_creds(data);
cleanup_and_return:
#ifdef SO_PASSCRED
setsockopt(data->sock, SOL_SOCKET, SO_PASSCRED, &off, sizeof(off));
#endif
(void)data->s->poll_fns.dispatch_del(data->sock);
if (res < 0) {
close(data->sock);
- } else if (data->msg.hdr.id == QB_IPC_MSG_AUTHENTICATE) {
+ } else if (data->msg.req.hdr.id == QB_IPC_MSG_AUTHENTICATE) {
(void)handle_new_connection(data->s, res, data->sock, &data->msg, data->len, &data->ugp);
} else {
/* any message other than an authentication request is rejected */
close(data->sock);
}
destroy_ipc_auth_data(data);
return 1;
}
/*
 * Start authenticating a newly accepted client socket: allocate the
 * receive state, take a reference on the service and register
 * process_auth() with the poll loop for @sock.
 *
 * On any failure the socket is closed and nothing stays registered.
 */
static void
qb_ipcs_uc_recv_and_auth(int32_t sock, struct qb_ipcs_service *s)
{
int res = 0;
struct ipc_auth_data *data = NULL;
#ifdef SO_PASSCRED
int on = 1;
#endif
- data = calloc(1, sizeof(struct ipc_auth_data));
+ data = init_ipc_auth_data(sock, sizeof(struct qb_ipc_connection_request));
if (data == NULL) {
close(sock);
/* -ENOMEM */
return;
}
/* the reference is dropped again by destroy_ipc_auth_data() */
data->s = s;
qb_ipcs_ref(data->s);
- data->msg_recv.msg_iov = &data->iov_recv;
- data->msg_recv.msg_iovlen = 1;
- data->msg_recv.msg_name = 0;
- data->msg_recv.msg_namelen = 0;
-
-#ifdef SO_PASSCRED
- data->cmsg_cred = calloc(1,CMSG_SPACE(sizeof(struct ucred)));
- if (data->cmsg_cred == NULL) {
- close(sock);
- destroy_ipc_auth_data(data);
- /* -ENOMEM */
- return;
- }
- data->msg_recv.msg_control = (void *)data->cmsg_cred;
- data->msg_recv.msg_controllen = CMSG_SPACE(sizeof(struct ucred));
-#endif
-#ifdef QB_SOLARIS
- data->msg_recv.msg_accrights = 0;
- data->msg_recv.msg_accrightslen = 0;
-#else
- data->msg_recv.msg_flags = 0;
-#endif /* QB_SOLARIS */
-
- data->len = sizeof(struct qb_ipc_connection_request);
- data->iov_recv.iov_base = &data->msg;
- data->iov_recv.iov_len = data->len;
- data->sock = sock;
-
#ifdef SO_PASSCRED
/* have the kernel attach the client's credentials to the request */
setsockopt(sock, SOL_SOCKET, SO_PASSCRED, &on, sizeof(on));
#endif
res = s->poll_fns.dispatch_add(QB_LOOP_MED,
data->sock,
POLLIN | POLLPRI | POLLNVAL,
data, process_auth);
if (res < 0) {
qb_util_log(LOG_DEBUG, "Failed to process AUTH for fd (%d)", data->sock);
close(sock);
destroy_ipc_auth_data(data);
}
}
/*
 * Poll-loop callback for the listening socket of a service: accept one
 * client and start its authentication.
 *
 * Returns -1 when the listening socket itself is unusable (error events or
 * EBADF from accept()), which signals a disconnect to the poll loop.
 * Any other failure affects only the would-be client and yields 0.
 */
static int32_t
qb_ipcs_us_connection_acceptor(int fd, int revent, void *data)
{
	struct qb_ipcs_service *service = (struct qb_ipcs_service *)data;
	struct sockaddr_un peer_addr;
	socklen_t peer_len = sizeof(struct sockaddr_un);
	int32_t client_fd;
	int32_t rc;

	if (revent & (POLLNVAL | POLLHUP | POLLERR)) {
		/*
		 * handle shutdown more cleanly.
		 */
		return -1;
	}

	/* restart accept() whenever a signal interrupts it */
	do {
		errno = 0;
		client_fd = accept(fd, (struct sockaddr *)&peer_addr, &peer_len);
	} while (client_fd == -1 && errno == EINTR);

	if (client_fd == -1) {
		if (errno == EBADF) {
			qb_util_perror(LOG_ERR,
				       "Could not accept client connection from fd:%d",
				       fd);
			return -1;
		}
		qb_util_perror(LOG_ERR, "Could not accept client connection");
		/* This is an error, but -1 would indicate disconnect
		 * from the poll loop
		 */
		return 0;
	}

	rc = qb_sys_fd_nonblock_cloexec_set(client_fd);
	if (rc < 0) {
		close(client_fd);
		/* This is an error, but -1 would indicate disconnect
		 * from the poll loop
		 */
		return 0;
	}

	qb_ipcs_uc_recv_and_auth(client_fd, service);
	return 0;
}
diff --git a/lib/ipc_socket.c b/lib/ipc_socket.c
index 8d2179d..e4a2606 100644
--- a/lib/ipc_socket.c
+++ b/lib/ipc_socket.c
@@ -1,837 +1,839 @@
/*
* Copyright (C) 2010,2013 Red Hat, Inc.
*
* Author: Angus Salkeld <asalkeld@redhat.com>
*
* This file is part of libqb.
*
* libqb is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 2.1 of the License, or
* (at your option) any later version.
*
* libqb is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with libqb. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os_base.h"
#ifdef HAVE_SYS_UN_H
#include <sys/un.h>
#endif /* HAVE_SYS_UN_H */
#ifdef HAVE_SYS_MMAN_H
#include <sys/mman.h>
#endif
#include <qb/qbatomic.h>
#include <qb/qbipcs.h>
#include <qb/qbloop.h>
#include <qb/qbdefs.h>
#include "util_int.h"
#include "ipc_int.h"
/*
 * Per-channel control record; SHM_CONTROL_SIZE reserves room for three of
 * them.
 * NOTE(review): presumably one each for the request, response and event
 * channels, kept in the mapped shared_data area - confirm.
 */
struct ipc_us_control {
int32_t sent;
int32_t flow_control;
};
#define SHM_CONTROL_SIZE (3 * sizeof(struct ipc_us_control))
/*
 * Fill @address with the unix-domain address for @socket_name.
 *
 * On Linux and Cygwin the name is written one byte into sun_path, so the
 * path starts with a NUL byte; elsewhere the address is the file
 * SOCKETDIR/@socket_name. Names that do not fit are silently truncated by
 * snprintf().
 */
static void
set_sock_addr(struct sockaddr_un *address, const char *socket_name)
{
memset(address, 0, sizeof(struct sockaddr_un));
address->sun_family = AF_UNIX;
#ifdef HAVE_STRUCT_SOCKADDR_UN_SUN_LEN
/*
 * NOTE(review): sun_len is computed before sun_path is filled in; if
 * QB_SUN_LEN depends on the path contents this value does not cover the
 * name - confirm whether it should be set after the snprintf() below.
 */
address->sun_len = QB_SUN_LEN(address);
#endif
#if defined(QB_LINUX) || defined(QB_CYGWIN)
snprintf(address->sun_path + 1, UNIX_PATH_MAX - 1, "%s", socket_name);
#else
snprintf(address->sun_path, sizeof(address->sun_path), "%s/%s", SOCKETDIR,
socket_name);
#endif
}
/*
 * Create a non-blocking, close-on-exec unix datagram socket and bind it to
 * the address "<base_name>-<service_name>".
 *
 * Where the socket is a file (not Linux/Cygwin) any stale file is removed
 * first, and after binding the file gets mode 0660 and, with this change,
 * group @gid.
 *
 * On success 0 is returned and *@sock_pt holds the descriptor; on failure
 * the descriptor is closed, *@sock_pt is set to -1 and a negative value is
 * returned.
 */
static int32_t
qb_ipc_dgram_sock_setup(const char *base_name,
- const char *service_name, int32_t * sock_pt)
+ const char *service_name, int32_t * sock_pt,
+ gid_t gid)
{
int32_t request_fd;
struct sockaddr_un local_address;
int32_t res = 0;
char sock_path[PATH_MAX];
request_fd = socket(PF_UNIX, SOCK_DGRAM, 0);
if (request_fd == -1) {
return -errno;
}
qb_socket_nosigpipe(request_fd);
res = qb_sys_fd_nonblock_cloexec_set(request_fd);
if (res < 0) {
goto error_connect;
}
snprintf(sock_path, PATH_MAX, "%s-%s", base_name, service_name);
set_sock_addr(&local_address, sock_path);
#if !(defined(QB_LINUX) || defined(QB_CYGWIN))
/* the result of unlink() is overwritten by bind() below */
res = unlink(local_address.sun_path);
#endif
res = bind(request_fd, (struct sockaddr *)&local_address,
sizeof(local_address));
#if !(defined(QB_LINUX) || defined(QB_CYGWIN))
/*
 * NOTE(review): chmod()/chown() run before the bind() result is checked and
 * their own results are ignored; a failed bind() is reported as -1 rather
 * than -errno - confirm whether that is intended.
 */
chmod(local_address.sun_path, 0660);
+ chown(local_address.sun_path, -1, gid);
#endif
if (res < 0) {
goto error_connect;
}
*sock_pt = request_fd;
return 0;
error_connect:
close(request_fd);
*sock_pt = -1;
return res;
}
/*
 * Make sure the send buffer of @sockfd can hold a message of
 * @max_msg_size bytes, enlarging SO_SNDBUF when the current value is not
 * strictly larger.
 *
 * Returns 0 on success, or the -1 result of the failing
 * getsockopt()/setsockopt() call.
 */
static int32_t
set_sock_size(int sockfd, size_t max_msg_size)
{
	int32_t rc;
	/* initialised so the trace below is defined even if getsockopt() fails */
	unsigned int optval = 0;
	socklen_t optlen = sizeof(optval);

	rc = getsockopt(sockfd, SOL_SOCKET, SO_SNDBUF, &optval, &optlen);

	/* %zu / %u match the size_t and unsigned int arguments */
	qb_util_log(LOG_TRACE, "%d: getsockopt(%d, needed:%zu) actual:%u",
		    rc, sockfd, max_msg_size, optval);

	/* The optval <= max_msg_size check is weird...
	 * during testing it was discovered in some instances if the
	 * default optval is exactly equal to our max_msg_size, we couldn't
	 * actually send a message that large unless we explicitly set
	 * it using setsockopt... there is no good explanation for this. Most
	 * likely this is hitting some sort of "off by one" error in the kernel. */
	if (rc == 0 && optval <= max_msg_size) {
		optval = max_msg_size;
		optlen = sizeof(optval);
		rc = setsockopt(sockfd, SOL_SOCKET, SO_SNDBUF, &optval, optlen);
	}

	return rc;
}
/*
 * Check whether a single datagram of @max_msg_size bytes can be passed
 * through a unix datagram socket pair on this system.
 *
 * A socket pair is created, both ends get their send buffer sized with
 * set_sock_size(), and one message is written and read back, retrying up
 * to three times on EAGAIN/EINTR.
 *
 * Returns 0 when the message made it through, non-zero otherwise.
 */
static int32_t
dgram_verify_msg_size(size_t max_msg_size)
{
	int32_t rc = -1;
	/* -1 marks "not open" so cleanup never closes a stray descriptor */
	int32_t sockets[2] = { -1, -1 };
	int32_t tries = 0;
	int32_t write_passed = 0;
	int32_t read_passed = 0;
	/*
	 * Heap buffer, zero filled: the size is caller controlled and may be
	 * too large for the stack, and its contents are written to a socket.
	 */
	char *buf = calloc(1, max_msg_size > 0 ? max_msg_size : 1);

	if (buf == NULL) {
		return rc;
	}

	if (socketpair(AF_UNIX, SOCK_DGRAM, 0, sockets) < 0) {
		sockets[0] = -1;
		sockets[1] = -1;
		goto cleanup_socks;
	}

	if (set_sock_size(sockets[0], max_msg_size) != 0) {
		goto cleanup_socks;
	}
	if (set_sock_size(sockets[1], max_msg_size) != 0) {
		goto cleanup_socks;
	}

	for (tries = 0; tries < 3; tries++) {

		if (write_passed == 0) {
			rc = write(sockets[1], buf, max_msg_size);

			if (rc < 0 && (errno == EAGAIN || errno == EINTR)) {
				continue;
			} else if (rc == max_msg_size) {
				write_passed = 1;
			} else {
				break;
			}
		}

		if (read_passed == 0) {
			rc = read(sockets[0], buf, max_msg_size);

			if (rc < 0 && (errno == EAGAIN || errno == EINTR)) {
				continue;
			} else if (rc == max_msg_size) {
				read_passed = 1;
			} else {
				break;
			}
		}

		if (read_passed && write_passed) {
			rc = 0;
			break;
		}
	}

cleanup_socks:
	if (sockets[0] >= 0) {
		close(sockets[0]);
	}
	if (sockets[1] >= 0) {
		close(sockets[1]);
	}
	free(buf);
	return rc;
}
/*
 * Find a datagram size, not larger than @max_msg_size, that this system
 * can actually transfer.
 *
 * If @max_msg_size itself works it is returned. Otherwise sizes are probed
 * upwards starting at 2048 bytes; whenever a probe fails the search steps
 * back to the previous size and continues with half the increment, until a
 * failure occurs with an increment below 512.
 *
 * Returns the largest size that passed, or -1 if none did.
 */
int32_t
qb_ipcc_verify_dgram_max_msg_size(size_t max_msg_size)
{
	int32_t step = 2048;
	int32_t best = -1;
	int32_t candidate;

	if (dgram_verify_msg_size(max_msg_size) == 0) {
		return max_msg_size;
	}

	candidate = step;
	while (candidate < max_msg_size) {
		if (dgram_verify_msg_size(candidate) == 0) {
			best = candidate;
		} else if (step >= 512) {
			/* too big: go back and advance more carefully */
			candidate -= step;
			step = step / 2;
		} else {
			break;
		}
		candidate += step;
	}

	return best;
}
/*
 * Create a datagram socket for one channel:
 * bind to "base_name-local_name"
 * connect to "base_name-remote_name"
 * output sock_pt
 *
 * @gid is passed on to qb_ipc_dgram_sock_setup(), which applies it to the
 * bound socket file where one exists. After connecting, the send buffer is
 * sized for @max_msg_size.
 *
 * Returns the result of set_sock_size() on success; on failure a negative
 * value is returned and *sock_pt is -1.
 */
static int32_t
qb_ipc_dgram_sock_connect(const char *base_name,
const char *local_name,
const char *remote_name,
- int32_t max_msg_size, int32_t * sock_pt)
+ int32_t max_msg_size, int32_t * sock_pt, gid_t gid)
{
char sock_path[PATH_MAX];
struct sockaddr_un remote_address;
int32_t res = qb_ipc_dgram_sock_setup(base_name, local_name,
- sock_pt);
+ sock_pt, gid);
if (res < 0) {
return res;
}
snprintf(sock_path, PATH_MAX, "%s-%s", base_name, remote_name);
set_sock_addr(&remote_address, sock_path);
if (connect(*sock_pt, (struct sockaddr *)&remote_address,
QB_SUN_LEN(&remote_address)) == -1) {
res = -errno;
goto error_connect;
}
return set_sock_size(*sock_pt, max_msg_size);
error_connect:
close(*sock_pt);
*sock_pt = -1;
return res;
}
/*
 * Complete a deferred connect for one_way.
 *
 * The socket in one_way->u.us.sock is connected to the address named by
 * one_way->u.us.sock_name. connect() is attempted up to 10 times with a
 * 100ms pause after every failure; this helps when an event is sent right
 * after connection setup.
 *
 * On success the stored socket name is freed and cleared, and the result of
 * set_sock_size() is returned. If every attempt fails, the negated errno of
 * the last attempt is returned.
 */
static int32_t
_finish_connecting(struct qb_ipc_one_way *one_way)
{
	struct sockaddr_un peer;
	int attempt;
	int rc = -1;
	int last_error = 0;

	set_sock_addr(&peer, one_way->u.us.sock_name);

	for (attempt = 0; attempt < 10; attempt++) {
		errno = 0;
		rc = connect(one_way->u.us.sock,
			     (struct sockaddr *)&peer,
			     QB_SUN_LEN(&peer));
		if (rc != -1) {
			break;
		}
		last_error = -errno;
		qb_util_perror(LOG_DEBUG, "error calling connect()");
		usleep(100000);
	}

	if (rc == -1) {
		return last_error;
	}

	/* connected: the name is no longer needed */
	free(one_way->u.us.sock_name);
	one_way->u.us.sock_name = NULL;

	return set_sock_size(one_way->u.us.sock, one_way->max_msg_size);
}
/*
* client functions
* --------------------------------------------------------
*/
/*
 * Client-side teardown of a unix-socket connection.
 *
 * Unmaps and unlinks the shared control file, then closes the event,
 * request and setup sockets. On platforms other than Linux/Cygwin the
 * socket files bound for this connection ("<base>-request", "<base>-event",
 * "<base>-event-tx" and "<base>-response") are unlinked as well; <base> is
 * recovered from the bound name of the response socket.
 */
static void
qb_ipcc_us_disconnect(struct qb_ipcc_connection *c)
{
#if !(defined(QB_LINUX) || defined(QB_CYGWIN))
	struct sockaddr_un un_addr;
	socklen_t un_addr_len = sizeof(struct sockaddr_un);
	char *base_name;
	char sock_name[PATH_MAX];
	size_t length;
#endif

	munmap(c->request.u.us.shared_data, SHM_CONTROL_SIZE);
	unlink(c->request.u.us.shared_file_name);

#if !(defined(QB_LINUX) || defined(QB_CYGWIN))
	if (getsockname(c->response.u.us.sock, (struct sockaddr *)&un_addr, &un_addr_len) == 0) {
		length = strlen(un_addr.sun_path);
		/*
		 * Strip the trailing "-response" (9 characters). A shorter
		 * path cannot carry that suffix, and subtracting from it
		 * would wrap around, so it is skipped.
		 */
		base_name = (length >= 9) ? strndup(un_addr.sun_path, length - 9) : NULL;
		if (base_name != NULL) {
			qb_util_log(LOG_DEBUG, "unlinking socket bound files with base_name=%s length=%d",base_name,(int)length);
			snprintf(sock_name,PATH_MAX,"%s-%s",base_name,"request");
			qb_util_log(LOG_DEBUG, "unlink sock_name=%s",sock_name);
			unlink(sock_name);
			snprintf(sock_name,PATH_MAX,"%s-%s",base_name,"event");
			qb_util_log(LOG_DEBUG, "unlink sock_name=%s",sock_name);
			unlink(sock_name);
			snprintf(sock_name,PATH_MAX,"%s-%s",base_name,"event-tx");
			qb_util_log(LOG_DEBUG, "unlink sock_name=%s",sock_name);
			unlink(sock_name);
			snprintf(sock_name,PATH_MAX,"%s-%s",base_name,"response");
			qb_util_log(LOG_DEBUG, "unlink sock_name=%s",sock_name);
			unlink(sock_name);
			/* strndup() allocates; release the copy */
			free(base_name);
		}
	}
#endif

	qb_ipcc_us_sock_close(c->event.u.us.sock);
	qb_ipcc_us_sock_close(c->request.u.us.sock);
	qb_ipcc_us_sock_close(c->setup.u.us.sock);
}
/*
 * Send one message over one_way's socket.
 *
 * If the socket still has a pending peer name (u.us.sock_name), the
 * connection is completed first via _finish_connecting(). SIGPIPE is
 * ignored for the duration of the send() call. When a shared control block
 * is present and the whole message went out, its "sent" counter is
 * incremented.
 *
 * Returns the number of bytes sent, the negative result of
 * _finish_connecting() if that failed, or -errno if send() failed.
 */
static ssize_t
qb_ipc_socket_send(struct qb_ipc_one_way *one_way,
		   const void *msg_ptr, size_t msg_len)
{
	struct ipc_us_control *control =
		(struct ipc_us_control *)one_way->u.us.shared_data;
	ssize_t sent = 0;

	if (one_way->u.us.sock_name) {
		sent = _finish_connecting(one_way);
		if (sent < 0) {
			qb_util_log(LOG_ERR, "socket connect-on-send");
			return sent;
		}
	}

	qb_sigpipe_ctl(QB_SIGPIPE_IGNORE);

	sent = send(one_way->u.us.sock, msg_ptr, msg_len, MSG_NOSIGNAL);
	if (sent == -1) {
		sent = -errno;
		/* a full queue is expected; only log the other failures */
		if (!(errno == EAGAIN || errno == ENOBUFS)) {
			qb_util_perror(LOG_DEBUG, "socket_send:send");
		}
	}

	qb_sigpipe_ctl(QB_SIGPIPE_DEFAULT);

	if (control && sent == msg_len) {
		qb_atomic_int_inc(&control->sent);
	}

	return sent;
}
/*
 * Send a scatter/gather message (iov, iov_len) over one_way's socket.
 *
 * SIGPIPE is ignored for the whole call. If the socket still has a pending
 * peer name (u.us.sock_name), the connection is completed first via
 * _finish_connecting(). When a shared control block is present and
 * writev() wrote something, its "sent" counter is incremented.
 *
 * Returns the number of bytes written, the negative result of
 * _finish_connecting() if that failed, or -errno if writev() failed.
 */
static ssize_t
qb_ipc_socket_sendv(struct qb_ipc_one_way *one_way, const struct iovec *iov,
		    size_t iov_len)
{
	struct ipc_us_control *control =
		(struct ipc_us_control *)one_way->u.us.shared_data;
	int32_t written;

	qb_sigpipe_ctl(QB_SIGPIPE_IGNORE);

	if (one_way->u.us.sock_name) {
		written = _finish_connecting(one_way);
		if (written < 0) {
			qb_util_perror(LOG_ERR, "socket connect-on-sendv");
			qb_sigpipe_ctl(QB_SIGPIPE_DEFAULT);
			return written;
		}
	}

	written = writev(one_way->u.us.sock, iov, iov_len);
	if (written == -1) {
		written = -errno;
		/* a full queue is expected; only log the other failures */
		if (!(errno == EAGAIN || errno == ENOBUFS)) {
			qb_util_perror(LOG_DEBUG, "socket_sendv:writev %d",
				       one_way->u.us.sock);
		}
	}

	qb_sigpipe_ctl(QB_SIGPIPE_DEFAULT);

	if (control && written > 0) {
		qb_atomic_int_inc(&control->sent);
	}

	return written;
}
/*
* recv a message of unknown size.
*/
/*
 * Receive one message of unknown size into msg, never writing more than
 * len bytes.
 *
 * The request header is peeked first to learn the message size, waiting
 * with qb_ipc_us_ready() while the socket has no data. A timeout of -1
 * waits in 1000ms slices without a deadline; otherwise the wait is bounded
 * by timeout (same unit as qb_ipc_us_ready() expects). The message is then
 * consumed with a second recv().
 *
 * The size announced in the header comes from the peer, so it is limited
 * to len before it is used as a receive length; the peek is limited the
 * same way.
 *
 * Returns the number of bytes received, -ETIMEDOUT when the wait expired,
 * -ENOTCONN when recv() returned 0, the error reported by
 * qb_ipc_us_ready() when it indicates a disconnect, or -errno from recv().
 * On success the "sent" counter of the shared control block (if any) is
 * decremented.
 */
static ssize_t
qb_ipc_us_recv_at_most(struct qb_ipc_one_way *one_way,
		       void *msg, size_t len, int32_t timeout)
{
	int32_t result;
	int32_t final_rc = 0;
	size_t peek_len = sizeof(struct qb_ipc_request_header);
	size_t to_recv = 0;
	char *data = msg;
	struct ipc_us_control *ctl = NULL;
	int32_t time_waited = 0;
	int32_t time_to_wait = timeout;

	if (timeout == -1) {
		time_to_wait = 1000;
	}

	/* never peek more than the caller's buffer can hold */
	if (peek_len > len) {
		peek_len = len;
	}

	qb_sigpipe_ctl(QB_SIGPIPE_IGNORE);

retry_peek:
	result = recv(one_way->u.us.sock, data, peek_len,
		      MSG_NOSIGNAL | MSG_PEEK);

	if (result == -1) {
		if (errno != EAGAIN) {
			final_rc = -errno;
#if !(defined(QB_LINUX) || defined(QB_CYGWIN))
			if (errno == ECONNRESET || errno == EPIPE) {
				final_rc = -ENOTCONN;
			}
#endif
			goto cleanup_sigpipe;
		}

		/* check to see if we have enough time left to try again */
		if (time_waited < timeout || timeout == -1) {
			result = qb_ipc_us_ready(one_way, NULL, time_to_wait, POLLIN);
			if (qb_ipc_us_sock_error_is_disconnected(result)) {
				final_rc = result;
				goto cleanup_sigpipe;
			}
			time_waited += time_to_wait;
			goto retry_peek;
		} else if (time_waited >= timeout) {
			final_rc = -ETIMEDOUT;
			goto cleanup_sigpipe;
		}
	}

	if (result >= 0 && (size_t)result >= sizeof(struct qb_ipc_request_header)) {
		struct qb_ipc_request_header *hdr = NULL;
		int32_t announced;

		hdr = (struct qb_ipc_request_header *)msg;
		announced = hdr->size;
		/*
		 * The announced size is peer controlled: a negative or
		 * oversized value must not be allowed to write past the
		 * len bytes the caller gave us.
		 */
		if (announced < 0 || (size_t)announced > len) {
			to_recv = len;
		} else {
			to_recv = (size_t)announced;
		}
	}

	result = recv(one_way->u.us.sock, data, to_recv,
		      MSG_NOSIGNAL | MSG_WAITALL);
	if (result == -1) {
		final_rc = -errno;
		goto cleanup_sigpipe;
	} else if (result == 0) {
		qb_util_log(LOG_DEBUG, "recv == 0 -> ENOTCONN");

		final_rc = -ENOTCONN;
		goto cleanup_sigpipe;
	}

	final_rc = result;

	ctl = (struct ipc_us_control *)one_way->u.us.shared_data;
	if (ctl) {
		(void)qb_atomic_int_dec_and_test(&ctl->sent);
	}

cleanup_sigpipe:
	qb_sigpipe_ctl(QB_SIGPIPE_DEFAULT);
	return final_rc;
}
/*
 * Store fc_enable in the flow_control field of one_way's shared control
 * block (atomic write), logging the new value at trace level first.
 */
static void
qb_ipc_us_fc_set(struct qb_ipc_one_way *one_way, int32_t fc_enable)
{
	struct ipc_us_control *control;

	control = (struct ipc_us_control *)one_way->u.us.shared_data;

	qb_util_log(LOG_TRACE, "setting fc to %d", fc_enable);
	qb_atomic_int_set(&control->flow_control, fc_enable);
}
/*
 * Return the flow_control value currently stored in one_way's shared
 * control block (atomic read).
 */
static int32_t
qb_ipc_us_fc_get(struct qb_ipc_one_way *one_way)
{
	struct ipc_us_control *control;

	control = (struct ipc_us_control *)one_way->u.us.shared_data;

	return qb_atomic_int_get(&control->flow_control);
}
/*
 * Return the "sent" counter of one_way's shared control block (atomic
 * read). The send paths increment it and the receive path decrements it.
 */
static ssize_t
qb_ipc_us_q_len_get(struct qb_ipc_one_way *one_way)
{
	struct ipc_us_control *control;

	control = (struct ipc_us_control *)one_way->u.us.shared_data;

	return qb_atomic_int_get(&control->sent);
}
/*
 * Client side of the unix-socket connection setup.
 *
 * Installs the socket based send/sendv/recv/fc_get/disconnect callbacks on
 * c, maps the shared control file named in r->request, and opens the
 * "response" -> "request" and "event" -> "event-tx" sockets under the base
 * name r->response.
 *
 * Returns 0 on success or a negative error value on failure.
 */
int32_t
qb_ipcc_us_connect(struct qb_ipcc_connection * c,
struct qb_ipc_connection_response * r)
{
int32_t res;
char path[PATH_MAX];
int32_t fd_hdr;
char *shm_ptr;
qb_atomic_init();
c->needs_sock_for_poll = QB_FALSE;
c->funcs.send = qb_ipc_socket_send;
c->funcs.sendv = qb_ipc_socket_sendv;
c->funcs.recv = qb_ipc_us_recv_at_most;
c->funcs.fc_get = qb_ipc_us_fc_get;
c->funcs.disconnect = qb_ipcc_us_disconnect;
/* open the existing control file (no O_CREAT here) */
fd_hdr = qb_sys_mmap_file_open(path, r->request,
SHM_CONTROL_SIZE, O_RDWR);
if (fd_hdr < 0) {
res = fd_hdr;
/* set errno so that qb_util_perror() reports this failure */
errno = -fd_hdr;
qb_util_perror(LOG_ERR, "couldn't open file for mmap");
return res;
}
(void)strlcpy(c->request.u.us.shared_file_name, r->request, NAME_MAX);
shm_ptr = mmap(0, SHM_CONTROL_SIZE,
PROT_READ | PROT_WRITE, MAP_SHARED, fd_hdr, 0);
if (shm_ptr == MAP_FAILED) {
res = -errno;
qb_util_perror(LOG_ERR, "couldn't create mmap for header");
goto cleanup_hdr;
}
/* the mapping holds three consecutive control blocks:
 * request, response, event */
c->request.u.us.shared_data = shm_ptr;
c->response.u.us.shared_data = shm_ptr + sizeof(struct ipc_us_control);
c->event.u.us.shared_data = shm_ptr + (2 * sizeof(struct ipc_us_control));
/* the mapping stays valid after the descriptor is closed */
close(fd_hdr);
fd_hdr = -1;
res = qb_ipc_dgram_sock_connect(r->response, "response", "request",
- r->max_msg_size, &c->request.u.us.sock);
+ r->max_msg_size, &c->request.u.us.sock, c->egid);
if (res != 0) {
goto cleanup_hdr;
}
/* requests and responses share one socket */
c->response.u.us.sock = c->request.u.us.sock;
res = qb_ipc_dgram_sock_connect(r->response, "event", "event-tx",
- r->max_msg_size, &c->event.u.us.sock);
+ r->max_msg_size, &c->event.u.us.sock, c->egid);
if (res != 0) {
goto cleanup_hdr;
}
return 0;
cleanup_hdr:
if (fd_hdr >= 0) {
close(fd_hdr);
}
/* NOTE(review): this label is also reached before the sockets were opened
 * and before shared_data was assigned (mmap failure) - confirm that the
 * close()/munmap() calls below are harmless with the values c holds then. */
close(c->event.u.us.sock);
close(c->request.u.us.sock);
unlink(r->request);
munmap(c->request.u.us.shared_data, SHM_CONTROL_SIZE);
return res;
}
/*
* service functions
* --------------------------------------------------------
*/
/*
 * Poll callback watching the setup socket of a server-side connection.
 *
 * data is the struct qb_ipcs_connection the fd belongs to. The connection
 * is disconnected and a negative value returned when the poll result
 * reports POLLNVAL (-EINVAL) or POLLHUP (-ESHUTDOWN), or when POLLIN turns
 * out to be EOF (-ESHUTDOWN) or a receive error (-errno).
 *
 * Returns 0 when the connection still looks alive.
 */
static int32_t
_sock_connection_liveliness(int32_t fd, int32_t revents, void *data)
{
	struct qb_ipcs_connection *c = (struct qb_ipcs_connection *)data;

	qb_util_log(LOG_DEBUG, "LIVENESS: fd %d event %d conn (%s)",
		    fd, revents, c->description);

	if (revents & POLLNVAL) {
		qb_util_log(LOG_DEBUG, "NVAL conn (%s)", c->description);
		qb_ipcs_disconnect(c);
		return -EINVAL;
	}
	if (revents & POLLHUP) {
		qb_util_log(LOG_DEBUG, "HUP conn (%s)", c->description);
		qb_ipcs_disconnect(c);
		return -ESHUTDOWN;
	}

	/* If we actually get POLLIN for some reason here, it most
	 * certainly means EOF. Do a recv on the fd to detect eof and
	 * then disconnect */
	if (revents & POLLIN) {
		char buf[10];
		int res;

		res = recv(fd, buf, sizeof(buf), MSG_DONTWAIT);
		if (res < 0) {
			if (errno == EAGAIN || errno == EWOULDBLOCK) {
				/*
				 * Nothing to read after all: this is not a
				 * failure, so the raw -1 must not reach the
				 * disconnect check below.
				 */
				res = 0;
			} else {
				res = -errno;
			}
		} else if (res == 0) {
			qb_util_log(LOG_DEBUG, "EOF conn (%s)", c->description);
			res = -ESHUTDOWN;
		}

		if (res < 0) {
			qb_ipcs_disconnect(c);
			return res;
		}
	}

	return 0;
}
/*
 * Register a server-side connection with the service's poll loop.
 *
 * The request socket is dispatched to
 * qb_ipcs_dispatch_connection_request() and the setup socket to
 * _sock_connection_liveliness(). If the second registration fails, the
 * first one is removed again.
 *
 * Returns the result of the last dispatch_add() call (negative on error).
 */
static int32_t
_sock_add_to_mainloop(struct qb_ipcs_connection *c)
{
	int rc;

	/* request socket: incoming requests */
	rc = c->service->poll_fns.dispatch_add(c->service->poll_priority,
					       c->request.u.us.sock,
					       POLLIN | POLLPRI | POLLNVAL,
					       c,
					       qb_ipcs_dispatch_connection_request);
	if (rc < 0) {
		qb_util_log(LOG_ERR,
			    "Error adding socket to mainloop (%s).",
			    c->description);
		return rc;
	}

	/* setup socket: liveness monitoring */
	rc = c->service->poll_fns.dispatch_add(c->service->poll_priority,
					       c->setup.u.us.sock,
					       POLLIN | POLLPRI | POLLNVAL,
					       c, _sock_connection_liveliness);
	qb_util_log(LOG_DEBUG, "added %d to poll loop (liveness)",
		    c->setup.u.us.sock);
	if (rc >= 0) {
		return rc;
	}

	qb_util_perror(LOG_ERR, "Error adding setupfd to mainloop");
	(void)c->service->poll_fns.dispatch_del(c->request.u.us.sock);
	return rc;
}
/*
 * Remove the request socket and then the setup socket of c from the
 * service's poll loop. The results of dispatch_del() are ignored.
 */
static void
_sock_rm_from_mainloop(struct qb_ipcs_connection *c)
{
	/* request socket */
	(void)c->service->poll_fns.dispatch_del(c->request.u.us.sock);

	/* setup (liveness) socket */
	(void)c->service->poll_fns.dispatch_del(c->setup.u.us.sock);
}
/*
 * Server-side teardown of a unix-socket connection.
 *
 * For an ESTABLISHED or ACTIVE connection the sockets are removed from the
 * poll loop and closed; on platforms other than Linux/Cygwin the socket
 * files bound for the connection ("<base>-request", "<base>-event",
 * "<base>-event-tx" and "<base>-response") are unlinked too, with <base>
 * recovered from the bound name of the response socket.
 *
 * For a SHUTTING_DOWN or ACTIVE connection the shared control file is
 * unmapped and unlinked.
 */
static void
qb_ipcs_us_disconnect(struct qb_ipcs_connection *c)
{
#if !(defined(QB_LINUX) || defined(QB_CYGWIN))
	struct sockaddr_un un_addr;
	socklen_t un_addr_len = sizeof(struct sockaddr_un);
	char *base_name;
	char sock_name[PATH_MAX];
	size_t length;
#endif
	qb_enter();

	if (c->state == QB_IPCS_CONNECTION_ESTABLISHED ||
	    c->state == QB_IPCS_CONNECTION_ACTIVE) {
		_sock_rm_from_mainloop(c);

#if !(defined(QB_LINUX) || defined(QB_CYGWIN))
		if (getsockname(c->response.u.us.sock, (struct sockaddr *)&un_addr, &un_addr_len) == 0) {
			length = strlen(un_addr.sun_path);
			/*
			 * Strip the trailing "-request" (8 characters). A
			 * shorter path cannot carry that suffix, and
			 * subtracting from it would wrap around, so it is
			 * skipped.
			 */
			base_name = (length >= 8) ? strndup(un_addr.sun_path, length - 8) : NULL;
			if (base_name != NULL) {
				qb_util_log(LOG_DEBUG, "unlinking socket bound files with base_name=%s length=%d",base_name,(int)length);
				snprintf(sock_name,PATH_MAX,"%s-%s",base_name,"request");
				qb_util_log(LOG_DEBUG, "unlink sock_name=%s",sock_name);
				unlink(sock_name);
				snprintf(sock_name,PATH_MAX,"%s-%s",base_name,"event");
				qb_util_log(LOG_DEBUG, "unlink sock_name=%s",sock_name);
				unlink(sock_name);
				snprintf(sock_name,PATH_MAX,"%s-%s",base_name,"event-tx");
				qb_util_log(LOG_DEBUG, "unlink sock_name=%s",sock_name);
				unlink(sock_name);
				snprintf(sock_name,PATH_MAX,"%s-%s",base_name,"response");
				qb_util_log(LOG_DEBUG, "unlink sock_name=%s",sock_name);
				unlink(sock_name);
				/* strndup() allocates; release the copy */
				free(base_name);
			}
		}
#endif
		qb_ipcc_us_sock_close(c->setup.u.us.sock);
		qb_ipcc_us_sock_close(c->request.u.us.sock);
		qb_ipcc_us_sock_close(c->event.u.us.sock);
	}

	if (c->state == QB_IPCS_CONNECTION_SHUTTING_DOWN ||
	    c->state == QB_IPCS_CONNECTION_ACTIVE) {
		munmap(c->request.u.us.shared_data, SHM_CONTROL_SIZE);
		unlink(c->request.u.us.shared_file_name);
	}
}
/*
 * Server side of the unix-socket connection setup for client connection c.
 *
 * Creates and maps the shared control file, zeroes the three control
 * blocks in it, creates the "request" and "event-tx" sockets under the
 * base name written to r->response, remembers the peer names of the
 * response and event channels for a later connect, and finally adds the
 * connection to the main loop. The file name (r->request) and base name
 * (r->response) are handed back to the caller through r.
 *
 * Returns the result of _sock_add_to_mainloop() on success or a negative
 * error value on failure.
 */
static int32_t
qb_ipcs_us_connect(struct qb_ipcs_service *s,
struct qb_ipcs_connection *c,
struct qb_ipc_connection_response *r)
{
char path[PATH_MAX];
int32_t fd_hdr;
int32_t res = 0;
struct ipc_us_control *ctl;
char *shm_ptr;
qb_util_log(LOG_DEBUG, "connecting to client (%s)", c->description);
/* until the dedicated sockets exist, reuse the setup socket */
c->request.u.us.sock = c->setup.u.us.sock;
c->response.u.us.sock = c->setup.u.us.sock;
snprintf(r->request, NAME_MAX, "qb-%s-control-%s",
s->name, c->description);
snprintf(r->response, NAME_MAX, "qb-%s-%s", s->name, c->description);
fd_hdr = qb_sys_mmap_file_open(path, r->request,
SHM_CONTROL_SIZE,
O_CREAT | O_TRUNC | O_RDWR);
if (fd_hdr < 0) {
res = fd_hdr;
/* set errno so that qb_util_perror() reports this failure */
errno = -fd_hdr;
qb_util_perror(LOG_ERR, "couldn't create file for mmap (%s)",
c->description);
return res;
}
/* from here on r->request holds the path filled in by
 * qb_sys_mmap_file_open() */
(void)strlcpy(r->request, path, PATH_MAX);
(void)strlcpy(c->request.u.us.shared_file_name, r->request, NAME_MAX);
/* give the file the ownership and mode recorded in c->auth;
 * failures of chown()/chmod() are deliberately ignored */
res = chown(r->request, c->auth.uid, c->auth.gid);
if (res != 0) {
/* ignore res, this is just for the compiler warnings.
*/
res = 0;
}
res = chmod(r->request, c->auth.mode);
if (res != 0) {
/* ignore res, this is just for the compiler warnings.
*/
res = 0;
}
shm_ptr = mmap(0, SHM_CONTROL_SIZE,
PROT_READ | PROT_WRITE, MAP_SHARED, fd_hdr, 0);
if (shm_ptr == MAP_FAILED) {
res = -errno;
qb_util_perror(LOG_ERR, "couldn't create mmap for header (%s)",
c->description);
goto cleanup_hdr;
}
/* the mapping holds three consecutive control blocks:
 * request, response, event */
c->request.u.us.shared_data = shm_ptr;
c->response.u.us.shared_data = shm_ptr + sizeof(struct ipc_us_control);
c->event.u.us.shared_data = shm_ptr + (2 * sizeof(struct ipc_us_control));
/* start every channel with an empty queue and flow control off */
ctl = (struct ipc_us_control *)c->request.u.us.shared_data;
ctl->sent = 0;
ctl->flow_control = 0;
ctl = (struct ipc_us_control *)c->response.u.us.shared_data;
ctl->sent = 0;
ctl->flow_control = 0;
ctl = (struct ipc_us_control *)c->event.u.us.shared_data;
ctl->sent = 0;
ctl->flow_control = 0;
/* the mapping stays valid after the descriptor is closed */
close(fd_hdr);
fd_hdr = -1;
/* request channel */
res = qb_ipc_dgram_sock_setup(r->response, "request",
- &c->request.u.us.sock);
+ &c->request.u.us.sock, c->egid);
if (res < 0) {
goto cleanup_hdr;
}
c->setup.u.us.sock_name = NULL;
c->request.u.us.sock_name = NULL;
/* response channel */
c->response.u.us.sock = c->request.u.us.sock;
snprintf(path, PATH_MAX, "%s-%s", r->response, "response");
/* a non-NULL sock_name makes the send functions connect on first use;
 * NOTE(review): the strdup() result is not checked for NULL */
c->response.u.us.sock_name = strdup(path);
/* event channel */
res = qb_ipc_dgram_sock_setup(r->response, "event-tx",
- &c->event.u.us.sock);
+ &c->event.u.us.sock, c->egid);
if (res < 0) {
goto cleanup_hdr;
}
snprintf(path, PATH_MAX, "%s-%s", r->response, "event");
c->event.u.us.sock_name = strdup(path);
res = _sock_add_to_mainloop(c);
if (res < 0) {
goto cleanup_hdr;
}
return res;
cleanup_hdr:
/* NOTE(review): this label is also reached before the sock_name fields
 * and shared_data are assigned in this function (e.g. on mmap failure) -
 * confirm they are initialised elsewhere so that free()/munmap() are
 * safe. The sockets created above are not closed here; verify the caller
 * handles that. */
free(c->response.u.us.sock_name);
free(c->event.u.us.sock_name);
if (fd_hdr >= 0) {
close(fd_hdr);
}
unlink(r->request);
munmap(c->request.u.us.shared_data, SHM_CONTROL_SIZE);
return res;
}
/*
 * Install the unix-socket transport callbacks on service s.
 *
 * peek and reclaim are left unset (NULL), and the service is marked as not
 * needing a separate socket for polling. qb_atomic_init() is called last.
 */
void
qb_ipcs_us_init(struct qb_ipcs_service *s)
{
	/* connection life cycle */
	s->funcs.connect = qb_ipcs_us_connect;
	s->funcs.disconnect = qb_ipcs_us_disconnect;

	/* data path */
	s->funcs.send = qb_ipc_socket_send;
	s->funcs.sendv = qb_ipc_socket_sendv;
	s->funcs.recv = qb_ipc_us_recv_at_most;
	s->funcs.peek = NULL;
	s->funcs.reclaim = NULL;

	/* flow control and queue length */
	s->funcs.fc_set = qb_ipc_us_fc_set;
	s->funcs.q_len_get = qb_ipc_us_q_len_get;

	s->needs_sock_for_poll = QB_FALSE;

	qb_atomic_init();
}
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Thu, Oct 16, 3:20 PM (18 h, 51 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2536522
Default Alt Text
(50 KB)
Attached To
Mode
rQ LibQB
Attached
Detach File
Event Timeline
Log In to Comment