diff --git a/exec/totemconfig.c b/exec/totemconfig.c index 57a1587a..46e09952 100644 --- a/exec/totemconfig.c +++ b/exec/totemconfig.c @@ -1,2438 +1,2444 @@ /* * Copyright (c) 2002-2005 MontaVista Software, Inc. * Copyright (c) 2006-2019 Red Hat, Inc. * * All rights reserved. * * Author: Steven Dake (sdake@redhat.com) * Jan Friesse (jfriesse@redhat.com) * Chrissie Caulfield (ccaulfie@redhat.com) * * This software licensed under BSD license, the text of which follows: * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * - Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * - Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * - Neither the name of the MontaVista Software, Inc. nor the names of its * contributors may be used to endorse or promote products derived from this * software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF * THE POSSIBILITY OF SUCH DAMAGE. */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "util.h" #include "totemconfig.h" #define TOKEN_RETRANSMITS_BEFORE_LOSS_CONST 4 #define TOKEN_TIMEOUT 3000 #define TOKEN_WARNING 75 #define TOKEN_COEFFICIENT 650 #define JOIN_TIMEOUT 50 #define MERGE_TIMEOUT 200 #define DOWNCHECK_TIMEOUT 1000 #define FAIL_TO_RECV_CONST 2500 #define SEQNO_UNCHANGED_CONST 30 #define MINIMUM_TIMEOUT (int)(1000/HZ)*3 #define MINIMUM_TIMEOUT_HOLD (int)(MINIMUM_TIMEOUT * 0.8 - (1000/HZ)) #define MAX_NETWORK_DELAY 50 #define WINDOW_SIZE 50 #define MAX_MESSAGES 17 #define MISS_COUNT_CONST 5 #define BLOCK_UNLISTED_IPS 1 +#define CANCEL_TOKEN_HOLD_ON_RETRANSMIT 0 /* This constant is not used for knet */ #define UDP_NETMTU 1500 /* Currently all but PONG_COUNT match the defaults in libknet.h */ #define KNET_PING_INTERVAL 1000 #define KNET_PING_TIMEOUT 2000 #define KNET_PING_PRECISION 2048 #define KNET_PONG_COUNT 2 #define KNET_PMTUD_INTERVAL 30 #define KNET_DEFAULT_TRANSPORT KNET_TRANSPORT_UDP #define DEFAULT_PORT 5405 static char error_string_response[768]; static void add_totem_config_notification(struct totem_config *totem_config); static void *totem_get_param_by_name(struct totem_config *totem_config, const char *param_name) { if (strcmp(param_name, "totem.token") == 0) return &totem_config->token_timeout; if (strcmp(param_name, "totem.token_warning") == 0) return &totem_config->token_warning; if (strcmp(param_name, "totem.token_retransmit") == 0) return &totem_config->token_retransmit_timeout; if (strcmp(param_name, "totem.hold") == 0) return &totem_config->token_hold_timeout; if (strcmp(param_name, "totem.token_retransmits_before_loss_const") == 0) return &totem_config->token_retransmits_before_loss_const; if (strcmp(param_name, "totem.join") == 0) return &totem_config->join_timeout; if (strcmp(param_name, "totem.send_join") == 0) return &totem_config->send_join_timeout; if (strcmp(param_name, "totem.consensus") == 0) return &totem_config->consensus_timeout; if (strcmp(param_name, "totem.merge") == 0) return &totem_config->merge_timeout; if (strcmp(param_name, "totem.downcheck") == 0) return &totem_config->downcheck_timeout; if (strcmp(param_name, "totem.fail_recv_const") == 0) return &totem_config->fail_to_recv_const; if (strcmp(param_name, "totem.seqno_unchanged_const") == 0) return &totem_config->seqno_unchanged_const; if (strcmp(param_name, "totem.heartbeat_failures_allowed") == 0) return &totem_config->heartbeat_failures_allowed; if (strcmp(param_name, "totem.max_network_delay") == 0) return &totem_config->max_network_delay; if (strcmp(param_name, "totem.window_size") == 0) return &totem_config->window_size; if (strcmp(param_name, "totem.max_messages") == 0) return &totem_config->max_messages; if (strcmp(param_name, "totem.miss_count_const") == 0) return &totem_config->miss_count_const; if (strcmp(param_name, "totem.knet_pmtud_interval") == 0) return &totem_config->knet_pmtud_interval; if (strcmp(param_name, "totem.knet_compression_threshold") == 0) return &totem_config->knet_compression_threshold; if (strcmp(param_name, "totem.knet_compression_level") == 0) return &totem_config->knet_compression_level; if (strcmp(param_name, "totem.knet_compression_model") == 0) return totem_config->knet_compression_model; if (strcmp(param_name, "totem.block_unlisted_ips") == 0) return &totem_config->block_unlisted_ips; + if (strcmp(param_name, "totem.cancel_token_hold_on_retransmit") == 0) + return &totem_config->cancel_token_hold_on_retransmit; return NULL; } /* * Read key_name from icmap. If key is not found or key_name == delete_key or if allow_zero is false * and readed value is zero, default value is used and stored into totem_config. */ static void totem_volatile_config_set_uint32_value (struct totem_config *totem_config, icmap_map_t map, const char *key_name, const char *deleted_key, unsigned int default_value, int allow_zero_value) { char runtime_key_name[ICMAP_KEYNAME_MAXLEN]; if (icmap_get_uint32_r(map, key_name, totem_get_param_by_name(totem_config, key_name)) != CS_OK || (deleted_key != NULL && strcmp(deleted_key, key_name) == 0) || (!allow_zero_value && *(uint32_t *)totem_get_param_by_name(totem_config, key_name) == 0)) { *(uint32_t *)totem_get_param_by_name(totem_config, key_name) = default_value; } /* * Store totem_config value to cmap runtime section */ if (strlen("runtime.config.") + strlen(key_name) >= ICMAP_KEYNAME_MAXLEN) { /* * This shouldn't happen */ return ; } strcpy(runtime_key_name, "runtime.config."); strcat(runtime_key_name, key_name); icmap_set_uint32_r(map, runtime_key_name, *(uint32_t *)totem_get_param_by_name(totem_config, key_name)); } static void totem_volatile_config_set_int32_value (struct totem_config *totem_config, icmap_map_t map, const char *key_name, const char *deleted_key, int default_value, int allow_zero_value) { char runtime_key_name[ICMAP_KEYNAME_MAXLEN]; if (icmap_get_int32_r(map, key_name, totem_get_param_by_name(totem_config, key_name)) != CS_OK || (deleted_key != NULL && strcmp(deleted_key, key_name) == 0) || (!allow_zero_value && *(int32_t *)totem_get_param_by_name(totem_config, key_name) == 0)) { *(int32_t *)totem_get_param_by_name(totem_config, key_name) = default_value; } /* * Store totem_config value to cmap runtime section */ if (strlen("runtime.config.") + strlen(key_name) >= ICMAP_KEYNAME_MAXLEN) { /* * This shouldn't happen */ return ; } strcpy(runtime_key_name, "runtime.config."); strcat(runtime_key_name, key_name); icmap_set_int32_r(map, runtime_key_name, *(int32_t *)totem_get_param_by_name(totem_config, key_name)); } static void totem_volatile_config_set_string_value (struct totem_config *totem_config, icmap_map_t map, const char *key_name, const char *deleted_key, const char *default_value) { char runtime_key_name[ICMAP_KEYNAME_MAXLEN]; int res; char *new_config_value; const void *config_value; config_value = totem_get_param_by_name(totem_config, key_name); res = icmap_get_string_r(map, key_name, (char **)&new_config_value); if (res != CS_OK || (deleted_key != NULL && strcmp(deleted_key, key_name) == 0)) { /* Slightly pointless use of strncpy but it keeps coverity happy */ strncpy((char *)config_value, default_value, CONFIG_STRING_LEN_MAX); } else { strncpy((char *)config_value, new_config_value, CONFIG_STRING_LEN_MAX); } if (res == CS_OK) { free(new_config_value); } /* * Store totem_config value to cmap runtime section */ if (strlen("runtime.config.") + strlen(key_name) >= ICMAP_KEYNAME_MAXLEN) { /* * This shouldn't happen */ return ; } strcpy(runtime_key_name, "runtime.config."); strcat(runtime_key_name, key_name); (void)icmap_set_string_r(map, runtime_key_name, (char *)config_value); } /* * Read string value stored in key_name from icmap, use it as a boolean (yes/no) type, convert it * to integer value (1/0) and store into totem_config. * * If key is not found or key_name == delete_key default value is used * and stored into totem_config. */ static void totem_volatile_config_set_boolean_value (struct totem_config *totem_config, icmap_map_t map, const char *key_name, const char *deleted_key, unsigned int default_value) { char runtime_key_name[ICMAP_KEYNAME_MAXLEN]; char *str; int val; str = NULL; val = default_value; if ((deleted_key != NULL && strcmp(deleted_key, key_name) == 0) || (icmap_get_string_r(map, key_name, &str) != CS_OK)) { /* * Do nothing. str is NULL (icmap_get_string ether not called or * not changed str). */ } else { if (strcmp(str, "yes") == 0) { val = 1; } else if (strcmp(str, "no") == 0) { val = 0; } free(str); } /* * Store totem_config value to cmap runtime section */ if (strlen("runtime.config.") + strlen(key_name) >= ICMAP_KEYNAME_MAXLEN) { /* * This shouldn't happen */ return ; } strcpy(runtime_key_name, "runtime.config."); strcat(runtime_key_name, key_name); *(uint32_t *)totem_get_param_by_name(totem_config, key_name) = val; icmap_set_uint32_r(map, runtime_key_name, val); } /* * Read and validate config values from cmap and store them into totem_config. If key doesn't exists, * default value is stored. deleted_key is name of key beeing processed by delete operation * from cmap. It is considered as non existing even if it can be read. Can be NULL. */ void totem_volatile_config_read (struct totem_config *totem_config, icmap_map_t temp_map, const char *deleted_key) { uint32_t u32; totem_volatile_config_set_uint32_value(totem_config, temp_map, "totem.token_retransmits_before_loss_const", deleted_key, TOKEN_RETRANSMITS_BEFORE_LOSS_CONST, 0); totem_volatile_config_set_uint32_value(totem_config, temp_map, "totem.token", deleted_key, TOKEN_TIMEOUT, 0); totem_volatile_config_set_uint32_value(totem_config, temp_map, "totem.token_warning", deleted_key, TOKEN_WARNING, 1); if (totem_config->interfaces[0].member_count > 2) { u32 = TOKEN_COEFFICIENT; icmap_get_uint32_r(temp_map, "totem.token_coefficient", &u32); totem_config->token_timeout += (totem_config->interfaces[0].member_count - 2) * u32; /* * Store totem_config value to cmap runtime section */ icmap_set_uint32_r(temp_map, "runtime.config.totem.token", totem_config->token_timeout); } totem_volatile_config_set_uint32_value(totem_config, temp_map, "totem.max_network_delay", deleted_key, MAX_NETWORK_DELAY, 0); totem_volatile_config_set_uint32_value(totem_config, temp_map, "totem.window_size", deleted_key, WINDOW_SIZE, 0); totem_volatile_config_set_uint32_value(totem_config, temp_map, "totem.max_messages", deleted_key, MAX_MESSAGES, 0); totem_volatile_config_set_uint32_value(totem_config, temp_map, "totem.miss_count_const", deleted_key, MISS_COUNT_CONST, 0); totem_volatile_config_set_uint32_value(totem_config, temp_map, "totem.knet_pmtud_interval", deleted_key, KNET_PMTUD_INTERVAL, 0); totem_volatile_config_set_uint32_value(totem_config, temp_map, "totem.token_retransmit", deleted_key, (int)(totem_config->token_timeout / (totem_config->token_retransmits_before_loss_const + 0.2)), 0); totem_volatile_config_set_uint32_value(totem_config, temp_map, "totem.hold", deleted_key, (int)(totem_config->token_retransmit_timeout * 0.8 - (1000/HZ)), 0); totem_volatile_config_set_uint32_value(totem_config, temp_map, "totem.join", deleted_key, JOIN_TIMEOUT, 0); totem_volatile_config_set_uint32_value(totem_config, temp_map, "totem.consensus", deleted_key, (int)(float)(1.2 * totem_config->token_timeout), 0); totem_volatile_config_set_uint32_value(totem_config, temp_map, "totem.merge", deleted_key, MERGE_TIMEOUT, 0); totem_volatile_config_set_uint32_value(totem_config, temp_map, "totem.downcheck", deleted_key, DOWNCHECK_TIMEOUT, 0); totem_volatile_config_set_uint32_value(totem_config, temp_map, "totem.fail_recv_const", deleted_key, FAIL_TO_RECV_CONST, 0); totem_volatile_config_set_uint32_value(totem_config, temp_map, "totem.seqno_unchanged_const", deleted_key, SEQNO_UNCHANGED_CONST, 0); totem_volatile_config_set_uint32_value(totem_config, temp_map, "totem.send_join", deleted_key, 0, 1); totem_volatile_config_set_uint32_value(totem_config, temp_map, "totem.heartbeat_failures_allowed", deleted_key, 0, 1); totem_volatile_config_set_uint32_value(totem_config, temp_map, "totem.knet_compression_threshold", deleted_key, 0, 1); totem_volatile_config_set_int32_value(totem_config, temp_map, "totem.knet_compression_level", deleted_key, 0, 1); totem_volatile_config_set_string_value(totem_config, temp_map, "totem.knet_compression_model", deleted_key, "none"); totem_volatile_config_set_boolean_value(totem_config, temp_map, "totem.block_unlisted_ips", deleted_key, BLOCK_UNLISTED_IPS); + + totem_volatile_config_set_boolean_value(totem_config, temp_map, "totem.cancel_token_hold_on_retransmit", + deleted_key, CANCEL_TOKEN_HOLD_ON_RETRANSMIT); } int totem_volatile_config_validate ( struct totem_config *totem_config, icmap_map_t temp_map, const char **error_string) { /* Static just to keep them off the stack */ static char local_error_reason[512]; static char addr_str_buf[INET6_ADDRSTRLEN]; const char *error_reason = local_error_reason; char name_key[ICMAP_KEYNAME_MAXLEN]; char *name_str; int i, j, num_configured, members; uint32_t tmp_config_value; if (totem_config->max_network_delay < MINIMUM_TIMEOUT) { snprintf (local_error_reason, sizeof(local_error_reason), "The max_network_delay parameter (%d ms) may not be less than (%d ms).", totem_config->max_network_delay, MINIMUM_TIMEOUT); goto parse_error; } if (totem_config->token_timeout < MINIMUM_TIMEOUT) { snprintf (local_error_reason, sizeof(local_error_reason), "The token timeout parameter (%d ms) may not be less than (%d ms).", totem_config->token_timeout, MINIMUM_TIMEOUT); goto parse_error; } if (totem_config->token_warning > 100 || totem_config->token_warning < 0) { snprintf (local_error_reason, sizeof(local_error_reason), "The token warning parameter (%d%%) must be between 0 (disabled) and 100.", totem_config->token_warning); goto parse_error; } if (totem_config->token_retransmit_timeout < MINIMUM_TIMEOUT) { if (icmap_get_uint32_r(temp_map, "totem.token_retransmit", &tmp_config_value) == CS_OK) { snprintf (local_error_reason, sizeof(local_error_reason), "The token retransmit timeout parameter (%d ms) may not be less than (%d ms).", totem_config->token_retransmit_timeout, MINIMUM_TIMEOUT); goto parse_error; } else { snprintf (local_error_reason, sizeof(local_error_reason), "Not appropriate token or token_retransmits_before_loss_const value set"); goto parse_error; } } if (totem_config->token_hold_timeout < MINIMUM_TIMEOUT_HOLD) { snprintf (local_error_reason, sizeof(local_error_reason), "The token hold timeout parameter (%d ms) may not be less than (%d ms).", totem_config->token_hold_timeout, MINIMUM_TIMEOUT_HOLD); goto parse_error; } if (totem_config->join_timeout < MINIMUM_TIMEOUT) { snprintf (local_error_reason, sizeof(local_error_reason), "The join timeout parameter (%d ms) may not be less than (%d ms).", totem_config->join_timeout, MINIMUM_TIMEOUT); goto parse_error; } if (totem_config->consensus_timeout < MINIMUM_TIMEOUT) { snprintf (local_error_reason, sizeof(local_error_reason), "The consensus timeout parameter (%d ms) may not be less than (%d ms).", totem_config->consensus_timeout, MINIMUM_TIMEOUT); goto parse_error; } if (totem_config->consensus_timeout < totem_config->join_timeout) { snprintf (local_error_reason, sizeof(local_error_reason), "The consensus timeout parameter (%d ms) may not be less than join timeout (%d ms).", totem_config->consensus_timeout, totem_config->join_timeout); goto parse_error; } if (totem_config->merge_timeout < MINIMUM_TIMEOUT) { snprintf (local_error_reason, sizeof(local_error_reason), "The merge timeout parameter (%d ms) may not be less than (%d ms).", totem_config->merge_timeout, MINIMUM_TIMEOUT); goto parse_error; } if (totem_config->downcheck_timeout < MINIMUM_TIMEOUT) { snprintf (local_error_reason, sizeof(local_error_reason), "The downcheck timeout parameter (%d ms) may not be less than (%d ms).", totem_config->downcheck_timeout, MINIMUM_TIMEOUT); goto parse_error; } /* Check that we have nodelist 'name' if there is more than one link */ num_configured = 0; members = -1; for (i = 0; i < INTERFACE_MAX; i++) { if (totem_config->interfaces[i].configured) { if (num_configured == 0) { members = totem_config->interfaces[i].member_count; } num_configured++; } } if (num_configured > 1) { /* * This assert is here just to make compiler happy */ assert(members != -1); for (i=0; i < members; i++) { snprintf(name_key, sizeof(name_key), "nodelist.node.%d.name", i); if (icmap_get_string_r(temp_map, name_key, &name_str) != CS_OK) { snprintf (local_error_reason, sizeof(local_error_reason), "for a multi-link configuration, all nodes must have a 'name' attribute"); goto parse_error; } free(name_str); } for (i=0; i < INTERFACE_MAX; i++) { if (!totem_config->interfaces[i].configured) { continue; } if (totem_config->interfaces[i].member_count != members) { snprintf (local_error_reason, sizeof(local_error_reason), "Not all nodes have the same number of links"); goto parse_error; } } } /* Verify that all nodes on the same link have the same IP family */ for (i=0; i < INTERFACE_MAX; i++) { for (j=1; jinterfaces[i].member_count; j++) { if (totem_config->interfaces[i].configured) { if (totem_config->interfaces[i].member_list[j].family != totem_config->interfaces[i].member_list[0].family) { memcpy(addr_str_buf, totemip_print(&(totem_config->interfaces[i].member_list[j])), sizeof(addr_str_buf)); snprintf (local_error_reason, sizeof(local_error_reason), "Nodes for link %d have different IP families " "(compared %s with %s)", i, addr_str_buf, totemip_print(&(totem_config->interfaces[i].member_list[0]))); goto parse_error; } } } } return 0; parse_error: snprintf (error_string_response, sizeof(error_string_response), "parse error in config: %s\n", error_reason); *error_string = error_string_response; return (-1); } static int totem_get_crypto(struct totem_config *totem_config, icmap_map_t map, const char **error_string) { char *str; const char *tmp_cipher; const char *tmp_hash; const char *tmp_model; char *crypto_model_str; int res = 0; tmp_hash = "none"; tmp_cipher = "none"; tmp_model = "none"; crypto_model_str = NULL; if (icmap_get_string_r(map, "totem.crypto_model", &crypto_model_str) == CS_OK) { tmp_model = crypto_model_str; } else { tmp_model = "nss"; } if (icmap_get_string_r(map, "totem.secauth", &str) == CS_OK) { if (strcmp(str, "on") == 0) { tmp_cipher = "aes256"; tmp_hash = "sha256"; } free(str); } if (icmap_get_string_r(map, "totem.crypto_cipher", &str) == CS_OK) { if (strcmp(str, "none") == 0) { tmp_cipher = "none"; } if (strcmp(str, "aes256") == 0) { tmp_cipher = "aes256"; } if (strcmp(str, "aes192") == 0) { tmp_cipher = "aes192"; } if (strcmp(str, "aes128") == 0) { tmp_cipher = "aes128"; } free(str); } if (icmap_get_string_r(map, "totem.crypto_hash", &str) == CS_OK) { if (strcmp(str, "none") == 0) { tmp_hash = "none"; } if (strcmp(str, "md5") == 0) { tmp_hash = "md5"; } if (strcmp(str, "sha1") == 0) { tmp_hash = "sha1"; } if (strcmp(str, "sha256") == 0) { tmp_hash = "sha256"; } if (strcmp(str, "sha384") == 0) { tmp_hash = "sha384"; } if (strcmp(str, "sha512") == 0) { tmp_hash = "sha512"; } free(str); } if ((strcmp(tmp_cipher, "none") != 0) && (strcmp(tmp_hash, "none") == 0)) { *error_string = "crypto_cipher requires crypto_hash with value other than none"; res = -1; goto out_free_crypto_model_str; } if (strcmp(tmp_model, "none") == 0) { /* * Shouldn't happen because it is handled by coroparse */ *error_string = "invalid crypto_model"; res = -1; goto out_free_crypto_model_str; } if (strcmp(tmp_cipher, totem_config->crypto_cipher_type) || strcmp(tmp_hash, totem_config->crypto_hash_type) || strcmp(tmp_model, totem_config->crypto_model)) { totem_config->crypto_changed = 1; } strncpy(totem_config->crypto_cipher_type, tmp_cipher, CONFIG_STRING_LEN_MAX - 1); totem_config->crypto_cipher_type[CONFIG_STRING_LEN_MAX - 1] = '\0'; strncpy(totem_config->crypto_hash_type, tmp_hash, CONFIG_STRING_LEN_MAX - 1); totem_config->crypto_hash_type[CONFIG_STRING_LEN_MAX - 1] = '\0'; strncpy(totem_config->crypto_model, tmp_model, CONFIG_STRING_LEN_MAX - 1); totem_config->crypto_model[CONFIG_STRING_LEN_MAX - 1] = '\0'; out_free_crypto_model_str: free(crypto_model_str); return (res); } static int nodelist_byname(icmap_map_t map, const char *find_name, int strip_domain) { icmap_iter_t iter; const char *iter_key; char name_str[ICMAP_KEYNAME_MAXLEN]; int res = 0; unsigned int node_pos; char *name; unsigned int namelen; iter = icmap_iter_init_r(map, "nodelist.node."); while ((iter_key = icmap_iter_next(iter, NULL, NULL)) != NULL) { res = sscanf(iter_key, "nodelist.node.%u.%s", &node_pos, name_str); if (res != 2) { continue; } /* ring0_addr is allowed as a fallback */ if (strcmp(name_str, "name") && strcmp(name_str, "ring0_addr")) { continue; } if (icmap_get_string_r(map, iter_key, &name) != CS_OK) { continue; } namelen = strlen(name); if (strip_domain) { char *dot; dot = strchr(name, '.'); if (dot) { namelen = dot - name; } } if (strncmp(find_name, name, namelen) == 0 && strlen(find_name) == namelen) { icmap_iter_finalize(iter); return node_pos; } } icmap_iter_finalize(iter); return -1; } /* Compare two addresses - only address part (sin_addr/sin6_addr) is checked */ static int ipaddr_equal(const struct sockaddr *addr1, const struct sockaddr *addr2) { int addrlen = 0; const void *addr1p, *addr2p; if (addr1->sa_family != addr2->sa_family) return 0; switch (addr1->sa_family) { case AF_INET: addrlen = sizeof(struct in_addr); addr1p = &((struct sockaddr_in *)addr1)->sin_addr; addr2p = &((struct sockaddr_in *)addr2)->sin_addr; break; case AF_INET6: addrlen = sizeof(struct in6_addr); addr1p = &((struct sockaddr_in6 *)addr1)->sin6_addr; addr2p = &((struct sockaddr_in6 *)addr2)->sin6_addr; break; default: assert(0); } return (memcmp(addr1p, addr2p, addrlen) == 0); } /* Finds the local node and returns its position in the nodelist. * Uses nodelist.local_node_pos as a cache to save effort */ static int find_local_node(icmap_map_t map, int use_cache) { char nodename2[PATH_MAX]; char name_str[ICMAP_KEYNAME_MAXLEN]; icmap_iter_t iter; const char *iter_key; unsigned int cached_pos; char *dot = NULL; const char *node; struct ifaddrs *ifa, *ifa_list; struct sockaddr *sa; int found = 0; int node_pos = -1; int res; struct utsname utsname; /* Check for cached value first */ if (use_cache) { if (icmap_get_uint32("nodelist.local_node_pos", &cached_pos) == CS_OK) { return cached_pos; } } res = uname(&utsname); if (res) { return -1; } node = utsname.nodename; /* 1. Exact match */ node_pos = nodelist_byname(map, node, 0); if (node_pos > -1) { found = 1; goto ret_found; } /* 2. Try to match with increasingly more * specific versions of it */ strcpy(nodename2, node); dot = strrchr(nodename2, '.'); while (dot) { *dot = '\0'; node_pos = nodelist_byname(map, nodename2, 0); if (node_pos > -1) { found = 1; goto ret_found; } dot = strrchr(nodename2, '.'); } node_pos = nodelist_byname(map, nodename2, 1); if (node_pos > -1) { found = 1; goto ret_found; } /* * The corosync.conf name may not be related to uname at all, * they may match a hostname on some network interface. */ if (getifaddrs(&ifa_list)) return -1; for (ifa = ifa_list; ifa; ifa = ifa->ifa_next) { socklen_t salen = 0; /* Restore this */ strcpy(nodename2, node); sa = ifa->ifa_addr; if (!sa) { continue; } if (sa->sa_family != AF_INET && sa->sa_family != AF_INET6) { continue; } if (sa->sa_family == AF_INET) { salen = sizeof(struct sockaddr_in); } if (sa->sa_family == AF_INET6) { salen = sizeof(struct sockaddr_in6); } if (getnameinfo(sa, salen, nodename2, sizeof(nodename2), NULL, 0, 0) == 0) { node_pos = nodelist_byname(map, nodename2, 0); if (node_pos > -1) { found = 1; goto out; } /* Truncate this name and try again */ dot = strchr(nodename2, '.'); if (dot) { *dot = '\0'; node_pos = nodelist_byname(map, nodename2, 0); if (node_pos > -1) { found = 1; goto out; } } } /* See if it's the IP address that's in corosync.conf */ if (getnameinfo(sa, sizeof(*sa), nodename2, sizeof(nodename2), NULL, 0, NI_NUMERICHOST)) continue; node_pos = nodelist_byname(map, nodename2, 0); if (node_pos > -1) { found = 1; goto out; } } out: if (found) { freeifaddrs(ifa_list); goto ret_found; } /* * This section covers the usecase where the nodename specified in cluster.conf * is an alias specified in /etc/hosts. For example: * hostname alias1 alias2 * and * the above calls use uname and getnameinfo does not return aliases. * here we take the name specified in cluster.conf, resolve it to an address * and then compare against all known local ip addresses. * if we have a match, we found our nodename. In theory this chunk of code * could replace all the checks above, but let's avoid any possible regressions * and use it as last. */ iter = icmap_iter_init_r(map, "nodelist.node."); while ((iter_key = icmap_iter_next(iter, NULL, NULL)) != NULL) { char *dbnodename = NULL; struct addrinfo hints; struct addrinfo *result = NULL, *rp = NULL; res = sscanf(iter_key, "nodelist.node.%u.%s", &node_pos, name_str); if (res != 2) { continue; } /* 'ring0_addr' is allowed as a fallback, but 'name' will be found first * because the names are in alpha order. */ if (strcmp(name_str, "name") && strcmp(name_str, "ring0_addr")) { continue; } if (icmap_get_string_r(map, iter_key, &dbnodename) != CS_OK) { continue; } memset(&hints, 0, sizeof(struct addrinfo)); hints.ai_family = AF_UNSPEC; hints.ai_socktype = SOCK_DGRAM; hints.ai_flags = 0; hints.ai_protocol = IPPROTO_UDP; if (getaddrinfo(dbnodename, NULL, &hints, &result)) { continue; } for (rp = result; rp != NULL; rp = rp->ai_next) { for (ifa = ifa_list; ifa; ifa = ifa->ifa_next) { if (ifa->ifa_addr && ipaddr_equal(rp->ai_addr, ifa->ifa_addr)) { freeaddrinfo(result); found = 1; goto out2; } } } freeaddrinfo(result); } out2: icmap_iter_finalize(iter); freeifaddrs(ifa_list); ret_found: if (found) { res = icmap_set_uint32_r(map, "nodelist.local_node_pos", node_pos); } return node_pos; } static enum totem_ip_version_enum totem_config_get_ip_version(struct totem_config *totem_config) { enum totem_ip_version_enum res; char *str; res = TOTEM_IP_VERSION_6_4; if (totem_config->transport_number == TOTEM_TRANSPORT_UDP) { res = TOTEM_IP_VERSION_4; } if (icmap_get_string("totem.ip_version", &str) == CS_OK) { if (strcmp(str, "ipv4") == 0) { res = TOTEM_IP_VERSION_4; } if (strcmp(str, "ipv6") == 0) { res = TOTEM_IP_VERSION_6; } if (strcmp(str, "ipv6-4") == 0) { res = TOTEM_IP_VERSION_6_4; } if (strcmp(str, "ipv4-6") == 0) { res = TOTEM_IP_VERSION_4_6; } free(str); } return (res); } static uint16_t generate_cluster_id (const char *cluster_name) { int i; int value = 0; for (i = 0; i < strlen(cluster_name); i++) { value <<= 1; value += cluster_name[i]; } return (value & 0xFFFF); } static int get_cluster_mcast_addr ( const char *cluster_name, unsigned int linknumber, enum totem_ip_version_enum ip_version, struct totem_ip_address *res) { uint16_t clusterid; char addr[INET6_ADDRSTRLEN + 1]; int err; if (cluster_name == NULL) { return (-1); } clusterid = generate_cluster_id(cluster_name) + linknumber; memset (res, 0, sizeof(*res)); switch (ip_version) { case TOTEM_IP_VERSION_4: case TOTEM_IP_VERSION_4_6: snprintf(addr, sizeof(addr), "239.192.%d.%d", clusterid >> 8, clusterid % 0xFF); break; case TOTEM_IP_VERSION_6: case TOTEM_IP_VERSION_6_4: snprintf(addr, sizeof(addr), "ff15::%x", clusterid); break; default: /* * Unknown family */ return (-1); } err = totemip_parse (res, addr, ip_version); return (err); } static unsigned int generate_nodeid( struct totem_config *totem_config, char *addr) { unsigned int nodeid; struct totem_ip_address totemip; /* AF_INET hard-coded here because auto-generated nodeids are only for IPv4 */ if (totemip_parse(&totemip, addr, TOTEM_IP_VERSION_4) != 0) return -1; memcpy (&nodeid, &totemip.addr, sizeof (unsigned int)); #if __BYTE_ORDER == __LITTLE_ENDIAN nodeid = swab32 (nodeid); #endif if (totem_config->clear_node_high_bit) { nodeid &= 0x7FFFFFFF; } return nodeid; } static int check_for_duplicate_nodeids( struct totem_config *totem_config, const char **error_string) { icmap_iter_t iter; icmap_iter_t subiter; const char *iter_key; int res = 0; int retval = 0; char tmp_key[ICMAP_KEYNAME_MAXLEN]; char *ring0_addr=NULL; char *ring0_addr1=NULL; unsigned int node_pos; unsigned int node_pos1; unsigned int last_node_pos = -1; unsigned int nodeid; unsigned int nodeid1; int autogenerated; iter = icmap_iter_init("nodelist.node."); while ((iter_key = icmap_iter_next(iter, NULL, NULL)) != NULL) { res = sscanf(iter_key, "nodelist.node.%u.%s", &node_pos, tmp_key); if (res != 2) { continue; } /* * This relies on the fact the icmap keys are always returned in order * so all of the keys for a node will be grouped together. We're basically * just running the code below once for each node. */ if (last_node_pos == node_pos) { continue; } last_node_pos = node_pos; snprintf(tmp_key, ICMAP_KEYNAME_MAXLEN, "nodelist.node.%u.nodeid", node_pos); autogenerated = 0; /* Generated nodeids are only allowed for UDP/UDPU so ring0_addr is valid here */ if (icmap_get_uint32(tmp_key, &nodeid) != CS_OK) { snprintf(tmp_key, ICMAP_KEYNAME_MAXLEN, "nodelist.node.%u.ring0_addr", node_pos); if (icmap_get_string(tmp_key, &ring0_addr) != CS_OK) { continue; } /* Generate nodeid so we can check that auto-generated nodeids don't clash either */ nodeid = generate_nodeid(totem_config, ring0_addr); if (nodeid == -1) { continue; } autogenerated = 1; } node_pos1 = 0; subiter = icmap_iter_init("nodelist.node."); while (((iter_key = icmap_iter_next(subiter, NULL, NULL)) != NULL) && (node_pos1 < node_pos)) { res = sscanf(iter_key, "nodelist.node.%u.%s", &node_pos1, tmp_key); if ((res != 2) || (node_pos1 >= node_pos)) { continue; } if (strcmp(tmp_key, "nodeid") != 0) { continue; } snprintf(tmp_key, ICMAP_KEYNAME_MAXLEN, "nodelist.node.%u.nodeid", node_pos1); if (icmap_get_uint32(tmp_key, &nodeid1) != CS_OK) { snprintf(tmp_key, ICMAP_KEYNAME_MAXLEN, "nodelist.node.%u.ring0_addr", node_pos1); if (icmap_get_string(tmp_key, &ring0_addr1) != CS_OK) { continue; } nodeid1 = generate_nodeid(totem_config, ring0_addr1); if (nodeid1 == -1) { continue; } } if (nodeid == nodeid1) { retval = -1; snprintf (error_string_response, sizeof(error_string_response), "Nodeid %u%s%s%s appears twice in corosync.conf", nodeid, autogenerated?"(autogenerated from ":"", autogenerated?ring0_addr:"", autogenerated?")":""); *error_string = error_string_response; break; } } icmap_iter_finalize(subiter); } icmap_iter_finalize(iter); return retval; } /* * This needs to be done last of all. It would be nice to do it when reading the * interface params, but the totem params need to have them to be read first. We * need both, so this is a way round that circular dependancy. */ static void calc_knet_ping_timers(struct totem_config *totem_config) { char runtime_key_name[ICMAP_KEYNAME_MAXLEN]; int interface; for (interface = 0; interface < INTERFACE_MAX; interface++) { if (totem_config->interfaces[interface].configured) { if (!totem_config->interfaces[interface].knet_pong_count) { totem_config->interfaces[interface].knet_pong_count = KNET_PONG_COUNT; } if (!totem_config->interfaces[interface].knet_ping_timeout) { totem_config->interfaces[interface].knet_ping_timeout = totem_config->token_timeout / totem_config->interfaces[interface].knet_pong_count; } snprintf(runtime_key_name, sizeof(runtime_key_name), "runtime.config.totem.interface.%d.knet_ping_timeout", interface); icmap_set_uint32(runtime_key_name, totem_config->interfaces[interface].knet_ping_timeout); if (!totem_config->interfaces[interface].knet_ping_interval) { totem_config->interfaces[interface].knet_ping_interval = totem_config->token_timeout / (totem_config->interfaces[interface].knet_pong_count * 2); } snprintf(runtime_key_name, sizeof(runtime_key_name), "runtime.config.totem.interface.%d.knet_ping_interval", interface); icmap_set_uint32(runtime_key_name, totem_config->interfaces[interface].knet_ping_interval); } } } /* * Compute difference between two set of totem interface arrays and commit it. * set1 and set2 * are changed so for same ring, ip existing in both set1 and set2 are cleared * (set to 0), and ips which are only in set1 or set2 remains untouched. * totempg_node_add/remove is called. */ static void compute_and_set_totempg_interfaces(struct totem_interface *set1, struct totem_interface *set2) { int ring_no, set1_pos, set2_pos; struct totem_ip_address empty_ip_address; memset(&empty_ip_address, 0, sizeof(empty_ip_address)); for (ring_no = 0; ring_no < INTERFACE_MAX; ring_no++) { if (!set1[ring_no].configured && !set2[ring_no].configured) { continue; } for (set1_pos = 0; set1_pos < set1[ring_no].member_count; set1_pos++) { for (set2_pos = 0; set2_pos < set2[ring_no].member_count; set2_pos++) { /* * For current ring_no remove all set1 items existing * in set2 */ if (memcmp(&set1[ring_no].member_list[set1_pos], &set2[ring_no].member_list[set2_pos], sizeof(struct totem_ip_address)) == 0) { memset(&set1[ring_no].member_list[set1_pos], 0, sizeof(struct totem_ip_address)); memset(&set2[ring_no].member_list[set2_pos], 0, sizeof(struct totem_ip_address)); } } } } for (ring_no = 0; ring_no < INTERFACE_MAX; ring_no++) { for (set1_pos = 0; set1_pos < set1[ring_no].member_count; set1_pos++) { /* * All items which remain in set1 and don't exist in set2 any more * have to be removed. */ if (memcmp(&set1[ring_no].member_list[set1_pos], &empty_ip_address, sizeof(empty_ip_address)) != 0) { log_printf(LOGSYS_LEVEL_DEBUG, "removing dynamic member %s for ring %u", totemip_print(&set1[ring_no].member_list[set1_pos]), ring_no); totempg_member_remove(&set1[ring_no].member_list[set1_pos], ring_no); } } if (!set2[ring_no].configured) { continue; } for (set2_pos = 0; set2_pos < set2[ring_no].member_count; set2_pos++) { /* * All items which remain in set2 and don't exist in set1 are new nodes * and have to be added. */ if (memcmp(&set2[ring_no].member_list[set2_pos], &empty_ip_address, sizeof(empty_ip_address)) != 0) { log_printf(LOGSYS_LEVEL_DEBUG, "adding dynamic member %s for ring %u", totemip_print(&set2[ring_no].member_list[set2_pos]), ring_no); totempg_member_add(&set2[ring_no].member_list[set2_pos], ring_no); } } } } /* * Configure parameters for links */ static void configure_link_params(struct totem_config *totem_config, icmap_map_t map) { int i; char tmp_key[ICMAP_KEYNAME_MAXLEN]; char *addr_string; int err; int local_node_pos = find_local_node(map, 0); for (i = 0; iinterfaces[i].configured) { continue; } log_printf(LOGSYS_LEVEL_DEBUG, "Configuring link %d params\n", i); snprintf(tmp_key, ICMAP_KEYNAME_MAXLEN, "nodelist.node.%u.ring%u_addr", local_node_pos, i); if (icmap_get_string_r(map, tmp_key, &addr_string) != CS_OK) { continue; } err = totemip_parse(&totem_config->interfaces[i].local_ip, addr_string, totem_config->ip_version); if (err != 0) { continue; } totem_config->interfaces[i].local_ip.nodeid = totem_config->node_id; /* In case this is a new link, fill in the defaults if there was no interface{} section for it */ if (!totem_config->interfaces[i].knet_link_priority) totem_config->interfaces[i].knet_link_priority = 1; /* knet_ping_interval & knet_ping_timeout are set later once we know all the other params */ if (!totem_config->interfaces[i].knet_ping_precision) totem_config->interfaces[i].knet_ping_precision = KNET_PING_PRECISION; if (!totem_config->interfaces[i].knet_pong_count) totem_config->interfaces[i].knet_pong_count = KNET_PONG_COUNT; if (!totem_config->interfaces[i].knet_transport) totem_config->interfaces[i].knet_transport = KNET_TRANSPORT_UDP; if (!totem_config->interfaces[i].ip_port) totem_config->interfaces[i].ip_port = DEFAULT_PORT + i; } } static void configure_totem_links(struct totem_config *totem_config, icmap_map_t map) { int i; for (i = 0; iinterfaces[i].configured) { continue; } log_printf(LOGSYS_LEVEL_INFO, "Configuring link %d\n", i); totempg_iface_set(&totem_config->interfaces[i].local_ip, totem_config->interfaces[i].ip_port, i); } } /* Check for differences in config that can't be done on-the-fly and print an error */ static int check_things_have_not_changed(struct totem_config *totem_config, const char **error_string) { int i,j,k; const char *ip_str; char addr_buf[INET6_ADDRSTRLEN]; int changed = 0; for (i = 0; iinterfaces[i].configured && totem_config->orig_interfaces[i].configured) { if (totem_config->interfaces[i].knet_transport != totem_config->orig_interfaces[i].knet_transport) { log_printf(LOGSYS_LEVEL_ERROR, "New config has different knet transport for link %d. Internal value was NOT changed.\n", i); changed = 1; } /* Check each nodeid in the new configuration and make sure its IP address on this link has not changed */ for (j=0; j < totem_config->interfaces[i].member_count; j++) { for (k=0; k < totem_config->orig_interfaces[i].member_count; k++) { if (totem_config->interfaces[i].member_list[j].nodeid == totem_config->orig_interfaces[i].member_list[k].nodeid) { /* Found our nodeid - check the IP address */ if (memcmp(&totem_config->interfaces[i].member_list[j], &totem_config->orig_interfaces[i].member_list[k], sizeof(struct totem_ip_address))) { ip_str = totemip_print(&totem_config->orig_interfaces[i].member_list[k]); /* if ip_str is NULL then the old address was invalid and is allowed to change */ if (ip_str) { strncpy(addr_buf, ip_str, sizeof(addr_buf)); addr_buf[sizeof(addr_buf) - 1] = '\0'; log_printf(LOGSYS_LEVEL_ERROR, "new config has different address for link %d (addr changed from %s to %s). Internal value was NOT changed.\n", i, addr_buf, totemip_print(&totem_config->interfaces[i].member_list[j])); changed = 1; } } } } } } } if (changed) { snprintf (error_string_response, sizeof(error_string_response), "To reconfigure an interface it must be deleted and recreated. A working interface needs to be available to corosync at all times"); *error_string = error_string_response; return -1; } return 0; } static int put_nodelist_members_to_config(struct totem_config *totem_config, icmap_map_t map, int reload, const char **error_string) { icmap_iter_t iter, iter2; const char *iter_key, *iter_key2; int res = 0; unsigned int node_pos; char tmp_key[ICMAP_KEYNAME_MAXLEN]; char tmp_key2[ICMAP_KEYNAME_MAXLEN]; char *node_addr_str; int member_count; unsigned int linknumber = 0; int i, j; int last_node_pos = -1; /* Clear out nodelist so we can put the new one in if needed */ for (i = 0; i < INTERFACE_MAX; i++) { for (j = 0; j < PROCESSOR_COUNT_MAX; j++) { memset(&totem_config->interfaces[i].member_list[j], 0, sizeof(struct totem_ip_address)); } totem_config->interfaces[i].member_count = 0; } iter = icmap_iter_init_r(map, "nodelist.node."); while ((iter_key = icmap_iter_next(iter, NULL, NULL)) != NULL) { res = sscanf(iter_key, "nodelist.node.%u.%s", &node_pos, tmp_key); if (res != 2) { continue; } /* If it's the same as the last node_pos then skip it */ if (node_pos == last_node_pos) { continue; } last_node_pos = node_pos; snprintf(tmp_key, ICMAP_KEYNAME_MAXLEN, "nodelist.node.%u.", node_pos); iter2 = icmap_iter_init_r(map, tmp_key); while ((iter_key2 = icmap_iter_next(iter2, NULL, NULL)) != NULL) { unsigned int nodeid; char *str; snprintf(tmp_key, ICMAP_KEYNAME_MAXLEN, "nodelist.node.%u.nodeid", node_pos); if (icmap_get_uint32_r(map, tmp_key, &nodeid) != CS_OK) { nodeid = 0; } res = sscanf(iter_key2, "nodelist.node.%u.ring%u%s", &node_pos, &linknumber, tmp_key2); if (res != 3 || strcmp(tmp_key2, "_addr") != 0) { continue; } if (linknumber >= INTERFACE_MAX) { snprintf (error_string_response, sizeof(error_string_response), "parse error in config: interface ring number %u is bigger than allowed maximum %u\n", linknumber, INTERFACE_MAX - 1); *error_string = error_string_response; icmap_iter_finalize(iter2); icmap_iter_finalize(iter); return (-1); } if (icmap_get_string_r(map, iter_key2, &node_addr_str) != CS_OK) { continue; } /* Generate nodeids if they are not provided and transport is UDP/U */ if (!nodeid && (totem_config->transport_number == TOTEM_TRANSPORT_UDP || totem_config->transport_number == TOTEM_TRANSPORT_UDPU)) { snprintf(tmp_key, ICMAP_KEYNAME_MAXLEN, "nodelist.node.%u.ring0_addr", node_pos); if (icmap_get_string_r(map, tmp_key, &str) == CS_OK) { nodeid = generate_nodeid(totem_config, str); if (nodeid == -1) { sprintf(error_string_response, "An IPV6 network requires that a node ID be specified " "for address '%s'.", node_addr_str); *error_string = error_string_response; free(str); return (-1); } log_printf(LOGSYS_LEVEL_DEBUG, "Generated nodeid = " CS_PRI_NODE_ID " for %s", nodeid, str); free(str); /* * Put nodeid back to nodelist to make cfgtool work */ snprintf(tmp_key, ICMAP_KEYNAME_MAXLEN, "nodelist.node.%u.nodeid", node_pos); /* * Not critical */ (void)icmap_set_uint32_r(map, tmp_key, nodeid); } } if (!nodeid && totem_config->transport_number == TOTEM_TRANSPORT_KNET) { sprintf(error_string_response, "Knet requires an explicit nodeid to be specified " "for address '%s'.", node_addr_str); *error_string = error_string_response; return (-1); } if (totem_config->transport_number == TOTEM_TRANSPORT_KNET && nodeid >= KNET_MAX_HOST) { sprintf(error_string_response, "Knet requires nodeid to be less than %u " "for address '%s'.", KNET_MAX_HOST, node_addr_str); *error_string = error_string_response; return (-1); } member_count = totem_config->interfaces[linknumber].member_count; res = totemip_parse(&totem_config->interfaces[linknumber].member_list[member_count], node_addr_str, totem_config->ip_version); if (res == 0) { totem_config->interfaces[linknumber].member_list[member_count].nodeid = nodeid; totem_config->interfaces[linknumber].member_count++; totem_config->interfaces[linknumber].configured = 1; } else { sprintf(error_string_response, "failed to parse node address '%s'\n", node_addr_str); *error_string = error_string_response; memset(&totem_config->interfaces[linknumber].member_list[member_count], 0, sizeof(struct totem_ip_address)); free(node_addr_str); icmap_iter_finalize(iter2); icmap_iter_finalize(iter); return -1; } free(node_addr_str); } icmap_iter_finalize(iter2); } icmap_iter_finalize(iter); configure_link_params(totem_config, map); if (reload) { log_printf(LOGSYS_LEVEL_DEBUG, "About to reconfigure links from nodelist.\n"); if (check_things_have_not_changed(totem_config, error_string) == -1) { return -1; } } return 0; } static void config_convert_nodelist_to_interface(icmap_map_t map, struct totem_config *totem_config) { int res = 0; int node_pos; char tmp_key[ICMAP_KEYNAME_MAXLEN]; char tmp_key2[ICMAP_KEYNAME_MAXLEN]; char *node_addr_str; unsigned int linknumber = 0; icmap_iter_t iter; const char *iter_key; node_pos = find_local_node(map, 1); if (node_pos > -1) { /* * We found node, so create interface section */ snprintf(tmp_key, ICMAP_KEYNAME_MAXLEN, "nodelist.node.%u.", node_pos); iter = icmap_iter_init_r(map, tmp_key); while ((iter_key = icmap_iter_next(iter, NULL, NULL)) != NULL) { res = sscanf(iter_key, "nodelist.node.%u.ring%u%s", &node_pos, &linknumber, tmp_key2); if (res != 3 || strcmp(tmp_key2, "_addr") != 0) { continue ; } if (icmap_get_string_r(map, iter_key, &node_addr_str) != CS_OK) { continue; } snprintf(tmp_key2, ICMAP_KEYNAME_MAXLEN, "totem.interface.%u.bindnetaddr", linknumber); icmap_set_string_r(map, tmp_key2, node_addr_str); free(node_addr_str); } icmap_iter_finalize(iter); } } static int get_interface_params(struct totem_config *totem_config, icmap_map_t map, const char **error_string, uint64_t *warnings, int reload) { int res = 0; unsigned int linknumber = 0; int member_count = 0; int i; icmap_iter_t iter, member_iter; const char *iter_key; const char *member_iter_key; char linknumber_key[ICMAP_KEYNAME_MAXLEN]; char tmp_key[ICMAP_KEYNAME_MAXLEN]; uint8_t u8; uint32_t u32; char *str; char *cluster_name = NULL; enum totem_ip_version_enum tmp_ip_version = TOTEM_IP_VERSION_4; int ret = 0; if (reload) { for (i=0; iinterfaces[i].configured = 0; totem_config->interfaces[i].knet_ping_timeout = 0; totem_config->interfaces[i].knet_ping_interval = 0; totem_config->interfaces[i].knet_ping_precision = KNET_PING_PRECISION; totem_config->interfaces[i].knet_pong_count = KNET_PONG_COUNT; } } if (icmap_get_string_r(map, "totem.cluster_name", &cluster_name) != CS_OK) { cluster_name = NULL; } iter = icmap_iter_init_r(map, "totem.interface."); while ((iter_key = icmap_iter_next(iter, NULL, NULL)) != NULL) { res = sscanf(iter_key, "totem.interface.%[^.].%s", linknumber_key, tmp_key); if (res != 2) { continue; } if (strcmp(tmp_key, "bindnetaddr") != 0 && totem_config->transport_number == TOTEM_TRANSPORT_UDP) { continue; } member_count = 0; linknumber = atoi(linknumber_key); if (linknumber >= INTERFACE_MAX) { snprintf (error_string_response, sizeof(error_string_response), "parse error in config: interface ring number %u is bigger than allowed maximum %u\n", linknumber, INTERFACE_MAX - 1); *error_string = error_string_response; ret = -1; goto out; } /* These things are only valid for the initial read */ if (!reload) { /* * Get the bind net address */ snprintf(tmp_key, ICMAP_KEYNAME_MAXLEN, "totem.interface.%u.bindnetaddr", linknumber); if (icmap_get_string_r(map, tmp_key, &str) == CS_OK) { res = totemip_parse (&totem_config->interfaces[linknumber].bindnet, str, totem_config->ip_version); if (res) { sprintf(error_string_response, "failed to parse bindnet address '%s'\n", str); *error_string = error_string_response; free(str); ret = -1; goto out; } free(str); } /* * Get interface multicast address */ snprintf(tmp_key, ICMAP_KEYNAME_MAXLEN, "totem.interface.%u.mcastaddr", linknumber); if (icmap_get_string_r(map, tmp_key, &str) == CS_OK) { res = totemip_parse (&totem_config->interfaces[linknumber].mcast_addr, str, totem_config->ip_version); if (res) { sprintf(error_string_response, "failed to parse mcast address '%s'\n", str); *error_string = error_string_response; free(str); ret = -1; goto out; } free(str); } else if (totem_config->transport_number == TOTEM_TRANSPORT_UDP) { /* * User not specified address -> autogenerate one from cluster_name key * (if available). Return code is intentionally ignored, because * udpu doesn't need mcastaddr and validity of mcastaddr for udp is * checked later anyway. */ if (totem_config->interfaces[0].bindnet.family == AF_INET) { tmp_ip_version = TOTEM_IP_VERSION_4; } else if (totem_config->interfaces[0].bindnet.family == AF_INET6) { tmp_ip_version = TOTEM_IP_VERSION_6; } (void)get_cluster_mcast_addr (cluster_name, linknumber, tmp_ip_version, &totem_config->interfaces[linknumber].mcast_addr); } snprintf(tmp_key, ICMAP_KEYNAME_MAXLEN, "totem.interface.%u.broadcast", linknumber); if (icmap_get_string(tmp_key, &str) == CS_OK) { if (strcmp (str, "yes") == 0) { totem_config->broadcast_use = 1; } free(str); } } /* These things are only valid for the initial read OR a newly-defined link */ if (!reload || (totem_config->interfaces[linknumber].configured == 0)) { /* * Get mcast port */ snprintf(tmp_key, ICMAP_KEYNAME_MAXLEN, "totem.interface.%u.mcastport", linknumber); if (icmap_get_uint16_r(map, tmp_key, &totem_config->interfaces[linknumber].ip_port) != CS_OK) { if (totem_config->broadcast_use) { totem_config->interfaces[linknumber].ip_port = DEFAULT_PORT + (2 * linknumber); } else { totem_config->interfaces[linknumber].ip_port = DEFAULT_PORT + linknumber; } } /* * Get the TTL */ totem_config->interfaces[linknumber].ttl = 1; snprintf(tmp_key, ICMAP_KEYNAME_MAXLEN, "totem.interface.%u.ttl", linknumber); if (icmap_get_uint8_r(map, tmp_key, &u8) == CS_OK) { totem_config->interfaces[linknumber].ttl = u8; } totem_config->interfaces[linknumber].knet_transport = KNET_DEFAULT_TRANSPORT; snprintf(tmp_key, ICMAP_KEYNAME_MAXLEN, "totem.interface.%u.knet_transport", linknumber); if (icmap_get_string_r(map, tmp_key, &str) == CS_OK) { if (strcmp(str, "sctp") == 0) { totem_config->interfaces[linknumber].knet_transport = KNET_TRANSPORT_SCTP; } else if (strcmp(str, "udp") == 0) { totem_config->interfaces[linknumber].knet_transport = KNET_TRANSPORT_UDP; } else { *error_string = "Unrecognised knet_transport. expected 'udp' or 'sctp'"; ret = -1; goto out; } } } totem_config->interfaces[linknumber].configured = 1; /* * Get the knet link params */ totem_config->interfaces[linknumber].knet_link_priority = 1; snprintf(tmp_key, ICMAP_KEYNAME_MAXLEN, "totem.interface.%u.knet_link_priority", linknumber); if (icmap_get_uint8_r(map, tmp_key, &u8) == CS_OK) { totem_config->interfaces[linknumber].knet_link_priority = u8; } totem_config->interfaces[linknumber].knet_ping_interval = 0; /* real default applied later */ snprintf(tmp_key, ICMAP_KEYNAME_MAXLEN, "totem.interface.%u.knet_ping_interval", linknumber); if (icmap_get_uint32_r(map, tmp_key, &u32) == CS_OK) { totem_config->interfaces[linknumber].knet_ping_interval = u32; } totem_config->interfaces[linknumber].knet_ping_timeout = 0; /* real default applied later */ snprintf(tmp_key, ICMAP_KEYNAME_MAXLEN, "totem.interface.%u.knet_ping_timeout", linknumber); if (icmap_get_uint32_r(map, tmp_key, &u32) == CS_OK) { totem_config->interfaces[linknumber].knet_ping_timeout = u32; } totem_config->interfaces[linknumber].knet_ping_precision = KNET_PING_PRECISION; snprintf(tmp_key, ICMAP_KEYNAME_MAXLEN, "totem.interface.%u.knet_ping_precision", linknumber); if (icmap_get_uint32_r(map, tmp_key, &u32) == CS_OK) { totem_config->interfaces[linknumber].knet_ping_precision = u32; } totem_config->interfaces[linknumber].knet_pong_count = KNET_PONG_COUNT; snprintf(tmp_key, ICMAP_KEYNAME_MAXLEN, "totem.interface.%u.knet_pong_count", linknumber); if (icmap_get_uint32_r(map, tmp_key, &u32) == CS_OK) { totem_config->interfaces[linknumber].knet_pong_count = u32; } snprintf(tmp_key, ICMAP_KEYNAME_MAXLEN, "totem.interface.%u.member.", linknumber); member_iter = icmap_iter_init_r(map, tmp_key); while ((member_iter_key = icmap_iter_next(member_iter, NULL, NULL)) != NULL) { if (member_count == 0) { if (icmap_get_string("nodelist.node.0.ring0_addr", &str) == CS_OK) { free(str); *warnings |= TOTEM_CONFIG_WARNING_MEMBERS_IGNORED; break; } else { *warnings |= TOTEM_CONFIG_WARNING_MEMBERS_DEPRECATED; } } if (icmap_get_string_r(map, member_iter_key, &str) == CS_OK) { res = totemip_parse (&totem_config->interfaces[linknumber].member_list[member_count++], str, totem_config->ip_version); if (res) { sprintf(error_string_response, "failed to parse node address '%s'\n", str); *error_string = error_string_response; icmap_iter_finalize(member_iter); free(str); ret = -1; goto out; } free(str); } } icmap_iter_finalize(member_iter); totem_config->interfaces[linknumber].member_count = member_count; } out: icmap_iter_finalize(iter); free(cluster_name); return (ret); } extern int totem_config_read ( struct totem_config *totem_config, const char **error_string, uint64_t *warnings) { int res = 0; char *str, *ring0_addr_str; char tmp_key[ICMAP_KEYNAME_MAXLEN]; uint16_t u16; int i; int local_node_pos; uint32_t u32; *warnings = 0; memset (totem_config, 0, sizeof (struct totem_config)); totem_config->interfaces = malloc (sizeof (struct totem_interface) * INTERFACE_MAX); if (totem_config->interfaces == 0) { *error_string = "Out of memory trying to allocate ethernet interface storage area"; return -1; } totem_config->transport_number = TOTEM_TRANSPORT_KNET; if (icmap_get_string("totem.transport", &str) == CS_OK) { if (strcmp (str, "udpu") == 0) { totem_config->transport_number = TOTEM_TRANSPORT_UDPU; } else if (strcmp (str, "udp") == 0) { totem_config->transport_number = TOTEM_TRANSPORT_UDP; } else if (strcmp (str, "knet") == 0) { totem_config->transport_number = TOTEM_TRANSPORT_KNET; } else { *error_string = "Invalid transport type. Should be udpu, udp or knet"; free(str); return -1; } free(str); } memset (totem_config->interfaces, 0, sizeof (struct totem_interface) * INTERFACE_MAX); strcpy (totem_config->link_mode, "passive"); icmap_get_uint32("totem.version", (uint32_t *)&totem_config->version); /* initial crypto load */ if (totem_get_crypto(totem_config, icmap_get_global_map(), error_string) != 0) { return -1; } if (totem_config_keyread(totem_config, icmap_get_global_map(), error_string) != 0) { return -1; } totem_config->crypto_index = 1; totem_config->crypto_changed = 0; if (icmap_get_string("totem.link_mode", &str) == CS_OK) { if (strlen(str) >= TOTEM_LINK_MODE_BYTES) { *error_string = "totem.link_mode is too long"; free(str); return -1; } strcpy (totem_config->link_mode, str); free(str); } if (icmap_get_uint32("totem.nodeid", &u32) == CS_OK) { *warnings |= TOTEM_CONFIG_WARNING_TOTEM_NODEID_SET; } totem_config->clear_node_high_bit = 0; if (icmap_get_string("totem.clear_node_high_bit", &str) == CS_OK) { if (strcmp (str, "yes") == 0) { totem_config->clear_node_high_bit = 1; } free(str); } icmap_get_uint32("totem.threads", &totem_config->threads); icmap_get_uint32("totem.netmtu", &totem_config->net_mtu); totem_config->ip_version = totem_config_get_ip_version(totem_config); if (icmap_get_string("totem.interface.0.bindnetaddr", &str) != CS_OK) { /* * We were not able to find ring 0 bindnet addr. Try to use nodelist informations */ config_convert_nodelist_to_interface(icmap_get_global_map(), totem_config); } else { if (icmap_get_string("nodelist.node.0.ring0_addr", &ring0_addr_str) == CS_OK) { /* * Both bindnetaddr and ring0_addr are set. * Log warning information, and use nodelist instead */ *warnings |= TOTEM_CONFIG_BINDNETADDR_NODELIST_SET; config_convert_nodelist_to_interface(icmap_get_global_map(), totem_config); free(ring0_addr_str); } free(str); } /* * Broadcast option is global but set in interface section, * so reset before processing interfaces. */ totem_config->broadcast_use = 0; res = get_interface_params(totem_config, icmap_get_global_map(), error_string, warnings, 0); if (res < 0) { return res; } /* * Use broadcast is global, so if set, make sure to fill mcast addr correctly * broadcast is only supported for UDP so just do interface 0; */ if (totem_config->broadcast_use) { totemip_parse (&totem_config->interfaces[0].mcast_addr, "255.255.255.255", TOTEM_IP_VERSION_4); } /* * Store automatically generated items back to icmap only for UDP */ if (totem_config->transport_number == TOTEM_TRANSPORT_UDP) { for (i = 0; i < INTERFACE_MAX; i++) { if (!totem_config->interfaces[i].configured) { continue; } snprintf(tmp_key, ICMAP_KEYNAME_MAXLEN, "totem.interface.%u.mcastaddr", i); if (icmap_get_string(tmp_key, &str) == CS_OK) { free(str); } else { str = (char *)totemip_print(&totem_config->interfaces[i].mcast_addr); icmap_set_string(tmp_key, str); } snprintf(tmp_key, ICMAP_KEYNAME_MAXLEN, "totem.interface.%u.mcastport", i); if (icmap_get_uint16(tmp_key, &u16) != CS_OK) { icmap_set_uint16(tmp_key, totem_config->interfaces[i].ip_port); } } } /* * Check existence of nodelist */ if ((icmap_get_string("nodelist.node.0.name", &str) == CS_OK) || (icmap_get_string("nodelist.node.0.ring0_addr", &str) == CS_OK)) { free(str); /* * find local node */ local_node_pos = find_local_node(icmap_get_global_map(), 1); if (local_node_pos != -1) { assert(totem_config->node_id == 0); snprintf(tmp_key, ICMAP_KEYNAME_MAXLEN, "nodelist.node.%u.nodeid", local_node_pos); (void)icmap_get_uint32(tmp_key, &totem_config->node_id); if ((totem_config->transport_number == TOTEM_TRANSPORT_KNET) && (!totem_config->node_id)) { *error_string = "Knet requires an explicit nodeid for the local node"; return -1; } if ((totem_config->transport_number == TOTEM_TRANSPORT_UDP || totem_config->transport_number == TOTEM_TRANSPORT_UDPU) && (!totem_config->node_id)) { snprintf(tmp_key, ICMAP_KEYNAME_MAXLEN, "nodelist.node.%u.ring0_addr", local_node_pos); icmap_get_string(tmp_key, &str); totem_config->node_id = generate_nodeid(totem_config, str); if (totem_config->node_id == -1) { *error_string = "An IPV6 network requires that a node ID be specified"; free(str); return (-1); } totem_config->interfaces[0].member_list[local_node_pos].nodeid = totem_config->node_id; free(str); } /* Users must not change this */ icmap_set_ro_access("nodelist.local_node_pos", 0, 1); } if (put_nodelist_members_to_config(totem_config, icmap_get_global_map(), 0, error_string)) { return -1; } } /* * Get things that might change in the future (and can depend on totem_config->interfaces); */ totem_volatile_config_read(totem_config, icmap_get_global_map(), NULL); calc_knet_ping_timers(totem_config); /* This is now done in the totemknet interface callback */ /* configure_totem_links(totem_config, icmap_get_global_map()); */ add_totem_config_notification(totem_config); return 0; } int totem_config_validate ( struct totem_config *totem_config, const char **error_string) { static char local_error_reason[512]; char parse_error[512]; const char *error_reason = local_error_reason; int i; uint32_t u32; int num_configured = 0; unsigned int interface_max = INTERFACE_MAX; for (i = 0; i < INTERFACE_MAX; i++) { if (totem_config->interfaces[i].configured) { num_configured++; } } if (num_configured == 0) { error_reason = "No interfaces defined"; goto parse_error; } /* Check we found a local node name */ if (icmap_get_uint32("nodelist.local_node_pos", &u32) != CS_OK) { error_reason = "No valid name found for local host"; goto parse_error; } for (i = 0; i < INTERFACE_MAX; i++) { /* * Some error checking of parsed data to make sure its valid */ struct totem_ip_address null_addr; if (!totem_config->interfaces[i].configured) { continue; } memset (&null_addr, 0, sizeof (struct totem_ip_address)); if ((totem_config->transport_number == TOTEM_TRANSPORT_UDP) && memcmp (&totem_config->interfaces[i].mcast_addr, &null_addr, sizeof (struct totem_ip_address)) == 0) { snprintf (local_error_reason, sizeof(local_error_reason), "No multicast address specified for interface %u", i); goto parse_error; } if (totem_config->interfaces[i].ip_port == 0) { snprintf (local_error_reason, sizeof(local_error_reason), "No multicast port specified for interface %u", i); goto parse_error; } if (totem_config->interfaces[i].ttl > 255) { snprintf (local_error_reason, sizeof(local_error_reason), "Invalid TTL (should be 0..255) for interface %u", i); goto parse_error; } if (totem_config->transport_number != TOTEM_TRANSPORT_UDP && totem_config->interfaces[i].ttl != 1) { snprintf (local_error_reason, sizeof(local_error_reason), "Can only set ttl on multicast transport types for interface %u", i); goto parse_error; } if (totem_config->interfaces[i].knet_link_priority > 255) { snprintf (local_error_reason, sizeof(local_error_reason), "Invalid link priority (should be 0..255) for interface %u", i); goto parse_error; } if (totem_config->transport_number != TOTEM_TRANSPORT_KNET && totem_config->interfaces[i].knet_link_priority != 1) { snprintf (local_error_reason, sizeof(local_error_reason), "Can only set link priority on knet transport type for interface %u", i); goto parse_error; } if (totem_config->interfaces[i].mcast_addr.family == AF_INET6 && totem_config->node_id == 0) { snprintf (local_error_reason, sizeof(local_error_reason), "An IPV6 network requires that a node ID be specified for interface %u", i); goto parse_error; } if (totem_config->broadcast_use == 0 && totem_config->transport_number == TOTEM_TRANSPORT_UDP) { if (totem_config->interfaces[i].mcast_addr.family != totem_config->interfaces[i].bindnet.family) { snprintf (local_error_reason, sizeof(local_error_reason), "Multicast address family does not match bind address family for interface %u", i); goto parse_error; } if (totemip_is_mcast (&totem_config->interfaces[i].mcast_addr) != 0) { snprintf (local_error_reason, sizeof(local_error_reason), "mcastaddr is not a correct multicast address for interface %u", i); goto parse_error; } } } if (totem_config->version != 2) { error_reason = "This totem parser can only parse version 2 configurations."; goto parse_error; } if (totem_volatile_config_validate(totem_config, icmap_get_global_map(), error_string) == -1) { return (-1); } if (check_for_duplicate_nodeids(totem_config, error_string) == -1) { return (-1); } /* * KNET Link values validation */ if (strcmp (totem_config->link_mode, "active") && strcmp (totem_config->link_mode, "rr") && strcmp (totem_config->link_mode, "passive")) { snprintf (local_error_reason, sizeof(local_error_reason), "The Knet link mode \"%s\" specified is invalid. It must be active, passive or rr.\n", totem_config->link_mode); goto parse_error; } /* Only Knet does multiple interfaces */ if (totem_config->transport_number != TOTEM_TRANSPORT_KNET) { interface_max = 1; } if (interface_max < num_configured) { snprintf (parse_error, sizeof(parse_error), "%d is too many configured interfaces for non-Knet transport.", num_configured); error_reason = parse_error; goto parse_error; } /* Only knet allows crypto */ if (totem_config->transport_number != TOTEM_TRANSPORT_KNET) { if ((strcmp(totem_config->crypto_cipher_type, "none") != 0) || (strcmp(totem_config->crypto_hash_type, "none") != 0)) { snprintf (parse_error, sizeof(parse_error), "crypto_cipher & crypto_hash are only valid for the Knet transport."); error_reason = parse_error; goto parse_error; } } if (totem_config->net_mtu == 0) { if (totem_config->transport_number == TOTEM_TRANSPORT_KNET) { totem_config->net_mtu = KNET_MAX_PACKET_SIZE; } else { totem_config->net_mtu = UDP_NETMTU; } } return 0; parse_error: snprintf (error_string_response, sizeof(error_string_response), "parse error in config: %s\n", error_reason); *error_string = error_string_response; return (-1); } static int read_keyfile ( const char *key_location, struct totem_config *totem_config, const char **error_string) { int fd; int res; int saved_errno; char error_str[100]; const char *error_ptr; fd = open (key_location, O_RDONLY); if (fd == -1) { error_ptr = qb_strerror_r(errno, error_str, sizeof(error_str)); snprintf (error_string_response, sizeof(error_string_response), "Could not open %s: %s\n", key_location, error_ptr); goto parse_error; } res = read (fd, totem_config->private_key, TOTEM_PRIVATE_KEY_LEN_MAX); saved_errno = errno; close (fd); if (res == -1) { error_ptr = qb_strerror_r (saved_errno, error_str, sizeof(error_str)); snprintf (error_string_response, sizeof(error_string_response), "Could not read %s: %s\n", key_location, error_ptr); goto parse_error; } if (res < TOTEM_PRIVATE_KEY_LEN_MIN) { snprintf (error_string_response, sizeof(error_string_response), "Could only read %d bits of minimum %u bits from %s.\n", res * 8, TOTEM_PRIVATE_KEY_LEN_MIN * 8, key_location); goto parse_error; } totem_config->private_key_len = res; return 0; parse_error: *error_string = error_string_response; return (-1); } int totem_config_keyread ( struct totem_config *totem_config, icmap_map_t map, const char **error_string) { int got_key = 0; char *key_location = NULL; int res; size_t key_len; char old_key[TOTEM_PRIVATE_KEY_LEN_MAX]; size_t old_key_len; /* Take a copy so we can see if it has changed */ memcpy(old_key, totem_config->private_key, sizeof(totem_config->private_key)); old_key_len = totem_config->private_key_len; memset (totem_config->private_key, 0, sizeof(totem_config->private_key)); totem_config->private_key_len = 0; if (strcmp(totem_config->crypto_cipher_type, "none") == 0 && strcmp(totem_config->crypto_hash_type, "none") == 0) { return (0); } /* cmap may store the location of the key file */ if (icmap_get_string_r(map, "totem.keyfile", &key_location) == CS_OK) { res = read_keyfile(key_location, totem_config, error_string); free(key_location); if (res) { goto key_error; } got_key = 1; } else { /* Or the key itself may be in the cmap */ if (icmap_get_r(map, "totem.key", NULL, &key_len, NULL) == CS_OK) { if (key_len > sizeof(totem_config->private_key)) { sprintf(error_string_response, "key is too long"); goto key_error; } if (key_len < TOTEM_PRIVATE_KEY_LEN_MIN) { sprintf(error_string_response, "key is too short"); goto key_error; } if (icmap_get_r(map, "totem.key", totem_config->private_key, &key_len, NULL) == CS_OK) { totem_config->private_key_len = key_len; got_key = 1; } else { sprintf(error_string_response, "can't load private key"); goto key_error; } } } /* In desperation we read the default filename */ if (!got_key) { res = read_keyfile(COROSYSCONFDIR "/authkey", totem_config, error_string); if (res) goto key_error; } if (old_key_len != totem_config->private_key_len || memcmp(old_key, totem_config->private_key, sizeof(totem_config->private_key))) { totem_config->crypto_changed = 1; } return (0); key_error: *error_string = error_string_response; return (-1); } int totem_reread_crypto_config(struct totem_config *totem_config, icmap_map_t map, const char **error_string) { if (totem_get_crypto(totem_config, map, error_string) != 0) { return -1; } if (totem_config_keyread(totem_config, map, error_string) != 0) { return -1; } return 0; } static void debug_dump_totem_config(const struct totem_config *totem_config) { log_printf(LOGSYS_LEVEL_DEBUG, "Token Timeout (%d ms) retransmit timeout (%d ms)", totem_config->token_timeout, totem_config->token_retransmit_timeout); if (totem_config->token_warning) { uint32_t token_warning_ms = totem_config->token_warning * totem_config->token_timeout / 100; log_printf(LOGSYS_LEVEL_DEBUG, "Token warning every %d ms (%d%% of Token Timeout)", token_warning_ms, totem_config->token_warning); if (token_warning_ms < totem_config->token_retransmit_timeout) log_printf (LOGSYS_LEVEL_DEBUG, "The token warning interval (%d ms) is less than the token retransmit timeout (%d ms) " "which can lead to spurious token warnings. Consider increasing the token_warning parameter.", token_warning_ms, totem_config->token_retransmit_timeout); } else log_printf(LOGSYS_LEVEL_DEBUG, "Token warnings disabled"); log_printf(LOGSYS_LEVEL_DEBUG, "token hold (%d ms) retransmits before loss (%d retrans)", totem_config->token_hold_timeout, totem_config->token_retransmits_before_loss_const); log_printf(LOGSYS_LEVEL_DEBUG, "join (%d ms) send_join (%d ms) consensus (%d ms) merge (%d ms)", totem_config->join_timeout, totem_config->send_join_timeout, totem_config->consensus_timeout, totem_config->merge_timeout); log_printf(LOGSYS_LEVEL_DEBUG, "downcheck (%d ms) fail to recv const (%d msgs)", totem_config->downcheck_timeout, totem_config->fail_to_recv_const); log_printf(LOGSYS_LEVEL_DEBUG, "seqno unchanged const (%d rotations) Maximum network MTU %d", totem_config->seqno_unchanged_const, totem_config->net_mtu); log_printf(LOGSYS_LEVEL_DEBUG, "window size per rotation (%d messages) maximum messages per rotation (%d messages)", totem_config->window_size, totem_config->max_messages); log_printf(LOGSYS_LEVEL_DEBUG, "missed count const (%d messages)", totem_config->miss_count_const); log_printf(LOGSYS_LEVEL_DEBUG, "heartbeat_failures_allowed (%d)", totem_config->heartbeat_failures_allowed); log_printf(LOGSYS_LEVEL_DEBUG, "max_network_delay (%d ms)", totem_config->max_network_delay); } static void totem_change_notify( int32_t event, const char *key_name, struct icmap_notify_value new_val, struct icmap_notify_value old_val, void *user_data) { struct totem_config *totem_config = (struct totem_config *)user_data; uint32_t *param; uint8_t reloading; const char *deleted_key = NULL; const char *error_string; /* * If a full reload is in progress then don't do anything until it's done and * can reconfigure it all atomically */ if (icmap_get_uint8("config.reload_in_progress", &reloading) == CS_OK && reloading) return; param = totem_get_param_by_name((struct totem_config *)user_data, key_name); /* * Process change only if changed key is found in totem_config (-> param is not NULL) * or for special key token_coefficient. token_coefficient key is not stored in * totem_config, but it is used for computation of token timeout. */ if (!param && strcmp(key_name, "totem.token_coefficient") != 0) return; /* * Values other than UINT32 are not supported, or needed (yet) */ switch (event) { case ICMAP_TRACK_DELETE: deleted_key = key_name; break; case ICMAP_TRACK_ADD: case ICMAP_TRACK_MODIFY: deleted_key = NULL; break; default: break; } totem_volatile_config_read (totem_config, icmap_get_global_map(), deleted_key); log_printf(LOGSYS_LEVEL_DEBUG, "Totem related config key changed. Dumping actual totem config."); debug_dump_totem_config(totem_config); if (totem_volatile_config_validate(totem_config, icmap_get_global_map(), &error_string) == -1) { log_printf (LOGSYS_LEVEL_ERROR, "%s", error_string); /* * TODO: Consider corosync exit and/or load defaults for volatile * values. For now, log error seems to be enough */ } } int totemconfig_configure_new_params( struct totem_config *totem_config, icmap_map_t map, const char **error_string) { uint64_t warnings = 0LL; get_interface_params(totem_config, map, error_string, &warnings, 1); if (put_nodelist_members_to_config (totem_config, map, 1, error_string)) { return -1; } calc_knet_ping_timers(totem_config); log_printf(LOGSYS_LEVEL_DEBUG, "Configuration reloaded. Dumping actual totem config."); debug_dump_totem_config(totem_config); /* Reinstate the local_node_pos */ (void)find_local_node(map, 0); return 0; } void totemconfig_commit_new_params( struct totem_config *totem_config, icmap_map_t map) { struct totem_interface *new_interfaces = NULL; new_interfaces = malloc (sizeof (struct totem_interface) * INTERFACE_MAX); assert(new_interfaces != NULL); memcpy(new_interfaces, totem_config->interfaces, sizeof (struct totem_interface) * INTERFACE_MAX); /* Set link parameters including local_ip */ configure_totem_links(totem_config, map); /* Add & remove nodes */ compute_and_set_totempg_interfaces(totem_config->orig_interfaces, new_interfaces); /* Does basic global params (like compression) */ totempg_reconfigure(); free(new_interfaces); } static void add_totem_config_notification(struct totem_config *totem_config) { icmap_track_t icmap_track; icmap_track_add("totem.", ICMAP_TRACK_ADD | ICMAP_TRACK_DELETE | ICMAP_TRACK_MODIFY | ICMAP_TRACK_PREFIX, totem_change_notify, totem_config, &icmap_track); } diff --git a/exec/totemsrp.c b/exec/totemsrp.c index 949d367b..d24b11fa 100644 --- a/exec/totemsrp.c +++ b/exec/totemsrp.c @@ -1,5235 +1,5236 @@ /* * Copyright (c) 2003-2006 MontaVista Software, Inc. * Copyright (c) 2006-2018 Red Hat, Inc. * * All rights reserved. * * Author: Steven Dake (sdake@redhat.com) * * This software licensed under BSD license, the text of which follows: * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * - Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * - Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * - Neither the name of the MontaVista Software, Inc. nor the names of its * contributors may be used to endorse or promote products derived from this * software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF * THE POSSIBILITY OF SUCH DAMAGE. */ /* * The first version of this code was based upon Yair Amir's PhD thesis: * http://www.cs.jhu.edu/~yairamir/phd.ps) (ch4,5). * * The current version of totemsrp implements the Totem protocol specified in: * http://citeseer.ist.psu.edu/amir95totem.html * * The deviations from the above published protocols are: * - token hold mode where token doesn't rotate on unused ring - reduces cpu * usage on 1.6ghz xeon from 35% to less then .1 % as measured by top */ #include #include #ifdef HAVE_ALLOCA_H #include #endif #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #define LOGSYS_UTILS_ONLY 1 #include #include "totemsrp.h" #include "totemnet.h" #include "icmap.h" #include "totemconfig.h" #include "cs_queue.h" #define LOCALHOST_IP inet_addr("127.0.0.1") #define QUEUE_RTR_ITEMS_SIZE_MAX 16384 /* allow 16384 retransmit items */ #define RETRANS_MESSAGE_QUEUE_SIZE_MAX 16384 /* allow 500 messages to be queued */ #define RECEIVED_MESSAGE_QUEUE_SIZE_MAX 500 /* allow 500 messages to be queued */ #define MAXIOVS 5 #define RETRANSMIT_ENTRIES_MAX 30 #define TOKEN_SIZE_MAX 64000 /* bytes */ #define LEAVE_DUMMY_NODEID 0 /* * SRP address. */ struct srp_addr { unsigned int nodeid; }; /* * Rollover handling: * SEQNO_START_MSG is the starting sequence number after a new configuration * This should remain zero, unless testing overflow in which case * 0x7ffff000 and 0xfffff000 are good starting values. * * SEQNO_START_TOKEN is the starting sequence number after a new configuration * for a token. This should remain zero, unless testing overflow in which * case 07fffff00 or 0xffffff00 are good starting values. */ #define SEQNO_START_MSG 0x0 #define SEQNO_START_TOKEN 0x0 /* * These can be used ot test different rollover points * #define SEQNO_START_MSG 0xfffffe00 * #define SEQNO_START_TOKEN 0xfffffe00 */ /* * These can be used to test the error recovery algorithms * #define TEST_DROP_ORF_TOKEN_PERCENTAGE 30 * #define TEST_DROP_COMMIT_TOKEN_PERCENTAGE 30 * #define TEST_DROP_MCAST_PERCENTAGE 50 * #define TEST_RECOVERY_MSG_COUNT 300 */ /* * we compare incoming messages to determine if their endian is * different - if so convert them * * do not change */ #define ENDIAN_LOCAL 0xff22 enum message_type { MESSAGE_TYPE_ORF_TOKEN = 0, /* Ordering, Reliability, Flow (ORF) control Token */ MESSAGE_TYPE_MCAST = 1, /* ring ordered multicast message */ MESSAGE_TYPE_MEMB_MERGE_DETECT = 2, /* merge rings if there are available rings */ MESSAGE_TYPE_MEMB_JOIN = 3, /* membership join message */ MESSAGE_TYPE_MEMB_COMMIT_TOKEN = 4, /* membership commit token */ MESSAGE_TYPE_TOKEN_HOLD_CANCEL = 5, /* cancel the holding of the token */ }; enum encapsulation_type { MESSAGE_ENCAPSULATED = 1, MESSAGE_NOT_ENCAPSULATED = 2 }; /* * New membership algorithm local variables */ struct consensus_list_item { struct srp_addr addr; int set; }; struct token_callback_instance { struct qb_list_head list; int (*callback_fn) (enum totem_callback_token_type type, const void *); enum totem_callback_token_type callback_type; int delete; void *data; }; struct totemsrp_socket { int mcast; int token; }; struct mcast { struct totem_message_header header; struct srp_addr system_from; unsigned int seq; int this_seqno; struct memb_ring_id ring_id; unsigned int node_id; int guarantee; } __attribute__((packed)); struct rtr_item { struct memb_ring_id ring_id; unsigned int seq; }__attribute__((packed)); struct orf_token { struct totem_message_header header; unsigned int seq; unsigned int token_seq; unsigned int aru; unsigned int aru_addr; struct memb_ring_id ring_id; unsigned int backlog; unsigned int fcc; int retrans_flg; int rtr_list_entries; struct rtr_item rtr_list[0]; }__attribute__((packed)); struct memb_join { struct totem_message_header header; struct srp_addr system_from; unsigned int proc_list_entries; unsigned int failed_list_entries; unsigned long long ring_seq; unsigned char end_of_memb_join[0]; /* * These parts of the data structure are dynamic: * struct srp_addr proc_list[]; * struct srp_addr failed_list[]; */ } __attribute__((packed)); struct memb_merge_detect { struct totem_message_header header; struct srp_addr system_from; struct memb_ring_id ring_id; } __attribute__((packed)); struct token_hold_cancel { struct totem_message_header header; struct memb_ring_id ring_id; } __attribute__((packed)); struct memb_commit_token_memb_entry { struct memb_ring_id ring_id; unsigned int aru; unsigned int high_delivered; unsigned int received_flg; }__attribute__((packed)); struct memb_commit_token { struct totem_message_header header; unsigned int token_seq; struct memb_ring_id ring_id; unsigned int retrans_flg; int memb_index; int addr_entries; unsigned char end_of_commit_token[0]; /* * These parts of the data structure are dynamic: * * struct srp_addr addr[PROCESSOR_COUNT_MAX]; * struct memb_commit_token_memb_entry memb_list[PROCESSOR_COUNT_MAX]; */ }__attribute__((packed)); struct message_item { struct mcast *mcast; unsigned int msg_len; }; struct sort_queue_item { struct mcast *mcast; unsigned int msg_len; }; enum memb_state { MEMB_STATE_OPERATIONAL = 1, MEMB_STATE_GATHER = 2, MEMB_STATE_COMMIT = 3, MEMB_STATE_RECOVERY = 4 }; struct totemsrp_instance { int iface_changes; int failed_to_recv; /* * Flow control mcasts and remcasts on last and current orf_token */ int fcc_remcast_last; int fcc_mcast_last; int fcc_remcast_current; struct consensus_list_item consensus_list[PROCESSOR_COUNT_MAX]; int consensus_list_entries; int lowest_active_if; struct srp_addr my_id; struct totem_ip_address my_addrs[INTERFACE_MAX]; struct srp_addr my_proc_list[PROCESSOR_COUNT_MAX]; struct srp_addr my_failed_list[PROCESSOR_COUNT_MAX]; struct srp_addr my_new_memb_list[PROCESSOR_COUNT_MAX]; struct srp_addr my_trans_memb_list[PROCESSOR_COUNT_MAX]; struct srp_addr my_memb_list[PROCESSOR_COUNT_MAX]; struct srp_addr my_deliver_memb_list[PROCESSOR_COUNT_MAX]; struct srp_addr my_left_memb_list[PROCESSOR_COUNT_MAX]; unsigned int my_leave_memb_list[PROCESSOR_COUNT_MAX]; int my_proc_list_entries; int my_failed_list_entries; int my_new_memb_entries; int my_trans_memb_entries; int my_memb_entries; int my_deliver_memb_entries; int my_left_memb_entries; int my_leave_memb_entries; struct memb_ring_id my_ring_id; struct memb_ring_id my_old_ring_id; int my_aru_count; int my_merge_detect_timeout_outstanding; unsigned int my_last_aru; int my_seq_unchanged; int my_received_flg; unsigned int my_high_seq_received; unsigned int my_install_seq; int my_rotation_counter; int my_set_retrans_flg; int my_retrans_flg_count; unsigned int my_high_ring_delivered; int heartbeat_timeout; /* * Queues used to order, deliver, and recover messages */ struct cs_queue new_message_queue; struct cs_queue new_message_queue_trans; struct cs_queue retrans_message_queue; struct sq regular_sort_queue; struct sq recovery_sort_queue; /* * Received up to and including */ unsigned int my_aru; unsigned int my_high_delivered; struct qb_list_head token_callback_received_listhead; struct qb_list_head token_callback_sent_listhead; char orf_token_retransmit[TOKEN_SIZE_MAX]; int orf_token_retransmit_size; unsigned int my_token_seq; /* * Timers */ qb_loop_timer_handle timer_pause_timeout; qb_loop_timer_handle timer_orf_token_timeout; qb_loop_timer_handle timer_orf_token_warning; qb_loop_timer_handle timer_orf_token_retransmit_timeout; qb_loop_timer_handle timer_orf_token_hold_retransmit_timeout; qb_loop_timer_handle timer_merge_detect_timeout; qb_loop_timer_handle memb_timer_state_gather_join_timeout; qb_loop_timer_handle memb_timer_state_gather_consensus_timeout; qb_loop_timer_handle memb_timer_state_commit_timeout; qb_loop_timer_handle timer_heartbeat_timeout; /* * Function and data used to log messages */ int totemsrp_log_level_security; int totemsrp_log_level_error; int totemsrp_log_level_warning; int totemsrp_log_level_notice; int totemsrp_log_level_debug; int totemsrp_log_level_trace; int totemsrp_subsys_id; void (*totemsrp_log_printf) ( int level, int subsys, const char *function, const char *file, int line, const char *format, ...)__attribute__((format(printf, 6, 7)));; enum memb_state memb_state; //TODO struct srp_addr next_memb; qb_loop_t *totemsrp_poll_handle; struct totem_ip_address mcast_address; void (*totemsrp_deliver_fn) ( unsigned int nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required); void (*totemsrp_confchg_fn) ( enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id); void (*totemsrp_service_ready_fn) (void); void (*totemsrp_waiting_trans_ack_cb_fn) ( int waiting_trans_ack); void (*memb_ring_id_create_or_load) ( struct memb_ring_id *memb_ring_id, unsigned int nodeid); void (*memb_ring_id_store) ( const struct memb_ring_id *memb_ring_id, unsigned int nodeid); int global_seqno; int my_token_held; unsigned long long token_ring_id_seq; unsigned int last_released; unsigned int set_aru; int old_ring_state_saved; int old_ring_state_aru; unsigned int old_ring_state_high_seq_received; unsigned int my_last_seq; struct timeval tv_old; void *totemnet_context; struct totem_config *totem_config; unsigned int use_heartbeat; unsigned int my_trc; unsigned int my_pbl; unsigned int my_cbl; uint64_t pause_timestamp; struct memb_commit_token *commit_token; totemsrp_stats_t stats; uint32_t orf_token_discard; uint32_t originated_orf_token; uint32_t threaded_mode_enabled; uint32_t waiting_trans_ack; int flushing; void * token_recv_event_handle; void * token_sent_event_handle; char commit_token_storage[40000]; }; struct message_handlers { int count; int (*handler_functions[6]) ( struct totemsrp_instance *instance, const void *msg, size_t msg_len, int endian_conversion_needed); }; enum gather_state_from { TOTEMSRP_GSFROM_CONSENSUS_TIMEOUT = 0, TOTEMSRP_GSFROM_GATHER_MISSING1 = 1, TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_OPERATIONAL_STATE = 2, TOTEMSRP_GSFROM_THE_CONSENSUS_TIMEOUT_EXPIRED = 3, TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_COMMIT_STATE = 4, TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_RECOVERY_STATE = 5, TOTEMSRP_GSFROM_FAILED_TO_RECEIVE = 6, TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_OPERATIONAL_STATE = 7, TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_GATHER_STATE = 8, TOTEMSRP_GSFROM_MERGE_DURING_OPERATIONAL_STATE = 9, TOTEMSRP_GSFROM_MERGE_DURING_GATHER_STATE = 10, TOTEMSRP_GSFROM_MERGE_DURING_JOIN = 11, TOTEMSRP_GSFROM_JOIN_DURING_OPERATIONAL_STATE = 12, TOTEMSRP_GSFROM_JOIN_DURING_COMMIT_STATE = 13, TOTEMSRP_GSFROM_JOIN_DURING_RECOVERY = 14, TOTEMSRP_GSFROM_INTERFACE_CHANGE = 15, TOTEMSRP_GSFROM_MAX = TOTEMSRP_GSFROM_INTERFACE_CHANGE, }; const char* gather_state_from_desc [] = { [TOTEMSRP_GSFROM_CONSENSUS_TIMEOUT] = "consensus timeout", [TOTEMSRP_GSFROM_GATHER_MISSING1] = "MISSING", [TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_OPERATIONAL_STATE] = "The token was lost in the OPERATIONAL state.", [TOTEMSRP_GSFROM_THE_CONSENSUS_TIMEOUT_EXPIRED] = "The consensus timeout expired.", [TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_COMMIT_STATE] = "The token was lost in the COMMIT state.", [TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_RECOVERY_STATE] = "The token was lost in the RECOVERY state.", [TOTEMSRP_GSFROM_FAILED_TO_RECEIVE] = "failed to receive", [TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_OPERATIONAL_STATE] = "foreign message in operational state", [TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_GATHER_STATE] = "foreign message in gather state", [TOTEMSRP_GSFROM_MERGE_DURING_OPERATIONAL_STATE] = "merge during operational state", [TOTEMSRP_GSFROM_MERGE_DURING_GATHER_STATE] = "merge during gather state", [TOTEMSRP_GSFROM_MERGE_DURING_JOIN] = "merge during join", [TOTEMSRP_GSFROM_JOIN_DURING_OPERATIONAL_STATE] = "join during operational state", [TOTEMSRP_GSFROM_JOIN_DURING_COMMIT_STATE] = "join during commit state", [TOTEMSRP_GSFROM_JOIN_DURING_RECOVERY] = "join during recovery", [TOTEMSRP_GSFROM_INTERFACE_CHANGE] = "interface change", }; /* * forward decls */ static int message_handler_orf_token ( struct totemsrp_instance *instance, const void *msg, size_t msg_len, int endian_conversion_needed); static int message_handler_mcast ( struct totemsrp_instance *instance, const void *msg, size_t msg_len, int endian_conversion_needed); static int message_handler_memb_merge_detect ( struct totemsrp_instance *instance, const void *msg, size_t msg_len, int endian_conversion_needed); static int message_handler_memb_join ( struct totemsrp_instance *instance, const void *msg, size_t msg_len, int endian_conversion_needed); static int message_handler_memb_commit_token ( struct totemsrp_instance *instance, const void *msg, size_t msg_len, int endian_conversion_needed); static int message_handler_token_hold_cancel ( struct totemsrp_instance *instance, const void *msg, size_t msg_len, int endian_conversion_needed); static void totemsrp_instance_initialize (struct totemsrp_instance *instance); static void srp_addr_to_nodeid ( struct totemsrp_instance *instance, unsigned int *nodeid_out, struct srp_addr *srp_addr_in, unsigned int entries); static int srp_addr_equal (const struct srp_addr *a, const struct srp_addr *b); static void memb_leave_message_send (struct totemsrp_instance *instance); static void token_callbacks_execute (struct totemsrp_instance *instance, enum totem_callback_token_type type); static void memb_state_gather_enter (struct totemsrp_instance *instance, enum gather_state_from gather_from); static void messages_deliver_to_app (struct totemsrp_instance *instance, int skip, unsigned int end_point); static int orf_token_mcast (struct totemsrp_instance *instance, struct orf_token *oken, int fcc_mcasts_allowed); static void messages_free (struct totemsrp_instance *instance, unsigned int token_aru); static void memb_ring_id_set (struct totemsrp_instance *instance, const struct memb_ring_id *ring_id); static void target_set_completed (void *context); static void memb_state_commit_token_update (struct totemsrp_instance *instance); static void memb_state_commit_token_target_set (struct totemsrp_instance *instance); static int memb_state_commit_token_send (struct totemsrp_instance *instance); static int memb_state_commit_token_send_recovery (struct totemsrp_instance *instance, struct memb_commit_token *memb_commit_token); static void memb_state_commit_token_create (struct totemsrp_instance *instance); static int token_hold_cancel_send (struct totemsrp_instance *instance); static void orf_token_endian_convert (const struct orf_token *in, struct orf_token *out); static void memb_commit_token_endian_convert (const struct memb_commit_token *in, struct memb_commit_token *out); static void memb_join_endian_convert (const struct memb_join *in, struct memb_join *out); static void mcast_endian_convert (const struct mcast *in, struct mcast *out); static void memb_merge_detect_endian_convert ( const struct memb_merge_detect *in, struct memb_merge_detect *out); static struct srp_addr srp_addr_endian_convert (struct srp_addr in); static void timer_function_orf_token_timeout (void *data); static void timer_function_orf_token_warning (void *data); static void timer_function_pause_timeout (void *data); static void timer_function_heartbeat_timeout (void *data); static void timer_function_token_retransmit_timeout (void *data); static void timer_function_token_hold_retransmit_timeout (void *data); static void timer_function_merge_detect_timeout (void *data); static void *totemsrp_buffer_alloc (struct totemsrp_instance *instance); static void totemsrp_buffer_release (struct totemsrp_instance *instance, void *ptr); static const char* gsfrom_to_msg(enum gather_state_from gsfrom); void main_deliver_fn ( void *context, const void *msg, unsigned int msg_len, const struct sockaddr_storage *system_from); void main_iface_change_fn ( void *context, const struct totem_ip_address *iface_address, unsigned int iface_no); struct message_handlers totemsrp_message_handlers = { 6, { message_handler_orf_token, /* MESSAGE_TYPE_ORF_TOKEN */ message_handler_mcast, /* MESSAGE_TYPE_MCAST */ message_handler_memb_merge_detect, /* MESSAGE_TYPE_MEMB_MERGE_DETECT */ message_handler_memb_join, /* MESSAGE_TYPE_MEMB_JOIN */ message_handler_memb_commit_token, /* MESSAGE_TYPE_MEMB_COMMIT_TOKEN */ message_handler_token_hold_cancel /* MESSAGE_TYPE_TOKEN_HOLD_CANCEL */ } }; #define log_printf(level, format, args...) \ do { \ instance->totemsrp_log_printf ( \ level, instance->totemsrp_subsys_id, \ __FUNCTION__, __FILE__, __LINE__, \ format, ##args); \ } while (0); #define LOGSYS_PERROR(err_num, level, fmt, args...) \ do { \ char _error_str[LOGSYS_MAX_PERROR_MSG_LEN]; \ const char *_error_ptr = qb_strerror_r(err_num, _error_str, sizeof(_error_str)); \ instance->totemsrp_log_printf ( \ level, instance->totemsrp_subsys_id, \ __FUNCTION__, __FILE__, __LINE__, \ fmt ": %s (%d)\n", ##args, _error_ptr, err_num); \ } while(0) static const char* gsfrom_to_msg(enum gather_state_from gsfrom) { if (gsfrom <= TOTEMSRP_GSFROM_MAX) { return gather_state_from_desc[gsfrom]; } else { return "UNKNOWN"; } } static void totemsrp_instance_initialize (struct totemsrp_instance *instance) { memset (instance, 0, sizeof (struct totemsrp_instance)); qb_list_init (&instance->token_callback_received_listhead); qb_list_init (&instance->token_callback_sent_listhead); instance->my_received_flg = 1; instance->my_token_seq = SEQNO_START_TOKEN - 1; instance->memb_state = MEMB_STATE_OPERATIONAL; instance->set_aru = -1; instance->my_aru = SEQNO_START_MSG; instance->my_high_seq_received = SEQNO_START_MSG; instance->my_high_delivered = SEQNO_START_MSG; instance->orf_token_discard = 0; instance->originated_orf_token = 0; instance->commit_token = (struct memb_commit_token *)instance->commit_token_storage; instance->waiting_trans_ack = 1; } static int pause_flush (struct totemsrp_instance *instance) { uint64_t now_msec; uint64_t timestamp_msec; int res = 0; now_msec = (qb_util_nano_current_get () / QB_TIME_NS_IN_MSEC); timestamp_msec = instance->pause_timestamp / QB_TIME_NS_IN_MSEC; if ((now_msec - timestamp_msec) > (instance->totem_config->token_timeout / 2)) { log_printf (instance->totemsrp_log_level_notice, "Process pause detected for %d ms, flushing membership messages.", (unsigned int)(now_msec - timestamp_msec)); /* * -1 indicates an error from recvmsg */ do { res = totemnet_recv_mcast_empty (instance->totemnet_context); } while (res == -1); } return (res); } static int token_event_stats_collector (enum totem_callback_token_type type, const void *void_instance) { struct totemsrp_instance *instance = (struct totemsrp_instance *)void_instance; uint32_t time_now; unsigned long long nano_secs = qb_util_nano_current_get (); time_now = (nano_secs / QB_TIME_NS_IN_MSEC); if (type == TOTEM_CALLBACK_TOKEN_RECEIVED) { /* incr latest token the index */ if (instance->stats.latest_token == (TOTEM_TOKEN_STATS_MAX - 1)) instance->stats.latest_token = 0; else instance->stats.latest_token++; if (instance->stats.earliest_token == instance->stats.latest_token) { /* we have filled up the array, start overwriting */ if (instance->stats.earliest_token == (TOTEM_TOKEN_STATS_MAX - 1)) instance->stats.earliest_token = 0; else instance->stats.earliest_token++; instance->stats.token[instance->stats.earliest_token].rx = 0; instance->stats.token[instance->stats.earliest_token].tx = 0; instance->stats.token[instance->stats.earliest_token].backlog_calc = 0; } instance->stats.token[instance->stats.latest_token].rx = time_now; instance->stats.token[instance->stats.latest_token].tx = 0; /* in case we drop the token */ } else { instance->stats.token[instance->stats.latest_token].tx = time_now; } return 0; } static void totempg_mtu_changed(void *context, int net_mtu) { struct totemsrp_instance *instance = context; instance->totem_config->net_mtu = net_mtu - 2 * sizeof (struct mcast); log_printf (instance->totemsrp_log_level_debug, "Net MTU changed to %d, new value is %d", net_mtu, instance->totem_config->net_mtu); } /* * Exported interfaces */ int totemsrp_initialize ( qb_loop_t *poll_handle, void **srp_context, struct totem_config *totem_config, totempg_stats_t *stats, void (*deliver_fn) ( unsigned int nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required), void (*confchg_fn) ( enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id), void (*waiting_trans_ack_cb_fn) ( int waiting_trans_ack)) { struct totemsrp_instance *instance; int res; instance = malloc (sizeof (struct totemsrp_instance)); if (instance == NULL) { goto error_exit; } totemsrp_instance_initialize (instance); instance->totemsrp_waiting_trans_ack_cb_fn = waiting_trans_ack_cb_fn; instance->totemsrp_waiting_trans_ack_cb_fn (1); stats->srp = &instance->stats; instance->stats.latest_token = 0; instance->stats.earliest_token = 0; instance->totem_config = totem_config; /* * Configure logging */ instance->totemsrp_log_level_security = totem_config->totem_logging_configuration.log_level_security; instance->totemsrp_log_level_error = totem_config->totem_logging_configuration.log_level_error; instance->totemsrp_log_level_warning = totem_config->totem_logging_configuration.log_level_warning; instance->totemsrp_log_level_notice = totem_config->totem_logging_configuration.log_level_notice; instance->totemsrp_log_level_debug = totem_config->totem_logging_configuration.log_level_debug; instance->totemsrp_log_level_trace = totem_config->totem_logging_configuration.log_level_trace; instance->totemsrp_subsys_id = totem_config->totem_logging_configuration.log_subsys_id; instance->totemsrp_log_printf = totem_config->totem_logging_configuration.log_printf; /* * Configure totem store and load functions */ instance->memb_ring_id_create_or_load = totem_config->totem_memb_ring_id_create_or_load; instance->memb_ring_id_store = totem_config->totem_memb_ring_id_store; /* * Initialize local variables for totemsrp */ totemip_copy (&instance->mcast_address, &totem_config->interfaces[instance->lowest_active_if].mcast_addr); /* * Display totem configuration */ log_printf (instance->totemsrp_log_level_debug, "Token Timeout (%d ms) retransmit timeout (%d ms)", totem_config->token_timeout, totem_config->token_retransmit_timeout); if (totem_config->token_warning) { uint32_t token_warning_ms = totem_config->token_warning * totem_config->token_timeout / 100; log_printf(instance->totemsrp_log_level_debug, "Token warning every %d ms (%d%% of Token Timeout)", token_warning_ms, totem_config->token_warning); if (token_warning_ms < totem_config->token_retransmit_timeout) log_printf (LOGSYS_LEVEL_DEBUG, "The token warning interval (%d ms) is less than the token retransmit timeout (%d ms) " "which can lead to spurious token warnings. Consider increasing the token_warning parameter.", token_warning_ms, totem_config->token_retransmit_timeout); } else { log_printf(instance->totemsrp_log_level_debug, "Token warnings disabled"); } log_printf (instance->totemsrp_log_level_debug, "token hold (%d ms) retransmits before loss (%d retrans)", totem_config->token_hold_timeout, totem_config->token_retransmits_before_loss_const); log_printf (instance->totemsrp_log_level_debug, "join (%d ms) send_join (%d ms) consensus (%d ms) merge (%d ms)", totem_config->join_timeout, totem_config->send_join_timeout, totem_config->consensus_timeout, totem_config->merge_timeout); log_printf (instance->totemsrp_log_level_debug, "downcheck (%d ms) fail to recv const (%d msgs)", totem_config->downcheck_timeout, totem_config->fail_to_recv_const); log_printf (instance->totemsrp_log_level_debug, "seqno unchanged const (%d rotations) Maximum network MTU %d", totem_config->seqno_unchanged_const, totem_config->net_mtu); log_printf (instance->totemsrp_log_level_debug, "window size per rotation (%d messages) maximum messages per rotation (%d messages)", totem_config->window_size, totem_config->max_messages); log_printf (instance->totemsrp_log_level_debug, "missed count const (%d messages)", totem_config->miss_count_const); log_printf (instance->totemsrp_log_level_debug, "send threads (%d threads)", totem_config->threads); log_printf (instance->totemsrp_log_level_debug, "heartbeat_failures_allowed (%d)", totem_config->heartbeat_failures_allowed); log_printf (instance->totemsrp_log_level_debug, "max_network_delay (%d ms)", totem_config->max_network_delay); cs_queue_init (&instance->retrans_message_queue, RETRANS_MESSAGE_QUEUE_SIZE_MAX, sizeof (struct message_item), instance->threaded_mode_enabled); sq_init (&instance->regular_sort_queue, QUEUE_RTR_ITEMS_SIZE_MAX, sizeof (struct sort_queue_item), 0); sq_init (&instance->recovery_sort_queue, QUEUE_RTR_ITEMS_SIZE_MAX, sizeof (struct sort_queue_item), 0); instance->totemsrp_poll_handle = poll_handle; instance->totemsrp_deliver_fn = deliver_fn; instance->totemsrp_confchg_fn = confchg_fn; instance->use_heartbeat = 1; timer_function_pause_timeout (instance); if ( totem_config->heartbeat_failures_allowed == 0 ) { log_printf (instance->totemsrp_log_level_debug, "HeartBeat is Disabled. To enable set heartbeat_failures_allowed > 0"); instance->use_heartbeat = 0; } if (instance->use_heartbeat) { instance->heartbeat_timeout = (totem_config->heartbeat_failures_allowed) * totem_config->token_retransmit_timeout + totem_config->max_network_delay; if (instance->heartbeat_timeout >= totem_config->token_timeout) { log_printf (instance->totemsrp_log_level_debug, "total heartbeat_timeout (%d ms) is not less than token timeout (%d ms)", instance->heartbeat_timeout, totem_config->token_timeout); log_printf (instance->totemsrp_log_level_debug, "heartbeat_timeout = heartbeat_failures_allowed * token_retransmit_timeout + max_network_delay"); log_printf (instance->totemsrp_log_level_debug, "heartbeat timeout should be less than the token timeout. Heartbeat is disabled!!"); instance->use_heartbeat = 0; } else { log_printf (instance->totemsrp_log_level_debug, "total heartbeat_timeout (%d ms)", instance->heartbeat_timeout); } } res = totemnet_initialize ( poll_handle, &instance->totemnet_context, totem_config, stats->srp, instance, main_deliver_fn, main_iface_change_fn, totempg_mtu_changed, target_set_completed); if (res == -1) { goto error_exit; } instance->my_id.nodeid = instance->totem_config->interfaces[instance->lowest_active_if].boundto.nodeid; /* * Must have net_mtu adjusted by totemnet_initialize first */ cs_queue_init (&instance->new_message_queue, MESSAGE_QUEUE_MAX, sizeof (struct message_item), instance->threaded_mode_enabled); cs_queue_init (&instance->new_message_queue_trans, MESSAGE_QUEUE_MAX, sizeof (struct message_item), instance->threaded_mode_enabled); totemsrp_callback_token_create (instance, &instance->token_recv_event_handle, TOTEM_CALLBACK_TOKEN_RECEIVED, 0, token_event_stats_collector, instance); totemsrp_callback_token_create (instance, &instance->token_sent_event_handle, TOTEM_CALLBACK_TOKEN_SENT, 0, token_event_stats_collector, instance); *srp_context = instance; return (0); error_exit: return (-1); } void totemsrp_finalize ( void *srp_context) { struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context; memb_leave_message_send (instance); totemnet_finalize (instance->totemnet_context); cs_queue_free (&instance->new_message_queue); cs_queue_free (&instance->new_message_queue_trans); cs_queue_free (&instance->retrans_message_queue); sq_free (&instance->regular_sort_queue); sq_free (&instance->recovery_sort_queue); free (instance); } int totemsrp_nodestatus_get ( void *srp_context, unsigned int nodeid, struct totem_node_status *node_status) { struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context; int i; node_status->version = TOTEM_NODE_STATUS_STRUCTURE_VERSION; /* Fill in 'reachable' here as the lower level UDP[u] layers don't know */ for (i = 0; i < instance->my_proc_list_entries; i++) { if (instance->my_proc_list[i].nodeid == nodeid) { node_status->reachable = 1; } } return totemnet_nodestatus_get(instance->totemnet_context, nodeid, node_status); } /* * Return configured interfaces. interfaces is array of totem_ip addresses allocated by caller, * with interaces_size number of items. iface_count is final number of interfaces filled by this * function. * * Function returns 0 on success, otherwise if interfaces array is not big enough, -2 is returned, * and if interface was not found, -1 is returned. */ int totemsrp_ifaces_get ( void *srp_context, unsigned int nodeid, unsigned int *interface_id, struct totem_ip_address *interfaces, unsigned int interfaces_size, char ***status, unsigned int *iface_count) { struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context; struct totem_ip_address *iface_ptr = interfaces; int res = 0; int i,n; int num_ifs = 0; memset(interfaces, 0, sizeof(struct totem_ip_address) * interfaces_size); *iface_count = INTERFACE_MAX; for (i=0; itotem_config->interfaces[i].member_count; n++) { if (instance->totem_config->interfaces[i].configured && instance->totem_config->interfaces[i].member_list[n].nodeid == nodeid) { memcpy(iface_ptr, &instance->totem_config->interfaces[i].member_list[n], sizeof(struct totem_ip_address)); interface_id[num_ifs] = i; iface_ptr++; if (++num_ifs > interfaces_size) { res = -2; break; } } } } totemnet_ifaces_get(instance->totemnet_context, status, iface_count); *iface_count = num_ifs; return (res); } int totemsrp_crypto_set ( void *srp_context, const char *cipher_type, const char *hash_type) { struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context; int res; res = totemnet_crypto_set(instance->totemnet_context, cipher_type, hash_type); return (res); } unsigned int totemsrp_my_nodeid_get ( void *srp_context) { struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context; unsigned int res; res = instance->my_id.nodeid; return (res); } int totemsrp_my_family_get ( void *srp_context) { struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context; int res; res = instance->totem_config->interfaces[instance->lowest_active_if].boundto.family; return (res); } /* * Set operations for use by the membership algorithm */ static int srp_addr_equal (const struct srp_addr *a, const struct srp_addr *b) { if (a->nodeid == b->nodeid) { return 1; } return 0; } static void srp_addr_to_nodeid ( struct totemsrp_instance *instance, unsigned int *nodeid_out, struct srp_addr *srp_addr_in, unsigned int entries) { unsigned int i; for (i = 0; i < entries; i++) { nodeid_out[i] = srp_addr_in[i].nodeid; } } static struct srp_addr srp_addr_endian_convert (struct srp_addr in) { struct srp_addr res; res.nodeid = swab32 (in.nodeid); return (res); } static void memb_consensus_reset (struct totemsrp_instance *instance) { instance->consensus_list_entries = 0; } static void memb_set_subtract ( struct srp_addr *out_list, int *out_list_entries, struct srp_addr *one_list, int one_list_entries, struct srp_addr *two_list, int two_list_entries) { int found = 0; int i; int j; *out_list_entries = 0; for (i = 0; i < one_list_entries; i++) { for (j = 0; j < two_list_entries; j++) { if (srp_addr_equal (&one_list[i], &two_list[j])) { found = 1; break; } } if (found == 0) { out_list[*out_list_entries] = one_list[i]; *out_list_entries = *out_list_entries + 1; } found = 0; } } /* * Set consensus for a specific processor */ static void memb_consensus_set ( struct totemsrp_instance *instance, const struct srp_addr *addr) { int found = 0; int i; for (i = 0; i < instance->consensus_list_entries; i++) { if (srp_addr_equal(addr, &instance->consensus_list[i].addr)) { found = 1; break; /* found entry */ } } instance->consensus_list[i].addr = *addr; instance->consensus_list[i].set = 1; if (found == 0) { instance->consensus_list_entries++; } return; } /* * Is consensus set for a specific processor */ static int memb_consensus_isset ( struct totemsrp_instance *instance, const struct srp_addr *addr) { int i; for (i = 0; i < instance->consensus_list_entries; i++) { if (srp_addr_equal (addr, &instance->consensus_list[i].addr)) { return (instance->consensus_list[i].set); } } return (0); } /* * Is consensus agreed upon based upon consensus database */ static int memb_consensus_agreed ( struct totemsrp_instance *instance) { struct srp_addr token_memb[PROCESSOR_COUNT_MAX]; int token_memb_entries = 0; int agreed = 1; int i; memb_set_subtract (token_memb, &token_memb_entries, instance->my_proc_list, instance->my_proc_list_entries, instance->my_failed_list, instance->my_failed_list_entries); for (i = 0; i < token_memb_entries; i++) { if (memb_consensus_isset (instance, &token_memb[i]) == 0) { agreed = 0; break; } } if (agreed && instance->failed_to_recv == 1) { /* * Both nodes agreed on our failure. We don't care how many proc list items left because we * will create single ring anyway. */ return (agreed); } assert (token_memb_entries >= 1); return (agreed); } static void memb_consensus_notset ( struct totemsrp_instance *instance, struct srp_addr *no_consensus_list, int *no_consensus_list_entries, struct srp_addr *comparison_list, int comparison_list_entries) { int i; *no_consensus_list_entries = 0; for (i = 0; i < instance->my_proc_list_entries; i++) { if (memb_consensus_isset (instance, &instance->my_proc_list[i]) == 0) { no_consensus_list[*no_consensus_list_entries] = instance->my_proc_list[i]; *no_consensus_list_entries = *no_consensus_list_entries + 1; } } } /* * Is set1 equal to set2 Entries can be in different orders */ static int memb_set_equal ( struct srp_addr *set1, int set1_entries, struct srp_addr *set2, int set2_entries) { int i; int j; int found = 0; if (set1_entries != set2_entries) { return (0); } for (i = 0; i < set2_entries; i++) { for (j = 0; j < set1_entries; j++) { if (srp_addr_equal (&set1[j], &set2[i])) { found = 1; break; } } if (found == 0) { return (0); } found = 0; } return (1); } /* * Is subset fully contained in fullset */ static int memb_set_subset ( const struct srp_addr *subset, int subset_entries, const struct srp_addr *fullset, int fullset_entries) { int i; int j; int found = 0; if (subset_entries > fullset_entries) { return (0); } for (i = 0; i < subset_entries; i++) { for (j = 0; j < fullset_entries; j++) { if (srp_addr_equal (&subset[i], &fullset[j])) { found = 1; } } if (found == 0) { return (0); } found = 0; } return (1); } /* * merge subset into fullset taking care not to add duplicates */ static void memb_set_merge ( const struct srp_addr *subset, int subset_entries, struct srp_addr *fullset, int *fullset_entries) { int found = 0; int i; int j; for (i = 0; i < subset_entries; i++) { for (j = 0; j < *fullset_entries; j++) { if (srp_addr_equal (&fullset[j], &subset[i])) { found = 1; break; } } if (found == 0) { fullset[*fullset_entries] = subset[i]; *fullset_entries = *fullset_entries + 1; } found = 0; } return; } static void memb_set_and_with_ring_id ( struct srp_addr *set1, struct memb_ring_id *set1_ring_ids, int set1_entries, struct srp_addr *set2, int set2_entries, struct memb_ring_id *old_ring_id, struct srp_addr *and, int *and_entries) { int i; int j; int found = 0; *and_entries = 0; for (i = 0; i < set2_entries; i++) { for (j = 0; j < set1_entries; j++) { if (srp_addr_equal (&set1[j], &set2[i])) { if (memcmp (&set1_ring_ids[j], old_ring_id, sizeof (struct memb_ring_id)) == 0) { found = 1; } break; } } if (found) { and[*and_entries] = set1[j]; *and_entries = *and_entries + 1; } found = 0; } return; } static void memb_set_log( struct totemsrp_instance *instance, int level, const char *string, struct srp_addr *list, int list_entries) { char int_buf[32]; char list_str[512]; int i; memset(list_str, 0, sizeof(list_str)); for (i = 0; i < list_entries; i++) { if (i == 0) { snprintf(int_buf, sizeof(int_buf), CS_PRI_NODE_ID, list[i].nodeid); } else { snprintf(int_buf, sizeof(int_buf), "," CS_PRI_NODE_ID, list[i].nodeid); } if (strlen(list_str) + strlen(int_buf) >= sizeof(list_str)) { break ; } strcat(list_str, int_buf); } log_printf(level, "List '%s' contains %d entries: %s", string, list_entries, list_str); } static void my_leave_memb_clear( struct totemsrp_instance *instance) { memset(instance->my_leave_memb_list, 0, sizeof(instance->my_leave_memb_list)); instance->my_leave_memb_entries = 0; } static unsigned int my_leave_memb_match( struct totemsrp_instance *instance, unsigned int nodeid) { int i; unsigned int ret = 0; for (i = 0; i < instance->my_leave_memb_entries; i++){ if (instance->my_leave_memb_list[i] == nodeid){ ret = nodeid; break; } } return ret; } static void my_leave_memb_set( struct totemsrp_instance *instance, unsigned int nodeid) { int i, found = 0; for (i = 0; i < instance->my_leave_memb_entries; i++){ if (instance->my_leave_memb_list[i] == nodeid){ found = 1; break; } } if (found == 1) { return; } if (instance->my_leave_memb_entries < (PROCESSOR_COUNT_MAX - 1)) { instance->my_leave_memb_list[instance->my_leave_memb_entries] = nodeid; instance->my_leave_memb_entries++; } else { log_printf (instance->totemsrp_log_level_warning, "Cannot set LEAVE nodeid=" CS_PRI_NODE_ID, nodeid); } } static void *totemsrp_buffer_alloc (struct totemsrp_instance *instance) { assert (instance != NULL); return totemnet_buffer_alloc (instance->totemnet_context); } static void totemsrp_buffer_release (struct totemsrp_instance *instance, void *ptr) { assert (instance != NULL); totemnet_buffer_release (instance->totemnet_context, ptr); } static void reset_token_retransmit_timeout (struct totemsrp_instance *instance) { int32_t res; qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_orf_token_retransmit_timeout); res = qb_loop_timer_add (instance->totemsrp_poll_handle, QB_LOOP_MED, instance->totem_config->token_retransmit_timeout*QB_TIME_NS_IN_MSEC, (void *)instance, timer_function_token_retransmit_timeout, &instance->timer_orf_token_retransmit_timeout); if (res != 0) { log_printf(instance->totemsrp_log_level_error, "reset_token_retransmit_timeout - qb_loop_timer_add error : %d", res); } } static void start_merge_detect_timeout (struct totemsrp_instance *instance) { int32_t res; if (instance->my_merge_detect_timeout_outstanding == 0) { res = qb_loop_timer_add (instance->totemsrp_poll_handle, QB_LOOP_MED, instance->totem_config->merge_timeout*QB_TIME_NS_IN_MSEC, (void *)instance, timer_function_merge_detect_timeout, &instance->timer_merge_detect_timeout); if (res != 0) { log_printf(instance->totemsrp_log_level_error, "start_merge_detect_timeout - qb_loop_timer_add error : %d", res); } instance->my_merge_detect_timeout_outstanding = 1; } } static void cancel_merge_detect_timeout (struct totemsrp_instance *instance) { qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_merge_detect_timeout); instance->my_merge_detect_timeout_outstanding = 0; } /* * ring_state_* is used to save and restore the sort queue * state when a recovery operation fails (and enters gather) */ static void old_ring_state_save (struct totemsrp_instance *instance) { if (instance->old_ring_state_saved == 0) { instance->old_ring_state_saved = 1; memcpy (&instance->my_old_ring_id, &instance->my_ring_id, sizeof (struct memb_ring_id)); instance->old_ring_state_aru = instance->my_aru; instance->old_ring_state_high_seq_received = instance->my_high_seq_received; log_printf (instance->totemsrp_log_level_debug, "Saving state aru %x high seq received %x", instance->my_aru, instance->my_high_seq_received); } } static void old_ring_state_restore (struct totemsrp_instance *instance) { instance->my_aru = instance->old_ring_state_aru; instance->my_high_seq_received = instance->old_ring_state_high_seq_received; log_printf (instance->totemsrp_log_level_debug, "Restoring instance->my_aru %x my high seq received %x", instance->my_aru, instance->my_high_seq_received); } static void old_ring_state_reset (struct totemsrp_instance *instance) { log_printf (instance->totemsrp_log_level_debug, "Resetting old ring state"); instance->old_ring_state_saved = 0; } static void reset_pause_timeout (struct totemsrp_instance *instance) { int32_t res; qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_pause_timeout); res = qb_loop_timer_add (instance->totemsrp_poll_handle, QB_LOOP_MED, instance->totem_config->token_timeout * QB_TIME_NS_IN_MSEC / 5, (void *)instance, timer_function_pause_timeout, &instance->timer_pause_timeout); if (res != 0) { log_printf(instance->totemsrp_log_level_error, "reset_pause_timeout - qb_loop_timer_add error : %d", res); } } static void reset_token_warning (struct totemsrp_instance *instance) { int32_t res; qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_orf_token_warning); res = qb_loop_timer_add (instance->totemsrp_poll_handle, QB_LOOP_MED, instance->totem_config->token_warning * instance->totem_config->token_timeout / 100 * QB_TIME_NS_IN_MSEC, (void *)instance, timer_function_orf_token_warning, &instance->timer_orf_token_warning); if (res != 0) { log_printf(instance->totemsrp_log_level_error, "reset_token_warning - qb_loop_timer_add error : %d", res); } } static void reset_token_timeout (struct totemsrp_instance *instance) { int32_t res; qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_orf_token_timeout); res = qb_loop_timer_add (instance->totemsrp_poll_handle, QB_LOOP_MED, instance->totem_config->token_timeout*QB_TIME_NS_IN_MSEC, (void *)instance, timer_function_orf_token_timeout, &instance->timer_orf_token_timeout); if (res != 0) { log_printf(instance->totemsrp_log_level_error, "reset_token_timeout - qb_loop_timer_add error : %d", res); } if (instance->totem_config->token_warning) reset_token_warning(instance); } static void reset_heartbeat_timeout (struct totemsrp_instance *instance) { int32_t res; qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_heartbeat_timeout); res = qb_loop_timer_add (instance->totemsrp_poll_handle, QB_LOOP_MED, instance->heartbeat_timeout*QB_TIME_NS_IN_MSEC, (void *)instance, timer_function_heartbeat_timeout, &instance->timer_heartbeat_timeout); if (res != 0) { log_printf(instance->totemsrp_log_level_error, "reset_heartbeat_timeout - qb_loop_timer_add error : %d", res); } } static void cancel_token_warning (struct totemsrp_instance *instance) { qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_orf_token_warning); } static void cancel_token_timeout (struct totemsrp_instance *instance) { qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_orf_token_timeout); if (instance->totem_config->token_warning) cancel_token_warning(instance); } static void cancel_heartbeat_timeout (struct totemsrp_instance *instance) { qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_heartbeat_timeout); } static void cancel_token_retransmit_timeout (struct totemsrp_instance *instance) { qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_orf_token_retransmit_timeout); } static void start_token_hold_retransmit_timeout (struct totemsrp_instance *instance) { int32_t res; res = qb_loop_timer_add (instance->totemsrp_poll_handle, QB_LOOP_MED, instance->totem_config->token_hold_timeout*QB_TIME_NS_IN_MSEC, (void *)instance, timer_function_token_hold_retransmit_timeout, &instance->timer_orf_token_hold_retransmit_timeout); if (res != 0) { log_printf(instance->totemsrp_log_level_error, "start_token_hold_retransmit_timeout - qb_loop_timer_add error : %d", res); } } static void cancel_token_hold_retransmit_timeout (struct totemsrp_instance *instance) { qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_orf_token_hold_retransmit_timeout); } static void memb_state_consensus_timeout_expired ( struct totemsrp_instance *instance) { struct srp_addr no_consensus_list[PROCESSOR_COUNT_MAX]; int no_consensus_list_entries; instance->stats.consensus_timeouts++; if (memb_consensus_agreed (instance)) { memb_consensus_reset (instance); memb_consensus_set (instance, &instance->my_id); reset_token_timeout (instance); // REVIEWED } else { memb_consensus_notset ( instance, no_consensus_list, &no_consensus_list_entries, instance->my_proc_list, instance->my_proc_list_entries); memb_set_merge (no_consensus_list, no_consensus_list_entries, instance->my_failed_list, &instance->my_failed_list_entries); memb_state_gather_enter (instance, TOTEMSRP_GSFROM_CONSENSUS_TIMEOUT); } } static void memb_join_message_send (struct totemsrp_instance *instance); static void memb_merge_detect_transmit (struct totemsrp_instance *instance); /* * Timers used for various states of the membership algorithm */ static void timer_function_pause_timeout (void *data) { struct totemsrp_instance *instance = data; instance->pause_timestamp = qb_util_nano_current_get (); reset_pause_timeout (instance); } static void memb_recovery_state_token_loss (struct totemsrp_instance *instance) { old_ring_state_restore (instance); memb_state_gather_enter (instance, TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_RECOVERY_STATE); instance->stats.recovery_token_lost++; } static void timer_function_orf_token_warning (void *data) { struct totemsrp_instance *instance = data; uint64_t tv_diff; /* need to protect against the case where token_warning is set to 0 dynamically */ if (instance->totem_config->token_warning) { tv_diff = qb_util_nano_current_get () / QB_TIME_NS_IN_MSEC - instance->stats.token[instance->stats.latest_token].rx; log_printf (instance->totemsrp_log_level_notice, "Token has not been received in %d ms ", (unsigned int) tv_diff); reset_token_warning(instance); } else { cancel_token_warning(instance); } } static void timer_function_orf_token_timeout (void *data) { struct totemsrp_instance *instance = data; switch (instance->memb_state) { case MEMB_STATE_OPERATIONAL: log_printf (instance->totemsrp_log_level_debug, "The token was lost in the OPERATIONAL state."); log_printf (instance->totemsrp_log_level_notice, "A processor failed, forming new configuration:" " token timed out (%ums), waiting %ums for consensus.", instance->totem_config->token_timeout, instance->totem_config->consensus_timeout); totemnet_iface_check (instance->totemnet_context); memb_state_gather_enter (instance, TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_OPERATIONAL_STATE); instance->stats.operational_token_lost++; break; case MEMB_STATE_GATHER: log_printf (instance->totemsrp_log_level_debug, "The consensus timeout expired (%ums).", instance->totem_config->consensus_timeout); memb_state_consensus_timeout_expired (instance); memb_state_gather_enter (instance, TOTEMSRP_GSFROM_THE_CONSENSUS_TIMEOUT_EXPIRED); instance->stats.gather_token_lost++; break; case MEMB_STATE_COMMIT: log_printf (instance->totemsrp_log_level_debug, "The token was lost in the COMMIT state."); memb_state_gather_enter (instance, TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_COMMIT_STATE); instance->stats.commit_token_lost++; break; case MEMB_STATE_RECOVERY: log_printf (instance->totemsrp_log_level_debug, "The token was lost in the RECOVERY state."); memb_recovery_state_token_loss (instance); instance->orf_token_discard = 1; break; } } static void timer_function_heartbeat_timeout (void *data) { struct totemsrp_instance *instance = data; log_printf (instance->totemsrp_log_level_debug, "HeartBeat Timer expired Invoking token loss mechanism in state %d ", instance->memb_state); timer_function_orf_token_timeout(data); } static void memb_timer_function_state_gather (void *data) { struct totemsrp_instance *instance = data; int32_t res; switch (instance->memb_state) { case MEMB_STATE_OPERATIONAL: case MEMB_STATE_RECOVERY: assert (0); /* this should never happen */ break; case MEMB_STATE_GATHER: case MEMB_STATE_COMMIT: memb_join_message_send (instance); /* * Restart the join timeout `*/ qb_loop_timer_del (instance->totemsrp_poll_handle, instance->memb_timer_state_gather_join_timeout); res = qb_loop_timer_add (instance->totemsrp_poll_handle, QB_LOOP_MED, instance->totem_config->join_timeout*QB_TIME_NS_IN_MSEC, (void *)instance, memb_timer_function_state_gather, &instance->memb_timer_state_gather_join_timeout); if (res != 0) { log_printf(instance->totemsrp_log_level_error, "memb_timer_function_state_gather - qb_loop_timer_add error : %d", res); } break; } } static void memb_timer_function_gather_consensus_timeout (void *data) { struct totemsrp_instance *instance = data; memb_state_consensus_timeout_expired (instance); } static void deliver_messages_from_recovery_to_regular (struct totemsrp_instance *instance) { unsigned int i; struct sort_queue_item *recovery_message_item; struct sort_queue_item regular_message_item; unsigned int range = 0; int res; void *ptr; struct mcast *mcast; log_printf (instance->totemsrp_log_level_debug, "recovery to regular %x-%x", SEQNO_START_MSG + 1, instance->my_aru); range = instance->my_aru - SEQNO_START_MSG; /* * Move messages from recovery to regular sort queue */ // todo should i be initialized to 0 or 1 ? for (i = 1; i <= range; i++) { res = sq_item_get (&instance->recovery_sort_queue, i + SEQNO_START_MSG, &ptr); if (res != 0) { continue; } recovery_message_item = ptr; /* * Convert recovery message into regular message */ mcast = recovery_message_item->mcast; if (mcast->header.encapsulated == MESSAGE_ENCAPSULATED) { /* * Message is a recovery message encapsulated * in a new ring message */ regular_message_item.mcast = (struct mcast *)(((char *)recovery_message_item->mcast) + sizeof (struct mcast)); regular_message_item.msg_len = recovery_message_item->msg_len - sizeof (struct mcast); mcast = regular_message_item.mcast; } else { /* * TODO this case shouldn't happen */ continue; } log_printf (instance->totemsrp_log_level_debug, "comparing if ring id is for this processors old ring seqno " CS_PRI_RING_ID_SEQ, (uint64_t)mcast->seq); /* * Only add this message to the regular sort * queue if it was originated with the same ring * id as the previous ring */ if (memcmp (&instance->my_old_ring_id, &mcast->ring_id, sizeof (struct memb_ring_id)) == 0) { res = sq_item_inuse (&instance->regular_sort_queue, mcast->seq); if (res == 0) { sq_item_add (&instance->regular_sort_queue, ®ular_message_item, mcast->seq); if (sq_lt_compare (instance->old_ring_state_high_seq_received, mcast->seq)) { instance->old_ring_state_high_seq_received = mcast->seq; } } } else { log_printf (instance->totemsrp_log_level_debug, "-not adding msg with seq no " CS_PRI_RING_ID_SEQ, (uint64_t)mcast->seq); } } } /* * Change states in the state machine of the membership algorithm */ static void memb_state_operational_enter (struct totemsrp_instance *instance) { struct srp_addr joined_list[PROCESSOR_COUNT_MAX]; int joined_list_entries = 0; unsigned int aru_save; unsigned int joined_list_totemip[PROCESSOR_COUNT_MAX]; unsigned int trans_memb_list_totemip[PROCESSOR_COUNT_MAX]; unsigned int new_memb_list_totemip[PROCESSOR_COUNT_MAX]; unsigned int left_list[PROCESSOR_COUNT_MAX]; unsigned int i; unsigned int res; char left_node_msg[1024]; char joined_node_msg[1024]; char failed_node_msg[1024]; instance->originated_orf_token = 0; memb_consensus_reset (instance); old_ring_state_reset (instance); deliver_messages_from_recovery_to_regular (instance); log_printf (instance->totemsrp_log_level_trace, "Delivering to app %x to %x", instance->my_high_delivered + 1, instance->old_ring_state_high_seq_received); aru_save = instance->my_aru; instance->my_aru = instance->old_ring_state_aru; messages_deliver_to_app (instance, 0, instance->old_ring_state_high_seq_received); /* * Calculate joined and left list */ memb_set_subtract (instance->my_left_memb_list, &instance->my_left_memb_entries, instance->my_memb_list, instance->my_memb_entries, instance->my_trans_memb_list, instance->my_trans_memb_entries); memb_set_subtract (joined_list, &joined_list_entries, instance->my_new_memb_list, instance->my_new_memb_entries, instance->my_trans_memb_list, instance->my_trans_memb_entries); /* * Install new membership */ instance->my_memb_entries = instance->my_new_memb_entries; memcpy (&instance->my_memb_list, instance->my_new_memb_list, sizeof (struct srp_addr) * instance->my_memb_entries); instance->last_released = 0; instance->my_set_retrans_flg = 0; /* * Deliver transitional configuration to application */ srp_addr_to_nodeid (instance, left_list, instance->my_left_memb_list, instance->my_left_memb_entries); srp_addr_to_nodeid (instance, trans_memb_list_totemip, instance->my_trans_memb_list, instance->my_trans_memb_entries); instance->totemsrp_confchg_fn (TOTEM_CONFIGURATION_TRANSITIONAL, trans_memb_list_totemip, instance->my_trans_memb_entries, left_list, instance->my_left_memb_entries, 0, 0, &instance->my_ring_id); instance->waiting_trans_ack = 1; instance->totemsrp_waiting_trans_ack_cb_fn (1); // TODO we need to filter to ensure we only deliver those // messages which are part of instance->my_deliver_memb messages_deliver_to_app (instance, 1, instance->old_ring_state_high_seq_received); instance->my_aru = aru_save; /* * Deliver regular configuration to application */ srp_addr_to_nodeid (instance, new_memb_list_totemip, instance->my_new_memb_list, instance->my_new_memb_entries); srp_addr_to_nodeid (instance, joined_list_totemip, joined_list, joined_list_entries); instance->totemsrp_confchg_fn (TOTEM_CONFIGURATION_REGULAR, new_memb_list_totemip, instance->my_new_memb_entries, 0, 0, joined_list_totemip, joined_list_entries, &instance->my_ring_id); /* * The recovery sort queue now becomes the regular * sort queue. It is necessary to copy the state * into the regular sort queue. */ sq_copy (&instance->regular_sort_queue, &instance->recovery_sort_queue); instance->my_last_aru = SEQNO_START_MSG; /* When making my_proc_list smaller, ensure that the * now non-used entries are zero-ed out. There are some suspect * assert's that assume that there is always 2 entries in the list. * These fail when my_proc_list is reduced to 1 entry (and the * valid [0] entry is the same as the 'unused' [1] entry). */ memset(instance->my_proc_list, 0, sizeof (struct srp_addr) * instance->my_proc_list_entries); instance->my_proc_list_entries = instance->my_new_memb_entries; memcpy (instance->my_proc_list, instance->my_new_memb_list, sizeof (struct srp_addr) * instance->my_memb_entries); instance->my_failed_list_entries = 0; /* * TODO Not exactly to spec * * At the entry to this function all messages without a gap are * deliered. * * This code throw away messages from the last gap in the sort queue * to my_high_seq_received * * What should really happen is we should deliver all messages up to * a gap, then delier the transitional configuration, then deliver * the messages between the first gap and my_high_seq_received, then * deliver a regular configuration, then deliver the regular * configuration * * Unfortunately totempg doesn't appear to like this operating mode * which needs more inspection */ i = instance->my_high_seq_received + 1; do { void *ptr; i -= 1; res = sq_item_get (&instance->regular_sort_queue, i, &ptr); if (i == 0) { break; } } while (res); instance->my_high_delivered = i; for (i = 0; i <= instance->my_high_delivered; i++) { void *ptr; res = sq_item_get (&instance->regular_sort_queue, i, &ptr); if (res == 0) { struct sort_queue_item *regular_message; regular_message = ptr; free (regular_message->mcast); } } sq_items_release (&instance->regular_sort_queue, instance->my_high_delivered); instance->last_released = instance->my_high_delivered; if (joined_list_entries) { int sptr = 0; sptr += snprintf(joined_node_msg, sizeof(joined_node_msg)-sptr, " joined:"); for (i=0; i< joined_list_entries; i++) { sptr += snprintf(joined_node_msg+sptr, sizeof(joined_node_msg)-sptr, " " CS_PRI_NODE_ID, joined_list_totemip[i]); } } else { joined_node_msg[0] = '\0'; } if (instance->my_left_memb_entries) { int sptr = 0; int sptr2 = 0; sptr += snprintf(left_node_msg, sizeof(left_node_msg)-sptr, " left:"); for (i=0; i< instance->my_left_memb_entries; i++) { sptr += snprintf(left_node_msg+sptr, sizeof(left_node_msg)-sptr, " " CS_PRI_NODE_ID, left_list[i]); } for (i=0; i< instance->my_left_memb_entries; i++) { if (my_leave_memb_match(instance, left_list[i]) == 0) { if (sptr2 == 0) { sptr2 += snprintf(failed_node_msg, sizeof(failed_node_msg)-sptr2, " failed:"); } sptr2 += snprintf(failed_node_msg+sptr2, sizeof(left_node_msg)-sptr2, " " CS_PRI_NODE_ID, left_list[i]); } } if (sptr2 == 0) { failed_node_msg[0] = '\0'; } } else { left_node_msg[0] = '\0'; failed_node_msg[0] = '\0'; } my_leave_memb_clear(instance); log_printf (instance->totemsrp_log_level_debug, "entering OPERATIONAL state."); log_printf (instance->totemsrp_log_level_notice, "A new membership (" CS_PRI_RING_ID ") was formed. Members%s%s", instance->my_ring_id.rep, (uint64_t)instance->my_ring_id.seq, joined_node_msg, left_node_msg); if (strlen(failed_node_msg)) { log_printf (instance->totemsrp_log_level_notice, "Failed to receive the leave message.%s", failed_node_msg); } instance->memb_state = MEMB_STATE_OPERATIONAL; instance->stats.operational_entered++; instance->stats.continuous_gather = 0; instance->my_received_flg = 1; reset_pause_timeout (instance); /* * Save ring id information from this configuration to determine * which processors are transitioning from old regular configuration * in to new regular configuration on the next configuration change */ memcpy (&instance->my_old_ring_id, &instance->my_ring_id, sizeof (struct memb_ring_id)); return; } static void memb_state_gather_enter ( struct totemsrp_instance *instance, enum gather_state_from gather_from) { int32_t res; instance->orf_token_discard = 1; instance->originated_orf_token = 0; memb_set_merge ( &instance->my_id, 1, instance->my_proc_list, &instance->my_proc_list_entries); memb_join_message_send (instance); /* * Restart the join timeout */ qb_loop_timer_del (instance->totemsrp_poll_handle, instance->memb_timer_state_gather_join_timeout); res = qb_loop_timer_add (instance->totemsrp_poll_handle, QB_LOOP_MED, instance->totem_config->join_timeout*QB_TIME_NS_IN_MSEC, (void *)instance, memb_timer_function_state_gather, &instance->memb_timer_state_gather_join_timeout); if (res != 0) { log_printf(instance->totemsrp_log_level_error, "memb_state_gather_enter - qb_loop_timer_add error(1) : %d", res); } /* * Restart the consensus timeout */ qb_loop_timer_del (instance->totemsrp_poll_handle, instance->memb_timer_state_gather_consensus_timeout); res = qb_loop_timer_add (instance->totemsrp_poll_handle, QB_LOOP_MED, instance->totem_config->consensus_timeout*QB_TIME_NS_IN_MSEC, (void *)instance, memb_timer_function_gather_consensus_timeout, &instance->memb_timer_state_gather_consensus_timeout); if (res != 0) { log_printf(instance->totemsrp_log_level_error, "memb_state_gather_enter - qb_loop_timer_add error(2) : %d", res); } /* * Cancel the token loss and token retransmission timeouts */ cancel_token_retransmit_timeout (instance); // REVIEWED cancel_token_timeout (instance); // REVIEWED cancel_merge_detect_timeout (instance); memb_consensus_reset (instance); memb_consensus_set (instance, &instance->my_id); log_printf (instance->totemsrp_log_level_debug, "entering GATHER state from %d(%s).", gather_from, gsfrom_to_msg(gather_from)); instance->memb_state = MEMB_STATE_GATHER; instance->stats.gather_entered++; if (gather_from == TOTEMSRP_GSFROM_THE_CONSENSUS_TIMEOUT_EXPIRED) { /* * State 3 means gather, so we are continuously gathering. */ instance->stats.continuous_gather++; } return; } static void timer_function_token_retransmit_timeout (void *data); static void target_set_completed ( void *context) { struct totemsrp_instance *instance = (struct totemsrp_instance *)context; memb_state_commit_token_send (instance); } static void memb_state_commit_enter ( struct totemsrp_instance *instance) { old_ring_state_save (instance); memb_state_commit_token_update (instance); memb_state_commit_token_target_set (instance); qb_loop_timer_del (instance->totemsrp_poll_handle, instance->memb_timer_state_gather_join_timeout); instance->memb_timer_state_gather_join_timeout = 0; qb_loop_timer_del (instance->totemsrp_poll_handle, instance->memb_timer_state_gather_consensus_timeout); instance->memb_timer_state_gather_consensus_timeout = 0; memb_ring_id_set (instance, &instance->commit_token->ring_id); instance->memb_ring_id_store (&instance->my_ring_id, instance->my_id.nodeid); instance->token_ring_id_seq = instance->my_ring_id.seq; log_printf (instance->totemsrp_log_level_debug, "entering COMMIT state."); instance->memb_state = MEMB_STATE_COMMIT; reset_token_retransmit_timeout (instance); // REVIEWED reset_token_timeout (instance); // REVIEWED instance->stats.commit_entered++; instance->stats.continuous_gather = 0; /* * reset all flow control variables since we are starting a new ring */ instance->my_trc = 0; instance->my_pbl = 0; instance->my_cbl = 0; /* * commit token sent after callback that token target has been set */ } static void memb_state_recovery_enter ( struct totemsrp_instance *instance, struct memb_commit_token *commit_token) { int i; int local_received_flg = 1; unsigned int low_ring_aru; unsigned int range = 0; unsigned int messages_originated = 0; const struct srp_addr *addr; struct memb_commit_token_memb_entry *memb_list; struct memb_ring_id my_new_memb_ring_id_list[PROCESSOR_COUNT_MAX]; addr = (const struct srp_addr *)commit_token->end_of_commit_token; memb_list = (struct memb_commit_token_memb_entry *)(addr + commit_token->addr_entries); log_printf (instance->totemsrp_log_level_debug, "entering RECOVERY state."); instance->orf_token_discard = 0; instance->my_high_ring_delivered = 0; sq_reinit (&instance->recovery_sort_queue, SEQNO_START_MSG); cs_queue_reinit (&instance->retrans_message_queue); low_ring_aru = instance->old_ring_state_high_seq_received; memb_state_commit_token_send_recovery (instance, commit_token); instance->my_token_seq = SEQNO_START_TOKEN - 1; /* * Build regular configuration */ totemnet_processor_count_set ( instance->totemnet_context, commit_token->addr_entries); /* * Build transitional configuration */ for (i = 0; i < instance->my_new_memb_entries; i++) { memcpy (&my_new_memb_ring_id_list[i], &memb_list[i].ring_id, sizeof (struct memb_ring_id)); } memb_set_and_with_ring_id ( instance->my_new_memb_list, my_new_memb_ring_id_list, instance->my_new_memb_entries, instance->my_memb_list, instance->my_memb_entries, &instance->my_old_ring_id, instance->my_trans_memb_list, &instance->my_trans_memb_entries); for (i = 0; i < instance->my_trans_memb_entries; i++) { log_printf (instance->totemsrp_log_level_debug, "TRANS [%d] member " CS_PRI_NODE_ID ":", i, instance->my_trans_memb_list[i].nodeid); } for (i = 0; i < instance->my_new_memb_entries; i++) { log_printf (instance->totemsrp_log_level_debug, "position [%d] member " CS_PRI_NODE_ID ":", i, addr[i].nodeid); log_printf (instance->totemsrp_log_level_debug, "previous ringid (" CS_PRI_RING_ID ")", memb_list[i].ring_id.rep, (uint64_t)memb_list[i].ring_id.seq); log_printf (instance->totemsrp_log_level_debug, "aru %x high delivered %x received flag %d", memb_list[i].aru, memb_list[i].high_delivered, memb_list[i].received_flg); // assert (totemip_print (&memb_list[i].ring_id.rep) != 0); } /* * Determine if any received flag is false */ for (i = 0; i < commit_token->addr_entries; i++) { if (memb_set_subset (&instance->my_new_memb_list[i], 1, instance->my_trans_memb_list, instance->my_trans_memb_entries) && memb_list[i].received_flg == 0) { instance->my_deliver_memb_entries = instance->my_trans_memb_entries; memcpy (instance->my_deliver_memb_list, instance->my_trans_memb_list, sizeof (struct srp_addr) * instance->my_trans_memb_entries); local_received_flg = 0; break; } } if (local_received_flg == 1) { goto no_originate; } /* Else originate messages if we should */ /* * Calculate my_low_ring_aru, instance->my_high_ring_delivered for the transitional membership */ for (i = 0; i < commit_token->addr_entries; i++) { if (memb_set_subset (&instance->my_new_memb_list[i], 1, instance->my_deliver_memb_list, instance->my_deliver_memb_entries) && memcmp (&instance->my_old_ring_id, &memb_list[i].ring_id, sizeof (struct memb_ring_id)) == 0) { if (sq_lt_compare (memb_list[i].aru, low_ring_aru)) { low_ring_aru = memb_list[i].aru; } if (sq_lt_compare (instance->my_high_ring_delivered, memb_list[i].high_delivered)) { instance->my_high_ring_delivered = memb_list[i].high_delivered; } } } /* * Copy all old ring messages to instance->retrans_message_queue */ range = instance->old_ring_state_high_seq_received - low_ring_aru; if (range == 0) { /* * No messages to copy */ goto no_originate; } assert (range < QUEUE_RTR_ITEMS_SIZE_MAX); log_printf (instance->totemsrp_log_level_debug, "copying all old ring messages from %x-%x.", low_ring_aru + 1, instance->old_ring_state_high_seq_received); for (i = 1; i <= range; i++) { struct sort_queue_item *sort_queue_item; struct message_item message_item; void *ptr; int res; res = sq_item_get (&instance->regular_sort_queue, low_ring_aru + i, &ptr); if (res != 0) { continue; } sort_queue_item = ptr; messages_originated++; memset (&message_item, 0, sizeof (struct message_item)); // TODO LEAK message_item.mcast = totemsrp_buffer_alloc (instance); assert (message_item.mcast); memset(message_item.mcast, 0, sizeof (struct mcast)); message_item.mcast->header.magic = TOTEM_MH_MAGIC; message_item.mcast->header.version = TOTEM_MH_VERSION; message_item.mcast->header.type = MESSAGE_TYPE_MCAST; message_item.mcast->system_from = instance->my_id; message_item.mcast->header.encapsulated = MESSAGE_ENCAPSULATED; message_item.mcast->header.nodeid = instance->my_id.nodeid; assert (message_item.mcast->header.nodeid); memcpy (&message_item.mcast->ring_id, &instance->my_ring_id, sizeof (struct memb_ring_id)); message_item.msg_len = sort_queue_item->msg_len + sizeof (struct mcast); memcpy (((char *)message_item.mcast) + sizeof (struct mcast), sort_queue_item->mcast, sort_queue_item->msg_len); cs_queue_item_add (&instance->retrans_message_queue, &message_item); } log_printf (instance->totemsrp_log_level_debug, "Originated %d messages in RECOVERY.", messages_originated); goto originated; no_originate: log_printf (instance->totemsrp_log_level_debug, "Did not need to originate any messages in recovery."); originated: instance->my_aru = SEQNO_START_MSG; instance->my_aru_count = 0; instance->my_seq_unchanged = 0; instance->my_high_seq_received = SEQNO_START_MSG; instance->my_install_seq = SEQNO_START_MSG; instance->last_released = SEQNO_START_MSG; reset_token_timeout (instance); // REVIEWED reset_token_retransmit_timeout (instance); // REVIEWED instance->memb_state = MEMB_STATE_RECOVERY; instance->stats.recovery_entered++; instance->stats.continuous_gather = 0; return; } void totemsrp_event_signal (void *srp_context, enum totem_event_type type, int value) { struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context; token_hold_cancel_send (instance); return; } int totemsrp_mcast ( void *srp_context, struct iovec *iovec, unsigned int iov_len, int guarantee) { struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context; int i; struct message_item message_item; char *addr; unsigned int addr_idx; struct cs_queue *queue_use; if (instance->waiting_trans_ack) { queue_use = &instance->new_message_queue_trans; } else { queue_use = &instance->new_message_queue; } if (cs_queue_is_full (queue_use)) { log_printf (instance->totemsrp_log_level_debug, "queue full"); return (-1); } memset (&message_item, 0, sizeof (struct message_item)); /* * Allocate pending item */ message_item.mcast = totemsrp_buffer_alloc (instance); if (message_item.mcast == 0) { goto error_mcast; } /* * Set mcast header */ memset(message_item.mcast, 0, sizeof (struct mcast)); message_item.mcast->header.magic = TOTEM_MH_MAGIC; message_item.mcast->header.version = TOTEM_MH_VERSION; message_item.mcast->header.type = MESSAGE_TYPE_MCAST; message_item.mcast->header.encapsulated = MESSAGE_NOT_ENCAPSULATED; message_item.mcast->header.nodeid = instance->my_id.nodeid; assert (message_item.mcast->header.nodeid); message_item.mcast->guarantee = guarantee; message_item.mcast->system_from = instance->my_id; addr = (char *)message_item.mcast; addr_idx = sizeof (struct mcast); for (i = 0; i < iov_len; i++) { memcpy (&addr[addr_idx], iovec[i].iov_base, iovec[i].iov_len); addr_idx += iovec[i].iov_len; } message_item.msg_len = addr_idx; log_printf (instance->totemsrp_log_level_trace, "mcasted message added to pending queue"); instance->stats.mcast_tx++; cs_queue_item_add (queue_use, &message_item); return (0); error_mcast: return (-1); } /* * Determine if there is room to queue a new message */ int totemsrp_avail (void *srp_context) { struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context; int avail; struct cs_queue *queue_use; if (instance->waiting_trans_ack) { queue_use = &instance->new_message_queue_trans; } else { queue_use = &instance->new_message_queue; } cs_queue_avail (queue_use, &avail); return (avail); } /* * ORF Token Management */ /* * Recast message to mcast group if it is available */ static int orf_token_remcast ( struct totemsrp_instance *instance, int seq) { struct sort_queue_item *sort_queue_item; int res; void *ptr; struct sq *sort_queue; if (instance->memb_state == MEMB_STATE_RECOVERY) { sort_queue = &instance->recovery_sort_queue; } else { sort_queue = &instance->regular_sort_queue; } res = sq_in_range (sort_queue, seq); if (res == 0) { log_printf (instance->totemsrp_log_level_debug, "sq not in range"); return (-1); } /* * Get RTR item at seq, if not available, return */ res = sq_item_get (sort_queue, seq, &ptr); if (res != 0) { return -1; } sort_queue_item = ptr; totemnet_mcast_noflush_send ( instance->totemnet_context, sort_queue_item->mcast, sort_queue_item->msg_len); return (0); } /* * Free all freeable messages from ring */ static void messages_free ( struct totemsrp_instance *instance, unsigned int token_aru) { struct sort_queue_item *regular_message; unsigned int i; int res; int log_release = 0; unsigned int release_to; unsigned int range = 0; release_to = token_aru; if (sq_lt_compare (instance->my_last_aru, release_to)) { release_to = instance->my_last_aru; } if (sq_lt_compare (instance->my_high_delivered, release_to)) { release_to = instance->my_high_delivered; } /* * Ensure we dont try release before an already released point */ if (sq_lt_compare (release_to, instance->last_released)) { return; } range = release_to - instance->last_released; assert (range < QUEUE_RTR_ITEMS_SIZE_MAX); /* * Release retransmit list items if group aru indicates they are transmitted */ for (i = 1; i <= range; i++) { void *ptr; res = sq_item_get (&instance->regular_sort_queue, instance->last_released + i, &ptr); if (res == 0) { regular_message = ptr; totemsrp_buffer_release (instance, regular_message->mcast); } sq_items_release (&instance->regular_sort_queue, instance->last_released + i); log_release = 1; } instance->last_released += range; if (log_release) { log_printf (instance->totemsrp_log_level_trace, "releasing messages up to and including %x", release_to); } } static void update_aru ( struct totemsrp_instance *instance) { unsigned int i; int res; struct sq *sort_queue; unsigned int range; unsigned int my_aru_saved = 0; if (instance->memb_state == MEMB_STATE_RECOVERY) { sort_queue = &instance->recovery_sort_queue; } else { sort_queue = &instance->regular_sort_queue; } range = instance->my_high_seq_received - instance->my_aru; my_aru_saved = instance->my_aru; for (i = 1; i <= range; i++) { void *ptr; res = sq_item_get (sort_queue, my_aru_saved + i, &ptr); /* * If hole, stop updating aru */ if (res != 0) { break; } } instance->my_aru += i - 1; } /* * Multicasts pending messages onto the ring (requires orf_token possession) */ static int orf_token_mcast ( struct totemsrp_instance *instance, struct orf_token *token, int fcc_mcasts_allowed) { struct message_item *message_item = 0; struct cs_queue *mcast_queue; struct sq *sort_queue; struct sort_queue_item sort_queue_item; struct mcast *mcast; unsigned int fcc_mcast_current; if (instance->memb_state == MEMB_STATE_RECOVERY) { mcast_queue = &instance->retrans_message_queue; sort_queue = &instance->recovery_sort_queue; reset_token_retransmit_timeout (instance); // REVIEWED } else { if (instance->waiting_trans_ack) { mcast_queue = &instance->new_message_queue_trans; } else { mcast_queue = &instance->new_message_queue; } sort_queue = &instance->regular_sort_queue; } for (fcc_mcast_current = 0; fcc_mcast_current < fcc_mcasts_allowed; fcc_mcast_current++) { if (cs_queue_is_empty (mcast_queue)) { break; } message_item = (struct message_item *)cs_queue_item_get (mcast_queue); message_item->mcast->seq = ++token->seq; message_item->mcast->this_seqno = instance->global_seqno++; /* * Build IO vector */ memset (&sort_queue_item, 0, sizeof (struct sort_queue_item)); sort_queue_item.mcast = message_item->mcast; sort_queue_item.msg_len = message_item->msg_len; mcast = sort_queue_item.mcast; memcpy (&mcast->ring_id, &instance->my_ring_id, sizeof (struct memb_ring_id)); /* * Add message to retransmit queue */ sq_item_add (sort_queue, &sort_queue_item, message_item->mcast->seq); totemnet_mcast_noflush_send ( instance->totemnet_context, message_item->mcast, message_item->msg_len); /* * Delete item from pending queue */ cs_queue_item_remove (mcast_queue); /* * If messages mcasted, deliver any new messages to totempg */ instance->my_high_seq_received = token->seq; } update_aru (instance); /* * Return 1 if more messages are available for single node clusters */ return (fcc_mcast_current); } /* * Remulticasts messages in orf_token's retransmit list (requires orf_token) * Modify's orf_token's rtr to include retransmits required by this process */ static int orf_token_rtr ( struct totemsrp_instance *instance, struct orf_token *orf_token, unsigned int *fcc_allowed) { unsigned int res; unsigned int i, j; unsigned int found; struct sq *sort_queue; struct rtr_item *rtr_list; unsigned int range = 0; char retransmit_msg[1024]; char value[64]; if (instance->memb_state == MEMB_STATE_RECOVERY) { sort_queue = &instance->recovery_sort_queue; } else { sort_queue = &instance->regular_sort_queue; } rtr_list = &orf_token->rtr_list[0]; strcpy (retransmit_msg, "Retransmit List: "); if (orf_token->rtr_list_entries) { log_printf (instance->totemsrp_log_level_debug, "Retransmit List %d", orf_token->rtr_list_entries); for (i = 0; i < orf_token->rtr_list_entries; i++) { sprintf (value, "%x ", rtr_list[i].seq); strcat (retransmit_msg, value); } strcat (retransmit_msg, ""); log_printf (instance->totemsrp_log_level_notice, "%s", retransmit_msg); } /* * Retransmit messages on orf_token's RTR list from RTR queue */ for (instance->fcc_remcast_current = 0, i = 0; instance->fcc_remcast_current < *fcc_allowed && i < orf_token->rtr_list_entries;) { /* * If this retransmit request isn't from this configuration, * try next rtr entry */ if (memcmp (&rtr_list[i].ring_id, &instance->my_ring_id, sizeof (struct memb_ring_id)) != 0) { i += 1; continue; } res = orf_token_remcast (instance, rtr_list[i].seq); if (res == 0) { /* * Multicasted message, so no need to copy to new retransmit list */ orf_token->rtr_list_entries -= 1; assert (orf_token->rtr_list_entries >= 0); memmove (&rtr_list[i], &rtr_list[i + 1], sizeof (struct rtr_item) * (orf_token->rtr_list_entries - i)); instance->stats.mcast_retx++; instance->fcc_remcast_current++; } else { i += 1; } } *fcc_allowed = *fcc_allowed - instance->fcc_remcast_current; /* * Add messages to retransmit to RTR list * but only retry if there is room in the retransmit list */ range = orf_token->seq - instance->my_aru; assert (range < QUEUE_RTR_ITEMS_SIZE_MAX); for (i = 1; (orf_token->rtr_list_entries < RETRANSMIT_ENTRIES_MAX) && (i <= range); i++) { /* * Ensure message is within the sort queue range */ res = sq_in_range (sort_queue, instance->my_aru + i); if (res == 0) { break; } /* * Find if a message is missing from this processor */ res = sq_item_inuse (sort_queue, instance->my_aru + i); if (res == 0) { /* * Determine how many times we have missed receiving * this sequence number. sq_item_miss_count increments * a counter for the sequence number. The miss count * will be returned and compared. This allows time for * delayed multicast messages to be received before * declaring the message is missing and requesting a * retransmit. */ res = sq_item_miss_count (sort_queue, instance->my_aru + i); if (res < instance->totem_config->miss_count_const) { continue; } /* * Determine if missing message is already in retransmit list */ found = 0; for (j = 0; j < orf_token->rtr_list_entries; j++) { if (instance->my_aru + i == rtr_list[j].seq) { found = 1; } } if (found == 0) { /* * Missing message not found in current retransmit list so add it */ memcpy (&rtr_list[orf_token->rtr_list_entries].ring_id, &instance->my_ring_id, sizeof (struct memb_ring_id)); rtr_list[orf_token->rtr_list_entries].seq = instance->my_aru + i; orf_token->rtr_list_entries++; } } } return (instance->fcc_remcast_current); } static void token_retransmit (struct totemsrp_instance *instance) { totemnet_token_send (instance->totemnet_context, instance->orf_token_retransmit, instance->orf_token_retransmit_size); } /* * Retransmit the regular token if no mcast or token has * been received in retransmit token period retransmit * the token to the next processor */ static void timer_function_token_retransmit_timeout (void *data) { struct totemsrp_instance *instance = data; switch (instance->memb_state) { case MEMB_STATE_GATHER: break; case MEMB_STATE_COMMIT: case MEMB_STATE_OPERATIONAL: case MEMB_STATE_RECOVERY: token_retransmit (instance); reset_token_retransmit_timeout (instance); // REVIEWED break; } } static void timer_function_token_hold_retransmit_timeout (void *data) { struct totemsrp_instance *instance = data; switch (instance->memb_state) { case MEMB_STATE_GATHER: break; case MEMB_STATE_COMMIT: break; case MEMB_STATE_OPERATIONAL: case MEMB_STATE_RECOVERY: token_retransmit (instance); break; } } static void timer_function_merge_detect_timeout(void *data) { struct totemsrp_instance *instance = data; instance->my_merge_detect_timeout_outstanding = 0; switch (instance->memb_state) { case MEMB_STATE_OPERATIONAL: if (instance->my_ring_id.rep == instance->my_id.nodeid) { memb_merge_detect_transmit (instance); } break; case MEMB_STATE_GATHER: case MEMB_STATE_COMMIT: case MEMB_STATE_RECOVERY: break; } } /* * Send orf_token to next member (requires orf_token) */ static int token_send ( struct totemsrp_instance *instance, struct orf_token *orf_token, int forward_token) { int res = 0; unsigned int orf_token_size; orf_token_size = sizeof (struct orf_token) + (orf_token->rtr_list_entries * sizeof (struct rtr_item)); orf_token->header.nodeid = instance->my_id.nodeid; memcpy (instance->orf_token_retransmit, orf_token, orf_token_size); instance->orf_token_retransmit_size = orf_token_size; assert (orf_token->header.nodeid); if (forward_token == 0) { return (0); } totemnet_token_send (instance->totemnet_context, orf_token, orf_token_size); return (res); } static int token_hold_cancel_send (struct totemsrp_instance *instance) { struct token_hold_cancel token_hold_cancel; /* * Only cancel if the token is currently held */ if (instance->my_token_held == 0) { return (0); } instance->my_token_held = 0; /* * Build message */ token_hold_cancel.header.magic = TOTEM_MH_MAGIC; token_hold_cancel.header.version = TOTEM_MH_VERSION; token_hold_cancel.header.type = MESSAGE_TYPE_TOKEN_HOLD_CANCEL; token_hold_cancel.header.encapsulated = 0; token_hold_cancel.header.nodeid = instance->my_id.nodeid; memcpy (&token_hold_cancel.ring_id, &instance->my_ring_id, sizeof (struct memb_ring_id)); assert (token_hold_cancel.header.nodeid); instance->stats.token_hold_cancel_tx++; totemnet_mcast_flush_send (instance->totemnet_context, &token_hold_cancel, sizeof (struct token_hold_cancel)); return (0); } static int orf_token_send_initial (struct totemsrp_instance *instance) { struct orf_token orf_token; int res; orf_token.header.magic = TOTEM_MH_MAGIC; orf_token.header.version = TOTEM_MH_VERSION; orf_token.header.type = MESSAGE_TYPE_ORF_TOKEN; orf_token.header.encapsulated = 0; orf_token.header.nodeid = instance->my_id.nodeid; assert (orf_token.header.nodeid); orf_token.seq = SEQNO_START_MSG; orf_token.token_seq = SEQNO_START_TOKEN; orf_token.retrans_flg = 1; instance->my_set_retrans_flg = 1; instance->stats.orf_token_tx++; if (cs_queue_is_empty (&instance->retrans_message_queue) == 1) { orf_token.retrans_flg = 0; instance->my_set_retrans_flg = 0; } else { orf_token.retrans_flg = 1; instance->my_set_retrans_flg = 1; } orf_token.aru = 0; orf_token.aru = SEQNO_START_MSG - 1; orf_token.aru_addr = instance->my_id.nodeid; memcpy (&orf_token.ring_id, &instance->my_ring_id, sizeof (struct memb_ring_id)); orf_token.fcc = 0; orf_token.backlog = 0; orf_token.rtr_list_entries = 0; res = token_send (instance, &orf_token, 1); return (res); } static void memb_state_commit_token_update ( struct totemsrp_instance *instance) { struct srp_addr *addr; struct memb_commit_token_memb_entry *memb_list; unsigned int high_aru; unsigned int i; addr = (struct srp_addr *)instance->commit_token->end_of_commit_token; memb_list = (struct memb_commit_token_memb_entry *)(addr + instance->commit_token->addr_entries); memcpy (instance->my_new_memb_list, addr, sizeof (struct srp_addr) * instance->commit_token->addr_entries); instance->my_new_memb_entries = instance->commit_token->addr_entries; memcpy (&memb_list[instance->commit_token->memb_index].ring_id, &instance->my_old_ring_id, sizeof (struct memb_ring_id)); memb_list[instance->commit_token->memb_index].aru = instance->old_ring_state_aru; /* * TODO high delivered is really instance->my_aru, but with safe this * could change? */ instance->my_received_flg = (instance->my_aru == instance->my_high_seq_received); memb_list[instance->commit_token->memb_index].received_flg = instance->my_received_flg; memb_list[instance->commit_token->memb_index].high_delivered = instance->my_high_delivered; /* * find high aru up to current memb_index for all matching ring ids * if any ring id matching memb_index has aru less then high aru set * received flag for that entry to false */ high_aru = memb_list[instance->commit_token->memb_index].aru; for (i = 0; i <= instance->commit_token->memb_index; i++) { if (memcmp (&memb_list[instance->commit_token->memb_index].ring_id, &memb_list[i].ring_id, sizeof (struct memb_ring_id)) == 0) { if (sq_lt_compare (high_aru, memb_list[i].aru)) { high_aru = memb_list[i].aru; } } } for (i = 0; i <= instance->commit_token->memb_index; i++) { if (memcmp (&memb_list[instance->commit_token->memb_index].ring_id, &memb_list[i].ring_id, sizeof (struct memb_ring_id)) == 0) { if (sq_lt_compare (memb_list[i].aru, high_aru)) { memb_list[i].received_flg = 0; if (i == instance->commit_token->memb_index) { instance->my_received_flg = 0; } } } } instance->commit_token->header.nodeid = instance->my_id.nodeid; instance->commit_token->memb_index += 1; assert (instance->commit_token->memb_index <= instance->commit_token->addr_entries); assert (instance->commit_token->header.nodeid); } static void memb_state_commit_token_target_set ( struct totemsrp_instance *instance) { struct srp_addr *addr; addr = (struct srp_addr *)instance->commit_token->end_of_commit_token; /* Totemnet just looks at the node id */ totemnet_token_target_set ( instance->totemnet_context, addr[instance->commit_token->memb_index % instance->commit_token->addr_entries].nodeid); } static int memb_state_commit_token_send_recovery ( struct totemsrp_instance *instance, struct memb_commit_token *commit_token) { unsigned int commit_token_size; commit_token->token_seq++; commit_token->header.nodeid = instance->my_id.nodeid; commit_token_size = sizeof (struct memb_commit_token) + ((sizeof (struct srp_addr) + sizeof (struct memb_commit_token_memb_entry)) * commit_token->addr_entries); /* * Make a copy for retransmission if necessary */ memcpy (instance->orf_token_retransmit, commit_token, commit_token_size); instance->orf_token_retransmit_size = commit_token_size; instance->stats.memb_commit_token_tx++; totemnet_token_send (instance->totemnet_context, commit_token, commit_token_size); /* * Request retransmission of the commit token in case it is lost */ reset_token_retransmit_timeout (instance); return (0); } static int memb_state_commit_token_send ( struct totemsrp_instance *instance) { unsigned int commit_token_size; instance->commit_token->token_seq++; instance->commit_token->header.nodeid = instance->my_id.nodeid; commit_token_size = sizeof (struct memb_commit_token) + ((sizeof (struct srp_addr) + sizeof (struct memb_commit_token_memb_entry)) * instance->commit_token->addr_entries); /* * Make a copy for retransmission if necessary */ memcpy (instance->orf_token_retransmit, instance->commit_token, commit_token_size); instance->orf_token_retransmit_size = commit_token_size; instance->stats.memb_commit_token_tx++; totemnet_token_send (instance->totemnet_context, instance->commit_token, commit_token_size); /* * Request retransmission of the commit token in case it is lost */ reset_token_retransmit_timeout (instance); return (0); } static int memb_lowest_in_config (struct totemsrp_instance *instance) { struct srp_addr token_memb[PROCESSOR_COUNT_MAX]; int token_memb_entries = 0; int i; unsigned int lowest_nodeid; memb_set_subtract (token_memb, &token_memb_entries, instance->my_proc_list, instance->my_proc_list_entries, instance->my_failed_list, instance->my_failed_list_entries); /* * find representative by searching for smallest identifier */ assert(token_memb_entries > 0); lowest_nodeid = token_memb[0].nodeid; for (i = 1; i < token_memb_entries; i++) { if (lowest_nodeid > token_memb[i].nodeid) { lowest_nodeid = token_memb[i].nodeid; } } return (lowest_nodeid == instance->my_id.nodeid); } static int srp_addr_compare (const void *a, const void *b) { const struct srp_addr *srp_a = (const struct srp_addr *)a; const struct srp_addr *srp_b = (const struct srp_addr *)b; if (srp_a->nodeid < srp_b->nodeid) { return -1; } else if (srp_a->nodeid > srp_b->nodeid) { return 1; } else { return 0; } } static void memb_state_commit_token_create ( struct totemsrp_instance *instance) { struct srp_addr token_memb[PROCESSOR_COUNT_MAX]; struct srp_addr *addr; struct memb_commit_token_memb_entry *memb_list; int token_memb_entries = 0; log_printf (instance->totemsrp_log_level_debug, "Creating commit token because I am the rep."); memb_set_subtract (token_memb, &token_memb_entries, instance->my_proc_list, instance->my_proc_list_entries, instance->my_failed_list, instance->my_failed_list_entries); memset (instance->commit_token, 0, sizeof (struct memb_commit_token)); instance->commit_token->header.magic = TOTEM_MH_MAGIC; instance->commit_token->header.version = TOTEM_MH_VERSION; instance->commit_token->header.type = MESSAGE_TYPE_MEMB_COMMIT_TOKEN; instance->commit_token->header.encapsulated = 0; instance->commit_token->header.nodeid = instance->my_id.nodeid; assert (instance->commit_token->header.nodeid); instance->commit_token->ring_id.rep = instance->my_id.nodeid; instance->commit_token->ring_id.seq = instance->token_ring_id_seq + 4; /* * This qsort is necessary to ensure the commit token traverses * the ring in the proper order */ qsort (token_memb, token_memb_entries, sizeof (struct srp_addr), srp_addr_compare); instance->commit_token->memb_index = 0; instance->commit_token->addr_entries = token_memb_entries; addr = (struct srp_addr *)instance->commit_token->end_of_commit_token; memb_list = (struct memb_commit_token_memb_entry *)(addr + instance->commit_token->addr_entries); memcpy (addr, token_memb, token_memb_entries * sizeof (struct srp_addr)); memset (memb_list, 0, sizeof (struct memb_commit_token_memb_entry) * token_memb_entries); } static void memb_join_message_send (struct totemsrp_instance *instance) { char memb_join_data[40000]; struct memb_join *memb_join = (struct memb_join *)memb_join_data; char *addr; unsigned int addr_idx; size_t msg_len; memb_join->header.magic = TOTEM_MH_MAGIC; memb_join->header.version = TOTEM_MH_VERSION; memb_join->header.type = MESSAGE_TYPE_MEMB_JOIN; memb_join->header.encapsulated = 0; memb_join->header.nodeid = instance->my_id.nodeid; assert (memb_join->header.nodeid); msg_len = sizeof(struct memb_join) + ((instance->my_proc_list_entries + instance->my_failed_list_entries) * sizeof(struct srp_addr)); if (msg_len > sizeof(memb_join_data)) { log_printf (instance->totemsrp_log_level_error, "memb_join_message too long. Ignoring message."); return ; } memb_join->ring_seq = instance->my_ring_id.seq; memb_join->proc_list_entries = instance->my_proc_list_entries; memb_join->failed_list_entries = instance->my_failed_list_entries; memb_join->system_from = instance->my_id; /* * This mess adds the joined and failed processor lists into the join * message */ addr = (char *)memb_join; addr_idx = sizeof (struct memb_join); memcpy (&addr[addr_idx], instance->my_proc_list, instance->my_proc_list_entries * sizeof (struct srp_addr)); addr_idx += instance->my_proc_list_entries * sizeof (struct srp_addr); memcpy (&addr[addr_idx], instance->my_failed_list, instance->my_failed_list_entries * sizeof (struct srp_addr)); addr_idx += instance->my_failed_list_entries * sizeof (struct srp_addr); if (instance->totem_config->send_join_timeout) { usleep (random() % (instance->totem_config->send_join_timeout * 1000)); } instance->stats.memb_join_tx++; totemnet_mcast_flush_send ( instance->totemnet_context, memb_join, addr_idx); } static void memb_leave_message_send (struct totemsrp_instance *instance) { char memb_join_data[40000]; struct memb_join *memb_join = (struct memb_join *)memb_join_data; char *addr; unsigned int addr_idx; int active_memb_entries; struct srp_addr active_memb[PROCESSOR_COUNT_MAX]; size_t msg_len; log_printf (instance->totemsrp_log_level_debug, "sending join/leave message"); /* * add us to the failed list, and remove us from * the members list */ memb_set_merge( &instance->my_id, 1, instance->my_failed_list, &instance->my_failed_list_entries); memb_set_subtract (active_memb, &active_memb_entries, instance->my_proc_list, instance->my_proc_list_entries, &instance->my_id, 1); msg_len = sizeof(struct memb_join) + ((active_memb_entries + instance->my_failed_list_entries) * sizeof(struct srp_addr)); if (msg_len > sizeof(memb_join_data)) { log_printf (instance->totemsrp_log_level_error, "memb_leave message too long. Ignoring message."); return ; } memb_join->header.magic = TOTEM_MH_MAGIC; memb_join->header.version = TOTEM_MH_VERSION; memb_join->header.type = MESSAGE_TYPE_MEMB_JOIN; memb_join->header.encapsulated = 0; memb_join->header.nodeid = LEAVE_DUMMY_NODEID; memb_join->ring_seq = instance->my_ring_id.seq; memb_join->proc_list_entries = active_memb_entries; memb_join->failed_list_entries = instance->my_failed_list_entries; memb_join->system_from = instance->my_id; // TODO: CC Maybe use the actual join send routine. /* * This mess adds the joined and failed processor lists into the join * message */ addr = (char *)memb_join; addr_idx = sizeof (struct memb_join); memcpy (&addr[addr_idx], active_memb, active_memb_entries * sizeof (struct srp_addr)); addr_idx += active_memb_entries * sizeof (struct srp_addr); memcpy (&addr[addr_idx], instance->my_failed_list, instance->my_failed_list_entries * sizeof (struct srp_addr)); addr_idx += instance->my_failed_list_entries * sizeof (struct srp_addr); if (instance->totem_config->send_join_timeout) { usleep (random() % (instance->totem_config->send_join_timeout * 1000)); } instance->stats.memb_join_tx++; totemnet_mcast_flush_send ( instance->totemnet_context, memb_join, addr_idx); } static void memb_merge_detect_transmit (struct totemsrp_instance *instance) { struct memb_merge_detect memb_merge_detect; memb_merge_detect.header.magic = TOTEM_MH_MAGIC; memb_merge_detect.header.version = TOTEM_MH_VERSION; memb_merge_detect.header.type = MESSAGE_TYPE_MEMB_MERGE_DETECT; memb_merge_detect.header.encapsulated = 0; memb_merge_detect.header.nodeid = instance->my_id.nodeid; memb_merge_detect.system_from = instance->my_id; memcpy (&memb_merge_detect.ring_id, &instance->my_ring_id, sizeof (struct memb_ring_id)); assert (memb_merge_detect.header.nodeid); instance->stats.memb_merge_detect_tx++; totemnet_mcast_flush_send (instance->totemnet_context, &memb_merge_detect, sizeof (struct memb_merge_detect)); } static void memb_ring_id_set ( struct totemsrp_instance *instance, const struct memb_ring_id *ring_id) { memcpy (&instance->my_ring_id, ring_id, sizeof (struct memb_ring_id)); } int totemsrp_callback_token_create ( void *srp_context, void **handle_out, enum totem_callback_token_type type, int delete, int (*callback_fn) (enum totem_callback_token_type type, const void *), const void *data) { struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context; struct token_callback_instance *callback_handle; token_hold_cancel_send (instance); callback_handle = malloc (sizeof (struct token_callback_instance)); if (callback_handle == 0) { return (-1); } *handle_out = (void *)callback_handle; qb_list_init (&callback_handle->list); callback_handle->callback_fn = callback_fn; callback_handle->data = (void *) data; callback_handle->callback_type = type; callback_handle->delete = delete; switch (type) { case TOTEM_CALLBACK_TOKEN_RECEIVED: qb_list_add (&callback_handle->list, &instance->token_callback_received_listhead); break; case TOTEM_CALLBACK_TOKEN_SENT: qb_list_add (&callback_handle->list, &instance->token_callback_sent_listhead); break; } return (0); } void totemsrp_callback_token_destroy (void *srp_context, void **handle_out) { struct token_callback_instance *h; if (*handle_out) { h = (struct token_callback_instance *)*handle_out; qb_list_del (&h->list); free (h); h = NULL; *handle_out = 0; } } static void token_callbacks_execute ( struct totemsrp_instance *instance, enum totem_callback_token_type type) { struct qb_list_head *list, *tmp_iter; struct qb_list_head *callback_listhead = 0; struct token_callback_instance *token_callback_instance; int res; int del; switch (type) { case TOTEM_CALLBACK_TOKEN_RECEIVED: callback_listhead = &instance->token_callback_received_listhead; break; case TOTEM_CALLBACK_TOKEN_SENT: callback_listhead = &instance->token_callback_sent_listhead; break; default: assert (0); } qb_list_for_each_safe(list, tmp_iter, callback_listhead) { token_callback_instance = qb_list_entry (list, struct token_callback_instance, list); del = token_callback_instance->delete; if (del == 1) { qb_list_del (list); } res = token_callback_instance->callback_fn ( token_callback_instance->callback_type, token_callback_instance->data); /* * This callback failed to execute, try it again on the next token */ if (res == -1 && del == 1) { qb_list_add (list, callback_listhead); } else if (del) { free (token_callback_instance); } } } /* * Flow control functions */ static unsigned int backlog_get (struct totemsrp_instance *instance) { unsigned int backlog = 0; struct cs_queue *queue_use = NULL; if (instance->memb_state == MEMB_STATE_OPERATIONAL) { if (instance->waiting_trans_ack) { queue_use = &instance->new_message_queue_trans; } else { queue_use = &instance->new_message_queue; } } else if (instance->memb_state == MEMB_STATE_RECOVERY) { queue_use = &instance->retrans_message_queue; } if (queue_use != NULL) { backlog = cs_queue_used (queue_use); } instance->stats.token[instance->stats.latest_token].backlog_calc = backlog; return (backlog); } static int fcc_calculate ( struct totemsrp_instance *instance, struct orf_token *token) { unsigned int transmits_allowed; unsigned int backlog_calc; transmits_allowed = instance->totem_config->max_messages; if (transmits_allowed > instance->totem_config->window_size - token->fcc) { transmits_allowed = instance->totem_config->window_size - token->fcc; } instance->my_cbl = backlog_get (instance); /* * Only do backlog calculation if there is a backlog otherwise * we would result in div by zero */ if (token->backlog + instance->my_cbl - instance->my_pbl) { backlog_calc = (instance->totem_config->window_size * instance->my_pbl) / (token->backlog + instance->my_cbl - instance->my_pbl); if (backlog_calc > 0 && transmits_allowed > backlog_calc) { transmits_allowed = backlog_calc; } } return (transmits_allowed); } /* * don't overflow the RTR sort queue */ static void fcc_rtr_limit ( struct totemsrp_instance *instance, struct orf_token *token, unsigned int *transmits_allowed) { int check = QUEUE_RTR_ITEMS_SIZE_MAX; check -= (*transmits_allowed + instance->totem_config->window_size); assert (check >= 0); if (sq_lt_compare (instance->last_released + QUEUE_RTR_ITEMS_SIZE_MAX - *transmits_allowed - instance->totem_config->window_size, token->seq)) { *transmits_allowed = 0; } } static void fcc_token_update ( struct totemsrp_instance *instance, struct orf_token *token, unsigned int msgs_transmitted) { token->fcc += msgs_transmitted - instance->my_trc; token->backlog += instance->my_cbl - instance->my_pbl; instance->my_trc = msgs_transmitted; instance->my_pbl = instance->my_cbl; } /* * Sanity checkers */ static int check_orf_token_sanity( const struct totemsrp_instance *instance, const void *msg, size_t msg_len, int endian_conversion_needed) { int rtr_entries; const struct orf_token *token = (const struct orf_token *)msg; size_t required_len; if (msg_len < sizeof(struct orf_token)) { log_printf (instance->totemsrp_log_level_security, "Received orf_token message is too short... ignoring."); return (-1); } if (endian_conversion_needed) { rtr_entries = swab32(token->rtr_list_entries); } else { rtr_entries = token->rtr_list_entries; } required_len = sizeof(struct orf_token) + rtr_entries * sizeof(struct rtr_item); if (msg_len < required_len) { log_printf (instance->totemsrp_log_level_security, "Received orf_token message is too short... ignoring."); return (-1); } return (0); } static int check_mcast_sanity( struct totemsrp_instance *instance, const void *msg, size_t msg_len, int endian_conversion_needed) { if (msg_len < sizeof(struct mcast)) { log_printf (instance->totemsrp_log_level_security, "Received mcast message is too short... ignoring."); return (-1); } return (0); } static int check_memb_merge_detect_sanity( struct totemsrp_instance *instance, const void *msg, size_t msg_len, int endian_conversion_needed) { if (msg_len < sizeof(struct memb_merge_detect)) { log_printf (instance->totemsrp_log_level_security, "Received memb_merge_detect message is too short... ignoring."); return (-1); } return (0); } static int check_memb_join_sanity( struct totemsrp_instance *instance, const void *msg, size_t msg_len, int endian_conversion_needed) { const struct memb_join *mj_msg = (const struct memb_join *)msg; unsigned int proc_list_entries; unsigned int failed_list_entries; size_t required_len; if (msg_len < sizeof(struct memb_join)) { log_printf (instance->totemsrp_log_level_security, "Received memb_join message is too short... ignoring."); return (-1); } proc_list_entries = mj_msg->proc_list_entries; failed_list_entries = mj_msg->failed_list_entries; if (endian_conversion_needed) { proc_list_entries = swab32(proc_list_entries); failed_list_entries = swab32(failed_list_entries); } required_len = sizeof(struct memb_join) + ((proc_list_entries + failed_list_entries) * sizeof(struct srp_addr)); if (msg_len < required_len) { log_printf (instance->totemsrp_log_level_security, "Received memb_join message is too short... ignoring."); return (-1); } return (0); } static int check_memb_commit_token_sanity( struct totemsrp_instance *instance, const void *msg, size_t msg_len, int endian_conversion_needed) { const struct memb_commit_token *mct_msg = (const struct memb_commit_token *)msg; unsigned int addr_entries; size_t required_len; if (msg_len < sizeof(struct memb_commit_token)) { log_printf (instance->totemsrp_log_level_security, "Received memb_commit_token message is too short... ignoring."); return (0); } addr_entries= mct_msg->addr_entries; if (endian_conversion_needed) { addr_entries = swab32(addr_entries); } required_len = sizeof(struct memb_commit_token) + (addr_entries * (sizeof(struct srp_addr) + sizeof(struct memb_commit_token_memb_entry))); if (msg_len < required_len) { log_printf (instance->totemsrp_log_level_security, "Received memb_commit_token message is too short... ignoring."); return (-1); } return (0); } static int check_token_hold_cancel_sanity( struct totemsrp_instance *instance, const void *msg, size_t msg_len, int endian_conversion_needed) { if (msg_len < sizeof(struct token_hold_cancel)) { log_printf (instance->totemsrp_log_level_security, "Received token_hold_cancel message is too short... ignoring."); return (-1); } return (0); } /* * Message Handlers */ unsigned long long int tv_old; /* * message handler called when TOKEN message type received */ static int message_handler_orf_token ( struct totemsrp_instance *instance, const void *msg, size_t msg_len, int endian_conversion_needed) { char token_storage[1500]; char token_convert[1500]; struct orf_token *token = NULL; int forward_token; unsigned int transmits_allowed; unsigned int mcasted_retransmit; unsigned int mcasted_regular; unsigned int last_aru; #ifdef GIVEINFO unsigned long long tv_current; unsigned long long tv_diff; tv_current = qb_util_nano_current_get (); tv_diff = tv_current - tv_old; tv_old = tv_current; log_printf (instance->totemsrp_log_level_debug, "Time since last token %0.4f ms", ((float)tv_diff) / 1000000.0); #endif if (check_orf_token_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) { return (0); } if (instance->orf_token_discard) { return (0); } #ifdef TEST_DROP_ORF_TOKEN_PERCENTAGE if (random()%100 < TEST_DROP_ORF_TOKEN_PERCENTAGE) { return (0); } #endif if (endian_conversion_needed) { orf_token_endian_convert ((struct orf_token *)msg, (struct orf_token *)token_convert); msg = (struct orf_token *)token_convert; } /* * Make copy of token and retransmit list in case we have * to flush incoming messages from the kernel queue */ token = (struct orf_token *)token_storage; memcpy (token, msg, sizeof (struct orf_token)); memcpy (&token->rtr_list[0], (char *)msg + sizeof (struct orf_token), sizeof (struct rtr_item) * RETRANSMIT_ENTRIES_MAX); /* * Handle merge detection timeout */ if (token->seq == instance->my_last_seq) { start_merge_detect_timeout (instance); instance->my_seq_unchanged += 1; } else { cancel_merge_detect_timeout (instance); cancel_token_hold_retransmit_timeout (instance); instance->my_seq_unchanged = 0; } instance->my_last_seq = token->seq; #ifdef TEST_RECOVERY_MSG_COUNT if (instance->memb_state == MEMB_STATE_OPERATIONAL && token->seq > TEST_RECOVERY_MSG_COUNT) { return (0); } #endif instance->flushing = 1; totemnet_recv_flush (instance->totemnet_context); instance->flushing = 0; /* * Determine if we should hold (in reality drop) the token */ instance->my_token_held = 0; if (instance->my_ring_id.rep == instance->my_id.nodeid && instance->my_seq_unchanged > instance->totem_config->seqno_unchanged_const) { instance->my_token_held = 1; } else { if (instance->my_ring_id.rep != instance->my_id.nodeid && instance->my_seq_unchanged >= instance->totem_config->seqno_unchanged_const) { instance->my_token_held = 1; } } /* * Hold onto token when there is no activity on ring and * this processor is the ring rep */ forward_token = 1; if (instance->my_ring_id.rep == instance->my_id.nodeid) { if (instance->my_token_held) { forward_token = 0; } } switch (instance->memb_state) { case MEMB_STATE_COMMIT: /* Discard token */ break; case MEMB_STATE_OPERATIONAL: messages_free (instance, token->aru); /* * Do NOT add break, this case should also execute code in gather case. */ case MEMB_STATE_GATHER: /* * DO NOT add break, we use different free mechanism in recovery state */ case MEMB_STATE_RECOVERY: /* * Discard tokens from another configuration */ if (memcmp (&token->ring_id, &instance->my_ring_id, sizeof (struct memb_ring_id)) != 0) { if ((forward_token) && instance->use_heartbeat) { reset_heartbeat_timeout(instance); } else { cancel_heartbeat_timeout(instance); } return (0); /* discard token */ } /* * Discard retransmitted tokens */ if (sq_lte_compare (token->token_seq, instance->my_token_seq)) { return (0); /* discard token */ } /* * Token is valid so trigger callbacks */ token_callbacks_execute (instance, TOTEM_CALLBACK_TOKEN_RECEIVED); last_aru = instance->my_last_aru; instance->my_last_aru = token->aru; transmits_allowed = fcc_calculate (instance, token); mcasted_retransmit = orf_token_rtr (instance, token, &transmits_allowed); - if (instance->my_token_held == 1 && - (token->rtr_list_entries > 0 || mcasted_retransmit > 0)) { + if (instance->totem_config->cancel_token_hold_on_retransmit && + instance->my_token_held == 1 && + (token->rtr_list_entries > 0 || mcasted_retransmit > 0)) { instance->my_token_held = 0; forward_token = 1; } fcc_rtr_limit (instance, token, &transmits_allowed); mcasted_regular = orf_token_mcast (instance, token, transmits_allowed); /* if (mcasted_regular) { printf ("mcasted regular %d\n", mcasted_regular); printf ("token seq %d\n", token->seq); } */ fcc_token_update (instance, token, mcasted_retransmit + mcasted_regular); if (sq_lt_compare (instance->my_aru, token->aru) || instance->my_id.nodeid == token->aru_addr || token->aru_addr == 0) { token->aru = instance->my_aru; if (token->aru == token->seq) { token->aru_addr = 0; } else { token->aru_addr = instance->my_id.nodeid; } } if (token->aru == last_aru && token->aru_addr != 0) { instance->my_aru_count += 1; } else { instance->my_aru_count = 0; } /* * We really don't follow specification there. In specification, OTHER nodes * detect failure of one node (based on aru_count) and my_id IS NEVER added * to failed list (so node never mark itself as failed) */ if (instance->my_aru_count > instance->totem_config->fail_to_recv_const && token->aru_addr == instance->my_id.nodeid) { log_printf (instance->totemsrp_log_level_error, "FAILED TO RECEIVE"); instance->failed_to_recv = 1; memb_set_merge (&instance->my_id, 1, instance->my_failed_list, &instance->my_failed_list_entries); memb_state_gather_enter (instance, TOTEMSRP_GSFROM_FAILED_TO_RECEIVE); } else { instance->my_token_seq = token->token_seq; token->token_seq += 1; if (instance->memb_state == MEMB_STATE_RECOVERY) { /* * instance->my_aru == instance->my_high_seq_received means this processor * has recovered all messages it can recover * (ie: its retrans queue is empty) */ if (cs_queue_is_empty (&instance->retrans_message_queue) == 0) { if (token->retrans_flg == 0) { token->retrans_flg = 1; instance->my_set_retrans_flg = 1; } } else if (token->retrans_flg == 1 && instance->my_set_retrans_flg) { token->retrans_flg = 0; instance->my_set_retrans_flg = 0; } log_printf (instance->totemsrp_log_level_debug, "token retrans flag is %d my set retrans flag%d retrans queue empty %d count %d, aru %x", token->retrans_flg, instance->my_set_retrans_flg, cs_queue_is_empty (&instance->retrans_message_queue), instance->my_retrans_flg_count, token->aru); if (token->retrans_flg == 0) { instance->my_retrans_flg_count += 1; } else { instance->my_retrans_flg_count = 0; } if (instance->my_retrans_flg_count == 2) { instance->my_install_seq = token->seq; } log_printf (instance->totemsrp_log_level_debug, "install seq %x aru %x high seq received %x", instance->my_install_seq, instance->my_aru, instance->my_high_seq_received); if (instance->my_retrans_flg_count >= 2 && instance->my_received_flg == 0 && sq_lte_compare (instance->my_install_seq, instance->my_aru)) { instance->my_received_flg = 1; instance->my_deliver_memb_entries = instance->my_trans_memb_entries; memcpy (instance->my_deliver_memb_list, instance->my_trans_memb_list, sizeof (struct totem_ip_address) * instance->my_trans_memb_entries); } if (instance->my_retrans_flg_count >= 3 && sq_lte_compare (instance->my_install_seq, token->aru)) { instance->my_rotation_counter += 1; } else { instance->my_rotation_counter = 0; } if (instance->my_rotation_counter == 2) { log_printf (instance->totemsrp_log_level_debug, "retrans flag count %x token aru %x install seq %x aru %x %x", instance->my_retrans_flg_count, token->aru, instance->my_install_seq, instance->my_aru, token->seq); memb_state_operational_enter (instance); instance->my_rotation_counter = 0; instance->my_retrans_flg_count = 0; } } totemnet_send_flush (instance->totemnet_context); token_send (instance, token, forward_token); #ifdef GIVEINFO tv_current = qb_util_nano_current_get (); tv_diff = tv_current - tv_old; tv_old = tv_current; log_printf (instance->totemsrp_log_level_debug, "I held %0.4f ms", ((float)tv_diff) / 1000000.0); #endif if (instance->memb_state == MEMB_STATE_OPERATIONAL) { messages_deliver_to_app (instance, 0, instance->my_high_seq_received); } /* * Deliver messages after token has been transmitted * to improve performance */ reset_token_timeout (instance); // REVIEWED reset_token_retransmit_timeout (instance); // REVIEWED if (instance->my_id.nodeid == instance->my_ring_id.rep && instance->my_token_held == 1) { start_token_hold_retransmit_timeout (instance); } token_callbacks_execute (instance, TOTEM_CALLBACK_TOKEN_SENT); } break; } if ((forward_token) && instance->use_heartbeat) { reset_heartbeat_timeout(instance); } else { cancel_heartbeat_timeout(instance); } return (0); } static void messages_deliver_to_app ( struct totemsrp_instance *instance, int skip, unsigned int end_point) { struct sort_queue_item *sort_queue_item_p; unsigned int i; int res; struct mcast *mcast_in; struct mcast mcast_header; unsigned int range = 0; int endian_conversion_required; unsigned int my_high_delivered_stored = 0; struct srp_addr aligned_system_from; range = end_point - instance->my_high_delivered; if (range) { log_printf (instance->totemsrp_log_level_trace, "Delivering %x to %x", instance->my_high_delivered, end_point); } assert (range < QUEUE_RTR_ITEMS_SIZE_MAX); my_high_delivered_stored = instance->my_high_delivered; /* * Deliver messages in order from rtr queue to pending delivery queue */ for (i = 1; i <= range; i++) { void *ptr = 0; /* * If out of range of sort queue, stop assembly */ res = sq_in_range (&instance->regular_sort_queue, my_high_delivered_stored + i); if (res == 0) { break; } res = sq_item_get (&instance->regular_sort_queue, my_high_delivered_stored + i, &ptr); /* * If hole, stop assembly */ if (res != 0 && skip == 0) { break; } instance->my_high_delivered = my_high_delivered_stored + i; if (res != 0) { continue; } sort_queue_item_p = ptr; mcast_in = sort_queue_item_p->mcast; assert (mcast_in != (struct mcast *)0xdeadbeef); endian_conversion_required = 0; if (mcast_in->header.magic != TOTEM_MH_MAGIC) { endian_conversion_required = 1; mcast_endian_convert (mcast_in, &mcast_header); } else { memcpy (&mcast_header, mcast_in, sizeof (struct mcast)); } aligned_system_from = mcast_header.system_from; /* * Skip messages not originated in instance->my_deliver_memb */ if (skip && memb_set_subset (&aligned_system_from, 1, instance->my_deliver_memb_list, instance->my_deliver_memb_entries) == 0) { instance->my_high_delivered = my_high_delivered_stored + i; continue; } /* * Message found */ log_printf (instance->totemsrp_log_level_trace, "Delivering MCAST message with seq %x to pending delivery queue", mcast_header.seq); /* * Message is locally originated multicast */ instance->totemsrp_deliver_fn ( mcast_header.header.nodeid, ((char *)sort_queue_item_p->mcast) + sizeof (struct mcast), sort_queue_item_p->msg_len - sizeof (struct mcast), endian_conversion_required); } } /* * recv message handler called when MCAST message type received */ static int message_handler_mcast ( struct totemsrp_instance *instance, const void *msg, size_t msg_len, int endian_conversion_needed) { struct sort_queue_item sort_queue_item; struct sq *sort_queue; struct mcast mcast_header; struct srp_addr aligned_system_from; if (check_mcast_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) { return (0); } if (endian_conversion_needed) { mcast_endian_convert (msg, &mcast_header); } else { memcpy (&mcast_header, msg, sizeof (struct mcast)); } if (mcast_header.header.encapsulated == MESSAGE_ENCAPSULATED) { sort_queue = &instance->recovery_sort_queue; } else { sort_queue = &instance->regular_sort_queue; } assert (msg_len <= FRAME_SIZE_MAX); #ifdef TEST_DROP_MCAST_PERCENTAGE if (random()%100 < TEST_DROP_MCAST_PERCENTAGE) { return (0); } #endif /* * If the message is foreign execute the switch below */ if (memcmp (&instance->my_ring_id, &mcast_header.ring_id, sizeof (struct memb_ring_id)) != 0) { aligned_system_from = mcast_header.system_from; switch (instance->memb_state) { case MEMB_STATE_OPERATIONAL: memb_set_merge ( &aligned_system_from, 1, instance->my_proc_list, &instance->my_proc_list_entries); memb_state_gather_enter (instance, TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_OPERATIONAL_STATE); break; case MEMB_STATE_GATHER: if (!memb_set_subset ( &aligned_system_from, 1, instance->my_proc_list, instance->my_proc_list_entries)) { memb_set_merge (&aligned_system_from, 1, instance->my_proc_list, &instance->my_proc_list_entries); memb_state_gather_enter (instance, TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_GATHER_STATE); return (0); } break; case MEMB_STATE_COMMIT: /* discard message */ instance->stats.rx_msg_dropped++; break; case MEMB_STATE_RECOVERY: /* discard message */ instance->stats.rx_msg_dropped++; break; } return (0); } log_printf (instance->totemsrp_log_level_trace, "Received ringid (" CS_PRI_RING_ID ") seq %x", mcast_header.ring_id.rep, (uint64_t)mcast_header.ring_id.seq, mcast_header.seq); /* * Add mcast message to rtr queue if not already in rtr queue * otherwise free io vectors */ if (msg_len > 0 && msg_len <= FRAME_SIZE_MAX && sq_in_range (sort_queue, mcast_header.seq) && sq_item_inuse (sort_queue, mcast_header.seq) == 0) { /* * Allocate new multicast memory block */ // TODO LEAK sort_queue_item.mcast = totemsrp_buffer_alloc (instance); if (sort_queue_item.mcast == NULL) { return (-1); /* error here is corrected by the algorithm */ } memcpy (sort_queue_item.mcast, msg, msg_len); sort_queue_item.msg_len = msg_len; if (sq_lt_compare (instance->my_high_seq_received, mcast_header.seq)) { instance->my_high_seq_received = mcast_header.seq; } sq_item_add (sort_queue, &sort_queue_item, mcast_header.seq); } update_aru (instance); if (instance->memb_state == MEMB_STATE_OPERATIONAL) { messages_deliver_to_app (instance, 0, instance->my_high_seq_received); } /* TODO remove from retrans message queue for old ring in recovery state */ return (0); } static int message_handler_memb_merge_detect ( struct totemsrp_instance *instance, const void *msg, size_t msg_len, int endian_conversion_needed) { struct memb_merge_detect memb_merge_detect; struct srp_addr aligned_system_from; if (check_memb_merge_detect_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) { return (0); } if (endian_conversion_needed) { memb_merge_detect_endian_convert (msg, &memb_merge_detect); } else { memcpy (&memb_merge_detect, msg, sizeof (struct memb_merge_detect)); } /* * do nothing if this is a merge detect from this configuration */ if (memcmp (&instance->my_ring_id, &memb_merge_detect.ring_id, sizeof (struct memb_ring_id)) == 0) { return (0); } aligned_system_from = memb_merge_detect.system_from; /* * Execute merge operation */ switch (instance->memb_state) { case MEMB_STATE_OPERATIONAL: memb_set_merge (&aligned_system_from, 1, instance->my_proc_list, &instance->my_proc_list_entries); memb_state_gather_enter (instance, TOTEMSRP_GSFROM_MERGE_DURING_OPERATIONAL_STATE); break; case MEMB_STATE_GATHER: if (!memb_set_subset ( &aligned_system_from, 1, instance->my_proc_list, instance->my_proc_list_entries)) { memb_set_merge (&aligned_system_from, 1, instance->my_proc_list, &instance->my_proc_list_entries); memb_state_gather_enter (instance, TOTEMSRP_GSFROM_MERGE_DURING_GATHER_STATE); return (0); } break; case MEMB_STATE_COMMIT: /* do nothing in commit */ break; case MEMB_STATE_RECOVERY: /* do nothing in recovery */ break; } return (0); } static void memb_join_process ( struct totemsrp_instance *instance, const struct memb_join *memb_join) { struct srp_addr *proc_list; struct srp_addr *failed_list; int gather_entered = 0; int fail_minus_memb_entries = 0; struct srp_addr fail_minus_memb[PROCESSOR_COUNT_MAX]; struct srp_addr aligned_system_from; proc_list = (struct srp_addr *)memb_join->end_of_memb_join; failed_list = proc_list + memb_join->proc_list_entries; aligned_system_from = memb_join->system_from; log_printf(instance->totemsrp_log_level_trace, "memb_join_process"); memb_set_log(instance, instance->totemsrp_log_level_trace, "proclist", proc_list, memb_join->proc_list_entries); memb_set_log(instance, instance->totemsrp_log_level_trace, "faillist", failed_list, memb_join->failed_list_entries); memb_set_log(instance, instance->totemsrp_log_level_trace, "my_proclist", instance->my_proc_list, instance->my_proc_list_entries); memb_set_log(instance, instance->totemsrp_log_level_trace, "my_faillist", instance->my_failed_list, instance->my_failed_list_entries); if (memb_join->header.type == MESSAGE_TYPE_MEMB_JOIN) { if (instance->flushing) { if (memb_join->header.nodeid == LEAVE_DUMMY_NODEID) { log_printf (instance->totemsrp_log_level_warning, "Discarding LEAVE message during flush, nodeid=" CS_PRI_NODE_ID, memb_join->failed_list_entries > 0 ? failed_list[memb_join->failed_list_entries - 1 ].nodeid : LEAVE_DUMMY_NODEID); if (memb_join->failed_list_entries > 0) { my_leave_memb_set(instance, failed_list[memb_join->failed_list_entries - 1 ].nodeid); } } else { log_printf (instance->totemsrp_log_level_warning, "Discarding JOIN message during flush, nodeid=" CS_PRI_NODE_ID, memb_join->header.nodeid); } return; } else { if (memb_join->header.nodeid == LEAVE_DUMMY_NODEID) { log_printf (instance->totemsrp_log_level_debug, "Received LEAVE message from " CS_PRI_NODE_ID, memb_join->failed_list_entries > 0 ? failed_list[memb_join->failed_list_entries - 1 ].nodeid : LEAVE_DUMMY_NODEID); if (memb_join->failed_list_entries > 0) { my_leave_memb_set(instance, failed_list[memb_join->failed_list_entries - 1 ].nodeid); } } } } if (memb_set_equal (proc_list, memb_join->proc_list_entries, instance->my_proc_list, instance->my_proc_list_entries) && memb_set_equal (failed_list, memb_join->failed_list_entries, instance->my_failed_list, instance->my_failed_list_entries)) { if (memb_join->header.nodeid != LEAVE_DUMMY_NODEID) { memb_consensus_set (instance, &aligned_system_from); } if (memb_consensus_agreed (instance) && instance->failed_to_recv == 1) { instance->failed_to_recv = 0; instance->my_proc_list[0] = instance->my_id; instance->my_proc_list_entries = 1; instance->my_failed_list_entries = 0; memb_state_commit_token_create (instance); memb_state_commit_enter (instance); return; } if (memb_consensus_agreed (instance) && memb_lowest_in_config (instance)) { memb_state_commit_token_create (instance); memb_state_commit_enter (instance); } else { goto out; } } else if (memb_set_subset (proc_list, memb_join->proc_list_entries, instance->my_proc_list, instance->my_proc_list_entries) && memb_set_subset (failed_list, memb_join->failed_list_entries, instance->my_failed_list, instance->my_failed_list_entries)) { goto out; } else if (memb_set_subset (&aligned_system_from, 1, instance->my_failed_list, instance->my_failed_list_entries)) { goto out; } else { memb_set_merge (proc_list, memb_join->proc_list_entries, instance->my_proc_list, &instance->my_proc_list_entries); if (memb_set_subset ( &instance->my_id, 1, failed_list, memb_join->failed_list_entries)) { memb_set_merge ( &aligned_system_from, 1, instance->my_failed_list, &instance->my_failed_list_entries); } else { if (memb_set_subset ( &aligned_system_from, 1, instance->my_memb_list, instance->my_memb_entries)) { if (memb_set_subset ( &aligned_system_from, 1, instance->my_failed_list, instance->my_failed_list_entries) == 0) { memb_set_merge (failed_list, memb_join->failed_list_entries, instance->my_failed_list, &instance->my_failed_list_entries); } else { memb_set_subtract (fail_minus_memb, &fail_minus_memb_entries, failed_list, memb_join->failed_list_entries, instance->my_memb_list, instance->my_memb_entries); memb_set_merge (fail_minus_memb, fail_minus_memb_entries, instance->my_failed_list, &instance->my_failed_list_entries); } } } memb_state_gather_enter (instance, TOTEMSRP_GSFROM_MERGE_DURING_JOIN); gather_entered = 1; } out: if (gather_entered == 0 && instance->memb_state == MEMB_STATE_OPERATIONAL) { memb_state_gather_enter (instance, TOTEMSRP_GSFROM_JOIN_DURING_OPERATIONAL_STATE); } } static void memb_join_endian_convert (const struct memb_join *in, struct memb_join *out) { int i; struct srp_addr *in_proc_list; struct srp_addr *in_failed_list; struct srp_addr *out_proc_list; struct srp_addr *out_failed_list; out->header.magic = TOTEM_MH_MAGIC; out->header.version = TOTEM_MH_VERSION; out->header.type = in->header.type; out->header.nodeid = swab32 (in->header.nodeid); out->system_from = srp_addr_endian_convert(in->system_from); out->proc_list_entries = swab32 (in->proc_list_entries); out->failed_list_entries = swab32 (in->failed_list_entries); out->ring_seq = swab64 (in->ring_seq); in_proc_list = (struct srp_addr *)in->end_of_memb_join; in_failed_list = in_proc_list + out->proc_list_entries; out_proc_list = (struct srp_addr *)out->end_of_memb_join; out_failed_list = out_proc_list + out->proc_list_entries; for (i = 0; i < out->proc_list_entries; i++) { out_proc_list[i] = srp_addr_endian_convert (in_proc_list[i]); } for (i = 0; i < out->failed_list_entries; i++) { out_failed_list[i] = srp_addr_endian_convert (in_failed_list[i]); } } static void memb_commit_token_endian_convert (const struct memb_commit_token *in, struct memb_commit_token *out) { int i; struct srp_addr *in_addr = (struct srp_addr *)in->end_of_commit_token; struct srp_addr *out_addr = (struct srp_addr *)out->end_of_commit_token; struct memb_commit_token_memb_entry *in_memb_list; struct memb_commit_token_memb_entry *out_memb_list; out->header.magic = TOTEM_MH_MAGIC; out->header.version = TOTEM_MH_VERSION; out->header.type = in->header.type; out->header.nodeid = swab32 (in->header.nodeid); out->token_seq = swab32 (in->token_seq); out->ring_id.rep = swab32(in->ring_id.rep); out->ring_id.seq = swab64 (in->ring_id.seq); out->retrans_flg = swab32 (in->retrans_flg); out->memb_index = swab32 (in->memb_index); out->addr_entries = swab32 (in->addr_entries); in_memb_list = (struct memb_commit_token_memb_entry *)(in_addr + out->addr_entries); out_memb_list = (struct memb_commit_token_memb_entry *)(out_addr + out->addr_entries); for (i = 0; i < out->addr_entries; i++) { out_addr[i] = srp_addr_endian_convert (in_addr[i]); /* * Only convert the memb entry if it has been set */ if (in_memb_list[i].ring_id.rep != 0) { out_memb_list[i].ring_id.rep = swab32(in_memb_list[i].ring_id.rep); out_memb_list[i].ring_id.seq = swab64 (in_memb_list[i].ring_id.seq); out_memb_list[i].aru = swab32 (in_memb_list[i].aru); out_memb_list[i].high_delivered = swab32 (in_memb_list[i].high_delivered); out_memb_list[i].received_flg = swab32 (in_memb_list[i].received_flg); } } } static void orf_token_endian_convert (const struct orf_token *in, struct orf_token *out) { int i; out->header.magic = TOTEM_MH_MAGIC; out->header.version = TOTEM_MH_VERSION; out->header.type = in->header.type; out->header.nodeid = swab32 (in->header.nodeid); out->seq = swab32 (in->seq); out->token_seq = swab32 (in->token_seq); out->aru = swab32 (in->aru); out->ring_id.rep = swab32(in->ring_id.rep); out->aru_addr = swab32(in->aru_addr); out->ring_id.seq = swab64 (in->ring_id.seq); out->fcc = swab32 (in->fcc); out->backlog = swab32 (in->backlog); out->retrans_flg = swab32 (in->retrans_flg); out->rtr_list_entries = swab32 (in->rtr_list_entries); for (i = 0; i < out->rtr_list_entries; i++) { out->rtr_list[i].ring_id.rep = swab32(in->rtr_list[i].ring_id.rep); out->rtr_list[i].ring_id.seq = swab64 (in->rtr_list[i].ring_id.seq); out->rtr_list[i].seq = swab32 (in->rtr_list[i].seq); } } static void mcast_endian_convert (const struct mcast *in, struct mcast *out) { out->header.magic = TOTEM_MH_MAGIC; out->header.version = TOTEM_MH_VERSION; out->header.type = in->header.type; out->header.nodeid = swab32 (in->header.nodeid); out->header.encapsulated = in->header.encapsulated; out->seq = swab32 (in->seq); out->this_seqno = swab32 (in->this_seqno); out->ring_id.rep = swab32(in->ring_id.rep); out->ring_id.seq = swab64 (in->ring_id.seq); out->node_id = swab32 (in->node_id); out->guarantee = swab32 (in->guarantee); out->system_from = srp_addr_endian_convert(in->system_from); } static void memb_merge_detect_endian_convert ( const struct memb_merge_detect *in, struct memb_merge_detect *out) { out->header.magic = TOTEM_MH_MAGIC; out->header.version = TOTEM_MH_VERSION; out->header.type = in->header.type; out->header.nodeid = swab32 (in->header.nodeid); out->ring_id.rep = swab32(in->ring_id.rep); out->ring_id.seq = swab64 (in->ring_id.seq); out->system_from = srp_addr_endian_convert (in->system_from); } static int ignore_join_under_operational ( struct totemsrp_instance *instance, const struct memb_join *memb_join) { struct srp_addr *proc_list; struct srp_addr *failed_list; unsigned long long ring_seq; struct srp_addr aligned_system_from; proc_list = (struct srp_addr *)memb_join->end_of_memb_join; failed_list = proc_list + memb_join->proc_list_entries; ring_seq = memb_join->ring_seq; aligned_system_from = memb_join->system_from; if (memb_set_subset (&instance->my_id, 1, failed_list, memb_join->failed_list_entries)) { return (1); } /* * In operational state, my_proc_list is exactly the same as * my_memb_list. */ if ((memb_set_subset (&aligned_system_from, 1, instance->my_memb_list, instance->my_memb_entries)) && (ring_seq < instance->my_ring_id.seq)) { return (1); } return (0); } static int message_handler_memb_join ( struct totemsrp_instance *instance, const void *msg, size_t msg_len, int endian_conversion_needed) { const struct memb_join *memb_join; struct memb_join *memb_join_convert = alloca (msg_len); struct srp_addr aligned_system_from; if (check_memb_join_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) { return (0); } if (endian_conversion_needed) { memb_join = memb_join_convert; memb_join_endian_convert (msg, memb_join_convert); } else { memb_join = msg; } aligned_system_from = memb_join->system_from; /* * If the process paused because it wasn't scheduled in a timely * fashion, flush the join messages because they may be queued * entries */ if (pause_flush (instance)) { return (0); } if (instance->token_ring_id_seq < memb_join->ring_seq) { instance->token_ring_id_seq = memb_join->ring_seq; } switch (instance->memb_state) { case MEMB_STATE_OPERATIONAL: if (!ignore_join_under_operational (instance, memb_join)) { memb_join_process (instance, memb_join); } break; case MEMB_STATE_GATHER: memb_join_process (instance, memb_join); break; case MEMB_STATE_COMMIT: if (memb_set_subset (&aligned_system_from, 1, instance->my_new_memb_list, instance->my_new_memb_entries) && memb_join->ring_seq >= instance->my_ring_id.seq) { memb_join_process (instance, memb_join); memb_state_gather_enter (instance, TOTEMSRP_GSFROM_JOIN_DURING_COMMIT_STATE); } break; case MEMB_STATE_RECOVERY: if (memb_set_subset (&aligned_system_from, 1, instance->my_new_memb_list, instance->my_new_memb_entries) && memb_join->ring_seq >= instance->my_ring_id.seq) { memb_join_process (instance, memb_join); memb_recovery_state_token_loss (instance); memb_state_gather_enter (instance, TOTEMSRP_GSFROM_JOIN_DURING_RECOVERY); } break; } return (0); } static int message_handler_memb_commit_token ( struct totemsrp_instance *instance, const void *msg, size_t msg_len, int endian_conversion_needed) { struct memb_commit_token *memb_commit_token_convert = alloca (msg_len); struct memb_commit_token *memb_commit_token; struct srp_addr sub[PROCESSOR_COUNT_MAX]; int sub_entries; struct srp_addr *addr; log_printf (instance->totemsrp_log_level_debug, "got commit token"); if (check_memb_commit_token_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) { return (0); } if (endian_conversion_needed) { memb_commit_token_endian_convert (msg, memb_commit_token_convert); } else { memcpy (memb_commit_token_convert, msg, msg_len); } memb_commit_token = memb_commit_token_convert; addr = (struct srp_addr *)memb_commit_token->end_of_commit_token; #ifdef TEST_DROP_COMMIT_TOKEN_PERCENTAGE if (random()%100 < TEST_DROP_COMMIT_TOKEN_PERCENTAGE) { return (0); } #endif switch (instance->memb_state) { case MEMB_STATE_OPERATIONAL: /* discard token */ break; case MEMB_STATE_GATHER: memb_set_subtract (sub, &sub_entries, instance->my_proc_list, instance->my_proc_list_entries, instance->my_failed_list, instance->my_failed_list_entries); if (memb_set_equal (addr, memb_commit_token->addr_entries, sub, sub_entries) && memb_commit_token->ring_id.seq > instance->my_ring_id.seq) { memcpy (instance->commit_token, memb_commit_token, msg_len); memb_state_commit_enter (instance); } break; case MEMB_STATE_COMMIT: /* * If retransmitted commit tokens are sent on this ring * filter them out and only enter recovery once the * commit token has traversed the array. This is * determined by : * memb_commit_token->memb_index == memb_commit_token->addr_entries) { */ if (memb_commit_token->ring_id.seq == instance->my_ring_id.seq && memb_commit_token->memb_index == memb_commit_token->addr_entries) { memb_state_recovery_enter (instance, memb_commit_token); } break; case MEMB_STATE_RECOVERY: if (instance->my_id.nodeid == instance->my_ring_id.rep) { /* Filter out duplicated tokens */ if (instance->originated_orf_token) { break; } instance->originated_orf_token = 1; log_printf (instance->totemsrp_log_level_debug, "Sending initial ORF token"); // TODO convert instead of initiate orf_token_send_initial (instance); reset_token_timeout (instance); // REVIEWED reset_token_retransmit_timeout (instance); // REVIEWED } break; } return (0); } static int message_handler_token_hold_cancel ( struct totemsrp_instance *instance, const void *msg, size_t msg_len, int endian_conversion_needed) { const struct token_hold_cancel *token_hold_cancel = msg; if (check_token_hold_cancel_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) { return (0); } if (memcmp (&token_hold_cancel->ring_id, &instance->my_ring_id, sizeof (struct memb_ring_id)) == 0) { instance->my_seq_unchanged = 0; if (instance->my_ring_id.rep == instance->my_id.nodeid) { timer_function_token_retransmit_timeout (instance); } } return (0); } static int check_message_header_validity( void *context, const void *msg, unsigned int msg_len, const struct sockaddr_storage *system_from) { struct totemsrp_instance *instance = context; const struct totem_message_header *message_header = msg; const char *guessed_str; const char *msg_byte = msg; if (msg_len < sizeof (struct totem_message_header)) { log_printf (instance->totemsrp_log_level_security, "Message received from %s is too short... Ignoring %u.", totemip_sa_print((struct sockaddr *)system_from), (unsigned int)msg_len); return (-1); } if (message_header->magic != TOTEM_MH_MAGIC && message_header->magic != swab16(TOTEM_MH_MAGIC)) { /* * We've received ether Knet, old version of Corosync, * or something else. Do some guessing to display (hopefully) * helpful message */ guessed_str = NULL; if (message_header->magic == 0xFFFF) { /* * Corosync 2.2 used header with two UINT8_MAX */ guessed_str = "Corosync 2.2"; } else if (message_header->magic == 0xFEFE) { /* * Corosync 2.3+ used header with two UINT8_MAX - 1 */ guessed_str = "Corosync 2.3+"; } else if (msg_byte[0] == 0x01) { /* * Knet has stable1 with first byte of message == 1 */ guessed_str = "unencrypted Kronosnet"; } else if (msg_byte[0] >= 0 && msg_byte[0] <= 5) { /* * Unencrypted Corosync 1.x/OpenAIS has first byte * 0-5. Collision with Knet (but still worth the try) */ guessed_str = "unencrypted Corosync 2.0/2.1/1.x/OpenAIS"; } else { /* * Encrypted Kronosned packet has a hash at the end of * the packet and nothing specific at the beginning of the * packet (just encrypted data). * Encrypted Corosync 1.x/OpenAIS is quite similar but hash_digest * is in the beginning of the packet. * * So it's not possible to reliably detect ether of them. */ guessed_str = "encrypted Kronosnet/Corosync 2.0/2.1/1.x/OpenAIS or unknown"; } log_printf(instance->totemsrp_log_level_security, "Message received from %s has bad magic number (probably sent by %s).. Ignoring", totemip_sa_print((struct sockaddr *)system_from), guessed_str); return (-1); } if (message_header->version != TOTEM_MH_VERSION) { log_printf(instance->totemsrp_log_level_security, "Message received from %s has unsupported version %u... Ignoring", totemip_sa_print((struct sockaddr *)system_from), message_header->version); return (-1); } return (0); } void main_deliver_fn ( void *context, const void *msg, unsigned int msg_len, const struct sockaddr_storage *system_from) { struct totemsrp_instance *instance = context; const struct totem_message_header *message_header = msg; if (check_message_header_validity(context, msg, msg_len, system_from) == -1) { return ; } switch (message_header->type) { case MESSAGE_TYPE_ORF_TOKEN: instance->stats.orf_token_rx++; break; case MESSAGE_TYPE_MCAST: instance->stats.mcast_rx++; break; case MESSAGE_TYPE_MEMB_MERGE_DETECT: instance->stats.memb_merge_detect_rx++; break; case MESSAGE_TYPE_MEMB_JOIN: instance->stats.memb_join_rx++; break; case MESSAGE_TYPE_MEMB_COMMIT_TOKEN: instance->stats.memb_commit_token_rx++; break; case MESSAGE_TYPE_TOKEN_HOLD_CANCEL: instance->stats.token_hold_cancel_rx++; break; default: log_printf (instance->totemsrp_log_level_security, "Message received from %s has wrong type... ignoring %d.\n", totemip_sa_print((struct sockaddr *)system_from), (int)message_header->type); instance->stats.rx_msg_dropped++; return; } /* * Handle incoming message */ totemsrp_message_handlers.handler_functions[(int)message_header->type] ( instance, msg, msg_len, message_header->magic != TOTEM_MH_MAGIC); } int totemsrp_iface_set ( void *context, const struct totem_ip_address *interface_addr, unsigned short ip_port, unsigned int iface_no) { struct totemsrp_instance *instance = context; int res; totemip_copy(&instance->my_addrs[iface_no], interface_addr); res = totemnet_iface_set ( instance->totemnet_context, interface_addr, ip_port, iface_no); return (res); } /* Contrary to its name, this only gets called when the interface is enabled */ void main_iface_change_fn ( void *context, const struct totem_ip_address *iface_addr, unsigned int iface_no) { struct totemsrp_instance *instance = context; int num_interfaces; int i; if (!instance->my_id.nodeid) { instance->my_id.nodeid = iface_addr->nodeid; } totemip_copy (&instance->my_addrs[iface_no], iface_addr); if (instance->iface_changes++ == 0) { instance->memb_ring_id_create_or_load (&instance->my_ring_id, instance->my_id.nodeid); /* * Increase the ring_id sequence number. This doesn't follow specification. * Solves problem with restarted leader node (node with lowest nodeid) before * rest of the cluster forms new membership and guarantees unique ring_id for * new singleton configuration. */ instance->my_ring_id.seq++; instance->token_ring_id_seq = instance->my_ring_id.seq; log_printf ( instance->totemsrp_log_level_debug, "Created or loaded sequence id " CS_PRI_RING_ID " for this ring.", instance->my_ring_id.rep, (uint64_t)instance->my_ring_id.seq); if (instance->totemsrp_service_ready_fn) { instance->totemsrp_service_ready_fn (); } } num_interfaces = 0; for (i = 0; i < INTERFACE_MAX; i++) { if (instance->totem_config->interfaces[i].configured) { num_interfaces++; } } if (instance->iface_changes >= num_interfaces) { /* We need to clear orig_interfaces so that 'commit' diffs against nothing */ instance->totem_config->orig_interfaces = malloc (sizeof (struct totem_interface) * INTERFACE_MAX); assert(instance->totem_config->orig_interfaces != NULL); memset(instance->totem_config->orig_interfaces, 0, sizeof (struct totem_interface) * INTERFACE_MAX); totemconfig_commit_new_params(instance->totem_config, icmap_get_global_map()); memb_state_gather_enter (instance, TOTEMSRP_GSFROM_INTERFACE_CHANGE); free(instance->totem_config->orig_interfaces); } } void totemsrp_net_mtu_adjust (struct totem_config *totem_config) { totem_config->net_mtu -= 2 * sizeof (struct mcast); } void totemsrp_service_ready_register ( void *context, void (*totem_service_ready) (void)) { struct totemsrp_instance *instance = (struct totemsrp_instance *)context; instance->totemsrp_service_ready_fn = totem_service_ready; } int totemsrp_member_add ( void *context, const struct totem_ip_address *member, int iface_no) { struct totemsrp_instance *instance = (struct totemsrp_instance *)context; int res; res = totemnet_member_add (instance->totemnet_context, &instance->my_addrs[iface_no], member, iface_no); return (res); } int totemsrp_member_remove ( void *context, const struct totem_ip_address *member, int iface_no) { struct totemsrp_instance *instance = (struct totemsrp_instance *)context; int res; res = totemnet_member_remove (instance->totemnet_context, member, iface_no); return (res); } void totemsrp_threaded_mode_enable (void *context) { struct totemsrp_instance *instance = (struct totemsrp_instance *)context; instance->threaded_mode_enabled = 1; } void totemsrp_trans_ack (void *context) { struct totemsrp_instance *instance = (struct totemsrp_instance *)context; instance->waiting_trans_ack = 0; instance->totemsrp_waiting_trans_ack_cb_fn (0); } int totemsrp_reconfigure (void *context, struct totem_config *totem_config) { struct totemsrp_instance *instance = (struct totemsrp_instance *)context; int res; res = totemnet_reconfigure (instance->totemnet_context, totem_config); return (res); } int totemsrp_crypto_reconfigure_phase (void *context, struct totem_config *totem_config, cfg_message_crypto_reconfig_phase_t phase) { struct totemsrp_instance *instance = (struct totemsrp_instance *)context; int res; res = totemnet_crypto_reconfigure_phase (instance->totemnet_context, totem_config, phase); return (res); } void totemsrp_stats_clear (void *context, int flags) { struct totemsrp_instance *instance = (struct totemsrp_instance *)context; memset(&instance->stats, 0, sizeof(totemsrp_stats_t)); if (flags & TOTEMPG_STATS_CLEAR_TRANSPORT) { totemnet_stats_clear (instance->totemnet_context); } } void totemsrp_force_gather (void *context) { timer_function_orf_token_timeout(context); } diff --git a/include/corosync/totem/totem.h b/include/corosync/totem/totem.h index 8b166566..bdb6a15f 100644 --- a/include/corosync/totem/totem.h +++ b/include/corosync/totem/totem.h @@ -1,292 +1,294 @@ /* * Copyright (c) 2005 MontaVista Software, Inc. * Copyright (c) 2006-2012 Red Hat, Inc. * * Author: Steven Dake (sdake@redhat.com) * * All rights reserved. * * This software licensed under BSD license, the text of which follows: * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * - Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * - Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * - Neither the name of the MontaVista Software, Inc. nor the names of its * contributors may be used to endorse or promote products derived from this * software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF * THE POSSIBILITY OF SUCH DAMAGE. */ #ifndef TOTEM_H_DEFINED #define TOTEM_H_DEFINED #include "totemip.h" #include #include #include #ifdef HAVE_SMALL_MEMORY_FOOTPRINT #define PROCESSOR_COUNT_MAX 16 #define MESSAGE_SIZE_MAX 1024*64 #define MESSAGE_QUEUE_MAX 512 #else #define PROCESSOR_COUNT_MAX 384 #define MESSAGE_SIZE_MAX 1024*1024 /* (1MB) */ #define MESSAGE_QUEUE_MAX ((4 * MESSAGE_SIZE_MAX) / totem_config->net_mtu) #endif /* HAVE_SMALL_MEMORY_FOOTPRINT */ #define FRAME_SIZE_MAX KNET_MAX_PACKET_SIZE #define CONFIG_STRING_LEN_MAX 128 /* * Estimation of required buffer size for totemudp and totemudpu - it should be at least * sizeof(memb_join) + PROCESSOR_MAX * 2 * sizeof(srp_addr)) * if we want to support PROCESSOR_MAX nodes, but because we don't have * srp_addr and memb_join, we have to use estimation. * TODO: Consider moving srp_addr/memb_join into totem headers instead of totemsrp.c */ #define UDP_RECEIVE_FRAME_SIZE_MAX (PROCESSOR_COUNT_MAX * (INTERFACE_MAX * 2 * sizeof(struct totem_ip_address)) + 1024) #define TRANSMITS_ALLOWED 16 #define SEND_THREADS_MAX 16 /* This must be <= KNET_MAX_LINK */ #define INTERFACE_MAX 8 #define BIND_MAX_RETRIES 10 #define BIND_RETRIES_INTERVAL 100 /** * Maximum number of continuous gather states */ #define MAX_NO_CONT_GATHER 3 /* * Maximum number of continuous failures get from sendmsg call */ #define MAX_NO_CONT_SENDMSG_FAILURES 30 struct totem_interface { struct totem_ip_address bindnet; struct totem_ip_address boundto; struct totem_ip_address mcast_addr; struct totem_ip_address local_ip; uint16_t ip_port; uint16_t ttl; uint8_t configured; int member_count; int knet_link_priority; int knet_ping_interval; int knet_ping_timeout; int knet_ping_precision; int knet_pong_count; int knet_transport; struct totem_ip_address member_list[PROCESSOR_COUNT_MAX]; }; struct totem_logging_configuration { void (*log_printf) ( int level, int subsys, const char *function_name, const char *file_name, int file_line, const char *format, ...) __attribute__((format(printf, 6, 7))); int log_level_security; int log_level_error; int log_level_warning; int log_level_notice; int log_level_debug; int log_level_trace; int log_subsys_id; }; /* * COrosync TOtem. Also used as an endian_detector. */ #define TOTEM_MH_MAGIC 0xC070 #define TOTEM_MH_VERSION 0x03 struct totem_message_header { unsigned short magic; char version; char type; char encapsulated; unsigned int nodeid; unsigned int target_nodeid; } __attribute__((packed)); enum { TOTEM_PRIVATE_KEY_LEN_MIN = KNET_MIN_KEY_LEN, TOTEM_PRIVATE_KEY_LEN_MAX = KNET_MAX_KEY_LEN }; enum { TOTEM_LINK_MODE_BYTES = 64 }; typedef enum { TOTEM_TRANSPORT_UDP = 0, TOTEM_TRANSPORT_UDPU = 1, TOTEM_TRANSPORT_KNET = 2 } totem_transport_t; #define MEMB_RING_ID struct memb_ring_id { unsigned int rep; unsigned long long seq; } __attribute__((packed)); typedef enum { CRYPTO_RECONFIG_PHASE_ACTIVATE = 1, CRYPTO_RECONFIG_PHASE_CLEANUP = 2, } cfg_message_crypto_reconfig_phase_t; struct totem_config { int version; /* * network */ struct totem_interface *interfaces; struct totem_interface *orig_interfaces; /* for reload */ unsigned int node_id; unsigned int clear_node_high_bit; unsigned int knet_pmtud_interval; /* * key information */ unsigned char private_key[TOTEM_PRIVATE_KEY_LEN_MAX]; unsigned int private_key_len; /* * Totem configuration parameters */ unsigned int token_timeout; unsigned int token_warning; unsigned int token_retransmit_timeout; unsigned int token_hold_timeout; unsigned int token_retransmits_before_loss_const; unsigned int join_timeout; unsigned int send_join_timeout; unsigned int consensus_timeout; unsigned int merge_timeout; unsigned int downcheck_timeout; unsigned int fail_to_recv_const; unsigned int seqno_unchanged_const; char link_mode[TOTEM_LINK_MODE_BYTES]; struct totem_logging_configuration totem_logging_configuration; unsigned int net_mtu; unsigned int threads; unsigned int heartbeat_failures_allowed; unsigned int max_network_delay; unsigned int window_size; unsigned int max_messages; unsigned int broadcast_use; char crypto_model[CONFIG_STRING_LEN_MAX]; char crypto_cipher_type[CONFIG_STRING_LEN_MAX]; char crypto_hash_type[CONFIG_STRING_LEN_MAX]; int crypto_index; /* Num of crypto config currently loaded into knet ( 1 or 2 ) */ int crypto_changed; /* Has crypto changed since last time? (it's expensive to reload) */ char knet_compression_model[CONFIG_STRING_LEN_MAX]; uint32_t knet_compression_threshold; int knet_compression_level; totem_transport_t transport_number; unsigned int miss_count_const; enum totem_ip_version_enum ip_version; unsigned int block_unlisted_ips; + unsigned int cancel_token_hold_on_retransmit; + void (*totem_memb_ring_id_create_or_load) ( struct memb_ring_id *memb_ring_id, unsigned int nodeid); void (*totem_memb_ring_id_store) ( const struct memb_ring_id *memb_ring_id, unsigned int nodeid); }; /* * Node status returned from the API * Usually the same as the cfg version (except for * link_status) */ #define TOTEM_NODE_STATUS_STRUCTURE_VERSION 1 struct totem_node_status { uint32_t version; /* Structure version */ unsigned int nodeid; uint8_t reachable; uint8_t remote; uint8_t external; uint8_t onwire_min; uint8_t onwire_max; uint8_t onwire_ver; struct knet_link_status link_status[KNET_MAX_LINK]; }; #define TOTEM_CONFIGURATION_TYPE enum totem_configuration_type { TOTEM_CONFIGURATION_REGULAR, TOTEM_CONFIGURATION_TRANSITIONAL }; #define TOTEM_CALLBACK_TOKEN_TYPE enum totem_callback_token_type { TOTEM_CALLBACK_TOKEN_RECEIVED = 1, TOTEM_CALLBACK_TOKEN_SENT = 2 }; enum totem_event_type { TOTEM_EVENT_DELIVERY_CONGESTED, TOTEM_EVENT_NEW_MSG, }; #endif /* TOTEM_H_DEFINED */ diff --git a/man/corosync.conf.5 b/man/corosync.conf.5 index 0588ad1e..a3771ea7 100644 --- a/man/corosync.conf.5 +++ b/man/corosync.conf.5 @@ -1,1032 +1,1045 @@ .\"/* .\" * Copyright (c) 2005 MontaVista Software, Inc. .\" * Copyright (c) 2006-2021 Red Hat, Inc. .\" * .\" * All rights reserved. .\" * .\" * Author: Steven Dake (sdake@redhat.com) .\" * .\" * This software licensed under BSD license, the text of which follows: .\" * .\" * Redistribution and use in source and binary forms, with or without .\" * modification, are permitted provided that the following conditions are met: .\" * .\" * - Redistributions of source code must retain the above copyright notice, .\" * this list of conditions and the following disclaimer. .\" * - Redistributions in binary form must reproduce the above copyright notice, .\" * this list of conditions and the following disclaimer in the documentation .\" * and/or other materials provided with the distribution. .\" * - Neither the name of the MontaVista Software, Inc. nor the names of its .\" * contributors may be used to endorse or promote products derived from this .\" * software without specific prior written permission. .\" * .\" * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" .\" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE .\" * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE .\" * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE .\" * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR .\" * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF .\" * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS .\" * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN .\" * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) .\" * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF .\" * THE POSSIBILITY OF SUCH DAMAGE. .\" */ -.TH COROSYNC_CONF 5 2021-07-23 "corosync Man Page" "Corosync Cluster Engine Programmer's Manual" +.TH COROSYNC_CONF 5 2021-08-11 "corosync Man Page" "Corosync Cluster Engine Programmer's Manual" .SH NAME corosync.conf - corosync executive configuration file .SH SYNOPSIS /etc/corosync/corosync.conf .SH DESCRIPTION The corosync.conf instructs the corosync executive about various parameters needed to control the corosync executive. Empty lines and lines starting with # character are ignored. The configuration file consists of bracketed top level directives. The possible directive choices are: .TP totem { } This top level directive contains configuration options for the totem protocol. .TP logging { } This top level directive contains configuration options for logging. .TP quorum { } This top level directive contains configuration options for quorum. .TP nodelist { } This top level directive contains configuration options for nodes in cluster. .TP system { } This top level directive contains configuration options related to system. .TP resources { } This top level directive contains configuration options for resources. .TP nozzle { } This top level directive contains configuration options for a libnozzle device. .PP The .B interface sub-directive of totem is optional for UDP and knet transports. For knet, multiple interface subsections define parameters for each knet link on the system. For UDPU an interface section is not needed and it is recommended that the nodelist is used to define cluster nodes. .TP linknumber This specifies the link number for the interface. When using the knet protocol, each interface should specify separate link numbers to uniquely identify to the membership protocol which interface to use for which link. The linknumber must start at 0. For UDP the only supported linknumber is 0. .TP knet_link_priority This specifies the priority for the link when knet is used in 'passive' mode. (see link_mode below) .TP knet_ping_interval This specifies the interval between knet link pings. knet_ping_interval and knet_ping_timeout are a pair, if one is specified the other should be too, otherwise one will be calculated from the token timeout and one will be taken from the config file. (default is token timeout / (knet_pong_count*2)) .TP knet_ping_timeout If no ping is received within this time, the knet link is declared dead. knet_ping_interval and knet_ping_timeout are a pair, if one is specified the other should be too, otherwise one will be calculated from the token timeout and one will be taken from the config file. (default is token timeout / knet_pong_count) .TP knet_ping_precision How many values of latency are used to calculate the average link latency. (default 2048 samples) .TP knet_pong_count How many valid ping/pongs before a link is marked UP. (default 2) .TP knet_transport Which IP transport knet should use. valid values are "sctp" or "udp". (default: udp) .TP bindnetaddr (udp only) This specifies the network address the corosync executive should bind to when using udp. bindnetaddr (udp only) should be an IP address configured on the system, or a network address. For example, if the local interface is 192.168.5.92 with netmask 255.255.255.0, you should set bindnetaddr to 192.168.5.92 or 192.168.5.0. If the local interface is 192.168.5.92 with netmask 255.255.255.192, set bindnetaddr to 192.168.5.92 or 192.168.5.64, and so forth. This may also be an IPV6 address, in which case IPV6 networking will be used. In this case, the exact address must be specified and there is no automatic selection of the network interface within a specific subnet as with IPv4. If IPv6 networking is used, the nodeid field in nodelist must be specified. .TP broadcast (udp only) This is optional and can be set to yes. If it is set to yes, the broadcast address will be used for communication. If this option is set, mcastaddr should not be set. .TP mcastaddr (udp only) This is the multicast address used by corosync executive. The default should work for most networks, but the network administrator should be queried about a multicast address to use. Avoid 224.x.x.x because this is a "config" multicast address. This may also be an IPV6 multicast address, in which case IPV6 networking will be used. If IPv6 networking is used, the nodeid field in nodelist must be specified. It's not necessary to use this option if cluster_name option is used. If both options are used, mcastaddr has higher priority. .TP mcastport (udp only) This specifies the UDP port number. It is possible to use the same multicast address on a network with the corosync services configured for different UDP ports. Please note corosync uses two UDP ports mcastport (for mcast receives) and mcastport - 1 (for mcast sends). If you have multiple clusters on the same network using the same mcastaddr please configure the mcastports with a gap. .TP ttl (udp only) This specifies the Time To Live (TTL). If you run your cluster on a routed network then the default of "1" will be too small. This option provides a way to increase this up to 255. The valid range is 0..255. .PP .PP Within the .B totem directive, there are seven configuration options of which one is required, five are optional, and one is required when IPV6 is configured in the interface subdirective. The required directive controls the version of the totem configuration. The optional option unless using IPV6 directive controls identification of the processor. The optional options control secrecy and authentication, the network mode of operation and maximum network MTU field. .TP version This specifies the version of the configuration file. Currently the only valid version for this directive is 2. .TP clear_node_high_bit This configuration option is optional and is only relevant when no nodeid is specified. Some corosync clients require a signed 32 bit nodeid that is greater than zero however by default corosync uses all 32 bits of the IPv4 address space when generating a nodeid. Set this option to yes to force the high bit to be zero and therefore ensure the nodeid is a positive signed 32 bit integer. WARNING: Cluster behavior is undefined if this option is enabled on only a subset of the cluster (for example during a rolling upgrade). .TP crypto_model This specifies which cryptographic library should be used by knet. Supported values depend on the libknet build and on the installed cryptography libraries. Typically nss and openssl will be available but gcrypt and others could also be allowed. The default is nss. .TP crypto_hash This specifies which HMAC authentication should be used to authenticate all messages. Valid values are none (no authentication), md5, sha1, sha256, sha384 and sha512. Encrypted transmission is only supported for the knet transport. The default is none. .TP crypto_cipher This specifies which cipher should be used to encrypt all messages. Valid values are none (no encryption), aes256, aes192 and aes128. Enabling crypto_cipher, requires also enabling of crypto_hash. Encrypted transmission is only supported for the knet transport. The default is none. .TP secauth This implies crypto_cipher=aes256 and crypto_hash=sha256, unless those options are explicitly set. Encrypted transmission is only supported for the knet transport. The default is off. .TP keyfile This specifies the fully qualified path to the shared key used to authenticate and encrypt data used within the Totem protocol. The default is /etc/corosync/authkey. .TP key Shared key stored in configuration instead of authkey file. This option has lower precedence than keyfile option so it's used only when keyfile is not specified. Using this option is not recommended for security reasons. .TP link_mode This specifies the Kronosnet mode, which may be passive, active, or rr (round-robin). .B passive: the active link with the highest priority (highest number) will be used. If one or more links share the same priority the one with the lowest link ID will be used. .B active: All active links will be used simultaneously to send traffic. link priority is ignored. .B rr: Round-Robin policy. Each packet will be sent to the next active link in order. If only one interface directive is specified, passive is automatically chosen. The maximum number of interface directives that is allowed with Kronosnet is 8. For other transports it is 1. .TP netmtu This specifies the network maximum transmit unit. To set this value beyond 1500, the regular frame MTU, requires ethernet devices that support large, or also called jumbo, frames. If any device in the network doesn't support large frames, the protocol will not operate properly. The hosts must also have their mtu size set from 1500 to whatever frame size is specified here. Please note while some NICs or switches claim large frame support, they support 9000 MTU as the maximum frame size including the IP header. Setting the netmtu and host MTUs to 9000 will cause totem to use the full 9000 bytes of the frame. Then Linux will add a 18 byte header moving the full frame size to 9018. As a result some hardware will not operate properly with this size of data. A netmtu of 8982 seems to work for the few large frame devices that have been tested. Some manufacturers claim large frame support when in fact they support frame sizes of 4500 bytes. When sending multicast traffic, if the network frequently reconfigures, chances are that some device in the network doesn't support large frames. Choose hardware carefully if intending to use large frame support. The default is 1500. .TP transport This directive controls the transport mechanism used. The default is knet. The transport type can also be set to udpu or udp. Only knet allows crypto or multiple interfaces per node. .TP cluster_name This specifies the name of cluster and it's used for automatic generating of multicast address. .TP config_version This specifies version of config file. This is converted to unsigned 64-bit int. By default it's 0. Option is used to prevent joining old nodes with not up-to-date configuration. If value is not 0, and node is going for first time (only for first time, join after split doesn't follow this rules) from single-node membership to multiple nodes membership, other nodes config_versions are collected. If current node config_version is not equal to highest of collected versions, corosync is terminated. .TP ip_version This specifies version of IP to ask DNS resolver for. The value can be one of .B ipv4 (look only for an IPv4 address) , .B ipv6 (check only IPv6 address) , .B ipv4-6 (look for all address families and use first IPv4 address found in the list if there is such address, otherwise use first IPv6 address) and .B ipv6-4 (look for all address families and use first IPv6 address found in the list if there is such address, otherwise use first IPv4 address). Default (if unspecified) is .B ipv6-4 for knet and udpu transports and .B ipv4 for udp. The knet transport supports IPv4 and IPv6 addresses concurrently, provided they are consistent on each link. Within the .B totem directive, there are several configuration options which are used to control the operation of the protocol. It is generally not recommended to change any of these values without proper guidance and sufficient testing. Some networks may require larger values if suffering from frequent reconfigurations. Some applications may require faster failure detection times which can be achieved by reducing the token timeout. .TP token This timeout is used directly or as a base for real token timeout calculation (explained in .B token_coefficient section). Token timeout specifies in milliseconds until a token loss is declared after not receiving a token. This is the time spent detecting a failure of a processor in the current configuration. Reforming a new configuration takes about 50 milliseconds in addition to this timeout. For real token timeout used by totem it's possible to read cmap value of .B runtime.config.totem.token key. Be careful to use the same timeout values on each of the nodes in the cluster or unpredictable results may occur. The default is 3000 milliseconds. .TP token_warning Specifies the interval between warnings that the token has not been received. The value is a percentage of the token timeout and can be set to 0 to disable warnings. The default is 75%. .TP token_coefficient This value is used only when .B nodelist section is specified and contains at least 3 nodes. If so, real token timeout is then computed as token + (number_of_nodes - 2) * token_coefficient. This allows cluster to scale without manually changing token timeout every time new node is added. This value can be set to 0 resulting in effective removal of this feature. The default is 650 milliseconds. .TP token_retransmit This timeout specifies in milliseconds after how long before receiving a token the token is retransmitted. This will be automatically calculated if token is modified. It is not recommended to alter this value without guidance from the corosync community. The minimum is 30 milliseconds. If not set and error occur, make sure token / (token_retransmits_before_loss_const + 0.2) is more than 30. The default is 238 milliseconds for two nodes cluster. Three or more nodes reference .B token_coefficient. .TP knet_compression_model Type of compression used by Kronosnet. Supported values depend on the libknet build and on the installed compression libraries. Typically zlib and lz4 will be available but bzip2 and others could also be allowed. The default is 'none'. .TP knet_compression_threshold Tells knet to NOT compress any packets that are smaller than the value indicated. Default 100 bytes. Set to 0 to reset to the default. Set to 1 to compress everything. .TP knet_compression_level Many compression libraries allow tuning of compression parameters. For example 0 or 1 ... 9 are commonly used to determine the level of compression. This value is passed unmodified to the compression library so it is recommended to consult the library's documentation for more detailed information. .TP hold This timeout specifies in milliseconds how long the token should be held by the representative when the protocol is under low utilization. It is not recommended to alter this value without guidance from the corosync community. The default is 180 milliseconds. .TP token_retransmits_before_loss_const This value identifies how many token retransmits should be attempted before forming a new configuration. It is also used for token_retransmit and hold calculations. The default is 4 retransmissions. .TP join This timeout specifies in milliseconds how long to wait for join messages in the membership protocol. The default is 50 milliseconds. .TP send_join This timeout specifies in milliseconds an upper range between 0 and send_join to wait before sending a join message. For configurations with less than 32 nodes, this parameter is not necessary. For larger rings, this parameter is necessary to ensure the NIC is not overflowed with join messages on formation of a new ring. A reasonable value for large rings (128 nodes) would be 80msec. Other timer values must also change if this value is changed. Seek advice from the corosync mailing list if trying to run larger configurations. The default is 0 milliseconds. .TP consensus This timeout specifies in milliseconds how long to wait for consensus to be achieved before starting a new round of membership configuration. The minimum value for consensus must be 1.2 * token. This value will be automatically calculated at 1.2 * token if the user doesn't specify a consensus value. For two node clusters, a consensus larger than the join timeout but less than token is safe. For three node or larger clusters, consensus should be larger than token. There is an increasing risk of odd membership changes, which still guarantee virtual synchrony, as node count grows if consensus is less than token. The default is 1200 milliseconds. .TP merge This timeout specifies in milliseconds how long to wait before checking for a partition when no multicast traffic is being sent. If multicast traffic is being sent, the merge detection happens automatically as a function of the protocol. The default is 200 milliseconds. .TP downcheck This timeout specifies in milliseconds how long to wait before checking that a network interface is back up after it has been downed. The default is 1000 milliseconds. .TP fail_recv_const This constant specifies how many rotations of the token without receiving any of the messages when messages should be received may occur before a new configuration is formed. The default is 2500 failures to receive a message. .TP seqno_unchanged_const This constant specifies how many rotations of the token without any multicast traffic should occur before the hold timer is started. The default is 30 rotations. .TP heartbeat_failures_allowed [HeartBeating mechanism] Configures the optional HeartBeating mechanism for faster failure detection. Keep in mind that engaging this mechanism in lossy networks could cause faulty loss declaration as the mechanism relies on the network for heartbeating. So as a rule of thumb use this mechanism if you require improved failure in low to medium utilized networks. This constant specifies the number of heartbeat failures the system should tolerate before declaring heartbeat failure e.g 3. Also if this value is not set or is 0 then the heartbeat mechanism is not engaged in the system and token rotation is the method of failure detection The default is 0 (disabled). .TP max_network_delay [HeartBeating mechanism] This constant specifies in milliseconds the approximate delay that your network takes to transport one packet from one machine to another. This value is to be set by system engineers and please don't change if not sure as this effects the failure detection mechanism using heartbeat. The default is 50 milliseconds. .TP window_size This constant specifies the maximum number of messages that may be sent on one token rotation. If all processors perform equally well, this value could be large (300), which would introduce higher latency from origination to delivery for very large rings. To reduce latency in large rings(16+), the defaults are a safe compromise. If 1 or more slow processor(s) are present among fast processors, window_size should be no larger than 256000 / netmtu to avoid overflow of the kernel receive buffers. The user is notified of this by the display of a retransmit list in the notification logs. There is no loss of data, but performance is reduced when these errors occur. The default is 50 messages. .TP max_messages This constant specifies the maximum number of messages that may be sent by one processor on receipt of the token. The max_messages parameter is limited to 256000 / netmtu to prevent overflow of the kernel transmit buffers. The default is 17 messages. .TP miss_count_const This constant defines the maximum number of times on receipt of a token a message is checked for retransmission before a retransmission occurs. This parameter is useful to modify for switches that delay multicast packets compared to unicast packets. The default setting works well for nearly all modern switches. The default is 5 messages. .TP knet_pmtud_interval How often the knet PMTUd runs to look for network MTU changes. Value in seconds, default: 30 .TP block_unlisted_ips Allow UDPU and KNET to drop packets from IP addresses that are not known (nodes which don't exist in the nodelist) to corosync. Value is yes or no. This feature is mainly to protect against the joining of nodes with outdated configurations after a cluster split. Another use case is to allow the atomic merge of two independent clusters. Changing the default value is not recommended, the overhead is tiny and an existing cluster may fail if corosync is started on an unlisted node with an old configuration. The default value is yes. +.TP +cancel_token_hold_on_retransmit +Allows Corosync to hold token by representative when there is too much +retransmit messages. This allows network to process increased load without +overloading it. Used mechanism is same as described for +.B hold +directive. + +Some deployments may prefer to never hold token when there is +retransmit messages. If so, option should be set to yes. + +The default value is no. + .PP Within the .B logging directive, there are several configuration options which are all optional. .PP The following 3 options are valid only for the top level logging directive: .TP timestamp This specifies that a timestamp is placed on all log messages. It can be one of off (no timestamp), on (second precision timestamp) or hires (millisecond precision timestamp - only when supported by LibQB). The default is hires (or on if hires is not supported). .TP fileline This specifies that file and line should be printed. The default is off. .TP function_name This specifies that the code function name should be printed. The default is off. .TP blackbox This specifies that blackbox functionality should be enabled. The default is on. .PP The following options are valid both for top level logging directive and they can be overridden in logger_subsys entries. .TP to_stderr .TP to_logfile .TP to_syslog These specify the destination of logging output. Any combination of these options may be specified. Valid options are .B yes and .B no. The default is syslog and stderr. Please note, if you are using to_logfile and want to rotate the file, use logrotate(8) with the option .B copytruncate. eg. .ne 18 .RS .nf .ft CW /var/log/corosync.log { missingok compress notifempty daily rotate 7 copytruncate } .ft .fi .RE .TP logfile If the .B to_logfile directive is set to .B yes , this option specifies the pathname of the log file. No default. .TP logfile_priority This specifies the logfile priority for this particular subsystem. Ignored if debug is on. Possible values are: alert, crit, debug (same as debug = on), emerg, err, info, notice, warning. The default is: info. .TP syslog_facility This specifies the syslog facility type that will be used for any messages sent to syslog. options are daemon, local0, local1, local2, local3, local4, local5, local6 & local7. The default is daemon. .TP syslog_priority This specifies the syslog level for this particular subsystem. Ignored if debug is on. Possible values are: alert, crit, debug (same as debug = on), emerg, err, info, notice, warning. The default is: info. .TP debug This specifies whether debug output is logged for this particular logger. Also can contain value trace, what is highest level of debug information. The default is off. .PP Within the .B logging directive, logger_subsys directives are optional. .PP Within the .B logger_subsys sub-directive, all of the above logging configuration options are valid and can be used to override the default settings. The subsys entry, described below, is mandatory to identify the subsystem. .TP subsys This specifies the subsystem identity (name) for which logging is specified. This is the name used by a service in the log_init() call. E.g. 'CPG'. This directive is required. .PP Within the .B quorum directive it is possible to specify the quorum algorithm to use with the .TP provider directive. At the time of writing only corosync_votequorum is supported. See votequorum(5) for configuration options. .PP Within the .B nodelist directive it is possible to specify specific information about nodes in cluster. Directive can contain only .B node sub-directive, which specifies every node that should be a member of the membership, and where non-default options are needed. Every node must have at least ring0_addr field filled. Every node that should be a member of the membership must be specified. Possible options are: .TP ringX_addr This specifies IP or network hostname address of the particular node. X is a link number. .TP nodeid This configuration option is required for each node for Kronosnet mode. It is a 32 bit value specifying the node identifier delivered to the cluster membership service. The node identifier value of zero is reserved and should not be used. If knet is set, this field must be set. .TP name This option is used mainly with knet transport to identify local node. It's also used by client software (pacemaker). Algorithm for identifying local node is following: .RS .IP 1. Looks up $HOSTNAME in the nodelist .IP 2. If this fails strip the domain name from $HOSTNAME and looks up that in the nodelist .IP 3. If this fails look in the nodelist for a fully-qualified name whose short version matches the short version of $HOSTNAME .IP 4. If all this fails then search the interfaces list for an address that matches a name in the nodelist .RE .PP Within the .B system directive it is possible to specify system options. Possible options are: .TP qb_ipc_type This specifies type of IPC to use. Can be one of native (default), shm and socket. Native means one of shm or socket, depending on what is supported by OS. On systems with support for both, SHM is selected. SHM is generally faster, but need to allocate ring buffer file in /dev/shm. .TP sched_rr Should be set to yes (default) if corosync should try to set round robin realtime scheduling with maximal priority to itself. When setting of scheduler fails, fallback to set maximal priority. .TP priority Set priority of corosync process. Valid only when sched_rr is set to no. Can be ether numeric value with similar meaning as .BR nice (1) or .B max / .B min meaning maximal / minimal priority (so minimal / maximal nice value). .TP move_to_root_cgroup Can be one of .B yes (Corosync always moves itself to root cgroup), .B no (Corosync never tries to move itself to root cgroup) or .B auto (Corosync first checks if sched_rr is enabled, and if so, it tries to set round robin realtime scheduling with maximal priority to itself. If setting of priority fails, corosync tries to move itself to root cgroup and retries setting of priority). This feature is available only for systems with cgroups v1 with RT sched enabled (Linux with CONFIG_RT_GROUP_SCHED kernel option) and cgroups v2. It's worth noting that currently (May 3 2021) cgroup2 doesn’t yet support control of realtime processes and the cpu controller can only be enabled when all RT processes are in the root cgroup (applies only for kernel with CONFIG_RT_GROUP_SCHED enabled). So when move_to_root_cgroup is disabled, kernel is compiled with CONFIG_RT_GROUP_SCHED and systemd is used, it may be impossible to make systemd options like CPUQuota working correctly until corosync is stopped. Also when moving to root cgroup is enforced and used together with cgroup2 and systemd it makes impossible (most of the time) for journald to add systemd specific metadata (most importantly _SYSTEMD_UNIT) properly, because corosync is moved out of cgroup created by systemd. This means it is not possible to filter corosync logged messages based on these metadata (for example using -u or _SYSTEMD_UNIT=UNIT pattern) and also running systemctl status doesn't display (all) corosync log messages. The problem is even worse because journald caches pid for some time (approx. 5 sec) so initial corosync messages have correct metadata. .TP allow_knet_handle_fallback If knet handle creation fails using privileged operations, allow fallback to creating knet handle using unprivileged operations. Defaults to no, meaning if privileged knet handle creation fails, corosync will refuse to start. The knet handle will always be created using privileged operations if possible, setting this to yes only allows fallback to unprivileged operations. This fallback may result in performance issues, but if running in an unprivileged environment, e.g. as a normal user or in unprivileged container, this may be required. .TP state_dir Existing directory where corosync should chdir into. Corosync stores important state files and blackboxes there. The default is /var/lib/corosync. .PP Within the .B resources directive it is possible to specify options for resources. Possible option is: .TP watchdog_device (Valid only if Corosync was compiled with watchdog support.) .br Watchdog device to use, for example /dev/watchdog. If unset, empty or "off", no watchdog is used. .IP In a cluster with properly configured power fencing a watchdog provides no additional value. On the other hand, slow watchdog communication may incur multi-second delays in the Corosync main loop, potentially breaking down membership. IPMI watchdogs are particularly notorious in this regard: read about kipmid_max_busy_us in IPMI.txt in the Linux kernel documentation. .PP Within the .B nozzle directive it is possible to specify options for a libnozzle device. This is a pseudo ethernet device that routes network traffic through a channel on the corosync knet network (NOT cpg or any corosync internal service) to other nodes in the cluster. This allows applications to take advantage of knet features such as multipathing, automatic failover, link switching etc. Note that libnozzle is not a reliable transport, but you can tunnel TCP through it for reliable communications. .br libnozzle also supports optional interface up/down scripts that are kept under a /etc/corosync/updown.d/ directory. See the knet documentation for more information. .br Only one nozzle device is allowed. .br The nozzle stanza takes several options: .TP name The name of the network device to be created. On Linux this may be any name at all, other platforms have restrictions on the name. .TP ipaddr The IP address (IPv6 or IPv4) of the interface. The bottom part of this address will be replaced by the local node's nodeid in conjunction with ipprefix. so, eg ipaddr: 192.168.1.0 ipprefix: 24 will make nodeids 1,2,5 use IP addresses 192.168.1.1, 192.168.1.2 & 192.168.1.5. If a prefix length of 16 is used then the bottom two bytes will be filled in with nodeid numbers. IPv6 addresses must end in '::', the nodeid will be added after the two colons to make the local IP address. Only one IP address is currently supported in the corosync.conf file. Additional IP addresses can be added in the ifup script if necessary. .TP ipprefix specifies the IP address prefix for the nozzle device (see above) .TP macaddr Specifies the MAC address prefix for the nozzle device. As for the IP address, the bottom part of the MAC address will be filled in with the node id. In this case no prefix applies, the bottom two bytes of the MAC address will always be overwritten with the node id. So specifying macaddr: 54:54:12:24:12:12 on nodeid 1 will result in it having a MAC address of 54:54:12:24:00:01 .SH "TO ADD A NEW NODE TO THE CLUSTER" For example to add a node with address 10.24.38.108 with nodeid 3. The node has the name NEW (in DNS or /etc/hosts) and is not currently running corosync. The current corosync.conf nodelist looks like this: .PP .nf .RS nodelist { node { nodeid: 1 ring0_addr: 10.24.38.101 name: node1 } node { nodeid: 2 ring0_addr: 10.24.38.102 name: node2 } } .RE .fi .PP Add a new entry for the node below the existing nodes. Node entries don't have to be in nodeid order, but it will help keep you sane. So the nodelist now looks like this: .PP .nf .RS nodelist { node { nodeid: 1 ring0_addr: 10.24.38.101 name: node1 } node { nodeid: 2 ring0_addr: 10.24.38.102 name: node2 } node { nodeid: 3 ring0_addr: 10.24.38.108 name: NEW } } .RE .fi .PP .PP This file must then be copied onto all three nodes - the existing two nodes, and the new one. On one of the existing corosync nodes, tell corosync to re-read the updated config file into memory: .PP .nf .RS corosync-cfgtool -R .RE .fi .PP This command only needs to be run on one node in the cluster. You may then start corosync on the NEW node and it should join the cluster. If this doesn't work as expected then check the communications between all three nodes is working, and check the syslog files on all nodes for more information. It's important to note that the key bit of information about a node failing to join might be on a different node than you expect. .SH "TO REMOVE A NODE FROM THE CLUSTER" This is the reverse procedure to 'Adding a node' above. First you need to shut down the node you will be removing from the cluster. .PP .nf .RS corosync-cfgtool -H .RE .fi .PP Then delete the nodelist stanza from corosync.conf and finally update corosync on the remaining nodes by running .PP .nf .RS corosync-cfgtool -R .RE .fi .TP on one of them. .SH "ADDRESS RESOLUTION" corosync resolves ringX_addr names/IP addresses using the getaddrinfo(3) call with respect of totem.ip_version setting. getaddrinfo() function uses a sophisticated algorithm to sort node addresses into a preferred order and corosync always chooses the first address in that list of the required family. As such it is essential that your DNS or /etc/hosts files are correctly configured so that all addresses for ringX appear on the same network (or are reachable with minimal hops) and over the same IP protocol. If this is not the case then some nodes might not be able to join the cluster. It is possible to override the search order used by getaddrinfo() using the configuration file /etc/gai.conf(5) if necessary, but this is not recommended. If there is any doubt about the order of addresses returned from getaddrinfo() then it might be simpler to use IP addresses (v4 or v6) in the ringX_addr field. .SH "FILES" .TP /etc/corosync/corosync.conf The corosync executive configuration file. .SH "SEE ALSO" .BR corosync_overview (7), .BR votequorum (5), .BR corosync-qdevice (8), .BR logrotate (8) .BR getaddrinfo (3) .BR gai.conf (5) .PP