diff --git a/lib/ringbuffer.c b/lib/ringbuffer.c index 6a1ac86..42aa85a 100644 --- a/lib/ringbuffer.c +++ b/lib/ringbuffer.c @@ -1,814 +1,852 @@ /* * Copyright (C) 2010-2011 Red Hat, Inc. * * Author: Angus Salkeld * * This file is part of libqb. * * libqb is free software: you can redistribute it and/or modify * it under the terms of the GNU Lesser General Public License as published by * the Free Software Foundation, either version 2.1 of the License, or * (at your option) any later version. * * libqb is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public License * along with libqb. If not, see . */ #include "ringbuffer_int.h" #include #include /* * #define CRAZY_DEBUG_PRINTFS 1 */ #ifdef CRAZY_DEBUG_PRINTFS #define DEBUG_PRINTF(format, args...) \ do { \ printf(format, ##args); \ } while(0) #else #define DEBUG_PRINTF(format, args...) #endif /* CRAZY_DEBUG_PRINTFS */ /* * move the write pointer to the next 128 byte boundary * write_pt goes in 4 bytes (sizeof(uint32_t)) * #define USE_CACHE_LINE_ALIGNMENT 1 */ #ifdef USE_CACHE_LINE_ALIGNMENT #define QB_CACHE_LINE_SIZE 128 #define QB_CACHE_LINE_WORDS (QB_CACHE_LINE_SIZE/sizeof(uint32_t)) #define idx_cache_line_step(idx) \ do { \ if (idx % QB_CACHE_LINE_WORDS) { \ idx += (QB_CACHE_LINE_WORDS - (idx % QB_CACHE_LINE_WORDS)); \ } \ if (idx > (rb->shared_hdr->word_size - 1)) { \ idx = ((idx) % (rb->shared_hdr->word_size)); \ } \ } while (0) #else #define QB_CACHE_LINE_SIZE 0 #define QB_CACHE_LINE_WORDS 0 #define idx_cache_line_step(idx) \ do { \ if (idx > (rb->shared_hdr->word_size - 1)) { \ idx = ((idx) % (rb->shared_hdr->word_size)); \ } \ } while (0) #endif /* the chunk header is two words * 1) the chunk data size * 2) the magic number */ #define QB_RB_CHUNK_HEADER_WORDS 2 #define QB_RB_CHUNK_HEADER_SIZE (sizeof(uint32_t) * QB_RB_CHUNK_HEADER_WORDS) /* * margin is the gap we leave when checking to see if we have enough * space for a new chunk. * So: * qb_rb_space_free() >= QB_RB_CHUNK_MARGIN + new data chunk * The extra word size is to allow for non word sized data chunks. * QB_CACHE_LINE_WORDS is to make sure we have space to align the * chunk. */ #define QB_RB_WORD_ALIGN 1 #define QB_RB_CHUNK_MARGIN (sizeof(uint32_t) * (QB_RB_CHUNK_HEADER_WORDS +\ QB_RB_WORD_ALIGN +\ QB_CACHE_LINE_WORDS)) #define QB_RB_CHUNK_MAGIC 0xA1A1A1A1 #define QB_RB_CHUNK_MAGIC_DEAD 0xD0D0D0D0 #define QB_RB_CHUNK_MAGIC_ALLOC 0xA110CED0 #define QB_RB_CHUNK_SIZE_GET(rb, pointer) \ rb->shared_data[pointer] #define QB_RB_CHUNK_MAGIC_GET(rb, pointer) \ rb->shared_data[(pointer + 1) % rb->shared_hdr->word_size] #define QB_RB_CHUNK_MAGIC_SET(rb, pointer, new_val) \ rb->shared_data[(pointer + 1) % rb->shared_hdr->word_size] = new_val; #define QB_RB_CHUNK_DATA_GET(rb, pointer) \ &rb->shared_data[(pointer + QB_RB_CHUNK_HEADER_WORDS) % rb->shared_hdr->word_size] #define QB_MAGIC_ASSERT(_ptr_) \ do { \ uint32_t chunk_magic = QB_RB_CHUNK_MAGIC_GET(rb, _ptr_); \ if (chunk_magic != QB_RB_CHUNK_MAGIC) print_header(rb); \ assert(chunk_magic == QB_RB_CHUNK_MAGIC); \ } while (0) #define idx_step(idx) \ do { \ if (idx > (rb->shared_hdr->word_size - 1)) { \ idx = ((idx) % (rb->shared_hdr->word_size)); \ } \ } while (0) static void print_header(struct qb_ringbuffer_s * rb); static void _rb_chunk_reclaim(struct qb_ringbuffer_s * rb); qb_ringbuffer_t * qb_rb_open(const char *name, size_t size, uint32_t flags, size_t shared_user_data_size) +{ + return qb_rb_open_2(name, size, flags, shared_user_data_size, NULL); +} + +qb_ringbuffer_t * +qb_rb_open_2(const char *name, size_t size, uint32_t flags, + size_t shared_user_data_size, + struct qb_rb_notifier *notifiers) { struct qb_ringbuffer_s *rb; size_t real_size; size_t shared_size; char path[PATH_MAX]; int32_t fd_hdr; int32_t fd_data; uint32_t file_flags = O_RDWR; char filename[PATH_MAX]; int32_t error = 0; void *shm_addr; long page_size = sysconf(_SC_PAGESIZE); #ifdef QB_FORCE_SHM_ALIGN page_size = QB_MAX(page_size, 16 * 1024); #endif /* QB_FORCE_SHM_ALIGN */ real_size = QB_ROUNDUP(size, page_size); shared_size = sizeof(struct qb_ringbuffer_shared_s) + shared_user_data_size; if (flags & QB_RB_FLAG_CREATE) { file_flags |= O_CREAT | O_TRUNC; } rb = calloc(1, sizeof(struct qb_ringbuffer_s)); if (rb == NULL) { return NULL; } /* * Create a shared_hdr memory segment for the header. */ snprintf(filename, PATH_MAX, "qb-%s-header", name); fd_hdr = qb_sys_mmap_file_open(path, filename, shared_size, file_flags); if (fd_hdr < 0) { error = fd_hdr; qb_util_log(LOG_ERR, "couldn't create file for mmap"); goto cleanup_hdr; } rb->shared_hdr = mmap(0, shared_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd_hdr, 0); if (rb->shared_hdr == MAP_FAILED) { error = -errno; qb_util_log(LOG_ERR, "couldn't create mmap for header"); goto cleanup_hdr; } qb_atomic_init(); rb->flags = flags; /* * create the semaphore */ if (flags & QB_RB_FLAG_CREATE) { rb->shared_data = NULL; /* rb->shared_hdr->word_size tracks data by ints and not bytes/chars. */ rb->shared_hdr->word_size = real_size / sizeof(uint32_t); rb->shared_hdr->write_pt = 0; rb->shared_hdr->read_pt = 0; (void)strlcpy(rb->shared_hdr->hdr_path, path, PATH_MAX); } - error = qb_rb_sem_create(rb, flags); + if (notifiers && notifiers->post_fn) { + error = 0; + memcpy(&rb->notifier, + notifiers, + sizeof(struct qb_rb_notifier)); + } else { + error = qb_rb_sem_create(rb, flags); + } if (error < 0) { - qb_util_perror(LOG_ERR, "couldn't get a semaphore"); + errno = -error; + qb_util_perror(LOG_ERR, "couldn't create a semaphore"); goto cleanup_hdr; } /* Create the shared_data memory segment for the actual ringbuffer. * They have to be separate. */ if (flags & QB_RB_FLAG_CREATE) { snprintf(filename, PATH_MAX, "qb-%s-data", name); fd_data = qb_sys_mmap_file_open(path, filename, real_size, file_flags); (void)strlcpy(rb->shared_hdr->data_path, path, PATH_MAX); } else { fd_data = qb_sys_mmap_file_open(path, rb->shared_hdr->data_path, real_size, file_flags); } if (fd_data < 0) { error = fd_data; qb_util_log(LOG_ERR, "couldn't create file for mmap"); goto cleanup_hdr; } qb_util_log(LOG_DEBUG, "shm size:%zd; real_size:%zd; rb->word_size:%d", size, real_size, rb->shared_hdr->word_size); error = qb_sys_circular_mmap(fd_data, &shm_addr, real_size); rb->shared_data = shm_addr; if (error != 0) { qb_util_log(LOG_ERR, "couldn't create circular mmap on %s", rb->shared_hdr->data_path); goto cleanup_data; } if (flags & QB_RB_FLAG_CREATE) { memset(rb->shared_data, 0, real_size); rb->shared_data[rb->shared_hdr->word_size] = 5; rb->shared_hdr->ref_count = 1; } else { qb_atomic_int_inc(&rb->shared_hdr->ref_count); } close(fd_hdr); close(fd_data); return rb; cleanup_data: close(fd_data); if (flags & QB_RB_FLAG_CREATE) { unlink(rb->shared_hdr->data_path); } cleanup_hdr: if (fd_hdr >= 0) { close(fd_hdr); } if (rb && (flags & QB_RB_FLAG_CREATE)) { unlink(rb->shared_hdr->hdr_path); - if (rb->sem_destroy_fn) { - (void)rb->sem_destroy_fn(rb); + if (rb->notifier.destroy_fn) { + (void)rb->notifier.destroy_fn(rb->notifier.instance); } } if (rb && (rb->shared_hdr != MAP_FAILED && rb->shared_hdr != NULL)) { munmap(rb->shared_hdr, sizeof(struct qb_ringbuffer_shared_s)); } free(rb); errno = -error; return NULL; } + void qb_rb_close(struct qb_ringbuffer_s * rb) { if (rb == NULL) { return; } + qb_enter(); (void)qb_atomic_int_dec_and_test(&rb->shared_hdr->ref_count); if (rb->flags & QB_RB_FLAG_CREATE) { - if (rb->sem_destroy_fn) { - (void)rb->sem_destroy_fn(rb); + if (rb->notifier.destroy_fn) { + (void)rb->notifier.destroy_fn(rb->notifier.instance); } unlink(rb->shared_hdr->data_path); unlink(rb->shared_hdr->hdr_path); qb_util_log(LOG_DEBUG, "Free'ing ringbuffer: %s", rb->shared_hdr->hdr_path); } else { qb_util_log(LOG_DEBUG, "Closing ringbuffer: %s", rb->shared_hdr->hdr_path); } munmap(rb->shared_data, (rb->shared_hdr->word_size * sizeof(uint32_t)) << 1); munmap(rb->shared_hdr, sizeof(struct qb_ringbuffer_shared_s)); free(rb); } void qb_rb_force_close(struct qb_ringbuffer_s * rb) { if (rb == NULL) { return; } + qb_enter(); - if (rb->sem_destroy_fn) { - (void)rb->sem_destroy_fn(rb); + if (rb->notifier.destroy_fn) { + (void)rb->notifier.destroy_fn(rb->notifier.instance); } unlink(rb->shared_hdr->data_path); unlink(rb->shared_hdr->hdr_path); qb_util_log(LOG_DEBUG, "Force free'ing ringbuffer: %s", rb->shared_hdr->hdr_path); munmap(rb->shared_data, (rb->shared_hdr->word_size * sizeof(uint32_t)) << 1); munmap(rb->shared_hdr, sizeof(struct qb_ringbuffer_shared_s)); free(rb); } char * qb_rb_name_get(struct qb_ringbuffer_s * rb) { if (rb == NULL) { return NULL; } return rb->shared_hdr->hdr_path; } void * qb_rb_shared_user_data_get(struct qb_ringbuffer_s * rb) { if (rb == NULL) { return NULL; } return rb->shared_hdr->user_data; } int32_t qb_rb_refcount_get(struct qb_ringbuffer_s * rb) { if (rb == NULL) { return -EINVAL; } return qb_atomic_int_get(&rb->shared_hdr->ref_count); } ssize_t qb_rb_space_free(struct qb_ringbuffer_s * rb) { uint32_t write_size; uint32_t read_size; size_t space_free = 0; if (rb == NULL) { return -EINVAL; } + if (rb->notifier.space_used_fn) { + return (rb->shared_hdr->word_size * sizeof(uint32_t)) - + rb->notifier.space_used_fn(rb->notifier.instance); + } write_size = rb->shared_hdr->write_pt; read_size = rb->shared_hdr->read_pt; if (write_size > read_size) { space_free = (read_size - write_size + rb->shared_hdr->word_size) - 1; } else if (write_size < read_size) { space_free = (read_size - write_size) - 1; } else { - if (rb->sem_getvalue_fn && rb->sem_getvalue_fn(rb) > 0) { + if (rb->notifier.q_len_fn && rb->notifier.q_len_fn(rb->notifier.instance) > 0) { space_free = 0; } else { space_free = rb->shared_hdr->word_size; } } /* word -> bytes */ return (space_free * sizeof(uint32_t)); } ssize_t qb_rb_space_used(struct qb_ringbuffer_s * rb) { uint32_t write_size; uint32_t read_size; size_t space_used; if (rb == NULL) { return -EINVAL; } + if (rb->notifier.space_used_fn) { + return rb->notifier.space_used_fn(rb->notifier.instance); + } write_size = rb->shared_hdr->write_pt; read_size = rb->shared_hdr->read_pt; if (write_size > read_size) { space_used = write_size - read_size; } else if (write_size < read_size) { space_used = (write_size - read_size + rb->shared_hdr->word_size) - 1; } else { space_used = 0; } /* word -> bytes */ return (space_used * sizeof(uint32_t)); } ssize_t qb_rb_chunks_used(struct qb_ringbuffer_s *rb) { if (rb == NULL) { return -EINVAL; } - if (rb->sem_getvalue_fn) { - return rb->sem_getvalue_fn(rb); - } else { - return -ENOTSUP; + if (rb->notifier.q_len_fn) { + return rb->notifier.q_len_fn(rb->notifier.instance); } + return -ENOTSUP; } void * qb_rb_chunk_alloc(struct qb_ringbuffer_s * rb, size_t len) { uint32_t write_pt; if (rb == NULL) { errno = EINVAL; return NULL; } /* * Reclaim data if we are over writing and we need space */ if (rb->flags & QB_RB_FLAG_OVERWRITE) { while (qb_rb_space_free(rb) < (len + QB_RB_CHUNK_MARGIN)) { _rb_chunk_reclaim(rb); } } else { if (qb_rb_space_free(rb) < (len + QB_RB_CHUNK_MARGIN)) { errno = EAGAIN; return NULL; } } write_pt = rb->shared_hdr->write_pt; /* * insert the chunk header */ rb->shared_data[write_pt] = 0; QB_RB_CHUNK_MAGIC_SET(rb, write_pt, QB_RB_CHUNK_MAGIC_ALLOC); /* * return a pointer to the beginning of the chunk data */ return (void *)QB_RB_CHUNK_DATA_GET(rb, write_pt); } static uint32_t qb_rb_chunk_step(struct qb_ringbuffer_s * rb, uint32_t pointer) { uint32_t chunk_size = QB_RB_CHUNK_SIZE_GET(rb, pointer); /* * skip over the chunk header */ pointer += QB_RB_CHUNK_HEADER_WORDS; /* * skip over the user's data. */ pointer += (chunk_size / sizeof(uint32_t)); /* make allowance for non-word sizes */ if ((chunk_size % (sizeof(uint32_t) * QB_RB_WORD_ALIGN)) != 0) { pointer++; } idx_cache_line_step(pointer); return pointer; } int32_t qb_rb_chunk_commit(struct qb_ringbuffer_s * rb, size_t len) { uint32_t old_write_pt; if (rb == NULL) { return -EINVAL; } /* * commit the magic & chunk_size */ old_write_pt = rb->shared_hdr->write_pt; rb->shared_data[old_write_pt] = len; /* * commit the new write pointer */ rb->shared_hdr->write_pt = qb_rb_chunk_step(rb, old_write_pt); QB_RB_CHUNK_MAGIC_SET(rb, old_write_pt, QB_RB_CHUNK_MAGIC); DEBUG_PRINTF("commit [%zd] read: %u, write: %u -> %u (%u)\n", - (rb->sem_getvalue_fn ? rb->sem_getvalue_fn(rb) : 0), + (rb->notifier.q_len_fn ? + rb->notifier.q_len_fn(rb->notifier.instance) : 0), rb->shared_hdr->read_pt, old_write_pt, rb->shared_hdr->write_pt, rb->shared_hdr->word_size); /* * post the notification to the reader */ - if (rb->sem_post_fn) { - return rb->sem_post_fn(rb); - } else { - return 0; + if (rb->notifier.post_fn) { + return rb->notifier.post_fn(rb->notifier.instance, len); } + return 0; } ssize_t qb_rb_chunk_write(struct qb_ringbuffer_s * rb, const void *data, size_t len) { char *dest = qb_rb_chunk_alloc(rb, len); int32_t res = 0; if (rb == NULL) { return -EINVAL; } if (dest == NULL) { return -errno; } memcpy(dest, data, len); res = qb_rb_chunk_commit(rb, len); if (res < 0) { return res; } return len; } static void _rb_chunk_reclaim(struct qb_ringbuffer_s * rb) { uint32_t old_read_pt; uint32_t new_read_pt; + uint32_t old_chunk_size; uint32_t chunk_magic; old_read_pt = rb->shared_hdr->read_pt; chunk_magic = QB_RB_CHUNK_MAGIC_GET(rb, old_read_pt); if (chunk_magic != QB_RB_CHUNK_MAGIC) { return; } + old_chunk_size = QB_RB_CHUNK_SIZE_GET(rb, old_read_pt); new_read_pt = qb_rb_chunk_step(rb, old_read_pt); /* * clear the header */ rb->shared_data[old_read_pt] = 0; QB_RB_CHUNK_MAGIC_SET(rb, old_read_pt, QB_RB_CHUNK_MAGIC_DEAD); /* * set the new read pointer after clearing the header * to prevent a situation where a fast writer will write their * new chunk between setting the new read pointer and clearing the * header. */ rb->shared_hdr->read_pt = new_read_pt; + if (rb->notifier.reclaim_fn) { + int rc = rb->notifier.reclaim_fn(rb->notifier.instance, + old_chunk_size); + if (rc < 0) { + errno = -rc; + qb_util_perror(LOG_WARNING, "reclaim_fn"); + } + } + DEBUG_PRINTF("reclaim [%zd]: read: %u -> %u, write: %u\n", - (rb->sem_getvalue_fn ? rb->sem_getvalue_fn(rb) : 0), + (rb->notifier.q_len_fn ? + rb->notifier.q_len_fn(rb->notifier.instance) : 0), old_read_pt, rb->shared_hdr->read_pt, rb->shared_hdr->write_pt); } void qb_rb_chunk_reclaim(struct qb_ringbuffer_s * rb) { if (rb == NULL) { return; } _rb_chunk_reclaim(rb); } ssize_t qb_rb_chunk_peek(struct qb_ringbuffer_s * rb, void **data_out, int32_t timeout) { uint32_t read_pt; uint32_t chunk_size; uint32_t chunk_magic; int32_t res = 0; if (rb == NULL) { return -EINVAL; } - if (rb->sem_timedwait_fn) { - res = rb->sem_timedwait_fn(rb, timeout); + if (rb->notifier.timedwait_fn) { + res = rb->notifier.timedwait_fn(rb->notifier.instance, timeout); } if (res < 0 && res != -EIDRM) { if (res == -ETIMEDOUT) { return 0; } else { errno = -res; qb_util_perror(LOG_ERR, "sem_timedwait"); } return res; } read_pt = rb->shared_hdr->read_pt; chunk_magic = QB_RB_CHUNK_MAGIC_GET(rb, read_pt); if (chunk_magic != QB_RB_CHUNK_MAGIC) { - if (rb->sem_post_fn) { - (void)rb->sem_post_fn(rb); + if (rb->notifier.post_fn) { + (void)rb->notifier.post_fn(rb->notifier.instance, res); } return 0; } chunk_size = QB_RB_CHUNK_SIZE_GET(rb, read_pt); *data_out = QB_RB_CHUNK_DATA_GET(rb, read_pt); return chunk_size; } ssize_t qb_rb_chunk_read(struct qb_ringbuffer_s * rb, void *data_out, size_t len, int32_t timeout) { uint32_t read_pt; uint32_t chunk_size; uint32_t chunk_magic; int32_t res = 0; if (rb == NULL) { return -EINVAL; } - if (rb->sem_timedwait_fn) { - res = rb->sem_timedwait_fn(rb, timeout); + if (rb->notifier.timedwait_fn) { + res = rb->notifier.timedwait_fn(rb->notifier.instance, timeout); } if (res < 0 && res != -EIDRM) { if (res != -ETIMEDOUT) { + errno = -res; qb_util_perror(LOG_ERR, "sem_timedwait"); } return res; } read_pt = rb->shared_hdr->read_pt; chunk_magic = QB_RB_CHUNK_MAGIC_GET(rb, read_pt); if (chunk_magic != QB_RB_CHUNK_MAGIC) { - if (rb->sem_timedwait_fn == NULL) { + if (rb->notifier.timedwait_fn == NULL) { return -ETIMEDOUT; } else { - (void)rb->sem_post_fn(rb); + (void)rb->notifier.post_fn(rb->notifier.instance, res); #ifdef EBADMSG return -EBADMSG; #else return -EINVAL; #endif } } chunk_size = QB_RB_CHUNK_SIZE_GET(rb, read_pt); if (len < chunk_size) { qb_util_log(LOG_ERR, "trying to recv chunk of size %d but %d available", len, chunk_size); - (void)rb->sem_post_fn(rb); + (void)rb->notifier.post_fn(rb->notifier.instance, chunk_size); return -ENOBUFS; } - ; + memcpy(data_out, QB_RB_CHUNK_DATA_GET(rb, read_pt), chunk_size); _rb_chunk_reclaim(rb); return chunk_size; } static void print_header(struct qb_ringbuffer_s * rb) { printf("Ringbuffer: \n"); if (rb->flags & QB_RB_FLAG_OVERWRITE) { printf(" ->OVERWRITE\n"); } else { printf(" ->NORMAL\n"); } printf(" ->write_pt [%d]\n", rb->shared_hdr->write_pt); printf(" ->read_pt [%d]\n", rb->shared_hdr->read_pt); printf(" ->size [%d words]\n", rb->shared_hdr->word_size); #ifndef S_SPLINT_S printf(" =>free [%zu bytes]\n", qb_rb_space_free(rb)); printf(" =>used [%zu bytes]\n", qb_rb_space_used(rb)); #endif /* S_SPLINT_S */ } ssize_t qb_rb_write_to_file(struct qb_ringbuffer_s * rb, int32_t fd) { ssize_t result; ssize_t written_size = 0; if (rb == NULL) { return -EINVAL; } print_header(rb); result = write(fd, &rb->shared_hdr->word_size, sizeof(uint32_t)); if (result != sizeof(uint32_t)) { return -errno; } written_size += result; /* * store the read & write pointers */ result = write(fd, (void *)&rb->shared_hdr->write_pt, sizeof(uint32_t)); if (result != sizeof(uint32_t)) { return -errno; } written_size += result; result = write(fd, (void *)&rb->shared_hdr->read_pt, sizeof(uint32_t)); if (result != sizeof(uint32_t)) { return -errno; } written_size += result; result = write(fd, rb->shared_data, rb->shared_hdr->word_size * sizeof(uint32_t)); if (result != rb->shared_hdr->word_size * sizeof(uint32_t)) { return -errno; } written_size += result; qb_util_log(LOG_DEBUG, " writing total of: %zd\n", written_size); return written_size; } qb_ringbuffer_t * qb_rb_create_from_file(int32_t fd, uint32_t flags) { ssize_t n_read; size_t n_required; size_t total_read = 0; uint32_t read_pt; uint32_t write_pt; struct qb_ringbuffer_s *rb; uint32_t word_size = 0; if (fd < 0) { return NULL; } n_required = sizeof(uint32_t); n_read = read(fd, &word_size, n_required); if (n_read != n_required) { qb_util_perror(LOG_ERR, "Unable to read blackbox file header"); return NULL; } total_read += n_read; n_read = read(fd, &write_pt, sizeof(uint32_t)); assert(n_read == sizeof(uint32_t)); total_read += n_read; n_read = read(fd, &read_pt, sizeof(uint32_t)); assert(n_read == sizeof(uint32_t)); total_read += n_read; n_required = (word_size * sizeof(uint32_t)); rb = qb_rb_open("create_from_file", n_required, QB_RB_FLAG_CREATE | QB_RB_FLAG_NO_SEMAPHORE, 0); if (rb == NULL) { return NULL; } rb->shared_hdr->read_pt = read_pt; rb->shared_hdr->write_pt = write_pt; n_read = read(fd, rb->shared_data, n_required); if (n_read < 0) { qb_util_perror(LOG_ERR, "Unable to read blackbox file data"); goto cleanup_fail; } total_read += n_read; if (n_read != n_required) { qb_util_log(LOG_WARNING, "read %zd bytes, but expected %zu", n_read, n_required); goto cleanup_fail; } qb_util_log(LOG_DEBUG, "read total of: %zd", total_read); print_header(rb); return rb; cleanup_fail: qb_rb_close(rb); return NULL; } int32_t qb_rb_chown(struct qb_ringbuffer_s * rb, uid_t owner, gid_t group) { int32_t res; if (rb == NULL) { return -EINVAL; } res = chown(rb->shared_hdr->data_path, owner, group); if (res < 0 && errno != EPERM) { return -errno; } res = chown(rb->shared_hdr->hdr_path, owner, group); if (res < 0 && errno != EPERM) { return -errno; } return 0; } int32_t qb_rb_chmod(qb_ringbuffer_t * rb, mode_t mode) { int32_t res; if (rb == NULL) { return -EINVAL; } res = chmod(rb->shared_hdr->data_path, mode); if (res < 0) { return -errno; } res = chmod(rb->shared_hdr->hdr_path, mode); if (res < 0) { return -errno; } return 0; } diff --git a/lib/ringbuffer_helper.c b/lib/ringbuffer_helper.c index d0c8575..455fd99 100644 --- a/lib/ringbuffer_helper.c +++ b/lib/ringbuffer_helper.c @@ -1,291 +1,308 @@ /* * Copyright (C) 2010 Red Hat, Inc. * * Author: Angus Salkeld * * This file is part of libqb. * * libqb is free software: you can redistribute it and/or modify * it under the terms of the GNU Lesser General Public License as published by * the Free Software Foundation, either version 2.1 of the License, or * (at your option) any later version. * * libqb is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public License * along with libqb. If not, see . */ #include "ringbuffer_int.h" #include static int32_t -my_posix_sem_timedwait(qb_ringbuffer_t * rb, int32_t ms_timeout) +my_posix_sem_timedwait(void * instance, int32_t ms_timeout) { + struct qb_ringbuffer_s *rb = (struct qb_ringbuffer_s *)instance; struct timespec ts_timeout; int32_t res; if (ms_timeout > 0) { qb_util_timespec_from_epoch_get(&ts_timeout); qb_timespec_add_ms(&ts_timeout, ms_timeout); } sem_wait_again: if (ms_timeout > 0) { res = rpl_sem_timedwait(&rb->shared_hdr->posix_sem, &ts_timeout); } else if (ms_timeout == 0) { res = rpl_sem_trywait(&rb->shared_hdr->posix_sem); } else { res = rpl_sem_wait(&rb->shared_hdr->posix_sem); } if (res == -1) { switch (errno) { case EINTR: goto sem_wait_again; break; case EAGAIN: res = -ETIMEDOUT; break; case ETIMEDOUT: res = -errno; break; default: res = -errno; qb_util_perror(LOG_ERR, "error waiting for semaphore"); break; } } return res; } static int32_t -my_posix_sem_post(qb_ringbuffer_t * rb) +my_posix_sem_post(void * instance, size_t msg_size) { + struct qb_ringbuffer_s *rb = (struct qb_ringbuffer_s *)instance; if (rpl_sem_post(&rb->shared_hdr->posix_sem) < 0) { return -errno; } else { return 0; } } static ssize_t -my_posix_getvalue_fn(struct qb_ringbuffer_s *rb) +my_posix_getvalue_fn(void * instance) { + struct qb_ringbuffer_s *rb = (struct qb_ringbuffer_s *)instance; int val; if (rpl_sem_getvalue(&rb->shared_hdr->posix_sem, &val) < 0) { return -errno; } else { return val; } } static int32_t -my_posix_sem_destroy(qb_ringbuffer_t * rb) +my_posix_sem_destroy(void * instance) { + struct qb_ringbuffer_s *rb = (struct qb_ringbuffer_s *)instance; + qb_enter(); if (rpl_sem_destroy(&rb->shared_hdr->posix_sem) == -1) { return -errno; } else { return 0; } } static int32_t -my_posix_sem_create(struct qb_ringbuffer_s *rb, uint32_t flags) +my_posix_sem_create(void * instance, uint32_t flags) { + struct qb_ringbuffer_s *rb = (struct qb_ringbuffer_s *)instance; int32_t pshared = QB_FALSE; if (flags & QB_RB_FLAG_SHARED_PROCESS) { if ((flags & QB_RB_FLAG_CREATE) == 0) { return 0; } pshared = QB_TRUE; } if (rpl_sem_init(&rb->shared_hdr->posix_sem, pshared, 0) == -1) { return -errno; } else { return 0; } } static int32_t -my_sysv_sem_timedwait(qb_ringbuffer_t * rb, int32_t ms_timeout) +my_sysv_sem_timedwait(void * instance, int32_t ms_timeout) { + struct qb_ringbuffer_s *rb = (struct qb_ringbuffer_s *)instance; struct sembuf sops[1]; int32_t res = 0; #ifdef HAVE_SEMTIMEDOP struct timespec ts_timeout; struct timespec *ts_pt; if (ms_timeout >= 0) { /* * Note: sem_timedwait takes an absolute time where as semtimedop * takes a relative time. */ ts_timeout.tv_sec = 0; ts_timeout.tv_nsec = 0; qb_timespec_add_ms(&ts_timeout, ms_timeout); ts_pt = &ts_timeout; } else { ts_pt = NULL; } #endif /* HAVE_SEMTIMEDOP */ /* * wait for sem post. */ sops[0].sem_num = 0; sops[0].sem_op = -1; #ifdef HAVE_SEMTIMEDOP sops[0].sem_flg = 0; #else sops[0].sem_flg = IPC_NOWAIT; #endif /* HAVE_SEMTIMEDOP */ semop_again: #ifdef HAVE_SEMTIMEDOP if (semtimedop(rb->sem_id, sops, 1, ts_pt) == -1) #else if (semop(rb->sem_id, sops, 1) == -1) #endif /* HAVE_SEMTIMEDOP */ { if (errno == EINTR) { goto semop_again; } else if (errno == EAGAIN) { /* make consistent with sem_timedwait */ res = -ETIMEDOUT; } else { res = -errno; qb_util_perror(LOG_ERR, "error waiting for semaphore"); } return res; } return 0; } static int32_t -my_sysv_sem_post(qb_ringbuffer_t * rb) +my_sysv_sem_post(void * instance, size_t msg_size) { + struct qb_ringbuffer_s *rb = (struct qb_ringbuffer_s *)instance; struct sembuf sops[1]; if ((rb->flags & QB_RB_FLAG_SHARED_PROCESS) == 0) { return 0; } sops[0].sem_num = 0; sops[0].sem_op = 1; sops[0].sem_flg = 0; semop_again: if (semop(rb->sem_id, sops, 1) == -1) { if (errno == EINTR) { goto semop_again; } else { qb_util_perror(LOG_ERR, "could not increment semaphore"); } return -errno; } return 0; } static ssize_t -my_sysv_getvalue_fn(struct qb_ringbuffer_s *rb) +my_sysv_getvalue_fn(void * instance) { + struct qb_ringbuffer_s *rb = (struct qb_ringbuffer_s *)instance; ssize_t res = semctl(rb->sem_id, 0, GETVAL, 0); if (res == -1) { return -errno; } return res; } static int32_t -my_sysv_sem_destroy(qb_ringbuffer_t * rb) +my_sysv_sem_destroy(void * instance) { + struct qb_ringbuffer_s *rb = (struct qb_ringbuffer_s *)instance; if (semctl(rb->sem_id, 0, IPC_RMID, 0) == -1) { return -errno; } else { return 0; } } static int32_t -my_sysv_sem_create(qb_ringbuffer_t * rb, uint32_t flags) +my_sysv_sem_create(void * instance, uint32_t flags) { + struct qb_ringbuffer_s *rb = (struct qb_ringbuffer_s *)instance; union semun options; int32_t res; key_t sem_key; sem_key = ftok(rb->shared_hdr->hdr_path, (rb->shared_hdr->word_size + 1)); if (sem_key == -1) { res = -errno; qb_util_perror(LOG_ERR, "couldn't get a sem id"); return res; } if (flags & QB_RB_FLAG_CREATE) { rb->sem_id = semget(sem_key, 1, IPC_CREAT | IPC_EXCL | 0600); if (rb->sem_id == -1) { res = -errno; qb_util_perror(LOG_ERR, "couldn't create a semaphore"); return res; } options.val = 0; res = semctl(rb->sem_id, 0, SETVAL, options); } else { rb->sem_id = semget(sem_key, 0, 0600); if (rb->sem_id == -1) { res = -errno; qb_util_perror(LOG_ERR, "couldn't get a sem id"); return res; } res = 0; } qb_util_log(LOG_DEBUG, "sem key:%d, id:%d, value:%d", (int)sem_key, rb->sem_id, semctl(rb->sem_id, 0, GETVAL, 0)); return res; } int32_t qb_rb_sem_create(struct qb_ringbuffer_s * rb, uint32_t flags) { int32_t rc; int32_t use_posix = QB_TRUE; if ((flags & QB_RB_FLAG_SHARED_PROCESS) && !(flags & QB_RB_FLAG_NO_SEMAPHORE)) { #if defined(HAVE_POSIX_PSHARED_SEMAPHORE) || \ defined(HAVE_RPL_PSHARED_SEMAPHORE) use_posix = QB_TRUE; #else #ifdef HAVE_SYSV_PSHARED_SEMAPHORE use_posix = QB_FALSE; #else return -ENOTSUP; #endif /* HAVE_SYSV_PSHARED_SEMAPHORE */ #endif /* HAVE_POSIX_PSHARED_SEMAPHORE */ } if (flags & QB_RB_FLAG_NO_SEMAPHORE) { rc = 0; - rb->sem_timedwait_fn = NULL; - rb->sem_post_fn = NULL; - rb->sem_getvalue_fn = NULL; - rb->sem_destroy_fn = NULL; + rb->notifier.instance = NULL; + rb->notifier.timedwait_fn = NULL; + rb->notifier.post_fn = NULL; + rb->notifier.q_len_fn = NULL; + rb->notifier.space_used_fn = NULL; + rb->notifier.destroy_fn = NULL; } else if (use_posix) { rc = my_posix_sem_create(rb, flags); - rb->sem_timedwait_fn = my_posix_sem_timedwait; - rb->sem_post_fn = my_posix_sem_post; - rb->sem_getvalue_fn = my_posix_getvalue_fn; - rb->sem_destroy_fn = my_posix_sem_destroy; + rb->notifier.instance = rb; + rb->notifier.timedwait_fn = my_posix_sem_timedwait; + rb->notifier.post_fn = my_posix_sem_post; + rb->notifier.q_len_fn = my_posix_getvalue_fn; + rb->notifier.space_used_fn = NULL; + rb->notifier.destroy_fn = my_posix_sem_destroy; } else { rc = my_sysv_sem_create(rb, flags); - rb->sem_timedwait_fn = my_sysv_sem_timedwait; - rb->sem_post_fn = my_sysv_sem_post; - rb->sem_getvalue_fn = my_sysv_getvalue_fn; - rb->sem_destroy_fn = my_sysv_sem_destroy; + rb->notifier.instance = rb; + rb->notifier.timedwait_fn = my_sysv_sem_timedwait; + rb->notifier.post_fn = my_sysv_sem_post; + rb->notifier.q_len_fn = my_sysv_getvalue_fn; + rb->notifier.space_used_fn = NULL; + rb->notifier.destroy_fn = my_sysv_sem_destroy; } return rc; } diff --git a/lib/ringbuffer_int.h b/lib/ringbuffer_int.h index dfdee17..f76593e 100644 --- a/lib/ringbuffer_int.h +++ b/lib/ringbuffer_int.h @@ -1,84 +1,98 @@ /* * Copyright (C) 2010 Red Hat, Inc. * * Author: Angus Salkeld * * This file is part of libqb. * * libqb is free software: you can redistribute it and/or modify * it under the terms of the GNU Lesser General Public License as published by * the Free Software Foundation, either version 2.1 of the License, or * (at your option) any later version. * * libqb is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public License * along with libqb. If not, see . */ #ifndef _RINGBUFFER_H_ #define _RINGBUFFER_H_ #include "os_base.h" #ifdef HAVE_SYS_MMAN_H #include #endif /* HAVE_SYS_MMAN_H */ #ifdef HAVE_SYS_SEM_H #include #endif #ifdef HAVE_SYS_IPC_H #include #endif #include "rpl_sem.h" #include "util_int.h" #include #include struct qb_ringbuffer_s; int32_t qb_rb_sem_create(struct qb_ringbuffer_s *rb, uint32_t flags); -typedef int32_t(*qb_rb_sem_post_fn_t) (struct qb_ringbuffer_s * rb); -typedef ssize_t(*qb_rb_sem_getvalue_fn_t) (struct qb_ringbuffer_s * rb); -typedef int32_t(*qb_rb_sem_timedwait_fn_t) (struct qb_ringbuffer_s * rb, - int32_t ms_timeout); -typedef int32_t(*qb_rb_sem_destroy_fn_t) (struct qb_ringbuffer_s * rb); + +typedef int32_t(*qb_rb_notifier_post_fn_t) (void * instance, size_t msg_size); +typedef ssize_t(*qb_rb_notifier_q_len_fn_t) (void * instance); +typedef ssize_t(*qb_rb_notifier_used_fn_t) (void * instance); +typedef int32_t(*qb_rb_notifier_timedwait_fn_t) (void * instance, + int32_t ms_timeout); +typedef int32_t(*qb_rb_notifier_reclaim_fn_t) (void * instance, size_t msg_size); +typedef int32_t(*qb_rb_notifier_destroy_fn_t) (void * instance); + +struct qb_rb_notifier { + qb_rb_notifier_post_fn_t post_fn; + qb_rb_notifier_q_len_fn_t q_len_fn; + qb_rb_notifier_used_fn_t space_used_fn; + qb_rb_notifier_timedwait_fn_t timedwait_fn; + qb_rb_notifier_reclaim_fn_t reclaim_fn; + qb_rb_notifier_destroy_fn_t destroy_fn; + void *instance; +}; struct qb_ringbuffer_shared_s { volatile uint32_t write_pt; volatile uint32_t read_pt; uint32_t word_size; char hdr_path[PATH_MAX]; char data_path[PATH_MAX]; int32_t ref_count; rpl_sem_t posix_sem; char user_data[1]; } __attribute__ ((aligned(8))); struct qb_ringbuffer_s { uint32_t flags; int32_t sem_id; struct qb_ringbuffer_shared_s *shared_hdr; uint32_t *shared_data; - qb_rb_sem_post_fn_t sem_post_fn; - qb_rb_sem_getvalue_fn_t sem_getvalue_fn; - qb_rb_sem_timedwait_fn_t sem_timedwait_fn; - qb_rb_sem_destroy_fn_t sem_destroy_fn; + struct qb_rb_notifier notifier; }; - void qb_rb_force_close(qb_ringbuffer_t * rb); +qb_ringbuffer_t *qb_rb_open_2(const char *name, size_t size, uint32_t flags, + size_t shared_user_data_size, + struct qb_rb_notifier *notifier); + + #ifndef HAVE_SEMUN union semun { int32_t val; struct semid_ds *buf; unsigned short int *array; struct seminfo *__buf; }; #endif /* HAVE_SEMUN */ #endif /* _RINGBUFFER_H_ */