Page Menu
Home
ClusterLabs Projects
Search
Configure Global Search
Log In
Files
F2825207
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Flag For Later
Award Token
Size
25 KB
Referenced Files
None
Subscribers
None
View Options
diff --git a/lib/ipc_us.c b/lib/ipc_us.c
index 27856b8..8d674cc 100644
--- a/lib/ipc_us.c
+++ b/lib/ipc_us.c
@@ -1,633 +1,634 @@
/*
* Copyright (C) 2010 Red Hat, Inc.
*
* Author: Angus Salkeld <asalkeld@redhat.com>
*
* This file is part of libqb.
*
* libqb is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 2.1 of the License, or
* (at your option) any later version.
*
* libqb is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with libqb. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os_base.h"
#if defined(HAVE_GETPEERUCRED)
#include <ucred.h>
#endif
#ifdef HAVE_SYS_UN_H
#include <sys/un.h>
#endif /* HAVE_SYS_UN_H */
#ifdef HAVE_SYS_STAT_H
#include <sys/stat.h>
#endif
#include <qb/qbipcs.h>
#include <qb/qbloop.h>
#include "util_int.h"
#include "ipc_int.h"
#define SERVER_BACKLOG 5
#if defined(QB_LINUX) || defined(QB_SOLARIS)
#define QB_SUN_LEN(a) sizeof(*(a))
#else
#define QB_SUN_LEN(a) SUN_LEN(a)
#endif
static int32_t qb_ipcs_us_connection_acceptor(int fd, int revent, void *data);
#ifdef SO_NOSIGPIPE
static void socket_nosigpipe(int32_t s)
{
int32_t on = 1;
setsockopt(s, SOL_SOCKET, SO_NOSIGPIPE, (void *)&on, sizeof(on));
}
#endif
static void set_cloexec_flag(int32_t fd)
{
int32_t oldflags = fcntl(fd, F_GETFD, 0);
if (oldflags < 0) {
oldflags = 0;
}
oldflags |= FD_CLOEXEC;
fcntl(fd, F_SETFD, oldflags);
}
#ifndef MSG_NOSIGNAL
#define MSG_NOSIGNAL 0
#endif
int32_t qb_ipc_us_send(int32_t s, const void *msg, size_t len)
{
int32_t result;
struct msghdr msg_send;
struct iovec iov_send;
char *rbuf = (char *)msg;
int32_t processed = 0;
msg_send.msg_iov = &iov_send;
msg_send.msg_iovlen = 1;
msg_send.msg_name = 0;
msg_send.msg_namelen = 0;
#if !defined(QB_SOLARIS)
msg_send.msg_control = 0;
msg_send.msg_controllen = 0;
msg_send.msg_flags = 0;
#else
msg_send.msg_accrights = NULL;
msg_send.msg_accrightslen = 0;
#endif
retry_send:
iov_send.iov_base = &rbuf[processed];
iov_send.iov_len = len - processed;
result = sendmsg(s, &msg_send, MSG_NOSIGNAL);
if (result == -1 && errno == EAGAIN) {
goto retry_send;
}
if (result == -1) {
return -errno;
}
processed += result;
if (processed != len) {
goto retry_send;
}
return processed;
}
static ssize_t qb_ipc_us_recv_msghdr(int32_t s,
struct msghdr *hdr, char *msg, size_t len)
{
int32_t result;
int32_t processed = 0;
retry_recv:
hdr->msg_iov->iov_base = &msg[processed];
hdr->msg_iov->iov_len = len - processed;
result = recvmsg(s, hdr, MSG_NOSIGNAL | MSG_WAITALL);
if (result == -1 && errno == EAGAIN) {
goto retry_recv;
}
if (result == -1) {
return -errno;
}
#if defined(QB_SOLARIS) || defined(QB_BSD) || defined(QB_DARWIN)
/* On many OS poll never return POLLHUP or POLLERR.
* EOF is detected when recvmsg return 0.
*/
if (result == 0) {
return -errno; //ENOTCONN
}
#endif
processed += result;
if (processed != len) {
goto retry_recv;
}
assert(processed == len);
return processed;
}
int32_t qb_ipc_us_recv_ready(int32_t s, int32_t ms_timeout)
{
struct pollfd ufds;
int32_t poll_events;
ufds.fd = s;
ufds.events = POLLIN;
ufds.revents = 0;
poll_events = poll (&ufds, 1, ms_timeout);
if ((poll_events == -1 && errno == EINTR) ||
poll_events == 0) {
return -EAGAIN;
} else if (poll_events == -1) {
return -errno;
} else if (poll_events == 1 && (ufds.revents & (POLLERR|POLLHUP))) {
return -ESHUTDOWN;
}
return 0;
}
int32_t qb_ipc_us_recv(int32_t s, void *msg, size_t len)
{
int32_t result;
retry_recv:
result = recv(s, msg, len, MSG_NOSIGNAL | MSG_WAITALL);
if (result == -1 && errno == EAGAIN) {
goto retry_recv;
}
if (result == -1) {
return -errno;
}
#if defined(QB_SOLARIS) || defined(QB_BSD) || defined(QB_DARWIN)
/* On many OS poll never return POLLHUP or POLLERR.
* EOF is detected when recvmsg return 0.
*/
if (result == 0) {
return -errno; //ENOTCONN
}
#endif
return result;
}
static int32_t qb_ipcs_uc_recv_and_auth(struct qb_ipcs_connection *c)
{
int32_t res = 0;
struct msghdr msg_recv;
struct iovec iov_recv;
struct qb_ipc_connection_request setup_msg;
#ifdef QB_LINUX
struct cmsghdr *cmsg;
char cmsg_cred[CMSG_SPACE(sizeof(struct ucred))];
int off = 0;
int on = 1;
struct ucred *cred;
#endif
msg_recv.msg_flags = 0;
msg_recv.msg_iov = &iov_recv;
msg_recv.msg_iovlen = 1;
msg_recv.msg_name = 0;
msg_recv.msg_namelen = 0;
#ifdef QB_LINUX
msg_recv.msg_control = (void *)cmsg_cred;
msg_recv.msg_controllen = sizeof(cmsg_cred);
#endif
#ifdef QB_SOLARIS
msg_recv.msg_accrights = 0;
msg_recv.msg_accrightslen = 0;
#endif /* QB_SOLARIS */
iov_recv.iov_base = &setup_msg;
iov_recv.iov_len = sizeof(struct qb_ipc_connection_request);
#ifdef QB_LINUX
setsockopt(c->sock, SOL_SOCKET, SO_PASSCRED, &on, sizeof(on));
#endif
res = qb_ipc_us_recv_msghdr(c->sock, &msg_recv, (char *)&setup_msg,
sizeof(struct qb_ipc_connection_request));
if (res < 0) {
goto cleanup_and_return;
}
if (res != sizeof(struct qb_ipc_connection_request)) {
res = -EIO;
goto cleanup_and_return;
}
c->request.max_msg_size = setup_msg.max_msg_size;
c->response.max_msg_size = setup_msg.max_msg_size;
c->event.max_msg_size = setup_msg.max_msg_size;
res = -EBADMSG;
/*
* currently support getpeerucred, getpeereid, and SO_PASSCRED credential
* retrieval mechanisms for various Platforms
*/
#ifdef HAVE_GETPEERUCRED
/*
* Solaris and some BSD systems
*/
{
ucred_t *uc = NULL;
if (getpeerucred(c->sock, &uc) == 0) {
res = 0;
c->euid = ucred_geteuid(uc);
c->egid = ucred_getegid(uc);
c->pid = ucred_getpid(uc);
ucred_free(uc);
} else {
res = -errno;
}
}
#elif HAVE_GETPEEREID
/*
* Usually MacOSX systems
*/
{
/*
* TODO get the peer's pid.
* c->pid = ?;
*/
if (getpeereid(c->sock, &c->euid, &c->egid) == 0) {
res = 0;
} else {
res = -errno;
}
}
#elif SO_PASSCRED
/*
* Usually Linux systems
*/
cmsg = CMSG_FIRSTHDR(&msg_recv);
assert(cmsg);
cred = (struct ucred *)CMSG_DATA(cmsg);
if (cred) {
res = 0;
c->pid = cred->pid;
c->euid = cred->uid;
c->egid = cred->gid;
} else {
res = -EBADMSG;
}
#else /* no credentials */
res = -ENOTSUP;
#endif /* no credentials */
cleanup_and_return:
#ifdef QB_LINUX
setsockopt(c->sock, SOL_SOCKET, SO_PASSCRED, &off, sizeof(off));
#endif
if (res == 0) {
if (c->service->serv_fns.connection_accept) {
res = c->service->serv_fns.connection_accept(c,
c->euid,
c->egid);
} else {
res = 0;
}
}
return res;
}
int32_t qb_ipcc_us_connect(const char *socket_name, int32_t * sock_pt)
{
int32_t request_fd;
struct sockaddr_un address;
#if defined(QB_SOLARIS)
request_fd = socket(PF_UNIX, SOCK_STREAM, 0);
#else
request_fd = socket(PF_LOCAL, SOCK_STREAM, 0);
#endif
if (request_fd == -1) {
return -errno;
}
#ifdef SO_NOSIGPIPE
socket_nosigpipe(request_fd);
#endif /* SO_NOSIGPIPE */
set_cloexec_flag(request_fd);
memset(&address, 0, sizeof(struct sockaddr_un));
address.sun_family = AF_UNIX;
#if defined(QB_BSD) || defined(QB_DARWIN)
address.sun_len = SUN_LEN(&address);
#endif
#if defined(QB_LINUX)
sprintf(address.sun_path + 1, "%s", socket_name);
#else
sprintf(address.sun_path, "%s/%s", SOCKETDIR, socket_name);
#endif
if (connect(request_fd, (struct sockaddr *)&address,
QB_SUN_LEN(&address)) == -1) {
goto error_connect;
}
*sock_pt = request_fd;
return 0;
error_connect:
close(request_fd);
*sock_pt = -1;
return -errno;
}
void qb_ipcc_us_disconnect(int32_t sock)
{
shutdown(sock, SHUT_RDWR);
close(sock);
}
#if 0
cs_error_t coroipcc_dispatch_get(hdb_handle_t handle, void **data, int timeout)
{
struct pollfd ufds;
int poll_events;
char buf;
struct ipc_instance *ipc_instance;
char *data_addr;
cs_error_t error = CS_OK;
int res;
error =
hdb_error_to_cs(hdb_handle_get
(&ipc_hdb, handle, (void **)&ipc_instance));
if (error != CS_OK) {
return (error);
}
*data = NULL;
ufds.fd = ipc_instance->fd;
ufds.events = POLLIN;
ufds.revents = 0;
poll_events = poll(&ufds, 1, timeout);
if (poll_events == -1 && errno == EINTR) {
error = CS_ERR_TRY_AGAIN;
goto error_put;
} else if (poll_events == -1) {
error = CS_ERR_LIBRARY;
goto error_put;
} else if (poll_events == 0) {
error = CS_ERR_TRY_AGAIN;
goto error_put;
}
if (poll_events == 1 && (ufds.revents & (POLLERR | POLLHUP))) {
error = CS_ERR_LIBRARY;
goto error_put;
}
error = socket_recv(ipc_instance->fd, &buf, 1);
assert(error == CS_OK);
if (shared_mem_dispatch_bytes_left(ipc_instance) > 500000) {
/*
* Notify coroipcs to flush any pending dispatch messages
*/
res =
ipc_sem_post(ipc_instance->control_buffer,
SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT);
if (res != CS_OK) {
error = CS_ERR_LIBRARY;
goto error_put;
}
}
data_addr = ipc_instance->dispatch_buffer;
data_addr = &data_addr[ipc_instance->control_buffer->read];
*data = (void *)data_addr;
return (CS_OK);
error_put:
hdb_handle_put(&ipc_hdb, handle);
return (error);
}
#endif
/*
**************************************************************************
* SERVER
*/
int32_t qb_ipcs_us_publish(struct qb_ipcs_service * s)
{
struct sockaddr_un un_addr;
int32_t res;
char error_str[100];
/*
* Create socket for IPC clients, name socket, listen for connections
*/
#if defined(QB_SOLARIS)
s->server_sock = socket(PF_UNIX, SOCK_STREAM, 0);
#else
s->server_sock = socket(PF_LOCAL, SOCK_STREAM, 0);
#endif
if (s->server_sock == -1) {
res = -errno;
strerror_r(errno, error_str, 100);
qb_util_log(LOG_ERR,
"Cannot create server socket: %s\n", error_str);
return res;
}
set_cloexec_flag(s->server_sock);
res = fcntl(s->server_sock, F_SETFL, O_NONBLOCK);
if (res == -1) {
res = -errno;
strerror_r(errno, error_str, 100);
qb_util_log(LOG_CRIT,
"Could not set non-blocking operation on server socket: %s\n",
error_str);
goto error_close;
}
memset(&un_addr, 0, sizeof(struct sockaddr_un));
un_addr.sun_family = AF_UNIX;
#if defined(QB_BSD) || defined(QB_DARWIN)
un_addr.sun_len = SUN_LEN(&un_addr);
#endif
#if defined(QB_LINUX)
sprintf(un_addr.sun_path + 1, "%s", s->name);
#else
{
struct stat stat_out;
res = stat(SOCKETDIR, &stat_out);
if (res == -1 || (res == 0 && !S_ISDIR(stat_out.st_mode))) {
res = -errno;
qb_util_log(LOG_CRIT,
"Required directory not present %s\n",
SOCKETDIR);
goto error_close;
}
sprintf(un_addr.sun_path, "%s/%s", SOCKETDIR, s->name);
unlink(un_addr.sun_path);
}
#endif
res =
bind(s->server_sock, (struct sockaddr *)&un_addr,
QB_SUN_LEN(&un_addr));
if (res) {
res = -errno;
strerror_r(errno, error_str, 100);
qb_util_log(LOG_CRIT,
"Could not bind AF_UNIX (%s): %s.\n",
un_addr.sun_path, error_str);
goto error_close;
}
/*
* Allow eveyrone to write to the socket since the IPC layer handles
* security automatically
*/
#if !defined(QB_LINUX)
res = chmod(un_addr.sun_path, S_IRWXU | S_IRWXG | S_IRWXO);
#endif
if (listen(s->server_sock, SERVER_BACKLOG) == -1) {
strerror_r(errno, error_str, 100);
qb_util_log(LOG_ERR, "listen failed: %s.\n", error_str);
}
s->poll_fns.dispatch_add(s->poll_priority, s->server_sock,
POLLIN | POLLPRI | POLLNVAL,
s, qb_ipcs_us_connection_acceptor);
return 0;
error_close:
close(s->server_sock);
return res;
}
int32_t qb_ipcs_us_withdraw(struct qb_ipcs_service * s)
{
qb_util_log(LOG_INFO, "withdrawing server sockets\n");
shutdown(s->server_sock, SHUT_RDWR);
close(s->server_sock);
return 0;
}
static int32_t qb_ipcs_us_connection_acceptor(int fd, int revent, void *data)
{
struct sockaddr_un un_addr;
int32_t new_fd;
struct qb_ipcs_connection *c;
struct qb_ipcs_service *s = (struct qb_ipcs_service *)data;
struct qb_ipc_connection_response response;
int32_t res;
socklen_t addrlen = sizeof(struct sockaddr_un);
char error_str[100];
retry_accept:
errno = 0;
new_fd = accept(fd, (struct sockaddr *)&un_addr, &addrlen);
if (new_fd == -1 && errno == EINTR) {
goto retry_accept;
}
if (new_fd == -1 && errno == EBADF) {
strerror_r(errno, error_str, 100);
qb_util_log(LOG_ERR,
"Could not accept Library connection:(fd: %d) [%d] %s\n",
fd, errno, error_str);
return -1;
}
if (new_fd == -1) {
strerror_r(errno, error_str, 100);
qb_util_log(LOG_ERR,
"Could not accept Library connection: [%d] %s\n",
errno, error_str);
return 0; /* This is an error, but -1 would indicate disconnect from poll loop */
}
set_cloexec_flag(new_fd);
res = fcntl(new_fd, F_SETFL, O_NONBLOCK);
if (res == -1) {
strerror_r(errno, error_str, 100);
qb_util_log(LOG_ERR,
"Could not set non-blocking operation on library connection: %s\n",
error_str);
close(new_fd);
return 0; /* This is an error, but -1 would indicate disconnect from poll loop */
}
c = qb_ipcs_connection_alloc(s);
c->sock = new_fd;
res = qb_ipcs_uc_recv_and_auth(c);
if (res == 0) {
qb_util_log(LOG_INFO, "IPC credentials authenticated");
+ memset(&response, 0, sizeof(response));
res = s->funcs.connect(s, c, &response);
if (res != 0) {
goto send_response;
}
qb_list_add(&c->list, &s->connections);
c->receive_buf = malloc(c->request.max_msg_size);
if (s->needs_sock_for_poll) {
s->poll_fns.dispatch_add(s->poll_priority, c->sock,
POLLIN | POLLPRI | POLLNVAL,
c,
qb_ipcs_dispatch_connection_request);
}
}
send_response:
response.hdr.id = QB_IPC_MSG_AUTHENTICATE;
response.hdr.size = sizeof(response);
response.hdr.error = res;
response.connection_type = s->type;
response.max_msg_size = c->request.max_msg_size;
qb_ipc_us_send(c->sock, &response, response.hdr.size);
if (res == 0) {
if (s->serv_fns.connection_created) {
s->serv_fns.connection_created(c);
}
} else if (res == -EACCES) {
qb_util_log(LOG_ERR, "Invalid IPC credentials.");
} else {
strerror_r(-response.hdr.error, error_str, 100);
qb_util_log(LOG_ERR, "Error in connection setup: %s.",
error_str);
}
if (res != 0) {
qb_ipcs_disconnect(c);
}
return 0;
}
diff --git a/lib/loop_poll.c b/lib/loop_poll.c
index e5fa047..2ef7e1a 100644
--- a/lib/loop_poll.c
+++ b/lib/loop_poll.c
@@ -1,432 +1,433 @@
/*
* Copyright (C) 2010 Red Hat, Inc.
*
* Author: Angus Salkeld <asalkeld@redhat.com>
*
* This file is part of libqb.
*
* libqb is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 2.1 of the License, or
* (at your option) any later version.
*
* libqb is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with libqb. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os_base.h"
#ifdef HAVE_EPOLL_CREATE1
#include <sys/epoll.h>
#define HAVE_EPOLL 1
#endif /* HAVE_EPOLL_CREATE */
#include <sys/poll.h>
#include <sys/resource.h>
#include <qb/qbdefs.h>
#include <qb/qblist.h>
#include <qb/qbloop.h>
#include "loop_int.h"
#include "util_int.h"
/* logs, std(in|out|err), pipe */
#define POLL_FDS_USED_MISC 50
struct qb_poll_entry {
struct qb_loop_item item;
struct pollfd ufd;
qb_loop_poll_dispatch_fn dispatch_fn;
enum qb_loop_priority p;
int32_t install_pos;
};
struct qb_poll_source {
struct qb_loop_source s;
#ifdef HAVE_EPOLL
struct epoll_event *events;
#else
struct pollfd *ufds;
#endif /* HAVE_EPOLL */
int32_t poll_entry_count;
struct qb_poll_entry *poll_entries;
qb_loop_poll_low_fds_event_fn low_fds_event_fn;
int32_t not_enough_fds;
#ifdef HAVE_EPOLL
int32_t epollfd;
#endif /* HAVE_EPOLL */
};
static struct qb_poll_source * my_src;
#ifdef HAVE_EPOLL
static int32_t poll_to_epoll_event(int32_t event)
{
int32_t out = 0;
if (event & POLLIN) out |= EPOLLIN;
if (event & POLLOUT) out |= EPOLLOUT;
if (event & POLLPRI) out |= EPOLLPRI;
if (event & POLLERR) out |= EPOLLERR;
if (event & POLLHUP) out |= EPOLLHUP;
if (event & POLLNVAL) out |= EPOLLERR;
return out;
}
static int32_t epoll_to_poll_event(int32_t event)
{
int32_t out = 0;
if (event & EPOLLIN) out |= POLLIN;
if (event & EPOLLOUT) out |= POLLOUT;
if (event & EPOLLPRI) out |= POLLPRI;
if (event & EPOLLERR) out |= POLLERR;
if (event & EPOLLHUP) out |= POLLHUP;
return out;
}
#endif /* HAVE_EPOLL */
static void poll_dispatch_and_take_back(struct qb_loop_item * item,
enum qb_loop_priority p)
{
struct qb_poll_entry *pe = (struct qb_poll_entry *)item;
int32_t res;
int32_t idx = pe->install_pos;
res = pe->dispatch_fn(pe->ufd.fd, pe->ufd.revents, pe->item.user_data);
pe = &my_src->poll_entries[idx];
if (res < 0) {
pe->ufd.fd = -1; /* empty entry */
}
pe->ufd.revents = 0;
}
static void poll_fds_usage_check(struct qb_poll_source *s)
{
struct rlimit lim;
static int32_t socks_limit = 0;
int32_t send_event = 0;
int32_t socks_used = 0;
int32_t socks_avail = 0;
int32_t i;
if (socks_limit == 0) {
if (getrlimit(RLIMIT_NOFILE, &lim) == -1) {
char error_str[100];
strerror_r(errno, error_str, 100);
printf("getrlimit: %s\n", error_str);
return;
}
socks_limit = lim.rlim_cur;
socks_limit -= POLL_FDS_USED_MISC;
if (socks_limit < 0) {
socks_limit = 0;
}
}
for (i = 0; i < s->poll_entry_count; i++) {
if (s->poll_entries[i].ufd.fd != -1) {
socks_used++;
}
}
socks_avail = socks_limit - socks_used;
if (socks_avail < 0) {
socks_avail = 0;
}
send_event = 0;
if (s->not_enough_fds) {
if (socks_avail > 2) {
s->not_enough_fds = 0;
send_event = 1;
}
} else {
if (socks_avail <= 1) {
s->not_enough_fds = 1;
send_event = 1;
}
}
if (send_event && s->low_fds_event_fn) {
s->low_fds_event_fn(s->not_enough_fds,
socks_avail);
}
}
#ifdef HAVE_EPOLL
#define MAX_EVENTS 100
static int32_t poll_and_add_to_jobs(struct qb_loop_source* src, int32_t ms_timeout)
{
int32_t i;
int32_t res;
int32_t new_jobs = 0;
struct qb_poll_entry * pe;
struct qb_poll_source * s = (struct qb_poll_source *)src;
struct epoll_event events[MAX_EVENTS];
poll_fds_usage_check(s);
retry_poll:
res = epoll_wait(s->epollfd, events, MAX_EVENTS, ms_timeout);
if (errno == EINTR && res == -1) {
goto retry_poll;
} else if (res == -1) {
return -errno;
}
for (i = 0; i < res; i++) {
pe = &s->poll_entries[events[i].data.u32];
if (pe->ufd.fd == -1) {
// empty
continue;
}
if (events[i].events == pe->ufd.revents) {
// entry already in the job queue.
continue;
}
pe->ufd.revents = epoll_to_poll_event(events[i].events);
qb_list_init(&pe->item.list);
qb_list_add_tail(&pe->item.list, &s->s.l->level[pe->p].job_head);
new_jobs++;
}
return new_jobs;
}
#else
static int32_t poll_and_add_to_jobs(struct qb_loop_source* src, int32_t ms_timeout)
{
int32_t i;
int32_t res;
int32_t new_jobs = 0;
struct qb_poll_entry * pe;
struct qb_poll_source * s = (struct qb_poll_source *)src;
poll_fds_usage_check(s);
for (i = 0; i < s->poll_entry_count; i++) {
memcpy(&s->ufds[i], &s->poll_entries[i].ufd, sizeof(struct pollfd));
}
retry_poll:
res = poll(s->ufds, s->poll_entry_count, ms_timeout);
if (errno == EINTR && res == -1) {
goto retry_poll;
} else if (res == -1) {
return -errno;
}
for (i = 0; i < s->poll_entry_count; i++) {
if (s->ufds[i].fd == -1 || s->ufds[i].revents == 0) {
// empty
continue;
}
pe = &s->poll_entries[i];
if (s->ufds[i].revents == pe->ufd.revents) {
// entry already in the job queue.
continue;
}
pe->ufd.revents = s->ufds[i].revents;
qb_list_init(&pe->item.list);
qb_list_add_tail(&pe->item.list, &s->s.l->level[pe->p].job_head);
new_jobs++;
}
return new_jobs;
}
#endif /* HAVE_EPOLL */
struct qb_loop_source*
qb_loop_poll_init(struct qb_loop *l)
{
my_src = malloc(sizeof(struct qb_poll_source));
my_src->s.l = l;
my_src->s.dispatch_and_take_back = poll_dispatch_and_take_back;
my_src->s.poll = poll_and_add_to_jobs;
my_src->poll_entries = 0;
my_src->poll_entry_count = 0;
my_src->low_fds_event_fn = NULL;
my_src->not_enough_fds = 0;
#ifdef HAVE_EPOLL
my_src->epollfd = epoll_create1(EPOLL_CLOEXEC);
my_src->events = 0;
#else
my_src->ufds = 0;
#endif /* HAVE_EPOLL */
qb_list_init(&my_src->s.list);
qb_list_add_tail(&my_src->s.list, &l->source_head);
return (struct qb_loop_source*)my_src;
}
int32_t qb_loop_poll_low_fds_event_set(
qb_loop_t *l,
qb_loop_poll_low_fds_event_fn fn)
{
my_src->low_fds_event_fn = fn;
return 0;
}
int32_t qb_loop_poll_add(struct qb_loop *l,
enum qb_loop_priority p,
int32_t fd,
int32_t events,
void *data,
qb_loop_poll_dispatch_fn dispatch_fn)
{
struct qb_poll_entry *poll_entries;
struct qb_poll_entry *pe;
int32_t found = 0;
int32_t install_pos;
int32_t res = 0;
int32_t new_size = 0;
#ifdef HAVE_EPOLL
struct epoll_event *ev;
#else
struct pollfd *ufds;
#endif /* HAVE_EPOLL */
for (found = 0, install_pos = 0;
install_pos < my_src->poll_entry_count; install_pos++) {
if (my_src->poll_entries[install_pos].ufd.fd == -1) {
found = 1;
break;
}
}
if (found == 0) {
/*
* Grow pollfd list
*/
new_size = (my_src->poll_entry_count + 1) * sizeof(struct qb_poll_entry);
poll_entries = realloc(my_src->poll_entries, new_size);
if (poll_entries == NULL) {
return -ENOMEM;
}
my_src->poll_entries = poll_entries;
#ifdef HAVE_EPOLL
new_size = (my_src->poll_entry_count+ 1) * sizeof(struct epoll_event);
ev = realloc(my_src->events, new_size);
if (ev == NULL) {
return -ENOMEM;
}
my_src->events = ev;
#else
new_size = (my_src->poll_entry_count+ 1) * sizeof(struct pollfd);
ufds = realloc(my_src->ufds, new_size);
if (ufds == NULL) {
return -ENOMEM;
}
my_src->ufds = ufds;
#endif /* HAVE_EPOLL */
my_src->poll_entry_count += 1;
install_pos = my_src->poll_entry_count - 1;
}
/*
* Install new dispatch handler
*/
pe = &my_src->poll_entries[install_pos];
pe->install_pos = install_pos;
pe->ufd.fd = fd;
pe->ufd.events = events;
pe->ufd.revents = 0;
pe->dispatch_fn = dispatch_fn;
pe->item.user_data = data;
pe->item.source = (struct qb_loop_source*)my_src;
pe->p = p;
#ifdef HAVE_EPOLL
ev = &my_src->events[install_pos];
ev->events = poll_to_epoll_event(events);
+ ev->data.u64 = 0; /* valgrind */
ev->data.u32 = install_pos;
if (epoll_ctl(my_src->epollfd, EPOLL_CTL_ADD, fd, ev) == -1) {
res = -errno;
qb_util_log(LOG_ERR, "epoll_ctl(add) : %s", strerror(-res));
}
#endif /* HAVE_EPOLL */
return (res);
}
int32_t qb_loop_poll_mod(struct qb_loop *l,
enum qb_loop_priority p,
int32_t fd,
int32_t events,
void *data,
qb_loop_poll_dispatch_fn dispatch_fn)
{
int32_t i;
int32_t res = 0;
struct qb_poll_entry *pe;
/*
* Find file descriptor to modify events and dispatch function
*/
for (i = 0; i < my_src->poll_entry_count; i++) {
pe = &my_src->poll_entries[i];
if (pe->ufd.fd != fd) {
continue;
}
pe->dispatch_fn = dispatch_fn;
pe->item.user_data = data;
pe->p = p;
if (pe->ufd.events != events) {
#ifdef HAVE_EPOLL
my_src->events[i].events = poll_to_epoll_event(events);
my_src->events[i].data.u32 = i;
if (epoll_ctl(my_src->epollfd, EPOLL_CTL_MOD, fd, &my_src->events[i]) == -1) {
res = -errno;
qb_util_log(LOG_ERR, "epoll_ctl(mod) : %s", strerror(-res));
}
#endif /* HAVE_EPOLL */
pe->ufd.events = events;
}
return res;
}
return -EBADF;
}
int32_t qb_loop_poll_del(struct qb_loop *l, int32_t fd)
{
int32_t i;
int32_t res = 0;
struct qb_poll_entry *pe;
/*
* Find file descriptor to modify events and dispatch function
*/
for (i = 0; i < my_src->poll_entry_count; i++) {
pe = &my_src->poll_entries[i];
if (pe->ufd.fd != fd) {
continue;
}
pe->ufd.fd = -1;
pe->ufd.events = 0;
pe->ufd.revents = 0;
#ifdef HAVE_EPOLL
if (epoll_ctl(my_src->epollfd, EPOLL_CTL_DEL, fd, NULL) == -1) {
res = -errno;
qb_util_log(LOG_ERR, "epoll_ctl(del) : %s",
strerror(-res));
}
#else
my_src->ufds[i].fd = -1;
my_src->ufds[i].events = 0;
my_src->ufds[i].revents = 0;
#endif /* HAVE_EPOLL */
return 0;
}
return -EBADF;
}
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Sat, Jan 25, 11:46 AM (1 d, 19 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1322429
Default Alt Text
(25 KB)
Attached To
Mode
rQ LibQB
Attached
Detach File
Event Timeline
Log In to Comment