diff --git a/libknet/host.c b/libknet/host.c index abb1f894..85d4626b 100644 --- a/libknet/host.c +++ b/libknet/host.c @@ -1,711 +1,742 @@ /* * Copyright (C) 2010-2019 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. Di Nitto * Federico Simoncelli * * This software licensed under LGPL-2.0+ */ #include "config.h" #include #include #include #include #include #include "host.h" #include "internals.h" #include "logging.h" #include "threads_common.h" static void _host_list_update(knet_handle_t knet_h) { struct knet_host *host; knet_h->host_ids_entries = 0; for (host = knet_h->host_head; host != NULL; host = host->next) { knet_h->host_ids[knet_h->host_ids_entries] = host->host_id; knet_h->host_ids_entries++; } } int knet_host_add(knet_handle_t knet_h, knet_node_id_t host_id) { int savederrno = 0, err = 0; struct knet_host *host = NULL; uint8_t link_idx; if (!knet_h) { errno = EINVAL; return -1; } savederrno = get_global_wrlock(knet_h); if (savederrno) { log_err(knet_h, KNET_SUB_HOST, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } if (knet_h->host_index[host_id]) { err = -1; savederrno = EEXIST; log_err(knet_h, KNET_SUB_HOST, "Unable to add host %u: %s", host_id, strerror(savederrno)); goto exit_unlock; } host = malloc(sizeof(struct knet_host)); if (!host) { err = -1; savederrno = errno; log_err(knet_h, KNET_SUB_HOST, "Unable to allocate memory for host %u: %s", host_id, strerror(savederrno)); goto exit_unlock; } memset(host, 0, sizeof(struct knet_host)); /* * set host_id */ host->host_id = host_id; /* * set default host->name to host_id for logging */ snprintf(host->name, KNET_MAX_HOST_LEN - 1, "%u", host_id); /* * initialize links internal data */ for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) { host->link[link_idx].link_id = link_idx; host->link[link_idx].status.stats.latency_min = UINT32_MAX; } /* * add new host to the index */ knet_h->host_index[host_id] = host; /* * add new host to host list */ if (knet_h->host_head) { host->next = knet_h->host_head; } knet_h->host_head = host; _host_list_update(knet_h); exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); if (err < 0) { free(host); } errno = err ? savederrno : 0; return err; } int knet_host_remove(knet_handle_t knet_h, knet_node_id_t host_id) { int savederrno = 0, err = 0; struct knet_host *host, *removed; uint8_t link_idx; if (!knet_h) { errno = EINVAL; return -1; } savederrno = get_global_wrlock(knet_h); if (savederrno) { log_err(knet_h, KNET_SUB_HOST, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } host = knet_h->host_index[host_id]; if (!host) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_HOST, "Unable to remove host %u: %s", host_id, strerror(savederrno)); goto exit_unlock; } /* * if links are configured we cannot release the host */ for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) { if (host->link[link_idx].configured) { err = -1; savederrno = EBUSY; log_err(knet_h, KNET_SUB_HOST, "Unable to remove host %u, links are still configured: %s", host_id, strerror(savederrno)); goto exit_unlock; } } removed = NULL; /* * removing host from list */ if (knet_h->host_head->host_id == host_id) { removed = knet_h->host_head; knet_h->host_head = removed->next; } else { for (host = knet_h->host_head; host->next != NULL; host = host->next) { if (host->next->host_id == host_id) { removed = host->next; host->next = removed->next; break; } } } knet_h->host_index[host_id] = NULL; free(removed); _host_list_update(knet_h); exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = err ? savederrno : 0; return err; } int knet_host_set_name(knet_handle_t knet_h, knet_node_id_t host_id, const char *name) { int savederrno = 0, err = 0; struct knet_host *host; if (!knet_h) { errno = EINVAL; return -1; } savederrno = get_global_wrlock(knet_h); if (savederrno) { log_err(knet_h, KNET_SUB_HOST, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } if (!knet_h->host_index[host_id]) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_HOST, "Unable to find host %u to set name: %s", host_id, strerror(savederrno)); goto exit_unlock; } if (!name) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_HOST, "Unable to set name for host %u: %s", host_id, strerror(savederrno)); goto exit_unlock; } if (strlen(name) >= KNET_MAX_HOST_LEN) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_HOST, "Requested name for host %u is too long: %s", host_id, strerror(savederrno)); goto exit_unlock; } for (host = knet_h->host_head; host != NULL; host = host->next) { if (!strncmp(host->name, name, KNET_MAX_HOST_LEN - 1)) { err = -1; savederrno = EEXIST; log_err(knet_h, KNET_SUB_HOST, "Duplicated name found on host_id %u", host->host_id); goto exit_unlock; } } snprintf(knet_h->host_index[host_id]->name, KNET_MAX_HOST_LEN - 1, "%s", name); exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = err ? savederrno : 0; return err; } int knet_host_get_name_by_host_id(knet_handle_t knet_h, knet_node_id_t host_id, char *name) { int savederrno = 0, err = 0; if (!knet_h) { errno = EINVAL; return -1; } if (!name) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HOST, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } if (!knet_h->host_index[host_id]) { savederrno = EINVAL; err = -1; log_debug(knet_h, KNET_SUB_HOST, "Host %u not found", host_id); goto exit_unlock; } snprintf(name, KNET_MAX_HOST_LEN, "%s", knet_h->host_index[host_id]->name); exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = err ? savederrno : 0; return err; } int knet_host_get_id_by_host_name(knet_handle_t knet_h, const char *name, knet_node_id_t *host_id) { int savederrno = 0, err = 0, found = 0; struct knet_host *host; if (!knet_h) { errno = EINVAL; return -1; } if (!name) { errno = EINVAL; return -1; } if (!host_id) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HOST, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } for (host = knet_h->host_head; host != NULL; host = host->next) { if (!strncmp(name, host->name, KNET_MAX_HOST_LEN)) { found = 1; *host_id = host->host_id; break; } } if (!found) { savederrno = ENOENT; err = -1; } pthread_rwlock_unlock(&knet_h->global_rwlock); errno = err ? savederrno : 0; return err; } int knet_host_get_host_list(knet_handle_t knet_h, knet_node_id_t *host_ids, size_t *host_ids_entries) { int savederrno = 0; if (!knet_h) { errno = EINVAL; return -1; } if ((!host_ids) || (!host_ids_entries)) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HOST, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } memmove(host_ids, knet_h->host_ids, sizeof(knet_h->host_ids)); *host_ids_entries = knet_h->host_ids_entries; pthread_rwlock_unlock(&knet_h->global_rwlock); return 0; } int knet_host_set_policy(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t policy) { int savederrno = 0, err = 0; uint8_t old_policy; if (!knet_h) { errno = EINVAL; return -1; } if (policy > KNET_LINK_POLICY_RR) { errno = EINVAL; return -1; } savederrno = get_global_wrlock(knet_h); if (savederrno) { log_err(knet_h, KNET_SUB_HOST, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } if (!knet_h->host_index[host_id]) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_HOST, "Unable to set name for host %u: %s", host_id, strerror(savederrno)); goto exit_unlock; } old_policy = knet_h->host_index[host_id]->link_handler_policy; knet_h->host_index[host_id]->link_handler_policy = policy; if (_host_dstcache_update_async(knet_h, knet_h->host_index[host_id])) { savederrno = errno; err = -1; knet_h->host_index[host_id]->link_handler_policy = old_policy; log_debug(knet_h, KNET_SUB_HOST, "Unable to update switch cache for host %u: %s", host_id, strerror(savederrno)); } log_debug(knet_h, KNET_SUB_HOST, "Host %u has new switching policy: %u", host_id, policy); exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = err ? savederrno : 0; return err; } int knet_host_get_policy(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t *policy) { int savederrno = 0, err = 0; if (!knet_h) { errno = EINVAL; return -1; } if (!policy) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HOST, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } if (!knet_h->host_index[host_id]) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_HOST, "Unable to get name for host %u: %s", host_id, strerror(savederrno)); goto exit_unlock; } *policy = knet_h->host_index[host_id]->link_handler_policy; exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = err ? savederrno : 0; return err; } int knet_host_get_status(knet_handle_t knet_h, knet_node_id_t host_id, struct knet_host_status *status) { int savederrno = 0, err = 0; struct knet_host *host; if (!knet_h) { errno = EINVAL; return -1; } if (!status) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HOST, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } host = knet_h->host_index[host_id]; if (!host) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_HOST, "Unable to find host %u: %s", host_id, strerror(savederrno)); goto exit_unlock; } memmove(status, &host->status, sizeof(struct knet_host_status)); exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = err ? savederrno : 0; return err; } int knet_host_enable_status_change_notify(knet_handle_t knet_h, void *host_status_change_notify_fn_private_data, void (*host_status_change_notify_fn) ( void *private_data, knet_node_id_t host_id, uint8_t reachable, uint8_t remote, uint8_t external)) { int savederrno = 0; if (!knet_h) { errno = EINVAL; return -1; } savederrno = get_global_wrlock(knet_h); if (savederrno) { log_err(knet_h, KNET_SUB_HOST, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } knet_h->host_status_change_notify_fn_private_data = host_status_change_notify_fn_private_data; knet_h->host_status_change_notify_fn = host_status_change_notify_fn; if (knet_h->host_status_change_notify_fn) { log_debug(knet_h, KNET_SUB_HOST, "host_status_change_notify_fn enabled"); } else { log_debug(knet_h, KNET_SUB_HOST, "host_status_change_notify_fn disabled"); } pthread_rwlock_unlock(&knet_h->global_rwlock); errno = 0; return 0; } int _send_host_info(knet_handle_t knet_h, const void *data, const size_t datalen) { ssize_t ret = 0; if (knet_h->fini_in_progress) { return 0; } ret = sendto(knet_h->hostsockfd[1], data, datalen, MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0); if (ret < 0) { log_debug(knet_h, KNET_SUB_HOST, "Unable to write data to hostpipe. Error: %s", strerror(errno)); return -1; } if ((size_t)ret != datalen) { log_debug(knet_h, KNET_SUB_HOST, "Unable to write all data to hostpipe. Expected: %zu, Written: %zd.", datalen, ret); return -1; } return 0; } static void _clear_cbuffers(struct knet_host *host, seq_num_t rx_seq_num) { int i; memset(host->circular_buffer, 0, KNET_CBUFFER_SIZE); host->rx_seq_num = rx_seq_num; memset(host->circular_buffer_defrag, 0, KNET_CBUFFER_SIZE); for (i = 0; i < KNET_MAX_LINK; i++) { memset(&host->defrag_buf[i], 0, sizeof(struct knet_host_defrag_buf)); } } +static void _reclaim_old_defrag_bufs(struct knet_host *host, seq_num_t seq_num) +{ + seq_num_t head, tail; /* seq_num boundaries */ + int i; + + head = seq_num + 1; + tail = seq_num - (KNET_MAX_LINK + 1); + + /* + * expire old defrag buffers + */ + for (i = 0; i < KNET_MAX_LINK; i++) { + if (host->defrag_buf[i].in_use) { + /* + * head has done a rollover to 0+ + */ + if (tail > head) { + if ((host->defrag_buf[i].pckt_seq >= head) && (host->defrag_buf[i].pckt_seq <= tail)) { + host->defrag_buf[i].in_use = 0; + } + } else { + if ((host->defrag_buf[i].pckt_seq >= head) || (host->defrag_buf[i].pckt_seq <= tail)){ + host->defrag_buf[i].in_use = 0; + } + } + } + } +} + /* * check if a given packet seq num is in the circular buffers * defrag_buf = 0 -> use normal cbuf 1 -> use the defrag buffer lookup */ int _seq_num_lookup(struct knet_host *host, seq_num_t seq_num, int defrag_buf, int clear_buf) { - size_t i, j; /* circular buffer indexes */ + size_t head, tail; /* circular buffer indexes */ seq_num_t seq_dist; char *dst_cbuf = host->circular_buffer; char *dst_cbuf_defrag = host->circular_buffer_defrag; seq_num_t *dst_seq_num = &host->rx_seq_num; if (clear_buf) { _clear_cbuffers(host, seq_num); } + _reclaim_old_defrag_bufs(host, seq_num); + if (seq_num < *dst_seq_num) { seq_dist = (SEQ_MAX - seq_num) + *dst_seq_num; } else { seq_dist = *dst_seq_num - seq_num; } - j = seq_num % KNET_CBUFFER_SIZE; + head = seq_num % KNET_CBUFFER_SIZE; if (seq_dist < KNET_CBUFFER_SIZE) { /* seq num is in ring buffer */ if (!defrag_buf) { - return (dst_cbuf[j] == 0) ? 1 : 0; + return (dst_cbuf[head] == 0) ? 1 : 0; } else { - return (dst_cbuf_defrag[j] == 0) ? 1 : 0; + return (dst_cbuf_defrag[head] == 0) ? 1 : 0; } } else if (seq_dist <= SEQ_MAX - KNET_CBUFFER_SIZE) { memset(dst_cbuf, 0, KNET_CBUFFER_SIZE); memset(dst_cbuf_defrag, 0, KNET_CBUFFER_SIZE); *dst_seq_num = seq_num; } /* cleaning up circular buffer */ - i = (*dst_seq_num + 1) % KNET_CBUFFER_SIZE; + tail = (*dst_seq_num + 1) % KNET_CBUFFER_SIZE; - if (i > j) { - memset(dst_cbuf + i, 0, KNET_CBUFFER_SIZE - i); - memset(dst_cbuf, 0, j + 1); - memset(dst_cbuf_defrag + i, 0, KNET_CBUFFER_SIZE - i); - memset(dst_cbuf_defrag, 0, j + 1); + if (tail > head) { + memset(dst_cbuf + tail, 0, KNET_CBUFFER_SIZE - tail); + memset(dst_cbuf, 0, head + 1); + memset(dst_cbuf_defrag + tail, 0, KNET_CBUFFER_SIZE - tail); + memset(dst_cbuf_defrag, 0, head + 1); } else { - memset(dst_cbuf + i, 0, j - i + 1); - memset(dst_cbuf_defrag + i, 0, j - i + 1); + memset(dst_cbuf + tail, 0, head - tail + 1); + memset(dst_cbuf_defrag + tail, 0, head - tail + 1); } *dst_seq_num = seq_num; return 1; } void _seq_num_set(struct knet_host *host, seq_num_t seq_num, int defrag_buf) { if (!defrag_buf) { host->circular_buffer[seq_num % KNET_CBUFFER_SIZE] = 1; } else { host->circular_buffer_defrag[seq_num % KNET_CBUFFER_SIZE] = 1; } return; } int _host_dstcache_update_async(knet_handle_t knet_h, struct knet_host *host) { int savederrno = 0; knet_node_id_t host_id = host->host_id; if (sendto(knet_h->dstsockfd[1], &host_id, sizeof(host_id), MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0) != sizeof(host_id)) { savederrno = errno; log_debug(knet_h, KNET_SUB_HOST, "Unable to write to dstpipefd[1]: %s", strerror(savederrno)); errno = savederrno; return -1; } return 0; } int _host_dstcache_update_sync(knet_handle_t knet_h, struct knet_host *host) { int link_idx; int best_priority = -1; int reachable = 0; if (knet_h->host_id == host->host_id && knet_h->has_loop_link) { host->active_link_entries = 1; return 0; } host->active_link_entries = 0; for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) { if (host->link[link_idx].status.enabled != 1) /* link is not enabled */ continue; if (host->link[link_idx].status.connected != 1) /* link is not enabled */ continue; if (host->link[link_idx].has_valid_mtu != 1) /* link does not have valid MTU */ continue; if (host->link_handler_policy == KNET_LINK_POLICY_PASSIVE) { /* for passive we look for the only active link with higher priority */ if (host->link[link_idx].priority > best_priority) { host->active_links[0] = link_idx; best_priority = host->link[link_idx].priority; } host->active_link_entries = 1; } else { /* for RR and ACTIVE we need to copy all available links */ host->active_links[host->active_link_entries] = link_idx; host->active_link_entries++; } } if (host->link_handler_policy == KNET_LINK_POLICY_PASSIVE) { log_info(knet_h, KNET_SUB_HOST, "host: %u (passive) best link: %u (pri: %u)", host->host_id, host->link[host->active_links[0]].link_id, host->link[host->active_links[0]].priority); } else { log_info(knet_h, KNET_SUB_HOST, "host: %u has %u active links", host->host_id, host->active_link_entries); } /* no active links, we can clean the circular buffers and indexes */ if (!host->active_link_entries) { log_warn(knet_h, KNET_SUB_HOST, "host: %u has no active links", host->host_id); _clear_cbuffers(host, 0); } else { reachable = 1; } if (host->status.reachable != reachable) { host->status.reachable = reachable; if (knet_h->host_status_change_notify_fn) { knet_h->host_status_change_notify_fn( knet_h->host_status_change_notify_fn_private_data, host->host_id, host->status.reachable, host->status.remote, host->status.external); } } return 0; } diff --git a/libknet/tests/knet_bench.c b/libknet/tests/knet_bench.c index a60169f9..14dcb55b 100644 --- a/libknet/tests/knet_bench.c +++ b/libknet/tests/knet_bench.c @@ -1,1313 +1,1329 @@ /* * Copyright (C) 2016-2019 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. Di Nitto * * This software licensed under GPL-2.0+ */ #include "config.h" #include #include #include #include #include #include #include #include #include #include "libknet.h" #include "compat.h" #include "internals.h" #include "netutils.h" #include "transport_common.h" #include "test-common.h" #define MAX_NODES 128 static int senderid = -1; static int thisnodeid = -1; static knet_handle_t knet_h; static int datafd = 0; static int8_t channel = 0; static int globallistener = 0; static int continous = 0; static int show_stats = 0; static struct sockaddr_storage allv4; static struct sockaddr_storage allv6; static int broadcast_test = 1; static pthread_t rx_thread = (pthread_t)NULL; static char *rx_buf[PCKT_FRAG_MAX]; static int wait_for_perf_rx = 0; static char *compresscfg = NULL; static char *cryptocfg = NULL; static int machine_output = 0; static int use_access_lists = 0; static int bench_shutdown_in_progress = 0; static pthread_mutex_t shutdown_mutex = PTHREAD_MUTEX_INITIALIZER; #define TEST_PING 0 #define TEST_PING_AND_DATA 1 #define TEST_PERF_BY_SIZE 2 #define TEST_PERF_BY_TIME 3 static int test_type = TEST_PING; #define TEST_START 2 #define TEST_STOP 4 #define TEST_COMPLETE 6 #define ONE_GIGABYTE 1073741824 static uint64_t perf_by_size_size = 1 * ONE_GIGABYTE; static uint64_t perf_by_time_secs = 10; +static uint32_t force_packet_size = 0; + struct node { int nodeid; int links; uint8_t transport[KNET_MAX_LINK]; struct sockaddr_storage address[KNET_MAX_LINK]; }; static void print_help(void) { printf("knet_bench usage:\n"); printf(" -h print this help (no really)\n"); printf(" -d enable debug logs (default INFO)\n"); printf(" -f enable use of access lists (default: off)\n"); printf(" -c [implementation]:[crypto]:[hashing] crypto configuration. (default disabled)\n"); printf(" Example: -c nss:aes128:sha1\n"); printf(" -z [implementation]:[level]:[threshold] compress configuration. (default disabled)\n"); printf(" Example: -z zlib:5:100\n"); printf(" -p [active|passive|rr] (default: passive)\n"); printf(" -P [UDP|SCTP] (default: UDP) protocol (transport) to use for all links\n"); printf(" -t [nodeid] This nodeid (required)\n"); printf(" -n [nodeid],[proto]/[link1_ip],[link2_..] Other nodes information (at least one required)\n"); printf(" Example: -n 1,192.168.8.1,SCTP/3ffe::8:1,UDP/172...\n"); printf(" can be repeated up to %d and should contain also the localnode info\n", MAX_NODES); printf(" -b [port] baseport (default: 50000)\n"); printf(" -l enable global listener on 0.0.0.0/:: (default: off, incompatible with -o)\n"); printf(" -o enable baseport offset per nodeid\n"); printf(" -m change PMTUd interval in seconds (default: 60)\n"); printf(" -w dont wait for all nodes to be up before starting the test (default: wait)\n"); printf(" -T [ping|ping_data|perf-by-size|perf-by-time]\n"); printf(" test type (default: ping)\n"); printf(" ping: will wait for all hosts to join the knet network, sleep 5 seconds and quit\n"); printf(" ping_data: will wait for all hosts to join the knet network, sends some data to all nodes and quit\n"); printf(" perf-by-size: will wait for all hosts to join the knet network,\n"); printf(" perform a series of benchmarks by transmitting a known\n"); printf(" size/quantity of packets and measuring the time, then quit\n"); printf(" perf-by-time: will wait for all hosts to join the knet network,\n"); printf(" perform a series of benchmarks by transmitting a known\n"); printf(" size of packets for a given amount of time (10 seconds)\n"); printf(" and measuring the quantity of data transmitted, then quit\n"); printf(" -s nodeid that will generate traffic for benchmarks\n"); printf(" -S [size|seconds] when used in combination with -T perf-by-size it indicates how many GB of traffic to generate for the test. (default: 1GB)\n"); printf(" when used in combination with -T perf-by-time it indicates how many Seconds of traffic to generate for the test. (default: 10 seconds)\n"); + printf(" -x force packet size for perf-by-time or perf-by-size\n"); printf(" -C repeat the test continously (default: off)\n"); printf(" -X[XX] show stats at the end of the run (default: 1)\n"); printf(" 1: show handle stats, 2: show summary link stats\n"); printf(" 3: show detailed link stats\n"); printf(" -a enable machine parsable output (default: off).\n"); } static void parse_nodes(char *nodesinfo[MAX_NODES], int onidx, int port, struct node nodes[MAX_NODES], int *thisidx) { int i; char *temp = NULL; char port_str[10]; memset(port_str, 0, sizeof(port_str)); sprintf(port_str, "%d", port); for (i = 0; i < onidx; i++) { nodes[i].nodeid = atoi(strtok(nodesinfo[i], ",")); if ((nodes[i].nodeid < 0) || (nodes[i].nodeid > KNET_MAX_HOST)) { printf("Invalid nodeid: %d (0 - %d)\n", nodes[i].nodeid, KNET_MAX_HOST); exit(FAIL); } if (thisnodeid == nodes[i].nodeid) { *thisidx = i; } while((temp = strtok(NULL, ","))) { char *slash = NULL; uint8_t transport; if (nodes[i].links == KNET_MAX_LINK) { printf("Too many links configured. Max %d\n", KNET_MAX_LINK); exit(FAIL); } slash = strstr(temp, "/"); if (slash) { memset(slash, 0, 1); transport = knet_get_transport_id_by_name(temp); if (transport == KNET_MAX_TRANSPORTS) { printf("Unknown transport: %s\n", temp); exit(FAIL); } nodes[i].transport[nodes[i].links] = transport; temp = slash + 1; } else { nodes[i].transport[nodes[i].links] = KNET_TRANSPORT_UDP; } if (knet_strtoaddr(temp, port_str, &nodes[i].address[nodes[i].links], sizeof(struct sockaddr_storage)) < 0) { printf("Unable to convert %s to sockaddress\n", temp); exit(FAIL); } nodes[i].links++; } } if (knet_strtoaddr("0.0.0.0", port_str, &allv4, sizeof(struct sockaddr_storage)) < 0) { printf("Unable to convert 0.0.0.0 to sockaddress\n"); exit(FAIL); } if (knet_strtoaddr("::", port_str, &allv6, sizeof(struct sockaddr_storage)) < 0) { printf("Unable to convert :: to sockaddress\n"); exit(FAIL); } for (i = 1; i < onidx; i++) { if (nodes[0].links != nodes[i].links) { printf("knet_bench does not support unbalanced link configuration\n"); exit(FAIL); } } return; } static int private_data; static void sock_notify(void *pvt_data, int local_datafd, int8_t local_channel, uint8_t tx_rx, int error, int errorno) { printf("[info]: error (%d - %d - %s) from socket: %d\n", error, errorno, strerror(errno), local_datafd); return; } static int ping_dst_host_filter(void *pvt_data, const unsigned char *outdata, ssize_t outdata_len, uint8_t tx_rx, knet_node_id_t this_host_id, knet_node_id_t src_host_id, int8_t *dst_channel, knet_node_id_t *dst_host_ids, size_t *dst_host_ids_entries) { if (broadcast_test) { return 1; } if (tx_rx == KNET_NOTIFY_TX) { memmove(&dst_host_ids[0], outdata, 2); } else { dst_host_ids[0] = this_host_id; } *dst_host_ids_entries = 1; return 0; } static void setup_knet(int argc, char *argv[]) { int logfd = 0; int rv; char *policystr = NULL, *protostr = NULL; char *othernodeinfo[MAX_NODES]; struct node nodes[MAX_NODES]; int thisidx = -1; int onidx = 0; int debug = KNET_LOG_INFO; int port = 50000, portoffset = 0; int thisport = 0, otherport = 0; int thisnewport = 0, othernewport = 0; struct sockaddr_in *so_in; struct sockaddr_in6 *so_in6; struct sockaddr_storage *src; int i, link_idx, allnodesup = 0; int policy = KNET_LINK_POLICY_PASSIVE, policyfound = 0; int protocol = KNET_TRANSPORT_UDP, protofound = 0; int wait = 1; int pmtud_interval = 60; struct knet_handle_crypto_cfg knet_handle_crypto_cfg; char *cryptomodel = NULL, *cryptotype = NULL, *cryptohash = NULL; struct knet_handle_compress_cfg knet_handle_compress_cfg; memset(nodes, 0, sizeof(nodes)); - while ((rv = getopt(argc, argv, "aCT:S:s:ldfom:wb:t:n:c:p:X::P:z:h")) != EOF) { + while ((rv = getopt(argc, argv, "aCT:S:s:ldfom:wb:t:n:c:p:x:X::P:z:h")) != EOF) { switch(rv) { case 'h': print_help(); exit(PASS); break; case 'a': machine_output = 1; break; case 'd': debug = KNET_LOG_DEBUG; break; case 'f': use_access_lists = 1; break; case 'c': if (cryptocfg) { printf("Error: -c can only be specified once\n"); exit(FAIL); } cryptocfg = optarg; break; case 'p': if (policystr) { printf("Error: -p can only be specified once\n"); exit(FAIL); } if (optarg) { policystr = optarg; if (!strcmp(policystr, "active")) { policy = KNET_LINK_POLICY_ACTIVE; policyfound = 1; } /* * we can't use rr because clangs can't compile * an array of 3 strings, one of which is 2 bytes long */ if (!strcmp(policystr, "round-robin")) { policy = KNET_LINK_POLICY_RR; policyfound = 1; } if (!strcmp(policystr, "passive")) { policy = KNET_LINK_POLICY_PASSIVE; policyfound = 1; } } if (!policyfound) { printf("Error: invalid policy %s specified. -p accepts active|passive|rr\n", policystr); exit(FAIL); } break; case 'P': if (protostr) { printf("Error: -P can only be specified once\n"); exit(FAIL); } if (optarg) { protostr = optarg; if (!strcmp(protostr, "UDP")) { protocol = KNET_TRANSPORT_UDP; protofound = 1; } if (!strcmp(protostr, "SCTP")) { protocol = KNET_TRANSPORT_SCTP; protofound = 1; } } if (!protofound) { printf("Error: invalid protocol %s specified. -P accepts udp|sctp\n", policystr); exit(FAIL); } break; case 't': if (thisnodeid >= 0) { printf("Error: -t can only be specified once\n"); exit(FAIL); } thisnodeid = atoi(optarg); if ((thisnodeid < 0) || (thisnodeid > 65536)) { printf("Error: -t nodeid out of range %d (1 - 65536)\n", thisnodeid); exit(FAIL); } break; case 'n': if (onidx == MAX_NODES) { printf("Error: too many other nodes. Max %d\n", MAX_NODES); exit(FAIL); } othernodeinfo[onidx] = optarg; onidx++; break; case 'b': port = atoi(optarg); if ((port < 1) || (port > 65536)) { printf("Error: port %d out of range (1 - 65536)\n", port); exit(FAIL); } break; case 'o': if (globallistener) { printf("Error: -l cannot be used with -o\n"); exit(FAIL); } portoffset = 1; break; case 'm': pmtud_interval = atoi(optarg); if (pmtud_interval < 1) { printf("Error: pmtud interval %d out of range (> 0)\n", pmtud_interval); exit(FAIL); } break; case 'l': if (portoffset) { printf("Error: -o cannot be used with -l\n"); exit(FAIL); } globallistener = 1; break; case 'w': wait = 0; break; case 's': if (senderid >= 0) { printf("Error: -s can only be specified once\n"); exit(FAIL); } senderid = atoi(optarg); if ((senderid < 0) || (senderid > 65536)) { printf("Error: -s nodeid out of range %d (1 - 65536)\n", senderid); exit(FAIL); } break; case 'T': if (optarg) { if (!strcmp("ping", optarg)) { test_type = TEST_PING; } if (!strcmp("ping_data", optarg)) { test_type = TEST_PING_AND_DATA; } if (!strcmp("perf-by-size", optarg)) { test_type = TEST_PERF_BY_SIZE; } if (!strcmp("perf-by-time", optarg)) { test_type = TEST_PERF_BY_TIME; } } else { printf("Error: -T requires an option\n"); exit(FAIL); } break; case 'S': perf_by_size_size = (uint64_t)atoi(optarg) * ONE_GIGABYTE; perf_by_time_secs = (uint64_t)atoi(optarg); break; + case 'x': + force_packet_size = (uint32_t)atoi(optarg); + if ((force_packet_size < 1) || (force_packet_size > 65536)) { + printf("Unsupported packet size %u (accepted 1 - 65536)\n", force_packet_size); + exit(FAIL); + } + break; case 'C': continous = 1; break; case 'X': if (optarg) { show_stats = atoi(optarg); } else { show_stats = 1; } break; case 'z': if (compresscfg) { printf("Error: -c can only be specified once\n"); exit(FAIL); } compresscfg = optarg; break; default: break; } } if (thisnodeid < 0) { printf("Who am I?!? missing -t from command line?\n"); exit(FAIL); } if (onidx < 1) { printf("no other nodes configured?!? missing -n from command line\n"); exit(FAIL); } parse_nodes(othernodeinfo, onidx, port, nodes, &thisidx); if (thisidx < 0) { printf("no config for this node found\n"); exit(FAIL); } if (senderid >= 0) { for (i=0; i < onidx; i++) { if (senderid == nodes[i].nodeid) { break; } } if (i == onidx) { printf("Unable to find senderid in nodelist\n"); exit(FAIL); } } if (((test_type == TEST_PERF_BY_SIZE) || (test_type == TEST_PERF_BY_TIME)) && (senderid < 0)) { printf("Error: performance test requires -s to be set (for now)\n"); exit(FAIL); } logfd = start_logging(stdout); knet_h = knet_handle_new(thisnodeid, logfd, debug, 0); if (!knet_h) { printf("Unable to knet_handle_new: %s\n", strerror(errno)); exit(FAIL); } if (knet_handle_enable_access_lists(knet_h, use_access_lists) < 0) { printf("Unable to knet_handle_enable_access_lists: %s\n", strerror(errno)); exit(FAIL); } if (cryptocfg) { memset(&knet_handle_crypto_cfg, 0, sizeof(knet_handle_crypto_cfg)); cryptomodel = strtok(cryptocfg, ":"); cryptotype = strtok(NULL, ":"); cryptohash = strtok(NULL, ":"); if (cryptomodel) { strncpy(knet_handle_crypto_cfg.crypto_model, cryptomodel, sizeof(knet_handle_crypto_cfg.crypto_model) - 1); } if (cryptotype) { strncpy(knet_handle_crypto_cfg.crypto_cipher_type, cryptotype, sizeof(knet_handle_crypto_cfg.crypto_cipher_type) - 1); } if (cryptohash) { strncpy(knet_handle_crypto_cfg.crypto_hash_type, cryptohash, sizeof(knet_handle_crypto_cfg.crypto_hash_type) - 1); } knet_handle_crypto_cfg.private_key_len = KNET_MAX_KEY_LEN; if (knet_handle_crypto(knet_h, &knet_handle_crypto_cfg)) { printf("Unable to init crypto\n"); exit(FAIL); } } if (compresscfg) { memset(&knet_handle_compress_cfg, 0, sizeof(struct knet_handle_compress_cfg)); snprintf(knet_handle_compress_cfg.compress_model, 16, "%s", strtok(compresscfg, ":")); knet_handle_compress_cfg.compress_level = atoi(strtok(NULL, ":")); knet_handle_compress_cfg.compress_threshold = atoi(strtok(NULL, ":")); if (knet_handle_compress(knet_h, &knet_handle_compress_cfg)) { printf("Unable to configure compress\n"); exit(FAIL); } } if (knet_handle_enable_sock_notify(knet_h, &private_data, sock_notify) < 0) { printf("knet_handle_enable_sock_notify failed: %s\n", strerror(errno)); knet_handle_free(knet_h); exit(FAIL); } datafd = 0; channel = -1; if (knet_handle_add_datafd(knet_h, &datafd, &channel) < 0) { printf("knet_handle_add_datafd failed: %s\n", strerror(errno)); knet_handle_free(knet_h); exit(FAIL); } if (knet_handle_pmtud_setfreq(knet_h, pmtud_interval) < 0) { printf("knet_handle_pmtud_setfreq failed: %s\n", strerror(errno)); knet_handle_free(knet_h); exit(FAIL); } for (i=0; i < onidx; i++) { if (i == thisidx) { continue; } if (knet_host_add(knet_h, nodes[i].nodeid) < 0) { printf("knet_host_add failed: %s\n", strerror(errno)); exit(FAIL); } if (knet_host_set_policy(knet_h, nodes[i].nodeid, policy) < 0) { printf("knet_host_set_policy failed: %s\n", strerror(errno)); exit(FAIL); } for (link_idx = 0; link_idx < nodes[i].links; link_idx++) { if (portoffset) { if (nodes[thisidx].address[link_idx].ss_family == AF_INET) { so_in = (struct sockaddr_in *)&nodes[thisidx].address[link_idx]; thisport = ntohs(so_in->sin_port); thisnewport = thisport + nodes[i].nodeid; so_in->sin_port = (htons(thisnewport)); so_in = (struct sockaddr_in *)&nodes[i].address[link_idx]; otherport = ntohs(so_in->sin_port); othernewport = otherport + nodes[thisidx].nodeid; so_in->sin_port = (htons(othernewport)); } else { so_in6 = (struct sockaddr_in6 *)&nodes[thisidx].address[link_idx]; thisport = ntohs(so_in6->sin6_port); thisnewport = thisport + nodes[i].nodeid; so_in6->sin6_port = (htons(thisnewport)); so_in6 = (struct sockaddr_in6 *)&nodes[i].address[link_idx]; otherport = ntohs(so_in6->sin6_port); othernewport = otherport + nodes[thisidx].nodeid; so_in6->sin6_port = (htons(othernewport)); } } if (!globallistener) { src = &nodes[thisidx].address[link_idx]; } else { if (nodes[thisidx].address[link_idx].ss_family == AF_INET) { src = &allv4; } else { src = &allv6; } } /* * -P overrides per link protocol configuration */ if (protofound) { nodes[i].transport[link_idx] = protocol; } if (knet_link_set_config(knet_h, nodes[i].nodeid, link_idx, nodes[i].transport[link_idx], src, &nodes[i].address[link_idx], 0) < 0) { printf("Unable to configure link: %s\n", strerror(errno)); exit(FAIL); } if (portoffset) { if (nodes[thisidx].address[link_idx].ss_family == AF_INET) { so_in = (struct sockaddr_in *)&nodes[thisidx].address[link_idx]; so_in->sin_port = (htons(thisport)); so_in = (struct sockaddr_in *)&nodes[i].address[link_idx]; so_in->sin_port = (htons(otherport)); } else { so_in6 = (struct sockaddr_in6 *)&nodes[thisidx].address[link_idx]; so_in6->sin6_port = (htons(thisport)); so_in6 = (struct sockaddr_in6 *)&nodes[i].address[link_idx]; so_in6->sin6_port = (htons(otherport)); } } if (knet_link_set_enable(knet_h, nodes[i].nodeid, link_idx, 1) < 0) { printf("knet_link_set_enable failed: %s\n", strerror(errno)); exit(FAIL); } if (knet_link_set_ping_timers(knet_h, nodes[i].nodeid, link_idx, 1000, 10000, 2048) < 0) { printf("knet_link_set_ping_timers failed: %s\n", strerror(errno)); exit(FAIL); } if (knet_link_set_pong_count(knet_h, nodes[i].nodeid, link_idx, 2) < 0) { printf("knet_link_set_pong_count failed: %s\n", strerror(errno)); exit(FAIL); } } } if (knet_handle_enable_filter(knet_h, NULL, ping_dst_host_filter)) { printf("Unable to enable dst_host_filter: %s\n", strerror(errno)); exit(FAIL); } if (knet_handle_setfwd(knet_h, 1) < 0) { printf("knet_handle_setfwd failed: %s\n", strerror(errno)); exit(FAIL); } if (wait) { while(!allnodesup) { allnodesup = 1; for (i=0; i < onidx; i++) { if (i == thisidx) { continue; } if (knet_h->host_index[nodes[i].nodeid]->status.reachable != 1) { printf("[info]: waiting host %d to be reachable\n", nodes[i].nodeid); allnodesup = 0; } } if (!allnodesup) { sleep(1); } } sleep(1); } } static void *_rx_thread(void *args) { int rx_epoll; struct epoll_event ev; struct epoll_event events[KNET_EPOLL_MAX_EVENTS]; struct sockaddr_storage address[PCKT_FRAG_MAX]; struct knet_mmsghdr msg[PCKT_FRAG_MAX]; struct iovec iov_in[PCKT_FRAG_MAX]; int i, msg_recv; struct timespec clock_start, clock_end; unsigned long long time_diff = 0; uint64_t rx_pkts = 0; uint64_t rx_bytes = 0; unsigned int current_pckt_size = 0; for (i = 0; i < PCKT_FRAG_MAX; i++) { rx_buf[i] = malloc(KNET_MAX_PACKET_SIZE); if (!rx_buf[i]) { printf("RXT: Unable to malloc!\nHALTING RX THREAD!\n"); return NULL; } memset(rx_buf[i], 0, KNET_MAX_PACKET_SIZE); iov_in[i].iov_base = (void *)rx_buf[i]; iov_in[i].iov_len = KNET_MAX_PACKET_SIZE; memset(&msg[i].msg_hdr, 0, sizeof(struct msghdr)); msg[i].msg_hdr.msg_name = &address[i]; msg[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage); msg[i].msg_hdr.msg_iov = &iov_in[i]; msg[i].msg_hdr.msg_iovlen = 1; } rx_epoll = epoll_create(KNET_EPOLL_MAX_EVENTS + 1); if (rx_epoll < 0) { printf("RXT: Unable to create epoll!\nHALTING RX THREAD!\n"); return NULL; } memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLIN; ev.data.fd = datafd; if (epoll_ctl(rx_epoll, EPOLL_CTL_ADD, datafd, &ev)) { printf("RXT: Unable to add datafd to epoll\nHALTING RX THREAD!\n"); return NULL; } memset(&clock_start, 0, sizeof(clock_start)); memset(&clock_end, 0, sizeof(clock_start)); while (!bench_shutdown_in_progress) { if (epoll_wait(rx_epoll, events, KNET_EPOLL_MAX_EVENTS, 1) >= 1) { msg_recv = _recvmmsg(datafd, &msg[0], PCKT_FRAG_MAX, MSG_DONTWAIT | MSG_NOSIGNAL); if (msg_recv < 0) { printf("[info]: RXT: error from recvmmsg: %s\n", strerror(errno)); } switch(test_type) { case TEST_PING_AND_DATA: for (i = 0; i < msg_recv; i++) { if (msg[i].msg_len == 0) { printf("[info]: RXT: received 0 bytes message?\n"); } printf("[info]: received %u bytes message: %s\n", msg[i].msg_len, (char *)msg[i].msg_hdr.msg_iov->iov_base); } break; case TEST_PERF_BY_TIME: case TEST_PERF_BY_SIZE: for (i = 0; i < msg_recv; i++) { if (msg[i].msg_len < 64) { if (msg[i].msg_len == 0) { printf("[info]: RXT: received 0 bytes message?\n"); } if (msg[i].msg_len == TEST_START) { if (clock_gettime(CLOCK_MONOTONIC, &clock_start) != 0) { printf("[info]: unable to get start time!\n"); } } if (msg[i].msg_len == TEST_STOP) { double average_rx_mbytes; double average_rx_pkts; double time_diff_sec; if (clock_gettime(CLOCK_MONOTONIC, &clock_end) != 0) { printf("[info]: unable to get end time!\n"); } timespec_diff(clock_start, clock_end, &time_diff); /* * adjust for sleep(2) between sending the last data and TEST_STOP */ time_diff = time_diff - 2000000000llu; /* * convert to seconds */ time_diff_sec = (double)time_diff / 1000000000llu; average_rx_mbytes = (double)((rx_bytes / time_diff_sec) / (1024 * 1024)); average_rx_pkts = (double)(rx_pkts / time_diff_sec); if (!machine_output) { printf("[perf] execution time: %8.4f secs Average speed: %8.4f MB/sec %8.4f pckts/sec (size: %u total: %" PRIu64 ")\n", time_diff_sec, average_rx_mbytes, average_rx_pkts, current_pckt_size, rx_pkts); } else { printf("[perf],%.4f,%u,%" PRIu64 ",%.4f,%.4f\n", time_diff_sec, current_pckt_size, rx_pkts, average_rx_mbytes, average_rx_pkts); } rx_pkts = 0; rx_bytes = 0; current_pckt_size = 0; } if (msg[i].msg_len == TEST_COMPLETE) { wait_for_perf_rx = 1; } continue; } rx_pkts++; rx_bytes = rx_bytes + msg[i].msg_len; current_pckt_size = msg[i].msg_len; } break; } } } epoll_ctl(rx_epoll, EPOLL_CTL_DEL, datafd, &ev); close(rx_epoll); return NULL; } static void setup_data_txrx_common(void) { if (!rx_thread) { if (knet_handle_enable_filter(knet_h, NULL, ping_dst_host_filter)) { printf("Unable to enable dst_host_filter: %s\n", strerror(errno)); exit(FAIL); } printf("[info]: setting up rx thread\n"); if (pthread_create(&rx_thread, 0, _rx_thread, NULL)) { printf("Unable to start rx thread\n"); exit(FAIL); } } } static void stop_rx_thread(void) { void *retval; int i; if (rx_thread) { printf("[info]: shutting down rx thread\n"); sleep(2); pthread_cancel(rx_thread); pthread_join(rx_thread, &retval); for (i = 0; i < PCKT_FRAG_MAX; i ++) { free(rx_buf[i]); } } } static void send_ping_data(void) { char buf[65535]; ssize_t len; memset(&buf, 0, sizeof(buf)); snprintf(buf, sizeof(buf), "Hello world!"); if (compresscfg) { len = sizeof(buf); } else { len = strlen(buf); } if (knet_send(knet_h, buf, len, channel) != len) { printf("[info]: Error sending hello world: %s\n", strerror(errno)); } sleep(1); } static int send_messages(struct knet_mmsghdr *msg, int msgs_to_send) { int sent_msgs, prev_sent, progress, total_sent; total_sent = 0; sent_msgs = 0; prev_sent = 0; progress = 1; retry: errno = 0; sent_msgs = _sendmmsg(datafd, 0, &msg[0], msgs_to_send, MSG_NOSIGNAL); if (sent_msgs < 0) { if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) { usleep(KNET_THREADS_TIMER_RES / 16); goto retry; } printf("[info]: Unable to send messages: %s\n", strerror(errno)); return -1; } total_sent = total_sent + sent_msgs; if ((sent_msgs >= 0) && (sent_msgs < msgs_to_send)) { if ((sent_msgs) || (progress)) { msgs_to_send = msgs_to_send - sent_msgs; prev_sent = prev_sent + sent_msgs; if (sent_msgs) { progress = 1; } else { progress = 0; } goto retry; } if (!progress) { printf("[info]: Unable to send more messages after retry\n"); } } return total_sent; } static int setup_send_buffers_common(struct knet_mmsghdr *msg, struct iovec *iov_out, char *tx_buf[]) { int i; for (i = 0; i < PCKT_FRAG_MAX; i++) { tx_buf[i] = malloc(KNET_MAX_PACKET_SIZE); if (!tx_buf[i]) { printf("TXT: Unable to malloc!\n"); return -1; } - memset(tx_buf[i], 0, KNET_MAX_PACKET_SIZE); + memset(tx_buf[i], i, KNET_MAX_PACKET_SIZE); iov_out[i].iov_base = (void *)tx_buf[i]; memset(&msg[i].msg_hdr, 0, sizeof(struct msghdr)); msg[i].msg_hdr.msg_iov = &iov_out[i]; msg[i].msg_hdr.msg_iovlen = 1; } return 0; } static void send_perf_data_by_size(void) { char *tx_buf[PCKT_FRAG_MAX]; struct knet_mmsghdr msg[PCKT_FRAG_MAX]; struct iovec iov_out[PCKT_FRAG_MAX]; char ctrl_message[16]; int sent_msgs; int i; uint64_t total_pkts_to_tx; uint64_t packets_to_send; uint32_t packetsize = 64; setup_send_buffers_common(msg, iov_out, tx_buf); while (packetsize <= KNET_MAX_PACKET_SIZE) { + if (force_packet_size) { + packetsize = force_packet_size; + } for (i = 0; i < PCKT_FRAG_MAX; i++) { iov_out[i].iov_len = packetsize; } total_pkts_to_tx = perf_by_size_size / packetsize; printf("[info]: testing with %u packet size. total bytes to transfer: %" PRIu64 " (%" PRIu64 " packets)\n", packetsize, perf_by_size_size, total_pkts_to_tx); memset(ctrl_message, 0, sizeof(ctrl_message)); knet_send(knet_h, ctrl_message, TEST_START, channel); while (total_pkts_to_tx > 0) { if (total_pkts_to_tx >= PCKT_FRAG_MAX) { packets_to_send = PCKT_FRAG_MAX; } else { packets_to_send = total_pkts_to_tx; } sent_msgs = send_messages(&msg[0], packets_to_send); if (sent_msgs < 0) { printf("Something went wrong, aborting\n"); exit(FAIL); } total_pkts_to_tx = total_pkts_to_tx - sent_msgs; } sleep(2); knet_send(knet_h, ctrl_message, TEST_STOP, channel); - if (packetsize == KNET_MAX_PACKET_SIZE) { + if ((packetsize == KNET_MAX_PACKET_SIZE) || (force_packet_size)) { break; } /* * Use a multiplier that can always divide properly a GB * into smaller chunks without worry about boundaries */ packetsize *= 4; if (packetsize > KNET_MAX_PACKET_SIZE) { packetsize = KNET_MAX_PACKET_SIZE; } } knet_send(knet_h, ctrl_message, TEST_COMPLETE, channel); for (i = 0; i < PCKT_FRAG_MAX; i++) { free(tx_buf[i]); } } /* For sorting the node list into order */ static int node_compare(const void *aptr, const void *bptr) { uint16_t a,b; a = *(uint16_t *)aptr; b = *(uint16_t *)bptr; return a > b; } static void display_stats(int level) { struct knet_handle_stats handle_stats; struct knet_link_status link_status; struct knet_link_stats total_link_stats; knet_node_id_t host_list[KNET_MAX_HOST]; uint8_t link_list[KNET_MAX_LINK]; unsigned int i,j; size_t num_hosts, num_links; if (knet_handle_get_stats(knet_h, &handle_stats, sizeof(handle_stats)) < 0) { perror("[info]: failed to get knet handle stats"); return; } if (compresscfg || cryptocfg) { printf("\n"); printf("[stat]: handle stats\n"); printf("[stat]: ------------\n"); if (compresscfg) { printf("[stat]: tx_uncompressed_packets: %" PRIu64 "\n", handle_stats.tx_uncompressed_packets); printf("[stat]: tx_compressed_packets: %" PRIu64 "\n", handle_stats.tx_compressed_packets); printf("[stat]: tx_compressed_original_bytes: %" PRIu64 "\n", handle_stats.tx_compressed_original_bytes); printf("[stat]: tx_compressed_size_bytes: %" PRIu64 "\n", handle_stats.tx_compressed_size_bytes ); printf("[stat]: tx_compress_time_ave: %" PRIu64 "\n", handle_stats.tx_compress_time_ave); printf("[stat]: tx_compress_time_min: %" PRIu64 "\n", handle_stats.tx_compress_time_min); printf("[stat]: tx_compress_time_max: %" PRIu64 "\n", handle_stats.tx_compress_time_max); printf("[stat]: tx_failed_to_compress: %" PRIu64 "\n", handle_stats.tx_failed_to_compress); printf("[stat]: tx_unable_to_compress: %" PRIu64 "\n", handle_stats.tx_unable_to_compress); printf("[stat]: rx_compressed_packets: %" PRIu64 "\n", handle_stats.rx_compressed_packets); printf("[stat]: rx_compressed_original_bytes: %" PRIu64 "\n", handle_stats.rx_compressed_original_bytes); printf("[stat]: rx_compressed_size_bytes: %" PRIu64 "\n", handle_stats.rx_compressed_size_bytes); printf("[stat]: rx_compress_time_ave: %" PRIu64 "\n", handle_stats.rx_compress_time_ave); printf("[stat]: rx_compress_time_min: %" PRIu64 "\n", handle_stats.rx_compress_time_min); printf("[stat]: rx_compress_time_max: %" PRIu64 "\n", handle_stats.rx_compress_time_max); printf("[stat]: rx_failed_to_decompress: %" PRIu64 "\n", handle_stats.rx_failed_to_decompress); printf("\n"); } if (cryptocfg) { printf("[stat]: tx_crypt_packets: %" PRIu64 "\n", handle_stats.tx_crypt_packets); printf("[stat]: tx_crypt_byte_overhead: %" PRIu64 "\n", handle_stats.tx_crypt_byte_overhead); printf("[stat]: tx_crypt_time_ave: %" PRIu64 "\n", handle_stats.tx_crypt_time_ave); printf("[stat]: tx_crypt_time_min: %" PRIu64 "\n", handle_stats.tx_crypt_time_min); printf("[stat]: tx_crypt_time_max: %" PRIu64 "\n", handle_stats.tx_crypt_time_max); printf("[stat]: rx_crypt_packets: %" PRIu64 "\n", handle_stats.rx_crypt_packets); printf("[stat]: rx_crypt_time_ave: %" PRIu64 "\n", handle_stats.rx_crypt_time_ave); printf("[stat]: rx_crypt_time_min: %" PRIu64 "\n", handle_stats.rx_crypt_time_min); printf("[stat]: rx_crypt_time_max: %" PRIu64 "\n", handle_stats.rx_crypt_time_max); printf("\n"); } } if (level < 2) { return; } memset(&total_link_stats, 0, sizeof(struct knet_link_stats)); if (knet_host_get_host_list(knet_h, host_list, &num_hosts) < 0) { perror("[info]: cannot get host list for stats"); return; } /* Print in host ID order */ qsort(host_list, num_hosts, sizeof(uint16_t), node_compare); for (j=0; j 2) { printf("\n"); printf("[stat]: Node %d Link %d\n", host_list[j], link_list[i]); printf("[stat]: tx_data_packets: %" PRIu64 "\n", link_status.stats.tx_data_packets); printf("[stat]: rx_data_packets: %" PRIu64 "\n", link_status.stats.rx_data_packets); printf("[stat]: tx_data_bytes: %" PRIu64 "\n", link_status.stats.tx_data_bytes); printf("[stat]: rx_data_bytes: %" PRIu64 "\n", link_status.stats.rx_data_bytes); printf("[stat]: rx_ping_packets: %" PRIu64 "\n", link_status.stats.rx_ping_packets); printf("[stat]: tx_ping_packets: %" PRIu64 "\n", link_status.stats.tx_ping_packets); printf("[stat]: rx_ping_bytes: %" PRIu64 "\n", link_status.stats.rx_ping_bytes); printf("[stat]: tx_ping_bytes: %" PRIu64 "\n", link_status.stats.tx_ping_bytes); printf("[stat]: rx_pong_packets: %" PRIu64 "\n", link_status.stats.rx_pong_packets); printf("[stat]: tx_pong_packets: %" PRIu64 "\n", link_status.stats.tx_pong_packets); printf("[stat]: rx_pong_bytes: %" PRIu64 "\n", link_status.stats.rx_pong_bytes); printf("[stat]: tx_pong_bytes: %" PRIu64 "\n", link_status.stats.tx_pong_bytes); printf("[stat]: rx_pmtu_packets: %" PRIu64 "\n", link_status.stats.rx_pmtu_packets); printf("[stat]: tx_pmtu_packets: %" PRIu64 "\n", link_status.stats.tx_pmtu_packets); printf("[stat]: rx_pmtu_bytes: %" PRIu64 "\n", link_status.stats.rx_pmtu_bytes); printf("[stat]: tx_pmtu_bytes: %" PRIu64 "\n", link_status.stats.tx_pmtu_bytes); printf("[stat]: tx_total_packets: %" PRIu64 "\n", link_status.stats.tx_total_packets); printf("[stat]: rx_total_packets: %" PRIu64 "\n", link_status.stats.rx_total_packets); printf("[stat]: tx_total_bytes: %" PRIu64 "\n", link_status.stats.tx_total_bytes); printf("[stat]: rx_total_bytes: %" PRIu64 "\n", link_status.stats.rx_total_bytes); printf("[stat]: tx_total_errors: %" PRIu64 "\n", link_status.stats.tx_total_errors); printf("[stat]: tx_total_retries: %" PRIu64 "\n", link_status.stats.tx_total_retries); printf("[stat]: tx_pmtu_errors: %" PRIu32 "\n", link_status.stats.tx_pmtu_errors); printf("[stat]: tx_pmtu_retries: %" PRIu32 "\n", link_status.stats.tx_pmtu_retries); printf("[stat]: tx_ping_errors: %" PRIu32 "\n", link_status.stats.tx_ping_errors); printf("[stat]: tx_ping_retries: %" PRIu32 "\n", link_status.stats.tx_ping_retries); printf("[stat]: tx_pong_errors: %" PRIu32 "\n", link_status.stats.tx_pong_errors); printf("[stat]: tx_pong_retries: %" PRIu32 "\n", link_status.stats.tx_pong_retries); printf("[stat]: tx_data_errors: %" PRIu32 "\n", link_status.stats.tx_data_errors); printf("[stat]: tx_data_retries: %" PRIu32 "\n", link_status.stats.tx_data_retries); printf("[stat]: latency_min: %" PRIu32 "\n", link_status.stats.latency_min); printf("[stat]: latency_max: %" PRIu32 "\n", link_status.stats.latency_max); printf("[stat]: latency_ave: %" PRIu32 "\n", link_status.stats.latency_ave); printf("[stat]: latency_samples: %" PRIu32 "\n", link_status.stats.latency_samples); printf("[stat]: down_count: %" PRIu32 "\n", link_status.stats.down_count); printf("[stat]: up_count: %" PRIu32 "\n", link_status.stats.up_count); } } } printf("\n"); printf("[stat]: Total link stats\n"); printf("[stat]: ----------------\n"); printf("[stat]: tx_data_packets: %" PRIu64 "\n", total_link_stats.tx_data_packets); printf("[stat]: rx_data_packets: %" PRIu64 "\n", total_link_stats.rx_data_packets); printf("[stat]: tx_data_bytes: %" PRIu64 "\n", total_link_stats.tx_data_bytes); printf("[stat]: rx_data_bytes: %" PRIu64 "\n", total_link_stats.rx_data_bytes); printf("[stat]: rx_ping_packets: %" PRIu64 "\n", total_link_stats.rx_ping_packets); printf("[stat]: tx_ping_packets: %" PRIu64 "\n", total_link_stats.tx_ping_packets); printf("[stat]: rx_ping_bytes: %" PRIu64 "\n", total_link_stats.rx_ping_bytes); printf("[stat]: tx_ping_bytes: %" PRIu64 "\n", total_link_stats.tx_ping_bytes); printf("[stat]: rx_pong_packets: %" PRIu64 "\n", total_link_stats.rx_pong_packets); printf("[stat]: tx_pong_packets: %" PRIu64 "\n", total_link_stats.tx_pong_packets); printf("[stat]: rx_pong_bytes: %" PRIu64 "\n", total_link_stats.rx_pong_bytes); printf("[stat]: tx_pong_bytes: %" PRIu64 "\n", total_link_stats.tx_pong_bytes); printf("[stat]: rx_pmtu_packets: %" PRIu64 "\n", total_link_stats.rx_pmtu_packets); printf("[stat]: tx_pmtu_packets: %" PRIu64 "\n", total_link_stats.tx_pmtu_packets); printf("[stat]: rx_pmtu_bytes: %" PRIu64 "\n", total_link_stats.rx_pmtu_bytes); printf("[stat]: tx_pmtu_bytes: %" PRIu64 "\n", total_link_stats.tx_pmtu_bytes); printf("[stat]: tx_total_packets: %" PRIu64 "\n", total_link_stats.tx_total_packets); printf("[stat]: rx_total_packets: %" PRIu64 "\n", total_link_stats.rx_total_packets); printf("[stat]: tx_total_bytes: %" PRIu64 "\n", total_link_stats.tx_total_bytes); printf("[stat]: rx_total_bytes: %" PRIu64 "\n", total_link_stats.rx_total_bytes); printf("[stat]: tx_total_errors: %" PRIu64 "\n", total_link_stats.tx_total_errors); printf("[stat]: tx_total_retries: %" PRIu64 "\n", total_link_stats.tx_total_retries); printf("[stat]: tx_pmtu_errors: %" PRIu32 "\n", total_link_stats.tx_pmtu_errors); printf("[stat]: tx_pmtu_retries: %" PRIu32 "\n", total_link_stats.tx_pmtu_retries); printf("[stat]: tx_ping_errors: %" PRIu32 "\n", total_link_stats.tx_ping_errors); printf("[stat]: tx_ping_retries: %" PRIu32 "\n", total_link_stats.tx_ping_retries); printf("[stat]: tx_pong_errors: %" PRIu32 "\n", total_link_stats.tx_pong_errors); printf("[stat]: tx_pong_retries: %" PRIu32 "\n", total_link_stats.tx_pong_retries); printf("[stat]: tx_data_errors: %" PRIu32 "\n", total_link_stats.tx_data_errors); printf("[stat]: tx_data_retries: %" PRIu32 "\n", total_link_stats.tx_data_retries); printf("[stat]: down_count: %" PRIu32 "\n", total_link_stats.down_count); printf("[stat]: up_count: %" PRIu32 "\n", total_link_stats.up_count); } static void send_perf_data_by_time(void) { char *tx_buf[PCKT_FRAG_MAX]; struct knet_mmsghdr msg[PCKT_FRAG_MAX]; struct iovec iov_out[PCKT_FRAG_MAX]; char ctrl_message[16]; int sent_msgs; int i; uint32_t packetsize = 64; struct timespec clock_start, clock_end; unsigned long long time_diff = 0; setup_send_buffers_common(msg, iov_out, tx_buf); memset(&clock_start, 0, sizeof(clock_start)); memset(&clock_end, 0, sizeof(clock_start)); while (packetsize <= KNET_MAX_PACKET_SIZE) { + if (force_packet_size) { + packetsize = force_packet_size; + } for (i = 0; i < PCKT_FRAG_MAX; i++) { iov_out[i].iov_len = packetsize; } printf("[info]: testing with %u bytes packet size for %" PRIu64 " seconds.\n", packetsize, perf_by_time_secs); memset(ctrl_message, 0, sizeof(ctrl_message)); knet_send(knet_h, ctrl_message, TEST_START, channel); if (clock_gettime(CLOCK_MONOTONIC, &clock_start) != 0) { printf("[info]: unable to get start time!\n"); } time_diff = 0; while (time_diff < (perf_by_time_secs * 1000000000llu)) { sent_msgs = send_messages(&msg[0], PCKT_FRAG_MAX); if (sent_msgs < 0) { printf("Something went wrong, aborting\n"); exit(FAIL); } if (clock_gettime(CLOCK_MONOTONIC, &clock_end) != 0) { printf("[info]: unable to get end time!\n"); } timespec_diff(clock_start, clock_end, &time_diff); } sleep(2); knet_send(knet_h, ctrl_message, TEST_STOP, channel); - if (packetsize == KNET_MAX_PACKET_SIZE) { + if ((packetsize == KNET_MAX_PACKET_SIZE) || (force_packet_size)) { break; } /* * Use a multiplier that can always divide properly a GB * into smaller chunks without worry about boundaries */ packetsize *= 4; if (packetsize > KNET_MAX_PACKET_SIZE) { packetsize = KNET_MAX_PACKET_SIZE; } } knet_send(knet_h, ctrl_message, TEST_COMPLETE, channel); for (i = 0; i < PCKT_FRAG_MAX; i++) { free(tx_buf[i]); } } static void cleanup_all(void) { if (pthread_mutex_lock(&shutdown_mutex)) { return; } if (bench_shutdown_in_progress) { pthread_mutex_unlock(&shutdown_mutex); return; } bench_shutdown_in_progress = 1; pthread_mutex_unlock(&shutdown_mutex); if (rx_thread) { stop_rx_thread(); } knet_handle_stop(knet_h); } static void sigint_handler(int signum) { printf("[info]: cleaning up... got signal: %d\n", signum); cleanup_all(); exit(PASS); } int main(int argc, char *argv[]) { if (signal(SIGINT, sigint_handler) == SIG_ERR) { printf("Unable to configure SIGINT handler\n"); exit(FAIL); } setup_knet(argc, argv); setup_data_txrx_common(); sleep(5); restart: switch(test_type) { default: case TEST_PING: /* basic ping, no data */ sleep(5); break; case TEST_PING_AND_DATA: send_ping_data(); break; case TEST_PERF_BY_SIZE: if (senderid == thisnodeid) { send_perf_data_by_size(); } else { printf("[info]: waiting for perf rx thread to finish\n"); while(!wait_for_perf_rx) { sleep(1); } } break; case TEST_PERF_BY_TIME: if (senderid == thisnodeid) { send_perf_data_by_time(); } else { printf("[info]: waiting for perf rx thread to finish\n"); while(!wait_for_perf_rx) { sleep(1); } } break; } if (continous) { goto restart; } if (show_stats) { display_stats(show_stats); } cleanup_all(); return PASS; } diff --git a/libknet/threads_pmtud.c b/libknet/threads_pmtud.c index 9e67e5fb..867b13bb 100644 --- a/libknet/threads_pmtud.c +++ b/libknet/threads_pmtud.c @@ -1,616 +1,623 @@ /* * Copyright (C) 2015-2019 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. Di Nitto * Federico Simoncelli * * This software licensed under LGPL-2.0+ */ #include "config.h" #include #include #include #include #include "crypto.h" #include "links.h" #include "host.h" #include "logging.h" #include "transports.h" #include "threads_common.h" #include "threads_pmtud.h" static int _calculate_manual_mtu(knet_handle_t knet_h, struct knet_link *dst_link) { size_t ipproto_overhead_len; /* onwire packet overhead (protocol based) */ switch (dst_link->dst_addr.ss_family) { case AF_INET6: ipproto_overhead_len = KNET_PMTUD_OVERHEAD_V6 + dst_link->proto_overhead; break; case AF_INET: ipproto_overhead_len = KNET_PMTUD_OVERHEAD_V4 + dst_link->proto_overhead; break; default: log_debug(knet_h, KNET_SUB_PMTUD, "unknown protocol"); return 0; break; } dst_link->status.mtu = calc_max_data_outlen(knet_h, knet_h->manual_mtu - ipproto_overhead_len); return 1; } static int _handle_check_link_pmtud(knet_handle_t knet_h, struct knet_host *dst_host, struct knet_link *dst_link) { int err, ret, savederrno, mutex_retry_limit, failsafe, use_kernel_mtu, warn_once; uint32_t kernel_mtu; /* record kernel_mtu from EMSGSIZE */ size_t onwire_len; /* current packet onwire size */ size_t ipproto_overhead_len; /* onwire packet overhead (protocol based) */ size_t max_mtu_len; /* max mtu for protocol */ size_t data_len; /* how much data we can send in the packet * generally would be onwire_len - ipproto_overhead_len * needs to be adjusted for crypto */ size_t app_mtu_len; /* real data that we can send onwire */ ssize_t len; /* len of what we were able to sendto onwire */ struct timespec ts, pmtud_crypto_start_ts, pmtud_crypto_stop_ts; unsigned long long pong_timeout_adj_tmp, timediff; int pmtud_crypto_reduce = 1; unsigned char *outbuf = (unsigned char *)knet_h->pmtudbuf; warn_once = 0; mutex_retry_limit = 0; failsafe = 0; knet_h->pmtudbuf->khp_pmtud_link = dst_link->link_id; switch (dst_link->dst_addr.ss_family) { case AF_INET6: max_mtu_len = KNET_PMTUD_SIZE_V6; ipproto_overhead_len = KNET_PMTUD_OVERHEAD_V6 + dst_link->proto_overhead; break; case AF_INET: max_mtu_len = KNET_PMTUD_SIZE_V4; ipproto_overhead_len = KNET_PMTUD_OVERHEAD_V4 + dst_link->proto_overhead; break; default: log_debug(knet_h, KNET_SUB_PMTUD, "PMTUD aborted, unknown protocol"); return -1; break; } dst_link->last_bad_mtu = 0; dst_link->last_good_mtu = dst_link->last_ping_size + ipproto_overhead_len; /* * discovery starts from the top because kernel will * refuse to send packets > current iface mtu. * this saves us some time and network bw. */ onwire_len = max_mtu_len; restart: /* * prevent a race when interface mtu is changed _exactly_ during * the discovery process and it's complex to detect. Easier * to wait the next loop. * 30 is not an arbitrary value. To bisect from 576 to 128000 doesn't * take more than 18/19 steps. */ if (failsafe == 30) { log_err(knet_h, KNET_SUB_PMTUD, "Aborting PMTUD process: Too many attempts. MTU might have changed during discovery."); return -1; } else { failsafe++; } /* * common to all packets */ /* * calculate the application MTU based on current onwire_len minus ipproto_overhead_len */ app_mtu_len = calc_max_data_outlen(knet_h, onwire_len - ipproto_overhead_len); /* * recalculate onwire len back that might be different based * on data padding from crypto layer. */ onwire_len = calc_data_outlen(knet_h, app_mtu_len + KNET_HEADER_ALL_SIZE) + ipproto_overhead_len; /* * calculate the size of what we need to send to sendto(2). * see also onwire.c for packet format explanation. */ data_len = app_mtu_len + knet_h->sec_hash_size + knet_h->sec_salt_size + KNET_HEADER_ALL_SIZE; if (knet_h->crypto_instance) { if (data_len < (knet_h->sec_hash_size + knet_h->sec_salt_size) + 1) { log_debug(knet_h, KNET_SUB_PMTUD, "Aborting PMTUD process: link mtu smaller than crypto header detected (link might have been disconnected)"); return -1; } knet_h->pmtudbuf->khp_pmtud_size = onwire_len; if (crypto_encrypt_and_sign(knet_h, (const unsigned char *)knet_h->pmtudbuf, data_len - (knet_h->sec_hash_size + knet_h->sec_salt_size), knet_h->pmtudbuf_crypt, (ssize_t *)&data_len) < 0) { log_debug(knet_h, KNET_SUB_PMTUD, "Unable to crypto pmtud packet"); return -1; } outbuf = knet_h->pmtudbuf_crypt; knet_h->stats_extra.tx_crypt_pmtu_packets++; } else { knet_h->pmtudbuf->khp_pmtud_size = onwire_len; } /* link has gone down, aborting pmtud */ if (dst_link->status.connected != 1) { log_debug(knet_h, KNET_SUB_PMTUD, "PMTUD detected host (%u) link (%u) has been disconnected", dst_host->host_id, dst_link->link_id); return -1; } if (dst_link->transport_connected != 1) { log_debug(knet_h, KNET_SUB_PMTUD, "PMTUD detected host (%u) link (%u) has been disconnected", dst_host->host_id, dst_link->link_id); return -1; } if (pthread_mutex_lock(&knet_h->pmtud_mutex) != 0) { log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get mutex lock"); return -1; } if (knet_h->pmtud_abort) { pthread_mutex_unlock(&knet_h->pmtud_mutex); errno = EDEADLK; return -1; } savederrno = pthread_mutex_lock(&knet_h->tx_mutex); if (savederrno) { log_err(knet_h, KNET_SUB_PMTUD, "Unable to get TX mutex lock: %s", strerror(savederrno)); return -1; } retry: if (transport_get_connection_oriented(knet_h, dst_link->transport) == TRANSPORT_PROTO_NOT_CONNECTION_ORIENTED) { len = sendto(dst_link->outsock, outbuf, data_len, MSG_DONTWAIT | MSG_NOSIGNAL, (struct sockaddr *) &dst_link->dst_addr, sizeof(struct sockaddr_storage)); } else { len = sendto(dst_link->outsock, outbuf, data_len, MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0); } savederrno = errno; /* * we cannot hold a lock on kmtu_mutex between resetting * knet_h->kernel_mtu here and below where it's used. * use_kernel_mtu tells us if the knet_h->kernel_mtu was * set to 0 and we can trust its value later. */ use_kernel_mtu = 0; if (pthread_mutex_lock(&knet_h->kmtu_mutex) == 0) { use_kernel_mtu = 1; knet_h->kernel_mtu = 0; pthread_mutex_unlock(&knet_h->kmtu_mutex); } kernel_mtu = 0; err = transport_tx_sock_error(knet_h, dst_link->transport, dst_link->outsock, len, savederrno); switch(err) { case -1: /* unrecoverable error */ log_debug(knet_h, KNET_SUB_PMTUD, "Unable to send pmtu packet (sendto): %d %s", savederrno, strerror(savederrno)); pthread_mutex_unlock(&knet_h->tx_mutex); pthread_mutex_unlock(&knet_h->pmtud_mutex); dst_link->status.stats.tx_pmtu_errors++; return -1; case 0: /* ignore error and continue */ break; case 1: /* retry to send those same data */ dst_link->status.stats.tx_pmtu_retries++; goto retry; break; } pthread_mutex_unlock(&knet_h->tx_mutex); if (len != (ssize_t )data_len) { if (savederrno == EMSGSIZE) { /* * we cannot hold a lock on kmtu_mutex between resetting * knet_h->kernel_mtu and here. * use_kernel_mtu tells us if the knet_h->kernel_mtu was * set to 0 previously and we can trust its value now. */ if (use_kernel_mtu) { use_kernel_mtu = 0; if (pthread_mutex_lock(&knet_h->kmtu_mutex) == 0) { kernel_mtu = knet_h->kernel_mtu; pthread_mutex_unlock(&knet_h->kmtu_mutex); } } if (kernel_mtu > 0) { dst_link->last_bad_mtu = kernel_mtu + 1; } else { dst_link->last_bad_mtu = onwire_len; } } else { log_debug(knet_h, KNET_SUB_PMTUD, "Unable to send pmtu packet len: %zu err: %s", onwire_len, strerror(savederrno)); } } else { dst_link->last_sent_mtu = onwire_len; dst_link->last_recv_mtu = 0; dst_link->status.stats.tx_pmtu_packets++; dst_link->status.stats.tx_pmtu_bytes += data_len; if (clock_gettime(CLOCK_REALTIME, &ts) < 0) { log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get current time: %s", strerror(errno)); pthread_mutex_unlock(&knet_h->pmtud_mutex); return -1; } /* * non fatal, we can wait the next round to reduce the * multiplier */ if (clock_gettime(CLOCK_MONOTONIC, &pmtud_crypto_start_ts) < 0) { log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get current time: %s", strerror(errno)); pmtud_crypto_reduce = 0; } /* * set PMTUd reply timeout to match pong_timeout on a given link * * math: internally pong_timeout is expressed in microseconds, while * the public API exports milliseconds. So careful with the 0's here. * the loop is necessary because we are grabbing the current time just above * and add values to it that could overflow into seconds. */ if (pthread_mutex_lock(&knet_h->backoff_mutex)) { log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get backoff_mutex"); pthread_mutex_unlock(&knet_h->pmtud_mutex); return -1; } if (knet_h->crypto_instance) { /* * crypto, under pressure, is a royal PITA */ pong_timeout_adj_tmp = dst_link->pong_timeout_adj * dst_link->pmtud_crypto_timeout_multiplier; } else { pong_timeout_adj_tmp = dst_link->pong_timeout_adj; } ts.tv_sec += pong_timeout_adj_tmp / 1000000; ts.tv_nsec += (((pong_timeout_adj_tmp) % 1000000) * 1000); while (ts.tv_nsec > 1000000000) { ts.tv_sec += 1; ts.tv_nsec -= 1000000000; } pthread_mutex_unlock(&knet_h->backoff_mutex); knet_h->pmtud_waiting = 1; ret = pthread_cond_timedwait(&knet_h->pmtud_cond, &knet_h->pmtud_mutex, &ts); knet_h->pmtud_waiting = 0; if (knet_h->pmtud_abort) { pthread_mutex_unlock(&knet_h->pmtud_mutex); errno = EDEADLK; return -1; } /* * we cannot use shutdown_in_progress in here because * we already hold the read lock */ if (knet_h->fini_in_progress) { pthread_mutex_unlock(&knet_h->pmtud_mutex); log_debug(knet_h, KNET_SUB_PMTUD, "PMTUD aborted. shutdown in progress"); return -1; } if (ret) { if (ret == ETIMEDOUT) { if ((knet_h->crypto_instance) && (dst_link->pmtud_crypto_timeout_multiplier < KNET_LINK_PMTUD_CRYPTO_TIMEOUT_MULTIPLIER_MAX)) { dst_link->pmtud_crypto_timeout_multiplier = dst_link->pmtud_crypto_timeout_multiplier * 2; pmtud_crypto_reduce = 0; log_debug(knet_h, KNET_SUB_PMTUD, "Increasing PMTUd response timeout multiplier to (%u) for host %u link: %u", dst_link->pmtud_crypto_timeout_multiplier, dst_host->host_id, dst_link->link_id); pthread_mutex_unlock(&knet_h->pmtud_mutex); goto restart; } if (!warn_once) { log_warn(knet_h, KNET_SUB_PMTUD, "possible MTU misconfiguration detected. " "kernel is reporting MTU: %u bytes for " "host %u link %u but the other node is " "not acknowledging packets of this size. ", dst_link->last_sent_mtu, dst_host->host_id, dst_link->link_id); log_warn(knet_h, KNET_SUB_PMTUD, "This can be caused by this node interface MTU " "too big or a network device that does not " "support or has been misconfigured to manage MTU " "of this size, or packet loss. knet will continue " "to run but performances might be affected."); warn_once = 1; } } else { pthread_mutex_unlock(&knet_h->pmtud_mutex); if (mutex_retry_limit == 3) { log_debug(knet_h, KNET_SUB_PMTUD, "PMTUD aborted, unable to get mutex lock"); return -1; } mutex_retry_limit++; goto restart; } } if ((knet_h->crypto_instance) && (pmtud_crypto_reduce == 1) && (dst_link->pmtud_crypto_timeout_multiplier > KNET_LINK_PMTUD_CRYPTO_TIMEOUT_MULTIPLIER_MIN)) { if (!clock_gettime(CLOCK_MONOTONIC, &pmtud_crypto_stop_ts)) { timespec_diff(pmtud_crypto_start_ts, pmtud_crypto_stop_ts, &timediff); if (((pong_timeout_adj_tmp * 1000) / 2) > timediff) { dst_link->pmtud_crypto_timeout_multiplier = dst_link->pmtud_crypto_timeout_multiplier / 2; log_debug(knet_h, KNET_SUB_PMTUD, "Decreasing PMTUd response timeout multiplier to (%u) for host %u link: %u", dst_link->pmtud_crypto_timeout_multiplier, dst_host->host_id, dst_link->link_id); } } else { log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get current time: %s", strerror(errno)); } } if ((dst_link->last_recv_mtu != onwire_len) || (ret)) { dst_link->last_bad_mtu = onwire_len; } else { int found_mtu = 0; if (knet_h->sec_block_size) { if ((onwire_len + knet_h->sec_block_size >= max_mtu_len) || ((dst_link->last_bad_mtu) && (dst_link->last_bad_mtu <= (onwire_len + knet_h->sec_block_size)))) { found_mtu = 1; } } else { if ((onwire_len == max_mtu_len) || ((dst_link->last_bad_mtu) && (dst_link->last_bad_mtu == (onwire_len + 1))) || (dst_link->last_bad_mtu == dst_link->last_good_mtu)) { found_mtu = 1; } } if (found_mtu) { /* * account for IP overhead, knet headers and crypto in PMTU calculation */ dst_link->status.mtu = calc_max_data_outlen(knet_h, onwire_len - ipproto_overhead_len); pthread_mutex_unlock(&knet_h->pmtud_mutex); return 0; } dst_link->last_good_mtu = onwire_len; } } if (kernel_mtu) { onwire_len = kernel_mtu; } else { onwire_len = (dst_link->last_good_mtu + dst_link->last_bad_mtu) / 2; } pthread_mutex_unlock(&knet_h->pmtud_mutex); goto restart; } static int _handle_check_pmtud(knet_handle_t knet_h, struct knet_host *dst_host, struct knet_link *dst_link, int force_run) { uint8_t saved_valid_pmtud; unsigned int saved_pmtud; struct timespec clock_now; unsigned long long diff_pmtud, interval; if (clock_gettime(CLOCK_MONOTONIC, &clock_now) != 0) { log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get monotonic clock"); return 0; } if (!force_run) { interval = knet_h->pmtud_interval * 1000000000llu; /* nanoseconds */ timespec_diff(dst_link->pmtud_last, clock_now, &diff_pmtud); if (diff_pmtud < interval) { return dst_link->has_valid_mtu; } } /* * status.proto_overhead should include all IP/(UDP|SCTP)/knet headers * * please note that it is not the same as link->proto_overhead that * includes only either UDP or SCTP (at the moment) overhead. */ switch (dst_link->dst_addr.ss_family) { case AF_INET6: dst_link->status.proto_overhead = KNET_PMTUD_OVERHEAD_V6 + dst_link->proto_overhead + KNET_HEADER_ALL_SIZE + knet_h->sec_hash_size + knet_h->sec_salt_size; break; case AF_INET: dst_link->status.proto_overhead = KNET_PMTUD_OVERHEAD_V4 + dst_link->proto_overhead + KNET_HEADER_ALL_SIZE + knet_h->sec_hash_size + knet_h->sec_salt_size; break; } saved_pmtud = dst_link->status.mtu; saved_valid_pmtud = dst_link->has_valid_mtu; log_debug(knet_h, KNET_SUB_PMTUD, "Starting PMTUD for host: %u link: %u", dst_host->host_id, dst_link->link_id); errno = 0; if (_handle_check_link_pmtud(knet_h, dst_host, dst_link) < 0) { if (errno == EDEADLK) { log_debug(knet_h, KNET_SUB_PMTUD, "PMTUD for host: %u link: %u has been rescheduled", dst_host->host_id, dst_link->link_id); dst_link->status.mtu = saved_pmtud; dst_link->has_valid_mtu = saved_valid_pmtud; errno = EDEADLK; return dst_link->has_valid_mtu; } dst_link->has_valid_mtu = 0; } else { - dst_link->has_valid_mtu = 1; + if (dst_link->status.mtu < calc_min_mtu(knet_h)) { + log_info(knet_h, KNET_SUB_PMTUD, + "Invalid MTU detected for host: %u link: %u mtu: %u", + dst_host->host_id, dst_link->link_id, dst_link->status.mtu); + dst_link->has_valid_mtu = 0; + } else { + dst_link->has_valid_mtu = 1; + } if (dst_link->has_valid_mtu) { if ((saved_pmtud) && (saved_pmtud != dst_link->status.mtu)) { log_info(knet_h, KNET_SUB_PMTUD, "PMTUD link change for host: %u link: %u from %u to %u", dst_host->host_id, dst_link->link_id, saved_pmtud, dst_link->status.mtu); } log_debug(knet_h, KNET_SUB_PMTUD, "PMTUD completed for host: %u link: %u current link mtu: %u", dst_host->host_id, dst_link->link_id, dst_link->status.mtu); /* * set pmtud_last, if we can, after we are done with the PMTUd process * because it can take a very long time. */ dst_link->pmtud_last = clock_now; if (!clock_gettime(CLOCK_MONOTONIC, &clock_now)) { dst_link->pmtud_last = clock_now; } } } if (saved_valid_pmtud != dst_link->has_valid_mtu) { _host_dstcache_update_async(knet_h, dst_host); } return dst_link->has_valid_mtu; } void *_handle_pmtud_link_thread(void *data) { knet_handle_t knet_h = (knet_handle_t) data; struct knet_host *dst_host; struct knet_link *dst_link; int link_idx; unsigned int have_mtu; unsigned int lower_mtu; int link_has_mtu; int force_run = 0; set_thread_status(knet_h, KNET_THREAD_PMTUD, KNET_THREAD_STARTED); knet_h->data_mtu = calc_min_mtu(knet_h); /* preparing pmtu buffer */ knet_h->pmtudbuf->kh_version = KNET_HEADER_VERSION; knet_h->pmtudbuf->kh_type = KNET_HEADER_TYPE_PMTUD; knet_h->pmtudbuf->kh_node = htons(knet_h->host_id); while (!shutdown_in_progress(knet_h)) { usleep(knet_h->threads_timer_res); if (pthread_mutex_lock(&knet_h->pmtud_mutex) != 0) { log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get mutex lock"); continue; } knet_h->pmtud_abort = 0; knet_h->pmtud_running = 1; force_run = knet_h->pmtud_forcerun; knet_h->pmtud_forcerun = 0; pthread_mutex_unlock(&knet_h->pmtud_mutex); if (force_run) { log_debug(knet_h, KNET_SUB_PMTUD, "PMTUd request to rerun has been received"); } if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) { log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get read lock"); continue; } lower_mtu = KNET_PMTUD_SIZE_V4; have_mtu = 0; for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) { for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) { dst_link = &dst_host->link[link_idx]; if ((dst_link->status.enabled != 1) || (dst_link->status.connected != 1) || (dst_host->link[link_idx].transport == KNET_TRANSPORT_LOOPBACK) || (!dst_link->last_ping_size) || ((dst_link->dynamic == KNET_LINK_DYNIP) && (dst_link->status.dynconnected != 1))) continue; if (!knet_h->manual_mtu) { link_has_mtu = _handle_check_pmtud(knet_h, dst_host, dst_link, force_run); if (errno == EDEADLK) { goto out_unlock; } if (link_has_mtu) { have_mtu = 1; if (dst_link->status.mtu < lower_mtu) { lower_mtu = dst_link->status.mtu; } } } else { link_has_mtu = _calculate_manual_mtu(knet_h, dst_link); if (link_has_mtu) { have_mtu = 1; if (dst_link->status.mtu < lower_mtu) { lower_mtu = dst_link->status.mtu; } } } } } if (have_mtu) { if (knet_h->data_mtu != lower_mtu) { knet_h->data_mtu = lower_mtu; log_info(knet_h, KNET_SUB_PMTUD, "Global data MTU changed to: %u", knet_h->data_mtu); if (knet_h->pmtud_notify_fn) { knet_h->pmtud_notify_fn(knet_h->pmtud_notify_fn_private_data, knet_h->data_mtu); } } } out_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); if (pthread_mutex_lock(&knet_h->pmtud_mutex) != 0) { log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get mutex lock"); } else { knet_h->pmtud_running = 0; pthread_mutex_unlock(&knet_h->pmtud_mutex); } } set_thread_status(knet_h, KNET_THREAD_PMTUD, KNET_THREAD_STOPPED); return NULL; } diff --git a/libknet/threads_rx.c b/libknet/threads_rx.c index e529c16d..4693cc0b 100644 --- a/libknet/threads_rx.c +++ b/libknet/threads_rx.c @@ -1,897 +1,899 @@ /* * Copyright (C) 2012-2019 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. Di Nitto * Federico Simoncelli * * This software licensed under LGPL-2.0+ */ #include "config.h" #include #include #include #include #include #include "compat.h" #include "compress.h" #include "crypto.h" #include "host.h" #include "links.h" #include "links_acl.h" #include "logging.h" #include "transports.h" #include "transport_common.h" #include "threads_common.h" #include "threads_heartbeat.h" #include "threads_rx.h" #include "netutils.h" /* * RECV */ /* * return 1 if a > b * return -1 if b > a * return 0 if they are equal */ static inline int timecmp(struct timespec a, struct timespec b) { if (a.tv_sec != b.tv_sec) { if (a.tv_sec > b.tv_sec) { return 1; } else { return -1; } } else { if (a.tv_nsec > b.tv_nsec) { return 1; } else if (a.tv_nsec < b.tv_nsec) { return -1; } else { return 0; } } } /* * this functions needs to return an index (0 to 7) * to a knet_host_defrag_buf. (-1 on errors) */ static int find_pckt_defrag_buf(knet_handle_t knet_h, struct knet_header *inbuf) { struct knet_host *src_host = knet_h->host_index[inbuf->kh_node]; int i, oldest; /* * check if there is a buffer already in use handling the same seq_num */ for (i = 0; i < KNET_MAX_LINK; i++) { if (src_host->defrag_buf[i].in_use) { if (src_host->defrag_buf[i].pckt_seq == inbuf->khp_data_seq_num) { return i; } } } /* * If there is no buffer that's handling the current seq_num * either it's new or it's been reclaimed already. * check if it's been reclaimed/seen before using the defrag circular * buffer. If the pckt has been seen before, the buffer expired (ETIME) * and there is no point to try to defrag it again. */ if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 1, 0)) { errno = ETIME; return -1; } /* * register the pckt as seen */ _seq_num_set(src_host, inbuf->khp_data_seq_num, 1); /* * see if there is a free buffer */ for (i = 0; i < KNET_MAX_LINK; i++) { if (!src_host->defrag_buf[i].in_use) { return i; } } /* * at this point, there are no free buffers, the pckt is new * and we need to reclaim a buffer, and we will take the one * with the oldest timestamp. It's as good as any. */ oldest = 0; for (i = 0; i < KNET_MAX_LINK; i++) { if (timecmp(src_host->defrag_buf[i].last_update, src_host->defrag_buf[oldest].last_update) < 0) { oldest = i; } } src_host->defrag_buf[oldest].in_use = 0; return oldest; } static int pckt_defrag(knet_handle_t knet_h, struct knet_header *inbuf, ssize_t *len) { struct knet_host_defrag_buf *defrag_buf; int defrag_buf_idx; defrag_buf_idx = find_pckt_defrag_buf(knet_h, inbuf); if (defrag_buf_idx < 0) { if (errno == ETIME) { log_debug(knet_h, KNET_SUB_RX, "Defrag buffer expired"); } return 1; } defrag_buf = &knet_h->host_index[inbuf->kh_node]->defrag_buf[defrag_buf_idx]; /* * if the buf is not is use, then make sure it's clean */ if (!defrag_buf->in_use) { memset(defrag_buf, 0, sizeof(struct knet_host_defrag_buf)); defrag_buf->in_use = 1; defrag_buf->pckt_seq = inbuf->khp_data_seq_num; } /* * update timestamp on the buffer */ clock_gettime(CLOCK_MONOTONIC, &defrag_buf->last_update); /* * check if we already received this fragment */ if (defrag_buf->frag_map[inbuf->khp_data_frag_seq]) { /* * if we have received this fragment and we didn't clear the buffer * it means that we don't have all fragments yet */ return 1; } /* * we need to handle the last packet with gloves due to its different size */ if (inbuf->khp_data_frag_seq == inbuf->khp_data_frag_num) { defrag_buf->last_frag_size = *len; /* * in the event when the last packet arrives first, * we still don't know the offset vs the other fragments (based on MTU), * so we store the fragment at the end of the buffer where it's safe * and take a copy of the len so that we can restore its offset later. * remember we can't use the local MTU for this calculation because pMTU * can be asymettric between the same hosts. */ if (!defrag_buf->frag_size) { defrag_buf->last_first = 1; memmove(defrag_buf->buf + (KNET_MAX_PACKET_SIZE - *len), inbuf->khp_data_userdata, *len); } } else { defrag_buf->frag_size = *len; } - memmove(defrag_buf->buf + ((inbuf->khp_data_frag_seq - 1) * defrag_buf->frag_size), - inbuf->khp_data_userdata, *len); + if (defrag_buf->frag_size) { + memmove(defrag_buf->buf + ((inbuf->khp_data_frag_seq - 1) * defrag_buf->frag_size), + inbuf->khp_data_userdata, *len); + } defrag_buf->frag_recv++; defrag_buf->frag_map[inbuf->khp_data_frag_seq] = 1; /* * check if we received all the fragments */ if (defrag_buf->frag_recv == inbuf->khp_data_frag_num) { /* * special case the last pckt */ if (defrag_buf->last_first) { memmove(defrag_buf->buf + ((inbuf->khp_data_frag_num - 1) * defrag_buf->frag_size), defrag_buf->buf + (KNET_MAX_PACKET_SIZE - defrag_buf->last_frag_size), defrag_buf->last_frag_size); } /* * recalculate packet lenght */ *len = ((inbuf->khp_data_frag_num - 1) * defrag_buf->frag_size) + defrag_buf->last_frag_size; /* * copy the pckt back in the user data */ memmove(inbuf->khp_data_userdata, defrag_buf->buf, *len); /* * free this buffer */ defrag_buf->in_use = 0; return 0; } return 1; } static void _parse_recv_from_links(knet_handle_t knet_h, int sockfd, const struct knet_mmsghdr *msg) { int err = 0, savederrno = 0; ssize_t outlen; struct knet_host *src_host; struct knet_link *src_link; unsigned long long latency_last; knet_node_id_t dst_host_ids[KNET_MAX_HOST]; size_t dst_host_ids_entries = 0; int bcast = 1; int was_decrypted = 0; uint64_t crypt_time = 0; struct timespec recvtime; struct knet_header *inbuf = msg->msg_hdr.msg_iov->iov_base; unsigned char *outbuf = (unsigned char *)msg->msg_hdr.msg_iov->iov_base; ssize_t len = msg->msg_len; struct knet_hostinfo *knet_hostinfo; struct iovec iov_out[1]; int8_t channel; struct sockaddr_storage pckt_src; seq_num_t recv_seq_num; int wipe_bufs = 0; if (knet_h->crypto_instance) { struct timespec start_time; struct timespec end_time; clock_gettime(CLOCK_MONOTONIC, &start_time); if (crypto_authenticate_and_decrypt(knet_h, (unsigned char *)inbuf, len, knet_h->recv_from_links_buf_decrypt, &outlen) < 0) { log_debug(knet_h, KNET_SUB_RX, "Unable to decrypt/auth packet"); return; } clock_gettime(CLOCK_MONOTONIC, &end_time); timespec_diff(start_time, end_time, &crypt_time); if (crypt_time < knet_h->stats.rx_crypt_time_min) { knet_h->stats.rx_crypt_time_min = crypt_time; } if (crypt_time > knet_h->stats.rx_crypt_time_max) { knet_h->stats.rx_crypt_time_max = crypt_time; } len = outlen; inbuf = (struct knet_header *)knet_h->recv_from_links_buf_decrypt; was_decrypted++; } if (len < (ssize_t)(KNET_HEADER_SIZE + 1)) { log_debug(knet_h, KNET_SUB_RX, "Packet is too short: %ld", (long)len); return; } if (inbuf->kh_version != KNET_HEADER_VERSION) { log_debug(knet_h, KNET_SUB_RX, "Packet version does not match"); return; } inbuf->kh_node = ntohs(inbuf->kh_node); src_host = knet_h->host_index[inbuf->kh_node]; if (src_host == NULL) { /* host not found */ log_debug(knet_h, KNET_SUB_RX, "Unable to find source host for this packet"); return; } src_link = NULL; src_link = src_host->link + (inbuf->khp_ping_link % KNET_MAX_LINK); if ((inbuf->kh_type & KNET_HEADER_TYPE_PMSK) != 0) { if (src_link->dynamic == KNET_LINK_DYNIP) { /* * cpyaddrport will only copy address and port of the incoming * packet and strip extra bits such as flow and scopeid */ cpyaddrport(&pckt_src, msg->msg_hdr.msg_name); if (cmpaddr(&src_link->dst_addr, sockaddr_len(&src_link->dst_addr), &pckt_src, sockaddr_len(&pckt_src)) != 0) { log_debug(knet_h, KNET_SUB_RX, "host: %u link: %u appears to have changed ip address", src_host->host_id, src_link->link_id); memmove(&src_link->dst_addr, &pckt_src, sizeof(struct sockaddr_storage)); if (knet_addrtostr(&src_link->dst_addr, sockaddr_len(msg->msg_hdr.msg_name), src_link->status.dst_ipaddr, KNET_MAX_HOST_LEN, src_link->status.dst_port, KNET_MAX_PORT_LEN) != 0) { log_debug(knet_h, KNET_SUB_RX, "Unable to resolve ???"); snprintf(src_link->status.dst_ipaddr, KNET_MAX_HOST_LEN - 1, "Unknown!!!"); snprintf(src_link->status.dst_port, KNET_MAX_PORT_LEN - 1, "??"); } else { log_info(knet_h, KNET_SUB_RX, "host: %u link: %u new connection established from: %s %s", src_host->host_id, src_link->link_id, src_link->status.dst_ipaddr, src_link->status.dst_port); } } /* * transport has already accepted the connection here * otherwise we would not be receiving packets */ transport_link_dyn_connect(knet_h, sockfd, src_link); } } switch (inbuf->kh_type) { case KNET_HEADER_TYPE_HOST_INFO: case KNET_HEADER_TYPE_DATA: /* * TODO: should we accept data even if we can't reply to the other node? * how would that work with SCTP and guaranteed delivery? */ if (!src_host->status.reachable) { log_debug(knet_h, KNET_SUB_RX, "Source host %u not reachable yet", src_host->host_id); //return; } inbuf->khp_data_seq_num = ntohs(inbuf->khp_data_seq_num); channel = inbuf->khp_data_channel; src_host->got_data = 1; if (src_link) { src_link->status.stats.rx_data_packets++; src_link->status.stats.rx_data_bytes += len; } if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 0, 0)) { if (src_host->link_handler_policy != KNET_LINK_POLICY_ACTIVE) { log_debug(knet_h, KNET_SUB_RX, "Packet has already been delivered"); } return; } if (inbuf->khp_data_frag_num > 1) { /* * len as received from the socket also includes extra stuff * that the defrag code doesn't care about. So strip it * here and readd only for repadding once we are done * defragging */ len = len - KNET_HEADER_DATA_SIZE; if (pckt_defrag(knet_h, inbuf, &len)) { return; } len = len + KNET_HEADER_DATA_SIZE; } if (inbuf->khp_data_compress) { ssize_t decmp_outlen = KNET_DATABUFSIZE_COMPRESS; struct timespec start_time; struct timespec end_time; uint64_t compress_time; clock_gettime(CLOCK_MONOTONIC, &start_time); err = decompress(knet_h, inbuf->khp_data_compress, (const unsigned char *)inbuf->khp_data_userdata, len - KNET_HEADER_DATA_SIZE, knet_h->recv_from_links_buf_decompress, &decmp_outlen); if (!err) { /* Collect stats */ clock_gettime(CLOCK_MONOTONIC, &end_time); timespec_diff(start_time, end_time, &compress_time); if (compress_time < knet_h->stats.rx_compress_time_min) { knet_h->stats.rx_compress_time_min = compress_time; } if (compress_time > knet_h->stats.rx_compress_time_max) { knet_h->stats.rx_compress_time_max = compress_time; } knet_h->stats.rx_compress_time_ave = (knet_h->stats.rx_compress_time_ave * knet_h->stats.rx_compressed_packets + compress_time) / (knet_h->stats.rx_compressed_packets+1); knet_h->stats.rx_compressed_packets++; knet_h->stats.rx_compressed_original_bytes += decmp_outlen; knet_h->stats.rx_compressed_size_bytes += len - KNET_HEADER_SIZE; memmove(inbuf->khp_data_userdata, knet_h->recv_from_links_buf_decompress, decmp_outlen); len = decmp_outlen + KNET_HEADER_DATA_SIZE; } else { knet_h->stats.rx_failed_to_decompress++; log_warn(knet_h, KNET_SUB_COMPRESS, "Unable to decompress packet (%d): %s", err, strerror(errno)); return; } } if (inbuf->kh_type == KNET_HEADER_TYPE_DATA) { if (knet_h->enabled != 1) /* data forward is disabled */ break; /* Only update the crypto overhead for data packets. Mainly to be consistent with TX */ knet_h->stats.rx_crypt_time_ave = (knet_h->stats.rx_crypt_time_ave * knet_h->stats.rx_crypt_packets + crypt_time) / (knet_h->stats.rx_crypt_packets+1); knet_h->stats.rx_crypt_packets++; if (knet_h->dst_host_filter_fn) { size_t host_idx; int found = 0; bcast = knet_h->dst_host_filter_fn( knet_h->dst_host_filter_fn_private_data, (const unsigned char *)inbuf->khp_data_userdata, len - KNET_HEADER_DATA_SIZE, KNET_NOTIFY_RX, knet_h->host_id, inbuf->kh_node, &channel, dst_host_ids, &dst_host_ids_entries); if (bcast < 0) { log_debug(knet_h, KNET_SUB_RX, "Error from dst_host_filter_fn: %d", bcast); return; } if ((!bcast) && (!dst_host_ids_entries)) { log_debug(knet_h, KNET_SUB_RX, "Message is unicast but no dst_host_ids_entries"); return; } /* check if we are dst for this packet */ if (!bcast) { if (dst_host_ids_entries > KNET_MAX_HOST) { log_debug(knet_h, KNET_SUB_RX, "dst_host_filter_fn returned too many destinations"); return; } for (host_idx = 0; host_idx < dst_host_ids_entries; host_idx++) { if (dst_host_ids[host_idx] == knet_h->host_id) { found = 1; break; } } if (!found) { log_debug(knet_h, KNET_SUB_RX, "Packet is not for us"); return; } } } } if (inbuf->kh_type == KNET_HEADER_TYPE_DATA) { if (!knet_h->sockfd[channel].in_use) { log_debug(knet_h, KNET_SUB_RX, "received packet for channel %d but there is no local sock connected", channel); return; } memset(iov_out, 0, sizeof(iov_out)); iov_out[0].iov_base = (void *) inbuf->khp_data_userdata; iov_out[0].iov_len = len - KNET_HEADER_DATA_SIZE; outlen = writev(knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], iov_out, 1); if (outlen <= 0) { knet_h->sock_notify_fn(knet_h->sock_notify_fn_private_data, knet_h->sockfd[channel].sockfd[0], channel, KNET_NOTIFY_RX, outlen, errno); return; } if ((size_t)outlen == iov_out[0].iov_len) { _seq_num_set(src_host, inbuf->khp_data_seq_num, 0); } } else { /* HOSTINFO */ knet_hostinfo = (struct knet_hostinfo *)inbuf->khp_data_userdata; if (knet_hostinfo->khi_bcast == KNET_HOSTINFO_UCAST) { knet_hostinfo->khi_dst_node_id = ntohs(knet_hostinfo->khi_dst_node_id); } if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 0, 0)) { return; } _seq_num_set(src_host, inbuf->khp_data_seq_num, 0); switch(knet_hostinfo->khi_type) { case KNET_HOSTINFO_TYPE_LINK_UP_DOWN: break; case KNET_HOSTINFO_TYPE_LINK_TABLE: break; default: log_warn(knet_h, KNET_SUB_RX, "Receiving unknown host info message from host %u", src_host->host_id); break; } } break; case KNET_HEADER_TYPE_PING: outlen = KNET_HEADER_PING_SIZE; inbuf->kh_type = KNET_HEADER_TYPE_PONG; inbuf->kh_node = htons(knet_h->host_id); recv_seq_num = ntohs(inbuf->khp_ping_seq_num); src_link->status.stats.rx_ping_packets++; src_link->status.stats.rx_ping_bytes += len; wipe_bufs = 0; if (!inbuf->khp_ping_timed) { /* * we might be receiving this message from all links, but we want * to process it only the first time */ if (recv_seq_num != src_host->untimed_rx_seq_num) { /* * cache the untimed seq num */ src_host->untimed_rx_seq_num = recv_seq_num; /* * if the host has received data in between * untimed ping, then we don't need to wipe the bufs */ if (src_host->got_data) { src_host->got_data = 0; wipe_bufs = 0; } else { wipe_bufs = 1; } } _seq_num_lookup(src_host, recv_seq_num, 0, wipe_bufs); } else { /* * pings always arrives in bursts over all the link * catch the first of them to cache the seq num and * avoid duplicate processing */ if (recv_seq_num != src_host->timed_rx_seq_num) { src_host->timed_rx_seq_num = recv_seq_num; if (recv_seq_num == 0) { _seq_num_lookup(src_host, recv_seq_num, 0, 1); } } } if (knet_h->crypto_instance) { if (crypto_encrypt_and_sign(knet_h, (const unsigned char *)inbuf, outlen, knet_h->recv_from_links_buf_crypt, &outlen) < 0) { log_debug(knet_h, KNET_SUB_RX, "Unable to encrypt pong packet"); break; } outbuf = knet_h->recv_from_links_buf_crypt; knet_h->stats_extra.tx_crypt_pong_packets++; } retry_pong: if (transport_get_connection_oriented(knet_h, src_link->transport) == TRANSPORT_PROTO_NOT_CONNECTION_ORIENTED) { len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL, (struct sockaddr *) &src_link->dst_addr, sizeof(struct sockaddr_storage)); } else { len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0); } savederrno = errno; if (len != outlen) { err = transport_tx_sock_error(knet_h, src_link->transport, src_link->outsock, len, savederrno); switch(err) { case -1: /* unrecoverable error */ log_debug(knet_h, KNET_SUB_RX, "Unable to send pong reply (sock: %d) packet (sendto): %d %s. recorded src ip: %s src port: %s dst ip: %s dst port: %s", src_link->outsock, errno, strerror(errno), src_link->status.src_ipaddr, src_link->status.src_port, src_link->status.dst_ipaddr, src_link->status.dst_port); src_link->status.stats.tx_pong_errors++; break; case 0: /* ignore error and continue */ break; case 1: /* retry to send those same data */ src_link->status.stats.tx_pong_retries++; goto retry_pong; break; } } src_link->status.stats.tx_pong_packets++; src_link->status.stats.tx_pong_bytes += outlen; break; case KNET_HEADER_TYPE_PONG: src_link->status.stats.rx_pong_packets++; src_link->status.stats.rx_pong_bytes += len; clock_gettime(CLOCK_MONOTONIC, &src_link->status.pong_last); memmove(&recvtime, &inbuf->khp_ping_time[0], sizeof(struct timespec)); timespec_diff(recvtime, src_link->status.pong_last, &latency_last); if ((latency_last / 1000llu) > src_link->pong_timeout) { log_debug(knet_h, KNET_SUB_RX, "Incoming pong packet from host: %u link: %u has higher latency than pong_timeout. Discarding", src_host->host_id, src_link->link_id); } else { src_link->status.latency = ((src_link->status.latency * src_link->latency_exp) + ((latency_last / 1000llu) * (src_link->latency_fix - src_link->latency_exp))) / src_link->latency_fix; if (src_link->status.latency < src_link->pong_timeout_adj) { if (!src_link->status.connected) { if (src_link->received_pong >= src_link->pong_count) { log_info(knet_h, KNET_SUB_RX, "host: %u link: %u is up", src_host->host_id, src_link->link_id); _link_updown(knet_h, src_host->host_id, src_link->link_id, src_link->status.enabled, 1); } else { src_link->received_pong++; log_debug(knet_h, KNET_SUB_RX, "host: %u link: %u received pong: %u", src_host->host_id, src_link->link_id, src_link->received_pong); } } } /* Calculate latency stats */ if (src_link->status.latency > src_link->status.stats.latency_max) { src_link->status.stats.latency_max = src_link->status.latency; } if (src_link->status.latency < src_link->status.stats.latency_min) { src_link->status.stats.latency_min = src_link->status.latency; } src_link->status.stats.latency_ave = (src_link->status.stats.latency_ave * src_link->status.stats.latency_samples + src_link->status.latency) / (src_link->status.stats.latency_samples+1); src_link->status.stats.latency_samples++; } break; case KNET_HEADER_TYPE_PMTUD: src_link->status.stats.rx_pmtu_packets++; src_link->status.stats.rx_pmtu_bytes += len; outlen = KNET_HEADER_PMTUD_SIZE; inbuf->kh_type = KNET_HEADER_TYPE_PMTUD_REPLY; inbuf->kh_node = htons(knet_h->host_id); if (knet_h->crypto_instance) { if (crypto_encrypt_and_sign(knet_h, (const unsigned char *)inbuf, outlen, knet_h->recv_from_links_buf_crypt, &outlen) < 0) { log_debug(knet_h, KNET_SUB_RX, "Unable to encrypt PMTUd reply packet"); break; } outbuf = knet_h->recv_from_links_buf_crypt; knet_h->stats_extra.tx_crypt_pmtu_reply_packets++; } savederrno = pthread_mutex_lock(&knet_h->tx_mutex); if (savederrno) { log_err(knet_h, KNET_SUB_RX, "Unable to get TX mutex lock: %s", strerror(savederrno)); goto out_pmtud; } retry_pmtud: if (transport_get_connection_oriented(knet_h, src_link->transport) == TRANSPORT_PROTO_NOT_CONNECTION_ORIENTED) { len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL, (struct sockaddr *) &src_link->dst_addr, sizeof(struct sockaddr_storage)); } else { len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0); } savederrno = errno; if (len != outlen) { err = transport_tx_sock_error(knet_h, src_link->transport, src_link->outsock, len, savederrno); switch(err) { case -1: /* unrecoverable error */ log_debug(knet_h, KNET_SUB_RX, "Unable to send PMTUd reply (sock: %d) packet (sendto): %d %s. recorded src ip: %s src port: %s dst ip: %s dst port: %s", src_link->outsock, errno, strerror(errno), src_link->status.src_ipaddr, src_link->status.src_port, src_link->status.dst_ipaddr, src_link->status.dst_port); src_link->status.stats.tx_pmtu_errors++; break; case 0: /* ignore error and continue */ src_link->status.stats.tx_pmtu_errors++; break; case 1: /* retry to send those same data */ src_link->status.stats.tx_pmtu_retries++; goto retry_pmtud; break; } } pthread_mutex_unlock(&knet_h->tx_mutex); out_pmtud: break; case KNET_HEADER_TYPE_PMTUD_REPLY: src_link->status.stats.rx_pmtu_packets++; src_link->status.stats.rx_pmtu_bytes += len; if (pthread_mutex_lock(&knet_h->pmtud_mutex) != 0) { log_debug(knet_h, KNET_SUB_RX, "Unable to get mutex lock"); break; } src_link->last_recv_mtu = inbuf->khp_pmtud_size; pthread_cond_signal(&knet_h->pmtud_cond); pthread_mutex_unlock(&knet_h->pmtud_mutex); break; default: return; } } static void _handle_recv_from_links(knet_handle_t knet_h, int sockfd, struct knet_mmsghdr *msg) { int err, savederrno; int i, msg_recv, transport; if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) { log_debug(knet_h, KNET_SUB_RX, "Unable to get global read lock"); return; } if (_is_valid_fd(knet_h, sockfd) < 1) { /* * this is normal if a fd got an event and before we grab the read lock * and the link is removed by another thread */ goto exit_unlock; } transport = knet_h->knet_transport_fd_tracker[sockfd].transport; /* * reset msg_namelen to buffer size because after recvmmsg * each msg_namelen will contain sizeof sockaddr_in or sockaddr_in6 */ for (i = 0; i < PCKT_RX_BUFS; i++) { msg[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage); } msg_recv = _recvmmsg(sockfd, &msg[0], PCKT_RX_BUFS, MSG_DONTWAIT | MSG_NOSIGNAL); savederrno = errno; /* * WARNING: man page for recvmmsg is wrong. Kernel implementation here: * recvmmsg can return: * -1 on error * 0 if the previous run of recvmmsg recorded an error on the socket * N number of messages (see exception below). * * If there is an error from recvmsg after receiving a frame or more, the recvmmsg * loop is interrupted, error recorded in the socket (getsockopt(SO_ERROR) and * it will be visibile in the next run. * * Need to be careful how we handle errors at this stage. * * error messages need to be handled on a per transport/protocol base * at this point we have different layers of error handling * - msg_recv < 0 -> error from this run * msg_recv = 0 -> error from previous run and error on socket needs to be cleared * - per-transport message data * example: msg[i].msg_hdr.msg_flags & MSG_NOTIFICATION or msg_len for SCTP == EOF, * but for UDP it is perfectly legal to receive a 0 bytes message.. go figure * - NOTE: on SCTP MSG_NOTIFICATION we get msg_recv == PCKT_FRAG_MAX messages and no * errno set. That means the error api needs to be able to abort the loop below. */ if (msg_recv <= 0) { transport_rx_sock_error(knet_h, transport, sockfd, msg_recv, savederrno); goto exit_unlock; } for (i = 0; i < msg_recv; i++) { err = transport_rx_is_data(knet_h, transport, sockfd, &msg[i]); /* * TODO: make this section silent once we are confident * all protocols packet handlers are good */ switch(err) { case -1: /* on error */ log_debug(knet_h, KNET_SUB_RX, "Transport reported error parsing packet"); goto exit_unlock; break; case 0: /* packet is not data and we should continue the packet process loop */ log_debug(knet_h, KNET_SUB_RX, "Transport reported no data, continue"); break; case 1: /* packet is not data and we should STOP the packet process loop */ log_debug(knet_h, KNET_SUB_RX, "Transport reported no data, stop"); goto exit_unlock; break; case 2: /* packet is data and should be parsed as such */ /* * processing incoming packets vs access lists */ if ((knet_h->use_access_lists) && (transport_get_acl_type(knet_h, transport) == USE_GENERIC_ACL)) { if (!check_validate(knet_h, sockfd, transport, msg[i].msg_hdr.msg_name)) { char src_ipaddr[KNET_MAX_HOST_LEN]; char src_port[KNET_MAX_PORT_LEN]; memset(src_ipaddr, 0, KNET_MAX_HOST_LEN); memset(src_port, 0, KNET_MAX_PORT_LEN); if (knet_addrtostr(msg[i].msg_hdr.msg_name, sockaddr_len(msg[i].msg_hdr.msg_name), src_ipaddr, KNET_MAX_HOST_LEN, src_port, KNET_MAX_PORT_LEN) < 0) { log_debug(knet_h, KNET_SUB_RX, "Packet rejected: unable to resolve host/port"); } else { log_debug(knet_h, KNET_SUB_RX, "Packet rejected from %s/%s", src_ipaddr, src_port); } /* * continue processing the other packets */ continue; } } _parse_recv_from_links(knet_h, sockfd, &msg[i]); break; } } exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); } void *_handle_recv_from_links_thread(void *data) { int i, nev; knet_handle_t knet_h = (knet_handle_t) data; struct epoll_event events[KNET_EPOLL_MAX_EVENTS]; struct sockaddr_storage address[PCKT_RX_BUFS]; struct knet_mmsghdr msg[PCKT_RX_BUFS]; struct iovec iov_in[PCKT_RX_BUFS]; set_thread_status(knet_h, KNET_THREAD_RX, KNET_THREAD_STARTED); memset(&msg, 0, sizeof(msg)); for (i = 0; i < PCKT_RX_BUFS; i++) { iov_in[i].iov_base = (void *)knet_h->recv_from_links_buf[i]; iov_in[i].iov_len = KNET_DATABUFSIZE; memset(&msg[i].msg_hdr, 0, sizeof(struct msghdr)); msg[i].msg_hdr.msg_name = &address[i]; msg[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage); msg[i].msg_hdr.msg_iov = &iov_in[i]; msg[i].msg_hdr.msg_iovlen = 1; } while (!shutdown_in_progress(knet_h)) { nev = epoll_wait(knet_h->recv_from_links_epollfd, events, KNET_EPOLL_MAX_EVENTS, knet_h->threads_timer_res / 1000); /* * the RX threads only need to notify that there has been at least * one successful run after queue flush has been requested. * See setfwd in handle.c */ if (get_thread_flush_queue(knet_h, KNET_THREAD_RX) == KNET_THREAD_QUEUE_FLUSH) { set_thread_flush_queue(knet_h, KNET_THREAD_RX, KNET_THREAD_QUEUE_FLUSHED); } /* * we use timeout to detect if thread is shutting down */ if (nev == 0) { continue; } for (i = 0; i < nev; i++) { _handle_recv_from_links(knet_h, events[i].data.fd, msg); } } set_thread_status(knet_h, KNET_THREAD_RX, KNET_THREAD_STOPPED); return NULL; }