Page Menu
Home
ClusterLabs Projects
Search
Configure Global Search
Log In
Files
F3152037
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Flag For Later
Award Token
Size
22 KB
Referenced Files
None
Subscribers
None
View Options
diff --git a/lib/loop_poll.c b/lib/loop_poll.c
index 987382f..1f6b960 100644
--- a/lib/loop_poll.c
+++ b/lib/loop_poll.c
@@ -1,758 +1,760 @@
/*
* Copyright (C) 2010 Red Hat, Inc.
*
* Author: Angus Salkeld <asalkeld@redhat.com>
*
* This file is part of libqb.
*
* libqb is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 2.1 of the License, or
* (at your option) any later version.
*
* libqb is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with libqb. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os_base.h"
#ifdef HAVE_SYS_RESOURCE_H
#include <sys/resource.h>
#endif
#include <signal.h>
#include "loop_poll_int.h"
/*
* Define this to log slow (>10ms) jobs.
*/
#undef DEBUG_DISPATCH_TIME
/* logs, std(in|out|err), pipe */
#define POLL_FDS_USED_MISC 50
static int32_t _qb_signal_add_to_jobs_(struct qb_loop *l,
struct qb_poll_entry *pe);
static void
_poll_entry_check_generate_(struct qb_poll_entry *pe)
{
int32_t i;
for (i = 0; i < 200; i++) {
pe->check = random();
if (pe->check != 0 && pe->check != 0xffffffff) {
break;
}
}
}
static void
_poll_entry_mark_deleted_(struct qb_poll_entry *pe)
{
pe->ufd.fd = -1;
pe->state = QB_POLL_ENTRY_DELETED;
pe->check = 0;
}
static void
_poll_entry_empty_(struct qb_poll_entry *pe)
{
memset(pe, 0, sizeof(struct qb_poll_entry));
pe->ufd.fd = -1;
}
static void
_poll_dispatch_and_take_back_(struct qb_loop_item *item,
enum qb_loop_priority p)
{
struct qb_poll_entry *pe = (struct qb_poll_entry *)item;
int32_t res;
#ifdef DEBUG_DISPATCH_TIME
uint64_t start;
uint64_t stop;
int32_t log_warn = QB_FALSE;
start = qb_util_nano_current_get();
#endif /* DEBUG_DISPATCH_TIME */
assert(pe->state == QB_POLL_ENTRY_JOBLIST);
assert(pe->item.type == QB_LOOP_FD);
res = pe->poll_dispatch_fn(pe->ufd.fd,
pe->ufd.revents,
pe->item.user_data);
if (res < 0) {
_poll_entry_mark_deleted_(pe);
} else {
pe->state = QB_POLL_ENTRY_ACTIVE;
pe->ufd.revents = 0;
}
#ifdef DEBUG_DISPATCH_TIME
if (pe->state == QB_POLL_ENTRY_ACTIVE) {
pe->runs++;
if ((pe->runs % 50) == 0) {
log_warn = QB_TRUE;
}
stop = qb_util_nano_current_get();
if ((stop - start) > (10 * QB_TIME_NS_IN_MSEC)) {
log_warn = QB_TRUE;
}
if (log_warn && pe->item.type == QB_LOOP_FD) {
qb_util_log(LOG_INFO,
"[fd:%d] dispatch:%p runs:%d duration:%d ms",
pe->ufd.fd, pe->poll_dispatch_fn,
pe->runs,
(int32_t) ((stop -
start) / QB_TIME_NS_IN_MSEC));
}
}
#endif /* DEBUG_DISPATCH_TIME */
}
void
qb_poll_fds_usage_check_(struct qb_poll_source *s)
{
struct rlimit lim;
static int32_t socks_limit = 0;
int32_t send_event = 0;
int32_t socks_used = 0;
int32_t socks_avail = 0;
struct qb_poll_entry *pe;
int32_t i;
if (socks_limit == 0) {
if (getrlimit(RLIMIT_NOFILE, &lim) == -1) {
qb_util_perror(LOG_WARNING, "getrlimit");
return;
}
socks_limit = lim.rlim_cur;
socks_limit -= POLL_FDS_USED_MISC;
if (socks_limit < 0) {
socks_limit = 0;
}
}
for (i = 0; i < s->poll_entry_count; i++) {
assert(qb_array_index(s->poll_entries, i, (void **)&pe) == 0);
if ((pe->state == QB_POLL_ENTRY_ACTIVE ||
pe->state == QB_POLL_ENTRY_JOBLIST) && pe->ufd.fd != -1) {
socks_used++;
}
if (pe->state == QB_POLL_ENTRY_DELETED) {
_poll_entry_empty_(pe);
}
}
socks_avail = socks_limit - socks_used;
if (socks_avail < 0) {
socks_avail = 0;
}
send_event = 0;
if (s->not_enough_fds) {
if (socks_avail > 2) {
s->not_enough_fds = 0;
send_event = 1;
}
} else {
if (socks_avail <= 1) {
s->not_enough_fds = 1;
send_event = 1;
}
}
if (send_event && s->low_fds_event_fn) {
s->low_fds_event_fn(s->not_enough_fds, socks_avail);
}
}
struct qb_loop_source *
qb_loop_poll_create(struct qb_loop *l)
{
struct qb_poll_source *s = malloc(sizeof(struct qb_poll_source));
if (s == NULL) {
return NULL;
}
s->s.l = l;
s->s.dispatch_and_take_back = _poll_dispatch_and_take_back_;
s->poll_entries = qb_array_create_2(16, sizeof(struct qb_poll_entry), 16);
s->poll_entry_count = 0;
s->low_fds_event_fn = NULL;
s->not_enough_fds = 0;
#ifdef HAVE_EPOLL
(void)qb_epoll_init(s);
#else
#ifdef HAVE_KQUEUE
(void)qb_kqueue_init(s);
#else
(void)qb_poll_init(s);
#endif /* HAVE_KQUEUE */
#endif /* HAVE_EPOLL */
return (struct qb_loop_source *)s;
}
void
qb_loop_poll_destroy(struct qb_loop *l)
{
struct qb_poll_source *s = (struct qb_poll_source *)l->fd_source;
qb_array_free(s->poll_entries);
s->driver.fini(s);
free(s);
}
int32_t
qb_loop_poll_low_fds_event_set(struct qb_loop *l,
qb_loop_poll_low_fds_event_fn fn)
{
struct qb_poll_source *s = (struct qb_poll_source *)l->fd_source;
s->low_fds_event_fn = fn;
return 0;
}
static int32_t
_get_empty_array_position_(struct qb_poll_source *s)
{
int32_t found = 0;
uint32_t install_pos;
int32_t res = 0;
struct qb_poll_entry *pe;
#ifndef HAVE_EPOLL
struct pollfd *ufds;
int32_t new_size = 0;
#endif /* HAVE_EPOLL */
for (found = 0, install_pos = 0;
install_pos < s->poll_entry_count; install_pos++) {
assert(qb_array_index
(s->poll_entries, install_pos, (void **)&pe) == 0);
if (pe->state == QB_POLL_ENTRY_EMPTY) {
found = 1;
break;
}
}
if (found == 0) {
/*
* Grow pollfd list
*/
res = qb_array_grow(s->poll_entries, s->poll_entry_count + 1);
if (res != 0) {
return res;
}
#ifndef HAVE_EPOLL
+#ifndef HAVE_KQUEUE
new_size = (s->poll_entry_count + 1) * sizeof(struct pollfd);
ufds = realloc(s->ufds, new_size);
if (ufds == NULL) {
return -ENOMEM;
}
s->ufds = ufds;
+#endif
#endif /* HAVE_EPOLL */
s->poll_entry_count += 1;
install_pos = s->poll_entry_count - 1;
}
return install_pos;
}
static int32_t
_poll_add_(struct qb_loop *l,
enum qb_loop_priority p,
int32_t fd, int32_t events, void *data, struct qb_poll_entry **pe_pt)
{
struct qb_poll_entry *pe;
uint32_t install_pos;
int32_t res = 0;
struct qb_poll_source *s;
if (l == NULL) {
return -EINVAL;
}
s = (struct qb_poll_source *)l->fd_source;
install_pos = _get_empty_array_position_(s);
assert(qb_array_index(s->poll_entries, install_pos, (void **)&pe) == 0);
pe->state = QB_POLL_ENTRY_ACTIVE;
pe->install_pos = install_pos;
_poll_entry_check_generate_(pe);
pe->ufd.fd = fd;
pe->ufd.events = events;
pe->ufd.revents = 0;
pe->item.user_data = data;
pe->item.source = (struct qb_loop_source *)l->fd_source;
pe->p = p;
pe->runs = 0;
res = s->driver.add(s, pe, fd, events);
if (res == 0) {
*pe_pt = pe;
return 0;
} else {
pe->state = QB_POLL_ENTRY_EMPTY;
return res;
}
}
static int32_t
_qb_poll_add_to_jobs_(struct qb_loop *l, struct qb_poll_entry *pe)
{
assert(pe->item.type == QB_LOOP_FD);
qb_loop_level_item_add(&l->level[pe->p], &pe->item);
pe->state = QB_POLL_ENTRY_JOBLIST;
return 1;
}
int32_t
qb_loop_poll_add(struct qb_loop * lp,
enum qb_loop_priority p,
int32_t fd,
int32_t events,
void *data, qb_loop_poll_dispatch_fn dispatch_fn)
{
struct qb_poll_entry *pe = NULL;
int32_t size;
int32_t new_size;
int32_t res;
struct qb_loop *l = lp;
if (l == NULL) {
l = qb_loop_default_get();
}
size = ((struct qb_poll_source *)l->fd_source)->poll_entry_count;
res = _poll_add_(l, p, fd, events, data, &pe);
new_size = ((struct qb_poll_source *)l->fd_source)->poll_entry_count;
pe->poll_dispatch_fn = dispatch_fn;
pe->item.type = QB_LOOP_FD;
pe->add_to_jobs = _qb_poll_add_to_jobs_;
if (new_size > size) {
qb_util_log(LOG_TRACE,
"grown poll array to %d for FD %d", new_size, fd);
}
return res;
}
int32_t
qb_loop_poll_mod(struct qb_loop * lp,
enum qb_loop_priority p,
int32_t fd,
int32_t events,
void *data, qb_loop_poll_dispatch_fn dispatch_fn)
{
uint32_t i;
int32_t res = 0;
struct qb_poll_entry *pe;
struct qb_poll_source *s;
struct qb_loop *l = lp;
if (l == NULL) {
l = qb_loop_default_get();
}
s = (struct qb_poll_source *)l->fd_source;
/*
* Find file descriptor to modify events and dispatch function
*/
for (i = 0; i < s->poll_entry_count; i++) {
assert(qb_array_index(s->poll_entries, i, (void **)&pe) == 0);
if (pe->ufd.fd != fd) {
continue;
}
if (pe->state == QB_POLL_ENTRY_DELETED || pe->check == 0) {
qb_util_log(LOG_ERR,
"poll_mod : can't modify entry already deleted");
return -EBADF;
}
pe->poll_dispatch_fn = dispatch_fn;
pe->item.user_data = data;
pe->p = p;
if (pe->ufd.events != events) {
res = s->driver.mod(s, pe, fd, events);
pe->ufd.events = events;
}
return res;
}
return -EBADF;
}
int32_t
qb_loop_poll_del(struct qb_loop * lp, int32_t fd)
{
int32_t i;
int32_t res = 0;
struct qb_poll_entry *pe;
struct qb_poll_source *s;
struct qb_loop *l = lp;
if (l == NULL) {
l = qb_loop_default_get();
}
s = (struct qb_poll_source *)l->fd_source;
for (i = 0; i < s->poll_entry_count; i++) {
assert(qb_array_index(s->poll_entries, i, (void **)&pe) == 0);
if (pe->ufd.fd != fd || pe->item.type != QB_LOOP_FD) {
continue;
}
if (pe->state == QB_POLL_ENTRY_DELETED ||
pe->state == QB_POLL_ENTRY_EMPTY) {
return 0;
}
if (pe->state == QB_POLL_ENTRY_JOBLIST) {
qb_loop_level_item_del(&l->level[pe->p], &pe->item);
}
res = s->driver.del(s, pe, fd, i);
_poll_entry_mark_deleted_(pe);
return res;
}
return -EBADF;
}
static int32_t pipe_fds[2] = { -1, -1 };
struct qb_signal_source {
struct qb_loop_source s;
struct qb_list_head sig_head;
sigset_t signal_superset;
};
struct qb_loop_sig {
struct qb_loop_item item;
int32_t signal;
enum qb_loop_priority p;
qb_loop_signal_dispatch_fn dispatch_fn;
struct qb_loop_sig *cloned_from;
};
static void
_handle_real_signal_(int signal_num, siginfo_t * si, void *context)
{
int32_t sig = signal_num;
int32_t res = 0;
if (pipe_fds[1] > 0) {
try_again:
res = write(pipe_fds[1], &sig, sizeof(int32_t));
if (res == -1 && errno == EAGAIN) {
goto try_again;
} else if (res != sizeof(int32_t)) {
qb_util_log(LOG_ERR,
"failed to write signal to pipe [%d]", res);
}
}
qb_util_log(LOG_TRACE, "got real signal [%d] sent to pipe", sig);
}
static void
_signal_dispatch_and_take_back_(struct qb_loop_item *item,
enum qb_loop_priority p)
{
struct qb_loop_sig *sig = (struct qb_loop_sig *)item;
int32_t res;
res = sig->dispatch_fn(sig->signal, sig->item.user_data);
if (res != 0) {
(void)qb_loop_signal_del(sig->cloned_from->item.source->l,
sig->cloned_from);
}
free(sig);
}
struct qb_loop_source *
qb_loop_signals_create(struct qb_loop *l)
{
int32_t res = 0;
struct qb_poll_entry *pe;
struct qb_signal_source *s = calloc(1, sizeof(struct qb_signal_source));
if (s == NULL) {
return NULL;
}
s->s.l = l;
s->s.dispatch_and_take_back = _signal_dispatch_and_take_back_;
s->s.poll = NULL;
qb_list_init(&s->sig_head);
sigemptyset(&s->signal_superset);
if (pipe_fds[0] < 0) {
res = pipe(pipe_fds);
if (res == -1) {
res = -errno;
qb_util_perror(LOG_ERR, "Can't light pipe");
goto error_exit;
}
(void)qb_sys_fd_nonblock_cloexec_set(pipe_fds[0]);
(void)qb_sys_fd_nonblock_cloexec_set(pipe_fds[1]);
res = _poll_add_(l, QB_LOOP_HIGH,
pipe_fds[0], POLLIN, NULL, &pe);
if (res == 0) {
pe->poll_dispatch_fn = NULL;
pe->item.type = QB_LOOP_SIG;
pe->add_to_jobs = _qb_signal_add_to_jobs_;
} else {
qb_util_perror(LOG_ERR, "Can't smoke pipe");
goto error_exit;
}
}
return (struct qb_loop_source *)s;
error_exit:
errno = -res;
free(s);
if (pipe_fds[0] >= 0) {
close(pipe_fds[0]);
}
if (pipe_fds[1] >= 0) {
close(pipe_fds[1]);
}
return NULL;
}
void
qb_loop_signals_destroy(struct qb_loop *l)
{
struct qb_signal_source *s =
(struct qb_signal_source *)l->signal_source;
struct qb_list_head *list;
struct qb_list_head *n;
struct qb_loop_item *item;
close(pipe_fds[0]);
pipe_fds[0] = -1;
close(pipe_fds[1]);
pipe_fds[1] = -1;
qb_list_for_each_safe(list, n, &s->sig_head) {
item = qb_list_entry(list, struct qb_loop_item, list);
qb_list_del(&item->list);
free(item);
}
free(l->signal_source);
}
static int32_t
_qb_signal_add_to_jobs_(struct qb_loop *l, struct qb_poll_entry *pe)
{
struct qb_signal_source *s =
(struct qb_signal_source *)l->signal_source;
struct qb_list_head *list;
struct qb_loop_sig *sig;
struct qb_loop_item *item;
struct qb_loop_sig *new_sig_job;
int32_t the_signal;
ssize_t res;
int32_t jobs_added = 0;
res = read(pipe_fds[0], &the_signal, sizeof(int32_t));
if (res != sizeof(int32_t)) {
qb_util_perror(LOG_WARNING, "failed to read pipe");
return 0;
}
pe->ufd.revents = 0;
qb_list_for_each(list, &s->sig_head) {
item = qb_list_entry(list, struct qb_loop_item, list);
sig = (struct qb_loop_sig *)item;
if (sig->signal == the_signal) {
new_sig_job = calloc(1, sizeof(struct qb_loop_sig));
if (new_sig_job == NULL) {
return jobs_added;
}
memcpy(new_sig_job, sig, sizeof(struct qb_loop_sig));
qb_util_log(LOG_TRACE,
"adding signal [%d] to job queue %p",
the_signal, sig);
new_sig_job->cloned_from = sig;
qb_loop_level_item_add(&l->level[sig->p],
&new_sig_job->item);
jobs_added++;
}
}
return jobs_added;
}
static void
_adjust_sigactions_(struct qb_signal_source *s)
{
struct qb_loop_sig *sig;
struct qb_loop_item *item;
struct sigaction sa;
int32_t i;
int32_t needed;
sa.sa_flags = SA_SIGINFO;
sa.sa_sigaction = _handle_real_signal_;
sigemptyset(&s->signal_superset);
sigemptyset(&sa.sa_mask);
/* re-set to default */
for (i = 0; i < 30; i++) {
needed = QB_FALSE;
qb_list_for_each_entry(item, &s->sig_head, list) {
sig = (struct qb_loop_sig *)item;
if (i == sig->signal) {
needed = QB_TRUE;
break;
}
}
if (needed) {
sigaddset(&s->signal_superset, i);
sigaction(i, &sa, NULL);
} else {
(void)signal(i, SIG_DFL);
}
}
}
int32_t
qb_loop_signal_add(qb_loop_t * lp,
enum qb_loop_priority p,
int32_t the_sig,
void *data,
qb_loop_signal_dispatch_fn dispatch_fn,
qb_loop_signal_handle * handle)
{
struct qb_loop_sig *sig;
struct qb_signal_source *s;
struct qb_loop *l = lp;
if (l == NULL) {
l = qb_loop_default_get();
}
if (l == NULL || dispatch_fn == NULL) {
return -EINVAL;
}
if (p < QB_LOOP_LOW || p > QB_LOOP_HIGH) {
return -EINVAL;
}
s = (struct qb_signal_source *)l->signal_source;
sig = calloc(1, sizeof(struct qb_loop_sig));
if (sig == NULL) {
return -errno;
}
sig->dispatch_fn = dispatch_fn;
sig->p = p;
sig->signal = the_sig;
sig->item.user_data = data;
sig->item.source = l->signal_source;
sig->item.type = QB_LOOP_SIG;
qb_list_init(&sig->item.list);
qb_list_add_tail(&sig->item.list, &s->sig_head);
if (sigismember(&s->signal_superset, the_sig) != 1) {
_adjust_sigactions_(s);
}
if (handle) {
*handle = sig;
}
return 0;
}
int32_t
qb_loop_signal_mod(qb_loop_t * lp,
enum qb_loop_priority p,
int32_t the_sig,
void *data,
qb_loop_signal_dispatch_fn dispatch_fn,
qb_loop_signal_handle handle)
{
struct qb_signal_source *s;
struct qb_loop_sig *sig = (struct qb_loop_sig *)handle;
struct qb_loop *l = lp;
if (l == NULL) {
l = qb_loop_default_get();
}
if (l == NULL || dispatch_fn == NULL || handle == NULL) {
return -EINVAL;
}
if (p < QB_LOOP_LOW || p > QB_LOOP_HIGH) {
return -EINVAL;
}
s = (struct qb_signal_source *)l->signal_source;
sig->item.user_data = data;
sig->item.type = QB_LOOP_SIG;
sig->dispatch_fn = dispatch_fn;
sig->p = p;
if (sig->signal != the_sig) {
sig->signal = the_sig;
_adjust_sigactions_(s);
}
return 0;
}
int32_t
qb_loop_signal_del(qb_loop_t * lp, qb_loop_signal_handle handle)
{
struct qb_signal_source *s;
struct qb_loop_sig *sig = (struct qb_loop_sig *)handle;
struct qb_loop_sig *sig_clone;
struct qb_loop *l = lp;
struct qb_loop_item *item;
if (l == NULL) {
l = qb_loop_default_get();
}
if (l == NULL || handle == NULL) {
return -EINVAL;
}
s = (struct qb_signal_source *)l->signal_source;
qb_list_for_each_entry(item, &l->level[sig->p].wait_head, list) {
if (item->type != QB_LOOP_SIG) {
continue;
}
sig_clone = (struct qb_loop_sig *)item;
if (sig_clone->cloned_from == sig) {
qb_util_log(LOG_TRACE, "deleting sig in WAITLIST");
qb_list_del(&sig_clone->item.list);
free(sig_clone);
break;
}
}
qb_list_for_each_entry(item, &l->level[sig->p].job_head, list) {
if (item->type != QB_LOOP_SIG) {
continue;
}
sig_clone = (struct qb_loop_sig *)item;
if (sig_clone->cloned_from == sig) {
qb_loop_level_item_del(&l->level[sig->p], item);
qb_util_log(LOG_TRACE, "deleting sig in JOBLIST");
break;
}
}
qb_list_del(&sig->item.list);
free(sig);
_adjust_sigactions_(s);
return 0;
}
diff --git a/lib/loop_poll_kqueue.c b/lib/loop_poll_kqueue.c
index 6acbcc1..b33825b 100644
--- a/lib/loop_poll_kqueue.c
+++ b/lib/loop_poll_kqueue.c
@@ -1,192 +1,175 @@
/*
* Copyright (C) 2012 Red Hat, Inc.
*
* Author: Angus Salkeld <asalkeld@redhat.com>
*
* This file is part of libqb.
*
* libqb is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 2.1 of the License, or
* (at your option) any later version.
*
* libqb is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with libqb. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os_base.h"
#include "loop_poll_int.h"
#ifdef HAVE_SYS_EVENT_H
#include <sys/event.h>
#endif /* HAVE_SYS_EVENT_H */
#define MAX_EVENTS 12
static int32_t
_poll_to_filter_(int32_t event)
{
int32_t out = 0;
if (event & POLLIN)
out |= EVFILT_READ;
if (event & POLLOUT)
out |= EVFILT_WRITE;
return out;
}
-static int32_t
-_filter_to_poll_event_(int32_t event)
-{
- int32_t out = 0;
- if (event & EPOLLIN)
- out |= POLLIN;
- if (event & EPOLLOUT)
- out |= POLLOUT;
- if (event & EPOLLPRI)
- out |= POLLPRI;
- if (event & EPOLLERR)
- out |= POLLERR;
- if (event & EPOLLHUP)
- out |= POLLHUP;
- return out;
-}
-
static void
_fini(struct qb_poll_source *s)
{
if (s->epollfd != -1) {
close(s->epollfd);
s->epollfd = -1;
}
}
static int32_t
_add(struct qb_poll_source *s, struct qb_poll_entry *pe, int32_t fd, int32_t events)
{
int32_t res = 0;
struct kevent ke;
int kents = _poll_to_filter_(events);
/* fill out the kevent struct */
EV_SET(&ke, pe->check, kents, EV_ADD, 0, NULL, pe);
/* set the event */
- res = kevent(kq, &ke, 1, NULL, 0, NULL);
+ res = kevent(s->epollfd, &ke, 1, NULL, 0, NULL);
if (res == -1) {
res = -errno;
qb_util_perror(LOG_ERR, "kevent(add)");
}
return res;
}
static int32_t
_mod(struct qb_poll_source *s, struct qb_poll_entry *pe, int32_t fd, int32_t events)
{
}
static int32_t
_del(struct qb_poll_source *s, struct qb_poll_entry *pe, int32_t fd, int32_t arr_index)
{
int32_t res = 0;
struct kevent ke;
- int kents = _poll_to_filter_(events);
+ int kents = 0; //_poll_to_filter_(events);
/* fill out the kevent struct */
EV_SET(&ke, pe->check, kents, EV_DELETE, 0, NULL, pe);
/* set the event */
- res = kevent(kq, &ke, 1, NULL, 0, NULL);
+ res = kevent(s->epollfd, &ke, 1, NULL, 0, NULL);
if (res == -1) {
res = -errno;
qb_util_perror(LOG_ERR, "kevent(del)");
}
return res;
}
static int32_t
_poll_and_add_to_jobs_(struct qb_loop_source *src, int32_t ms_timeout)
{
int32_t i;
int32_t res;
int32_t event_count;
int32_t new_jobs = 0;
int32_t revents;
struct qb_poll_entry *pe = NULL;
struct qb_poll_source *s = (struct qb_poll_source *)src;
struct kevent events[MAX_EVENTS];
struct timespec timeout = { 0, 0 };
qb_timespec_add_ms(&timeout, ms_timeout);
qb_poll_fds_usage_check_(s);
retry_poll:
event_count = kevent(s->epollfd, NULL, 0, events, MAX_EVENTS, NULL);
if (errno == EINTR && event_count == -1) {
goto retry_poll;
} else if (event_count == -1) {
return -errno;
}
for (i = 0; i < event_count; i++) {
- if (evi.flags & EV_ERROR) {
+ if (events[i].flags & EV_ERROR) {
revents = POLLHUP;
}
- if (evi.filter == EVFILT_READ) {
+ if (events[i].filter == EVFILT_READ) {
revents |= POLLIN;
}
- if (evi.filter == EVFILT_WRITE) {
+ if (events[i].filter == EVFILT_WRITE) {
revents |= POLLOUT;
}
- pe = evi.udata;
- if (pe->check != evi.ident) {
+ pe = events[i].udata;
+ if (pe->check != events[i].ident) {
qb_util_log(LOG_WARNING,
"can't find poll entry for new event.");
continue;
}
if (pe->ufd.fd == -1 || pe->state == QB_POLL_ENTRY_DELETED) {
qb_util_log(LOG_WARNING,
"can't post new event to a deleted entry.");
/*
* empty/deleted
*/
continue;
}
if (revents == pe->ufd.revents ||
pe->state == QB_POLL_ENTRY_JOBLIST) {
/*
* entry already in the job queue.
*/
continue;
}
pe->ufd.revents = revents;
new_jobs += pe->add_to_jobs(src->l, pe);
}
return new_jobs;
}
int32_t
-qb_epoll_init(struct qb_poll_source *s)
+qb_kqueue_init(struct qb_poll_source *s)
{
s->epollfd = kqueue();
if (s->epollfd < 0) {
return -errno;
}
s->driver.fini = _fini;
s->driver.add = _add;
s->driver.mod = _mod;
s->driver.del = _del;
s->s.poll = _poll_and_add_to_jobs_;
return 0;
}
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Mon, Feb 24, 3:02 PM (5 h, 26 m ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1464279
Default Alt Text
(22 KB)
Attached To
Mode
rQ LibQB
Attached
Detach File
Event Timeline
Log In to Comment