diff --git a/exec/totemknet.c b/exec/totemknet.c index 41dd0cc0..ef36b807 100644 --- a/exec/totemknet.c +++ b/exec/totemknet.c @@ -1,1146 +1,1146 @@ /* * Copyright (c) 2016 Red Hat, Inc. * * All rights reserved. * * Author: Christine Caulfield (ccaulfie@redhat.com) * This software licensed under BSD license, the text of which follows: * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * - Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * - Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * - Neither the name of the MontaVista Software, Inc. nor the names of its * contributors may be used to endorse or promote products derived from this * software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF * THE POSSIBILITY OF SUCH DAMAGE. */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "totemknet.h" #include "util.h" #include #include #include #include #include #ifndef MSG_NOSIGNAL #define MSG_NOSIGNAL 0 #endif #define MCAST_SOCKET_BUFFER_SIZE (TRANSMITS_ALLOWED * FRAME_SIZE_MAX) /* Buffers for sendmmsg/recvmmsg */ #define MAX_BUFFERS 10 /* Should match that used by cfg */ #define CFG_INTERFACE_STATUS_MAX_LEN 512 struct totemknet_instance { struct crypto_instance *crypto_inst; qb_loop_t *poll_handle; knet_handle_t knet_handle; int link_mode; void *context; void (*totemknet_deliver_fn) ( void *context, const void *msg, unsigned int msg_len); void (*totemknet_iface_change_fn) ( void *context, const struct totem_ip_address *iface_address, unsigned int link_no); void (*totemknet_mtu_changed) ( void *context, int net_mtu); void (*totemknet_target_set_completed) (void *context); /* * Function and data used to log messages */ int totemknet_log_level_security; int totemknet_log_level_error; int totemknet_log_level_warning; int totemknet_log_level_notice; int totemknet_log_level_debug; int totemknet_subsys_id; int knet_subsys_id; void (*totemknet_log_printf) ( int level, int subsys, const char *function, const char *file, int line, const char *format, ...)__attribute__((format(printf, 6, 7))); void *knet_context; char iov_buffer[MAX_BUFFERS][FRAME_SIZE_MAX]; int stats_sent; int stats_recv; int stats_delv; int stats_remcasts; int stats_orf_token; struct timeval stats_tv_start; char *link_status[INTERFACE_MAX]; int num_links; struct totem_ip_address my_ids[INTERFACE_MAX]; uint16_t ip_port[INTERFACE_MAX]; int our_nodeid; struct totem_config *totem_config; totemsrp_stats_t *stats; struct totem_ip_address token_target; qb_loop_timer_handle timer_netif_check_timeout; qb_loop_timer_handle timer_merge_detect_timeout; int send_merge_detect_message; unsigned int merge_detect_messages_sent_before_timeout; int logpipes[2]; int knet_fd; }; struct work_item { const void *msg; unsigned int msg_len; struct totemknet_instance *instance; }; int totemknet_member_list_rebind_ip ( void *knet_context); static void totemknet_start_merge_detect_timeout( void *knet_context); static void totemknet_stop_merge_detect_timeout( void *knet_context); static void totemknet_instance_initialize (struct totemknet_instance *instance) { memset (instance, 0, sizeof (struct totemknet_instance)); } #define knet_log_printf(level, format, args...) \ do { \ instance->totemknet_log_printf ( \ level, instance->totemknet_subsys_id, \ __FUNCTION__, __FILE__, __LINE__, \ (const char *)format, ##args); \ } while (0); #define libknet_log_printf(level, format, args...) \ do { \ instance->totemknet_log_printf ( \ level, instance->knet_subsys_id, \ __FUNCTION__, "libknet.h", __LINE__, \ (const char *)format, ##args); \ } while (0); #define KNET_LOGSYS_PERROR(err_num, level, fmt, args...) \ do { \ char _error_str[LOGSYS_MAX_PERROR_MSG_LEN]; \ const char *_error_ptr = qb_strerror_r(err_num, _error_str, sizeof(_error_str)); \ instance->totemknet_log_printf ( \ level, instance->totemknet_subsys_id, \ __FUNCTION__, __FILE__, __LINE__, \ fmt ": %s (%d)", ##args, _error_ptr, err_num); \ } while(0) static int dst_host_filter_callback_fn(void *private_data, const unsigned char *outdata, ssize_t outdata_len, uint8_t tx_rx, uint16_t this_host_id, uint16_t src_host_id, int8_t *channel, uint16_t *dst_host_ids, size_t *dst_host_ids_entries) { struct totem_message_header *header = (struct totem_message_header *)outdata; int res; *channel = 0; if (header->target_nodeid) { dst_host_ids[0] = header->target_nodeid; *dst_host_ids_entries = 1; res = 0; /* unicast message */ } else { *dst_host_ids_entries = 0; res = 1; /* multicast message */ } return res; } static void socket_error_callback_fn(void *private_data, int datafd, int8_t channel, uint8_t tx_rx, int error, int errorno) { struct totemknet_instance *instance = (struct totemknet_instance *)private_data; knet_log_printf (LOGSYS_LEVEL_DEBUG, "Knet socket ERROR notification called: txrx=%d, error=%d, errorno=%d", tx_rx, error, errorno); if ((error == -1 && errorno != EAGAIN) || (error == 0)) { knet_handle_remove_datafd(instance->knet_handle, datafd); } } static void host_change_callback_fn(void *private_data, uint16_t host_id, uint8_t reachable, uint8_t remote, uint8_t external) { struct totemknet_instance *instance = (struct totemknet_instance *)private_data; // TODO: what? if anything. knet_log_printf (LOGSYS_LEVEL_DEBUG, "Knet host change callback. nodeid: %d reachable: %d", host_id, reachable); } static void pmtu_change_callback_fn(void *private_data, unsigned int data_mtu) { struct totemknet_instance *instance = (struct totemknet_instance *)private_data; knet_log_printf (LOGSYS_LEVEL_DEBUG, "Knet pMTU change: %d", data_mtu); // TODO: Check this instance->totemknet_mtu_changed(instance->context, data_mtu - totemip_udpip_header_size(AF_INET)); } int totemknet_crypto_set ( void *knet_context, const char *cipher_type, const char *hash_type) { return (0); } static inline void ucast_sendmsg ( struct totemknet_instance *instance, struct totem_ip_address *system_to, const void *msg, unsigned int msg_len) { int res = 0; struct totem_message_header *header = (struct totem_message_header *)msg; header->target_nodeid = system_to->nodeid; /* * Transmit unicast message * An error here is recovered by totemsrp */ /* * If sending to ourself then just pass it through knet back to * the receive fn. knet does not do local->local delivery */ if (system_to->nodeid == instance->our_nodeid) { res = write (instance->knet_fd+1, msg, msg_len); if (res < 0) { KNET_LOGSYS_PERROR (errno, instance->totemknet_log_level_debug, "sendmsg(ucast-local) failed (non-critical)"); } } else { res = write (instance->knet_fd, msg, msg_len); if (res < 0) { KNET_LOGSYS_PERROR (errno, instance->totemknet_log_level_debug, "sendmsg(ucast) failed (non-critical)"); } } } static inline void mcast_sendmsg ( struct totemknet_instance *instance, const void *msg, unsigned int msg_len, int only_active) { int res; struct totem_message_header *header = (struct totem_message_header *)msg; header->target_nodeid = 0; // log_printf (LOGSYS_LEVEL_DEBUG, "totemknet: mcast_sendmsg. only_active=%d, len=%d", only_active, msg_len); res = write (instance->knet_fd, msg, msg_len); if (res < msg_len) { knet_log_printf (LOGSYS_LEVEL_DEBUG, "totemknet: mcast_send writev returned %d", res); } /* * Also send it to ourself, directly into * the receive fn. knet does not to local->local delivery */ res = write (instance->knet_fd+1, msg, msg_len); if (res < msg_len) { knet_log_printf (LOGSYS_LEVEL_DEBUG, "totemknet: mcast_send writev (local) returned %d", res); } if (!only_active || instance->send_merge_detect_message) { /* * Current message was sent to all nodes */ instance->merge_detect_messages_sent_before_timeout++; instance->send_merge_detect_message = 0; } } static int node_compare(const void *aptr, const void *bptr) { uint16_t a,b; a = *(uint16_t *)aptr; b = *(uint16_t *)bptr; return a > b; } int totemknet_ifaces_get (void *knet_context, char ***status, unsigned int *iface_count) { struct totemknet_instance *instance = (struct totemknet_instance *)knet_context; struct knet_link_status link_status; uint16_t host_list[KNET_MAX_HOST]; size_t num_hosts; int i,j; char *ptr; int res = 0; /* * Don't do the whole 'link_info' bit if the caller just wants * a count of interfaces. */ if (status) { res = knet_host_get_host_list(instance->knet_handle, host_list, &num_hosts); if (res) { return (-1); } qsort(host_list, num_hosts, sizeof(uint16_t), node_compare); /* num_links is actually the highest link ID */ for (i=0; i <= instance->num_links; i++) { ptr = instance->link_status[i]; for (j=0; jknet_handle, host_list[j], i, &link_status); if (res == 0) { ptr[j] = '0' + (link_status.enabled | link_status.connected<<1 | link_status.dynconnected<<2); } else { ptr[j] += '?'; } } ptr[num_hosts] = '\0'; } *status = instance->link_status; } *iface_count = instance->num_links+1; return (res); } int totemknet_finalize ( void *knet_context) { struct totemknet_instance *instance = (struct totemknet_instance *)knet_context; int res = 0; knet_log_printf(LOG_DEBUG, "totemknet: finalize"); qb_loop_poll_del (instance->poll_handle, instance->logpipes[0]); qb_loop_poll_del (instance->poll_handle, instance->knet_fd); knet_handle_free(instance->knet_handle); totemknet_stop_merge_detect_timeout(instance); return (res); } static int log_deliver_fn ( int fd, int revents, void *data) { struct totemknet_instance *instance = (struct totemknet_instance *)data; char buffer[KNET_MAX_LOG_MSG_SIZE*4]; char *bufptr = buffer; int done = 0; int len; len = read(fd, buffer, sizeof(buffer)); while (done < len) { struct knet_log_msg *msg = (struct knet_log_msg *)bufptr; switch (msg->msglevel) { case KNET_LOG_ERR: libknet_log_printf (LOGSYS_LEVEL_ERROR, "%s", msg->msg); break; case KNET_LOG_WARN: libknet_log_printf (LOGSYS_LEVEL_WARNING, "%s", msg->msg); break; case KNET_LOG_INFO: libknet_log_printf (LOGSYS_LEVEL_INFO, "%s", msg->msg); break; case KNET_LOG_DEBUG: libknet_log_printf (LOGSYS_LEVEL_DEBUG, "%s", msg->msg); break; } bufptr += KNET_MAX_LOG_MSG_SIZE; done += KNET_MAX_LOG_MSG_SIZE; } return 0; } static int data_deliver_fn ( int fd, int revents, void *data) { struct totemknet_instance *instance = (struct totemknet_instance *)data; struct mmsghdr msg_recv[MAX_BUFFERS]; struct iovec iov_recv[MAX_BUFFERS]; struct sockaddr_storage system_from; int msgs_received; int i; for (i=0; iiov_buffer[i]; iov_recv[i].iov_len = FRAME_SIZE_MAX; msg_recv[i].msg_hdr.msg_name = &system_from; msg_recv[i].msg_hdr.msg_namelen = sizeof (struct sockaddr_storage); msg_recv[i].msg_hdr.msg_iov = &iov_recv[i]; msg_recv[i].msg_hdr.msg_iovlen = 1; #ifdef HAVE_MSGHDR_CONTROL msg_recv[i].msg_hdr.msg_control = 0; #endif #ifdef HAVE_MSGHDR_CONTROLLEN msg_recv[i].msg_hdr.msg_controllen = 0; #endif #ifdef HAVE_MSGHDR_FLAGS msg_recv[i].msg_hdr.msg_flags = 0; #endif #ifdef HAVE_MSGHDR_ACCRIGHTS msg_recv[i].msg_hdr.msg_accrights = NULL; #endif #ifdef HAVE_MSGHDR_ACCRIGHTSLEN msg_recv[i].msg_hdr.msg_accrightslen = 0; #endif } msgs_received = recvmmsg (fd, msg_recv, MAX_BUFFERS, MSG_NOSIGNAL | MSG_DONTWAIT, NULL); if (msgs_received == -1) { return (0); } for (i=0; istats_recv += msg_recv[i].msg_len; /* * Handle incoming message */ instance->totemknet_deliver_fn ( instance->context, instance->iov_buffer[i], msg_recv[i].msg_len); } return (0); } static void timer_function_netif_check_timeout ( void *data) { struct totemknet_instance *instance = (struct totemknet_instance *)data; int i; for (i=0; itotem_config->interface_count; i++) instance->totemknet_iface_change_fn (instance->context, &instance->my_ids[i], i); } static void totemknet_refresh_config( int32_t event, const char *key_name, struct icmap_notify_value new_val, struct icmap_notify_value old_val, void *user_data) { uint8_t reloading; uint32_t value; uint32_t link_no; size_t num_nodes; uint16_t host_ids[KNET_MAX_HOST]; int i; int err; char path[ICMAP_KEYNAME_MAXLEN]; struct totemknet_instance *instance = (struct totemknet_instance *)user_data; ENTER(); /* * If a full reload is in progress then don't do anything until it's done and * can reconfigure it all atomically */ if (icmap_get_uint8("config.totemconfig_reload_in_progress", &reloading) == CS_OK && reloading) { return; } if (icmap_get_uint32("totem.knet_pmtud_interval", &value) == CS_OK) { instance->totem_config->knet_pmtud_interval = value; knet_log_printf (LOGSYS_LEVEL_DEBUG, "knet_pmtud_interval now %d", value); err = knet_handle_pmtud_setfreq(instance->knet_handle, instance->totem_config->knet_pmtud_interval); if (err) { KNET_LOGSYS_PERROR(errno, LOGSYS_LEVEL_WARNING, "knet_handle_pmtud_setfreq failed"); } } /* Get link parameters */ for (i = 0; i <= instance->num_links; i++) { sprintf(path, "totem.interface.%d.knet_link_priority", i); if (icmap_get_uint32(path, &value) == CS_OK) { instance->totem_config->interfaces[i].knet_link_priority = value; knet_log_printf (LOGSYS_LEVEL_DEBUG, "knet_link_priority on link %d now %d", i, value); } sprintf(path, "totem.interface.%d.knet_ping_interval", i); if (icmap_get_uint32(path, &value) == CS_OK) { instance->totem_config->interfaces[i].knet_ping_interval = value; knet_log_printf (LOGSYS_LEVEL_DEBUG, "knet_ping_interval on link %d now %d", i, value); } sprintf(path, "totem.interface.%d.knet_ping_timeout", i); if (icmap_get_uint32(path, &value) == CS_OK) { instance->totem_config->interfaces[i].knet_ping_timeout = value; knet_log_printf (LOGSYS_LEVEL_DEBUG, "knet_ping_timeout on link %d now %d", i, value); } sprintf(path, "totem.interface.%d.knet_ping_precision", i); if (icmap_get_uint32(path, &value) == CS_OK) { instance->totem_config->interfaces[i].knet_ping_precision = value; knet_log_printf (LOGSYS_LEVEL_DEBUG, "knet_ping_precision on link %d now %d", i, value); } sprintf(path, "totem.interface.%d.knet_pong_count", i); if (icmap_get_uint32(path, &value) == CS_OK) { instance->totem_config->interfaces[i].knet_pong_count = value; knet_log_printf (LOGSYS_LEVEL_DEBUG, "knet_pong_count on link %d now %d", i, value); } } /* Configure link parameters for each node */ err = knet_host_get_host_list(instance->knet_handle, host_ids, &num_nodes); if (err != 0) { KNET_LOGSYS_PERROR(errno, LOGSYS_LEVEL_ERROR, "knet_host_get_host_list failed"); } for (i=0; inum_links; link_no++) { err = knet_link_set_ping_timers(instance->knet_handle, host_ids[i], link_no, instance->totem_config->interfaces[link_no].knet_ping_interval, instance->totem_config->interfaces[link_no].knet_ping_timeout, instance->totem_config->interfaces[link_no].knet_ping_precision); if (err) { KNET_LOGSYS_PERROR(errno, LOGSYS_LEVEL_ERROR, "knet_link_set_ping_timers for node %d link %d failed", host_ids[i], link_no); } err = knet_link_set_pong_count(instance->knet_handle, host_ids[i], link_no, instance->totem_config->interfaces[link_no].knet_pong_count); if (err) { KNET_LOGSYS_PERROR(errno, LOGSYS_LEVEL_ERROR, "knet_link_set_pong_count for node %d link %d failed",host_ids[i], link_no); } err = knet_link_set_priority(instance->knet_handle, host_ids[i], link_no, instance->totem_config->interfaces[link_no].knet_link_priority); if (err) { KNET_LOGSYS_PERROR(errno, LOGSYS_LEVEL_ERROR, "knet_link_set_priority for node %d link %d failed", host_ids[i], link_no); } } } LEAVE(); } static void totemknet_add_config_notifications(struct totemknet_instance *instance) { icmap_track_t icmap_track_totem = NULL; icmap_track_t icmap_track_reload = NULL; ENTER(); icmap_track_add("totem.", ICMAP_TRACK_ADD | ICMAP_TRACK_DELETE | ICMAP_TRACK_MODIFY | ICMAP_TRACK_PREFIX, totemknet_refresh_config, instance, &icmap_track_totem); icmap_track_add("config.totemconfig_reload_in_progress", ICMAP_TRACK_ADD | ICMAP_TRACK_MODIFY, totemknet_refresh_config, instance, &icmap_track_reload); LEAVE(); } /* * Create an instance */ int totemknet_initialize ( qb_loop_t *poll_handle, void **knet_context, struct totem_config *totem_config, totemsrp_stats_t *stats, void *context, void (*deliver_fn) ( void *context, const void *msg, unsigned int msg_len), void (*iface_change_fn) ( void *context, const struct totem_ip_address *iface_address, unsigned int link_no), void (*mtu_changed) ( void *context, int net_mtu), void (*target_set_completed) ( void *context)) { struct totemknet_instance *instance; int8_t channel=0; int res; int i; instance = malloc (sizeof (struct totemknet_instance)); if (instance == NULL) { return (-1); } totemknet_instance_initialize (instance); instance->totem_config = totem_config; instance->stats = stats; /* * Configure logging */ instance->totemknet_log_level_security = 1; //totem_config->totem_logging_configuration.log_level_security; instance->totemknet_log_level_error = totem_config->totem_logging_configuration.log_level_error; instance->totemknet_log_level_warning = totem_config->totem_logging_configuration.log_level_warning; instance->totemknet_log_level_notice = totem_config->totem_logging_configuration.log_level_notice; instance->totemknet_log_level_debug = totem_config->totem_logging_configuration.log_level_debug; instance->totemknet_subsys_id = totem_config->totem_logging_configuration.log_subsys_id; instance->totemknet_log_printf = totem_config->totem_logging_configuration.log_printf; instance->knet_subsys_id = _logsys_subsys_create("KNET", "libknet.h"); /* * Initialize local variables for totemknet */ instance->our_nodeid = instance->totem_config->node_id; for (i=0; i< instance->totem_config->interface_count; i++) { totemip_copy(&instance->my_ids[i], &totem_config->interfaces[i].bindnet); instance->my_ids[i].nodeid = instance->our_nodeid; instance->ip_port[i] = totem_config->interfaces[i].ip_port; /* Needed for totemsrp */ totem_config->interfaces[i].boundto.nodeid = instance->our_nodeid; } instance->poll_handle = poll_handle; instance->context = context; instance->totemknet_deliver_fn = deliver_fn; instance->totemknet_iface_change_fn = iface_change_fn; instance->totemknet_mtu_changed = mtu_changed; instance->totemknet_target_set_completed = target_set_completed; pipe(instance->logpipes); fcntl(instance->logpipes[0], F_SETFL, O_NONBLOCK); fcntl(instance->logpipes[1], F_SETFL, O_NONBLOCK); instance->knet_handle = knet_handle_new(instance->totem_config->node_id, instance->logpipes[1], KNET_LOG_DEBUG); if (!instance->knet_handle) { KNET_LOGSYS_PERROR(errno, LOGSYS_LEVEL_CRIT, "knet_handle_new failed"); return (-1); } res = knet_handle_pmtud_setfreq(instance->knet_handle, instance->totem_config->knet_pmtud_interval); if (res) { KNET_LOGSYS_PERROR(errno, LOGSYS_LEVEL_WARNING, "knet_handle_pmtud_setfreq failed"); } res = knet_handle_enable_filter(instance->knet_handle, instance, dst_host_filter_callback_fn); if (res) { KNET_LOGSYS_PERROR(errno, LOGSYS_LEVEL_WARNING, "knet_handle_enable_filter failed"); } res = knet_handle_enable_sock_notify(instance->knet_handle, instance, socket_error_callback_fn); if (res) { KNET_LOGSYS_PERROR(errno, LOGSYS_LEVEL_WARNING, "knet_handle_enable_sock_notify failed"); } res = knet_host_enable_status_change_notify(instance->knet_handle, instance, host_change_callback_fn); if (res) { KNET_LOGSYS_PERROR(errno, LOGSYS_LEVEL_WARNING, "knet_host_enable_status_change_notify failed"); } res = knet_handle_enable_pmtud_notify(instance->knet_handle, instance, pmtu_change_callback_fn); if (res) { KNET_LOGSYS_PERROR(errno, LOGSYS_LEVEL_WARNING, "knet_handle_enable_pmtud_notify failed"); } /* Get an fd into knet */ instance->knet_fd = 0; res = knet_handle_add_datafd(instance->knet_handle, &instance->knet_fd, &channel); if (res) { knet_log_printf(LOG_DEBUG, "knet_handle_add_datafd failed: %s", strerror(errno)); return -1; } /* Enable crypto if requested */ if (strcmp(instance->totem_config->crypto_cipher_type, "none") != 0) { struct knet_handle_crypto_cfg crypto_cfg; strcpy(crypto_cfg.crypto_model, "nss"); strcpy(crypto_cfg.crypto_cipher_type, instance->totem_config->crypto_cipher_type); strcpy(crypto_cfg.crypto_hash_type, instance->totem_config->crypto_hash_type); memcpy(crypto_cfg.private_key, instance->totem_config->private_key, instance->totem_config->private_key_len); crypto_cfg.private_key_len = instance->totem_config->private_key_len; res = knet_handle_crypto(instance->knet_handle, &crypto_cfg); if (res == -1) { knet_log_printf(LOG_ERR, "knet_handle_crypto failed: %s", strerror(errno)); return -1; } if (res == -2) { knet_log_printf(LOG_ERR, "knet_handle_crypto failed: -2"); return -1; } knet_log_printf(LOG_INFO, "kronosnet crypto initialized: %s/%s", crypto_cfg.crypto_cipher_type, crypto_cfg.crypto_hash_type); } knet_handle_setfwd(instance->knet_handle, 1); instance->link_mode = KNET_LINK_POLICY_PASSIVE; if (strcmp(instance->totem_config->link_mode, "active")==0) { instance->link_mode = KNET_LINK_POLICY_ACTIVE; } if (strcmp(instance->totem_config->link_mode, "rr")==0) { instance->link_mode = KNET_LINK_POLICY_RR; } for (i=0; ilink_status[i] = malloc(CFG_INTERFACE_STATUS_MAX_LEN); if (!instance->link_status[i]) { return -1; } } qb_loop_poll_add (instance->poll_handle, QB_LOOP_MED, instance->logpipes[0], POLLIN, instance, log_deliver_fn); qb_loop_poll_add (instance->poll_handle, QB_LOOP_HIGH, instance->knet_fd, POLLIN, instance, data_deliver_fn); /* * Upper layer isn't ready to receive message because it hasn't * initialized yet. Add short timer to check the interfaces. */ qb_loop_timer_add (instance->poll_handle, QB_LOOP_MED, 100*QB_TIME_NS_IN_MSEC, (void *)instance, timer_function_netif_check_timeout, &instance->timer_netif_check_timeout); totemknet_start_merge_detect_timeout(instance); /* Start listening for config changes */ totemknet_add_config_notifications(instance); knet_log_printf (LOGSYS_LEVEL_INFO, "totemknet initialized"); *knet_context = instance; return (0); } void *totemknet_buffer_alloc (void) { return malloc (FRAME_SIZE_MAX); } void totemknet_buffer_release (void *ptr) { return free (ptr); } int totemknet_processor_count_set ( void *knet_context, int processor_count) { return (0); } int totemknet_recv_flush (void *knet_context) { return (0); } int totemknet_send_flush (void *knet_context) { return (0); } int totemknet_token_send ( void *knet_context, const void *msg, unsigned int msg_len) { struct totemknet_instance *instance = (struct totemknet_instance *)knet_context; int res = 0; ucast_sendmsg (instance, &instance->token_target, msg, msg_len); return (res); } int totemknet_mcast_flush_send ( void *knet_context, const void *msg, unsigned int msg_len) { struct totemknet_instance *instance = (struct totemknet_instance *)knet_context; int res = 0; mcast_sendmsg (instance, msg, msg_len, 0); return (res); } int totemknet_mcast_noflush_send ( void *knet_context, const void *msg, unsigned int msg_len) { struct totemknet_instance *instance = (struct totemknet_instance *)knet_context; int res = 0; mcast_sendmsg (instance, msg, msg_len, 1); return (res); } extern int totemknet_iface_check (void *knet_context) { struct totemknet_instance *instance = (struct totemknet_instance *)knet_context; int res = 0; knet_log_printf(LOG_DEBUG, "totmeknet: iface_check"); return (res); } extern void totemknet_net_mtu_adjust (void *knet_context, struct totem_config *totem_config) { struct totemknet_instance *instance = (struct totemknet_instance *)knet_context; totem_config->net_mtu -= totemip_udpip_header_size(AF_INET) + 23; knet_log_printf(LOG_DEBUG, "totemknet: Returning MTU of %d", totem_config->net_mtu); } int totemknet_token_target_set ( void *knet_context, const struct totem_ip_address *token_target) { struct totemknet_instance *instance = (struct totemknet_instance *)knet_context; int res = 0; memcpy (&instance->token_target, token_target, sizeof (struct totem_ip_address)); instance->totemknet_target_set_completed (instance->context); return (res); } extern int totemknet_recv_mcast_empty ( void *knet_context) { struct totemknet_instance *instance = (struct totemknet_instance *)knet_context; unsigned int res; struct sockaddr_storage system_from; struct mmsghdr msg_recv[MAX_BUFFERS]; struct iovec iov_recv[MAX_BUFFERS]; struct pollfd ufd; int nfds; int msg_processed = 0; int i; for (i=0; iiov_buffer[i]; iov_recv[i].iov_len = FRAME_SIZE_MAX; msg_recv[i].msg_hdr.msg_name = &system_from; msg_recv[i].msg_hdr.msg_namelen = sizeof (struct sockaddr_storage); msg_recv[i].msg_hdr.msg_iov = &iov_recv[i]; msg_recv[i].msg_hdr.msg_iovlen = 1; #ifdef HAVE_MSGHDR_CONTROL msg_recv[i].msg_hdr.msg_control = 0; #endif #ifdef HAVE_MSGHDR_CONTROLLEN msg_recv[i].msg_hdr.msg_controllen = 0; #endif #ifdef HAVE_MSGHDR_FLAGS msg_recv[i].msg_hdr.msg_flags = 0; #endif #ifdef HAVE_MSGHDR_ACCRIGHTS msg_recv[i].msg_hdr.msg_accrights = NULL; #endif #ifdef HAVE_MSGHDR_ACCRIGHTSLEN msg_recv[i].msg_hdr.msg_accrightslen = 0; #endif } do { ufd.fd = instance->knet_fd; ufd.events = POLLIN; nfds = poll (&ufd, 1, 0); if (nfds == 1 && ufd.revents & POLLIN) { res = recvmmsg (instance->knet_fd, msg_recv, MAX_BUFFERS, MSG_NOSIGNAL | MSG_DONTWAIT, NULL); if (res != -1) { msg_processed = 1; } else { msg_processed = -1; } } } while (nfds == 1); return (msg_processed); } int totemknet_member_add ( void *knet_context, const struct totem_ip_address *local, const struct totem_ip_address *member, int link_no) { struct totemknet_instance *instance = (struct totemknet_instance *)knet_context; int err; int port = instance->ip_port[link_no]; struct sockaddr_storage remote_ss; struct sockaddr_storage local_ss; int addrlen; if (member->nodeid == instance->our_nodeid) { return 0; /* Don't add ourself, we send loopback messages directly */ } /* Keep track of the number of links */ if (link_no > instance->num_links) { instance->num_links = link_no; } knet_log_printf (LOGSYS_LEVEL_DEBUG, "knet: member_add: %d (%s), link=%d", member->nodeid, totemip_print(member), link_no); knet_log_printf (LOGSYS_LEVEL_DEBUG, "knet: local: %d (%s)", local->nodeid, totemip_print(local)); if (link_no == 0) { if (knet_host_add(instance->knet_handle, member->nodeid)) { KNET_LOGSYS_PERROR(errno, LOGSYS_LEVEL_ERROR, "knet_host_add"); return -1; } if (knet_host_set_policy(instance->knet_handle, member->nodeid, instance->link_mode)) { KNET_LOGSYS_PERROR(errno, LOGSYS_LEVEL_ERROR, "knet_set_policy failed"); return -1; } } /* Casts to remove const */ totemip_totemip_to_sockaddr_convert((struct totem_ip_address *)member, port+link_no, &remote_ss, &addrlen); totemip_totemip_to_sockaddr_convert((struct totem_ip_address *)local, port+link_no, &local_ss, &addrlen); - err = knet_link_set_config(instance->knet_handle, member->nodeid, link_no, &local_ss, &remote_ss); + err = knet_link_set_config(instance->knet_handle, member->nodeid, link_no, KNET_TRANSPORT_UDP, &local_ss, &remote_ss); if (err) { KNET_LOGSYS_PERROR(errno, LOGSYS_LEVEL_ERROR, "knet_link_set_config failed"); return -1; } knet_log_printf (LOGSYS_LEVEL_DEBUG, "knet: member_add: Setting link prio to %d", instance->totem_config->interfaces[link_no].knet_link_priority); err = knet_link_set_priority(instance->knet_handle, member->nodeid, link_no, instance->totem_config->interfaces[link_no].knet_link_priority); if (err) { KNET_LOGSYS_PERROR(errno, LOGSYS_LEVEL_ERROR, "knet_link_set_priority for nodeid %d, link %d failed", member->nodeid, link_no); } err = knet_link_set_ping_timers(instance->knet_handle, member->nodeid, link_no, instance->totem_config->interfaces[link_no].knet_ping_interval, instance->totem_config->interfaces[link_no].knet_ping_timeout, instance->totem_config->interfaces[link_no].knet_ping_precision); if (err) { KNET_LOGSYS_PERROR(errno, LOGSYS_LEVEL_ERROR, "knet_link_set_ping_timers for nodeid %d, link %d failed", member->nodeid, link_no); } err = knet_link_set_pong_count(instance->knet_handle, member->nodeid, link_no, instance->totem_config->interfaces[link_no].knet_pong_count); if (err) { KNET_LOGSYS_PERROR(errno, LOGSYS_LEVEL_ERROR, "knet_link_set_pong_count for nodeid %d, link %d failed", member->nodeid, link_no); } err = knet_link_set_enable(instance->knet_handle, member->nodeid, link_no, 1); if (err) { KNET_LOGSYS_PERROR(errno, LOGSYS_LEVEL_ERROR, "knet_link_set_enable for nodeid %d, link %d failed", member->nodeid, link_no); return -1; } return (0); } int totemknet_member_remove ( void *knet_context, const struct totem_ip_address *token_target, int link_no) { struct totemknet_instance *instance = (struct totemknet_instance *)knet_context; return knet_host_remove(instance->knet_handle, token_target->nodeid); } int totemknet_member_list_rebind_ip ( void *knet_context) { return (0); } static void timer_function_merge_detect_timeout ( void *data) { struct totemknet_instance *instance = (struct totemknet_instance *)data; if (instance->merge_detect_messages_sent_before_timeout == 0) { instance->send_merge_detect_message = 1; } instance->merge_detect_messages_sent_before_timeout = 0; totemknet_start_merge_detect_timeout(instance); } static void totemknet_start_merge_detect_timeout( void *knet_context) { struct totemknet_instance *instance = (struct totemknet_instance *)knet_context; qb_loop_timer_add(instance->poll_handle, QB_LOOP_MED, instance->totem_config->merge_timeout * 2 * QB_TIME_NS_IN_MSEC, (void *)instance, timer_function_merge_detect_timeout, &instance->timer_merge_detect_timeout); } static void totemknet_stop_merge_detect_timeout( void *knet_context) { struct totemknet_instance *instance = (struct totemknet_instance *)knet_context; qb_loop_timer_del(instance->poll_handle, instance->timer_merge_detect_timeout); }