diff --git a/libknet/host.c b/libknet/host.c index 34591a95..c082e541 100644 --- a/libknet/host.c +++ b/libknet/host.c @@ -1,726 +1,729 @@ /* * Copyright (C) 2010-2025 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. Di Nitto * Federico Simoncelli * * This software licensed under LGPL-2.0+ */ #include "config.h" #include #include #include #include #include #include "host.h" #include "internals.h" #include "logging.h" #include "threads_common.h" static void _host_list_update(knet_handle_t knet_h) { struct knet_host *host; knet_h->host_ids_entries = 0; for (host = knet_h->host_head; host != NULL; host = host->next) { knet_h->host_ids[knet_h->host_ids_entries] = host->host_id; knet_h->host_ids_entries++; } } int knet_host_add(knet_handle_t knet_h, knet_node_id_t host_id) { int savederrno = 0, err = 0; struct knet_host *host = NULL; uint8_t link_idx; if (!_is_valid_handle(knet_h)) { return -1; } savederrno = get_global_wrlock(knet_h); if (savederrno) { log_err(knet_h, KNET_SUB_HOST, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } if (knet_h->host_index[host_id]) { err = -1; savederrno = EEXIST; log_err(knet_h, KNET_SUB_HOST, "Unable to add host %u: %s", host_id, strerror(savederrno)); goto exit_unlock; } host = malloc(sizeof(struct knet_host)); if (!host) { err = -1; savederrno = errno; log_err(knet_h, KNET_SUB_HOST, "Unable to allocate memory for host %u: %s", host_id, strerror(savederrno)); goto exit_unlock; } memset(host, 0, sizeof(struct knet_host)); /* * set host_id */ host->host_id = host_id; /* * set default host->name to host_id for logging */ snprintf(host->name, KNET_MAX_HOST_LEN, "%u", host_id); /* * initialize links internal data */ for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) { host->link[link_idx].link_id = link_idx; host->link[link_idx].status.stats.latency_min = UINT32_MAX; } /* * add new host to the index */ knet_h->host_index[host_id] = host; /* * add new host to host list */ if 
(knet_h->host_head) { host->next = knet_h->host_head; } knet_h->host_head = host; _host_list_update(knet_h); exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); if (err < 0) { free(host); } errno = err ? savederrno : 0; return err; } int knet_host_remove(knet_handle_t knet_h, knet_node_id_t host_id) { int savederrno = 0, err = 0; struct knet_host *host, *removed; uint8_t link_idx; if (!_is_valid_handle(knet_h)) { return -1; } savederrno = get_global_wrlock(knet_h); if (savederrno) { log_err(knet_h, KNET_SUB_HOST, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } host = knet_h->host_index[host_id]; if (!host) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_HOST, "Unable to remove host %u: %s", host_id, strerror(savederrno)); goto exit_unlock; } /* * if links are configured we cannot release the host */ for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) { if (host->link[link_idx].configured) { err = -1; savederrno = EBUSY; log_err(knet_h, KNET_SUB_HOST, "Unable to remove host %u, links are still configured: %s", host_id, strerror(savederrno)); goto exit_unlock; } } removed = NULL; /* * removing host from list */ + // coverity[NULL_FIELD:SUPPRESS] - host_head is not going to be NULL if (knet_h->host_head->host_id == host_id) { + // coverity[NULL_FIELD:SUPPRESS] - host_head is not going to be NULL removed = knet_h->host_head; + // coverity[NULL_FIELD:SUPPRESS] - host_head is not going to be NULL knet_h->host_head = removed->next; } else { for (host = knet_h->host_head; host->next != NULL; host = host->next) { if (host->next->host_id == host_id) { removed = host->next; host->next = removed->next; break; } } } knet_h->host_index[host_id] = NULL; free(removed); _host_list_update(knet_h); exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = err ? 
savederrno : 0; return err; } int knet_host_set_name(knet_handle_t knet_h, knet_node_id_t host_id, const char *name) { int savederrno = 0, err = 0; struct knet_host *host; if (!_is_valid_handle(knet_h)) { return -1; } savederrno = get_global_wrlock(knet_h); if (savederrno) { log_err(knet_h, KNET_SUB_HOST, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } if (!knet_h->host_index[host_id]) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_HOST, "Unable to find host %u to set name: %s", host_id, strerror(savederrno)); goto exit_unlock; } if (!name) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_HOST, "Unable to set name for host %u: %s", host_id, strerror(savederrno)); goto exit_unlock; } if (strlen(name) >= KNET_MAX_HOST_LEN) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_HOST, "Requested name for host %u is too long: %s", host_id, strerror(savederrno)); goto exit_unlock; } for (host = knet_h->host_head; host != NULL; host = host->next) { if (!strncmp(host->name, name, KNET_MAX_HOST_LEN)) { err = -1; savederrno = EEXIST; log_err(knet_h, KNET_SUB_HOST, "Duplicated name found on host_id %u", host->host_id); goto exit_unlock; } } snprintf(knet_h->host_index[host_id]->name, KNET_MAX_HOST_LEN, "%s", name); exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = err ? 
savederrno : 0; return err; } int knet_host_get_name_by_host_id(knet_handle_t knet_h, knet_node_id_t host_id, char *name) { int savederrno = 0, err = 0; if (!_is_valid_handle(knet_h)) { return -1; } if (!name) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HOST, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } if (!knet_h->host_index[host_id]) { savederrno = EINVAL; err = -1; log_debug(knet_h, KNET_SUB_HOST, "Host %u not found", host_id); goto exit_unlock; } snprintf(name, KNET_MAX_HOST_LEN, "%s", knet_h->host_index[host_id]->name); exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = err ? savederrno : 0; return err; } int knet_host_get_id_by_host_name(knet_handle_t knet_h, const char *name, knet_node_id_t *host_id) { int savederrno = 0, err = 0, found = 0; struct knet_host *host; if (!_is_valid_handle(knet_h)) { return -1; } if (!name) { errno = EINVAL; return -1; } if (!host_id) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HOST, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } for (host = knet_h->host_head; host != NULL; host = host->next) { if (!strncmp(name, host->name, KNET_MAX_HOST_LEN)) { found = 1; *host_id = host->host_id; break; } } if (!found) { savederrno = ENOENT; err = -1; } pthread_rwlock_unlock(&knet_h->global_rwlock); errno = err ? 
savederrno : 0; return err; } int knet_host_get_host_list(knet_handle_t knet_h, knet_node_id_t *host_ids, size_t *host_ids_entries) { int savederrno = 0; if (!_is_valid_handle(knet_h)) { return -1; } if ((!host_ids) || (!host_ids_entries)) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HOST, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } memmove(host_ids, knet_h->host_ids, sizeof(knet_h->host_ids)); *host_ids_entries = knet_h->host_ids_entries; pthread_rwlock_unlock(&knet_h->global_rwlock); return 0; } int knet_host_set_policy(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t policy) { int savederrno = 0, err = 0; uint8_t old_policy; if (!_is_valid_handle(knet_h)) { return -1; } if (policy > KNET_LINK_POLICY_RR) { errno = EINVAL; return -1; } savederrno = get_global_wrlock(knet_h); if (savederrno) { log_err(knet_h, KNET_SUB_HOST, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } if (!knet_h->host_index[host_id]) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_HOST, "Unable to set name for host %u: %s", host_id, strerror(savederrno)); goto exit_unlock; } old_policy = knet_h->host_index[host_id]->link_handler_policy; knet_h->host_index[host_id]->link_handler_policy = policy; if (_host_dstcache_update_async(knet_h, knet_h->host_index[host_id])) { savederrno = errno; err = -1; knet_h->host_index[host_id]->link_handler_policy = old_policy; log_debug(knet_h, KNET_SUB_HOST, "Unable to update switch cache for host %u: %s", host_id, strerror(savederrno)); } log_debug(knet_h, KNET_SUB_HOST, "Host %u has new switching policy: %u", host_id, policy); exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = err ? 
savederrno : 0; return err; } int knet_host_get_policy(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t *policy) { int savederrno = 0, err = 0; if (!_is_valid_handle(knet_h)) { return -1; } if (!policy) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HOST, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } if (!knet_h->host_index[host_id]) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_HOST, "Unable to get name for host %u: %s", host_id, strerror(savederrno)); goto exit_unlock; } *policy = knet_h->host_index[host_id]->link_handler_policy; exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = err ? savederrno : 0; return err; } int knet_host_get_status(knet_handle_t knet_h, knet_node_id_t host_id, struct knet_host_status *status) { int savederrno = 0, err = 0; struct knet_host *host; if (!_is_valid_handle(knet_h)) { return -1; } if (!status) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HOST, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } host = knet_h->host_index[host_id]; if (!host) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_HOST, "Unable to find host %u: %s", host_id, strerror(savederrno)); goto exit_unlock; } memmove(status, &host->status, sizeof(struct knet_host_status)); exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = err ? 
savederrno : 0; return err; } int knet_host_enable_status_change_notify(knet_handle_t knet_h, void *host_status_change_notify_fn_private_data, void (*host_status_change_notify_fn) ( void *private_data, knet_node_id_t host_id, uint8_t reachable, uint8_t remote, uint8_t external)) { int savederrno = 0; if (!_is_valid_handle(knet_h)) { return -1; } savederrno = get_global_wrlock(knet_h); if (savederrno) { log_err(knet_h, KNET_SUB_HOST, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } knet_h->host_status_change_notify_fn_private_data = host_status_change_notify_fn_private_data; knet_h->host_status_change_notify_fn = host_status_change_notify_fn; if (knet_h->host_status_change_notify_fn) { log_debug(knet_h, KNET_SUB_HOST, "host_status_change_notify_fn enabled"); } else { log_debug(knet_h, KNET_SUB_HOST, "host_status_change_notify_fn disabled"); } pthread_rwlock_unlock(&knet_h->global_rwlock); errno = 0; return 0; } static void _clear_cbuffers(struct knet_host *host, seq_num_t rx_seq_num) { int i; memset(host->circular_buffer, 0, KNET_CBUFFER_SIZE); host->rx_seq_num = rx_seq_num; memset(host->circular_buffer_defrag, 0, KNET_CBUFFER_SIZE); for (i = 0; i < KNET_DEFRAG_BUFFERS; i++) { memset(&host->defrag_buf[i], 0, sizeof(struct knet_host_defrag_buf)); } } static void _reclaim_old_defrag_bufs(struct knet_host *host, seq_num_t seq_num) { seq_num_t head, tail; /* seq_num boundaries */ int i; head = seq_num + 1; tail = seq_num - (KNET_DEFRAG_BUFFERS + 1); /* * expire old defrag buffers */ for (i = 0; i < KNET_DEFRAG_BUFFERS; i++) { if (host->defrag_buf[i].in_use) { /* * head has done a rollover to 0+ */ if (tail > head) { if ((host->defrag_buf[i].pckt_seq >= head) && (host->defrag_buf[i].pckt_seq <= tail)) { host->defrag_buf[i].in_use = 0; } } else { if ((host->defrag_buf[i].pckt_seq >= head) || (host->defrag_buf[i].pckt_seq <= tail)){ host->defrag_buf[i].in_use = 0; } } } } } /* * check if a given packet seq num is in the circular 
buffers * defrag_buf = 0 -> use normal cbuf 1 -> use the defrag buffer lookup */ int _seq_num_lookup(struct knet_host *host, seq_num_t seq_num, int defrag_buf, int clear_buf) { size_t head, tail; /* circular buffer indexes */ seq_num_t seq_dist; char *dst_cbuf = host->circular_buffer; char *dst_cbuf_defrag = host->circular_buffer_defrag; seq_num_t *dst_seq_num = &host->rx_seq_num; /* * There is a potential race condition where the sender * is overloaded, sending data packets before pings * can kick in and set the correct dst_seq_num. * * if this node is starting up (dst_seq_num = 0), * it can start rejecing valid packets and get stuck. * * Set the dst_seq_num to the first seen packet and * use that as reference instead. */ if (!*dst_seq_num) { *dst_seq_num = seq_num; } if (clear_buf) { _clear_cbuffers(host, seq_num); } _reclaim_old_defrag_bufs(host, *dst_seq_num); if (seq_num < *dst_seq_num) { seq_dist = (SEQ_MAX - seq_num) + *dst_seq_num; } else { seq_dist = *dst_seq_num - seq_num; } head = seq_num % KNET_CBUFFER_SIZE; if (seq_dist < KNET_CBUFFER_SIZE) { /* seq num is in ring buffer */ if (!defrag_buf) { return (dst_cbuf[head] == 0) ? 1 : 0; } else { return (dst_cbuf_defrag[head] == 0) ? 
1 : 0; } } else if (seq_dist <= SEQ_MAX - KNET_CBUFFER_SIZE) { memset(dst_cbuf, 0, KNET_CBUFFER_SIZE); memset(dst_cbuf_defrag, 0, KNET_CBUFFER_SIZE); *dst_seq_num = seq_num; } /* cleaning up circular buffer */ tail = (*dst_seq_num + 1) % KNET_CBUFFER_SIZE; if (tail > head) { memset(dst_cbuf + tail, 0, KNET_CBUFFER_SIZE - tail); memset(dst_cbuf, 0, head + 1); memset(dst_cbuf_defrag + tail, 0, KNET_CBUFFER_SIZE - tail); memset(dst_cbuf_defrag, 0, head + 1); } else { memset(dst_cbuf + tail, 0, head - tail + 1); memset(dst_cbuf_defrag + tail, 0, head - tail + 1); } *dst_seq_num = seq_num; return 1; } void _seq_num_set(struct knet_host *host, seq_num_t seq_num, int defrag_buf) { if (!defrag_buf) { host->circular_buffer[seq_num % KNET_CBUFFER_SIZE] = 1; } else { host->circular_buffer_defrag[seq_num % KNET_CBUFFER_SIZE] = 1; } return; } int _host_dstcache_update_async(knet_handle_t knet_h, struct knet_host *host) { int savederrno = 0; knet_node_id_t host_id = host->host_id; if (sendto(knet_h->dstsockfd[1], &host_id, sizeof(host_id), MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0) != sizeof(host_id)) { savederrno = errno; log_debug(knet_h, KNET_SUB_HOST, "Unable to write to dstpipefd[1]: %s", strerror(savederrno)); errno = savederrno; return -1; } return 0; } int _host_dstcache_update_sync(knet_handle_t knet_h, struct knet_host *host) { int link_idx; int best_priority = -1; int reachable = 0; if (knet_h->host_id == host->host_id && knet_h->has_loop_link) { host->active_link_entries = 1; return 0; } host->active_link_entries = 0; for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) { if (host->link[link_idx].status.enabled != 1) /* link is not enabled */ continue; if (host->link[link_idx].status.connected != 1) /* link is not enabled */ continue; if (host->link[link_idx].has_valid_mtu != 1) /* link does not have valid MTU */ continue; if (host->link_handler_policy == KNET_LINK_POLICY_PASSIVE) { /* for passive we look for the only active link with higher priority */ if 
(host->link[link_idx].priority > best_priority) { host->active_links[0] = link_idx; best_priority = host->link[link_idx].priority; } host->active_link_entries = 1; } else { /* for RR and ACTIVE we need to copy all available links */ host->active_links[host->active_link_entries] = link_idx; host->active_link_entries++; } } if (host->link_handler_policy == KNET_LINK_POLICY_PASSIVE) { log_info(knet_h, KNET_SUB_HOST, "host: %u (passive) best link: %u (pri: %u)", host->host_id, host->link[host->active_links[0]].link_id, host->link[host->active_links[0]].priority); } else { log_info(knet_h, KNET_SUB_HOST, "host: %u has %u active links", host->host_id, host->active_link_entries); } /* no active links, we can clean the circular buffers and indexes */ if (!host->active_link_entries) { log_warn(knet_h, KNET_SUB_HOST, "host: %u has no active links", host->host_id); _clear_cbuffers(host, 0); } else { reachable = 1; } if (host->status.reachable != reachable) { host->status.reachable = reachable; if (knet_h->host_status_change_notify_fn) { knet_h->host_status_change_notify_fn( knet_h->host_status_change_notify_fn_private_data, host->host_id, host->status.reachable, host->status.remote, host->status.external); } } return 0; } diff --git a/libknet/links.c b/libknet/links.c index 8782fb2d..dc69b6f0 100644 --- a/libknet/links.c +++ b/libknet/links.c @@ -1,1555 +1,1556 @@ /* * Copyright (C) 2012-2025 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. 
Di Nitto * Federico Simoncelli * * This software licensed under LGPL-2.0+ */ #include "config.h" #include #include #include #include #include "netutils.h" #include "internals.h" #include "logging.h" #include "links.h" #include "transports.h" #include "host.h" #include "threads_common.h" #include "links_acl.h" #include #include static int find_ifindex(struct sockaddr_storage *addr) { struct ifaddrs *ifrs, *ifa; if (getifaddrs(&ifrs) == 0) { for (ifa = ifrs; ifa != NULL; ifa = ifa->ifa_next) { if (ifa->ifa_addr && cmpaddr(addr, (struct sockaddr_storage *)ifa->ifa_addr) == 0) { int ifindex = if_nametoindex(ifa->ifa_name); freeifaddrs(ifrs); return ifindex; } } freeifaddrs(ifrs); } return -1; } int _link_updown(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id, unsigned int enabled, unsigned int connected, unsigned int lock_stats) { struct knet_host *host = knet_h->host_index[host_id]; struct knet_link *link = &host->link[link_id]; int savederrno = 0; if ((link->status.enabled == enabled) && (link->status.connected == connected)) return 0; link->status.enabled = enabled; link->status.connected = connected; _host_dstcache_update_async(knet_h, knet_h->host_index[host_id]); if ((link->status.dynconnected) && (!link->status.connected)) { link->status.dynconnected = 0; } if (!connected) { transport_link_is_down(knet_h, link); } else { /* Reset MTU in case new link can't use full line MTU */ log_info(knet_h, KNET_SUB_LINK, "Resetting MTU for link %u because host %u joined", link_id, host_id); force_pmtud_run(knet_h, KNET_SUB_LINK, 1, 1); } if (lock_stats) { savederrno = pthread_mutex_lock(&link->link_stats_mutex); if (savederrno) { log_err(knet_h, KNET_SUB_LINK, "Unable to get stats mutex lock for host %u link %u: %s", host_id, link_id, strerror(savederrno)); errno = savederrno; return -1; } } if (connected) { time(&link->status.stats.last_up_times[link->status.stats.last_up_time_index]); link->status.stats.up_count++; if (++link->status.stats.last_up_time_index 
>= MAX_LINK_EVENTS) { link->status.stats.last_up_time_index = 0; } knet_h->knet_transport_fd_tracker[link->outsock].ifindex = find_ifindex(&link->src_addr); } else { time(&link->status.stats.last_down_times[link->status.stats.last_down_time_index]); link->status.stats.down_count++; if (++link->status.stats.last_down_time_index >= MAX_LINK_EVENTS) { link->status.stats.last_down_time_index = 0; } } if (lock_stats) { pthread_mutex_unlock(&link->link_stats_mutex); } return 0; } void _link_clear_stats(knet_handle_t knet_h) { struct knet_host *host; struct knet_link *link; uint32_t host_id; uint8_t link_id; for (host_id = 0; host_id < KNET_MAX_HOST; host_id++) { host = knet_h->host_index[host_id]; if (!host) { continue; } for (link_id = 0; link_id < KNET_MAX_LINK; link_id++) { link = &host->link[link_id]; memset(&link->status.stats, 0, sizeof(struct knet_link_stats)); } } } int knet_link_set_config(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id, uint8_t transport, struct sockaddr_storage *src_addr, struct sockaddr_storage *dst_addr, uint64_t flags) { int savederrno = 0, err = 0, i, wipelink = 0, link_idx; struct knet_host *host, *tmp_host; struct knet_link *link = NULL; if (!_is_valid_handle(knet_h)) { return -1; } if (link_id >= KNET_MAX_LINK) { errno = EINVAL; return -1; } if (!src_addr) { errno = EINVAL; return -1; } if (dst_addr && (src_addr->ss_family != dst_addr->ss_family)) { log_err(knet_h, KNET_SUB_LINK, "Source address family does not match destination address family"); errno = EINVAL; return -1; } if (transport >= KNET_MAX_TRANSPORTS) { errno = EINVAL; return -1; } savederrno = get_global_wrlock(knet_h); if (savederrno) { log_err(knet_h, KNET_SUB_LINK, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } if (transport == KNET_TRANSPORT_LOOPBACK && knet_h->host_id != host_id) { log_err(knet_h, KNET_SUB_LINK, "Cannot create loopback link to remote node"); err = -1; savederrno = EINVAL; goto exit_unlock; } if 
(knet_h->host_id == host_id && knet_h->has_loop_link) { log_err(knet_h, KNET_SUB_LINK, "Cannot create more than 1 link when loopback is active"); err = -1; savederrno = EINVAL; goto exit_unlock; } host = knet_h->host_index[host_id]; if (!host) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s", host_id, strerror(savederrno)); goto exit_unlock; } if (transport == KNET_TRANSPORT_LOOPBACK && knet_h->host_id == host_id) { for (i=0; ilink[i].configured) { log_err(knet_h, KNET_SUB_LINK, "Cannot add loopback link when other links are already configured."); err = -1; savederrno = EINVAL; goto exit_unlock; } } } link = &host->link[link_id]; if (link->configured != 0) { err =-1; savederrno = EBUSY; log_err(knet_h, KNET_SUB_LINK, "Host %u link %u is currently configured: %s", host_id, link_id, strerror(savederrno)); goto exit_unlock; } if (link->status.enabled != 0) { err =-1; savederrno = EBUSY; log_err(knet_h, KNET_SUB_LINK, "Host %u link %u is currently in use: %s", host_id, link_id, strerror(savederrno)); goto exit_unlock; } /* * errors happening after this point should trigger * a memset of the link */ wipelink = 1; copy_sockaddr(&link->src_addr, src_addr); err = knet_addrtostr(src_addr, sizeof(struct sockaddr_storage), link->status.src_ipaddr, KNET_MAX_HOST_LEN, link->status.src_port, KNET_MAX_PORT_LEN); if (err) { if (err == EAI_SYSTEM) { savederrno = errno; log_warn(knet_h, KNET_SUB_LINK, "Unable to resolve host: %u link: %u source addr/port: %s", host_id, link_id, strerror(savederrno)); } else { savederrno = EINVAL; log_warn(knet_h, KNET_SUB_LINK, "Unable to resolve host: %u link: %u source addr/port: %s", host_id, link_id, gai_strerror(err)); } err = -1; goto exit_unlock; } if (!dst_addr) { link->dynamic = KNET_LINK_DYNIP; } else { link->dynamic = KNET_LINK_STATIC; copy_sockaddr(&link->dst_addr, dst_addr); err = knet_addrtostr(dst_addr, sizeof(struct sockaddr_storage), link->status.dst_ipaddr, KNET_MAX_HOST_LEN, 
link->status.dst_port, KNET_MAX_PORT_LEN); if (err) { if (err == EAI_SYSTEM) { savederrno = errno; log_warn(knet_h, KNET_SUB_LINK, "Unable to resolve host: %u link: %u destination addr/port: %s", host_id, link_id, strerror(savederrno)); } else { savederrno = EINVAL; log_warn(knet_h, KNET_SUB_LINK, "Unable to resolve host: %u link: %u destination addr/port: %s", host_id, link_id, gai_strerror(err)); } err = -1; goto exit_unlock; } } link->pmtud_crypto_timeout_multiplier = KNET_LINK_PMTUD_CRYPTO_TIMEOUT_MULTIPLIER_MIN; link->pong_count = KNET_LINK_DEFAULT_PONG_COUNT; link->has_valid_mtu = 0; link->ping_interval = KNET_LINK_DEFAULT_PING_INTERVAL * 1000; /* microseconds */ link->pong_timeout = KNET_LINK_DEFAULT_PING_TIMEOUT * 1000; /* microseconds */ link->pong_timeout_backoff = KNET_LINK_PONG_TIMEOUT_BACKOFF; link->pong_timeout_adj = link->pong_timeout * link->pong_timeout_backoff; /* microseconds */ link->latency_max_samples = KNET_LINK_DEFAULT_PING_PRECISION; + // coverity[MISSING_LOCK:SUPPRESS] - global_wrlock is definitely held here link->latency_cur_samples = 0; link->flags = flags; /* * check for DYNIP vs STATIC collisions. * example: link0 is static, user attempts to configure link1 as dynamic with the same source * address/port. * This configuration is invalid and would cause ACL collisions. */ for (tmp_host = knet_h->host_head; tmp_host != NULL; tmp_host = tmp_host->next) { for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) { if (&tmp_host->link[link_idx] == link) continue; if ((!memcmp(&tmp_host->link[link_idx].src_addr, &link->src_addr, sizeof(struct sockaddr_storage))) && (tmp_host->link[link_idx].dynamic != link->dynamic)) { savederrno = EINVAL; err = -1; log_err(knet_h, KNET_SUB_LINK, "Failed to configure host %u link %u dyn %u. 
Conflicts with host %u link %u dyn %u: %s", host_id, link_id, link->dynamic, tmp_host->host_id, link_idx, tmp_host->link[link_idx].dynamic, strerror(savederrno)); goto exit_unlock; } } } savederrno = pthread_mutex_init(&link->link_stats_mutex, NULL); if (savederrno) { log_err(knet_h, KNET_SUB_LINK, "Unable to initialize link stats mutex: %s", strerror(savederrno)); err = -1; goto exit_unlock; } if (transport_link_set_config(knet_h, link, transport) < 0) { savederrno = errno; err = -1; goto exit_transport_err; } /* * we can only configure default access lists if we know both endpoints * and the protocol uses GENERIC_ACL, otherwise the protocol has * to setup their own access lists above in transport_link_set_config. */ if ((transport_get_acl_type(knet_h, transport) == USE_GENERIC_ACL) && (link->dynamic == KNET_LINK_STATIC)) { log_debug(knet_h, KNET_SUB_LINK, "Configuring default access lists for host: %u link: %u socket: %d", host_id, link_id, link->outsock); if ((check_add(knet_h, link, -1, &link->dst_addr, &link->dst_addr, CHECK_TYPE_ADDRESS, CHECK_ACCEPT) < 0) && (errno != EEXIST)) { log_warn(knet_h, KNET_SUB_LINK, "Failed to configure default access lists for host: %u link: %u", host_id, link_id); savederrno = errno; err = -1; goto exit_acl_error; } } /* * no errors should happen after link is configured */ link->configured = 1; log_debug(knet_h, KNET_SUB_LINK, "host: %u link: %u is configured", host_id, link_id); if (transport == KNET_TRANSPORT_LOOPBACK) { knet_h->has_loop_link = 1; knet_h->loop_link = link_id; host->status.reachable = 1; link->status.mtu = KNET_PMTUD_SIZE_V6; } else { /* * calculate the minimum MTU that is safe to use, * based on RFCs and that each network device should * be able to support without any troubles */ if (link->dynamic == KNET_LINK_STATIC) { /* * with static link we can be more precise than using * the generic calc_min_mtu() */ switch (link->dst_addr.ss_family) { case AF_INET6: link->status.mtu = calc_max_data_outlen(knet_h, 
KNET_PMTUD_MIN_MTU_V6 - (KNET_PMTUD_OVERHEAD_V6 + link->proto_overhead)); break; case AF_INET: link->status.mtu = calc_max_data_outlen(knet_h, KNET_PMTUD_MIN_MTU_V4 - (KNET_PMTUD_OVERHEAD_V4 + link->proto_overhead)); break; } } else { /* * for dynamic links we start with the minimum MTU * possible and PMTUd will kick in immediately * after connection status is 1 */ link->status.mtu = calc_min_mtu(knet_h); } link->has_valid_mtu = 1; } exit_acl_error: /* * if creating access lists has error, we only need to clean * the transport and the stuff below. */ if (err < 0) { if ((transport_link_clear_config(knet_h, link) < 0) && (errno != EBUSY)) { log_warn(knet_h, KNET_SUB_LINK, "Failed to deconfigure transport for host %u link %u: %s", host_id, link_id, strerror(errno)); } } exit_transport_err: /* * if transport has errors, transport will clean after itself * and we only need to clean the mutex */ if (err < 0) { pthread_mutex_destroy(&link->link_stats_mutex); } exit_unlock: /* * re-init the link on error */ if ((err < 0) && (wipelink)) { memset(link, 0, sizeof(struct knet_link)); link->link_id = link_id; } pthread_rwlock_unlock(&knet_h->global_rwlock); errno = err ? 
savederrno : 0; return err; } int knet_link_get_config(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id, uint8_t *transport, struct sockaddr_storage *src_addr, struct sockaddr_storage *dst_addr, uint8_t *dynamic, uint64_t *flags) { int savederrno = 0, err = 0; struct knet_host *host; struct knet_link *link; if (!_is_valid_handle(knet_h)) { return -1; } if (link_id >= KNET_MAX_LINK) { errno = EINVAL; return -1; } if (!src_addr) { errno = EINVAL; return -1; } if (!dynamic) { errno = EINVAL; return -1; } if (!transport) { errno = EINVAL; return -1; } if (!flags) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_LINK, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } host = knet_h->host_index[host_id]; if (!host) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s", host_id, strerror(savederrno)); goto exit_unlock; } link = &host->link[link_id]; if (!link->configured) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s", host_id, link_id, strerror(savederrno)); goto exit_unlock; } if ((link->dynamic == KNET_LINK_STATIC) && (!dst_addr)) { savederrno = EINVAL; err = -1; goto exit_unlock; } memmove(src_addr, &link->src_addr, sizeof(struct sockaddr_storage)); *transport = link->transport; *flags = link->flags; if (link->dynamic == KNET_LINK_STATIC) { *dynamic = 0; memmove(dst_addr, &link->dst_addr, sizeof(struct sockaddr_storage)); } else { *dynamic = 1; } exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = err ? 
savederrno : 0;
	return err;
}

