diff --git a/include/qb/qbloop.h b/include/qb/qbloop.h index db0c480..c9b1993 100644 --- a/include/qb/qbloop.h +++ b/include/qb/qbloop.h @@ -1,310 +1,321 @@ /* * Copyright (C) 2010 Red Hat, Inc. * * Author: Angus Salkeld * * This file is part of libqb. * * libqb is free software: you can redistribute it and/or modify * it under the terms of the GNU Lesser General Public License as published by * the Free Software Foundation, either version 2.1 of the License, or * (at your option) any later version. * * libqb is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public License * along with libqb. If not, see . */ #ifndef QB_LOOP_H_DEFINED #define QB_LOOP_H_DEFINED /* *INDENT-OFF* */ #ifdef __cplusplus extern "C" { #endif /* *INDENT-ON* */ #include #include #include /* make POLLIN etc. readily available */ /** * @file qbloop.h * * Main loop manages timers, jobs and polling sockets. * * Only a weaker sense of priorities is implemented, alluding to distinct * set of pros and cons compared to the stronger, strict approach to them * as widely applied in this problem space (since the latter gives the * application more control as the effect of the former can still be * achieved with some reductions, whereas it is not straightforward the * other way around; cf. static priority task scheduling vs. relative * fine-tuning within a single priority domain with nice(2)): * * + implicit mitigation for deadlock-prone priority arrangements * * - less predictable (proportional probability based, we can talk * about an advisory effect of the priorities) responses to the arrival * of the high-ranked events (i.e. in the process of the picking the next * event to handle from the priority queue when at least two different * priorities are eligible at the moment) * * One practical application for this module of libqb is in combination with * IPC servers based on qbipcs.h published one (the #qb_ipcs_poll_handlers * structure maps fittingly to the control functions published here). * * @example tcpserver.c */ /** * Priorites for jobs, timers & poll */ enum qb_loop_priority { QB_LOOP_LOW = 0, QB_LOOP_MED = 1, QB_LOOP_HIGH = 2, }; /** * An opaque data type representing the main loop. */ typedef struct qb_loop qb_loop_t; typedef uint64_t qb_loop_timer_handle; typedef void *qb_loop_signal_handle; typedef int32_t (*qb_loop_poll_dispatch_fn) (int32_t fd, int32_t revents, void *data); typedef void (*qb_loop_job_dispatch_fn)(void *data); typedef void (*qb_loop_timer_dispatch_fn)(void *data); typedef int32_t (*qb_loop_signal_dispatch_fn)(int32_t rsignal, void *data); typedef void (*qb_loop_poll_low_fds_event_fn) (int32_t not_enough, int32_t fds_available); /** * Create a new main loop. * * @return loop instance. */ qb_loop_t * qb_loop_create(void); /** * */ void qb_loop_destroy(struct qb_loop * l); /** * Stop the main loop. * @param l pointer to the loop instance */ void qb_loop_stop(qb_loop_t *l); /** * Run the main loop. * * @param l pointer to the loop instance */ void qb_loop_run(qb_loop_t *l); /** * Add a job to the mainloop. * * This is run in the next cycle of the loop. * @note it is a one-shot job. * * @param l pointer to the loop instance * @param p the priority * @param data user data passed into the dispatch function * @param dispatch_fn callback function * @return status (0 == ok, -errno == failure) */ int32_t qb_loop_job_add(qb_loop_t *l, enum qb_loop_priority p, void *data, qb_loop_job_dispatch_fn dispatch_fn); /** * Delete a job from the mainloop. * * This will try to delete the job if it hasn't run yet. * * @note this will remove the first job that matches the * parameters (priority, data, dispatch_fn). * * @param l pointer to the loop instance * @param p the priority * @param data user data passed into the dispatch function * @param dispatch_fn callback function * @return status (0 == ok, -errno == failure) */ int32_t qb_loop_job_del(struct qb_loop *l, enum qb_loop_priority p, void *data, qb_loop_job_dispatch_fn dispatch_fn); /** * Add a timer to the mainloop. * @note it is a one-shot job. * * @param l pointer to the loop instance * @param p the priority * @param nsec_duration nano-secs in the future to run the dispatch. * @param data user data passed into the dispatch function * @param dispatch_fn callback function * @param timer_handle_out handle to delete the timer if needed. * @return status (0 == ok, -errno == failure) */ int32_t qb_loop_timer_add(qb_loop_t *l, enum qb_loop_priority p, uint64_t nsec_duration, void *data, qb_loop_timer_dispatch_fn dispatch_fn, qb_loop_timer_handle * timer_handle_out); /** * Delete a timer that is still outstanding. * * @param l pointer to the loop instance * @param th handle to delete the timer if needed. * @return status (0 == ok, -errno == failure) */ int32_t qb_loop_timer_del(qb_loop_t *l, qb_loop_timer_handle th); /** * Check to see if a timer that is still outstanding. * * @param l pointer to the loop instance * @param th handle to delete the timer if needed. * @retval QB_TRUE yes this timer is outstanding * @retval QB_FALSE this timer does not exist or has expired */ int32_t qb_loop_timer_is_running(qb_loop_t *l, qb_loop_timer_handle th); /** - * Get the time remaining before it expires. + * Get the expiration time of the timer, as set when the timer was created * * @note if the timer has already expired it will return 0 * * @param l pointer to the loop instance * @param th timer handle. - * @return nano seconds left + * @return nano seconds at which the timer will expire */ uint64_t qb_loop_timer_expire_time_get(struct qb_loop *l, qb_loop_timer_handle th); +/** + * Get the time remaining before the timer expires + * + * @note if the timer has already expired it will return 0 + * + * @param l pointer to the loop instance + * @param th timer handle. + * @return nano seconds remaining until the timer expires + */ +uint64_t qb_loop_timer_expire_time_remaining(struct qb_loop *l, qb_loop_timer_handle th); + /** * Set a callback to receive events on file descriptors * getting low. * @param l pointer to the loop instance * @param fn callback function. * @return status (0 == ok, -errno == failure) */ int32_t qb_loop_poll_low_fds_event_set(qb_loop_t *l, qb_loop_poll_low_fds_event_fn fn); /** * Add a poll job to the mainloop. * @note it is a re-occurring job. * * @param l pointer to the loop instance * @param p the priority * @param fd file descriptor. * @param events (POLLIN|POLLOUT) etc .... * @param data user data passed into the dispatch function * @param dispatch_fn callback function * @return status (0 == ok, -errno == failure) */ int32_t qb_loop_poll_add(qb_loop_t *l, enum qb_loop_priority p, int32_t fd, int32_t events, void *data, qb_loop_poll_dispatch_fn dispatch_fn); /** * Modify a poll job. * * @param l pointer to the loop instance * @param p the priority * @param fd file descriptor. * @param events (POLLIN|POLLOUT) etc .... * @param data user data passed into the dispatch function * @param dispatch_fn callback function * @return status (0 == ok, -errno == failure) */ int32_t qb_loop_poll_mod(qb_loop_t *l, enum qb_loop_priority p, int32_t fd, int32_t events, void *data, qb_loop_poll_dispatch_fn dispatch_fn); /** * Delete a poll job. * * @param l pointer to the loop instance * @param fd file descriptor. * @return status (0 == ok, -errno == failure) */ int32_t qb_loop_poll_del(qb_loop_t *l, int32_t fd); /** * Add a signal job. * * Get a callback on this signal (not in the context of the signal). * * @param l pointer to the loop instance * @param p the priority * @param sig (SIGHUP or SIGINT) etc .... * @param data user data passed into the dispatch function * @param dispatch_fn callback function * @param handle (out) a reference to the signal job * @return status (0 == ok, -errno == failure) */ int32_t qb_loop_signal_add(qb_loop_t *l, enum qb_loop_priority p, int32_t sig, void *data, qb_loop_signal_dispatch_fn dispatch_fn, qb_loop_signal_handle *handle); /** * Modify the signal job * * @param l pointer to the loop instance * @param p the priority * @param sig (SIGHUP or SIGINT) etc .... * @param data user data passed into the dispatch function * @param dispatch_fn callback function * @param handle (in) a reference to the signal job * @return status (0 == ok, -errno == failure) */ int32_t qb_loop_signal_mod(qb_loop_t *l, enum qb_loop_priority p, int32_t sig, void *data, qb_loop_signal_dispatch_fn dispatch_fn, qb_loop_signal_handle handle); /** * Delete the signal job. * * @param l pointer to the loop instance * @param handle (in) a reference to the signal job * @return status (0 == ok, -errno == failure) */ int32_t qb_loop_signal_del(qb_loop_t *l, qb_loop_signal_handle handle); /* *INDENT-OFF* */ #ifdef __cplusplus } #endif /* __cplusplus */ /* *INDENT-ON* */ #endif /* QB_LOOP_H_DEFINED */ diff --git a/lib/loop_timerlist.c b/lib/loop_timerlist.c index 4102ea3..e5e3ae8 100644 --- a/lib/loop_timerlist.c +++ b/lib/loop_timerlist.c @@ -1,293 +1,343 @@ /* * Copyright (C) 2010 Red Hat, Inc. * * Author: Angus Salkeld * * This file is part of libqb. * * libqb is free software: you can redistribute it and/or modify * it under the terms of the GNU Lesser General Public License as published by * the Free Software Foundation, either version 2.1 of the License, or * (at your option) any later version. * * libqb is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public License * along with libqb. If not, see . */ #include "os_base.h" #include #include #include #include #include #include "loop_int.h" #include "util_int.h" #include "tlist.h" struct qb_loop_timer { struct qb_loop_item item; qb_loop_timer_dispatch_fn dispatch_fn; enum qb_loop_priority p; timer_handle timerlist_handle; enum qb_poll_entry_state state; int32_t check; uint32_t install_pos; }; struct qb_timer_source { struct qb_loop_source s; struct timerlist timerlist; qb_array_t *timers; size_t timer_entry_count; }; static void timer_dispatch(struct qb_loop_item *item, enum qb_loop_priority p) { struct qb_loop_timer *timer = (struct qb_loop_timer *)item; assert(timer->state == QB_POLL_ENTRY_JOBLIST); timer->check = 0; timer->dispatch_fn(timer->item.user_data); timer->state = QB_POLL_ENTRY_EMPTY; } static int32_t expired_timers; static void make_job_from_tmo(void *data) { struct qb_loop_timer *t = (struct qb_loop_timer *)data; struct qb_loop *l = t->item.source->l; assert(t->state == QB_POLL_ENTRY_ACTIVE); qb_loop_level_item_add(&l->level[t->p], &t->item); t->state = QB_POLL_ENTRY_JOBLIST; expired_timers++; } static int32_t expire_the_timers(struct qb_loop_source *s, int32_t ms_timeout) { struct qb_timer_source *ts = (struct qb_timer_source *)s; expired_timers = 0; timerlist_expire(&ts->timerlist); return expired_timers; } int32_t qb_loop_timer_msec_duration_to_expire(struct qb_loop_source * timer_source) { struct qb_timer_source *my_src = (struct qb_timer_source *)timer_source; uint64_t left = timerlist_msec_duration_to_expire(&my_src->timerlist); if (left != -1 && left > 0xFFFFFFFF) { left = 0xFFFFFFFE; } return left; } struct qb_loop_source * qb_loop_timer_create(struct qb_loop *l) { struct qb_timer_source *my_src = malloc(sizeof(struct qb_timer_source)); if (my_src == NULL) { return NULL; } my_src->s.l = l; my_src->s.dispatch_and_take_back = timer_dispatch; my_src->s.poll = expire_the_timers; timerlist_init(&my_src->timerlist); my_src->timers = qb_array_create_2(16, sizeof(struct qb_loop_timer), 16); my_src->timer_entry_count = 0; return (struct qb_loop_source *)my_src; } void qb_loop_timer_destroy(struct qb_loop *l) { struct qb_timer_source *my_src = (struct qb_timer_source *)l->timer_source; qb_array_free(my_src->timers); free(l->timer_source); } static int32_t _timer_from_handle_(struct qb_timer_source *s, qb_loop_timer_handle handle_in, struct qb_loop_timer **timer_pt) { int32_t rc; int32_t check; uint32_t install_pos; struct qb_loop_timer *timer; if (handle_in == 0) { return -EINVAL; } check = handle_in >> 32; install_pos = handle_in & UINT32_MAX; rc = qb_array_index(s->timers, install_pos, (void **)&timer); if (rc != 0) { return rc; } if (timer->check != check) { return -EINVAL; } *timer_pt = timer; return 0; } static int32_t _get_empty_array_position_(struct qb_timer_source *s) { int32_t install_pos; int32_t res = 0; struct qb_loop_timer *timer; for (install_pos = 0; install_pos < s->timer_entry_count; install_pos++) { assert(qb_array_index(s->timers, install_pos, (void **)&timer) == 0); if (timer->state == QB_POLL_ENTRY_EMPTY) { return install_pos; } } res = qb_array_grow(s->timers, s->timer_entry_count + 1); if (res != 0) { return res; } s->timer_entry_count++; install_pos = s->timer_entry_count - 1; return install_pos; } int32_t qb_loop_timer_add(struct qb_loop * lp, enum qb_loop_priority p, uint64_t nsec_duration, void *data, qb_loop_timer_dispatch_fn timer_fn, qb_loop_timer_handle * timer_handle_out) { struct qb_loop_timer *t; struct qb_timer_source *my_src; int32_t i; struct qb_loop *l = lp; if (l == NULL) { l = qb_loop_default_get(); } if (l == NULL || timer_fn == NULL) { return -EINVAL; } my_src = (struct qb_timer_source *)l->timer_source; i = _get_empty_array_position_(my_src); assert(qb_array_index(my_src->timers, i, (void **)&t) >= 0); t->state = QB_POLL_ENTRY_ACTIVE; t->install_pos = i; t->item.user_data = data; t->item.source = (struct qb_loop_source *)my_src; t->dispatch_fn = timer_fn; t->p = p; qb_list_init(&t->item.list); /* * Make sure just positive integers are used for the integrity(?) * checks within 2^32 address space, if we miss 200 times in a row * (just 0 is concerned per specification of random), the PRNG may be * broken -> the value is unspecified, subject of previous assignment. */ for (i = 0; i < 200; i++) { t->check = random(); if (t->check > 0) { break; /* covers also t->check == UINT32_MAX */ } } if (timer_handle_out) { *timer_handle_out = (((uint64_t) (t->check)) << 32) | t->install_pos; } return timerlist_add_duration(&my_src->timerlist, make_job_from_tmo, t, nsec_duration, &t->timerlist_handle); } int32_t qb_loop_timer_del(struct qb_loop * lp, qb_loop_timer_handle th) { struct qb_timer_source *s; struct qb_loop_timer *t; int32_t res; struct qb_loop *l = lp; if (l == NULL) { l = qb_loop_default_get(); } s = (struct qb_timer_source *)l->timer_source; res = _timer_from_handle_(s, th, &t); if (res != 0) { return res; } if (t->state == QB_POLL_ENTRY_DELETED) { qb_util_log(LOG_WARNING, "timer already deleted"); return 0; } if (t->state != QB_POLL_ENTRY_ACTIVE && t->state != QB_POLL_ENTRY_JOBLIST) { return -EINVAL; } if (t->state == QB_POLL_ENTRY_JOBLIST) { qb_loop_level_item_del(&l->level[t->p], &t->item); } if (t->timerlist_handle) { timerlist_del(&s->timerlist, t->timerlist_handle); } t->state = QB_POLL_ENTRY_EMPTY; return 0; } uint64_t qb_loop_timer_expire_time_get(struct qb_loop * lp, qb_loop_timer_handle th) { struct qb_timer_source *s; struct qb_loop_timer *t; int32_t res; struct qb_loop *l = lp; if (l == NULL) { l = qb_loop_default_get(); } s = (struct qb_timer_source *)l->timer_source; res = _timer_from_handle_(s, th, &t); if (res != 0) { return 0; } if (t->state != QB_POLL_ENTRY_ACTIVE) { return 0; } return timerlist_expire_time(&s->timerlist, t->timerlist_handle); } +uint64_t +qb_loop_timer_expire_time_remaining(struct qb_loop * lp, qb_loop_timer_handle th) +{ + + uint64_t current_ns; + /* NOTE: while it does not appear that absolute timers are used anywhere, + * we may as well respect this pattern in case that changes. + * Unfortunately, that means we do need to repeat timer fetch code from qb_loop_timer_expire_time_get + * rather than just a simple call to qb_loop_timer_expire_time_get and qb_util_nano_current_get. + */ + + struct qb_timer_source *s; + struct qb_loop_timer *t; + int32_t res; + struct qb_loop *l = lp; + + if (l == NULL) { + l = qb_loop_default_get(); + } + s = (struct qb_timer_source *)l->timer_source; + + res = _timer_from_handle_(s, th, &t); + if (res != 0) { + return 0; + } + + struct timerlist_timer *timer = (struct timerlist_timer *)t->timerlist_handle; + + + if (timer->is_absolute_timer) { + current_ns = qb_util_nano_from_epoch_get(); + } + else { + current_ns = qb_util_nano_current_get(); + } + uint64_t timer_ns = timerlist_expire_time(&s->timerlist, t->timerlist_handle); + /* since time estimation is racy by nature, I'll try to check the state late, + * and try to understand that no matter what, the timer might have expired in the mean time + */ + if (t->state != QB_POLL_ENTRY_ACTIVE) { + return 0; + } + if (timer_ns < current_ns) { + return 0; // respect the "expired" contract + } + return timer_ns - current_ns; + + +} + int32_t qb_loop_timer_is_running(qb_loop_t *l, qb_loop_timer_handle th) { return (qb_loop_timer_expire_time_get(l, th) > 0); } diff --git a/tests/check_loop.c b/tests/check_loop.c index ac9bbbf..81cc2ba 100644 --- a/tests/check_loop.c +++ b/tests/check_loop.c @@ -1,763 +1,782 @@ /* * Copyright (c) 2010 Red Hat, Inc. * * All rights reserved. * * Author: Angus Salkeld * * This file is part of libqb. * * libqb is free software: you can redistribute it and/or modify * it under the terms of the GNU Lesser General Public License as published by * the Free Software Foundation, either version 2.1 of the License, or * (at your option) any later version. * * libqb is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public License * along with libqb. If not, see . */ #include "os_base.h" #include "check_common.h" #include #include #include #include static int32_t job_1_run_count = 0; static int32_t job_2_run_count = 0; static int32_t job_3_run_count = 0; static int32_t job_order_1 = 1; static int32_t job_order_2 = 2; static int32_t job_order_3 = 3; static int32_t job_order_4 = 4; static int32_t job_order_5 = 5; static int32_t job_order_6 = 6; static int32_t job_order_7 = 7; static int32_t job_order_8 = 8; static int32_t job_order_9 = 9; static int32_t job_order_10 = 10; static int32_t job_order_11 = 11; static int32_t job_order_12 = 12; static int32_t job_order_13 = 13; static void job_1(void *data) { job_1_run_count++; } static void job_order_check(void *data) { int32_t * order = (int32_t *)data; job_1_run_count++; ck_assert_int_eq(job_1_run_count, *order); if (job_1_run_count == 1) { qb_loop_job_add(NULL, QB_LOOP_MED, &job_order_10, job_order_check); qb_loop_job_add(NULL, QB_LOOP_MED, &job_order_11, job_order_check); qb_loop_job_add(NULL, QB_LOOP_MED, &job_order_12, job_order_check); qb_loop_job_add(NULL, QB_LOOP_MED, &job_order_13, job_order_check); } else if (job_1_run_count >= 13) { qb_loop_stop(NULL); } } static void job_stop(void *data) { qb_loop_t *l = (qb_loop_t *)data; job_3_run_count++; qb_loop_stop(l); } static void job_2(void *data) { int32_t res; qb_loop_t *l = (qb_loop_t *)data; job_2_run_count++; res = qb_loop_job_add(l, QB_LOOP_HIGH, data, job_stop); ck_assert_int_eq(res, 0); } static void job_1_r(void *data) { int32_t res; qb_loop_t *l = (qb_loop_t *)data; job_1_run_count++; res = qb_loop_job_add(l, QB_LOOP_MED, data, job_2); ck_assert_int_eq(res, 0); } static void job_1_add_nuts(void *data) { int32_t res; qb_loop_t *l = (qb_loop_t *)data; job_1_run_count++; res = qb_loop_job_add(l, QB_LOOP_HIGH, data, job_1); ck_assert_int_eq(res, 0); res = qb_loop_job_add(l, QB_LOOP_HIGH, data, job_1); ck_assert_int_eq(res, 0); res = qb_loop_job_add(l, QB_LOOP_HIGH, data, job_1); ck_assert_int_eq(res, 0); res = qb_loop_job_add(l, QB_LOOP_HIGH, data, job_1); ck_assert_int_eq(res, 0); res = qb_loop_job_add(l, QB_LOOP_HIGH, data, job_1); ck_assert_int_eq(res, 0); res = qb_loop_job_add(l, QB_LOOP_HIGH, data, job_1); ck_assert_int_eq(res, 0); res = qb_loop_job_add(l, QB_LOOP_HIGH, data, job_1); ck_assert_int_eq(res, 0); res = qb_loop_job_add(l, QB_LOOP_HIGH, data, job_1); ck_assert_int_eq(res, 0); res = qb_loop_job_add(l, QB_LOOP_MED, data, job_1); ck_assert_int_eq(res, 0); res = qb_loop_job_add(l, QB_LOOP_MED, data, job_1); ck_assert_int_eq(res, 0); res = qb_loop_job_add(l, QB_LOOP_MED, data, job_1); ck_assert_int_eq(res, 0); res = qb_loop_job_add(l, QB_LOOP_MED, data, job_1); ck_assert_int_eq(res, 0); res = qb_loop_job_add(l, QB_LOOP_LOW, data, job_1); ck_assert_int_eq(res, 0); res = qb_loop_job_add(l, QB_LOOP_LOW, data, job_1); ck_assert_int_eq(res, 0); if (job_1_run_count < 500) { res = qb_loop_job_add(l, QB_LOOP_LOW, data, job_1_add_nuts); ck_assert_int_eq(res, 0); } else { res = qb_loop_job_add(l, QB_LOOP_LOW, data, job_stop); ck_assert_int_eq(res, 0); } ck_assert_int_eq(res, 0); } START_TEST(test_loop_job_input) { int32_t res; qb_loop_t *l; res = qb_loop_job_add(NULL, QB_LOOP_LOW, NULL, job_2); ck_assert_int_eq(res, -EINVAL); l = qb_loop_create(); fail_if(l == NULL); res = qb_loop_job_add(NULL, QB_LOOP_LOW, NULL, job_2); ck_assert_int_eq(res, 0); res = qb_loop_job_add(l, 89, NULL, job_2); ck_assert_int_eq(res, -EINVAL); res = qb_loop_job_add(l, QB_LOOP_LOW, NULL, NULL); ck_assert_int_eq(res, -EINVAL); qb_loop_destroy(l); } END_TEST START_TEST(test_loop_job_1) { int32_t res; qb_loop_t *l = qb_loop_create(); fail_if(l == NULL); res = qb_loop_job_add(l, QB_LOOP_LOW, NULL, job_1); ck_assert_int_eq(res, 0); res = qb_loop_job_add(l, QB_LOOP_LOW, l, job_stop); ck_assert_int_eq(res, 0); qb_loop_run(l); ck_assert_int_eq(job_1_run_count, 1); qb_loop_destroy(l); } END_TEST START_TEST(test_loop_job_4) { int32_t res; qb_loop_t *l = qb_loop_create(); fail_if(l == NULL); res = qb_loop_job_add(l, QB_LOOP_LOW, l, job_1_r); ck_assert_int_eq(res, 0); qb_loop_run(l); ck_assert_int_eq(job_1_run_count, 1); ck_assert_int_eq(job_2_run_count, 1); ck_assert_int_eq(job_3_run_count, 1); qb_loop_destroy(l); } END_TEST START_TEST(test_loop_job_nuts) { int32_t res; qb_loop_t *l = qb_loop_create(); fail_if(l == NULL); res = qb_loop_job_add(l, QB_LOOP_LOW, l, job_1_add_nuts); ck_assert_int_eq(res, 0); qb_loop_run(l); fail_if(job_1_run_count < 500); qb_loop_destroy(l); } END_TEST START_TEST(test_loop_job_order) { int32_t res; qb_loop_t *l = qb_loop_create(); fail_if(l == NULL); job_1_run_count = 0; res = qb_loop_job_add(l, QB_LOOP_MED, &job_order_1, job_order_check); ck_assert_int_eq(res, 0); res = qb_loop_job_add(l, QB_LOOP_MED, &job_order_2, job_order_check); ck_assert_int_eq(res, 0); res = qb_loop_job_add(l, QB_LOOP_MED, &job_order_3, job_order_check); ck_assert_int_eq(res, 0); res = qb_loop_job_add(l, QB_LOOP_MED, &job_order_4, job_order_check); ck_assert_int_eq(res, 0); res = qb_loop_job_add(l, QB_LOOP_MED, &job_order_5, job_order_check); ck_assert_int_eq(res, 0); res = qb_loop_job_add(l, QB_LOOP_MED, &job_order_6, job_order_check); ck_assert_int_eq(res, 0); res = qb_loop_job_add(l, QB_LOOP_MED, &job_order_7, job_order_check); ck_assert_int_eq(res, 0); res = qb_loop_job_add(l, QB_LOOP_MED, &job_order_8, job_order_check); ck_assert_int_eq(res, 0); res = qb_loop_job_add(l, QB_LOOP_MED, &job_order_9, job_order_check); ck_assert_int_eq(res, 0); qb_loop_run(l); qb_loop_destroy(l); } END_TEST static qb_util_stopwatch_t *rl_sw; #define RATE_LIMIT_RUNTIME_SEC 3 static void job_add_self(void *data) { int32_t res; uint64_t elapsed1; qb_loop_t *l = (qb_loop_t *)data; job_1_run_count++; qb_util_stopwatch_stop(rl_sw); elapsed1 = qb_util_stopwatch_us_elapsed_get(rl_sw); if (elapsed1 > (RATE_LIMIT_RUNTIME_SEC * QB_TIME_US_IN_SEC)) { /* run for 3 seconds */ qb_loop_stop(l); return; } res = qb_loop_job_add(l, QB_LOOP_MED, data, job_add_self); ck_assert_int_eq(res, 0); } START_TEST(test_job_rate_limit) { int32_t res; qb_loop_t *l = qb_loop_create(); fail_if(l == NULL); rl_sw = qb_util_stopwatch_create(); fail_if(rl_sw == NULL); qb_util_stopwatch_start(rl_sw); res = qb_loop_job_add(l, QB_LOOP_MED, l, job_add_self); ck_assert_int_eq(res, 0); qb_loop_run(l); /* * the test is to confirm that a single job does not run away * and cause cpu spin. We are going to say that a spin is more than * one job per 50ms if there is only one job pending in the loop. */ _ck_assert_int(job_1_run_count, <, (RATE_LIMIT_RUNTIME_SEC * (QB_TIME_MS_IN_SEC/50)) + 10); qb_loop_destroy(l); qb_util_stopwatch_free(rl_sw); } END_TEST static void job_stop_and_del_1(void *data) { int32_t res; qb_loop_t *l = (qb_loop_t *)data; job_3_run_count++; res = qb_loop_job_del(l, QB_LOOP_MED, l, job_1); ck_assert_int_eq(res, 0); qb_loop_stop(l); } START_TEST(test_job_add_del) { int32_t res; qb_loop_t *l = qb_loop_create(); fail_if(l == NULL); res = qb_loop_job_add(l, QB_LOOP_MED, l, job_1); ck_assert_int_eq(res, 0); res = qb_loop_job_del(l, QB_LOOP_MED, l, job_1); ck_assert_int_eq(res, 0); job_1_run_count = 0; job_3_run_count = 0; res = qb_loop_job_add(l, QB_LOOP_MED, l, job_1); ck_assert_int_eq(res, 0); res = qb_loop_job_add(l, QB_LOOP_HIGH, l, job_stop_and_del_1); ck_assert_int_eq(res, 0); qb_loop_run(l); ck_assert_int_eq(job_1_run_count, 0); ck_assert_int_eq(job_3_run_count, 1); qb_loop_destroy(l); } END_TEST static Suite *loop_job_suite(void) { TCase *tc; Suite *s = suite_create("loop_job"); add_tcase(s, tc, test_loop_job_input); add_tcase(s, tc, test_loop_job_1); add_tcase(s, tc, test_loop_job_4); add_tcase(s, tc, test_loop_job_nuts, 5); add_tcase(s, tc, test_job_rate_limit, 5); add_tcase(s, tc, test_job_add_del); add_tcase(s, tc, test_loop_job_order); return s; } /* * ----------------------------------------------------------------------- * Timers */ static qb_loop_timer_handle test_th; +static qb_loop_timer_handle test_th2; + +static void check_time_left(void *data) +{ + qb_loop_t *l = (qb_loop_t *)data; + + /* NOTE: We are checking the 'stop_loop' timer here, not our own */ + uint64_t abs_time = qb_loop_timer_expire_time_get(l, test_th); + uint64_t rel_time = qb_loop_timer_expire_time_remaining(l, test_th); + + ck_assert(abs_time > 0ULL); + ck_assert(rel_time > 0ULL); + ck_assert(abs_time > rel_time); + ck_assert(rel_time <= 60*QB_TIME_NS_IN_MSEC); +} + START_TEST(test_loop_timer_input) { int32_t res; qb_loop_t *l; res = qb_loop_timer_add(NULL, QB_LOOP_LOW, 5*QB_TIME_NS_IN_MSEC, NULL, job_2, &test_th); ck_assert_int_eq(res, -EINVAL); l = qb_loop_create(); fail_if(l == NULL); res = qb_loop_timer_add(NULL, QB_LOOP_LOW, 5*QB_TIME_NS_IN_MSEC, NULL, job_2, &test_th); ck_assert_int_eq(res, 0); res = qb_loop_timer_add(l, QB_LOOP_LOW, 5*QB_TIME_NS_IN_MSEC, l, NULL, &test_th); ck_assert_int_eq(res, -EINVAL); qb_loop_destroy(l); } END_TEST static void one_shot_tmo(void * data) { static int32_t been_here = QB_FALSE; ck_assert_int_eq(been_here, QB_FALSE); been_here = QB_TRUE; } static qb_loop_timer_handle reset_th; static int32_t reset_timer_step = 0; static void reset_one_shot_tmo(void*data) { int32_t res; qb_loop_t *l = data; if (reset_timer_step == 0) { res = qb_loop_timer_del(l, reset_th); ck_assert_int_eq(res, -EINVAL); res = qb_loop_timer_is_running(l, reset_th); ck_assert_int_eq(res, QB_FALSE); res = qb_loop_timer_add(l, QB_LOOP_LOW, 8*QB_TIME_NS_IN_MSEC, l, reset_one_shot_tmo, &reset_th); ck_assert_int_eq(res, 0); } reset_timer_step++; } START_TEST(test_loop_timer_basic) { int32_t res; qb_loop_t *l = qb_loop_create(); fail_if(l == NULL); res = qb_loop_timer_add(l, QB_LOOP_LOW, 5*QB_TIME_NS_IN_MSEC, l, one_shot_tmo, &test_th); ck_assert_int_eq(res, 0); res = qb_loop_timer_is_running(l, test_th); ck_assert_int_eq(res, QB_TRUE); res = qb_loop_timer_add(l, QB_LOOP_LOW, 7*QB_TIME_NS_IN_MSEC, l, reset_one_shot_tmo, &reset_th); ck_assert_int_eq(res, 0); + res = qb_loop_timer_add(l, QB_LOOP_HIGH, 20*QB_TIME_NS_IN_MSEC, l, check_time_left, &test_th2); + ck_assert_int_eq(res, 0); + res = qb_loop_timer_add(l, QB_LOOP_LOW, 60*QB_TIME_NS_IN_MSEC, l, job_stop, &test_th); ck_assert_int_eq(res, 0); qb_loop_run(l); ck_assert_int_eq(reset_timer_step, 2); qb_loop_destroy(l); } END_TEST struct qb_stop_watch { uint64_t start; uint64_t end; qb_loop_t *l; uint64_t ns_timer; int64_t total; int32_t count; int32_t killer; qb_loop_timer_handle th; }; static void stop_watch_tmo(void*data) { struct qb_stop_watch *sw = (struct qb_stop_watch *)data; float per; int64_t diff; sw->end = qb_util_nano_current_get(); diff = sw->end - sw->start; if (diff < sw->ns_timer) { printf("timer expired early! by %"PRIi64"\n", (int64_t)(sw->ns_timer - diff)); } ck_assert(diff >= sw->ns_timer); sw->total += diff; sw->total -= sw->ns_timer; sw->start = sw->end; sw->count++; if (sw->count < 50) { qb_loop_timer_add(sw->l, QB_LOOP_LOW, sw->ns_timer, data, stop_watch_tmo, &sw->th); } else { per = ((sw->total * 100) / sw->count) / (float)sw->ns_timer; printf("average error for %"PRIu64" ns timer is %"PRIi64" (ns) (%f)\n", sw->ns_timer, sw->total/sw->count, per); if (sw->killer) { qb_loop_stop(sw->l); } } } static void start_timer(qb_loop_t *l, struct qb_stop_watch *sw, uint64_t timeout, int32_t killer) { int32_t res; sw->l = l; sw->count = 0; sw->total = 0; sw->killer = killer; sw->ns_timer = timeout; sw->start = qb_util_nano_current_get(); res = qb_loop_timer_add(sw->l, QB_LOOP_LOW, sw->ns_timer, sw, stop_watch_tmo, &sw->th); ck_assert_int_eq(res, 0); } START_TEST(test_loop_timer_precision) { int32_t i; uint64_t tmo; struct qb_stop_watch sw[11]; qb_loop_t *l = qb_loop_create(); fail_if(l == NULL); for (i = 0; i < 10; i++) { tmo = ((1 + i * 9) * QB_TIME_NS_IN_MSEC) + 500000; start_timer(l, &sw[i], tmo, QB_FALSE); } start_timer(l, &sw[i], 100 * QB_TIME_NS_IN_MSEC, QB_TRUE); qb_loop_run(l); qb_loop_destroy(l); } END_TEST static int expire_leak_counter = 0; #define EXPIRE_NUM_RUNS 10 static int expire_leak_runs = 0; static void empty_func_tmo(void*data) { expire_leak_counter++; } static void stop_func_tmo(void*data) { qb_loop_t *l = (qb_loop_t *)data; qb_log(LOG_DEBUG, "expire_leak_counter:%d", expire_leak_counter); qb_loop_stop(l); } static void next_func_tmo(void*data) { qb_loop_t *l = (qb_loop_t *)data; int32_t i; uint64_t tmo; uint64_t max_tmo = 0; qb_loop_timer_handle th; qb_log(LOG_DEBUG, "expire_leak_counter:%d", expire_leak_counter); for (i = 0; i < 300; i++) { tmo = ((1 + i) * QB_TIME_NS_IN_MSEC) + 500000; qb_loop_timer_add(l, QB_LOOP_LOW, tmo, NULL, empty_func_tmo, &th); qb_loop_timer_add(l, QB_LOOP_MED, tmo, NULL, empty_func_tmo, &th); qb_loop_timer_add(l, QB_LOOP_HIGH, tmo, NULL, empty_func_tmo, &th); max_tmo = QB_MAX(max_tmo, tmo); } expire_leak_runs++; if (expire_leak_runs == EXPIRE_NUM_RUNS) { qb_loop_timer_add(l, QB_LOOP_LOW, max_tmo, l, stop_func_tmo, &th); } else { qb_loop_timer_add(l, QB_LOOP_LOW, max_tmo, l, next_func_tmo, &th); } } /* * make sure that file descriptors don't get leaked with no qb_loop_timer_del() */ START_TEST(test_loop_timer_expire_leak) { int32_t i; uint64_t tmo; uint64_t max_tmo = 0; qb_loop_timer_handle th; qb_loop_t *l = qb_loop_create(); fail_if(l == NULL); expire_leak_counter = 0; for (i = 0; i < 300; i++) { tmo = ((1 + i) * QB_TIME_NS_IN_MSEC) + 500000; qb_loop_timer_add(l, QB_LOOP_LOW, tmo, NULL, empty_func_tmo, &th); qb_loop_timer_add(l, QB_LOOP_MED, tmo, NULL, empty_func_tmo, &th); qb_loop_timer_add(l, QB_LOOP_HIGH, tmo, NULL, empty_func_tmo, &th); max_tmo = QB_MAX(max_tmo, tmo); } qb_loop_timer_add(l, QB_LOOP_LOW, max_tmo, l, next_func_tmo, &th); expire_leak_runs = 1; qb_loop_run(l); ck_assert_int_eq(expire_leak_counter, 300*3* EXPIRE_NUM_RUNS); qb_loop_destroy(l); } END_TEST static int received_signum = 0; static int received_sigs = 0; static int32_t sig_handler(int32_t rsignal, void *data) { qb_loop_t *l = (qb_loop_t *)data; qb_log(LOG_DEBUG, "caught signal %d", rsignal); received_signum = rsignal; received_sigs++; qb_loop_job_add(l, QB_LOOP_LOW, NULL, job_stop); return 0; } START_TEST(test_loop_sig_handling) { qb_loop_signal_handle handle; qb_loop_t *l = qb_loop_create(); fail_if(l == NULL); qb_loop_signal_add(l, QB_LOOP_HIGH, SIGINT, l, sig_handler, &handle); qb_loop_signal_add(l, QB_LOOP_HIGH, SIGTERM, l, sig_handler, &handle); qb_loop_signal_add(l, QB_LOOP_HIGH, SIGQUIT, l, sig_handler, &handle); kill(getpid(), SIGINT); qb_loop_run(l); ck_assert_int_eq(received_signum, SIGINT); kill(getpid(), SIGQUIT); qb_loop_run(l); ck_assert_int_eq(received_signum, SIGQUIT); qb_loop_destroy(l); } END_TEST /* Globals for this test only */ static int our_signal_called = 0; static qb_loop_t *this_l; static void handle_nonqb_signal(int num) { our_signal_called = 1; qb_loop_job_add(this_l, QB_LOOP_LOW, NULL, job_stop); } START_TEST(test_loop_dont_override_other_signals) { qb_loop_signal_handle handle; this_l = qb_loop_create(); fail_if(this_l == NULL); signal(SIGUSR1, handle_nonqb_signal); qb_loop_signal_add(this_l, QB_LOOP_HIGH, SIGINT, this_l, sig_handler, &handle); kill(getpid(), SIGUSR1); qb_loop_run(this_l); ck_assert_int_eq(our_signal_called, 1); qb_loop_destroy(this_l); } END_TEST START_TEST(test_loop_sig_only_get_one) { int res; qb_loop_signal_handle handle; qb_loop_t *l = qb_loop_create(); fail_if(l == NULL); /* make sure we only get one call to the handler * don't assume we are going to exit the loop. */ received_sigs = 0; qb_loop_signal_add(l, QB_LOOP_LOW, SIGINT, l, sig_handler, &handle); res = qb_loop_job_add(l, QB_LOOP_MED, NULL, job_1); ck_assert_int_eq(res, 0); res = qb_loop_job_add(l, QB_LOOP_HIGH, NULL, job_1); ck_assert_int_eq(res, 0); res = qb_loop_job_add(l, QB_LOOP_MED, NULL, job_1); ck_assert_int_eq(res, 0); res = qb_loop_job_add(l, QB_LOOP_HIGH, NULL, job_1); ck_assert_int_eq(res, 0); res = qb_loop_job_add(l, QB_LOOP_HIGH, NULL, job_1); ck_assert_int_eq(res, 0); res = qb_loop_job_add(l, QB_LOOP_MED, NULL, job_1); ck_assert_int_eq(res, 0); kill(getpid(), SIGINT); qb_loop_run(l); ck_assert_int_eq(received_signum, SIGINT); ck_assert_int_eq(received_sigs, 1); qb_loop_destroy(l); } END_TEST static qb_loop_signal_handle sig_hdl; static void job_rm_sig_handler(void *data) { int res; qb_loop_t *l = (qb_loop_t *)data; res = qb_loop_signal_del(l, sig_hdl); ck_assert_int_eq(res, 0); res = qb_loop_job_add(l, QB_LOOP_LOW, NULL, job_stop); ck_assert_int_eq(res, 0); } START_TEST(test_loop_sig_delete) { int res; qb_loop_t *l = qb_loop_create(); fail_if(l == NULL); /* make sure we can remove a signal job from the job queue. */ received_sigs = 0; received_signum = 0; res = qb_loop_signal_add(l, QB_LOOP_MED, SIGINT, l, sig_handler, &sig_hdl); ck_assert_int_eq(res, 0); res = qb_loop_job_add(l, QB_LOOP_HIGH, NULL, job_rm_sig_handler); ck_assert_int_eq(res, 0); kill(getpid(), SIGINT); qb_loop_run(l); ck_assert_int_eq(received_sigs, 0); ck_assert_int_eq(received_signum, 0); qb_loop_destroy(l); } END_TEST static Suite * loop_timer_suite(void) { TCase *tc; Suite *s = suite_create("loop_timers"); add_tcase(s, tc, test_loop_timer_input); add_tcase(s, tc, test_loop_timer_basic, 30); add_tcase(s, tc, test_loop_timer_precision, 30); add_tcase(s, tc, test_loop_timer_expire_leak, 30); return s; } static Suite * loop_signal_suite(void) { TCase *tc; Suite *s = suite_create("loop_signal_suite"); add_tcase(s, tc, test_loop_sig_handling, 10); add_tcase(s, tc, test_loop_sig_only_get_one); add_tcase(s, tc, test_loop_sig_delete); add_tcase(s, tc, test_loop_dont_override_other_signals); return s; } int32_t main(void) { int32_t number_failed; SRunner *sr = srunner_create(loop_job_suite()); srunner_add_suite (sr, loop_timer_suite()); srunner_add_suite (sr, loop_signal_suite()); qb_log_init("check", LOG_USER, LOG_EMERG); atexit(qb_log_fini); qb_log_ctl(QB_LOG_SYSLOG, QB_LOG_CONF_ENABLED, QB_FALSE); qb_log_filter_ctl(QB_LOG_STDERR, QB_LOG_FILTER_ADD, QB_LOG_FILTER_FILE, "*", LOG_INFO); qb_log_ctl(QB_LOG_STDERR, QB_LOG_CONF_ENABLED, QB_TRUE); srunner_run_all(sr, CK_VERBOSE); number_failed = srunner_ntests_failed(sr); srunner_free(sr); return (number_failed == 0) ? EXIT_SUCCESS : EXIT_FAILURE; }