diff --git a/include/qb/qbrb.h b/include/qb/qbrb.h index fd37626..972d660 100644 --- a/include/qb/qbrb.h +++ b/include/qb/qbrb.h @@ -1,303 +1,304 @@ /* * Copyright (C) 2010 Red Hat, Inc. * * Author: Angus Salkeld * * This file is part of libqb. * * libqb is free software: you can redistribute it and/or modify * it under the terms of the GNU Lesser General Public License as published by * the Free Software Foundation, either version 2.1 of the License, or * (at your option) any later version. * * libqb is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public License * along with libqb. If not, see . */ #ifndef QB_RB_H_DEFINED #define QB_RB_H_DEFINED /* *INDENT-OFF* */ #ifdef __cplusplus extern "C" { #endif /* *INDENT-ON* */ #include #include /** * @file qbrb.h * This implements a ring buffer that works in "chunks" not bytes. * So you write/read a complete chunk or not at all. * There are two types of ring buffer normal and overwrite. * Overwrite will reclaim the oldest chunks inorder to make way for new ones, * the normal version will refuse to write a new chunk if the ring buffer * is full. * * This implementation is capable of working across processes, but one process * must only write and the other prrocess read. * * The read process will do the following: * @code * rb = qb_rb_open("test2", 2000, QB_RB_FLAG_SHARED_PROCESS|QB_RB_FLAG_CREATE); * for (i = 0; i < 200; i++) { * try_read_again: * l = qb_rb_chunk_read(rb, (void *)out, 32, 1000); * if (l < 0) { * goto try_read_again; * } * } * ... * qb_rb_close(rb); * * @endcode * * The write process will do the following: * @code * rb = qb_rb_open("test2", 2000, QB_RB_FLAG_SHARED_PROCESS); * for (i = 0; i < 200; i++) { * try_write_again: * l = qb_rb_chunk_write(rb, &v, sizeof(v)); * if (l < sizeof(v)) { * goto try_write_again; * } * } * ... * qb_rb_close(rb); * @endcode * * @author Angus Salkeld */ /** * create a ring buffer (rather than open and existing one) * @see qb_rb_open() */ #define QB_RB_FLAG_CREATE 0x01 /** * New calls to qb_rb_chunk_write() will call qb_rb_chunk_reclaim() * if there is not enough space. * If this is not set then new writes will be refused. * @see qb_rb_open() */ #define QB_RB_FLAG_OVERWRITE 0x02 /** * The ringbuffer will be shared between pthreads not processes. * This effects the type of locks/semaphores that are used. * @see qb_rb_open() */ #define QB_RB_FLAG_SHARED_THREAD 0x04 /** * The ringbuffer will be shared between processes. * This effects the type of locks/semaphores that are used. * @see qb_rb_open() */ #define QB_RB_FLAG_SHARED_PROCESS 0x08 /** * Don't use semaphores, only atomic ops. * This mean that the timeout passed into qb_rb_chunk_read() * will be ignored. */ #define QB_RB_FLAG_NO_SEMAPHORE 0x10 struct qb_ringbuffer_s; typedef struct qb_ringbuffer_s qb_ringbuffer_t; /** * Create the ring buffer with the given type. * * This creates allocates a ring buffer in shared memory. * * @param name the unique name of this ringbuffer. * @param size the requested size. * @param flags or'ed flags * @param shared_user_data_size size for a shared data area. * @note the actual size will be rounded up to the next page size. * @return a new ring buffer or NULL if there was a problem. * @see QB_RB_FLAG_CREATE, QB_RB_FLAG_OVERWRITE, QB_RB_FLAG_SHARED_THREAD, QB_RB_FLAG_SHARED_PROCESS */ qb_ringbuffer_t *qb_rb_open(const char *name, size_t size, uint32_t flags, size_t shared_user_data_size); /** * Dereference the ringbuffer and if we are the last user destroy it. * * All files, mmaped memory, semaphores and locks will be destroyed. * * @param rb ringbuffer instance */ void qb_rb_close(qb_ringbuffer_t * rb); /** * Get the name of the ringbuffer. * @param rb ringbuffer instance * @return name. */ char *qb_rb_name_get(qb_ringbuffer_t * rb); /** * Get a point to user shared data area. * * @note this is of size "shared_user_data_size" passed into qb_rb_open() * * @param rb ringbuffer instance * @return pointer to shared data. */ void *qb_rb_shared_user_data_get(qb_ringbuffer_t * rb); /** * Write a chunk to the ring buffer. * * This simply calls qb_rb_chunk_alloc() and then * qb_rb_chunk_commit(). * * @param rb ringbuffer instance * @param data (in) the data to write * @param len (in) the size of the chunk. * @return the amount of bytes actually buffered (either len or -1). * * @see qb_rb_chunk_alloc() * @see qb_rb_chunk_commit() */ ssize_t qb_rb_chunk_write(qb_ringbuffer_t * rb, const void *data, size_t len); /** * Allocate space for a chunk of the given size. * - * If type == QB_RB_FLAG_OVERWRITE then this will always return non-null - * but if it's type is QB_RB_NORMAL then when there is not enough space then - * it will return NULL. + * If type == QB_RB_FLAG_OVERWRITE and NULL is returned, memory corruption of + * the memory file has occured. The ringbuffer should be destroyed. + * If type == QB_RB_NORMAL then when there is not enough space it will + * return NULL. * * @param rb ringbuffer instance * @param len (in) the size to allocate. * @return pointer to chunk to write to, or NULL (if no space). * * @see qb_rb_chunk_alloc() */ void *qb_rb_chunk_alloc(qb_ringbuffer_t * rb, size_t len); /** * finalize the chunk. * @param rb ringbuffer instance * @param len (in) the size of the chunk. */ int32_t qb_rb_chunk_commit(qb_ringbuffer_t * rb, size_t len); /** * Read (without reclaiming) the last chunk. * * This function is a way of accessing the next chunk without a memcpy(). * You can read the chunk data in place. * * @note This function will not "pop" the chunk, you will need to call * qb_rb_chunk_reclaim(). * @param rb ringbuffer instance * @param data_out (out) a pointer to the next chunk to read (not copied). * @param ms_timeout (in) time to wait for new data. * * @return the size of the chunk (0 if buffer empty). */ ssize_t qb_rb_chunk_peek(qb_ringbuffer_t * rb, void **data_out, int32_t ms_timeout); /** * Reclaim the oldest chunk. * You will need to call this if using qb_rb_chunk_peek(). * @param rb ringbuffer instance */ void qb_rb_chunk_reclaim(qb_ringbuffer_t * rb); /** * Read the oldest chunk into data_out. * * This is the same as qb_rb_chunk_peek() memcpy() and qb_rb_chunk_reclaim(). * * @param rb ringbuffer instance * @param data_out (in/out) the chunk will be memcpy'ed into this. * @param len (in) the size of data_out. * @param ms_timeout the amount od time to wait for new data. * @return the size of the chunk, or error. */ ssize_t qb_rb_chunk_read(qb_ringbuffer_t * rb, void *data_out, size_t len, int32_t ms_timeout); /** * Get the reference count. * * @param rb ringbuffer instance * @return the number of references */ int32_t qb_rb_refcount_get(qb_ringbuffer_t * rb); /** * The amount of free space in the ring buffer. * * @note Some of this space will be consumed by the chunk headers. * @param rb ringbuffer instance */ ssize_t qb_rb_space_free(qb_ringbuffer_t * rb); /** * The total amount of data in the buffer. * * @note This includes the chunk headers (8 bytes per chunk). * @param rb ringbuffer instance */ ssize_t qb_rb_space_used(qb_ringbuffer_t * rb); /** * The total number of chunks in the buffer. * * @param rb ringbuffer instance */ ssize_t qb_rb_chunks_used(qb_ringbuffer_t * rb); /** * Write the contents of the Ring Buffer to file. * @param fd open file to write the ringbuffer data to. * @param rb ringbuffer instance * @see qb_rb_create_from_file() */ ssize_t qb_rb_write_to_file(qb_ringbuffer_t * rb, int32_t fd); /** * Load the saved ring buffer from file into tempory memory. * @param fd file with saved ringbuffer data. * @param flags same flags as passed into qb_rb_open() * @return new ringbuffer instance * @see qb_rb_write_to_file() */ qb_ringbuffer_t *qb_rb_create_from_file(int32_t fd, uint32_t flags); /** * Like 'chown' it changes the owner and group of the ringbuffers * resources. * @param owner uid of the owner to change to * @param group gid of the group to change to * @param rb ringbuffer instance * @return status (0 = ok, -errno for error) */ int32_t qb_rb_chown(qb_ringbuffer_t * rb, uid_t owner, gid_t group); /** * Like 'chmod' it changes the mode of the ringbuffers resources. * @param mode mode to change to * @param rb ringbuffer instance * @retval 0 == ok * @retval -errno for error */ int32_t qb_rb_chmod(qb_ringbuffer_t * rb, mode_t mode); /* *INDENT-OFF* */ #ifdef __cplusplus } #endif /* __cplusplus */ /* *INDENT-ON* */ #endif /* QB_RB_H_DEFINED */ diff --git a/lib/log_blackbox.c b/lib/log_blackbox.c index 8f42457..cac6787 100644 --- a/lib/log_blackbox.c +++ b/lib/log_blackbox.c @@ -1,289 +1,297 @@ /* * Copyright (C) 2011 Red Hat, Inc. * * All rights reserved. * * Author: Angus Salkeld * * libqb is free software: you can redistribute it and/or modify * it under the terms of the GNU Lesser General Public License as published by * the Free Software Foundation, either version 2.1 of the License, or * (at your option) any later version. * * libqb is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public License * along with libqb. If not, see . */ #include "os_base.h" #include #include "util_int.h" #include "log_int.h" #define BB_MIN_ENTRY_SIZE (4 * sizeof(uint32_t) +\ sizeof(uint8_t) +\ 2 * sizeof(char) + sizeof(time_t)) static void _blackbox_reload(int32_t target) { struct qb_log_target *t = qb_log_target_get(target); if (t->instance == NULL) { return; } qb_rb_close(t->instance); t->instance = qb_rb_open(t->filename, t->size, QB_RB_FLAG_CREATE | QB_RB_FLAG_OVERWRITE, 0); } /* file lineno * tags * priority * function name length * function name * buffer length * buffer */ static void _blackbox_vlogger(int32_t target, struct qb_log_callsite *cs, time_t timestamp, va_list ap) { size_t max_size; size_t actual_size; uint32_t fn_size; char *chunk; char *msg_len_pt; uint32_t msg_len; struct qb_log_target *t = qb_log_target_get(target); if (t->instance == NULL) { return; } fn_size = strlen(cs->function) + 1; actual_size = 4 * sizeof(uint32_t) + sizeof(uint8_t) + fn_size + sizeof(time_t); max_size = actual_size + QB_LOG_MAX_LEN; chunk = qb_rb_chunk_alloc(t->instance, max_size); + if (chunk == NULL) { + /* something bad has happened. abort blackbox logging */ + qb_util_perror(LOG_ERR, "Blackbox allocation error, aborting blackbox log %s", t->filename); + qb_rb_close(t->instance); + t->instance = NULL; + return; + } + /* line number */ memcpy(chunk, &cs->lineno, sizeof(uint32_t)); chunk += sizeof(uint32_t); /* tags */ memcpy(chunk, &cs->tags, sizeof(uint32_t)); chunk += sizeof(uint32_t); /* log level/priority */ memcpy(chunk, &cs->priority, sizeof(uint8_t)); chunk += sizeof(uint8_t); /* function name */ memcpy(chunk, &fn_size, sizeof(uint32_t)); chunk += sizeof(uint32_t); memcpy(chunk, cs->function, fn_size); chunk += fn_size; /* timestamp */ memcpy(chunk, ×tamp, sizeof(time_t)); chunk += sizeof(time_t); /* log message length */ msg_len_pt = chunk; chunk += sizeof(uint32_t); /* log message */ msg_len = qb_vsnprintf_serialize(chunk, QB_LOG_MAX_LEN, cs->format, ap); if (msg_len >= QB_LOG_MAX_LEN) { chunk = msg_len_pt + sizeof(uint32_t); /* Reset */ msg_len = qb_vsnprintf_serialize(chunk, QB_LOG_MAX_LEN, "Log message too long to be stored in the blackbox. "\ "Maximum is QB_LOG_MAX_LEN" , ap); actual_size += msg_len; } actual_size += msg_len; /* now that we know the length, write it */ memcpy(msg_len_pt, &msg_len, sizeof(uint32_t)); (void)qb_rb_chunk_commit(t->instance, actual_size); } static void _blackbox_close(int32_t target) { struct qb_log_target *t = qb_log_target_get(target); if (t->instance) { qb_rb_close(t->instance); t->instance = NULL; } } int32_t qb_log_blackbox_open(struct qb_log_target *t) { if (t->size < 1024) { return -EINVAL; } snprintf(t->filename, PATH_MAX, "%s-%d-blackbox", t->name, getpid()); t->instance = qb_rb_open(t->filename, t->size, QB_RB_FLAG_CREATE | QB_RB_FLAG_OVERWRITE, 0); if (t->instance == NULL) { return -errno; } t->logger = NULL; t->vlogger = _blackbox_vlogger; t->reload = _blackbox_reload; t->close = _blackbox_close; return 0; } ssize_t qb_log_blackbox_write_to_file(const char *filename) { ssize_t written_size = 0; struct qb_log_target *t; int fd = open(filename, O_CREAT | O_RDWR, 0700); if (fd < 0) { return -errno; } t = qb_log_target_get(QB_LOG_BLACKBOX); if (t->instance) { written_size = qb_rb_write_to_file(t->instance, fd); } else { written_size = -ENOENT; } close(fd); return written_size; } void qb_log_blackbox_print_from_file(const char *bb_filename) { qb_ringbuffer_t *instance; ssize_t bytes_read; int max_size = 2 * QB_LOG_MAX_LEN; char *chunk; int fd; char time_buf[64]; fd = open(bb_filename, 0); if (fd < 0) { qb_util_perror(LOG_ERR, "qb_log_blackbox_print_from_file"); return; } instance = qb_rb_create_from_file(fd, 0); close(fd); if (instance == NULL) { return; } chunk = malloc(max_size); do { char *ptr; uint32_t lineno; uint32_t tags; uint8_t priority; uint32_t fn_size; char *function; uint32_t len; time_t timestamp; uint32_t msg_len; struct tm *tm; char message[QB_LOG_MAX_LEN]; bytes_read = qb_rb_chunk_read(instance, chunk, max_size, 0); if (bytes_read >= 0 && bytes_read < BB_MIN_ENTRY_SIZE) { printf("ERROR Corrupt file: blackbox header too small.\n"); goto cleanup; } else if (bytes_read < 0) { errno = -bytes_read; perror("ERROR: qb_rb_chunk_read failed"); goto cleanup; } ptr = chunk; /* lineno */ memcpy(&lineno, ptr, sizeof(uint32_t)); ptr += sizeof(uint32_t); /* tags */ memcpy(&tags, ptr, sizeof(uint32_t)); ptr += sizeof(uint32_t); /* priority */ memcpy(&priority, ptr, sizeof(uint8_t)); ptr += sizeof(uint8_t); /* function size & name */ memcpy(&fn_size, ptr, sizeof(uint32_t)); if ((fn_size + BB_MIN_ENTRY_SIZE) > bytes_read) { printf("ERROR Corrupt file: fn_size way too big %d\n", fn_size); goto cleanup; } if (fn_size <= 0) { printf("ERROR Corrupt file: fn_size negative %d\n", fn_size); goto cleanup; } ptr += sizeof(uint32_t); function = ptr; ptr += fn_size; /* timestamp size & content */ memcpy(×tamp, ptr, sizeof(time_t)); ptr += sizeof(time_t); tm = localtime(×tamp); if (tm) { (void)strftime(time_buf, sizeof(time_buf), "%b %d %T", tm); } else { snprintf(time_buf, sizeof(time_buf), "%ld", (long int)timestamp); } /* message length */ memcpy(&msg_len, ptr, sizeof(uint32_t)); if (msg_len > QB_LOG_MAX_LEN || msg_len <= 0) { printf("ERROR Corrupt file: msg_len out of bounds %d\n", msg_len); goto cleanup; } ptr += sizeof(uint32_t); /* message content */ len = qb_vsnprintf_deserialize(message, QB_LOG_MAX_LEN, ptr); assert(len > 0); message[len] = '\0'; len--; while (len > 0 && (message[len] == '\n' || message[len] == '\0')) { message[len] = '\0'; len--; } printf("%-7s %s %s(%u):%u: %s\n", qb_log_priority2str(priority), time_buf, function, lineno, tags, message); } while (bytes_read > BB_MIN_ENTRY_SIZE); cleanup: qb_rb_close(instance); free(chunk); } diff --git a/lib/ringbuffer.c b/lib/ringbuffer.c index 9bb56a0..110aee8 100644 --- a/lib/ringbuffer.c +++ b/lib/ringbuffer.c @@ -1,954 +1,956 @@ /* * Copyright (C) 2010-2011 Red Hat, Inc. * * Author: Angus Salkeld * * This file is part of libqb. * * libqb is free software: you can redistribute it and/or modify * it under the terms of the GNU Lesser General Public License as published by * the Free Software Foundation, either version 2.1 of the License, or * (at your option) any later version. * * libqb is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public License * along with libqb. If not, see . */ #include "ringbuffer_int.h" #include #include "atomic_int.h" #define QB_RB_FILE_HEADER_VERSION 1 /* * #define CRAZY_DEBUG_PRINTFS 1 */ #ifdef CRAZY_DEBUG_PRINTFS #define DEBUG_PRINTF(format, args...) \ do { \ printf(format, ##args); \ } while(0) #else #define DEBUG_PRINTF(format, args...) #endif /* CRAZY_DEBUG_PRINTFS */ /* * move the write pointer to the next 128 byte boundary * write_pt goes in 4 bytes (sizeof(uint32_t)) * #define USE_CACHE_LINE_ALIGNMENT 1 */ #ifdef USE_CACHE_LINE_ALIGNMENT #define QB_CACHE_LINE_SIZE 128 #define QB_CACHE_LINE_WORDS (QB_CACHE_LINE_SIZE/sizeof(uint32_t)) #define idx_cache_line_step(idx) \ do { \ if (idx % QB_CACHE_LINE_WORDS) { \ idx += (QB_CACHE_LINE_WORDS - (idx % QB_CACHE_LINE_WORDS)); \ } \ if (idx > (rb->shared_hdr->word_size - 1)) { \ idx = ((idx) % (rb->shared_hdr->word_size)); \ } \ } while (0) #else #define QB_CACHE_LINE_SIZE 0 #define QB_CACHE_LINE_WORDS 0 #define idx_cache_line_step(idx) \ do { \ if (idx > (rb->shared_hdr->word_size - 1)) { \ idx = ((idx) % (rb->shared_hdr->word_size)); \ } \ } while (0) #endif /* the chunk header is two words * 1) the chunk data size * 2) the magic number */ #define QB_RB_CHUNK_HEADER_WORDS 2 #define QB_RB_CHUNK_HEADER_SIZE (sizeof(uint32_t) * QB_RB_CHUNK_HEADER_WORDS) /* * margin is the gap we leave when checking to see if we have enough * space for a new chunk. * So: * qb_rb_space_free() >= QB_RB_CHUNK_MARGIN + new data chunk * The extra word size is to allow for non word sized data chunks. * QB_CACHE_LINE_WORDS is to make sure we have space to align the * chunk. */ #define QB_RB_WORD_ALIGN 1 #define QB_RB_CHUNK_MARGIN (sizeof(uint32_t) * (QB_RB_CHUNK_HEADER_WORDS +\ QB_RB_WORD_ALIGN +\ QB_CACHE_LINE_WORDS)) #define QB_RB_CHUNK_MAGIC 0xA1A1A1A1 #define QB_RB_CHUNK_MAGIC_DEAD 0xD0D0D0D0 #define QB_RB_CHUNK_MAGIC_ALLOC 0xA110CED0 #define QB_RB_CHUNK_SIZE_GET(rb, pointer) rb->shared_data[pointer] #define QB_RB_CHUNK_MAGIC_GET(rb, pointer) \ qb_atomic_int_get_ex((int32_t*)&rb->shared_data[(pointer + 1) % rb->shared_hdr->word_size], \ QB_ATOMIC_ACQUIRE) #define QB_RB_CHUNK_MAGIC_SET(rb, pointer, new_val) \ qb_atomic_int_set_ex((int32_t*)&rb->shared_data[(pointer + 1) % rb->shared_hdr->word_size], \ new_val, QB_ATOMIC_RELEASE) #define QB_RB_CHUNK_DATA_GET(rb, pointer) \ &rb->shared_data[(pointer + QB_RB_CHUNK_HEADER_WORDS) % rb->shared_hdr->word_size] #define QB_MAGIC_ASSERT(_ptr_) \ do { \ uint32_t chunk_magic = QB_RB_CHUNK_MAGIC_GET(rb, _ptr_); \ if (chunk_magic != QB_RB_CHUNK_MAGIC) print_header(rb); \ assert(chunk_magic == QB_RB_CHUNK_MAGIC); \ } while (0) #define idx_step(idx) \ do { \ if (idx > (rb->shared_hdr->word_size - 1)) { \ idx = ((idx) % (rb->shared_hdr->word_size)); \ } \ } while (0) static void print_header(struct qb_ringbuffer_s * rb); static int _rb_chunk_reclaim(struct qb_ringbuffer_s * rb); qb_ringbuffer_t * qb_rb_open(const char *name, size_t size, uint32_t flags, size_t shared_user_data_size) { return qb_rb_open_2(name, size, flags, shared_user_data_size, NULL); } qb_ringbuffer_t * qb_rb_open_2(const char *name, size_t size, uint32_t flags, size_t shared_user_data_size, struct qb_rb_notifier *notifiers) { struct qb_ringbuffer_s *rb; size_t real_size; size_t shared_size; char path[PATH_MAX]; int32_t fd_hdr; int32_t fd_data; uint32_t file_flags = O_RDWR; char filename[PATH_MAX]; int32_t error = 0; void *shm_addr; long page_size = sysconf(_SC_PAGESIZE); #ifdef QB_FORCE_SHM_ALIGN page_size = QB_MAX(page_size, 16 * 1024); #endif /* QB_FORCE_SHM_ALIGN */ /* The user of this api expects the 'size' parameter passed into this function * to be reflective of the max size single write we can do to the * ringbuffer. This means we have to add both the 'margin' space used * to calculate if there is enough space for a new chunk as well as the '+1' that * prevents overlap of the read/write pointers */ size += QB_RB_CHUNK_MARGIN + 1; real_size = QB_ROUNDUP(size, page_size); shared_size = sizeof(struct qb_ringbuffer_shared_s) + shared_user_data_size; if (flags & QB_RB_FLAG_CREATE) { file_flags |= O_CREAT | O_TRUNC; } rb = calloc(1, sizeof(struct qb_ringbuffer_s)); if (rb == NULL) { return NULL; } /* * Create a shared_hdr memory segment for the header. */ snprintf(filename, PATH_MAX, "qb-%s-header", name); fd_hdr = qb_sys_mmap_file_open(path, filename, shared_size, file_flags); if (fd_hdr < 0) { error = fd_hdr; qb_util_log(LOG_ERR, "couldn't create file for mmap"); goto cleanup_hdr; } rb->shared_hdr = mmap(0, shared_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd_hdr, 0); if (rb->shared_hdr == MAP_FAILED) { error = -errno; qb_util_log(LOG_ERR, "couldn't create mmap for header"); goto cleanup_hdr; } qb_atomic_init(); rb->flags = flags; /* * create the semaphore */ if (flags & QB_RB_FLAG_CREATE) { rb->shared_data = NULL; /* rb->shared_hdr->word_size tracks data by ints and not bytes/chars. */ rb->shared_hdr->word_size = real_size / sizeof(uint32_t); rb->shared_hdr->write_pt = 0; rb->shared_hdr->read_pt = 0; (void)strlcpy(rb->shared_hdr->hdr_path, path, PATH_MAX); } if (notifiers && notifiers->post_fn) { error = 0; memcpy(&rb->notifier, notifiers, sizeof(struct qb_rb_notifier)); } else { error = qb_rb_sem_create(rb, flags); } if (error < 0) { errno = -error; qb_util_perror(LOG_ERR, "couldn't create a semaphore"); goto cleanup_hdr; } /* Create the shared_data memory segment for the actual ringbuffer. * They have to be separate. */ if (flags & QB_RB_FLAG_CREATE) { snprintf(filename, PATH_MAX, "qb-%s-data", name); fd_data = qb_sys_mmap_file_open(path, filename, real_size, file_flags); (void)strlcpy(rb->shared_hdr->data_path, path, PATH_MAX); } else { fd_data = qb_sys_mmap_file_open(path, rb->shared_hdr->data_path, real_size, file_flags); } if (fd_data < 0) { error = fd_data; qb_util_log(LOG_ERR, "couldn't create file for mmap"); goto cleanup_hdr; } qb_util_log(LOG_DEBUG, "shm size:%zd; real_size:%zd; rb->word_size:%d", size, real_size, rb->shared_hdr->word_size); /* this function closes fd_data */ error = qb_sys_circular_mmap(fd_data, &shm_addr, real_size); rb->shared_data = shm_addr; if (error != 0) { qb_util_log(LOG_ERR, "couldn't create circular mmap on %s", rb->shared_hdr->data_path); goto cleanup_data; } if (flags & QB_RB_FLAG_CREATE) { memset(rb->shared_data, 0, real_size); rb->shared_data[rb->shared_hdr->word_size] = 5; rb->shared_hdr->ref_count = 1; } else { qb_atomic_int_inc(&rb->shared_hdr->ref_count); } close(fd_hdr); return rb; cleanup_data: if (flags & QB_RB_FLAG_CREATE) { unlink(rb->shared_hdr->data_path); } cleanup_hdr: if (fd_hdr >= 0) { close(fd_hdr); } if (rb && (flags & QB_RB_FLAG_CREATE)) { unlink(rb->shared_hdr->hdr_path); if (rb->notifier.destroy_fn) { (void)rb->notifier.destroy_fn(rb->notifier.instance); } } if (rb && (rb->shared_hdr != MAP_FAILED && rb->shared_hdr != NULL)) { munmap(rb->shared_hdr, sizeof(struct qb_ringbuffer_shared_s)); } free(rb); errno = -error; return NULL; } void qb_rb_close(struct qb_ringbuffer_s * rb) { if (rb == NULL) { return; } qb_enter(); (void)qb_atomic_int_dec_and_test(&rb->shared_hdr->ref_count); if (rb->flags & QB_RB_FLAG_CREATE) { if (rb->notifier.destroy_fn) { (void)rb->notifier.destroy_fn(rb->notifier.instance); } unlink(rb->shared_hdr->data_path); unlink(rb->shared_hdr->hdr_path); qb_util_log(LOG_DEBUG, "Free'ing ringbuffer: %s", rb->shared_hdr->hdr_path); } else { qb_util_log(LOG_DEBUG, "Closing ringbuffer: %s", rb->shared_hdr->hdr_path); } munmap(rb->shared_data, (rb->shared_hdr->word_size * sizeof(uint32_t)) << 1); munmap(rb->shared_hdr, sizeof(struct qb_ringbuffer_shared_s)); free(rb); } void qb_rb_force_close(struct qb_ringbuffer_s * rb) { if (rb == NULL) { return; } qb_enter(); if (rb->notifier.destroy_fn) { (void)rb->notifier.destroy_fn(rb->notifier.instance); } errno = 0; unlink(rb->shared_hdr->data_path); qb_util_perror(LOG_DEBUG, "Force free'ing ringbuffer: %s", rb->shared_hdr->data_path); errno = 0; unlink(rb->shared_hdr->hdr_path); qb_util_perror(LOG_DEBUG, "Force free'ing ringbuffer: %s", rb->shared_hdr->hdr_path); munmap(rb->shared_data, (rb->shared_hdr->word_size * sizeof(uint32_t)) << 1); munmap(rb->shared_hdr, sizeof(struct qb_ringbuffer_shared_s)); free(rb); } char * qb_rb_name_get(struct qb_ringbuffer_s * rb) { if (rb == NULL) { return NULL; } return rb->shared_hdr->hdr_path; } void * qb_rb_shared_user_data_get(struct qb_ringbuffer_s * rb) { if (rb == NULL) { return NULL; } return rb->shared_hdr->user_data; } int32_t qb_rb_refcount_get(struct qb_ringbuffer_s * rb) { if (rb == NULL) { return -EINVAL; } return qb_atomic_int_get(&rb->shared_hdr->ref_count); } ssize_t qb_rb_space_free(struct qb_ringbuffer_s * rb) { uint32_t write_size; uint32_t read_size; size_t space_free = 0; if (rb == NULL) { return -EINVAL; } if (rb->notifier.space_used_fn) { return (rb->shared_hdr->word_size * sizeof(uint32_t)) - rb->notifier.space_used_fn(rb->notifier.instance); } write_size = rb->shared_hdr->write_pt; read_size = rb->shared_hdr->read_pt; if (write_size > read_size) { space_free = (read_size - write_size + rb->shared_hdr->word_size) - 1; } else if (write_size < read_size) { space_free = (read_size - write_size) - 1; } else { if (rb->notifier.q_len_fn && rb->notifier.q_len_fn(rb->notifier.instance) > 0) { space_free = 0; } else { space_free = rb->shared_hdr->word_size; } } /* word -> bytes */ return (space_free * sizeof(uint32_t)); } ssize_t qb_rb_space_used(struct qb_ringbuffer_s * rb) { uint32_t write_size; uint32_t read_size; size_t space_used; if (rb == NULL) { return -EINVAL; } if (rb->notifier.space_used_fn) { return rb->notifier.space_used_fn(rb->notifier.instance); } write_size = rb->shared_hdr->write_pt; read_size = rb->shared_hdr->read_pt; if (write_size > read_size) { space_used = write_size - read_size; } else if (write_size < read_size) { space_used = (write_size - read_size + rb->shared_hdr->word_size) - 1; } else { space_used = 0; } /* word -> bytes */ return (space_used * sizeof(uint32_t)); } ssize_t qb_rb_chunks_used(struct qb_ringbuffer_s *rb) { if (rb == NULL) { return -EINVAL; } if (rb->notifier.q_len_fn) { return rb->notifier.q_len_fn(rb->notifier.instance); } return -ENOTSUP; } void * qb_rb_chunk_alloc(struct qb_ringbuffer_s * rb, size_t len) { uint32_t write_pt; if (rb == NULL) { errno = EINVAL; return NULL; } /* * Reclaim data if we are over writing and we need space */ if (rb->flags & QB_RB_FLAG_OVERWRITE) { while (qb_rb_space_free(rb) < (len + QB_RB_CHUNK_MARGIN)) { int rc = _rb_chunk_reclaim(rb); - /* reclaim failure during overwrite results in an abort. */ - assert(rc == 0); + if (rc != 0) { + errno = rc; + return NULL; + } } } else { if (qb_rb_space_free(rb) < (len + QB_RB_CHUNK_MARGIN)) { errno = EAGAIN; return NULL; } } write_pt = rb->shared_hdr->write_pt; /* * insert the chunk header */ rb->shared_data[write_pt] = 0; QB_RB_CHUNK_MAGIC_SET(rb, write_pt, QB_RB_CHUNK_MAGIC_ALLOC); /* * return a pointer to the beginning of the chunk data */ return (void *)QB_RB_CHUNK_DATA_GET(rb, write_pt); } static uint32_t qb_rb_chunk_step(struct qb_ringbuffer_s * rb, uint32_t pointer) { uint32_t chunk_size = QB_RB_CHUNK_SIZE_GET(rb, pointer); /* * skip over the chunk header */ pointer += QB_RB_CHUNK_HEADER_WORDS; /* * skip over the user's data. */ pointer += (chunk_size / sizeof(uint32_t)); /* make allowance for non-word sizes */ if ((chunk_size % (sizeof(uint32_t) * QB_RB_WORD_ALIGN)) != 0) { pointer++; } idx_cache_line_step(pointer); return pointer; } int32_t qb_rb_chunk_commit(struct qb_ringbuffer_s * rb, size_t len) { uint32_t old_write_pt; if (rb == NULL) { return -EINVAL; } /* * commit the magic & chunk_size */ old_write_pt = rb->shared_hdr->write_pt; rb->shared_data[old_write_pt] = len; /* * commit the new write pointer */ rb->shared_hdr->write_pt = qb_rb_chunk_step(rb, old_write_pt); QB_RB_CHUNK_MAGIC_SET(rb, old_write_pt, QB_RB_CHUNK_MAGIC); DEBUG_PRINTF("commit [%zd] read: %u, write: %u -> %u (%u)\n", (rb->notifier.q_len_fn ? rb->notifier.q_len_fn(rb->notifier.instance) : 0), rb->shared_hdr->read_pt, old_write_pt, rb->shared_hdr->write_pt, rb->shared_hdr->word_size); /* * post the notification to the reader */ if (rb->notifier.post_fn) { return rb->notifier.post_fn(rb->notifier.instance, len); } return 0; } ssize_t qb_rb_chunk_write(struct qb_ringbuffer_s * rb, const void *data, size_t len) { char *dest = qb_rb_chunk_alloc(rb, len); int32_t res = 0; if (rb == NULL) { return -EINVAL; } if (dest == NULL) { return -errno; } memcpy(dest, data, len); res = qb_rb_chunk_commit(rb, len); if (res < 0) { return res; } return len; } static int _rb_chunk_reclaim(struct qb_ringbuffer_s * rb) { uint32_t old_read_pt; uint32_t new_read_pt; uint32_t old_chunk_size; uint32_t chunk_magic; int rc = 0; old_read_pt = rb->shared_hdr->read_pt; chunk_magic = QB_RB_CHUNK_MAGIC_GET(rb, old_read_pt); if (chunk_magic != QB_RB_CHUNK_MAGIC) { return -EINVAL; } old_chunk_size = QB_RB_CHUNK_SIZE_GET(rb, old_read_pt); new_read_pt = qb_rb_chunk_step(rb, old_read_pt); /* * clear the header */ rb->shared_data[old_read_pt] = 0; QB_RB_CHUNK_MAGIC_SET(rb, old_read_pt, QB_RB_CHUNK_MAGIC_DEAD); /* * set the new read pointer after clearing the header * to prevent a situation where a fast writer will write their * new chunk between setting the new read pointer and clearing the * header. */ rb->shared_hdr->read_pt = new_read_pt; if (rb->notifier.reclaim_fn) { rc = rb->notifier.reclaim_fn(rb->notifier.instance, old_chunk_size); if (rc < 0) { errno = -rc; qb_util_perror(LOG_WARNING, "reclaim_fn"); } } DEBUG_PRINTF("reclaim [%zd]: read: %u -> %u, write: %u\n", (rb->notifier.q_len_fn ? rb->notifier.q_len_fn(rb->notifier.instance) : 0), old_read_pt, rb->shared_hdr->read_pt, rb->shared_hdr->write_pt); return rc; } void qb_rb_chunk_reclaim(struct qb_ringbuffer_s * rb) { if (rb == NULL) { return; } _rb_chunk_reclaim(rb); } ssize_t qb_rb_chunk_peek(struct qb_ringbuffer_s * rb, void **data_out, int32_t timeout) { uint32_t read_pt; uint32_t chunk_size; uint32_t chunk_magic; int32_t res = 0; if (rb == NULL) { return -EINVAL; } if (rb->notifier.timedwait_fn) { res = rb->notifier.timedwait_fn(rb->notifier.instance, timeout); } if (res < 0 && res != -EIDRM) { if (res == -ETIMEDOUT) { return 0; } else { errno = -res; qb_util_perror(LOG_ERR, "sem_timedwait"); } return res; } read_pt = rb->shared_hdr->read_pt; chunk_magic = QB_RB_CHUNK_MAGIC_GET(rb, read_pt); if (chunk_magic != QB_RB_CHUNK_MAGIC) { if (rb->notifier.post_fn) { (void)rb->notifier.post_fn(rb->notifier.instance, res); } return 0; } chunk_size = QB_RB_CHUNK_SIZE_GET(rb, read_pt); *data_out = QB_RB_CHUNK_DATA_GET(rb, read_pt); return chunk_size; } ssize_t qb_rb_chunk_read(struct qb_ringbuffer_s * rb, void *data_out, size_t len, int32_t timeout) { uint32_t read_pt; uint32_t chunk_size; uint32_t chunk_magic; int32_t res = 0; if (rb == NULL) { return -EINVAL; } if (rb->notifier.timedwait_fn) { res = rb->notifier.timedwait_fn(rb->notifier.instance, timeout); } if (res < 0 && res != -EIDRM) { if (res != -ETIMEDOUT) { errno = -res; qb_util_perror(LOG_ERR, "sem_timedwait"); } return res; } read_pt = rb->shared_hdr->read_pt; chunk_magic = QB_RB_CHUNK_MAGIC_GET(rb, read_pt); if (chunk_magic != QB_RB_CHUNK_MAGIC) { if (rb->notifier.timedwait_fn == NULL) { return -ETIMEDOUT; } else { (void)rb->notifier.post_fn(rb->notifier.instance, res); #ifdef EBADMSG return -EBADMSG; #else return -EINVAL; #endif } } chunk_size = QB_RB_CHUNK_SIZE_GET(rb, read_pt); if (len < chunk_size) { qb_util_log(LOG_ERR, "trying to recv chunk of size %d but %d available", len, chunk_size); if (rb->notifier.post_fn) { (void)rb->notifier.post_fn(rb->notifier.instance, chunk_size); } return -ENOBUFS; } memcpy(data_out, QB_RB_CHUNK_DATA_GET(rb, read_pt), chunk_size); _rb_chunk_reclaim(rb); return chunk_size; } static void print_header(struct qb_ringbuffer_s * rb) { printf("Ringbuffer: \n"); if (rb->flags & QB_RB_FLAG_OVERWRITE) { printf(" ->OVERWRITE\n"); } else { printf(" ->NORMAL\n"); } printf(" ->write_pt [%d]\n", rb->shared_hdr->write_pt); printf(" ->read_pt [%d]\n", rb->shared_hdr->read_pt); printf(" ->size [%d words]\n", rb->shared_hdr->word_size); #ifndef S_SPLINT_S printf(" =>free [%zu bytes]\n", qb_rb_space_free(rb)); printf(" =>used [%zu bytes]\n", qb_rb_space_used(rb)); #endif /* S_SPLINT_S */ } /* * FILE HEADER ORDER * 1. word_size * 2. write_pt * 3. read_pt * 4. version * 5. header_hash * * 6. data */ ssize_t qb_rb_write_to_file(struct qb_ringbuffer_s * rb, int32_t fd) { ssize_t result; ssize_t written_size = 0; uint32_t hash = 0; uint32_t version = QB_RB_FILE_HEADER_VERSION; if (rb == NULL) { return -EINVAL; } print_header(rb); /* * 1. word_size */ result = write(fd, &rb->shared_hdr->word_size, sizeof(uint32_t)); if (result != sizeof(uint32_t)) { return -errno; } written_size += result; /* * 2. 3. store the read & write pointers */ result = write(fd, (void *)&rb->shared_hdr->write_pt, sizeof(uint32_t)); if (result != sizeof(uint32_t)) { return -errno; } written_size += result; result = write(fd, (void *)&rb->shared_hdr->read_pt, sizeof(uint32_t)); if (result != sizeof(uint32_t)) { return -errno; } written_size += result; /* * 4. version used */ result = write(fd, &version, sizeof(uint32_t)); if (result != sizeof(uint32_t)) { return -errno; } written_size += result; /* * 5. hash helps us verify header is not corrupted on file read */ hash = rb->shared_hdr->word_size + rb->shared_hdr->write_pt + rb->shared_hdr->read_pt + QB_RB_FILE_HEADER_VERSION; result = write(fd, &hash, sizeof(uint32_t)); if (result != sizeof(uint32_t)) { return -errno; } written_size += result; result = write(fd, rb->shared_data, rb->shared_hdr->word_size * sizeof(uint32_t)); if (result != rb->shared_hdr->word_size * sizeof(uint32_t)) { return -errno; } written_size += result; qb_util_log(LOG_DEBUG, " writing total of: %zd\n", written_size); return written_size; } qb_ringbuffer_t * qb_rb_create_from_file(int32_t fd, uint32_t flags) { ssize_t n_read; size_t n_required; size_t total_read = 0; uint32_t read_pt; uint32_t write_pt; struct qb_ringbuffer_s *rb; uint32_t word_size = 0; uint32_t version = 0; uint32_t hash = 0; uint32_t calculated_hash = 0; if (fd < 0) { return NULL; } /* * 1. word size */ n_required = sizeof(uint32_t); n_read = read(fd, &word_size, n_required); if (n_read != n_required) { qb_util_perror(LOG_ERR, "Unable to read blackbox file header"); return NULL; } total_read += n_read; /* * 2. 3. read & write pointers */ n_read = read(fd, &write_pt, sizeof(uint32_t)); assert(n_read == sizeof(uint32_t)); total_read += n_read; n_read = read(fd, &read_pt, sizeof(uint32_t)); assert(n_read == sizeof(uint32_t)); total_read += n_read; /* * 4. version */ n_required = sizeof(uint32_t); n_read = read(fd, &version, n_required); if (n_read != n_required) { qb_util_perror(LOG_ERR, "Unable to read blackbox file header"); return NULL; } total_read += n_read; /* * 5. Hash */ n_required = sizeof(uint32_t); n_read = read(fd, &hash, n_required); if (n_read != n_required) { qb_util_perror(LOG_ERR, "Unable to read blackbox file header"); return NULL; } total_read += n_read; calculated_hash = word_size + write_pt + read_pt + version; if (hash != calculated_hash) { qb_util_log(LOG_ERR, "Corrupt blackbox: File header hash (%d) does not match calculated hash (%d)", hash, calculated_hash); return NULL; } else if (version != QB_RB_FILE_HEADER_VERSION) { qb_util_log(LOG_ERR, "Wrong file header version. Expected %d got %d", QB_RB_FILE_HEADER_VERSION, version); return NULL; } /* * 6. data */ n_required = (word_size * sizeof(uint32_t)); rb = qb_rb_open("create_from_file", n_required, QB_RB_FLAG_CREATE | QB_RB_FLAG_NO_SEMAPHORE, 0); if (rb == NULL) { return NULL; } rb->shared_hdr->read_pt = read_pt; rb->shared_hdr->write_pt = write_pt; n_read = read(fd, rb->shared_data, n_required); if (n_read < 0) { qb_util_perror(LOG_ERR, "Unable to read blackbox file data"); goto cleanup_fail; } total_read += n_read; if (n_read != n_required) { qb_util_log(LOG_WARNING, "read %zd bytes, but expected %zu", n_read, n_required); goto cleanup_fail; } qb_util_log(LOG_DEBUG, "read total of: %zd", total_read); print_header(rb); return rb; cleanup_fail: qb_rb_close(rb); return NULL; } int32_t qb_rb_chown(struct qb_ringbuffer_s * rb, uid_t owner, gid_t group) { int32_t res; if (rb == NULL) { return -EINVAL; } res = chown(rb->shared_hdr->data_path, owner, group); if (res < 0 && errno != EPERM) { return -errno; } res = chown(rb->shared_hdr->hdr_path, owner, group); if (res < 0 && errno != EPERM) { return -errno; } return 0; } int32_t qb_rb_chmod(qb_ringbuffer_t * rb, mode_t mode) { int32_t res; if (rb == NULL) { return -EINVAL; } res = chmod(rb->shared_hdr->data_path, mode); if (res < 0) { return -errno; } res = chmod(rb->shared_hdr->hdr_path, mode); if (res < 0) { return -errno; } return 0; }