Page MenuHomeClusterLabs Projects

No OneTemporary

diff --git a/libknet/host.c b/libknet/host.c
index 232e8e31..8a0c5e4b 100644
--- a/libknet/host.c
+++ b/libknet/host.c
@@ -1,794 +1,794 @@
/*
* Copyright (C) 2010-2021 Red Hat, Inc. All rights reserved.
*
* Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
* Federico Simoncelli <fsimon@kronosnet.org>
*
* This software licensed under LGPL-2.0+
*/
#include "config.h"
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <pthread.h>
#include <stdio.h>
#include "host.h"
#include "internals.h"
#include "logging.h"
#include "threads_common.h"
/*
 * Rebuild the flat host_ids[] cache from the linked list of hosts.
 * Called with the global write lock held.
 */
static void _host_list_update(knet_handle_t knet_h)
{
	struct knet_host *cur = knet_h->host_head;

	knet_h->host_ids_entries = 0;

	while (cur != NULL) {
		knet_h->host_ids[knet_h->host_ids_entries++] = cur->host_id;
		cur = cur->next;
	}
}
/*
 * Register a new host with the given id on this handle.
 *
 * Takes the global write lock, allocates and zeroes the host entry,
 * seeds per-link defaults and publishes it in both the index and the
 * head of the host list.
 *
 * Returns 0 on success, -1 on failure with errno set
 * (EEXIST if the id is already known).
 */
int knet_host_add(knet_handle_t knet_h, knet_node_id_t host_id)
{
	int err = 0, savederrno = 0;
	uint8_t i;
	struct knet_host *new_host = NULL;

	if (!_is_valid_handle(knet_h)) {
		return -1;
	}

	savederrno = get_global_wrlock(knet_h);
	if (savederrno) {
		log_err(knet_h, KNET_SUB_HOST, "Unable to get write lock: %s",
			strerror(savederrno));
		errno = savederrno;
		return -1;
	}

	if (knet_h->host_index[host_id] != NULL) {
		err = -1;
		savederrno = EEXIST;
		log_err(knet_h, KNET_SUB_HOST, "Unable to add host %u: %s",
			host_id, strerror(savederrno));
		goto exit_unlock;
	}

	new_host = malloc(sizeof(*new_host));
	if (new_host == NULL) {
		err = -1;
		savederrno = errno;
		log_err(knet_h, KNET_SUB_HOST, "Unable to allocate memory for host %u: %s",
			host_id, strerror(savederrno));
		goto exit_unlock;
	}
	memset(new_host, 0, sizeof(*new_host));

	new_host->host_id = host_id;

	/*
	 * when adding ourselves, seed the entry with our own onwire versioning
	 */
	if (knet_h->host_id == new_host->host_id) {
		new_host->onwire_ver = knet_h->onwire_ver;
		new_host->onwire_max_ver = knet_h->onwire_max_ver;
	}

	/*
	 * default host name is the numeric host_id, used for logging
	 */
	snprintf(new_host->name, KNET_MAX_HOST_LEN, "%u", host_id);

	/*
	 * per-link internal defaults
	 */
	for (i = 0; i < KNET_MAX_LINK; i++) {
		new_host->link[i].link_id = i;
		new_host->link[i].status.stats.latency_min = UINT32_MAX;
	}

	/*
	 * publish: index first, then push on the head of the list
	 * (next is already NULL from the memset when the list was empty)
	 */
	knet_h->host_index[host_id] = new_host;
	new_host->next = knet_h->host_head;
	knet_h->host_head = new_host;

	_host_list_update(knet_h);

exit_unlock:
	pthread_rwlock_unlock(&knet_h->global_rwlock);
	if (err < 0) {
		free(new_host);
	}
	errno = err ? savederrno : 0;
	return err;
}
/*
 * Remove a host from the handle.
 *
 * Fails with EINVAL if the host is unknown and with EBUSY if any
 * link on the host is still configured. On success the host is
 * unlinked from both the index and the list and freed.
 */
int knet_host_remove(knet_handle_t knet_h, knet_node_id_t host_id)
{
	int err = 0, savederrno = 0;
	uint8_t i;
	struct knet_host *victim, *prev, *unlinked = NULL;

	if (!_is_valid_handle(knet_h)) {
		return -1;
	}

	savederrno = get_global_wrlock(knet_h);
	if (savederrno) {
		log_err(knet_h, KNET_SUB_HOST, "Unable to get write lock: %s",
			strerror(savederrno));
		errno = savederrno;
		return -1;
	}

	victim = knet_h->host_index[host_id];
	if (victim == NULL) {
		err = -1;
		savederrno = EINVAL;
		log_err(knet_h, KNET_SUB_HOST, "Unable to remove host %u: %s",
			host_id, strerror(savederrno));
		goto exit_unlock;
	}

	/*
	 * refuse to drop a host that still has configured links
	 */
	for (i = 0; i < KNET_MAX_LINK; i++) {
		if (victim->link[i].configured) {
			err = -1;
			savederrno = EBUSY;
			log_err(knet_h, KNET_SUB_HOST, "Unable to remove host %u, links are still configured: %s",
				host_id, strerror(savederrno));
			goto exit_unlock;
		}
	}

	/*
	 * detach the victim from the singly linked list;
	 * only free what was actually unlinked
	 */
	if (knet_h->host_head == victim) {
		unlinked = victim;
		knet_h->host_head = victim->next;
	} else {
		for (prev = knet_h->host_head; prev->next != NULL; prev = prev->next) {
			if (prev->next == victim) {
				unlinked = victim;
				prev->next = victim->next;
				break;
			}
		}
	}

	knet_h->host_index[host_id] = NULL;
	free(unlinked);

	_host_list_update(knet_h);

exit_unlock:
	pthread_rwlock_unlock(&knet_h->global_rwlock);
	errno = err ? savederrno : 0;
	return err;
}
/*
 * Assign a human readable name to a host.
 *
 * The name must be non-NULL, shorter than KNET_MAX_HOST_LEN and not
 * already in use by any known host (note: the scan includes the target
 * host itself, so re-setting the current name reports EEXIST).
 */
int knet_host_set_name(knet_handle_t knet_h, knet_node_id_t host_id, const char *name)
{
	int err = 0, savederrno = 0;
	struct knet_host *cur;

	if (!_is_valid_handle(knet_h)) {
		return -1;
	}

	savederrno = get_global_wrlock(knet_h);
	if (savederrno) {
		log_err(knet_h, KNET_SUB_HOST, "Unable to get write lock: %s",
			strerror(savederrno));
		errno = savederrno;
		return -1;
	}

	if (knet_h->host_index[host_id] == NULL) {
		err = -1;
		savederrno = EINVAL;
		log_err(knet_h, KNET_SUB_HOST, "Unable to find host %u to set name: %s",
			host_id, strerror(savederrno));
		goto exit_unlock;
	}

	if (name == NULL) {
		err = -1;
		savederrno = EINVAL;
		log_err(knet_h, KNET_SUB_HOST, "Unable to set name for host %u: %s",
			host_id, strerror(savederrno));
		goto exit_unlock;
	}

	if (strlen(name) >= KNET_MAX_HOST_LEN) {
		err = -1;
		savederrno = EINVAL;
		log_err(knet_h, KNET_SUB_HOST, "Requested name for host %u is too long: %s",
			host_id, strerror(savederrno));
		goto exit_unlock;
	}

	/*
	 * names must be unique across the whole host list
	 */
	for (cur = knet_h->host_head; cur != NULL; cur = cur->next) {
		if (strncmp(cur->name, name, KNET_MAX_HOST_LEN) == 0) {
			err = -1;
			savederrno = EEXIST;
			log_err(knet_h, KNET_SUB_HOST, "Duplicated name found on host_id %u",
				cur->host_id);
			goto exit_unlock;
		}
	}

	snprintf(knet_h->host_index[host_id]->name, KNET_MAX_HOST_LEN, "%s", name);

exit_unlock:
	pthread_rwlock_unlock(&knet_h->global_rwlock);
	errno = err ? savederrno : 0;
	return err;
}
/*
 * Copy the name of host_id into the caller supplied buffer.
 * The buffer must hold at least KNET_MAX_HOST_LEN bytes.
 */
int knet_host_get_name_by_host_id(knet_handle_t knet_h, knet_node_id_t host_id,
				  char *name)
{
	int err = 0, savederrno = 0;

	if (!_is_valid_handle(knet_h)) {
		return -1;
	}

	if (name == NULL) {
		errno = EINVAL;
		return -1;
	}

	savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
	if (savederrno) {
		log_err(knet_h, KNET_SUB_HOST, "Unable to get read lock: %s",
			strerror(savederrno));
		errno = savederrno;
		return -1;
	}

	if (knet_h->host_index[host_id] == NULL) {
		err = -1;
		savederrno = EINVAL;
		log_debug(knet_h, KNET_SUB_HOST, "Host %u not found", host_id);
		goto exit_unlock;
	}

	snprintf(name, KNET_MAX_HOST_LEN, "%s", knet_h->host_index[host_id]->name);

exit_unlock:
	pthread_rwlock_unlock(&knet_h->global_rwlock);
	errno = err ? savederrno : 0;
	return err;
}
/*
 * Reverse lookup: find the host_id that carries a given name.
 * Sets ENOENT when no host matches.
 */
int knet_host_get_id_by_host_name(knet_handle_t knet_h, const char *name,
				  knet_node_id_t *host_id)
{
	int err = 0, savederrno = 0;
	struct knet_host *cur, *match = NULL;

	if (!_is_valid_handle(knet_h)) {
		return -1;
	}

	if ((name == NULL) || (host_id == NULL)) {
		errno = EINVAL;
		return -1;
	}

	savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
	if (savederrno) {
		log_err(knet_h, KNET_SUB_HOST, "Unable to get read lock: %s",
			strerror(savederrno));
		errno = savederrno;
		return -1;
	}

	for (cur = knet_h->host_head; cur != NULL; cur = cur->next) {
		if (strncmp(name, cur->name, KNET_MAX_HOST_LEN) == 0) {
			match = cur;
			break;
		}
	}

	if (match != NULL) {
		*host_id = match->host_id;
	} else {
		err = -1;
		savederrno = ENOENT;
	}

	pthread_rwlock_unlock(&knet_h->global_rwlock);
	errno = err ? savederrno : 0;
	return err;
}
/*
 * Export the cached list of configured host ids.
 * host_ids must be able to hold KNET_MAX_HOST entries: the whole
 * cached array is copied, not just the used portion.
 */
int knet_host_get_host_list(knet_handle_t knet_h,
			    knet_node_id_t *host_ids, size_t *host_ids_entries)
{
	int savederrno = 0;

	if (!_is_valid_handle(knet_h)) {
		return -1;
	}

	if ((host_ids == NULL) || (host_ids_entries == NULL)) {
		errno = EINVAL;
		return -1;
	}

	savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
	if (savederrno) {
		log_err(knet_h, KNET_SUB_HOST, "Unable to get read lock: %s",
			strerror(savederrno));
		errno = savederrno;
		return -1;
	}

	memmove(host_ids, knet_h->host_ids, sizeof(knet_h->host_ids));
	*host_ids_entries = knet_h->host_ids_entries;

	pthread_rwlock_unlock(&knet_h->global_rwlock);
	return 0;
}
/*
 * Change the link selection policy (passive/active/rr) for a host.
 *
 * On failure to refresh the switching cache the previous policy is
 * restored and -1 is returned with errno set; the "new policy" debug
 * message is only emitted on success.
 */
