diff --git a/lib/log_dcs.c b/lib/log_dcs.c index 0249e6d..e8e3b6f 100644 --- a/lib/log_dcs.c +++ b/lib/log_dcs.c @@ -1,211 +1,220 @@ /* * Copyright (C) 2011 Red Hat, Inc. * * All rights reserved. * * Author: Angus Salkeld * * libqb is free software: you can redistribute it and/or modify * it under the terms of the GNU Lesser General Public License as published by * the Free Software Foundation, either version 2.1 of the License, or * (at your option) any later version. * * libqb is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public License * along with libqb. If not, see . */ #include "os_base.h" #include #ifdef HAVE_LINK_H #include #endif #include #include #include #include #include #include "log_int.h" static qb_array_t *lookup_arr = NULL; static qb_array_t *callsite_arr = NULL; static uint32_t callsite_arr_next = 0; static uint32_t callsite_elems_per_bin = 0; static qb_thread_lock_t *arr_next_lock = NULL; struct callsite_list { struct qb_log_callsite *cs; struct callsite_list *next; }; static void _log_register_callsites(qb_array_t * a, uint32_t bin) { struct qb_log_callsite *start; struct qb_log_callsite *stop; int32_t rc = qb_array_index(callsite_arr, bin * callsite_elems_per_bin, (void **)&start); if (rc == 0) { stop = &start[callsite_elems_per_bin]; rc = qb_log_callsites_register(start, stop); assert(rc == 0); } } static struct qb_log_callsite * _log_dcs_new_cs(const char *function, const char *filename, const char *format, uint8_t priority, uint32_t lineno, uint32_t tags) { struct qb_log_callsite *cs; int32_t rc = qb_array_index(callsite_arr, callsite_arr_next++, (void **)&cs); assert(rc == 0); assert(cs != NULL); cs->function = strdup(function); cs->filename = strdup(filename); cs->format = strdup(format); cs->priority = priority; cs->lineno = lineno; cs->tags = tags; return cs; } struct qb_log_callsite * qb_log_dcs_get(int32_t * newly_created, const char *function, const char *filename, const char *format, uint8_t priority, uint32_t lineno, uint32_t tags) { int32_t rc; struct qb_log_callsite *cs = NULL; struct callsite_list *csl_head; struct callsite_list *csl_last = NULL; struct callsite_list *csl; const char *safe_filename = filename; const char *safe_function = function; const char *safe_format = format; if (filename == NULL) { safe_filename = ""; } if (function == NULL) { safe_function = ""; } if (format == NULL) { safe_format = ""; } + /* - * try the fastest access first (no locking needed) + * try the fastest access first. */ rc = qb_array_index(lookup_arr, lineno, (void **)&csl_head); assert(rc == 0); + + /* + * Still need to lock here as we are reading from cs->* members + * and they might not be completely filled in yet by another thread + * that's running in _log_dcs_new_cs() below + */ + (void)qb_thread_lock(arr_next_lock); if (csl_head->cs && priority == csl_head->cs->priority && strcmp(safe_filename, csl_head->cs->filename) == 0 && strcmp(safe_format, csl_head->cs->format) == 0) { + (void)qb_thread_unlock(arr_next_lock); return csl_head->cs; } + /* - * so we will either have to create it or go through a list, so lock it. + * so we will either have to create it or go through a list */ - (void)qb_thread_lock(arr_next_lock); if (csl_head->cs == NULL) { csl_head->cs = _log_dcs_new_cs(safe_function, safe_filename, safe_format, priority, lineno, tags); cs = csl_head->cs; csl_head->next = NULL; *newly_created = QB_TRUE; } else { for (csl = csl_head; csl; csl = csl->next) { assert(csl->cs->lineno == lineno); if (priority == csl->cs->priority && strcmp(safe_format, csl->cs->format) == 0 && strcmp(safe_filename, csl->cs->filename) == 0) { cs = csl->cs; break; } csl_last = csl; } if (cs == NULL) { csl = calloc(1, sizeof(struct callsite_list)); if (csl == NULL) { goto cleanup; } csl->cs = _log_dcs_new_cs(safe_function, safe_filename, safe_format, priority, lineno, tags); csl->next = NULL; csl_last->next = csl; cs = csl->cs; *newly_created = QB_TRUE; } } cleanup: (void)qb_thread_unlock(arr_next_lock); return cs; } void qb_log_dcs_init(void) { int32_t rc; lookup_arr = qb_array_create_2(16, sizeof(struct callsite_list), 1); callsite_arr = qb_array_create_2(16, sizeof(struct qb_log_callsite), 1); arr_next_lock = qb_thread_lock_create(QB_THREAD_LOCK_SHORT); callsite_elems_per_bin = qb_array_elems_per_bin_get(callsite_arr); rc = qb_array_new_bin_cb_set(callsite_arr, _log_register_callsites); assert(rc == 0); } void qb_log_dcs_fini(void) { struct callsite_list *csl_head; struct callsite_list *csl_next; struct callsite_list *csl; int32_t i; int32_t rc; struct qb_log_callsite *cs = NULL; int32_t cnt = qb_array_num_bins_get(lookup_arr); cnt *= qb_array_elems_per_bin_get(lookup_arr); for (i = 0; i < cnt; i++) { rc = qb_array_index(lookup_arr, i, (void **)&csl_head); if (rc != 0 || csl_head->next == NULL) { continue; } for (csl = csl_head->next; csl; csl = csl_next) { csl_next = csl->next; free(csl); } } for (i = 0; i < callsite_arr_next; i++) { rc = qb_array_index(callsite_arr, i, (void **)&cs); if (rc == 0 && cs){ free((char*)cs->function); free((char*)cs->filename); free((char*)cs->format); } } qb_array_free(lookup_arr); qb_array_free(callsite_arr); (void)qb_thread_lock_destroy(arr_next_lock); } diff --git a/lib/log_thread.c b/lib/log_thread.c index 27a24b6..40b8239 100644 --- a/lib/log_thread.c +++ b/lib/log_thread.c @@ -1,306 +1,308 @@ /* * Copyright (C) 2011 Red Hat, Inc. * * All rights reserved. * * Author: Angus Salkeld * * libqb is free software: you can redistribute it and/or modify * it under the terms of the GNU Lesser General Public License as published by * the Free Software Foundation, either version 2.1 of the License, or * (at your option) any later version. * * libqb is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public License * along with libqb. If not, see . */ #include "os_base.h" #include #include #include #include #include #include "log_int.h" static int wthread_active = QB_FALSE; static int wthread_should_exit = QB_FALSE; static qb_thread_lock_t *logt_wthread_lock = NULL; static QB_LIST_DECLARE(logt_print_finished_records); static int logt_memory_used = 0; static int logt_dropped_messages = 0; static sem_t logt_thread_start; static sem_t logt_print_finished; static int logt_sched_param_queued = QB_FALSE; static int logt_sched_policy; #if defined(HAVE_PTHREAD_SETSCHEDPARAM) static struct sched_param logt_sched_param; #endif /* HAVE_PTHREAD_SETSCHEDPARAM */ static pthread_t logt_thread_id = 0; static void *qb_logt_worker_thread(void *data) __attribute__ ((noreturn)); static void * qb_logt_worker_thread(void *data) { struct qb_log_record *rec; int dropped = 0; int res; /* Signal qb_log_thread_start that the initialization may continue */ sem_post(&logt_thread_start); for (;;) { retry_sem_wait: res = sem_wait(&logt_print_finished); if (res == -1 && errno == EINTR) { goto retry_sem_wait; } else if (res == -1) { /* This case shouldn't happen */ pthread_exit(NULL); } (void)qb_thread_lock(logt_wthread_lock); if (wthread_should_exit) { int value = -1; (void)sem_getvalue(&logt_print_finished, &value); if (value == 0) { (void)qb_thread_unlock(logt_wthread_lock); pthread_exit(NULL); } } rec = qb_list_first_entry(&logt_print_finished_records, struct qb_log_record, list); qb_list_del(&rec->list); logt_memory_used = logt_memory_used - strlen(rec->buffer) - sizeof(struct qb_log_record) - 1; dropped = logt_dropped_messages; logt_dropped_messages = 0; if (dropped) { printf("%d messages lost\n", dropped); } qb_log_thread_log_write(rec->cs, &rec->timestamp, rec->buffer); (void)qb_thread_unlock(logt_wthread_lock); free(rec->buffer); free(rec); } } int32_t qb_log_thread_priority_set(int32_t policy, int32_t priority) { int res = 0; #if defined(HAVE_PTHREAD_SETSCHEDPARAM) logt_sched_policy = policy; if (policy == SCHED_OTHER #ifdef SCHED_IDLE || policy == SCHED_IDLE #endif #if defined(SCHED_BATCH) && !defined(QB_DARWIN) || policy == SCHED_BATCH #endif ) { logt_sched_param.sched_priority = 0; } else { logt_sched_param.sched_priority = priority; } if (wthread_active == QB_FALSE) { logt_sched_param_queued = QB_TRUE; } else { res = pthread_setschedparam(logt_thread_id, policy, &logt_sched_param); if (res != 0) { res = -res; } } #endif return res; } int32_t qb_log_thread_start(void) { int res; qb_thread_lock_t *wthread_lock; if (wthread_active) { return 0; } wthread_active = QB_TRUE; sem_init(&logt_thread_start, 0, 0); sem_init(&logt_print_finished, 0, 0); errno = 0; logt_wthread_lock = qb_thread_lock_create(QB_THREAD_LOCK_SHORT); if (logt_wthread_lock == NULL) { return errno ? -errno : -1; } res = pthread_create(&logt_thread_id, NULL, qb_logt_worker_thread, NULL); if (res != 0) { wthread_active = QB_FALSE; (void)qb_thread_lock_destroy(logt_wthread_lock); return -res; } sem_wait(&logt_thread_start); if (logt_sched_param_queued) { res = qb_log_thread_priority_set(logt_sched_policy, #if defined(HAVE_PTHREAD_SETSCHEDPARAM) logt_sched_param.sched_priority); #else 0); #endif if (res != 0) { goto cleanup_pthread; } logt_sched_param_queued = QB_FALSE; } return 0; cleanup_pthread: wthread_should_exit = QB_TRUE; sem_post(&logt_print_finished); pthread_join(logt_thread_id, NULL); wthread_active = QB_FALSE; wthread_lock = logt_wthread_lock; logt_wthread_lock = NULL; (void)qb_thread_lock_destroy(wthread_lock); sem_destroy(&logt_print_finished); sem_destroy(&logt_thread_start); return res; } void qb_log_thread_pause(struct qb_log_target *t) { if (t->threaded) { (void)qb_thread_lock(logt_wthread_lock); } } void qb_log_thread_resume(struct qb_log_target *t) { if (t->threaded) { (void)qb_thread_unlock(logt_wthread_lock); } } void qb_log_thread_log_post(struct qb_log_callsite *cs, struct timespec *timestamp, const char *buffer) { struct qb_log_record *rec; size_t buf_size; size_t total_size; rec = malloc(sizeof(struct qb_log_record)); if (rec == NULL) { return; } buf_size = strlen(buffer) + 1; total_size = sizeof(struct qb_log_record) + buf_size; rec->cs = cs; rec->buffer = malloc(buf_size); if (rec->buffer == NULL) { goto free_record; } memcpy(rec->buffer, buffer, buf_size); memcpy(&rec->timestamp, timestamp, sizeof(struct timespec)); qb_list_init(&rec->list); (void)qb_thread_lock(logt_wthread_lock); logt_memory_used += total_size; if (logt_memory_used > 512000) { free(rec->buffer); free(rec); logt_memory_used = logt_memory_used - total_size; logt_dropped_messages += 1; (void)qb_thread_unlock(logt_wthread_lock); return; } else { qb_list_add_tail(&rec->list, &logt_print_finished_records); } (void)qb_thread_unlock(logt_wthread_lock); sem_post(&logt_print_finished); return; free_record: free(rec); } void qb_log_thread_stop(void) { int res; int value; struct qb_log_record *rec; if (wthread_active == QB_FALSE && logt_wthread_lock == NULL) { return; } if (wthread_active == QB_FALSE) { for (;;) { res = sem_getvalue(&logt_print_finished, &value); if (res != 0 || value == 0) { break; } sem_wait(&logt_print_finished); (void)qb_thread_lock(logt_wthread_lock); rec = qb_list_first_entry(&logt_print_finished_records, struct qb_log_record, list); qb_list_del(&rec->list); logt_memory_used = logt_memory_used - strlen(rec->buffer) - sizeof(struct qb_log_record) - 1; qb_log_thread_log_write(rec->cs, &rec->timestamp, rec->buffer); (void)qb_thread_unlock(logt_wthread_lock); free(rec->buffer); free(rec); } } else { + (void)qb_thread_lock(logt_wthread_lock); wthread_should_exit = QB_TRUE; + (void)qb_thread_unlock(logt_wthread_lock); sem_post(&logt_print_finished); pthread_join(logt_thread_id, NULL); } (void)qb_thread_lock_destroy(logt_wthread_lock); sem_destroy(&logt_print_finished); sem_destroy(&logt_thread_start); }