diff --git a/libknet/host.c b/libknet/host.c index 61b4edb9..8b422369 100644 --- a/libknet/host.c +++ b/libknet/host.c @@ -1,794 +1,794 @@ /* * Copyright (C) 2010-2021 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. Di Nitto * Federico Simoncelli * * This software licensed under LGPL-2.0+ */ #include "config.h" #include #include #include #include #include #include "host.h" #include "internals.h" #include "logging.h" #include "threads_common.h" static void _host_list_update(knet_handle_t knet_h) { struct knet_host *host; knet_h->host_ids_entries = 0; for (host = knet_h->host_head; host != NULL; host = host->next) { knet_h->host_ids[knet_h->host_ids_entries] = host->host_id; knet_h->host_ids_entries++; } } int knet_host_add(knet_handle_t knet_h, knet_node_id_t host_id) { int savederrno = 0, err = 0; struct knet_host *host = NULL; uint8_t link_idx; if (!_is_valid_handle(knet_h)) { return -1; } savederrno = get_global_wrlock(knet_h); if (savederrno) { log_err(knet_h, KNET_SUB_HOST, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } if (knet_h->host_index[host_id]) { err = -1; savederrno = EEXIST; log_err(knet_h, KNET_SUB_HOST, "Unable to add host %u: %s", host_id, strerror(savederrno)); goto exit_unlock; } host = malloc(sizeof(struct knet_host)); if (!host) { err = -1; savederrno = errno; log_err(knet_h, KNET_SUB_HOST, "Unable to allocate memory for host %u: %s", host_id, strerror(savederrno)); goto exit_unlock; } memset(host, 0, sizeof(struct knet_host)); /* * set host_id */ host->host_id = host_id; /* * fill up our own data */ if (knet_h->host_id == host->host_id) { host->onwire_ver = knet_h->onwire_ver; host->onwire_max_ver = knet_h->onwire_max_ver; } /* * set default host->name to host_id for logging */ snprintf(host->name, KNET_MAX_HOST_LEN, "%u", host_id); /* * initialize links internal data */ for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) { host->link[link_idx].link_id = link_idx; host->link[link_idx].status.stats.latency_min = UINT32_MAX; } /* * add new host to the index */ knet_h->host_index[host_id] = host; /* * add new host to host list */ if (knet_h->host_head) { host->next = knet_h->host_head; } knet_h->host_head = host; _host_list_update(knet_h); exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); if (err < 0) { free(host); } errno = err ? savederrno : 0; return err; } int knet_host_remove(knet_handle_t knet_h, knet_node_id_t host_id) { int savederrno = 0, err = 0; struct knet_host *host, *removed; uint8_t link_idx; if (!_is_valid_handle(knet_h)) { return -1; } savederrno = get_global_wrlock(knet_h); if (savederrno) { log_err(knet_h, KNET_SUB_HOST, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } host = knet_h->host_index[host_id]; if (!host) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_HOST, "Unable to remove host %u: %s", host_id, strerror(savederrno)); goto exit_unlock; } /* * if links are configured we cannot release the host */ for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) { if (host->link[link_idx].configured) { err = -1; savederrno = EBUSY; log_err(knet_h, KNET_SUB_HOST, "Unable to remove host %u, links are still configured: %s", host_id, strerror(savederrno)); goto exit_unlock; } } removed = NULL; /* * removing host from list */ if (knet_h->host_head->host_id == host_id) { removed = knet_h->host_head; knet_h->host_head = removed->next; } else { for (host = knet_h->host_head; host->next != NULL; host = host->next) { if (host->next->host_id == host_id) { removed = host->next; host->next = removed->next; break; } } } knet_h->host_index[host_id] = NULL; free(removed); _host_list_update(knet_h); exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = err ? savederrno : 0; return err; } int knet_host_set_name(knet_handle_t knet_h, knet_node_id_t host_id, const char *name) { int savederrno = 0, err = 0; struct knet_host *host; if (!_is_valid_handle(knet_h)) { return -1; } savederrno = get_global_wrlock(knet_h); if (savederrno) { log_err(knet_h, KNET_SUB_HOST, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } if (!knet_h->host_index[host_id]) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_HOST, "Unable to find host %u to set name: %s", host_id, strerror(savederrno)); goto exit_unlock; } if (!name) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_HOST, "Unable to set name for host %u: %s", host_id, strerror(savederrno)); goto exit_unlock; } if (strlen(name) >= KNET_MAX_HOST_LEN) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_HOST, "Requested name for host %u is too long: %s", host_id, strerror(savederrno)); goto exit_unlock; } for (host = knet_h->host_head; host != NULL; host = host->next) { if (!strncmp(host->name, name, KNET_MAX_HOST_LEN)) { err = -1; savederrno = EEXIST; log_err(knet_h, KNET_SUB_HOST, "Duplicated name found on host_id %u", host->host_id); goto exit_unlock; } } snprintf(knet_h->host_index[host_id]->name, KNET_MAX_HOST_LEN, "%s", name); exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = err ? savederrno : 0; return err; } int knet_host_get_name_by_host_id(knet_handle_t knet_h, knet_node_id_t host_id, char *name) { int savederrno = 0, err = 0; if (!_is_valid_handle(knet_h)) { return -1; } if (!name) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HOST, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } if (!knet_h->host_index[host_id]) { savederrno = EINVAL; err = -1; log_debug(knet_h, KNET_SUB_HOST, "Host %u not found", host_id); goto exit_unlock; } snprintf(name, KNET_MAX_HOST_LEN, "%s", knet_h->host_index[host_id]->name); exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = err ? savederrno : 0; return err; } int knet_host_get_id_by_host_name(knet_handle_t knet_h, const char *name, knet_node_id_t *host_id) { int savederrno = 0, err = 0, found = 0; struct knet_host *host; if (!_is_valid_handle(knet_h)) { return -1; } if (!name) { errno = EINVAL; return -1; } if (!host_id) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HOST, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } for (host = knet_h->host_head; host != NULL; host = host->next) { if (!strncmp(name, host->name, KNET_MAX_HOST_LEN)) { found = 1; *host_id = host->host_id; break; } } if (!found) { savederrno = ENOENT; err = -1; } pthread_rwlock_unlock(&knet_h->global_rwlock); errno = err ? savederrno : 0; return err; } int knet_host_get_host_list(knet_handle_t knet_h, knet_node_id_t *host_ids, size_t *host_ids_entries) { int savederrno = 0; if (!_is_valid_handle(knet_h)) { return -1; } if ((!host_ids) || (!host_ids_entries)) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HOST, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } memmove(host_ids, knet_h->host_ids, sizeof(knet_h->host_ids)); *host_ids_entries = knet_h->host_ids_entries; pthread_rwlock_unlock(&knet_h->global_rwlock); return 0; } int knet_host_set_policy(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t policy) { int savederrno = 0, err = 0; uint8_t old_policy; if (!_is_valid_handle(knet_h)) { return -1; } if (policy > KNET_LINK_POLICY_RR) { errno = EINVAL; return -1; } savederrno = get_global_wrlock(knet_h); if (savederrno) { log_err(knet_h, KNET_SUB_HOST, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } if (!knet_h->host_index[host_id]) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_HOST, "Unable to set name for host %u: %s", host_id, strerror(savederrno)); goto exit_unlock; } old_policy = knet_h->host_index[host_id]->link_handler_policy; knet_h->host_index[host_id]->link_handler_policy = policy; if (_host_dstcache_update_async(knet_h, knet_h->host_index[host_id])) { savederrno = errno; err = -1; knet_h->host_index[host_id]->link_handler_policy = old_policy; log_debug(knet_h, KNET_SUB_HOST, "Unable to update switch cache for host %u: %s", host_id, strerror(savederrno)); } log_debug(knet_h, KNET_SUB_HOST, "Host %u has new switching policy: %u", host_id, policy); exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = err ? savederrno : 0; return err; } int knet_host_get_policy(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t *policy) { int savederrno = 0, err = 0; if (!_is_valid_handle(knet_h)) { return -1; } if (!policy) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HOST, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } if (!knet_h->host_index[host_id]) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_HOST, "Unable to get name for host %u: %s", host_id, strerror(savederrno)); goto exit_unlock; } *policy = knet_h->host_index[host_id]->link_handler_policy; exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = err ? savederrno : 0; return err; } int knet_host_get_status(knet_handle_t knet_h, knet_node_id_t host_id, struct knet_host_status *status) { int savederrno = 0, err = 0; struct knet_host *host; if (!_is_valid_handle(knet_h)) { return -1; } if (!status) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HOST, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } host = knet_h->host_index[host_id]; if (!host) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_HOST, "Unable to find host %u: %s", host_id, strerror(savederrno)); goto exit_unlock; } memmove(status, &host->status, sizeof(struct knet_host_status)); exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = err ? savederrno : 0; return err; } int knet_host_enable_status_change_notify(knet_handle_t knet_h, void *host_status_change_notify_fn_private_data, void (*host_status_change_notify_fn) ( void *private_data, knet_node_id_t host_id, uint8_t reachable, uint8_t remote, uint8_t external)) { int savederrno = 0; if (!_is_valid_handle(knet_h)) { return -1; } savederrno = get_global_wrlock(knet_h); if (savederrno) { log_err(knet_h, KNET_SUB_HOST, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } knet_h->host_status_change_notify_fn_private_data = host_status_change_notify_fn_private_data; knet_h->host_status_change_notify_fn = host_status_change_notify_fn; if (knet_h->host_status_change_notify_fn) { log_debug(knet_h, KNET_SUB_HOST, "host_status_change_notify_fn enabled"); } else { log_debug(knet_h, KNET_SUB_HOST, "host_status_change_notify_fn disabled"); } pthread_rwlock_unlock(&knet_h->global_rwlock); errno = 0; return 0; } static void _clear_cbuffers(struct knet_host *host, seq_num_t rx_seq_num) { int i; memset(host->circular_buffer, 0, KNET_CBUFFER_SIZE); host->rx_seq_num = rx_seq_num; memset(host->circular_buffer_defrag, 0, KNET_CBUFFER_SIZE); for (i = 0; i < KNET_DEFRAG_BUFFERS; i++) { memset(&host->defrag_buf[i], 0, sizeof(struct knet_host_defrag_buf)); } } -static void _reclaim_old_defrag_bufs(struct knet_host *host, seq_num_t seq_num) +static void _reclaim_old_defrag_bufs(knet_handle_t knet_h, struct knet_host *host, seq_num_t seq_num) { seq_num_t head, tail; /* seq_num boundaries */ int i; head = seq_num + 1; tail = seq_num - (KNET_DEFRAG_BUFFERS + 1); /* * expire old defrag buffers */ for (i = 0; i < KNET_DEFRAG_BUFFERS; i++) { if (host->defrag_buf[i].in_use) { /* * head has done a rollover to 0+ */ if (tail > head) { if ((host->defrag_buf[i].pckt_seq >= head) && (host->defrag_buf[i].pckt_seq <= tail)) { host->defrag_buf[i].in_use = 0; } } else { if ((host->defrag_buf[i].pckt_seq >= head) || (host->defrag_buf[i].pckt_seq <= tail)){ host->defrag_buf[i].in_use = 0; } } } } } /* * check if a given packet seq num is in the circular buffers * defrag_buf = 0 -> use normal cbuf 1 -> use the defrag buffer lookup */ -int _seq_num_lookup(struct knet_host *host, seq_num_t seq_num, int defrag_buf, int clear_buf) +int _seq_num_lookup(knet_handle_t knet_h, struct knet_host *host, seq_num_t seq_num, int defrag_buf, int clear_buf) { size_t head, tail; /* circular buffer indexes */ seq_num_t seq_dist; char *dst_cbuf = host->circular_buffer; char *dst_cbuf_defrag = host->circular_buffer_defrag; seq_num_t *dst_seq_num = &host->rx_seq_num; if (clear_buf) { _clear_cbuffers(host, seq_num); } - _reclaim_old_defrag_bufs(host, seq_num); + _reclaim_old_defrag_bufs(knet_h, host, seq_num); if (seq_num < *dst_seq_num) { seq_dist = (SEQ_MAX - seq_num) + *dst_seq_num; } else { seq_dist = *dst_seq_num - seq_num; } head = seq_num % KNET_CBUFFER_SIZE; if (seq_dist < KNET_CBUFFER_SIZE) { /* seq num is in ring buffer */ if (!defrag_buf) { return (dst_cbuf[head] == 0) ? 1 : 0; } else { return (dst_cbuf_defrag[head] == 0) ? 1 : 0; } } else if (seq_dist <= SEQ_MAX - KNET_CBUFFER_SIZE) { memset(dst_cbuf, 0, KNET_CBUFFER_SIZE); memset(dst_cbuf_defrag, 0, KNET_CBUFFER_SIZE); *dst_seq_num = seq_num; } /* cleaning up circular buffer */ tail = (*dst_seq_num + 1) % KNET_CBUFFER_SIZE; if (tail > head) { memset(dst_cbuf + tail, 0, KNET_CBUFFER_SIZE - tail); memset(dst_cbuf, 0, head + 1); memset(dst_cbuf_defrag + tail, 0, KNET_CBUFFER_SIZE - tail); memset(dst_cbuf_defrag, 0, head + 1); } else { memset(dst_cbuf + tail, 0, head - tail + 1); memset(dst_cbuf_defrag + tail, 0, head - tail + 1); } *dst_seq_num = seq_num; return 1; } void _seq_num_set(struct knet_host *host, seq_num_t seq_num, int defrag_buf) { if (!defrag_buf) { host->circular_buffer[seq_num % KNET_CBUFFER_SIZE] = 1; } else { host->circular_buffer_defrag[seq_num % KNET_CBUFFER_SIZE] = 1; } return; } int _host_dstcache_update_async(knet_handle_t knet_h, struct knet_host *host) { int savederrno = 0; knet_node_id_t host_id = host->host_id; if (sendto(knet_h->dstsockfd[1], &host_id, sizeof(host_id), MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0) != sizeof(host_id)) { savederrno = errno; log_debug(knet_h, KNET_SUB_HOST, "Unable to write to dstpipefd[1]: %s", strerror(savederrno)); errno = savederrno; return -1; } return 0; } int _host_dstcache_update_sync(knet_handle_t knet_h, struct knet_host *host) { int link_idx; int best_priority = -1; int reachable = 0; if (knet_h->host_id == host->host_id && knet_h->has_loop_link) { host->active_link_entries = 1; return 0; } host->active_link_entries = 0; for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) { if (host->link[link_idx].status.enabled != 1) /* link is not enabled */ continue; if (host->link[link_idx].status.connected != 1) /* link is not enabled */ continue; if (host->link[link_idx].has_valid_mtu != 1) /* link does not have valid MTU */ continue; if (host->link_handler_policy == KNET_LINK_POLICY_PASSIVE) { /* for passive we look for the only active link with higher priority */ if (host->link[link_idx].priority > best_priority) { host->active_links[0] = link_idx; best_priority = host->link[link_idx].priority; } host->active_link_entries = 1; } else { /* for RR and ACTIVE we need to copy all available links */ host->active_links[host->active_link_entries] = link_idx; host->active_link_entries++; } } if (host->link_handler_policy == KNET_LINK_POLICY_PASSIVE) { log_info(knet_h, KNET_SUB_HOST, "host: %u (passive) best link: %u (pri: %u)", host->host_id, host->link[host->active_links[0]].link_id, host->link[host->active_links[0]].priority); } else { log_info(knet_h, KNET_SUB_HOST, "host: %u has %u active links", host->host_id, host->active_link_entries); } /* no active links, we can clean the circular buffers and indexes */ if (!host->active_link_entries) { log_warn(knet_h, KNET_SUB_HOST, "host: %u has no active links", host->host_id); _clear_cbuffers(host, 0); } else { reachable = 1; } if (host->status.reachable != reachable) { host->status.reachable = reachable; if (knet_h->host_status_change_notify_fn) { knet_h->host_status_change_notify_fn( knet_h->host_status_change_notify_fn_private_data, host->host_id, host->status.reachable, host->status.remote, host->status.external); } } return 0; } void _handle_onwire_version(knet_handle_t knet_h, struct knet_host *host, struct knet_header *inbuf) { struct knet_host *tmp_host = NULL; uint8_t onwire_ver = knet_h->onwire_max_ver; int docallback = 0; /* * data we process here are onwire independent * we are in a global read only lock context, so it´s safe to parse host lists * and we can change onwire_ver using the dedicated mutex */ /* * update current host onwire info */ host->onwire_ver = inbuf->kh_version; host->onwire_max_ver = inbuf->kh_max_ver; for (tmp_host = knet_h->host_head; tmp_host != NULL; tmp_host = tmp_host->next) { /* * do not attempt to change protocol till * we see all nodes at least once. */ if (!tmp_host->onwire_max_ver) { return; } /* * ignore nodes were max ver is lower than our min ver * logged as error by thread_rx, we need to make sure to skip it * during onwire_ver calculation. */ if (tmp_host->onwire_max_ver < knet_h->onwire_min_ver) { continue; } /* * use the highest max_ver common to all known nodes */ if (tmp_host->onwire_max_ver < onwire_ver) { onwire_ver = tmp_host->onwire_max_ver; } } if (pthread_mutex_lock(&knet_h->onwire_mutex)) { log_debug(knet_h, KNET_SUB_HOST, "Unable to get onwire mutex lock"); return; } if (knet_h->onwire_force_ver) { onwire_ver = knet_h->onwire_force_ver; } if (knet_h->onwire_ver != onwire_ver) { log_debug(knet_h, KNET_SUB_HOST, "node %u updating onwire version to %u", knet_h->host_id, onwire_ver); knet_h->onwire_ver = onwire_ver; docallback = 1; } pthread_mutex_unlock(&knet_h->onwire_mutex); /* * do the callback outside of locked context and use cached value * to avoid blocking on locking */ if ((docallback) && (knet_h->onwire_ver_notify_fn)) { knet_h->onwire_ver_notify_fn(knet_h->onwire_ver_notify_fn_private_data, knet_h->onwire_min_ver, knet_h->onwire_max_ver, onwire_ver); } } diff --git a/libknet/host.h b/libknet/host.h index baa56592..ccd5514d 100644 --- a/libknet/host.h +++ b/libknet/host.h @@ -1,23 +1,23 @@ /* * Copyright (C) 2012-2021 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. Di Nitto * Federico Simoncelli * * This software licensed under LGPL-2.0+ */ #ifndef __KNET_HOST_H__ #define __KNET_HOST_H__ #include "internals.h" -int _seq_num_lookup(struct knet_host *host, seq_num_t seq_num, int defrag_buf, int clear_buf); +int _seq_num_lookup(knet_handle_t knet_h, struct knet_host *host, seq_num_t seq_num, int defrag_buf, int clear_buf); void _seq_num_set(struct knet_host *host, seq_num_t seq_num, int defrag_buf); int _host_dstcache_update_async(knet_handle_t knet_h, struct knet_host *host); int _host_dstcache_update_sync(knet_handle_t knet_h, struct knet_host *host); void _handle_onwire_version(knet_handle_t knet_h, struct knet_host *host, struct knet_header *inbuf); #endif diff --git a/libknet/onwire_v1.c b/libknet/onwire_v1.c index 071dbeef..d820286e 100644 --- a/libknet/onwire_v1.c +++ b/libknet/onwire_v1.c @@ -1,216 +1,216 @@ /* * Copyright (C) 2020-2021 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. Di Nitto * * This software licensed under LGPL-2.0+ */ #include "config.h" #include #include #include #include #include #include #include "logging.h" #include "host.h" #include "links.h" #include "onwire_v1.h" int prep_ping_v1(knet_handle_t knet_h, struct knet_link *dst_link, uint8_t onwire_ver, struct timespec clock_now, int timed, ssize_t *outlen) { *outlen = KNET_HEADER_PING_V1_SIZE; /* preparing ping buffer */ knet_h->pingbuf->kh_version = onwire_ver; knet_h->pingbuf->kh_max_ver = knet_h->onwire_max_ver; knet_h->pingbuf->kh_type = KNET_HEADER_TYPE_PING; knet_h->pingbuf->kh_node = htons(knet_h->host_id); knet_h->pingbuf->khp_ping_v1_link = dst_link->link_id; knet_h->pingbuf->khp_ping_v1_timed = timed; memmove(&knet_h->pingbuf->khp_ping_v1_time[0], &clock_now, sizeof(struct timespec)); if (pthread_mutex_lock(&knet_h->tx_seq_num_mutex)) { log_debug(knet_h, KNET_SUB_HEARTBEAT, "Unable to get seq mutex lock"); return -1; } knet_h->pingbuf->khp_ping_v1_seq_num = htons(knet_h->tx_seq_num); pthread_mutex_unlock(&knet_h->tx_seq_num_mutex); return 0; } void prep_pong_v1(knet_handle_t knet_h, struct knet_header *inbuf, ssize_t *outlen) { *outlen = KNET_HEADER_PING_V1_SIZE; inbuf->kh_type = KNET_HEADER_TYPE_PONG; inbuf->kh_node = htons(knet_h->host_id); } void process_ping_v1(knet_handle_t knet_h, struct knet_host *src_host, struct knet_link *src_link, struct knet_header *inbuf, ssize_t len) { int wipe_bufs = 0; seq_num_t recv_seq_num = ntohs(inbuf->khp_ping_v1_seq_num); if (!inbuf->khp_ping_v1_timed) { /* * we might be receiving this message from all links, but we want * to process it only the first time */ if (recv_seq_num != src_host->untimed_rx_seq_num) { /* * cache the untimed seq num */ src_host->untimed_rx_seq_num = recv_seq_num; /* * if the host has received data in between * untimed ping, then we don't need to wipe the bufs */ if (src_host->got_data) { src_host->got_data = 0; wipe_bufs = 0; } else { wipe_bufs = 1; } } - _seq_num_lookup(src_host, recv_seq_num, 0, wipe_bufs); + _seq_num_lookup(knet_h, src_host, recv_seq_num, 0, wipe_bufs); } else { /* * pings always arrives in bursts over all the link * catch the first of them to cache the seq num and * avoid duplicate processing */ if (recv_seq_num != src_host->timed_rx_seq_num) { src_host->timed_rx_seq_num = recv_seq_num; if (recv_seq_num == 0) { - _seq_num_lookup(src_host, recv_seq_num, 0, 1); + _seq_num_lookup(knet_h, src_host, recv_seq_num, 0, 1); } } } } void process_pong_v1(knet_handle_t knet_h, struct knet_host *src_host, struct knet_link *src_link, struct knet_header *inbuf, struct timespec *recvtime) { memmove(recvtime, &inbuf->khp_ping_v1_time[0], sizeof(struct timespec)); } struct knet_link *get_link_from_pong_v1(knet_handle_t knet_h, struct knet_host *src_host, struct knet_header *inbuf) { return &src_host->link[inbuf->khp_ping_v1_link]; } void prep_pmtud_v1(knet_handle_t knet_h, struct knet_link *dst_link, uint8_t onwire_ver, size_t onwire_len) { knet_h->pmtudbuf->kh_version = onwire_ver; knet_h->pmtudbuf->kh_max_ver = knet_h->onwire_max_ver; knet_h->pmtudbuf->kh_type = KNET_HEADER_TYPE_PMTUD; knet_h->pmtudbuf->kh_node = htons(knet_h->host_id); knet_h->pmtudbuf->khp_pmtud_v1_link = dst_link->link_id; knet_h->pmtudbuf->khp_pmtud_v1_size = onwire_len; } void prep_pmtud_reply_v1(knet_handle_t knet_h, struct knet_header *inbuf, ssize_t *outlen) { *outlen = KNET_HEADER_PMTUD_V1_SIZE; inbuf->kh_type = KNET_HEADER_TYPE_PMTUD_REPLY; inbuf->kh_node = htons(knet_h->host_id); } void process_pmtud_reply_v1(knet_handle_t knet_h, struct knet_link *src_link, struct knet_header *inbuf) { src_link->last_recv_mtu = inbuf->khp_pmtud_v1_size; } void prep_tx_bufs_v1(knet_handle_t knet_h, struct knet_header *inbuf, unsigned char *data, size_t inlen, unsigned int temp_data_mtu, seq_num_t tx_seq_num, int8_t channel, int bcast, int data_compressed, int *msgs_to_send, struct iovec iov_out[PCKT_FRAG_MAX][2], int *iovcnt_out) { uint8_t frag_idx = 0; size_t frag_len = inlen; /* * prepare the main header */ inbuf->kh_type = KNET_HEADER_TYPE_DATA; inbuf->kh_version = 1; inbuf->kh_max_ver = knet_h->onwire_max_ver; inbuf->kh_node = htons(knet_h->host_id); /* * prepare the data header */ inbuf->khp_data_v1_frag_seq = 0; inbuf->khp_data_v1_bcast = bcast; inbuf->khp_data_v1_frag_num = ceil((float)inlen / temp_data_mtu); inbuf->khp_data_v1_channel = channel; inbuf->khp_data_v1_seq_num = htons(tx_seq_num); if (data_compressed) { inbuf->khp_data_v1_compress = knet_h->compress_model; } else { inbuf->khp_data_v1_compress = 0; } /* * handle fragmentation */ if (inbuf->khp_data_v1_frag_num > 1) { while (frag_idx < inbuf->khp_data_v1_frag_num) { /* * set the iov_base */ iov_out[frag_idx][0].iov_base = (void *)knet_h->send_to_links_buf[frag_idx]; iov_out[frag_idx][0].iov_len = KNET_HEADER_DATA_V1_SIZE; iov_out[frag_idx][1].iov_base = data + (temp_data_mtu * frag_idx); /* * set the len */ if (frag_len > temp_data_mtu) { iov_out[frag_idx][1].iov_len = temp_data_mtu; } else { iov_out[frag_idx][1].iov_len = frag_len; } /* * copy the frag info on all buffers */ memmove(knet_h->send_to_links_buf[frag_idx], inbuf, KNET_HEADER_DATA_V1_SIZE); /* * bump the frag */ knet_h->send_to_links_buf[frag_idx]->khp_data_v1_frag_seq = frag_idx + 1; frag_len = frag_len - temp_data_mtu; frag_idx++; } *iovcnt_out = 2; } else { iov_out[frag_idx][0].iov_base = (void *)inbuf; iov_out[frag_idx][0].iov_len = frag_len + KNET_HEADER_DATA_V1_SIZE; *iovcnt_out = 1; } *msgs_to_send = inbuf->khp_data_v1_frag_num; } unsigned char *get_data_v1(knet_handle_t knet_h, struct knet_header *inbuf) { return inbuf->khp_data_v1_userdata; } void get_data_header_info_v1(knet_handle_t knet_h, struct knet_header *inbuf, ssize_t *header_size, int8_t *channel, seq_num_t *seq_num, uint8_t *decompress_type, uint8_t *frags, uint8_t *frag_seq) { *header_size = KNET_HEADER_DATA_V1_SIZE; *channel = inbuf->khp_data_v1_channel; *seq_num = ntohs(inbuf->khp_data_v1_seq_num); *decompress_type = inbuf->khp_data_v1_compress; *frags = inbuf->khp_data_v1_frag_num; *frag_seq = inbuf->khp_data_v1_frag_seq; } diff --git a/libknet/threads_rx.c b/libknet/threads_rx.c index df84d29a..7a097475 100644 --- a/libknet/threads_rx.c +++ b/libknet/threads_rx.c @@ -1,956 +1,956 @@ /* * Copyright (C) 2012-2021 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. Di Nitto * Federico Simoncelli * * This software licensed under LGPL-2.0+ */ #include "config.h" #include #include #include #include #include #include "compat.h" #include "compress.h" #include "crypto.h" #include "host.h" #include "links.h" #include "links_acl.h" #include "logging.h" #include "transports.h" #include "transport_common.h" #include "threads_common.h" #include "threads_heartbeat.h" #include "threads_pmtud.h" #include "threads_rx.h" #include "netutils.h" #include "onwire_v1.h" /* * RECV */ /* * return 1 if a > b * return -1 if b > a * return 0 if they are equal */ static inline int _timecmp(struct timespec a, struct timespec b) { if (a.tv_sec != b.tv_sec) { if (a.tv_sec > b.tv_sec) { return 1; } else { return -1; } } else { if (a.tv_nsec > b.tv_nsec) { return 1; } else if (a.tv_nsec < b.tv_nsec) { return -1; } else { return 0; } } } /* * this functions needs to return an index (0 to 7) * to a knet_host_defrag_buf. (-1 on errors) */ static int _find_pckt_defrag_buf(knet_handle_t knet_h, struct knet_host *src_host, seq_num_t seq_num) { int i, oldest; /* * check if there is a buffer already in use handling the same seq_num */ for (i = 0; i < KNET_DEFRAG_BUFFERS; i++) { if (src_host->defrag_buf[i].in_use) { if (src_host->defrag_buf[i].pckt_seq == seq_num) { return i; } } } /* * If there is no buffer that's handling the current seq_num * either it's new or it's been reclaimed already. * check if it's been reclaimed/seen before using the defrag circular * buffer. If the pckt has been seen before, the buffer expired (ETIME) * and there is no point to try to defrag it again. */ - if (!_seq_num_lookup(src_host, seq_num, 1, 0)) { + if (!_seq_num_lookup(knet_h, src_host, seq_num, 1, 0)) { errno = ETIME; return -1; } /* * register the pckt as seen */ _seq_num_set(src_host, seq_num, 1); /* * see if there is a free buffer */ for (i = 0; i < KNET_DEFRAG_BUFFERS; i++) { if (!src_host->defrag_buf[i].in_use) { return i; } } /* * at this point, there are no free buffers, the pckt is new * and we need to reclaim a buffer, and we will take the one * with the oldest timestamp. It's as good as any. */ oldest = 0; for (i = 0; i < KNET_DEFRAG_BUFFERS; i++) { if (_timecmp(src_host->defrag_buf[i].last_update, src_host->defrag_buf[oldest].last_update) < 0) { oldest = i; } } src_host->defrag_buf[oldest].in_use = 0; return oldest; } static int _pckt_defrag(knet_handle_t knet_h, struct knet_host *src_host, seq_num_t seq_num, unsigned char *data, ssize_t *len, uint8_t frags, uint8_t frag_seq) { struct knet_host_defrag_buf *defrag_buf; int defrag_buf_idx; defrag_buf_idx = _find_pckt_defrag_buf(knet_h, src_host, seq_num); if (defrag_buf_idx < 0) { return 1; } defrag_buf = &src_host->defrag_buf[defrag_buf_idx]; /* * if the buf is not is use, then make sure it's clean */ if (!defrag_buf->in_use) { memset(defrag_buf, 0, sizeof(struct knet_host_defrag_buf)); defrag_buf->in_use = 1; defrag_buf->pckt_seq = seq_num; } /* * update timestamp on the buffer */ clock_gettime(CLOCK_MONOTONIC, &defrag_buf->last_update); /* * check if we already received this fragment */ if (defrag_buf->frag_map[frag_seq]) { /* * if we have received this fragment and we didn't clear the buffer * it means that we don't have all fragments yet */ return 1; } /* * we need to handle the last packet with gloves due to its different size */ if (frag_seq == frags) { defrag_buf->last_frag_size = *len; /* * in the event when the last packet arrives first, * we still don't know the offset vs the other fragments (based on MTU), * so we store the fragment at the end of the buffer where it's safe * and take a copy of the len so that we can restore its offset later. * remember we can't use the local MTU for this calculation because pMTU * can be asymettric between the same hosts. */ if (!defrag_buf->frag_size) { defrag_buf->last_first = 1; memmove(defrag_buf->buf + (KNET_MAX_PACKET_SIZE - *len), data, *len); } } else { defrag_buf->frag_size = *len; } if (defrag_buf->frag_size) { memmove(defrag_buf->buf + ((frag_seq - 1) * defrag_buf->frag_size), data, *len); } defrag_buf->frag_recv++; defrag_buf->frag_map[frag_seq] = 1; /* * check if we received all the fragments */ if (defrag_buf->frag_recv == frags) { /* * special case the last pckt */ if (defrag_buf->last_first) { memmove(defrag_buf->buf + ((frags - 1) * defrag_buf->frag_size), defrag_buf->buf + (KNET_MAX_PACKET_SIZE - defrag_buf->last_frag_size), defrag_buf->last_frag_size); } /* * recalculate packet lenght */ *len = ((frags - 1) * defrag_buf->frag_size) + defrag_buf->last_frag_size; /* * copy the pckt back in the user data */ memmove(data, defrag_buf->buf, *len); /* * free this buffer */ defrag_buf->in_use = 0; return 0; } return 1; } static int _handle_data_stats(knet_handle_t knet_h, struct knet_link *src_link, ssize_t len, uint64_t decrypt_time) { int stats_err; /* data stats at the top for consistency with TX */ src_link->status.stats.rx_data_packets++; src_link->status.stats.rx_data_bytes += len; if (decrypt_time) { stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex); if (stats_err < 0) { log_err(knet_h, KNET_SUB_RX, "Unable to get mutex lock: %s", strerror(stats_err)); return -1; } /* Only update the crypto overhead for data packets. Mainly to be consistent with TX */ if (decrypt_time < knet_h->stats.rx_crypt_time_min) { knet_h->stats.rx_crypt_time_min = decrypt_time; } if (decrypt_time > knet_h->stats.rx_crypt_time_max) { knet_h->stats.rx_crypt_time_max = decrypt_time; } knet_h->stats.rx_crypt_time_ave = (knet_h->stats.rx_crypt_time_ave * knet_h->stats.rx_crypt_packets + decrypt_time) / (knet_h->stats.rx_crypt_packets+1); knet_h->stats.rx_crypt_packets++; pthread_mutex_unlock(&knet_h->handle_stats_mutex); } return 0; } static int _decompress_data(knet_handle_t knet_h, uint8_t decompress_type, unsigned char *data, ssize_t *len, ssize_t header_size) { int err = 0, stats_err = 0; if (decompress_type) { ssize_t decmp_outlen = KNET_DATABUFSIZE_COMPRESS; struct timespec start_time; struct timespec end_time; uint64_t decompress_time; clock_gettime(CLOCK_MONOTONIC, &start_time); err = decompress(knet_h, decompress_type, data, *len - header_size, knet_h->recv_from_links_buf_decompress, &decmp_outlen); clock_gettime(CLOCK_MONOTONIC, &end_time); timespec_diff(start_time, end_time, &decompress_time); stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex); if (stats_err < 0) { log_err(knet_h, KNET_SUB_RX, "Unable to get mutex lock: %s", strerror(stats_err)); return -1; } if (!err) { /* Collect stats */ if (decompress_time < knet_h->stats.rx_compress_time_min) { knet_h->stats.rx_compress_time_min = decompress_time; } if (decompress_time > knet_h->stats.rx_compress_time_max) { knet_h->stats.rx_compress_time_max = decompress_time; } knet_h->stats.rx_compress_time_ave = (knet_h->stats.rx_compress_time_ave * knet_h->stats.rx_compressed_packets + decompress_time) / (knet_h->stats.rx_compressed_packets+1); knet_h->stats.rx_compressed_packets++; knet_h->stats.rx_compressed_original_bytes += decmp_outlen; knet_h->stats.rx_compressed_size_bytes += *len - KNET_HEADER_SIZE; memmove(data, knet_h->recv_from_links_buf_decompress, decmp_outlen); *len = decmp_outlen + header_size; } else { knet_h->stats.rx_failed_to_decompress++; pthread_mutex_unlock(&knet_h->handle_stats_mutex); log_warn(knet_h, KNET_SUB_COMPRESS, "Unable to decompress packet (%d): %s", err, strerror(errno)); return -1; } pthread_mutex_unlock(&knet_h->handle_stats_mutex); } return 0; } static int _check_destination(knet_handle_t knet_h, struct knet_header *inbuf, unsigned char *data, ssize_t len, ssize_t header_size, int8_t *channel) { knet_node_id_t dst_host_ids[KNET_MAX_HOST]; size_t dst_host_ids_entries = 0; int bcast = 1; size_t host_idx; int found = 0; if (knet_h->dst_host_filter_fn) { bcast = knet_h->dst_host_filter_fn( knet_h->dst_host_filter_fn_private_data, data, len - header_size, KNET_NOTIFY_RX, knet_h->host_id, inbuf->kh_node, channel, dst_host_ids, &dst_host_ids_entries); if (bcast < 0) { log_debug(knet_h, KNET_SUB_RX, "Error from dst_host_filter_fn: %d", bcast); return -1; } if ((!bcast) && (!dst_host_ids_entries)) { log_debug(knet_h, KNET_SUB_RX, "Message is unicast but no dst_host_ids_entries"); return -1; } /* check if we are dst for this packet */ if (!bcast) { if (dst_host_ids_entries > KNET_MAX_HOST) { log_debug(knet_h, KNET_SUB_RX, "dst_host_filter_fn returned too many destinations"); return -1; } for (host_idx = 0; host_idx < dst_host_ids_entries; host_idx++) { if (dst_host_ids[host_idx] == knet_h->host_id) { found = 1; break; } } if (!found) { log_debug(knet_h, KNET_SUB_RX, "Packet is not for us"); return -1; } } } return 0; } static int _deliver_data(knet_handle_t knet_h, unsigned char *data, ssize_t len, ssize_t header_size, int8_t channel) { struct iovec iov_out[1]; ssize_t outlen = 0; memset(iov_out, 0, sizeof(iov_out)); retry: iov_out[0].iov_base = (void *) data + outlen; iov_out[0].iov_len = len - (outlen + header_size); outlen = writev(knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], iov_out, 1); if ((outlen > 0) && (outlen < (ssize_t)iov_out[0].iov_len)) { log_debug(knet_h, KNET_SUB_RX, "Unable to send all data to the application in one go. Expected: %zu Sent: %zd\n", iov_out[0].iov_len, outlen); goto retry; } if (outlen <= 0) { knet_h->sock_notify_fn(knet_h->sock_notify_fn_private_data, knet_h->sockfd[channel].sockfd[0], channel, KNET_NOTIFY_RX, outlen, errno); return -1; } if ((size_t)outlen != iov_out[0].iov_len) { return -1; } return 0; } static void _process_data(knet_handle_t knet_h, struct knet_host *src_host, struct knet_link *src_link, struct knet_header *inbuf, ssize_t len, uint64_t decrypt_time) { int8_t channel; uint8_t decompress_type = 0; ssize_t header_size; seq_num_t seq_num; uint8_t frags, frag_seq; unsigned char *data; if (_handle_data_stats(knet_h, src_link, len, decrypt_time) < 0) { return; } /* * register host is sending data. Required to determine if we need * to reset circular buffers. (see onwire_v1.c) */ src_host->got_data = 1; if (knet_h->onwire_ver_remap) { get_data_header_info_v1(knet_h, inbuf, &header_size, &channel, &seq_num, &decompress_type, &frags, &frag_seq); data = get_data_v1(knet_h, inbuf); } else { switch (inbuf->kh_version) { case 1: get_data_header_info_v1(knet_h, inbuf, &header_size, &channel, &seq_num, &decompress_type, &frags, &frag_seq); data = get_data_v1(knet_h, inbuf); break; default: log_warn(knet_h, KNET_SUB_RX, "processing data onwire version %u not supported", inbuf->kh_version); return; break; } } - if (!_seq_num_lookup(src_host, seq_num, 0, 0)) { + if (!_seq_num_lookup(knet_h, src_host, seq_num, 0, 0)) { if (src_host->link_handler_policy != KNET_LINK_POLICY_ACTIVE) { log_debug(knet_h, KNET_SUB_RX, "Packet has already been delivered"); } return; } if (frags > 1) { /* * len as received from the socket also includes extra stuff * that the defrag code doesn't care about. So strip it * here and readd only for repadding once we are done * defragging * * the defrag code assumes that data packets have all the same size * except the last one that might be smaller. * */ len = len - header_size; if (_pckt_defrag(knet_h, src_host, seq_num, data, &len, frags, frag_seq)) { return; } len = len + header_size; } if (_decompress_data(knet_h, decompress_type, data, &len, header_size) < 0) { return; } if (!src_host->status.reachable) { log_debug(knet_h, KNET_SUB_RX, "Source host %u not reachable yet. Discarding packet.", src_host->host_id); return; } if (knet_h->enabled != 1) /* data forward is disabled */ return; if (_check_destination(knet_h, inbuf, data, len, header_size, &channel) < 0) { return; } if (!knet_h->sockfd[channel].in_use) { log_debug(knet_h, KNET_SUB_RX, "received packet for channel %d but there is no local sock connected", channel); return; } if (_deliver_data(knet_h, data, len, header_size, channel) < 0) { return; } _seq_num_set(src_host, seq_num, 0); } static struct knet_header *_decrypt_packet(knet_handle_t knet_h, struct knet_header *inbuf, ssize_t *len, uint64_t *decrypt_time) { int try_decrypt = 0; int i = 0; struct timespec start_time; struct timespec end_time; ssize_t outlen; for (i = 1; i <= KNET_MAX_CRYPTO_INSTANCES; i++) { if (knet_h->crypto_instance[i]) { try_decrypt = 1; break; } } if ((!try_decrypt) && (knet_h->crypto_only == KNET_CRYPTO_RX_DISALLOW_CLEAR_TRAFFIC)) { log_debug(knet_h, KNET_SUB_RX, "RX thread configured to accept only crypto packets, but no crypto configs are configured!"); return NULL; } if (try_decrypt) { clock_gettime(CLOCK_MONOTONIC, &start_time); if (crypto_authenticate_and_decrypt(knet_h, (unsigned char *)inbuf, *len, knet_h->recv_from_links_buf_decrypt, &outlen) < 0) { log_debug(knet_h, KNET_SUB_RX, "Unable to decrypt/auth packet"); if (knet_h->crypto_only == KNET_CRYPTO_RX_DISALLOW_CLEAR_TRAFFIC) { return NULL; } log_debug(knet_h, KNET_SUB_RX, "Attempting to process packet as clear data"); } else { clock_gettime(CLOCK_MONOTONIC, &end_time); timespec_diff(start_time, end_time, decrypt_time); *len = outlen; inbuf = (struct knet_header *)knet_h->recv_from_links_buf_decrypt; } } return inbuf; } static int _packet_checks(knet_handle_t knet_h, struct knet_header *inbuf, ssize_t len) { if (len < (ssize_t)(KNET_HEADER_SIZE + 1)) { log_debug(knet_h, KNET_SUB_RX, "Packet is too short: %ld", (long)len); return -1; } /* * old versions of knet did not advertise max_ver and max_ver is set to 0. */ if (!inbuf->kh_max_ver) { inbuf->kh_max_ver = 1; } /* * if the node joining max version is lower than the min version * then we reject the node */ if (inbuf->kh_max_ver < knet_h->onwire_min_ver) { log_warn(knet_h, KNET_SUB_RX, "Received packet version %u from node %u, lower than currently minimal supported onwire version. Rejecting.", inbuf->kh_version, inbuf->kh_node); return -1; } /* * if the node joining with version higher than our max version * then we reject the node */ if (inbuf->kh_version > knet_h->onwire_max_ver) { log_warn(knet_h, KNET_SUB_RX, "Received packet version %u from node %u, higher than currently maximum supported onwire version. Rejecting.", inbuf->kh_version, inbuf->kh_node); return -1; } /* * if the node joining with version lower than the current in use version * then we reject the node * * NOTE: should we make this configurable and support downgrades? */ if ((!knet_h->onwire_force_ver) && (inbuf->kh_version < knet_h->onwire_ver) && (inbuf->kh_max_ver > inbuf->kh_version)) { log_warn(knet_h, KNET_SUB_RX, "Received packet version %u from node %u, lower than currently in use onwire version. Rejecting.", inbuf->kh_version, inbuf->kh_node); return -1; } return 0; } static void _handle_dynip(knet_handle_t knet_h, struct knet_host *src_host, struct knet_link *src_link, int sockfd, const struct knet_mmsghdr *msg) { if (src_link->dynamic == KNET_LINK_DYNIP) { if (cmpaddr(&src_link->dst_addr, msg->msg_hdr.msg_name) != 0) { log_debug(knet_h, KNET_SUB_RX, "host: %u link: %u appears to have changed ip address", src_host->host_id, src_link->link_id); memmove(&src_link->dst_addr, msg->msg_hdr.msg_name, sizeof(struct sockaddr_storage)); if (knet_addrtostr(&src_link->dst_addr, sockaddr_len(&src_link->dst_addr), src_link->status.dst_ipaddr, KNET_MAX_HOST_LEN, src_link->status.dst_port, KNET_MAX_PORT_LEN) != 0) { log_debug(knet_h, KNET_SUB_RX, "Unable to resolve ???"); snprintf(src_link->status.dst_ipaddr, KNET_MAX_HOST_LEN - 1, "Unknown!!!"); snprintf(src_link->status.dst_port, KNET_MAX_PORT_LEN - 1, "??"); } else { log_info(knet_h, KNET_SUB_RX, "host: %u link: %u new connection established from: %s %s", src_host->host_id, src_link->link_id, src_link->status.dst_ipaddr, src_link->status.dst_port); } } /* * transport has already accepted the connection here * otherwise we would not be receiving packets */ transport_link_dyn_connect(knet_h, sockfd, src_link); } } static void _parse_recv_from_links(knet_handle_t knet_h, int sockfd, const struct knet_mmsghdr *msg) { int savederrno = 0, stats_err = 0; struct knet_host *src_host; struct knet_link *src_link; uint64_t decrypt_time = 0; struct knet_header *inbuf = msg->msg_hdr.msg_iov->iov_base; ssize_t len = msg->msg_len; int i, found_link = 0; inbuf = _decrypt_packet(knet_h, inbuf, &len, &decrypt_time); if (!inbuf) { return; } inbuf->kh_node = ntohs(inbuf->kh_node); if (_packet_checks(knet_h, inbuf, len) < 0) { return; } /* * determine source host */ src_host = knet_h->host_index[inbuf->kh_node]; if (src_host == NULL) { /* host not found */ log_debug(knet_h, KNET_SUB_RX, "Unable to find source host for this packet"); return; } /* * deteremine source link */ if (inbuf->kh_type == KNET_HEADER_TYPE_PING) { _handle_onwire_version(knet_h, src_host, inbuf); if (knet_h->onwire_ver_remap) { src_link = get_link_from_pong_v1(knet_h, src_host, inbuf); } else { switch (inbuf->kh_version) { case 1: src_link = get_link_from_pong_v1(knet_h, src_host, inbuf); break; default: log_warn(knet_h, KNET_SUB_RX, "Parsing ping onwire version %u not supported", inbuf->kh_version); return; break; } } _handle_dynip(knet_h, src_host, src_link, sockfd, msg); } else { /* all other packets */ for (i = 0; i < KNET_MAX_LINK; i++) { src_link = &src_host->link[i]; if (cmpaddr(&src_link->dst_addr, msg->msg_hdr.msg_name) == 0) { found_link = 1; break; } } if (!found_link) { log_debug(knet_h, KNET_SUB_RX, "Unable to determine source link for data packet. Discarding packet."); return; } } stats_err = pthread_mutex_lock(&src_link->link_stats_mutex); if (stats_err) { log_err(knet_h, KNET_SUB_RX, "Unable to get stats mutex lock for host %u link %u: %s", src_host->host_id, src_link->link_id, strerror(savederrno)); return; } switch (inbuf->kh_type) { case KNET_HEADER_TYPE_DATA: _process_data(knet_h, src_host, src_link, inbuf, len, decrypt_time); break; case KNET_HEADER_TYPE_PING: process_ping(knet_h, src_host, src_link, inbuf, len); break; case KNET_HEADER_TYPE_PONG: process_pong(knet_h, src_host, src_link, inbuf, len); break; case KNET_HEADER_TYPE_PMTUD: src_link->status.stats.rx_pmtu_packets++; src_link->status.stats.rx_pmtu_bytes += len; /* Unlock so we don't deadlock with tx_mutex */ pthread_mutex_unlock(&src_link->link_stats_mutex); process_pmtud(knet_h, src_link, inbuf); return; /* Don't need to unlock link_stats_mutex */ break; case KNET_HEADER_TYPE_PMTUD_REPLY: src_link->status.stats.rx_pmtu_packets++; src_link->status.stats.rx_pmtu_bytes += len; /* pmtud_mutex can't be acquired while we hold a link_stats_mutex (ordering) */ pthread_mutex_unlock(&src_link->link_stats_mutex); process_pmtud_reply(knet_h, src_link, inbuf); return; break; default: pthread_mutex_unlock(&src_link->link_stats_mutex); return; break; } pthread_mutex_unlock(&src_link->link_stats_mutex); } static void _handle_recv_from_links(knet_handle_t knet_h, int sockfd, struct knet_mmsghdr *msg) { int err, savederrno; int i, msg_recv, transport; if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) { log_debug(knet_h, KNET_SUB_RX, "Unable to get global read lock"); return; } if (_is_valid_fd(knet_h, sockfd) < 1) { /* * this is normal if a fd got an event and before we grab the read lock * and the link is removed by another thread */ goto exit_unlock; } transport = knet_h->knet_transport_fd_tracker[sockfd].transport; /* * reset msg_namelen to buffer size because after recvmmsg * each msg_namelen will contain sizeof sockaddr_in or sockaddr_in6 */ for (i = 0; i < PCKT_RX_BUFS; i++) { msg[i].msg_hdr.msg_namelen = knet_h->knet_transport_fd_tracker[sockfd].sockaddr_len; } msg_recv = _recvmmsg(sockfd, &msg[0], PCKT_RX_BUFS, MSG_DONTWAIT | MSG_NOSIGNAL); savederrno = errno; /* * WARNING: man page for recvmmsg is wrong. Kernel implementation here: * recvmmsg can return: * -1 on error * 0 if the previous run of recvmmsg recorded an error on the socket * N number of messages (see exception below). * * If there is an error from recvmsg after receiving a frame or more, the recvmmsg * loop is interrupted, error recorded in the socket (getsockopt(SO_ERROR) and * it will be visibile in the next run. * * Need to be careful how we handle errors at this stage. * * error messages need to be handled on a per transport/protocol base * at this point we have different layers of error handling * - msg_recv < 0 -> error from this run * msg_recv = 0 -> error from previous run and error on socket needs to be cleared * - per-transport message data * example: msg[i].msg_hdr.msg_flags & MSG_NOTIFICATION or msg_len for SCTP == EOF, * but for UDP it is perfectly legal to receive a 0 bytes message.. go figure * - NOTE: on SCTP MSG_NOTIFICATION we get msg_recv == PCKT_FRAG_MAX messages and no * errno set. That means the error api needs to be able to abort the loop below. */ if (msg_recv <= 0) { transport_rx_sock_error(knet_h, transport, sockfd, msg_recv, savederrno); goto exit_unlock; } for (i = 0; i < msg_recv; i++) { err = transport_rx_is_data(knet_h, transport, sockfd, &msg[i]); /* * TODO: make this section silent once we are confident * all protocols packet handlers are good */ switch(err) { case KNET_TRANSPORT_RX_ERROR: /* on error */ log_debug(knet_h, KNET_SUB_RX, "Transport reported error parsing packet"); goto exit_unlock; break; case KNET_TRANSPORT_RX_NOT_DATA_CONTINUE: /* packet is not data and we should continue the packet process loop */ log_debug(knet_h, KNET_SUB_RX, "Transport reported no data, continue"); break; case KNET_TRANSPORT_RX_NOT_DATA_STOP: /* packet is not data and we should STOP the packet process loop */ log_debug(knet_h, KNET_SUB_RX, "Transport reported no data, stop"); goto exit_unlock; break; case KNET_TRANSPORT_RX_IS_DATA: /* packet is data and should be parsed as such */ /* * processing incoming packets vs access lists */ if ((knet_h->use_access_lists) && (transport_get_acl_type(knet_h, transport) == USE_GENERIC_ACL)) { if (!check_validate(knet_h, sockfd, transport, msg[i].msg_hdr.msg_name)) { char src_ipaddr[KNET_MAX_HOST_LEN]; char src_port[KNET_MAX_PORT_LEN]; memset(src_ipaddr, 0, KNET_MAX_HOST_LEN); memset(src_port, 0, KNET_MAX_PORT_LEN); if (knet_addrtostr(msg[i].msg_hdr.msg_name, sockaddr_len(msg[i].msg_hdr.msg_name), src_ipaddr, KNET_MAX_HOST_LEN, src_port, KNET_MAX_PORT_LEN) < 0) { log_debug(knet_h, KNET_SUB_RX, "Packet rejected: unable to resolve host/port"); } else { log_debug(knet_h, KNET_SUB_RX, "Packet rejected from %s/%s", src_ipaddr, src_port); } /* * continue processing the other packets */ continue; } } _parse_recv_from_links(knet_h, sockfd, &msg[i]); break; case KNET_TRANSPORT_RX_OOB_DATA_CONTINUE: log_debug(knet_h, KNET_SUB_RX, "Transport is processing sock OOB data, continue"); break; case KNET_TRANSPORT_RX_OOB_DATA_STOP: log_debug(knet_h, KNET_SUB_RX, "Transport has completed processing sock OOB data, stop"); goto exit_unlock; break; } } exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); } void *_handle_recv_from_links_thread(void *data) { int i, nev; knet_handle_t knet_h = (knet_handle_t) data; struct epoll_event events[KNET_EPOLL_MAX_EVENTS]; struct sockaddr_storage address[PCKT_RX_BUFS]; struct knet_mmsghdr msg[PCKT_RX_BUFS]; struct iovec iov_in[PCKT_RX_BUFS]; set_thread_status(knet_h, KNET_THREAD_RX, KNET_THREAD_STARTED); memset(&msg, 0, sizeof(msg)); memset(&events, 0, sizeof(events)); for (i = 0; i < PCKT_RX_BUFS; i++) { iov_in[i].iov_base = (void *)knet_h->recv_from_links_buf[i]; iov_in[i].iov_len = KNET_DATABUFSIZE; memset(&msg[i].msg_hdr, 0, sizeof(struct msghdr)); msg[i].msg_hdr.msg_name = &address[i]; msg[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage); /* Real value filled in before actual use */ msg[i].msg_hdr.msg_iov = &iov_in[i]; msg[i].msg_hdr.msg_iovlen = 1; } while (!shutdown_in_progress(knet_h)) { nev = epoll_wait(knet_h->recv_from_links_epollfd, events, KNET_EPOLL_MAX_EVENTS, knet_h->threads_timer_res / 1000); /* * the RX threads only need to notify that there has been at least * one successful run after queue flush has been requested. * See setfwd in handle.c */ if (get_thread_flush_queue(knet_h, KNET_THREAD_RX) == KNET_THREAD_QUEUE_FLUSH) { set_thread_flush_queue(knet_h, KNET_THREAD_RX, KNET_THREAD_QUEUE_FLUSHED); } /* * we use timeout to detect if thread is shutting down */ if (nev == 0) { continue; } for (i = 0; i < nev; i++) { _handle_recv_from_links(knet_h, events[i].data.fd, msg); } } set_thread_status(knet_h, KNET_THREAD_RX, KNET_THREAD_STOPPED); return NULL; } ssize_t knet_recv(knet_handle_t knet_h, char *buff, const size_t buff_len, const int8_t channel) { int savederrno = 0; ssize_t err = 0; struct iovec iov_in; if (!_is_valid_handle(knet_h)) { return -1; } if (buff == NULL) { errno = EINVAL; return -1; } if (buff_len <= 0) { errno = EINVAL; return -1; } if (buff_len > KNET_MAX_PACKET_SIZE) { errno = EINVAL; return -1; } if (channel < 0) { errno = EINVAL; return -1; } if (channel >= KNET_DATAFD_MAX) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } if (!knet_h->sockfd[channel].in_use) { savederrno = EINVAL; err = -1; goto out_unlock; } memset(&iov_in, 0, sizeof(iov_in)); iov_in.iov_base = (void *)buff; iov_in.iov_len = buff_len; err = readv(knet_h->sockfd[channel].sockfd[0], &iov_in, 1); savederrno = errno; out_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = err ? savederrno : 0; return err; }