int knet_host_set_policy(knet_handle_t knet_h, knet_node_id_t host_id,
			 uint8_t policy)
{
	int savederrno = 0, err = 0;
	uint8_t old_policy;

	if (!_is_valid_handle(knet_h)) {
		return -1;
	}

	if (policy > KNET_LINK_POLICY_RR) {
		errno = EINVAL;
		return -1;
	}

	savederrno = get_global_wrlock(knet_h);
	if (savederrno) {
		log_err(knet_h, KNET_SUB_HOST, "Unable to get write lock: %s",
			strerror(savederrno));
		errno = savederrno;
		return -1;
	}

	if (!knet_h->host_index[host_id]) {
		err = -1;
		savederrno = EINVAL;
		/*
		 * message fixed: it used to read "Unable to set name" (copy/paste
		 * from knet_host_set_name)
		 */
		log_err(knet_h, KNET_SUB_HOST, "Unable to set policy for host %u: %s",
			host_id, strerror(savederrno));
		goto exit_unlock;
	}

	old_policy = knet_h->host_index[host_id]->link_handler_policy;
	knet_h->host_index[host_id]->link_handler_policy = policy;

	if (_host_dstcache_update_async(knet_h, knet_h->host_index[host_id])) {
		savederrno = errno;
		err = -1;
		/*
		 * roll back so the stored policy and the dst cache stay in sync
		 */
		knet_h->host_index[host_id]->link_handler_policy = old_policy;
		log_debug(knet_h, KNET_SUB_HOST, "Unable to update switch cache for host %u: %s",
			  host_id, strerror(savederrno));
		goto exit_unlock;
	}

	log_debug(knet_h, KNET_SUB_HOST, "Host %u has new switching policy: %u", host_id, policy);

exit_unlock:
	pthread_rwlock_unlock(&knet_h->global_rwlock);
	errno = err ? savederrno : 0;
	return err;
}
/*
 * Read back the current link selection policy for a host.
 *
 * Returns 0 on success, -1 with errno = EINVAL if the host is unknown
 * or policy is NULL.
 */
int knet_host_get_policy(knet_handle_t knet_h, knet_node_id_t host_id,
			 uint8_t *policy)
{
	int savederrno = 0, err = 0;

	if (!_is_valid_handle(knet_h)) {
		return -1;
	}

	if (!policy) {
		errno = EINVAL;
		return -1;
	}

	savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
	if (savederrno) {
		log_err(knet_h, KNET_SUB_HOST, "Unable to get read lock: %s",
			strerror(savederrno));
		errno = savederrno;
		return -1;
	}

	if (!knet_h->host_index[host_id]) {
		err = -1;
		savederrno = EINVAL;
		/*
		 * message fixed: it used to read "Unable to get name" (copy/paste
		 * from the name getter)
		 */
		log_err(knet_h, KNET_SUB_HOST, "Unable to get policy for host %u: %s",
			host_id, strerror(savederrno));
		goto exit_unlock;
	}

	*policy = knet_h->host_index[host_id]->link_handler_policy;

exit_unlock:
	pthread_rwlock_unlock(&knet_h->global_rwlock);
	errno = err ? savederrno : 0;
	return err;
}
/*
 * Copy the current status block for a host into *status.
 */
int knet_host_get_status(knet_handle_t knet_h, knet_node_id_t host_id,
			 struct knet_host_status *status)
{
	int err = 0, savederrno = 0;
	struct knet_host *cur;

	if (!_is_valid_handle(knet_h)) {
		return -1;
	}

	if (status == NULL) {
		errno = EINVAL;
		return -1;
	}

	savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
	if (savederrno) {
		log_err(knet_h, KNET_SUB_HOST, "Unable to get read lock: %s",
			strerror(savederrno));
		errno = savederrno;
		return -1;
	}

	cur = knet_h->host_index[host_id];
	if (cur == NULL) {
		err = -1;
		savederrno = EINVAL;
		log_err(knet_h, KNET_SUB_HOST, "Unable to find host %u: %s",
			host_id, strerror(savederrno));
		goto exit_unlock;
	}

	/* plain struct assignment, equivalent to the memmove of the full struct */
	*status = cur->status;

exit_unlock:
	pthread_rwlock_unlock(&knet_h->global_rwlock);
	errno = err ? savederrno : 0;
	return err;
}
/*
 * Install (or clear, by passing NULL) the host status change callback
 * and its private data pointer.
 * Always succeeds once the handle is valid and the lock is acquired.
 */
int knet_host_enable_status_change_notify(knet_handle_t knet_h,
					  void *host_status_change_notify_fn_private_data,
					  void (*host_status_change_notify_fn) (
						void *private_data,
						knet_node_id_t host_id,
						uint8_t reachable,
						uint8_t remote,
						uint8_t external))
{
	int savederrno = 0;

	if (!_is_valid_handle(knet_h)) {
		return -1;
	}

	savederrno = get_global_wrlock(knet_h);
	if (savederrno) {
		log_err(knet_h, KNET_SUB_HOST, "Unable to get write lock: %s",
			strerror(savederrno));
		errno = savederrno;
		return -1;
	}

	/* store private data first, then the function pointer */
	knet_h->host_status_change_notify_fn_private_data = host_status_change_notify_fn_private_data;
	knet_h->host_status_change_notify_fn = host_status_change_notify_fn;

	if (knet_h->host_status_change_notify_fn) {
		log_debug(knet_h, KNET_SUB_HOST, "host_status_change_notify_fn enabled");
	} else {
		log_debug(knet_h, KNET_SUB_HOST, "host_status_change_notify_fn disabled");
	}

	pthread_rwlock_unlock(&knet_h->global_rwlock);

	errno = 0;
	return 0;
}
/*
 * Reset all RX tracking state for a host: both dedup circular buffers,
 * the last-seen rx_seq_num and every defrag buffer slot.
 */