/*
 * Wipe the configuration of one link of a host.
 *
 * The link must currently be configured and must not be enabled.
 *
 * knet_h  - knet handle, checked with _is_valid_handle()
 * host_id - host owning the link
 * link_id - link index, must be < KNET_MAX_LINK
 *
 * Returns 0 on success (errno is reset to 0).
 * Returns -1 on error, with errno set to:
 *   EINVAL - link_id out of range, unknown host, link not configured
 *   EBUSY  - link is still enabled
 *   other  - value reported by the write lock helper, check_rm() or
 *            transport_link_clear_config()
 * When _is_valid_handle() rejects the handle, -1 is returned and errno
 * is not modified by this function.
 */
int knet_link_clear_config(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id)
{
	int savederrno = 0, err = 0;
	struct knet_host *host;
	struct knet_link *link;
	int sock;
	uint8_t transport;

	if (!_is_valid_handle(knet_h)) {
		return -1;
	}

	if (link_id >= KNET_MAX_LINK) {
		errno = EINVAL;
		return -1;
	}

	savederrno = get_global_wrlock(knet_h);
	if (savederrno) {
		log_err(knet_h, KNET_SUB_LINK, "Unable to get write lock: %s",
			strerror(savederrno));
		errno = savederrno;
		return -1;
	}

	host = knet_h->host_index[host_id];
	if (!host) {
		err = -1;
		savederrno = EINVAL;
		log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
			host_id, strerror(savederrno));
		goto exit_unlock;
	}

	link = &host->link[link_id];

	if (link->configured != 1) {
		err = -1;
		savederrno = EINVAL;
		log_err(knet_h, KNET_SUB_LINK, "Host %u link %u is not configured: %s",
			host_id, link_id, strerror(savederrno));
		goto exit_unlock;
	}

	if (link->status.enabled != 0) {
		err = -1;
		savederrno = EBUSY;
		log_err(knet_h, KNET_SUB_LINK, "Host %u link %u is currently in use: %s",
			host_id, link_id, strerror(savederrno));
		goto exit_unlock;
	}

	/*
	 * remove well known access lists here.
	 * After the transport has done clearing the config,
	 * then we can remove any leftover access lists if the link
	 * is no longer in use.
	 *
	 * A missing entry (ENOENT) is not treated as an error.
	 */
	if ((transport_get_acl_type(knet_h, link->transport) == USE_GENERIC_ACL) &&
	    (link->dynamic == KNET_LINK_STATIC)) {
		if ((check_rm(knet_h, link, &link->dst_addr, &link->dst_addr, CHECK_TYPE_ADDRESS, CHECK_ACCEPT) < 0) && (errno != ENOENT)) {
			err = -1;
			savederrno = errno;
			log_err(knet_h, KNET_SUB_LINK, "Host %u link %u: unable to remove default access list",
				host_id, link_id);
			goto exit_unlock;
		}
	}

	/*
	 * cache it for later as we don't know if the transport
	 * will clear link info during clear_config.
	 */
	sock = link->outsock;
	transport = link->transport;

	/* EBUSY from the transport is tolerated and the wipe continues */
	if ((transport_link_clear_config(knet_h, link) < 0) &&
	    (errno != EBUSY)) {
		savederrno = errno;
		err = -1;
		goto exit_unlock;
	}

	/*
	 * remove any other access lists when the socket is no
	 * longer in use by the transport.
	 */
	if ((transport_get_acl_type(knet_h, transport) == USE_GENERIC_ACL) &&
	    (knet_h->knet_transport_fd_tracker[sock].transport == KNET_MAX_TRANSPORTS)) {
		check_rmall(knet_h, link);
	}

	pthread_mutex_destroy(&link->link_stats_mutex);

	/* reset the whole slot, then restore the only field that must survive */
	memset(link, 0, sizeof(struct knet_link));
	link->link_id = link_id;

	if (knet_h->has_loop_link && host_id == knet_h->host_id && link_id == knet_h->loop_link) {
		knet_h->has_loop_link = 0;
		if (host->active_link_entries == 0) {
			host->status.reachable = 0;
		}
	}

	log_debug(knet_h, KNET_SUB_LINK, "host: %u link: %u config has been wiped",
		  host_id, link_id);

exit_unlock:
	pthread_rwlock_unlock(&knet_h->global_rwlock);
	errno = err ? savederrno : 0;
	return err;
}

/*
 * Enable (enabled == 1) or disable (enabled == 0) a configured link.
 *
 * knet_h  - knet handle, checked with _is_valid_handle()
 * host_id - host owning the link
 * link_id - link index, must be < KNET_MAX_LINK
 * enabled - requested state, only 0 and 1 are accepted
 *
 * Returns 0 on success (errno reset to 0), including when the link is
 * already in the requested state.
 * Returns -1 with errno EINVAL for bad arguments, unknown host or
 * unconfigured link; otherwise returns what _link_updown() returned,
 * with errno taken from it on failure.
 */
int knet_link_set_enable(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
			 unsigned int enabled)
{
	int savederrno = 0, err = 0;
	struct knet_host *host;
	struct knet_link *link;

	if (!_is_valid_handle(knet_h)) {
		return -1;
	}

	if (link_id >= KNET_MAX_LINK) {
		errno = EINVAL;
		return -1;
	}

	if (enabled > 1) {
		errno = EINVAL;
		return -1;
	}

	savederrno = get_global_wrlock(knet_h);
	if (savederrno) {
		/* the lock requested above is the write lock; say so in the log */
		log_err(knet_h, KNET_SUB_LINK, "Unable to get write lock: %s",
			strerror(savederrno));
		errno = savederrno;
		return -1;
	}

	host = knet_h->host_index[host_id];
	if (!host) {
		err = -1;
		savederrno = EINVAL;
		log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
			host_id, strerror(savederrno));
		goto exit_unlock;
	}

	link = &host->link[link_id];

	if (!link->configured) {
		err = -1;
		savederrno = EINVAL;
		log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s",
			host_id, link_id, strerror(savederrno));
		goto exit_unlock;
	}

	/* nothing to do when the link is already in the requested state */
	if (link->status.enabled == enabled) {
		err = 0;
		goto exit_unlock;
	}

	err = _link_updown(knet_h, host_id, link_id, enabled, link->status.connected, 0);
	savederrno = errno;

	if (enabled) {
		goto exit_unlock;
	}

	log_debug(knet_h, KNET_SUB_LINK, "host: %u link: %u is disabled",
		  host_id, link_id);

exit_unlock:
	pthread_rwlock_unlock(&knet_h->global_rwlock);
	errno = err ? savederrno : 0;
	return err;
}

/*
 * Report whether a configured link is enabled.
 *
 * enabled - output, receives link->status.enabled; must not be NULL
 *
 * Returns 0 on success (errno reset to 0), -1 with errno EINVAL for bad
 * arguments, unknown host or unconfigured link, or -1 with the lock
 * error when the read lock cannot be taken.
 */
int knet_link_get_enable(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
			 unsigned int *enabled)
{
	int savederrno = 0, err = 0;
	struct knet_host *host;
	struct knet_link *link;

	if (!_is_valid_handle(knet_h)) {
		return -1;
	}

	if (link_id >= KNET_MAX_LINK) {
		errno = EINVAL;
		return -1;
	}

	if (!enabled) {
		errno = EINVAL;
		return -1;
	}

	savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
	if (savederrno) {
		log_err(knet_h, KNET_SUB_LINK, "Unable to get read lock: %s",
			strerror(savederrno));
		errno = savederrno;
		return -1;
	}

	host = knet_h->host_index[host_id];
	if (!host) {
		err = -1;
		savederrno = EINVAL;
		log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
			host_id, strerror(savederrno));
		goto exit_unlock;
	}

	link = &host->link[link_id];

	if (!link->configured) {
		err = -1;
		savederrno = EINVAL;
		log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s",
			host_id, link_id, strerror(savederrno));
		goto exit_unlock;
	}

	*enabled = link->status.enabled;

exit_unlock:
	pthread_rwlock_unlock(&knet_h->global_rwlock);
	errno = err ?
savederrno : 0;
	return err;
}

/*
 * Set the pong count of a configured link.
 *
 * knet_h     - knet handle, checked with _is_valid_handle()
 * host_id    - host owning the link
 * link_id    - link index, must be < KNET_MAX_LINK
 * pong_count - new value, must be >= 1
 *
 * Returns 0 on success (errno reset to 0).
 * Returns -1 with errno EINVAL for bad arguments, unknown host or
 * unconfigured link, or with the lock error when the write lock cannot
 * be taken.
 */
int knet_link_set_pong_count(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
			     uint8_t pong_count)
{
	int savederrno = 0, err = 0;
	struct knet_host *host;
	struct knet_link *link;

	if (!_is_valid_handle(knet_h)) {
		return -1;
	}

	if (link_id >= KNET_MAX_LINK) {
		errno = EINVAL;
		return -1;
	}

	if (pong_count < 1) {
		errno = EINVAL;
		return -1;
	}

	savederrno = get_global_wrlock(knet_h);
	if (savederrno) {
		log_err(knet_h, KNET_SUB_LINK, "Unable to get write lock: %s",
			strerror(savederrno));
		errno = savederrno;
		return -1;
	}

	host = knet_h->host_index[host_id];
	if (!host) {
		err = -1;
		savederrno = EINVAL;
		log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
			host_id, strerror(savederrno));
		goto exit_unlock;
	}

	link = &host->link[link_id];

	if (!link->configured) {
		err = -1;
		savederrno = EINVAL;
		log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s",
			host_id, link_id, strerror(savederrno));
		goto exit_unlock;
	}

	link->pong_count = pong_count;

	log_debug(knet_h, KNET_SUB_LINK,
		  "host: %u link: %u pong count update: %u",
		  host_id, link_id, link->pong_count);

exit_unlock:
	pthread_rwlock_unlock(&knet_h->global_rwlock);
	/* errno is only meaningful on failure; success always reports 0 */
	errno = err ? savederrno : 0;
	return err;
}

/*
 * Read back the pong count of a configured link.
 *
 * pong_count - output, receives link->pong_count; must not be NULL
 *
 * Returns 0 on success (errno reset to 0).
 * Returns -1 with errno EINVAL for bad arguments, unknown host or
 * unconfigured link, or with the lock error when the read lock cannot
 * be taken.
 */
int knet_link_get_pong_count(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
			     uint8_t *pong_count)
{
	int savederrno = 0, err = 0;
	struct knet_host *host;
	struct knet_link *link;

	if (!_is_valid_handle(knet_h)) {
		return -1;
	}

	if (link_id >= KNET_MAX_LINK) {
		errno = EINVAL;
		return -1;
	}

	if (!pong_count) {
		errno = EINVAL;
		return -1;
	}

	/* read-only access: the shared (read) side of the global lock is enough */
	savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
	if (savederrno) {
		log_err(knet_h, KNET_SUB_LINK, "Unable to get read lock: %s",
			strerror(savederrno));
		errno = savederrno;
		return -1;
	}

	host = knet_h->host_index[host_id];
	if (!host) {
		err = -1;
		savederrno = EINVAL;
		log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
			host_id, strerror(savederrno));
		goto exit_unlock;
	}

	link = &host->link[link_id];

	if (!link->configured) {
		err = -1;
		savederrno = EINVAL;
		log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s",
			host_id, link_id, strerror(savederrno));
		goto exit_unlock;
	}

	*pong_count = link->pong_count;

exit_unlock:
	pthread_rwlock_unlock(&knet_h->global_rwlock);
	errno = err ?
savederrno : 0;
	return err;
}

/*
 * Configure the ping interval, pong timeout and latency precision of a
 * configured link.
 *
 * knet_h    - knet handle, checked with _is_valid_handle()
 * host_id   - host owning the link
 * link_id   - link index, must be < KNET_MAX_LINK
 * interval  - ping interval, must be > 0; stored multiplied by 1000
 * timeout   - pong timeout, must be > 0; stored multiplied by 1000 and
 *             rejected when the result is below KNET_THREADS_TIMERES
 * precision - stored as link->latency_max_samples, must be non-zero
 *
 * Returns 0 on success (errno reset to 0).
 * Returns -1 with errno:
 *   EINVAL - bad link_id, interval or precision, unknown host,
 *            unconfigured link, or timeout too small
 *   ENOSYS - timeout <= 0
 *   other  - error from the write lock helper
 */
int knet_link_set_ping_timers(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
			      time_t interval, time_t timeout, unsigned int precision)
{
	int savederrno = 0, err = 0;
	struct knet_host *host;
	struct knet_link *link;
	unsigned long long ping_interval, pong_timeout;

	if (!_is_valid_handle(knet_h)) {
		return -1;
	}

	if (link_id >= KNET_MAX_LINK) {
		errno = EINVAL;
		return -1;
	}

	if (interval <= 0) {
		errno = EINVAL;
		return -1;
	}

	/* NOTE(review): ENOSYS here, while every sibling check uses EINVAL - confirm this is intended */
	if (timeout <= 0) {
		errno = ENOSYS;
		return -1;
	}

	if (!precision) {
		errno = EINVAL;
		return -1;
	}

	savederrno = get_global_wrlock(knet_h);
	if (savederrno) {
		log_err(knet_h, KNET_SUB_LINK, "Unable to get write lock: %s",
			strerror(savederrno));
		errno = savederrno;
		return -1;
	}

	host = knet_h->host_index[host_id];
	if (!host) {
		err = -1;
		savederrno = EINVAL;
		log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
			host_id, strerror(savederrno));
		goto exit_unlock;
	}

	link = &host->link[link_id];

	if (!link->configured) {
		err = -1;
		savederrno = EINVAL;
		log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s",
			host_id, link_id, strerror(savederrno));
		goto exit_unlock;
	}

	ping_interval = (unsigned long long)interval * 1000; /* microseconds */

	/*
	 * A too small interval is only warned about, not rejected.
	 * NOTE(review): savederrno is still 0 at this point, so the "%s"
	 * below prints strerror(0) rather than a real error.
	 */
	if (ping_interval < KNET_THREADS_TIMERES) {
		log_warn(knet_h, KNET_SUB_LINK,
			 "host: %u link: %u interval: %llu too small (%s). interval lower than thread_timer_res (%u ms) has no effect",
			 host_id, link_id, (unsigned long long)interval, strerror(savederrno), (KNET_THREADS_TIMERES / 1000));
	}

	pong_timeout = (unsigned long long)timeout * 1000; /* microseconds */

	/* unlike the interval, a too small timeout is a hard error */
	if (pong_timeout < KNET_THREADS_TIMERES) {
		err = -1;
		savederrno = EINVAL;
		log_err(knet_h, KNET_SUB_LINK,
			"host: %u link: %u pong timeout: %llu too small (%s). timeout cannot be less than thread_timer_res (%u ms)",
			host_id, link_id, (unsigned long long)timeout, strerror(savederrno), (KNET_THREADS_TIMERES / 1000));
		goto exit_unlock;
	}

	/* all checks passed: commit the three values together */
	link->ping_interval = ping_interval;
	link->pong_timeout = pong_timeout;
	link->latency_max_samples = precision;

	log_debug(knet_h, KNET_SUB_LINK,
		  "host: %u link: %u timeout update - interval: %llu timeout: %llu precision: %u",
		  host_id, link_id, link->ping_interval, link->pong_timeout, precision);

exit_unlock:
	pthread_rwlock_unlock(&knet_h->global_rwlock);
	errno = err ? savederrno : 0;
	return err;
}

/*
 * Read back the ping timers of a configured link.
 *
 * interval, timeout, precision - outputs, none may be NULL
 *
 * The stored interval and timeout are divided by 1000 before being
 * returned, which reverses the multiplication done in
 * knet_link_set_ping_timers().
 *
 * Returns 0 on success (errno reset to 0).
 * Returns -1 with errno EINVAL for bad arguments, unknown host or
 * unconfigured link, or with the lock error when the read lock cannot
 * be taken.
 */
int knet_link_get_ping_timers(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
			      time_t *interval, time_t *timeout, unsigned int *precision)
{
	int savederrno = 0, err = 0;
	struct knet_host *host;
	struct knet_link *link;

	if (!_is_valid_handle(knet_h)) {
		return -1;
	}

	if (link_id >= KNET_MAX_LINK) {
		errno = EINVAL;
		return -1;
	}

	if (!interval) {
		errno = EINVAL;
		return -1;
	}

	if (!timeout) {
		errno = EINVAL;
		return -1;
	}

	if (!precision) {
		errno = EINVAL;
		return -1;
	}

	savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
	if (savederrno) {
		log_err(knet_h, KNET_SUB_LINK, "Unable to get read lock: %s",
			strerror(savederrno));
		errno = savederrno;
		return -1;
	}

	host = knet_h->host_index[host_id];
	if (!host) {
		err = -1;
		savederrno = EINVAL;
		log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
			host_id, strerror(savederrno));
		goto exit_unlock;
	}

	link = &host->link[link_id];

	if (!link->configured) {
		err = -1;
		savederrno = EINVAL;
		log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s",
			host_id, link_id, strerror(savederrno));
		goto exit_unlock;
	}

	*interval = link->ping_interval / 1000; /* stored value is in microseconds; convert back to the caller's unit */
	*timeout = link->pong_timeout / 1000;
	*precision = link->latency_max_samples;

exit_unlock:
	pthread_rwlock_unlock(&knet_h->global_rwlock);
	errno = err ?
savederrno : 0;
	return err;
}

/*
 * Change the priority of a configured link.
 *
 * knet_h   - knet handle, checked with _is_valid_handle()
 * host_id  - host owning the link
 * link_id  - link index, must be < KNET_MAX_LINK
 * priority - new priority value
 *
 * Setting the value the link already has is a successful no-op.
 * If _host_dstcache_update_sync() fails after the change, the previous
 * priority is restored and the call fails with that function's errno.
 *
 * Returns 0 on success (errno reset to 0).
 * Returns -1 with errno EINVAL for a bad link_id, unknown host or
 * unconfigured link, or with the lock error when the write lock cannot
 * be taken.
 */
int knet_link_set_priority(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
			   uint8_t priority)
{
	int savederrno = 0, err = 0;
	struct knet_host *host;
	struct knet_link *link;
	uint8_t old_priority;

	if (!_is_valid_handle(knet_h)) {
		return -1;
	}

	if (link_id >= KNET_MAX_LINK) {
		errno = EINVAL;
		return -1;
	}

	savederrno = get_global_wrlock(knet_h);
	if (savederrno) {
		log_err(knet_h, KNET_SUB_LINK, "Unable to get write lock: %s",
			strerror(savederrno));
		errno = savederrno;
		return -1;
	}

	host = knet_h->host_index[host_id];
	if (!host) {
		err = -1;
		savederrno = EINVAL;
		log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
			host_id, strerror(savederrno));
		goto exit_unlock;
	}

	link = &host->link[link_id];

	if (!link->configured) {
		err = -1;
		savederrno = EINVAL;
		log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s",
			host_id, link_id, strerror(savederrno));
		goto exit_unlock;
	}

	/* remember the current value so a failed update can be rolled back */
	old_priority = link->priority;

	if (link->priority == priority) {
		err = 0;
		goto exit_unlock;
	}

	link->priority = priority;

	if (_host_dstcache_update_sync(knet_h, host)) {
		savederrno = errno;
		log_debug(knet_h, KNET_SUB_LINK,
			  "Unable to update link priority (host: %u link: %u priority: %u): %s",
			  host_id, link_id, link->priority, strerror(savederrno));
		/* roll back so the link keeps a priority consistent with the cache */
		link->priority = old_priority;
		err = -1;
		goto exit_unlock;
	}

	log_debug(knet_h, KNET_SUB_LINK,
		  "host: %u link: %u priority set to: %u",
		  host_id, link_id, link->priority);

exit_unlock:
	pthread_rwlock_unlock(&knet_h->global_rwlock);
	errno = err ? savederrno : 0;
	return err;
}

/*
 * Read back the priority of a configured link.
 *
 * priority - output, receives link->priority; must not be NULL
 *
 * Returns 0 on success (errno reset to 0).
 * Returns -1 with errno EINVAL for bad arguments, unknown host or
 * unconfigured link, or with the lock error when the read lock cannot
 * be taken.
 */
int knet_link_get_priority(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
			   uint8_t *priority)
{
	int savederrno = 0, err = 0;
	struct knet_host *host;
	struct knet_link *link;

	if (!_is_valid_handle(knet_h)) {
		return -1;
	}

	if (link_id >= KNET_MAX_LINK) {
		errno = EINVAL;
		return -1;
	}

	if (!priority) {
		errno = EINVAL;
		return -1;
	}

	savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
	if (savederrno) {
		log_err(knet_h, KNET_SUB_LINK, "Unable to get read lock: %s",
			strerror(savederrno));
		errno = savederrno;
		return -1;
	}

	host = knet_h->host_index[host_id];
	if (!host) {
		err = -1;
		savederrno = EINVAL;
		log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
			host_id, strerror(savederrno));
		goto exit_unlock;
	}

	link = &host->link[link_id];

	if (!link->configured) {
		err = -1;
		savederrno = EINVAL;
		log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s",
			host_id, link_id, strerror(savederrno));
		goto exit_unlock;
	}

	*priority = link->priority;

exit_unlock:
	pthread_rwlock_unlock(&knet_h->global_rwlock);
	errno = err ?
savederrno : 0;
	return err;
}

/*
 * List the ids of all configured links of a host.
 *
 * link_ids         - output array, filled with the index of every
 *                    configured link in ascending order; must not be NULL.
 *                    No capacity is passed in and up to KNET_MAX_LINK
 *                    entries can be written, so the caller presumably has
 *                    to provide room for KNET_MAX_LINK ids - TODO confirm
 *                    against the public header.
 * link_ids_entries - output, number of ids written; must not be NULL
 *
 * Returns 0 on success (errno reset to 0).
 * Returns -1 with errno EINVAL for NULL outputs or unknown host, or with
 * the lock error when the read lock cannot be taken.
 */
int knet_link_get_link_list(knet_handle_t knet_h, knet_node_id_t host_id,
			    uint8_t *link_ids, size_t *link_ids_entries)
{
	int savederrno = 0, err = 0, i, count = 0;
	struct knet_host *host;
	struct knet_link *link;

	if (!_is_valid_handle(knet_h)) {
		return -1;
	}

	if (!link_ids) {
		errno = EINVAL;
		return -1;
	}

	if (!link_ids_entries) {
		errno = EINVAL;
		return -1;
	}

	savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
	if (savederrno) {
		log_err(knet_h, KNET_SUB_LINK, "Unable to get read lock: %s",
			strerror(savederrno));
		errno = savederrno;
		return -1;
	}

	host = knet_h->host_index[host_id];
	if (!host) {
		err = -1;
		savederrno = EINVAL;
		log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
			host_id, strerror(savederrno));
		goto exit_unlock;
	}

	/* compact the configured link indexes into the front of link_ids */
	for (i = 0; i < KNET_MAX_LINK; i++) {
		link = &host->link[i];
		if (!link->configured) {
			continue;
		}
		link_ids[count] = i;
		count++;
	}

	*link_ids_entries = count;

exit_unlock:
	pthread_rwlock_unlock(&knet_h->global_rwlock);
	errno = err ? savederrno : 0;
	return err;
}

/*
 * Copy the status of a configured link to the caller and fill in the
 * aggregated "total" counters.
 *
 * status      - output buffer; must not be NULL
 * struct_size - number of bytes copied from the internal status into
 *               *status
 *
 * NOTE(review): struct_size is not compared with
 * sizeof(struct knet_link_status) in this function, and the totals and
 * status->size are written regardless of struct_size - confirm what
 * callers are required to pass.
 *
 * Returns 0 on success (errno reset to 0).
 * Returns -1 with errno EINVAL for bad arguments, unknown host or
 * unconfigured link, or with the error reported by the rwlock / stats
 * mutex when one of them cannot be taken.
 */
int knet_link_get_status(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
			 struct knet_link_status *status, size_t struct_size)
{
	int savederrno = 0, err = 0;
	struct knet_host *host;
	struct knet_link *link;

	if (!_is_valid_handle(knet_h)) {
		return -1;
	}

	if (link_id >= KNET_MAX_LINK) {
		errno = EINVAL;
		return -1;
	}

	if (!status) {
		errno = EINVAL;
		return -1;
	}

	savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
	if (savederrno) {
		log_err(knet_h, KNET_SUB_LINK, "Unable to get read lock: %s",
			strerror(savederrno));
		errno = savederrno;
		return -1;
	}

	host = knet_h->host_index[host_id];
	if (!host) {
		err = -1;
		savederrno = EINVAL;
		log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
			host_id, strerror(savederrno));
		goto exit_unlock;
	}

	link = &host->link[link_id];

	if (!link->configured) {
		err = -1;
		savederrno = EINVAL;
		log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s",
			host_id, link_id, strerror(savederrno));
		goto exit_unlock;
	}

	/* the copy is taken while holding the per-link stats mutex */
	savederrno = pthread_mutex_lock(&link->link_stats_mutex);
	if (savederrno) {
		log_err(knet_h, KNET_SUB_LINK, "Unable to get stats mutex lock for host %u link %u: %s",
			host_id, link_id, strerror(savederrno));
		err = -1;
		goto exit_unlock;
	}

	memmove(status, &link->status, struct_size);

	pthread_mutex_unlock(&link->link_stats_mutex);

	/*
	 * Calculate totals - no point in doing this on-the-fly.
	 * The sums are computed on the caller's copy, after the stats mutex
	 * has been released.
	 */
	status->stats.rx_total_packets =
		status->stats.rx_data_packets +
		status->stats.rx_ping_packets +
		status->stats.rx_pong_packets +
		status->stats.rx_pmtu_packets;
	status->stats.tx_total_packets =
		status->stats.tx_data_packets +
		status->stats.tx_ping_packets +
		status->stats.tx_pong_packets +
		status->stats.tx_pmtu_packets;
	status->stats.rx_total_bytes =
		status->stats.rx_data_bytes +
		status->stats.rx_ping_bytes +
		status->stats.rx_pong_bytes +
		status->stats.rx_pmtu_bytes;
	status->stats.tx_total_bytes =
		status->stats.tx_data_bytes +
		status->stats.tx_ping_bytes +
		status->stats.tx_pong_bytes +
		status->stats.tx_pmtu_bytes;
	status->stats.tx_total_errors =
		status->stats.tx_data_errors +
		status->stats.tx_ping_errors +
		status->stats.tx_pong_errors +
		status->stats.tx_pmtu_errors;
	status->stats.tx_total_retries =
		status->stats.tx_data_retries +
		status->stats.tx_ping_retries +
		status->stats.tx_pong_retries +
		status->stats.tx_pmtu_retries;

	/* Tell the caller our full size in case they have an old version */
	status->size = sizeof(struct knet_link_status);

exit_unlock:
	pthread_rwlock_unlock(&knet_h->global_rwlock);
	errno = err ? savederrno : 0;
	return err;
}

/*
 * Insert an access list entry for a link at position "index".
 *
 * ss1          - first address; must not be NULL
 * ss2          - second address; required unless type is
 *                CHECK_TYPE_ADDRESS, and for CHECK_TYPE_RANGE it must have
 *                the same ss_family as ss1
 * type         - CHECK_TYPE_ADDRESS, CHECK_TYPE_MASK or CHECK_TYPE_RANGE
 * acceptreject - CHECK_ACCEPT or CHECK_REJECT
 *
 * All argument checks are done before the write lock is taken and fail
 * with -1 / EINVAL.
 */
int knet_link_insert_acl(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
			 int index,
			 struct sockaddr_storage *ss1,
			 struct sockaddr_storage *ss2,
			 check_type_t type, check_acceptreject_t acceptreject)
{
	int savederrno = 0, err = 0;
	struct knet_host *host;
	struct knet_link *link;

	if (!_is_valid_handle(knet_h)) {
		return -1;
	}

	if (!ss1) {
		errno = EINVAL;
		return -1;
	}

	if ((type != CHECK_TYPE_ADDRESS) &&
	    (type != CHECK_TYPE_MASK) &&
	    (type != CHECK_TYPE_RANGE)) {
		errno = EINVAL;
		return -1;
	}

	if ((acceptreject != CHECK_ACCEPT) &&
	    (acceptreject != CHECK_REJECT)) {
		errno = EINVAL;
		return -1;
	}

	/* only a plain address entry may omit the second address */
	if ((type != CHECK_TYPE_ADDRESS) && (!ss2)) {
		errno = EINVAL;
		return -1;
	}

	/* ss2 is known to be non-NULL here because of the check above */
	if ((type == CHECK_TYPE_RANGE) &&
	    (ss1->ss_family != ss2->ss_family)) {
		errno = EINVAL;
		return -1;
	}

	if (link_id >= KNET_MAX_LINK) {
		errno = EINVAL;
		return -1;
	}

	savederrno = get_global_wrlock(knet_h);
	if (savederrno) {
		log_err(knet_h, KNET_SUB_LINK, "Unable to get write lock: %s",
			strerror(savederrno));
		errno = savederrno;
		return -1;
	}

	host = knet_h->host_index[host_id];
	if (!host) {
		err = -1;
		savederrno = EINVAL;
		log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
			host_id, strerror(savederrno));
		goto exit_unlock;
	}

	link = &host->link[link_id];

	if (!link->configured) {
		err = -1;
		savederrno = EINVAL;
		log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s",
			host_id, link_id, strerror(savederrno));
		goto exit_unlock;
	}

	if
