diff --git a/lib/log_thread.c b/lib/log_thread.c index 0a97c6e..56008f8 100644 --- a/lib/log_thread.c +++ b/lib/log_thread.c @@ -1,279 +1,279 @@ /* * Copyright (C) 2011 Red Hat, Inc. * * All rights reserved. * * Author: Angus Salkeld * * libqb is free software: you can redistribute it and/or modify * it under the terms of the GNU Lesser General Public License as published by * the Free Software Foundation, either version 2.1 of the License, or * (at your option) any later version. * * libqb is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public License * along with libqb. If not, see . */ #include "os_base.h" #include #include #include #include #include #include "log_int.h" static int wthread_active = QB_FALSE; static int wthread_should_exit = QB_FALSE; static qb_thread_lock_t *logt_wthread_lock = NULL; static QB_LIST_DECLARE(logt_print_finished_records); static int logt_memory_used = 0; static int logt_dropped_messages = 0; static sem_t logt_thread_start; static sem_t logt_print_finished; static int logt_sched_param_queued = QB_FALSE; static int logt_sched_policy; #if defined(HAVE_PTHREAD_SETSCHEDPARAM) && defined(HAVE_SCHED_GET_PRIORITY_MAX) static struct sched_param logt_sched_param; #endif /* HAVE_PTHREAD_SETSCHEDPARAM && HAVE_SCHED_GET_PRIORITY_MAX */ static pthread_t logt_thread_id = 0; static void *qb_logt_worker_thread(void *data) __attribute__ ((noreturn)); static void * qb_logt_worker_thread(void *data) { struct qb_log_record *rec; int dropped = 0; int res; /* * Signal wthread_create that the initialization process may continue */ sem_post(&logt_thread_start); for (;;) { retry_sem_wait: res = sem_wait(&logt_print_finished); if (res == -1 && errno == EINTR) { goto retry_sem_wait; } else if (res == -1) { /* * This case shouldn't happen */ pthread_exit(NULL); } (void)qb_thread_lock(logt_wthread_lock); if (wthread_should_exit) { int value = -1; (void)sem_getvalue(&logt_print_finished, &value); if (value == 0) { (void)qb_thread_unlock(logt_wthread_lock); pthread_exit(NULL); } } rec = qb_list_first_entry(&logt_print_finished_records, struct qb_log_record, list); qb_list_del(&rec->list); logt_memory_used = logt_memory_used - strlen(rec->buffer) - sizeof(struct qb_log_record) - 1; dropped = logt_dropped_messages; logt_dropped_messages = 0; (void)qb_thread_unlock(logt_wthread_lock); if (dropped) { printf("%d messages lost\n", dropped); } qb_log_thread_log_write(rec->cs, rec->timestamp, rec->buffer); free(rec->buffer); free(rec); } } int32_t qb_log_thread_priority_set(int32_t policy, int32_t priority) { int res = 0; #if defined(HAVE_PTHREAD_SETSCHEDPARAM) && defined(HAVE_SCHED_GET_PRIORITY_MAX) logt_sched_policy = policy; if (policy == SCHED_OTHER #ifdef SCHED_IDLE || policy == SCHED_IDLE #endif #if defined(SCHED_BATCH) && !defined(QB_DARWIN) || policy == SCHED_BATCH #endif ) { logt_sched_param.sched_priority = 0; } else { logt_sched_param.sched_priority = priority; } if (wthread_active == QB_FALSE) { logt_sched_param_queued = QB_TRUE; } else { res = pthread_setschedparam(logt_thread_id, policy, &logt_sched_param); if (res != 0) { res = -res; } } #endif return res; } int32_t qb_log_thread_start(void) { int res; if (wthread_active) { return 0; } wthread_active = QB_TRUE; sem_init(&logt_thread_start, 0, 0); sem_init(&logt_print_finished, 0, 0); res = pthread_create(&logt_thread_id, NULL, qb_logt_worker_thread, NULL); if (res != 0) { wthread_active = QB_FALSE; return -res; } sem_wait(&logt_thread_start); if (logt_sched_param_queued) { res = qb_log_thread_priority_set(logt_sched_policy, logt_sched_param.sched_priority); if (res != 0) { goto cleanup_pthread; } logt_sched_param_queued = QB_FALSE; } logt_wthread_lock = qb_thread_lock_create(QB_THREAD_LOCK_SHORT); if (logt_wthread_lock == NULL) { goto cleanup_pthread; } return 0; cleanup_pthread: wthread_should_exit = QB_TRUE; sem_post(&logt_print_finished); pthread_join(logt_thread_id, NULL); sem_destroy(&logt_print_finished); sem_destroy(&logt_thread_start); return res; } void qb_log_thread_log_post(struct qb_log_callsite *cs, time_t timestamp, const char *buffer) { struct qb_log_record *rec; size_t buf_size; size_t total_size; rec = malloc(sizeof(struct qb_log_record)); if (rec == NULL) { return; } buf_size = strlen(buffer) + 1; total_size = sizeof(struct qb_log_record) + buf_size; rec->cs = cs; rec->buffer = malloc(buf_size); if (rec->buffer == NULL) { goto free_record; } memcpy(rec->buffer, buffer, buf_size); rec->timestamp = timestamp; qb_list_init(&rec->list); (void)qb_thread_lock(logt_wthread_lock); logt_memory_used += total_size; if (logt_memory_used > 512000) { free(rec->buffer); free(rec); logt_memory_used = logt_memory_used - total_size; logt_dropped_messages += 1; (void)qb_thread_unlock(logt_wthread_lock); return; } else { qb_list_add_tail(&rec->list, &logt_print_finished_records); } (void)qb_thread_unlock(logt_wthread_lock); sem_post(&logt_print_finished); return; free_record: free(rec); } void qb_log_thread_stop(void) { int res; int value; struct qb_log_record *rec; if (wthread_active == QB_FALSE && logt_wthread_lock == NULL) { return; } if (wthread_active == QB_FALSE) { for (;;) { res = sem_getvalue(&logt_print_finished, &value); if (res != 0 || value == 0) { - return; + break; } sem_wait(&logt_print_finished); (void)qb_thread_lock(logt_wthread_lock); rec = qb_list_first_entry(&logt_print_finished_records, struct qb_log_record, list); qb_list_del(&rec->list); logt_memory_used = logt_memory_used - strlen(rec->buffer) - sizeof(struct qb_log_record) - 1; (void)qb_thread_unlock(logt_wthread_lock); qb_log_thread_log_write(rec->cs, rec->timestamp, rec->buffer); free(rec->buffer); free(rec); } } else { wthread_should_exit = QB_TRUE; sem_post(&logt_print_finished); pthread_join(logt_thread_id, NULL); } (void)qb_thread_lock_destroy(logt_wthread_lock); sem_destroy(&logt_print_finished); sem_destroy(&logt_thread_start); }