diff --git a/libknet/threads_pmtud.c b/libknet/threads_pmtud.c
index 9e433290..02945791 100644
--- a/libknet/threads_pmtud.c
+++ b/libknet/threads_pmtud.c
@@ -1,804 +1,895 @@
/*
* Copyright (C) 2015-2020 Red Hat, Inc. All rights reserved.
*
* Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
* Federico Simoncelli <fsimon@kronosnet.org>
*
* This software licensed under LGPL-2.0+
*/
#include "config.h"
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <pthread.h>
#include "crypto.h"
#include "links.h"
#include "host.h"
#include "logging.h"
#include "transports.h"
#include "threads_common.h"
#include "threads_pmtud.h"
static int _calculate_manual_mtu(knet_handle_t knet_h, struct knet_link *dst_link)
{
size_t ipproto_overhead_len; /* onwire packet overhead (protocol based) */
switch (dst_link->dst_addr.ss_family) {
case AF_INET6:
ipproto_overhead_len = KNET_PMTUD_OVERHEAD_V6 + dst_link->proto_overhead;
break;
case AF_INET:
ipproto_overhead_len = KNET_PMTUD_OVERHEAD_V4 + dst_link->proto_overhead;
break;
default:
log_debug(knet_h, KNET_SUB_PMTUD, "unknown protocol");
return 0;
break;
}
dst_link->status.mtu = calc_max_data_outlen(knet_h, knet_h->manual_mtu - ipproto_overhead_len);
return 1;
}
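/*
 * Worked example (hypothetical values, not from this diff): with
 * manual_mtu = 1500 on an IPv4 link, status.mtu becomes
 *   calc_max_data_outlen(knet_h, 1500 - ipproto_overhead_len)
 * i.e. the configured MTU minus the IP/transport overhead, further
 * reduced inside calc_max_data_outlen() by knet headers and any
 * crypto padding.
 */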
static int _handle_check_link_pmtud(knet_handle_t knet_h, struct knet_host *dst_host, struct knet_link *dst_link)
{
int err, ret, savederrno, mutex_retry_limit, failsafe, use_kernel_mtu, warn_once;
uint32_t kernel_mtu; /* record kernel_mtu from EMSGSIZE */
size_t onwire_len; /* current packet onwire size */
size_t ipproto_overhead_len; /* onwire packet overhead (protocol based) */
size_t max_mtu_len; /* max mtu for protocol */
size_t data_len; /* how much data we can send in the packet;
* generally onwire_len - ipproto_overhead_len,
* but it needs to be adjusted for crypto
*/
size_t app_mtu_len; /* real data that we can send onwire */
ssize_t len; /* len of what we were able to sendto onwire */
struct timespec ts, pmtud_crypto_start_ts, pmtud_crypto_stop_ts;
unsigned long long pong_timeout_adj_tmp, timediff;
int pmtud_crypto_reduce = 1;
unsigned char *outbuf = (unsigned char *)knet_h->pmtudbuf;
warn_once = 0;
mutex_retry_limit = 0;
failsafe = 0;
knet_h->pmtudbuf->khp_pmtud_link = dst_link->link_id;
switch (dst_link->dst_addr.ss_family) {
case AF_INET6:
max_mtu_len = KNET_PMTUD_SIZE_V6;
ipproto_overhead_len = KNET_PMTUD_OVERHEAD_V6 + dst_link->proto_overhead;
break;
case AF_INET:
max_mtu_len = KNET_PMTUD_SIZE_V4;
ipproto_overhead_len = KNET_PMTUD_OVERHEAD_V4 + dst_link->proto_overhead;
break;
default:
log_debug(knet_h, KNET_SUB_PMTUD, "PMTUD aborted, unknown protocol");
return -1;
break;
}
dst_link->last_bad_mtu = 0;
dst_link->last_good_mtu = dst_link->last_ping_size + ipproto_overhead_len;
/*
* discovery starts from the top because the kernel will
* refuse to send packets larger than the current iface mtu.
* this saves us some time and network bandwidth.
*/
onwire_len = max_mtu_len;
restart:
/*
* prevent a race when the interface mtu is changed _exactly_ during
* the discovery process, which is complex to detect. It is easier
* to wait for the next loop.
* 30 is not an arbitrary value: bisecting from 576 to 128000 doesn't
* take more than 18/19 steps.
*/
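/*
 * Sketch of the step count (illustrative arithmetic): the search
 * space is roughly 128000 - 576 = 127424 values and each probe
 * halves it, so ceil(log2(127424)) = 17 probes converge, plus the
 * initial top-size probe: ~18/19 steps. 30 leaves ample headroom.
 */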
if (failsafe == 30) {
log_err(knet_h, KNET_SUB_PMTUD,
"Aborting PMTUD process: Too many attempts. MTU might have changed during discovery.");
return -1;
} else {
failsafe++;
}
/*
* common to all packets
*/
/*
* calculate the application MTU based on current onwire_len minus ipproto_overhead_len
*/
app_mtu_len = calc_max_data_outlen(knet_h, onwire_len - ipproto_overhead_len);
/*
* recalculate the onwire len, which might differ from the initial
* guess due to data padding from the crypto layer.
*/
onwire_len = calc_data_outlen(knet_h, app_mtu_len + KNET_HEADER_ALL_SIZE) + ipproto_overhead_len;
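/*
 * Illustrative numbers (hypothetical 16-byte cipher block): if the
 * crypto layer pads payloads to a multiple of 16, an app_mtu_len of
 * 1350 could round the encrypted payload up to 1360, so onwire_len
 * must be recomputed here rather than reusing the initial guess.
 */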
/*
* calculate the size of what we need to send to sendto(2).
* see also onwire.c for packet format explanation.
*/
data_len = app_mtu_len + knet_h->sec_hash_size + knet_h->sec_salt_size + KNET_HEADER_ALL_SIZE;
if (knet_h->crypto_in_use_config) {
if (data_len < (knet_h->sec_hash_size + knet_h->sec_salt_size) + 1) {
log_debug(knet_h, KNET_SUB_PMTUD, "Aborting PMTUD process: link mtu smaller than crypto header detected (link might have been disconnected)");
return -1;
}
knet_h->pmtudbuf->khp_pmtud_size = onwire_len;
if (crypto_encrypt_and_sign(knet_h,
(const unsigned char *)knet_h->pmtudbuf,
data_len - (knet_h->sec_hash_size + knet_h->sec_salt_size),
knet_h->pmtudbuf_crypt,
(ssize_t *)&data_len) < 0) {
log_debug(knet_h, KNET_SUB_PMTUD, "Unable to crypto pmtud packet");
return -1;
}
outbuf = knet_h->pmtudbuf_crypt;
if (pthread_mutex_lock(&knet_h->handle_stats_mutex) < 0) {
log_err(knet_h, KNET_SUB_PMTUD, "Unable to get mutex lock");
return -1;
}
knet_h->stats_extra.tx_crypt_pmtu_packets++;
pthread_mutex_unlock(&knet_h->handle_stats_mutex);
} else {
knet_h->pmtudbuf->khp_pmtud_size = onwire_len;
}
/* link has gone down, aborting pmtud */
if (dst_link->status.connected != 1) {
log_debug(knet_h, KNET_SUB_PMTUD, "PMTUD detected host (%u) link (%u) has been disconnected", dst_host->host_id, dst_link->link_id);
return -1;
}
if (dst_link->transport_connected != 1) {
log_debug(knet_h, KNET_SUB_PMTUD, "PMTUD detected host (%u) link (%u) has been disconnected", dst_host->host_id, dst_link->link_id);
return -1;
}
if (pthread_mutex_lock(&knet_h->pmtud_mutex) != 0) {
log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get mutex lock");
return -1;
}
if (knet_h->pmtud_abort) {
pthread_mutex_unlock(&knet_h->pmtud_mutex);
errno = EDEADLK;
return -1;
}
savederrno = pthread_mutex_lock(&knet_h->tx_mutex);
if (savederrno) {
pthread_mutex_unlock(&knet_h->pmtud_mutex);
log_err(knet_h, KNET_SUB_PMTUD, "Unable to get TX mutex lock: %s", strerror(savederrno));
return -1;
}
savederrno = pthread_mutex_lock(&dst_link->link_stats_mutex);
if (savederrno) {
pthread_mutex_unlock(&knet_h->pmtud_mutex);
pthread_mutex_unlock(&knet_h->tx_mutex);
log_err(knet_h, KNET_SUB_PMTUD, "Unable to get stats mutex lock for host %u link %u: %s",
dst_host->host_id, dst_link->link_id, strerror(savederrno));
return -1;
}
retry:
if (transport_get_connection_oriented(knet_h, dst_link->transport) == TRANSPORT_PROTO_NOT_CONNECTION_ORIENTED) {
len = sendto(dst_link->outsock, outbuf, data_len, MSG_DONTWAIT | MSG_NOSIGNAL,
(struct sockaddr *) &dst_link->dst_addr, sizeof(struct sockaddr_storage));
} else {
len = sendto(dst_link->outsock, outbuf, data_len, MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0);
}
savederrno = errno;
/*
* we cannot hold a lock on kmtu_mutex between resetting
* knet_h->kernel_mtu here and below where it's used.
* use_kernel_mtu tells us if the knet_h->kernel_mtu was
* set to 0 and we can trust its value later.
*/
use_kernel_mtu = 0;
if (pthread_mutex_lock(&knet_h->kmtu_mutex) == 0) {
use_kernel_mtu = 1;
knet_h->kernel_mtu = 0;
pthread_mutex_unlock(&knet_h->kmtu_mutex);
}
kernel_mtu = 0;
err = transport_tx_sock_error(knet_h, dst_link->transport, dst_link->outsock, len, savederrno);
switch(err) {
case -1: /* unrecoverable error */
log_debug(knet_h, KNET_SUB_PMTUD, "Unable to send pmtu packet (sendto): %d %s", savederrno, strerror(savederrno));
pthread_mutex_unlock(&knet_h->tx_mutex);
pthread_mutex_unlock(&knet_h->pmtud_mutex);
dst_link->status.stats.tx_pmtu_errors++;
pthread_mutex_unlock(&dst_link->link_stats_mutex);
return -1;
case 0: /* ignore error and continue */
break;
case 1: /* retry to send those same data */
dst_link->status.stats.tx_pmtu_retries++;
goto retry;
break;
}
pthread_mutex_unlock(&knet_h->tx_mutex);
if (len != (ssize_t )data_len) {
pthread_mutex_unlock(&dst_link->link_stats_mutex);
if (savederrno == EMSGSIZE) {
/*
* we cannot hold a lock on kmtu_mutex between resetting
* knet_h->kernel_mtu and here.
* use_kernel_mtu tells us if the knet_h->kernel_mtu was
* set to 0 previously and we can trust its value now.
*/
if (use_kernel_mtu) {
use_kernel_mtu = 0;
if (pthread_mutex_lock(&knet_h->kmtu_mutex) == 0) {
kernel_mtu = knet_h->kernel_mtu;
pthread_mutex_unlock(&knet_h->kmtu_mutex);
}
}
if (kernel_mtu > 0) {
dst_link->last_bad_mtu = kernel_mtu + 1;
} else {
dst_link->last_bad_mtu = onwire_len;
}
} else {
log_debug(knet_h, KNET_SUB_PMTUD, "Unable to send pmtu packet len: %zu err: %s", onwire_len, strerror(savederrno));
}
} else {
dst_link->last_sent_mtu = onwire_len;
dst_link->last_recv_mtu = 0;
dst_link->status.stats.tx_pmtu_packets++;
dst_link->status.stats.tx_pmtu_bytes += data_len;
pthread_mutex_unlock(&dst_link->link_stats_mutex);
if (clock_gettime(CLOCK_REALTIME, &ts) < 0) {
log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get current time: %s", strerror(errno));
pthread_mutex_unlock(&knet_h->pmtud_mutex);
return -1;
}
/*
* non fatal, we can wait for the next round to reduce the
* multiplier
*/
if (clock_gettime(CLOCK_MONOTONIC, &pmtud_crypto_start_ts) < 0) {
log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get current time: %s", strerror(errno));
pmtud_crypto_reduce = 0;
}
/*
* set PMTUd reply timeout to match pong_timeout on a given link
*
* math: internally pong_timeout is expressed in microseconds, while
* the public API exports milliseconds. So be careful with the zeros here.
* the loop is necessary because we grab the current time just above
* and add values to it that could overflow into the seconds field.
*/
if (pthread_mutex_lock(&knet_h->backoff_mutex)) {
log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get backoff_mutex");
pthread_mutex_unlock(&knet_h->pmtud_mutex);
return -1;
}
if (knet_h->crypto_in_use_config) {
/*
* crypto, under pressure, is a royal PITA
*/
pong_timeout_adj_tmp = dst_link->pong_timeout_adj * dst_link->pmtud_crypto_timeout_multiplier;
} else {
pong_timeout_adj_tmp = dst_link->pong_timeout_adj;
}
ts.tv_sec += pong_timeout_adj_tmp / 1000000;
ts.tv_nsec += (((pong_timeout_adj_tmp) % 1000000) * 1000);
while (ts.tv_nsec > 1000000000) {
ts.tv_sec += 1;
ts.tv_nsec -= 1000000000;
}
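/*
 * Example conversion (illustrative): pong_timeout_adj_tmp = 2500000us
 * adds 2 full seconds (2500000 / 1000000) and 500000000ns
 * ((2500000 % 1000000) * 1000); the loop above folds any nsec
 * overflow back into tv_sec.
 */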
pthread_mutex_unlock(&knet_h->backoff_mutex);
knet_h->pmtud_waiting = 1;
ret = pthread_cond_timedwait(&knet_h->pmtud_cond, &knet_h->pmtud_mutex, &ts);
knet_h->pmtud_waiting = 0;
if (knet_h->pmtud_abort) {
pthread_mutex_unlock(&knet_h->pmtud_mutex);
errno = EDEADLK;
return -1;
}
/*
* we cannot use shutdown_in_progress in here because
* we already hold the read lock
*/
if (knet_h->fini_in_progress) {
pthread_mutex_unlock(&knet_h->pmtud_mutex);
log_debug(knet_h, KNET_SUB_PMTUD, "PMTUD aborted. shutdown in progress");
return -1;
}
if (ret) {
if (ret == ETIMEDOUT) {
if ((knet_h->crypto_in_use_config) && (dst_link->pmtud_crypto_timeout_multiplier < KNET_LINK_PMTUD_CRYPTO_TIMEOUT_MULTIPLIER_MAX)) {
dst_link->pmtud_crypto_timeout_multiplier = dst_link->pmtud_crypto_timeout_multiplier * 2;
pmtud_crypto_reduce = 0;
log_debug(knet_h, KNET_SUB_PMTUD,
"Increasing PMTUd response timeout multiplier to (%u) for host %u link: %u",
dst_link->pmtud_crypto_timeout_multiplier,
dst_host->host_id,
dst_link->link_id);
pthread_mutex_unlock(&knet_h->pmtud_mutex);
goto restart;
}
if (!warn_once) {
log_warn(knet_h, KNET_SUB_PMTUD,
"possible MTU misconfiguration detected. "
"kernel is reporting MTU: %u bytes for "
"host %u link %u but the other node is "
"not acknowledging packets of this size. ",
dst_link->last_sent_mtu,
dst_host->host_id,
dst_link->link_id);
log_warn(knet_h, KNET_SUB_PMTUD,
"This can be caused by this node's interface MTU "
"being too big, by a network device that does not "
"support or has been misconfigured to manage MTUs "
"of this size, or by packet loss. knet will continue "
"to run but performance might be affected.");
warn_once = 1;
}
} else {
pthread_mutex_unlock(&knet_h->pmtud_mutex);
if (mutex_retry_limit == 3) {
log_debug(knet_h, KNET_SUB_PMTUD, "PMTUD aborted, unable to get mutex lock");
return -1;
}
mutex_retry_limit++;
goto restart;
}
}
if ((knet_h->crypto_in_use_config) && (pmtud_crypto_reduce == 1) &&
(dst_link->pmtud_crypto_timeout_multiplier > KNET_LINK_PMTUD_CRYPTO_TIMEOUT_MULTIPLIER_MIN)) {
if (!clock_gettime(CLOCK_MONOTONIC, &pmtud_crypto_stop_ts)) {
timespec_diff(pmtud_crypto_start_ts, pmtud_crypto_stop_ts, &timediff);
if (((pong_timeout_adj_tmp * 1000) / 2) > timediff) {
dst_link->pmtud_crypto_timeout_multiplier = dst_link->pmtud_crypto_timeout_multiplier / 2;
log_debug(knet_h, KNET_SUB_PMTUD,
"Decreasing PMTUd response timeout multiplier to (%u) for host %u link: %u",
dst_link->pmtud_crypto_timeout_multiplier,
dst_host->host_id,
dst_link->link_id);
}
} else {
log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get current time: %s", strerror(errno));
}
}
if ((dst_link->last_recv_mtu != onwire_len) || (ret)) {
dst_link->last_bad_mtu = onwire_len;
} else {
int found_mtu = 0;
if (knet_h->sec_block_size) {
if ((onwire_len + knet_h->sec_block_size >= max_mtu_len) ||
((dst_link->last_bad_mtu) && (dst_link->last_bad_mtu <= (onwire_len + knet_h->sec_block_size)))) {
found_mtu = 1;
}
} else {
if ((onwire_len == max_mtu_len) ||
((dst_link->last_bad_mtu) && (dst_link->last_bad_mtu == (onwire_len + 1))) ||
(dst_link->last_bad_mtu == dst_link->last_good_mtu)) {
found_mtu = 1;
}
}
if (found_mtu) {
/*
* account for IP overhead, knet headers and crypto in PMTU calculation
*/
dst_link->status.mtu = calc_max_data_outlen(knet_h, onwire_len - ipproto_overhead_len);
pthread_mutex_unlock(&knet_h->pmtud_mutex);
return 0;
}
dst_link->last_good_mtu = onwire_len;
}
}
if (kernel_mtu) {
onwire_len = kernel_mtu;
} else {
onwire_len = (dst_link->last_good_mtu + dst_link->last_bad_mtu) / 2;
}
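/*
 * Classic bisection (illustrative values): with last_good_mtu = 1000
 * and last_bad_mtu = 9000 the next probe is (1000 + 9000) / 2 = 5000,
 * unless the kernel already told us the exact path MTU via EMSGSIZE.
 */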
pthread_mutex_unlock(&knet_h->pmtud_mutex);
goto restart;
}
static int _handle_check_pmtud(knet_handle_t knet_h, struct knet_host *dst_host, struct knet_link *dst_link, int force_run)
{
uint8_t saved_valid_pmtud;
unsigned int saved_pmtud;
struct timespec clock_now;
unsigned long long diff_pmtud, interval;
if (clock_gettime(CLOCK_MONOTONIC, &clock_now) != 0) {
log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get monotonic clock");
return 0;
}
if (!force_run) {
interval = knet_h->pmtud_interval * 1000000000llu; /* nanoseconds */
timespec_diff(dst_link->pmtud_last, clock_now, &diff_pmtud);
if (diff_pmtud < interval) {
return dst_link->has_valid_mtu;
}
}
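/*
 * Example (illustrative): with pmtud_interval = 60 the threshold is
 * 60 * 1000000000 = 60000000000ns, so a non-forced run is skipped
 * unless at least one minute has passed since pmtud_last.
 */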
/*
* status.proto_overhead should include all IP/(UDP|SCTP)/knet headers
*
* please note that it is not the same as link->proto_overhead that
* includes only either UDP or SCTP (at the moment) overhead.
*/
switch (dst_link->dst_addr.ss_family) {
case AF_INET6:
dst_link->status.proto_overhead = KNET_PMTUD_OVERHEAD_V6 + dst_link->proto_overhead + KNET_HEADER_ALL_SIZE + knet_h->sec_hash_size + knet_h->sec_salt_size;
break;
case AF_INET:
dst_link->status.proto_overhead = KNET_PMTUD_OVERHEAD_V4 + dst_link->proto_overhead + KNET_HEADER_ALL_SIZE + knet_h->sec_hash_size + knet_h->sec_salt_size;
break;
}
saved_pmtud = dst_link->status.mtu;
saved_valid_pmtud = dst_link->has_valid_mtu;
log_debug(knet_h, KNET_SUB_PMTUD, "Starting PMTUD for host: %u link: %u", dst_host->host_id, dst_link->link_id);
errno = 0;
if (_handle_check_link_pmtud(knet_h, dst_host, dst_link) < 0) {
if (errno == EDEADLK) {
log_debug(knet_h, KNET_SUB_PMTUD, "PMTUD for host: %u link: %u has been rescheduled", dst_host->host_id, dst_link->link_id);
dst_link->status.mtu = saved_pmtud;
dst_link->has_valid_mtu = saved_valid_pmtud;
errno = EDEADLK;
return dst_link->has_valid_mtu;
}
dst_link->has_valid_mtu = 0;
} else {
if (dst_link->status.mtu < calc_min_mtu(knet_h)) {
log_info(knet_h, KNET_SUB_PMTUD,
"Invalid MTU detected for host: %u link: %u mtu: %u",
dst_host->host_id, dst_link->link_id, dst_link->status.mtu);
dst_link->has_valid_mtu = 0;
} else {
dst_link->has_valid_mtu = 1;
}
if (dst_link->has_valid_mtu) {
if ((saved_pmtud) && (saved_pmtud != dst_link->status.mtu)) {
log_info(knet_h, KNET_SUB_PMTUD, "PMTUD link change for host: %u link: %u from %u to %u",
dst_host->host_id, dst_link->link_id, saved_pmtud, dst_link->status.mtu);
}
log_debug(knet_h, KNET_SUB_PMTUD, "PMTUD completed for host: %u link: %u current link mtu: %u",
dst_host->host_id, dst_link->link_id, dst_link->status.mtu);
/*
* set pmtud_last, if we can, after we are done with the PMTUd process
* because it can take a very long time.
*/
dst_link->pmtud_last = clock_now;
if (!clock_gettime(CLOCK_MONOTONIC, &clock_now)) {
dst_link->pmtud_last = clock_now;
}
}
}
if (saved_valid_pmtud != dst_link->has_valid_mtu) {
_host_dstcache_update_async(knet_h, dst_host);
}
return dst_link->has_valid_mtu;
}
void *_handle_pmtud_link_thread(void *data)
{
knet_handle_t knet_h = (knet_handle_t) data;
struct knet_host *dst_host;
struct knet_link *dst_link;
int link_idx;
unsigned int have_mtu;
unsigned int lower_mtu;
int link_has_mtu;
int force_run = 0;
set_thread_status(knet_h, KNET_THREAD_PMTUD, KNET_THREAD_STARTED);
knet_h->data_mtu = calc_min_mtu(knet_h);
/* preparing pmtu buffer */
knet_h->pmtudbuf->kh_version = KNET_HEADER_VERSION;
knet_h->pmtudbuf->kh_type = KNET_HEADER_TYPE_PMTUD;
knet_h->pmtudbuf->kh_node = htons(knet_h->host_id);
while (!shutdown_in_progress(knet_h)) {
usleep(knet_h->threads_timer_res);
if (pthread_mutex_lock(&knet_h->pmtud_mutex) != 0) {
log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get mutex lock");
continue;
}
knet_h->pmtud_abort = 0;
knet_h->pmtud_running = 1;
force_run = knet_h->pmtud_forcerun;
knet_h->pmtud_forcerun = 0;
pthread_mutex_unlock(&knet_h->pmtud_mutex);
if (force_run) {
log_debug(knet_h, KNET_SUB_PMTUD, "PMTUd request to rerun has been received");
}
if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) {
log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get read lock");
continue;
}
lower_mtu = KNET_PMTUD_SIZE_V4;
have_mtu = 0;
for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) {
for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) {
dst_link = &dst_host->link[link_idx];
if ((dst_link->status.enabled != 1) ||
(dst_link->status.connected != 1) ||
(dst_host->link[link_idx].transport == KNET_TRANSPORT_LOOPBACK) ||
(!dst_link->last_ping_size) ||
((dst_link->dynamic == KNET_LINK_DYNIP) &&
(dst_link->status.dynconnected != 1)))
continue;
if (!knet_h->manual_mtu) {
link_has_mtu = _handle_check_pmtud(knet_h, dst_host, dst_link, force_run);
if (errno == EDEADLK) {
goto out_unlock;
}
if (link_has_mtu) {
have_mtu = 1;
if (dst_link->status.mtu < lower_mtu) {
lower_mtu = dst_link->status.mtu;
}
}
} else {
link_has_mtu = _calculate_manual_mtu(knet_h, dst_link);
if (link_has_mtu) {
have_mtu = 1;
if (dst_link->status.mtu < lower_mtu) {
lower_mtu = dst_link->status.mtu;
}
}
}
}
}
if (have_mtu) {
if (knet_h->data_mtu != lower_mtu) {
knet_h->data_mtu = lower_mtu;
log_info(knet_h, KNET_SUB_PMTUD, "Global data MTU changed to: %u", knet_h->data_mtu);
if (knet_h->pmtud_notify_fn) {
knet_h->pmtud_notify_fn(knet_h->pmtud_notify_fn_private_data,
knet_h->data_mtu);
}
}
}
out_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
if (pthread_mutex_lock(&knet_h->pmtud_mutex) != 0) {
log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get mutex lock");
} else {
knet_h->pmtud_running = 0;
pthread_mutex_unlock(&knet_h->pmtud_mutex);
}
}
set_thread_status(knet_h, KNET_THREAD_PMTUD, KNET_THREAD_STOPPED);
return NULL;
}
+static void send_pmtud_reply(knet_handle_t knet_h, struct knet_link *src_link, struct knet_header *inbuf)
+{
+ int err = 0, savederrno = 0, stats_err = 0;
+ unsigned char *outbuf = (unsigned char *)inbuf;
+ ssize_t len, outlen;
+
+ outlen = KNET_HEADER_PMTUD_SIZE;
+ inbuf->kh_type = KNET_HEADER_TYPE_PMTUD_REPLY;
+ inbuf->kh_node = htons(knet_h->host_id);
+
+ if (knet_h->crypto_in_use_config) {
+ if (crypto_encrypt_and_sign(knet_h,
+ (const unsigned char *)inbuf,
+ outlen,
+ knet_h->recv_from_links_buf_crypt,
+ &outlen) < 0) {
+ log_debug(knet_h, KNET_SUB_RX, "Unable to encrypt PMTUd reply packet");
+ return;
+ }
+ outbuf = knet_h->recv_from_links_buf_crypt;
+ stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex);
+ if (stats_err < 0) {
+ log_err(knet_h, KNET_SUB_RX, "Unable to get mutex lock: %s", strerror(stats_err));
+ return;
+ }
+ knet_h->stats_extra.tx_crypt_pmtu_reply_packets++;
+ pthread_mutex_unlock(&knet_h->handle_stats_mutex);
+ }
+
+ savederrno = pthread_mutex_lock(&knet_h->tx_mutex);
+ if (savederrno) {
+ log_err(knet_h, KNET_SUB_RX, "Unable to get TX mutex lock: %s", strerror(savederrno));
+ return;
+ }
+retry_pmtud:
+ if (src_link->transport_connected) {
+ if (transport_get_connection_oriented(knet_h, src_link->transport) == TRANSPORT_PROTO_NOT_CONNECTION_ORIENTED) {
+ len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL,
+ (struct sockaddr *) &src_link->dst_addr, sizeof(struct sockaddr_storage));
+ } else {
+ len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0);
+ }
+ savederrno = errno;
+ if (len != outlen) {
+ err = transport_tx_sock_error(knet_h, src_link->transport, src_link->outsock, len, savederrno);
+ stats_err = pthread_mutex_lock(&src_link->link_stats_mutex);
+ if (stats_err < 0) {
+ log_err(knet_h, KNET_SUB_RX, "Unable to get mutex lock: %s", strerror(stats_err));
+ return;
+ }
+ switch(err) {
+ case -1: /* unrecoverable error */
+ log_debug(knet_h, KNET_SUB_RX,
+ "Unable to send PMTUd reply (sock: %d) packet (sendto): %d %s. recorded src ip: %s src port: %s dst ip: %s dst port: %s",
+ src_link->outsock, errno, strerror(errno),
+ src_link->status.src_ipaddr, src_link->status.src_port,
+ src_link->status.dst_ipaddr, src_link->status.dst_port);
+
+ src_link->status.stats.tx_pmtu_errors++;
+ break;
+ case 0: /* ignore error and continue */
+ src_link->status.stats.tx_pmtu_errors++;
+ break;
+ case 1: /* retry to send those same data */
+ src_link->status.stats.tx_pmtu_retries++;
+ pthread_mutex_unlock(&src_link->link_stats_mutex);
+ goto retry_pmtud;
+ break;
+ }
+ pthread_mutex_unlock(&src_link->link_stats_mutex);
+ }
+ }
+ pthread_mutex_unlock(&knet_h->tx_mutex);
+}
+
+void process_pmtud(knet_handle_t knet_h, struct knet_link *src_link, struct knet_header *inbuf)
+{
+ send_pmtud_reply(knet_h, src_link, inbuf);
+}
+
+void process_pmtud_reply(knet_handle_t knet_h, struct knet_link *src_link, struct knet_header *inbuf)
+{
+ if (pthread_mutex_lock(&knet_h->pmtud_mutex) != 0) {
+ log_debug(knet_h, KNET_SUB_RX, "Unable to get mutex lock");
+ return;
+ }
+ src_link->last_recv_mtu = inbuf->khp_pmtud_size;
+ pthread_cond_signal(&knet_h->pmtud_cond);
+ pthread_mutex_unlock(&knet_h->pmtud_mutex);
+}
+
int knet_handle_pmtud_getfreq(knet_handle_t knet_h, unsigned int *interval)
{
int savederrno = 0;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (!interval) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
*interval = knet_h->pmtud_interval;
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = 0;
return 0;
}
int knet_handle_pmtud_setfreq(knet_handle_t knet_h, unsigned int interval)
{
int savederrno = 0;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if ((!interval) || (interval > 86400)) {
errno = EINVAL;
return -1;
}
savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
knet_h->pmtud_interval = interval;
log_debug(knet_h, KNET_SUB_HANDLE, "PMTUd interval set to: %u seconds", interval);
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = 0;
return 0;
}
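/*
 * Usage sketch (hypothetical caller code, not part of this diff):
 *
 *   unsigned int freq;
 *   if (knet_handle_pmtud_setfreq(knet_h, 60) < 0) {
 *       perror("knet_handle_pmtud_setfreq");
 *   }
 *   if (knet_handle_pmtud_getfreq(knet_h, &freq) == 0) {
 *       printf("PMTUd runs every %u seconds\n", freq);
 *   }
 */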
int knet_handle_enable_pmtud_notify(knet_handle_t knet_h,
void *pmtud_notify_fn_private_data,
void (*pmtud_notify_fn) (
void *private_data,
unsigned int data_mtu))
{
int savederrno = 0;
if (!knet_h) {
errno = EINVAL;
return -1;
}
savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
knet_h->pmtud_notify_fn_private_data = pmtud_notify_fn_private_data;
knet_h->pmtud_notify_fn = pmtud_notify_fn;
if (knet_h->pmtud_notify_fn) {
log_debug(knet_h, KNET_SUB_HANDLE, "pmtud_notify_fn enabled");
} else {
log_debug(knet_h, KNET_SUB_HANDLE, "pmtud_notify_fn disabled");
}
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = 0;
return 0;
}
int knet_handle_pmtud_set(knet_handle_t knet_h,
unsigned int iface_mtu)
{
int savederrno = 0;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (iface_mtu > KNET_PMTUD_SIZE_V4) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_PMTUD, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
log_info(knet_h, KNET_SUB_PMTUD, "MTU manually set to: %u", iface_mtu);
knet_h->manual_mtu = iface_mtu;
force_pmtud_run(knet_h, KNET_SUB_PMTUD, 0);
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = 0;
return 0;
}
int knet_handle_pmtud_get(knet_handle_t knet_h,
unsigned int *data_mtu)
{
int savederrno = 0;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (!data_mtu) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
*data_mtu = knet_h->data_mtu;
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = 0;
return 0;
}
diff --git a/libknet/threads_pmtud.h b/libknet/threads_pmtud.h
index c2c2c7b7..de12ea33 100644
--- a/libknet/threads_pmtud.h
+++ b/libknet/threads_pmtud.h
@@ -1,15 +1,18 @@
/*
* Copyright (C) 2012-2020 Red Hat, Inc. All rights reserved.
*
* Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
* Federico Simoncelli <fsimon@kronosnet.org>
*
* This software licensed under LGPL-2.0+
*/
#ifndef __KNET_THREADS_PMTUD_H__
#define __KNET_THREADS_PMTUD_H__
void *_handle_pmtud_link_thread(void *data);
+void process_pmtud(knet_handle_t knet_h, struct knet_link *src_link, struct knet_header *inbuf);
+void process_pmtud_reply(knet_handle_t knet_h, struct knet_link *src_link, struct knet_header *inbuf);
+
#endif
diff --git a/libknet/threads_rx.c b/libknet/threads_rx.c
index 0467b4dd..b43ca14e 100644
--- a/libknet/threads_rx.c
+++ b/libknet/threads_rx.c
@@ -1,892 +1,816 @@
/*
* Copyright (C) 2012-2020 Red Hat, Inc. All rights reserved.
*
* Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
* Federico Simoncelli <fsimon@kronosnet.org>
*
* This software licensed under LGPL-2.0+
*/
#include "config.h"
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <sys/uio.h>
#include <pthread.h>
#include "compat.h"
#include "compress.h"
#include "crypto.h"
#include "host.h"
#include "links.h"
#include "links_acl.h"
#include "logging.h"
#include "transports.h"
#include "transport_common.h"
#include "threads_common.h"
#include "threads_heartbeat.h"
+#include "threads_pmtud.h"
#include "threads_rx.h"
#include "netutils.h"
/*
* RECV
*/
/*
* return 1 if a > b
* return -1 if b > a
* return 0 if they are equal
*/
static inline int timecmp(struct timespec a, struct timespec b)
{
if (a.tv_sec != b.tv_sec) {
if (a.tv_sec > b.tv_sec) {
return 1;
} else {
return -1;
}
} else {
if (a.tv_nsec > b.tv_nsec) {
return 1;
} else if (a.tv_nsec < b.tv_nsec) {
return -1;
} else {
return 0;
}
}
}
/*
* this function needs to return an index (0 to 7)
* into a knet_host_defrag_buf (-1 on error)
*/
static int find_pckt_defrag_buf(knet_handle_t knet_h, struct knet_header *inbuf)
{
struct knet_host *src_host = knet_h->host_index[inbuf->kh_node];
int i, oldest;
/*
* check if there is a buffer already in use handling the same seq_num
*/
for (i = 0; i < KNET_MAX_LINK; i++) {
if (src_host->defrag_buf[i].in_use) {
if (src_host->defrag_buf[i].pckt_seq == inbuf->khp_data_seq_num) {
return i;
}
}
}
/*
* If there is no buffer that's handling the current seq_num
* either it's new or it's been reclaimed already.
* check if it's been reclaimed/seen before using the defrag circular
* buffer. If the pckt has been seen before, the buffer expired (ETIME)
* and there is no point in trying to defrag it again.
*/
if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 1, 0)) {
errno = ETIME;
return -1;
}
/*
* register the pckt as seen
*/
_seq_num_set(src_host, inbuf->khp_data_seq_num, 1);
/*
* see if there is a free buffer
*/
for (i = 0; i < KNET_MAX_LINK; i++) {
if (!src_host->defrag_buf[i].in_use) {
return i;
}
}
/*
* at this point, there are no free buffers, the pckt is new
* and we need to reclaim a buffer, and we will take the one
* with the oldest timestamp. It's as good as any.
*/
oldest = 0;
for (i = 0; i < KNET_MAX_LINK; i++) {
if (timecmp(src_host->defrag_buf[i].last_update, src_host->defrag_buf[oldest].last_update) < 0) {
oldest = i;
}
}
src_host->defrag_buf[oldest].in_use = 0;
return oldest;
}
static int pckt_defrag(knet_handle_t knet_h, struct knet_header *inbuf, ssize_t *len)
{
struct knet_host_defrag_buf *defrag_buf;
int defrag_buf_idx;
defrag_buf_idx = find_pckt_defrag_buf(knet_h, inbuf);
if (defrag_buf_idx < 0) {
return 1;
}
defrag_buf = &knet_h->host_index[inbuf->kh_node]->defrag_buf[defrag_buf_idx];
/*
* if the buf is not in use, then make sure it's clean
*/
if (!defrag_buf->in_use) {
memset(defrag_buf, 0, sizeof(struct knet_host_defrag_buf));
defrag_buf->in_use = 1;
defrag_buf->pckt_seq = inbuf->khp_data_seq_num;
}
/*
* update timestamp on the buffer
*/
clock_gettime(CLOCK_MONOTONIC, &defrag_buf->last_update);
/*
* check if we already received this fragment
*/
if (defrag_buf->frag_map[inbuf->khp_data_frag_seq]) {
/*
* if we have received this fragment and we didn't clear the buffer
* it means that we don't have all fragments yet
*/
return 1;
}
/*
* we need to handle the last packet with gloves due to its different size
*/
if (inbuf->khp_data_frag_seq == inbuf->khp_data_frag_num) {
defrag_buf->last_frag_size = *len;
/*
* in the event that the last packet arrives first,
* we still don't know the offset vs the other fragments (based on MTU),
* so we store the fragment at the end of the buffer where it's safe
* and take a copy of the len so that we can restore its offset later.
* remember we can't use the local MTU for this calculation because pMTU
* can be asymmetric between the same hosts.
*/
if (!defrag_buf->frag_size) {
defrag_buf->last_first = 1;
memmove(defrag_buf->buf + (KNET_MAX_PACKET_SIZE - *len),
inbuf->khp_data_userdata,
*len);
}
} else {
defrag_buf->frag_size = *len;
}
if (defrag_buf->frag_size) {
memmove(defrag_buf->buf + ((inbuf->khp_data_frag_seq - 1) * defrag_buf->frag_size),
inbuf->khp_data_userdata, *len);
}
defrag_buf->frag_recv++;
defrag_buf->frag_map[inbuf->khp_data_frag_seq] = 1;
/*
* check if we received all the fragments
*/
if (defrag_buf->frag_recv == inbuf->khp_data_frag_num) {
/*
* special case the last pckt
*/
if (defrag_buf->last_first) {
memmove(defrag_buf->buf + ((inbuf->khp_data_frag_num - 1) * defrag_buf->frag_size),
defrag_buf->buf + (KNET_MAX_PACKET_SIZE - defrag_buf->last_frag_size),
defrag_buf->last_frag_size);
}
/*
* recalculate packet length
*/
*len = ((inbuf->khp_data_frag_num - 1) * defrag_buf->frag_size) + defrag_buf->last_frag_size;
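/*
 * Reassembly arithmetic (illustrative values): with
 * khp_data_frag_num = 4, frag_size = 1400 and last_frag_size = 200,
 * the packet length is (4 - 1) * 1400 + 200 = 4400 bytes. Fragments
 * 1..3 sit at offsets 0, 1400 and 2800; the last fragment, parked at
 * the tail of the buffer if it arrived first, is moved into place
 * just above.
 */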
/*
* copy the pckt back in the user data
*/
memmove(inbuf->khp_data_userdata, defrag_buf->buf, *len);
/*
* free this buffer
*/
defrag_buf->in_use = 0;
return 0;
}
return 1;
}
static void _parse_recv_from_links(knet_handle_t knet_h, int sockfd, const struct knet_mmsghdr *msg)
{
int err = 0, savederrno = 0, stats_err = 0;
ssize_t outlen;
struct knet_host *src_host;
struct knet_link *src_link;
knet_node_id_t dst_host_ids[KNET_MAX_HOST];
size_t dst_host_ids_entries = 0;
int bcast = 1;
uint64_t decrypt_time = 0;
struct knet_header *inbuf = msg->msg_hdr.msg_iov->iov_base;
- unsigned char *outbuf = (unsigned char *)msg->msg_hdr.msg_iov->iov_base;
ssize_t len = msg->msg_len;
struct iovec iov_out[1];
int8_t channel;
int try_decrypt = 0, decrypted = 0, i, found_link = 0;
for (i = 1; i <= KNET_MAX_CRYPTO_INSTANCES; i++) {
if (knet_h->crypto_instance[i]) {
try_decrypt = 1;
break;
}
}
if ((!try_decrypt) && (knet_h->crypto_only == KNET_CRYPTO_RX_DISALLOW_CLEAR_TRAFFIC)) {
log_debug(knet_h, KNET_SUB_RX, "RX thread configured to accept only crypto packets, but no crypto configs are configured!");
return;
}
if (try_decrypt) {
struct timespec start_time;
struct timespec end_time;
clock_gettime(CLOCK_MONOTONIC, &start_time);
if (crypto_authenticate_and_decrypt(knet_h,
(unsigned char *)inbuf,
len,
knet_h->recv_from_links_buf_decrypt,
&outlen) < 0) {
log_debug(knet_h, KNET_SUB_RX, "Unable to decrypt/auth packet");
if (knet_h->crypto_only == KNET_CRYPTO_RX_DISALLOW_CLEAR_TRAFFIC) {
return;
}
log_debug(knet_h, KNET_SUB_RX, "Attempting to process packet as clear data");
} else {
clock_gettime(CLOCK_MONOTONIC, &end_time);
timespec_diff(start_time, end_time, &decrypt_time);
len = outlen;
inbuf = (struct knet_header *)knet_h->recv_from_links_buf_decrypt;
decrypted = 1;
}
}
if (len < (ssize_t)(KNET_HEADER_SIZE + 1)) {
log_debug(knet_h, KNET_SUB_RX, "Packet is too short: %ld", (long)len);
return;
}
if (inbuf->kh_version != KNET_HEADER_VERSION) {
log_debug(knet_h, KNET_SUB_RX, "Packet version does not match");
return;
}
inbuf->kh_node = ntohs(inbuf->kh_node);
src_host = knet_h->host_index[inbuf->kh_node];
if (src_host == NULL) { /* host not found */
log_debug(knet_h, KNET_SUB_RX, "Unable to find source host for this packet");
return;
}
if ((inbuf->kh_type & KNET_HEADER_TYPE_PMSK) != 0) {
/* be aware this works only for PING / PONG and PMTUd packets! */
src_link = src_host->link +
(inbuf->khp_ping_link % KNET_MAX_LINK);
if (src_link->dynamic == KNET_LINK_DYNIP) {
if (cmpaddr(&src_link->dst_addr, msg->msg_hdr.msg_name) != 0) {
log_debug(knet_h, KNET_SUB_RX, "host: %u link: %u appears to have changed ip address",
src_host->host_id, src_link->link_id);
memmove(&src_link->dst_addr, msg->msg_hdr.msg_name, sizeof(struct sockaddr_storage));
if (knet_addrtostr(&src_link->dst_addr, sockaddr_len(&src_link->dst_addr),
src_link->status.dst_ipaddr, KNET_MAX_HOST_LEN,
src_link->status.dst_port, KNET_MAX_PORT_LEN) != 0) {
log_debug(knet_h, KNET_SUB_RX, "Unable to resolve ???");
snprintf(src_link->status.dst_ipaddr, KNET_MAX_HOST_LEN - 1, "Unknown!!!");
snprintf(src_link->status.dst_port, KNET_MAX_PORT_LEN - 1, "??");
} else {
log_info(knet_h, KNET_SUB_RX,
"host: %u link: %u new connection established from: %s %s",
src_host->host_id, src_link->link_id,
src_link->status.dst_ipaddr, src_link->status.dst_port);
}
}
/*
* transport has already accepted the connection here
* otherwise we would not be receiving packets
*/
transport_link_dyn_connect(knet_h, sockfd, src_link);
}
} else { /* data packet */
for (i = 0; i < KNET_MAX_LINK; i++) {
src_link = &src_host->link[i];
if (cmpaddr(&src_link->dst_addr, msg->msg_hdr.msg_name) == 0) {
found_link = 1;
break;
}
}
if (!found_link) {
log_debug(knet_h, KNET_SUB_RX, "Unable to determine source link for data packet. Discarding packet.");
return;
}
}
stats_err = pthread_mutex_lock(&src_link->link_stats_mutex);
if (stats_err) {
log_err(knet_h, KNET_SUB_RX, "Unable to get stats mutex lock for host %u link %u: %s",
src_host->host_id, src_link->link_id, strerror(savederrno));
return;
}
switch (inbuf->kh_type) {
case KNET_HEADER_TYPE_DATA:
/* data stats at the top for consistency with TX */
src_link->status.stats.rx_data_packets++;
src_link->status.stats.rx_data_bytes += len;
if (decrypted) {
stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex);
if (stats_err < 0) {
pthread_mutex_unlock(&src_link->link_stats_mutex);
log_err(knet_h, KNET_SUB_RX, "Unable to get mutex lock: %s", strerror(stats_err));
return;
}
/* Only update the crypto overhead for data packets. Mainly to be
consistent with TX */
if (decrypt_time < knet_h->stats.rx_crypt_time_min) {
knet_h->stats.rx_crypt_time_min = decrypt_time;
}
if (decrypt_time > knet_h->stats.rx_crypt_time_max) {
knet_h->stats.rx_crypt_time_max = decrypt_time;
}
knet_h->stats.rx_crypt_time_ave =
(knet_h->stats.rx_crypt_time_ave * knet_h->stats.rx_crypt_packets +
decrypt_time) / (knet_h->stats.rx_crypt_packets+1);
knet_h->stats.rx_crypt_packets++;
pthread_mutex_unlock(&knet_h->handle_stats_mutex);
}
if (!src_host->status.reachable) {
pthread_mutex_unlock(&src_link->link_stats_mutex);
log_debug(knet_h, KNET_SUB_RX, "Source host %u not reachable yet. Discarding packet.", src_host->host_id);
return;
}
inbuf->khp_data_seq_num = ntohs(inbuf->khp_data_seq_num);
channel = inbuf->khp_data_channel;
src_host->got_data = 1;
if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 0, 0)) {
pthread_mutex_unlock(&src_link->link_stats_mutex);
if (src_host->link_handler_policy != KNET_LINK_POLICY_ACTIVE) {
log_debug(knet_h, KNET_SUB_RX, "Packet has already been delivered");
}
return;
}
if (inbuf->khp_data_frag_num > 1) {
/*
* len as received from the socket also includes extra stuff
* that the defrag code doesn't care about. So strip it
* here and re-add it only for repadding once we are done
* defragging
*/
len = len - KNET_HEADER_DATA_SIZE;
if (pckt_defrag(knet_h, inbuf, &len)) {
pthread_mutex_unlock(&src_link->link_stats_mutex);
return;
}
len = len + KNET_HEADER_DATA_SIZE;
}
if (inbuf->khp_data_compress) {
ssize_t decmp_outlen = KNET_DATABUFSIZE_COMPRESS;
struct timespec start_time;
struct timespec end_time;
uint64_t compress_time;
clock_gettime(CLOCK_MONOTONIC, &start_time);
err = decompress(knet_h, inbuf->khp_data_compress,
(const unsigned char *)inbuf->khp_data_userdata,
len - KNET_HEADER_DATA_SIZE,
knet_h->recv_from_links_buf_decompress,
&decmp_outlen);
stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex);
if (stats_err < 0) {
pthread_mutex_unlock(&src_link->link_stats_mutex);
log_err(knet_h, KNET_SUB_RX, "Unable to get mutex lock: %s", strerror(stats_err));
return;
}
clock_gettime(CLOCK_MONOTONIC, &end_time);
timespec_diff(start_time, end_time, &compress_time);
if (!err) {
/* Collect stats */
if (compress_time < knet_h->stats.rx_compress_time_min) {
knet_h->stats.rx_compress_time_min = compress_time;
}
if (compress_time > knet_h->stats.rx_compress_time_max) {
knet_h->stats.rx_compress_time_max = compress_time;
}
knet_h->stats.rx_compress_time_ave =
(knet_h->stats.rx_compress_time_ave * knet_h->stats.rx_compressed_packets +
compress_time) / (knet_h->stats.rx_compressed_packets+1);
knet_h->stats.rx_compressed_packets++;
knet_h->stats.rx_compressed_original_bytes += decmp_outlen;
knet_h->stats.rx_compressed_size_bytes += len - KNET_HEADER_SIZE;
memmove(inbuf->khp_data_userdata, knet_h->recv_from_links_buf_decompress, decmp_outlen);
len = decmp_outlen + KNET_HEADER_DATA_SIZE;
} else {
knet_h->stats.rx_failed_to_decompress++;
pthread_mutex_unlock(&knet_h->handle_stats_mutex);
pthread_mutex_unlock(&src_link->link_stats_mutex);
log_warn(knet_h, KNET_SUB_COMPRESS, "Unable to decompress packet (%d): %s",
err, strerror(errno));
return;
}
pthread_mutex_unlock(&knet_h->handle_stats_mutex);
}
if (knet_h->enabled != 1) /* data forward is disabled */
break;
if (knet_h->dst_host_filter_fn) {
size_t host_idx;
int found = 0;
bcast = knet_h->dst_host_filter_fn(
knet_h->dst_host_filter_fn_private_data,
(const unsigned char *)inbuf->khp_data_userdata,
len - KNET_HEADER_DATA_SIZE,
KNET_NOTIFY_RX,
knet_h->host_id,
inbuf->kh_node,
&channel,
dst_host_ids,
&dst_host_ids_entries);
if (bcast < 0) {
pthread_mutex_unlock(&src_link->link_stats_mutex);
log_debug(knet_h, KNET_SUB_RX, "Error from dst_host_filter_fn: %d", bcast);
return;
}
if ((!bcast) && (!dst_host_ids_entries)) {
pthread_mutex_unlock(&src_link->link_stats_mutex);
log_debug(knet_h, KNET_SUB_RX, "Message is unicast but no dst_host_ids_entries");
return;
}
/* check if we are dst for this packet */
if (!bcast) {
if (dst_host_ids_entries > KNET_MAX_HOST) {
pthread_mutex_unlock(&src_link->link_stats_mutex);
log_debug(knet_h, KNET_SUB_RX, "dst_host_filter_fn returned too many destinations");
return;
}
for (host_idx = 0; host_idx < dst_host_ids_entries; host_idx++) {
if (dst_host_ids[host_idx] == knet_h->host_id) {
found = 1;
break;
}
}
if (!found) {
pthread_mutex_unlock(&src_link->link_stats_mutex);
log_debug(knet_h, KNET_SUB_RX, "Packet is not for us");
return;
}
}
}
if (!knet_h->sockfd[channel].in_use) {
pthread_mutex_unlock(&src_link->link_stats_mutex);
log_debug(knet_h, KNET_SUB_RX,
"received packet for channel %d but there is no local sock connected",
channel);
return;
}
outlen = 0;
memset(iov_out, 0, sizeof(iov_out));
retry:
iov_out[0].iov_base = (void *) inbuf->khp_data_userdata + outlen;
iov_out[0].iov_len = len - (outlen + KNET_HEADER_DATA_SIZE);
outlen = writev(knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], iov_out, 1);
if ((outlen > 0) && (outlen < (ssize_t)iov_out[0].iov_len)) {
log_debug(knet_h, KNET_SUB_RX,
"Unable to send all data to the application in one go. Expected: %zu Sent: %zd\n",
iov_out[0].iov_len, outlen);
goto retry;
}
if (outlen <= 0) {
knet_h->sock_notify_fn(knet_h->sock_notify_fn_private_data,
knet_h->sockfd[channel].sockfd[0],
channel,
KNET_NOTIFY_RX,
outlen,
errno);
pthread_mutex_unlock(&src_link->link_stats_mutex);
return;
}
if ((size_t)outlen == iov_out[0].iov_len) {
_seq_num_set(src_host, inbuf->khp_data_seq_num, 0);
}
break;
case KNET_HEADER_TYPE_PING:
process_ping(knet_h, src_host, src_link, inbuf, len);
break;
case KNET_HEADER_TYPE_PONG:
process_pong(knet_h, src_host, src_link, inbuf, len);
break;
case KNET_HEADER_TYPE_PMTUD:
src_link->status.stats.rx_pmtu_packets++;
src_link->status.stats.rx_pmtu_bytes += len;
- outlen = KNET_HEADER_PMTUD_SIZE;
- inbuf->kh_type = KNET_HEADER_TYPE_PMTUD_REPLY;
- inbuf->kh_node = htons(knet_h->host_id);
-
- if (knet_h->crypto_in_use_config) {
- if (crypto_encrypt_and_sign(knet_h,
- (const unsigned char *)inbuf,
- outlen,
- knet_h->recv_from_links_buf_crypt,
- &outlen) < 0) {
- log_debug(knet_h, KNET_SUB_RX, "Unable to encrypt PMTUd reply packet");
- break;
- }
- outbuf = knet_h->recv_from_links_buf_crypt;
- stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex);
- if (stats_err < 0) {
- log_err(knet_h, KNET_SUB_RX, "Unable to get mutex lock: %s", strerror(stats_err));
- break;
- }
- knet_h->stats_extra.tx_crypt_pmtu_reply_packets++;
- pthread_mutex_unlock(&knet_h->handle_stats_mutex);
- }
-
/* Unlock so we don't deadlock with tx_mutex */
pthread_mutex_unlock(&src_link->link_stats_mutex);
-
- savederrno = pthread_mutex_lock(&knet_h->tx_mutex);
- if (savederrno) {
- log_err(knet_h, KNET_SUB_RX, "Unable to get TX mutex lock: %s", strerror(savederrno));
- goto out_pmtud;
- }
-retry_pmtud:
- if (src_link->transport_connected) {
- if (transport_get_connection_oriented(knet_h, src_link->transport) == TRANSPORT_PROTO_NOT_CONNECTION_ORIENTED) {
- len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL,
- (struct sockaddr *) &src_link->dst_addr, sizeof(struct sockaddr_storage));
- } else {
- len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0);
- }
- savederrno = errno;
- if (len != outlen) {
- err = transport_tx_sock_error(knet_h, src_link->transport, src_link->outsock, len, savederrno);
- stats_err = pthread_mutex_lock(&src_link->link_stats_mutex);
- if (stats_err < 0) {
- log_err(knet_h, KNET_SUB_RX, "Unable to get mutex lock: %s", strerror(stats_err));
- break;
- }
- switch(err) {
- case -1: /* unrecoverable error */
- log_debug(knet_h, KNET_SUB_RX,
- "Unable to send PMTUd reply (sock: %d) packet (sendto): %d %s. recorded src ip: %s src port: %s dst ip: %s dst port: %s",
- src_link->outsock, errno, strerror(errno),
- src_link->status.src_ipaddr, src_link->status.src_port,
- src_link->status.dst_ipaddr, src_link->status.dst_port);
-
- src_link->status.stats.tx_pmtu_errors++;
- break;
- case 0: /* ignore error and continue */
- src_link->status.stats.tx_pmtu_errors++;
- break;
- case 1: /* retry to send those same data */
- src_link->status.stats.tx_pmtu_retries++;
- pthread_mutex_unlock(&src_link->link_stats_mutex);
- goto retry_pmtud;
- break;
- }
- pthread_mutex_unlock(&src_link->link_stats_mutex);
- }
- }
- pthread_mutex_unlock(&knet_h->tx_mutex);
-out_pmtud:
+ process_pmtud(knet_h, src_link, inbuf);
return; /* Don't need to unlock link_stats_mutex */
case KNET_HEADER_TYPE_PMTUD_REPLY:
src_link->status.stats.rx_pmtu_packets++;
src_link->status.stats.rx_pmtu_bytes += len;
-
/* pmtud_mutex can't be acquired while we hold a link_stats_mutex (ordering) */
pthread_mutex_unlock(&src_link->link_stats_mutex);
-
- if (pthread_mutex_lock(&knet_h->pmtud_mutex) != 0) {
- log_debug(knet_h, KNET_SUB_RX, "Unable to get mutex lock");
- break;
- }
- src_link->last_recv_mtu = inbuf->khp_pmtud_size;
- pthread_cond_signal(&knet_h->pmtud_cond);
- pthread_mutex_unlock(&knet_h->pmtud_mutex);
+ process_pmtud_reply(knet_h, src_link, inbuf);
return;
default:
pthread_mutex_unlock(&src_link->link_stats_mutex);
return;
}
pthread_mutex_unlock(&src_link->link_stats_mutex);
}
static void _handle_recv_from_links(knet_handle_t knet_h, int sockfd, struct knet_mmsghdr *msg)
{
int err, savederrno;
int i, msg_recv, transport;
if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) {
log_debug(knet_h, KNET_SUB_RX, "Unable to get global read lock");
return;
}
if (_is_valid_fd(knet_h, sockfd) < 1) {
/*
* this is normal if an fd got an event and, before we grabbed the read lock,
* the link was removed by another thread
*/
goto exit_unlock;
}
transport = knet_h->knet_transport_fd_tracker[sockfd].transport;
/*
* reset msg_namelen to buffer size because after recvmmsg
* each msg_namelen will contain sizeof sockaddr_in or sockaddr_in6
*/
for (i = 0; i < PCKT_RX_BUFS; i++) {
msg[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage);
}
msg_recv = _recvmmsg(sockfd, &msg[0], PCKT_RX_BUFS, MSG_DONTWAIT | MSG_NOSIGNAL);
savederrno = errno;
/*
* WARNING: man page for recvmmsg is wrong. Kernel implementation here:
* recvmmsg can return:
* -1 on error
* 0 if the previous run of recvmmsg recorded an error on the socket
* N number of messages (see exception below).
*
* If there is an error from recvmsg after receiving a frame or more, the recvmmsg
* loop is interrupted, the error is recorded in the socket (getsockopt(SO_ERROR))
* and it will be visible in the next run.
*
* Need to be careful how we handle errors at this stage.
*
* error messages need to be handled on a per transport/protocol basis
* at this point we have different layers of error handling
* - msg_recv < 0 -> error from this run
* msg_recv = 0 -> error from previous run and error on socket needs to be cleared
* - per-transport message data
* example: msg[i].msg_hdr.msg_flags & MSG_NOTIFICATION or msg_len for SCTP == EOF,
* but for UDP it is perfectly legal to receive a 0-byte message... go figure
* - NOTE: on SCTP MSG_NOTIFICATION we get msg_recv == PCKT_FRAG_MAX messages and no
* errno set. That means the error api needs to be able to abort the loop below.
*/
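/*
 * Compact recap of the rules above: msg_recv < 0 -> this run failed;
 * msg_recv == 0 -> a previous run failed and SO_ERROR must be
 * cleared; msg_recv > 0 -> iterate the messages, letting the
 * transport error API abort the loop for cases like SCTP
 * MSG_NOTIFICATION.
 */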
if (msg_recv <= 0) {
transport_rx_sock_error(knet_h, transport, sockfd, msg_recv, savederrno);
goto exit_unlock;
}
for (i = 0; i < msg_recv; i++) {
err = transport_rx_is_data(knet_h, transport, sockfd, &msg[i]);
/*
* TODO: make this section silent once we are confident
* all protocol packet handlers are good
*/
switch(err) {
case KNET_TRANSPORT_RX_ERROR: /* on error */
log_debug(knet_h, KNET_SUB_RX, "Transport reported error parsing packet");
goto exit_unlock;
break;
case KNET_TRANSPORT_RX_NOT_DATA_CONTINUE: /* packet is not data and we should continue the packet process loop */
log_debug(knet_h, KNET_SUB_RX, "Transport reported no data, continue");
break;
case KNET_TRANSPORT_RX_NOT_DATA_STOP: /* packet is not data and we should STOP the packet process loop */
log_debug(knet_h, KNET_SUB_RX, "Transport reported no data, stop");
goto exit_unlock;
break;
case KNET_TRANSPORT_RX_IS_DATA: /* packet is data and should be parsed as such */
/*
* processing incoming packets vs access lists
*/
if ((knet_h->use_access_lists) &&
(transport_get_acl_type(knet_h, transport) == USE_GENERIC_ACL)) {
if (!check_validate(knet_h, sockfd, transport, msg[i].msg_hdr.msg_name)) {
char src_ipaddr[KNET_MAX_HOST_LEN];
char src_port[KNET_MAX_PORT_LEN];
memset(src_ipaddr, 0, KNET_MAX_HOST_LEN);
memset(src_port, 0, KNET_MAX_PORT_LEN);
if (knet_addrtostr(msg[i].msg_hdr.msg_name, sockaddr_len(msg[i].msg_hdr.msg_name),
src_ipaddr, KNET_MAX_HOST_LEN,
src_port, KNET_MAX_PORT_LEN) < 0) {
log_debug(knet_h, KNET_SUB_RX, "Packet rejected: unable to resolve host/port");
} else {
log_debug(knet_h, KNET_SUB_RX, "Packet rejected from %s/%s", src_ipaddr, src_port);
}
/*
* continue processing the other packets
*/
continue;
}
}
_parse_recv_from_links(knet_h, sockfd, &msg[i]);
break;
case KNET_TRANSPORT_RX_OOB_DATA_CONTINUE:
log_debug(knet_h, KNET_SUB_RX, "Transport is processing sock OOB data, continue");
break;
case KNET_TRANSPORT_RX_OOB_DATA_STOP:
log_debug(knet_h, KNET_SUB_RX, "Transport has completed processing sock OOB data, stop");
goto exit_unlock;
break;
}
}
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
}
void *_handle_recv_from_links_thread(void *data)
{
int i, nev;
knet_handle_t knet_h = (knet_handle_t) data;
struct epoll_event events[KNET_EPOLL_MAX_EVENTS];
struct sockaddr_storage address[PCKT_RX_BUFS];
struct knet_mmsghdr msg[PCKT_RX_BUFS];
struct iovec iov_in[PCKT_RX_BUFS];
set_thread_status(knet_h, KNET_THREAD_RX, KNET_THREAD_STARTED);
memset(&msg, 0, sizeof(msg));
memset(&events, 0, sizeof(events));
for (i = 0; i < PCKT_RX_BUFS; i++) {
iov_in[i].iov_base = (void *)knet_h->recv_from_links_buf[i];
iov_in[i].iov_len = KNET_DATABUFSIZE;
memset(&msg[i].msg_hdr, 0, sizeof(struct msghdr));
msg[i].msg_hdr.msg_name = &address[i];
msg[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage);
msg[i].msg_hdr.msg_iov = &iov_in[i];
msg[i].msg_hdr.msg_iovlen = 1;
}
while (!shutdown_in_progress(knet_h)) {
nev = epoll_wait(knet_h->recv_from_links_epollfd, events, KNET_EPOLL_MAX_EVENTS, knet_h->threads_timer_res / 1000);
/*
* the RX threads only need to notify that there has been at least
* one successful run after queue flush has been requested.
* See setfwd in handle.c
*/
if (get_thread_flush_queue(knet_h, KNET_THREAD_RX) == KNET_THREAD_QUEUE_FLUSH) {
set_thread_flush_queue(knet_h, KNET_THREAD_RX, KNET_THREAD_QUEUE_FLUSHED);
}
/*
* we use timeout to detect if thread is shutting down
*/
if (nev == 0) {
continue;
}
for (i = 0; i < nev; i++) {
_handle_recv_from_links(knet_h, events[i].data.fd, msg);
}
}
set_thread_status(knet_h, KNET_THREAD_RX, KNET_THREAD_STOPPED);
return NULL;
}
ssize_t knet_recv(knet_handle_t knet_h, char *buff, const size_t buff_len, const int8_t channel)
{
int savederrno = 0;
ssize_t err = 0;
struct iovec iov_in;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (buff == NULL) {
errno = EINVAL;
return -1;
}
if (buff_len <= 0) {
errno = EINVAL;
return -1;
}
if (buff_len > KNET_MAX_PACKET_SIZE) {
errno = EINVAL;
return -1;
}
if (channel < 0) {
errno = EINVAL;
return -1;
}
if (channel >= KNET_DATAFD_MAX) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
if (!knet_h->sockfd[channel].in_use) {
savederrno = EINVAL;
err = -1;
goto out_unlock;
}
memset(&iov_in, 0, sizeof(iov_in));
iov_in.iov_base = (void *)buff;
iov_in.iov_len = buff_len;
err = readv(knet_h->sockfd[channel].sockfd[0], &iov_in, 1);
savederrno = errno;
out_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = err ? savederrno : 0;
return err;
}
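/*
 * Usage sketch (hypothetical caller code, not part of this diff;
 * channel 0 is assumed to be a configured datafd):
 *
 *   char buf[KNET_MAX_PACKET_SIZE];
 *   ssize_t n = knet_recv(knet_h, buf, sizeof(buf), 0);
 *   if (n < 0 && errno != EAGAIN) {
 *       perror("knet_recv");
 *   }
 */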