diff --git a/libknet/threads_pmtud.c b/libknet/threads_pmtud.c index 9e433290..02945791 100644 --- a/libknet/threads_pmtud.c +++ b/libknet/threads_pmtud.c @@ -1,804 +1,895 @@ /* * Copyright (C) 2015-2020 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. Di Nitto * Federico Simoncelli * * This software licensed under LGPL-2.0+ */ #include "config.h" #include #include #include #include #include "crypto.h" #include "links.h" #include "host.h" #include "logging.h" #include "transports.h" #include "threads_common.h" #include "threads_pmtud.h" static int _calculate_manual_mtu(knet_handle_t knet_h, struct knet_link *dst_link) { size_t ipproto_overhead_len; /* onwire packet overhead (protocol based) */ switch (dst_link->dst_addr.ss_family) { case AF_INET6: ipproto_overhead_len = KNET_PMTUD_OVERHEAD_V6 + dst_link->proto_overhead; break; case AF_INET: ipproto_overhead_len = KNET_PMTUD_OVERHEAD_V4 + dst_link->proto_overhead; break; default: log_debug(knet_h, KNET_SUB_PMTUD, "unknown protocol"); return 0; break; } dst_link->status.mtu = calc_max_data_outlen(knet_h, knet_h->manual_mtu - ipproto_overhead_len); return 1; } static int _handle_check_link_pmtud(knet_handle_t knet_h, struct knet_host *dst_host, struct knet_link *dst_link) { int err, ret, savederrno, mutex_retry_limit, failsafe, use_kernel_mtu, warn_once; uint32_t kernel_mtu; /* record kernel_mtu from EMSGSIZE */ size_t onwire_len; /* current packet onwire size */ size_t ipproto_overhead_len; /* onwire packet overhead (protocol based) */ size_t max_mtu_len; /* max mtu for protocol */ size_t data_len; /* how much data we can send in the packet * generally would be onwire_len - ipproto_overhead_len * needs to be adjusted for crypto */ size_t app_mtu_len; /* real data that we can send onwire */ ssize_t len; /* len of what we were able to sendto onwire */ struct timespec ts, pmtud_crypto_start_ts, pmtud_crypto_stop_ts; unsigned long long pong_timeout_adj_tmp, timediff; int pmtud_crypto_reduce = 1; unsigned char *outbuf = (unsigned char *)knet_h->pmtudbuf; warn_once = 0; mutex_retry_limit = 0; failsafe = 0; knet_h->pmtudbuf->khp_pmtud_link = dst_link->link_id; switch (dst_link->dst_addr.ss_family) { case AF_INET6: max_mtu_len = KNET_PMTUD_SIZE_V6; ipproto_overhead_len = KNET_PMTUD_OVERHEAD_V6 + dst_link->proto_overhead; break; case AF_INET: max_mtu_len = KNET_PMTUD_SIZE_V4; ipproto_overhead_len = KNET_PMTUD_OVERHEAD_V4 + dst_link->proto_overhead; break; default: log_debug(knet_h, KNET_SUB_PMTUD, "PMTUD aborted, unknown protocol"); return -1; break; } dst_link->last_bad_mtu = 0; dst_link->last_good_mtu = dst_link->last_ping_size + ipproto_overhead_len; /* * discovery starts from the top because kernel will * refuse to send packets > current iface mtu. * this saves us some time and network bw. */ onwire_len = max_mtu_len; restart: /* * prevent a race when interface mtu is changed _exactly_ during * the discovery process and it's complex to detect. Easier * to wait the next loop. * 30 is not an arbitrary value. To bisect from 576 to 128000 doesn't * take more than 18/19 steps. */ if (failsafe == 30) { log_err(knet_h, KNET_SUB_PMTUD, "Aborting PMTUD process: Too many attempts. MTU might have changed during discovery."); return -1; } else { failsafe++; } /* * common to all packets */ /* * calculate the application MTU based on current onwire_len minus ipproto_overhead_len */ app_mtu_len = calc_max_data_outlen(knet_h, onwire_len - ipproto_overhead_len); /* * recalculate onwire len back that might be different based * on data padding from crypto layer. */ onwire_len = calc_data_outlen(knet_h, app_mtu_len + KNET_HEADER_ALL_SIZE) + ipproto_overhead_len; /* * calculate the size of what we need to send to sendto(2). * see also onwire.c for packet format explanation. */ data_len = app_mtu_len + knet_h->sec_hash_size + knet_h->sec_salt_size + KNET_HEADER_ALL_SIZE; if (knet_h->crypto_in_use_config) { if (data_len < (knet_h->sec_hash_size + knet_h->sec_salt_size) + 1) { log_debug(knet_h, KNET_SUB_PMTUD, "Aborting PMTUD process: link mtu smaller than crypto header detected (link might have been disconnected)"); return -1; } knet_h->pmtudbuf->khp_pmtud_size = onwire_len; if (crypto_encrypt_and_sign(knet_h, (const unsigned char *)knet_h->pmtudbuf, data_len - (knet_h->sec_hash_size + knet_h->sec_salt_size), knet_h->pmtudbuf_crypt, (ssize_t *)&data_len) < 0) { log_debug(knet_h, KNET_SUB_PMTUD, "Unable to crypto pmtud packet"); return -1; } outbuf = knet_h->pmtudbuf_crypt; if (pthread_mutex_lock(&knet_h->handle_stats_mutex) < 0) { log_err(knet_h, KNET_SUB_PMTUD, "Unable to get mutex lock"); return -1; } knet_h->stats_extra.tx_crypt_pmtu_packets++; pthread_mutex_unlock(&knet_h->handle_stats_mutex); } else { knet_h->pmtudbuf->khp_pmtud_size = onwire_len; } /* link has gone down, aborting pmtud */ if (dst_link->status.connected != 1) { log_debug(knet_h, KNET_SUB_PMTUD, "PMTUD detected host (%u) link (%u) has been disconnected", dst_host->host_id, dst_link->link_id); return -1; } if (dst_link->transport_connected != 1) { log_debug(knet_h, KNET_SUB_PMTUD, "PMTUD detected host (%u) link (%u) has been disconnected", dst_host->host_id, dst_link->link_id); return -1; } if (pthread_mutex_lock(&knet_h->pmtud_mutex) != 0) { log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get mutex lock"); return -1; } if (knet_h->pmtud_abort) { pthread_mutex_unlock(&knet_h->pmtud_mutex); errno = EDEADLK; return -1; } savederrno = pthread_mutex_lock(&knet_h->tx_mutex); if (savederrno) { pthread_mutex_unlock(&knet_h->pmtud_mutex); log_err(knet_h, KNET_SUB_PMTUD, "Unable to get TX mutex lock: %s", strerror(savederrno)); return -1; } savederrno = pthread_mutex_lock(&dst_link->link_stats_mutex); if (savederrno) { pthread_mutex_unlock(&knet_h->pmtud_mutex); pthread_mutex_unlock(&knet_h->tx_mutex); log_err(knet_h, KNET_SUB_PMTUD, "Unable to get stats mutex lock for host %u link %u: %s", dst_host->host_id, dst_link->link_id, strerror(savederrno)); return -1; } retry: if (transport_get_connection_oriented(knet_h, dst_link->transport) == TRANSPORT_PROTO_NOT_CONNECTION_ORIENTED) { len = sendto(dst_link->outsock, outbuf, data_len, MSG_DONTWAIT | MSG_NOSIGNAL, (struct sockaddr *) &dst_link->dst_addr, sizeof(struct sockaddr_storage)); } else { len = sendto(dst_link->outsock, outbuf, data_len, MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0); } savederrno = errno; /* * we cannot hold a lock on kmtu_mutex between resetting * knet_h->kernel_mtu here and below where it's used. * use_kernel_mtu tells us if the knet_h->kernel_mtu was * set to 0 and we can trust its value later. */ use_kernel_mtu = 0; if (pthread_mutex_lock(&knet_h->kmtu_mutex) == 0) { use_kernel_mtu = 1; knet_h->kernel_mtu = 0; pthread_mutex_unlock(&knet_h->kmtu_mutex); } kernel_mtu = 0; err = transport_tx_sock_error(knet_h, dst_link->transport, dst_link->outsock, len, savederrno); switch(err) { case -1: /* unrecoverable error */ log_debug(knet_h, KNET_SUB_PMTUD, "Unable to send pmtu packet (sendto): %d %s", savederrno, strerror(savederrno)); pthread_mutex_unlock(&knet_h->tx_mutex); pthread_mutex_unlock(&knet_h->pmtud_mutex); dst_link->status.stats.tx_pmtu_errors++; pthread_mutex_unlock(&dst_link->link_stats_mutex); return -1; case 0: /* ignore error and continue */ break; case 1: /* retry to send those same data */ dst_link->status.stats.tx_pmtu_retries++; goto retry; break; } pthread_mutex_unlock(&knet_h->tx_mutex); if (len != (ssize_t )data_len) { pthread_mutex_unlock(&dst_link->link_stats_mutex); if (savederrno == EMSGSIZE) { /* * we cannot hold a lock on kmtu_mutex between resetting * knet_h->kernel_mtu and here. * use_kernel_mtu tells us if the knet_h->kernel_mtu was * set to 0 previously and we can trust its value now. */ if (use_kernel_mtu) { use_kernel_mtu = 0; if (pthread_mutex_lock(&knet_h->kmtu_mutex) == 0) { kernel_mtu = knet_h->kernel_mtu; pthread_mutex_unlock(&knet_h->kmtu_mutex); } } if (kernel_mtu > 0) { dst_link->last_bad_mtu = kernel_mtu + 1; } else { dst_link->last_bad_mtu = onwire_len; } } else { log_debug(knet_h, KNET_SUB_PMTUD, "Unable to send pmtu packet len: %zu err: %s", onwire_len, strerror(savederrno)); } } else { dst_link->last_sent_mtu = onwire_len; dst_link->last_recv_mtu = 0; dst_link->status.stats.tx_pmtu_packets++; dst_link->status.stats.tx_pmtu_bytes += data_len; pthread_mutex_unlock(&dst_link->link_stats_mutex); if (clock_gettime(CLOCK_REALTIME, &ts) < 0) { log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get current time: %s", strerror(errno)); pthread_mutex_unlock(&knet_h->pmtud_mutex); return -1; } /* * non fatal, we can wait the next round to reduce the * multiplier */ if (clock_gettime(CLOCK_MONOTONIC, &pmtud_crypto_start_ts) < 0) { log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get current time: %s", strerror(errno)); pmtud_crypto_reduce = 0; } /* * set PMTUd reply timeout to match pong_timeout on a given link * * math: internally pong_timeout is expressed in microseconds, while * the public API exports milliseconds. So careful with the 0's here. * the loop is necessary because we are grabbing the current time just above * and add values to it that could overflow into seconds. */ if (pthread_mutex_lock(&knet_h->backoff_mutex)) { log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get backoff_mutex"); pthread_mutex_unlock(&knet_h->pmtud_mutex); return -1; } if (knet_h->crypto_in_use_config) { /* * crypto, under pressure, is a royal PITA */ pong_timeout_adj_tmp = dst_link->pong_timeout_adj * dst_link->pmtud_crypto_timeout_multiplier; } else { pong_timeout_adj_tmp = dst_link->pong_timeout_adj; } ts.tv_sec += pong_timeout_adj_tmp / 1000000; ts.tv_nsec += (((pong_timeout_adj_tmp) % 1000000) * 1000); while (ts.tv_nsec > 1000000000) { ts.tv_sec += 1; ts.tv_nsec -= 1000000000; } pthread_mutex_unlock(&knet_h->backoff_mutex); knet_h->pmtud_waiting = 1; ret = pthread_cond_timedwait(&knet_h->pmtud_cond, &knet_h->pmtud_mutex, &ts); knet_h->pmtud_waiting = 0; if (knet_h->pmtud_abort) { pthread_mutex_unlock(&knet_h->pmtud_mutex); errno = EDEADLK; return -1; } /* * we cannot use shutdown_in_progress in here because * we already hold the read lock */ if (knet_h->fini_in_progress) { pthread_mutex_unlock(&knet_h->pmtud_mutex); log_debug(knet_h, KNET_SUB_PMTUD, "PMTUD aborted. shutdown in progress"); return -1; } if (ret) { if (ret == ETIMEDOUT) { if ((knet_h->crypto_in_use_config) && (dst_link->pmtud_crypto_timeout_multiplier < KNET_LINK_PMTUD_CRYPTO_TIMEOUT_MULTIPLIER_MAX)) { dst_link->pmtud_crypto_timeout_multiplier = dst_link->pmtud_crypto_timeout_multiplier * 2; pmtud_crypto_reduce = 0; log_debug(knet_h, KNET_SUB_PMTUD, "Increasing PMTUd response timeout multiplier to (%u) for host %u link: %u", dst_link->pmtud_crypto_timeout_multiplier, dst_host->host_id, dst_link->link_id); pthread_mutex_unlock(&knet_h->pmtud_mutex); goto restart; } if (!warn_once) { log_warn(knet_h, KNET_SUB_PMTUD, "possible MTU misconfiguration detected. " "kernel is reporting MTU: %u bytes for " "host %u link %u but the other node is " "not acknowledging packets of this size. ", dst_link->last_sent_mtu, dst_host->host_id, dst_link->link_id); log_warn(knet_h, KNET_SUB_PMTUD, "This can be caused by this node interface MTU " "too big or a network device that does not " "support or has been misconfigured to manage MTU " "of this size, or packet loss. knet will continue " "to run but performances might be affected."); warn_once = 1; } } else { pthread_mutex_unlock(&knet_h->pmtud_mutex); if (mutex_retry_limit == 3) { log_debug(knet_h, KNET_SUB_PMTUD, "PMTUD aborted, unable to get mutex lock"); return -1; } mutex_retry_limit++; goto restart; } } if ((knet_h->crypto_in_use_config) && (pmtud_crypto_reduce == 1) && (dst_link->pmtud_crypto_timeout_multiplier > KNET_LINK_PMTUD_CRYPTO_TIMEOUT_MULTIPLIER_MIN)) { if (!clock_gettime(CLOCK_MONOTONIC, &pmtud_crypto_stop_ts)) { timespec_diff(pmtud_crypto_start_ts, pmtud_crypto_stop_ts, &timediff); if (((pong_timeout_adj_tmp * 1000) / 2) > timediff) { dst_link->pmtud_crypto_timeout_multiplier = dst_link->pmtud_crypto_timeout_multiplier / 2; log_debug(knet_h, KNET_SUB_PMTUD, "Decreasing PMTUd response timeout multiplier to (%u) for host %u link: %u", dst_link->pmtud_crypto_timeout_multiplier, dst_host->host_id, dst_link->link_id); } } else { log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get current time: %s", strerror(errno)); } } if ((dst_link->last_recv_mtu != onwire_len) || (ret)) { dst_link->last_bad_mtu = onwire_len; } else { int found_mtu = 0; if (knet_h->sec_block_size) { if ((onwire_len + knet_h->sec_block_size >= max_mtu_len) || ((dst_link->last_bad_mtu) && (dst_link->last_bad_mtu <= (onwire_len + knet_h->sec_block_size)))) { found_mtu = 1; } } else { if ((onwire_len == max_mtu_len) || ((dst_link->last_bad_mtu) && (dst_link->last_bad_mtu == (onwire_len + 1))) || (dst_link->last_bad_mtu == dst_link->last_good_mtu)) { found_mtu = 1; } } if (found_mtu) { /* * account for IP overhead, knet headers and crypto in PMTU calculation */ dst_link->status.mtu = calc_max_data_outlen(knet_h, onwire_len - ipproto_overhead_len); pthread_mutex_unlock(&knet_h->pmtud_mutex); return 0; } dst_link->last_good_mtu = onwire_len; } } if (kernel_mtu) { onwire_len = kernel_mtu; } else { onwire_len = (dst_link->last_good_mtu + dst_link->last_bad_mtu) / 2; } pthread_mutex_unlock(&knet_h->pmtud_mutex); goto restart; } static int _handle_check_pmtud(knet_handle_t knet_h, struct knet_host *dst_host, struct knet_link *dst_link, int force_run) { uint8_t saved_valid_pmtud; unsigned int saved_pmtud; struct timespec clock_now; unsigned long long diff_pmtud, interval; if (clock_gettime(CLOCK_MONOTONIC, &clock_now) != 0) { log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get monotonic clock"); return 0; } if (!force_run) { interval = knet_h->pmtud_interval * 1000000000llu; /* nanoseconds */ timespec_diff(dst_link->pmtud_last, clock_now, &diff_pmtud); if (diff_pmtud < interval) { return dst_link->has_valid_mtu; } } /* * status.proto_overhead should include all IP/(UDP|SCTP)/knet headers * * please note that it is not the same as link->proto_overhead that * includes only either UDP or SCTP (at the moment) overhead. */ switch (dst_link->dst_addr.ss_family) { case AF_INET6: dst_link->status.proto_overhead = KNET_PMTUD_OVERHEAD_V6 + dst_link->proto_overhead + KNET_HEADER_ALL_SIZE + knet_h->sec_hash_size + knet_h->sec_salt_size; break; case AF_INET: dst_link->status.proto_overhead = KNET_PMTUD_OVERHEAD_V4 + dst_link->proto_overhead + KNET_HEADER_ALL_SIZE + knet_h->sec_hash_size + knet_h->sec_salt_size; break; } saved_pmtud = dst_link->status.mtu; saved_valid_pmtud = dst_link->has_valid_mtu; log_debug(knet_h, KNET_SUB_PMTUD, "Starting PMTUD for host: %u link: %u", dst_host->host_id, dst_link->link_id); errno = 0; if (_handle_check_link_pmtud(knet_h, dst_host, dst_link) < 0) { if (errno == EDEADLK) { log_debug(knet_h, KNET_SUB_PMTUD, "PMTUD for host: %u link: %u has been rescheduled", dst_host->host_id, dst_link->link_id); dst_link->status.mtu = saved_pmtud; dst_link->has_valid_mtu = saved_valid_pmtud; errno = EDEADLK; return dst_link->has_valid_mtu; } dst_link->has_valid_mtu = 0; } else { if (dst_link->status.mtu < calc_min_mtu(knet_h)) { log_info(knet_h, KNET_SUB_PMTUD, "Invalid MTU detected for host: %u link: %u mtu: %u", dst_host->host_id, dst_link->link_id, dst_link->status.mtu); dst_link->has_valid_mtu = 0; } else { dst_link->has_valid_mtu = 1; } if (dst_link->has_valid_mtu) { if ((saved_pmtud) && (saved_pmtud != dst_link->status.mtu)) { log_info(knet_h, KNET_SUB_PMTUD, "PMTUD link change for host: %u link: %u from %u to %u", dst_host->host_id, dst_link->link_id, saved_pmtud, dst_link->status.mtu); } log_debug(knet_h, KNET_SUB_PMTUD, "PMTUD completed for host: %u link: %u current link mtu: %u", dst_host->host_id, dst_link->link_id, dst_link->status.mtu); /* * set pmtud_last, if we can, after we are done with the PMTUd process * because it can take a very long time. */ dst_link->pmtud_last = clock_now; if (!clock_gettime(CLOCK_MONOTONIC, &clock_now)) { dst_link->pmtud_last = clock_now; } } } if (saved_valid_pmtud != dst_link->has_valid_mtu) { _host_dstcache_update_async(knet_h, dst_host); } return dst_link->has_valid_mtu; } void *_handle_pmtud_link_thread(void *data) { knet_handle_t knet_h = (knet_handle_t) data; struct knet_host *dst_host; struct knet_link *dst_link; int link_idx; unsigned int have_mtu; unsigned int lower_mtu; int link_has_mtu; int force_run = 0; set_thread_status(knet_h, KNET_THREAD_PMTUD, KNET_THREAD_STARTED); knet_h->data_mtu = calc_min_mtu(knet_h); /* preparing pmtu buffer */ knet_h->pmtudbuf->kh_version = KNET_HEADER_VERSION; knet_h->pmtudbuf->kh_type = KNET_HEADER_TYPE_PMTUD; knet_h->pmtudbuf->kh_node = htons(knet_h->host_id); while (!shutdown_in_progress(knet_h)) { usleep(knet_h->threads_timer_res); if (pthread_mutex_lock(&knet_h->pmtud_mutex) != 0) { log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get mutex lock"); continue; } knet_h->pmtud_abort = 0; knet_h->pmtud_running = 1; force_run = knet_h->pmtud_forcerun; knet_h->pmtud_forcerun = 0; pthread_mutex_unlock(&knet_h->pmtud_mutex); if (force_run) { log_debug(knet_h, KNET_SUB_PMTUD, "PMTUd request to rerun has been received"); } if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) { log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get read lock"); continue; } lower_mtu = KNET_PMTUD_SIZE_V4; have_mtu = 0; for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) { for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) { dst_link = &dst_host->link[link_idx]; if ((dst_link->status.enabled != 1) || (dst_link->status.connected != 1) || (dst_host->link[link_idx].transport == KNET_TRANSPORT_LOOPBACK) || (!dst_link->last_ping_size) || ((dst_link->dynamic == KNET_LINK_DYNIP) && (dst_link->status.dynconnected != 1))) continue; if (!knet_h->manual_mtu) { link_has_mtu = _handle_check_pmtud(knet_h, dst_host, dst_link, force_run); if (errno == EDEADLK) { goto out_unlock; } if (link_has_mtu) { have_mtu = 1; if (dst_link->status.mtu < lower_mtu) { lower_mtu = dst_link->status.mtu; } } } else { link_has_mtu = _calculate_manual_mtu(knet_h, dst_link); if (link_has_mtu) { have_mtu = 1; if (dst_link->status.mtu < lower_mtu) { lower_mtu = dst_link->status.mtu; } } } } } if (have_mtu) { if (knet_h->data_mtu != lower_mtu) { knet_h->data_mtu = lower_mtu; log_info(knet_h, KNET_SUB_PMTUD, "Global data MTU changed to: %u", knet_h->data_mtu); if (knet_h->pmtud_notify_fn) { knet_h->pmtud_notify_fn(knet_h->pmtud_notify_fn_private_data, knet_h->data_mtu); } } } out_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); if (pthread_mutex_lock(&knet_h->pmtud_mutex) != 0) { log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get mutex lock"); } else { knet_h->pmtud_running = 0; pthread_mutex_unlock(&knet_h->pmtud_mutex); } } set_thread_status(knet_h, KNET_THREAD_PMTUD, KNET_THREAD_STOPPED); return NULL; } +static void send_pmtud_reply(knet_handle_t knet_h, struct knet_link *src_link, struct knet_header *inbuf) +{ + int err = 0, savederrno = 0, stats_err = 0; + unsigned char *outbuf = (unsigned char *)inbuf; + ssize_t len, outlen; + + outlen = KNET_HEADER_PMTUD_SIZE; + inbuf->kh_type = KNET_HEADER_TYPE_PMTUD_REPLY; + inbuf->kh_node = htons(knet_h->host_id); + + if (knet_h->crypto_in_use_config) { + if (crypto_encrypt_and_sign(knet_h, + (const unsigned char *)inbuf, + outlen, + knet_h->recv_from_links_buf_crypt, + &outlen) < 0) { + log_debug(knet_h, KNET_SUB_RX, "Unable to encrypt PMTUd reply packet"); + return; + } + outbuf = knet_h->recv_from_links_buf_crypt; + stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex); + if (stats_err < 0) { + log_err(knet_h, KNET_SUB_RX, "Unable to get mutex lock: %s", strerror(stats_err)); + return; + } + knet_h->stats_extra.tx_crypt_pmtu_reply_packets++; + pthread_mutex_unlock(&knet_h->handle_stats_mutex); + } + + savederrno = pthread_mutex_lock(&knet_h->tx_mutex); + if (savederrno) { + log_err(knet_h, KNET_SUB_RX, "Unable to get TX mutex lock: %s", strerror(savederrno)); + return; + } +retry_pmtud: + if (src_link->transport_connected) { + if (transport_get_connection_oriented(knet_h, src_link->transport) == TRANSPORT_PROTO_NOT_CONNECTION_ORIENTED) { + len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL, + (struct sockaddr *) &src_link->dst_addr, sizeof(struct sockaddr_storage)); + } else { + len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0); + } + savederrno = errno; + if (len != outlen) { + err = transport_tx_sock_error(knet_h, src_link->transport, src_link->outsock, len, savederrno); + stats_err = pthread_mutex_lock(&src_link->link_stats_mutex); + if (stats_err < 0) { + log_err(knet_h, KNET_SUB_RX, "Unable to get mutex lock: %s", strerror(stats_err)); + return; + } + switch(err) { + case -1: /* unrecoverable error */ + log_debug(knet_h, KNET_SUB_RX, + "Unable to send PMTUd reply (sock: %d) packet (sendto): %d %s. recorded src ip: %s src port: %s dst ip: %s dst port: %s", + src_link->outsock, errno, strerror(errno), + src_link->status.src_ipaddr, src_link->status.src_port, + src_link->status.dst_ipaddr, src_link->status.dst_port); + + src_link->status.stats.tx_pmtu_errors++; + break; + case 0: /* ignore error and continue */ + src_link->status.stats.tx_pmtu_errors++; + break; + case 1: /* retry to send those same data */ + src_link->status.stats.tx_pmtu_retries++; + pthread_mutex_unlock(&src_link->link_stats_mutex); + goto retry_pmtud; + break; + } + pthread_mutex_unlock(&src_link->link_stats_mutex); + } + } + pthread_mutex_unlock(&knet_h->tx_mutex); +} + +void process_pmtud(knet_handle_t knet_h, struct knet_link *src_link, struct knet_header *inbuf) +{ + send_pmtud_reply(knet_h, src_link, inbuf); +} + +void process_pmtud_reply(knet_handle_t knet_h, struct knet_link *src_link, struct knet_header *inbuf) +{ + if (pthread_mutex_lock(&knet_h->pmtud_mutex) != 0) { + log_debug(knet_h, KNET_SUB_RX, "Unable to get mutex lock"); + return; + } + src_link->last_recv_mtu = inbuf->khp_pmtud_size; + pthread_cond_signal(&knet_h->pmtud_cond); + pthread_mutex_unlock(&knet_h->pmtud_mutex); +} + int knet_handle_pmtud_getfreq(knet_handle_t knet_h, unsigned int *interval) { int savederrno = 0; if (!knet_h) { errno = EINVAL; return -1; } if (!interval) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } *interval = knet_h->pmtud_interval; pthread_rwlock_unlock(&knet_h->global_rwlock); errno = 0; return 0; } int knet_handle_pmtud_setfreq(knet_handle_t knet_h, unsigned int interval) { int savederrno = 0; if (!knet_h) { errno = EINVAL; return -1; } if ((!interval) || (interval > 86400)) { errno = EINVAL; return -1; } savederrno = get_global_wrlock(knet_h); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } knet_h->pmtud_interval = interval; log_debug(knet_h, KNET_SUB_HANDLE, "PMTUd interval set to: %u seconds", interval); pthread_rwlock_unlock(&knet_h->global_rwlock); errno = 0; return 0; } int knet_handle_enable_pmtud_notify(knet_handle_t knet_h, void *pmtud_notify_fn_private_data, void (*pmtud_notify_fn) ( void *private_data, unsigned int data_mtu)) { int savederrno = 0; if (!knet_h) { errno = EINVAL; return -1; } savederrno = get_global_wrlock(knet_h); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } knet_h->pmtud_notify_fn_private_data = pmtud_notify_fn_private_data; knet_h->pmtud_notify_fn = pmtud_notify_fn; if (knet_h->pmtud_notify_fn) { log_debug(knet_h, KNET_SUB_HANDLE, "pmtud_notify_fn enabled"); } else { log_debug(knet_h, KNET_SUB_HANDLE, "pmtud_notify_fn disabled"); } pthread_rwlock_unlock(&knet_h->global_rwlock); errno = 0; return 0; } int knet_handle_pmtud_set(knet_handle_t knet_h, unsigned int iface_mtu) { int savederrno = 0; if (!knet_h) { errno = EINVAL; return -1; } if (iface_mtu > KNET_PMTUD_SIZE_V4) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_PMTUD, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } log_info(knet_h, KNET_SUB_PMTUD, "MTU manually set to: %u", iface_mtu); knet_h->manual_mtu = iface_mtu; force_pmtud_run(knet_h, KNET_SUB_PMTUD, 0); pthread_rwlock_unlock(&knet_h->global_rwlock); errno = 0; return 0; } int knet_handle_pmtud_get(knet_handle_t knet_h, unsigned int *data_mtu) { int savederrno = 0; if (!knet_h) { errno = EINVAL; return -1; } if (!data_mtu) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } *data_mtu = knet_h->data_mtu; pthread_rwlock_unlock(&knet_h->global_rwlock); errno = 0; return 0; } diff --git a/libknet/threads_pmtud.h b/libknet/threads_pmtud.h index c2c2c7b7..de12ea33 100644 --- a/libknet/threads_pmtud.h +++ b/libknet/threads_pmtud.h @@ -1,15 +1,18 @@ /* * Copyright (C) 2012-2020 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. Di Nitto * Federico Simoncelli * * This software licensed under LGPL-2.0+ */ #ifndef __KNET_THREADS_PMTUD_H__ #define __KNET_THREADS_PMTUD_H__ void *_handle_pmtud_link_thread(void *data); +void process_pmtud(knet_handle_t knet_h, struct knet_link *src_link, struct knet_header *inbuf); +void process_pmtud_reply(knet_handle_t knet_h, struct knet_link *src_link, struct knet_header *inbuf); + #endif diff --git a/libknet/threads_rx.c b/libknet/threads_rx.c index 0467b4dd..b43ca14e 100644 --- a/libknet/threads_rx.c +++ b/libknet/threads_rx.c @@ -1,892 +1,816 @@ /* * Copyright (C) 2012-2020 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. Di Nitto * Federico Simoncelli * * This software licensed under LGPL-2.0+ */ #include "config.h" #include #include #include #include #include #include "compat.h" #include "compress.h" #include "crypto.h" #include "host.h" #include "links.h" #include "links_acl.h" #include "logging.h" #include "transports.h" #include "transport_common.h" #include "threads_common.h" #include "threads_heartbeat.h" +#include "threads_pmtud.h" #include "threads_rx.h" #include "netutils.h" /* * RECV */ /* * return 1 if a > b * return -1 if b > a * return 0 if they are equal */ static inline int timecmp(struct timespec a, struct timespec b) { if (a.tv_sec != b.tv_sec) { if (a.tv_sec > b.tv_sec) { return 1; } else { return -1; } } else { if (a.tv_nsec > b.tv_nsec) { return 1; } else if (a.tv_nsec < b.tv_nsec) { return -1; } else { return 0; } } } /* * this functions needs to return an index (0 to 7) * to a knet_host_defrag_buf. (-1 on errors) */ static int find_pckt_defrag_buf(knet_handle_t knet_h, struct knet_header *inbuf) { struct knet_host *src_host = knet_h->host_index[inbuf->kh_node]; int i, oldest; /* * check if there is a buffer already in use handling the same seq_num */ for (i = 0; i < KNET_MAX_LINK; i++) { if (src_host->defrag_buf[i].in_use) { if (src_host->defrag_buf[i].pckt_seq == inbuf->khp_data_seq_num) { return i; } } } /* * If there is no buffer that's handling the current seq_num * either it's new or it's been reclaimed already. * check if it's been reclaimed/seen before using the defrag circular * buffer. If the pckt has been seen before, the buffer expired (ETIME) * and there is no point to try to defrag it again. */ if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 1, 0)) { errno = ETIME; return -1; } /* * register the pckt as seen */ _seq_num_set(src_host, inbuf->khp_data_seq_num, 1); /* * see if there is a free buffer */ for (i = 0; i < KNET_MAX_LINK; i++) { if (!src_host->defrag_buf[i].in_use) { return i; } } /* * at this point, there are no free buffers, the pckt is new * and we need to reclaim a buffer, and we will take the one * with the oldest timestamp. It's as good as any. */ oldest = 0; for (i = 0; i < KNET_MAX_LINK; i++) { if (timecmp(src_host->defrag_buf[i].last_update, src_host->defrag_buf[oldest].last_update) < 0) { oldest = i; } } src_host->defrag_buf[oldest].in_use = 0; return oldest; } static int pckt_defrag(knet_handle_t knet_h, struct knet_header *inbuf, ssize_t *len) { struct knet_host_defrag_buf *defrag_buf; int defrag_buf_idx; defrag_buf_idx = find_pckt_defrag_buf(knet_h, inbuf); if (defrag_buf_idx < 0) { return 1; } defrag_buf = &knet_h->host_index[inbuf->kh_node]->defrag_buf[defrag_buf_idx]; /* * if the buf is not is use, then make sure it's clean */ if (!defrag_buf->in_use) { memset(defrag_buf, 0, sizeof(struct knet_host_defrag_buf)); defrag_buf->in_use = 1; defrag_buf->pckt_seq = inbuf->khp_data_seq_num; } /* * update timestamp on the buffer */ clock_gettime(CLOCK_MONOTONIC, &defrag_buf->last_update); /* * check if we already received this fragment */ if (defrag_buf->frag_map[inbuf->khp_data_frag_seq]) { /* * if we have received this fragment and we didn't clear the buffer * it means that we don't have all fragments yet */ return 1; } /* * we need to handle the last packet with gloves due to its different size */ if (inbuf->khp_data_frag_seq == inbuf->khp_data_frag_num) { defrag_buf->last_frag_size = *len; /* * in the event when the last packet arrives first, * we still don't know the offset vs the other fragments (based on MTU), * so we store the fragment at the end of the buffer where it's safe * and take a copy of the len so that we can restore its offset later. * remember we can't use the local MTU for this calculation because pMTU * can be asymettric between the same hosts. */ if (!defrag_buf->frag_size) { defrag_buf->last_first = 1; memmove(defrag_buf->buf + (KNET_MAX_PACKET_SIZE - *len), inbuf->khp_data_userdata, *len); } } else { defrag_buf->frag_size = *len; } if (defrag_buf->frag_size) { memmove(defrag_buf->buf + ((inbuf->khp_data_frag_seq - 1) * defrag_buf->frag_size), inbuf->khp_data_userdata, *len); } defrag_buf->frag_recv++; defrag_buf->frag_map[inbuf->khp_data_frag_seq] = 1; /* * check if we received all the fragments */ if (defrag_buf->frag_recv == inbuf->khp_data_frag_num) { /* * special case the last pckt */ if (defrag_buf->last_first) { memmove(defrag_buf->buf + ((inbuf->khp_data_frag_num - 1) * defrag_buf->frag_size), defrag_buf->buf + (KNET_MAX_PACKET_SIZE - defrag_buf->last_frag_size), defrag_buf->last_frag_size); } /* * recalculate packet lenght */ *len = ((inbuf->khp_data_frag_num - 1) * defrag_buf->frag_size) + defrag_buf->last_frag_size; /* * copy the pckt back in the user data */ memmove(inbuf->khp_data_userdata, defrag_buf->buf, *len); /* * free this buffer */ defrag_buf->in_use = 0; return 0; } return 1; } static void _parse_recv_from_links(knet_handle_t knet_h, int sockfd, const struct knet_mmsghdr *msg) { int err = 0, savederrno = 0, stats_err = 0; ssize_t outlen; struct knet_host *src_host; struct knet_link *src_link; knet_node_id_t dst_host_ids[KNET_MAX_HOST]; size_t dst_host_ids_entries = 0; int bcast = 1; uint64_t decrypt_time = 0; struct knet_header *inbuf = msg->msg_hdr.msg_iov->iov_base; - unsigned char *outbuf = (unsigned char *)msg->msg_hdr.msg_iov->iov_base; ssize_t len = msg->msg_len; struct iovec iov_out[1]; int8_t channel; int try_decrypt = 0, decrypted = 0, i, found_link = 0; for (i = 1; i <= KNET_MAX_CRYPTO_INSTANCES; i++) { if (knet_h->crypto_instance[i]) { try_decrypt = 1; break; } } if ((!try_decrypt) && (knet_h->crypto_only == KNET_CRYPTO_RX_DISALLOW_CLEAR_TRAFFIC)) { log_debug(knet_h, KNET_SUB_RX, "RX thread configured to accept only crypto packets, but no crypto configs are configured!"); return; } if (try_decrypt) { struct timespec start_time; struct timespec end_time; clock_gettime(CLOCK_MONOTONIC, &start_time); if (crypto_authenticate_and_decrypt(knet_h, (unsigned char *)inbuf, len, knet_h->recv_from_links_buf_decrypt, &outlen) < 0) { log_debug(knet_h, KNET_SUB_RX, "Unable to decrypt/auth packet"); if (knet_h->crypto_only == KNET_CRYPTO_RX_DISALLOW_CLEAR_TRAFFIC) { return; } log_debug(knet_h, KNET_SUB_RX, "Attempting to process packet as clear data"); } else { clock_gettime(CLOCK_MONOTONIC, &end_time); timespec_diff(start_time, end_time, &decrypt_time); len = outlen; inbuf = (struct knet_header *)knet_h->recv_from_links_buf_decrypt; decrypted = 1; } } if (len < (ssize_t)(KNET_HEADER_SIZE + 1)) { log_debug(knet_h, KNET_SUB_RX, "Packet is too short: %ld", (long)len); return; } if (inbuf->kh_version != KNET_HEADER_VERSION) { log_debug(knet_h, KNET_SUB_RX, "Packet version does not match"); return; } inbuf->kh_node = ntohs(inbuf->kh_node); src_host = knet_h->host_index[inbuf->kh_node]; if (src_host == NULL) { /* host not found */ log_debug(knet_h, KNET_SUB_RX, "Unable to find source host for this packet"); return; } if ((inbuf->kh_type & KNET_HEADER_TYPE_PMSK) != 0) { /* be aware this works only for PING / PONG and PMTUd packets! */ src_link = src_host->link + (inbuf->khp_ping_link % KNET_MAX_LINK); if (src_link->dynamic == KNET_LINK_DYNIP) { if (cmpaddr(&src_link->dst_addr, msg->msg_hdr.msg_name) != 0) { log_debug(knet_h, KNET_SUB_RX, "host: %u link: %u appears to have changed ip address", src_host->host_id, src_link->link_id); memmove(&src_link->dst_addr, msg->msg_hdr.msg_name, sizeof(struct sockaddr_storage)); if (knet_addrtostr(&src_link->dst_addr, sockaddr_len(&src_link->dst_addr), src_link->status.dst_ipaddr, KNET_MAX_HOST_LEN, src_link->status.dst_port, KNET_MAX_PORT_LEN) != 0) { log_debug(knet_h, KNET_SUB_RX, "Unable to resolve ???"); snprintf(src_link->status.dst_ipaddr, KNET_MAX_HOST_LEN - 1, "Unknown!!!"); snprintf(src_link->status.dst_port, KNET_MAX_PORT_LEN - 1, "??"); } else { log_info(knet_h, KNET_SUB_RX, "host: %u link: %u new connection established from: %s %s", src_host->host_id, src_link->link_id, src_link->status.dst_ipaddr, src_link->status.dst_port); } } /* * transport has already accepted the connection here * otherwise we would not be receiving packets */ transport_link_dyn_connect(knet_h, sockfd, src_link); } } else { /* data packet */ for (i = 0; i < KNET_MAX_LINK; i++) { src_link = &src_host->link[i]; if (cmpaddr(&src_link->dst_addr, msg->msg_hdr.msg_name) == 0) { found_link = 1; break; } } if (!found_link) { log_debug(knet_h, KNET_SUB_RX, "Unable to determine source link for data packet. Discarding packet."); return; } } stats_err = pthread_mutex_lock(&src_link->link_stats_mutex); if (stats_err) { log_err(knet_h, KNET_SUB_RX, "Unable to get stats mutex lock for host %u link %u: %s", src_host->host_id, src_link->link_id, strerror(savederrno)); return; } switch (inbuf->kh_type) { case KNET_HEADER_TYPE_DATA: /* data stats at the top for consistency with TX */ src_link->status.stats.rx_data_packets++; src_link->status.stats.rx_data_bytes += len; if (decrypted) { stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex); if (stats_err < 0) { pthread_mutex_unlock(&src_link->link_stats_mutex); log_err(knet_h, KNET_SUB_RX, "Unable to get mutex lock: %s", strerror(stats_err)); return; } /* Only update the crypto overhead for data packets. Mainly to be consistent with TX */ if (decrypt_time < knet_h->stats.rx_crypt_time_min) { knet_h->stats.rx_crypt_time_min = decrypt_time; } if (decrypt_time > knet_h->stats.rx_crypt_time_max) { knet_h->stats.rx_crypt_time_max = decrypt_time; } knet_h->stats.rx_crypt_time_ave = (knet_h->stats.rx_crypt_time_ave * knet_h->stats.rx_crypt_packets + decrypt_time) / (knet_h->stats.rx_crypt_packets+1); knet_h->stats.rx_crypt_packets++; pthread_mutex_unlock(&knet_h->handle_stats_mutex); } if (!src_host->status.reachable) { pthread_mutex_unlock(&src_link->link_stats_mutex); log_debug(knet_h, KNET_SUB_RX, "Source host %u not reachable yet. Discarding packet.", src_host->host_id); return; } inbuf->khp_data_seq_num = ntohs(inbuf->khp_data_seq_num); channel = inbuf->khp_data_channel; src_host->got_data = 1; if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 0, 0)) { pthread_mutex_unlock(&src_link->link_stats_mutex); if (src_host->link_handler_policy != KNET_LINK_POLICY_ACTIVE) { log_debug(knet_h, KNET_SUB_RX, "Packet has already been delivered"); } return; } if (inbuf->khp_data_frag_num > 1) { /* * len as received from the socket also includes extra stuff * that the defrag code doesn't care about. So strip it * here and readd only for repadding once we are done * defragging */ len = len - KNET_HEADER_DATA_SIZE; if (pckt_defrag(knet_h, inbuf, &len)) { pthread_mutex_unlock(&src_link->link_stats_mutex); return; } len = len + KNET_HEADER_DATA_SIZE; } if (inbuf->khp_data_compress) { ssize_t decmp_outlen = KNET_DATABUFSIZE_COMPRESS; struct timespec start_time; struct timespec end_time; uint64_t compress_time; clock_gettime(CLOCK_MONOTONIC, &start_time); err = decompress(knet_h, inbuf->khp_data_compress, (const unsigned char *)inbuf->khp_data_userdata, len - KNET_HEADER_DATA_SIZE, knet_h->recv_from_links_buf_decompress, &decmp_outlen); stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex); if (stats_err < 0) { pthread_mutex_unlock(&src_link->link_stats_mutex); log_err(knet_h, KNET_SUB_RX, "Unable to get mutex lock: %s", strerror(stats_err)); return; } clock_gettime(CLOCK_MONOTONIC, &end_time); timespec_diff(start_time, end_time, &compress_time); if (!err) { /* Collect stats */ if (compress_time < knet_h->stats.rx_compress_time_min) { knet_h->stats.rx_compress_time_min = compress_time; } if (compress_time > knet_h->stats.rx_compress_time_max) { knet_h->stats.rx_compress_time_max = compress_time; } knet_h->stats.rx_compress_time_ave = (knet_h->stats.rx_compress_time_ave * knet_h->stats.rx_compressed_packets + compress_time) / (knet_h->stats.rx_compressed_packets+1); knet_h->stats.rx_compressed_packets++; knet_h->stats.rx_compressed_original_bytes += decmp_outlen; knet_h->stats.rx_compressed_size_bytes += len - KNET_HEADER_SIZE; memmove(inbuf->khp_data_userdata, knet_h->recv_from_links_buf_decompress, decmp_outlen); len = decmp_outlen + KNET_HEADER_DATA_SIZE; } else { knet_h->stats.rx_failed_to_decompress++; pthread_mutex_unlock(&knet_h->handle_stats_mutex); pthread_mutex_unlock(&src_link->link_stats_mutex); log_warn(knet_h, KNET_SUB_COMPRESS, "Unable to decompress packet (%d): %s", err, strerror(errno)); return; } pthread_mutex_unlock(&knet_h->handle_stats_mutex); } if (knet_h->enabled != 1) /* data forward is disabled */ break; if (knet_h->dst_host_filter_fn) { size_t host_idx; int found = 0; bcast = knet_h->dst_host_filter_fn( knet_h->dst_host_filter_fn_private_data, (const unsigned char *)inbuf->khp_data_userdata, len - KNET_HEADER_DATA_SIZE, KNET_NOTIFY_RX, knet_h->host_id, inbuf->kh_node, &channel, dst_host_ids, &dst_host_ids_entries); if (bcast < 0) { pthread_mutex_unlock(&src_link->link_stats_mutex); log_debug(knet_h, KNET_SUB_RX, "Error from dst_host_filter_fn: %d", bcast); return; } if ((!bcast) && (!dst_host_ids_entries)) { pthread_mutex_unlock(&src_link->link_stats_mutex); log_debug(knet_h, KNET_SUB_RX, "Message is unicast but no dst_host_ids_entries"); return; } /* check if we are dst for this packet */ if (!bcast) { if (dst_host_ids_entries > KNET_MAX_HOST) { pthread_mutex_unlock(&src_link->link_stats_mutex); log_debug(knet_h, KNET_SUB_RX, "dst_host_filter_fn returned too many destinations"); return; } for (host_idx = 0; host_idx < dst_host_ids_entries; host_idx++) { if (dst_host_ids[host_idx] == knet_h->host_id) { found = 1; break; } } if (!found) { pthread_mutex_unlock(&src_link->link_stats_mutex); log_debug(knet_h, KNET_SUB_RX, "Packet is not for us"); return; } } } if (!knet_h->sockfd[channel].in_use) { pthread_mutex_unlock(&src_link->link_stats_mutex); log_debug(knet_h, KNET_SUB_RX, "received packet for channel %d but there is no local sock connected", channel); return; } outlen = 0; memset(iov_out, 0, sizeof(iov_out)); retry: iov_out[0].iov_base = (void *) inbuf->khp_data_userdata + outlen; iov_out[0].iov_len = len - (outlen + KNET_HEADER_DATA_SIZE); outlen = writev(knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], iov_out, 1); if ((outlen > 0) && (outlen < (ssize_t)iov_out[0].iov_len)) { log_debug(knet_h, KNET_SUB_RX, "Unable to send all data to the application in one go. Expected: %zu Sent: %zd\n", iov_out[0].iov_len, outlen); goto retry; } if (outlen <= 0) { knet_h->sock_notify_fn(knet_h->sock_notify_fn_private_data, knet_h->sockfd[channel].sockfd[0], channel, KNET_NOTIFY_RX, outlen, errno); pthread_mutex_unlock(&src_link->link_stats_mutex); return; } if ((size_t)outlen == iov_out[0].iov_len) { _seq_num_set(src_host, inbuf->khp_data_seq_num, 0); } break; case KNET_HEADER_TYPE_PING: process_ping(knet_h, src_host, src_link, inbuf, len); break; case KNET_HEADER_TYPE_PONG: process_pong(knet_h, src_host, src_link, inbuf, len); break; case KNET_HEADER_TYPE_PMTUD: src_link->status.stats.rx_pmtu_packets++; src_link->status.stats.rx_pmtu_bytes += len; - outlen = KNET_HEADER_PMTUD_SIZE; - inbuf->kh_type = KNET_HEADER_TYPE_PMTUD_REPLY; - inbuf->kh_node = htons(knet_h->host_id); - - if (knet_h->crypto_in_use_config) { - if (crypto_encrypt_and_sign(knet_h, - (const unsigned char *)inbuf, - outlen, - knet_h->recv_from_links_buf_crypt, - &outlen) < 0) { - log_debug(knet_h, KNET_SUB_RX, "Unable to encrypt PMTUd reply packet"); - break; - } - outbuf = knet_h->recv_from_links_buf_crypt; - stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex); - if (stats_err < 0) { - log_err(knet_h, KNET_SUB_RX, "Unable to get mutex lock: %s", strerror(stats_err)); - break; - } - knet_h->stats_extra.tx_crypt_pmtu_reply_packets++; - pthread_mutex_unlock(&knet_h->handle_stats_mutex); - } - /* Unlock so we don't deadlock with tx_mutex */ pthread_mutex_unlock(&src_link->link_stats_mutex); - - savederrno = pthread_mutex_lock(&knet_h->tx_mutex); - if (savederrno) { - log_err(knet_h, KNET_SUB_RX, "Unable to get TX mutex lock: %s", strerror(savederrno)); - goto out_pmtud; - } -retry_pmtud: - if (src_link->transport_connected) { - if (transport_get_connection_oriented(knet_h, src_link->transport) == TRANSPORT_PROTO_NOT_CONNECTION_ORIENTED) { - len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL, - (struct sockaddr *) &src_link->dst_addr, sizeof(struct sockaddr_storage)); - } else { - len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0); - } - savederrno = errno; - if (len != outlen) { - err = transport_tx_sock_error(knet_h, src_link->transport, src_link->outsock, len, savederrno); - stats_err = pthread_mutex_lock(&src_link->link_stats_mutex); - if (stats_err < 0) { - log_err(knet_h, KNET_SUB_RX, "Unable to get mutex lock: %s", strerror(stats_err)); - break; - } - switch(err) { - case -1: /* unrecoverable error */ - log_debug(knet_h, KNET_SUB_RX, - "Unable to send PMTUd reply (sock: %d) packet (sendto): %d %s. recorded src ip: %s src port: %s dst ip: %s dst port: %s", - src_link->outsock, errno, strerror(errno), - src_link->status.src_ipaddr, src_link->status.src_port, - src_link->status.dst_ipaddr, src_link->status.dst_port); - - src_link->status.stats.tx_pmtu_errors++; - break; - case 0: /* ignore error and continue */ - src_link->status.stats.tx_pmtu_errors++; - break; - case 1: /* retry to send those same data */ - src_link->status.stats.tx_pmtu_retries++; - pthread_mutex_unlock(&src_link->link_stats_mutex); - goto retry_pmtud; - break; - } - pthread_mutex_unlock(&src_link->link_stats_mutex); - } - } - pthread_mutex_unlock(&knet_h->tx_mutex); -out_pmtud: + process_pmtud(knet_h, src_link, inbuf); return; /* Don't need to unlock link_stats_mutex */ case KNET_HEADER_TYPE_PMTUD_REPLY: src_link->status.stats.rx_pmtu_packets++; src_link->status.stats.rx_pmtu_bytes += len; - /* pmtud_mutex can't be acquired while we hold a link_stats_mutex (ordering) */ pthread_mutex_unlock(&src_link->link_stats_mutex); - - if (pthread_mutex_lock(&knet_h->pmtud_mutex) != 0) { - log_debug(knet_h, KNET_SUB_RX, "Unable to get mutex lock"); - break; - } - src_link->last_recv_mtu = inbuf->khp_pmtud_size; - pthread_cond_signal(&knet_h->pmtud_cond); - pthread_mutex_unlock(&knet_h->pmtud_mutex); + process_pmtud_reply(knet_h, src_link, inbuf); return; default: pthread_mutex_unlock(&src_link->link_stats_mutex); return; } pthread_mutex_unlock(&src_link->link_stats_mutex); } static void _handle_recv_from_links(knet_handle_t knet_h, int sockfd, struct knet_mmsghdr *msg) { int err, savederrno; int i, msg_recv, transport; if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) { log_debug(knet_h, KNET_SUB_RX, "Unable to get global read lock"); return; } if (_is_valid_fd(knet_h, sockfd) < 1) { /* * this is normal if a fd got an event and before we grab the read lock * and the link is removed by another thread */ goto exit_unlock; } transport = knet_h->knet_transport_fd_tracker[sockfd].transport; /* * reset msg_namelen to buffer size because after recvmmsg * each msg_namelen will contain sizeof sockaddr_in or sockaddr_in6 */ for (i = 0; i < PCKT_RX_BUFS; i++) { msg[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage); } msg_recv = _recvmmsg(sockfd, &msg[0], PCKT_RX_BUFS, MSG_DONTWAIT | MSG_NOSIGNAL); savederrno = errno; /* * WARNING: man page for recvmmsg is wrong. Kernel implementation here: * recvmmsg can return: * -1 on error * 0 if the previous run of recvmmsg recorded an error on the socket * N number of messages (see exception below). * * If there is an error from recvmsg after receiving a frame or more, the recvmmsg * loop is interrupted, error recorded in the socket (getsockopt(SO_ERROR) and * it will be visibile in the next run. * * Need to be careful how we handle errors at this stage. * * error messages need to be handled on a per transport/protocol base * at this point we have different layers of error handling * - msg_recv < 0 -> error from this run * msg_recv = 0 -> error from previous run and error on socket needs to be cleared * - per-transport message data * example: msg[i].msg_hdr.msg_flags & MSG_NOTIFICATION or msg_len for SCTP == EOF, * but for UDP it is perfectly legal to receive a 0 bytes message.. go figure * - NOTE: on SCTP MSG_NOTIFICATION we get msg_recv == PCKT_FRAG_MAX messages and no * errno set. That means the error api needs to be able to abort the loop below. */ if (msg_recv <= 0) { transport_rx_sock_error(knet_h, transport, sockfd, msg_recv, savederrno); goto exit_unlock; } for (i = 0; i < msg_recv; i++) { err = transport_rx_is_data(knet_h, transport, sockfd, &msg[i]); /* * TODO: make this section silent once we are confident * all protocols packet handlers are good */ switch(err) { case KNET_TRANSPORT_RX_ERROR: /* on error */ log_debug(knet_h, KNET_SUB_RX, "Transport reported error parsing packet"); goto exit_unlock; break; case KNET_TRANSPORT_RX_NOT_DATA_CONTINUE: /* packet is not data and we should continue the packet process loop */ log_debug(knet_h, KNET_SUB_RX, "Transport reported no data, continue"); break; case KNET_TRANSPORT_RX_NOT_DATA_STOP: /* packet is not data and we should STOP the packet process loop */ log_debug(knet_h, KNET_SUB_RX, "Transport reported no data, stop"); goto exit_unlock; break; case KNET_TRANSPORT_RX_IS_DATA: /* packet is data and should be parsed as such */ /* * processing incoming packets vs access lists */ if ((knet_h->use_access_lists) && (transport_get_acl_type(knet_h, transport) == USE_GENERIC_ACL)) { if (!check_validate(knet_h, sockfd, transport, msg[i].msg_hdr.msg_name)) { char src_ipaddr[KNET_MAX_HOST_LEN]; char src_port[KNET_MAX_PORT_LEN]; memset(src_ipaddr, 0, KNET_MAX_HOST_LEN); memset(src_port, 0, KNET_MAX_PORT_LEN); if (knet_addrtostr(msg[i].msg_hdr.msg_name, sockaddr_len(msg[i].msg_hdr.msg_name), src_ipaddr, KNET_MAX_HOST_LEN, src_port, KNET_MAX_PORT_LEN) < 0) { log_debug(knet_h, KNET_SUB_RX, "Packet rejected: unable to resolve host/port"); } else { log_debug(knet_h, KNET_SUB_RX, "Packet rejected from %s/%s", src_ipaddr, src_port); } /* * continue processing the other packets */ continue; } } _parse_recv_from_links(knet_h, sockfd, &msg[i]); break; case KNET_TRANSPORT_RX_OOB_DATA_CONTINUE: log_debug(knet_h, KNET_SUB_RX, "Transport is processing sock OOB data, continue"); break; case KNET_TRANSPORT_RX_OOB_DATA_STOP: log_debug(knet_h, KNET_SUB_RX, "Transport has completed processing sock OOB data, stop"); goto exit_unlock; break; } } exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); } void *_handle_recv_from_links_thread(void *data) { int i, nev; knet_handle_t knet_h = (knet_handle_t) data; struct epoll_event events[KNET_EPOLL_MAX_EVENTS]; struct sockaddr_storage address[PCKT_RX_BUFS]; struct knet_mmsghdr msg[PCKT_RX_BUFS]; struct iovec iov_in[PCKT_RX_BUFS]; set_thread_status(knet_h, KNET_THREAD_RX, KNET_THREAD_STARTED); memset(&msg, 0, sizeof(msg)); memset(&events, 0, sizeof(events)); for (i = 0; i < PCKT_RX_BUFS; i++) { iov_in[i].iov_base = (void *)knet_h->recv_from_links_buf[i]; iov_in[i].iov_len = KNET_DATABUFSIZE; memset(&msg[i].msg_hdr, 0, sizeof(struct msghdr)); msg[i].msg_hdr.msg_name = &address[i]; msg[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage); msg[i].msg_hdr.msg_iov = &iov_in[i]; msg[i].msg_hdr.msg_iovlen = 1; } while (!shutdown_in_progress(knet_h)) { nev = epoll_wait(knet_h->recv_from_links_epollfd, events, KNET_EPOLL_MAX_EVENTS, knet_h->threads_timer_res / 1000); /* * the RX threads only need to notify that there has been at least * one successful run after queue flush has been requested. * See setfwd in handle.c */ if (get_thread_flush_queue(knet_h, KNET_THREAD_RX) == KNET_THREAD_QUEUE_FLUSH) { set_thread_flush_queue(knet_h, KNET_THREAD_RX, KNET_THREAD_QUEUE_FLUSHED); } /* * we use timeout to detect if thread is shutting down */ if (nev == 0) { continue; } for (i = 0; i < nev; i++) { _handle_recv_from_links(knet_h, events[i].data.fd, msg); } } set_thread_status(knet_h, KNET_THREAD_RX, KNET_THREAD_STOPPED); return NULL; } ssize_t knet_recv(knet_handle_t knet_h, char *buff, const size_t buff_len, const int8_t channel) { int savederrno = 0; ssize_t err = 0; struct iovec iov_in; if (!knet_h) { errno = EINVAL; return -1; } if (buff == NULL) { errno = EINVAL; return -1; } if (buff_len <= 0) { errno = EINVAL; return -1; } if (buff_len > KNET_MAX_PACKET_SIZE) { errno = EINVAL; return -1; } if (channel < 0) { errno = EINVAL; return -1; } if (channel >= KNET_DATAFD_MAX) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } if (!knet_h->sockfd[channel].in_use) { savederrno = EINVAL; err = -1; goto out_unlock; } memset(&iov_in, 0, sizeof(iov_in)); iov_in.iov_base = (void *)buff; iov_in.iov_len = buff_len; err = readv(knet_h->sockfd[channel].sockfd[0], &iov_in, 1); savederrno = errno; out_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = err ? savederrno : 0; return err; }