diff --git a/examples/ipcclient.c b/examples/ipcclient.c index f0be5c9..cd1cbcb 100644 --- a/examples/ipcclient.c +++ b/examples/ipcclient.c @@ -1,237 +1,241 @@ /* * Copyright (c) 2011 Red Hat, Inc. * * All rights reserved. * * Author: Angus Salkeld * * libqb is free software: you can redistribute it and/or modify * it under the terms of the GNU Lesser General Public License as published by * the Free Software Foundation, either version 2.1 of the License, or * (at your option) any later version. * * libqb is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public License * along with libqb. If not, see . */ #include "os_base.h" #include #include #include #include #include static int32_t do_benchmark = QB_FALSE; static int32_t use_events = QB_FALSE; static int alarm_notice; static qb_util_stopwatch_t *sw; #define ONE_MEG 1048576 -#define MAX_MSG_SIZE ONE_MEG -static char data[ONE_MEG]; +static char *data; struct my_req { struct qb_ipc_request_header hdr; char message[256]; }; struct my_res { struct qb_ipc_response_header hdr; char message[256]; }; static void sigalrm_handler (int num) { alarm_notice = 1; } static void _benchmark(qb_ipcc_connection_t *conn, int write_size) { struct iovec iov[2]; ssize_t res; struct qb_ipc_request_header hdr; int write_count = 0; float secs; alarm_notice = 0; hdr.size = write_size; hdr.id = QB_IPC_MSG_USER_START + 1; iov[0].iov_base = &hdr; iov[0].iov_len = sizeof(struct qb_ipc_request_header); iov[1].iov_base = data; iov[1].iov_len = write_size - sizeof(struct qb_ipc_request_header); alarm (10); qb_util_stopwatch_start(sw); do { res = qb_ipcc_sendv(conn, iov, 2); if (res == write_size) { write_count++; } } while (alarm_notice == 0 && (res == write_size || res == -EAGAIN)); if (res < 0) { perror("qb_ipcc_sendv"); } qb_util_stopwatch_stop(sw); secs = qb_util_stopwatch_sec_elapsed_get(sw); printf ("%5d messages sent ", write_count); printf ("%5d bytes per write ", write_size); printf ("%7.3f Seconds runtime ", secs); printf ("%9.3f TP/s ", ((float)write_count) / secs); printf ("%7.3f MB/s.\n", ((float)write_count) * ((float)write_size) / secs); } static void do_throughput_benchmark(qb_ipcc_connection_t *conn) { ssize_t size = 64; int i; signal (SIGALRM, sigalrm_handler); sw = qb_util_stopwatch_create(); for (i = 0; i < 10; i++) { /* number of repetitions - up to 50k */ _benchmark (conn, size); signal (SIGALRM, sigalrm_handler); size *= 5; if (size >= (ONE_MEG - 100)) { break; } } } static void do_echo(qb_ipcc_connection_t *conn) { struct my_req req; struct my_res res; char *newline; int32_t rc; int32_t send_ten_events; while (1) { printf("SEND (q or Q to quit) : "); if (fgets(req.message, 256, stdin) == NULL) { continue; } newline = strrchr(req.message, '\n'); if (newline) { *newline = '\0'; } if (strcasecmp(req.message, "q") == 0) { break; } else { req.hdr.id = QB_IPC_MSG_USER_START + 3; req.hdr.size = sizeof(struct my_req); rc = qb_ipcc_send(conn, &req, req.hdr.size); if (rc < 0) { perror("qb_ipcc_send"); exit(0); } } send_ten_events = (strcasecmp(req.message, "events") == 0); if (rc > 0) { if (use_events && !send_ten_events) { printf("waiting for event recv\n"); rc = qb_ipcc_event_recv(conn, &res, sizeof(res), -1); } else { printf("waiting for recv\n"); rc = qb_ipcc_recv(conn, &res, sizeof(res), -1); } printf("recv %d\n", rc); if (rc < 0) { perror("qb_ipcc_recv"); exit(0); } if (send_ten_events) { int32_t i; printf("waiting for 10 events\n"); for (i = 0; i < 10; i++) { rc = qb_ipcc_event_recv(conn, &res, sizeof(res), -1); if (rc < 0) { perror("qb_ipcc_event_recv"); } else { printf("got event %d rc:%d\n", i, rc); } } } printf("Response[%d]: %s \n", res.hdr.id, res.message); } } } static void show_usage(const char *name) { printf("usage: \n"); printf("%s \n", name); printf("\n"); printf(" options:\n"); printf("\n"); printf(" -h show this help text\n"); printf(" -b benchmark\n"); printf(" -e use events instead of responses\n"); printf("\n"); } int main(int argc, char *argv[]) { qb_ipcc_connection_t *conn; const char *options = "eb"; int32_t opt; while ((opt = getopt(argc, argv, options)) != -1) { switch (opt) { case 'b': do_benchmark = QB_TRUE; break; case 'e': use_events = QB_TRUE; break; case 'h': default: show_usage(argv[0]); exit(0); break; } } qb_log_init("ipcclient", LOG_USER, LOG_TRACE); qb_log_ctl(QB_LOG_SYSLOG, QB_LOG_CONF_ENABLED, QB_FALSE); qb_log_filter_ctl(QB_LOG_STDERR, QB_LOG_FILTER_ADD, QB_LOG_FILTER_FILE, "*", LOG_TRACE); qb_log_format_set(QB_LOG_STDERR, "%f:%l [%p] %b"); qb_log_ctl(QB_LOG_STDERR, QB_LOG_CONF_ENABLED, QB_TRUE); - conn = qb_ipcc_connect("ipcserver", MAX_MSG_SIZE); + /* Our example server is enforcing a buffer size minimum, + * so the client does not need to be concerned with setting + * the buffer size */ + conn = qb_ipcc_connect("ipcserver", 0); if (conn == NULL) { perror("qb_ipcc_connect"); exit(1); } + data = calloc(1, qb_ipcc_get_buffer_size(conn)); if (do_benchmark) { do_throughput_benchmark(conn); } else { do_echo(conn); } qb_ipcc_disconnect(conn); + free(data); return EXIT_SUCCESS; } diff --git a/examples/ipcserver.c b/examples/ipcserver.c index c1f912a..356c038 100644 --- a/examples/ipcserver.c +++ b/examples/ipcserver.c @@ -1,384 +1,389 @@ /* * Copyright (c) 2011 Red Hat, Inc. * * All rights reserved. * * Author: Angus Salkeld * * libqb is free software: you can redistribute it and/or modify * it under the terms of the GNU Lesser General Public License as published by * the Free Software Foundation, either version 2.1 of the License, or * (at your option) any later version. * * libqb is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public License * along with libqb. If not, see . */ #include "os_base.h" #include #include #include #include #include #include #ifdef HAVE_GLIB #include static GMainLoop *glib_loop; static qb_array_t *gio_map; #endif /* HAVE_GLIB */ +#define ONE_MEG 1048576 + static int32_t use_glib = QB_FALSE; static int32_t use_events = QB_FALSE; static qb_loop_t *bms_loop; static qb_ipcs_service_t *s1; static int32_t s1_connection_accept_fn(qb_ipcs_connection_t * c, uid_t uid, gid_t gid) { #if 0 if (uid == 0 && gid == 0) { qb_log(LOG_INFO, "Authenticated connection"); return 1; } qb_log(LOG_NOTICE, "BAD user!"); return 0; #else return 0; #endif } static void s1_connection_created_fn(qb_ipcs_connection_t * c) { struct qb_ipcs_stats srv_stats; qb_ipcs_stats_get(s1, &srv_stats, QB_FALSE); qb_log(LOG_INFO, "Connection created (active:%d, closed:%d)", srv_stats.active_connections, srv_stats.closed_connections); } static void s1_connection_destroyed_fn(qb_ipcs_connection_t * c) { qb_log(LOG_INFO, "Connection about to be freed"); } static int32_t s1_connection_closed_fn(qb_ipcs_connection_t * c) { struct qb_ipcs_connection_stats stats; struct qb_ipcs_stats srv_stats; qb_ipcs_stats_get(s1, &srv_stats, QB_FALSE); qb_ipcs_connection_stats_get(c, &stats, QB_FALSE); qb_log(LOG_INFO, "Connection to pid:%d destroyed (active:%d, closed:%d)", stats.client_pid, srv_stats.active_connections, srv_stats.closed_connections); qb_log(LOG_DEBUG, " Requests %"PRIu64"", stats.requests); qb_log(LOG_DEBUG, " Responses %"PRIu64"", stats.responses); qb_log(LOG_DEBUG, " Events %"PRIu64"", stats.events); qb_log(LOG_DEBUG, " Send retries %"PRIu64"", stats.send_retries); qb_log(LOG_DEBUG, " Recv retries %"PRIu64"", stats.recv_retries); qb_log(LOG_DEBUG, " FC state %d", stats.flow_control_state); qb_log(LOG_DEBUG, " FC count %"PRIu64"", stats.flow_control_count); return 0; } struct my_req { struct qb_ipc_request_header hdr; char message[256]; }; static int32_t s1_msg_process_fn(qb_ipcs_connection_t * c, void *data, size_t size) { struct qb_ipc_request_header *hdr; struct my_req *req_pt; struct qb_ipc_response_header response; ssize_t res; struct iovec iov[2]; char resp[100]; int32_t sl; int32_t send_ten_events = QB_FALSE; hdr = (struct qb_ipc_request_header *)data; if (hdr->id == (QB_IPC_MSG_USER_START + 1)) { return 0; } req_pt = (struct my_req *)data; qb_log(LOG_DEBUG, "msg received (id:%d, size:%d, data:%s)", req_pt->hdr.id, req_pt->hdr.size, req_pt->message); if (strcmp(req_pt->message, "kill") == 0) { exit(0); } response.size = sizeof(struct qb_ipc_response_header); response.id = 13; response.error = 0; sl = snprintf(resp, 100, "ACK %zd bytes", size) + 1; iov[0].iov_len = sizeof(response); iov[0].iov_base = &response; iov[1].iov_len = sl; iov[1].iov_base = resp; response.size += sl; send_ten_events = (strcmp(req_pt->message, "events") == 0); if (use_events && !send_ten_events) { res = qb_ipcs_event_sendv(c, iov, 2); } else { res = qb_ipcs_response_sendv(c, iov, 2); } if (res < 0) { errno = - res; qb_perror(LOG_ERR, "qb_ipcs_response_send"); } if (send_ten_events) { int32_t i; qb_log(LOG_INFO, "request to send 10 events"); for (i = 0; i < 10; i++) { res = qb_ipcs_event_sendv(c, iov, 2); qb_log(LOG_INFO, "sent event %d res:%d", i, res); } } return 0; } static void sigusr1_handler(int32_t num) { qb_log(LOG_DEBUG, "(%d)", num); qb_ipcs_destroy(s1); exit(0); } static void show_usage(const char *name) { printf("usage: \n"); printf("%s \n", name); printf("\n"); printf(" options:\n"); printf("\n"); printf(" -h show this help text\n"); printf(" -m use shared memory\n"); printf(" -u use unix sockets\n"); printf(" -g use glib mainloop\n"); printf(" -e use events\n"); printf("\n"); } #ifdef HAVE_GLIB struct gio_to_qb_poll { gboolean is_used; int32_t events; int32_t source; int32_t fd; void *data; qb_ipcs_dispatch_fn_t fn; enum qb_loop_priority p; }; static gboolean gio_read_socket(GIOChannel * gio, GIOCondition condition, gpointer data) { struct gio_to_qb_poll *adaptor = (struct gio_to_qb_poll *)data; gint fd = g_io_channel_unix_get_fd(gio); return (adaptor->fn(fd, condition, adaptor->data) == 0); } static void gio_poll_destroy(gpointer data) { struct gio_to_qb_poll *adaptor = (struct gio_to_qb_poll *)data; qb_log(LOG_DEBUG, "fd %d adaptor destroyed\n", adaptor->fd); adaptor->is_used = QB_FALSE; adaptor->fd = 0; } static int32_t my_g_dispatch_add(enum qb_loop_priority p, int32_t fd, int32_t evts, void *data, qb_ipcs_dispatch_fn_t fn) { struct gio_to_qb_poll *adaptor; GIOChannel *channel; int32_t res = 0; res = qb_array_index(gio_map, fd, (void **)&adaptor); if (res < 0) { return res; } if (adaptor->is_used) { return -EEXIST; } channel = g_io_channel_unix_new(fd); if (!channel) { return -ENOMEM; } adaptor->fn = fn; adaptor->events = evts; adaptor->data = data; adaptor->p = p; adaptor->is_used = TRUE; adaptor->fd = fd; adaptor->source = g_io_add_watch_full(channel, G_PRIORITY_DEFAULT, evts, gio_read_socket, adaptor, gio_poll_destroy); /* we are handing the channel off to be managed by mainloop now. * remove our reference. */ g_io_channel_unref(channel); return 0; } static int32_t my_g_dispatch_mod(enum qb_loop_priority p, int32_t fd, int32_t evts, void *data, qb_ipcs_dispatch_fn_t fn) { return 0; } static int32_t my_g_dispatch_del(int32_t fd) { struct gio_to_qb_poll *adaptor; if (qb_array_index(gio_map, fd, (void **)&adaptor) == 0) { g_source_remove(adaptor->source); adaptor->is_used = FALSE; } return 0; } #endif /* HAVE_GLIB */ static int32_t my_job_add(enum qb_loop_priority p, void *data, qb_loop_job_dispatch_fn fn) { return qb_loop_job_add(bms_loop, p, data, fn); } static int32_t my_dispatch_add(enum qb_loop_priority p, int32_t fd, int32_t evts, void *data, qb_ipcs_dispatch_fn_t fn) { return qb_loop_poll_add(bms_loop, p, fd, evts, data, fn); } static int32_t my_dispatch_mod(enum qb_loop_priority p, int32_t fd, int32_t evts, void *data, qb_ipcs_dispatch_fn_t fn) { return qb_loop_poll_mod(bms_loop, p, fd, evts, data, fn); } static int32_t my_dispatch_del(int32_t fd) { return qb_loop_poll_del(bms_loop, fd); } int32_t main(int32_t argc, char *argv[]) { const char *options = "mpseugh"; int32_t opt; int32_t rc; enum qb_ipc_type ipc_type = QB_IPC_NATIVE; struct qb_ipcs_service_handlers sh = { .connection_accept = s1_connection_accept_fn, .connection_created = s1_connection_created_fn, .msg_process = s1_msg_process_fn, .connection_destroyed = s1_connection_destroyed_fn, .connection_closed = s1_connection_closed_fn, }; struct qb_ipcs_poll_handlers ph = { .job_add = my_job_add, .dispatch_add = my_dispatch_add, .dispatch_mod = my_dispatch_mod, .dispatch_del = my_dispatch_del, }; #ifdef HAVE_GLIB struct qb_ipcs_poll_handlers glib_ph = { .job_add = NULL, /* FIXME */ .dispatch_add = my_g_dispatch_add, .dispatch_mod = my_g_dispatch_mod, .dispatch_del = my_g_dispatch_del, }; #endif /* HAVE_GLIB */ while ((opt = getopt(argc, argv, options)) != -1) { switch (opt) { case 'm': ipc_type = QB_IPC_SHM; break; case 'u': ipc_type = QB_IPC_SOCKET; break; case 'g': use_glib = QB_TRUE; break; case 'e': use_events = QB_TRUE; break; case 'h': default: show_usage(argv[0]); exit(0); break; } } signal(SIGINT, sigusr1_handler); qb_log_init("ipcserver", LOG_USER, LOG_TRACE); qb_log_filter_ctl(QB_LOG_STDERR, QB_LOG_FILTER_ADD, QB_LOG_FILTER_FILE, "*", LOG_TRACE); qb_log_format_set(QB_LOG_STDERR, "%f:%l [%p] %b"); qb_log_ctl(QB_LOG_STDERR, QB_LOG_CONF_ENABLED, QB_TRUE); s1 = qb_ipcs_create("ipcserver", 0, ipc_type, &sh); if (s1 == 0) { qb_perror(LOG_ERR, "qb_ipcs_create"); exit(1); } + /* This forces the clients to use a minimum buffer size */ + qb_ipcs_enforce_buffer_size(s1, ONE_MEG); + if (!use_glib) { bms_loop = qb_loop_create(); qb_ipcs_poll_handlers_set(s1, &ph); rc = qb_ipcs_run(s1); if (rc != 0) { errno = -rc; qb_perror(LOG_ERR, "qb_ipcs_run"); exit(1); } qb_loop_run(bms_loop); } else { #ifdef HAVE_GLIB glib_loop = g_main_loop_new(NULL, FALSE); gio_map = qb_array_create_2(16, sizeof(struct gio_to_qb_poll), 1); qb_ipcs_poll_handlers_set(s1, &glib_ph); rc = qb_ipcs_run(s1); if (rc != 0) { errno = -rc; qb_perror(LOG_ERR, "qb_ipcs_run"); exit(1); } g_main_loop_run(glib_loop); #else qb_log(LOG_ERR, "You don't seem to have glib-devel installed.\n"); #endif } return EXIT_SUCCESS; }