static void _clear_cbuffers(struct knet_host *host, seq_num_t rx_seq_num)
{
int i;
memset(host->circular_buffer, 0, KNET_CBUFFER_SIZE);
host->rx_seq_num = rx_seq_num;
memset(host->circular_buffer_defrag, 0, KNET_CBUFFER_SIZE);
- for (i = 0; i < KNET_MAX_LINK; i++) {
+ for (i = 0; i < KNET_DEFRAG_BUFFERS; i++) {
/* zeroing the slot also marks it free (in_use = 0) */
memset(&host->defrag_buf[i], 0, sizeof(struct knet_host_defrag_buf));
}
}
/*
 * Release defrag buffer slots whose packet sequence number has fallen
 * outside the window (seq_num - (KNET_DEFRAG_BUFFERS + 1), seq_num],
 * taking seq_num wrap-around into account.
 *
 * NOTE(review): knet_h is not referenced in this visible body;
 * presumably added for logging/handle access — confirm against callers.
 */
-static void _reclaim_old_defrag_bufs(struct knet_host *host, seq_num_t seq_num)
+static void _reclaim_old_defrag_bufs(knet_handle_t knet_h, struct knet_host *host, seq_num_t seq_num)
{
seq_num_t head, tail; /* seq_num boundaries */
int i;
head = seq_num + 1;
- tail = seq_num - (KNET_MAX_LINK + 1);
+ tail = seq_num - (KNET_DEFRAG_BUFFERS + 1);
/*
* expire old defrag buffers
*/
- for (i = 0; i < KNET_MAX_LINK; i++) {
+ for (i = 0; i < KNET_DEFRAG_BUFFERS; i++) {
if (host->defrag_buf[i].in_use) {
/*
* head has done a rollover to 0+
*/
if (tail > head) {
if ((host->defrag_buf[i].pckt_seq >= head) && (host->defrag_buf[i].pckt_seq <= tail)) {
host->defrag_buf[i].in_use = 0;
}
} else {
if ((host->defrag_buf[i].pckt_seq >= head) || (host->defrag_buf[i].pckt_seq <= tail)){
host->defrag_buf[i].in_use = 0;
}
}
}
}
}
/*
* check if a given packet seq num is in the circular buffers
* defrag_buf = 0 -> use normal cbuf 1 -> use the defrag buffer lookup
*/
/*
 * Returns 1 if seq_num has NOT been seen yet (packet should be processed),
 * 0 if it is a duplicate. Also advances rx_seq_num and clears the portion
 * of the circular buffers between the old and new sequence positions.
 */
-int _seq_num_lookup(struct knet_host *host, seq_num_t seq_num, int defrag_buf, int clear_buf)
+int _seq_num_lookup(knet_handle_t knet_h, struct knet_host *host, seq_num_t seq_num, int defrag_buf, int clear_buf)
{
size_t head, tail; /* circular buffer indexes */
seq_num_t seq_dist;
char *dst_cbuf = host->circular_buffer;
char *dst_cbuf_defrag = host->circular_buffer_defrag;
seq_num_t *dst_seq_num = &host->rx_seq_num;
if (clear_buf) {
_clear_cbuffers(host, seq_num);
}
/* expire stale defrag slots relative to the last accepted seq num */
- _reclaim_old_defrag_bufs(host, seq_num);
+ _reclaim_old_defrag_bufs(knet_h, host, *dst_seq_num);
/* distance between the incoming seq num and the last accepted one,
* computed modulo SEQ_MAX to survive wrap-around */
if (seq_num < *dst_seq_num) {
seq_dist = (SEQ_MAX - seq_num) + *dst_seq_num;
} else {
seq_dist = *dst_seq_num - seq_num;
}
head = seq_num % KNET_CBUFFER_SIZE;
if (seq_dist < KNET_CBUFFER_SIZE) { /* seq num is in ring buffer */
if (!defrag_buf) {
return (dst_cbuf[head] == 0) ? 1 : 0;
} else {
return (dst_cbuf_defrag[head] == 0) ? 1 : 0;
}
} else if (seq_dist <= SEQ_MAX - KNET_CBUFFER_SIZE) {
/* too far ahead to track incrementally: restart both windows here */
memset(dst_cbuf, 0, KNET_CBUFFER_SIZE);
memset(dst_cbuf_defrag, 0, KNET_CBUFFER_SIZE);
*dst_seq_num = seq_num;
}
/* cleaning up circular buffer */
tail = (*dst_seq_num + 1) % KNET_CBUFFER_SIZE;
if (tail > head) {
memset(dst_cbuf + tail, 0, KNET_CBUFFER_SIZE - tail);
memset(dst_cbuf, 0, head + 1);
memset(dst_cbuf_defrag + tail, 0, KNET_CBUFFER_SIZE - tail);
memset(dst_cbuf_defrag, 0, head + 1);
} else {
memset(dst_cbuf + tail, 0, head - tail + 1);
memset(dst_cbuf_defrag + tail, 0, head - tail + 1);
}
*dst_seq_num = seq_num;
return 1;
}
/*
 * Mark seq_num as seen, either in the data dedup window or in the
 * defrag dedup window depending on defrag_buf.
 */
void _seq_num_set(struct knet_host *host, seq_num_t seq_num, int defrag_buf)
{
	char *window = defrag_buf ? host->circular_buffer_defrag : host->circular_buffer;

	window[seq_num % KNET_CBUFFER_SIZE] = 1;
}
/*
 * Ask the dst_link_handler thread to recompute the switching cache for
 * host, by writing its id to the internal dstsockfd socketpair.
 * Non-blocking: returns -1 with errno set if the write fails or would block.
 */
int _host_dstcache_update_async(knet_handle_t knet_h, struct knet_host *host)
{
	int savederrno = 0;
	knet_node_id_t host_id = host->host_id;

	if (sendto(knet_h->dstsockfd[1], &host_id, sizeof(host_id), MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0) != sizeof(host_id)) {
		savederrno = errno;
		/*
		 * message fixed: the fd lives in dstsockfd[], the old text
		 * referenced a no-longer-existing dstpipefd[]
		 */
		log_debug(knet_h, KNET_SUB_HOST, "Unable to write to dstsockfd[1]: %s",
			  strerror(savederrno));
		errno = savederrno;
		return -1;
	}
	return 0;
}
/*
 * Recompute the set of active (usable) links for a host and update its
 * reachability status, invoking the status change callback on transitions.
 * A link is usable when it is enabled, connected and has a valid MTU.
 */
int _host_dstcache_update_sync(knet_handle_t knet_h, struct knet_host *host)
{
int link_idx;
int best_priority = -1;
int reachable = 0;
/* the loopback link to ourselves is always usable */
if (knet_h->host_id == host->host_id && knet_h->has_loop_link) {
host->active_link_entries = 1;
return 0;
}
host->active_link_entries = 0;
for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) {
if (host->link[link_idx].status.enabled != 1) /* link is not enabled */
continue;
if (host->link[link_idx].status.connected != 1) /* link is not connected */
continue;
if (host->link[link_idx].has_valid_mtu != 1) /* link does not have valid MTU */
continue;
if (host->link_handler_policy == KNET_LINK_POLICY_PASSIVE) {
/* for passive we look for the only active link with higher priority */
if (host->link[link_idx].priority > best_priority) {
host->active_links[0] = link_idx;
best_priority = host->link[link_idx].priority;
}
host->active_link_entries = 1;
} else {
/* for RR and ACTIVE we need to copy all available links */
host->active_links[host->active_link_entries] = link_idx;
host->active_link_entries++;
}
}
if (host->link_handler_policy == KNET_LINK_POLICY_PASSIVE) {
log_info(knet_h, KNET_SUB_HOST, "host: %u (passive) best link: %u (pri: %u)",
host->host_id, host->link[host->active_links[0]].link_id,
host->link[host->active_links[0]].priority);
} else {
log_info(knet_h, KNET_SUB_HOST, "host: %u has %u active links",
host->host_id, host->active_link_entries);
}
/* no active links, we can clean the circular buffers and indexes */
if (!host->active_link_entries) {
log_warn(knet_h, KNET_SUB_HOST, "host: %u has no active links", host->host_id);
_clear_cbuffers(host, 0);
} else {
reachable = 1;
}
/* notify the application only on actual reachability transitions */
if (host->status.reachable != reachable) {
host->status.reachable = reachable;
if (knet_h->host_status_change_notify_fn) {
knet_h->host_status_change_notify_fn(
knet_h->host_status_change_notify_fn_private_data,
host->host_id,
host->status.reachable,
host->status.remote,
host->status.external);
}
}
return 0;
}
/*
 * Process onwire versioning info received from a peer: record the peer's
 * version range, then renegotiate the handle-wide onwire_ver as the highest
 * max version common to all known nodes (unless a forced version is set).
 * Fires the onwire version notify callback on changes, outside the mutex.
 */
void _handle_onwire_version(knet_handle_t knet_h, struct knet_host *host, struct knet_header *inbuf)
{
struct knet_host *tmp_host = NULL;
uint8_t onwire_ver = knet_h->onwire_max_ver;
int docallback = 0;
/*
* data we process here are onwire independent
* we are in a global read only lock context, so it's safe to parse host lists
* and we can change onwire_ver using the dedicated mutex
*/
/*
* update current host onwire info
*/
host->onwire_ver = inbuf->kh_version;
host->onwire_max_ver = inbuf->kh_max_ver;
for (tmp_host = knet_h->host_head; tmp_host != NULL; tmp_host = tmp_host->next) {
/*
* do not attempt to change protocol till
* we see all nodes at least once.
*/
if (!tmp_host->onwire_max_ver) {
return;
}
/*
* ignore nodes where max ver is lower than our min ver
* logged as error by thread_rx, we need to make sure to skip it
* during onwire_ver calculation.
*/
if (tmp_host->onwire_max_ver < knet_h->onwire_min_ver) {
continue;
}
/*
* use the highest max_ver common to all known nodes
*/
if (tmp_host->onwire_max_ver < onwire_ver) {
onwire_ver = tmp_host->onwire_max_ver;
}
}
if (pthread_mutex_lock(&knet_h->onwire_mutex)) {
log_debug(knet_h, KNET_SUB_HOST, "Unable to get onwire mutex lock");
return;
}
/* an operator-forced version always wins over negotiation */
if (knet_h->onwire_force_ver) {
onwire_ver = knet_h->onwire_force_ver;
}
if (knet_h->onwire_ver != onwire_ver) {
log_debug(knet_h, KNET_SUB_HOST, "node %u updating onwire version to %u", knet_h->host_id, onwire_ver);
knet_h->onwire_ver = onwire_ver;
docallback = 1;
}
pthread_mutex_unlock(&knet_h->onwire_mutex);
/*
* do the callback outside of locked context and use cached value
* to avoid blocking on locking
*/
if ((docallback) &&
(knet_h->onwire_ver_notify_fn)) {
knet_h->onwire_ver_notify_fn(knet_h->onwire_ver_notify_fn_private_data,
knet_h->onwire_min_ver,
knet_h->onwire_max_ver,
onwire_ver);
}
}
diff --git a/libknet/host.h b/libknet/host.h
index baa56592..ccd5514d 100644
--- a/libknet/host.h
+++ b/libknet/host.h
@@ -1,23 +1,23 @@
/*
* Copyright (C) 2012-2021 Red Hat, Inc. All rights reserved.
*
* Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
* Federico Simoncelli <fsimon@kronosnet.org>
*
* This software licensed under LGPL-2.0+
*/
#ifndef __KNET_HOST_H__
#define __KNET_HOST_H__
#include "internals.h"
-int _seq_num_lookup(struct knet_host *host, seq_num_t seq_num, int defrag_buf, int clear_buf);
+int _seq_num_lookup(knet_handle_t knet_h, struct knet_host *host, seq_num_t seq_num, int defrag_buf, int clear_buf);
void _seq_num_set(struct knet_host *host, seq_num_t seq_num, int defrag_buf);
int _host_dstcache_update_async(knet_handle_t knet_h, struct knet_host *host);
int _host_dstcache_update_sync(knet_handle_t knet_h, struct knet_host *host);
void _handle_onwire_version(knet_handle_t knet_h, struct knet_host *host, struct knet_header *inbuf);
#endif
diff --git a/libknet/internals.h b/libknet/internals.h
index fbb4e8f9..065d732a 100644
--- a/libknet/internals.h
+++ b/libknet/internals.h
@@ -1,462 +1,463 @@
/*
* Copyright (C) 2010-2021 Red Hat, Inc. All rights reserved.
*
* Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
* Federico Simoncelli <fsimon@kronosnet.org>
*
* This software licensed under LGPL-2.0+
*/
#ifndef __KNET_INTERNALS_H__
#define __KNET_INTERNALS_H__
/*
* NOTE: you shouldn't need to include this header normally
*/
#include <pthread.h>
#include <stddef.h>
#include <qb/qblist.h>
#include "libknet.h"
#include "onwire.h"
#include "compat.h"
#include "threads_common.h"
#define KNET_DATABUFSIZE KNET_MAX_PACKET_SIZE + KNET_HEADER_ALL_SIZE
#define KNET_DATABUFSIZE_CRYPT_PAD 1024
#define KNET_DATABUFSIZE_CRYPT KNET_DATABUFSIZE + KNET_DATABUFSIZE_CRYPT_PAD
#define KNET_DATABUFSIZE_COMPRESS_PAD 1024
#define KNET_DATABUFSIZE_COMPRESS KNET_DATABUFSIZE + KNET_DATABUFSIZE_COMPRESS_PAD
#define KNET_RING_RCVBUFF 8388608
#define PCKT_FRAG_MAX UINT8_MAX
#define PCKT_RX_BUFS 512
#define KNET_EPOLL_MAX_EVENTS KNET_DATAFD_MAX + 1
/*
* Size of threads stack. The value was chosen by experimenting with how much is
* needed to successfully finish the test suite; at the time of writing the patch it was
* ~300KiB. To have some room for future enhancement it is increased
* by factor of 3 and rounded.
*/
#define KNET_THREAD_STACK_SIZE (1024 * 1024)
typedef void *knet_transport_link_t; /* per link transport handle */
typedef void *knet_transport_t; /* per knet_h transport handle */
struct knet_transport_ops; /* Forward because of circular dependancy */
/*
 * NOTE(review): mirrors the layout of the (non-portable) struct mmsghdr
 * used by sendmmsg/recvmmsg — confirm against the compat layer.
 */
struct knet_mmsghdr {
struct msghdr msg_hdr; /* Message header */
unsigned int msg_len; /* Number of bytes transmitted */
};
/*
 * Per-link state: addressing, configurable timers/priority, public status
 * and internal scratch data used by the PMTUd and heartbeat threads.
 */
struct knet_link {
/* required */
struct sockaddr_storage src_addr;
struct sockaddr_storage dst_addr;
/* configurable */
unsigned int dynamic; /* see KNET_LINK_DYN_ define above */
uint8_t priority; /* higher priority == preferred for A/P */
unsigned long long ping_interval; /* interval */
unsigned long long pong_timeout; /* timeout */
unsigned long long pong_timeout_adj; /* timeout adjusted for latency */
uint8_t pong_timeout_backoff; /* see link.h for definition */
unsigned int latency_max_samples; /* precision */
uint8_t pong_count; /* how many ping/pong to send/receive before link is up */
uint64_t flags;
/* status */
struct knet_link_status status;
/* internals */
pthread_mutex_t link_stats_mutex; /* used to update link stats */
uint8_t link_id;
uint8_t transport; /* #defined constant from API */
knet_transport_link_t transport_link; /* link_info_t from transport */
int outsock;
unsigned int configured:1; /* set to 1 if src/dst have been configured transport initialized on this link*/
unsigned int transport_connected:1; /* set to 1 if lower level transport is connected */
uint8_t received_pong;
struct timespec ping_last;
/* used by PMTUD thread as temp per-link variables and should always contain the onwire_len value! */
uint32_t proto_overhead; /* IP + UDP/SCTP overhead. NOT to be confused
with stats.proto_overhead that includes also knet headers
and crypto headers */
struct timespec pmtud_last;
uint32_t last_ping_size;
uint32_t last_good_mtu;
uint32_t last_bad_mtu;
uint32_t last_sent_mtu;
uint32_t last_recv_mtu;
uint32_t pmtud_crypto_timeout_multiplier;/* used by PMTUd to adjust timeouts on high loads */
uint8_t has_valid_mtu;
};
+#define KNET_DEFRAG_BUFFERS 32
#define KNET_CBUFFER_SIZE 4096
/*
 * One in-flight packet reassembly slot (see defrag_buf[] in struct knet_host).
 */
struct knet_host_defrag_buf {
char buf[KNET_DATABUFSIZE];
uint8_t in_use; /* 0 buffer is free, 1 is in use */
seq_num_t pckt_seq; /* identify the pckt we are receiving */
uint8_t frag_recv; /* how many frags did we receive */
uint8_t frag_map[PCKT_FRAG_MAX];/* bitmap of which fragments have been received */
uint8_t last_first; /* special case if we receive the last fragment first */
ssize_t frag_size; /* normal frag size (not the last one) */
ssize_t last_frag_size; /* the last fragment might not be aligned with MTU size */
struct timespec last_update; /* keep time of the last pckt */
};
/*
 * Per-peer state: identity, policy/name configuration, reachability status,
 * RX dedup/defrag buffers and the per-link array. Hosts are kept in a
 * singly linked list (next) and in the host_index[] of the handle.
 */
struct knet_host {
/* required */
knet_node_id_t host_id;
/* configurable */
uint8_t link_handler_policy;
char name[KNET_MAX_HOST_LEN];
/* status */
struct knet_host_status status;
/*
* onwire info
*/
uint8_t onwire_ver; /* node current onwire version */
uint8_t onwire_max_ver; /* node supports up to this version */
/* internals */
char circular_buffer[KNET_CBUFFER_SIZE];
seq_num_t rx_seq_num;
seq_num_t untimed_rx_seq_num;
seq_num_t timed_rx_seq_num;
uint8_t got_data;
/* defrag/reassembly buffers */
- struct knet_host_defrag_buf defrag_buf[KNET_MAX_LINK];
+ struct knet_host_defrag_buf defrag_buf[KNET_DEFRAG_BUFFERS];
char circular_buffer_defrag[KNET_CBUFFER_SIZE];
/* link stuff */
struct knet_link link[KNET_MAX_LINK];
uint8_t active_link_entries;
uint8_t active_links[KNET_MAX_LINK];
struct knet_host *next;
};
/*
 * Tracking entry for one application data fd/socket pair.
 */
struct knet_sock {
int sockfd[2]; /* sockfd[0] will always be application facing
* and sockfd[1] internal if sockpair has been created by knet */
int is_socket; /* check if it's a socket for recvmmsg usage */
int is_created; /* knet created this socket and has to clean up on exit/del */
int in_use; /* set to 1 if it's in use, 0 if free */
int has_error; /* set to 1 if there were errors reading from the sock
* and socket has been removed from epoll */
};
/*
 * Bookkeeping for every fd owned by a transport (see knet_transport_fd_tracker[]).
 */
struct knet_fd_trackers {
uint8_t transport; /* transport type (UDP/SCTP...) */
uint8_t data_type; /* internal use for transport to define what data are associated
* with this fd */
socklen_t sockaddr_len; /* Size of sockaddr_in[6] structure for this socket */
void *data; /* pointer to the data */
void *access_list_match_entry_head; /* pointer to access list match_entry list head */
};
#define KNET_MAX_FDS KNET_MAX_HOST * KNET_MAX_LINK * 4
#define KNET_MAX_COMPRESS_METHODS UINT8_MAX
#define KNET_MAX_CRYPTO_INSTANCES 2
/*
 * Extra crypto TX packet counters kept alongside struct knet_handle_stats.
 */
struct knet_handle_stats_extra {
uint64_t tx_crypt_pmtu_packets;
uint64_t tx_crypt_pmtu_reply_packets;
uint64_t tx_crypt_ping_packets;
uint64_t tx_crypt_pong_packets;
};
/*
 * Main per-handle state: fds and epoll sets, host list/index, worker
 * threads and their locks, onwire version negotiation state, crypto and
 * compression buffers, and the application callbacks.
 * global_rwlock protects the overall configuration.
 */
struct knet_handle {
knet_node_id_t host_id;
unsigned int enabled:1;
struct knet_sock sockfd[KNET_DATAFD_MAX + 1];
int logfd;
uint8_t log_levels[KNET_MAX_SUBSYSTEMS];
int dstsockfd[2];
int send_to_links_epollfd;
int recv_from_links_epollfd;
int dst_link_handler_epollfd;
uint8_t use_access_lists; /* set to 0 for disable, 1 for enable */
unsigned int pmtud_interval;
unsigned int manual_mtu;
unsigned int data_mtu; /* contains the max data size that we can send onwire
* without frags */
struct knet_host *host_head;
struct knet_host *host_index[KNET_MAX_HOST];
knet_transport_t transports[KNET_MAX_TRANSPORTS+1];
struct knet_fd_trackers knet_transport_fd_tracker[KNET_MAX_FDS]; /* track status for each fd handled by transports */
struct knet_handle_stats stats;
struct knet_handle_stats_extra stats_extra;
pthread_mutex_t handle_stats_mutex; /* used to protect handle stats */
uint32_t reconnect_int;
knet_node_id_t host_ids[KNET_MAX_HOST];
size_t host_ids_entries;
struct knet_header *recv_from_sock_buf;
struct knet_header *send_to_links_buf[PCKT_FRAG_MAX];
struct knet_header *recv_from_links_buf[PCKT_RX_BUFS];
struct knet_header *pingbuf;
struct knet_header *pmtudbuf;
uint8_t threads_status[KNET_THREAD_MAX];
uint8_t threads_flush_queue[KNET_THREAD_MAX];
useconds_t threads_timer_res;
pthread_mutex_t threads_status_mutex;
pthread_t send_to_links_thread;
pthread_t recv_from_links_thread;
pthread_t heartbt_thread;
pthread_t dst_link_handler_thread;
pthread_t pmtud_link_handler_thread;
pthread_rwlock_t global_rwlock; /* global config lock */
pthread_mutex_t pmtud_mutex; /* pmtud mutex to handle conditional send/recv + timeout */
pthread_cond_t pmtud_cond; /* conditional for above */
pthread_mutex_t tx_mutex; /* used to protect knet_send_sync and TX thread */
pthread_mutex_t hb_mutex; /* used to protect heartbeat thread and seq_num broadcasting */
pthread_mutex_t backoff_mutex; /* used to protect dst_link->pong_timeout_adj */
pthread_mutex_t kmtu_mutex; /* used to protect kernel_mtu */
pthread_mutex_t onwire_mutex; /* used to protect onwire version */
uint8_t onwire_ver; /* currently agreed onwire version across known nodes */
uint8_t onwire_min_ver; /* min and max are constant and don't need any mutex protection. */
uint8_t onwire_max_ver; /* we define them as part of internal handle so that we can mingle with them for testing purposes */
uint8_t onwire_force_ver; /* manually configure onwire_ver */
uint8_t onwire_ver_remap; /* when this is on, all mapping will use version 1 for now */
uint32_t kernel_mtu; /* contains the MTU detected by the kernel on a given link */
int pmtud_waiting;
int pmtud_running;
int pmtud_forcerun;
int pmtud_abort;
struct crypto_instance *crypto_instance[KNET_MAX_CRYPTO_INSTANCES + 1]; /* store an extra pointer to allow 0|1|2 values without too much magic in the code */
uint8_t crypto_in_use_config; /* crypto config to use for TX */
uint8_t crypto_only; /* allow only crypto (1) or also clear (0) traffic */
size_t sec_block_size;
size_t sec_hash_size;
size_t sec_salt_size;
unsigned char *send_to_links_buf_crypt[PCKT_FRAG_MAX];
unsigned char *recv_from_links_buf_crypt;
unsigned char *recv_from_links_buf_decrypt;
unsigned char *pingbuf_crypt;
unsigned char *pmtudbuf_crypt;
int compress_model;
int compress_level;
size_t compress_threshold;
void *compress_int_data[KNET_MAX_COMPRESS_METHODS]; /* for compress method private data */
unsigned char *recv_from_links_buf_decompress;
unsigned char *send_to_links_buf_compress;
seq_num_t tx_seq_num;
pthread_mutex_t tx_seq_num_mutex;
uint8_t has_loop_link;
uint8_t loop_link;
/* application callbacks and their private data pointers */
void *dst_host_filter_fn_private_data;
int (*dst_host_filter_fn) (
void *private_data,
const unsigned char *outdata,
ssize_t outdata_len,
uint8_t tx_rx,
knet_node_id_t this_host_id,
knet_node_id_t src_node_id,
int8_t *channel,
knet_node_id_t *dst_host_ids,
size_t *dst_host_ids_entries);
void *pmtud_notify_fn_private_data;
void (*pmtud_notify_fn) (
void *private_data,
unsigned int data_mtu);
void *host_status_change_notify_fn_private_data;
void (*host_status_change_notify_fn) (
void *private_data,
knet_node_id_t host_id,
uint8_t reachable,
uint8_t remote,
uint8_t external);
void *link_status_change_notify_fn_private_data;
void (*link_status_change_notify_fn) (
void *private_data,
knet_node_id_t host_id,
uint8_t link_id,
uint8_t connected,
uint8_t remote,
uint8_t external);
void *sock_notify_fn_private_data;
void (*sock_notify_fn) (
void *private_data,
int datafd,
int8_t channel,
uint8_t tx_rx,
int error,
int errorno);
void *onwire_ver_notify_fn_private_data;
void (*onwire_ver_notify_fn) (
void *private_data,
uint8_t onwire_min_ver,
uint8_t onwire_max_ver,
uint8_t onwire_ver);
int fini_in_progress;
uint64_t flags;
struct qb_list_head list;
const char *plugin_path;
};
/*
 * Head of the process-wide list of knet handles (see handle_list below).
 */
struct handle_tracker {
struct qb_list_head head;
};
/*
* lib_config stuff shared across everything
*/
extern pthread_rwlock_t shlib_rwlock; /* global shared lib load lock */
extern pthread_mutex_t handle_config_mutex;
extern struct handle_tracker handle_list;
extern uint8_t handle_list_init;
int _is_valid_handle(knet_handle_t knet_h);
int _init_shlib_tracker(knet_handle_t knet_h);
void _fini_shlib_tracker(void);
/*
* NOTE: every single operation must be implementend
* for every protocol.
*/
/*
* for now knet supports only IP protocols (udp/sctp)
* in future there might be others like ARP
* or TIPC.
* keep this around as transport information
* to use for access lists and other operations
*/
#define TRANSPORT_PROTO_LOOPBACK 0
#define TRANSPORT_PROTO_IP_PROTO 1
/*
* some transports like SCTP can filter incoming
* connections before knet has to process
* any packets.
* GENERIC_ACL -> packet has to be read and filtered
* PROTO_ACL -> transport provides filtering at lower levels
* and packet does not need to be processed
*/
typedef enum {
USE_NO_ACL, /* transport performs no access list filtering */
USE_GENERIC_ACL, /* packet has to be read and filtered by knet */
USE_PROTO_ACL /* transport provides filtering at lower levels */
} transport_acl;
/*
* make it easier to map values in transports.c
*/
#define TRANSPORT_PROTO_NOT_CONNECTION_ORIENTED 0
#define TRANSPORT_PROTO_IS_CONNECTION_ORIENTED 1
/*
 * Per-transport operations vtable. Each transport backend provides one
 * instance of this struct; the function pointer comments document the
 * locking context each callback is invoked under.
 */
typedef struct knet_transport_ops {
	/*
	 * transport generic information
	 */
	const char *transport_name;
	const uint8_t transport_id;
	const uint8_t built_in;

	uint8_t transport_protocol;
	transport_acl transport_acl_type;

	/*
	 * connection oriented protocols like SCTP
	 * don't need dst_addr in sendto calls and
	 * on some OSes are considered EINVAL.
	 */
	uint8_t transport_is_connection_oriented;
	uint32_t transport_mtu_overhead;

	/*
	 * transport init must allocate the new transport
	 * and perform all internal initializations
	 * (threads, lists, etc).
	 */
	int (*transport_init)(knet_handle_t knet_h);

	/*
	 * transport free must releases _all_ resources
	 * allocated by transport_init
	 */
	int (*transport_free)(knet_handle_t knet_h);

	/*
	 * link operations should take care of all the
	 * sockets and epoll management for a given link/transport set
	 * transport_link_disable should return err = -1 and errno = EBUSY
	 * if listener is still in use, and any other errno in case
	 * the link cannot be disabled.
	 *
	 * set_config/clear_config are invoked in global write lock context
	 */
	int (*transport_link_set_config)(knet_handle_t knet_h, struct knet_link *link);
	int (*transport_link_clear_config)(knet_handle_t knet_h, struct knet_link *link);

	/*
	 * transport callback for incoming dynamic connections
	 * this is called in global read lock context
	 */
	int (*transport_link_dyn_connect)(knet_handle_t knet_h, int sockfd, struct knet_link *link);

	/*
	 * return the fd to use for access lists
	 */
	int (*transport_link_get_acl_fd)(knet_handle_t knet_h, struct knet_link *link);

	/*
	 * per transport error handling of recvmmsg
	 * (see _handle_recv_from_links comments for details)
	 */

	/*
	 * transport_rx_sock_error is invoked when recvmmsg returns <= 0
	 *
	 * transport_rx_sock_error is invoked with both global_rdlock
	 */
	int (*transport_rx_sock_error)(knet_handle_t knet_h, int sockfd, int recv_err, int recv_errno);

	/*
	 * transport_tx_sock_error is invoked with global_rwlock and
	 * it's invoked when sendto or sendmmsg returns =< 0
	 *
	 * it should return:
	 * -1 on internal error
	 *  0 ignore error and continue
	 *  1 retry
	 *    any sleep or wait action should happen inside the transport code
	 */
	int (*transport_tx_sock_error)(knet_handle_t knet_h, int sockfd, int recv_err, int recv_errno);

	/*
	 * this function is called on _every_ received packet
	 * to verify if the packet is data or internal protocol error handling
	 *
	 * it should return:
	 * -1 on error
	 *  0 packet is not data and we should continue the packet process loop
	 *  1 packet is not data and we should STOP the packet process loop
	 *  2 packet is data and should be parsed as such
	 *
	 * transport_rx_is_data is invoked with both global_rwlock
	 * and fd_tracker read lock (from RX thread)
	 */
	int (*transport_rx_is_data)(knet_handle_t knet_h, int sockfd, struct knet_mmsghdr *msg);

	/*
	 * this function is called by links.c when a link down event is recorded
	 * to notify the transport that packets are not going through, and give
	 * transport the opportunity to take actions.
	 */
	int (*transport_link_is_down)(knet_handle_t knet_h, struct knet_link *link);
} knet_transport_ops_t;
/*
 * simple name <-> numeric value mapping entry
 */
struct pretty_names {
	const char *name;	/* human readable name */
	uint8_t val;		/* numeric value associated with name */
};
#endif
diff --git a/libknet/onwire_v1.c b/libknet/onwire_v1.c
index 071dbeef..d820286e 100644
--- a/libknet/onwire_v1.c
+++ b/libknet/onwire_v1.c
@@ -1,216 +1,216 @@
/*
* Copyright (C) 2020-2021 Red Hat, Inc. All rights reserved.
*
* Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
*
* This software licensed under LGPL-2.0+
*/
#include "config.h"
#include <math.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <pthread.h>
#include <time.h>
#include "logging.h"
#include "host.h"
#include "links.h"
#include "onwire_v1.h"
/*
 * Fill knet_h->pingbuf with a v1 ping for dst_link and report its
 * on-wire size via *outlen.
 *
 * timed distinguishes periodic (timed) pings from untimed ones.
 * clock_now is embedded in the packet and echoed back by the peer
 * (see process_pong_v1).
 *
 * Returns 0 on success, -1 if the tx_seq_num mutex cannot be taken.
 */
int prep_ping_v1(knet_handle_t knet_h, struct knet_link *dst_link, uint8_t onwire_ver, struct timespec clock_now, int timed, ssize_t *outlen)
{
	*outlen = KNET_HEADER_PING_V1_SIZE;

	/* preparing ping buffer */
	knet_h->pingbuf->kh_version = onwire_ver;
	knet_h->pingbuf->kh_max_ver = knet_h->onwire_max_ver;
	knet_h->pingbuf->kh_type = KNET_HEADER_TYPE_PING;
	knet_h->pingbuf->kh_node = htons(knet_h->host_id);
	knet_h->pingbuf->khp_ping_v1_link = dst_link->link_id;
	knet_h->pingbuf->khp_ping_v1_timed = timed;
	memmove(&knet_h->pingbuf->khp_ping_v1_time[0], &clock_now, sizeof(struct timespec));

	/* tx_seq_num is shared with the TX path: protect the read */
	if (pthread_mutex_lock(&knet_h->tx_seq_num_mutex)) {
		log_debug(knet_h, KNET_SUB_HEARTBEAT, "Unable to get seq mutex lock");
		return -1;
	}
	knet_h->pingbuf->khp_ping_v1_seq_num = htons(knet_h->tx_seq_num);
	pthread_mutex_unlock(&knet_h->tx_seq_num_mutex);

	return 0;
}
/*
 * Turn the received ping buffer into a v1 pong reply in place and
 * report the reply's on-wire size via *outlen.
 */
void prep_pong_v1(knet_handle_t knet_h, struct knet_header *inbuf, ssize_t *outlen)
{
	/* reuse the incoming packet: flip the type and stamp our node id */
	inbuf->kh_type = KNET_HEADER_TYPE_PONG;
	inbuf->kh_node = htons(knet_h->host_id);

	*outlen = KNET_HEADER_PING_V1_SIZE;
}
/*
 * Handle an incoming v1 ping: cache its sequence number so the same
 * ping arriving over multiple links is processed only once, and decide
 * whether the receive buffers need wiping.
 */
void process_ping_v1(knet_handle_t knet_h, struct knet_host *src_host, struct knet_link *src_link, struct knet_header *inbuf, ssize_t len)
{
	int wipe_bufs = 0;
	seq_num_t recv_seq_num = ntohs(inbuf->khp_ping_v1_seq_num);

	if (!inbuf->khp_ping_v1_timed) {
		/*
		 * we might be receiving this message from all links, but we want
		 * to process it only the first time
		 */
		if (recv_seq_num != src_host->untimed_rx_seq_num) {
			/*
			 * cache the untimed seq num
			 */
			src_host->untimed_rx_seq_num = recv_seq_num;
			/*
			 * if the host has received data in between
			 * untimed ping, then we don't need to wipe the bufs
			 */
			if (src_host->got_data) {
				src_host->got_data = 0;
				wipe_bufs = 0;
			} else {
				wipe_bufs = 1;
			}
		}
-		_seq_num_lookup(src_host, recv_seq_num, 0, wipe_bufs);
+		_seq_num_lookup(knet_h, src_host, recv_seq_num, 0, wipe_bufs);
	} else {
		/*
		 * pings always arrives in bursts over all the link
		 * catch the first of them to cache the seq num and
		 * avoid duplicate processing
		 */
		if (recv_seq_num != src_host->timed_rx_seq_num) {
			src_host->timed_rx_seq_num = recv_seq_num;
			if (recv_seq_num == 0) {
-				_seq_num_lookup(src_host, recv_seq_num, 0, 1);
+				_seq_num_lookup(knet_h, src_host, recv_seq_num, 0, 1);
			}
		}
	}
}
/*
 * Extract the timestamp echoed back in a v1 pong (the one embedded in
 * the original ping by prep_ping_v1).
 */
void process_pong_v1(knet_handle_t knet_h, struct knet_host *src_host, struct knet_link *src_link, struct knet_header *inbuf, struct timespec *recvtime)
{
	memmove(recvtime, &inbuf->khp_ping_v1_time[0], sizeof(struct timespec));
}
/*
 * Map a v1 ping/pong packet to the local link it refers to, via the
 * link id carried in the packet header.
 */
struct knet_link *get_link_from_pong_v1(knet_handle_t knet_h, struct knet_host *src_host, struct knet_header *inbuf)
{
	return &src_host->link[inbuf->khp_ping_v1_link];
}
/*
 * Fill knet_h->pmtudbuf with a v1 PMTUd probe header for dst_link.
 * onwire_len is the probe size advertised in khp_pmtud_v1_size.
 */
void prep_pmtud_v1(knet_handle_t knet_h, struct knet_link *dst_link, uint8_t onwire_ver, size_t onwire_len)
{
	knet_h->pmtudbuf->kh_version = onwire_ver;
	knet_h->pmtudbuf->kh_max_ver = knet_h->onwire_max_ver;
	knet_h->pmtudbuf->kh_type = KNET_HEADER_TYPE_PMTUD;
	knet_h->pmtudbuf->kh_node = htons(knet_h->host_id);
	knet_h->pmtudbuf->khp_pmtud_v1_link = dst_link->link_id;
	knet_h->pmtudbuf->khp_pmtud_v1_size = onwire_len;
}
/*
 * Turn a received v1 PMTUd probe into its reply in place and report
 * the reply's on-wire size via *outlen.
 */
void prep_pmtud_reply_v1(knet_handle_t knet_h, struct knet_header *inbuf, ssize_t *outlen)
{
	/* reuse the incoming buffer: flip the type and stamp our node id */
	inbuf->kh_type = KNET_HEADER_TYPE_PMTUD_REPLY;
	inbuf->kh_node = htons(knet_h->host_id);

	*outlen = KNET_HEADER_PMTUD_V1_SIZE;
}
/*
 * Record the probe size confirmed by the peer in a v1 PMTUd reply.
 */
void process_pmtud_reply_v1(knet_handle_t knet_h, struct knet_link *src_link, struct knet_header *inbuf)
{
	src_link->last_recv_mtu = inbuf->khp_pmtud_v1_size;
}
/*
 * Build the v1 onwire header(s) and iovecs for an outgoing data packet,
 * splitting the payload into fragments of temp_data_mtu bytes each
 * (the last fragment may be smaller).
 *
 * On return *msgs_to_send holds the fragment count and *iovcnt_out the
 * number of iovecs per message (2 when fragmenting: header + payload
 * slice; 1 for a single contiguous buffer).
 */
void prep_tx_bufs_v1(knet_handle_t knet_h,
		     struct knet_header *inbuf, unsigned char *data, size_t inlen, unsigned int temp_data_mtu,
		     seq_num_t tx_seq_num, int8_t channel, int bcast, int data_compressed,
		     int *msgs_to_send, struct iovec iov_out[PCKT_FRAG_MAX][2], int *iovcnt_out)
{
	uint8_t frag_idx = 0;
	size_t frag_len = inlen;

	/*
	 * prepare the main header
	 */
	inbuf->kh_type = KNET_HEADER_TYPE_DATA;
	inbuf->kh_version = 1;
	inbuf->kh_max_ver = knet_h->onwire_max_ver;
	inbuf->kh_node = htons(knet_h->host_id);

	/*
	 * prepare the data header
	 */
	inbuf->khp_data_v1_frag_seq = 0;
	inbuf->khp_data_v1_bcast = bcast;
	/* number of fragments needed to fit inlen at the current data MTU */
	inbuf->khp_data_v1_frag_num = ceil((float)inlen / temp_data_mtu);
	inbuf->khp_data_v1_channel = channel;
	inbuf->khp_data_v1_seq_num = htons(tx_seq_num);
	if (data_compressed) {
		inbuf->khp_data_v1_compress = knet_h->compress_model;
	} else {
		inbuf->khp_data_v1_compress = 0;
	}

	/*
	 * handle fragmentation
	 */
	if (inbuf->khp_data_v1_frag_num > 1) {
		while (frag_idx < inbuf->khp_data_v1_frag_num) {
			/*
			 * set the iov_base
			 */
			iov_out[frag_idx][0].iov_base = (void *)knet_h->send_to_links_buf[frag_idx];
			iov_out[frag_idx][0].iov_len = KNET_HEADER_DATA_V1_SIZE;
			iov_out[frag_idx][1].iov_base = data + (temp_data_mtu * frag_idx);

			/*
			 * set the len
			 */
			if (frag_len > temp_data_mtu) {
				iov_out[frag_idx][1].iov_len = temp_data_mtu;
			} else {
				iov_out[frag_idx][1].iov_len = frag_len;
			}

			/*
			 * copy the frag info on all buffers
			 */
			memmove(knet_h->send_to_links_buf[frag_idx], inbuf, KNET_HEADER_DATA_V1_SIZE);
			/*
			 * bump the frag (frag_seq is 1-based on the wire)
			 */
			knet_h->send_to_links_buf[frag_idx]->khp_data_v1_frag_seq = frag_idx + 1;

			frag_len = frag_len - temp_data_mtu;
			frag_idx++;
		}
		*iovcnt_out = 2;
	} else {
		iov_out[frag_idx][0].iov_base = (void *)inbuf;
		iov_out[frag_idx][0].iov_len = frag_len + KNET_HEADER_DATA_V1_SIZE;
		*iovcnt_out = 1;
	}
	*msgs_to_send = inbuf->khp_data_v1_frag_num;
}
/*
 * Return a pointer to the user payload of a v1 data packet.
 */
unsigned char *get_data_v1(knet_handle_t knet_h, struct knet_header *inbuf)
{
	return inbuf->khp_data_v1_userdata;
}
/*
 * Extract the v1 data-header fields needed by the RX path: header
 * size, channel, host-order sequence number, compression model, total
 * fragment count and this fragment's 1-based sequence.
 */
void get_data_header_info_v1(knet_handle_t knet_h, struct knet_header *inbuf,
			     ssize_t *header_size, int8_t *channel,
			     seq_num_t *seq_num, uint8_t *decompress_type,
			     uint8_t *frags, uint8_t *frag_seq)
{
	*header_size = KNET_HEADER_DATA_V1_SIZE;
	*channel = inbuf->khp_data_v1_channel;
	*seq_num = ntohs(inbuf->khp_data_v1_seq_num);
	*decompress_type = inbuf->khp_data_v1_compress;
	*frags = inbuf->khp_data_v1_frag_num;
	*frag_seq = inbuf->khp_data_v1_frag_seq;
}
diff --git a/libknet/threads_rx.c b/libknet/threads_rx.c
index 59a9a42d..7a097475 100644
--- a/libknet/threads_rx.c
+++ b/libknet/threads_rx.c
@@ -1,956 +1,956 @@
/*
* Copyright (C) 2012-2021 Red Hat, Inc. All rights reserved.
*
* Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
* Federico Simoncelli <fsimon@kronosnet.org>
*
* This software licensed under LGPL-2.0+
*/
#include "config.h"
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <sys/uio.h>
#include <pthread.h>
#include "compat.h"
#include "compress.h"
#include "crypto.h"
#include "host.h"
#include "links.h"
#include "links_acl.h"
#include "logging.h"
#include "transports.h"
#include "transport_common.h"
#include "threads_common.h"
#include "threads_heartbeat.h"
#include "threads_pmtud.h"
#include "threads_rx.h"
#include "netutils.h"
#include "onwire_v1.h"
/*
* RECV
*/
/*
* return 1 if a > b
* return -1 if b > a
* return 0 if they are equal
*/
/*
 * Compare two timestamps.
 *
 * Returns 1 when a is later than b, -1 when b is later than a and
 * 0 when they are identical. Seconds dominate; nanoseconds break ties.
 */
static inline int _timecmp(struct timespec a, struct timespec b)
{
	if (a.tv_sec > b.tv_sec) {
		return 1;
	}
	if (a.tv_sec < b.tv_sec) {
		return -1;
	}
	if (a.tv_nsec > b.tv_nsec) {
		return 1;
	}
	if (a.tv_nsec < b.tv_nsec) {
		return -1;
	}
	return 0;
}
/*
* this functions needs to return an index (0 to 7)
* to a knet_host_defrag_buf. (-1 on errors)
*/
static int _find_pckt_defrag_buf(knet_handle_t knet_h, struct knet_host *src_host, seq_num_t seq_num)
{
	int i, oldest;

	/*
	 * check if there is a buffer already in use handling the same seq_num
	 */
-	for (i = 0; i < KNET_MAX_LINK; i++) {
+	for (i = 0; i < KNET_DEFRAG_BUFFERS; i++) {
		if (src_host->defrag_buf[i].in_use) {
			if (src_host->defrag_buf[i].pckt_seq == seq_num) {
				return i;
			}
		}
	}

	/*
	 * If there is no buffer that's handling the current seq_num
	 * either it's new or it's been reclaimed already.
	 * check if it's been reclaimed/seen before using the defrag circular
	 * buffer. If the pckt has been seen before, the buffer expired (ETIME)
	 * and there is no point to try to defrag it again.
	 */
-	if (!_seq_num_lookup(src_host, seq_num, 1, 0)) {
+	if (!_seq_num_lookup(knet_h, src_host, seq_num, 1, 0)) {
		errno = ETIME;
		return -1;
	}

	/*
	 * register the pckt as seen
	 */
	_seq_num_set(src_host, seq_num, 1);

	/*
	 * see if there is a free buffer
	 */
-	for (i = 0; i < KNET_MAX_LINK; i++) {
+	for (i = 0; i < KNET_DEFRAG_BUFFERS; i++) {
		if (!src_host->defrag_buf[i].in_use) {
			return i;
		}
	}

	/*
	 * at this point, there are no free buffers, the pckt is new
	 * and we need to reclaim a buffer, and we will take the one
	 * with the oldest timestamp. It's as good as any.
	 */
	oldest = 0;

-	for (i = 0; i < KNET_MAX_LINK; i++) {
+	for (i = 0; i < KNET_DEFRAG_BUFFERS; i++) {
		if (_timecmp(src_host->defrag_buf[i].last_update, src_host->defrag_buf[oldest].last_update) < 0) {
			oldest = i;
		}
	}
	/* evict the stalest entry and hand it to the caller */
	src_host->defrag_buf[oldest].in_use = 0;
	return oldest;
}
/*
 * Reassemble one fragment of a multi-fragment data packet into the
 * per-host defrag buffer identified by seq_num.
 *
 * Returns 0 when the packet is complete (data/len now hold the full
 * reassembled payload) and 1 when more fragments are still missing,
 * the fragment is a duplicate, or the defrag buffer has expired.
 */
static int _pckt_defrag(knet_handle_t knet_h, struct knet_host *src_host, seq_num_t seq_num, unsigned char *data, ssize_t *len, uint8_t frags, uint8_t frag_seq)
{
	struct knet_host_defrag_buf *defrag_buf;
	int defrag_buf_idx;

	defrag_buf_idx = _find_pckt_defrag_buf(knet_h, src_host, seq_num);
	if (defrag_buf_idx < 0) {
		return 1;
	}

	defrag_buf = &src_host->defrag_buf[defrag_buf_idx];

	/*
	 * if the buf is not in use, then make sure it's clean
	 */
	if (!defrag_buf->in_use) {
		memset(defrag_buf, 0, sizeof(struct knet_host_defrag_buf));
		defrag_buf->in_use = 1;
		defrag_buf->pckt_seq = seq_num;
	}

	/*
	 * update timestamp on the buffer
	 */
	clock_gettime(CLOCK_MONOTONIC, &defrag_buf->last_update);

	/*
	 * check if we already received this fragment
	 */
	if (defrag_buf->frag_map[frag_seq]) {
		/*
		 * if we have received this fragment and we didn't clear the buffer
		 * it means that we don't have all fragments yet
		 */
		return 1;
	}

	/*
	 * we need to handle the last packet with gloves due to its different size
	 */
	if (frag_seq == frags) {
		defrag_buf->last_frag_size = *len;

		/*
		 * in the event when the last packet arrives first,
		 * we still don't know the offset vs the other fragments (based on MTU),
		 * so we store the fragment at the end of the buffer where it's safe
		 * and take a copy of the len so that we can restore its offset later.
		 * remember we can't use the local MTU for this calculation because pMTU
		 * can be asymmetric between the same hosts.
		 */
		if (!defrag_buf->frag_size) {
			defrag_buf->last_first = 1;
			memmove(defrag_buf->buf + (KNET_MAX_PACKET_SIZE - *len),
			       data,
			       *len);
		}
	} else {
		/* all non-last fragments share the same size */
		defrag_buf->frag_size = *len;
	}

	/* place the fragment at its offset (only possible once frag_size is known) */
	if (defrag_buf->frag_size) {
		memmove(defrag_buf->buf + ((frag_seq - 1) * defrag_buf->frag_size),
			data, *len);
	}

	defrag_buf->frag_recv++;
	defrag_buf->frag_map[frag_seq] = 1;

	/*
	 * check if we received all the fragments
	 */
	if (defrag_buf->frag_recv == frags) {
		/*
		 * special case the last pckt
		 */
		if (defrag_buf->last_first) {
			memmove(defrag_buf->buf + ((frags - 1) * defrag_buf->frag_size),
				defrag_buf->buf + (KNET_MAX_PACKET_SIZE - defrag_buf->last_frag_size),
				defrag_buf->last_frag_size);
		}

		/*
		 * recalculate packet length
		 */
		*len = ((frags - 1) * defrag_buf->frag_size) + defrag_buf->last_frag_size;

		/*
		 * copy the pckt back in the user data
		 */
		memmove(data, defrag_buf->buf, *len);

		/*
		 * free this buffer
		 */
		defrag_buf->in_use = 0;
		return 0;
	}

	return 1;
}
/*
 * Update per-link RX data statistics and, when the packet was
 * decrypted (decrypt_time != 0), fold the decryption time into the
 * handle-wide crypto stats under handle_stats_mutex.
 *
 * Returns 0 on success, -1 if the stats mutex cannot be taken.
 */
static int _handle_data_stats(knet_handle_t knet_h, struct knet_link *src_link, ssize_t len, uint64_t decrypt_time)
{
	int stats_err;

	/* data stats at the top for consistency with TX */
	src_link->status.stats.rx_data_packets++;
	src_link->status.stats.rx_data_bytes += len;

	if (decrypt_time) {
		stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex);
		/*
		 * pthread_mutex_lock() returns 0 on success or a positive
		 * errno value on failure, never a negative value, so the
		 * previous "stats_err < 0" test could never detect a
		 * locking failure.
		 */
		if (stats_err) {
			log_err(knet_h, KNET_SUB_RX, "Unable to get mutex lock: %s", strerror(stats_err));
			return -1;
		}
		/* Only update the crypto overhead for data packets. Mainly to be
		   consistent with TX */
		if (decrypt_time < knet_h->stats.rx_crypt_time_min) {
			knet_h->stats.rx_crypt_time_min = decrypt_time;
		}
		if (decrypt_time > knet_h->stats.rx_crypt_time_max) {
			knet_h->stats.rx_crypt_time_max = decrypt_time;
		}
		/* running average over all decrypted packets so far */
		knet_h->stats.rx_crypt_time_ave =
			(knet_h->stats.rx_crypt_time_ave * knet_h->stats.rx_crypt_packets +
			 decrypt_time) / (knet_h->stats.rx_crypt_packets+1);
		knet_h->stats.rx_crypt_packets++;
		pthread_mutex_unlock(&knet_h->handle_stats_mutex);
	}
	return 0;
}
/*
 * Decompress the payload of a received packet in place when it was
 * sent compressed (decompress_type != 0), updating *len to the
 * decompressed payload size plus the onwire header, and recording
 * compression statistics.
 *
 * Returns 0 on success (or when no decompression was required),
 * -1 on decompression or lock failure.
 */
static int _decompress_data(knet_handle_t knet_h, uint8_t decompress_type, unsigned char *data, ssize_t *len, ssize_t header_size)
{
	int err = 0, stats_err = 0;

	if (decompress_type) {
		ssize_t decmp_outlen = KNET_DATABUFSIZE_COMPRESS;
		struct timespec start_time;
		struct timespec end_time;
		uint64_t decompress_time;

		clock_gettime(CLOCK_MONOTONIC, &start_time);
		err = decompress(knet_h, decompress_type,
				 data,
				 *len - header_size,
				 knet_h->recv_from_links_buf_decompress,
				 &decmp_outlen);
		clock_gettime(CLOCK_MONOTONIC, &end_time);
		timespec_diff(start_time, end_time, &decompress_time);

		stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex);
		/*
		 * pthread_mutex_lock() returns 0 or a positive errno value,
		 * never a negative one: the previous "stats_err < 0" test
		 * could never detect a locking failure.
		 */
		if (stats_err) {
			log_err(knet_h, KNET_SUB_RX, "Unable to get mutex lock: %s", strerror(stats_err));
			return -1;
		}

		if (!err) {
			/* Collect stats */
			if (decompress_time < knet_h->stats.rx_compress_time_min) {
				knet_h->stats.rx_compress_time_min = decompress_time;
			}
			if (decompress_time > knet_h->stats.rx_compress_time_max) {
				knet_h->stats.rx_compress_time_max = decompress_time;
			}
			knet_h->stats.rx_compress_time_ave =
				(knet_h->stats.rx_compress_time_ave * knet_h->stats.rx_compressed_packets +
				 decompress_time) / (knet_h->stats.rx_compressed_packets+1);

			knet_h->stats.rx_compressed_packets++;
			knet_h->stats.rx_compressed_original_bytes += decmp_outlen;
			knet_h->stats.rx_compressed_size_bytes += *len - KNET_HEADER_SIZE;

			/* swap the decompressed payload back into the caller's buffer */
			memmove(data, knet_h->recv_from_links_buf_decompress, decmp_outlen);
			*len = decmp_outlen + header_size;
		} else {
			knet_h->stats.rx_failed_to_decompress++;
			pthread_mutex_unlock(&knet_h->handle_stats_mutex);
			log_warn(knet_h, KNET_SUB_COMPRESS, "Unable to decompress packet (%d): %s",
				 err, strerror(errno));
			return -1;
		}
		pthread_mutex_unlock(&knet_h->handle_stats_mutex);
	}
	return 0;
}
/*
 * Apply the user-supplied dst_host_filter_fn (when configured) to an
 * incoming data packet to decide whether this node is a destination.
 *
 * Returns 0 when the packet should be delivered locally, -1 when it
 * must be dropped (filter error, no destinations, or not for us).
 * *channel may be updated by the filter callback.
 */
static int _check_destination(knet_handle_t knet_h, struct knet_header *inbuf, unsigned char *data, ssize_t len, ssize_t header_size, int8_t *channel)
{
	knet_node_id_t dst_host_ids[KNET_MAX_HOST];
	size_t dst_host_ids_entries = 0;
	int bcast = 1;
	size_t host_idx;
	int found = 0;

	if (knet_h->dst_host_filter_fn) {
		bcast = knet_h->dst_host_filter_fn(
				knet_h->dst_host_filter_fn_private_data,
				data,
				len - header_size,
				KNET_NOTIFY_RX,
				knet_h->host_id,
				inbuf->kh_node,
				channel,
				dst_host_ids,
				&dst_host_ids_entries);
		if (bcast < 0) {
			log_debug(knet_h, KNET_SUB_RX, "Error from dst_host_filter_fn: %d", bcast);
			return -1;
		}

		if ((!bcast) && (!dst_host_ids_entries)) {
			log_debug(knet_h, KNET_SUB_RX, "Message is unicast but no dst_host_ids_entries");
			return -1;
		}

		/* check if we are dst for this packet */
		if (!bcast) {
			if (dst_host_ids_entries > KNET_MAX_HOST) {
				log_debug(knet_h, KNET_SUB_RX, "dst_host_filter_fn returned too many destinations");
				return -1;
			}
			for (host_idx = 0; host_idx < dst_host_ids_entries; host_idx++) {
				if (dst_host_ids[host_idx] == knet_h->host_id) {
					found = 1;
					break;
				}
			}
			if (!found) {
				log_debug(knet_h, KNET_SUB_RX, "Packet is not for us");
				return -1;
			}
		}
	}

	return 0;
}
/*
 * Write the payload of a received packet to the application socket for
 * the given channel, retrying on short writes.
 *
 * Returns 0 on success, -1 on error (the sock_notify_fn callback is
 * invoked when writev fails outright).
 */
static int _deliver_data(knet_handle_t knet_h, unsigned char *data, ssize_t len, ssize_t header_size, int8_t channel)
{
	struct iovec iov_out[1];
	ssize_t outlen = 0;

	memset(iov_out, 0, sizeof(iov_out));

retry:
	/*
	 * on a short write, outlen is the progress made so far.
	 * NOTE(review): outlen is overwritten by the next writev, so on a
	 * second consecutive short write the earlier progress is lost -
	 * verify against upstream whether this can recur in practice.
	 */
	iov_out[0].iov_base = (void *) data + outlen;
	iov_out[0].iov_len = len - (outlen + header_size);

	outlen = writev(knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], iov_out, 1);
	if ((outlen > 0) && (outlen < (ssize_t)iov_out[0].iov_len)) {
		log_debug(knet_h, KNET_SUB_RX,
			  "Unable to send all data to the application in one go. Expected: %zu Sent: %zd\n",
			  iov_out[0].iov_len, outlen);
		goto retry;
	}

	if (outlen <= 0) {
		knet_h->sock_notify_fn(knet_h->sock_notify_fn_private_data,
				       knet_h->sockfd[channel].sockfd[0],
				       channel,
				       KNET_NOTIFY_RX,
				       outlen,
				       errno);
		return -1;
	}
	if ((size_t)outlen != iov_out[0].iov_len) {
		return -1;
	}
	return 0;
}
/*
 * Process one incoming data packet: stats accounting, duplicate
 * suppression, defragmentation, decompression and final delivery to
 * the application channel socket.
 */
static void _process_data(knet_handle_t knet_h, struct knet_host *src_host, struct knet_link *src_link, struct knet_header *inbuf, ssize_t len, uint64_t decrypt_time)
{
	int8_t channel;
	uint8_t decompress_type = 0;
	ssize_t header_size;
	seq_num_t seq_num;
	uint8_t frags, frag_seq;
	unsigned char *data;

	if (_handle_data_stats(knet_h, src_link, len, decrypt_time) < 0) {
		return;
	}

	/*
	 * register host is sending data. Required to determine if we need
	 * to reset circular buffers. (see onwire_v1.c)
	 */
	src_host->got_data = 1;

	/* extract header fields/payload pointer for the negotiated onwire version */
	if (knet_h->onwire_ver_remap) {
		get_data_header_info_v1(knet_h, inbuf, &header_size, &channel, &seq_num, &decompress_type, &frags, &frag_seq);
		data = get_data_v1(knet_h, inbuf);
	} else {
		switch (inbuf->kh_version) {
			case 1:
				get_data_header_info_v1(knet_h, inbuf, &header_size, &channel, &seq_num, &decompress_type, &frags, &frag_seq);
				data = get_data_v1(knet_h, inbuf);
				break;
			default:
				log_warn(knet_h, KNET_SUB_RX, "processing data onwire version %u not supported", inbuf->kh_version);
				return;
				break;
		}
	}

	/* drop packets we have already seen/delivered */
-	if (!_seq_num_lookup(src_host, seq_num, 0, 0)) {
+	if (!_seq_num_lookup(knet_h, src_host, seq_num, 0, 0)) {
		if (src_host->link_handler_policy != KNET_LINK_POLICY_ACTIVE) {
			log_debug(knet_h, KNET_SUB_RX, "Packet has already been delivered");
		}
		return;
	}

	if (frags > 1) {
		/*
		 * len as received from the socket also includes extra stuff
		 * that the defrag code doesn't care about. So strip it
		 * here and readd only for repadding once we are done
		 * defragging
		 *
		 * the defrag code assumes that data packets have all the same size
		 * except the last one that might be smaller.
		 *
		 */
		len = len - header_size;
		if (_pckt_defrag(knet_h, src_host, seq_num, data, &len, frags, frag_seq)) {
			return;
		}
		len = len + header_size;
	}

	if (_decompress_data(knet_h, decompress_type, data, &len, header_size) < 0) {
		return;
	}

	if (!src_host->status.reachable) {
		log_debug(knet_h, KNET_SUB_RX, "Source host %u not reachable yet. Discarding packet.", src_host->host_id);
		return;
	}

	if (knet_h->enabled != 1) /* data forward is disabled */
		return;

	if (_check_destination(knet_h, inbuf, data, len, header_size, &channel) < 0) {
		return;
	}

	if (!knet_h->sockfd[channel].in_use) {
		log_debug(knet_h, KNET_SUB_RX,
			  "received packet for channel %d but there is no local sock connected",
			  channel);
		return;
	}

	if (_deliver_data(knet_h, data, len, header_size, channel) < 0) {
		return;
	}

	/* mark the packet as delivered only after a successful write */
	_seq_num_set(src_host, seq_num, 0);
}
/*
 * Authenticate and decrypt an incoming packet when at least one crypto
 * instance is configured.
 *
 * Returns the header to keep processing - either the original buffer
 * (clear traffic) or the decrypt buffer with *len and *decrypt_time
 * updated - or NULL when the packet must be dropped.
 */
static struct knet_header *_decrypt_packet(knet_handle_t knet_h, struct knet_header *inbuf, ssize_t *len, uint64_t *decrypt_time)
{
	int try_decrypt = 0;
	int i = 0;
	struct timespec start_time;
	struct timespec end_time;
	ssize_t outlen;

	/* any configured crypto instance means we must attempt decryption */
	for (i = 1; i <= KNET_MAX_CRYPTO_INSTANCES; i++) {
		if (knet_h->crypto_instance[i]) {
			try_decrypt = 1;
			break;
		}
	}

	if ((!try_decrypt) && (knet_h->crypto_only == KNET_CRYPTO_RX_DISALLOW_CLEAR_TRAFFIC)) {
		log_debug(knet_h, KNET_SUB_RX, "RX thread configured to accept only crypto packets, but no crypto configs are configured!");
		return NULL;
	}

	if (try_decrypt) {
		clock_gettime(CLOCK_MONOTONIC, &start_time);
		if (crypto_authenticate_and_decrypt(knet_h,
						    (unsigned char *)inbuf,
						    *len,
						    knet_h->recv_from_links_buf_decrypt,
						    &outlen) < 0) {
			log_debug(knet_h, KNET_SUB_RX, "Unable to decrypt/auth packet");
			if (knet_h->crypto_only == KNET_CRYPTO_RX_DISALLOW_CLEAR_TRAFFIC) {
				return NULL;
			}
			/* clear traffic is allowed: fall through with the original buffer */
			log_debug(knet_h, KNET_SUB_RX, "Attempting to process packet as clear data");
		} else {
			clock_gettime(CLOCK_MONOTONIC, &end_time);
			timespec_diff(start_time, end_time, decrypt_time);
			*len = outlen;
			inbuf = (struct knet_header *)knet_h->recv_from_links_buf_decrypt;
		}
	}

	return inbuf;
}
/*
 * Sanity check a received packet: minimum length and onwire version
 * compatibility between the sender and this node.
 *
 * Returns 0 when the packet can be processed, -1 when it must be
 * rejected.
 */
static int _packet_checks(knet_handle_t knet_h, struct knet_header *inbuf, ssize_t len)
{
	if (len < (ssize_t)(KNET_HEADER_SIZE + 1)) {
		log_debug(knet_h, KNET_SUB_RX, "Packet is too short: %ld", (long)len);
		return -1;
	}

	/*
	 * old versions of knet did not advertise max_ver and max_ver is set to 0.
	 */
	if (!inbuf->kh_max_ver) {
		inbuf->kh_max_ver = 1;
	}

	/*
	 * if the node joining max version is lower than the min version
	 * then we reject the node
	 *
	 * NOTE(review): the message below prints kh_version while the check
	 * is on kh_max_ver - verify whether kh_max_ver was intended here.
	 */
	if (inbuf->kh_max_ver < knet_h->onwire_min_ver) {
		log_warn(knet_h, KNET_SUB_RX,
			 "Received packet version %u from node %u, lower than currently minimal supported onwire version. Rejecting.", inbuf->kh_version, inbuf->kh_node);
		return -1;
	}

	/*
	 * if the node joining with version higher than our max version
	 * then we reject the node
	 */
	if (inbuf->kh_version > knet_h->onwire_max_ver) {
		log_warn(knet_h, KNET_SUB_RX,
			 "Received packet version %u from node %u, higher than currently maximum supported onwire version. Rejecting.", inbuf->kh_version, inbuf->kh_node);
		return -1;
	}

	/*
	 * if the node joining with version lower than the current in use version
	 * then we reject the node
	 *
	 * NOTE: should we make this configurable and support downgrades?
	 */
	if ((!knet_h->onwire_force_ver) &&
	    (inbuf->kh_version < knet_h->onwire_ver) &&
	    (inbuf->kh_max_ver > inbuf->kh_version)) {
		log_warn(knet_h, KNET_SUB_RX,
			 "Received packet version %u from node %u, lower than currently in use onwire version. Rejecting.", inbuf->kh_version, inbuf->kh_node);
		return -1;
	}

	return 0;
}
/*
 * For links configured with a dynamic peer address (KNET_LINK_DYNIP),
 * detect peer address changes observed on incoming packets, update the
 * cached address/strings, and let the transport accept the dynamic
 * connection.
 */
static void _handle_dynip(knet_handle_t knet_h, struct knet_host *src_host, struct knet_link *src_link, int sockfd, const struct knet_mmsghdr *msg)
{
	if (src_link->dynamic == KNET_LINK_DYNIP) {
		if (cmpaddr(&src_link->dst_addr, msg->msg_hdr.msg_name) != 0) {
			log_debug(knet_h, KNET_SUB_RX, "host: %u link: %u appears to have changed ip address",
				  src_host->host_id, src_link->link_id);
			memmove(&src_link->dst_addr, msg->msg_hdr.msg_name, sizeof(struct sockaddr_storage));
			if (knet_addrtostr(&src_link->dst_addr, sockaddr_len(&src_link->dst_addr),
					   src_link->status.dst_ipaddr, KNET_MAX_HOST_LEN,
					   src_link->status.dst_port, KNET_MAX_PORT_LEN) != 0) {
				/* keep the status strings meaningful even on resolve failure */
				log_debug(knet_h, KNET_SUB_RX, "Unable to resolve ???");
				snprintf(src_link->status.dst_ipaddr, KNET_MAX_HOST_LEN - 1, "Unknown!!!");
				snprintf(src_link->status.dst_port, KNET_MAX_PORT_LEN - 1, "??");
			} else {
				log_info(knet_h, KNET_SUB_RX,
					 "host: %u link: %u new connection established from: %s %s",
					 src_host->host_id, src_link->link_id,
					 src_link->status.dst_ipaddr, src_link->status.dst_port);
			}
		}
		/*
		 * transport has already accepted the connection here
		 * otherwise we would not be receiving packets
		 */
		transport_link_dyn_connect(knet_h, sockfd, src_link);
	}
}
/*
 * Validate, classify and dispatch one packet received from a link:
 * decrypt (when crypto is configured), sanity check, attribute the
 * packet to a source host/link, then route it to the handler matching
 * its header type.
 */
static void _parse_recv_from_links(knet_handle_t knet_h, int sockfd, const struct knet_mmsghdr *msg)
{
	int stats_err = 0;
	struct knet_host *src_host;
	struct knet_link *src_link;
	uint64_t decrypt_time = 0;
	struct knet_header *inbuf = msg->msg_hdr.msg_iov->iov_base;
	ssize_t len = msg->msg_len;
	int i, found_link = 0;

	inbuf = _decrypt_packet(knet_h, inbuf, &len, &decrypt_time);
	if (!inbuf) {
		return;
	}

	inbuf->kh_node = ntohs(inbuf->kh_node);

	if (_packet_checks(knet_h, inbuf, len) < 0) {
		return;
	}

	/*
	 * determine source host
	 */
	src_host = knet_h->host_index[inbuf->kh_node];
	if (src_host == NULL) { /* host not found */
		log_debug(knet_h, KNET_SUB_RX, "Unable to find source host for this packet");
		return;
	}

	/*
	 * determine source link
	 */
	if (inbuf->kh_type == KNET_HEADER_TYPE_PING) {
		_handle_onwire_version(knet_h, src_host, inbuf);
		if (knet_h->onwire_ver_remap) {
			src_link = get_link_from_pong_v1(knet_h, src_host, inbuf);
		} else {
			switch (inbuf->kh_version) {
				case 1:
					src_link = get_link_from_pong_v1(knet_h, src_host, inbuf);
					break;
				default:
					log_warn(knet_h, KNET_SUB_RX, "Parsing ping onwire version %u not supported", inbuf->kh_version);
					return;
					break;
			}
		}
		_handle_dynip(knet_h, src_host, src_link, sockfd, msg);
	} else { /* all other packets */
		for (i = 0; i < KNET_MAX_LINK; i++) {
			src_link = &src_host->link[i];
			if (cmpaddr(&src_link->dst_addr, msg->msg_hdr.msg_name) == 0) {
				found_link = 1;
				break;
			}
		}
		if (!found_link) {
			log_debug(knet_h, KNET_SUB_RX, "Unable to determine source link for data packet. Discarding packet.");
			return;
		}
	}

	stats_err = pthread_mutex_lock(&src_link->link_stats_mutex);
	if (stats_err) {
		/*
		 * BUGFIX: report the actual lock error. The previous code
		 * logged strerror(savederrno), but savederrno was always 0
		 * here; the error code is in stats_err.
		 */
		log_err(knet_h, KNET_SUB_RX, "Unable to get stats mutex lock for host %u link %u: %s",
			src_host->host_id, src_link->link_id, strerror(stats_err));
		return;
	}

	switch (inbuf->kh_type) {
	case KNET_HEADER_TYPE_DATA:
		_process_data(knet_h, src_host, src_link, inbuf, len, decrypt_time);
		break;
	case KNET_HEADER_TYPE_PING:
		process_ping(knet_h, src_host, src_link, inbuf, len);
		break;
	case KNET_HEADER_TYPE_PONG:
		process_pong(knet_h, src_host, src_link, inbuf, len);
		break;
	case KNET_HEADER_TYPE_PMTUD:
		src_link->status.stats.rx_pmtu_packets++;
		src_link->status.stats.rx_pmtu_bytes += len;
		/* Unlock so we don't deadlock with tx_mutex */
		pthread_mutex_unlock(&src_link->link_stats_mutex);
		process_pmtud(knet_h, src_link, inbuf);
		return; /* Don't need to unlock link_stats_mutex */
		break;
	case KNET_HEADER_TYPE_PMTUD_REPLY:
		src_link->status.stats.rx_pmtu_packets++;
		src_link->status.stats.rx_pmtu_bytes += len;
		/* pmtud_mutex can't be acquired while we hold a link_stats_mutex (ordering) */
		pthread_mutex_unlock(&src_link->link_stats_mutex);
		process_pmtud_reply(knet_h, src_link, inbuf);
		return;
		break;
	default:
		pthread_mutex_unlock(&src_link->link_stats_mutex);
		return;
		break;
	}
	pthread_mutex_unlock(&src_link->link_stats_mutex);
}
/*
 * Drain one ready socket under the global read lock: recvmmsg up to
 * PCKT_RX_BUFS messages, run per-transport classification and optional
 * generic-ACL filtering, and parse each data packet.
 */
static void _handle_recv_from_links(knet_handle_t knet_h, int sockfd, struct knet_mmsghdr *msg)
{
	int err, savederrno;
	int i, msg_recv, transport;

	if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) {
		log_debug(knet_h, KNET_SUB_RX, "Unable to get global read lock");
		return;
	}

	if (_is_valid_fd(knet_h, sockfd) < 1) {
		/*
		 * this is normal if a fd got an event and before we grab the read lock
		 * and the link is removed by another thread
		 */
		goto exit_unlock;
	}

	transport = knet_h->knet_transport_fd_tracker[sockfd].transport;

	/*
	 * reset msg_namelen to buffer size because after recvmmsg
	 * each msg_namelen will contain sizeof sockaddr_in or sockaddr_in6
	 */
	for (i = 0; i < PCKT_RX_BUFS; i++) {
		msg[i].msg_hdr.msg_namelen = knet_h->knet_transport_fd_tracker[sockfd].sockaddr_len;
	}

	msg_recv = _recvmmsg(sockfd, &msg[0], PCKT_RX_BUFS, MSG_DONTWAIT | MSG_NOSIGNAL);
	savederrno = errno;

	/*
	 * WARNING: man page for recvmmsg is wrong. Kernel implementation here:
	 * recvmmsg can return:
	 * -1 on error
	 *  0 if the previous run of recvmmsg recorded an error on the socket
	 *  N number of messages (see exception below).
	 *
	 * If there is an error from recvmsg after receiving a frame or more, the recvmmsg
	 * loop is interrupted, error recorded in the socket (getsockopt(SO_ERROR) and
	 * it will be visible in the next run.
	 *
	 * Need to be careful how we handle errors at this stage.
	 *
	 * error messages need to be handled on a per transport/protocol base
	 * at this point we have different layers of error handling
	 * - msg_recv < 0 -> error from this run
	 *   msg_recv = 0 -> error from previous run and error on socket needs to be cleared
	 * - per-transport message data
	 *   example: msg[i].msg_hdr.msg_flags & MSG_NOTIFICATION or msg_len for SCTP == EOF,
	 *            but for UDP it is perfectly legal to receive a 0 bytes message.. go figure
	 * - NOTE: on SCTP MSG_NOTIFICATION we get msg_recv == PCKT_FRAG_MAX messages and no
	 *         errno set. That means the error api needs to be able to abort the loop below.
	 */

	if (msg_recv <= 0) {
		transport_rx_sock_error(knet_h, transport, sockfd, msg_recv, savederrno);
		goto exit_unlock;
	}

	for (i = 0; i < msg_recv; i++) {
		err = transport_rx_is_data(knet_h, transport, sockfd, &msg[i]);

		/*
		 * TODO: make this section silent once we are confident
		 *       all protocols packet handlers are good
		 */

		switch(err) {
			case KNET_TRANSPORT_RX_ERROR: /* on error */
				log_debug(knet_h, KNET_SUB_RX, "Transport reported error parsing packet");
				goto exit_unlock;
				break;
			case KNET_TRANSPORT_RX_NOT_DATA_CONTINUE: /* packet is not data and we should continue the packet process loop */
				log_debug(knet_h, KNET_SUB_RX, "Transport reported no data, continue");
				break;
			case KNET_TRANSPORT_RX_NOT_DATA_STOP: /* packet is not data and we should STOP the packet process loop */
				log_debug(knet_h, KNET_SUB_RX, "Transport reported no data, stop");
				goto exit_unlock;
				break;
			case KNET_TRANSPORT_RX_IS_DATA: /* packet is data and should be parsed as such */
				/*
				 * processing incoming packets vs access lists
				 */
				if ((knet_h->use_access_lists) &&
				    (transport_get_acl_type(knet_h, transport) == USE_GENERIC_ACL)) {
					if (!check_validate(knet_h, sockfd, transport, msg[i].msg_hdr.msg_name)) {
						char src_ipaddr[KNET_MAX_HOST_LEN];
						char src_port[KNET_MAX_PORT_LEN];

						memset(src_ipaddr, 0, KNET_MAX_HOST_LEN);
						memset(src_port, 0, KNET_MAX_PORT_LEN);
						if (knet_addrtostr(msg[i].msg_hdr.msg_name, sockaddr_len(msg[i].msg_hdr.msg_name),
								   src_ipaddr, KNET_MAX_HOST_LEN,
								   src_port, KNET_MAX_PORT_LEN) < 0) {
							log_debug(knet_h, KNET_SUB_RX, "Packet rejected: unable to resolve host/port");
						} else {
							log_debug(knet_h, KNET_SUB_RX, "Packet rejected from %s/%s", src_ipaddr, src_port);
						}
						/*
						 * continue processing the other packets
						 */
						continue;
					}
				}
				_parse_recv_from_links(knet_h, sockfd, &msg[i]);
				break;
			case KNET_TRANSPORT_RX_OOB_DATA_CONTINUE:
				log_debug(knet_h, KNET_SUB_RX, "Transport is processing sock OOB data, continue");
				break;
			case KNET_TRANSPORT_RX_OOB_DATA_STOP:
				log_debug(knet_h, KNET_SUB_RX, "Transport has completed processing sock OOB data, stop");
				goto exit_unlock;
				break;
		}
	}

exit_unlock:
	pthread_rwlock_unlock(&knet_h->global_rwlock);
}
void *_handle_recv_from_links_thread(void *data)
{
	knet_handle_t knet_h = (knet_handle_t) data;
	struct epoll_event events[KNET_EPOLL_MAX_EVENTS];
	struct sockaddr_storage address[PCKT_RX_BUFS];
	struct knet_mmsghdr msg[PCKT_RX_BUFS];
	struct iovec iov_in[PCKT_RX_BUFS];
	int idx, nr_events;

	set_thread_status(knet_h, KNET_THREAD_RX, KNET_THREAD_STARTED);

	memset(&msg, 0, sizeof(msg));
	memset(&events, 0, sizeof(events));

	/*
	 * wire every message header to its own rx buffer and address slot
	 * once, up front; the kernel rewrites msg_namelen on each receive.
	 */
	for (idx = 0; idx < PCKT_RX_BUFS; idx++) {
		memset(&msg[idx].msg_hdr, 0, sizeof(struct msghdr));
		iov_in[idx].iov_base = (void *)knet_h->recv_from_links_buf[idx];
		iov_in[idx].iov_len = KNET_DATABUFSIZE;
		msg[idx].msg_hdr.msg_iov = &iov_in[idx];
		msg[idx].msg_hdr.msg_iovlen = 1;
		msg[idx].msg_hdr.msg_name = &address[idx];
		msg[idx].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage); /* Real value filled in before actual use */
	}

	while (!shutdown_in_progress(knet_h)) {
		nr_events = epoll_wait(knet_h->recv_from_links_epollfd, events, KNET_EPOLL_MAX_EVENTS, knet_h->threads_timer_res / 1000);

		/*
		 * the RX threads only need to notify that there has been at least
		 * one successful run after queue flush has been requested.
		 * See setfwd in handle.c
		 */
		if (get_thread_flush_queue(knet_h, KNET_THREAD_RX) == KNET_THREAD_QUEUE_FLUSH) {
			set_thread_flush_queue(knet_h, KNET_THREAD_RX, KNET_THREAD_QUEUE_FLUSHED);
		}

		/*
		 * an epoll timeout (no ready fds) doubles as our periodic
		 * shutdown check; errors (< 0, e.g. EINTR) are retried too.
		 */
		if (nr_events > 0) {
			for (idx = 0; idx < nr_events; idx++) {
				_handle_recv_from_links(knet_h, events[idx].data.fd, msg);
			}
		}
	}

	set_thread_status(knet_h, KNET_THREAD_RX, KNET_THREAD_STOPPED);
	return NULL;
}
/*
 * Read one packet for the given data channel into the caller's buffer.
 *
 * Returns the readv() result (bytes read, or -1 with errno set).
 * On a non-zero return, errno carries the saved error; on a zero-byte
 * read errno is cleared, per the library convention.
 */
ssize_t knet_recv(knet_handle_t knet_h, char *buff, const size_t buff_len, const int8_t channel)
{
	ssize_t err = 0;
	int savederrno = 0;
	struct iovec iov_in;

	if (!_is_valid_handle(knet_h)) {
		return -1;
	}

	/*
	 * reject NULL/empty/oversized buffers and out-of-range channels
	 * before taking any lock.
	 */
	if ((buff == NULL) ||
	    (buff_len <= 0) ||
	    (buff_len > KNET_MAX_PACKET_SIZE) ||
	    (channel < 0) ||
	    (channel >= KNET_DATAFD_MAX)) {
		errno = EINVAL;
		return -1;
	}

	savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
	if (savederrno) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
			strerror(savederrno));
		errno = savederrno;
		return -1;
	}

	/* channel must have been set up via knet_handle_add_datafd() */
	if (!knet_h->sockfd[channel].in_use) {
		err = -1;
		savederrno = EINVAL;
		goto out_unlock;
	}

	iov_in = (struct iovec){ .iov_base = (void *)buff, .iov_len = buff_len };

	err = readv(knet_h->sockfd[channel].sockfd[0], &iov_in, 1);
	savederrno = errno; /* capture before the unlock can clobber it */

out_unlock:
	pthread_rwlock_unlock(&knet_h->global_rwlock);
	errno = err ? savederrno : 0;
	return err;
}

File Metadata

Mime Type
text/x-diff
Expires
Wed, Jun 4, 6:22 AM (6 h, 24 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1854730
Default Alt Text
(71 KB)

Event Timeline