Page MenuHomeClusterLabs Projects

No OneTemporary

diff --git a/lib/loop_poll.c b/lib/loop_poll.c
index 1ea8764..c02edb6 100644
--- a/lib/loop_poll.c
+++ b/lib/loop_poll.c
@@ -1,1063 +1,1063 @@
/*
* Copyright (C) 2010 Red Hat, Inc.
*
* Author: Angus Salkeld <asalkeld@redhat.com>
*
* This file is part of libqb.
*
* libqb is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 2.1 of the License, or
* (at your option) any later version.
*
* libqb is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with libqb. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os_base.h"
#include <sys/resource.h>
#ifdef HAVE_SYS_EPOLL_H
#include <sys/epoll.h>
#endif /* HAVE_SYS_EPOLL_H */
#include <sys/poll.h>
#ifndef S_SPLINT_S
#ifdef HAVE_SYS_TIMERFD_H
#include <sys/timerfd.h>
#endif /* HAVE_SYS_TIMERFD_H */
#endif /* S_SPLINT_S */
#include <signal.h>
#include <qb/qbdefs.h>
#include <qb/qblist.h>
#include <qb/qbarray.h>
#include <qb/qbloop.h>
#include <qb/qbutil.h>
#include "loop_int.h"
#include "util_int.h"
/*
* Define this to log slow (>10ms) jobs.
*/
#undef DEBUG_DISPATCH_TIME
/* logs, std(in|out|err), pipe */
#define POLL_FDS_USED_MISC 50
enum qb_poll_type {
QB_UNKNOWN,
QB_JOB,
QB_POLL,
QB_SIGNAL,
QB_TIMER,
};
struct qb_poll_entry;
typedef int32_t (*qb_poll_add_to_jobs_fn) (struct qb_loop* l, struct qb_poll_entry* pe);
struct qb_poll_entry {
struct qb_loop_item item;
enum qb_poll_type type;
qb_loop_poll_dispatch_fn poll_dispatch_fn;
qb_loop_timer_dispatch_fn timer_dispatch_fn;
enum qb_loop_priority p;
uint32_t install_pos;
struct pollfd ufd;
qb_poll_add_to_jobs_fn add_to_jobs;
uint32_t runs;
enum qb_poll_entry_state state;
uint32_t check;
};
struct qb_poll_source {
struct qb_loop_source s;
#ifdef HAVE_EPOLL
int32_t epollfd;
#else
struct pollfd *ufds;
#endif /* HAVE_EPOLL */
int32_t poll_entry_count;
qb_array_t *poll_entries;
qb_loop_poll_low_fds_event_fn low_fds_event_fn;
int32_t not_enough_fds;
};
static int32_t _qb_signal_add_to_jobs_(struct qb_loop* l,
struct qb_poll_entry* pe);
#ifdef HAVE_EPOLL
static int32_t _poll_to_epoll_event_(int32_t event)
{
int32_t out = 0;
if (event & POLLIN) out |= EPOLLIN;
if (event & POLLOUT) out |= EPOLLOUT;
if (event & POLLPRI) out |= EPOLLPRI;
if (event & POLLERR) out |= EPOLLERR;
if (event & POLLHUP) out |= EPOLLHUP;
if (event & POLLNVAL) out |= EPOLLERR;
return out;
}
static int32_t _epoll_to_poll_event_(int32_t event)
{
int32_t out = 0;
if (event & EPOLLIN) out |= POLLIN;
if (event & EPOLLOUT) out |= POLLOUT;
if (event & EPOLLPRI) out |= POLLPRI;
if (event & EPOLLERR) out |= POLLERR;
if (event & EPOLLHUP) out |= POLLHUP;
return out;
}
#endif /* HAVE_EPOLL */
static void _poll_entry_check_generate_(struct qb_poll_entry *pe)
{
int32_t i;
for (i = 0; i < 200; i++) {
pe->check = random();
if (pe->check != 0 && pe->check != 0xffffffff) {
break;
}
}
}
#if defined(HAVE_TIMERFD) || defined(HAVE_EPOLL)
static int32_t _poll_entry_from_handle_(struct qb_poll_source *s,
uint64_t handle_in,
struct qb_poll_entry **pe_pt)
{
int32_t res = 0;
uint32_t check = ((uint32_t) (((uint64_t) handle_in) >> 32));
uint32_t handle = handle_in & 0xffffffff;
struct qb_poll_entry *pe;
res = qb_array_index(s->poll_entries, handle, (void**)&pe);
if (res != 0) {
return res;
}
if (pe->check != check) {
return -EINVAL;
}
*pe_pt = pe;
return 0;
}
#endif /* HAVE_TIMERFD or HAVE_EPOLL */
static void _poll_entry_mark_deleted_(struct qb_poll_entry *pe)
{
pe->ufd.fd = -1;
pe->state = QB_POLL_ENTRY_DELETED;
pe->check = 0;
}
static void _poll_entry_empty_(struct qb_poll_entry *pe)
{
memset(pe, 0, sizeof(struct qb_poll_entry));
pe->ufd.fd = -1;
}
static void _poll_dispatch_and_take_back_(struct qb_loop_item * item,
enum qb_loop_priority p)
{
struct qb_poll_entry *pe = (struct qb_poll_entry *)item;
int32_t res;
#ifdef DEBUG_DISPATCH_TIME
uint64_t start;
uint64_t stop;
int32_t log_warn = QB_FALSE;
start = qb_util_nano_current_get();
#endif /* DEBUG_DISPATCH_TIME */
assert(pe->state == QB_POLL_ENTRY_JOBLIST);
if (pe->type == QB_POLL) {
res = pe->poll_dispatch_fn(pe->ufd.fd, pe->ufd.revents, pe->item.user_data);
if (res < 0) {
_poll_entry_mark_deleted_(pe);
} else {
pe->state = QB_POLL_ENTRY_ACTIVE;
pe->ufd.revents = 0;
}
} else if (pe->type == QB_TIMER) {
_poll_entry_mark_deleted_(pe);
pe->timer_dispatch_fn(pe->item.user_data);
} else {
qb_util_log(LOG_WARNING, "poll entry of unknown type:%d state:%d",
pe->type, pe->state);
return;
}
if (pe->state == QB_POLL_ENTRY_ACTIVE) {
#ifdef DEBUG_DISPATCH_TIME
pe->runs++;
if ((pe->runs % 50) == 0) {
log_warn = QB_TRUE;
}
stop = qb_util_nano_current_get();
if ((stop - start) > (10 * QB_TIME_NS_IN_MSEC)) {
log_warn = QB_TRUE;
}
if (log_warn && pe->type == QB_POLL) {
qb_util_log(LOG_INFO,
"[fd:%d] dispatch:%p runs:%d duration:%d ms",
pe->ufd.fd, pe->poll_dispatch_fn,
pe->runs,
(int32_t)((stop - start)/QB_TIME_NS_IN_MSEC));
} else if (log_warn && pe->type == QB_TIMER) {
qb_util_log(LOG_INFO,
"[timer] dispatch:%p runs:%d duration:%d ms",
pe->timer_dispatch_fn,
pe->runs,
(int32_t)((stop - start)/QB_TIME_NS_IN_MSEC));
}
#endif /* DEBUG_DISPATCH_TIME */
}
}
static void _poll_fds_usage_check_(struct qb_poll_source *s)
{
struct rlimit lim;
static int32_t socks_limit = 0;
int32_t send_event = 0;
int32_t socks_used = 0;
int32_t socks_avail = 0;
struct qb_poll_entry * pe;
int32_t i;
if (socks_limit == 0) {
if (getrlimit(RLIMIT_NOFILE, &lim) == -1) {
qb_util_perror(LOG_WARNING, "getrlimit");
return;
}
socks_limit = lim.rlim_cur;
socks_limit -= POLL_FDS_USED_MISC;
if (socks_limit < 0) {
socks_limit = 0;
}
}
for (i = 0; i < s->poll_entry_count; i++) {
assert(qb_array_index(s->poll_entries, i, (void**)&pe) == 0);
if ((pe->state == QB_POLL_ENTRY_ACTIVE ||
pe->state == QB_POLL_ENTRY_JOBLIST) &&
pe->ufd.fd != -1) {
socks_used++;
}
if (pe->state == QB_POLL_ENTRY_DELETED) {
_poll_entry_empty_(pe);
}
}
socks_avail = socks_limit - socks_used;
if (socks_avail < 0) {
socks_avail = 0;
}
send_event = 0;
if (s->not_enough_fds) {
if (socks_avail > 2) {
s->not_enough_fds = 0;
send_event = 1;
}
} else {
if (socks_avail <= 1) {
s->not_enough_fds = 1;
send_event = 1;
}
}
if (send_event && s->low_fds_event_fn) {
s->low_fds_event_fn(s->not_enough_fds,
socks_avail);
}
}
#ifdef HAVE_EPOLL
#define MAX_EVENTS 12
static int32_t _poll_and_add_to_jobs_(struct qb_loop_source* src, int32_t ms_timeout)
{
int32_t i;
int32_t res;
int32_t event_count;
int32_t new_jobs = 0;
struct qb_poll_entry * pe = NULL;
struct qb_poll_source * s = (struct qb_poll_source *)src;
struct epoll_event events[MAX_EVENTS];
_poll_fds_usage_check_(s);
retry_poll:
event_count = epoll_wait(s->epollfd, events, MAX_EVENTS, ms_timeout);
if (errno == EINTR && event_count == -1) {
goto retry_poll;
} else if (event_count == -1) {
return -errno;
}
for (i = 0; i < event_count; i++) {
res = _poll_entry_from_handle_(s, events[i].data.u64, &pe);
if (res != 0) {
qb_util_log(LOG_WARNING, "can't find poll entry for new event.");
continue;
}
if (pe->ufd.fd == -1 || pe->state == QB_POLL_ENTRY_DELETED) {
qb_util_log(LOG_WARNING, "can't post new event to a deleted entry.");
/*
* empty/deleted
*/
continue;
}
if (events[i].events == pe->ufd.revents ||
pe->state == QB_POLL_ENTRY_JOBLIST) {
/*
* entry already in the job queue.
*/
continue;
}
pe->ufd.revents = _epoll_to_poll_event_(events[i].events);
new_jobs += pe->add_to_jobs(src->l, pe);
}
return new_jobs;
}
#else
static int32_t _poll_and_add_to_jobs_(struct qb_loop_source* src, int32_t ms_timeout)
{
int32_t i;
int32_t res;
int32_t new_jobs = 0;
struct qb_poll_entry * pe;
struct qb_poll_source * s = (struct qb_poll_source *)src;
_poll_fds_usage_check_(s);
for (i = 0; i < s->poll_entry_count; i++) {
assert(qb_array_index(s->poll_entries, i, (void**)&pe) == 0);
memcpy(&s->ufds[i], &pe->ufd, sizeof(struct pollfd));
}
retry_poll:
res = poll(s->ufds, s->poll_entry_count, ms_timeout);
if (errno == EINTR && res == -1) {
goto retry_poll;
} else if (res == -1) {
return -errno;
}
for (i = 0; i < s->poll_entry_count; i++) {
if (s->ufds[i].fd == -1 || s->ufds[i].revents == 0) {
/*
* empty entry
*/
continue;
}
assert(qb_array_index(s->poll_entries, i, (void**)&pe) == 0);
if (pe->state != QB_POLL_ENTRY_ACTIVE ||
s->ufds[i].revents == pe->ufd.revents) {
/*
* Wrong state to accept an event.
*/
continue;
}
pe->ufd.revents = s->ufds[i].revents;
new_jobs += pe->add_to_jobs(src->l, pe);
}
return new_jobs;
}
#endif /* HAVE_EPOLL */
struct qb_loop_source*
qb_loop_poll_create(struct qb_loop *l)
{
struct qb_poll_source *s = malloc(sizeof(struct qb_poll_source));
s->s.l = l;
s->s.dispatch_and_take_back = _poll_dispatch_and_take_back_;
s->s.poll = _poll_and_add_to_jobs_;
s->poll_entries = qb_array_create(128, sizeof(struct qb_poll_entry));
s->poll_entry_count = 0;
s->low_fds_event_fn = NULL;
s->not_enough_fds = 0;
#ifdef HAVE_EPOLL
s->epollfd = epoll_create1(EPOLL_CLOEXEC);
#else
s->ufds = 0;
#endif /* HAVE_EPOLL */
return (struct qb_loop_source*)s;
}
void qb_loop_poll_destroy(struct qb_loop *l)
{
struct qb_poll_source * s = (struct qb_poll_source *)l->fd_source;
qb_array_free(s->poll_entries);
#ifdef HAVE_EPOLL
if (s->epollfd != -1) {
close(s->epollfd);
s->epollfd = -1;
}
#endif /* HAVE_EPOLL */
free(s);
}
int32_t qb_loop_poll_low_fds_event_set(struct qb_loop *l,
qb_loop_poll_low_fds_event_fn fn)
{
struct qb_poll_source * s = (struct qb_poll_source *)l->fd_source;
s->low_fds_event_fn = fn;
return 0;
}
static int32_t _get_empty_array_position_(struct qb_poll_source * s)
{
int32_t found = 0;
uint32_t install_pos;
int32_t res = 0;
struct qb_poll_entry *pe;
#ifndef HAVE_EPOLL
struct pollfd *ufds;
int32_t new_size = 0;
#endif /* HAVE_EPOLL */
for (found = 0, install_pos = 0;
install_pos < s->poll_entry_count; install_pos++) {
assert(qb_array_index(s->poll_entries, install_pos, (void**)&pe) == 0);
if (pe->state == QB_POLL_ENTRY_EMPTY) {
found = 1;
break;
}
}
if (found == 0) {
/*
* Grow pollfd list
*/
res = qb_array_grow(s->poll_entries,
s->poll_entry_count + 1);
if (res != 0) {
return res;
}
#ifndef HAVE_EPOLL
new_size = (s->poll_entry_count+ 1) * sizeof(struct pollfd);
ufds = realloc(s->ufds, new_size);
if (ufds == NULL) {
return -ENOMEM;
}
s->ufds = ufds;
#endif /* HAVE_EPOLL */
s->poll_entry_count += 1;
install_pos = s->poll_entry_count - 1;
}
return install_pos;
}
static int32_t _poll_add_(struct qb_loop *l,
enum qb_loop_priority p,
int32_t fd,
int32_t events,
void *data,
struct qb_poll_entry **pe_pt)
{
struct qb_poll_entry *pe;
uint32_t install_pos;
int32_t res = 0;
struct qb_poll_source * s;
#ifdef HAVE_EPOLL
struct epoll_event ev;
#endif /* HAVE_EPOLL */
if (l == NULL) {
return -EINVAL;
}
s = (struct qb_poll_source *)l->fd_source;
install_pos = _get_empty_array_position_(s);
assert(qb_array_index(s->poll_entries, install_pos, (void**)&pe) == 0);
pe->state = QB_POLL_ENTRY_ACTIVE;
pe->install_pos = install_pos;
_poll_entry_check_generate_(pe);
pe->ufd.fd = fd;
pe->ufd.events = events;
pe->ufd.revents = 0;
pe->item.user_data = data;
pe->item.source = (struct qb_loop_source*)l->fd_source;
pe->p = p;
pe->runs = 0;
#ifdef HAVE_EPOLL
ev.events = _poll_to_epoll_event_(events);
ev.data.u64 = (((uint64_t) (pe->check)) << 32) | pe->install_pos;
if (epoll_ctl(s->epollfd, EPOLL_CTL_ADD, fd, &ev) == -1) {
res = -errno;
qb_util_perror(LOG_ERR, "epoll_ctl(add)");
}
#endif /* HAVE_EPOLL */
*pe_pt = pe;
return (res);
}
static int32_t _qb_poll_add_to_jobs_(struct qb_loop* l, struct qb_poll_entry* pe)
{
assert(pe->type == QB_POLL);
qb_loop_level_item_add(&l->level[pe->p], &pe->item);
pe->state = QB_POLL_ENTRY_JOBLIST;
return 1;
}
int32_t qb_loop_poll_add(struct qb_loop *l,
enum qb_loop_priority p,
int32_t fd,
int32_t events,
void *data,
qb_loop_poll_dispatch_fn dispatch_fn)
{
struct qb_poll_entry *pe = NULL;
int32_t size;
int32_t new_size;
int32_t res;
size = ((struct qb_poll_source *)l->fd_source)->poll_entry_count;
res = _poll_add_(l, p, fd, events, data, &pe);
new_size = ((struct qb_poll_source *)l->fd_source)->poll_entry_count;
pe->poll_dispatch_fn = dispatch_fn;
pe->type = QB_POLL;
pe->add_to_jobs = _qb_poll_add_to_jobs_;
if (new_size > size) {
- qb_util_log(LOG_DEBUG,
+ qb_util_log(LOG_TRACE,
"grown poll array to %d for FD %d",
new_size, fd);
}
return res;
}
int32_t qb_loop_poll_mod(struct qb_loop *l,
enum qb_loop_priority p,
int32_t fd,
int32_t events,
void *data,
qb_loop_poll_dispatch_fn dispatch_fn)
{
uint32_t i;
int32_t res = 0;
struct qb_poll_entry *pe;
struct qb_poll_source * s = (struct qb_poll_source *)l->fd_source;
#ifdef HAVE_EPOLL
struct epoll_event ev;
#endif /* HAVE_EPOLL */
/*
* Find file descriptor to modify events and dispatch function
*/
for (i = 0; i < s->poll_entry_count; i++) {
assert(qb_array_index(s->poll_entries, i, (void**)&pe) == 0);
if (pe->ufd.fd != fd) {
continue;
}
if (pe->state == QB_POLL_ENTRY_DELETED || pe->check == 0) {
qb_util_log(LOG_ERR,
"poll_mod : can't modify entry already deleted");
return -EBADF;
}
pe->poll_dispatch_fn = dispatch_fn;
pe->item.user_data = data;
pe->p = p;
if (pe->ufd.events != events) {
#ifdef HAVE_EPOLL
ev.events = _poll_to_epoll_event_(events);
ev.data.u64 = (((uint64_t) (pe->check)) << 32) | i;
if (epoll_ctl(s->epollfd, EPOLL_CTL_MOD, fd, &ev) == -1) {
res = -errno;
qb_util_perror(LOG_ERR, "epoll_ctl(mod)");
}
#endif /* HAVE_EPOLL */
pe->ufd.events = events;
}
return res;
}
return -EBADF;
}
int32_t qb_loop_poll_del(struct qb_loop *l, int32_t fd)
{
int32_t i;
int32_t res = 0;
struct qb_poll_entry *pe;
struct qb_poll_source * s = (struct qb_poll_source *)l->fd_source;
for (i = 0; i < s->poll_entry_count; i++) {
assert(qb_array_index(s->poll_entries, i, (void**)&pe) == 0);
if (pe->ufd.fd != fd || pe->type != QB_POLL) {
continue;
}
if (pe->state == QB_POLL_ENTRY_DELETED ||
pe->state == QB_POLL_ENTRY_EMPTY) {
return 0;
}
if (pe->state == QB_POLL_ENTRY_JOBLIST) {
qb_loop_level_item_del(&l->level[pe->p], &pe->item);
}
#ifdef HAVE_EPOLL
if (epoll_ctl(s->epollfd, EPOLL_CTL_DEL, fd, NULL) == -1) {
res = -errno;
qb_util_perror(LOG_WARNING, "epoll_ctl(del)");
}
#else
s->ufds[i].fd = -1;
s->ufds[i].events = 0;
s->ufds[i].revents = 0;
#endif /* HAVE_EPOLL */
_poll_entry_mark_deleted_(pe);
return res;
}
return -EBADF;
}
#ifdef HAVE_TIMERFD
static int32_t _qb_timer_add_to_jobs_(struct qb_loop* l, struct qb_poll_entry* pe)
{
uint64_t expired = 0;
ssize_t bytes = 0;
assert(pe->type == QB_TIMER);
if (pe->ufd.revents == POLLIN) {
bytes = read(pe->ufd.fd, &expired, sizeof(expired));
if (bytes != sizeof(expired)) {
qb_util_perror(LOG_WARNING,
"couldn't read from timer fd %zd", bytes);
}
qb_loop_level_item_add(&l->level[pe->p], &pe->item);
} else {
qb_util_log(LOG_ERR, "timer revents: %d expected %d",
pe->ufd.revents, POLLIN);
}
close(pe->ufd.fd);
pe->ufd.fd = -1;
pe->state = QB_POLL_ENTRY_JOBLIST;
return 1;
}
int32_t qb_loop_timer_msec_duration_to_expire(struct qb_loop_source *timer_source)
{
return -1;
}
struct qb_loop_source*
qb_loop_timer_create(struct qb_loop *l)
{
return NULL;
}
void qb_loop_timer_destroy(struct qb_loop *l)
{
}
int32_t qb_loop_timer_add(struct qb_loop *l,
enum qb_loop_priority p,
uint64_t nsec_duration,
void *data,
qb_loop_timer_dispatch_fn timer_fn,
qb_loop_timer_handle * timer_handle_out)
{
struct qb_poll_entry *pe;
int32_t fd;
int32_t res;
int32_t size, new_size;
struct itimerspec its;
if (l == NULL || timer_fn == NULL) {
qb_util_log(LOG_ERR,
"can't add a timer with either (l == NULL || timer_fn == NULL)");
return -EINVAL;
}
if (timer_handle_out == NULL) {
qb_util_log(LOG_ERR, "can't add a timer with (timer_handle_out == NULL)");
return -ENOENT;
}
fd = timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC|TFD_NONBLOCK);
if (fd == -1) {
res = -errno;
qb_util_perror(LOG_ERR, "failed to create timer");
return res;
}
its.it_interval.tv_sec = 0;
its.it_interval.tv_nsec = 0;
its.it_value.tv_sec = nsec_duration / QB_TIME_NS_IN_SEC;
its.it_value.tv_nsec = nsec_duration % QB_TIME_NS_IN_SEC;
res = timerfd_settime(fd, 0, &its, NULL);
if (res == -1) {
res = -errno;
qb_util_perror(LOG_ERR, "failed to set time on timer");
goto close_and_return;
}
size = ((struct qb_poll_source *)l->fd_source)->poll_entry_count;
res = _poll_add_(l, p, fd, POLLIN, data, &pe);
if (res != 0) {
goto close_and_return;
}
new_size = ((struct qb_poll_source *)l->fd_source)->poll_entry_count;
if (new_size > size) {
- qb_util_log(LOG_DEBUG, "grown poll array to %d for timer %d",
+ qb_util_log(LOG_TRACE, "grown poll array to %d for timer %d",
new_size, fd);
}
pe->timer_dispatch_fn = timer_fn;
pe->type = QB_TIMER;
pe->add_to_jobs = _qb_timer_add_to_jobs_;
*timer_handle_out = (((uint64_t) (pe->check)) << 32) | pe->install_pos;
return res;
close_and_return:
close(fd);
return res;
}
int32_t qb_loop_timer_del(struct qb_loop *l, qb_loop_timer_handle th)
{
struct qb_poll_entry *pe;
struct qb_poll_source *s;
int32_t res;
if (l == NULL || th == 0) {
return -EINVAL;
}
s = (struct qb_poll_source *)l->fd_source;
res = _poll_entry_from_handle_(s, th, &pe);
if (res != 0) {
return res;
}
if (pe->type != QB_TIMER) {
qb_util_log(LOG_WARNING,
"trying to delete timer but handle points to type:%d",
pe->type);
return -EINVAL;
}
if (pe->state == QB_POLL_ENTRY_DELETED) {
return 0;
}
if (pe->state == QB_POLL_ENTRY_JOBLIST) {
qb_loop_level_item_del(&l->level[pe->p], &pe->item);
qb_util_log(LOG_DEBUG, "trying to delete timer in JOBLIST");
}
if (pe->state != QB_POLL_ENTRY_ACTIVE &&
pe->state != QB_POLL_ENTRY_JOBLIST) {
qb_util_log(LOG_WARNING, "trying to delete timer with state:%d",
pe->state);
return -EINVAL;
}
if (pe->ufd.fd != -1) {
#ifdef HAVE_EPOLL
if (epoll_ctl(s->epollfd, EPOLL_CTL_DEL, pe->ufd.fd, NULL) == -1) {
res = -errno;
qb_util_perror(LOG_WARNING, "epoll_ctl(del:%d)", pe->ufd.fd);
}
#else
s->ufds[pe->install_pos].fd = -1;
s->ufds[pe->install_pos].events = 0;
s->ufds[pe->install_pos].revents = 0;
#endif /* HAVE_EPOLL */
close(pe->ufd.fd);
}
_poll_entry_mark_deleted_(pe);
return 0;
}
uint64_t qb_loop_timer_expire_time_get(struct qb_loop *l, qb_loop_timer_handle th)
{
struct qb_poll_entry *pe;
struct qb_poll_source *s;
int32_t res = 0;
struct itimerspec its;
if (l == NULL || th == 0) {
return 0;
}
s = (struct qb_poll_source *)l->fd_source;
res = _poll_entry_from_handle_(s, th, &pe);
if (res != 0) {
return res;
}
if (timerfd_gettime(pe->ufd.fd, &its) == -1) {
return 0;
}
return (its.it_value.tv_sec * QB_TIME_NS_IN_SEC) + its.it_value.tv_nsec;
}
#endif /* HAVE_TIMERFD */
static int32_t pipe_fds[2] = {-1, -1};
struct qb_signal_source {
struct qb_loop_source s;
struct qb_list_head sig_head;
sigset_t signal_superset;
};
struct qb_loop_sig {
struct qb_loop_item item;
int32_t signal;
enum qb_loop_priority p;
qb_loop_signal_dispatch_fn dispatch_fn;
struct qb_loop_sig *cloned_from;
};
static void _handle_real_signal_(int signal_num, siginfo_t * si, void *context)
{
int32_t sig = signal_num;
int32_t res = 0;
if (pipe_fds[1] > 0) {
try_again:
res = write(pipe_fds[1], &sig, sizeof(int32_t));
if (res == -1 && errno == EAGAIN) {
goto try_again;
} else if (res != sizeof(int32_t)) {
qb_util_log(LOG_ERR, "failed to write signal to pipe [%d]",
res);
}
}
}
static void _signal_dispatch_and_take_back_(struct qb_loop_item * item,
enum qb_loop_priority p)
{
struct qb_loop_sig *sig = (struct qb_loop_sig *)item;
int32_t res;
res = sig->dispatch_fn(sig->signal, sig->item.user_data);
if (res != 0) {
qb_list_del(&sig->cloned_from->item.list);
free(sig->cloned_from);
}
free(sig);
}
struct qb_loop_source *
qb_loop_signals_create(struct qb_loop *l)
{
int32_t res = 0;
struct qb_poll_entry *pe;
struct qb_signal_source *s = calloc(1, sizeof(struct qb_signal_source));
s->s.l = l;
s->s.dispatch_and_take_back = _signal_dispatch_and_take_back_;
s->s.poll = NULL;
qb_list_init(&s->sig_head);
sigemptyset(&s->signal_superset);
if (pipe_fds[0] < 0) {
res = pipe(pipe_fds);
if (res == -1) {
res = -errno;
qb_util_perror(LOG_ERR, "Can't light pipe");
goto error_exit;
}
(void)qb_util_fd_nonblock_cloexec_set(pipe_fds[0]);
(void)qb_util_fd_nonblock_cloexec_set(pipe_fds[1]);
res = _poll_add_(l, QB_LOOP_HIGH,
pipe_fds[0], POLLIN,
NULL, &pe);
if (res == 0) {
pe->poll_dispatch_fn = NULL;
pe->type = QB_SIGNAL;
pe->add_to_jobs = _qb_signal_add_to_jobs_;
} else {
qb_util_perror(LOG_ERR, "Can't smoke pipe");
goto error_exit;
}
}
return (struct qb_loop_source *)s;
error_exit:
free(s);
errno = -res;
return NULL;
}
void qb_loop_signals_destroy(struct qb_loop *l)
{
close(pipe_fds[0]);
pipe_fds[0] = -1;
close(pipe_fds[1]);
pipe_fds[1] = -1;
free(l->signal_source);
}
static int32_t _qb_signal_add_to_jobs_(struct qb_loop* l,
struct qb_poll_entry* pe)
{
struct qb_signal_source *s = (struct qb_signal_source *)l->signal_source;
struct qb_list_head *list;
struct qb_loop_sig *sig;
struct qb_loop_item *item;
struct qb_loop_sig *new_sig_job;
int32_t the_signal;
ssize_t res;
int32_t jobs_added = 0;
res = read(pipe_fds[0], &the_signal, sizeof(int32_t));
if (res != sizeof(int32_t)) {
res = -errno;
qb_util_perror(LOG_ERR, "failed to read pipe");
return 0;
}
pe->ufd.revents = 0;
qb_list_for_each(list, &s->sig_head) {
item = qb_list_entry(list, struct qb_loop_item, list);
sig = (struct qb_loop_sig *)item;
if (sig->signal == the_signal) {
new_sig_job = calloc(1, sizeof(struct qb_loop_sig));
memcpy(new_sig_job, sig, sizeof(struct qb_loop_sig));
new_sig_job->cloned_from = sig;
qb_loop_level_item_add(&l->level[pe->p], &new_sig_job->item);
jobs_added++;
}
}
return jobs_added;
}
static void _adjust_sigactions_(struct qb_signal_source *s)
{
struct qb_loop_sig *sig;
struct qb_loop_item *item;
struct sigaction sa;
int32_t i;
int32_t needed;
sa.sa_flags = SA_SIGINFO;
sa.sa_sigaction = _handle_real_signal_;
sigemptyset(&s->signal_superset);
sigemptyset(&sa.sa_mask);
/* re-set to default */
for (i = 0; i < 30; i++) {
needed = QB_FALSE;
qb_list_for_each_entry(item, &s->sig_head, list) {
sig = (struct qb_loop_sig *)item;
if (i == sig->signal) {
needed = QB_TRUE;
break;
}
}
if (needed) {
sigaddset(&s->signal_superset, i);
sigaction(i, &sa, NULL);
} else {
(void)signal(i, SIG_DFL);
}
}
}
int32_t qb_loop_signal_add(qb_loop_t *l,
enum qb_loop_priority p,
int32_t the_sig,
void *data,
qb_loop_signal_dispatch_fn dispatch_fn,
qb_loop_signal_handle *handle)
{
struct qb_loop_sig *sig;
struct qb_signal_source *s;
if (l == NULL || dispatch_fn == NULL) {
return -EINVAL;
}
if (p < QB_LOOP_LOW || p > QB_LOOP_HIGH) {
return -EINVAL;
}
s = (struct qb_signal_source *)l->signal_source;
sig = calloc(1, sizeof(struct qb_loop_sig));
sig->dispatch_fn = dispatch_fn;
sig->p = p;
sig->signal = the_sig;
sig->item.user_data = data;
sig->item.source = l->signal_source;
qb_list_init(&sig->item.list);
qb_list_add_tail(&sig->item.list, &s->sig_head);
if (sigismember(&s->signal_superset, the_sig) != 1) {
_adjust_sigactions_(s);
}
if (handle) {
*handle = sig;
}
return 0;
}
int32_t qb_loop_signal_mod(qb_loop_t *l,
enum qb_loop_priority p,
int32_t the_sig,
void *data,
qb_loop_signal_dispatch_fn dispatch_fn,
qb_loop_signal_handle handle)
{
struct qb_signal_source *s;
struct qb_loop_sig *sig = (struct qb_loop_sig *)handle;
if (l == NULL || dispatch_fn == NULL || handle == NULL) {
return -EINVAL;
}
if (p < QB_LOOP_LOW || p > QB_LOOP_HIGH) {
return -EINVAL;
}
s = (struct qb_signal_source *)l->signal_source;
sig->item.user_data = data;
sig->dispatch_fn = dispatch_fn;
sig->p = p;
if (sig->signal != the_sig) {
sig->signal = the_sig;
_adjust_sigactions_(s);
}
return 0;
}
int32_t qb_loop_signal_del(qb_loop_t *l,
qb_loop_signal_handle handle)
{
struct qb_loop_sig *sig = (struct qb_loop_sig *)handle;
qb_list_del(&sig->item.list);
free(sig);
return 0;
}
diff --git a/tests/check_array.c b/tests/check_array.c
index dff7fb5..8d3bce2 100644
--- a/tests/check_array.c
+++ b/tests/check_array.c
@@ -1,167 +1,165 @@
/*
* Copyright (c) 2010 Red Hat, Inc.
*
* All rights reserved.
*
* Author: Angus Salkeld <asalkeld@redhat.com>
*
* This file is part of libqb.
*
* libqb is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 2.1 of the License, or
* (at your option) any later version.
*
* libqb is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with libqb. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os_base.h"
#include <check.h>
#include <qb/qbdefs.h>
-#include <qb/qbutil.h>
+#include <qb/qblog.h>
#include <qb/qbarray.h>
struct test_my_st {
int32_t a;
int32_t b;
int32_t c;
int32_t d;
};
START_TEST(test_array_limits)
{
qb_array_t *a;
int32_t res;
struct test_my_st *st;
a = qb_array_create(INT_MAX, sizeof(struct test_my_st));
fail_unless(a == NULL);
a = qb_array_create(-56, sizeof(struct test_my_st));
fail_unless(a == NULL);
a = qb_array_create(67, 0);
fail_unless(a == NULL);
/* working array */
a = qb_array_create(10, sizeof(struct test_my_st));
fail_if(a == NULL);
/* out-of-bounds */
res = qb_array_index(a, 10, (void**)&st);
ck_assert_int_eq(res, -ERANGE);
res = qb_array_index(a, -10, (void**)&st);
ck_assert_int_eq(res, -ERANGE);
res = qb_array_index(NULL, 1, (void**)&st);
ck_assert_int_eq(res, -EINVAL);
res = qb_array_index(a, -10, NULL);
ck_assert_int_eq(res, -EINVAL);
qb_array_free(a);
}
END_TEST
START_TEST(test_array_correct_retrieval)
{
qb_array_t *a;
int32_t i;
int32_t res;
struct test_my_st *st;
a = qb_array_create(112, sizeof(struct test_my_st));
for (i = 0; i < 112; i++) {
res = qb_array_index(a, i, (void**)&st);
ck_assert_int_eq(res, 0);
st->a = i;
st->b = i+1;
st->c = i+2;
st->d = i+3;
}
/* read back */
for (i = 0; i < 112; i++) {
res = qb_array_index(a, i, (void**)&st);
ck_assert_int_eq(res, 0);
ck_assert_int_eq(st->a, i);
ck_assert_int_eq(st->b, i+1);
ck_assert_int_eq(st->c, i+2);
ck_assert_int_eq(st->d, i+3);
}
qb_array_free(a);
}
END_TEST
START_TEST(test_array_static_memory)
{
qb_array_t *a;
int32_t res;
struct test_my_st *st_old;
struct test_my_st *st;
a = qb_array_create(112, sizeof(struct test_my_st));
res = qb_array_index(a, 99, (void**)&st_old);
ck_assert_int_eq(res, 0);
res = qb_array_grow(a, 1453);
ck_assert_int_eq(res, 0);
res = qb_array_index(a, 345, (void**)&st);
st->a = 411;
/* confirm the pointer is the same after a grow */
res = qb_array_index(a, 99, (void**)&st);
ck_assert_int_eq(res, 0);
fail_unless(st == st_old);
qb_array_free(a);
}
END_TEST
static Suite *array_suite(void)
{
TCase *tc;
Suite *s = suite_create("qb_array");
tc = tcase_create("limits");
tcase_add_test(tc, test_array_limits);
suite_add_tcase(s, tc);
tc = tcase_create("correct_retrieval");
tcase_add_test(tc, test_array_correct_retrieval);
suite_add_tcase(s, tc);
tc = tcase_create("static_memory");
tcase_add_test(tc, test_array_static_memory);
suite_add_tcase(s, tc);
return s;
}
-static void libqb_log_fn(const char *file_name,
- int32_t file_line, int32_t severity, const char *msg)
-{
- printf("libqb: %s:%d %s\n", file_name, file_line, msg);
-}
-
int32_t main(void)
{
int32_t number_failed;
Suite *s = array_suite();
SRunner *sr = srunner_create(s);
- qb_util_set_log_function(libqb_log_fn);
+ qb_log_init("check", LOG_USER, LOG_EMERG);
+ qb_log_ctl(QB_LOG_SYSLOG, QB_LOG_CONF_ENABLED, QB_FALSE);
+ qb_log_filter_ctl(QB_LOG_STDERR, QB_LOG_FILTER_ADD,
+ QB_LOG_FILTER_FILE, "*", LOG_INFO);
+ qb_log_ctl(QB_LOG_STDERR, QB_LOG_CONF_ENABLED, QB_TRUE);
srunner_run_all(sr, CK_VERBOSE);
number_failed = srunner_ntests_failed(sr);
srunner_free(sr);
return (number_failed == 0) ? EXIT_SUCCESS : EXIT_FAILURE;
}
diff --git a/tests/check_ipc.c b/tests/check_ipc.c
index be8681c..04d2239 100644
--- a/tests/check_ipc.c
+++ b/tests/check_ipc.c
@@ -1,580 +1,576 @@
/*
* Copyright (c) 2010 Red Hat, Inc.
*
* All rights reserved.
*
* Author: Angus Salkeld <asalkeld@redhat.com>
*
* This file is part of libqb.
*
* libqb is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 2.1 of the License, or
* (at your option) any later version.
*
* libqb is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with libqb. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
-#include <syslog.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <errno.h>
#include <signal.h>
#include <check.h>
#include <qb/qbdefs.h>
-#include <qb/qbutil.h>
+#include <qb/qblog.h>
#include <qb/qbipcc.h>
#include <qb/qbipcs.h>
#include <qb/qbloop.h>
#define IPC_NAME "ipc_test"
#define MAX_MSG_SIZE (8192*16)
static qb_ipcc_connection_t *conn;
static enum qb_ipc_type ipc_type;
enum my_msg_ids {
IPC_MSG_REQ_TX_RX,
IPC_MSG_RES_TX_RX,
IPC_MSG_REQ_DISPATCH,
IPC_MSG_RES_DISPATCH,
IPC_MSG_REQ_SERVER_FAIL,
IPC_MSG_RES_SERVER_FAIL,
};
/* Test Cases
*
* 1) basic send & recv differnet message sizes
*
* 2) send message to start dispatch (confirm receipt)
*
* 3) flow control
*
* 4) authentication
*
* 5) thread safety
*
* 6) cleanup
*
* 7) service availabilty
*
* 8) multiple services
*/
static qb_loop_t *my_loop;
static qb_ipcs_service_t* s1;
static int32_t turn_on_fc = QB_FALSE;
static int32_t fc_enabled = 89;
static int32_t exit_handler(int32_t rsignal, void *data)
{
qb_ipcs_destroy(s1);
qb_loop_stop(my_loop);
exit(0);
return -1;
}
static int32_t s1_msg_process_fn(qb_ipcs_connection_t *c,
void *data, size_t size)
{
struct qb_ipc_request_header *req_pt = (struct qb_ipc_request_header *)data;
struct qb_ipc_response_header response;
ssize_t res;
if (req_pt->id == IPC_MSG_REQ_TX_RX) {
response.size = sizeof(struct qb_ipc_response_header);
response.id = IPC_MSG_RES_TX_RX;
response.error = 0;
res = qb_ipcs_response_send(c, &response,
sizeof(response));
if (res < 0) {
- perror("qb_ipcs_response_send");
+ qb_perror(LOG_INFO, "qb_ipcs_response_send");
}
if (turn_on_fc) {
qb_ipcs_request_rate_limit(s1, QB_IPCS_RATE_OFF);
}
} else if (req_pt->id == IPC_MSG_REQ_DISPATCH) {
response.size = sizeof(struct qb_ipc_response_header);
response.id = IPC_MSG_RES_DISPATCH;
response.error = 0;
res = qb_ipcs_event_send(c, &response,
sizeof(response));
if (res < 0) {
- perror("qb_ipcs_event_send");
+ qb_perror(LOG_INFO, "qb_ipcs_event_send");
}
} else if (req_pt->id == IPC_MSG_REQ_SERVER_FAIL) {
qb_ipcs_destroy(s1);
exit(0);
}
return 0;
}
-static void ipc_log_fn(const char *file_name,
- int32_t file_line, int32_t severity, const char *msg)
-{
- if (severity < LOG_INFO)
- fprintf(stderr, "%s:%d [%d] %s\n", file_name, file_line, severity, msg);
-}
-
static int32_t my_dispatch_add(enum qb_loop_priority p, int32_t fd, int32_t events,
void *data, qb_ipcs_dispatch_fn_t fn)
{
return qb_loop_poll_add(my_loop, p, fd, events, data, fn);
}
static int32_t my_dispatch_mod(enum qb_loop_priority p, int32_t fd, int32_t events,
void *data, qb_ipcs_dispatch_fn_t fn)
{
return qb_loop_poll_mod(my_loop, p, fd, events, data, fn);
}
static int32_t my_dispatch_del(int32_t fd)
{
return qb_loop_poll_del(my_loop, fd);
}
static void run_ipc_server(void)
{
int32_t res;
qb_loop_signal_handle handle;
struct qb_ipcs_service_handlers sh = {
.connection_accept = NULL,
.connection_created = NULL,
.msg_process = s1_msg_process_fn,
.connection_destroyed = NULL,
.connection_closed = NULL,
};
struct qb_ipcs_poll_handlers ph = {
.dispatch_add = my_dispatch_add,
.dispatch_mod = my_dispatch_mod,
.dispatch_del = my_dispatch_del,
};
qb_loop_signal_add(my_loop, QB_LOOP_HIGH, SIGSTOP,
NULL, exit_handler, &handle);
qb_loop_signal_add(my_loop, QB_LOOP_HIGH, SIGTERM,
NULL, exit_handler, &handle);
my_loop = qb_loop_create();
s1 = qb_ipcs_create(IPC_NAME, 4, ipc_type, &sh);
fail_if(s1 == 0);
qb_ipcs_poll_handlers_set(s1, &ph);
res = qb_ipcs_run(s1);
ck_assert_int_eq(res, 0);
qb_loop_run(my_loop);
}
static int32_t run_function_in_new_process(void (*run_ipc_server_fn)(void))
{
pid_t pid = fork ();
if (pid == -1) {
fprintf (stderr, "Can't fork\n");
return -1;
}
if (pid == 0) {
run_ipc_server_fn();
return 0;
}
return pid;
}
static int32_t stop_process(pid_t pid)
{
kill(pid, SIGTERM);
waitpid(pid, NULL, 0);
return 0;
}
#define IPC_BUF_SIZE (1024 * 1024)
static char buffer[IPC_BUF_SIZE];
static int32_t send_and_check(uint32_t size, int32_t ms_timeout)
{
struct qb_ipc_request_header *req_header = (struct qb_ipc_request_header *)buffer;
struct qb_ipc_response_header res_header;
int32_t res;
int32_t try_times = 0;
req_header->id = IPC_MSG_REQ_TX_RX;
req_header->size = sizeof(struct qb_ipc_request_header) + size;
repeat_send:
res = qb_ipcc_send(conn, req_header, req_header->size);
try_times++;
if (res < 0) {
if (res == -EAGAIN && try_times < 10) {
goto repeat_send;
} else {
if (res == -EAGAIN && try_times >= 10) {
fc_enabled = QB_TRUE;
}
errno = -res;
- perror("qb_ipcc_send");
+ qb_perror(LOG_INFO, "qb_ipcc_send");
return res;
}
}
repeat_recv:
res = qb_ipcc_recv(conn, &res_header,
sizeof(struct qb_ipc_response_header), ms_timeout);
if (res == -EINTR) {
return -1;
}
if (res == -EAGAIN || res == -ETIMEDOUT) {
goto repeat_recv;
}
ck_assert_int_eq(res, sizeof(struct qb_ipc_response_header));
ck_assert_int_eq(res_header.id, IPC_MSG_RES_TX_RX);
ck_assert_int_eq(res_header.size, sizeof(struct qb_ipc_response_header));
return 0;
}
static int32_t recv_timeout = -1;
static void test_ipc_txrx(void)
{
int32_t j;
int32_t c = 0;
size_t size;
pid_t pid;
pid = run_function_in_new_process(run_ipc_server);
fail_if(pid == -1);
sleep(1);
do {
conn = qb_ipcc_connect(IPC_NAME, MAX_MSG_SIZE);
if (conn == NULL) {
j = waitpid(pid, NULL, WNOHANG);
ck_assert_int_eq(j, 0);
sleep(1);
c++;
}
} while (conn == NULL && c < 5);
fail_if(conn == NULL);
size = QB_MIN(sizeof(struct qb_ipc_request_header), 64);
for (j = 1; j < 19; j++) {
size *= 2;
if (size >= MAX_MSG_SIZE)
break;
if (send_and_check(size, recv_timeout) < 0) {
break;
}
}
if (turn_on_fc) {
ck_assert_int_eq(fc_enabled, QB_TRUE);
}
qb_ipcc_disconnect(conn);
stop_process(pid);
}
START_TEST(test_ipc_txrx_shm_tmo)
{
ipc_type = QB_IPC_SHM;
recv_timeout = 1000;
test_ipc_txrx();
}
END_TEST
START_TEST(test_ipc_txrx_shm_block)
{
ipc_type = QB_IPC_SHM;
recv_timeout = -1;
test_ipc_txrx();
}
END_TEST
START_TEST(test_ipc_txrx_us_block)
{
ipc_type = QB_IPC_SOCKET;
recv_timeout = -1;
test_ipc_txrx();
}
END_TEST
START_TEST(test_ipc_txrx_us_tmo)
{
ipc_type = QB_IPC_SOCKET;
recv_timeout = 1000;
test_ipc_txrx();
}
END_TEST
START_TEST(test_ipc_fc_shm)
{
turn_on_fc = QB_TRUE;
ipc_type = QB_IPC_SHM;
test_ipc_txrx();
}
END_TEST
START_TEST(test_ipc_fc_us)
{
turn_on_fc = QB_TRUE;
ipc_type = QB_IPC_SOCKET;
test_ipc_txrx();
}
END_TEST
START_TEST(test_ipc_txrx_pmq)
{
ipc_type = QB_IPC_POSIX_MQ;
test_ipc_txrx();
}
END_TEST
START_TEST(test_ipc_txrx_smq)
{
ipc_type = QB_IPC_SYSV_MQ;
test_ipc_txrx();
}
END_TEST
static void test_ipc_dispatch(void)
{
int32_t res;
int32_t j;
int32_t c = 0;
pid_t pid;
struct qb_ipc_request_header req_header;
struct qb_ipc_response_header *res_header = (struct qb_ipc_response_header*)buffer;
pid = run_function_in_new_process(run_ipc_server);
fail_if(pid == -1);
sleep(1);
do {
conn = qb_ipcc_connect(IPC_NAME, MAX_MSG_SIZE);
if (conn == NULL) {
j = waitpid(pid, NULL, WNOHANG);
ck_assert_int_eq(j, 0);
sleep(1);
c++;
}
} while (conn == NULL && c < 5);
fail_if(conn == NULL);
req_header.id = IPC_MSG_REQ_DISPATCH;
req_header.size = sizeof(struct qb_ipc_request_header);
repeat_send:
res = qb_ipcc_send(conn, &req_header, req_header.size);
if (res < 0) {
if (res == -EAGAIN) {
goto repeat_send;
} else if (res == -EINVAL || res == -EINTR) {
- perror("qb_ipcc_send");
+ qb_perror(LOG_INFO, "qb_ipcc_send");
return;
} else {
errno = -res;
- perror("qb_ipcc_send");
+ qb_perror(LOG_INFO, "qb_ipcc_send");
goto repeat_send;
}
}
repeat_event_recv:
res = qb_ipcc_event_recv(conn, res_header, IPC_BUF_SIZE, 0);
if (res < 0) {
if (res == -EAGAIN || res == -ETIMEDOUT) {
goto repeat_event_recv;
} else {
errno = -res;
- perror("qb_ipcc_event_recv");
+ qb_perror(LOG_INFO, "qb_ipcc_event_recv");
goto repeat_send;
}
}
ck_assert_int_eq(res, sizeof(struct qb_ipc_response_header));
ck_assert_int_eq(res_header->id, IPC_MSG_RES_DISPATCH);
qb_ipcc_disconnect(conn);
stop_process(pid);
}
START_TEST(test_ipc_disp_shm)
{
ipc_type = QB_IPC_SHM;
test_ipc_dispatch();
}
END_TEST
START_TEST(test_ipc_disp_us)
{
ipc_type = QB_IPC_SOCKET;
test_ipc_dispatch();
}
END_TEST
static void test_ipc_server_fail(void)
{
struct qb_ipc_request_header req_header;
struct qb_ipc_response_header res_header;
int32_t res;
int32_t try_times = 0;
int32_t j;
int32_t c = 0;
pid_t pid;
pid = run_function_in_new_process(run_ipc_server);
fail_if(pid == -1);
sleep(1);
do {
conn = qb_ipcc_connect(IPC_NAME, MAX_MSG_SIZE);
if (conn == NULL) {
j = waitpid(pid, NULL, WNOHANG);
ck_assert_int_eq(j, 0);
sleep(1);
c++;
}
} while (conn == NULL && c < 5);
fail_if(conn == NULL);
/*
* tell the server to exit
*/
req_header.id = IPC_MSG_REQ_SERVER_FAIL;
req_header.size = sizeof(struct qb_ipc_request_header);
repeat_send:
res = qb_ipcc_send(conn, &req_header, req_header.size);
try_times++;
if (res < 0) {
if (res == -EAGAIN && try_times < 10) {
goto repeat_send;
}
ck_assert_int_eq(res, 0);
}
/*
* wait a bit for the server to die.
*/
sleep(1);
/*
* try recv from the exit'ed server
*/
res = qb_ipcc_recv(conn, &res_header,
sizeof(struct qb_ipc_response_header), 100);
/*
* confirm we get -ENOTCONN
*/
ck_assert_int_eq(res, -ENOTCONN);
qb_ipcc_disconnect(conn);
stop_process(pid);
}
START_TEST(test_ipc_server_fail_soc)
{
ipc_type = QB_IPC_SOCKET;
test_ipc_server_fail();
}
END_TEST
START_TEST(test_ipc_server_fail_shm)
{
ipc_type = QB_IPC_SHM;
test_ipc_server_fail();
}
END_TEST
static Suite *ipc_suite(void)
{
TCase *tc;
uid_t uid;
Suite *s = suite_create("ipc");
tc = tcase_create("ipc_server_fail_shm");
tcase_add_test(tc, test_ipc_server_fail_shm);
tcase_set_timeout(tc, 6);
suite_add_tcase(s, tc);
tc = tcase_create("ipc_server_fail_soc");
tcase_add_test(tc, test_ipc_server_fail_soc);
tcase_set_timeout(tc, 6);
suite_add_tcase(s, tc);
tc = tcase_create("ipc_txrx_shm_block");
tcase_add_test(tc, test_ipc_txrx_shm_block);
tcase_set_timeout(tc, 6);
suite_add_tcase(s, tc);
tc = tcase_create("ipc_txrx_shm_tmo");
tcase_add_test(tc, test_ipc_txrx_shm_tmo);
tcase_set_timeout(tc, 6);
suite_add_tcase(s, tc);
tc = tcase_create("ipc_txrx_us_block");
tcase_add_test(tc, test_ipc_txrx_us_block);
tcase_set_timeout(tc, 6);
suite_add_tcase(s, tc);
tc = tcase_create("ipc_txrx_us_tmo");
tcase_add_test(tc, test_ipc_txrx_us_tmo);
tcase_set_timeout(tc, 6);
suite_add_tcase(s, tc);
tc = tcase_create("ipc_fc_shm");
tcase_add_test(tc, test_ipc_fc_shm);
tcase_set_timeout(tc, 6);
suite_add_tcase(s, tc);
tc = tcase_create("ipc_fc_us");
tcase_add_test(tc, test_ipc_fc_us);
tcase_set_timeout(tc, 6);
suite_add_tcase(s, tc);
uid = geteuid();
if (uid == 0) {
tc = tcase_create("ipc_txrx_posix_mq");
tcase_add_test(tc, test_ipc_txrx_pmq);
tcase_set_timeout(tc, 10);
suite_add_tcase(s, tc);
tc = tcase_create("ipc_txrx_sysv_mq");
tcase_add_test(tc, test_ipc_txrx_smq);
tcase_set_timeout(tc, 10);
suite_add_tcase(s, tc);
}
tc = tcase_create("ipc_dispatch_shm");
tcase_add_test(tc, test_ipc_disp_shm);
tcase_set_timeout(tc, 16);
suite_add_tcase(s, tc);
tc = tcase_create("ipc_dispatch_us");
tcase_add_test(tc, test_ipc_disp_us);
tcase_set_timeout(tc, 16);
suite_add_tcase(s, tc);
return s;
}
int32_t main(void)
{
int32_t number_failed;
Suite *s = ipc_suite();
SRunner *sr = srunner_create(s);
- qb_util_set_log_function(ipc_log_fn);
+ qb_log_init("check", LOG_USER, LOG_EMERG);
+ qb_log_ctl(QB_LOG_SYSLOG, QB_LOG_CONF_ENABLED, QB_FALSE);
+ qb_log_filter_ctl(QB_LOG_STDERR, QB_LOG_FILTER_ADD,
+ QB_LOG_FILTER_FILE, "*", LOG_INFO);
+ qb_log_ctl(QB_LOG_STDERR, QB_LOG_CONF_ENABLED, QB_TRUE);
srunner_run_all(sr, CK_VERBOSE);
number_failed = srunner_ntests_failed(sr);
srunner_free(sr);
return (number_failed == 0) ? EXIT_SUCCESS : EXIT_FAILURE;
}
diff --git a/tests/check_loop.c b/tests/check_loop.c
index 7b665a9..d1842bc 100644
--- a/tests/check_loop.c
+++ b/tests/check_loop.c
@@ -1,441 +1,440 @@
/*
* Copyright (c) 2010 Red Hat, Inc.
*
* All rights reserved.
*
* Author: Angus Salkeld <asalkeld@redhat.com>
*
* This file is part of libqb.
*
* libqb is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 2.1 of the License, or
* (at your option) any later version.
*
* libqb is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with libqb. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os_base.h"
#include <check.h>
#include <qb/qbdefs.h>
#include <qb/qbutil.h>
#include <qb/qbloop.h>
+#include <qb/qblog.h>
static int32_t job_1_run_count = 0;
static int32_t job_2_run_count = 0;
static int32_t job_3_run_count = 0;
static void job_1(void *data)
{
job_1_run_count++;
}
static void job_stop(void *data)
{
qb_loop_t *l = (qb_loop_t *)data;
job_3_run_count++;
qb_loop_stop(l);
}
static void job_2(void *data)
{
int32_t res;
qb_loop_t *l = (qb_loop_t *)data;
job_2_run_count++;
res = qb_loop_job_add(l, QB_LOOP_HIGH, data, job_stop);
ck_assert_int_eq(res, 0);
}
static void job_1_r(void *data)
{
int32_t res;
qb_loop_t *l = (qb_loop_t *)data;
job_1_run_count++;
res = qb_loop_job_add(l, QB_LOOP_MED, data, job_2);
ck_assert_int_eq(res, 0);
}
static void job_1_add_nuts(void *data)
{
int32_t res;
qb_loop_t *l = (qb_loop_t *)data;
job_1_run_count++;
res = qb_loop_job_add(l, QB_LOOP_HIGH, data, job_1);
res = qb_loop_job_add(l, QB_LOOP_HIGH, data, job_1);
res = qb_loop_job_add(l, QB_LOOP_HIGH, data, job_1);
res = qb_loop_job_add(l, QB_LOOP_HIGH, data, job_1);
res = qb_loop_job_add(l, QB_LOOP_HIGH, data, job_1);
res = qb_loop_job_add(l, QB_LOOP_HIGH, data, job_1);
res = qb_loop_job_add(l, QB_LOOP_HIGH, data, job_1);
res = qb_loop_job_add(l, QB_LOOP_HIGH, data, job_1);
res = qb_loop_job_add(l, QB_LOOP_MED, data, job_1);
res = qb_loop_job_add(l, QB_LOOP_MED, data, job_1);
res = qb_loop_job_add(l, QB_LOOP_MED, data, job_1);
res = qb_loop_job_add(l, QB_LOOP_MED, data, job_1);
res = qb_loop_job_add(l, QB_LOOP_LOW, data, job_1);
res = qb_loop_job_add(l, QB_LOOP_LOW, data, job_1);
if (job_1_run_count < 500) {
res = qb_loop_job_add(l, QB_LOOP_LOW, data, job_1_add_nuts);
} else {
res = qb_loop_job_add(l, QB_LOOP_LOW, data, job_stop);
}
ck_assert_int_eq(res, 0);
}
START_TEST(test_loop_job_input)
{
int32_t res;
qb_loop_t *l = qb_loop_create();
fail_if(l == NULL);
res = qb_loop_job_add(NULL, QB_LOOP_LOW, NULL, job_2);
ck_assert_int_eq(res, -EINVAL);
res = qb_loop_job_add(l, 89, NULL, job_2);
ck_assert_int_eq(res, -EINVAL);
res = qb_loop_job_add(l, QB_LOOP_LOW, NULL, NULL);
ck_assert_int_eq(res, -EINVAL);
qb_loop_destroy(l);
}
END_TEST
START_TEST(test_loop_job_1)
{
int32_t res;
qb_loop_t *l = qb_loop_create();
fail_if(l == NULL);
res = qb_loop_job_add(l, QB_LOOP_LOW, NULL, job_1);
ck_assert_int_eq(res, 0);
res = qb_loop_job_add(l, QB_LOOP_LOW, l, job_stop);
ck_assert_int_eq(res, 0);
qb_loop_run(l);
ck_assert_int_eq(job_1_run_count, 1);
qb_loop_destroy(l);
}
END_TEST
START_TEST(test_loop_job_4)
{
int32_t res;
qb_loop_t *l = qb_loop_create();
fail_if(l == NULL);
res = qb_loop_job_add(l, QB_LOOP_LOW, l, job_1_r);
ck_assert_int_eq(res, 0);
qb_loop_run(l);
ck_assert_int_eq(job_1_run_count, 1);
ck_assert_int_eq(job_2_run_count, 1);
ck_assert_int_eq(job_3_run_count, 1);
qb_loop_destroy(l);
}
END_TEST
START_TEST(test_loop_job_nuts)
{
int32_t res;
qb_loop_t *l = qb_loop_create();
fail_if(l == NULL);
res = qb_loop_job_add(l, QB_LOOP_LOW, l, job_1_add_nuts);
ck_assert_int_eq(res, 0);
qb_loop_run(l);
fail_if(job_1_run_count < 500);
qb_loop_destroy(l);
}
END_TEST
static Suite *loop_job_suite(void)
{
TCase *tc;
Suite *s = suite_create("loop_job");
tc = tcase_create("limits");
tcase_add_test(tc, test_loop_job_input);
suite_add_tcase(s, tc);
tc = tcase_create("run_one");
tcase_add_test(tc, test_loop_job_1);
suite_add_tcase(s, tc);
tc = tcase_create("run_recursive");
tcase_add_test(tc, test_loop_job_4);
suite_add_tcase(s, tc);
tc = tcase_create("run_500");
tcase_add_test(tc, test_loop_job_nuts);
suite_add_tcase(s, tc);
return s;
}
/*
* -----------------------------------------------------------------------
* Timers
*/
static qb_loop_timer_handle test_th;
START_TEST(test_loop_timer_input)
{
int32_t res;
qb_loop_t *l = qb_loop_create();
fail_if(l == NULL);
res = qb_loop_timer_add(NULL, QB_LOOP_LOW, 5*QB_TIME_NS_IN_MSEC, NULL, job_2, &test_th);
ck_assert_int_eq(res, -EINVAL);
res = qb_loop_timer_add(l, QB_LOOP_LOW, 5*QB_TIME_NS_IN_MSEC, l, NULL, &test_th);
ck_assert_int_eq(res, -EINVAL);
res = qb_loop_timer_add(l, QB_LOOP_LOW, 5*QB_TIME_NS_IN_MSEC, l, job_1, NULL);
ck_assert_int_eq(res, -ENOENT);
qb_loop_destroy(l);
}
END_TEST
static void one_shot_tmo(void*data)
{
static int32_t been_here = QB_FALSE;
ck_assert_int_eq(been_here, QB_FALSE);
been_here = QB_TRUE;
}
static qb_loop_timer_handle reset_th;
static int32_t reset_timer_step = 0;
static void reset_one_shot_tmo(void*data)
{
int32_t res;
qb_loop_t *l = data;
if (reset_timer_step == 0) {
res = qb_loop_timer_del(l, reset_th);
ck_assert_int_eq(res, -EINVAL);
res = qb_loop_timer_add(l, QB_LOOP_LOW, 8*QB_TIME_NS_IN_MSEC, l, reset_one_shot_tmo, &reset_th);
ck_assert_int_eq(res, 0);
}
reset_timer_step++;
}
START_TEST(test_loop_timer_basic)
{
int32_t res;
qb_loop_t *l = qb_loop_create();
fail_if(l == NULL);
res = qb_loop_timer_add(l, QB_LOOP_LOW, 5*QB_TIME_NS_IN_MSEC, l, one_shot_tmo, &test_th);
ck_assert_int_eq(res, 0);
res = qb_loop_timer_add(l, QB_LOOP_LOW, 7*QB_TIME_NS_IN_MSEC, l, reset_one_shot_tmo, &reset_th);
ck_assert_int_eq(res, 0);
res = qb_loop_timer_add(l, QB_LOOP_LOW, 60*QB_TIME_NS_IN_MSEC, l, job_stop, &test_th);
ck_assert_int_eq(res, 0);
qb_loop_run(l);
ck_assert_int_eq(reset_timer_step, 2);
qb_loop_destroy(l);
}
END_TEST
struct qb_stop_watch {
uint64_t start;
uint64_t end;
qb_loop_t *l;
uint64_t ns_timer;
int64_t total;
int32_t count;
int32_t killer;
qb_loop_timer_handle th;
};
static void stop_watch_tmo(void*data)
{
struct qb_stop_watch *sw = (struct qb_stop_watch *)data;
float per;
int64_t diff;
sw->end = qb_util_nano_current_get();
diff = sw->end - sw->start;
if (diff < sw->ns_timer) {
printf("timer expired early! by %"PRIi64"\n", (int64_t)(sw->ns_timer - diff));
}
ck_assert(diff >= sw->ns_timer);
sw->total += diff;
sw->total -= sw->ns_timer;
sw->start = sw->end;
sw->count++;
if (sw->count < 50) {
qb_loop_timer_add(sw->l, QB_LOOP_LOW, sw->ns_timer, data, stop_watch_tmo, &sw->th);
} else {
per = ((sw->total * 100) / sw->count) / (float)sw->ns_timer;
printf("average error for %"PRIu64" ns timer is %"PRIi64" (ns) (%f)\n",
sw->ns_timer,
sw->total/sw->count, per);
if (sw->killer) {
qb_loop_stop(sw->l);
}
}
}
static void start_timer(qb_loop_t *l, struct qb_stop_watch *sw, uint64_t timeout, int32_t killer)
{
int32_t res;
sw->l = l;
sw->count = 0;
sw->total = 0;
sw->killer = killer;
sw->ns_timer = timeout;
sw->start = qb_util_nano_current_get();
res = qb_loop_timer_add(sw->l, QB_LOOP_LOW, sw->ns_timer, sw, stop_watch_tmo, &sw->th);
ck_assert_int_eq(res, 0);
}
START_TEST(test_loop_timer_precision)
{
int32_t i;
uint64_t tmo;
struct qb_stop_watch sw[11];
qb_loop_t *l = qb_loop_create();
fail_if(l == NULL);
for (i = 0; i < 10; i++) {
tmo = ((1 + i * 9) * QB_TIME_NS_IN_MSEC) + 500000;
start_timer(l, &sw[i], tmo, QB_FALSE);
}
start_timer(l, &sw[i], 100 * QB_TIME_NS_IN_MSEC, QB_TRUE);
qb_loop_run(l);
qb_loop_destroy(l);
}
END_TEST
static int expire_leak_counter = 0;
#define EXPIRE_NUM_RUNS 10
static int expire_leak_runs = 0;
static void empty_func_tmo(void*data)
{
expire_leak_counter++;
}
static void stop_func_tmo(void*data)
{
qb_loop_t *l = (qb_loop_t *)data;
- printf("%s(%d)\n", __func__, expire_leak_counter);
+ qb_log(LOG_DEBUG, "expire_leak_counter:%d", expire_leak_counter);
qb_loop_stop(l);
}
static void next_func_tmo(void*data)
{
qb_loop_t *l = (qb_loop_t *)data;
int32_t i;
uint64_t tmo;
uint64_t max_tmo = 0;
qb_loop_timer_handle th;
- printf("%s(%d)\n", __func__, expire_leak_counter);
+ qb_log(LOG_DEBUG, "expire_leak_counter:%d", expire_leak_counter);
for (i = 0; i < 300; i++) {
tmo = ((1 + i) * QB_TIME_NS_IN_MSEC) + 500000;
qb_loop_timer_add(l, QB_LOOP_LOW, tmo, NULL, empty_func_tmo, &th);
qb_loop_timer_add(l, QB_LOOP_MED, tmo, NULL, empty_func_tmo, &th);
qb_loop_timer_add(l, QB_LOOP_HIGH, tmo, NULL, empty_func_tmo, &th);
max_tmo = QB_MAX(max_tmo, tmo);
}
expire_leak_runs++;
if (expire_leak_runs == EXPIRE_NUM_RUNS) {
qb_loop_timer_add(l, QB_LOOP_LOW, max_tmo, l, stop_func_tmo, &th);
} else {
qb_loop_timer_add(l, QB_LOOP_LOW, max_tmo, l, next_func_tmo, &th);
}
}
/*
* make sure that file descriptors don't get leaked with no qb_loop_timer_del()
*/
START_TEST(test_loop_timer_expire_leak)
{
int32_t i;
uint64_t tmo;
uint64_t max_tmo = 0;
qb_loop_timer_handle th;
qb_loop_t *l = qb_loop_create();
fail_if(l == NULL);
expire_leak_counter = 0;
for (i = 0; i < 300; i++) {
tmo = ((1 + i) * QB_TIME_NS_IN_MSEC) + 500000;
qb_loop_timer_add(l, QB_LOOP_LOW, tmo, NULL, empty_func_tmo, &th);
qb_loop_timer_add(l, QB_LOOP_MED, tmo, NULL, empty_func_tmo, &th);
qb_loop_timer_add(l, QB_LOOP_HIGH, tmo, NULL, empty_func_tmo, &th);
max_tmo = QB_MAX(max_tmo, tmo);
}
qb_loop_timer_add(l, QB_LOOP_LOW, max_tmo, l, next_func_tmo, &th);
expire_leak_runs = 1;
qb_loop_run(l);
ck_assert_int_eq(expire_leak_counter, 300*3* EXPIRE_NUM_RUNS);
qb_loop_destroy(l);
}
END_TEST
static Suite *loop_timer_suite(void)
{
TCase *tc;
Suite *s = suite_create("loop_timers");
tc = tcase_create("limits");
tcase_add_test(tc, test_loop_timer_input);
suite_add_tcase(s, tc);
tc = tcase_create("basic");
tcase_add_test(tc, test_loop_timer_basic);
tcase_set_timeout(tc, 30);
suite_add_tcase(s, tc);
tc = tcase_create("precision");
tcase_add_test(tc, test_loop_timer_precision);
tcase_set_timeout(tc, 30);
suite_add_tcase(s, tc);
tc = tcase_create("expire_leak");
tcase_add_test(tc, test_loop_timer_expire_leak);
tcase_set_timeout(tc, 30);
suite_add_tcase(s, tc);
return s;
}
-static void libqb_log_fn(const char *file_name,
- int32_t file_line, int32_t severity, const char *msg)
-{
- printf("libqb: %s:%d %s\n", file_name, file_line, msg);
-}
-
int32_t main(void)
{
int32_t number_failed;
SRunner *sr = srunner_create(loop_job_suite());
srunner_add_suite (sr, loop_timer_suite());
- qb_util_set_log_function(libqb_log_fn);
+ qb_log_init("check", LOG_USER, LOG_EMERG);
+ qb_log_ctl(QB_LOG_SYSLOG, QB_LOG_CONF_ENABLED, QB_FALSE);
+ qb_log_filter_ctl(QB_LOG_STDERR, QB_LOG_FILTER_ADD,
+ QB_LOG_FILTER_FILE, "*", LOG_INFO);
+ qb_log_ctl(QB_LOG_STDERR, QB_LOG_CONF_ENABLED, QB_TRUE);
srunner_run_all(sr, CK_VERBOSE);
number_failed = srunner_ntests_failed(sr);
srunner_free(sr);
return (number_failed == 0) ? EXIT_SUCCESS : EXIT_FAILURE;
}
diff --git a/tests/check_rb.c b/tests/check_rb.c
index 36cdb53..d67263e 100644
--- a/tests/check_rb.c
+++ b/tests/check_rb.c
@@ -1,238 +1,235 @@
/*
* Copyright (c) 2010 Red Hat, Inc.
*
* All rights reserved.
*
* Author: Angus Salkeld <asalkeld@redhat.com>
*
* This file is part of libqb.
*
* libqb is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 2.1 of the License, or
* (at your option) any later version.
*
* libqb is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with libqb. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdio.h>
#include <stdlib.h>
#include <syslog.h>
#include <errno.h>
#include <check.h>
#include <qb/qbdefs.h>
#include <qb/qbrb.h>
#include <qb/qbipc_common.h>
-#include <qb/qbutil.h>
+#include <qb/qblog.h>
START_TEST(test_ring_buffer1)
{
char my_buf[512];
struct qb_ipc_request_header *hdr;
char *str;
qb_ringbuffer_t *rb;
int32_t i;
int32_t b;
ssize_t actual;
ssize_t avail;
rb = qb_rb_open("test1", 200, QB_RB_FLAG_CREATE, 0);
fail_if(rb == NULL);
for (b = 0; b < 3; b++) {
hdr = (struct qb_ipc_request_header *) my_buf;
str = my_buf + sizeof(struct qb_ipc_request_header);
for (i = 0; i < 900; i++) {
hdr->id = __LINE__ + i;
hdr->size =
sprintf(str, "ID: %d (%s + i(%d)) -- %s-%s!",
hdr->id, "actually the line number", i,
__func__, __FILE__) + 1;
hdr->size += sizeof(struct qb_ipc_request_header);
avail = qb_rb_space_free(rb);
actual = qb_rb_chunk_write(rb, hdr, hdr->size);
if (avail < (hdr->size + (2 * sizeof(uint32_t)))) {
ck_assert_int_eq(actual, -EAGAIN);
} else {
ck_assert_int_eq(actual, hdr->size);
}
}
memset(my_buf, 0, sizeof(my_buf));
hdr = (struct qb_ipc_request_header *) my_buf;
str = my_buf + sizeof(struct qb_ipc_request_header);
for (i = 0; i < 15; i++) {
actual = qb_rb_chunk_read(rb, hdr, 512, 0);
if (actual < 0) {
ck_assert_int_eq(0, qb_rb_chunks_used(rb));
break;
}
str[actual - sizeof(struct qb_ipc_request_header)] = '\0';
ck_assert_int_eq(actual, hdr->size);
}
}
qb_rb_close(rb);
}
END_TEST
/*
* nice size (int64)
*/
START_TEST(test_ring_buffer2)
{
qb_ringbuffer_t *t;
int32_t i;
int64_t v = 7891034;
int64_t *new_data;
ssize_t l;
t = qb_rb_open("test2", 200 * sizeof(int64_t), QB_RB_FLAG_CREATE, 0);
fail_if(t == NULL);
for (i = 0; i < 200; i++) {
l = qb_rb_chunk_write(t, &v, sizeof(v));
ck_assert_int_eq(l, sizeof(v));
}
for (i = 0; i < 100; i++) {
l = qb_rb_chunk_peek(t, (void **)&new_data, 0);
if (l < 0) {
/* no more to read */
break;
}
ck_assert_int_eq(l, sizeof(v));
fail_unless(v == *new_data);
qb_rb_chunk_reclaim(t);
}
for (i = 0; i < 100; i++) {
l = qb_rb_chunk_write(t, &v, sizeof(v));
ck_assert_int_eq(l, sizeof(v));
}
for (i = 0; i < 100; i++) {
l = qb_rb_chunk_peek(t, (void **)&new_data, 0);
if (l == 0) {
/* no more to read */
break;
}
ck_assert_int_eq(l, sizeof(v));
fail_unless(v == *new_data);
qb_rb_chunk_reclaim(t);
}
qb_rb_close(t);
}
END_TEST
/*
* odd size (10)
*/
START_TEST(test_ring_buffer3)
{
qb_ringbuffer_t *t;
int32_t i;
char v[] = "1234567891";
char out[32];
ssize_t l;
size_t len = strlen(v) + 1;
t = qb_rb_open("test3", 10, QB_RB_FLAG_CREATE | QB_RB_FLAG_OVERWRITE, 0);
fail_if(t == NULL);
for (i = 0; i < 9000; i++) {
l = qb_rb_chunk_write(t, v, len);
ck_assert_int_eq(l, len);
}
for (i = 0; i < 2000; i++) {
l = qb_rb_chunk_read(t, (void *)out, 32, 0);
if (l < 0) {
/* no more to read */
break;
}
ck_assert_int_eq(l, len);
ck_assert_str_eq(v, out);
}
qb_rb_close(t);
}
END_TEST
START_TEST(test_ring_buffer4)
{
qb_ringbuffer_t *t;
char data[] = "1234567891";
int32_t i;
char *new_data;
ssize_t l;
t = qb_rb_open("test4", 10, QB_RB_FLAG_CREATE | QB_RB_FLAG_OVERWRITE, 0);
fail_if(t == NULL);
for (i = 0; i < 2000; i++) {
l = qb_rb_chunk_write(t, data, strlen(data));
ck_assert_int_eq(l, strlen(data));
if (i == 0) {
data[0] = 'b';
}
}
for (i = 0; i < 2000; i++) {
l = qb_rb_chunk_peek(t, (void **)&new_data, 0);
if (l == 0) {
break;
}
ck_assert_int_eq(l, strlen(data));
qb_rb_chunk_reclaim(t);
}
qb_rb_close(t);
}
END_TEST
static Suite *rb_suite(void)
{
TCase *tc;
Suite *s = suite_create("ringbuffer");
tc = tcase_create("test01");
tcase_add_test(tc, test_ring_buffer1);
suite_add_tcase(s, tc);
tc = tcase_create("test02");
tcase_add_test(tc, test_ring_buffer2);
suite_add_tcase(s, tc);
tc = tcase_create("test03");
tcase_add_test(tc, test_ring_buffer3);
suite_add_tcase(s, tc);
tc = tcase_create("test04");
tcase_add_test(tc, test_ring_buffer4);
suite_add_tcase(s, tc);
return s;
}
-static void libqb_log_fn(const char *file_name,
- int32_t file_line, int32_t severity, const char *msg)
-{
- if (severity < LOG_INFO)
- printf("libqb: %s:%d %s\n", file_name, file_line, msg);
-}
-
int32_t main(void)
{
int32_t number_failed;
Suite *s = rb_suite();
SRunner *sr = srunner_create(s);
- qb_util_set_log_function(libqb_log_fn);
+ qb_log_init("check", LOG_USER, LOG_EMERG);
+ qb_log_ctl(QB_LOG_SYSLOG, QB_LOG_CONF_ENABLED, QB_FALSE);
+ qb_log_filter_ctl(QB_LOG_STDERR, QB_LOG_FILTER_ADD,
+ QB_LOG_FILTER_FILE, "*", LOG_INFO);
+ qb_log_ctl(QB_LOG_STDERR, QB_LOG_CONF_ENABLED, QB_TRUE);
srunner_run_all(sr, CK_VERBOSE);
number_failed = srunner_ntests_failed(sr);
srunner_free(sr);
return (number_failed == 0) ? EXIT_SUCCESS : EXIT_FAILURE;
}

File Metadata

Mime Type
text/x-diff
Expires
Mon, Feb 24, 9:14 AM (1 d, 1 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1464103
Default Alt Text
(61 KB)

Event Timeline