(link->dynamic != KNET_LINK_DYNIP) {
		/* access lists are only managed here for KNET_LINK_DYNIP links */
		err = -1;
		savederrno = EINVAL;
		log_err(knet_h, KNET_SUB_LINK, "host %u link %u is a point to point connection: %s",
			host_id, link_id, strerror(savederrno));
		goto exit_unlock;
	}

	err = check_add(knet_h, link, index, ss1, ss2, type, acceptreject);
	savederrno = errno;

exit_unlock:
	pthread_rwlock_unlock(&knet_h->global_rwlock);

	/*
	 * Unlike the accessors earlier in this file, errno is restored
	 * unconditionally: on success it carries whatever errno was after
	 * check_add().
	 */
	errno = savederrno;
	return err;
}

/*
 * Add an access list entry for a link.
 *
 * Thin wrapper around knet_link_insert_acl() that passes -1 as the
 * index; all validation and error reporting comes from that function.
 */
int knet_link_add_acl(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
		      struct sockaddr_storage *ss1,
		      struct sockaddr_storage *ss2,
		      check_type_t type, check_acceptreject_t acceptreject)
{
	return knet_link_insert_acl(knet_h, host_id, link_id, -1, ss1, ss2, type, acceptreject);
}

/*
 * Remove an access list entry from a link.
 *
 * ss1          - first address; must not be NULL
 * ss2          - second address; required unless type is
 *                CHECK_TYPE_ADDRESS, and for CHECK_TYPE_RANGE it must have
 *                the same ss_family as ss1
 * type         - CHECK_TYPE_ADDRESS, CHECK_TYPE_MASK or CHECK_TYPE_RANGE
 * acceptreject - CHECK_ACCEPT or CHECK_REJECT
 *
 * Returns the result of check_rm() with its errno, or -1 with errno
 * EINVAL for bad arguments, unknown host, unconfigured link or a link
 * that is not KNET_LINK_DYNIP, or -1 with the lock error when the write
 * lock cannot be taken.
 */
int knet_link_rm_acl(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
		     struct sockaddr_storage *ss1,
		     struct sockaddr_storage *ss2,
		     check_type_t type, check_acceptreject_t acceptreject)
{
	int savederrno = 0, err = 0;
	struct knet_host *host;
	struct knet_link *link;

	if (!_is_valid_handle(knet_h)) {
		return -1;
	}

	if (!ss1) {
		errno = EINVAL;
		return -1;
	}

	if ((type != CHECK_TYPE_ADDRESS) &&
	    (type != CHECK_TYPE_MASK) &&
	    (type != CHECK_TYPE_RANGE)) {
		errno = EINVAL;
		return -1;
	}

	if ((acceptreject != CHECK_ACCEPT) &&
	    (acceptreject != CHECK_REJECT)) {
		errno = EINVAL;
		return -1;
	}

	/* only a plain address entry may omit the second address */
	if ((type != CHECK_TYPE_ADDRESS) && (!ss2)) {
		errno = EINVAL;
		return -1;
	}

	if ((type == CHECK_TYPE_RANGE) &&
	    (ss1->ss_family != ss2->ss_family)) {
		errno = EINVAL;
		return -1;
	}

	if (link_id >= KNET_MAX_LINK) {
		errno = EINVAL;
		return -1;
	}

	savederrno = get_global_wrlock(knet_h);
	if (savederrno) {
		log_err(knet_h, KNET_SUB_LINK, "Unable to get write lock: %s",
			strerror(savederrno));
		errno = savederrno;
		return -1;
	}

	host = knet_h->host_index[host_id];
	if (!host) {
		err = -1;
		savederrno = EINVAL;
		log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
			host_id, strerror(savederrno));
		goto exit_unlock;
	}

	link = &host->link[link_id];

	if (!link->configured) {
		err = -1;
		savederrno = EINVAL;
		log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s",
			host_id, link_id, strerror(savederrno));
		goto exit_unlock;
	}

	if (link->dynamic != KNET_LINK_DYNIP) {
		err = -1;
		savederrno = EINVAL;
		log_err(knet_h, KNET_SUB_LINK, "host %u link %u is a point to point connection: %s",
			host_id, link_id, strerror(savederrno));
		goto exit_unlock;
	}

	err = check_rm(knet_h, link, ss1, ss2, type, acceptreject);
	savederrno = errno;

exit_unlock:
	pthread_rwlock_unlock(&knet_h->global_rwlock);
	errno = savederrno;
	return err;
}

/*
 * Remove every access list entry of a link via check_rmall().
 *
 * Returns 0 on success; savederrno is still 0 on that path, so errno is
 * reset to 0.
 * Returns -1 with errno EINVAL for a bad link_id, unknown host,
 * unconfigured link or a link that is not KNET_LINK_DYNIP, or with the
 * lock error when the write lock cannot be taken.
 */
int knet_link_clear_acl(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id)
{
	int savederrno = 0, err = 0;
	struct knet_host *host;
	struct knet_link *link;

	if (!_is_valid_handle(knet_h)) {
		return -1;
	}

	if (link_id >= KNET_MAX_LINK) {
		errno = EINVAL;
		return -1;
	}

	savederrno = get_global_wrlock(knet_h);
	if (savederrno) {
		log_err(knet_h, KNET_SUB_LINK, "Unable to get write lock: %s",
			strerror(savederrno));
		errno = savederrno;
		return -1;
	}

	host = knet_h->host_index[host_id];
	if (!host) {
		err = -1;
		savederrno = EINVAL;
		log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
			host_id, strerror(savederrno));
		goto exit_unlock;
	}

	link = &host->link[link_id];

	if (!link->configured) {
		err = -1;
		savederrno = EINVAL;
		log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s",
			host_id, link_id, strerror(savederrno));
		goto exit_unlock;
	}

	if (link->dynamic != KNET_LINK_DYNIP) {
		err = -1;
		savederrno = EINVAL;
		log_err(knet_h, KNET_SUB_LINK, "host %u link %u is a point to point connection: %s",
			host_id, link_id, strerror(savederrno));
		goto exit_unlock;
	}

	check_rmall(knet_h, link);

exit_unlock:
	pthread_rwlock_unlock(&knet_h->global_rwlock);
	errno = savederrno;
	return err;
}
diff --git a/libknet/logging.c b/libknet/logging.c
index cb5462da..6d2fe0cb 100644
--- a/libknet/logging.c
+++ b/libknet/logging.c
@@ -1,274 +1,275 @@
/*
 * Copyright (C) 2010-2025 Red Hat, Inc. All rights reserved.
 *
 * Author: Fabio M.
Di Nitto * * This software licensed under LGPL-2.0+ */ #include "config.h" #include #include #include #include #include #include #include #include "internals.h" #include "logging.h" #include "threads_common.h" static struct pretty_names subsystem_names[KNET_MAX_SUBSYSTEMS] = { { "common", KNET_SUB_COMMON }, { "handle", KNET_SUB_HANDLE }, { "host", KNET_SUB_HOST }, { "listener", KNET_SUB_LISTENER }, { "link", KNET_SUB_LINK }, { "transport", KNET_SUB_TRANSPORT }, { "crypto", KNET_SUB_CRYPTO }, { "compress", KNET_SUB_COMPRESS }, { "filter", KNET_SUB_FILTER }, { "dstcache", KNET_SUB_DSTCACHE }, { "heartbeat", KNET_SUB_HEARTBEAT }, { "pmtud", KNET_SUB_PMTUD }, { "tx", KNET_SUB_TX }, { "rx", KNET_SUB_RX }, { "loopback", KNET_SUB_TRANSP_LOOPBACK }, { "udp", KNET_SUB_TRANSP_UDP }, { "sctp", KNET_SUB_TRANSP_SCTP }, { "nsscrypto", KNET_SUB_NSSCRYPTO }, { "opensslcrypto", KNET_SUB_OPENSSLCRYPTO }, { "zlibcomp", KNET_SUB_ZLIBCOMP }, { "lz4comp", KNET_SUB_LZ4COMP }, { "lz4hccomp", KNET_SUB_LZ4HCCOMP }, { "lzo2comp", KNET_SUB_LZO2COMP }, { "lzmacomp", KNET_SUB_LZMACOMP }, { "bzip2comp", KNET_SUB_BZIP2COMP }, { "zstdcomp", KNET_SUB_ZSTDCOMP }, { "unknown", KNET_SUB_UNKNOWN } /* unknown MUST always be last in this array */ }; const char *knet_log_get_subsystem_name(uint8_t subsystem) { unsigned int i; for (i = 0; i < KNET_MAX_SUBSYSTEMS; i++) { if (subsystem_names[i].val == KNET_SUB_UNKNOWN) { break; } if (subsystem_names[i].val == subsystem) { errno = 0; return subsystem_names[i].name; } } return "unknown"; } uint8_t knet_log_get_subsystem_id(const char *name) { unsigned int i; for (i = 0; i < KNET_MAX_SUBSYSTEMS; i++) { if (subsystem_names[i].val == KNET_SUB_UNKNOWN) { break; } if (strcasecmp(name, subsystem_names[i].name) == 0) { errno = 0; return subsystem_names[i].val; } } return KNET_SUB_UNKNOWN; } static int is_valid_subsystem(uint8_t subsystem) { unsigned int i; for (i = 0; i < KNET_MAX_SUBSYSTEMS; i++) { if ((subsystem != KNET_SUB_UNKNOWN) && (subsystem_names[i].val == 
KNET_SUB_UNKNOWN)) { break; } if (subsystem_names[i].val == subsystem) { return 0; } } return -1; } static struct pretty_names loglevel_names[KNET_LOG_TRACE + 1] = { { "ERROR", KNET_LOG_ERR }, { "WARNING", KNET_LOG_WARN }, { "info", KNET_LOG_INFO }, { "debug", KNET_LOG_DEBUG }, { "trace", KNET_LOG_TRACE } }; const char *knet_log_get_loglevel_name(uint8_t level) { unsigned int i; for (i = 0; i <= KNET_LOG_TRACE; i++) { if (loglevel_names[i].val == level) { errno = 0; return loglevel_names[i].name; } } return "ERROR"; } uint8_t knet_log_get_loglevel_id(const char *name) { unsigned int i; for (i = 0; i <= KNET_LOG_TRACE; i++) { if (strcasecmp(name, loglevel_names[i].name) == 0) { errno = 0; return loglevel_names[i].val; } } return KNET_LOG_ERR; } int knet_log_set_loglevel(knet_handle_t knet_h, uint8_t subsystem, uint8_t level) { int savederrno = 0; if (!_is_valid_handle(knet_h)) { return -1; } if (is_valid_subsystem(subsystem) < 0) { errno = EINVAL; return -1; } if (level > KNET_LOG_TRACE) { errno = EINVAL; return -1; } savederrno = get_global_wrlock(knet_h); if (savederrno) { log_err(knet_h, subsystem, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } knet_h->log_levels[subsystem] = level; pthread_rwlock_unlock(&knet_h->global_rwlock); errno = 0; return 0; } int knet_log_get_loglevel(knet_handle_t knet_h, uint8_t subsystem, uint8_t *level) { int savederrno = 0; if (!_is_valid_handle(knet_h)) { return -1; } if (is_valid_subsystem(subsystem) < 0) { errno = EINVAL; return -1; } if (!level) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, subsystem, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } *level = knet_h->log_levels[subsystem]; pthread_rwlock_unlock(&knet_h->global_rwlock); errno = 0; return 0; } void log_msg(knet_handle_t knet_h, uint8_t subsystem, uint8_t msglevel, const char *fmt, ...) 
{ va_list ap; struct knet_log_msg msg; size_t byte_cnt = 0; int len; int retry_loop = 0; if ((!knet_h) || (subsystem == KNET_MAX_SUBSYSTEMS) || (msglevel > knet_h->log_levels[subsystem])) return; if (knet_h->logfd <= 0) goto out; memset(&msg, 0, sizeof(struct knet_log_msg)); msg.subsystem = subsystem; msg.msglevel = msglevel; + // coverity[UNINIT:SUPPRESS] - va_start is a macro, that's the C standard va_start(ap, fmt); #ifdef __clang__ #pragma clang diagnostic push #pragma clang diagnostic ignored "-Wformat-nonliteral" #endif vsnprintf(msg.msg, sizeof(msg.msg), fmt, ap); #ifdef __clang__ #pragma clang diagnostic pop #endif va_end(ap); retry: while (byte_cnt < sizeof(struct knet_log_msg)) { len = write(knet_h->logfd, &msg, sizeof(struct knet_log_msg) - byte_cnt); if (len <= 0) { if (errno == EAGAIN) { struct timeval tv; /* * those 3 lines are the equivalent of usleep(1) * but usleep makes some static code analizers very * unhappy. * * this version is somewhat stolen from gnulib * nanosleep implementation */ tv.tv_sec = 0; tv.tv_usec = 1; select(0, NULL, NULL, NULL, &tv); retry_loop++; /* * arbitrary amount of retries. * tested with fun_log_bench, 10 retries was never hit */ if (retry_loop >= 100) { goto out; } goto retry; } goto out; } byte_cnt += len; } out: return; } diff --git a/libknet/tests/api_knet_host_set_name.c b/libknet/tests/api_knet_host_set_name.c index 6e07d9b9..305bbf49 100644 --- a/libknet/tests/api_knet_host_set_name.c +++ b/libknet/tests/api_knet_host_set_name.c @@ -1,92 +1,93 @@ /* * Copyright (C) 2016-2025 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. 
Di Nitto * * This software licensed under GPL-2.0+ */ #include "config.h" #include #include #include #include #include #include "libknet.h" #include "internals.h" #include "test-common.h" static void test(void) { knet_handle_t knet_h1, knet_h[2]; int logfds[2]; int res; char longhostname[KNET_MAX_HOST_LEN+2]; printf("Test knet_host_set_name incorrect knet_h\n"); + // coverity[CHECKED_RETURN:SUPPRESS] - it's a test , get over it if ((!knet_host_set_name(NULL, 1, "test")) || (errno != EINVAL)) { printf("knet_host_set_name accepted invalid knet_h or returned incorrect error: %s\n", strerror(errno)); exit(FAIL); } setup_logpipes(logfds); knet_h1 = knet_handle_start(logfds, KNET_LOG_DEBUG, knet_h); flush_logs(logfds[0], stdout); printf("Test knet_host_set_name with incorrect hostid 1\n"); FAIL_ON_SUCCESS(knet_host_set_name(knet_h1, 2, "test"), EINVAL); printf("Test knet_host_set_name with correct values\n"); FAIL_ON_ERR(knet_host_add(knet_h1, 1)); FAIL_ON_ERR(knet_host_set_name(knet_h1, 1, "test")); if (strcmp("test", knet_h1->host_index[1]->name)) { printf("knet_host_set_name failed to copy name\n"); CLEAN_EXIT(FAIL); } printf("Test knet_host_set_name with correct values (name change)\n"); FAIL_ON_ERR(knet_host_set_name(knet_h1, 1, "tes")); if (strcmp("tes", knet_h1->host_index[1]->name)) { printf("knet_host_set_name failed to change name\n"); CLEAN_EXIT(FAIL); } printf("Test knet_host_set_name with NULL name\n"); FAIL_ON_SUCCESS(knet_host_set_name(knet_h1, 1, NULL), EINVAL); printf("Test knet_host_set_name with duplicate name\n"); FAIL_ON_ERR(knet_host_add(knet_h1, 2)); if ((!knet_host_set_name(knet_h1, 2, "tes")) || (errno != EEXIST)) { printf("knet_host_set_name accepted duplicated name or returned incorrect error: %s\n", strerror(errno)); CLEAN_EXIT(FAIL); } knet_host_remove(knet_h1, 2); flush_logs(logfds[0], stdout); printf("Test knet_host_set_name with (too) long name\n"); memset(longhostname, 'a', sizeof(longhostname)); longhostname[KNET_MAX_HOST_LEN] = '\0'; if 
((!knet_host_set_name(knet_h1, 1, longhostname)) || (errno != EINVAL)) { printf("knet_host_set_name accepted invalid (too long) name or returned incorrect error: %s\n", strerror(errno)); CLEAN_EXIT(FAIL); } CLEAN_EXIT(CONTINUE); } int main(int argc, char *argv[]) { test(); return PASS; } diff --git a/libknet/tests/api_knet_send_sync.c b/libknet/tests/api_knet_send_sync.c index bca163b0..66dc92c5 100644 --- a/libknet/tests/api_knet_send_sync.c +++ b/libknet/tests/api_knet_send_sync.c @@ -1,213 +1,218 @@ /* * Copyright (C) 2016-2025 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. Di Nitto * * This software licensed under GPL-2.0+ */ #include "config.h" #include #include #include #include #include #include "libknet.h" #include "internals.h" #include "netutils.h" #include "test-common.h" static int private_data; static void sock_notify(void *pvt_data, int datafd, int8_t channel, uint8_t tx_rx, int error, int errorno) { return; } static int dhost_filter_ret = 0; static int dhost_filter(void *pvt_data, const unsigned char *outdata, ssize_t outdata_len, uint8_t tx_rx, knet_node_id_t this_host_id, knet_node_id_t src_host_id, int8_t *dst_channel, knet_node_id_t *dst_host_ids, size_t *dst_host_ids_entries) { dst_host_ids[0] = 0; /* * fatal fault */ if (dhost_filter_ret < 0) { return -1; } /* * trigger EINVAL * no ids found */ if (dhost_filter_ret == 0) { *dst_host_ids_entries = 0; return 0; } /* * send correct info back */ if (dhost_filter_ret == 1) { dst_host_ids[0] = 1; *dst_host_ids_entries = 1; return 0; } /* * trigger E2BIG * mcast destinations */ if (dhost_filter_ret == 2) { dst_host_ids[0] = 1; *dst_host_ids_entries = 2; return 0; } /* * return mcast */ if (dhost_filter_ret == 3) { return 1; } return dhost_filter_ret; } static void test(void) { knet_handle_t knet_h1, knet_h[2]; int logfds[2]; int datafd = 0; int8_t channel = 0; char send_buff[KNET_MAX_PACKET_SIZE]; struct sockaddr_storage lo; int res; memset(send_buff, 0, sizeof(send_buff)); printf("Test 
knet_send_sync incorrect knet_h\n"); if ((!knet_send_sync(NULL, send_buff, KNET_MAX_PACKET_SIZE, channel)) || (errno != EINVAL)) { printf("knet_send_sync accepted invalid knet_h or returned incorrect error: %s\n", strerror(errno)); exit(FAIL); } setup_logpipes(logfds); knet_h1 = knet_handle_start(logfds, KNET_LOG_DEBUG, knet_h); printf("Test knet_send_sync with no send_buff\n"); FAIL_ON_SUCCESS(knet_send_sync(knet_h1, NULL, KNET_MAX_PACKET_SIZE, channel), EINVAL); printf("Test knet_send_sync with invalid send_buff len (0)\n"); FAIL_ON_SUCCESS(knet_send_sync(knet_h1, send_buff, 0, channel), EINVAL); printf("Test knet_send_sync with invalid send_buff len (> KNET_MAX_PACKET_SIZE)\n"); FAIL_ON_SUCCESS(knet_send_sync(knet_h1, send_buff, KNET_MAX_PACKET_SIZE + 1, channel), EINVAL); printf("Test knet_send_sync with invalid channel (-1)\n"); channel = -1; FAIL_ON_SUCCESS(knet_send_sync(knet_h1, send_buff, KNET_MAX_PACKET_SIZE, channel), EINVAL); printf("Test knet_send_sync with invalid channel (KNET_DATAFD_MAX)\n"); channel = KNET_DATAFD_MAX; FAIL_ON_SUCCESS(knet_send_sync(knet_h1, send_buff, KNET_MAX_PACKET_SIZE, channel), EINVAL); printf("Test knet_send_sync with no filter configured\n"); channel = 1; FAIL_ON_SUCCESS(knet_send_sync(knet_h1, send_buff, KNET_MAX_PACKET_SIZE, channel), ENETDOWN); + // coverity[LOCK:SUPPRESS] - it's a test, get over it + // coverity[ORDER_REVERSAL:SUPPRESS] - it's a test, get over it FAIL_ON_ERR(knet_handle_enable_filter(knet_h1, NULL, dhost_filter)); printf("Test knet_send_sync with unconfigured channel\n"); channel = 0; + + // coverity[ORDER_REVERSAL:SUPPRESS] - it's a test, get over it FAIL_ON_SUCCESS(knet_send_sync(knet_h1, send_buff, KNET_MAX_PACKET_SIZE, channel), EINVAL); printf("Test knet_send_sync with data forwarding disabled\n"); FAIL_ON_ERR(knet_handle_enable_sock_notify(knet_h1, &private_data, sock_notify)); datafd = 0; channel = -1; + // coverity[ORDER_REVERSAL:SUPPRESS] - it's a test, get over it 
FAIL_ON_ERR(knet_handle_add_datafd(knet_h1, &datafd, &channel));

	/*
	 * Forwarding has not been enabled yet (knet_handle_setfwd(.., 1) is
	 * only called below), so this send must not report a full send and
	 * must leave errno at ECANCELED.
	 */
	if ((knet_send_sync(knet_h1, send_buff, KNET_MAX_PACKET_SIZE, channel) == sizeof(send_buff)) || (errno != ECANCELED)) {
		printf("knet_send_sync didn't detect datafwd disabled or returned incorrect error: %s\n", strerror(errno));
		CLEAN_EXIT(FAIL);
	}

	printf("Test knet_send_sync with broken dst_host_filter\n");
	FAIL_ON_ERR(knet_handle_setfwd(knet_h1, 1));

	/* dhost_filter() returns -1 for negative values: expect EFAULT */
	dhost_filter_ret = -1;
	if ((knet_send_sync(knet_h1, send_buff, KNET_MAX_PACKET_SIZE, channel) == sizeof(send_buff)) || (errno != EFAULT)) {
		printf("knet_send_sync didn't detect fatal error from dst_host_filter or returned incorrect error: %s\n", strerror(errno));
		CLEAN_EXIT(FAIL);
	}

	printf("Test knet_send_sync with dst_host_filter returning no host_ids_entries\n");

	/* dhost_filter() reports zero destination entries: expect EINVAL */
	dhost_filter_ret = 0;
	if ((knet_send_sync(knet_h1, send_buff, KNET_MAX_PACKET_SIZE, channel) == sizeof(send_buff)) || (errno != EINVAL)) {
		printf("knet_send_sync didn't detect 0 host_ids from dst_host_filter or returned incorrect error: %s\n", strerror(errno));
		CLEAN_EXIT(FAIL);
	}

	printf("Test knet_send_sync with host down\n");

	/* dhost_filter() names host 1, which has not been added yet: expect EHOSTDOWN */
	dhost_filter_ret = 1;
	if ((knet_send_sync(knet_h1, send_buff, KNET_MAX_PACKET_SIZE, channel) == sizeof(send_buff)) || (errno != EHOSTDOWN)) {
		printf("knet_send_sync didn't detect hostdown or returned incorrect error: %s\n", strerror(errno));
		CLEAN_EXIT(FAIL);
	}

	printf("Test knet_send_sync with dst_host_filter returning too many host_ids_entries\n");

	/* add host 1 with one enabled UDP link and wait for it before the remaining checks */
	FAIL_ON_ERR(knet_host_add(knet_h1, 1));
	FAIL_ON_ERR(_knet_link_set_config(knet_h1, 1, 0, KNET_TRANSPORT_UDP, 0, AF_INET, 0, &lo));
	FAIL_ON_ERR(knet_link_set_enable(knet_h1, 1, 0, 1));
	FAIL_ON_ERR(wait_for_host(knet_h1, 1, 10, logfds[0], stdout));

	/* dhost_filter() reports two destination entries: expect E2BIG */
	dhost_filter_ret = 2;
	if ((knet_send_sync(knet_h1, send_buff, KNET_MAX_PACKET_SIZE, channel) == sizeof(send_buff)) || (errno != E2BIG)) {
		printf("knet_send_sync didn't detect 2+ host_ids from dst_host_filter or returned incorrect error: %s\n", strerror(errno));
		CLEAN_EXIT(FAIL);
	}
