diff --git a/exec/ipc_glue.c b/exec/ipc_glue.c index fa59a986..a58c8e76 100644 --- a/exec/ipc_glue.c +++ b/exec/ipc_glue.c @@ -1,841 +1,846 @@ /* * Copyright (c) 2010-2012 Red Hat, Inc. * * All rights reserved. * * Author: Angus Salkeld * * This software licensed under BSD license, the text of which follows: * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * - Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * - Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * - Neither the name of Red Hat, Inc. nor the names of its * contributors may be used to endorse or promote products derived from this * software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF * THE POSSIBILITY OF SUCH DAMAGE. */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "sync.h" #include "timer.h" #include "main.h" #include "util.h" #include "apidef.h" #include "service.h" LOGSYS_DECLARE_SUBSYS ("MAIN"); static struct corosync_api_v1 *api = NULL; static int32_t ipc_not_enough_fds_left = 0; static int32_t ipc_fc_is_quorate; /* boolean */ static int32_t ipc_fc_totem_queue_level; /* percentage used */ static int32_t ipc_fc_sync_in_process; /* boolean */ static int32_t ipc_allow_connections = 0; /* boolean */ struct cs_ipcs_mapper { int32_t id; qb_ipcs_service_t *inst; char name[256]; }; struct outq_item { void *msg; size_t mlen; struct list_head list; }; static struct cs_ipcs_mapper ipcs_mapper[SERVICES_COUNT_MAX]; static int32_t cs_ipcs_job_add(enum qb_loop_priority p, void *data, qb_loop_job_dispatch_fn fn); static int32_t cs_ipcs_dispatch_add(enum qb_loop_priority p, int32_t fd, int32_t events, void *data, qb_ipcs_dispatch_fn_t fn); static int32_t cs_ipcs_dispatch_mod(enum qb_loop_priority p, int32_t fd, int32_t events, void *data, qb_ipcs_dispatch_fn_t fn); static int32_t cs_ipcs_dispatch_del(int32_t fd); static struct qb_ipcs_poll_handlers corosync_poll_funcs = { .job_add = cs_ipcs_job_add, .dispatch_add = cs_ipcs_dispatch_add, .dispatch_mod = cs_ipcs_dispatch_mod, .dispatch_del = cs_ipcs_dispatch_del, }; static int32_t cs_ipcs_connection_accept (qb_ipcs_connection_t *c, uid_t euid, gid_t egid); static void cs_ipcs_connection_created(qb_ipcs_connection_t *c); static int32_t cs_ipcs_msg_process(qb_ipcs_connection_t *c, void *data, size_t size); static int32_t cs_ipcs_connection_closed (qb_ipcs_connection_t *c); static void cs_ipcs_connection_destroyed (qb_ipcs_connection_t *c); static struct qb_ipcs_service_handlers corosync_service_funcs = { .connection_accept = cs_ipcs_connection_accept, .connection_created = cs_ipcs_connection_created, .msg_process = cs_ipcs_msg_process, .connection_closed = cs_ipcs_connection_closed, .connection_destroyed = cs_ipcs_connection_destroyed, }; static const char* cs_ipcs_serv_short_name(int32_t service_id) { const char *name; switch (service_id) { case CFG_SERVICE: name = "cfg"; break; case CPG_SERVICE: name = "cpg"; break; case QUORUM_SERVICE: name = "quorum"; break; case PLOAD_SERVICE: name = "pload"; break; case VOTEQUORUM_SERVICE: name = "votequorum"; break; case MON_SERVICE: name = "mon"; break; case WD_SERVICE: name = "wd"; break; case CMAP_SERVICE: name = "cmap"; break; default: name = NULL; break; } return name; } void cs_ipc_allow_connections(int32_t allow) { ipc_allow_connections = allow; } int32_t cs_ipcs_service_destroy(int32_t service_id) { if (ipcs_mapper[service_id].inst) { qb_ipcs_destroy(ipcs_mapper[service_id].inst); ipcs_mapper[service_id].inst = NULL; } return 0; } static int32_t cs_ipcs_connection_accept (qb_ipcs_connection_t *c, uid_t euid, gid_t egid) { int32_t service = qb_ipcs_service_id_get(c); uint8_t u8; char key_name[ICMAP_KEYNAME_MAXLEN]; if (!ipc_allow_connections) { log_printf(LOGSYS_LEVEL_DEBUG, "Denied connection, corosync is not ready"); return -EAGAIN; } if (corosync_service[service] == NULL || ipcs_mapper[service].inst == NULL) { return -ENOSYS; } if (ipc_not_enough_fds_left) { return -EMFILE; } if (euid == 0 || egid == 0) { return 0; } snprintf(key_name, ICMAP_KEYNAME_MAXLEN, "uidgid.uid.%u", euid); if (icmap_get_uint8(key_name, &u8) == CS_OK && u8 == 1) return 0; snprintf(key_name, ICMAP_KEYNAME_MAXLEN, "uidgid.gid.%u", egid); if (icmap_get_uint8(key_name, &u8) == CS_OK && u8 == 1) return 0; log_printf(LOGSYS_LEVEL_ERROR, "Denied connection attempt from %d:%d", euid, egid); return -EACCES; } static char * pid_to_name (pid_t pid, char *out_name, size_t name_len) { char *name; char *rest; FILE *fp; char fname[32]; char buf[256]; snprintf (fname, 32, "/proc/%d/stat", pid); fp = fopen (fname, "r"); if (!fp) { return NULL; } if (fgets (buf, sizeof (buf), fp) == NULL) { fclose (fp); return NULL; } fclose (fp); name = strrchr (buf, '('); if (!name) { return NULL; } /* move past the bracket */ name++; rest = strrchr (buf, ')'); if (rest == NULL || rest[1] != ' ') { return NULL; } *rest = '\0'; /* move past the NULL and space */ rest += 2; /* copy the name */ strncpy (out_name, name, name_len); out_name[name_len - 1] = '\0'; return out_name; } struct cs_ipcs_conn_context { char *icmap_path; struct list_head outq_head; int32_t queuing; uint32_t queued; uint64_t invalid_request; uint64_t overload; uint32_t sent; char data[1]; }; static void cs_ipcs_connection_created(qb_ipcs_connection_t *c) { int32_t service = 0; struct cs_ipcs_conn_context *context; char proc_name[32]; struct qb_ipcs_connection_stats stats; int32_t size = sizeof(struct cs_ipcs_conn_context); char key_name[ICMAP_KEYNAME_MAXLEN]; int set_client_pid = 0; int set_proc_name = 0; log_printf(LOG_DEBUG, "connection created"); service = qb_ipcs_service_id_get(c); size += corosync_service[service]->private_data_size; context = calloc(1, size); if (context == NULL) { qb_ipcs_disconnect(c); return; } list_init(&context->outq_head); context->queuing = QB_FALSE; context->queued = 0; context->sent = 0; qb_ipcs_context_set(c, context); if (corosync_service[service]->lib_init_fn(c) != 0) { log_printf(LOG_ERR, "lib_init_fn failed, disconnecting"); qb_ipcs_disconnect(c); return; } icmap_inc("runtime.connections.active"); qb_ipcs_connection_stats_get(c, &stats, QB_FALSE); if (stats.client_pid > 0) { if (pid_to_name (stats.client_pid, proc_name, sizeof(proc_name))) { snprintf(key_name, ICMAP_KEYNAME_MAXLEN, "runtime.connections.%s:%u:%p", proc_name, stats.client_pid, c); set_proc_name = 1; } else { snprintf(key_name, ICMAP_KEYNAME_MAXLEN, "runtime.connections.%u:%p", stats.client_pid, c); } set_client_pid = 1; } else { snprintf(key_name, ICMAP_KEYNAME_MAXLEN, "runtime.connections.%p", c); } icmap_convert_name_to_valid_name(key_name); context->icmap_path = strdup(key_name); if (context->icmap_path == NULL) { qb_ipcs_disconnect(c); return; } if (set_proc_name) { snprintf(key_name, ICMAP_KEYNAME_MAXLEN, "%s.name", context->icmap_path); icmap_set_string(key_name, proc_name); } snprintf(key_name, ICMAP_KEYNAME_MAXLEN, "%s.client_pid", context->icmap_path); if (set_client_pid) { icmap_set_uint32(key_name, stats.client_pid); } else { icmap_set_uint32(key_name, 0); } snprintf(key_name, ICMAP_KEYNAME_MAXLEN, "%s.service_id", context->icmap_path); icmap_set_uint32(key_name, service); snprintf(key_name, ICMAP_KEYNAME_MAXLEN, "%s.responses", context->icmap_path); icmap_set_uint64(key_name, 0); snprintf(key_name, ICMAP_KEYNAME_MAXLEN, "%s.dispatched", context->icmap_path); icmap_set_uint64(key_name, 0); snprintf(key_name, ICMAP_KEYNAME_MAXLEN, "%s.requests", context->icmap_path); icmap_set_uint64(key_name, 0); snprintf(key_name, ICMAP_KEYNAME_MAXLEN, "%s.send_retries", context->icmap_path); icmap_set_uint64(key_name, 0); snprintf(key_name, ICMAP_KEYNAME_MAXLEN, "%s.recv_retries", context->icmap_path); icmap_set_uint64(key_name, 0); snprintf(key_name, ICMAP_KEYNAME_MAXLEN, "%s.flow_control", context->icmap_path); icmap_set_uint32(key_name, 0); snprintf(key_name, ICMAP_KEYNAME_MAXLEN, "%s.flow_control_count", context->icmap_path); icmap_set_uint64(key_name, 0); snprintf(key_name, ICMAP_KEYNAME_MAXLEN, "%s.queue_size", context->icmap_path); icmap_set_uint32(key_name, 0); snprintf(key_name, ICMAP_KEYNAME_MAXLEN, "%s.invalid_request", context->icmap_path); icmap_set_uint64(key_name, 0); snprintf(key_name, ICMAP_KEYNAME_MAXLEN, "%s.overload", context->icmap_path); icmap_set_uint64(key_name, 0); } void cs_ipc_refcnt_inc(void *conn) { qb_ipcs_connection_ref(conn); } void cs_ipc_refcnt_dec(void *conn) { qb_ipcs_connection_unref(conn); } void *cs_ipcs_private_data_get(void *conn) { struct cs_ipcs_conn_context *cnx; cnx = qb_ipcs_context_get(conn); return &cnx->data[0]; } static void cs_ipcs_connection_destroyed (qb_ipcs_connection_t *c) { struct cs_ipcs_conn_context *context; struct list_head *list, *list_next; struct outq_item *outq_item; log_printf(LOG_DEBUG, "%s() ", __func__); context = qb_ipcs_context_get(c); if (context) { for (list = context->outq_head.next; list != &context->outq_head; list = list_next) { list_next = list->next; outq_item = list_entry (list, struct outq_item, list); list_del (list); free (outq_item->msg); free (outq_item); } free(context); } } static int32_t cs_ipcs_connection_closed (qb_ipcs_connection_t *c) { int32_t res = 0; int32_t service = qb_ipcs_service_id_get(c); icmap_iter_t iter; char prefix[ICMAP_KEYNAME_MAXLEN]; const char *key_name; struct cs_ipcs_conn_context *cnx; log_printf(LOG_DEBUG, "%s() ", __func__); res = corosync_service[service]->lib_exit_fn(c); if (res != 0) { return res; } cnx = qb_ipcs_context_get(c); snprintf(prefix, ICMAP_KEYNAME_MAXLEN, "%s.", cnx->icmap_path); iter = icmap_iter_init(prefix); while ((key_name = icmap_iter_next(iter, NULL, NULL)) != NULL) { icmap_delete(key_name); } icmap_iter_finalize(iter); free(cnx->icmap_path); icmap_inc("runtime.connections.closed"); icmap_dec("runtime.connections.active"); return 0; } int cs_ipcs_response_iov_send (void *conn, const struct iovec *iov, unsigned int iov_len) { int32_t rc = qb_ipcs_response_sendv(conn, iov, iov_len); if (rc >= 0) { return 0; } return rc; } int cs_ipcs_response_send(void *conn, const void *msg, size_t mlen) { int32_t rc = qb_ipcs_response_send(conn, msg, mlen); if (rc >= 0) { return 0; } return rc; } static void outq_flush (void *data) { qb_ipcs_connection_t *conn = data; struct list_head *list, *list_next; struct outq_item *outq_item; int32_t rc; struct cs_ipcs_conn_context *context = qb_ipcs_context_get(conn); for (list = context->outq_head.next; list != &context->outq_head; list = list_next) { list_next = list->next; outq_item = list_entry (list, struct outq_item, list); rc = qb_ipcs_event_send(conn, outq_item->msg, outq_item->mlen); if (rc < 0 && rc != -EAGAIN) { errno = -rc; qb_perror(LOG_ERR, "qb_ipcs_event_send"); qb_ipcs_connection_unref(conn); return; } else if (rc == -EAGAIN) { break; } assert(rc == outq_item->mlen); context->sent++; context->queued--; list_del (list); free (outq_item->msg); free (outq_item); } if (list_empty (&context->outq_head)) { context->queuing = QB_FALSE; log_printf(LOGSYS_LEVEL_INFO, "Q empty, queued:%d sent:%d.", context->queued, context->sent); context->queued = 0; context->sent = 0; qb_ipcs_connection_unref(conn); } else { qb_loop_job_add(cs_poll_handle_get(), QB_LOOP_HIGH, conn, outq_flush); } } static void msg_send_or_queue(qb_ipcs_connection_t *conn, const struct iovec *iov, uint32_t iov_len) { int32_t rc = 0; int32_t i; int32_t bytes_msg = 0; struct outq_item *outq_item; char *write_buf = 0; struct cs_ipcs_conn_context *context = qb_ipcs_context_get(conn); for (i = 0; i < iov_len; i++) { bytes_msg += iov[i].iov_len; } if (!context->queuing) { assert(list_empty (&context->outq_head)); rc = qb_ipcs_event_sendv(conn, iov, iov_len); if (rc == bytes_msg) { context->sent++; return; } if (rc == -EAGAIN) { context->queued = 0; context->sent = 0; context->queuing = QB_TRUE; qb_ipcs_connection_ref(conn); qb_loop_job_add(cs_poll_handle_get(), QB_LOOP_HIGH, conn, outq_flush); } else { log_printf(LOGSYS_LEVEL_ERROR, "event_send retuned %d, expected %d!", rc, bytes_msg); return; } } outq_item = malloc (sizeof (struct outq_item)); if (outq_item == NULL) { qb_ipcs_connection_unref(conn); qb_ipcs_disconnect(conn); return; } outq_item->msg = malloc (bytes_msg); if (outq_item->msg == NULL) { free (outq_item); qb_ipcs_connection_unref(conn); qb_ipcs_disconnect(conn); return; } write_buf = outq_item->msg; for (i = 0; i < iov_len; i++) { memcpy (write_buf, iov[i].iov_base, iov[i].iov_len); write_buf += iov[i].iov_len; } outq_item->mlen = bytes_msg; list_init (&outq_item->list); list_add_tail (&outq_item->list, &context->outq_head); context->queued++; } int cs_ipcs_dispatch_send(void *conn, const void *msg, size_t mlen) { struct iovec iov; iov.iov_base = (void *)msg; iov.iov_len = mlen; msg_send_or_queue (conn, &iov, 1); return 0; } int cs_ipcs_dispatch_iov_send (void *conn, const struct iovec *iov, unsigned int iov_len) { msg_send_or_queue(conn, iov, iov_len); return 0; } static int32_t cs_ipcs_msg_process(qb_ipcs_connection_t *c, void *data, size_t size) { struct qb_ipc_response_header response; struct qb_ipc_request_header *request_pt = (struct qb_ipc_request_header *)data; int32_t service = qb_ipcs_service_id_get(c); int32_t send_ok = 0; int32_t is_async_call = QB_FALSE; ssize_t res = -1; int sending_allowed_private_data; struct cs_ipcs_conn_context *cnx; send_ok = corosync_sending_allowed (service, request_pt->id, request_pt, &sending_allowed_private_data); is_async_call = (service == CPG_SERVICE && request_pt->id == 2); /* * This happens when the message contains some kind of invalid * parameter, such as an invalid size */ if (send_ok == -EINVAL) { response.size = sizeof (response); response.id = 0; response.error = CS_ERR_INVALID_PARAM; cnx = qb_ipcs_context_get(c); if (cnx) { cnx->invalid_request++; } if (is_async_call) { log_printf(LOGSYS_LEVEL_INFO, "*** %s() invalid message! size:%d error:%d", __func__, response.size, response.error); } else { qb_ipcs_response_send (c, &response, sizeof (response)); } res = -EINVAL; } else if (send_ok < 0) { cnx = qb_ipcs_context_get(c); if (cnx) { cnx->overload++; } if (!is_async_call) { /* * Overload, tell library to retry */ response.size = sizeof (response); response.id = 0; response.error = CS_ERR_TRY_AGAIN; qb_ipcs_response_send (c, &response, sizeof (response)); } else { log_printf(LOGSYS_LEVEL_WARNING, "*** %s() (%d:%d - %d) %s!", __func__, service, request_pt->id, is_async_call, strerror(-send_ok)); } res = -ENOBUFS; } if (send_ok >= 0) { corosync_service[service]->lib_engine[request_pt->id].lib_handler_fn(c, request_pt); res = 0; } corosync_sending_allowed_release (&sending_allowed_private_data); return res; } static int32_t cs_ipcs_job_add(enum qb_loop_priority p, void *data, qb_loop_job_dispatch_fn fn) { return qb_loop_job_add(cs_poll_handle_get(), p, data, fn); } static int32_t cs_ipcs_dispatch_add(enum qb_loop_priority p, int32_t fd, int32_t events, void *data, qb_ipcs_dispatch_fn_t fn) { return qb_loop_poll_add(cs_poll_handle_get(), p, fd, events, data, fn); } static int32_t cs_ipcs_dispatch_mod(enum qb_loop_priority p, int32_t fd, int32_t events, void *data, qb_ipcs_dispatch_fn_t fn) { return qb_loop_poll_mod(cs_poll_handle_get(), p, fd, events, data, fn); } static int32_t cs_ipcs_dispatch_del(int32_t fd) { return qb_loop_poll_del(cs_poll_handle_get(), fd); } static void cs_ipcs_low_fds_event(int32_t not_enough, int32_t fds_available) { ipc_not_enough_fds_left = not_enough; if (not_enough) { log_printf(LOGSYS_LEVEL_WARNING, "refusing new connections (fds_available:%d)", fds_available); } else { log_printf(LOGSYS_LEVEL_NOTICE, "allowing new connections (fds_available:%d)", fds_available); } } int32_t cs_ipcs_q_level_get(void) { return ipc_fc_totem_queue_level; } static qb_loop_timer_handle ipcs_check_for_flow_control_timer; static void cs_ipcs_check_for_flow_control(void) { int32_t i; int32_t fc_enabled; for (i = 0; i < SERVICES_COUNT_MAX; i++) { if (corosync_service[i] == NULL || ipcs_mapper[i].inst == NULL) { continue; } fc_enabled = QB_IPCS_RATE_OFF; if (ipc_fc_is_quorate == 1 || corosync_service[i]->allow_inquorate == CS_LIB_ALLOW_INQUORATE) { /* * we are quorate * now check flow control */ if (ipc_fc_totem_queue_level != TOTEM_Q_LEVEL_CRITICAL && ipc_fc_sync_in_process == 0) { fc_enabled = QB_FALSE; } else { fc_enabled = QB_IPCS_RATE_OFF_2; } } if (fc_enabled) { qb_ipcs_request_rate_limit(ipcs_mapper[i].inst, fc_enabled); qb_loop_timer_add(cs_poll_handle_get(), QB_LOOP_MED, 1*QB_TIME_NS_IN_MSEC, NULL, corosync_recheck_the_q_level, &ipcs_check_for_flow_control_timer); } else if (ipc_fc_totem_queue_level == TOTEM_Q_LEVEL_LOW) { qb_ipcs_request_rate_limit(ipcs_mapper[i].inst, QB_IPCS_RATE_FAST); } else if (ipc_fc_totem_queue_level == TOTEM_Q_LEVEL_GOOD) { qb_ipcs_request_rate_limit(ipcs_mapper[i].inst, QB_IPCS_RATE_NORMAL); } else if (ipc_fc_totem_queue_level == TOTEM_Q_LEVEL_HIGH) { qb_ipcs_request_rate_limit(ipcs_mapper[i].inst, QB_IPCS_RATE_SLOW); } } } static void cs_ipcs_fc_quorum_changed(int quorate, void *context) { ipc_fc_is_quorate = quorate; cs_ipcs_check_for_flow_control(); } static void cs_ipcs_totem_queue_level_changed(enum totem_q_level level) { ipc_fc_totem_queue_level = level; cs_ipcs_check_for_flow_control(); } void cs_ipcs_sync_state_changed(int32_t sync_in_process) { ipc_fc_sync_in_process = sync_in_process; cs_ipcs_check_for_flow_control(); } void cs_ipcs_stats_update(void) { int32_t i; struct qb_ipcs_stats srv_stats; struct qb_ipcs_connection_stats stats; qb_ipcs_connection_t *c; struct cs_ipcs_conn_context *cnx; char key_name[ICMAP_KEYNAME_MAXLEN]; for (i = 0; i < SERVICES_COUNT_MAX; i++) { if (corosync_service[i] == NULL || ipcs_mapper[i].inst == NULL) { continue; } qb_ipcs_stats_get(ipcs_mapper[i].inst, &srv_stats, QB_FALSE); for (c = qb_ipcs_connection_first_get(ipcs_mapper[i].inst); c; c = qb_ipcs_connection_next_get(ipcs_mapper[i].inst, c)) { cnx = qb_ipcs_context_get(c); if (cnx == NULL) continue; qb_ipcs_connection_stats_get(c, &stats, QB_FALSE); snprintf(key_name, ICMAP_KEYNAME_MAXLEN, "%s.client_pid", cnx->icmap_path); icmap_set_uint32(key_name, stats.client_pid); snprintf(key_name, ICMAP_KEYNAME_MAXLEN, "%s.requests", cnx->icmap_path); icmap_set_uint64(key_name, stats.requests); snprintf(key_name, ICMAP_KEYNAME_MAXLEN, "%s.responses", cnx->icmap_path); icmap_set_uint64(key_name, stats.responses); snprintf(key_name, ICMAP_KEYNAME_MAXLEN, "%s.dispatched", cnx->icmap_path); icmap_set_uint64(key_name, stats.events); snprintf(key_name, ICMAP_KEYNAME_MAXLEN, "%s.send_retries", cnx->icmap_path); icmap_set_uint64(key_name, stats.send_retries); snprintf(key_name, ICMAP_KEYNAME_MAXLEN, "%s.recv_retries", cnx->icmap_path); icmap_set_uint64(key_name, stats.recv_retries); snprintf(key_name, ICMAP_KEYNAME_MAXLEN, "%s.flow_control", cnx->icmap_path); icmap_set_uint32(key_name, stats.flow_control_state); snprintf(key_name, ICMAP_KEYNAME_MAXLEN, "%s.flow_control_count", cnx->icmap_path); icmap_set_uint64(key_name, stats.flow_control_count); snprintf(key_name, ICMAP_KEYNAME_MAXLEN, "%s.queue_size", cnx->icmap_path); icmap_set_uint32(key_name, cnx->queued); snprintf(key_name, ICMAP_KEYNAME_MAXLEN, "%s.invalid_request", cnx->icmap_path); icmap_set_uint64(key_name, cnx->invalid_request); snprintf(key_name, ICMAP_KEYNAME_MAXLEN, "%s.overload", cnx->icmap_path); icmap_set_uint64(key_name, cnx->overload); qb_ipcs_connection_unref(c); } } } -void cs_ipcs_service_init(struct corosync_service_engine *service) +const char *cs_ipcs_service_init(struct corosync_service_engine *service) { if (service->lib_engine_count == 0) { log_printf (LOGSYS_LEVEL_DEBUG, "NOT Initializing IPC on %s [%d]", cs_ipcs_serv_short_name(service->id), service->id); - return; + return NULL; } ipcs_mapper[service->id].id = service->id; strcpy(ipcs_mapper[service->id].name, cs_ipcs_serv_short_name(service->id)); log_printf (LOGSYS_LEVEL_DEBUG, "Initializing IPC on %s [%d]", ipcs_mapper[service->id].name, ipcs_mapper[service->id].id); ipcs_mapper[service->id].inst = qb_ipcs_create(ipcs_mapper[service->id].name, ipcs_mapper[service->id].id, QB_IPC_NATIVE, &corosync_service_funcs); assert(ipcs_mapper[service->id].inst); qb_ipcs_poll_handlers_set(ipcs_mapper[service->id].inst, &corosync_poll_funcs); - qb_ipcs_run(ipcs_mapper[service->id].inst); + if (qb_ipcs_run(ipcs_mapper[service->id].inst) != 0) { + log_printf (LOGSYS_LEVEL_ERROR, "Can't initialize IPC"); + return "qb_ipcs_run error"; + } + + return NULL; } void cs_ipcs_init(void) { api = apidef_get (); qb_loop_poll_low_fds_event_set(cs_poll_handle_get(), cs_ipcs_low_fds_event); api->quorum_register_callback (cs_ipcs_fc_quorum_changed, NULL); totempg_queue_level_register_callback (cs_ipcs_totem_queue_level_changed); icmap_set_uint64("runtime.connections.active", 0); icmap_set_uint64("runtime.connections.closed", 0); } diff --git a/exec/main.h b/exec/main.h index 13b7e121..b65f0ae9 100644 --- a/exec/main.h +++ b/exec/main.h @@ -1,126 +1,126 @@ /* * Copyright (c) 2002-2006 MontaVista Software, Inc. * Copyright (c) 2006-2012 Red Hat, Inc. * * All rights reserved. * * Author: Steven Dake (sdake@redhat.com) * * This software licensed under BSD license, the text of which follows: * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * - Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * - Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * - Neither the name of the MontaVista Software, Inc. nor the names of its * contributors may be used to endorse or promote products derived from this * software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF * THE POSSIBILITY OF SUCH DAMAGE. */ /** * @file * * @warning DO NOT USE SYMBOLS IN THIS FILE */ #ifndef MAIN_H_DEFINED #define MAIN_H_DEFINED #define TRUE 1 #define FALSE 0 #include #include #include #include #include extern unsigned long long *(*main_clm_get_by_nodeid) (unsigned int node_id); extern int main_mcast ( const struct iovec *iovec, unsigned int iov_len, unsigned int guarantee); extern void message_source_set (mar_message_source_t *source, void *conn); extern int message_source_is_local (const mar_message_source_t *source); extern void corosync_shutdown_request (void); extern void corosync_state_dump (void); extern qb_loop_t *cs_poll_handle_get (void); extern int cs_poll_dispatch_add (qb_loop_t * handle, int fd, int events, void *data, int (*dispatch_fn) (int fd, int revents, void *data)); extern int cs_poll_dispatch_delete ( qb_loop_t * handle, int fd); extern int corosync_sending_allowed ( unsigned int service, unsigned int id, const void *msg, void *sending_allowed_private_data); extern void corosync_sending_allowed_release (void *sending_allowed_private_data); extern void corosync_recheck_the_q_level(void *data); extern void cs_ipcs_init(void); -extern void cs_ipcs_service_init(struct corosync_service_engine *service); +extern const char *cs_ipcs_service_init(struct corosync_service_engine *service); extern void cs_ipcs_stats_update(void); extern int32_t cs_ipcs_service_destroy(int32_t service_id); extern int32_t cs_ipcs_q_level_get(void); extern int cs_ipcs_dispatch_send(void *conn, const void *msg, size_t mlen); extern int cs_ipcs_dispatch_iov_send (void *conn, const struct iovec *iov, unsigned int iov_len); extern int cs_ipcs_response_send(void *conn, const void *msg, size_t mlen); extern int cs_ipcs_response_iov_send (void *conn, const struct iovec *iov, unsigned int iov_len); extern void cs_ipcs_sync_state_changed(int32_t sync_in_process); extern void *cs_ipcs_private_data_get(void *conn); extern void cs_ipc_refcnt_inc(void *conn); extern void cs_ipc_refcnt_dec(void *conn); extern void cs_ipc_allow_connections(int32_t allow); int coroparse_configparse (const char **error_string); #endif /* MAIN_H_DEFINED */ diff --git a/exec/service.c b/exec/service.c index eaacfbba..f998306e 100644 --- a/exec/service.c +++ b/exec/service.c @@ -1,464 +1,467 @@ /* * Copyright (c) 2006 MontaVista Software, Inc. * Copyright (c) 2006-2012 Red Hat, Inc. * * All rights reserved. * * Author: Steven Dake (sdake@redhat.com) * * This software licensed under BSD license, the text of which follows: * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * - Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * - Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * - Neither the name of the MontaVista Software, Inc. nor the names of its * contributors may be used to endorse or promote products derived from this * software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF * THE POSSIBILITY OF SUCH DAMAGE. */ #include #include #include #include #include #include #include "util.h" #include #include #include "timer.h" #include #include #include "main.h" #include "service.h" #include #include LOGSYS_DECLARE_SUBSYS ("SERV"); static struct default_service default_services[] = { { .name = "corosync_cmap", .ver = 0, .loader = cmap_get_service_engine_ver0 }, { .name = "corosync_cfg", .ver = 0, .loader = cfg_get_service_engine_ver0 }, { .name = "corosync_cpg", .ver = 0, .loader = cpg_get_service_engine_ver0 }, { .name = "corosync_pload", .ver = 0, .loader = pload_get_service_engine_ver0 }, #ifdef HAVE_MONITORING { .name = "corosync_mon", .ver = 0, .loader = mon_get_service_engine_ver0 }, #endif #ifdef HAVE_WATCHDOG { .name = "corosync_wd", .ver = 0, .loader = wd_get_service_engine_ver0 }, #endif { .name = "corosync_quorum", .ver = 0, .loader = vsf_quorum_get_service_engine_ver0 }, }; /* * service exit and unlink schedwrk handler data structure */ struct seus_handler_data { int service_engine; struct corosync_api_v1 *api; }; struct corosync_service_engine *corosync_service[SERVICES_COUNT_MAX]; const char *service_stats_rx[SERVICES_COUNT_MAX][SERVICE_HANDLER_MAXIMUM_COUNT]; const char *service_stats_tx[SERVICES_COUNT_MAX][SERVICE_HANDLER_MAXIMUM_COUNT]; static void (*service_unlink_all_complete) (void) = NULL; char *corosync_service_link_and_init ( struct corosync_api_v1 *corosync_api, struct default_service *service) { struct corosync_service_engine *service_engine; int fn; char *name_sufix; char key_name[ICMAP_KEYNAME_MAXLEN]; char *init_result; /* * Initialize service */ service_engine = service->loader(); corosync_service[service_engine->id] = service_engine; if (service_engine->config_init_fn) { service_engine->config_init_fn (corosync_api); } if (service_engine->exec_init_fn) { init_result = service_engine->exec_init_fn (corosync_api); if (init_result) { return (init_result); } } /* * Store service in cmap db */ snprintf(key_name, ICMAP_KEYNAME_MAXLEN, "internal_configuration.service.%u.name", service_engine->id); icmap_set_string(key_name, service->name); snprintf(key_name, ICMAP_KEYNAME_MAXLEN, "internal_configuration.service.%u.ver", service_engine->id); icmap_set_uint32(key_name, service->ver); name_sufix = strrchr (service->name, '_'); if (name_sufix) name_sufix++; else name_sufix = (char*)service->name; snprintf(key_name, ICMAP_KEYNAME_MAXLEN, "runtime.services.%s.service_id", name_sufix); icmap_set_uint16(key_name, service_engine->id); for (fn = 0; fn < service_engine->exec_engine_count; fn++) { snprintf(key_name, ICMAP_KEYNAME_MAXLEN, "runtime.services.%s.%d.tx", name_sufix, fn); icmap_set_uint64(key_name, 0); service_stats_tx[service_engine->id][fn] = strdup(key_name); snprintf(key_name, ICMAP_KEYNAME_MAXLEN, "runtime.services.%s.%d.rx", name_sufix, fn); icmap_set_uint64(key_name, 0); service_stats_rx[service_engine->id][fn] = strdup(key_name); } log_printf (LOGSYS_LEVEL_NOTICE, "Service engine loaded: %s [%d]", service_engine->name, service_engine->id); - cs_ipcs_service_init(service_engine); + init_result = (char *)cs_ipcs_service_init(service_engine); + if (init_result != NULL) { + return (init_result); + } return NULL; } static int service_priority_max(void) { int lpc = 0, max = 0; for(; lpc < SERVICES_COUNT_MAX; lpc++) { if(corosync_service[lpc] != NULL && corosync_service[lpc]->priority > max) { max = corosync_service[lpc]->priority; } } return max; } /* * use the force */ static unsigned int corosync_service_unlink_and_exit_priority ( struct corosync_api_v1 *corosync_api, int lowest_priority, int *current_priority, int *current_service_engine) { unsigned short service_id; int res; for(; *current_priority >= lowest_priority; *current_priority = *current_priority - 1) { for(*current_service_engine = 0; *current_service_engine < SERVICES_COUNT_MAX; *current_service_engine = *current_service_engine + 1) { if(corosync_service[*current_service_engine] == NULL || corosync_service[*current_service_engine]->priority != *current_priority) { continue; } /* * find service handle and unload it if possible. * * If the service engine's exec_exit_fn returns -1 indicating * it was busy, this function returns -1 and can be called again * at a later time (usually via the schedwrk api). */ service_id = corosync_service[*current_service_engine]->id; if (corosync_service[service_id]->exec_exit_fn) { res = corosync_service[service_id]->exec_exit_fn (); if (res == -1) { return (-1); } } /* * Exit all ipc connections dependent on this service */ cs_ipcs_service_destroy (*current_service_engine); log_printf(LOGSYS_LEVEL_NOTICE, "Service engine unloaded: %s", corosync_service[*current_service_engine]->name); corosync_service[*current_service_engine] = NULL; /* * Call should call this function again */ return (1); } } /* * We finish unlink of all services -> no need to call this function again */ return (0); } static unsigned int service_unlink_and_exit ( struct corosync_api_v1 *corosync_api, const char *service_name, unsigned int service_ver) { unsigned short service_id; char *name_sufix; int res; const char *iter_key_name; icmap_iter_t iter; char key_name[ICMAP_KEYNAME_MAXLEN]; unsigned int found_service_ver; char *found_service_name; int service_found; name_sufix = strrchr (service_name, '_'); if (name_sufix) name_sufix++; else name_sufix = (char*)service_name; service_found = 0; found_service_name = NULL; iter = icmap_iter_init("internal_configuration.service."); while ((iter_key_name = icmap_iter_next(iter, NULL, NULL)) != NULL) { res = sscanf(iter_key_name, "internal_configuration.service.%hu.%s", &service_id, key_name); if (res != 2) { continue; } snprintf(key_name, ICMAP_KEYNAME_MAXLEN, "internal_configuration.service.%hu.name", service_id); free(found_service_name); if (icmap_get_string(key_name, &found_service_name) != CS_OK) { continue; } snprintf(key_name, ICMAP_KEYNAME_MAXLEN, "internal_configuration.service.%u.ver", service_id); if (icmap_get_uint32(key_name, &found_service_ver) != CS_OK) { continue; } if (service_ver == found_service_ver && strcmp(found_service_name, service_name) == 0) { free(found_service_name); service_found = 1; break; } } icmap_iter_finalize(iter); if (service_found && service_id < SERVICES_COUNT_MAX && corosync_service[service_id] != NULL) { if (corosync_service[service_id]->exec_exit_fn) { res = corosync_service[service_id]->exec_exit_fn (); if (res == -1) { return (-1); } } log_printf(LOGSYS_LEVEL_NOTICE, "Service engine unloaded: %s", corosync_service[service_id]->name); corosync_service[service_id] = NULL; cs_ipcs_service_destroy (service_id); snprintf(key_name, ICMAP_KEYNAME_MAXLEN, "internal_configuration.service.%u.handle", service_id); icmap_delete(key_name); snprintf(key_name, ICMAP_KEYNAME_MAXLEN, "internal_configuration.service.%u.name", service_id); icmap_delete(key_name); snprintf(key_name, ICMAP_KEYNAME_MAXLEN, "internal_configuration.service.%u.ver", service_id); icmap_delete(key_name); } return (0); } /* * Links default services into the executive */ unsigned int corosync_service_defaults_link_and_init (struct corosync_api_v1 *corosync_api) { unsigned int i; char *error; for (i = 0; i < sizeof (default_services) / sizeof (struct default_service); i++) { default_services[i].loader(); error = corosync_service_link_and_init ( corosync_api, &default_services[i]); if (error) { log_printf(LOGSYS_LEVEL_ERROR, "Service engine '%s' failed to load for reason '%s'", default_services[i].name, error); corosync_exit_error (COROSYNC_DONE_SERVICE_ENGINE_INIT); } } return (0); } static void service_exit_schedwrk_handler (void *data) { int res; static int current_priority = 0; static int current_service_engine = 0; static int called = 0; struct seus_handler_data *cb_data = (struct seus_handler_data *)data; struct corosync_api_v1 *api = (struct corosync_api_v1 *)cb_data->api; if (called == 0) { log_printf(LOGSYS_LEVEL_NOTICE, "Unloading all Corosync service engines."); current_priority = service_priority_max (); called = 1; } res = corosync_service_unlink_and_exit_priority ( api, 0, ¤t_priority, ¤t_service_engine); if (res == 0) { service_unlink_all_complete(); return; } qb_loop_job_add(cs_poll_handle_get(), QB_LOOP_HIGH, data, service_exit_schedwrk_handler); } void corosync_service_unlink_all ( struct corosync_api_v1 *api, void (*unlink_all_complete) (void)) { static int called = 0; static struct seus_handler_data cb_data; assert (api); service_unlink_all_complete = unlink_all_complete; if (called) { return; } if (called == 0) { called = 1; } cb_data.api = api; qb_loop_job_add(cs_poll_handle_get(), QB_LOOP_HIGH, &cb_data, service_exit_schedwrk_handler); } struct service_unlink_and_exit_data { hdb_handle_t handle; struct corosync_api_v1 *api; const char *name; unsigned int ver; }; static void service_unlink_and_exit_schedwrk_handler (void *data) { struct service_unlink_and_exit_data *service_unlink_and_exit_data = data; int res; res = service_unlink_and_exit ( service_unlink_and_exit_data->api, service_unlink_and_exit_data->name, service_unlink_and_exit_data->ver); if (res == 0) { free (service_unlink_and_exit_data); } else { qb_loop_job_add(cs_poll_handle_get(), QB_LOOP_HIGH, data, service_unlink_and_exit_schedwrk_handler); } } typedef int (*schedwrk_cast) (const void *); unsigned int corosync_service_unlink_and_exit ( struct corosync_api_v1 *api, const char *service_name, unsigned int service_ver) { struct service_unlink_and_exit_data *service_unlink_and_exit_data; assert (api); service_unlink_and_exit_data = malloc (sizeof (struct service_unlink_and_exit_data)); service_unlink_and_exit_data->api = api; service_unlink_and_exit_data->name = strdup (service_name); service_unlink_and_exit_data->ver = service_ver; qb_loop_job_add(cs_poll_handle_get(), QB_LOOP_HIGH, service_unlink_and_exit_data, service_unlink_and_exit_schedwrk_handler); return (0); }