diff --git a/qdevices/pr-poll-array.c b/qdevices/pr-poll-array.c index 4e03d8e5..a2ba45f7 100644 --- a/qdevices/pr-poll-array.c +++ b/qdevices/pr-poll-array.c @@ -1,156 +1,156 @@ /* * Copyright (c) 2015-2016 Red Hat, Inc. * * All rights reserved. * * Author: Jan Friesse (jfriesse@redhat.com) * * This software licensed under BSD license, the text of which follows: * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * - Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * - Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * - Neither the name of the Red Hat, Inc. nor the names of its * contributors may be used to endorse or promote products derived from this * software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF * THE POSSIBILITY OF SUCH DAMAGE. */ #include #include #include #include #include #include "pr-poll-array.h" void pr_poll_array_init(struct pr_poll_array *poll_array, size_t user_data_size) { memset(poll_array, 0, sizeof(*poll_array)); poll_array->user_data_size = user_data_size; } void pr_poll_array_destroy(struct pr_poll_array *poll_array) { free(poll_array->array); free(poll_array->user_data_array); pr_poll_array_init(poll_array, poll_array->user_data_size); } void pr_poll_array_clean(struct pr_poll_array *poll_array) { poll_array->items = 0; } static int pr_poll_array_realloc(struct pr_poll_array *poll_array, ssize_t new_array_size) { PRPollDesc *new_array; char *new_user_data_array; new_array = realloc(poll_array->array, sizeof(PRPollDesc) * new_array_size); if (new_array == NULL) { return (-1); } + poll_array->allocated = new_array_size; + poll_array->array = new_array; + if (poll_array->user_data_size > 0) { new_user_data_array = realloc(poll_array->user_data_array, poll_array->user_data_size * new_array_size); if (new_user_data_array == NULL) { return (-1); } poll_array->user_data_array = new_user_data_array; } - poll_array->allocated = new_array_size; - poll_array->array = new_array; - return (0); } ssize_t pr_poll_array_size(struct pr_poll_array *poll_array) { return (poll_array->items); } ssize_t pr_poll_array_add(struct pr_poll_array *poll_array, PRPollDesc **pfds, void **user_data) { if (pr_poll_array_size(poll_array) >= poll_array->allocated) { if (pr_poll_array_realloc(poll_array, (poll_array->allocated * 2) + 1)) { return (-1); } } *pfds = &poll_array->array[pr_poll_array_size(poll_array)]; memset(*pfds, 0, sizeof(**pfds)); *user_data = poll_array->user_data_array + (poll_array->items * poll_array->user_data_size); memset(*user_data, 0, poll_array->user_data_size); poll_array->items++; return (poll_array->items - 1); } void pr_poll_array_gc(struct pr_poll_array *poll_array) { if (poll_array->allocated > (pr_poll_array_size(poll_array) * 3) + 1) { pr_poll_array_realloc(poll_array, (pr_poll_array_size(poll_array) * 2) + 1); } } PRPollDesc * pr_poll_array_get(const struct pr_poll_array *poll_array, ssize_t pos) { if (pos >= poll_array->items) { return (NULL); } return (&poll_array->array[pos]); } void * pr_poll_array_get_user_data(const struct pr_poll_array *poll_array, ssize_t pos) { if (pos >= poll_array->items) { return (NULL); } return (poll_array->user_data_array + (pos * poll_array->user_data_size)); } diff --git a/qdevices/qdevice-ipc.c b/qdevices/qdevice-ipc.c index 6312441a..73aba3dc 100644 --- a/qdevices/qdevice-ipc.c +++ b/qdevices/qdevice-ipc.c @@ -1,335 +1,335 @@ /* * Copyright (c) 2015-2016 Red Hat, Inc. * * All rights reserved. * * Author: Jan Friesse (jfriesse@redhat.com) * * This software licensed under BSD license, the text of which follows: * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * - Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * - Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * - Neither the name of the Red Hat, Inc. nor the names of its * contributors may be used to endorse or promote products derived from this * software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF * THE POSSIBILITY OF SUCH DAMAGE. */ #include "qdevice-config.h" #include "qdevice-ipc.h" #include "qdevice-log.h" #include "unix-socket-ipc.h" #include "dynar-simple-lex.h" #include "dynar-str.h" #include "qdevice-ipc-cmd.h" int qdevice_ipc_init(struct qdevice_instance *instance) { if (unix_socket_ipc_init(&instance->local_ipc, instance->advanced_settings->local_socket_file, instance->advanced_settings->local_socket_backlog, instance->advanced_settings->ipc_max_clients, instance->advanced_settings->ipc_max_receive_size, instance->advanced_settings->ipc_max_send_size) != 0) { qdevice_log_err(LOG_ERR, "Can't create unix socket"); return (-1); } return (0); } int qdevice_ipc_close(struct qdevice_instance *instance) { int res; res = unix_socket_ipc_close(&instance->local_ipc); if (res != 0) { qdevice_log_err(LOG_WARNING, "Can't close local IPC"); } return (res); } int qdevice_ipc_is_closed(struct qdevice_instance *instance) { return (unix_socket_ipc_is_closed(&instance->local_ipc)); } int qdevice_ipc_destroy(struct qdevice_instance *instance) { int res; struct unix_socket_client *client; const struct unix_socket_client_list *ipc_client_list; ipc_client_list = &instance->local_ipc.clients; TAILQ_FOREACH(client, ipc_client_list, entries) { free(client->user_data); } res = unix_socket_ipc_destroy(&instance->local_ipc); if (res != 0) { qdevice_log_err(LOG_WARNING, "Can't destroy local IPC"); } return (res); } int qdevice_ipc_accept(struct qdevice_instance *instance, struct unix_socket_client **res_client) { int res; int accept_res; accept_res = unix_socket_ipc_accept(&instance->local_ipc, res_client); switch (accept_res) { case -1: qdevice_log_err(LOG_ERR, "Can't accept local IPC connection"); res = -1; goto return_res; break; case -2: qdevice_log(LOG_ERR, "Maximum IPC clients reached. Not accepting connection"); res = -1; goto return_res; break; case -3: qdevice_log(LOG_ERR, "Can't add client to list"); res = -1; goto return_res; break; default: unix_socket_client_read_line(*res_client, 1); res = 0; break; } (*res_client)->user_data = malloc(sizeof(struct qdevice_ipc_user_data)); if ((*res_client)->user_data == NULL) { qdevice_log(LOG_ERR, "Can't alloc IPC client user data"); res = -1; qdevice_ipc_client_disconnect(instance, *res_client); + } else { + memset((*res_client)->user_data, 0, sizeof(struct qdevice_ipc_user_data)); } - memset((*res_client)->user_data, 0, sizeof(struct qdevice_ipc_user_data)); return_res: return (res); } void qdevice_ipc_client_disconnect(struct qdevice_instance *instance, struct unix_socket_client *client) { free(client->user_data); unix_socket_ipc_client_disconnect(&instance->local_ipc, client); } int qdevice_ipc_send_error(struct qdevice_instance *instance, struct unix_socket_client *client, const char *error_fmt, ...) { va_list ap; int res; va_start(ap, error_fmt); res = ((dynar_str_cpy(&client->send_buffer, "Error\n") == 0) && (dynar_str_vcatf(&client->send_buffer, error_fmt, ap) > 0) && (dynar_str_cat(&client->send_buffer, "\n") == 0)); va_end(ap); if (res) { unix_socket_client_write_buffer(client, 1); } else { qdevice_log(LOG_ERR, "Can't send ipc error to client (buffer too small)"); } return (res ? 0 : -1); } int qdevice_ipc_send_buffer(struct qdevice_instance *instance, struct unix_socket_client *client) { if (dynar_str_prepend(&client->send_buffer, "OK\n") != 0) { qdevice_log(LOG_ERR, "Can't send ipc message to client (buffer too small)"); if (qdevice_ipc_send_error(instance, client, "Internal IPC buffer too small") != 0) { return (-1); } return (0); } unix_socket_client_write_buffer(client, 1); return (0); } static void qdevice_ipc_parse_line(struct qdevice_instance *instance, struct unix_socket_client *client) { struct dynar_simple_lex lex; struct dynar *token; char *str; struct qdevice_ipc_user_data *ipc_user_data; int verbose; ipc_user_data = (struct qdevice_ipc_user_data *)client->user_data; dynar_simple_lex_init(&lex, &client->receive_buffer, DYNAR_SIMPLE_LEX_TYPE_PLAIN); token = dynar_simple_lex_token_next(&lex); verbose = 0; if (token == NULL) { qdevice_log(LOG_ERR, "Can't alloc memory for simple lex"); if (qdevice_ipc_send_error(instance, client, "Command too long") != 0) { client->schedule_disconnect = 1; } return; } str = dynar_data(token); if (strcasecmp(str, "") == 0) { qdevice_log(LOG_DEBUG, "IPC client doesn't send command"); if (qdevice_ipc_send_error(instance, client, "No command specified") != 0) { client->schedule_disconnect = 1; } } else if (strcasecmp(str, "shutdown") == 0) { qdevice_log(LOG_DEBUG, "IPC client requested shutdown"); ipc_user_data->shutdown_requested = 1; if (qdevice_ipc_send_buffer(instance, client) != 0) { client->schedule_disconnect = 1; } } else if (strcasecmp(str, "status") == 0) { token = dynar_simple_lex_token_next(&lex); - str = dynar_data(token); - if (token != NULL && strcmp(str, "") != 0) { + if (token != NULL && (str = dynar_data(token), strcmp(str, "")) != 0) { if (strcasecmp(str, "verbose") == 0) { verbose = 1; } } if (qdevice_ipc_cmd_status(instance, &client->send_buffer, verbose) != 0) { if (qdevice_ipc_send_error(instance, client, "Can't get QDevice status") != 0) { client->schedule_disconnect = 1; } } else { if (qdevice_ipc_send_buffer(instance, client) != 0) { client->schedule_disconnect = 1; } } } else { qdevice_log(LOG_DEBUG, "IPC client sent unknown command"); if (qdevice_ipc_send_error(instance, client, "Unknown command '%s'", str) != 0) { client->schedule_disconnect = 1; } } dynar_simple_lex_destroy(&lex); } void qdevice_ipc_io_read(struct qdevice_instance *instance, struct unix_socket_client *client) { int res; res = unix_socket_client_io_read(client); switch (res) { case 0: /* * Partial read */ break; case -1: qdevice_log(LOG_DEBUG, "IPC client closed connection"); client->schedule_disconnect = 1; break; case -2: qdevice_log(LOG_ERR, "Can't store message from IPC client. Disconnecting client."); client->schedule_disconnect = 1; break; case -3: qdevice_log_err(LOG_ERR, "Can't receive message from IPC client. Disconnecting client."); client->schedule_disconnect = 1; break; case 1: /* * Full message received */ unix_socket_client_read_line(client, 0); qdevice_ipc_parse_line(instance, client); break; } } void qdevice_ipc_io_write(struct qdevice_instance *instance, struct unix_socket_client *client) { int res; struct qdevice_ipc_user_data *ipc_user_data; ipc_user_data = (struct qdevice_ipc_user_data *)client->user_data; res = unix_socket_client_io_write(client); switch (res) { case 0: /* * Partial send */ break; case -1: qdevice_log(LOG_DEBUG, "IPC client closed connection"); client->schedule_disconnect = 1; break; case -2: qdevice_log_err(LOG_ERR, "Can't send message to IPC client. Disconnecting client"); client->schedule_disconnect = 1; break; case 1: /* * Full message sent */ unix_socket_client_write_buffer(client, 0); client->schedule_disconnect = 1; if (ipc_user_data->shutdown_requested) { qdevice_ipc_close(instance); } break; } } diff --git a/qdevices/qnetd-algo-ffsplit.c b/qdevices/qnetd-algo-ffsplit.c index 8ae73d61..01e5f0c7 100644 --- a/qdevices/qnetd-algo-ffsplit.c +++ b/qdevices/qnetd-algo-ffsplit.c @@ -1,805 +1,813 @@ /* * Copyright (c) 2015-2016 Red Hat, Inc. * * All rights reserved. * * Author: Jan Friesse (jfriesse@redhat.com) * * This software licensed under BSD license, the text of which follows: * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * - Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * - Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * - Neither the name of the Red Hat, Inc. nor the names of its * contributors may be used to endorse or promote products derived from this * software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF * THE POSSIBILITY OF SUCH DAMAGE. */ #include #include #include "qnetd-algo-ffsplit.h" #include "qnetd-log.h" #include "qnetd-log-debug.h" #include "qnetd-cluster-list.h" #include "qnetd-cluster.h" #include "qnetd-client-send.h" enum qnetd_algo_ffsplit_cluster_state { QNETD_ALGO_FFSPLIT_CLUSTER_STATE_WAITING_FOR_CHANGE, QNETD_ALGO_FFSPLIT_CLUSTER_STATE_WAITING_FOR_STABLE_MEMBERSHIP, QNETD_ALGO_FFSPLIT_CLUSTER_STATE_SENDING_NACKS, QNETD_ALGO_FFSPLIT_CLUSTER_STATE_SENDING_ACKS, }; struct qnetd_algo_ffsplit_cluster_data { enum qnetd_algo_ffsplit_cluster_state cluster_state; const struct node_list *quorate_partition_node_list; }; enum qnetd_algo_ffsplit_client_state { QNETD_ALGO_FFSPLIT_CLIENT_STATE_WAITING_FOR_CHANGE, QNETD_ALGO_FFSPLIT_CLIENT_STATE_SENDING_NACK, QNETD_ALGO_FFSPLIT_CLIENT_STATE_SENDING_ACK, }; struct qnetd_algo_ffsplit_client_data { enum qnetd_algo_ffsplit_client_state client_state; uint32_t vote_info_expected_seq_num; }; enum tlv_reply_error_code qnetd_algo_ffsplit_client_init(struct qnetd_client *client) { struct qnetd_algo_ffsplit_cluster_data *cluster_data; struct qnetd_algo_ffsplit_client_data *client_data; if (qnetd_cluster_size(client->cluster) == 1) { cluster_data = malloc(sizeof(*cluster_data)); if (cluster_data == NULL) { qnetd_log(LOG_ERR, "ffsplit: Can't initialize cluster data for client %s", client->addr_str); return (TLV_REPLY_ERROR_CODE_INTERNAL_ERROR); } memset(cluster_data, 0, sizeof(*cluster_data)); cluster_data->cluster_state = QNETD_ALGO_FFSPLIT_CLUSTER_STATE_WAITING_FOR_CHANGE; cluster_data->quorate_partition_node_list = NULL; client->cluster->algorithm_data = cluster_data; } client_data = malloc(sizeof(*client_data)); if (client_data == NULL) { qnetd_log(LOG_ERR, "ffsplit: Can't initialize node data for client %s", client->addr_str); return (TLV_REPLY_ERROR_CODE_INTERNAL_ERROR); } memset(client_data, 0, sizeof(*client_data)); client_data->client_state = QNETD_ALGO_FFSPLIT_CLIENT_STATE_WAITING_FOR_CHANGE; client->algorithm_data = client_data; return (TLV_REPLY_ERROR_CODE_NO_ERROR); } static int qnetd_algo_ffsplit_is_prefered_partition(const struct qnetd_client *client, const struct node_list *config_node_list, const struct node_list *membership_node_list) { uint32_t prefered_node_id; struct node_list_entry *node_entry; int case_processed; prefered_node_id = 0; case_processed = 0; switch (client->tie_breaker.mode) { case TLV_TIE_BREAKER_MODE_LOWEST: node_entry = TAILQ_FIRST(config_node_list); prefered_node_id = node_entry->node_id; TAILQ_FOREACH(node_entry, config_node_list, entries) { if (node_entry->node_id < prefered_node_id) { prefered_node_id = node_entry->node_id; } } case_processed = 1; break; case TLV_TIE_BREAKER_MODE_HIGHEST: node_entry = TAILQ_FIRST(config_node_list); prefered_node_id = node_entry->node_id; TAILQ_FOREACH(node_entry, config_node_list, entries) { if (node_entry->node_id > prefered_node_id) { prefered_node_id = node_entry->node_id; } } case_processed = 1; break; case TLV_TIE_BREAKER_MODE_NODE_ID: prefered_node_id = client->tie_breaker.node_id; case_processed = 1; break; } if (!case_processed) { qnetd_log(LOG_CRIT, "qnetd_algo_ffsplit_is_prefered_partition unprocessed " "tie_breaker.mode"); exit(1); } return (node_list_find_node_id(membership_node_list, prefered_node_id) != NULL); } static int qnetd_algo_ffsplit_is_membership_stable(const struct qnetd_client *client, int client_leaving, const struct tlv_ring_id *ring_id, const struct node_list *config_node_list, const struct node_list *membership_node_list) { const struct qnetd_client *iter_client1, *iter_client2; const struct node_list *config_node_list1, *config_node_list2; const struct node_list *membership_node_list1, *membership_node_list2; const struct node_list_entry *iter_node1, *iter_node2; const struct node_list_entry *iter_node3, *iter_node4; const struct tlv_ring_id *ring_id1, *ring_id2; /* * Test if all active clients share same config list. */ TAILQ_FOREACH(iter_client1, &client->cluster->client_list, cluster_entries) { TAILQ_FOREACH(iter_client2, &client->cluster->client_list, cluster_entries) { if (iter_client1 == iter_client2) { continue; } if (iter_client1->node_id == client->node_id) { if (client_leaving) { continue; } config_node_list1 = config_node_list; } else { config_node_list1 = &iter_client1->configuration_node_list; } if (iter_client2->node_id == client->node_id) { if (client_leaving) { continue; } config_node_list2 = config_node_list; } else { config_node_list2 = &iter_client2->configuration_node_list; } /* * Walk thru all node ids in given config node list... */ TAILQ_FOREACH(iter_node1, config_node_list1, entries) { /* * ... and try to find given node id in other list */ iter_node2 = node_list_find_node_id(config_node_list2, iter_node1->node_id); if (iter_node2 == NULL) { /* * Node with iter_node1->node_id was not found in * config_node_list2 -> lists doesn't match */ return (0); } } } } /* * Test if same partitions share same ring ids and membership node list */ TAILQ_FOREACH(iter_client1, &client->cluster->client_list, cluster_entries) { if (iter_client1->node_id == client->node_id) { if (client_leaving) { continue; } membership_node_list1 = membership_node_list; ring_id1 = ring_id; } else { membership_node_list1 = &iter_client1->last_membership_node_list; ring_id1 = &iter_client1->last_ring_id; } /* * Walk thru all memberships nodes */ TAILQ_FOREACH(iter_node1, membership_node_list1, entries) { /* * try to find client with given node id */ iter_client2 = qnetd_cluster_find_client_by_node_id(client->cluster, iter_node1->node_id); if (iter_client2 == NULL) { /* * Client with given id is not connected */ continue; } if (iter_client2->node_id == client->node_id) { if (client_leaving) { continue; } membership_node_list2 = membership_node_list; ring_id2 = ring_id; } else { membership_node_list2 = &iter_client2->last_membership_node_list; ring_id2 = &iter_client2->last_ring_id; } /* * Compare ring ids */ if (!tlv_ring_id_eq(ring_id1, ring_id2)) { return (0); } /* * Now compare that membership node list equals, so walk thru all * members ... */ TAILQ_FOREACH(iter_node3, membership_node_list1, entries) { /* * ... and try to find given node id in other membership node list */ iter_node4 = node_list_find_node_id(membership_node_list2, iter_node3->node_id); if (iter_node4 == NULL) { /* * Node with iter_node3->node_id was not found in * membership_node_list2 -> lists doesn't match */ return (0); } } } } return (1); } static size_t qnetd_algo_ffsplit_no_active_clients_in_partition(const struct qnetd_client *client, const struct node_list *membership_node_list) { const struct node_list_entry *iter_node; const struct qnetd_client *iter_client; size_t res; res = 0; if (client == NULL || membership_node_list == NULL) { return (0); } TAILQ_FOREACH(iter_node, membership_node_list, entries) { iter_client = qnetd_cluster_find_client_by_node_id(client->cluster, iter_node->node_id); if (iter_client != NULL) { res++; } } return (res); } /* * Compares two partitions. Return 1 if client1, config_node_list1, membership_node_list1 is * "better" than client2, config_node_list2, membership_node_list2 */ static int qnetd_algo_ffsplit_partition_cmp(const struct qnetd_client *client1, const struct node_list *config_node_list1, const struct node_list *membership_node_list1, const struct qnetd_client *client2, const struct node_list *config_node_list2, const struct node_list *membership_node_list2) { size_t part1_active_clients, part2_active_clients; + int res; + + res = -1; if (node_list_size(config_node_list1) % 2 != 0) { /* * Odd clusters never split into 50:50. */ if (node_list_size(membership_node_list1) > node_list_size(config_node_list1) / 2) { - return (1); + res = 1; goto exit_res; } else { - return (0); + res = 0; goto exit_res; } } else { if (node_list_size(membership_node_list1) > node_list_size(config_node_list1) / 2) { - return (1); + res = 1; goto exit_res; } else if (node_list_size(membership_node_list1) < node_list_size(config_node_list1) / 2) { - return (0); + res = 0; goto exit_res; } /* * 50:50 split */ /* * Check how many active clients are in partitions */ part1_active_clients = qnetd_algo_ffsplit_no_active_clients_in_partition( client1, membership_node_list1); part2_active_clients = qnetd_algo_ffsplit_no_active_clients_in_partition( client2, membership_node_list2); if (part1_active_clients > part2_active_clients) { - return (1); + res = 1; goto exit_res; } else if (part1_active_clients < part2_active_clients) { - return (0); + res = 0; goto exit_res; } /* * Number of active clients in both partitions equals. Use tie-breaker. */ if (qnetd_algo_ffsplit_is_prefered_partition(client1, config_node_list1, membership_node_list1)) { - return (1); + res = 1; goto exit_res; } else { - return (0); + res = 0; goto exit_res; } } - qnetd_log(LOG_CRIT, "qnetd_algo_ffsplit_partition_cmp unhandled case"); - exit(1); - /* NOTREACHED */ +exit_res: + if (res == -1) { + qnetd_log(LOG_CRIT, "qnetd_algo_ffsplit_partition_cmp unhandled case"); + exit(1); + /* NOTREACHED */ + } + + return (res); } /* * Select best partition for given client->cluster. * If there is no partition which could become quorate, NULL is returned */ static const struct node_list * qnetd_algo_ffsplit_select_partition(const struct qnetd_client *client, int client_leaving, const struct node_list *config_node_list, const struct node_list *membership_node_list) { const struct qnetd_client *iter_client; const struct qnetd_client *best_client; const struct node_list *best_config_node_list, *best_membership_node_list; const struct node_list *iter_config_node_list, *iter_membership_node_list; best_client = NULL; best_config_node_list = best_membership_node_list = NULL; /* * Get highest score */ TAILQ_FOREACH(iter_client, &client->cluster->client_list, cluster_entries) { if (iter_client->node_id == client->node_id) { if (client_leaving) { continue; } iter_config_node_list = config_node_list; iter_membership_node_list = membership_node_list; } else { iter_config_node_list = &iter_client->configuration_node_list; iter_membership_node_list = &iter_client->last_membership_node_list; } if (qnetd_algo_ffsplit_partition_cmp(iter_client, iter_config_node_list, iter_membership_node_list, best_client, best_config_node_list, best_membership_node_list) > 0) { best_client = iter_client; best_config_node_list = iter_config_node_list; best_membership_node_list = iter_membership_node_list; } } return (best_membership_node_list); } /* * Update state of all nodes to match quorate_partition_node_list */ static void qnetd_algo_ffsplit_update_nodes_state(struct qnetd_client *client, int client_leaving, const struct node_list *quorate_partition_node_list) { const struct qnetd_client *iter_client; struct qnetd_algo_ffsplit_client_data *iter_client_data; TAILQ_FOREACH(iter_client, &client->cluster->client_list, cluster_entries) { iter_client_data = (struct qnetd_algo_ffsplit_client_data *)iter_client->algorithm_data; if (iter_client->node_id == client->node_id && client_leaving) { iter_client_data->client_state = QNETD_ALGO_FFSPLIT_CLIENT_STATE_WAITING_FOR_CHANGE; continue; } if (quorate_partition_node_list == NULL || node_list_find_node_id(quorate_partition_node_list, iter_client->node_id) == NULL) { iter_client_data->client_state = QNETD_ALGO_FFSPLIT_CLIENT_STATE_SENDING_NACK; } else { iter_client_data->client_state = QNETD_ALGO_FFSPLIT_CLIENT_STATE_SENDING_ACK; } } } /* * Send vote info. If client_leaving is set, client is ignored. if send_acks * is set, only ACK votes are send (nodes in QNETD_ALGO_FFSPLIT_CLIENT_STATE_SENDING_ACK state), * otherwise only NACK votes are send (nodes in QNETD_ALGO_FFSPLIT_CLIENT_STATE_SENDING_NACK state) * * Returns number of send votes */ static size_t qnetd_algo_ffsplit_send_votes(struct qnetd_client *client, int client_leaving, const struct tlv_ring_id *ring_id, int send_acks) { size_t sent_votes; struct qnetd_client *iter_client; struct qnetd_algo_ffsplit_client_data *iter_client_data; const struct tlv_ring_id *ring_id_to_send; enum tlv_vote vote_to_send; sent_votes = 0; TAILQ_FOREACH(iter_client, &client->cluster->client_list, cluster_entries) { if (iter_client->node_id == client->node_id) { if (client_leaving) { continue; } ring_id_to_send = ring_id; } else { ring_id_to_send = &iter_client->last_ring_id; } iter_client_data = (struct qnetd_algo_ffsplit_client_data *)iter_client->algorithm_data; vote_to_send = TLV_VOTE_UNDEFINED; if (send_acks) { if (iter_client_data->client_state == QNETD_ALGO_FFSPLIT_CLIENT_STATE_SENDING_ACK) { vote_to_send = TLV_VOTE_ACK; } } else { if (iter_client_data->client_state == QNETD_ALGO_FFSPLIT_CLIENT_STATE_SENDING_NACK) { vote_to_send = TLV_VOTE_NACK; } } if (vote_to_send != TLV_VOTE_UNDEFINED) { iter_client_data->vote_info_expected_seq_num++; sent_votes++; if (qnetd_client_send_vote_info(iter_client, iter_client_data->vote_info_expected_seq_num, ring_id_to_send, vote_to_send) == -1) { client->schedule_disconnect = 1; } } } return (sent_votes); } /* * Return number of clients in QNETD_ALGO_FFSPLIT_CLIENT_STATE_SENDING_ACK state if sending_acks is * set or number of nodes in QNETD_ALGO_FFSPLIT_CLIENT_STATE_SENDING_NACK state if sending_acks is * not set */ static size_t qnetd_algo_ffsplit_no_clients_in_sending_state(struct qnetd_client *client, int sending_acks) { size_t no_clients; struct qnetd_client *iter_client; struct qnetd_algo_ffsplit_client_data *iter_client_data; no_clients = 0; TAILQ_FOREACH(iter_client, &client->cluster->client_list, cluster_entries) { iter_client_data = (struct qnetd_algo_ffsplit_client_data *)iter_client->algorithm_data; if (sending_acks && iter_client_data->client_state == QNETD_ALGO_FFSPLIT_CLIENT_STATE_SENDING_ACK) { no_clients++; } if (!sending_acks && iter_client_data->client_state == QNETD_ALGO_FFSPLIT_CLIENT_STATE_SENDING_NACK) { no_clients++; } } return (no_clients); } static enum tlv_vote qnetd_algo_ffsplit_do(struct qnetd_client *client, int client_leaving, const struct tlv_ring_id *ring_id, const struct node_list *config_node_list, const struct node_list *membership_node_list) { struct qnetd_algo_ffsplit_cluster_data *cluster_data; const struct node_list *quorate_partition_node_list; cluster_data = (struct qnetd_algo_ffsplit_cluster_data *)client->cluster->algorithm_data; cluster_data->cluster_state = QNETD_ALGO_FFSPLIT_CLUSTER_STATE_WAITING_FOR_STABLE_MEMBERSHIP; if (!qnetd_algo_ffsplit_is_membership_stable(client, client_leaving, ring_id, config_node_list, membership_node_list)) { /* * Wait until membership is stable */ qnetd_log(LOG_DEBUG, "ffsplit: Membership for cluster %s is not yet stable", client->cluster_name); return (TLV_VOTE_WAIT_FOR_REPLY); } qnetd_log(LOG_DEBUG, "ffsplit: Membership for cluster %s is now stable", client->cluster_name); quorate_partition_node_list = qnetd_algo_ffsplit_select_partition(client, client_leaving, config_node_list, membership_node_list); cluster_data->quorate_partition_node_list = quorate_partition_node_list; if (quorate_partition_node_list == NULL) { qnetd_log(LOG_DEBUG, "ffsplit: No quorate partition was selected"); } else { qnetd_log(LOG_DEBUG, "ffsplit: Quorate partition selected"); qnetd_log_debug_dump_node_list(client, quorate_partition_node_list); } qnetd_algo_ffsplit_update_nodes_state(client, client_leaving, quorate_partition_node_list); cluster_data->cluster_state = QNETD_ALGO_FFSPLIT_CLUSTER_STATE_SENDING_NACKS; if (qnetd_algo_ffsplit_send_votes(client, client_leaving, ring_id, 0) == 0) { qnetd_log(LOG_DEBUG, "ffsplit: No client gets NACK"); /* * No one gets nack -> send acks */ cluster_data->cluster_state = QNETD_ALGO_FFSPLIT_CLUSTER_STATE_SENDING_ACKS; if (qnetd_algo_ffsplit_send_votes(client, client_leaving, ring_id, 1) == 0) { qnetd_log(LOG_DEBUG, "ffsplit: No client gets ACK"); /* * No one gets acks -> finished */ cluster_data->cluster_state = QNETD_ALGO_FFSPLIT_CLUSTER_STATE_WAITING_FOR_CHANGE; } } return (TLV_VOTE_NO_CHANGE); } enum tlv_reply_error_code qnetd_algo_ffsplit_config_node_list_received(struct qnetd_client *client, uint32_t msg_seq_num, int config_version_set, uint64_t config_version, const struct node_list *nodes, int initial, enum tlv_vote *result_vote) { if (node_list_size(nodes) == 0) { /* * Empty node list shouldn't happen */ qnetd_log(LOG_ERR, "ffsplit: Received empty config node list for client %s", client->addr_str); return (TLV_REPLY_ERROR_CODE_INVALID_CONFIG_NODE_LIST); } if (node_list_find_node_id(nodes, client->node_id) == NULL) { /* * Current node is not in node list */ qnetd_log(LOG_ERR, "ffsplit: Received config node list without client %s", client->addr_str); return (TLV_REPLY_ERROR_CODE_INVALID_CONFIG_NODE_LIST); } if (initial || node_list_size(&client->last_membership_node_list) == 0) { /* * Initial node list -> membership is going to be send by client */ *result_vote = TLV_VOTE_ASK_LATER; } else { *result_vote = qnetd_algo_ffsplit_do(client, 0, &client->last_ring_id, nodes, &client->last_membership_node_list); } return (TLV_REPLY_ERROR_CODE_NO_ERROR); } /* * Called after client sent membership node list. * All client fields are already set. Nodes is actual node list. * msg_seq_num is 32-bit number set by client. If client sent config file version, * config_version_set is set to 1 and config_version contains valid config file version. * ring_id and quorate are copied from client votequorum callback. * * Function has to return result_vote. This can be one of ack/nack, ask_later (client * should ask later for a vote) or wait_for_reply (client should wait for reply). * * Return TLV_REPLY_ERROR_CODE_NO_ERROR on success, different TLV_REPLY_ERROR_CODE_* * on failure (error is send back to client) */ enum tlv_reply_error_code qnetd_algo_ffsplit_membership_node_list_received(struct qnetd_client *client, uint32_t msg_seq_num, const struct tlv_ring_id *ring_id, const struct node_list *nodes, enum tlv_vote *result_vote) { if (node_list_size(nodes) == 0) { /* * Empty node list shouldn't happen */ qnetd_log(LOG_ERR, "ffsplit: Received empty membership node list for client %s", client->addr_str); return (TLV_REPLY_ERROR_CODE_INVALID_MEMBERSHIP_NODE_LIST); } if (node_list_find_node_id(nodes, client->node_id) == NULL) { /* * Current node is not in node list */ qnetd_log(LOG_ERR, "ffsplit: Received membership node list without client %s", client->addr_str); return (TLV_REPLY_ERROR_CODE_INVALID_MEMBERSHIP_NODE_LIST); } if (node_list_size(&client->configuration_node_list) == 0) { /* * Config node list not received -> it's going to be sent later */ *result_vote = TLV_VOTE_ASK_LATER; } else { *result_vote = qnetd_algo_ffsplit_do(client, 0, ring_id, &client->configuration_node_list, nodes); } return (TLV_REPLY_ERROR_CODE_NO_ERROR); } enum tlv_reply_error_code qnetd_algo_ffsplit_quorum_node_list_received(struct qnetd_client *client, uint32_t msg_seq_num, enum tlv_quorate quorate, const struct node_list *nodes, enum tlv_vote *result_vote) { /* * Quorum node list is informative -> no change */ *result_vote = TLV_VOTE_NO_CHANGE; return (TLV_REPLY_ERROR_CODE_NO_ERROR); } void qnetd_algo_ffsplit_client_disconnect(struct qnetd_client *client, int server_going_down) { (void)qnetd_algo_ffsplit_do(client, 1, &client->last_ring_id, &client->configuration_node_list, &client->last_membership_node_list); free(client->algorithm_data); if (qnetd_cluster_size(client->cluster) == 1) { /* * Last client in the cluster */ free(client->cluster->algorithm_data); } } enum tlv_reply_error_code qnetd_algo_ffsplit_ask_for_vote_received(struct qnetd_client *client, uint32_t msg_seq_num, enum tlv_vote *result_vote) { /* * Ask for vote is not supported in current algorithm */ return (TLV_REPLY_ERROR_CODE_UNSUPPORTED_DECISION_ALGORITHM_MESSAGE); } enum tlv_reply_error_code qnetd_algo_ffsplit_vote_info_reply_received(struct qnetd_client *client, uint32_t msg_seq_num) { struct qnetd_algo_ffsplit_cluster_data *cluster_data; struct qnetd_algo_ffsplit_client_data *client_data; cluster_data = (struct qnetd_algo_ffsplit_cluster_data *)client->cluster->algorithm_data; client_data = (struct qnetd_algo_ffsplit_client_data *)client->algorithm_data; if (client_data->vote_info_expected_seq_num != msg_seq_num) { qnetd_log(LOG_DEBUG, "ffsplit: Received old vote info reply from client %s", client->addr_str); return (TLV_REPLY_ERROR_CODE_NO_ERROR); } client_data->client_state = QNETD_ALGO_FFSPLIT_CLIENT_STATE_WAITING_FOR_CHANGE; if (cluster_data->cluster_state != QNETD_ALGO_FFSPLIT_CLUSTER_STATE_SENDING_NACKS && cluster_data->cluster_state != QNETD_ALGO_FFSPLIT_CLUSTER_STATE_SENDING_ACKS) { return (TLV_REPLY_ERROR_CODE_NO_ERROR); } if (cluster_data->cluster_state == QNETD_ALGO_FFSPLIT_CLUSTER_STATE_SENDING_NACKS) { if (qnetd_algo_ffsplit_no_clients_in_sending_state(client, 0) == 0) { qnetd_log(LOG_DEBUG, "ffsplit: All NACK votes sent for cluster %s", client->cluster_name); cluster_data->cluster_state = QNETD_ALGO_FFSPLIT_CLUSTER_STATE_SENDING_ACKS; if (qnetd_algo_ffsplit_send_votes(client, 0, &client->last_ring_id, 1) == 0) { qnetd_log(LOG_DEBUG, "ffsplit: No client gets ACK"); /* * No one gets acks -> finished */ cluster_data->cluster_state = QNETD_ALGO_FFSPLIT_CLUSTER_STATE_WAITING_FOR_CHANGE; } } } else { if (qnetd_algo_ffsplit_no_clients_in_sending_state(client, 1) == 0) { qnetd_log(LOG_DEBUG, "ffsplit: All ACK votes sent for cluster %s", client->cluster_name); cluster_data->cluster_state = QNETD_ALGO_FFSPLIT_CLUSTER_STATE_WAITING_FOR_CHANGE; } } return (TLV_REPLY_ERROR_CODE_NO_ERROR); } enum tlv_reply_error_code qnetd_algo_ffsplit_timer_callback(struct qnetd_client *client, int *reschedule_timer, int *send_vote, enum tlv_vote *result_vote) { return (TLV_REPLY_ERROR_CODE_NO_ERROR); } static struct qnetd_algorithm qnetd_algo_ffsplit = { .init = qnetd_algo_ffsplit_client_init, .config_node_list_received = qnetd_algo_ffsplit_config_node_list_received, .membership_node_list_received = qnetd_algo_ffsplit_membership_node_list_received, .quorum_node_list_received = qnetd_algo_ffsplit_quorum_node_list_received, .client_disconnect = qnetd_algo_ffsplit_client_disconnect, .ask_for_vote_received = qnetd_algo_ffsplit_ask_for_vote_received, .vote_info_reply_received = qnetd_algo_ffsplit_vote_info_reply_received, .timer_callback = qnetd_algo_ffsplit_timer_callback, }; enum tlv_reply_error_code qnetd_algo_ffsplit_register() { return (qnetd_algorithm_register(TLV_DECISION_ALGORITHM_TYPE_FFSPLIT, &qnetd_algo_ffsplit)); } diff --git a/qdevices/qnetd-ipc.c b/qdevices/qnetd-ipc.c index 2370804c..0d40275e 100644 --- a/qdevices/qnetd-ipc.c +++ b/qdevices/qnetd-ipc.c @@ -1,408 +1,412 @@ /* * Copyright (c) 2015-2016 Red Hat, Inc. * * All rights reserved. * * Author: Jan Friesse (jfriesse@redhat.com) * * This software licensed under BSD license, the text of which follows: * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * - Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * - Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * - Neither the name of the Red Hat, Inc. nor the names of its * contributors may be used to endorse or promote products derived from this * software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF * THE POSSIBILITY OF SUCH DAMAGE. */ #include "qnet-config.h" #include "qnetd-ipc.h" #include "qnetd-ipc-cmd.h" #include "qnetd-log.h" #include "unix-socket-ipc.h" #include "dynar-simple-lex.h" #include "dynar-str.h" /* * Needed for creating nspr handle from unix fd */ #include int qnetd_ipc_init(struct qnetd_instance *instance) { if (unix_socket_ipc_init(&instance->local_ipc, instance->advanced_settings->local_socket_file, instance->advanced_settings->local_socket_backlog, instance->advanced_settings->ipc_max_clients, instance->advanced_settings->ipc_max_receive_size, instance->advanced_settings->ipc_max_send_size) != 0) { qnetd_log_err(LOG_ERR, "Can't create unix socket"); return (-1); } if ((instance->ipc_socket_poll_fd = PR_CreateSocketPollFd(instance->local_ipc.socket)) == NULL) { qnetd_log_nss(LOG_CRIT, "Can't create NSPR IPC socket poll fd"); return (-1); } return (0); } int qnetd_ipc_close(struct qnetd_instance *instance) { int res; res = unix_socket_ipc_close(&instance->local_ipc); if (res != 0) { qnetd_log_err(LOG_WARNING, "Can't close local IPC"); } return (res); } int qnetd_ipc_is_closed(struct qnetd_instance *instance) { return (unix_socket_ipc_is_closed(&instance->local_ipc)); } int qnetd_ipc_destroy(struct qnetd_instance *instance) { int res; struct unix_socket_client *client; const struct unix_socket_client_list *ipc_client_list; ipc_client_list = &instance->local_ipc.clients; TAILQ_FOREACH(client, ipc_client_list, entries) { free(client->user_data); } if (PR_DestroySocketPollFd(instance->ipc_socket_poll_fd) != PR_SUCCESS) { qnetd_log_nss(LOG_WARNING, "Unable to destroy IPC poll socket fd"); } res = unix_socket_ipc_destroy(&instance->local_ipc); if (res != 0) { qnetd_log_err(LOG_WARNING, "Can't destroy local IPC"); } return (res); } int qnetd_ipc_accept(struct qnetd_instance *instance, struct unix_socket_client **res_client) { int res; int accept_res; PRFileDesc *prfd; accept_res = unix_socket_ipc_accept(&instance->local_ipc, res_client); switch (accept_res) { case -1: qnetd_log_err(LOG_ERR, "Can't accept local IPC connection"); res = -1; goto return_res; break; case -2: qnetd_log(LOG_ERR, "Maximum IPC clients reached. Not accepting connection"); res = -1; goto return_res; break; case -3: qnetd_log(LOG_ERR, "Can't add client to list"); res = -1; goto return_res; break; default: unix_socket_client_read_line(*res_client, 1); res = 0; break; } (*res_client)->user_data = malloc(sizeof(struct qnetd_ipc_user_data)); if ((*res_client)->user_data == NULL) { qnetd_log(LOG_ERR, "Can't alloc IPC client user data"); res = -1; qnetd_ipc_client_disconnect(instance, *res_client); goto return_res; } memset((*res_client)->user_data, 0, sizeof(struct qnetd_ipc_user_data)); prfd = PR_CreateSocketPollFd((*res_client)->socket); if (prfd == NULL) { qnetd_log_nss(LOG_CRIT, "Can't create NSPR poll fd for IPC client. Disconnecting client"); qnetd_ipc_client_disconnect(instance, *res_client); res = -1; goto return_res; } ((struct qnetd_ipc_user_data *)(*res_client)->user_data)->nspr_poll_fd = prfd; return_res: return (res); } void qnetd_ipc_client_disconnect(struct qnetd_instance *instance, struct unix_socket_client *client) { if (PR_DestroySocketPollFd( ((struct qnetd_ipc_user_data *)(client)->user_data)->nspr_poll_fd) != PR_SUCCESS) { qnetd_log_nss(LOG_WARNING, "Unable to destroy client IPC poll socket fd"); } free(client->user_data); unix_socket_ipc_client_disconnect(&instance->local_ipc, client); } int qnetd_ipc_send_error(struct qnetd_instance *instance, struct unix_socket_client *client, const char *error_fmt, ...) { va_list ap; int res; va_start(ap, error_fmt); res = ((dynar_str_cpy(&client->send_buffer, "Error\n") == 0) && (dynar_str_vcatf(&client->send_buffer, error_fmt, ap) > 0) && (dynar_str_cat(&client->send_buffer, "\n") == 0)); va_end(ap); if (res) { unix_socket_client_write_buffer(client, 1); } else { qnetd_log(LOG_ERR, "Can't send ipc error to client (buffer too small)"); } return (res ? 0 : -1); } int qnetd_ipc_send_buffer(struct qnetd_instance *instance, struct unix_socket_client *client) { if (dynar_str_prepend(&client->send_buffer, "OK\n") != 0) { qnetd_log(LOG_ERR, "Can't send ipc message to client (buffer too small)"); if (qnetd_ipc_send_error(instance, client, "Internal IPC buffer too small") != 0) { return (-1); } return (0); } unix_socket_client_write_buffer(client, 1); return (0); } static void qnetd_ipc_parse_line(struct qnetd_instance *instance, struct unix_socket_client *client) { struct dynar_simple_lex lex; struct dynar *token; char *str; struct qnetd_ipc_user_data *ipc_user_data; int verbose; char *cluster_name; ipc_user_data = (struct qnetd_ipc_user_data *)client->user_data; dynar_simple_lex_init(&lex, &client->receive_buffer, DYNAR_SIMPLE_LEX_TYPE_QUOTE); token = dynar_simple_lex_token_next(&lex); verbose = 0; cluster_name = NULL; if (token == NULL) { goto exit_err_low_mem; } str = dynar_data(token); if (strcasecmp(str, "") == 0) { qnetd_log(LOG_DEBUG, "IPC client doesn't send command"); if (qnetd_ipc_send_error(instance, client, "No command specified") != 0) { client->schedule_disconnect = 1; } } else if (strcasecmp(str, "shutdown") == 0) { qnetd_log(LOG_DEBUG, "IPC client requested shutdown"); ipc_user_data->shutdown_requested = 1; if (qnetd_ipc_send_buffer(instance, client) != 0) { client->schedule_disconnect = 1; } } else if (strcasecmp(str, "status") == 0) { token = dynar_simple_lex_token_next(&lex); if (token == NULL) { goto exit_err_low_mem; } str = dynar_data(token); if (token != NULL && strcmp(str, "") != 0) { if (strcasecmp(str, "verbose") == 0) { verbose = 1; } } if (qnetd_ipc_cmd_status(instance, &client->send_buffer, verbose) != 0) { if (qnetd_ipc_send_error(instance, client, "Can't get QNetd status") != 0) { client->schedule_disconnect = 1; } } else { if (qnetd_ipc_send_buffer(instance, client) != 0) { client->schedule_disconnect = 1; } } } else if (strcasecmp(str, "list") == 0) { while (((token = dynar_simple_lex_token_next(&lex)) != NULL) && (str = dynar_data(token), strcmp(str, "") != 0)) { if (strcasecmp(str, "verbose") == 0) { verbose = 1; } else if (strcasecmp(str, "cluster") == 0) { token = dynar_simple_lex_token_next(&lex); if (token == NULL) { goto exit_err_low_mem; } + free(cluster_name); cluster_name = NULL; if ((cluster_name = strdup(dynar_data(token))) == NULL) { goto exit_err_low_mem; } } else { break; } } if (qnetd_ipc_cmd_list(instance, &client->send_buffer, verbose, cluster_name) != 0) { if (qnetd_ipc_send_error(instance, client, "Can't get QNetd cluster list") != 0) { client->schedule_disconnect = 1; } } else { if (qnetd_ipc_send_buffer(instance, client) != 0) { client->schedule_disconnect = 1; } } + free(cluster_name); cluster_name = NULL; } else { qnetd_log(LOG_DEBUG, "IPC client sent unknown command"); if (qnetd_ipc_send_error(instance, client, "Unknown command '%s'", str) != 0) { client->schedule_disconnect = 1; } } dynar_simple_lex_destroy(&lex); return ; exit_err_low_mem: + free(cluster_name); cluster_name = NULL; + qnetd_log(LOG_ERR, "Can't alloc memory for simple lex"); if (qnetd_ipc_send_error(instance, client, "Command too long") != 0) { client->schedule_disconnect = 1; } } void qnetd_ipc_io_read(struct qnetd_instance *instance, struct unix_socket_client *client) { int res; res = unix_socket_client_io_read(client); switch (res) { case 0: /* * Partial read */ break; case -1: qnetd_log(LOG_DEBUG, "IPC client closed connection"); client->schedule_disconnect = 1; break; case -2: qnetd_log(LOG_ERR, "Can't store message from IPC client. Disconnecting client."); client->schedule_disconnect = 1; break; case -3: qnetd_log_err(LOG_ERR, "Can't receive message from IPC client. Disconnecting client."); client->schedule_disconnect = 1; break; case 1: /* * Full message received */ unix_socket_client_read_line(client, 0); qnetd_ipc_parse_line(instance, client); break; } } void qnetd_ipc_io_write(struct qnetd_instance *instance, struct unix_socket_client *client) { int res; struct qnetd_ipc_user_data *ipc_user_data; ipc_user_data = (struct qnetd_ipc_user_data *)client->user_data; res = unix_socket_client_io_write(client); switch (res) { case 0: /* * Partial send */ break; case -1: qnetd_log(LOG_DEBUG, "IPC client closed connection"); client->schedule_disconnect = 1; break; case -2: qnetd_log_err(LOG_ERR, "Can't send message to IPC client. Disconnecting client"); client->schedule_disconnect = 1; break; case 1: /* * Full message sent */ unix_socket_client_write_buffer(client, 0); client->schedule_disconnect = 1; if (ipc_user_data->shutdown_requested) { qnetd_ipc_close(instance); } break; } } diff --git a/qdevices/timer-list.c b/qdevices/timer-list.c index e37f5f18..ed8bf186 100644 --- a/qdevices/timer-list.c +++ b/qdevices/timer-list.c @@ -1,248 +1,248 @@ /* * Copyright (c) 2015-2016 Red Hat, Inc. * * All rights reserved. * * Author: Jan Friesse (jfriesse@redhat.com) * * This software licensed under BSD license, the text of which follows: * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * - Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * - Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * - Neither the name of the Red Hat, Inc. nor the names of its * contributors may be used to endorse or promote products derived from this * software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF * THE POSSIBILITY OF SUCH DAMAGE. */ #include #include "timer-list.h" void timer_list_init(struct timer_list *tlist) { memset(tlist, 0, sizeof(*tlist)); TAILQ_INIT(&tlist->list); TAILQ_INIT(&tlist->free_list); } static PRIntervalTime timer_list_entry_time_to_expire(const struct timer_list_entry *entry, PRIntervalTime current_time) { PRIntervalTime diff, half_interval; diff = entry->expire_time - current_time; half_interval = ~0; half_interval /= 2; if (diff > half_interval) { return (0); } return (diff); } static int timer_list_entry_cmp(const struct timer_list_entry *entry1, const struct timer_list_entry *entry2, PRIntervalTime current_time) { PRIntervalTime diff1, diff2; int res; diff1 = timer_list_entry_time_to_expire(entry1, current_time); diff2 = timer_list_entry_time_to_expire(entry2, current_time); res = 0; if (diff1 < diff2) res = -1; if (diff1 > diff2) res = 1; return (res); } static void timer_list_insert_into_list(struct timer_list *tlist, struct timer_list_entry *new_entry) { struct timer_list_entry *entry; /* * This can overflow and it's not a problem */ new_entry->expire_time = new_entry->epoch + PR_MillisecondsToInterval(new_entry->interval); entry = TAILQ_FIRST(&tlist->list); while (entry != NULL) { if (timer_list_entry_cmp(entry, new_entry, new_entry->epoch) > 0) { /* * Insert new entry right before current entry */ TAILQ_INSERT_BEFORE(entry, new_entry, entries); break; } entry = TAILQ_NEXT(entry, entries); } if (entry == NULL) { TAILQ_INSERT_TAIL(&tlist->list, new_entry, entries); } } struct timer_list_entry * timer_list_add(struct timer_list *tlist, PRUint32 interval, timer_list_cb_fn func, void *data1, void *data2) { struct timer_list_entry *new_entry; - if (interval < 1 && interval > TIMER_LIST_MAX_INTERVAL) { + if (interval < 1 || interval > TIMER_LIST_MAX_INTERVAL) { return (NULL); } if (!TAILQ_EMPTY(&tlist->free_list)) { /* * Use free list entry */ new_entry = TAILQ_FIRST(&tlist->free_list); TAILQ_REMOVE(&tlist->free_list, new_entry, entries); } else { /* * Alloc new entry */ new_entry = malloc(sizeof(*new_entry)); if (new_entry == NULL) { return (NULL); } } memset(new_entry, 0, sizeof(*new_entry)); new_entry->epoch = PR_IntervalNow(); new_entry->interval = interval; new_entry->func = func; new_entry->user_data1 = data1; new_entry->user_data2 = data2; new_entry->is_active = 1; timer_list_insert_into_list(tlist, new_entry); return (new_entry); } void timer_list_reschedule(struct timer_list *tlist, struct timer_list_entry *entry) { if (entry->is_active) { entry->epoch = PR_IntervalNow(); TAILQ_REMOVE(&tlist->list, entry, entries); timer_list_insert_into_list(tlist, entry); } } void timer_list_expire(struct timer_list *tlist) { PRIntervalTime now; struct timer_list_entry *entry; int res; now = PR_IntervalNow(); while ((entry = TAILQ_FIRST(&tlist->list)) != NULL && timer_list_entry_time_to_expire(entry, now) == 0) { /* * Expired */ res = entry->func(entry->user_data1, entry->user_data2); if (res == 0) { /* * Move item to free list */ timer_list_delete(tlist, entry); } else if (entry->is_active) { /* * Schedule again */ entry->epoch = now; TAILQ_REMOVE(&tlist->list, entry, entries); timer_list_insert_into_list(tlist, entry); } } } PRIntervalTime timer_list_time_to_expire(struct timer_list *tlist) { struct timer_list_entry *entry; entry = TAILQ_FIRST(&tlist->list); if (entry == NULL) { return (PR_INTERVAL_NO_TIMEOUT); } return (timer_list_entry_time_to_expire(entry, PR_IntervalNow())); } void timer_list_delete(struct timer_list *tlist, struct timer_list_entry *entry) { if (entry->is_active) { /* * Move item to free list */ TAILQ_REMOVE(&tlist->list, entry, entries); TAILQ_INSERT_HEAD(&tlist->free_list, entry, entries); entry->is_active = 0; } } void timer_list_free(struct timer_list *tlist) { struct timer_list_entry *entry; struct timer_list_entry *entry_next; entry = TAILQ_FIRST(&tlist->list); while (entry != NULL) { entry_next = TAILQ_NEXT(entry, entries); free(entry); entry = entry_next; } entry = TAILQ_FIRST(&tlist->free_list); while (entry != NULL) { entry_next = TAILQ_NEXT(entry, entries); free(entry); entry = entry_next; } timer_list_init(tlist); } diff --git a/qdevices/unix-socket.c b/qdevices/unix-socket.c index cfe5c841..1cbd8253 100644 --- a/qdevices/unix-socket.c +++ b/qdevices/unix-socket.c @@ -1,205 +1,205 @@ /* * Copyright (c) 2015-2016 Red Hat, Inc. * * All rights reserved. * * Author: Jan Friesse (jfriesse@redhat.com) * * This software licensed under BSD license, the text of which follows: * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * - Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * - Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * - Neither the name of the Red Hat, Inc. nor the names of its * contributors may be used to endorse or promote products derived from this * software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF * THE POSSIBILITY OF SUCH DAMAGE. */ #include #include #include #include #include #include "unix-socket.h" static int unix_socket_set_non_blocking(int fd) { int flags; flags = fcntl(fd, F_GETFL, NULL); if (flags < 0) { return (-1); } flags |= O_NONBLOCK; if (fcntl(fd, F_SETFL, flags) < 0) { return (-1); } return (0); } int unix_socket_server_create(const char *path, int non_blocking, int backlog) { int s; struct sockaddr_un sun; if (strlen(path) >= sizeof(sun.sun_path)) { errno = ENAMETOOLONG; return (-1); } if ((s = socket(AF_UNIX, SOCK_STREAM, 0)) < 0) { return (-1); } memset(&sun, 0, sizeof(sun)); sun.sun_family = AF_UNIX; - strncpy(sun.sun_path, path, sizeof(sun.sun_path)); + strncpy(sun.sun_path, path, strlen(path)); unlink(path); if (bind(s, (struct sockaddr *)&sun, SUN_LEN(&sun)) != 0) { close(s); return (-1); } if (non_blocking) { if (unix_socket_set_non_blocking(s) != 0) { close(s); return (-1); } } if (listen(s, backlog) != 0) { close(s); return (-1); } return (s); } int unix_socket_client_create(const char *path, int non_blocking) { int s; struct sockaddr_un sun; if (strlen(path) >= sizeof(sun.sun_path)) { errno = ENAMETOOLONG; return (-1); } if ((s = socket(AF_UNIX, SOCK_STREAM, 0)) < 0) { return (-1); } memset(&sun, 0, sizeof(sun)); sun.sun_family = AF_UNIX; - strncpy(sun.sun_path, path, sizeof(sun.sun_path)); + strncpy(sun.sun_path, path, strlen(path)); if (non_blocking) { if (unix_socket_set_non_blocking(s) != 0) { close(s); return (-1); } } if (connect(s, (struct sockaddr *)&sun, SUN_LEN(&sun)) != 0) { close(s); return (-1); } return (s); } int unix_socket_server_destroy(int sock, const char *path) { int res; res = 0; if (close(sock) != 0) { res = -1; } if (unlink(path) != 0) { res = -1; } return (res); } int unix_socket_server_accept(int sock, int non_blocking) { struct sockaddr_un sun; socklen_t sun_len; int client_sock; sun_len = sizeof(sun); if ((client_sock = accept(sock, (struct sockaddr *)&sun, &sun_len)) < 0) { return (-1); } if (non_blocking) { if (unix_socket_set_non_blocking(client_sock) != 0) { close(client_sock); return (-1); } } return (client_sock); } int unix_socket_close(int sock) { return (close(sock)); } ssize_t unix_socket_read(int sock, void *buf, size_t len) { return (recv(sock, buf, len, 0)); } ssize_t unix_socket_write(int sock, void *buf, size_t len) { return (send(sock, buf, len, 0)); }