printf("Test knet_send_sync with dst_host_filter returning mcast packets\n"); dhost_filter_ret = 3; if ((knet_send_sync(knet_h1, send_buff, KNET_MAX_PACKET_SIZE, channel) == sizeof(send_buff)) || (errno != E2BIG)) { printf("knet_send_sync didn't detect mcast packet from dst_host_filter or returned incorrect error: %s\n", strerror(errno)); CLEAN_EXIT(FAIL); } printf("Test knet_send_sync with valid data\n"); dhost_filter_ret = 1; FAIL_ON_ERR(knet_send_sync(knet_h1, send_buff, KNET_MAX_PACKET_SIZE, channel)); FAIL_ON_ERR(knet_handle_setfwd(knet_h1, 0)); CLEAN_EXIT(CONTINUE); } int main(int argc, char *argv[]) { test(); return PASS; } diff --git a/libknet/tests/fun_acl_check.c b/libknet/tests/fun_acl_check.c index d105e78f..b7ca64f2 100644 --- a/libknet/tests/fun_acl_check.c +++ b/libknet/tests/fun_acl_check.c @@ -1,404 +1,405 @@ /* * Copyright (C) 2021-2025 Red Hat, Inc. All rights reserved. * * Authors: Christine Caulfield * * This software licensed under GPL-2.0+ */ #include "config.h" #include #include #include #include #include #include #include #include #include "libknet.h" #include "internals.h" #include "netutils.h" #include "test-common.h" /* * Keep track of how many messages got through: * clean + 3xACLs + QUIT */ #define CORRECT_NUM_MSGS 5 static int msgs_recvd = 0; #undef TESTNODES #define TESTNODES 2 static pthread_mutex_t recv_mutex = PTHREAD_MUTEX_INITIALIZER; static int quit_recv_thread = 0; static int reply_pipe[2]; /* Our local version of FOE that also tidies up the threads */ #define FAIL_ON_ERR_THR(fn) \ printf("FOE: %s\n", #fn); \ if ((res = fn) != 0) { \ int savederrno = errno; \ pthread_mutex_lock(&recv_mutex); \ quit_recv_thread = 1; \ pthread_mutex_unlock(&recv_mutex); \ if (recv_thread) { \ pthread_join(recv_thread, (void**)&thread_err); \ } \ knet_handle_stop_everything(knet_h, TESTNODES); \ stop_logthread(); \ flush_logs(logfds[0], stdout); \ close_logpipes(logfds); \ close(reply_pipe[0]); \ close(reply_pipe[1]); \ if (res == -2) { \ 
exit(SKIP); \ } else { \ printf("*** FAIL on line %d %s failed: %s\n", __LINE__ , #fn, strerror(savederrno)); \ exit(FAIL); \ } \ } static int knet_send_str(knet_handle_t knet_h, char *str) { + // coverity[LOCK:SUPPRESS] - it's a test, get over it return knet_send_sync(knet_h, str, strlen(str)+1, 0); } /* * lo0 is filled in with the local address on return. * lo1 is expected to be provided - it's the actual remote address to connect to. */ int dyn_knet_link_set_config(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id, uint8_t transport, uint64_t flags, int family, int dynamic, struct sockaddr_storage *lo0, struct sockaddr_storage *lo1) { int err = 0, savederrno = 0; uint32_t port; char portstr[32]; for (port = 1025; port < 65536; port++) { sprintf(portstr, "%u", port); memset(lo0, 0, sizeof(struct sockaddr_storage)); if (family == AF_INET6) { err = knet_strtoaddr("::1", portstr, lo0, sizeof(struct sockaddr_storage)); } else { err = knet_strtoaddr("127.0.0.1", portstr, lo0, sizeof(struct sockaddr_storage)); } if (err < 0) { printf("Unable to convert loopback to sockaddr: %s\n", strerror(errno)); goto out; } errno = 0; if (dynamic) { err = knet_link_set_config(knet_h, host_id, link_id, transport, lo0, NULL, flags); } else { err = knet_link_set_config(knet_h, host_id, link_id, transport, lo0, lo1, flags); } savederrno = errno; if ((err < 0) && (savederrno != EADDRINUSE)) { if (savederrno == EPROTONOSUPPORT && transport == KNET_TRANSPORT_SCTP) { return -2; } else { printf("Unable to configure link: %s\n", strerror(savederrno)); goto out; } } if (!err) { printf("Using port %u\n", port); goto out; } } if (err) { printf("No more ports available\n"); } out: errno = savederrno; return err; } static void *recv_messages(void *handle) { knet_handle_t knet_h = (knet_handle_t)handle; char buf[4096]; ssize_t len; static int err = 0; int savederrno = 0, quit = 0; while ((len = knet_recv(knet_h, buf, sizeof(buf), 0)) && (!quit)) { savederrno = errno; 
pthread_mutex_lock(&recv_mutex); quit = quit_recv_thread; pthread_mutex_unlock(&recv_mutex); if (quit) { printf(" *** recv thread was requested to exit via FOE\n"); err = 1; return &err; } if (len > 0) { int res; printf("recv: (%ld) %s\n", (long)len, buf); msgs_recvd++; if (strcmp("QUIT", buf) == 0) { break; } if (buf[0] == '0') { /* We should not have received this! */ printf(" *** FAIL received packet that should have been blocked\n"); err = 1; return &err; } /* Tell the main thread we have received something */ res = write(reply_pipe[1], ".", 1); if (res != 1) { printf(" *** FAIL to send response back to main thread\n"); err = 1; return &err; } } usleep(1000); if (len < 0 && savederrno != EAGAIN) { break; } } printf("-- recv thread finished: %zd %d %s\n", len, errno, strerror(savederrno)); return &err; } static void notify_fn(void *private_data, int datafd, int8_t channel, uint8_t tx_rx, int error, int errorno) { printf("NOTIFY fn called\n"); } /* A VERY basic filter because all data traffic is going to one place */ static int dhost_filter(void *pvt_data, const unsigned char *outdata, ssize_t outdata_len, uint8_t tx_rx, knet_node_id_t this_host_id, knet_node_id_t src_host_id, int8_t *dst_channel, knet_node_id_t *dst_host_ids, size_t *dst_host_ids_entries) { dst_host_ids[0] = 1; *dst_host_ids_entries = 1; return 0; } /* This used to be a pthread condition variable, but there was a race where it could be triggered before the main thread was waiting for it. Go old-fashioned. 
*/ static int wait_for_reply(int seconds) { int res; struct pollfd pfds; char tmpbuf[32]; pfds.fd = reply_pipe[0]; pfds.events = POLLIN | POLLERR | POLLHUP; pfds.revents = 0; res = poll(&pfds, 1, seconds*1000); if (res == 1) { if (pfds.revents & POLLIN) { res = read(reply_pipe[0], tmpbuf, sizeof(tmpbuf)); if (res > 0) { return 0; } } else { printf("Error on pipe poll revent = 0x%x\n", pfds.revents); errno = EIO; } } if (res == 0) { errno = ETIMEDOUT; return -1; } return -1; } static void test(int transport) { knet_handle_t knet_h[TESTNODES+1]; int logfds[2]; struct sockaddr_storage lo0, lo1; struct sockaddr_storage ss1, ss2; int res; pthread_t recv_thread = 0; int *thread_err; int datafd; int8_t channel; int seconds = 90; // dynamic tests take longer than normal tests if (is_memcheck() || is_helgrind()) { printf("Test suite is running under valgrind, adjusting wait_for_host timeout\n"); seconds = seconds * 16; } memset(knet_h, 0, sizeof(knet_h)); memset(reply_pipe, 0, sizeof(reply_pipe)); memset(logfds, 0, sizeof(logfds)); FAIL_ON_ERR_THR(pipe(reply_pipe)); // Initial setup gubbins msgs_recvd = 0; setup_logpipes(logfds); start_logthread(logfds[1], stdout); knet_handle_start_nodes(knet_h, TESTNODES, logfds, KNET_LOG_DEBUG); FAIL_ON_ERR_THR(knet_host_add(knet_h[2], 1)); FAIL_ON_ERR_THR(knet_host_add(knet_h[1], 2)); FAIL_ON_ERR_THR(knet_handle_enable_filter(knet_h[2], NULL, dhost_filter)); // Create the dynamic (receiving) link FAIL_ON_ERR_THR(dyn_knet_link_set_config(knet_h[1], 2, 0, transport, 0, AF_INET, 1, &lo0, NULL)); // Connect to the dynamic link FAIL_ON_ERR_THR(dyn_knet_link_set_config(knet_h[2], 1, 0, transport, 0, AF_INET, 0, &lo1, &lo0)); // All the rest of the setup gubbins FAIL_ON_ERR_THR(knet_handle_enable_sock_notify(knet_h[1], 0, ¬ify_fn)); FAIL_ON_ERR_THR(knet_handle_enable_sock_notify(knet_h[2], 0, ¬ify_fn)); channel = datafd = 0; FAIL_ON_ERR_THR(knet_handle_add_datafd(knet_h[1], &datafd, &channel)); channel = datafd = 0; 
FAIL_ON_ERR_THR(knet_handle_add_datafd(knet_h[2], &datafd, &channel)); FAIL_ON_ERR_THR(knet_link_set_enable(knet_h[1], 2, 0, 1)); FAIL_ON_ERR_THR(knet_link_set_enable(knet_h[2], 1, 0, 1)); FAIL_ON_ERR_THR(knet_handle_setfwd(knet_h[1], 1)); FAIL_ON_ERR_THR(knet_handle_setfwd(knet_h[2], 1)); // Start receive thread FAIL_ON_ERR_THR(pthread_create(&recv_thread, NULL, recv_messages, (void *)knet_h[1])); // Let everything settle down FAIL_ON_ERR_THR(wait_for_nodes_state(knet_h[1], TESTNODES, 1, seconds, logfds[0], stdout)); FAIL_ON_ERR_THR(wait_for_nodes_state(knet_h[2], TESTNODES, 1, seconds, logfds[0], stdout)); /* * TESTING STARTS HERE * strings starting '1' should reach the receiving thread * strings starting '0' should not */ // No ACL printf("Testing No ACL - this should get through\n"); FAIL_ON_ERR_THR(knet_send_str(knet_h[2], "1No ACL - this should get through")); FAIL_ON_ERR_THR(wait_for_reply(seconds)) // Block traffic from this address. memset(&ss1, 0, sizeof(ss1)); memset(&ss2, 0, sizeof(ss1)); knet_strtoaddr("127.0.0.1","0", &ss1, sizeof(ss1)); FAIL_ON_ERR_THR(knet_link_add_acl(knet_h[1], 2, 0, &ss1, NULL, CHECK_TYPE_ADDRESS, CHECK_REJECT)); // Accept ACL for when we remove them FAIL_ON_ERR_THR(knet_link_add_acl(knet_h[1], 2, 0, &ss1, NULL, CHECK_TYPE_ADDRESS, CHECK_ACCEPT)); // This needs to go after the first ACLs are added FAIL_ON_ERR_THR(knet_handle_enable_access_lists(knet_h[1], 1)); printf("Testing Address blocked - this should NOT get through\n"); FAIL_ON_ERR_THR(knet_send_str(knet_h[2], "0Address blocked - this should NOT get through")); // Unblock and check again FAIL_ON_ERR_THR(wait_for_nodes_state(knet_h[1], TESTNODES, 0, seconds, logfds[0], stdout)); FAIL_ON_ERR_THR(wait_for_nodes_state(knet_h[2], TESTNODES, 0, seconds, logfds[0], stdout)); FAIL_ON_ERR_THR(knet_link_rm_acl(knet_h[1], 2, 0, &ss1, NULL, CHECK_TYPE_ADDRESS, CHECK_REJECT)); FAIL_ON_ERR_THR(wait_for_nodes_state(knet_h[1], TESTNODES, 1, seconds, logfds[0], stdout)); 
FAIL_ON_ERR_THR(wait_for_nodes_state(knet_h[2], TESTNODES, 1, seconds, logfds[0], stdout)); printf("Testing Address unblocked - this should get through\n"); FAIL_ON_ERR_THR(knet_send_str(knet_h[2], "1Address unblocked - this should get through")); FAIL_ON_ERR_THR(wait_for_reply(seconds)); // Block traffic using a netmask knet_strtoaddr("127.0.0.1","0", &ss1, sizeof(ss1)); knet_strtoaddr("255.0.0.1","0", &ss2, sizeof(ss2)); FAIL_ON_ERR_THR(knet_link_insert_acl(knet_h[1], 2, 0, 0, &ss1, &ss2, CHECK_TYPE_MASK, CHECK_REJECT)); printf("Testing Netmask blocked - this should NOT get through\n"); FAIL_ON_ERR_THR(knet_send_str(knet_h[2], "0Netmask blocked - this should NOT get through")); // Unblock and check again FAIL_ON_ERR_THR(wait_for_nodes_state(knet_h[1], TESTNODES, 0, seconds, logfds[0], stdout)); FAIL_ON_ERR_THR(wait_for_nodes_state(knet_h[2], TESTNODES, 0, seconds, logfds[0], stdout)); FAIL_ON_ERR_THR(knet_link_rm_acl(knet_h[1], 2, 0, &ss1, &ss2, CHECK_TYPE_MASK, CHECK_REJECT)); FAIL_ON_ERR_THR(wait_for_nodes_state(knet_h[1], TESTNODES, 1, seconds, logfds[0], stdout)); FAIL_ON_ERR_THR(wait_for_nodes_state(knet_h[2], TESTNODES, 1, seconds, logfds[0], stdout)); printf("Testing Netmask unblocked - this should get through\n"); FAIL_ON_ERR_THR(knet_send_str(knet_h[2], "1Netmask unblocked - this should get through")); FAIL_ON_ERR_THR(wait_for_reply(seconds)); // Block traffic from a range knet_strtoaddr("127.0.0.0", "0", &ss1, sizeof(ss1)); knet_strtoaddr("127.0.0.9", "0", &ss2, sizeof(ss2)); FAIL_ON_ERR_THR(knet_link_insert_acl(knet_h[1], 2, 0, 0, &ss1, &ss2, CHECK_TYPE_RANGE, CHECK_REJECT)); printf("Testing Range blocked - this should NOT get through\n"); FAIL_ON_ERR_THR(knet_send_str(knet_h[2], "0Range blocked - this should NOT get through")); // Unblock and check again FAIL_ON_ERR_THR(wait_for_nodes_state(knet_h[1], TESTNODES, 0, seconds, logfds[0], stdout)); FAIL_ON_ERR_THR(wait_for_nodes_state(knet_h[2], TESTNODES, 0, seconds, logfds[0], stdout)); 
FAIL_ON_ERR_THR(knet_link_rm_acl(knet_h[1], 2, 0, &ss1, &ss2, CHECK_TYPE_RANGE, CHECK_REJECT)); FAIL_ON_ERR_THR(wait_for_nodes_state(knet_h[1], TESTNODES, 1, seconds, logfds[0], stdout)); FAIL_ON_ERR_THR(wait_for_nodes_state(knet_h[2], TESTNODES, 1, seconds, logfds[0], stdout)); printf("Testing Range unblocked - this should get through\n"); FAIL_ON_ERR_THR(knet_send_str(knet_h[2], "1Range unblocked - this should get through")); FAIL_ON_ERR_THR(wait_for_reply(seconds)); // Finish up - disable ACLS to make sure the QUIT message gets through FAIL_ON_ERR_THR(knet_handle_enable_access_lists(knet_h[1], 0)); FAIL_ON_ERR_THR(wait_for_nodes_state(knet_h[1], TESTNODES, 1, seconds, logfds[0], stdout)); FAIL_ON_ERR_THR(wait_for_nodes_state(knet_h[2], TESTNODES, 1, seconds, logfds[0], stdout)); FAIL_ON_ERR_THR(knet_send_str(knet_h[2], "QUIT")); // Check return from the receiving thread pthread_join(recv_thread, (void**)&thread_err); if (*thread_err) { printf("Thread returned %d\n", *thread_err); clean_exit(knet_h, TESTNODES, logfds, FAIL); } if (msgs_recvd != CORRECT_NUM_MSGS) { printf("*** FAIL Recv thread got %d messages, expected %d\n", msgs_recvd, CORRECT_NUM_MSGS); clean_exit(knet_h, TESTNODES, logfds, FAIL); } clean_exit(knet_h, TESTNODES, logfds, PASS); } int main(int argc, char *argv[]) { printf("Testing with UDP\n"); test(KNET_TRANSPORT_UDP); #ifdef HAVE_NETINET_SCTP_H printf("Testing with SCTP currently disabled\n"); //test(KNET_TRANSPORT_SCTP); #endif return PASS; } diff --git a/libknet/tests/knet_bench.c b/libknet/tests/knet_bench.c index 9aaa5fdc..cae3952d 100644 --- a/libknet/tests/knet_bench.c +++ b/libknet/tests/knet_bench.c @@ -1,1382 +1,1383 @@ /* * Copyright (C) 2016-2025 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. 
Di Nitto * * This software licensed under GPL-2.0+ */ #include "config.h" #include #include #include #include #include #include #include #include #include #include "libknet.h" #include "compat.h" #include "internals.h" #include "netutils.h" #include "transport_common.h" #include "threads_common.h" #include "test-common.h" #define MAX_NODES 128 static int senderid = -1; static int thisnodeid = -1; static knet_handle_t knet_h; static int datafd = 0; static int8_t channel = 0; static int globallistener = 0; static int continous = 0; static int show_stats = 0; static struct sockaddr_storage allv4; static struct sockaddr_storage allv6; static int broadcast_test = 1; static pthread_t rx_thread = {0}; static char *rx_buf[PCKT_FRAG_MAX]; static int wait_for_perf_rx = 0; static char *compresscfg = NULL; static char *cryptocfg = NULL; static int machine_output = 0; static int use_access_lists = 0; static int use_pckt_verification = 0; static int bench_shutdown_in_progress = 0; static pthread_mutex_t shutdown_mutex = PTHREAD_MUTEX_INITIALIZER; #define TEST_PING 0 #define TEST_PING_AND_DATA 1 #define TEST_PERF_BY_SIZE 2 #define TEST_PERF_BY_TIME 3 static int test_type = TEST_PING; #define TEST_START 2 #define TEST_STOP 4 #define TEST_COMPLETE 6 #define ONE_GIGABYTE 1073741824 static uint64_t perf_by_size_size = 1 * ONE_GIGABYTE; static uint64_t perf_by_time_secs = 10; static uint32_t force_packet_size = 0; struct node { int nodeid; int links; uint8_t transport[KNET_MAX_LINK]; struct sockaddr_storage address[KNET_MAX_LINK]; }; struct pckt_ver { uint32_t len; uint32_t chksum; }; static void print_help(void) { printf("knet_bench usage:\n"); printf(" -h print this help (no really)\n"); printf(" -d enable debug logs (default INFO)\n"); printf(" -f enable use of access lists (default: off)\n"); printf(" -c [implementation]:[crypto]:[hashing] crypto configuration. 
(default disabled)\n"); printf(" Example: -c nss:aes128:sha1\n"); printf(" -z [implementation]:[level]:[threshold] compress configuration. (default disabled)\n"); printf(" Example: -z zlib:5:100\n"); printf(" -p [active|passive|rr] (default: passive)\n"); printf(" -P [UDP|SCTP] (default: UDP) protocol (transport) to use for all links\n"); printf(" -t [nodeid] This nodeid (required)\n"); printf(" -n [nodeid],[proto]/[link1_ip],[link2_..] Other nodes information (at least one required)\n"); printf(" Example: -n 1,192.168.8.1,SCTP/3ffe::8:1,UDP/172...\n"); printf(" can be repeated up to %d and should contain also the localnode info\n", MAX_NODES); printf(" -b [port] baseport (default: 50000)\n"); printf(" -l enable global listener on 0.0.0.0/:: (default: off, incompatible with -o)\n"); printf(" -o enable baseport offset per nodeid\n"); printf(" -m change PMTUd interval in seconds (default: 60)\n"); printf(" -w dont wait for all nodes to be up before starting the test (default: wait)\n"); printf(" -T [ping|ping_data|perf-by-size|perf-by-time]\n"); printf(" test type (default: ping)\n"); printf(" ping: will wait for all hosts to join the knet network, sleep 5 seconds and quit\n"); printf(" ping_data: will wait for all hosts to join the knet network, sends some data to all nodes and quit\n"); printf(" perf-by-size: will wait for all hosts to join the knet network,\n"); printf(" perform a series of benchmarks by transmitting a known\n"); printf(" size/quantity of packets and measuring the time, then quit\n"); printf(" perf-by-time: will wait for all hosts to join the knet network,\n"); printf(" perform a series of benchmarks by transmitting a known\n"); printf(" size of packets for a given amount of time (10 seconds)\n"); printf(" and measuring the quantity of data transmitted, then quit\n"); printf(" -s nodeid that will generate traffic for benchmarks\n"); printf(" -S [size|seconds] when used in combination with -T perf-by-size it indicates how many GB of traffic to 
generate for the test. (default: 1GB)\n"); printf(" when used in combination with -T perf-by-time it indicates how many Seconds of traffic to generate for the test. (default: 10 seconds)\n"); printf(" -x force packet size for perf-by-time or perf-by-size\n"); printf(" -C repeat the test continously (default: off)\n"); printf(" -X[XX] show stats at the end of the run (default: 1)\n"); printf(" 1: show handle stats, 2: show summary link stats\n"); printf(" 3: show detailed link stats\n"); printf(" -a enable machine parsable output (default: off).\n"); printf(" -v enable packet verification for performance tests (default: off).\n"); } static void parse_nodes(char *nodesinfo[MAX_NODES], int onidx, int port, struct node nodes[MAX_NODES], int *thisidx) { int i; char *temp = NULL; char port_str[11]; memset(port_str, 0, sizeof(port_str)); snprintf(port_str, sizeof(port_str), "%d", port); for (i = 0; i < onidx; i++) { nodes[i].nodeid = atoi(strtok(nodesinfo[i], ",")); if ((nodes[i].nodeid < 0) || (nodes[i].nodeid > KNET_MAX_HOST)) { printf("Invalid nodeid: %d (0 - %d)\n", nodes[i].nodeid, KNET_MAX_HOST); exit(FAIL); } if (thisnodeid == nodes[i].nodeid) { *thisidx = i; } while((temp = strtok(NULL, ","))) { char *slash = NULL; uint8_t transport; if (nodes[i].links == KNET_MAX_LINK) { printf("Too many links configured. 
/*
 * Socket notification callback: log the error reported for a data socket.
 *
 * pvt_data, local_channel and tx_rx are accepted to match the callback
 * signature but are not used here.
 *
 * The message text is derived from the 'errorno' argument (the error code
 * handed to the callback), not from the global errno, so that the numeric
 * code and its description printed side by side always agree.
 */
static void sock_notify(void *pvt_data, int local_datafd, int8_t local_channel, uint8_t tx_rx, int error, int errorno)
{
	printf("[info]: error (%d - %d - %s) from socket: %d\n",
	       error, errorno, strerror(errorno), local_datafd);
}
50000, portoffset = 0; int thisport = 0, otherport = 0; int thisnewport = 0, othernewport = 0; struct sockaddr_in *so_in; struct sockaddr_in6 *so_in6; struct sockaddr_storage *src; int i, link_idx, allnodesup = 0; int policy = KNET_LINK_POLICY_PASSIVE, policyfound = 0; int protocol = KNET_TRANSPORT_UDP, protofound = 0; int wait = 1; int pmtud_interval = 60; struct knet_handle_crypto_cfg knet_handle_crypto_cfg; char *cryptomodel = NULL, *cryptotype = NULL, *cryptohash = NULL; struct knet_handle_compress_cfg knet_handle_compress_cfg; memset(nodes, 0, sizeof(nodes)); while ((rv = getopt(argc, argv, "aCT:S:s:lvdfom:wb:t:n:c:p:x:X::P:z:h")) != EOF) { switch(rv) { case 'h': print_help(); exit(PASS); break; case 'a': machine_output = 1; break; case 'd': debug = KNET_LOG_DEBUG; break; case 'f': use_access_lists = 1; break; case 'c': if (cryptocfg) { printf("Error: -c can only be specified once\n"); exit(FAIL); } cryptocfg = optarg; break; case 'p': if (policystr) { printf("Error: -p can only be specified once\n"); exit(FAIL); } if (optarg) { policystr = optarg; if (!strcmp(policystr, "active")) { policy = KNET_LINK_POLICY_ACTIVE; policyfound = 1; } /* * we can't use rr because clangs can't compile * an array of 3 strings, one of which is 2 bytes long */ if (!strcmp(policystr, "round-robin")) { policy = KNET_LINK_POLICY_RR; policyfound = 1; } if (!strcmp(policystr, "passive")) { policy = KNET_LINK_POLICY_PASSIVE; policyfound = 1; } } if (!policyfound) { printf("Error: invalid policy %s specified. -p accepts active|passive|rr\n", policystr); exit(FAIL); } break; case 'P': if (protostr) { printf("Error: -P can only be specified once\n"); exit(FAIL); } if (optarg) { protostr = optarg; if (!strcmp(protostr, "UDP")) { protocol = KNET_TRANSPORT_UDP; protofound = 1; } if (!strcmp(protostr, "SCTP")) { protocol = KNET_TRANSPORT_SCTP; protofound = 1; } } if (!protofound) { printf("Error: invalid protocol %s specified. 
-P accepts udp|sctp\n", policystr); exit(FAIL); } break; case 't': if (thisnodeid >= 0) { printf("Error: -t can only be specified once\n"); exit(FAIL); } thisnodeid = atoi(optarg); if ((thisnodeid < 0) || (thisnodeid > 65536)) { printf("Error: -t nodeid out of range %d (1 - 65536)\n", thisnodeid); exit(FAIL); } break; case 'n': if (onidx == MAX_NODES) { printf("Error: too many other nodes. Max %d\n", MAX_NODES); exit(FAIL); } othernodeinfo[onidx] = optarg; onidx++; break; case 'b': port = atoi(optarg); if ((port < 1) || (port > 65536)) { printf("Error: port %d out of range (1 - 65536)\n", port); exit(FAIL); } break; case 'o': if (globallistener) { printf("Error: -l cannot be used with -o\n"); exit(FAIL); } portoffset = 1; break; case 'm': pmtud_interval = atoi(optarg); if (pmtud_interval < 1) { printf("Error: pmtud interval %d out of range (> 0)\n", pmtud_interval); exit(FAIL); } break; case 'l': if (portoffset) { printf("Error: -o cannot be used with -l\n"); exit(FAIL); } globallistener = 1; break; case 'w': wait = 0; break; case 's': if (senderid >= 0) { printf("Error: -s can only be specified once\n"); exit(FAIL); } senderid = atoi(optarg); if ((senderid < 0) || (senderid > 65536)) { printf("Error: -s nodeid out of range %d (1 - 65536)\n", senderid); exit(FAIL); } break; case 'T': if (optarg) { if (!strcmp("ping", optarg)) { test_type = TEST_PING; } if (!strcmp("ping_data", optarg)) { test_type = TEST_PING_AND_DATA; } if (!strcmp("perf-by-size", optarg)) { test_type = TEST_PERF_BY_SIZE; } if (!strcmp("perf-by-time", optarg)) { test_type = TEST_PERF_BY_TIME; } } else { printf("Error: -T requires an option\n"); exit(FAIL); } break; case 'S': perf_by_size_size = (uint64_t)atoi(optarg) * ONE_GIGABYTE; perf_by_time_secs = (uint64_t)atoi(optarg); break; case 'x': force_packet_size = (uint32_t)atoi(optarg); if ((force_packet_size < 64) || (force_packet_size > 65536)) { printf("Unsupported packet size %u (accepted 64 - 65536)\n", force_packet_size); exit(FAIL); } break; 
case 'v': use_pckt_verification = 1; break; case 'C': continous = 1; break; case 'X': if (optarg) { show_stats = atoi(optarg); } else { show_stats = 1; } break; case 'z': if (compresscfg) { printf("Error: -c can only be specified once\n"); exit(FAIL); } compresscfg = optarg; break; default: break; } } if (thisnodeid < 0) { printf("Who am I?!? missing -t from command line?\n"); exit(FAIL); } if (onidx < 1) { printf("no other nodes configured?!? missing -n from command line\n"); exit(FAIL); } parse_nodes(othernodeinfo, onidx, port, nodes, &thisidx); if (thisidx < 0) { printf("no config for this node found\n"); exit(FAIL); } if (senderid >= 0) { for (i=0; i < onidx; i++) { if (senderid == nodes[i].nodeid) { break; } } if (i == onidx) { printf("Unable to find senderid in nodelist\n"); exit(FAIL); } } if (((test_type == TEST_PERF_BY_SIZE) || (test_type == TEST_PERF_BY_TIME)) && (senderid < 0)) { printf("Error: performance test requires -s to be set (for now)\n"); exit(FAIL); } logfd = start_logging(stdout); knet_h = knet_handle_new(thisnodeid, logfd, debug); if (!knet_h) { printf("Unable to knet_handle_new: %s\n", strerror(errno)); exit(FAIL); } if (knet_handle_enable_access_lists(knet_h, use_access_lists) < 0) { printf("Unable to knet_handle_enable_access_lists: %s\n", strerror(errno)); exit(FAIL); } if (cryptocfg) { memset(&knet_handle_crypto_cfg, 0, sizeof(knet_handle_crypto_cfg)); cryptomodel = strtok(cryptocfg, ":"); cryptotype = strtok(NULL, ":"); cryptohash = strtok(NULL, ":"); if (cryptomodel) { strncpy(knet_handle_crypto_cfg.crypto_model, cryptomodel, sizeof(knet_handle_crypto_cfg.crypto_model) - 1); } if (cryptotype) { strncpy(knet_handle_crypto_cfg.crypto_cipher_type, cryptotype, sizeof(knet_handle_crypto_cfg.crypto_cipher_type) - 1); } if (cryptohash) { strncpy(knet_handle_crypto_cfg.crypto_hash_type, cryptohash, sizeof(knet_handle_crypto_cfg.crypto_hash_type) - 1); } knet_handle_crypto_cfg.private_key_len = KNET_MAX_KEY_LEN; if (knet_handle_crypto(knet_h, 
&knet_handle_crypto_cfg)) { printf("Unable to init crypto\n"); exit(FAIL); } } if (compresscfg) { memset(&knet_handle_compress_cfg, 0, sizeof(struct knet_handle_compress_cfg)); snprintf(knet_handle_compress_cfg.compress_model, 16, "%s", strtok(compresscfg, ":")); knet_handle_compress_cfg.compress_level = atoi(strtok(NULL, ":")); knet_handle_compress_cfg.compress_threshold = atoi(strtok(NULL, ":")); if (knet_handle_compress(knet_h, &knet_handle_compress_cfg)) { printf("Unable to configure compress\n"); exit(FAIL); } } if (knet_handle_enable_sock_notify(knet_h, &private_data, sock_notify) < 0) { printf("knet_handle_enable_sock_notify failed: %s\n", strerror(errno)); knet_handle_free(knet_h); exit(FAIL); } datafd = 0; channel = -1; if (knet_handle_add_datafd(knet_h, &datafd, &channel) < 0) { printf("knet_handle_add_datafd failed: %s\n", strerror(errno)); knet_handle_free(knet_h); exit(FAIL); } if (knet_handle_pmtud_setfreq(knet_h, pmtud_interval) < 0) { printf("knet_handle_pmtud_setfreq failed: %s\n", strerror(errno)); knet_handle_free(knet_h); exit(FAIL); } for (i=0; i < onidx; i++) { if (i == thisidx) { continue; } if (knet_host_add(knet_h, nodes[i].nodeid) < 0) { printf("knet_host_add failed: %s\n", strerror(errno)); exit(FAIL); } if (knet_host_set_policy(knet_h, nodes[i].nodeid, policy) < 0) { printf("knet_host_set_policy failed: %s\n", strerror(errno)); exit(FAIL); } for (link_idx = 0; link_idx < nodes[i].links; link_idx++) { if (portoffset) { if (nodes[thisidx].address[link_idx].ss_family == AF_INET) { so_in = (struct sockaddr_in *)&nodes[thisidx].address[link_idx]; thisport = ntohs(so_in->sin_port); thisnewport = thisport + nodes[i].nodeid; so_in->sin_port = (htons(thisnewport)); so_in = (struct sockaddr_in *)&nodes[i].address[link_idx]; otherport = ntohs(so_in->sin_port); othernewport = otherport + nodes[thisidx].nodeid; so_in->sin_port = (htons(othernewport)); } else { so_in6 = (struct sockaddr_in6 *)&nodes[thisidx].address[link_idx]; thisport = 
ntohs(so_in6->sin6_port); thisnewport = thisport + nodes[i].nodeid; so_in6->sin6_port = (htons(thisnewport)); so_in6 = (struct sockaddr_in6 *)&nodes[i].address[link_idx]; otherport = ntohs(so_in6->sin6_port); othernewport = otherport + nodes[thisidx].nodeid; so_in6->sin6_port = (htons(othernewport)); } } if (!globallistener) { src = &nodes[thisidx].address[link_idx]; } else { if (nodes[thisidx].address[link_idx].ss_family == AF_INET) { src = &allv4; } else { src = &allv6; } } /* * -P overrides per link protocol configuration */ if (protofound) { nodes[i].transport[link_idx] = protocol; } if (knet_link_set_config(knet_h, nodes[i].nodeid, link_idx, nodes[i].transport[link_idx], src, &nodes[i].address[link_idx], 0) < 0) { printf("Unable to configure link: %s\n", strerror(errno)); exit(FAIL); } if (portoffset) { if (nodes[thisidx].address[link_idx].ss_family == AF_INET) { so_in = (struct sockaddr_in *)&nodes[thisidx].address[link_idx]; so_in->sin_port = (htons(thisport)); so_in = (struct sockaddr_in *)&nodes[i].address[link_idx]; so_in->sin_port = (htons(otherport)); } else { so_in6 = (struct sockaddr_in6 *)&nodes[thisidx].address[link_idx]; so_in6->sin6_port = (htons(thisport)); so_in6 = (struct sockaddr_in6 *)&nodes[i].address[link_idx]; so_in6->sin6_port = (htons(otherport)); } } if (knet_link_set_enable(knet_h, nodes[i].nodeid, link_idx, 1) < 0) { printf("knet_link_set_enable failed: %s\n", strerror(errno)); exit(FAIL); } if (knet_link_set_ping_timers(knet_h, nodes[i].nodeid, link_idx, 1000, 10000, 2048) < 0) { printf("knet_link_set_ping_timers failed: %s\n", strerror(errno)); exit(FAIL); } if (knet_link_set_pong_count(knet_h, nodes[i].nodeid, link_idx, 2) < 0) { printf("knet_link_set_pong_count failed: %s\n", strerror(errno)); exit(FAIL); } } } if (knet_handle_enable_filter(knet_h, NULL, ping_dst_host_filter)) { printf("Unable to enable dst_host_filter: %s\n", strerror(errno)); exit(FAIL); } if (knet_handle_setfwd(knet_h, 1) < 0) { printf("knet_handle_setfwd 
/*
 * Weak 16-bit rolling checksum over data_len bytes of data (used for
 * packet verification while debugging).
 *
 * For every input byte the 16-bit accumulator is rotated right by one
 * bit, the byte is added, and the result is truncated back to 16 bits.
 * An empty buffer yields 0.
 */
static uint32_t compute_chsum(const unsigned char *data, uint32_t data_len)
{
	unsigned int sum = 0;
	uint32_t idx;

	for (idx = 0; idx != data_len; ++idx) {
		/* rotate right within 16 bits: the low bit wraps to bit 15 */
		unsigned int rotated = (sum >> 1) | ((sum & 1) << 15);

		sum = (rotated + data[idx]) & 0xffff;
	}

	return sum;
}
epoll\nHALTING RX THREAD!\n"); return NULL; } memset(&clock_start, 0, sizeof(clock_start)); memset(&clock_end, 0, sizeof(clock_start)); + // coverity[MISSING_LOCK:SUPPRESS] - It's a test, get over it. while (!bench_shutdown_in_progress) { if (epoll_wait(rx_epoll, events, KNET_EPOLL_MAX_EVENTS, 1) >= 1) { msg_recv = _recvmmsg(datafd, &msg[0], PCKT_FRAG_MAX, MSG_DONTWAIT | MSG_NOSIGNAL); if (msg_recv < 0) { printf("[info]: RXT: error from recvmmsg: %s\n", strerror(errno)); } switch(test_type) { case TEST_PING_AND_DATA: for (i = 0; i < msg_recv; i++) { if (msg[i].msg_len == 0) { printf("[info]: RXT: received 0 bytes message?\n"); } printf("[info]: received %u bytes message: %s\n", msg[i].msg_len, (char *)msg[i].msg_hdr.msg_iov->iov_base); } break; case TEST_PERF_BY_TIME: case TEST_PERF_BY_SIZE: for (i = 0; i < msg_recv; i++) { if (msg[i].msg_len < 64) { if (msg[i].msg_len == 0) { printf("[info]: RXT: received 0 bytes message?\n"); } if (msg[i].msg_len == TEST_START) { if (clock_gettime(CLOCK_MONOTONIC, &clock_start) != 0) { printf("[info]: unable to get start time!\n"); } } if (msg[i].msg_len == TEST_STOP) { double average_rx_mbytes; double average_rx_pkts; double time_diff_sec; if (clock_gettime(CLOCK_MONOTONIC, &clock_end) != 0) { printf("[info]: unable to get end time!\n"); } timespec_diff(clock_start, clock_end, &time_diff); /* * adjust for sleep(2) between sending the last data and TEST_STOP */ time_diff = time_diff - 2000000000llu; /* * convert to seconds */ time_diff_sec = (double)time_diff / 1000000000llu; average_rx_mbytes = (double)((rx_bytes / time_diff_sec) / (1024 * 1024)); average_rx_pkts = (double)(rx_pkts / time_diff_sec); if (!machine_output) { printf("[perf] execution time: %8.4f secs Average speed: %8.4f MB/sec %8.4f pckts/sec (size: %u total: %" PRIu64 ")\n", time_diff_sec, average_rx_mbytes, average_rx_pkts, current_pckt_size, rx_pkts); } else { printf("[perf],%.4f,%u,%" PRIu64 ",%.4f,%.4f\n", time_diff_sec, current_pckt_size, rx_pkts, 
average_rx_mbytes, average_rx_pkts); } rx_pkts = 0; rx_bytes = 0; current_pckt_size = 0; } if (msg[i].msg_len == TEST_COMPLETE) { wait_for_perf_rx = 1; } continue; } if (use_pckt_verification) { struct pckt_ver *recv_pckt = (struct pckt_ver *)msg[i].msg_hdr.msg_iov->iov_base; uint32_t chksum; if (msg[i].msg_len != recv_pckt->len) { printf("Wrong packet len received: %u expected: %u!\n", msg[i].msg_len, recv_pckt->len); exit(FAIL); } chksum = compute_chsum((const unsigned char *)msg[i].msg_hdr.msg_iov->iov_base + sizeof(struct pckt_ver), msg[i].msg_len - sizeof(struct pckt_ver)); if (recv_pckt->chksum != chksum){ printf("Wrong packet checksum received: %u expected: %u!\n", recv_pckt->chksum, chksum); exit(FAIL); } } rx_pkts++; rx_bytes = rx_bytes + msg[i].msg_len; current_pckt_size = msg[i].msg_len; } break; } } } epoll_ctl(rx_epoll, EPOLL_CTL_DEL, datafd, &ev); close(rx_epoll); return NULL; } static void setup_data_txrx_common(void) { if (!rx_thread) { if (knet_handle_enable_filter(knet_h, NULL, ping_dst_host_filter)) { printf("Unable to enable dst_host_filter: %s\n", strerror(errno)); exit(FAIL); } printf("[info]: setting up rx thread\n"); if (pthread_create(&rx_thread, 0, _rx_thread, NULL)) { printf("Unable to start rx thread\n"); exit(FAIL); } } } static void stop_rx_thread(void) { void *retval; unsigned int i; if (rx_thread) { printf("[info]: shutting down rx thread\n"); sleep(2); pthread_cancel(rx_thread); pthread_join(rx_thread, &retval); for (i = 0; i < PCKT_FRAG_MAX; i ++) { free(rx_buf[i]); } } } static void send_ping_data(void) { char buf[65535]; ssize_t len; memset(&buf, 0, sizeof(buf)); snprintf(buf, sizeof(buf), "Hello world!"); if (compresscfg) { len = sizeof(buf); } else { len = strlen(buf); } if (knet_send(knet_h, buf, len, channel) != len) { printf("[info]: Error sending hello world: %s\n", strerror(errno)); } sleep(1); } static int send_messages(struct knet_mmsghdr *msg, int msgs_to_send) { int sent_msgs, prev_sent, progress, total_sent; 
total_sent = 0; sent_msgs = 0; prev_sent = 0; progress = 1; retry: errno = 0; sent_msgs = _sendmmsg(datafd, 0, &msg[0], msgs_to_send, MSG_NOSIGNAL); if (sent_msgs < 0) { if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) { usleep(KNET_THREADS_TIMERES / 16); goto retry; } printf("[info]: Unable to send messages: %s\n", strerror(errno)); return -1; } total_sent = total_sent + sent_msgs; if ((sent_msgs >= 0) && (sent_msgs < msgs_to_send)) { if ((sent_msgs) || (progress)) { msgs_to_send = msgs_to_send - sent_msgs; prev_sent = prev_sent + sent_msgs; if (sent_msgs) { progress = 1; } else { progress = 0; } goto retry; } if (!progress) { printf("[info]: Unable to send more messages after retry\n"); } } return total_sent; } static int setup_send_buffers_common(struct knet_mmsghdr *msg, struct iovec *iov_out, char *tx_buf[]) { unsigned int i; for (i = 0; i < PCKT_FRAG_MAX; i++) { tx_buf[i] = malloc(KNET_MAX_PACKET_SIZE); if (!tx_buf[i]) { printf("TXT: Unable to malloc!\n"); return -1; } memset(tx_buf[i], i, KNET_MAX_PACKET_SIZE); iov_out[i].iov_base = (void *)tx_buf[i]; memset(&msg[i].msg_hdr, 0, sizeof(struct msghdr)); msg[i].msg_hdr.msg_iov = &iov_out[i]; msg[i].msg_hdr.msg_iovlen = 1; } return 0; } static void send_perf_data_by_size(void) { char *tx_buf[PCKT_FRAG_MAX]; struct knet_mmsghdr msg[PCKT_FRAG_MAX]; struct iovec iov_out[PCKT_FRAG_MAX]; char ctrl_message[16]; int sent_msgs; unsigned int i; uint64_t total_pkts_to_tx; uint64_t packets_to_send; uint32_t packetsize = 64; setup_send_buffers_common(msg, iov_out, tx_buf); while (packetsize <= KNET_MAX_PACKET_SIZE) { if (force_packet_size) { packetsize = force_packet_size; } for (i = 0; i < PCKT_FRAG_MAX; i++) { iov_out[i].iov_len = packetsize; if (use_pckt_verification) { struct pckt_ver *tx_pckt = (struct pckt_ver *)&iov_out[i].iov_base; tx_pckt->len = iov_out[i].iov_len; tx_pckt->chksum = compute_chsum((const unsigned char *)iov_out[i].iov_base + sizeof(struct pckt_ver), iov_out[i].iov_len - sizeof(struct pckt_ver)); } } 
total_pkts_to_tx = perf_by_size_size / packetsize; printf("[info]: testing with %u packet size. total bytes to transfer: %" PRIu64 " (%" PRIu64 " packets)\n", packetsize, perf_by_size_size, total_pkts_to_tx); memset(ctrl_message, 0, sizeof(ctrl_message)); knet_send(knet_h, ctrl_message, TEST_START, channel); while (total_pkts_to_tx > 0) { if (total_pkts_to_tx >= PCKT_FRAG_MAX) { packets_to_send = PCKT_FRAG_MAX; } else { packets_to_send = total_pkts_to_tx; } sent_msgs = send_messages(&msg[0], packets_to_send); if (sent_msgs < 0) { printf("Something went wrong, aborting\n"); exit(FAIL); } total_pkts_to_tx = total_pkts_to_tx - sent_msgs; } sleep(2); knet_send(knet_h, ctrl_message, TEST_STOP, channel); if ((packetsize == KNET_MAX_PACKET_SIZE) || (force_packet_size)) { break; } /* * Use a multiplier that can always divide properly a GB * into smaller chunks without worry about boundaries */ packetsize *= 4; if (packetsize > KNET_MAX_PACKET_SIZE) { packetsize = KNET_MAX_PACKET_SIZE; } } knet_send(knet_h, ctrl_message, TEST_COMPLETE, channel); for (i = 0; i < PCKT_FRAG_MAX; i++) { free(tx_buf[i]); } } /* For sorting the node list into order */ static int node_compare(const void *aptr, const void *bptr) { uint16_t a,b; a = *(uint16_t *)aptr; b = *(uint16_t *)bptr; return a > b; } static void display_stats(int level) { struct knet_handle_stats handle_stats; struct knet_link_status link_status; struct knet_link_stats total_link_stats; knet_node_id_t host_list[KNET_MAX_HOST]; uint8_t link_list[KNET_MAX_LINK]; unsigned int i,j; size_t num_hosts, num_links; if (knet_handle_get_stats(knet_h, &handle_stats, sizeof(handle_stats)) < 0) { perror("[info]: failed to get knet handle stats"); return; } if (compresscfg || cryptocfg) { printf("\n"); printf("[stat]: handle stats\n"); printf("[stat]: ------------\n"); if (compresscfg) { printf("[stat]: tx_uncompressed_packets: %" PRIu64 "\n", handle_stats.tx_uncompressed_packets); printf("[stat]: tx_compressed_packets: %" PRIu64 "\n", 
handle_stats.tx_compressed_packets); printf("[stat]: tx_compressed_original_bytes: %" PRIu64 "\n", handle_stats.tx_compressed_original_bytes); printf("[stat]: tx_compressed_size_bytes: %" PRIu64 "\n", handle_stats.tx_compressed_size_bytes ); printf("[stat]: tx_compress_time_ave: %" PRIu64 "\n", handle_stats.tx_compress_time_ave); printf("[stat]: tx_compress_time_min: %" PRIu64 "\n", handle_stats.tx_compress_time_min); printf("[stat]: tx_compress_time_max: %" PRIu64 "\n", handle_stats.tx_compress_time_max); printf("[stat]: rx_compressed_packets: %" PRIu64 "\n", handle_stats.rx_compressed_packets); printf("[stat]: rx_compressed_original_bytes: %" PRIu64 "\n", handle_stats.rx_compressed_original_bytes); printf("[stat]: rx_compressed_size_bytes: %" PRIu64 "\n", handle_stats.rx_compressed_size_bytes); printf("[stat]: rx_compress_time_ave: %" PRIu64 "\n", handle_stats.rx_compress_time_ave); printf("[stat]: rx_compress_time_min: %" PRIu64 "\n", handle_stats.rx_compress_time_min); printf("[stat]: rx_compress_time_max: %" PRIu64 "\n", handle_stats.rx_compress_time_max); printf("\n"); } if (cryptocfg) { printf("[stat]: tx_crypt_packets: %" PRIu64 "\n", handle_stats.tx_crypt_packets); printf("[stat]: tx_crypt_byte_overhead: %" PRIu64 "\n", handle_stats.tx_crypt_byte_overhead); printf("[stat]: tx_crypt_time_ave: %" PRIu64 "\n", handle_stats.tx_crypt_time_ave); printf("[stat]: tx_crypt_time_min: %" PRIu64 "\n", handle_stats.tx_crypt_time_min); printf("[stat]: tx_crypt_time_max: %" PRIu64 "\n", handle_stats.tx_crypt_time_max); printf("[stat]: rx_crypt_packets: %" PRIu64 "\n", handle_stats.rx_crypt_packets); printf("[stat]: rx_crypt_time_ave: %" PRIu64 "\n", handle_stats.rx_crypt_time_ave); printf("[stat]: rx_crypt_time_min: %" PRIu64 "\n", handle_stats.rx_crypt_time_min); printf("[stat]: rx_crypt_time_max: %" PRIu64 "\n", handle_stats.rx_crypt_time_max); printf("\n"); } } if (level < 2) { return; } memset(&total_link_stats, 0, sizeof(struct knet_link_stats)); if 
(knet_host_get_host_list(knet_h, host_list, &num_hosts) < 0) { perror("[info]: cannot get host list for stats"); return; } /* Print in host ID order */ qsort(host_list, num_hosts, sizeof(uint16_t), node_compare); for (j=0; j 2) { printf("\n"); printf("[stat]: Node %d Link %d\n", host_list[j], link_list[i]); printf("[stat]: tx_data_packets: %" PRIu64 "\n", link_status.stats.tx_data_packets); printf("[stat]: rx_data_packets: %" PRIu64 "\n", link_status.stats.rx_data_packets); printf("[stat]: tx_data_bytes: %" PRIu64 "\n", link_status.stats.tx_data_bytes); printf("[stat]: rx_data_bytes: %" PRIu64 "\n", link_status.stats.rx_data_bytes); printf("[stat]: rx_ping_packets: %" PRIu64 "\n", link_status.stats.rx_ping_packets); printf("[stat]: tx_ping_packets: %" PRIu64 "\n", link_status.stats.tx_ping_packets); printf("[stat]: rx_ping_bytes: %" PRIu64 "\n", link_status.stats.rx_ping_bytes); printf("[stat]: tx_ping_bytes: %" PRIu64 "\n", link_status.stats.tx_ping_bytes); printf("[stat]: rx_pong_packets: %" PRIu64 "\n", link_status.stats.rx_pong_packets); printf("[stat]: tx_pong_packets: %" PRIu64 "\n", link_status.stats.tx_pong_packets); printf("[stat]: rx_pong_bytes: %" PRIu64 "\n", link_status.stats.rx_pong_bytes); printf("[stat]: tx_pong_bytes: %" PRIu64 "\n", link_status.stats.tx_pong_bytes); printf("[stat]: rx_pmtu_packets: %" PRIu64 "\n", link_status.stats.rx_pmtu_packets); printf("[stat]: tx_pmtu_packets: %" PRIu64 "\n", link_status.stats.tx_pmtu_packets); printf("[stat]: rx_pmtu_bytes: %" PRIu64 "\n", link_status.stats.rx_pmtu_bytes); printf("[stat]: tx_pmtu_bytes: %" PRIu64 "\n", link_status.stats.tx_pmtu_bytes); printf("[stat]: tx_total_packets: %" PRIu64 "\n", link_status.stats.tx_total_packets); printf("[stat]: rx_total_packets: %" PRIu64 "\n", link_status.stats.rx_total_packets); printf("[stat]: tx_total_bytes: %" PRIu64 "\n", link_status.stats.tx_total_bytes); printf("[stat]: rx_total_bytes: %" PRIu64 "\n", link_status.stats.rx_total_bytes); printf("[stat]: 
tx_total_errors: %" PRIu64 "\n", link_status.stats.tx_total_errors); printf("[stat]: tx_total_retries: %" PRIu64 "\n", link_status.stats.tx_total_retries); printf("[stat]: tx_pmtu_errors: %" PRIu32 "\n", link_status.stats.tx_pmtu_errors); printf("[stat]: tx_pmtu_retries: %" PRIu32 "\n", link_status.stats.tx_pmtu_retries); printf("[stat]: tx_ping_errors: %" PRIu32 "\n", link_status.stats.tx_ping_errors); printf("[stat]: tx_ping_retries: %" PRIu32 "\n", link_status.stats.tx_ping_retries); printf("[stat]: tx_pong_errors: %" PRIu32 "\n", link_status.stats.tx_pong_errors); printf("[stat]: tx_pong_retries: %" PRIu32 "\n", link_status.stats.tx_pong_retries); printf("[stat]: tx_data_errors: %" PRIu32 "\n", link_status.stats.tx_data_errors); printf("[stat]: tx_data_retries: %" PRIu32 "\n", link_status.stats.tx_data_retries); printf("[stat]: latency_min: %" PRIu32 "\n", link_status.stats.latency_min); printf("[stat]: latency_max: %" PRIu32 "\n", link_status.stats.latency_max); printf("[stat]: latency_ave: %" PRIu32 "\n", link_status.stats.latency_ave); printf("[stat]: latency_samples: %" PRIu32 "\n", link_status.stats.latency_samples); printf("[stat]: down_count: %" PRIu32 "\n", link_status.stats.down_count); printf("[stat]: up_count: %" PRIu32 "\n", link_status.stats.up_count); } } } printf("\n"); printf("[stat]: Total link stats\n"); printf("[stat]: ----------------\n"); printf("[stat]: tx_data_packets: %" PRIu64 "\n", total_link_stats.tx_data_packets); printf("[stat]: rx_data_packets: %" PRIu64 "\n", total_link_stats.rx_data_packets); printf("[stat]: tx_data_bytes: %" PRIu64 "\n", total_link_stats.tx_data_bytes); printf("[stat]: rx_data_bytes: %" PRIu64 "\n", total_link_stats.rx_data_bytes); printf("[stat]: rx_ping_packets: %" PRIu64 "\n", total_link_stats.rx_ping_packets); printf("[stat]: tx_ping_packets: %" PRIu64 "\n", total_link_stats.tx_ping_packets); printf("[stat]: rx_ping_bytes: %" PRIu64 "\n", total_link_stats.rx_ping_bytes); printf("[stat]: tx_ping_bytes: %" 
PRIu64 "\n", total_link_stats.tx_ping_bytes); printf("[stat]: rx_pong_packets: %" PRIu64 "\n", total_link_stats.rx_pong_packets); printf("[stat]: tx_pong_packets: %" PRIu64 "\n", total_link_stats.tx_pong_packets); printf("[stat]: rx_pong_bytes: %" PRIu64 "\n", total_link_stats.rx_pong_bytes); printf("[stat]: tx_pong_bytes: %" PRIu64 "\n", total_link_stats.tx_pong_bytes); printf("[stat]: rx_pmtu_packets: %" PRIu64 "\n", total_link_stats.rx_pmtu_packets); printf("[stat]: tx_pmtu_packets: %" PRIu64 "\n", total_link_stats.tx_pmtu_packets); printf("[stat]: rx_pmtu_bytes: %" PRIu64 "\n", total_link_stats.rx_pmtu_bytes); printf("[stat]: tx_pmtu_bytes: %" PRIu64 "\n", total_link_stats.tx_pmtu_bytes); printf("[stat]: tx_total_packets: %" PRIu64 "\n", total_link_stats.tx_total_packets); printf("[stat]: rx_total_packets: %" PRIu64 "\n", total_link_stats.rx_total_packets); printf("[stat]: tx_total_bytes: %" PRIu64 "\n", total_link_stats.tx_total_bytes); printf("[stat]: rx_total_bytes: %" PRIu64 "\n", total_link_stats.rx_total_bytes); printf("[stat]: tx_total_errors: %" PRIu64 "\n", total_link_stats.tx_total_errors); printf("[stat]: tx_total_retries: %" PRIu64 "\n", total_link_stats.tx_total_retries); printf("[stat]: tx_pmtu_errors: %" PRIu32 "\n", total_link_stats.tx_pmtu_errors); printf("[stat]: tx_pmtu_retries: %" PRIu32 "\n", total_link_stats.tx_pmtu_retries); printf("[stat]: tx_ping_errors: %" PRIu32 "\n", total_link_stats.tx_ping_errors); printf("[stat]: tx_ping_retries: %" PRIu32 "\n", total_link_stats.tx_ping_retries); printf("[stat]: tx_pong_errors: %" PRIu32 "\n", total_link_stats.tx_pong_errors); printf("[stat]: tx_pong_retries: %" PRIu32 "\n", total_link_stats.tx_pong_retries); printf("[stat]: tx_data_errors: %" PRIu32 "\n", total_link_stats.tx_data_errors); printf("[stat]: tx_data_retries: %" PRIu32 "\n", total_link_stats.tx_data_retries); printf("[stat]: down_count: %" PRIu32 "\n", total_link_stats.down_count); printf("[stat]: up_count: %" PRIu32 "\n", 
total_link_stats.up_count); } static void send_perf_data_by_time(void) { char *tx_buf[PCKT_FRAG_MAX]; struct knet_mmsghdr msg[PCKT_FRAG_MAX]; struct iovec iov_out[PCKT_FRAG_MAX]; char ctrl_message[16]; int sent_msgs; unsigned int i; uint32_t packetsize = 64; struct timespec clock_start, clock_end; unsigned long long time_diff = 0; setup_send_buffers_common(msg, iov_out, tx_buf); memset(&clock_start, 0, sizeof(clock_start)); memset(&clock_end, 0, sizeof(clock_start)); while (packetsize <= KNET_MAX_PACKET_SIZE) { if (force_packet_size) { packetsize = force_packet_size; } for (i = 0; i < PCKT_FRAG_MAX; i++) { iov_out[i].iov_len = packetsize; if (use_pckt_verification) { struct pckt_ver *tx_pckt = (struct pckt_ver *)iov_out[i].iov_base; tx_pckt->len = iov_out[i].iov_len; tx_pckt->chksum = compute_chsum((const unsigned char *)iov_out[i].iov_base + sizeof(struct pckt_ver), iov_out[i].iov_len - sizeof(struct pckt_ver)); } } printf("[info]: testing with %u bytes packet size for %" PRIu64 " seconds.\n", packetsize, perf_by_time_secs); memset(ctrl_message, 0, sizeof(ctrl_message)); knet_send(knet_h, ctrl_message, TEST_START, channel); if (clock_gettime(CLOCK_MONOTONIC, &clock_start) != 0) { printf("[info]: unable to get start time!\n"); } time_diff = 0; while (time_diff < (perf_by_time_secs * 1000000000llu)) { sent_msgs = send_messages(&msg[0], PCKT_FRAG_MAX); if (sent_msgs < 0) { printf("Something went wrong, aborting\n"); exit(FAIL); } if (clock_gettime(CLOCK_MONOTONIC, &clock_end) != 0) { printf("[info]: unable to get end time!\n"); } timespec_diff(clock_start, clock_end, &time_diff); } sleep(2); knet_send(knet_h, ctrl_message, TEST_STOP, channel); if ((packetsize == KNET_MAX_PACKET_SIZE) || (force_packet_size)) { break; } /* * Use a multiplier that can always divide properly a GB * into smaller chunks without worry about boundaries */ packetsize *= 4; if (packetsize > KNET_MAX_PACKET_SIZE) { packetsize = KNET_MAX_PACKET_SIZE; } } knet_send(knet_h, ctrl_message, 
TEST_COMPLETE, channel); for (i = 0; i < PCKT_FRAG_MAX; i++) { free(tx_buf[i]); } } static void cleanup_all(void) { knet_handle_t knet_h_tmp[2]; if (pthread_mutex_lock(&shutdown_mutex)) { return; } if (bench_shutdown_in_progress) { pthread_mutex_unlock(&shutdown_mutex); return; } bench_shutdown_in_progress = 1; pthread_mutex_unlock(&shutdown_mutex); if (rx_thread) { stop_rx_thread(); } knet_h_tmp[1] = knet_h; knet_handle_stop_everything(knet_h_tmp, 1); } static void sigint_handler(int signum) { printf("[info]: cleaning up... got signal: %d\n", signum); cleanup_all(); exit(PASS); } int main(int argc, char *argv[]) { if (signal(SIGINT, sigint_handler) == SIG_ERR) { printf("Unable to configure SIGINT handler\n"); exit(FAIL); } setup_knet(argc, argv); setup_data_txrx_common(); sleep(5); restart: switch(test_type) { default: case TEST_PING: /* basic ping, no data */ sleep(5); break; case TEST_PING_AND_DATA: send_ping_data(); break; case TEST_PERF_BY_SIZE: if (senderid == thisnodeid) { send_perf_data_by_size(); } else { printf("[info]: waiting for perf rx thread to finish\n"); while(!wait_for_perf_rx) { sleep(1); } } break; case TEST_PERF_BY_TIME: if (senderid == thisnodeid) { send_perf_data_by_time(); } else { printf("[info]: waiting for perf rx thread to finish\n"); while(!wait_for_perf_rx) { sleep(1); } } break; } if (continous) { goto restart; } if (show_stats) { display_stats(show_stats); } cleanup_all(); return PASS; } diff --git a/libknet/tests/test-common.c b/libknet/tests/test-common.c index 3bb53d4f..63559a3c 100644 --- a/libknet/tests/test-common.c +++ b/libknet/tests/test-common.c @@ -1,992 +1,994 @@ /* * Copyright (C) 2016-2025 Red Hat, Inc. All rights reserved. * * Author: Fabio M. 
Di Nitto * * This software licensed under GPL-2.0+ */ #include "config.h" #include #include #include #include #include #include #include #include #include #include #include #include #include "libknet.h" #include "test-common.h" static pthread_mutex_t log_mutex = PTHREAD_MUTEX_INITIALIZER; static int log_init = 0; static pthread_mutex_t log_thread_mutex = PTHREAD_MUTEX_INITIALIZER; static pthread_t log_thread; static int log_thread_init = 0; static int log_fds[2]; struct log_thread_data { int logfd; FILE *std; }; static struct log_thread_data data; static char plugin_path[PATH_MAX]; static int _read_pipe(int fd, char **file, size_t *length) { char buf[4096]; int n; int done = 0; *file = NULL; *length = 0; memset(buf, 0, sizeof(buf)); while (!done) { n = read(fd, buf, sizeof(buf)); if (n < 0) { if (errno == EINTR) continue; if (*file) free(*file); return n; } if (n == 0 && (!*length)) return 0; if (n == 0) done = 1; if (*file) *file = realloc(*file, (*length) + n + done); else *file = malloc(n + done); if (!*file) return -1; memmove((*file) + (*length), buf, n); *length += (done + n); } /* Null terminator */ (*file)[(*length) - 1] = 0; return 0; } int execute_shell(const char *command, char **error_string) { pid_t pid; int status, err = 0; int fd[2]; size_t size = 0; if ((command == NULL) || (!error_string)) { errno = EINVAL; return FAIL; } *error_string = NULL; err = pipe(fd); if (err) goto out_clean; pid = fork(); if (pid < 0) { err = pid; goto out_clean; } if (pid) { /* parent */ close(fd[1]); err = _read_pipe(fd[0], error_string, &size); if (err) goto out_clean0; waitpid(pid, &status, 0); if (!WIFEXITED(status)) { err = -1; goto out_clean0; } if (WIFEXITED(status) && WEXITSTATUS(status) != 0) { err = WEXITSTATUS(status); goto out_clean0; } goto out_clean0; } else { /* child */ close(0); close(1); close(2); close(fd[0]); dup2(fd[1], 1); dup2(fd[1], 2); close(fd[1]); execlp("/bin/sh", "/bin/sh", "-c", command, NULL); exit(FAIL); } out_clean: close(fd[1]); 
out_clean0: close(fd[0]); return err; } int is_memcheck(void) { char *val; val = getenv("KNETMEMCHECK"); if (val) { if (!strncmp(val, "yes", 3)) { return 1; } } return 0; } int is_helgrind(void) { char *val; val = getenv("KNETHELGRIND"); if (val) { if (!strncmp(val, "yes", 3)) { return 1; } } return 0; } void set_scheduler(int policy) { struct sched_param sched_param; int err; err = sched_get_priority_max(policy); if (err < 0) { printf("Could not get maximum scheduler priority\n"); exit(FAIL); } sched_param.sched_priority = err; err = sched_setscheduler(0, policy, &sched_param); if (err < 0) { printf("Could not set priority\n"); exit(FAIL); } return; } int setup_logpipes(int *logfds) { if (pipe2(logfds, O_CLOEXEC | O_NONBLOCK) < 0) { printf("Unable to setup logging pipe\n"); exit(FAIL); } + // coverity[ORDER_REVERSAL:SUPPRESS] - it's a test, get over it return PASS; } void close_logpipes(int *logfds) { close(logfds[0]); logfds[0] = 0; close(logfds[1]); logfds[1] = 0; } void flush_logs(int logfd, FILE *std) { struct knet_log_msg msg; int len; while (1) { len = read(logfd, &msg, sizeof(msg)); if (len != sizeof(msg)) { /* * clear errno to avoid incorrect propagation */ errno = 0; return; } msg.msg[sizeof(msg.msg) - 1] = 0; fprintf(std, "[knet]: [%s] %s: %.*s\n", knet_log_get_loglevel_name(msg.msglevel), knet_log_get_subsystem_name(msg.subsystem), KNET_MAX_LOG_MSG_SIZE, msg.msg); } } static void *_logthread(void *args) { while (1) { int num; struct timeval tv = { 60, 0 }; fd_set rfds; FD_ZERO(&rfds); FD_SET(data.logfd, &rfds); num = select(FD_SETSIZE, &rfds, NULL, NULL, &tv); if (num < 0) { fprintf(data.std, "Unable select over logfd!\nHALTING LOGTHREAD!\n"); return NULL; } if (num == 0) { fprintf(data.std, "[knet]: No logs in the last 60 seconds\n"); continue; } if (FD_ISSET(data.logfd, &rfds)) { flush_logs(data.logfd, data.std); } } } int start_logthread(int logfd, FILE *std) { int savederrno = 0; savederrno = pthread_mutex_lock(&log_thread_mutex); if (savederrno) { 
printf("Unable to get log_thread mutex lock\n"); return -1; } if (!log_thread_init) { data.logfd = logfd; data.std = std; savederrno = pthread_create(&log_thread, 0, _logthread, NULL); if (savederrno) { printf("Unable to start logging thread: %s\n", strerror(savederrno)); pthread_mutex_unlock(&log_thread_mutex); return -1; } log_thread_init = 1; } pthread_mutex_unlock(&log_thread_mutex); return 0; } int stop_logthread(void) { int savederrno = 0; void *retval; savederrno = pthread_mutex_lock(&log_thread_mutex); if (savederrno) { printf("Unable to get log_thread mutex lock\n"); return -1; } if (log_thread_init) { pthread_cancel(log_thread); pthread_join(log_thread, &retval); log_thread_init = 0; } pthread_mutex_unlock(&log_thread_mutex); return 0; } static void stop_logging(void) { stop_logthread(); flush_logs(log_fds[0], stdout); close_logpipes(log_fds); } int start_logging(FILE *std) { int savederrno = 0; savederrno = pthread_mutex_lock(&log_mutex); if (savederrno) { printf("Unable to get log_mutex lock\n"); return -1; } if (!log_init) { setup_logpipes(log_fds); if (atexit(&stop_logging) != 0) { printf("Unable to register atexit handler to stop logging: %s\n", strerror(errno)); exit(FAIL); } if (start_logthread(log_fds[0], std) < 0) { exit(FAIL); } log_init = 1; } pthread_mutex_unlock(&log_mutex); return log_fds[1]; } static int dir_filter(const struct dirent *dname) { if ( (strcmp(dname->d_name + strlen(dname->d_name)-3, ".so") == 0) && ((strncmp(dname->d_name,"crypto", 6) == 0) || (strncmp(dname->d_name,"compress", 8) == 0))) { return 1; } return 0; } /* Make sure the proposed plugin path has at least 1 of each plugin available - just as a sanity check really */ static int contains_plugins(char *path) { struct dirent **namelist; int n,i; size_t j; struct knet_compress_info compress_list[256]; struct knet_crypto_info crypto_list[256]; size_t num_compress, num_crypto; size_t compress_found = 0; size_t crypto_found = 0; if (knet_get_compress_list(compress_list, 
&num_compress) == -1) { return 0; } if (knet_get_crypto_list(crypto_list, &num_crypto) == -1) { return 0; } + // coverity[UNINIT:SUPPRESS] - it's supposed to be... n = scandir(path, &namelist, dir_filter, alphasort); if (n == -1) { return 0; } /* Look for plugins in the list */ for (i=0; id_name) >= 7 && strncmp(crypto_list[j].name, namelist[i]->d_name+7, strlen(crypto_list[j].name)) == 0) { crypto_found++; } } for (j=0; jd_name) >= 9 && strncmp(compress_list[j].name, namelist[i]->d_name+9, strlen(compress_list[j].name)) == 0) { compress_found++; } } free(namelist[i]); } free(namelist); /* If at least one plugin was found (or none were built) */ if ((crypto_found || num_crypto == 0) && (compress_found || num_compress == 0)) { return 1; } else { return 0; } } /* libtool sets LD_LIBRARY_PATH to the build tree when running test in-tree */ char *find_plugins_path(void) { char *ld_libs_env = getenv("LD_LIBRARY_PATH"); if (ld_libs_env) { char *ld_libs = strdup(ld_libs_env); char *str = strtok(ld_libs, ":"); while (str) { if (contains_plugins(str)) { strncpy(plugin_path, str, sizeof(plugin_path)-1); free(ld_libs); printf("Using plugins from %s\n", plugin_path); return plugin_path; } str = strtok(NULL, ":"); } free(ld_libs); } return NULL; } knet_handle_t knet_handle_start(int logfds[2], uint8_t log_level, knet_handle_t knet_h_array[]) { knet_handle_t knet_h = knet_handle_new_ex(1, logfds[1], log_level, 0); char *plugins_path; if (knet_h) { printf("knet_handle_new at %p\n", knet_h); plugins_path = find_plugins_path(); /* Use plugins from the build tree */ if (plugins_path) { knet_h->plugin_path = plugins_path; } knet_h_array[1] = knet_h; flush_logs(logfds[0], stdout); return knet_h; } else { printf("knet_handle_new failed: %s\n", strerror(errno)); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } } int knet_handle_reconnect_links(knet_handle_t knet_h) { size_t i, j; knet_node_id_t host_ids[KNET_MAX_HOST]; uint8_t link_ids[KNET_MAX_LINK]; size_t 
host_ids_entries = 0, link_ids_entries = 0; unsigned int enabled; if (!knet_h) { errno = EINVAL; return -1; } if (knet_host_get_host_list(knet_h, host_ids, &host_ids_entries) < 0) { printf("knet_host_get_host_list failed: %s\n", strerror(errno)); return -1; } for (i = 0; i < host_ids_entries; i++) { if (knet_link_get_link_list(knet_h, host_ids[i], link_ids, &link_ids_entries)) { printf("knet_link_get_link_list failed: %s\n", strerror(errno)); return -1; } for (j = 0; j < link_ids_entries; j++) { if (knet_link_get_enable(knet_h, host_ids[i], link_ids[j], &enabled)) { printf("knet_link_get_enable failed: %s\n", strerror(errno)); return -1; } if (!enabled) { if (knet_link_set_enable(knet_h, host_ids[i], j, 1)) { printf("knet_link_set_enable failed: %s\n", strerror(errno)); return -1; } } } } return 0; } static int _make_local_sockaddr(struct sockaddr_storage *lo, int offset, int family) { in_port_t port; char portstr[32]; if (offset < 0) { /* * api_knet_link_set_config needs to access the API directly, but * it does not send any traffic, so it´s safe to ask the kernel * for a random port. */ port = 0; } else { /* Use the pid if we can. 
but makes sure its in a sensible range */ port = (getpid() + offset) % (65536-1024) + 1024; } sprintf(portstr, "%u", port); memset(lo, 0, sizeof(struct sockaddr_storage)); printf("Using port %u\n", port); if (family == AF_INET6) { return knet_strtoaddr("::1", portstr, lo, sizeof(struct sockaddr_storage)); } return knet_strtoaddr("127.0.0.1", portstr, lo, sizeof(struct sockaddr_storage)); } int make_local_sockaddr(struct sockaddr_storage *lo, int offset) { return _make_local_sockaddr(lo, offset, AF_INET); } int make_local_sockaddr6(struct sockaddr_storage *lo, int offset) { return _make_local_sockaddr(lo, offset, AF_INET6); } int _knet_link_set_config(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id, uint8_t transport, uint64_t flags, int family, int dynamic, struct sockaddr_storage *lo) { int err = 0, savederrno = 0; uint32_t port; char portstr[32]; for (port = 1025; port < 65536; port++) { sprintf(portstr, "%u", port); memset(lo, 0, sizeof(struct sockaddr_storage)); if (family == AF_INET6) { err = knet_strtoaddr("::1", portstr, lo, sizeof(struct sockaddr_storage)); } else { err = knet_strtoaddr("127.0.0.1", portstr, lo, sizeof(struct sockaddr_storage)); } if (err < 0) { printf("Unable to convert loopback to sockaddr: %s\n", strerror(errno)); goto out; } errno = 0; if (dynamic) { err = knet_link_set_config(knet_h, host_id, link_id, transport, lo, NULL, flags); } else { err = knet_link_set_config(knet_h, host_id, link_id, transport, lo, lo, flags); } savederrno = errno; if ((err < 0) && (savederrno != EADDRINUSE)) { printf("Unable to configure link: %s\n", strerror(savederrno)); goto out; } if (!err) { printf("Using port %u\n", port); goto out; } } if (err) { printf("No more ports available\n"); } out: errno = savederrno; return err; } void test_sleep(knet_handle_t knet_h, int seconds) { if (is_memcheck() || is_helgrind()) { printf("Test suite is running under valgrind, adjusting sleep timers\n"); seconds = seconds * 16; } sleep(seconds); } int 
wait_for_packet(knet_handle_t knet_h, int seconds, int datafd, int logfd, FILE *std) { fd_set rfds; struct timeval tv; int err = 0, i = 0; if (is_memcheck() || is_helgrind()) { printf("Test suite is running under valgrind, adjusting wait_for_packet timeout\n"); seconds = seconds * 16; } try_again: FD_ZERO(&rfds); FD_SET(datafd, &rfds); tv.tv_sec = 1; tv.tv_usec = 0; err = select(datafd+1, &rfds, NULL, NULL, &tv); /* * on slow arches the first call to select can return 0. * pick an arbitrary 10 times loop (multiplied by waiting seconds) * before failing. */ if ((!err) && (i < seconds)) { flush_logs(logfd, std); i++; goto try_again; } if ((err > 0) && (FD_ISSET(datafd, &rfds))) { return 0; } errno = ETIMEDOUT; return -1; } /* * functional tests helpers */ void knet_handle_start_nodes(knet_handle_t knet_h[], uint8_t numnodes, int logfds[2], uint8_t log_level) { uint8_t i; char *plugins_path = find_plugins_path(); for (i = 1; i <= numnodes; i++) { knet_h[i] = knet_handle_new_ex(i, logfds[1], log_level, 0); if (!knet_h[i]) { printf("failed to create handle: %s\n", strerror(errno)); break; } else { printf("knet_h[%u] at %p\n", i, knet_h[i]); } /* Use plugins from the build tree */ if (plugins_path) { knet_h[i]->plugin_path = plugins_path; } } if (i < numnodes) { knet_handle_stop_everything(knet_h, i); exit(FAIL); } return; } void knet_handle_join_nodes(knet_handle_t knet_h[], uint8_t numnodes, uint8_t numlinks, int family, uint8_t transport) { uint8_t i, x, j; struct sockaddr_storage src, dst; int offset = 0; int res; for (i = 1; i <= numnodes; i++) { for (j = 1; j <= numnodes; j++) { /* * don´t connect to itself */ if (j == i) { continue; } printf("host %u adding host: %u\n", i, j); if (knet_host_add(knet_h[i], j) < 0) { printf("Unable to add host: %s\n", strerror(errno)); knet_handle_stop_everything(knet_h, numnodes); exit(FAIL); } for (x = 0; x < numlinks; x++) { res = -1; offset = 0; while (i + x + offset++ < 65535 && res != 0) { if (_make_local_sockaddr(&src, i + x 
+ offset, family) < 0) { printf("Unable to convert src to sockaddr: %s\n", strerror(errno)); knet_handle_stop_everything(knet_h, numnodes); exit(FAIL); } if (_make_local_sockaddr(&dst, j + x + offset, family) < 0) { printf("Unable to convert dst to sockaddr: %s\n", strerror(errno)); knet_handle_stop_everything(knet_h, numnodes); exit(FAIL); } res = knet_link_set_config(knet_h[i], j, x, transport, &src, &dst, 0); } printf("joining node %u with node %u via link %u src offset: %u dst offset: %u\n", i, j, x, i+x, j+x); if (knet_link_set_enable(knet_h[i], j, x, 1) < 0) { printf("unable to enable link: %s\n", strerror(errno)); knet_handle_stop_everything(knet_h, numnodes); exit(FAIL); } } } } for (i = 1; i <= numnodes; i++) { wait_for_nodes_state(knet_h[i], numnodes, 1, 600, knet_h[1]->logfd, stdout); } return; } static int target=0; static int state_wait_pipe[2] = {0,0}; static int host_wait_pipe[2] = {0,0}; static int count_nodes(knet_handle_t knet_h) { int nodes = 0; int i; for (i=0; i< KNET_MAX_HOST; i++) { if (knet_h->host_index[i] && knet_h->host_index[i]->status.reachable == 1) { nodes++; } } return nodes; } static void nodes_notify_callback(void *private_data, knet_node_id_t host_id, uint8_t reachable, uint8_t remote, uint8_t external) { knet_handle_t knet_h = (knet_handle_t) private_data; int nodes; int res; nodes = count_nodes(knet_h); if (nodes == target) { res = write(state_wait_pipe[1], ".", 1); if (res != 1) { printf("***FAILed to signal wait_for_nodes_state: %s\n", strerror(errno)); } } } /* Called atexit() */ static void finish_state_pipes() { if (state_wait_pipe[0] != 0) { close(state_wait_pipe[0]); close(state_wait_pipe[1]); state_wait_pipe[0] = 0; } if (host_wait_pipe[0] != 0) { close(host_wait_pipe[0]); close(host_wait_pipe[1]); host_wait_pipe[0] = 0; } } static void host_notify_callback(void *private_data, knet_node_id_t host_id, uint8_t reachable, uint8_t remote, uint8_t external) { knet_handle_t knet_h = (knet_handle_t) private_data; int res; if 
/*
 * Wait up to 'seconds' seconds for a notification byte on pipefd.
 *
 * seconds - poll timeout, in seconds.
 * pipefd  - read end of the notification pipe.
 *
 * Returns 0 when at least one byte was read from the pipe.
 * Returns -1 otherwise, with errno set to:
 *   ETIMEDOUT - nothing became readable before the timeout
 *   EIO       - poll reported an event other than POLLIN, or the pipe
 *               was at end-of-file (no writer left)
 *   (other)   - whatever poll(2) or read(2) left in errno on failure
 */
static int wait_for_reply(int seconds, int pipefd)
{
	struct pollfd pfds;
	char tmpbuf[32];
	ssize_t rlen;
	int res;

	pfds.fd = pipefd;
	pfds.events = POLLIN | POLLERR | POLLHUP;
	pfds.revents = 0;

	res = poll(&pfds, 1, seconds*1000);
	if (res == 0) {
		errno = ETIMEDOUT;
		return -1;
	}
	if (res < 0) {
		/* errno was set by poll */
		return -1;
	}

	if (!(pfds.revents & POLLIN)) {
		printf("Error on pipe poll revent = 0x%x\n", pfds.revents);
		errno = EIO;
		return -1;
	}

	rlen = read(pipefd, tmpbuf, sizeof(tmpbuf));
	if (rlen > 0) {
		return 0;
	}
	if (rlen == 0) {
		/*
		 * End-of-file: the data never arrived because the writer is
		 * gone. This is an I/O failure, not a timeout.
		 */
		errno = EIO;
	}

	return -1;
}
/*
 * Wait for a single node to come up.
 *
 * knet_h  - handle whose view of host_id is being observed.
 * host_id - id of the host to wait for; it must already be present in
 *           knet_h->host_index.
 * seconds - maximum time to wait (multiplied by 16 when running under
 *           valgrind memcheck/helgrind).
 * logfd   - log pipe to drain via flush_logs().
 * std     - stream flush_logs() writes to.
 *
 * Returns 0 when the host is (or becomes) reachable, -1 on error with
 * errno set: EINVAL when host_id is not configured on knet_h, or the
 * errno left by pipe creation or by wait_for_reply().
 */
int wait_for_host(knet_handle_t knet_h, uint16_t host_id, int seconds, int logfd, FILE *std)
{
	int res = 0;
	int savederrno = 0;

	if (is_memcheck() || is_helgrind()) {
		printf("Test suite is running under valgrind, adjusting wait_for_host timeout\n");
		seconds = seconds * 16;
	}

	/* the reachable checks below dereference this entry */
	if (!knet_h->host_index[host_id]) {
		printf("Unable to wait for host %u: host is not configured\n", host_id);
		errno = EINVAL;
		return -1;
	}

	/* the notification pipe is created once and closed at exit */
	if (host_wait_pipe[0] == 0) {
		res = pipe(host_wait_pipe);
		if (res == -1) {
			savederrno = errno;
			printf("Error creating host reply pipe: %s\n", strerror(errno));
			errno = savederrno;
			return -1;
		}
		if (atexit(finish_state_pipes)) {
			printf("Unable to register atexit handler to close pipes: %s\n", strerror(errno));
			exit(FAIL);
		}
	}

	/* Set this before checking existing status or there's a race condition */
	knet_host_enable_status_change_notify(knet_h,
					      (void *)(long)knet_h,
					      host_notify_callback);

	/* Check it's not already reachable */
	if (knet_h->host_index[host_id]->status.reachable == 1) {
		knet_host_enable_status_change_notify(knet_h, (void *)(long)0, NULL);
		flush_logs(logfd, std);
		return 0;
	}

	res = wait_for_reply(seconds, host_wait_pipe[0]);
	if (res == -1) {
		savederrno = errno;
		printf("Error waiting for host status reply: %s\n", strerror(errno));
	}

	knet_host_enable_status_change_notify(knet_h, (void *)(long)0, NULL);

	/* Still wait for it to settle */
	flush_logs(logfd, std);
	test_sleep(knet_h, 1);
	errno = savederrno;
	return res;
}
* Mostly stolen from corosync code (that I wrote, before anyone complains about licences) */ void knet_handle_stop_everything(knet_handle_t knet_h[], uint8_t numnodes) { int res = 0; int h; size_t i,j; static knet_node_id_t nodes[KNET_MAX_HOST]; /* static to save stack */ uint8_t links[KNET_MAX_LINK]; size_t num_nodes; size_t num_links; for (h=1; h * Federico Simoncelli * * This software licensed under LGPL-2.0+ */ #include "config.h" #include #include #include #include "internals.h" #include "logging.h" #include "threads_common.h" int shutdown_in_progress(knet_handle_t knet_h) { int savederrno = 0; int ret; savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_COMMON, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } ret = knet_h->fini_in_progress; pthread_rwlock_unlock(&knet_h->global_rwlock); return ret; } static int _pmtud_reschedule(knet_handle_t knet_h) { + // coverity[MISSING_LOCK:SUPPRESS] - lock is taken before fn call if (knet_h->pmtud_running) { knet_h->pmtud_abort = 1; + // coverity[MISSING_LOCK:SUPPRESS] - lock is taken before fn call if (knet_h->pmtud_waiting) { pthread_cond_signal(&knet_h->pmtud_cond); } } return 0; } static int pmtud_reschedule(knet_handle_t knet_h) { int res; if (pthread_mutex_lock(&knet_h->pmtud_mutex) != 0) { log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get mutex lock"); return -1; } res = _pmtud_reschedule(knet_h); pthread_mutex_unlock(&knet_h->pmtud_mutex); return res; } int get_global_wrlock(knet_handle_t knet_h) { if (pmtud_reschedule(knet_h) < 0) { log_info(knet_h, KNET_SUB_PMTUD, "Unable to notify PMTUd to reschedule. 
Expect delays in executing API calls"); } return pthread_rwlock_wrlock(&knet_h->global_rwlock); } static struct pretty_names thread_names[KNET_THREAD_MAX] = { { "TX", KNET_THREAD_TX }, { "RX", KNET_THREAD_RX }, { "HB", KNET_THREAD_HB }, { "PMTUD", KNET_THREAD_PMTUD }, #ifdef HAVE_NETINET_SCTP_H { "SCTP_LISTEN", KNET_THREAD_SCTP_LISTEN }, { "SCTP_CONN", KNET_THREAD_SCTP_CONN }, #endif { "DST_LINK", KNET_THREAD_DST_LINK } }; static struct pretty_names thread_status[] = { { "unregistered", KNET_THREAD_UNREGISTERED }, { "registered", KNET_THREAD_REGISTERED }, { "started", KNET_THREAD_STARTED }, { "stopped", KNET_THREAD_STOPPED } }; static const char *get_thread_status_name(uint8_t status) { unsigned int i; for (i = 0; i < KNET_THREAD_STATUS_MAX; i++) { if (thread_status[i].val == status) { return thread_status[i].name; } } return "unknown"; } static const char *get_thread_name(uint8_t thread_id) { unsigned int i; for (i = 0; i < KNET_THREAD_MAX; i++) { if (thread_names[i].val == thread_id) { return thread_names[i].name; } } return "unknown"; } int get_thread_flush_queue(knet_handle_t knet_h, uint8_t thread_id) { uint8_t flush; if (pthread_mutex_lock(&knet_h->threads_status_mutex) != 0) { log_debug(knet_h, KNET_SUB_HANDLE, "Unable to get mutex lock"); return -1; } flush = knet_h->threads_flush_queue[thread_id]; pthread_mutex_unlock(&knet_h->threads_status_mutex); return flush; } int set_thread_flush_queue(knet_handle_t knet_h, uint8_t thread_id, uint8_t status) { if (pthread_mutex_lock(&knet_h->threads_status_mutex) != 0) { log_debug(knet_h, KNET_SUB_HANDLE, "Unable to get mutex lock"); return -1; } knet_h->threads_flush_queue[thread_id] = status; log_debug(knet_h, KNET_SUB_HANDLE, "Updated flush queue request for thread %s to %u", get_thread_name(thread_id), status); pthread_mutex_unlock(&knet_h->threads_status_mutex); return 0; } int wait_all_threads_flush_queue(knet_handle_t knet_h) { uint8_t i = 0, found = 0; while (!found) { usleep(KNET_THREADS_TIMERES); if 
(pthread_mutex_lock(&knet_h->threads_status_mutex) != 0) { continue; } found = 1; for (i = 0; i < KNET_THREAD_MAX; i++) { if (knet_h->threads_flush_queue[i] == KNET_THREAD_QUEUE_FLUSHED) { continue; } log_debug(knet_h, KNET_SUB_HANDLE, "Checking thread: %s queue: %u", get_thread_name(i), knet_h->threads_flush_queue[i]); if (knet_h->threads_flush_queue[i] != KNET_THREAD_QUEUE_FLUSHED) { found = 0; } } pthread_mutex_unlock(&knet_h->threads_status_mutex); } return 0; } int set_thread_status(knet_handle_t knet_h, uint8_t thread_id, uint8_t status) { if (pthread_mutex_lock(&knet_h->threads_status_mutex) != 0) { log_debug(knet_h, KNET_SUB_HANDLE, "Unable to get mutex lock"); return -1; } knet_h->threads_status[thread_id] = status; log_debug(knet_h, KNET_SUB_HANDLE, "Updated status for thread %s to %s", get_thread_name(thread_id), get_thread_status_name(status)); pthread_mutex_unlock(&knet_h->threads_status_mutex); return 0; } int wait_all_threads_status(knet_handle_t knet_h, uint8_t status) { uint8_t i = 0, found = 0; while (!found) { usleep(KNET_THREADS_TIMERES); if (pthread_mutex_lock(&knet_h->threads_status_mutex) != 0) { continue; } found = 1; for (i = 0; i < KNET_THREAD_MAX; i++) { if (knet_h->threads_status[i] == KNET_THREAD_UNREGISTERED) { continue; } log_debug(knet_h, KNET_SUB_HANDLE, "Checking thread: %s status: %s req: %s", get_thread_name(i), get_thread_status_name(knet_h->threads_status[i]), get_thread_status_name(status)); if (knet_h->threads_status[i] != status) { found = 0; } } pthread_mutex_unlock(&knet_h->threads_status_mutex); } return 0; } void force_pmtud_run(knet_handle_t knet_h, uint8_t subsystem, uint8_t reset_mtu, uint8_t force_restart) { if (reset_mtu) { log_debug(knet_h, subsystem, "PMTUd has been reset to default"); knet_h->data_mtu = calc_min_mtu(knet_h); if (knet_h->pmtud_notify_fn) { knet_h->pmtud_notify_fn(knet_h->pmtud_notify_fn_private_data, knet_h->data_mtu); } } /* * we can only try to take a lock here. 
This part of the code * can be invoked by any thread, including PMTUd that is already * holding a lock at that stage. * If PMTUd is holding the lock, most likely it is already running * and we don't need to notify it back. */ if (!pthread_mutex_trylock(&knet_h->pmtud_mutex)) { if (!knet_h->pmtud_running) { if (!knet_h->pmtud_forcerun) { log_debug(knet_h, subsystem, "Notifying PMTUd to rerun"); knet_h->pmtud_forcerun = 1; } } else { if (force_restart) { if (_pmtud_reschedule(knet_h) < 0) { log_info(knet_h, KNET_SUB_PMTUD, "Unable to notify PMTUd to reschedule. A joining node may struggle to connect properly"); } } } pthread_mutex_unlock(&knet_h->pmtud_mutex); } } diff --git a/libknet/threads_heartbeat.c b/libknet/threads_heartbeat.c index fda453af..bfd06baf 100644 --- a/libknet/threads_heartbeat.c +++ b/libknet/threads_heartbeat.c @@ -1,239 +1,241 @@ /* * Copyright (C) 2015-2025 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. Di Nitto * Federico Simoncelli * * This software licensed under LGPL-2.0+ */ #include "config.h" #include #include #include #include #include #include "crypto.h" #include "links.h" #include "logging.h" #include "transports.h" #include "threads_common.h" #include "threads_heartbeat.h" static void _link_down(knet_handle_t knet_h, struct knet_host *dst_host, struct knet_link *dst_link) { memset(&dst_link->pmtud_last, 0, sizeof(struct timespec)); dst_link->received_pong = 0; dst_link->status.pong_last.tv_nsec = 0; dst_link->pong_timeout_backoff = KNET_LINK_PONG_TIMEOUT_BACKOFF; if (dst_link->status.connected == 1) { log_info(knet_h, KNET_SUB_LINK, "host: %u link: %u is down", dst_host->host_id, dst_link->link_id); _link_updown(knet_h, dst_host->host_id, dst_link->link_id, dst_link->status.enabled, 0, 1); } } static void _handle_check_each(knet_handle_t knet_h, struct knet_host *dst_host, struct knet_link *dst_link, int timed) { int err = 0, savederrno = 0, stats_err = 0; int len; ssize_t outlen = KNET_HEADER_PING_SIZE; struct timespec 
clock_now, pong_last; unsigned long long diff_ping; unsigned char *outbuf = (unsigned char *)knet_h->pingbuf; if (dst_link->transport_connected == 0) { _link_down(knet_h, dst_host, dst_link); return; } /* caching last pong to avoid race conditions */ pong_last = dst_link->status.pong_last; if (clock_gettime(CLOCK_MONOTONIC, &clock_now) != 0) { log_debug(knet_h, KNET_SUB_HEARTBEAT, "Unable to get monotonic clock"); return; } timespec_diff(dst_link->ping_last, clock_now, &diff_ping); + // coverity[MISSING_LOCK:SUPPRESS] - hb_mutex is held by calling fn if ((diff_ping >= (dst_link->ping_interval * 1000llu)) || (!timed)) { memmove(&knet_h->pingbuf->khp_ping_time[0], &clock_now, sizeof(struct timespec)); knet_h->pingbuf->khp_ping_link = dst_link->link_id; if (pthread_mutex_lock(&knet_h->tx_seq_num_mutex)) { log_debug(knet_h, KNET_SUB_HEARTBEAT, "Unable to get seq mutex lock"); return; } knet_h->pingbuf->khp_ping_seq_num = htons(knet_h->tx_seq_num); pthread_mutex_unlock(&knet_h->tx_seq_num_mutex); knet_h->pingbuf->khp_ping_timed = timed; if (knet_h->crypto_in_use_config) { if (crypto_encrypt_and_sign(knet_h, (const unsigned char *)knet_h->pingbuf, outlen, knet_h->pingbuf_crypt, &outlen) < 0) { log_debug(knet_h, KNET_SUB_HEARTBEAT, "Unable to crypto ping packet"); return; } outbuf = knet_h->pingbuf_crypt; if (pthread_mutex_lock(&knet_h->handle_stats_mutex) < 0) { log_err(knet_h, KNET_SUB_HEARTBEAT, "Unable to get mutex lock"); return; } knet_h->stats_extra.tx_crypt_ping_packets++; pthread_mutex_unlock(&knet_h->handle_stats_mutex); } stats_err = pthread_mutex_lock(&dst_link->link_stats_mutex); if (stats_err) { log_err(knet_h, KNET_SUB_HEARTBEAT, "Unable to get stats mutex lock for host %u link %u: %s", dst_host->host_id, dst_link->link_id, strerror(stats_err)); return; } retry: if (transport_get_connection_oriented(knet_h, dst_link->transport) == TRANSPORT_PROTO_NOT_CONNECTION_ORIENTED) { len = sendto(dst_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL, (struct 
sockaddr *) &dst_link->dst_addr, knet_h->knet_transport_fd_tracker[dst_link->outsock].sockaddr_len); } else { len = sendto(dst_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0); } savederrno = errno; dst_link->ping_last = clock_now; dst_link->status.stats.tx_ping_packets++; dst_link->status.stats.tx_ping_bytes += outlen; if (len != outlen) { err = transport_tx_sock_error(knet_h, dst_link->transport, dst_link->outsock, KNET_SUB_HEARTBEAT, len, savederrno); switch(err) { case -1: /* unrecoverable error */ log_debug(knet_h, KNET_SUB_HEARTBEAT, "Unable to send ping (sock: %d) packet (sendto): %d %s. recorded src ip: %s src port: %s dst ip: %s dst port: %s", dst_link->outsock, savederrno, strerror(savederrno), dst_link->status.src_ipaddr, dst_link->status.src_port, dst_link->status.dst_ipaddr, dst_link->status.dst_port); dst_link->status.stats.tx_ping_errors++; break; case 0: break; case 1: dst_link->status.stats.tx_ping_retries++; goto retry; break; } } else { dst_link->last_ping_size = outlen; } pthread_mutex_unlock(&dst_link->link_stats_mutex); } timespec_diff(pong_last, clock_now, &diff_ping); if ((pong_last.tv_nsec) && (diff_ping >= (dst_link->pong_timeout_adj * 1000llu))) { _link_down(knet_h, dst_host, dst_link); } } void _send_pings(knet_handle_t knet_h, int timed) { struct knet_host *dst_host; int link_idx; if (pthread_mutex_lock(&knet_h->hb_mutex)) { log_debug(knet_h, KNET_SUB_HEARTBEAT, "Unable to get hb mutex lock"); return; } for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) { for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) { if ((dst_host->link[link_idx].status.enabled != 1) || (dst_host->link[link_idx].transport == KNET_TRANSPORT_LOOPBACK ) || ((dst_host->link[link_idx].dynamic == KNET_LINK_DYNIP) && (dst_host->link[link_idx].status.dynconnected != 1))) continue; _handle_check_each(knet_h, dst_host, &dst_host->link[link_idx], timed); } } pthread_mutex_unlock(&knet_h->hb_mutex); } static void 
_adjust_pong_timeouts(knet_handle_t knet_h) { struct knet_host *dst_host; struct knet_link *dst_link; int link_idx; if (pthread_mutex_lock(&knet_h->backoff_mutex)) { log_debug(knet_h, KNET_SUB_HEARTBEAT, "Unable to get backoff_mutex"); return; } for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) { for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) { if ((dst_host->link[link_idx].status.enabled != 1) || (dst_host->link[link_idx].transport == KNET_TRANSPORT_LOOPBACK ) || ((dst_host->link[link_idx].dynamic == KNET_LINK_DYNIP) && (dst_host->link[link_idx].status.dynconnected != 1))) continue; dst_link = &dst_host->link[link_idx]; if (dst_link->pong_timeout_backoff > 1) { dst_link->pong_timeout_backoff--; } + // coverity[MISSING_LOCK:SUPPRESS] - lock is taken in parent function dst_link->pong_timeout_adj = (dst_link->pong_timeout * dst_link->pong_timeout_backoff) + (dst_link->status.latency * KNET_LINK_PONG_TIMEOUT_LAT_MUL); } } pthread_mutex_unlock(&knet_h->backoff_mutex); } void *_handle_heartbt_thread(void *data) { knet_handle_t knet_h = (knet_handle_t) data; int i = 1; set_thread_status(knet_h, KNET_THREAD_HB, KNET_THREAD_STARTED); /* preparing ping buffer */ knet_h->pingbuf->kh_version = KNET_HEADER_VERSION; knet_h->pingbuf->kh_type = KNET_HEADER_TYPE_PING; knet_h->pingbuf->kh_node = htons(knet_h->host_id); while (!shutdown_in_progress(knet_h)) { usleep(KNET_THREADS_TIMERES); if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) { log_debug(knet_h, KNET_SUB_HEARTBEAT, "Unable to get read lock"); continue; } /* * _adjust_pong_timeouts should execute approx once a second. 
*/ if ((i % (1000000 / KNET_THREADS_TIMERES)) == 0) { _adjust_pong_timeouts(knet_h); i = 1; } else { i++; } _send_pings(knet_h, 1); pthread_rwlock_unlock(&knet_h->global_rwlock); } set_thread_status(knet_h, KNET_THREAD_HB, KNET_THREAD_STOPPED); return NULL; } diff --git a/libknet/threads_pmtud.c b/libknet/threads_pmtud.c index 57610bc6..242b27f9 100644 --- a/libknet/threads_pmtud.c +++ b/libknet/threads_pmtud.c @@ -1,800 +1,803 @@ /* * Copyright (C) 2015-2025 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. Di Nitto * Federico Simoncelli * * This software licensed under LGPL-2.0+ */ #include "config.h" #include #include #include #include #include "crypto.h" #include "links.h" #include "host.h" #include "logging.h" #include "transports.h" #include "threads_common.h" #include "threads_pmtud.h" static int _calculate_manual_mtu(knet_handle_t knet_h, struct knet_link *dst_link) { size_t ipproto_overhead_len; /* onwire packet overhead (protocol based) */ switch (dst_link->dst_addr.ss_family) { case AF_INET6: ipproto_overhead_len = KNET_PMTUD_OVERHEAD_V6 + dst_link->proto_overhead; break; case AF_INET: ipproto_overhead_len = KNET_PMTUD_OVERHEAD_V4 + dst_link->proto_overhead; break; default: log_debug(knet_h, KNET_SUB_PMTUD, "unknown protocol"); return 0; break; } dst_link->status.mtu = calc_max_data_outlen(knet_h, knet_h->manual_mtu - ipproto_overhead_len); return 1; } static int _handle_check_link_pmtud(knet_handle_t knet_h, struct knet_host *dst_host, struct knet_link *dst_link) { int err, ret, savederrno, mutex_retry_limit, failsafe, use_kernel_mtu, warn_once; uint32_t kernel_mtu; /* record kernel_mtu from EMSGSIZE */ size_t onwire_len; /* current packet onwire size */ size_t ipproto_overhead_len; /* onwire packet overhead (protocol based) */ size_t max_mtu_len; /* max mtu for protocol */ size_t data_len; /* how much data we can send in the packet * generally would be onwire_len - ipproto_overhead_len * needs to be adjusted for crypto */ size_t app_mtu_len; /* 
real data that we can send onwire */ ssize_t len; /* len of what we were able to sendto onwire */ struct timespec ts, pmtud_crypto_start_ts, pmtud_crypto_stop_ts; unsigned long long pong_timeout_adj_tmp, timediff; int pmtud_crypto_reduce = 1; unsigned char *outbuf = (unsigned char *)knet_h->pmtudbuf; warn_once = 0; mutex_retry_limit = 0; failsafe = 0; knet_h->pmtudbuf->khp_pmtud_link = dst_link->link_id; switch (dst_link->dst_addr.ss_family) { case AF_INET6: max_mtu_len = KNET_PMTUD_SIZE_V6; ipproto_overhead_len = KNET_PMTUD_OVERHEAD_V6 + dst_link->proto_overhead; break; case AF_INET: max_mtu_len = KNET_PMTUD_SIZE_V4; ipproto_overhead_len = KNET_PMTUD_OVERHEAD_V4 + dst_link->proto_overhead; break; default: log_debug(knet_h, KNET_SUB_PMTUD, "PMTUD aborted, unknown protocol"); return -1; break; } dst_link->last_bad_mtu = 0; dst_link->last_good_mtu = dst_link->last_ping_size + ipproto_overhead_len; /* * discovery starts from the top because kernel will * refuse to send packets > current iface mtu. * this saves us some time and network bw. */ onwire_len = max_mtu_len; restart: /* * prevent a race when interface mtu is changed _exactly_ during * the discovery process and it's complex to detect. Easier * to wait the next loop. * 30 is not an arbitrary value. To bisect from 576 to 128000 doesn't * take more than 18/19 steps. */ if (failsafe == 30) { log_err(knet_h, KNET_SUB_PMTUD, "Aborting PMTUD process: Too many attempts. MTU might have changed during discovery."); return -1; } else { failsafe++; } /* * common to all packets */ /* * calculate the application MTU based on current onwire_len minus ipproto_overhead_len */ app_mtu_len = calc_max_data_outlen(knet_h, onwire_len - ipproto_overhead_len); /* * recalculate onwire len back that might be different based * on data padding from crypto layer. */ onwire_len = calc_data_outlen(knet_h, app_mtu_len + KNET_HEADER_ALL_SIZE) + ipproto_overhead_len; /* * calculate the size of what we need to send to sendto(2). 
* see also onwire.c for packet format explanation. */ data_len = app_mtu_len + knet_h->sec_hash_size + knet_h->sec_salt_size + KNET_HEADER_ALL_SIZE; if (knet_h->crypto_in_use_config) { if (data_len < (knet_h->sec_hash_size + knet_h->sec_salt_size) + 1) { log_debug(knet_h, KNET_SUB_PMTUD, "Aborting PMTUD process: link mtu smaller than crypto header detected (link might have been disconnected)"); return -1; } knet_h->pmtudbuf->khp_pmtud_size = onwire_len; if (crypto_encrypt_and_sign(knet_h, (const unsigned char *)knet_h->pmtudbuf, data_len - (knet_h->sec_hash_size + knet_h->sec_salt_size), knet_h->pmtudbuf_crypt, (ssize_t *)&data_len) < 0) { log_debug(knet_h, KNET_SUB_PMTUD, "Unable to crypto pmtud packet"); return -1; } outbuf = knet_h->pmtudbuf_crypt; if (pthread_mutex_lock(&knet_h->handle_stats_mutex) < 0) { log_err(knet_h, KNET_SUB_PMTUD, "Unable to get mutex lock"); return -1; } knet_h->stats_extra.tx_crypt_pmtu_packets++; pthread_mutex_unlock(&knet_h->handle_stats_mutex); } else { knet_h->pmtudbuf->khp_pmtud_size = onwire_len; } /* link has gone down, aborting pmtud */ if (dst_link->status.connected != 1) { log_debug(knet_h, KNET_SUB_PMTUD, "PMTUD detected host (%u) link (%u) has been disconnected", dst_host->host_id, dst_link->link_id); return -1; } if (dst_link->transport_connected != 1) { log_debug(knet_h, KNET_SUB_PMTUD, "PMTUD detected host (%u) link (%u) has been disconnected", dst_host->host_id, dst_link->link_id); return -1; } if (pthread_mutex_lock(&knet_h->pmtud_mutex) != 0) { log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get mutex lock"); return -1; } if (knet_h->pmtud_abort) { pthread_mutex_unlock(&knet_h->pmtud_mutex); errno = EDEADLK; return -1; } + // coverity[ORDER_REVERSAL:SUPPRESS] - This is the normal lock ordering savederrno = pthread_mutex_lock(&knet_h->tx_mutex); if (savederrno) { pthread_mutex_unlock(&knet_h->pmtud_mutex); log_err(knet_h, KNET_SUB_PMTUD, "Unable to get TX mutex lock: %s", strerror(savederrno)); return -1; } savederrno = 
pthread_mutex_lock(&dst_link->link_stats_mutex); if (savederrno) { pthread_mutex_unlock(&knet_h->pmtud_mutex); pthread_mutex_unlock(&knet_h->tx_mutex); log_err(knet_h, KNET_SUB_PMTUD, "Unable to get stats mutex lock for host %u link %u: %s", dst_host->host_id, dst_link->link_id, strerror(savederrno)); return -1; } retry: if (transport_get_connection_oriented(knet_h, dst_link->transport) == TRANSPORT_PROTO_NOT_CONNECTION_ORIENTED) { len = sendto(dst_link->outsock, outbuf, data_len, MSG_DONTWAIT | MSG_NOSIGNAL, (struct sockaddr *) &dst_link->dst_addr, knet_h->knet_transport_fd_tracker[dst_link->outsock].sockaddr_len); } else { len = sendto(dst_link->outsock, outbuf, data_len, MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0); } savederrno = errno; /* * we cannot hold a lock on kmtu_mutex between resetting * knet_h->kernel_mtu here and below where it's used. * use_kernel_mtu tells us if the knet_h->kernel_mtu was * set to 0 and we can trust its value later. */ use_kernel_mtu = 0; if (pthread_mutex_lock(&knet_h->kmtu_mutex) == 0) { use_kernel_mtu = 1; knet_h->kernel_mtu = 0; pthread_mutex_unlock(&knet_h->kmtu_mutex); } kernel_mtu = 0; err = transport_tx_sock_error(knet_h, dst_link->transport, dst_link->outsock, KNET_SUB_PMTUD, len, savederrno); switch(err) { case -1: /* unrecoverable error */ log_debug(knet_h, KNET_SUB_PMTUD, "Unable to send pmtu packet (sendto): %d %s", savederrno, strerror(savederrno)); pthread_mutex_unlock(&knet_h->tx_mutex); pthread_mutex_unlock(&knet_h->pmtud_mutex); dst_link->status.stats.tx_pmtu_errors++; pthread_mutex_unlock(&dst_link->link_stats_mutex); return -1; case 0: /* ignore error and continue */ break; case 1: /* retry to send those same data */ dst_link->status.stats.tx_pmtu_retries++; goto retry; break; } pthread_mutex_unlock(&knet_h->tx_mutex); if (len != (ssize_t )data_len) { pthread_mutex_unlock(&dst_link->link_stats_mutex); if (savederrno == EMSGSIZE || savederrno == EPERM) { /* * we cannot hold a lock on kmtu_mutex between resetting * 
knet_h->kernel_mtu and here. * use_kernel_mtu tells us if the knet_h->kernel_mtu was * set to 0 previously and we can trust its value now. */ if (use_kernel_mtu) { use_kernel_mtu = 0; if (pthread_mutex_lock(&knet_h->kmtu_mutex) == 0) { kernel_mtu = knet_h->kernel_mtu; pthread_mutex_unlock(&knet_h->kmtu_mutex); } } if (kernel_mtu > 0) { dst_link->last_bad_mtu = kernel_mtu + 1; } else { dst_link->last_bad_mtu = onwire_len; } } else { log_debug(knet_h, KNET_SUB_PMTUD, "Unable to send pmtu packet len: %zu err: %s", onwire_len, strerror(savederrno)); } } else { dst_link->last_sent_mtu = onwire_len; dst_link->last_recv_mtu = 0; dst_link->status.stats.tx_pmtu_packets++; dst_link->status.stats.tx_pmtu_bytes += data_len; pthread_mutex_unlock(&dst_link->link_stats_mutex); if (clock_gettime(CLOCK_REALTIME, &ts) < 0) { log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get current time: %s", strerror(errno)); pthread_mutex_unlock(&knet_h->pmtud_mutex); return -1; } /* * non fatal, we can wait the next round to reduce the * multiplier */ if (clock_gettime(CLOCK_MONOTONIC, &pmtud_crypto_start_ts) < 0) { log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get current time: %s", strerror(errno)); pmtud_crypto_reduce = 0; } /* * set PMTUd reply timeout to match pong_timeout on a given link * * math: internally pong_timeout is expressed in microseconds, while * the public API exports milliseconds. So careful with the 0's here. * the loop is necessary because we are grabbing the current time just above * and add values to it that could overflow into seconds. 
*/ if (pthread_mutex_lock(&knet_h->backoff_mutex)) { log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get backoff_mutex"); pthread_mutex_unlock(&knet_h->pmtud_mutex); return -1; } if (knet_h->crypto_in_use_config) { /* * crypto, under pressure, is a royal PITA */ pong_timeout_adj_tmp = dst_link->pong_timeout_adj * dst_link->pmtud_crypto_timeout_multiplier; } else { pong_timeout_adj_tmp = dst_link->pong_timeout_adj; } ts.tv_sec += pong_timeout_adj_tmp / 1000000; ts.tv_nsec += (((pong_timeout_adj_tmp) % 1000000) * 1000); while (ts.tv_nsec > 1000000000) { ts.tv_sec += 1; ts.tv_nsec -= 1000000000; } pthread_mutex_unlock(&knet_h->backoff_mutex); knet_h->pmtud_waiting = 1; + // coverity[BAD_CHECK_OF_WAIT_COND:SUPPRESS] no wait loop needed here ret = pthread_cond_timedwait(&knet_h->pmtud_cond, &knet_h->pmtud_mutex, &ts); knet_h->pmtud_waiting = 0; if (knet_h->pmtud_abort) { pthread_mutex_unlock(&knet_h->pmtud_mutex); errno = EDEADLK; return -1; } /* * we cannot use shutdown_in_progress in here because * we already hold the read lock */ if (knet_h->fini_in_progress) { pthread_mutex_unlock(&knet_h->pmtud_mutex); log_debug(knet_h, KNET_SUB_PMTUD, "PMTUD aborted. shutdown in progress"); return -1; } if (ret) { if (ret == ETIMEDOUT) { if ((knet_h->crypto_in_use_config) && (dst_link->pmtud_crypto_timeout_multiplier < KNET_LINK_PMTUD_CRYPTO_TIMEOUT_MULTIPLIER_MAX)) { dst_link->pmtud_crypto_timeout_multiplier = dst_link->pmtud_crypto_timeout_multiplier * 2; pmtud_crypto_reduce = 0; log_debug(knet_h, KNET_SUB_PMTUD, "Increasing PMTUd response timeout multiplier to (%u) for host %u link: %u", dst_link->pmtud_crypto_timeout_multiplier, dst_host->host_id, dst_link->link_id); pthread_mutex_unlock(&knet_h->pmtud_mutex); goto restart; } if (!warn_once) { log_warn(knet_h, KNET_SUB_PMTUD, "possible MTU misconfiguration detected. " "kernel is reporting MTU: %u bytes for " "host %u link %u but the other node is " "not acknowledging packets of this size. 
", dst_link->last_sent_mtu, dst_host->host_id, dst_link->link_id); log_warn(knet_h, KNET_SUB_PMTUD, "This can be caused by this node interface MTU " "too big or a network device that does not " "support or has been misconfigured to manage MTU " "of this size, or packet loss. knet will continue " "to run but performances might be affected."); warn_once = 1; } } else { pthread_mutex_unlock(&knet_h->pmtud_mutex); if (mutex_retry_limit == 3) { log_debug(knet_h, KNET_SUB_PMTUD, "PMTUD aborted, unable to get mutex lock"); return -1; } mutex_retry_limit++; goto restart; } } if ((knet_h->crypto_in_use_config) && (pmtud_crypto_reduce == 1) && (dst_link->pmtud_crypto_timeout_multiplier > KNET_LINK_PMTUD_CRYPTO_TIMEOUT_MULTIPLIER_MIN)) { if (!clock_gettime(CLOCK_MONOTONIC, &pmtud_crypto_stop_ts)) { timespec_diff(pmtud_crypto_start_ts, pmtud_crypto_stop_ts, &timediff); if (((pong_timeout_adj_tmp * 1000) / 2) > timediff) { dst_link->pmtud_crypto_timeout_multiplier = dst_link->pmtud_crypto_timeout_multiplier / 2; log_debug(knet_h, KNET_SUB_PMTUD, "Decreasing PMTUd response timeout multiplier to (%u) for host %u link: %u", dst_link->pmtud_crypto_timeout_multiplier, dst_host->host_id, dst_link->link_id); } } else { log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get current time: %s", strerror(errno)); } } + // coverity[MISSING_LOCK:SUPPRESS] - lock is held by the calling function if ((dst_link->last_recv_mtu != onwire_len) || (ret)) { dst_link->last_bad_mtu = onwire_len; } else { int found_mtu = 0; if (knet_h->sec_block_size) { if ((onwire_len + knet_h->sec_block_size >= max_mtu_len) || ((dst_link->last_bad_mtu) && (dst_link->last_bad_mtu <= (onwire_len + knet_h->sec_block_size)))) { found_mtu = 1; } } else { if ((onwire_len == max_mtu_len) || ((dst_link->last_bad_mtu) && (dst_link->last_bad_mtu == (onwire_len + 1))) || (dst_link->last_bad_mtu == dst_link->last_good_mtu)) { found_mtu = 1; } } if (found_mtu) { /* * account for IP overhead, knet headers and crypto in PMTU calculation 
*/ dst_link->status.mtu = calc_max_data_outlen(knet_h, onwire_len - ipproto_overhead_len); pthread_mutex_unlock(&knet_h->pmtud_mutex); return 0; } dst_link->last_good_mtu = onwire_len; } } if (kernel_mtu) { onwire_len = kernel_mtu; } else { onwire_len = (dst_link->last_good_mtu + dst_link->last_bad_mtu) / 2; } pthread_mutex_unlock(&knet_h->pmtud_mutex); goto restart; } static int _handle_check_pmtud(knet_handle_t knet_h, struct knet_host *dst_host, struct knet_link *dst_link, int force_run) { uint8_t saved_valid_pmtud; unsigned int saved_pmtud; struct timespec clock_now; unsigned long long diff_pmtud, interval; if (clock_gettime(CLOCK_MONOTONIC, &clock_now) != 0) { log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get monotonic clock"); return 0; } if (!force_run) { interval = knet_h->pmtud_interval * 1000000000llu; /* nanoseconds */ timespec_diff(dst_link->pmtud_last, clock_now, &diff_pmtud); if (diff_pmtud < interval) { return dst_link->has_valid_mtu; } } /* * status.proto_overhead should include all IP/(UDP|SCTP)/knet headers * * please note that it is not the same as link->proto_overhead that * includes only either UDP or SCTP (at the moment) overhead. 
*/ switch (dst_link->dst_addr.ss_family) { case AF_INET6: dst_link->status.proto_overhead = KNET_PMTUD_OVERHEAD_V6 + dst_link->proto_overhead + KNET_HEADER_ALL_SIZE + knet_h->sec_hash_size + knet_h->sec_salt_size; break; case AF_INET: dst_link->status.proto_overhead = KNET_PMTUD_OVERHEAD_V4 + dst_link->proto_overhead + KNET_HEADER_ALL_SIZE + knet_h->sec_hash_size + knet_h->sec_salt_size; break; } saved_pmtud = dst_link->status.mtu; saved_valid_pmtud = dst_link->has_valid_mtu; log_debug(knet_h, KNET_SUB_PMTUD, "Starting PMTUD for host: %u link: %u", dst_host->host_id, dst_link->link_id); errno = 0; if (_handle_check_link_pmtud(knet_h, dst_host, dst_link) < 0) { if (errno == EDEADLK) { log_debug(knet_h, KNET_SUB_PMTUD, "PMTUD for host: %u link: %u has been rescheduled", dst_host->host_id, dst_link->link_id); dst_link->status.mtu = saved_pmtud; dst_link->has_valid_mtu = saved_valid_pmtud; errno = EDEADLK; return dst_link->has_valid_mtu; } dst_link->has_valid_mtu = 0; } else { if (dst_link->status.mtu < calc_min_mtu(knet_h)) { log_info(knet_h, KNET_SUB_PMTUD, "Invalid MTU detected for host: %u link: %u mtu: %u", dst_host->host_id, dst_link->link_id, dst_link->status.mtu); dst_link->has_valid_mtu = 0; } else { dst_link->has_valid_mtu = 1; } if (dst_link->has_valid_mtu) { if ((saved_pmtud) && (saved_pmtud != dst_link->status.mtu)) { log_info(knet_h, KNET_SUB_PMTUD, "PMTUD link change for host: %u link: %u from %u to %u", dst_host->host_id, dst_link->link_id, saved_pmtud, dst_link->status.mtu); } log_debug(knet_h, KNET_SUB_PMTUD, "PMTUD completed for host: %u link: %u current link mtu: %u", dst_host->host_id, dst_link->link_id, dst_link->status.mtu); /* * set pmtud_last, if we can, after we are done with the PMTUd process * because it can take a very long time. 
*/ dst_link->pmtud_last = clock_now; if (!clock_gettime(CLOCK_MONOTONIC, &clock_now)) { dst_link->pmtud_last = clock_now; } } } if (saved_valid_pmtud != dst_link->has_valid_mtu) { _host_dstcache_update_async(knet_h, dst_host); } return dst_link->has_valid_mtu; } void *_handle_pmtud_link_thread(void *data) { knet_handle_t knet_h = (knet_handle_t) data; struct knet_host *dst_host; struct knet_link *dst_link; int link_idx; unsigned int have_mtu; unsigned int lower_mtu; int link_has_mtu; int force_run = 0; set_thread_status(knet_h, KNET_THREAD_PMTUD, KNET_THREAD_STARTED); knet_h->data_mtu = calc_min_mtu(knet_h); /* preparing pmtu buffer */ knet_h->pmtudbuf->kh_version = KNET_HEADER_VERSION; knet_h->pmtudbuf->kh_type = KNET_HEADER_TYPE_PMTUD; knet_h->pmtudbuf->kh_node = htons(knet_h->host_id); while (!shutdown_in_progress(knet_h)) { usleep(KNET_THREADS_TIMERES); if (pthread_mutex_lock(&knet_h->pmtud_mutex) != 0) { log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get mutex lock"); continue; } knet_h->pmtud_abort = 0; knet_h->pmtud_running = 1; force_run = knet_h->pmtud_forcerun; knet_h->pmtud_forcerun = 0; pthread_mutex_unlock(&knet_h->pmtud_mutex); if (force_run) { log_debug(knet_h, KNET_SUB_PMTUD, "PMTUd request to rerun has been received"); } if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) { log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get read lock"); continue; } lower_mtu = KNET_PMTUD_SIZE_V4; have_mtu = 0; for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) { for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) { dst_link = &dst_host->link[link_idx]; if ((dst_link->status.enabled != 1) || (dst_link->status.connected != 1) || (dst_host->link[link_idx].transport == KNET_TRANSPORT_LOOPBACK) || (!dst_link->last_ping_size) || ((dst_link->dynamic == KNET_LINK_DYNIP) && (dst_link->status.dynconnected != 1))) continue; if (!knet_h->manual_mtu) { link_has_mtu = _handle_check_pmtud(knet_h, dst_host, dst_link, force_run); if (errno == 
EDEADLK) { goto out_unlock; } if (link_has_mtu) { have_mtu = 1; if (dst_link->status.mtu < lower_mtu) { lower_mtu = dst_link->status.mtu; } } } else { link_has_mtu = _calculate_manual_mtu(knet_h, dst_link); if (link_has_mtu) { have_mtu = 1; if (dst_link->status.mtu < lower_mtu) { lower_mtu = dst_link->status.mtu; } } } } } if (have_mtu) { if (knet_h->data_mtu != lower_mtu) { knet_h->data_mtu = lower_mtu; log_info(knet_h, KNET_SUB_PMTUD, "Global data MTU changed to: %u", knet_h->data_mtu); if (knet_h->pmtud_notify_fn) { knet_h->pmtud_notify_fn(knet_h->pmtud_notify_fn_private_data, knet_h->data_mtu); } } } out_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); if (pthread_mutex_lock(&knet_h->pmtud_mutex) != 0) { log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get mutex lock"); } else { knet_h->pmtud_running = 0; pthread_mutex_unlock(&knet_h->pmtud_mutex); } } set_thread_status(knet_h, KNET_THREAD_PMTUD, KNET_THREAD_STOPPED); return NULL; } int knet_handle_pmtud_getfreq(knet_handle_t knet_h, unsigned int *interval) { int savederrno = 0; if (!_is_valid_handle(knet_h)) { return -1; } if (!interval) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } *interval = knet_h->pmtud_interval; pthread_rwlock_unlock(&knet_h->global_rwlock); errno = 0; return 0; } int knet_handle_pmtud_setfreq(knet_handle_t knet_h, unsigned int interval) { int savederrno = 0; if (!_is_valid_handle(knet_h)) { return -1; } if ((!interval) || (interval > 86400)) { errno = EINVAL; return -1; } savederrno = get_global_wrlock(knet_h); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } knet_h->pmtud_interval = interval; log_debug(knet_h, KNET_SUB_HANDLE, "PMTUd interval set to: %u seconds", interval); 
pthread_rwlock_unlock(&knet_h->global_rwlock); errno = 0; return 0; } int knet_handle_enable_pmtud_notify(knet_handle_t knet_h, void *pmtud_notify_fn_private_data, void (*pmtud_notify_fn) ( void *private_data, unsigned int data_mtu)) { int savederrno = 0; if (!_is_valid_handle(knet_h)) { return -1; } savederrno = get_global_wrlock(knet_h); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } knet_h->pmtud_notify_fn_private_data = pmtud_notify_fn_private_data; knet_h->pmtud_notify_fn = pmtud_notify_fn; if (knet_h->pmtud_notify_fn) { log_debug(knet_h, KNET_SUB_HANDLE, "pmtud_notify_fn enabled"); } else { log_debug(knet_h, KNET_SUB_HANDLE, "pmtud_notify_fn disabled"); } pthread_rwlock_unlock(&knet_h->global_rwlock); errno = 0; return 0; } int knet_handle_pmtud_set(knet_handle_t knet_h, unsigned int iface_mtu) { int savederrno = 0; if (!_is_valid_handle(knet_h)) { return -1; } if (iface_mtu > KNET_PMTUD_SIZE_V4) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_PMTUD, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } log_info(knet_h, KNET_SUB_PMTUD, "MTU manually set to: %u", iface_mtu); knet_h->manual_mtu = iface_mtu; force_pmtud_run(knet_h, KNET_SUB_PMTUD, 0, 0); pthread_rwlock_unlock(&knet_h->global_rwlock); errno = 0; return 0; } int knet_handle_pmtud_get(knet_handle_t knet_h, unsigned int *data_mtu) { int savederrno = 0; if (!_is_valid_handle(knet_h)) { return -1; } if (!data_mtu) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } *data_mtu = knet_h->data_mtu; pthread_rwlock_unlock(&knet_h->global_rwlock); errno = 0; return 0; } diff --git a/libknet/threads_tx.c b/libknet/threads_tx.c 
index 20869d1b..8a76e607 100644 --- a/libknet/threads_tx.c +++ b/libknet/threads_tx.c @@ -1,888 +1,891 @@ /* * Copyright (C) 2012-2025 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. Di Nitto * Federico Simoncelli * * This software licensed under LGPL-2.0+ */ #include "config.h" #include #include #include #include #include #include #include "compat.h" #include "compress.h" #include "crypto.h" #include "host.h" #include "link.h" #include "logging.h" #include "transports.h" #include "transport_common.h" #include "threads_common.h" #include "threads_heartbeat.h" #include "threads_tx.h" #include "netutils.h" /* * SEND */ static int _dispatch_to_links(knet_handle_t knet_h, struct knet_host *dst_host, struct knet_mmsghdr *msg, int msgs_to_send) { int link_idx, msg_idx, sent_msgs, prev_sent, progress; int err = 0, savederrno = 0, locked = 0; unsigned int i; struct knet_mmsghdr *cur; struct knet_link *cur_link; for (link_idx = 0; link_idx < dst_host->active_link_entries; link_idx++) { prev_sent = 0; progress = 1; locked = 0; cur_link = &dst_host->link[dst_host->active_links[link_idx]]; if (cur_link->transport == KNET_TRANSPORT_LOOPBACK) { continue; } savederrno = pthread_mutex_lock(&cur_link->link_stats_mutex); if (savederrno) { log_err(knet_h, KNET_SUB_TX, "Unable to get stats mutex lock for host %u link %u: %s", dst_host->host_id, cur_link->link_id, strerror(savederrno)); continue; } locked = 1; msg_idx = 0; while (msg_idx < msgs_to_send) { msg[msg_idx].msg_hdr.msg_name = &cur_link->dst_addr; msg[msg_idx].msg_hdr.msg_namelen = knet_h->knet_transport_fd_tracker[cur_link->outsock].sockaddr_len; /* Cast for Linux/BSD compatibility */ for (i=0; i<(unsigned int)msg[msg_idx].msg_hdr.msg_iovlen; i++) { cur_link->status.stats.tx_data_bytes += msg[msg_idx].msg_hdr.msg_iov[i].iov_len; } cur_link->status.stats.tx_data_packets++; msg_idx++; } retry: cur = &msg[prev_sent]; + // coverity[INTEGER_OVERFLOW:SUPPRESS] - sent_msgs errors are not added to prev_sent because of 
transport_tx_sock_error() handling sent_msgs = _sendmmsg(dst_host->link[dst_host->active_links[link_idx]].outsock, transport_get_connection_oriented(knet_h, dst_host->link[dst_host->active_links[link_idx]].transport), &cur[0], msgs_to_send - prev_sent, MSG_DONTWAIT | MSG_NOSIGNAL); savederrno = errno; err = transport_tx_sock_error(knet_h, dst_host->link[dst_host->active_links[link_idx]].transport, dst_host->link[dst_host->active_links[link_idx]].outsock, KNET_SUB_TX, sent_msgs, savederrno); switch(err) { case -1: /* unrecoverable error */ cur_link->status.stats.tx_data_errors++; goto out_unlock; break; case 0: /* ignore error and continue */ break; case 1: /* retry to send those same data */ cur_link->status.stats.tx_data_retries++; goto retry; break; } prev_sent = prev_sent + sent_msgs; if ((sent_msgs >= 0) && (prev_sent < msgs_to_send)) { if ((sent_msgs) || (progress)) { if (sent_msgs) { progress = 1; } else { progress = 0; } log_trace(knet_h, KNET_SUB_TX, "Unable to send all (%d/%d) data packets to host %s (%u) link %s:%s (%u)", sent_msgs, msg_idx, dst_host->name, dst_host->host_id, dst_host->link[dst_host->active_links[link_idx]].status.dst_ipaddr, dst_host->link[dst_host->active_links[link_idx]].status.dst_port, dst_host->link[dst_host->active_links[link_idx]].link_id); goto retry; } if (!progress) { savederrno = EAGAIN; err = -1; goto out_unlock; } } if ((dst_host->link_handler_policy == KNET_LINK_POLICY_RR) && (dst_host->active_link_entries > 1)) { uint8_t cur_link_id = dst_host->active_links[0]; memmove(&dst_host->active_links[0], &dst_host->active_links[1], KNET_MAX_LINK - 1); dst_host->active_links[dst_host->active_link_entries - 1] = cur_link_id; break; } pthread_mutex_unlock(&cur_link->link_stats_mutex); locked = 0; } out_unlock: if (locked) { pthread_mutex_unlock(&cur_link->link_stats_mutex); } errno = savederrno; return err; } static int _parse_recv_from_sock(knet_handle_t knet_h, size_t inlen, int8_t channel, int is_sync) { size_t outlen, frag_len; 
struct knet_host *dst_host; knet_node_id_t dst_host_ids_temp[KNET_MAX_HOST]; size_t dst_host_ids_entries_temp = 0; knet_node_id_t dst_host_ids[KNET_MAX_HOST]; size_t dst_host_ids_entries = 0; int bcast = 1; struct iovec iov_out[PCKT_FRAG_MAX][2]; int iovcnt_out = 2; uint8_t frag_idx; unsigned int temp_data_mtu; size_t host_idx; int send_mcast = 0; struct knet_header *inbuf; int savederrno = 0; int err = 0; seq_num_t tx_seq_num; struct knet_mmsghdr msg[PCKT_FRAG_MAX]; int msgs_to_send, msg_idx; unsigned int i; int j; int send_local = 0; int data_compressed = 0; size_t uncrypted_frag_size; int stats_locked = 0, stats_err = 0; inbuf = knet_h->recv_from_sock_buf; if (knet_h->enabled != 1) { log_debug(knet_h, KNET_SUB_TX, "Received data packet but forwarding is disabled"); savederrno = ECANCELED; err = -1; goto out_unlock; } /* * move this into a separate function to expand on * extra switching rules */ switch(inbuf->kh_type) { case KNET_HEADER_TYPE_DATA: if (knet_h->dst_host_filter_fn) { bcast = knet_h->dst_host_filter_fn( knet_h->dst_host_filter_fn_private_data, (const unsigned char *)inbuf->khp_data_userdata, inlen, KNET_NOTIFY_TX, knet_h->host_id, knet_h->host_id, &channel, dst_host_ids_temp, &dst_host_ids_entries_temp); if (bcast < 0) { log_debug(knet_h, KNET_SUB_TX, "Error from dst_host_filter_fn: %d", bcast); savederrno = EFAULT; err = -1; goto out_unlock; } if ((!bcast) && (!dst_host_ids_entries_temp)) { log_debug(knet_h, KNET_SUB_TX, "Message is unicast but no dst_host_ids_entries"); savederrno = EINVAL; err = -1; goto out_unlock; } if ((!bcast) && (dst_host_ids_entries_temp > KNET_MAX_HOST)) { log_debug(knet_h, KNET_SUB_TX, "dst_host_filter_fn returned too many destinations"); savederrno = EINVAL; err = -1; goto out_unlock; } } /* Send to localhost if appropriate and enabled */ if (knet_h->has_loop_link) { send_local = 0; if (bcast) { send_local = 1; } else { for (i=0; i< dst_host_ids_entries_temp; i++) { if (dst_host_ids_temp[i] == knet_h->host_id) { 
send_local = 1; } } } if (send_local) { const unsigned char *buf = inbuf->khp_data_userdata; ssize_t buflen = inlen; struct knet_link *local_link; local_link = knet_h->host_index[knet_h->host_id]->link; local_retry: + // coverity[INTEGER_OVERFLOW:SUPPRESS] - buflen is passsed in as a size_t so can't be negative err = write(knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], buf, buflen); if (err < 0) { log_err(knet_h, KNET_SUB_TRANSP_LOOPBACK, "send local failed. error=%s\n", strerror(errno)); local_link->status.stats.tx_data_errors++; } if (err > 0 && err < buflen) { log_debug(knet_h, KNET_SUB_TRANSP_LOOPBACK, "send local incomplete=%d bytes of %zu\n", err, inlen); local_link->status.stats.tx_data_retries++; buf += err; buflen -= err; goto local_retry; } if (err == buflen) { local_link->status.stats.tx_data_packets++; local_link->status.stats.tx_data_bytes += inlen; } } } break; default: log_warn(knet_h, KNET_SUB_TX, "Receiving unknown messages from socket"); savederrno = ENOMSG; err = -1; goto out_unlock; break; } if (is_sync) { if ((bcast) || ((!bcast) && (dst_host_ids_entries_temp > 1))) { log_debug(knet_h, KNET_SUB_TX, "knet_send_sync is only supported with unicast packets for one destination"); savederrno = E2BIG; err = -1; goto out_unlock; } } /* * check destinations hosts before spending time * in fragmenting/encrypting packets to save * time processing data for unreachable hosts. * for unicast, also remap the destination data * to skip unreachable hosts. 
*/ if (!bcast) { dst_host_ids_entries = 0; for (host_idx = 0; host_idx < dst_host_ids_entries_temp; host_idx++) { dst_host = knet_h->host_index[dst_host_ids_temp[host_idx]]; if (!dst_host) { continue; } if (!(dst_host->host_id == knet_h->host_id && knet_h->has_loop_link) && dst_host->status.reachable) { dst_host_ids[dst_host_ids_entries] = dst_host_ids_temp[host_idx]; dst_host_ids_entries++; } } if (!dst_host_ids_entries) { savederrno = EHOSTDOWN; err = -1; goto out_unlock; } } else { send_mcast = 0; for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) { if (!(dst_host->host_id == knet_h->host_id && knet_h->has_loop_link) && dst_host->status.reachable) { send_mcast = 1; break; } } if (!send_mcast) { savederrno = EHOSTDOWN; err = -1; goto out_unlock; } } if (!knet_h->data_mtu) { /* * using MIN_MTU_V4 for data mtu is not completely accurate but safe enough */ log_debug(knet_h, KNET_SUB_TX, "Received data packet but data MTU is still unknown." " Packet might not be delivered." 
" Assuming minimum IPv4 MTU (%d)", KNET_PMTUD_MIN_MTU_V4); temp_data_mtu = KNET_PMTUD_MIN_MTU_V4; } else { /* * take a copy of the mtu to avoid value changing under * our feet while we are sending a fragmented pckt */ temp_data_mtu = knet_h->data_mtu; } /* * compress data */ if ((knet_h->compress_model > 0) && (inlen > knet_h->compress_threshold)) { size_t cmp_outlen = KNET_DATABUFSIZE_COMPRESS; struct timespec start_time; struct timespec end_time; uint64_t compress_time; clock_gettime(CLOCK_MONOTONIC, &start_time); err = compress(knet_h, (const unsigned char *)inbuf->khp_data_userdata, inlen, knet_h->send_to_links_buf_compress, (ssize_t *)&cmp_outlen); savederrno = errno; stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex); if (stats_err < 0) { log_err(knet_h, KNET_SUB_TX, "Unable to get mutex lock: %s", strerror(stats_err)); err = -1; savederrno = stats_err; goto out_unlock; } stats_locked = 1; /* Collect stats */ clock_gettime(CLOCK_MONOTONIC, &end_time); timespec_diff(start_time, end_time, &compress_time); if (compress_time < knet_h->stats.tx_compress_time_min) { knet_h->stats.tx_compress_time_min = compress_time; } if (compress_time > knet_h->stats.tx_compress_time_max) { knet_h->stats.tx_compress_time_max = compress_time; } knet_h->stats.tx_compress_time_ave = (unsigned long long)(knet_h->stats.tx_compress_time_ave * knet_h->stats.tx_compressed_packets + compress_time) / (knet_h->stats.tx_compressed_packets+1); if (err < 0) { log_warn(knet_h, KNET_SUB_COMPRESS, "Compression failed (%d): %s", err, strerror(savederrno)); } else { knet_h->stats.tx_compressed_packets++; knet_h->stats.tx_compressed_original_bytes += inlen; knet_h->stats.tx_compressed_size_bytes += cmp_outlen; if (cmp_outlen < inlen) { memmove(inbuf->khp_data_userdata, knet_h->send_to_links_buf_compress, cmp_outlen); inlen = cmp_outlen; data_compressed = 1; } } } if (!stats_locked) { stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex); if (stats_err < 0) { log_err(knet_h, 
KNET_SUB_TX, "Unable to get mutex lock: %s", strerror(stats_err)); err = -1; savederrno = stats_err; goto out_unlock; } } if (knet_h->compress_model > 0 && !data_compressed) { knet_h->stats.tx_uncompressed_packets++; } pthread_mutex_unlock(&knet_h->handle_stats_mutex); stats_locked = 0; /* * prepare the outgoing buffers */ frag_len = inlen; frag_idx = 0; inbuf->khp_data_bcast = bcast; inbuf->khp_data_frag_num = ceil((float)inlen / temp_data_mtu); inbuf->khp_data_channel = channel; if (data_compressed) { inbuf->khp_data_compress = knet_h->compress_model; } else { inbuf->khp_data_compress = 0; } if (pthread_mutex_lock(&knet_h->tx_seq_num_mutex)) { log_debug(knet_h, KNET_SUB_TX, "Unable to get seq mutex lock"); goto out_unlock; } knet_h->tx_seq_num++; /* * force seq_num 0 to detect a node that has crashed and rejoining * the knet instance. seq_num 0 will clear the buffers in the RX * thread */ if (knet_h->tx_seq_num == 0) { knet_h->tx_seq_num++; } /* * cache the value in locked context */ tx_seq_num = knet_h->tx_seq_num; inbuf->khp_data_seq_num = htons(knet_h->tx_seq_num); pthread_mutex_unlock(&knet_h->tx_seq_num_mutex); /* * forcefully broadcast a ping to all nodes every SEQ_MAX / 8 * pckts. * this solves 2 problems: * 1) on TX socket overloads we generate extra pings to keep links alive * 2) in 3+ nodes setup, where all the traffic is flowing between node 1 and 2, * node 3+ will be able to keep in sync on the TX seq_num even without * receiving traffic or pings in betweens. 
This avoids issues with * rollover of the circular buffer */ if (tx_seq_num % (SEQ_MAX / 8) == 0) { _send_pings(knet_h, 0); } if (inbuf->khp_data_frag_num > 1) { while (frag_idx < inbuf->khp_data_frag_num) { /* * set the iov_base */ iov_out[frag_idx][0].iov_base = (void *)knet_h->send_to_links_buf[frag_idx]; iov_out[frag_idx][0].iov_len = KNET_HEADER_DATA_SIZE; iov_out[frag_idx][1].iov_base = inbuf->khp_data_userdata + (temp_data_mtu * frag_idx); /* * set the len */ if (frag_len > temp_data_mtu) { iov_out[frag_idx][1].iov_len = temp_data_mtu; } else { iov_out[frag_idx][1].iov_len = frag_len; } /* * copy the frag info on all buffers */ knet_h->send_to_links_buf[frag_idx]->kh_type = inbuf->kh_type; knet_h->send_to_links_buf[frag_idx]->khp_data_seq_num = inbuf->khp_data_seq_num; knet_h->send_to_links_buf[frag_idx]->khp_data_frag_num = inbuf->khp_data_frag_num; knet_h->send_to_links_buf[frag_idx]->khp_data_bcast = inbuf->khp_data_bcast; knet_h->send_to_links_buf[frag_idx]->khp_data_channel = inbuf->khp_data_channel; knet_h->send_to_links_buf[frag_idx]->khp_data_compress = inbuf->khp_data_compress; frag_len = frag_len - temp_data_mtu; frag_idx++; } iovcnt_out = 2; } else { iov_out[frag_idx][0].iov_base = (void *)inbuf; iov_out[frag_idx][0].iov_len = frag_len + KNET_HEADER_DATA_SIZE; iovcnt_out = 1; } if (knet_h->crypto_in_use_config) { struct timespec start_time; struct timespec end_time; uint64_t crypt_time; frag_idx = 0; while (frag_idx < inbuf->khp_data_frag_num) { clock_gettime(CLOCK_MONOTONIC, &start_time); if (crypto_encrypt_and_signv( knet_h, iov_out[frag_idx], iovcnt_out, knet_h->send_to_links_buf_crypt[frag_idx], (ssize_t *)&outlen) < 0) { log_debug(knet_h, KNET_SUB_TX, "Unable to encrypt packet"); savederrno = ECHILD; err = -1; goto out_unlock; } clock_gettime(CLOCK_MONOTONIC, &end_time); timespec_diff(start_time, end_time, &crypt_time); stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex); if (stats_err < 0) { log_err(knet_h, KNET_SUB_TX, "Unable to 
get mutex lock: %s", strerror(stats_err)); err = -1; savederrno = stats_err; goto out_unlock; } if (crypt_time < knet_h->stats.tx_crypt_time_min) { knet_h->stats.tx_crypt_time_min = crypt_time; } if (crypt_time > knet_h->stats.tx_crypt_time_max) { knet_h->stats.tx_crypt_time_max = crypt_time; } knet_h->stats.tx_crypt_time_ave = (knet_h->stats.tx_crypt_time_ave * knet_h->stats.tx_crypt_packets + crypt_time) / (knet_h->stats.tx_crypt_packets+1); uncrypted_frag_size = 0; for (j=0; j < iovcnt_out; j++) { uncrypted_frag_size += iov_out[frag_idx][j].iov_len; } knet_h->stats.tx_crypt_byte_overhead += (outlen - uncrypted_frag_size); knet_h->stats.tx_crypt_packets++; pthread_mutex_unlock(&knet_h->handle_stats_mutex); iov_out[frag_idx][0].iov_base = knet_h->send_to_links_buf_crypt[frag_idx]; iov_out[frag_idx][0].iov_len = outlen; frag_idx++; } iovcnt_out = 1; } memset(&msg, 0, sizeof(msg)); msgs_to_send = inbuf->khp_data_frag_num; msg_idx = 0; while (msg_idx < msgs_to_send) { msg[msg_idx].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage); /* this will set properly in _dispatch_to_links() */ msg[msg_idx].msg_hdr.msg_iov = &iov_out[msg_idx][0]; msg[msg_idx].msg_hdr.msg_iovlen = iovcnt_out; msg_idx++; } if (!bcast) { for (host_idx = 0; host_idx < dst_host_ids_entries; host_idx++) { dst_host = knet_h->host_index[dst_host_ids[host_idx]]; err = _dispatch_to_links(knet_h, dst_host, &msg[0], msgs_to_send); savederrno = errno; if (err) { goto out_unlock; } } } else { for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) { if (dst_host->status.reachable) { err = _dispatch_to_links(knet_h, dst_host, &msg[0], msgs_to_send); savederrno = errno; if (err) { goto out_unlock; } } } } out_unlock: errno = savederrno; return err; } static void _handle_send_to_links(knet_handle_t knet_h, struct msghdr *msg, int sockfd, int8_t channel, int type) { ssize_t inlen = 0; int savederrno = 0, docallback = 0; /* * make sure BSD gets the right size */ msg->msg_namelen = 
knet_h->knet_transport_fd_tracker[sockfd].sockaddr_len; + // coverity[MISSING_LOCK:SUPPRESS] - already locked in calling function if ((channel >= 0) && (channel < KNET_DATAFD_MAX) && (!knet_h->sockfd[channel].is_socket)) { inlen = readv(sockfd, msg->msg_iov, 1); } else { inlen = recvmsg(sockfd, msg, MSG_DONTWAIT | MSG_NOSIGNAL); if (msg->msg_flags & MSG_TRUNC) { log_warn(knet_h, KNET_SUB_TX, "Received truncated message from sock %d. Discarding", sockfd); return; } } if (inlen == 0) { savederrno = 0; docallback = 1; } else if (inlen < 0) { struct epoll_event ev; savederrno = errno; docallback = 1; memset(&ev, 0, sizeof(struct epoll_event)); if (epoll_ctl(knet_h->send_to_links_epollfd, EPOLL_CTL_DEL, knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], &ev)) { log_err(knet_h, KNET_SUB_TX, "Unable to del datafd %d from linkfd epoll pool: %s", knet_h->sockfd[channel].sockfd[0], strerror(savederrno)); } else { knet_h->sockfd[channel].has_error = 1; } } else { knet_h->recv_from_sock_buf->kh_type = type; _parse_recv_from_sock(knet_h, inlen, channel, 0); } if (docallback) { knet_h->sock_notify_fn(knet_h->sock_notify_fn_private_data, knet_h->sockfd[channel].sockfd[0], channel, KNET_NOTIFY_TX, inlen, savederrno); } } void *_handle_send_to_links_thread(void *data) { knet_handle_t knet_h = (knet_handle_t) data; struct epoll_event events[KNET_EPOLL_MAX_EVENTS + 1]; /* see _init_epolls for + 1 */ int i, nev, type; int flush, flush_queue_limit; int8_t channel; struct iovec iov_in; struct msghdr msg; struct sockaddr_storage address; set_thread_status(knet_h, KNET_THREAD_TX, KNET_THREAD_STARTED); memset(&events, 0, sizeof(events)); memset(&iov_in, 0, sizeof(iov_in)); iov_in.iov_base = (void *)knet_h->recv_from_sock_buf->khp_data_userdata; iov_in.iov_len = KNET_MAX_PACKET_SIZE; memset(&msg, 0, sizeof(struct msghdr)); msg.msg_name = &address; msg.msg_namelen = sizeof(struct sockaddr_storage); msg.msg_iov = &iov_in; msg.msg_iovlen = 1; 
knet_h->recv_from_sock_buf->kh_version = KNET_HEADER_VERSION;
	knet_h->recv_from_sock_buf->khp_data_frag_seq = 0;
	knet_h->recv_from_sock_buf->kh_node = htons(knet_h->host_id);

	/* pre-fill the constant header fields of every fragment buffer */
	for (i = 0; i < (int)PCKT_FRAG_MAX; i++) {
		knet_h->send_to_links_buf[i]->kh_version = KNET_HEADER_VERSION;
		knet_h->send_to_links_buf[i]->khp_data_frag_seq = i + 1;
		knet_h->send_to_links_buf[i]->kh_node = htons(knet_h->host_id);
	}

	flush_queue_limit = 0;

	while (!shutdown_in_progress(knet_h)) {
		nev = epoll_wait(knet_h->send_to_links_epollfd, events, KNET_EPOLL_MAX_EVENTS + 1, KNET_THREADS_TIMERES / 1000);

		flush = get_thread_flush_queue(knet_h, KNET_THREAD_TX);

		/*
		 * we use timeout to detect if thread is shutting down
		 */
		if (nev == 0) {
			/*
			 * ideally we want to communicate that we are done flushing
			 * the queue when we have an epoll timeout event
			 */
			if (flush == KNET_THREAD_QUEUE_FLUSH) {
				set_thread_flush_queue(knet_h, KNET_THREAD_TX, KNET_THREAD_QUEUE_FLUSHED);
				flush_queue_limit = 0;
			}
			continue;
		}

		/*
		 * fall back in case the TX sockets will continue receive traffic
		 * and we do not hit an epoll timeout.
		 *
		 * allow up to a 100 loops to flush queues, then we give up.
		 * there might be more clean ways to do it by checking the buffer queue
		 * on each socket, but we have tons of sockets and calculations can go wrong.
		 * Also, why would you disable data forwarding and still send packets?
		 */
		if (flush == KNET_THREAD_QUEUE_FLUSH) {
			if (flush_queue_limit >= 100) {
				log_debug(knet_h, KNET_SUB_TX, "Timeout flushing the TX queue, expect packet loss");
				set_thread_flush_queue(knet_h, KNET_THREAD_TX, KNET_THREAD_QUEUE_FLUSHED);
				flush_queue_limit = 0;
			} else {
				flush_queue_limit++;
			}
		} else {
			flush_queue_limit = 0;
		}

		if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) {
			log_debug(knet_h, KNET_SUB_TX, "Unable to get read lock");
			continue;
		}

		for (i = 0; i < nev; i++) {
			type = KNET_HEADER_TYPE_DATA;
			/* map the ready fd back to the in-use channel that owns it */
			for (channel = 0; channel < KNET_DATAFD_MAX; channel++) {
				if ((knet_h->sockfd[channel].in_use) &&
				    (knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created] == events[i].data.fd)) {
					break;
				}
			}
			if (channel >= KNET_DATAFD_MAX) {
				log_debug(knet_h, KNET_SUB_TX, "No available channels");
				continue; /* channel not found */
			}
			/* tx_mutex is held around every _handle_send_to_links() call */
			if (pthread_mutex_lock(&knet_h->tx_mutex) != 0) {
				log_debug(knet_h, KNET_SUB_TX, "Unable to get mutex lock");
				continue;
			}
			_handle_send_to_links(knet_h, &msg, events[i].data.fd, channel, type);
			pthread_mutex_unlock(&knet_h->tx_mutex);
		}

		pthread_rwlock_unlock(&knet_h->global_rwlock);
	}

	set_thread_status(knet_h, KNET_THREAD_TX, KNET_THREAD_STOPPED);

	return NULL;
}

/*
 * Send buff on channel from the calling thread, bypassing the data fd:
 * the payload is copied into recv_from_sock_buf and passed to
 * _parse_recv_from_sock() with is_sync set.
 *
 * buff must be non-NULL, buff_len in 1..KNET_MAX_PACKET_SIZE and channel in
 * 0..KNET_DATAFD_MAX-1, otherwise -1/EINVAL. A destination filter must be
 * installed (ENETDOWN otherwise) and the channel must be in use.
 */
int knet_send_sync(knet_handle_t knet_h, const char *buff, const size_t buff_len, const int8_t channel)
{
	int savederrno = 0, err = 0;

	if (!_is_valid_handle(knet_h)) {
		return -1;
	}

	if (buff == NULL) {
		errno = EINVAL;
		return -1;
	}

	if (buff_len <= 0) {
		errno = EINVAL;
		return -1;
	}

	if (buff_len > KNET_MAX_PACKET_SIZE) {
		errno = EINVAL;
		return -1;
	}

	if (channel < 0) {
		errno = EINVAL;
		return -1;
	}

	if (channel >= KNET_DATAFD_MAX) {
		errno = EINVAL;
		return -1;
	}

	savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
	if (savederrno) {
		log_err(knet_h, KNET_SUB_TX, "Unable to get read lock: %s",
			strerror(savederrno));
		errno = savederrno;
		return -1;
	}

	if (!knet_h->dst_host_filter_fn) {
		savederrno = ENETDOWN;
		err = -1;
		goto out;
	}

	if (!knet_h->sockfd[channel].in_use) {
		savederrno = EINVAL;
err = -1; goto out; } savederrno = pthread_mutex_lock(&knet_h->tx_mutex); if (savederrno) { log_err(knet_h, KNET_SUB_TX, "Unable to get TX mutex lock: %s", strerror(savederrno)); err = -1; goto out; } knet_h->recv_from_sock_buf->kh_type = KNET_HEADER_TYPE_DATA; memmove(knet_h->recv_from_sock_buf->khp_data_userdata, buff, buff_len); err = _parse_recv_from_sock(knet_h, buff_len, channel, 1); savederrno = errno; pthread_mutex_unlock(&knet_h->tx_mutex); out: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = err ? savederrno : 0; return err; } ssize_t knet_send(knet_handle_t knet_h, const char *buff, const size_t buff_len, const int8_t channel) { int savederrno = 0; ssize_t err = 0; struct iovec iov_out[1]; if (!_is_valid_handle(knet_h)) { return -1; } if (buff == NULL) { errno = EINVAL; return -1; } if (buff_len <= 0) { errno = EINVAL; return -1; } if (buff_len > KNET_MAX_PACKET_SIZE) { errno = EINVAL; return -1; } if (channel < 0) { errno = EINVAL; return -1; } if (channel >= KNET_DATAFD_MAX) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } if (!knet_h->sockfd[channel].in_use) { savederrno = EINVAL; err = -1; goto out_unlock; } memset(iov_out, 0, sizeof(iov_out)); iov_out[0].iov_base = (void *)buff; iov_out[0].iov_len = buff_len; err = writev(knet_h->sockfd[channel].sockfd[0], iov_out, 1); savederrno = errno; out_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = err ? savederrno : 0; return err; }