Page MenuHomeClusterLabs Projects

No OneTemporary

diff --git a/libknet/Makefile.am b/libknet/Makefile.am
index 3127642a..dd16fd40 100644
--- a/libknet/Makefile.am
+++ b/libknet/Makefile.am
@@ -1,166 +1,167 @@
#
# Copyright (C) 2010-2020 Red Hat, Inc. All rights reserved.
#
# Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
# Federico Simoncelli <fsimon@kronosnet.org>
#
# This software licensed under GPL-2.0+
#
MAINTAINERCLEANFILES = Makefile.in
include $(top_srcdir)/build-aux/check.mk
# linker version script listing the symbols libknet exports
# (passed via --version-script in libknet_la_LDFLAGS)
SYMFILE = libknet_exported_syms
EXTRA_DIST = $(SYMFILE)
# build this directory before the tests that link against it
SUBDIRS = . tests
# libtool current:revision:age triplet, see
# https://www.gnu.org/software/libtool/manual/html_node/Updating-version-info.html
libversion = 5:0:4
# override global LIBS that pulls in lots of cruft we don't need here
LIBS =
# libknet.la sources (handle_api.c is added by this change)
sources = \
common.c \
compat.c \
compress.c \
crypto.c \
handle.c \
+ handle_api.c \
host.c \
links.c \
links_acl.c \
links_acl_ip.c \
links_acl_loopback.c \
logging.c \
netutils.c \
onwire.c \
threads_common.c \
threads_dsthandler.c \
threads_heartbeat.c \
threads_pmtud.c \
threads_rx.c \
threads_tx.c \
transports.c \
transport_common.c \
transport_loopback.c \
transport_udp.c \
transport_sctp.c
# public header installed for library consumers, plus the pkg-config file
include_HEADERS = libknet.h
pkgconfigdir = $(libdir)/pkgconfig
pkgconfig_DATA = libknet.pc
# internal headers: distributed in the tarball but not installed
noinst_HEADERS = \
common.h \
compat.h \
compress.h \
compress_model.h \
crypto.h \
crypto_model.h \
host.h \
internals.h \
links.h \
links_acl.h \
links_acl_ip.h \
links_acl_loopback.h \
logging.h \
netutils.h \
onwire.h \
threads_common.h \
threads_dsthandler.h \
threads_heartbeat.h \
threads_pmtud.h \
threads_rx.h \
threads_tx.h \
transports.h \
transport_common.h \
transport_loopback.h \
transport_udp.h \
transport_sctp.h
lib_LTLIBRARIES = libknet.la
libknet_la_SOURCES = $(sources)
AM_CFLAGS += $(libqb_CFLAGS)
libknet_la_CFLAGS = $(AM_CFLAGS) $(PTHREAD_CFLAGS)
# relink libknet.la whenever the exported-symbol list changes
EXTRA_libknet_la_DEPENDENCIES = $(SYMFILE)
# export only the symbols listed in $(SYMFILE), and add $(pkglibdir) (where
# the plugin modules below are installed) to the run-time search path
libknet_la_LDFLAGS = $(AM_LDFLAGS) \
-Wl,--version-script=$(srcdir)/$(SYMFILE) \
-Wl,-rpath=$(pkglibdir) \
-version-info $(libversion)
libknet_la_LIBADD = $(PTHREAD_LIBS) $(dl_LIBS) $(rt_LIBS) $(m_LIBS)
# Optional compression and crypto back-ends. Each one is built as a libtool
# module (see MODULELDFLAGS) and installed into $(pkglibdir), but only when
# its BUILD_* conditional is enabled at configure time.
# Prepare empty value for appending
pkglib_LTLIBRARIES =
# MODULE_LDFLAGS would mean a target-specific variable for Automake
MODULELDFLAGS = $(AM_LDFLAGS) -module -avoid-version -export-dynamic
# compression back-ends
if BUILD_COMPRESS_ZSTD
pkglib_LTLIBRARIES += compress_zstd.la
compress_zstd_la_LDFLAGS = $(MODULELDFLAGS)
compress_zstd_la_CFLAGS = $(AM_CFLAGS) $(libzstd_CFLAGS)
compress_zstd_la_LIBADD = $(libzstd_LIBS)
endif
if BUILD_COMPRESS_ZLIB
pkglib_LTLIBRARIES += compress_zlib.la
compress_zlib_la_LDFLAGS = $(MODULELDFLAGS)
compress_zlib_la_CFLAGS = $(AM_CFLAGS) $(zlib_CFLAGS)
compress_zlib_la_LIBADD = $(zlib_LIBS)
endif
# lz4 provides two modules (lz4 and lz4hc) sharing the same library
if BUILD_COMPRESS_LZ4
pkglib_LTLIBRARIES += compress_lz4.la compress_lz4hc.la
compress_lz4_la_LDFLAGS = $(MODULELDFLAGS)
compress_lz4_la_CFLAGS = $(AM_CFLAGS) $(liblz4_CFLAGS)
compress_lz4_la_LIBADD = $(liblz4_LIBS)
compress_lz4hc_la_LDFLAGS = $(MODULELDFLAGS)
compress_lz4hc_la_CFLAGS = $(AM_CFLAGS) $(liblz4_CFLAGS)
compress_lz4hc_la_LIBADD = $(liblz4_LIBS)
endif
if BUILD_COMPRESS_LZO2
pkglib_LTLIBRARIES += compress_lzo2.la
compress_lzo2_la_LDFLAGS = $(MODULELDFLAGS)
compress_lzo2_la_CFLAGS = $(AM_CFLAGS) $(lzo2_CFLAGS)
compress_lzo2_la_LIBADD = $(lzo2_LIBS)
endif
if BUILD_COMPRESS_LZMA
pkglib_LTLIBRARIES += compress_lzma.la
compress_lzma_la_LDFLAGS = $(MODULELDFLAGS)
compress_lzma_la_CFLAGS = $(AM_CFLAGS) $(liblzma_CFLAGS)
compress_lzma_la_LIBADD = $(liblzma_LIBS)
endif
if BUILD_COMPRESS_BZIP2
pkglib_LTLIBRARIES += compress_bzip2.la
compress_bzip2_la_LDFLAGS = $(MODULELDFLAGS)
compress_bzip2_la_CFLAGS = $(AM_CFLAGS) $(bzip2_CFLAGS)
compress_bzip2_la_LIBADD = $(bzip2_LIBS)
endif
# crypto back-ends
if BUILD_CRYPTO_NSS
pkglib_LTLIBRARIES += crypto_nss.la
crypto_nss_la_LDFLAGS = $(MODULELDFLAGS)
crypto_nss_la_CFLAGS = $(AM_CFLAGS) $(nss_CFLAGS)
crypto_nss_la_LIBADD = $(nss_LIBS)
endif
if BUILD_CRYPTO_OPENSSL
pkglib_LTLIBRARIES += crypto_openssl.la
crypto_openssl_la_LDFLAGS = $(MODULELDFLAGS)
crypto_openssl_la_CFLAGS = $(AM_CFLAGS) $(openssl_CFLAGS)
crypto_openssl_la_LIBADD = $(openssl_LIBS)
endif
diff --git a/libknet/compress.c b/libknet/compress.c
index f62861d6..ec6f16d8 100644
--- a/libknet/compress.c
+++ b/libknet/compress.c
@@ -1,513 +1,545 @@
/*
* Copyright (C) 2017-2020 Red Hat, Inc. All rights reserved.
*
* Author: Fabio M. Di Nitto <fabbione@kronosnet.org>
*
* This software licensed under LGPL-2.0+
*/
#include "config.h"
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <pthread.h>
#include <time.h>
#include "internals.h"
#include "compress.h"
#include "compress_model.h"
#include "logging.h"
#include "threads_common.h"
#include "common.h"
/*
* internal module switch data
*/
/*
 * Compression back-end table. Each entry's model_id equals its index, and
 * the rest of this file indexes the table directly by model id.
 *
 * DO NOT CHANGE MODEL_ID HERE OR ONWIRE COMPATIBILITY
 * WILL BREAK!
 *
 * Always add new items before the last NULL.
 */
static compress_model_t compress_modules_cmds[KNET_MAX_COMPRESS_METHODS + 1] = {
	{ .model_name = "none",  .model_id = 0, .built_in = 0,                   .loaded = 0, .ops = NULL },
	{ .model_name = "zlib",  .model_id = 1, .built_in = WITH_COMPRESS_ZLIB,  .loaded = 0, .ops = NULL },
	{ .model_name = "lz4",   .model_id = 2, .built_in = WITH_COMPRESS_LZ4,   .loaded = 0, .ops = NULL },
	{ .model_name = "lz4hc", .model_id = 3, .built_in = WITH_COMPRESS_LZ4,   .loaded = 0, .ops = NULL },
	{ .model_name = "lzo2",  .model_id = 4, .built_in = WITH_COMPRESS_LZO2,  .loaded = 0, .ops = NULL },
	{ .model_name = "lzma",  .model_id = 5, .built_in = WITH_COMPRESS_LZMA,  .loaded = 0, .ops = NULL },
	{ .model_name = "bzip2", .model_id = 6, .built_in = WITH_COMPRESS_BZIP2, .loaded = 0, .ops = NULL },
	{ .model_name = "zstd",  .model_id = 7, .built_in = WITH_COMPRESS_ZSTD,  .loaded = 0, .ops = NULL },
	{ .model_name = NULL,    .model_id = KNET_MAX_COMPRESS_METHODS, .built_in = 0, .loaded = 0, .ops = NULL }
};

/* index of the last named table entry, computed by compress_init() */
static int max_model = 0;

/* time of the last failed module load; zero means "no failure recorded" */
static struct timespec last_load_failure;
static int compress_get_model(const char *model)
{
int idx = 0;
while (compress_modules_cmds[idx].model_name != NULL) {
if (!strcmp(compress_modules_cmds[idx].model_name, model)) {
return compress_modules_cmds[idx].model_id;
}
idx++;
}
return -1;
}
/*
 * Return the index of the last named entry in compress_modules_cmds,
 * i.e. the number of named entries minus one.
 */
static int compress_get_max_model(void)
{
	int last;

	for (last = -1; compress_modules_cmds[last + 1].model_name != NULL; last++) {
		/* only counting entries */
	}

	return last;
}
static int compress_is_valid_model(int compress_model)
{
int idx = 0;
while (compress_modules_cmds[idx].model_name != NULL) {
if ((compress_model == compress_modules_cmds[idx].model_id) &&
(compress_modules_cmds[idx].built_in == 1)) {
return 0;
}
idx++;
}
return -1;
}
/*
 * Ask the module for compress_model to validate compress_level.
 * A module without a val_level hook accepts any level (returns 0);
 * otherwise the hook's result is returned.
 */
static int val_level(
	knet_handle_t knet_h,
	int compress_model,
	int compress_level)
{
	const compress_model_t *module = &compress_modules_cmds[compress_model];

	if (module->ops->val_level == NULL) {
		return 0;
	}

	return module->ops->val_level(knet_h, compress_level);
}
/*
 * Return 1 when the module for cmp_model is loaded and initialized for
 * knet_h, 0 otherwise.
 *
 * compress_check_lib_is_init needs to be invoked in a locked context!
 *
 * A module without an .is_init hook needs no per-handle init. For such a
 * module, a placeholder pointer in knet_h->compress_int_data[cmp_model]
 * records that this handle already took its reference on the library.
 */
static int compress_check_lib_is_init(knet_handle_t knet_h, int cmp_model)
{
	const compress_model_t *module = &compress_modules_cmds[cmp_model];

	if (module->loaded != 1) {
		return 0;
	}

	if (module->ops->is_init == NULL) {
		return (knet_h->compress_int_data[cmp_model] != NULL) ? 1 : 0;
	}

	return (module->ops->is_init(knet_h, cmp_model) == 1) ? 1 : 0;
}
/*
 * Load the module for cmp_model (if it is not loaded yet) and initialize it
 * for knet_h.
 *
 * compress_load_lib should _always_ be invoked in write lock context
 * (shlib_rwlock held for writing).
 *
 * rate_limit: when non-zero and a previous load failed less than 10 seconds
 * ago, fail immediately with errno = EAGAIN instead of retrying.
 *
 * Returns 0 on success (or when already initialized), -1 on error with
 * errno set.
 */
static int compress_load_lib(knet_handle_t knet_h, int cmp_model, int rate_limit)
{
	/* minimum delay between load retries after a failure: 10 s, in ns */
	const unsigned long long retry_delay_ns = 10000000000ULL;
	compress_model_t *module = &compress_modules_cmds[cmp_model];
	struct timespec clock_now;
	unsigned long long timediff;
	int savederrno;

	/*
	 * checking again for paranoia and because
	 * compress_check_lib_is_init is usually invoked in read context
	 * and we need to switch from read to write locking in between.
	 * another thread might have init the library in the meantime
	 */
	if (compress_check_lib_is_init(knet_h, cmp_model)) {
		return 0;
	}

	/*
	 * due to the fact that decompress can load libraries
	 * on demand, depending on the compress model selected
	 * on other nodes, it is possible for an attacker
	 * to send crafted packets to attempt to load libraries
	 * at random in a DoS fashion.
	 * If there is an error loading a library, then we want
	 * to rate_limit a retry to reload the library every X
	 * seconds to avoid a lock DoS that could greatly slow
	 * down libknet.
	 */
	if ((rate_limit) &&
	    ((last_load_failure.tv_sec != 0) || (last_load_failure.tv_nsec != 0))) {
		clock_gettime(CLOCK_MONOTONIC, &clock_now);
		timespec_diff(last_load_failure, clock_now, &timediff);
		if (timediff < retry_delay_ns) {
			errno = EAGAIN;
			return -1;
		}
	}

	if (module->loaded == 0) {
		module->ops = load_module (knet_h, "compress", module->model_name);
		if (!module->ops) {
			/* preserve errno from load_module() across the timestamp update */
			savederrno = errno;
			clock_gettime(CLOCK_MONOTONIC, &last_load_failure);
			errno = savederrno;
			return -1;
		}
		if (module->ops->abi_ver != KNET_COMPRESS_MODEL_ABI) {
			log_err(knet_h, KNET_SUB_COMPRESS,
				"ABI mismatch loading module %s. knet ver: %d, module ver: %d",
				module->model_name, KNET_COMPRESS_MODEL_ABI,
				module->ops->abi_ver);
			/*
			 * An unusable module is a load failure too. Record it so the
			 * rate limit above also covers this case; otherwise crafted
			 * packets could force a module load attempt on every packet
			 * while the write lock is held. Also drop the ops pointer of
			 * the rejected module (it is only used once loaded == 1).
			 */
			module->ops = NULL;
			clock_gettime(CLOCK_MONOTONIC, &last_load_failure);
			errno = EINVAL;
			return -1;
		}
		module->loaded = 1;
	}

	if (module->ops->init != NULL) {
		if (module->ops->init(knet_h, cmp_model) < 0) {
			return -1;
		}
	} else {
		/* placeholder marking "this handle holds a reference" */
		knet_h->compress_int_data[cmp_model] = (void *)&"1";
	}

	return 0;
}
/*
 * Validate the handle's compress settings by round-tripping a zero-filled
 * KNET_DATABUFSIZE buffer through the configured model/level.
 *
 * If the requested level produces no size reduction, fall back to the
 * module's default level (when it provides one) and store that level in
 * knet_h->compress_level.
 *
 * Returns 0 on success, -1 on error with errno set.
 */
static int compress_lib_test(knet_handle_t knet_h)
{
	int savederrno = 0;
	unsigned char src[KNET_DATABUFSIZE];
	unsigned char dst[KNET_DATABUFSIZE_COMPRESS];
	ssize_t dst_comp_len = KNET_DATABUFSIZE_COMPRESS, dst_decomp_len = KNET_DATABUFSIZE;
	unsigned int i;
	int request_level;
	compress_model_t *module = &compress_modules_cmds[knet_h->compress_model];

	memset(src, 0, KNET_DATABUFSIZE);
	memset(dst, 0, KNET_DATABUFSIZE_COMPRESS);

	/*
	 * NOTE: we cannot use compress and decompress API calls due to locking
	 * so we need to call directly into the modules
	 */
	if (module->ops->compress(knet_h, src, KNET_DATABUFSIZE, dst, &dst_comp_len) < 0) {
		savederrno = errno;
		log_err(knet_h, KNET_SUB_COMPRESS, "Unable to compress test buffer. Please check your compression settings: %s", strerror(savederrno));
		errno = savederrno;
		return -1;
	} else if ((long unsigned int)dst_comp_len >= KNET_DATABUFSIZE) {
		/*
		 * compress not effective, try again using default compression level when available
		 */
		request_level = knet_h->compress_level;
		/* dst_comp_len is ssize_t (signed), hence %zd */
		log_warn(knet_h, KNET_SUB_COMPRESS,
			 "Requested compression level (%d) did not generate any compressed data (source: %zu destination: %zd)",
			 request_level, sizeof(src), dst_comp_len);

		/*
		 * Test the hook pointer instead of calling it. Calling it here
		 * would crash on a NULL hook, and would treat a default level
		 * of 0 as "no default". Only KNET_COMPRESS_UNKNOWN_DEFAULT means
		 * the module has no default.
		 */
		if ((module->ops->get_default_level == NULL) ||
		    ((knet_h->compress_level = module->ops->get_default_level()) == KNET_COMPRESS_UNKNOWN_DEFAULT)) {
			log_err(knet_h, KNET_SUB_COMPRESS, "compression %s does not provide a default value",
				module->model_name);
			errno = EINVAL;
			return -1;
		} else {
			memset(src, 0, KNET_DATABUFSIZE);
			memset(dst, 0, KNET_DATABUFSIZE_COMPRESS);
			dst_comp_len = KNET_DATABUFSIZE_COMPRESS;
			if (module->ops->compress(knet_h, src, KNET_DATABUFSIZE, dst, &dst_comp_len) < 0) {
				savederrno = errno;
				log_err(knet_h, KNET_SUB_COMPRESS, "Unable to compress with default compression level: %s", strerror(savederrno));
				errno = savederrno;
				return -1;
			}
			log_warn(knet_h, KNET_SUB_COMPRESS, "Requested compression level (%d) did not work, switching to default (%d)",
				 request_level, knet_h->compress_level);
		}
	}

	if (module->ops->decompress(knet_h, dst, dst_comp_len, src, &dst_decomp_len) < 0) {
		savederrno = errno;
		log_err(knet_h, KNET_SUB_COMPRESS, "Unable to decompress test buffer. Please check your compression settings: %s", strerror(savederrno));
		errno = savederrno;
		return -1;
	}

	/* the source was all zeroes, so every decompressed byte must be zero */
	for (i = 0; i < KNET_DATABUFSIZE; i++) {
		if (src[i] != 0) {
			log_err(knet_h, KNET_SUB_COMPRESS, "Decompressed buffer contains incorrect data");
			errno = EINVAL;
			return -1;
		}
	}

	return 0;
}
/*
 * Initialize the compress subsystem: compute max_model from the module
 * table and clear the load-failure timestamp used for rate limiting.
 *
 * Returns 0 on success, or -1 with errno = EINVAL when the table holds more
 * entries than KNET_MAX_COMPRESS_METHODS allows.
 */
int compress_init(
	knet_handle_t knet_h)
{
	max_model = compress_get_max_model();

	if (max_model <= KNET_MAX_COMPRESS_METHODS) {
		memset(&last_load_failure, 0, sizeof(last_load_failure));
		return 0;
	}

	log_err(knet_h, KNET_SUB_COMPRESS, "Too many compress methods defined in compress.c.");
	errno = EINVAL;
	return -1;
}
/*
 * Apply a compress configuration (model, level, threshold) to knet_h.
 *
 * For any model other than "none", this function:
 * - checks that support was built in and that the threshold is in range;
 * - loads and initializes the module under shlib_rwlock;
 * - validates the level;
 * - runs compress_lib_test().
 * On any failure, knet_h->compress_model and knet_h->compress_level are
 * reset to 0.
 *
 * NOTE(review): for "none" (cmp_model == 0) nothing is assigned to
 * knet_h->compress_model, so a previously configured model stays active.
 * Confirm whether callers reset it elsewhere.
 *
 * Returns 0 on success, -1 on error with errno set.
 */
-int compress_cfg(
+static int compress_cfg(
knet_handle_t knet_h,
struct knet_handle_compress_cfg *knet_handle_compress_cfg)
{
int savederrno = 0, err = 0;
int cmp_model;
cmp_model = compress_get_model(knet_handle_compress_cfg->compress_model);
if (cmp_model < 0) {
log_err(knet_h, KNET_SUB_COMPRESS, "compress model %s not supported", knet_handle_compress_cfg->compress_model);
errno = EINVAL;
return -1;
}
log_debug(knet_h, KNET_SUB_COMPRESS,
"Initizializing compress module [%s/%d/%u]",
knet_handle_compress_cfg->compress_model, knet_handle_compress_cfg->compress_level, knet_handle_compress_cfg->compress_threshold);
if (cmp_model > 0) {
if (compress_modules_cmds[cmp_model].built_in == 0) {
log_err(knet_h, KNET_SUB_COMPRESS, "compress model %s support has not been built in. Please contact your vendor or fix the build", knet_handle_compress_cfg->compress_model);
errno = EINVAL;
return -1;
}
if (knet_handle_compress_cfg->compress_threshold > KNET_MAX_PACKET_SIZE) {
log_err(knet_h, KNET_SUB_COMPRESS, "compress threshold cannot be higher than KNET_MAX_PACKET_SIZE (%d).",
KNET_MAX_PACKET_SIZE);
errno = EINVAL;
return -1;
}
/*
 * NOTE(review): the threshold is applied before the remaining checks and
 * is not rolled back if a later step fails.
 */
if (knet_handle_compress_cfg->compress_threshold == 0) {
knet_h->compress_threshold = KNET_COMPRESS_THRESHOLD;
log_debug(knet_h, KNET_SUB_COMPRESS, "resetting compression threshold to default (%d)", KNET_COMPRESS_THRESHOLD);
} else {
knet_h->compress_threshold = knet_handle_compress_cfg->compress_threshold;
}
savederrno = pthread_rwlock_rdlock(&shlib_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_COMPRESS, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
if (!compress_check_lib_is_init(knet_h, cmp_model)) {
/*
* need to switch to write lock, load the lib, and return with a write lock
* this is not racy because compress_load_lib is written idempotent.
*/
pthread_rwlock_unlock(&shlib_rwlock);
savederrno = pthread_rwlock_wrlock(&shlib_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_COMPRESS, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
/* rate_limit == 0: explicit configuration requests are never throttled */
if (compress_load_lib(knet_h, cmp_model, 0) < 0) {
savederrno = errno;
log_err(knet_h, KNET_SUB_COMPRESS, "Unable to load library: %s",
strerror(savederrno));
err = -1;
goto out_unlock;
}
}
if (val_level(knet_h, cmp_model, knet_handle_compress_cfg->compress_level) < 0) {
log_err(knet_h, KNET_SUB_COMPRESS, "compress level %d not supported for model %s",
knet_handle_compress_cfg->compress_level, knet_handle_compress_cfg->compress_model);
savederrno = EINVAL;
err = -1;
goto out_unlock;
}
/* compress_lib_test() reads model and level from knet_h, so set them first */
knet_h->compress_model = cmp_model;
knet_h->compress_level = knet_handle_compress_cfg->compress_level;
if (compress_lib_test(knet_h) < 0) {
savederrno = errno;
err = -1;
goto out_unlock;
}
out_unlock:
pthread_rwlock_unlock(&shlib_rwlock);
}
if (err) {
knet_h->compress_model = 0;
knet_h->compress_level = 0;
}
errno = savederrno;
return err;
}
/*
 * Release per-handle compress module state.
 *
 * all != 0: release every loaded, built-in module holding state for knet_h.
 * all == 0: release only the module matching knet_h->compress_model.
 *
 * A module with a .fini hook releases its own state. For a module without
 * one, only the placeholder in knet_h->compress_int_data is cleared.
 * shlib_rwlock is taken for writing. If it cannot be taken, the error is
 * logged and nothing is released.
 */
void compress_fini(
	knet_handle_t knet_h,
	int all)
{
	int lock_err;
	int idx;

	lock_err = pthread_rwlock_wrlock(&shlib_rwlock);
	if (lock_err) {
		log_err(knet_h, KNET_SUB_COMPRESS, "Unable to get write lock: %s",
			strerror(lock_err));
		return;
	}

	for (idx = 0; idx < KNET_MAX_COMPRESS_METHODS; idx++) {
		compress_model_t *module = &compress_modules_cmds[idx];
		int holds_state = (module->model_name != NULL) &&
				  (module->built_in == 1) &&
				  (module->loaded == 1) &&
				  (module->model_id > 0) &&
				  (knet_h->compress_int_data[idx] != NULL);

		if (!holds_state) {
			continue;
		}

		if ((!all) && (module->model_id != knet_h->compress_model)) {
			continue;
		}

		if (module->ops->fini != NULL) {
			module->ops->fini(knet_h, idx);
		} else {
			knet_h->compress_int_data[idx] = NULL;
		}
	}

	pthread_rwlock_unlock(&shlib_rwlock);
}
/*
 * Compress buf_in into buf_out with the handle's configured model.
 *
 * compress does not require compress_check_lib_is_init
 * because it's protected by compress_cfg
 */
int compress(
	knet_handle_t knet_h,
	const unsigned char *buf_in,
	const ssize_t buf_in_len,
	unsigned char *buf_out,
	ssize_t *buf_out_len)
{
	const compress_model_t *active = &compress_modules_cmds[knet_h->compress_model];

	return active->ops->compress(knet_h, buf_in, buf_in_len, buf_out, buf_out_len);
}
/*
 * Decompress buf_in, which was compressed with compress_model on the
 * sending node, into buf_out.
 *
 * compress_model is taken from a received packet and is used to index
 * compress_modules_cmds, so it is checked against both ends of the table
 * and against the built-in set before any other use. If the module is not
 * yet initialized for knet_h, it is loaded on demand with rate limiting
 * (see compress_load_lib()).
 *
 * Returns the module's decompress() result, or -1 on error; errno is set
 * from the last operation.
 */
int decompress(
	knet_handle_t knet_h,
	int compress_model,
	const unsigned char *buf_in,
	const ssize_t buf_in_len,
	unsigned char *buf_out,
	ssize_t *buf_out_len)
{
	int savederrno = 0, err = 0;

	/* reject out-of-range ids (including negative ones) before indexing */
	if ((compress_model < 0) || (compress_model > max_model)) {
		log_err(knet_h, KNET_SUB_COMPRESS, "Received packet with unknown compress model %d", compress_model);
		errno = EINVAL;
		return -1;
	}

	if (compress_is_valid_model(compress_model) < 0) {
		log_err(knet_h, KNET_SUB_COMPRESS, "Received packet compressed with %s but support is not built in this version of libknet. Please contact your distribution vendor or fix the build.", compress_modules_cmds[compress_model].model_name);
		errno = EINVAL;
		return -1;
	}

	savederrno = pthread_rwlock_rdlock(&shlib_rwlock);
	if (savederrno) {
		log_err(knet_h, KNET_SUB_COMPRESS, "Unable to get read lock: %s",
			strerror(savederrno));
		errno = savederrno;
		return -1;
	}

	if (!compress_check_lib_is_init(knet_h, compress_model)) {
		/*
		 * need to switch to write lock, load the lib, and return with a write lock
		 * this is not racy because compress_load_lib is written idempotent.
		 */
		pthread_rwlock_unlock(&shlib_rwlock);
		savederrno = pthread_rwlock_wrlock(&shlib_rwlock);
		if (savederrno) {
			log_err(knet_h, KNET_SUB_COMPRESS, "Unable to get write lock: %s",
				strerror(savederrno));
			errno = savederrno;
			return -1;
		}
		/* rate_limit == 1: loads here are triggered by remote packets */
		if (compress_load_lib(knet_h, compress_model, 1) < 0) {
			savederrno = errno;
			err = -1;
			log_err(knet_h, KNET_SUB_COMPRESS, "Unable to load library: %s",
				strerror(savederrno));
			goto out_unlock;
		}
	}

	err = compress_modules_cmds[compress_model].ops->decompress(knet_h, buf_in, buf_in_len, buf_out, buf_out_len);
	savederrno = errno;

out_unlock:
	pthread_rwlock_unlock(&shlib_rwlock);
	errno = savederrno;
	return err;
}
+/*
+ * Public API: replace the handle's compress configuration.
+ *
+ * While holding the handle's global write lock, this releases the module
+ * state of the currently configured model (compress_fini(knet_h, 0)) and
+ * then applies knet_handle_compress_cfg via compress_cfg().
+ *
+ * Returns 0 on success, or -1 with errno set on error (EINVAL for NULL
+ * arguments).
+ */
+int knet_handle_compress(knet_handle_t knet_h, struct knet_handle_compress_cfg *knet_handle_compress_cfg)
+{
+ int savederrno = 0;
+ int err = 0;
+
+ if (!knet_h) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if (!knet_handle_compress_cfg) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ savederrno = get_global_wrlock(knet_h);
+ if (savederrno) {
+ log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
+ strerror(savederrno));
+ errno = savederrno;
+ return -1;
+ }
+
+ compress_fini(knet_h, 0);
+ err = compress_cfg(knet_h, knet_handle_compress_cfg);
+ savederrno = errno;
+
+ pthread_rwlock_unlock(&knet_h->global_rwlock);
+ errno = err ? savederrno : 0;
+ return err;
+}
+
int knet_get_compress_list(struct knet_compress_info *compress_list, size_t *compress_list_entries)
{
int err = 0;
int idx = 0;
int outidx = 0;
if (!compress_list_entries) {
errno = EINVAL;
return -1;
}
while (compress_modules_cmds[idx].model_name != NULL) {
if (compress_modules_cmds[idx].built_in) {
if (compress_list) {
compress_list[outidx].name = compress_modules_cmds[idx].model_name;
}
outidx++;
}
idx++;
}
*compress_list_entries = outidx;
if (!err)
errno = 0;
return err;
}
diff --git a/libknet/compress.h b/libknet/compress.h
index 95464e82..c0f28f19 100644
--- a/libknet/compress.h
+++ b/libknet/compress.h
@@ -1,40 +1,36 @@
/*
* Copyright (C) 2017-2020 Red Hat, Inc. All rights reserved.
*
* Author: Fabio M. Di Nitto <fabbione@kronosnet.org>
*
* This software licensed under LGPL-2.0+
*/
#ifndef __KNET_COMPRESS_H__
#define __KNET_COMPRESS_H__
#include "internals.h"
-int compress_cfg(
- knet_handle_t knet_h,
- struct knet_handle_compress_cfg *knet_handle_compress_cfg);
-
/*
 * compress_cfg() is now private to compress.c; configuration goes through
 * the public knet_handle_compress().
 */
/* validate the compress module table and reset load rate-limiting state */
int compress_init(
knet_handle_t knet_h);
/* release module state for the current model, or for all models if all != 0 */
void compress_fini(
knet_handle_t knet_h,
int all);
/* compress buf_in with the handle's configured model */
int compress(
knet_handle_t knet_h,
const unsigned char *buf_in,
const ssize_t buf_in_len,
unsigned char *buf_out,
ssize_t *buf_out_len);
/* decompress data produced with compress_model, loading the module on demand */
int decompress(
knet_handle_t knet_h,
int compress_model,
const unsigned char *buf_in,
const ssize_t buf_in_len,
unsigned char *buf_out,
ssize_t *buf_out_len);
#endif
diff --git a/libknet/crypto.c b/libknet/crypto.c
index 0c475d0f..28d29ef3 100644
--- a/libknet/crypto.c
+++ b/libknet/crypto.c
@@ -1,319 +1,534 @@
/*
* Copyright (C) 2012-2020 Red Hat, Inc. All rights reserved.
*
* Author: Fabio M. Di Nitto <fabbione@kronosnet.org>
*
* This software licensed under LGPL-2.0+
*/
#include "config.h"
#include <errno.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <time.h>
#include "crypto.h"
#include "crypto_model.h"
#include "internals.h"
#include "logging.h"
#include "common.h"
/*
* internal module switch data
*/
static crypto_model_t crypto_modules_cmds[] = {
{ "nss", WITH_CRYPTO_NSS, 0, NULL },
{ "openssl", WITH_CRYPTO_OPENSSL, 0, NULL },
{ NULL, 0, 0, NULL }
};
/*
 * Map a crypto model name to its index in crypto_modules_cmds.
 * Returns -1 when the name is unknown.
 */
static int crypto_get_model(const char *model)
{
	int idx;

	for (idx = 0; crypto_modules_cmds[idx].model_name != NULL; idx++) {
		if (strcmp(crypto_modules_cmds[idx].model_name, model) == 0) {
			return idx;
		}
	}

	return -1;
}
/*
 * exported API
 */

/*
 * Encrypt and sign buf_in into buf_out with the crypto instance selected by
 * knet_h->crypto_in_use_config. Assumes that slot is populated; no check is
 * made here.
 */
int crypto_encrypt_and_sign (
	knet_handle_t knet_h,
	const unsigned char *buf_in,
	const ssize_t buf_in_len,
	unsigned char *buf_out,
	ssize_t *buf_out_len)
{
	struct crypto_instance *inst = knet_h->crypto_instance[knet_h->crypto_in_use_config];

	return crypto_modules_cmds[inst->model].ops->crypt(knet_h, inst, buf_in, buf_in_len, buf_out, buf_out_len);
}
/*
 * Same as crypto_encrypt_and_sign(), but the input is an iovec array of
 * iovcnt_in elements.
 */
int crypto_encrypt_and_signv (
	knet_handle_t knet_h,
	const struct iovec *iov_in,
	int iovcnt_in,
	unsigned char *buf_out,
	ssize_t *buf_out_len)
{
	struct crypto_instance *inst = knet_h->crypto_instance[knet_h->crypto_in_use_config];

	return crypto_modules_cmds[inst->model].ops->cryptv(knet_h, inst, iov_in, iovcnt_in, buf_out, buf_out_len);
}
/*
 * Authenticate and decrypt buf_in into buf_out.
 *
 * The in-use config is tried first, since it is the common case. If no
 * config is in use, its slot is empty, or decryption with it fails, every
 * other populated slot is tried in turn. When more than one config exists,
 * a failure of the in-use config is logged at debug level only, because
 * another slot may still succeed.
 *
 * Returns 0 on success (errno cleared when an alternative config
 * succeeded), or non-zero on failure.
 */
int crypto_authenticate_and_decrypt (
	knet_handle_t knet_h,
	const unsigned char *buf_in,
	const ssize_t buf_in_len,
	unsigned char *buf_out,
	ssize_t *buf_out_len)
{
	struct crypto_instance *in_use = NULL;
	unsigned int i; /* unsigned: printed with %u below */
	int err = 0;
	int multiple_configs = 0;
	uint8_t log_level = KNET_LOG_ERR;

	for (i = 1; i <= KNET_MAX_CRYPTO_INSTANCES; i++) {
		if (knet_h->crypto_instance[i]) {
			multiple_configs++;
		}
	}

	/*
	 * attempt to decrypt first with the in-use config
	 * to avoid excessive performance hit.
	 */
	if (multiple_configs > 1) {
		log_level = KNET_LOG_DEBUG;
	}

	/*
	 * The in-use slot can be empty even though crypto_in_use_config is set.
	 * For example, crypto_fini() clears a slot without changing
	 * crypto_in_use_config. Check for this instead of dereferencing NULL.
	 */
	if (knet_h->crypto_in_use_config) {
		in_use = knet_h->crypto_instance[knet_h->crypto_in_use_config];
	}

	if (in_use) {
		err = crypto_modules_cmds[in_use->model].ops->decrypt(knet_h, in_use, buf_in, buf_in_len, buf_out, buf_out_len, log_level);
	} else {
		err = -1;
	}

	/*
	 * if we fail, try to use the other configurations
	 */
	if (err) {
		for (i = 1; i <= KNET_MAX_CRYPTO_INSTANCES; i++) {
			/*
			 * in-use config was already attempted
			 */
			if (i == knet_h->crypto_in_use_config) {
				continue;
			}
			if (knet_h->crypto_instance[i]) {
				log_debug(knet_h, KNET_SUB_CRYPTO, "Alternative crypto configuration found, attempting to decrypt with config %u", i);
				err = crypto_modules_cmds[knet_h->crypto_instance[i]->model].ops->decrypt(knet_h, knet_h->crypto_instance[i], buf_in, buf_in_len, buf_out, buf_out_len, KNET_LOG_ERR);
				if (!err) {
					errno = 0; /* clear errno from previous failures */
					return err;
				}
				log_debug(knet_h, KNET_SUB_CRYPTO, "Packet failed to decrypt with crypto config %u", i);
			}
		}
	}

	return err;
}
/*
 * Make slot config_num the in-use crypto config (0 = no crypto).
 *
 * The in-use slot is the one crypto_encrypt_and_sign(v) uses and the one
 * decryption tries first. Its block/hash/salt sizes are copied into knet_h
 * (or zeroed for config 0), and a PMTUd run is forced, presumably because
 * those sizes change per-packet overhead.
 *
 * Returns 0 on success, or -1 with errno = EINVAL when config_num is
 * non-zero and its slot is empty.
 */
-int crypto_use_config(
+static int crypto_use_config(
knet_handle_t knet_h,
uint8_t config_num)
{
if ((config_num) && (!knet_h->crypto_instance[config_num])) {
errno = EINVAL;
return -1;
}
knet_h->crypto_in_use_config = config_num;
if (config_num) {
knet_h->sec_block_size = knet_h->crypto_instance[config_num]->sec_block_size;
knet_h->sec_hash_size = knet_h->crypto_instance[config_num]->sec_hash_size;
knet_h->sec_salt_size = knet_h->crypto_instance[config_num]->sec_salt_size;
} else {
knet_h->sec_block_size = 0;
knet_h->sec_hash_size = 0;
knet_h->sec_salt_size = 0;
}
force_pmtud_run(knet_h, KNET_SUB_CRYPTO, 1);
return 0;
}
/*
 * Create a crypto instance from knet_handle_crypto_cfg and store it in slot
 * config_num. The module is loaded on first use under shlib_rwlock.
 *
 * On success, any previous instance in the slot is finalized and freed. If
 * that slot is the in-use config, the new instance's sizes are applied via
 * crypto_use_config().
 *
 * Returns 0 on success, -1 on error.
 *
 * NOTE(review): the early returns for an unknown model, a model that was
 * not built in, and a failed wrlock do not set errno. The caller
 * (_knet_handle_crypto_set_config) reads errno after a failure, so it may
 * report a stale value. Confirm and set errno on those paths.
 */
-int crypto_init(
+static int crypto_init(
knet_handle_t knet_h,
struct knet_handle_crypto_cfg *knet_handle_crypto_cfg,
uint8_t config_num)
{
int err = 0, savederrno = 0;
int model = 0;
struct crypto_instance *current = NULL, *new = NULL;
current = knet_h->crypto_instance[config_num];
model = crypto_get_model(knet_handle_crypto_cfg->crypto_model);
if (model < 0) {
log_err(knet_h, KNET_SUB_CRYPTO, "model %s not supported", knet_handle_crypto_cfg->crypto_model);
return -1;
}
if (crypto_modules_cmds[model].built_in == 0) {
log_err(knet_h, KNET_SUB_CRYPTO, "this version of libknet was built without %s support. Please contact your vendor or fix the build.", knet_handle_crypto_cfg->crypto_model);
return -1;
}
savederrno = pthread_rwlock_wrlock(&shlib_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_CRYPTO, "Unable to get write lock: %s",
strerror(savederrno));
return -1;
}
if (!crypto_modules_cmds[model].loaded) {
crypto_modules_cmds[model].ops = load_module (knet_h, "crypto", crypto_modules_cmds[model].model_name);
if (!crypto_modules_cmds[model].ops) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_CRYPTO, "Unable to load %s lib", crypto_modules_cmds[model].model_name);
goto out;
}
if (crypto_modules_cmds[model].ops->abi_ver != KNET_CRYPTO_MODEL_ABI) {
savederrno = EINVAL;
err = -1;
log_err(knet_h, KNET_SUB_CRYPTO,
"ABI mismatch loading module %s. knet ver: %d, module ver: %d",
crypto_modules_cmds[model].model_name, KNET_CRYPTO_MODEL_ABI,
crypto_modules_cmds[model].ops->abi_ver);
goto out;
}
crypto_modules_cmds[model].loaded = 1;
}
log_debug(knet_h, KNET_SUB_CRYPTO,
"Initizializing crypto module [%s/%s/%s]",
knet_handle_crypto_cfg->crypto_model,
knet_handle_crypto_cfg->crypto_cipher_type,
knet_handle_crypto_cfg->crypto_hash_type);
new = malloc(sizeof(struct crypto_instance));
if (!new) {
savederrno = ENOMEM;
err = -1;
log_err(knet_h, KNET_SUB_CRYPTO, "Unable to allocate memory for crypto instance");
goto out;
}
/*
* if crypto_modules_cmds.ops->init fails, it is expected that
* it will clean everything by itself.
* crypto_modules_cmds.ops->fini is not invoked on error.
*/
new->model = model;
if (crypto_modules_cmds[model].ops->init(knet_h, new, knet_handle_crypto_cfg)) {
savederrno = errno;
err = -1;
goto out;
}
out:
/* swap in the new instance only on success; the old one is released after */
if (!err) {
knet_h->crypto_instance[config_num] = new;
if (current) {
/*
* if we are replacing the current config, we need to enable it right away
*/
if (knet_h->crypto_in_use_config == config_num) {
crypto_use_config(knet_h, config_num);
}
if (crypto_modules_cmds[current->model].ops->fini != NULL) {
crypto_modules_cmds[current->model].ops->fini(knet_h, current);
}
free(current);
}
} else {
if (new) {
free(new);
}
}
pthread_rwlock_unlock(&shlib_rwlock);
errno = err ? savederrno : 0;
return err;
}
/*
 * Tear down the crypto instance in slot config_num, if there is one.
 * Calls the module's fini hook when present, frees the instance and clears
 * the slot. knet_h->crypto_in_use_config is not touched. Called from
 * crypto_fini() with shlib_rwlock held for writing.
 */
static void crypto_fini_config(
	knet_handle_t knet_h,
	uint8_t config_num)
{
	struct crypto_instance *inst = knet_h->crypto_instance[config_num];

	if (inst == NULL) {
		return;
	}

	if (crypto_modules_cmds[inst->model].ops->fini != NULL) {
		crypto_modules_cmds[inst->model].ops->fini(knet_h, inst);
	}

	free(inst);
	knet_h->crypto_instance[config_num] = NULL;
}
/*
 * Release crypto state under shlib_rwlock.
 *
 * If config_num is greater than KNET_MAX_CRYPTO_INSTANCES, every slot
 * (1..KNET_MAX_CRYPTO_INSTANCES) is released. Otherwise only slot
 * config_num is released. If the write lock cannot be taken, the error is
 * logged and nothing is released.
 */
void crypto_fini(
	knet_handle_t knet_h,
	uint8_t config_num)
{
	int lock_err;
	int slot;

	lock_err = pthread_rwlock_wrlock(&shlib_rwlock);
	if (lock_err) {
		log_err(knet_h, KNET_SUB_CRYPTO, "Unable to get write lock: %s",
			strerror(lock_err));
		return;
	}

	if (config_num <= KNET_MAX_CRYPTO_INSTANCES) {
		crypto_fini_config(knet_h, config_num);
	} else {
		for (slot = 1; slot <= KNET_MAX_CRYPTO_INSTANCES; slot++) {
			crypto_fini_config(knet_h, slot);
		}
	}

	pthread_rwlock_unlock(&shlib_rwlock);
}
+/*
+ * Install the crypto configuration in slot config_num
+ * (1..KNET_MAX_CRYPTO_INSTANCES) while holding the handle's global write
+ * lock. A "none" model, or a "none" cipher together with a "none" hash,
+ * removes the slot instead.
+ *
+ * Unless force is set, the slot currently in use cannot be changed (EBUSY).
+ * The private key length must be within [KNET_MIN_KEY_LEN, KNET_MAX_KEY_LEN].
+ *
+ * Returns 0 on success, -1 on argument/lock/validation errors, or -2 when
+ * crypto_init() fails. errno is set on error.
+ *
+ * NOTE(review): when force is set and the config is "none", crypto_fini()
+ * frees the in-use slot while knet_h->crypto_in_use_config still points at
+ * it. The handle refers to an empty slot until the caller switches configs.
+ * Confirm that TX/RX cannot run in that window.
+ */
+static int _knet_handle_crypto_set_config(knet_handle_t knet_h,
+ struct knet_handle_crypto_cfg *knet_handle_crypto_cfg,
+ uint8_t config_num,
+ uint8_t force)
+{
+ int savederrno = 0;
+ int err = 0;
+
+ if (!knet_h) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if (!knet_handle_crypto_cfg) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if ((config_num < 1) || (config_num > KNET_MAX_CRYPTO_INSTANCES)) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ savederrno = get_global_wrlock(knet_h);
+ if (savederrno) {
+ log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
+ strerror(savederrno));
+ errno = savederrno;
+ return -1;
+ }
+
+ if ((knet_h->crypto_in_use_config == config_num) && (!force)) {
+ savederrno = EBUSY;
+ err = -1;
+ goto exit_unlock;
+ }
+
+ /* "none" model, or "none" cipher together with "none" hash: remove the slot */
+ if ((!strncmp("none", knet_handle_crypto_cfg->crypto_model, 4)) ||
+ ((!strncmp("none", knet_handle_crypto_cfg->crypto_cipher_type, 4)) &&
+ (!strncmp("none", knet_handle_crypto_cfg->crypto_hash_type, 4)))) {
+ crypto_fini(knet_h, config_num);
+ log_debug(knet_h, KNET_SUB_CRYPTO, "crypto config %u is not enabled", config_num);
+ err = 0;
+ goto exit_unlock;
+ }
+
+ if (knet_handle_crypto_cfg->private_key_len < KNET_MIN_KEY_LEN) {
+ log_debug(knet_h, KNET_SUB_CRYPTO, "private key len too short for config %u (min %d): %u",
+ config_num, KNET_MIN_KEY_LEN, knet_handle_crypto_cfg->private_key_len);
+ savederrno = EINVAL;
+ err = -1;
+ goto exit_unlock;
+ }
+
+ if (knet_handle_crypto_cfg->private_key_len > KNET_MAX_KEY_LEN) {
+ log_debug(knet_h, KNET_SUB_CRYPTO, "private key len too long for config %u (max %d): %u",
+ config_num, KNET_MAX_KEY_LEN, knet_handle_crypto_cfg->private_key_len);
+ savederrno = EINVAL;
+ err = -1;
+ goto exit_unlock;
+ }
+
+ err = crypto_init(knet_h, knet_handle_crypto_cfg, config_num);
+
+ /* -2 distinguishes crypto_init() failures from argument/validation errors */
+ if (err) {
+ err = -2;
+ savederrno = errno;
+ }
+
+exit_unlock:
+ pthread_rwlock_unlock(&knet_h->global_rwlock);
+ errno = err ? savederrno : 0;
+ return err;
+}
+
+/*
+ * Public API: configure crypto slot config_num without forcing, so the slot
+ * currently in use is refused with EBUSY. See _knet_handle_crypto_set_config()
+ * for the validation and the return values.
+ */
+int knet_handle_crypto_set_config(knet_handle_t knet_h,
+ struct knet_handle_crypto_cfg *knet_handle_crypto_cfg,
+ uint8_t config_num)
+{
+ return _knet_handle_crypto_set_config(knet_h, knet_handle_crypto_cfg, config_num, 0);
+}
+
+/*
+ * Public API: choose whether RX accepts only crypto traffic or clear traffic
+ * as well. value is stored in knet_h->crypto_only under the global write
+ * lock. Values above KNET_CRYPTO_RX_DISALLOW_CLEAR_TRAFFIC are rejected with
+ * EINVAL.
+ *
+ * Returns 0 on success, or -1 with errno set on error. errno is not cleared
+ * on success.
+ */
+int knet_handle_crypto_rx_clear_traffic(knet_handle_t knet_h,
+ uint8_t value)
+{
+ int savederrno = 0;
+
+ if (!knet_h) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if (value > KNET_CRYPTO_RX_DISALLOW_CLEAR_TRAFFIC) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ savederrno = get_global_wrlock(knet_h);
+ if (savederrno) {
+ log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
+ strerror(savederrno));
+ errno = savederrno;
+ return -1;
+ }
+
+ knet_h->crypto_only = value;
+ if (knet_h->crypto_only) {
+ log_debug(knet_h, KNET_SUB_CRYPTO, "Only crypto traffic allowed for RX");
+ } else {
+ log_debug(knet_h, KNET_SUB_CRYPTO, "Both crypto and clear traffic allowed for RX");
+ }
+
+ pthread_rwlock_unlock(&knet_h->global_rwlock);
+ return 0;
+}
+
+/*
+ * Public API: select the crypto slot used by crypto_encrypt_and_sign(v)
+ * (0 = clear traffic). config_num must be 0..KNET_MAX_CRYPTO_INSTANCES. The
+ * work is delegated to crypto_use_config() under the global write lock.
+ *
+ * Returns 0 on success, or -1 with errno set (EINVAL for an out-of-range
+ * value or an empty slot).
+ */
+int knet_handle_crypto_use_config(knet_handle_t knet_h,
+ uint8_t config_num)
+{
+ int savederrno = 0;
+ int err = 0;
+
+ if (!knet_h) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if (config_num > KNET_MAX_CRYPTO_INSTANCES) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ savederrno = get_global_wrlock(knet_h);
+ if (savederrno) {
+ log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
+ strerror(savederrno));
+ errno = savederrno;
+ return -1;
+ }
+
+ err = crypto_use_config(knet_h, config_num);
+ savederrno = errno;
+
+ pthread_rwlock_unlock(&knet_h->global_rwlock);
+ errno = err ? savederrno : 0;
+ return err;
+}
+
int knet_get_crypto_list(struct knet_crypto_info *crypto_list, size_t *crypto_list_entries)
{
int err = 0;
int idx = 0;
int outidx = 0;
if (!crypto_list_entries) {
errno = EINVAL;
return -1;
}
while (crypto_modules_cmds[idx].model_name != NULL) {
if (crypto_modules_cmds[idx].built_in) {
if (crypto_list) {
crypto_list[outidx].name = crypto_modules_cmds[idx].model_name;
}
outidx++;
}
idx++;
}
*crypto_list_entries = outidx;
if (!err)
errno = 0;
return err;
}
+
+/*
+ * compatibility wrapper for 1.x releases
+ *
+ * Configures slot 1, replacing it even if it is in use. Then:
+ * - for a "none" configuration: allow clear RX traffic and switch to
+ *   config 0;
+ * - otherwise: disallow clear RX traffic and switch to slot 1.
+ * If switching the in-use config fails, the previous crypto_only value is
+ * restored (it is written directly if the API call to restore it fails).
+ *
+ * NOTE(review): when knet_handle_crypto_use_config() fails, err is
+ * overwritten by the result of restoring crypto_only. A successful restore
+ * therefore makes this function return 0 even though the switch failed, and
+ * the errno of that failure is lost. Confirm whether this is intended.
+ */
+int knet_handle_crypto(knet_handle_t knet_h, struct knet_handle_crypto_cfg *knet_handle_crypto_cfg)
+{
+ int err = 0;
+ uint8_t value;
+
+ if (!knet_h) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ value = knet_h->crypto_only;
+ /*
+ * configure crypto in slot 1
+ */
+ err = _knet_handle_crypto_set_config(knet_h, knet_handle_crypto_cfg, 1, 1);
+ if (err < 0) {
+ return err;
+ }
+
+ if ((!strncmp("none", knet_handle_crypto_cfg->crypto_model, 4)) ||
+ ((!strncmp("none", knet_handle_crypto_cfg->crypto_cipher_type, 4)) &&
+ (!strncmp("none", knet_handle_crypto_cfg->crypto_hash_type, 4)))) {
+ err = knet_handle_crypto_rx_clear_traffic(knet_h, KNET_CRYPTO_RX_ALLOW_CLEAR_TRAFFIC);
+ if (err < 0) {
+ return err;
+ }
+
+ /*
+ * start using clear traffic
+ */
+ err = knet_handle_crypto_use_config(knet_h, 0);
+ if (err < 0) {
+ err = knet_handle_crypto_rx_clear_traffic(knet_h, value);
+ if (err < 0) {
+ /*
+ * force attempt or things will go bad
+ */
+ knet_h->crypto_only = value;
+ }
+ }
+ return err;
+ } else {
+ err = knet_handle_crypto_rx_clear_traffic(knet_h, KNET_CRYPTO_RX_DISALLOW_CLEAR_TRAFFIC);
+ if (err < 0) {
+ return err;
+ }
+
+ /*
+ * start using crypto traffic
+ */
+ err = knet_handle_crypto_use_config(knet_h, 1);
+ if (err < 0) {
+ err = knet_handle_crypto_rx_clear_traffic(knet_h, value);
+ if (err < 0) {
+ /*
+ * force attempt or things will go bad
+ */
+ knet_h->crypto_only = value;
+ }
+ }
+ return err;
+ }
+}
diff --git a/libknet/crypto.h b/libknet/crypto.h
index d04bc484..864ceba3 100644
--- a/libknet/crypto.h
+++ b/libknet/crypto.h
@@ -1,48 +1,39 @@
/*
* Copyright (C) 2012-2020 Red Hat, Inc. All rights reserved.
*
* Author: Fabio M. Di Nitto <fabbione@kronosnet.org>
*
* This software licensed under LGPL-2.0+
*/
#ifndef __KNET_CRYPTO_H__
#define __KNET_CRYPTO_H__
#include "internals.h"
/*
 * Internal crypto entry points.
 * NOTE(review): return-value conventions are not visible from this header;
 * presumably 0 on success and non-zero on failure - confirm in crypto.c.
 */
/*
 * Authenticate buf_in (buf_in_len bytes) and decrypt it into buf_out; the
 * resulting length is reported through buf_out_len.
 */
int crypto_authenticate_and_decrypt (
knet_handle_t knet_h,
const unsigned char *buf_in,
const ssize_t buf_in_len,
unsigned char *buf_out,
ssize_t *buf_out_len);
/*
 * Encrypt and sign buf_in (buf_in_len bytes) into buf_out; the resulting
 * length is reported through buf_out_len.
 */
int crypto_encrypt_and_sign (
knet_handle_t knet_h,
const unsigned char *buf_in,
const ssize_t buf_in_len,
unsigned char *buf_out,
ssize_t *buf_out_len);
/*
 * Scatter/gather variant of crypto_encrypt_and_sign(): the input is given
 * as iovcnt_in entries of iov_in.
 */
int crypto_encrypt_and_signv (
knet_handle_t knet_h,
const struct iovec *iov_in,
int iovcnt_in,
unsigned char *buf_out,
ssize_t *buf_out_len);
-int crypto_use_config (
- knet_handle_t knet_h,
- uint8_t config_num);
-
-int crypto_init(
- knet_handle_t knet_h,
- struct knet_handle_crypto_cfg *knet_handle_crypto_cfg,
- uint8_t config_num);
-
/*
 * Release the crypto instance config_num. knet_handle_free() passes
 * KNET_MAX_CRYPTO_INSTANCES + 1 to release all crypto resources at once.
 */
void crypto_fini(
knet_handle_t knet_h,
uint8_t config_num);
#endif
diff --git a/libknet/handle.c b/libknet/handle.c
index 71658926..68641d6e 100644
--- a/libknet/handle.c
+++ b/libknet/handle.c
@@ -1,1923 +1,808 @@
/*
* Copyright (C) 2010-2020 Red Hat, Inc. All rights reserved.
*
* Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
* Federico Simoncelli <fsimon@kronosnet.org>
*
* This software licensed under LGPL-2.0+
*/
#include "config.h"
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include <pthread.h>
-#include <sys/uio.h>
#include <math.h>
#include <sys/time.h>
#include <sys/resource.h>
#include "internals.h"
#include "crypto.h"
#include "links.h"
#include "compress.h"
#include "compat.h"
#include "common.h"
#include "threads_common.h"
#include "threads_heartbeat.h"
#include "threads_pmtud.h"
#include "threads_dsthandler.h"
#include "threads_rx.h"
#include "threads_tx.h"
#include "transports.h"
#include "transport_common.h"
#include "logging.h"
/*
 * Library-wide state shared by all handles.
 * handle_config_mutex serializes knet_ref updates together with the lazy
 * init/teardown of shlib_rwlock (see knet_handle_new_ex() and
 * knet_handle_free()).
 */
static pthread_mutex_t handle_config_mutex = PTHREAD_MUTEX_INITIALIZER;
/* NOTE(review): not static, presumably referenced from other files - confirm */
pthread_rwlock_t shlib_rwlock;
/* set to 1 once _init_shlib_tracker() has initialized shlib_rwlock */
static uint8_t shlib_wrlock_init = 0;
/* number of live handles; _fini_shlib_tracker() destroys shlib_rwlock at 0 */
static uint32_t knet_ref = 0;
/*
 * Lazily create the library-wide shlib_rwlock the first time it is needed.
 * The visible caller (knet_handle_new_ex) holds handle_config_mutex here.
 *
 * Returns 0 if the lock exists or was created, -1 with errno set to the
 * pthread error code otherwise.
 */
static int _init_shlib_tracker(knet_handle_t knet_h)
{
	int rc;

	if (shlib_wrlock_init) {
		return 0;
	}

	rc = pthread_rwlock_init(&shlib_rwlock, NULL);
	if (rc != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize shared lib rwlock: %s",
			strerror(rc));
		errno = rc;
		return -1;
	}

	shlib_wrlock_init = 1;
	return 0;
}
/*
 * Destroy shlib_rwlock once no handle references it any more (knet_ref == 0).
 * The visible caller (knet_handle_free) holds handle_config_mutex here.
 */
static void _fini_shlib_tracker(void)
{
	if (knet_ref != 0) {
		return;
	}

	pthread_rwlock_destroy(&shlib_rwlock);
	shlib_wrlock_init = 0;
}
/*
 * Initialize the per-handle synchronization primitives, in this order:
 * global_rwlock, handle_stats_mutex, threads_status_mutex, pmtud_mutex,
 * kmtu_mutex, pmtud_cond, hb_mutex, tx_mutex, backoff_mutex and
 * tx_seq_num_mutex.
 *
 * Returns 0 on success. On the first failure it logs which primitive
 * failed and returns -1 with errno set to the pthread error code.
 * Primitives created before the failure are not torn down here; the
 * visible caller (knet_handle_new_ex) cleans up via knet_handle_free().
 */
static int _init_locks(knet_handle_t knet_h)
{
	int err;

	if ((err = pthread_rwlock_init(&knet_h->global_rwlock, NULL)) != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize list rwlock: %s",
			strerror(err));
		goto fail;
	}

	if ((err = pthread_mutex_init(&knet_h->handle_stats_mutex, NULL)) != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize handle stats mutex: %s",
			strerror(err));
		goto fail;
	}

	if ((err = pthread_mutex_init(&knet_h->threads_status_mutex, NULL)) != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize threads status mutex: %s",
			strerror(err));
		goto fail;
	}

	if ((err = pthread_mutex_init(&knet_h->pmtud_mutex, NULL)) != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize pmtud mutex: %s",
			strerror(err));
		goto fail;
	}

	if ((err = pthread_mutex_init(&knet_h->kmtu_mutex, NULL)) != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize kernel_mtu mutex: %s",
			strerror(err));
		goto fail;
	}

	if ((err = pthread_cond_init(&knet_h->pmtud_cond, NULL)) != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize pmtud conditional mutex: %s",
			strerror(err));
		goto fail;
	}

	if ((err = pthread_mutex_init(&knet_h->hb_mutex, NULL)) != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize hb_thread mutex: %s",
			strerror(err));
		goto fail;
	}

	if ((err = pthread_mutex_init(&knet_h->tx_mutex, NULL)) != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize tx_thread mutex: %s",
			strerror(err));
		goto fail;
	}

	if ((err = pthread_mutex_init(&knet_h->backoff_mutex, NULL)) != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize pong timeout backoff mutex: %s",
			strerror(err));
		goto fail;
	}

	if ((err = pthread_mutex_init(&knet_h->tx_seq_num_mutex, NULL)) != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize tx_seq_num_mutex mutex: %s",
			strerror(err));
		goto fail;
	}

	return 0;

fail:
	errno = err;
	return -1;
}
/*
 * Destroy the per-handle synchronization primitives created by
 * _init_locks(), in reverse creation order. Return codes are ignored.
 */
static void _destroy_locks(knet_handle_t knet_h)
{
	pthread_mutex_destroy(&knet_h->tx_seq_num_mutex);
	pthread_mutex_destroy(&knet_h->backoff_mutex);
	pthread_mutex_destroy(&knet_h->tx_mutex);
	pthread_mutex_destroy(&knet_h->hb_mutex);
	pthread_cond_destroy(&knet_h->pmtud_cond);
	pthread_mutex_destroy(&knet_h->kmtu_mutex);
	pthread_mutex_destroy(&knet_h->pmtud_mutex);
	pthread_mutex_destroy(&knet_h->threads_status_mutex);
	pthread_mutex_destroy(&knet_h->handle_stats_mutex);
	pthread_rwlock_destroy(&knet_h->global_rwlock);
}
/*
 * Create the two internal socketpairs, hostsockfd and dstsockfd.
 *
 * Returns 0 on success, -1 with errno preserved from the failing
 * _init_socketpair() call. A pair created before the failure is released
 * later by _close_socks() (reached through knet_handle_free()).
 */
static int _init_socks(knet_handle_t knet_h)
{
	int err;

	if (_init_socketpair(knet_h, knet_h->hostsockfd) != 0) {
		err = errno;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize internal hostsockpair: %s",
			strerror(err));
		errno = err;
		return -1;
	}

	if (_init_socketpair(knet_h, knet_h->dstsockfd) != 0) {
		err = errno;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize internal dstsockpair: %s",
			strerror(err));
		errno = err;
		return -1;
	}

	return 0;
}
/*
 * Close both internal socketpairs, in the reverse order of _init_socks().
 */
static void _close_socks(knet_handle_t knet_h)
{
	int *pairs[] = { knet_h->dstsockfd, knet_h->hostsockfd };
	size_t n;

	for (n = 0; n < sizeof(pairs) / sizeof(pairs[0]); n++) {
		_close_socketpair(knet_h, pairs[n]);
	}
}
/*
 * Allocate the per-handle packet buffers, all zero-filled:
 *  - send_to_links_buf[i] and send_to_links_buf_crypt[i], for i in
 *    [0, PCKT_FRAG_MAX): ceil(KNET_MAX_PACKET_SIZE / (i + 1)) bytes plus
 *    KNET_HEADER_ALL_SIZE (the crypt variant also adds
 *    KNET_DATABUFSIZE_CRYPT_PAD);
 *  - recv_from_links_buf[i], for i in [0, PCKT_RX_BUFS), and
 *    recv_from_sock_buf: KNET_DATABUFSIZE each;
 *  - ping, pmtud, crypto and compression scratch buffers.
 * It also marks every knet_transport_fd_tracker slot with the
 * KNET_MAX_TRANSPORTS sentinel.
 *
 * Returns 0 on success, -1 with errno set on allocation failure. Buffers
 * allocated before the failure are kept; _destroy_buffers() frees them
 * (knet_handle_new_ex() reaches it through knet_handle_free()).
 *
 * calloc() replaces the former malloc() + memset() pairs: same zero-filled
 * result, one call per buffer.
 */
static int _init_buffers(knet_handle_t knet_h)
{
	int savederrno = 0;
	int i;
	size_t bufsize;

	for (i = 0; i < PCKT_FRAG_MAX; i++) {
		bufsize = ceil((float)KNET_MAX_PACKET_SIZE / (i + 1)) + KNET_HEADER_ALL_SIZE;
		knet_h->send_to_links_buf[i] = calloc(1, bufsize);
		if (!knet_h->send_to_links_buf[i]) {
			savederrno = errno;
			log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory datafd to link buffer: %s",
				strerror(savederrno));
			goto exit_fail;
		}
	}

	for (i = 0; i < PCKT_RX_BUFS; i++) {
		knet_h->recv_from_links_buf[i] = calloc(1, KNET_DATABUFSIZE);
		if (!knet_h->recv_from_links_buf[i]) {
			savederrno = errno;
			log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for link to datafd buffer: %s",
				strerror(savederrno));
			goto exit_fail;
		}
	}

	knet_h->recv_from_sock_buf = calloc(1, KNET_DATABUFSIZE);
	if (!knet_h->recv_from_sock_buf) {
		savederrno = errno;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for app to datafd buffer: %s",
			strerror(savederrno));
		goto exit_fail;
	}

	knet_h->pingbuf = calloc(1, KNET_HEADER_PING_SIZE);
	if (!knet_h->pingbuf) {
		savederrno = errno;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for hearbeat buffer: %s",
			strerror(savederrno));
		goto exit_fail;
	}

	knet_h->pmtudbuf = calloc(1, KNET_PMTUD_SIZE_V6 + KNET_HEADER_ALL_SIZE);
	if (!knet_h->pmtudbuf) {
		savederrno = errno;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for pmtud buffer: %s",
			strerror(savederrno));
		goto exit_fail;
	}

	for (i = 0; i < PCKT_FRAG_MAX; i++) {
		bufsize = ceil((float)KNET_MAX_PACKET_SIZE / (i + 1)) + KNET_HEADER_ALL_SIZE + KNET_DATABUFSIZE_CRYPT_PAD;
		knet_h->send_to_links_buf_crypt[i] = calloc(1, bufsize);
		if (!knet_h->send_to_links_buf_crypt[i]) {
			savederrno = errno;
			log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for crypto datafd to link buffer: %s",
				strerror(savederrno));
			goto exit_fail;
		}
	}

	knet_h->recv_from_links_buf_decrypt = calloc(1, KNET_DATABUFSIZE_CRYPT);
	if (!knet_h->recv_from_links_buf_decrypt) {
		savederrno = errno;
		log_err(knet_h, KNET_SUB_CRYPTO, "Unable to allocate memory for crypto link to datafd buffer: %s",
			strerror(savederrno));
		goto exit_fail;
	}

	knet_h->recv_from_links_buf_crypt = calloc(1, KNET_DATABUFSIZE_CRYPT);
	if (!knet_h->recv_from_links_buf_crypt) {
		savederrno = errno;
		log_err(knet_h, KNET_SUB_CRYPTO, "Unable to allocate memory for crypto link to datafd buffer: %s",
			strerror(savederrno));
		goto exit_fail;
	}

	knet_h->pingbuf_crypt = calloc(1, KNET_DATABUFSIZE_CRYPT);
	if (!knet_h->pingbuf_crypt) {
		savederrno = errno;
		log_err(knet_h, KNET_SUB_CRYPTO, "Unable to allocate memory for crypto hearbeat buffer: %s",
			strerror(savederrno));
		goto exit_fail;
	}

	knet_h->pmtudbuf_crypt = calloc(1, KNET_DATABUFSIZE_CRYPT);
	if (!knet_h->pmtudbuf_crypt) {
		savederrno = errno;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for crypto pmtud buffer: %s",
			strerror(savederrno));
		goto exit_fail;
	}

	knet_h->recv_from_links_buf_decompress = calloc(1, KNET_DATABUFSIZE_COMPRESS);
	if (!knet_h->recv_from_links_buf_decompress) {
		savederrno = errno;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for decompress buffer: %s",
			strerror(savederrno));
		goto exit_fail;
	}

	knet_h->send_to_links_buf_compress = calloc(1, KNET_DATABUFSIZE_COMPRESS);
	if (!knet_h->send_to_links_buf_compress) {
		savederrno = errno;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for compress buffer: %s",
			strerror(savederrno));
		goto exit_fail;
	}

	/* every tracker slot starts as "no transport" */
	memset(knet_h->knet_transport_fd_tracker, 0, sizeof(knet_h->knet_transport_fd_tracker));
	for (i = 0; i < KNET_MAX_FDS; i++) {
		knet_h->knet_transport_fd_tracker[i].transport = KNET_MAX_TRANSPORTS;
	}

	return 0;

exit_fail:
	errno = savederrno;
	return -1;
}
/*
 * Release every buffer allocated by _init_buffers(). This is safe on a
 * partially initialized handle: the handle is zero-filled when allocated,
 * so buffers never allocated are NULL, and free(NULL) is a no-op.
 */
static void _destroy_buffers(knet_handle_t knet_h)
{
	int idx;

	/* per-fragment TX buffers, plain and crypto */
	for (idx = 0; idx < PCKT_FRAG_MAX; idx++) {
		free(knet_h->send_to_links_buf[idx]);
		free(knet_h->send_to_links_buf_crypt[idx]);
	}

	/* receive buffers */
	for (idx = 0; idx < PCKT_RX_BUFS; idx++) {
		free(knet_h->recv_from_links_buf[idx]);
	}
	free(knet_h->recv_from_sock_buf);

	/* crypto scratch buffers */
	free(knet_h->recv_from_links_buf_decrypt);
	free(knet_h->recv_from_links_buf_crypt);

	/* compression scratch buffers */
	free(knet_h->recv_from_links_buf_decompress);
	free(knet_h->send_to_links_buf_compress);

	/* heartbeat and PMTUd buffers */
	free(knet_h->pingbuf);
	free(knet_h->pingbuf_crypt);
	free(knet_h->pmtudbuf);
	free(knet_h->pmtudbuf_crypt);
}
/*
 * Create the three internal epoll instances (datafd->link, link->datafd and
 * dst cache) and mark them close-on-exec. Then register hostsockfd[0] with
 * the datafd->link instance and dstsockfd[0] with the dst cache instance.
 *
 * Returns 0 on success, -1 with errno set on failure. fds created before
 * the failure stay open; _close_epolls() (via knet_handle_free()) closes them.
 */
static int _init_epolls(knet_handle_t knet_h)
{
	struct epoll_event ev;
	int err;

	/*
	 * even if the kernel does dynamic allocation with epoll_ctl
	 * we need to reserve one extra for host to host communication
	 */
	knet_h->send_to_links_epollfd = epoll_create(KNET_EPOLL_MAX_EVENTS + 1);
	if (knet_h->send_to_links_epollfd < 0) {
		err = errno;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to create epoll datafd to link fd: %s",
			strerror(err));
		goto fail;
	}

	knet_h->recv_from_links_epollfd = epoll_create(KNET_EPOLL_MAX_EVENTS);
	if (knet_h->recv_from_links_epollfd < 0) {
		err = errno;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to create epoll link to datafd fd: %s",
			strerror(err));
		goto fail;
	}

	knet_h->dst_link_handler_epollfd = epoll_create(KNET_EPOLL_MAX_EVENTS);
	if (knet_h->dst_link_handler_epollfd < 0) {
		err = errno;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to create epoll dst cache fd: %s",
			strerror(err));
		goto fail;
	}

	if (_fdset_cloexec(knet_h->send_to_links_epollfd) != 0) {
		err = errno;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to set CLOEXEC on datafd to link epoll fd: %s",
			strerror(err));
		goto fail;
	}

	if (_fdset_cloexec(knet_h->recv_from_links_epollfd) != 0) {
		err = errno;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to set CLOEXEC on link to datafd epoll fd: %s",
			strerror(err));
		goto fail;
	}

	if (_fdset_cloexec(knet_h->dst_link_handler_epollfd) != 0) {
		err = errno;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to set CLOEXEC on dst cache epoll fd: %s",
			strerror(err));
		goto fail;
	}

	/* internal host socket -> datafd->link epoll */
	memset(&ev, 0, sizeof(struct epoll_event));
	ev.events = EPOLLIN;
	ev.data.fd = knet_h->hostsockfd[0];
	if (epoll_ctl(knet_h->send_to_links_epollfd,
		      EPOLL_CTL_ADD, knet_h->hostsockfd[0], &ev) != 0) {
		err = errno;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to add hostsockfd[0] to epoll pool: %s",
			strerror(err));
		goto fail;
	}

	/* internal dst socket -> dst cache epoll */
	memset(&ev, 0, sizeof(struct epoll_event));
	ev.events = EPOLLIN;
	ev.data.fd = knet_h->dstsockfd[0];
	if (epoll_ctl(knet_h->dst_link_handler_epollfd,
		      EPOLL_CTL_ADD, knet_h->dstsockfd[0], &ev) != 0) {
		err = errno;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to add dstsockfd[0] to epoll pool: %s",
			strerror(err));
		goto fail;
	}

	return 0;

fail:
	errno = err;
	return -1;
}
/*
 * Remove the datafds and internal sockets from the epoll instances, then
 * close the three epoll fds.
 *
 * This runs from knet_handle_free(), which knet_handle_new_ex() also calls
 * on its failure path. It can therefore run before _init_epolls() has
 * created anything, while the epoll fds are still 0 from the zero-filled
 * handle. The file treats fd 0 as "not set" (see the sockfd checks below),
 * so only fds > 0 are closed. Previously an early init failure closed the
 * process's fd 0 (usually stdin) up to three times.
 */
static void _close_epolls(knet_handle_t knet_h)
{
	struct epoll_event ev;
	int i;

	memset(&ev, 0, sizeof(struct epoll_event));

	for (i = 0; i < KNET_DATAFD_MAX; i++) {
		if (knet_h->sockfd[i].in_use) {
			epoll_ctl(knet_h->send_to_links_epollfd, EPOLL_CTL_DEL, knet_h->sockfd[i].sockfd[knet_h->sockfd[i].is_created], &ev);
			/*
			 * NOTE(review): for a caller-supplied datafd (is_created == 0)
			 * this condition tests the caller's fd. If
			 * _close_socketpair() closes every non-zero entry, the
			 * caller's fd is closed here, whereas
			 * knet_handle_remove_datafd() only closes knet-created
			 * pairs - confirm intent.
			 */
			if (knet_h->sockfd[i].sockfd[knet_h->sockfd[i].is_created]) {
				_close_socketpair(knet_h, knet_h->sockfd[i].sockfd);
			}
		}
	}

	/* harmless if the fds were never created: epoll_ctl just fails */
	epoll_ctl(knet_h->send_to_links_epollfd, EPOLL_CTL_DEL, knet_h->hostsockfd[0], &ev);
	epoll_ctl(knet_h->dst_link_handler_epollfd, EPOLL_CTL_DEL, knet_h->dstsockfd[0], &ev);

	/* never close() fd 0 (or -1): it was not created by _init_epolls() */
	if (knet_h->send_to_links_epollfd > 0) {
		close(knet_h->send_to_links_epollfd);
	}
	if (knet_h->recv_from_links_epollfd > 0) {
		close(knet_h->recv_from_links_epollfd);
	}
	if (knet_h->dst_link_handler_epollfd > 0) {
		close(knet_h->dst_link_handler_epollfd);
	}
}
/*
 * Start the internal worker threads, in order: PMTUd, dst link handler,
 * TX (datafd to links), RX (links to datafd) and heartbeat. Each one is
 * marked KNET_THREAD_REGISTERED just before it is created. All threads use
 * KNET_THREAD_STACK_SIZE stacks.
 *
 * Returns 0 on success, -1 with errno set to the pthread error code on
 * failure. Threads already started are left running for the caller's
 * cleanup path (_stop_threads() via knet_handle_free()).
 *
 * Fix: pthread_attr_t is now destroyed on every failure after a successful
 * pthread_attr_init(); previously it leaked on those paths. Destroying it
 * does not affect threads already created from it.
 */
static int _start_threads(knet_handle_t knet_h)
{
	int savederrno = 0;
	pthread_attr_t attr;

	set_thread_status(knet_h, KNET_THREAD_PMTUD, KNET_THREAD_REGISTERED);

	savederrno = pthread_attr_init(&attr);
	if (savederrno) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to init pthread attributes: %s",
			strerror(savederrno));
		/* attr was never initialized: must not be destroyed */
		goto exit_fail;
	}

	savederrno = pthread_attr_setstacksize(&attr, KNET_THREAD_STACK_SIZE);
	if (savederrno) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to set stack size attribute: %s",
			strerror(savederrno));
		goto exit_fail_attr;
	}

	savederrno = pthread_create(&knet_h->pmtud_link_handler_thread, &attr,
				    _handle_pmtud_link_thread, (void *) knet_h);
	if (savederrno) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to start pmtud link thread: %s",
			strerror(savederrno));
		goto exit_fail_attr;
	}

	set_thread_status(knet_h, KNET_THREAD_DST_LINK, KNET_THREAD_REGISTERED);
	savederrno = pthread_create(&knet_h->dst_link_handler_thread, &attr,
				    _handle_dst_link_handler_thread, (void *) knet_h);
	if (savederrno) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to start dst cache thread: %s",
			strerror(savederrno));
		goto exit_fail_attr;
	}

	set_thread_status(knet_h, KNET_THREAD_TX, KNET_THREAD_REGISTERED);
	savederrno = pthread_create(&knet_h->send_to_links_thread, &attr,
				    _handle_send_to_links_thread, (void *) knet_h);
	if (savederrno) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to start datafd to link thread: %s",
			strerror(savederrno));
		goto exit_fail_attr;
	}

	set_thread_status(knet_h, KNET_THREAD_RX, KNET_THREAD_REGISTERED);
	savederrno = pthread_create(&knet_h->recv_from_links_thread, &attr,
				    _handle_recv_from_links_thread, (void *) knet_h);
	if (savederrno) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to start link to datafd thread: %s",
			strerror(savederrno));
		goto exit_fail_attr;
	}

	set_thread_status(knet_h, KNET_THREAD_HB, KNET_THREAD_REGISTERED);
	savederrno = pthread_create(&knet_h->heartbt_thread, &attr,
				    _handle_heartbt_thread, (void *) knet_h);
	if (savederrno) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to start heartbeat thread: %s",
			strerror(savederrno));
		goto exit_fail_attr;
	}

	savederrno = pthread_attr_destroy(&attr);
	if (savederrno) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to destroy pthread attributes: %s",
			strerror(savederrno));
		/*
		 * Do not return error code. Error is not critical.
		 */
	}

	return 0;

exit_fail_attr:
	/* the original failure (in savederrno) is what gets reported */
	(void)pthread_attr_destroy(&attr);
exit_fail:
	errno = savederrno;
	return -1;
}
/*
 * Wait until every internal thread reports KNET_THREAD_STOPPED, then cancel
 * and join each worker that was created (non-zero thread id; the handle is
 * zero-filled at allocation). The order is heartbeat, TX, RX, dst link
 * handler, PMTUd.
 */
static void _stop_threads(knet_handle_t knet_h)
{
	pthread_t *workers[] = {
		&knet_h->heartbt_thread,
		&knet_h->send_to_links_thread,
		&knet_h->recv_from_links_thread,
		&knet_h->dst_link_handler_thread,
		&knet_h->pmtud_link_handler_thread,
	};
	size_t w;
	void *retval;

	wait_all_threads_status(knet_h, KNET_THREAD_STOPPED);

	for (w = 0; w < sizeof(workers) / sizeof(workers[0]); w++) {
		if (!*workers[w]) {
			continue;
		}
		pthread_cancel(*workers[w]);
		pthread_join(*workers[w], &retval);
	}
}
/*
 * Allocate and fully initialize a knet handle.
 *
 * host_id: node id of this host (stored in knet_h->host_id).
 * log_fd: must be >= 0 and below the RLIMIT_NOFILE hard limit; when > 0,
 *   every subsystem log level is initialized to default_log_level.
 * default_log_level: must be <= KNET_LOG_DEBUG when log_fd is non-zero.
 * flags: KNET_HANDLE_FLAG_* bits; values above
 *   KNET_HANDLE_FLAG_PRIVILEGED * 2 - 1 are rejected with EINVAL.
 *
 * Returns the new handle with errno = 0 once all internal threads report
 * KNET_THREAD_STARTED, or NULL with errno set. Any failure after the
 * shlib tracker step is cleaned up through knet_handle_free().
 */
knet_handle_t knet_handle_new_ex(knet_node_id_t host_id,
int log_fd,
uint8_t default_log_level,
uint64_t flags)
{
knet_handle_t knet_h;
int savederrno = 0;
struct rlimit cur;
/* on failure errno is left as set by getrlimit() */
if (getrlimit(RLIMIT_NOFILE, &cur) < 0) {
return NULL;
}
if ((log_fd < 0) || ((unsigned int)log_fd >= cur.rlim_max)) {
errno = EINVAL;
return NULL;
}
/*
* validate incoming request
*/
if ((log_fd) && (default_log_level > KNET_LOG_DEBUG)) {
errno = EINVAL;
return NULL;
}
/* assumes KNET_HANDLE_FLAG_PRIVILEGED is the highest flag bit - TODO confirm */
if (flags > KNET_HANDLE_FLAG_PRIVILEGED * 2 - 1) {
errno = EINVAL;
return NULL;
}
/*
* allocate handle
*/
knet_h = malloc(sizeof(struct knet_handle));
if (!knet_h) {
errno = ENOMEM;
return NULL;
}
/* zero-fill: later cleanup code relies on unset pointers/fds being 0 */
memset(knet_h, 0, sizeof(struct knet_handle));
/*
* setting up some handle data so that we can use logging
* also when initializing the library global locks
* and trackers
*/
knet_h->flags = flags;
/*
* copy config in place
*/
knet_h->host_id = host_id;
knet_h->logfd = log_fd;
if (knet_h->logfd > 0) {
memset(&knet_h->log_levels, default_log_level, KNET_MAX_SUBSYSTEMS);
}
/*
* set pmtud default timers
*/
knet_h->pmtud_interval = KNET_PMTUD_DEFAULT_INTERVAL;
/*
* set transports reconnect default timers
*/
knet_h->reconnect_int = KNET_TRANSPORT_DEFAULT_RECONNECT_INTERVAL;
/*
* Set 'min' stats to the maximum value so the
* first value we get is always less
*/
knet_h->stats.tx_compress_time_min = UINT64_MAX;
knet_h->stats.rx_compress_time_min = UINT64_MAX;
knet_h->stats.tx_crypt_time_min = UINT64_MAX;
knet_h->stats.rx_crypt_time_min = UINT64_MAX;
/*
* init global shlib tracker
*/
savederrno = pthread_mutex_lock(&handle_config_mutex);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get handle mutex lock: %s",
strerror(savederrno));
free(knet_h);
knet_h = NULL;
errno = savederrno;
return NULL;
}
/*
 * counted before the tracker init so that the knet_handle_free() call on
 * the failure path, which decrements knet_ref, stays balanced
 */
knet_ref++;
if (_init_shlib_tracker(knet_h) < 0) {
savederrno = errno;
log_err(knet_h, KNET_SUB_HANDLE, "Unable to init handle tracker: %s",
strerror(savederrno));
errno = savederrno;
pthread_mutex_unlock(&handle_config_mutex);
goto exit_fail;
}
pthread_mutex_unlock(&handle_config_mutex);
/*
* init main locking structures
*/
if (_init_locks(knet_h)) {
savederrno = errno;
goto exit_fail;
}
/*
* init sockets
*/
if (_init_socks(knet_h)) {
savederrno = errno;
goto exit_fail;
}
/*
* allocate packet buffers
*/
if (_init_buffers(knet_h)) {
savederrno = errno;
goto exit_fail;
}
if (compress_init(knet_h)) {
savederrno = errno;
goto exit_fail;
}
/*
* create epoll fds
*/
if (_init_epolls(knet_h)) {
savederrno = errno;
goto exit_fail;
}
/*
* start transports
*/
if (start_all_transports(knet_h)) {
savederrno = errno;
goto exit_fail;
}
/*
* start internal threads
*/
if (_start_threads(knet_h)) {
savederrno = errno;
goto exit_fail;
}
/* block until every internal thread reports it has started */
wait_all_threads_status(knet_h, KNET_THREAD_STARTED);
errno = 0;
return knet_h;
/*
 * NOTE(review): knet_handle_free() takes global_rwlock, which was never
 * initialized if we got here before _init_locks() succeeded (this relies
 * on the zero-filled memory). Its return value is ignored, so a lock
 * failure there would leak knet_h - confirm both are acceptable.
 */
exit_fail:
knet_handle_free(knet_h);
errno = savederrno;
return NULL;
}
/*
 * Backwards-compatible constructor: behaves exactly like
 * knet_handle_new_ex() called with KNET_HANDLE_FLAG_PRIVILEGED.
 */
knet_handle_t knet_handle_new(knet_node_id_t host_id,
			      int log_fd,
			      uint8_t default_log_level)
{
	const uint64_t legacy_flags = KNET_HANDLE_FLAG_PRIVILEGED;

	return knet_handle_new_ex(host_id, log_fd, default_log_level, legacy_flags);
}
/*
 * Stop all internal threads and release every resource owned by knet_h.
 *
 * Fails, leaving the handle untouched, with EINVAL for a NULL handle, with
 * the pthread error if the global write lock cannot be taken, or with
 * EBUSY while any host is still configured (knet_h->host_head != NULL).
 * Returns 0 with errno = 0 on success; knet_h must not be used afterwards.
 */
int knet_handle_free(knet_handle_t knet_h)
{
int savederrno = 0;
if (!knet_h) {
errno = EINVAL;
return -1;
}
savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
if (knet_h->host_head != NULL) {
savederrno = EBUSY;
log_err(knet_h, KNET_SUB_HANDLE,
"Unable to free handle: host(s) or listener(s) are still active: %s",
strerror(savederrno));
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = savederrno;
return -1;
}
/* presumably polled by the worker threads to begin shutdown - confirm */
knet_h->fini_in_progress = 1;
pthread_rwlock_unlock(&knet_h->global_rwlock);
/*
 * teardown order: threads are stopped before the epolls, buffers and
 * sockets they may use are released; locks are destroyed last
 */
_stop_threads(knet_h);
stop_all_transports(knet_h);
_close_epolls(knet_h);
_destroy_buffers(knet_h);
_close_socks(knet_h);
crypto_fini(knet_h, KNET_MAX_CRYPTO_INSTANCES + 1); /* values above MAX_CRYPTO will release all crypto resources */
compress_fini(knet_h, 1);
_destroy_locks(knet_h);
free(knet_h);
/* only clears this function's copy; callers must drop their own pointer */
knet_h = NULL;
/* drop this handle's reference; the last one destroys shlib_rwlock */
(void)pthread_mutex_lock(&handle_config_mutex);
knet_ref--;
_fini_shlib_tracker();
pthread_mutex_unlock(&handle_config_mutex);
errno = 0;
return 0;
}
-
-/*
- * Register sock_notify_fn and its private data under the global write lock.
- * knet_handle_add_datafd() refuses to add a datafd until this is set.
- * The callback presumably reports socket errors on a datafd (see its
- * datafd/channel/error/errorno parameters) - confirm.
- * A NULL sock_notify_fn is rejected with EINVAL.
- * NOTE(review): unlike the sibling setters, errno is not reset to 0 on
- * success.
- */
-int knet_handle_enable_sock_notify(knet_handle_t knet_h,
- void *sock_notify_fn_private_data,
- void (*sock_notify_fn) (
- void *private_data,
- int datafd,
- int8_t channel,
- uint8_t tx_rx,
- int error,
- int errorno))
-{
- int savederrno = 0;
-
- if (!knet_h) {
- errno = EINVAL;
- return -1;
- }
-
- if (!sock_notify_fn) {
- errno = EINVAL;
- return -1;
- }
-
- savederrno = get_global_wrlock(knet_h);
- if (savederrno) {
- log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
- strerror(savederrno));
- errno = savederrno;
- return -1;
- }
-
- knet_h->sock_notify_fn_private_data = sock_notify_fn_private_data;
- knet_h->sock_notify_fn = sock_notify_fn;
- log_debug(knet_h, KNET_SUB_HANDLE, "sock_notify_fn enabled");
-
- pthread_rwlock_unlock(&knet_h->global_rwlock);
-
- return 0;
-}
-
-/*
- * Attach a data fd to a channel.
- *
- * *datafd > 0: the caller's fd is used. It is set CLOEXEC and NONBLOCK,
- *   and is_socket is set when getsockopt(SO_TYPE) succeeds on it.
- *   Otherwise (0 or negative) knet creates a socketpair and returns its
- *   sockfd[0] end in *datafd.
- * *channel < 0: the first unused channel is picked and written back;
- *   otherwise the requested channel must be free (EBUSY if not).
- *
- * Fails with EINVAL for NULL args, *channel >= KNET_DATAFD_MAX, or when no
- * sock_notify callback is set; EEXIST if the fd is already registered.
- * Returns 0 (errno 0) on success, -1 with errno set on failure.
- */
-int knet_handle_add_datafd(knet_handle_t knet_h, int *datafd, int8_t *channel)
-{
- int err = 0, savederrno = 0;
- int i;
- struct epoll_event ev;
-
- if (!knet_h) {
- errno = EINVAL;
- return -1;
- }
-
- if (datafd == NULL) {
- errno = EINVAL;
- return -1;
- }
-
- if (channel == NULL) {
- errno = EINVAL;
- return -1;
- }
-
- if (*channel >= KNET_DATAFD_MAX) {
- errno = EINVAL;
- return -1;
- }
-
- savederrno = get_global_wrlock(knet_h);
- if (savederrno) {
- log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
- strerror(savederrno));
- errno = savederrno;
- return -1;
- }
-
- if (!knet_h->sock_notify_fn) {
- log_err(knet_h, KNET_SUB_HANDLE, "Adding datafd requires sock notify callback enabled!");
- savederrno = EINVAL;
- err = -1;
- goto out_unlock;
- }
-
- /* reject a caller-supplied fd that is already attached to a channel */
- if (*datafd > 0) {
- for (i = 0; i < KNET_DATAFD_MAX; i++) {
- if ((knet_h->sockfd[i].in_use) && (knet_h->sockfd[i].sockfd[0] == *datafd)) {
- log_err(knet_h, KNET_SUB_HANDLE, "requested datafd: %d already exist in index: %d", *datafd, i);
- savederrno = EEXIST;
- err = -1;
- goto out_unlock;
- }
- }
- }
-
- /*
- * auto allocate a channel
- */
- /* NOTE(review): on a later failure *channel keeps the auto-picked value */
- if (*channel < 0) {
- for (i = 0; i < KNET_DATAFD_MAX; i++) {
- if (!knet_h->sockfd[i].in_use) {
- *channel = i;
- break;
- }
- }
- if (*channel < 0) {
- savederrno = EBUSY;
- err = -1;
- goto out_unlock;
- }
- } else {
- if (knet_h->sockfd[*channel].in_use) {
- savederrno = EBUSY;
- err = -1;
- goto out_unlock;
- }
- }
-
- knet_h->sockfd[*channel].is_created = 0;
- knet_h->sockfd[*channel].is_socket = 0;
- knet_h->sockfd[*channel].has_error = 0;
-
- if (*datafd > 0) {
- int sockopt;
- socklen_t sockoptlen = sizeof(sockopt);
-
- if (_fdset_cloexec(*datafd)) {
- savederrno = errno;
- err = -1;
- log_err(knet_h, KNET_SUB_HANDLE, "Unable to set CLOEXEC on datafd: %s",
- strerror(savederrno));
- goto out_unlock;
- }
-
- if (_fdset_nonblock(*datafd)) {
- savederrno = errno;
- err = -1;
- log_err(knet_h, KNET_SUB_HANDLE, "Unable to set NONBLOCK on datafd: %s",
- strerror(savederrno));
- goto out_unlock;
- }
-
- knet_h->sockfd[*channel].sockfd[0] = *datafd;
- knet_h->sockfd[*channel].sockfd[1] = 0;
-
- if (!getsockopt(knet_h->sockfd[*channel].sockfd[0], SOL_SOCKET, SO_TYPE, &sockopt, &sockoptlen)) {
- knet_h->sockfd[*channel].is_socket = 1;
- }
- } else {
- if (_init_socketpair(knet_h, knet_h->sockfd[*channel].sockfd)) {
- savederrno = errno;
- err = -1;
- goto out_unlock;
- }
-
- knet_h->sockfd[*channel].is_created = 1;
- knet_h->sockfd[*channel].is_socket = 1;
- *datafd = knet_h->sockfd[*channel].sockfd[0];
- }
-
- /*
- * register the end knet reads from: sockfd[is_created] is the caller's
- * fd, or sockfd[1] of a knet-created socketpair
- */
- memset(&ev, 0, sizeof(struct epoll_event));
- ev.events = EPOLLIN;
- ev.data.fd = knet_h->sockfd[*channel].sockfd[knet_h->sockfd[*channel].is_created];
-
- if (epoll_ctl(knet_h->send_to_links_epollfd,
- EPOLL_CTL_ADD, knet_h->sockfd[*channel].sockfd[knet_h->sockfd[*channel].is_created], &ev)) {
- savederrno = errno;
- err = -1;
- log_err(knet_h, KNET_SUB_HANDLE, "Unable to add datafd %d to linkfd epoll pool: %s",
- knet_h->sockfd[*channel].sockfd[knet_h->sockfd[*channel].is_created], strerror(savederrno));
- if (knet_h->sockfd[*channel].is_created) {
- _close_socketpair(knet_h, knet_h->sockfd[*channel].sockfd);
- }
- goto out_unlock;
- }
-
- knet_h->sockfd[*channel].in_use = 1;
-
-out_unlock:
- pthread_rwlock_unlock(&knet_h->global_rwlock);
- errno = err ? savederrno : 0;
- return err;
-}
-
-/*
- * Detach the channel whose sockfd[0] equals datafd (EINVAL if none, or if
- * datafd <= 0). The epoll registration is removed unless the slot is
- * flagged has_error. The socketpair is closed only when knet created it;
- * a caller-supplied fd is left open. The channel slot is then zeroed.
- * Returns 0 (errno 0) on success, -1 with errno set on failure.
- */
-int knet_handle_remove_datafd(knet_handle_t knet_h, int datafd)
-{
- int err = 0, savederrno = 0;
- int8_t channel = -1;
- int i;
- struct epoll_event ev;
-
- if (!knet_h) {
- errno = EINVAL;
- return -1;
- }
-
- if (datafd <= 0) {
- errno = EINVAL;
- return -1;
- }
-
- savederrno = get_global_wrlock(knet_h);
- if (savederrno) {
- log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
- strerror(savederrno));
- errno = savederrno;
- return -1;
- }
-
- for (i = 0; i < KNET_DATAFD_MAX; i++) {
- if ((knet_h->sockfd[i].in_use) &&
- (knet_h->sockfd[i].sockfd[0] == datafd)) {
- channel = i;
- break;
- }
- }
-
- if (channel < 0) {
- savederrno = EINVAL;
- err = -1;
- goto out_unlock;
- }
-
- /* presumably an errored fd was already dropped from epoll - confirm */
- if (!knet_h->sockfd[channel].has_error) {
- memset(&ev, 0, sizeof(struct epoll_event));
-
- /* NOTE(review): the log below prints sockfd[0], not the fd removed (sockfd[is_created]) */
- if (epoll_ctl(knet_h->send_to_links_epollfd,
- EPOLL_CTL_DEL, knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], &ev)) {
- savederrno = errno;
- err = -1;
- log_err(knet_h, KNET_SUB_HANDLE, "Unable to del datafd %d from linkfd epoll pool: %s",
- knet_h->sockfd[channel].sockfd[0], strerror(savederrno));
- goto out_unlock;
- }
- }
-
- if (knet_h->sockfd[channel].is_created) {
- _close_socketpair(knet_h, knet_h->sockfd[channel].sockfd);
- }
-
- memset(&knet_h->sockfd[channel], 0, sizeof(struct knet_sock));
-
-out_unlock:
- pthread_rwlock_unlock(&knet_h->global_rwlock);
- errno = err ? savederrno : 0;
- return err;
-}
-
-/*
- * Return in *datafd the fd (sockfd[0]) attached to channel, under the
- * global read lock. EINVAL for an out-of-range or unused channel, or a
- * NULL datafd. Returns 0 (errno 0) on success, -1 with errno set otherwise.
- */
-int knet_handle_get_datafd(knet_handle_t knet_h, const int8_t channel, int *datafd)
-{
- int err = 0, savederrno = 0;
-
- if (!knet_h) {
- errno = EINVAL;
- return -1;
- }
-
- if ((channel < 0) || (channel >= KNET_DATAFD_MAX)) {
- errno = EINVAL;
- return -1;
- }
-
- if (datafd == NULL) {
- errno = EINVAL;
- return -1;
- }
-
- savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
- if (savederrno) {
- log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
- strerror(savederrno));
- errno = savederrno;
- return -1;
- }
-
- if (!knet_h->sockfd[channel].in_use) {
- savederrno = EINVAL;
- err = -1;
- goto out_unlock;
- }
-
- *datafd = knet_h->sockfd[channel].sockfd[0];
-
-out_unlock:
- pthread_rwlock_unlock(&knet_h->global_rwlock);
- errno = err ? savederrno : 0;
- return err;
-}
-
-/*
- * Reverse lookup of knet_handle_get_datafd(): find the in-use channel whose
- * sockfd[0] equals datafd, under the global read lock. *channel is set to
- * -1 before the search and stays -1 (EINVAL) when nothing matches.
- */
-int knet_handle_get_channel(knet_handle_t knet_h, const int datafd, int8_t *channel)
-{
- int err = 0, savederrno = 0;
- int i;
-
- if (!knet_h) {
- errno = EINVAL;
- return -1;
- }
-
- if (datafd <= 0) {
- errno = EINVAL;
- return -1;
- }
-
- if (channel == NULL) {
- errno = EINVAL;
- return -1;
- }
-
- savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
- if (savederrno) {
- log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
- strerror(savederrno));
- errno = savederrno;
- return -1;
- }
-
- *channel = -1;
-
- for (i = 0; i < KNET_DATAFD_MAX; i++) {
- if ((knet_h->sockfd[i].in_use) &&
- (knet_h->sockfd[i].sockfd[0] == datafd)) {
- *channel = i;
- break;
- }
- }
-
- if (*channel < 0) {
- savederrno = EINVAL;
- err = -1;
- goto out_unlock;
- }
-
-out_unlock:
- pthread_rwlock_unlock(&knet_h->global_rwlock);
- errno = err ? savederrno : 0;
- return err;
-}
-
-/*
- * Install (or, with a NULL dst_host_filter_fn, remove) the destination host
- * filter callback and its private data, under the global write lock.
- * Returns 0 (errno 0) on success, -1 with errno set on failure.
- */
-int knet_handle_enable_filter(knet_handle_t knet_h,
- void *dst_host_filter_fn_private_data,
- int (*dst_host_filter_fn) (
- void *private_data,
- const unsigned char *outdata,
- ssize_t outdata_len,
- uint8_t tx_rx,
- knet_node_id_t this_host_id,
- knet_node_id_t src_node_id,
- int8_t *channel,
- knet_node_id_t *dst_host_ids,
- size_t *dst_host_ids_entries))
-{
- int savederrno = 0;
-
- if (!knet_h) {
- errno = EINVAL;
- return -1;
- }
-
- savederrno = get_global_wrlock(knet_h);
- if (savederrno) {
- log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
- strerror(savederrno));
- errno = savederrno;
- return -1;
- }
-
- knet_h->dst_host_filter_fn_private_data = dst_host_filter_fn_private_data;
- knet_h->dst_host_filter_fn = dst_host_filter_fn;
- if (knet_h->dst_host_filter_fn) {
- log_debug(knet_h, KNET_SUB_HANDLE, "dst_host_filter_fn enabled");
- } else {
- log_debug(knet_h, KNET_SUB_HANDLE, "dst_host_filter_fn disabled");
- }
-
- pthread_rwlock_unlock(&knet_h->global_rwlock);
-
- errno = 0;
- return 0;
-}
-
-/*
- * Enable (1) or disable (0) data forwarding; other values give EINVAL.
- * Enabling takes effect immediately. Disabling asks the TX and RX threads
- * to flush their queues (a failed request is only logged at debug level),
- * waits for the flush with the global lock released, and then clears
- * knet_h->enabled under the lock.
- * NOTE(review): if the write lock cannot be re-taken after the flush, -1
- * is returned and forwarding stays enabled.
- */
-int knet_handle_setfwd(knet_handle_t knet_h, unsigned int enabled)
-{
- int savederrno = 0;
-
- if (!knet_h) {
- errno = EINVAL;
- return -1;
- }
-
- if (enabled > 1) {
- errno = EINVAL;
- return -1;
- }
-
- savederrno = get_global_wrlock(knet_h);
- if (savederrno) {
- log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
- strerror(savederrno));
- errno = savederrno;
- return -1;
- }
-
- if (enabled) {
- knet_h->enabled = enabled;
- log_debug(knet_h, KNET_SUB_HANDLE, "Data forwarding is enabled");
- } else {
- /*
- * notify TX and RX threads to flush the queues
- */
- if (set_thread_flush_queue(knet_h, KNET_THREAD_TX, KNET_THREAD_QUEUE_FLUSH) < 0) {
- log_debug(knet_h, KNET_SUB_HANDLE, "Unable to request queue flushing for TX thread");
- }
- if (set_thread_flush_queue(knet_h, KNET_THREAD_RX, KNET_THREAD_QUEUE_FLUSH) < 0) {
- log_debug(knet_h, KNET_SUB_HANDLE, "Unable to request queue flushing for RX thread");
- }
- }
-
- pthread_rwlock_unlock(&knet_h->global_rwlock);
-
- /*
- * when disabling data forward, we need to give time to TX and RX
- * to flush the queues.
- *
- * the TX thread is the main leader here. When there is no more
- * data in the TX queue, we will also close traffic for RX.
- */
- if (!enabled) {
- /*
- * this usleep might be unnecessary, but wait_all_threads_flush_queue
- * adds extra locking delay.
- *
- * allow all threads to run free without extra locking interference
- * and then we switch to a more active wait in case the scheduler
- * has decided to delay one thread or another
- */
- usleep(KNET_THREADS_TIMERES * 2);
- wait_all_threads_flush_queue(knet_h);
-
- /*
- * all threads have done flushing the queue, we can stop data forwarding
- */
- savederrno = get_global_wrlock(knet_h);
- if (savederrno) {
- log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
- strerror(savederrno));
- errno = savederrno;
- return -1;
- }
- knet_h->enabled = enabled;
- log_debug(knet_h, KNET_SUB_HANDLE, "Data forwarding is disabled");
- pthread_rwlock_unlock(&knet_h->global_rwlock);
- }
-
- errno = 0;
- return 0;
-}
-
-/*
- * Turn link access-list checks on (1) or off (0) by setting
- * knet_h->use_access_lists under the global write lock; other values give
- * EINVAL. Returns 0 (errno 0) on success, -1 with errno set on failure.
- */
-int knet_handle_enable_access_lists(knet_handle_t knet_h, unsigned int enabled)
-{
- int savederrno = 0;
-
- if (!knet_h) {
- errno = EINVAL;
- return -1;
- }
-
- if (enabled > 1) {
- errno = EINVAL;
- return -1;
- }
-
- savederrno = get_global_wrlock(knet_h);
- if (savederrno) {
- log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
- strerror(savederrno));
- errno = savederrno;
- return -1;
- }
-
- knet_h->use_access_lists = enabled;
-
- if (enabled) {
- log_debug(knet_h, KNET_SUB_HANDLE, "Links access lists are enabled");
- } else {
- log_debug(knet_h, KNET_SUB_HANDLE, "Links access lists are disabled");
- }
-
- pthread_rwlock_unlock(&knet_h->global_rwlock);
-
- errno = 0;
- return 0;
-}
-
-/*
- * Copy the configured PMTUd interval (knet_h->pmtud_interval) into
- * *interval while holding the global read lock.
- * Returns 0 on success (errno cleared), -1 with errno set (EINVAL for a
- * NULL handle or output pointer, or the pthread_rwlock_rdlock() error).
- */
-int knet_handle_pmtud_getfreq(knet_handle_t knet_h, unsigned int *interval)
-{
- int savederrno = 0;
-
- if (!knet_h) {
- errno = EINVAL;
- return -1;
- }
-
- if (!interval) {
- errno = EINVAL;
- return -1;
- }
-
- savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
- if (savederrno) {
- log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
- strerror(savederrno));
- errno = savederrno;
- return -1;
- }
-
- *interval = knet_h->pmtud_interval;
-
- pthread_rwlock_unlock(&knet_h->global_rwlock);
-
- errno = 0;
- return 0;
-}
-
-/*
- * Store a new PMTUd interval in knet_h->pmtud_interval under the global
- * write lock. interval must be within 1..86400 (the debug message reports
- * it in seconds); 0 or larger values fail with EINVAL.
- * Returns 0 on success (errno cleared), -1 with errno set on failure.
- */
-int knet_handle_pmtud_setfreq(knet_handle_t knet_h, unsigned int interval)
-{
- int savederrno = 0;
-
- if (!knet_h) {
- errno = EINVAL;
- return -1;
- }
-
- if ((!interval) || (interval > 86400)) {
- errno = EINVAL;
- return -1;
- }
-
- savederrno = get_global_wrlock(knet_h);
- if (savederrno) {
- log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
- strerror(savederrno));
- errno = savederrno;
- return -1;
- }
-
- knet_h->pmtud_interval = interval;
- log_debug(knet_h, KNET_SUB_HANDLE, "PMTUd interval set to: %u seconds", interval);
-
- pthread_rwlock_unlock(&knet_h->global_rwlock);
-
- errno = 0;
- return 0;
-}
-
-/*
- * Install the PMTUd notification callback and its opaque private data
- * under the global write lock. A NULL pmtud_notify_fn is accepted and
- * simply clears the callback (logged as "disabled").
- * Returns 0 on success (errno cleared), -1 with errno set on failure.
- */
-int knet_handle_enable_pmtud_notify(knet_handle_t knet_h,
- void *pmtud_notify_fn_private_data,
- void (*pmtud_notify_fn) (
- void *private_data,
- unsigned int data_mtu))
-{
- int savederrno = 0;
-
- if (!knet_h) {
- errno = EINVAL;
- return -1;
- }
-
- savederrno = get_global_wrlock(knet_h);
- if (savederrno) {
- log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
- strerror(savederrno));
- errno = savederrno;
- return -1;
- }
-
- knet_h->pmtud_notify_fn_private_data = pmtud_notify_fn_private_data;
- knet_h->pmtud_notify_fn = pmtud_notify_fn;
- if (knet_h->pmtud_notify_fn) {
- log_debug(knet_h, KNET_SUB_HANDLE, "pmtud_notify_fn enabled");
- } else {
- log_debug(knet_h, KNET_SUB_HANDLE, "pmtud_notify_fn disabled");
- }
-
- pthread_rwlock_unlock(&knet_h->global_rwlock);
-
- errno = 0;
- return 0;
-}
-
-/*
- * Record a manually configured interface MTU in knet_h->manual_mtu and
- * call force_pmtud_run() so the new value is picked up.
- * iface_mtu values above KNET_PMTUD_SIZE_V4 fail with EINVAL.
- * Returns 0 on success (errno cleared), -1 with errno set on failure.
- */
-int knet_handle_pmtud_set(knet_handle_t knet_h,
- unsigned int iface_mtu)
-{
- int savederrno = 0;
-
- if (!knet_h) {
- errno = EINVAL;
- return -1;
- }
-
- if (iface_mtu > KNET_PMTUD_SIZE_V4) {
- errno = EINVAL;
- return -1;
- }
-
- /*
- * NOTE(review): only the read lock is taken here, yet manual_mtu is
- * written below; presumably force_pmtud_run() needs the read lock
- * and concurrent writers are not expected - confirm.
- */
- savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
- if (savederrno) {
- log_err(knet_h, KNET_SUB_PMTUD, "Unable to get read lock: %s",
- strerror(savederrno));
- errno = savederrno;
- return -1;
- }
-
- log_info(knet_h, KNET_SUB_PMTUD, "MTU manually set to: %u", iface_mtu);
-
- knet_h->manual_mtu = iface_mtu;
-
- force_pmtud_run(knet_h, KNET_SUB_PMTUD, 0);
-
- pthread_rwlock_unlock(&knet_h->global_rwlock);
-
- errno = 0;
- return 0;
-}
-
-/*
- * Copy the current data MTU (knet_h->data_mtu) into *data_mtu while
- * holding the global read lock.
- * Returns 0 on success (errno cleared), -1 with errno set on failure.
- */
-int knet_handle_pmtud_get(knet_handle_t knet_h,
- unsigned int *data_mtu)
-{
- int savederrno = 0;
-
- if (!knet_h) {
- errno = EINVAL;
- return -1;
- }
-
- if (!data_mtu) {
- errno = EINVAL;
- return -1;
- }
-
- savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
- if (savederrno) {
- log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
- strerror(savederrno));
- errno = savederrno;
- return -1;
- }
-
- *data_mtu = knet_h->data_mtu;
-
- pthread_rwlock_unlock(&knet_h->global_rwlock);
-
- errno = 0;
- return 0;
-}
-
-/*
- * Configure crypto slot config_num (1..KNET_MAX_CRYPTO_INSTANCES) under the
- * global write lock.
- *
- * - The slot currently in use (crypto_in_use_config) can only be
- * reconfigured when force is non-zero; otherwise EBUSY.
- * - A "none" model, or "none" for both cipher and hash, tears the slot
- * down with crypto_fini() and succeeds.
- * - Otherwise the private key length must be within
- * KNET_MIN_KEY_LEN..KNET_MAX_KEY_LEN before crypto_init() is called.
- *
- * Returns 0 on success, -1 (errno set) for validation/lock errors, or -2
- * when crypto_init() fails (errno taken from crypto_init()).
- */
-static int _knet_handle_crypto_set_config(knet_handle_t knet_h,
- struct knet_handle_crypto_cfg *knet_handle_crypto_cfg,
- uint8_t config_num,
- uint8_t force)
-{
- int savederrno = 0;
- int err = 0;
-
- if (!knet_h) {
- errno = EINVAL;
- return -1;
- }
-
- if (!knet_handle_crypto_cfg) {
- errno = EINVAL;
- return -1;
- }
-
- if ((config_num < 1) || (config_num > KNET_MAX_CRYPTO_INSTANCES)) {
- errno = EINVAL;
- return -1;
- }
-
- savederrno = get_global_wrlock(knet_h);
- if (savederrno) {
- log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
- strerror(savederrno));
- errno = savederrno;
- return -1;
- }
-
- if ((knet_h->crypto_in_use_config == config_num) && (!force)) {
- savederrno = EBUSY;
- err = -1;
- goto exit_unlock;
- }
-
- /*
- * NOTE(review): strncmp(..., 4) only compares the first 4 bytes, so any
- * string starting with "none" is treated as "none".
- */
- if ((!strncmp("none", knet_handle_crypto_cfg->crypto_model, 4)) ||
- ((!strncmp("none", knet_handle_crypto_cfg->crypto_cipher_type, 4)) &&
- (!strncmp("none", knet_handle_crypto_cfg->crypto_hash_type, 4)))) {
- crypto_fini(knet_h, config_num);
- log_debug(knet_h, KNET_SUB_CRYPTO, "crypto config %u is not enabled", config_num);
- err = 0;
- goto exit_unlock;
- }
-
- if (knet_handle_crypto_cfg->private_key_len < KNET_MIN_KEY_LEN) {
- log_debug(knet_h, KNET_SUB_CRYPTO, "private key len too short for config %u (min %d): %u",
- config_num, KNET_MIN_KEY_LEN, knet_handle_crypto_cfg->private_key_len);
- savederrno = EINVAL;
- err = -1;
- goto exit_unlock;
- }
-
- if (knet_handle_crypto_cfg->private_key_len > KNET_MAX_KEY_LEN) {
- log_debug(knet_h, KNET_SUB_CRYPTO, "private key len too long for config %u (max %d): %u",
- config_num, KNET_MAX_KEY_LEN, knet_handle_crypto_cfg->private_key_len);
- savederrno = EINVAL;
- err = -1;
- goto exit_unlock;
- }
-
- err = crypto_init(knet_h, knet_handle_crypto_cfg, config_num);
-
- /* -2 lets callers tell a crypto_init() failure from a validation error */
- if (err) {
- err = -2;
- savederrno = errno;
- }
-
-exit_unlock:
- pthread_rwlock_unlock(&knet_h->global_rwlock);
- errno = err ? savederrno : 0;
- return err;
-}
-
-/*
- * Public entry point: configure a crypto slot without force, so the slot
- * currently in use cannot be replaced (EBUSY).
- * Return values are those of _knet_handle_crypto_set_config().
- */
-int knet_handle_crypto_set_config(knet_handle_t knet_h,
- struct knet_handle_crypto_cfg *knet_handle_crypto_cfg,
- uint8_t config_num)
-{
- return _knet_handle_crypto_set_config(knet_h, knet_handle_crypto_cfg, config_num, 0);
-}
-
-/*
- * Choose whether RX accepts clear (unencrypted) traffic by storing value
- * in knet_h->crypto_only under the global write lock. value must not exceed
- * KNET_CRYPTO_RX_DISALLOW_CLEAR_TRAFFIC (EINVAL otherwise).
- * Returns 0 on success, -1 with errno set on failure.
- * NOTE(review): unlike most setters here, errno is not cleared on success.
- */
-int knet_handle_crypto_rx_clear_traffic(knet_handle_t knet_h,
- uint8_t value)
-{
- int savederrno = 0;
-
- if (!knet_h) {
- errno = EINVAL;
- return -1;
- }
-
- if (value > KNET_CRYPTO_RX_DISALLOW_CLEAR_TRAFFIC) {
- errno = EINVAL;
- return -1;
- }
-
- savederrno = get_global_wrlock(knet_h);
- if (savederrno) {
- log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
- strerror(savederrno));
- errno = savederrno;
- return -1;
- }
-
- knet_h->crypto_only = value;
- if (knet_h->crypto_only) {
- log_debug(knet_h, KNET_SUB_CRYPTO, "Only crypto traffic allowed for RX");
- } else {
- log_debug(knet_h, KNET_SUB_CRYPTO, "Both crypto and clear traffic allowed for RX");
- }
-
- pthread_rwlock_unlock(&knet_h->global_rwlock);
- return 0;
-}
-
-/*
- * Switch the active crypto configuration to config_num by delegating to
- * crypto_use_config() under the global write lock. config_num may be 0
- * (the compatibility wrapper uses 0 to mean clear traffic) up to
- * KNET_MAX_CRYPTO_INSTANCES.
- * Returns crypto_use_config()'s result; on non-zero, errno is the value
- * crypto_use_config() left behind.
- */
-int knet_handle_crypto_use_config(knet_handle_t knet_h,
- uint8_t config_num)
-{
- int savederrno = 0;
- int err = 0;
-
- if (!knet_h) {
- errno = EINVAL;
- return -1;
- }
-
- if (config_num > KNET_MAX_CRYPTO_INSTANCES) {
- errno = EINVAL;
- return -1;
- }
-
- savederrno = get_global_wrlock(knet_h);
- if (savederrno) {
- log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
- strerror(savederrno));
- errno = savederrno;
- return -1;
- }
-
- err = crypto_use_config(knet_h, config_num);
- savederrno = errno;
-
- pthread_rwlock_unlock(&knet_h->global_rwlock);
- errno = err ? savederrno : 0;
- return err;
-}
-
-/*
- * compatibility wrapper for 1.x releases
- *
- * Configures crypto slot 1 (forcing replacement), then either:
- * - "none" config: allow clear RX traffic and activate config 0, or
- * - real config: disallow clear RX traffic and activate config 1.
- * If activating the config fails, the previous crypto_only value is put
- * back (directly, if the locked setter also fails).
- *
- * NOTE(review): when knet_handle_crypto_use_config() fails but restoring
- * crypto_only succeeds, err is overwritten with 0 and the failure is not
- * reported to the caller.
- * NOTE(review): knet_h->crypto_only is read, and on the fallback path
- * written, without taking the global lock.
- */
-int knet_handle_crypto(knet_handle_t knet_h, struct knet_handle_crypto_cfg *knet_handle_crypto_cfg)
-{
- int err = 0;
- uint8_t value;
-
- if (!knet_h) {
- errno = EINVAL;
- return -1;
- }
-
- value = knet_h->crypto_only;
- /*
- * configure crypto in slot 1
- */
- err = _knet_handle_crypto_set_config(knet_h, knet_handle_crypto_cfg, 1, 1);
- if (err < 0) {
- return err;
- }
-
- /* knet_handle_crypto_cfg is non-NULL here: set_config rejects NULL */
- if ((!strncmp("none", knet_handle_crypto_cfg->crypto_model, 4)) ||
- ((!strncmp("none", knet_handle_crypto_cfg->crypto_cipher_type, 4)) &&
- (!strncmp("none", knet_handle_crypto_cfg->crypto_hash_type, 4)))) {
- err = knet_handle_crypto_rx_clear_traffic(knet_h, KNET_CRYPTO_RX_ALLOW_CLEAR_TRAFFIC);
- if (err < 0) {
- return err;
- }
-
- /*
- * start using clear traffic
- */
- err = knet_handle_crypto_use_config(knet_h, 0);
- if (err < 0) {
- err = knet_handle_crypto_rx_clear_traffic(knet_h, value);
- if (err < 0) {
- /*
- * force attempt or things will go bad
- */
- knet_h->crypto_only = value;
- }
- }
- return err;
- } else {
- err = knet_handle_crypto_rx_clear_traffic(knet_h, KNET_CRYPTO_RX_DISALLOW_CLEAR_TRAFFIC);
- if (err < 0) {
- return err;
- }
-
- /*
- * start using crypto traffic
- */
- err = knet_handle_crypto_use_config(knet_h, 1);
- if (err < 0) {
- err = knet_handle_crypto_rx_clear_traffic(knet_h, value);
- if (err < 0) {
- /*
- * force attempt or things will go bad
- */
- knet_h->crypto_only = value;
- }
- }
- return err;
- }
-}
-
-/*
- * Reconfigure compression under the global write lock: the current
- * compression state is torn down with compress_fini(knet_h, 0) and
- * compress_cfg() is applied.
- * Returns compress_cfg()'s result; on non-zero, errno is the value
- * compress_cfg() left behind.
- */
-int knet_handle_compress(knet_handle_t knet_h, struct knet_handle_compress_cfg *knet_handle_compress_cfg)
-{
- int savederrno = 0;
- int err = 0;
-
- if (!knet_h) {
- errno = EINVAL;
- return -1;
- }
-
- if (!knet_handle_compress_cfg) {
- errno = EINVAL;
- return -1;
- }
-
- savederrno = get_global_wrlock(knet_h);
- if (savederrno) {
- log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
- strerror(savederrno));
- errno = savederrno;
- return -1;
- }
-
- compress_fini(knet_h, 0);
- err = compress_cfg(knet_h, knet_handle_compress_cfg);
- savederrno = errno;
-
- pthread_rwlock_unlock(&knet_h->global_rwlock);
- errno = err ? savederrno : 0;
- return err;
-}
-
-/*
- * Read up to buff_len bytes (at most KNET_MAX_PACKET_SIZE) from the
- * datafd of an in-use channel with readv() while holding the global
- * read lock.
- * Returns readv()'s result: byte count, 0, or -1. errno is set to EINVAL
- * for bad arguments or an unused channel; after readv() it is cleared when
- * 0 is returned and otherwise left at the value observed right after readv().
- */
-ssize_t knet_recv(knet_handle_t knet_h, char *buff, const size_t buff_len, const int8_t channel)
-{
- int savederrno = 0;
- ssize_t err = 0;
- struct iovec iov_in;
-
- if (!knet_h) {
- errno = EINVAL;
- return -1;
- }
-
- if (buff == NULL) {
- errno = EINVAL;
- return -1;
- }
-
- /* buff_len is a size_t, so this only rejects 0 */
- if (buff_len <= 0) {
- errno = EINVAL;
- return -1;
- }
-
- if (buff_len > KNET_MAX_PACKET_SIZE) {
- errno = EINVAL;
- return -1;
- }
-
- if (channel < 0) {
- errno = EINVAL;
- return -1;
- }
-
- if (channel >= KNET_DATAFD_MAX) {
- errno = EINVAL;
- return -1;
- }
-
- savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
- if (savederrno) {
- log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
- strerror(savederrno));
- errno = savederrno;
- return -1;
- }
-
- if (!knet_h->sockfd[channel].in_use) {
- savederrno = EINVAL;
- err = -1;
- goto out_unlock;
- }
-
- memset(&iov_in, 0, sizeof(iov_in));
- iov_in.iov_base = (void *)buff;
- iov_in.iov_len = buff_len;
-
- err = readv(knet_h->sockfd[channel].sockfd[0], &iov_in, 1);
- savederrno = errno;
-
-out_unlock:
- pthread_rwlock_unlock(&knet_h->global_rwlock);
- errno = err ? savederrno : 0;
- return err;
-}
-
-/*
- * Write buff_len bytes (at most KNET_MAX_PACKET_SIZE) to the datafd of an
- * in-use channel with writev() while holding the global read lock.
- * Returns writev()'s result. errno is set to EINVAL for bad arguments or an
- * unused channel; after writev() it is cleared when 0 is returned and
- * otherwise left at the value observed right after writev().
- */
-ssize_t knet_send(knet_handle_t knet_h, const char *buff, const size_t buff_len, const int8_t channel)
-{
- int savederrno = 0;
- ssize_t err = 0;
- struct iovec iov_out[1];
-
- if (!knet_h) {
- errno = EINVAL;
- return -1;
- }
-
- if (buff == NULL) {
- errno = EINVAL;
- return -1;
- }
-
- /* buff_len is a size_t, so this only rejects 0 */
- if (buff_len <= 0) {
- errno = EINVAL;
- return -1;
- }
-
- if (buff_len > KNET_MAX_PACKET_SIZE) {
- errno = EINVAL;
- return -1;
- }
-
- if (channel < 0) {
- errno = EINVAL;
- return -1;
- }
-
- if (channel >= KNET_DATAFD_MAX) {
- errno = EINVAL;
- return -1;
- }
-
- savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
- if (savederrno) {
- log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
- strerror(savederrno));
- errno = savederrno;
- return -1;
- }
-
- if (!knet_h->sockfd[channel].in_use) {
- savederrno = EINVAL;
- err = -1;
- goto out_unlock;
- }
-
- memset(iov_out, 0, sizeof(iov_out));
-
- /* iovec has no const member; writev() only reads the buffer */
- iov_out[0].iov_base = (void *)buff;
- iov_out[0].iov_len = buff_len;
-
- err = writev(knet_h->sockfd[channel].sockfd[0], iov_out, 1);
- savederrno = errno;
-
-out_unlock:
- pthread_rwlock_unlock(&knet_h->global_rwlock);
- errno = err ? savederrno : 0;
- return err;
-}
-
-/*
- * Copy up to struct_size bytes (capped at sizeof(struct knet_handle_stats))
- * of the handle statistics into *stats, add the ping/pong/pmtud TX crypt
- * counters into tx_crypt_packets, and report the library's full struct
- * size in stats->size. Runs under the global read lock plus
- * handle_stats_mutex.
- * Returns 0 on success, -1 on failure.
- *
- * NOTE(review): if pthread_mutex_lock() fails, the out_unlock path still
- * unlocks handle_stats_mutex (which is not held), and errno is not set.
- */
-int knet_handle_get_stats(knet_handle_t knet_h, struct knet_handle_stats *stats, size_t struct_size)
-{
- int err = 0, savederrno = 0;
-
- if (!knet_h) {
- errno = EINVAL;
- return -1;
- }
-
- if (!stats) {
- errno = EINVAL;
- return -1;
- }
-
- savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
- if (savederrno) {
- log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
- strerror(savederrno));
- errno = savederrno;
- return -1;
- }
-
- savederrno = pthread_mutex_lock(&knet_h->handle_stats_mutex);
- if (savederrno) {
- log_err(knet_h, KNET_SUB_HANDLE, "Unable to get mutex lock: %s",
- strerror(savederrno));
- err = -1;
- goto out_unlock;
- }
-
- if (struct_size > sizeof(struct knet_handle_stats)) {
- struct_size = sizeof(struct knet_handle_stats);
- }
-
- memmove(stats, &knet_h->stats, struct_size);
-
- /*
- * TX crypt stats only count the data packets sent, so add in the ping/pong/pmtud figures
- * RX is OK as it counts them before they are sorted.
- */
-
- stats->tx_crypt_packets += knet_h->stats_extra.tx_crypt_ping_packets +
- knet_h->stats_extra.tx_crypt_pong_packets +
- knet_h->stats_extra.tx_crypt_pmtu_packets +
- knet_h->stats_extra.tx_crypt_pmtu_reply_packets;
-
- /* Tell the caller our full size in case they have an old version */
- stats->size = sizeof(struct knet_handle_stats);
-
-out_unlock:
- pthread_mutex_unlock(&knet_h->handle_stats_mutex);
- pthread_rwlock_unlock(&knet_h->global_rwlock);
- return err;
-}
-
-/*
- * Zero knet_h->stats and knet_h->stats_extra under the global write lock;
- * with KNET_CLEARSTATS_HANDLE_AND_LINK also clear per-link counters via
- * _link_clear_stats(). Any other clear_option fails with EINVAL.
- * Returns 0 on success (errno not modified), -1 with errno set on failure.
- */
-int knet_handle_clear_stats(knet_handle_t knet_h, int clear_option)
-{
- int savederrno = 0;
-
- if (!knet_h) {
- errno = EINVAL;
- return -1;
- }
-
- if (clear_option != KNET_CLEARSTATS_HANDLE_ONLY &&
- clear_option != KNET_CLEARSTATS_HANDLE_AND_LINK) {
- errno = EINVAL;
- return -1;
- }
-
- savederrno = get_global_wrlock(knet_h);
- if (savederrno) {
- log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
- strerror(savederrno));
- errno = savederrno;
- return -1;
- }
-
- memset(&knet_h->stats, 0, sizeof(struct knet_handle_stats));
- memset(&knet_h->stats_extra, 0, sizeof(struct knet_handle_stats_extra));
- if (clear_option == KNET_CLEARSTATS_HANDLE_AND_LINK) {
- _link_clear_stats(knet_h);
- }
-
- pthread_rwlock_unlock(&knet_h->global_rwlock);
- return 0;
-}
diff --git a/libknet/handle_api.c b/libknet/handle_api.c
new file mode 100644
index 00000000..bf4324f5
--- /dev/null
+++ b/libknet/handle_api.c
@@ -0,0 +1,602 @@
+/*
+ * Copyright (C) 2020 Red Hat, Inc. All rights reserved.
+ *
+ * Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
+ * Federico Simoncelli <fsimon@kronosnet.org>
+ *
+ * This software licensed under LGPL-2.0+
+ */
+
+#include "config.h"
+
+#include <stddef.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <errno.h>
+#include <pthread.h>
+#include <sys/uio.h>
+
+#include "internals.h"
+#include "crypto.h"
+#include "links.h"
+#include "common.h"
+#include "transport_common.h"
+#include "logging.h"
+
+/*
+ * knet_handle_enable_sock_notify
+ *
+ * Register the callback invoked for datafd socket events, together with its
+ * opaque private data pointer, under the global write lock.
+ * A NULL callback is rejected with EINVAL; knet_handle_add_datafd() refuses
+ * to add a datafd until a callback has been installed.
+ *
+ * Returns 0 on success (errno cleared), -1 with errno set on failure.
+ */
+int knet_handle_enable_sock_notify(knet_handle_t knet_h,
+				   void *sock_notify_fn_private_data,
+				   void (*sock_notify_fn) (
+						void *private_data,
+						int datafd,
+						int8_t channel,
+						uint8_t tx_rx,
+						int error,
+						int errorno))
+{
+	int savederrno = 0;
+
+	if (!knet_h) {
+		errno = EINVAL;
+		return -1;
+	}
+
+	if (!sock_notify_fn) {
+		errno = EINVAL;
+		return -1;
+	}
+
+	savederrno = get_global_wrlock(knet_h);
+	if (savederrno) {
+		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
+			strerror(savederrno));
+		errno = savederrno;
+		return -1;
+	}
+
+	knet_h->sock_notify_fn_private_data = sock_notify_fn_private_data;
+	knet_h->sock_notify_fn = sock_notify_fn;
+	log_debug(knet_h, KNET_SUB_HANDLE, "sock_notify_fn enabled");
+
+	pthread_rwlock_unlock(&knet_h->global_rwlock);
+
+	/* clear errno on success, matching the other setters in this file */
+	errno = 0;
+	return 0;
+}
+
+/*
+ * knet_handle_add_datafd
+ *
+ * Register a data file descriptor on a channel and add the watched fd to
+ * knet_h->send_to_links_epollfd (EPOLLIN), all under the global write lock.
+ *
+ * *datafd > 0 : use the caller's fd; CLOEXEC and NONBLOCK are set on it and
+ * it is flagged is_socket when getsockopt(SO_TYPE) succeeds.
+ * *datafd <= 0: create a socketpair with _init_socketpair(); sockfd[0] is
+ * returned in *datafd and sockfd[1] is the fd watched by epoll.
+ * *channel < 0: the first unused channel is picked and stored in *channel;
+ * otherwise the requested channel must be free (EBUSY).
+ *
+ * A sock_notify callback must already be registered (EINVAL otherwise), and
+ * a caller fd already registered on another channel fails with EEXIST.
+ * Returns 0 on success, -1 with errno set on failure.
+ *
+ * NOTE(review): several failure paths leave outputs modified: an
+ * auto-allocated *channel is not reset, *datafd keeps the (now closed)
+ * socketpair fd after an epoll_ctl() failure, and the caller's fd keeps its
+ * new CLOEXEC/NONBLOCK flags.
+ */
+int knet_handle_add_datafd(knet_handle_t knet_h, int *datafd, int8_t *channel)
+{
+ int err = 0, savederrno = 0;
+ int i;
+ struct epoll_event ev;
+
+ if (!knet_h) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if (datafd == NULL) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if (channel == NULL) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if (*channel >= KNET_DATAFD_MAX) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ savederrno = get_global_wrlock(knet_h);
+ if (savederrno) {
+ log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
+ strerror(savederrno));
+ errno = savederrno;
+ return -1;
+ }
+
+ if (!knet_h->sock_notify_fn) {
+ log_err(knet_h, KNET_SUB_HANDLE, "Adding datafd requires sock notify callback enabled!");
+ savederrno = EINVAL;
+ err = -1;
+ goto out_unlock;
+ }
+
+ /* refuse a caller fd that is already registered on some channel */
+ if (*datafd > 0) {
+ for (i = 0; i < KNET_DATAFD_MAX; i++) {
+ if ((knet_h->sockfd[i].in_use) && (knet_h->sockfd[i].sockfd[0] == *datafd)) {
+ log_err(knet_h, KNET_SUB_HANDLE, "requested datafd: %d already exist in index: %d", *datafd, i);
+ savederrno = EEXIST;
+ err = -1;
+ goto out_unlock;
+ }
+ }
+ }
+
+ /*
+ * auto allocate a channel
+ */
+ if (*channel < 0) {
+ for (i = 0; i < KNET_DATAFD_MAX; i++) {
+ if (!knet_h->sockfd[i].in_use) {
+ *channel = i;
+ break;
+ }
+ }
+ if (*channel < 0) {
+ savederrno = EBUSY;
+ err = -1;
+ goto out_unlock;
+ }
+ } else {
+ if (knet_h->sockfd[*channel].in_use) {
+ savederrno = EBUSY;
+ err = -1;
+ goto out_unlock;
+ }
+ }
+
+ knet_h->sockfd[*channel].is_created = 0;
+ knet_h->sockfd[*channel].is_socket = 0;
+ knet_h->sockfd[*channel].has_error = 0;
+
+ if (*datafd > 0) {
+ int sockopt;
+ socklen_t sockoptlen = sizeof(sockopt);
+
+ if (_fdset_cloexec(*datafd)) {
+ savederrno = errno;
+ err = -1;
+ log_err(knet_h, KNET_SUB_HANDLE, "Unable to set CLOEXEC on datafd: %s",
+ strerror(savederrno));
+ goto out_unlock;
+ }
+
+ if (_fdset_nonblock(*datafd)) {
+ savederrno = errno;
+ err = -1;
+ log_err(knet_h, KNET_SUB_HANDLE, "Unable to set NONBLOCK on datafd: %s",
+ strerror(savederrno));
+ goto out_unlock;
+ }
+
+ knet_h->sockfd[*channel].sockfd[0] = *datafd;
+ knet_h->sockfd[*channel].sockfd[1] = 0;
+
+ /* getsockopt(SO_TYPE) only succeeds on sockets: use it as a socket probe */
+ if (!getsockopt(knet_h->sockfd[*channel].sockfd[0], SOL_SOCKET, SO_TYPE, &sockopt, &sockoptlen)) {
+ knet_h->sockfd[*channel].is_socket = 1;
+ }
+ } else {
+ if (_init_socketpair(knet_h, knet_h->sockfd[*channel].sockfd)) {
+ savederrno = errno;
+ err = -1;
+ goto out_unlock;
+ }
+
+ knet_h->sockfd[*channel].is_created = 1;
+ knet_h->sockfd[*channel].is_socket = 1;
+ *datafd = knet_h->sockfd[*channel].sockfd[0];
+ }
+
+ /*
+ * is_created (0 or 1 here) selects the fd to watch: the caller's own fd,
+ * or the library-side end (sockfd[1]) of a created socketpair.
+ */
+ memset(&ev, 0, sizeof(struct epoll_event));
+ ev.events = EPOLLIN;
+ ev.data.fd = knet_h->sockfd[*channel].sockfd[knet_h->sockfd[*channel].is_created];
+
+ if (epoll_ctl(knet_h->send_to_links_epollfd,
+ EPOLL_CTL_ADD, knet_h->sockfd[*channel].sockfd[knet_h->sockfd[*channel].is_created], &ev)) {
+ savederrno = errno;
+ err = -1;
+ log_err(knet_h, KNET_SUB_HANDLE, "Unable to add datafd %d to linkfd epoll pool: %s",
+ knet_h->sockfd[*channel].sockfd[knet_h->sockfd[*channel].is_created], strerror(savederrno));
+ if (knet_h->sockfd[*channel].is_created) {
+ _close_socketpair(knet_h, knet_h->sockfd[*channel].sockfd);
+ }
+ goto out_unlock;
+ }
+
+ /* the slot becomes visible to lookups only once fully set up */
+ knet_h->sockfd[*channel].in_use = 1;
+
+out_unlock:
+ pthread_rwlock_unlock(&knet_h->global_rwlock);
+ errno = err ? savederrno : 0;
+ return err;
+}
+
+/*
+ * knet_handle_remove_datafd
+ *
+ * Detach the channel whose user-facing fd (sockfd[0]) equals datafd:
+ * drop its watched fd from send_to_links_epollfd (skipped when the channel
+ * is flagged has_error), close the socketpair if the library created it,
+ * and zero the whole channel slot. Runs under the global write lock.
+ *
+ * Returns 0 on success, -1 with errno set (EINVAL for a bad handle/fd or an
+ * unregistered datafd, otherwise the locking or epoll_ctl() error).
+ */
+int knet_handle_remove_datafd(knet_handle_t knet_h, int datafd)
+{
+	struct epoll_event ev;
+	int8_t found = -1;
+	int idx;
+	int rv = 0;
+	int saved = 0;
+	int lockerr;
+
+	if ((!knet_h) || (datafd <= 0)) {
+		errno = EINVAL;
+		return -1;
+	}
+
+	lockerr = get_global_wrlock(knet_h);
+	if (lockerr) {
+		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
+			strerror(lockerr));
+		errno = lockerr;
+		return -1;
+	}
+
+	/* first in-use slot exposing this fd to the user */
+	for (idx = 0; idx < KNET_DATAFD_MAX; idx++) {
+		if ((knet_h->sockfd[idx].in_use) &&
+		    (knet_h->sockfd[idx].sockfd[0] == datafd)) {
+			found = idx;
+			break;
+		}
+	}
+
+	if (found < 0) {
+		rv = -1;
+		saved = EINVAL;
+		goto out_unlock;
+	}
+
+	if (!knet_h->sockfd[found].has_error) {
+		/* same fd that was registered: caller fd, or sockfd[1] of a created pair */
+		int watched_fd = knet_h->sockfd[found].sockfd[knet_h->sockfd[found].is_created];
+
+		memset(&ev, 0, sizeof(struct epoll_event));
+		if (epoll_ctl(knet_h->send_to_links_epollfd, EPOLL_CTL_DEL, watched_fd, &ev)) {
+			saved = errno;
+			rv = -1;
+			log_err(knet_h, KNET_SUB_HANDLE, "Unable to del datafd %d from linkfd epoll pool: %s",
+				knet_h->sockfd[found].sockfd[0], strerror(saved));
+			goto out_unlock;
+		}
+	}
+
+	if (knet_h->sockfd[found].is_created) {
+		_close_socketpair(knet_h, knet_h->sockfd[found].sockfd);
+	}
+
+	memset(&knet_h->sockfd[found], 0, sizeof(struct knet_sock));
+
+out_unlock:
+	pthread_rwlock_unlock(&knet_h->global_rwlock);
+	errno = rv ? saved : 0;
+	return rv;
+}
+
+/*
+ * knet_handle_get_datafd
+ *
+ * Report the user-facing fd (sockfd[0]) registered on channel.
+ * channel must be in 0..KNET_DATAFD_MAX-1 and in use; *datafd is written
+ * only on success.
+ *
+ * Returns 0 on success (errno cleared), -1 with errno set on failure.
+ */
+int knet_handle_get_datafd(knet_handle_t knet_h, const int8_t channel, int *datafd)
+{
+	int lockerr;
+	int rv = 0;
+
+	if ((!knet_h) ||
+	    (channel < 0) || (channel >= KNET_DATAFD_MAX) ||
+	    (datafd == NULL)) {
+		errno = EINVAL;
+		return -1;
+	}
+
+	lockerr = pthread_rwlock_rdlock(&knet_h->global_rwlock);
+	if (lockerr) {
+		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
+			strerror(lockerr));
+		errno = lockerr;
+		return -1;
+	}
+
+	if (knet_h->sockfd[channel].in_use) {
+		*datafd = knet_h->sockfd[channel].sockfd[0];
+	} else {
+		rv = -1;
+	}
+
+	pthread_rwlock_unlock(&knet_h->global_rwlock);
+	errno = rv ? EINVAL : 0;
+	return rv;
+}
+
+/*
+ * knet_handle_get_channel
+ *
+ * Find the channel whose user-facing fd (sockfd[0]) equals datafd among
+ * the in-use slots, under the global read lock. Once the lock is held,
+ * *channel is always written: the matching index, or -1 when nothing
+ * matches (the call then fails with EINVAL).
+ *
+ * Returns 0 on success (errno cleared), -1 with errno set on failure.
+ */
+int knet_handle_get_channel(knet_handle_t knet_h, const int datafd, int8_t *channel)
+{
+	int8_t match = -1;
+	int slot;
+	int lockerr;
+
+	if ((!knet_h) || (datafd <= 0) || (channel == NULL)) {
+		errno = EINVAL;
+		return -1;
+	}
+
+	lockerr = pthread_rwlock_rdlock(&knet_h->global_rwlock);
+	if (lockerr) {
+		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
+			strerror(lockerr));
+		errno = lockerr;
+		return -1;
+	}
+
+	for (slot = 0; slot < KNET_DATAFD_MAX; slot++) {
+		if ((knet_h->sockfd[slot].in_use) &&
+		    (knet_h->sockfd[slot].sockfd[0] == datafd)) {
+			match = slot;
+			break;
+		}
+	}
+	*channel = match;
+
+	pthread_rwlock_unlock(&knet_h->global_rwlock);
+
+	if (match < 0) {
+		errno = EINVAL;
+		return -1;
+	}
+
+	errno = 0;
+	return 0;
+}
+
+/*
+ * knet_handle_enable_filter
+ *
+ * Store the destination-host filter callback (its signature exposes the
+ * channel and dst_host_ids outputs) and its opaque private data under the
+ * global write lock. Passing a NULL callback clears it ("disabled").
+ *
+ * Returns 0 on success (errno cleared), -1 with errno set on failure.
+ */
+int knet_handle_enable_filter(knet_handle_t knet_h,
+			      void *dst_host_filter_fn_private_data,
+			      int (*dst_host_filter_fn) (
+					void *private_data,
+					const unsigned char *outdata,
+					ssize_t outdata_len,
+					uint8_t tx_rx,
+					knet_node_id_t this_host_id,
+					knet_node_id_t src_node_id,
+					int8_t *channel,
+					knet_node_id_t *dst_host_ids,
+					size_t *dst_host_ids_entries))
+{
+	int lockerr;
+
+	if (knet_h == NULL) {
+		errno = EINVAL;
+		return -1;
+	}
+
+	lockerr = get_global_wrlock(knet_h);
+	if (lockerr != 0) {
+		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
+			strerror(lockerr));
+		errno = lockerr;
+		return -1;
+	}
+
+	knet_h->dst_host_filter_fn_private_data = dst_host_filter_fn_private_data;
+	knet_h->dst_host_filter_fn = dst_host_filter_fn;
+
+	if (!knet_h->dst_host_filter_fn) {
+		log_debug(knet_h, KNET_SUB_HANDLE, "dst_host_filter_fn disabled");
+	} else {
+		log_debug(knet_h, KNET_SUB_HANDLE, "dst_host_filter_fn enabled");
+	}
+
+	pthread_rwlock_unlock(&knet_h->global_rwlock);
+
+	errno = 0;
+	return 0;
+}
+
+/*
+ * knet_handle_setfwd
+ *
+ * Enable (enabled == 1) or disable (enabled == 0) data forwarding.
+ *
+ * Enabling sets knet_h->enabled right away under the global write lock.
+ * Disabling first asks the TX and RX threads to flush their queues, drops
+ * the lock, waits (usleep + wait_all_threads_flush_queue()), and only then
+ * re-takes the write lock to clear knet_h->enabled.
+ *
+ * Returns 0 on success (errno cleared), -1 with errno set on failure.
+ */
+int knet_handle_setfwd(knet_handle_t knet_h, unsigned int enabled)
+{
+ int savederrno = 0;
+
+ if (!knet_h) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if (enabled > 1) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ savederrno = get_global_wrlock(knet_h);
+ if (savederrno) {
+ log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
+ strerror(savederrno));
+ errno = savederrno;
+ return -1;
+ }
+
+ if (enabled) {
+ knet_h->enabled = enabled;
+ log_debug(knet_h, KNET_SUB_HANDLE, "Data forwarding is enabled");
+ } else {
+ /*
+ * notify TX and RX threads to flush the queues
+ * (failures are only logged; the disable sequence continues)
+ */
+ if (set_thread_flush_queue(knet_h, KNET_THREAD_TX, KNET_THREAD_QUEUE_FLUSH) < 0) {
+ log_debug(knet_h, KNET_SUB_HANDLE, "Unable to request queue flushing for TX thread");
+ }
+ if (set_thread_flush_queue(knet_h, KNET_THREAD_RX, KNET_THREAD_QUEUE_FLUSH) < 0) {
+ log_debug(knet_h, KNET_SUB_HANDLE, "Unable to request queue flushing for RX thread");
+ }
+ }
+
+ /* the lock is released here, so the flush wait below runs without holding it */
+ pthread_rwlock_unlock(&knet_h->global_rwlock);
+
+ /*
+ * when disabling data forward, we need to give time to TX and RX
+ * to flush the queues.
+ *
+ * the TX thread is the main leader here. When there is no more
+ * data in the TX queue, we will also close traffic for RX.
+ */
+ if (!enabled) {
+ /*
+ * this usleep might be unnecessary, but wait_all_threads_flush_queue
+ * adds extra locking delay.
+ *
+ * allow all threads to run free without extra locking interference
+ * and then we switch to a more active wait in case the scheduler
+ * has decided to delay one thread or another
+ */
+ usleep(KNET_THREADS_TIMERES * 2);
+ wait_all_threads_flush_queue(knet_h);
+
+ /*
+ * all threads have done flushing the queue, we can stop data forwarding
+ */
+ savederrno = get_global_wrlock(knet_h);
+ if (savederrno) {
+ log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
+ strerror(savederrno));
+ errno = savederrno;
+ return -1;
+ }
+ knet_h->enabled = enabled;
+ log_debug(knet_h, KNET_SUB_HANDLE, "Data forwarding is disabled");
+ pthread_rwlock_unlock(&knet_h->global_rwlock);
+ }
+
+ errno = 0;
+ return 0;
+}
+
+/*
+ * knet_handle_get_stats
+ *
+ * Copy up to struct_size bytes (capped at sizeof(struct knet_handle_stats))
+ * of the handle statistics into *stats. This runs under the global read
+ * lock plus handle_stats_mutex, so callers built against an older/smaller
+ * struct only receive the prefix they know about.
+ *
+ * tx_crypt_packets is adjusted to include the ping/pong/pmtud TX crypt
+ * counters, and stats->size reports the library's full struct size.
+ *
+ * Returns 0 on success (errno cleared), -1 with errno set on failure
+ * (EINVAL for NULL arguments, or the lock error).
+ */
+int knet_handle_get_stats(knet_handle_t knet_h, struct knet_handle_stats *stats, size_t struct_size)
+{
+	int err = 0, savederrno = 0;
+
+	if (!knet_h) {
+		errno = EINVAL;
+		return -1;
+	}
+
+	if (!stats) {
+		errno = EINVAL;
+		return -1;
+	}
+
+	savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
+	if (savederrno) {
+		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
+			strerror(savederrno));
+		errno = savederrno;
+		return -1;
+	}
+
+	savederrno = pthread_mutex_lock(&knet_h->handle_stats_mutex);
+	if (savederrno) {
+		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get mutex lock: %s",
+			strerror(savederrno));
+		err = -1;
+		/* the mutex is not held: skip its unlock, release only the rwlock */
+		goto out_unlock_global;
+	}
+
+	if (struct_size > sizeof(struct knet_handle_stats)) {
+		struct_size = sizeof(struct knet_handle_stats);
+	}
+
+	memmove(stats, &knet_h->stats, struct_size);
+
+	/*
+	 * TX crypt stats only count the data packets sent, so add in the ping/pong/pmtud figures
+	 * RX is OK as it counts them before they are sorted.
+	 *
+	 * Only adjust the field if it lies within the struct_size bytes the
+	 * caller provided; an older caller's struct may not contain it.
+	 */
+	if (struct_size >= offsetof(struct knet_handle_stats, tx_crypt_packets) +
+			   sizeof(stats->tx_crypt_packets)) {
+		stats->tx_crypt_packets += knet_h->stats_extra.tx_crypt_ping_packets +
+					   knet_h->stats_extra.tx_crypt_pong_packets +
+					   knet_h->stats_extra.tx_crypt_pmtu_packets +
+					   knet_h->stats_extra.tx_crypt_pmtu_reply_packets;
+	}
+
+	/* Tell the caller our full size in case they have an old version */
+	stats->size = sizeof(struct knet_handle_stats);
+
+	pthread_mutex_unlock(&knet_h->handle_stats_mutex);
+
+out_unlock_global:
+	pthread_rwlock_unlock(&knet_h->global_rwlock);
+	errno = err ? savederrno : 0;
+	return err;
+}
+
+/*
+ * knet_handle_clear_stats
+ *
+ * Zero the handle counters (knet_h->stats and knet_h->stats_extra) under
+ * the global write lock. With KNET_CLEARSTATS_HANDLE_AND_LINK the per-link
+ * counters are cleared as well via _link_clear_stats(). Any other
+ * clear_option fails with EINVAL.
+ *
+ * Returns 0 on success (errno cleared), -1 with errno set on failure.
+ */
+int knet_handle_clear_stats(knet_handle_t knet_h, int clear_option)
+{
+	int savederrno = 0;
+
+	if (!knet_h) {
+		errno = EINVAL;
+		return -1;
+	}
+
+	if (clear_option != KNET_CLEARSTATS_HANDLE_ONLY &&
+	    clear_option != KNET_CLEARSTATS_HANDLE_AND_LINK) {
+		errno = EINVAL;
+		return -1;
+	}
+
+	savederrno = get_global_wrlock(knet_h);
+	if (savederrno) {
+		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
+			strerror(savederrno));
+		errno = savederrno;
+		return -1;
+	}
+
+	memset(&knet_h->stats, 0, sizeof(struct knet_handle_stats));
+	memset(&knet_h->stats_extra, 0, sizeof(struct knet_handle_stats_extra));
+	if (clear_option == KNET_CLEARSTATS_HANDLE_AND_LINK) {
+		_link_clear_stats(knet_h);
+	}
+
+	pthread_rwlock_unlock(&knet_h->global_rwlock);
+
+	/* clear errno on success, matching the other setters in this file */
+	errno = 0;
+	return 0;
+}
+
+/*
+ * knet_handle_enable_access_lists
+ *
+ * Switch link access-list checking on (enabled == 1) or off (enabled == 0)
+ * by updating knet_h->use_access_lists under the global write lock.
+ * Any other value is rejected with EINVAL.
+ *
+ * Returns 0 on success (errno cleared), -1 with errno set on failure.
+ */
+int knet_handle_enable_access_lists(knet_handle_t knet_h, unsigned int enabled)
+{
+	int lockerr;
+
+	if ((!knet_h) || (enabled > 1)) {
+		errno = EINVAL;
+		return -1;
+	}
+
+	lockerr = get_global_wrlock(knet_h);
+	if (lockerr) {
+		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
+			strerror(lockerr));
+		errno = lockerr;
+		return -1;
+	}
+
+	knet_h->use_access_lists = enabled;
+	if (!enabled) {
+		log_debug(knet_h, KNET_SUB_HANDLE, "Links access lists are disabled");
+	} else {
+		log_debug(knet_h, KNET_SUB_HANDLE, "Links access lists are enabled");
+	}
+
+	pthread_rwlock_unlock(&knet_h->global_rwlock);
+
+	errno = 0;
+	return 0;
+}
diff --git a/libknet/threads_pmtud.c b/libknet/threads_pmtud.c
index b4432898..d49827aa 100644
--- a/libknet/threads_pmtud.c
+++ b/libknet/threads_pmtud.c
@@ -1,642 +1,804 @@
/*
* Copyright (C) 2015-2020 Red Hat, Inc. All rights reserved.
*
* Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
* Federico Simoncelli <fsimon@kronosnet.org>
*
* This software licensed under LGPL-2.0+
*/
#include "config.h"
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <pthread.h>
#include "crypto.h"
#include "links.h"
#include "host.h"
#include "logging.h"
#include "transports.h"
#include "threads_common.h"
#include "threads_pmtud.h"
/*
 * Derive dst_link->status.mtu from the user-configured knet_h->manual_mtu.
 * The IPv4/IPv6 header overhead plus the link's proto_overhead is
 * subtracted, and calc_max_data_outlen() turns the remaining onwire size
 * into the application data MTU (the same conversion the PMTUd code
 * performs).
 *
 * Returns 1 when status.mtu was updated. It returns 0 for an unknown
 * address family, leaving status.mtu untouched.
 *
 * NOTE(review): manual_mtu - ipproto_overhead_len is evaluated in unsigned
 * arithmetic (ipproto_overhead_len is size_t). If manual_mtu were ever
 * smaller than the overhead, the result would wrap around. Presumably
 * callers only get here with a sane non-zero manual_mtu - confirm.
 */
static int _calculate_manual_mtu(knet_handle_t knet_h, struct knet_link *dst_link)
{
size_t ipproto_overhead_len; /* onwire packet overhead (protocol based) */
switch (dst_link->dst_addr.ss_family) {
case AF_INET6:
ipproto_overhead_len = KNET_PMTUD_OVERHEAD_V6 + dst_link->proto_overhead;
break;
case AF_INET:
ipproto_overhead_len = KNET_PMTUD_OVERHEAD_V4 + dst_link->proto_overhead;
break;
default:
log_debug(knet_h, KNET_SUB_PMTUD, "unknown protocol");
return 0;
break;
}
dst_link->status.mtu = calc_max_data_outlen(knet_h, knet_h->manual_mtu - ipproto_overhead_len);
return 1;
}
/*
 * Run one path MTU discovery pass for a single link.
 *
 * The search starts from the largest size the address family allows
 * (KNET_PMTUD_SIZE_V4/V6) and then bisects between
 * dst_link->last_good_mtu and dst_link->last_bad_mtu. Each probe is a
 * PMTUD packet, encrypted when crypto is configured, sent with sendto(2).
 * The thread then waits on knet_h->pmtud_cond for the reply.
 * dst_link->last_recv_mtu is cleared before waiting and compared with the
 * probe size afterwards; it is presumably filled in by the RX path when
 * the reply arrives (not visible here).
 *
 * Locks used: knet_h->pmtud_mutex around every probe/wait,
 * knet_h->tx_mutex and dst_link->link_stats_mutex around the send,
 * knet_h->backoff_mutex while computing the reply timeout, and
 * knet_h->handle_stats_mutex / kmtu_mutex for short updates.
 *
 * Returns 0 with dst_link->status.mtu set when an MTU has been found.
 * Returns -1 on failure. errno is set to EDEADLK when the pass was
 * aborted through knet_h->pmtud_abort, so the caller can reschedule it.
 */
static int _handle_check_link_pmtud(knet_handle_t knet_h, struct knet_host *dst_host, struct knet_link *dst_link)
{
	int err, ret, savederrno, mutex_retry_limit, failsafe, use_kernel_mtu, warn_once;
	uint32_t kernel_mtu;		/* record kernel_mtu from EMSGSIZE */
	size_t onwire_len;		/* current packet onwire size */
	size_t ipproto_overhead_len;	/* onwire packet overhead (protocol based) */
	size_t max_mtu_len;		/* max mtu for protocol */
	size_t data_len;		/* how much data we can send in the packet
					 * generally would be onwire_len - ipproto_overhead_len
					 * needs to be adjusted for crypto
					 */
	size_t app_mtu_len;		/* real data that we can send onwire */
	ssize_t len;			/* len of what we were able to sendto onwire */

	struct timespec ts, pmtud_crypto_start_ts, pmtud_crypto_stop_ts;
	unsigned long long pong_timeout_adj_tmp, timediff;
	int pmtud_crypto_reduce = 1;
	unsigned char *outbuf = (unsigned char *)knet_h->pmtudbuf;

	warn_once = 0;

	mutex_retry_limit = 0;
	failsafe = 0;

	knet_h->pmtudbuf->khp_pmtud_link = dst_link->link_id;

	switch (dst_link->dst_addr.ss_family) {
		case AF_INET6:
			max_mtu_len = KNET_PMTUD_SIZE_V6;
			ipproto_overhead_len = KNET_PMTUD_OVERHEAD_V6 + dst_link->proto_overhead;
			break;
		case AF_INET:
			max_mtu_len = KNET_PMTUD_SIZE_V4;
			ipproto_overhead_len = KNET_PMTUD_OVERHEAD_V4 + dst_link->proto_overhead;
			break;
		default:
			log_debug(knet_h, KNET_SUB_PMTUD, "PMTUD aborted, unknown protocol");
			return -1;
	}

	/*
	 * bisection bounds: no bad size known yet, and the lower bound is
	 * seeded from the size of the last ping seen on this link.
	 */
	dst_link->last_bad_mtu = 0;
	dst_link->last_good_mtu = dst_link->last_ping_size + ipproto_overhead_len;

	/*
	 * discovery starts from the top because kernel will
	 * refuse to send packets > current iface mtu.
	 * this saves us some time and network bw.
	 */
	onwire_len = max_mtu_len;

restart:

	/*
	 * prevent a race when interface mtu is changed _exactly_ during
	 * the discovery process and it's complex to detect. Easier
	 * to wait the next loop.
	 * 30 is not an arbitrary value. To bisect from 576 to 128000 doesn't
	 * take more than 18/19 steps.
	 */
	if (failsafe == 30) {
		log_err(knet_h, KNET_SUB_PMTUD,
			"Aborting PMTUD process: Too many attempts. MTU might have changed during discovery.");
		return -1;
	} else {
		failsafe++;
	}

	/*
	 * common to all packets
	 */

	/*
	 * calculate the application MTU based on current onwire_len minus ipproto_overhead_len
	 */
	app_mtu_len = calc_max_data_outlen(knet_h, onwire_len - ipproto_overhead_len);

	/*
	 * recalculate onwire len back that might be different based
	 * on data padding from crypto layer.
	 */
	onwire_len = calc_data_outlen(knet_h, app_mtu_len + KNET_HEADER_ALL_SIZE) + ipproto_overhead_len;

	/*
	 * calculate the size of what we need to send to sendto(2).
	 * see also onwire.c for packet format explanation.
	 */
	data_len = app_mtu_len + knet_h->sec_hash_size + knet_h->sec_salt_size + KNET_HEADER_ALL_SIZE;

	if (knet_h->crypto_in_use_config) {
		if (data_len < (knet_h->sec_hash_size + knet_h->sec_salt_size) + 1) {
			log_debug(knet_h, KNET_SUB_PMTUD, "Aborting PMTUD process: link mtu smaller than crypto header detected (link might have been disconnected)");
			return -1;
		}

		knet_h->pmtudbuf->khp_pmtud_size = onwire_len;

		if (crypto_encrypt_and_sign(knet_h,
					    (const unsigned char *)knet_h->pmtudbuf,
					    data_len - (knet_h->sec_hash_size + knet_h->sec_salt_size),
					    knet_h->pmtudbuf_crypt,
					    (ssize_t *)&data_len) < 0) {
			log_debug(knet_h, KNET_SUB_PMTUD, "Unable to crypto pmtud packet");
			return -1;
		}

		outbuf = knet_h->pmtudbuf_crypt;

		/*
		 * pthread_mutex_lock() reports failure with a positive error
		 * number, never a negative value: test for non-zero.
		 */
		if (pthread_mutex_lock(&knet_h->handle_stats_mutex) != 0) {
			log_err(knet_h, KNET_SUB_PMTUD, "Unable to get mutex lock");
			return -1;
		}
		knet_h->stats_extra.tx_crypt_pmtu_packets++;
		pthread_mutex_unlock(&knet_h->handle_stats_mutex);
	} else {
		knet_h->pmtudbuf->khp_pmtud_size = onwire_len;
	}

	/* link has gone down, aborting pmtud */
	if (dst_link->status.connected != 1) {
		log_debug(knet_h, KNET_SUB_PMTUD, "PMTUD detected host (%u) link (%u) has been disconnected", dst_host->host_id, dst_link->link_id);
		return -1;
	}

	if (dst_link->transport_connected != 1) {
		log_debug(knet_h, KNET_SUB_PMTUD, "PMTUD detected host (%u) link (%u) has been disconnected", dst_host->host_id, dst_link->link_id);
		return -1;
	}

	if (pthread_mutex_lock(&knet_h->pmtud_mutex) != 0) {
		log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get mutex lock");
		return -1;
	}

	if (knet_h->pmtud_abort) {
		pthread_mutex_unlock(&knet_h->pmtud_mutex);
		errno = EDEADLK;
		return -1;
	}

	savederrno = pthread_mutex_lock(&knet_h->tx_mutex);
	if (savederrno) {
		pthread_mutex_unlock(&knet_h->pmtud_mutex);
		log_err(knet_h, KNET_SUB_PMTUD, "Unable to get TX mutex lock: %s", strerror(savederrno));
		return -1;
	}

	savederrno = pthread_mutex_lock(&dst_link->link_stats_mutex);
	if (savederrno) {
		pthread_mutex_unlock(&knet_h->pmtud_mutex);
		pthread_mutex_unlock(&knet_h->tx_mutex);
		log_err(knet_h, KNET_SUB_PMTUD, "Unable to get stats mutex lock for host %u link %u: %s",
			dst_host->host_id, dst_link->link_id, strerror(savederrno));
		return -1;
	}

retry:
	if (transport_get_connection_oriented(knet_h, dst_link->transport) == TRANSPORT_PROTO_NOT_CONNECTION_ORIENTED) {
		len = sendto(dst_link->outsock, outbuf, data_len, MSG_DONTWAIT | MSG_NOSIGNAL,
			     (struct sockaddr *) &dst_link->dst_addr, sizeof(struct sockaddr_storage));
	} else {
		len = sendto(dst_link->outsock, outbuf, data_len, MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0);
	}
	savederrno = errno;

	/*
	 * we cannot hold a lock on kmtu_mutex between resetting
	 * knet_h->kernel_mtu here and below where it's used.
	 * use_kernel_mtu tells us if the knet_h->kernel_mtu was
	 * set to 0 and we can trust its value later.
	 */
	use_kernel_mtu = 0;
	if (pthread_mutex_lock(&knet_h->kmtu_mutex) == 0) {
		use_kernel_mtu = 1;
		knet_h->kernel_mtu = 0;
		pthread_mutex_unlock(&knet_h->kmtu_mutex);
	}

	kernel_mtu = 0;

	err = transport_tx_sock_error(knet_h, dst_link->transport, dst_link->outsock, len, savederrno);
	switch(err) {
		case -1: /* unrecoverable error */
			log_debug(knet_h, KNET_SUB_PMTUD, "Unable to send pmtu packet (sendto): %d %s", savederrno, strerror(savederrno));
			pthread_mutex_unlock(&knet_h->tx_mutex);
			pthread_mutex_unlock(&knet_h->pmtud_mutex);
			dst_link->status.stats.tx_pmtu_errors++;
			pthread_mutex_unlock(&dst_link->link_stats_mutex);
			return -1;
		case 0: /* ignore error and continue */
			break;
		case 1: /* retry to send those same data */
			dst_link->status.stats.tx_pmtu_retries++;
			goto retry;
			break;
	}

	pthread_mutex_unlock(&knet_h->tx_mutex);

	if (len != (ssize_t )data_len) {
		pthread_mutex_unlock(&dst_link->link_stats_mutex);
		if (savederrno == EMSGSIZE) {
			/*
			 * we cannot hold a lock on kmtu_mutex between resetting
			 * knet_h->kernel_mtu and here.
			 * use_kernel_mtu tells us if the knet_h->kernel_mtu was
			 * set to 0 previously and we can trust its value now.
			 */
			if (use_kernel_mtu) {
				use_kernel_mtu = 0;
				if (pthread_mutex_lock(&knet_h->kmtu_mutex) == 0) {
					kernel_mtu = knet_h->kernel_mtu;
					pthread_mutex_unlock(&knet_h->kmtu_mutex);
				}
			}
			if (kernel_mtu > 0) {
				dst_link->last_bad_mtu = kernel_mtu + 1;
			} else {
				dst_link->last_bad_mtu = onwire_len;
			}
		} else {
			log_debug(knet_h, KNET_SUB_PMTUD, "Unable to send pmtu packet len: %zu err: %s", onwire_len, strerror(savederrno));
		}

	} else {
		dst_link->last_sent_mtu = onwire_len;
		dst_link->last_recv_mtu = 0;
		dst_link->status.stats.tx_pmtu_packets++;
		dst_link->status.stats.tx_pmtu_bytes += data_len;
		pthread_mutex_unlock(&dst_link->link_stats_mutex);

		if (clock_gettime(CLOCK_REALTIME, &ts) < 0) {
			log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get current time: %s", strerror(errno));
			pthread_mutex_unlock(&knet_h->pmtud_mutex);
			return -1;
		}

		/*
		 * non fatal, we can wait the next round to reduce the
		 * multiplier
		 */
		if (clock_gettime(CLOCK_MONOTONIC, &pmtud_crypto_start_ts) < 0) {
			log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get current time: %s", strerror(errno));
			pmtud_crypto_reduce = 0;
		}

		/*
		 * set PMTUd reply timeout to match pong_timeout on a given link
		 *
		 * math: internally pong_timeout is expressed in microseconds, while
		 *       the public API exports milliseconds. So careful with the 0's here.
		 * the loop is necessary because we are grabbing the current time just above
		 * and add values to it that could overflow into seconds.
		 */

		if (pthread_mutex_lock(&knet_h->backoff_mutex)) {
			log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get backoff_mutex");
			pthread_mutex_unlock(&knet_h->pmtud_mutex);
			return -1;
		}

		if (knet_h->crypto_in_use_config) {
			/*
			 * crypto, under pressure, is a royal PITA
			 */
			pong_timeout_adj_tmp = dst_link->pong_timeout_adj * dst_link->pmtud_crypto_timeout_multiplier;
		} else {
			pong_timeout_adj_tmp = dst_link->pong_timeout_adj;
		}

		ts.tv_sec += pong_timeout_adj_tmp / 1000000;
		ts.tv_nsec += (((pong_timeout_adj_tmp) % 1000000) * 1000);
		/*
		 * tv_nsec must end up strictly below one second: a value of
		 * exactly 1000000000 makes pthread_cond_timedwait() fail with
		 * EINVAL instead of waiting.
		 */
		while (ts.tv_nsec >= 1000000000) {
			ts.tv_sec += 1;
			ts.tv_nsec -= 1000000000;
		}

		pthread_mutex_unlock(&knet_h->backoff_mutex);

		knet_h->pmtud_waiting = 1;

		ret = pthread_cond_timedwait(&knet_h->pmtud_cond, &knet_h->pmtud_mutex, &ts);

		knet_h->pmtud_waiting = 0;

		if (knet_h->pmtud_abort) {
			pthread_mutex_unlock(&knet_h->pmtud_mutex);
			errno = EDEADLK;
			return -1;
		}

		/*
		 * we cannot use shutdown_in_progress in here because
		 * we already hold the read lock
		 */
		if (knet_h->fini_in_progress) {
			pthread_mutex_unlock(&knet_h->pmtud_mutex);
			log_debug(knet_h, KNET_SUB_PMTUD, "PMTUD aborted. shutdown in progress");
			return -1;
		}

		if (ret) {
			if (ret == ETIMEDOUT) {
				/* with crypto, first give the peer more time before declaring the size bad */
				if ((knet_h->crypto_in_use_config) && (dst_link->pmtud_crypto_timeout_multiplier < KNET_LINK_PMTUD_CRYPTO_TIMEOUT_MULTIPLIER_MAX)) {
					dst_link->pmtud_crypto_timeout_multiplier = dst_link->pmtud_crypto_timeout_multiplier * 2;
					pmtud_crypto_reduce = 0;
					log_debug(knet_h, KNET_SUB_PMTUD,
						  "Increasing PMTUd response timeout multiplier to (%u) for host %u link: %u",
						  dst_link->pmtud_crypto_timeout_multiplier,
						  dst_host->host_id,
						  dst_link->link_id);
					pthread_mutex_unlock(&knet_h->pmtud_mutex);
					goto restart;
				}
				if (!warn_once) {
					log_warn(knet_h, KNET_SUB_PMTUD,
						 "possible MTU misconfiguration detected. "
						 "kernel is reporting MTU: %u bytes for "
						 "host %u link %u but the other node is "
						 "not acknowledging packets of this size. ",
						 dst_link->last_sent_mtu,
						 dst_host->host_id,
						 dst_link->link_id);
					log_warn(knet_h, KNET_SUB_PMTUD,
						 "This can be caused by this node interface MTU "
						 "too big or a network device that does not "
						 "support or has been misconfigured to manage MTU "
						 "of this size, or packet loss. knet will continue "
						 "to run but performances might be affected.");
					warn_once = 1;
				}
			} else {
				pthread_mutex_unlock(&knet_h->pmtud_mutex);
				if (mutex_retry_limit == 3) {
					log_debug(knet_h, KNET_SUB_PMTUD, "PMTUD aborted, unable to get mutex lock");
					return -1;
				}
				mutex_retry_limit++;
				goto restart;
			}
		}

		/* a fast reply lets us shrink the crypto timeout multiplier again */
		if ((knet_h->crypto_in_use_config) && (pmtud_crypto_reduce == 1) &&
		    (dst_link->pmtud_crypto_timeout_multiplier > KNET_LINK_PMTUD_CRYPTO_TIMEOUT_MULTIPLIER_MIN)) {
			if (!clock_gettime(CLOCK_MONOTONIC, &pmtud_crypto_stop_ts)) {
				timespec_diff(pmtud_crypto_start_ts, pmtud_crypto_stop_ts, &timediff);
				if (((pong_timeout_adj_tmp * 1000) / 2) > timediff) {
					dst_link->pmtud_crypto_timeout_multiplier = dst_link->pmtud_crypto_timeout_multiplier / 2;
					log_debug(knet_h, KNET_SUB_PMTUD,
						  "Decreasing PMTUd response timeout multiplier to (%u) for host %u link: %u",
						  dst_link->pmtud_crypto_timeout_multiplier,
						  dst_host->host_id,
						  dst_link->link_id);
				}
			} else {
				log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get current time: %s", strerror(errno));
			}
		}

		if ((dst_link->last_recv_mtu != onwire_len) || (ret)) {
			dst_link->last_bad_mtu = onwire_len;
		} else {
			int found_mtu = 0;

			/* with block ciphers the next usable size is a whole block away */
			if (knet_h->sec_block_size) {
				if ((onwire_len + knet_h->sec_block_size >= max_mtu_len) ||
				   ((dst_link->last_bad_mtu) && (dst_link->last_bad_mtu <= (onwire_len + knet_h->sec_block_size)))) {
					found_mtu = 1;
				}
			} else {
				if ((onwire_len == max_mtu_len) ||
				    ((dst_link->last_bad_mtu) && (dst_link->last_bad_mtu == (onwire_len + 1))) ||
				     (dst_link->last_bad_mtu == dst_link->last_good_mtu)) {
					found_mtu = 1;
				}
			}

			if (found_mtu) {
				/*
				 * account for IP overhead, knet headers and crypto in PMTU calculation
				 */
				dst_link->status.mtu = calc_max_data_outlen(knet_h, onwire_len - ipproto_overhead_len);
				pthread_mutex_unlock(&knet_h->pmtud_mutex);
				return 0;
			}

			dst_link->last_good_mtu = onwire_len;
		}
	}

	/* next probe: trust the kernel hint if we got one, otherwise bisect */
	if (kernel_mtu) {
		onwire_len = kernel_mtu;
	} else {
		onwire_len = (dst_link->last_good_mtu + dst_link->last_bad_mtu) / 2;
	}

	pthread_mutex_unlock(&knet_h->pmtud_mutex);

	goto restart;
}
/*
 * Decide whether dst_link is due for PMTU discovery and, if so, run it.
 *
 * Unless force_run is set, nothing is done while less than
 * knet_h->pmtud_interval seconds have passed since dst_link->pmtud_last.
 * Also refreshes dst_link->status.proto_overhead and, when the validity
 * of the link MTU changes, schedules a destination cache update.
 *
 * Returns dst_link->has_valid_mtu. When discovery was aborted and must be
 * rescheduled, errno is EDEADLK and the previous MTU state is restored.
 */
static int _handle_check_pmtud(knet_handle_t knet_h, struct knet_host *dst_host, struct knet_link *dst_link, int force_run)
{
	uint8_t saved_valid_pmtud;
	unsigned int saved_pmtud;
	struct timespec clock_now;
	unsigned long long diff_pmtud, interval;

	/*
	 * the caller tests errno == EDEADLK to detect a rescheduled run.
	 * Clear it up front so that a value left over from an earlier aborted
	 * run cannot leak out of the early-return paths below.
	 */
	errno = 0;

	if (clock_gettime(CLOCK_MONOTONIC, &clock_now) != 0) {
		log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get monotonic clock");
		return 0;
	}

	if (!force_run) {
		interval = knet_h->pmtud_interval * 1000000000llu; /* nanoseconds */

		timespec_diff(dst_link->pmtud_last, clock_now, &diff_pmtud);

		if (diff_pmtud < interval) {
			return dst_link->has_valid_mtu;
		}
	}

	/*
	 * status.proto_overhead should include all IP/(UDP|SCTP)/knet headers
	 *
	 * please note that it is not the same as link->proto_overhead that
	 * includes only either UDP or SCTP (at the moment) overhead.
	 */
	switch (dst_link->dst_addr.ss_family) {
		case AF_INET6:
			dst_link->status.proto_overhead = KNET_PMTUD_OVERHEAD_V6 + dst_link->proto_overhead + KNET_HEADER_ALL_SIZE + knet_h->sec_hash_size + knet_h->sec_salt_size;
			break;
		case AF_INET:
			dst_link->status.proto_overhead = KNET_PMTUD_OVERHEAD_V4 + dst_link->proto_overhead + KNET_HEADER_ALL_SIZE + knet_h->sec_hash_size + knet_h->sec_salt_size;
			break;
		default:
			/* unknown family: leave it alone, _handle_check_link_pmtud() rejects it */
			break;
	}

	saved_pmtud = dst_link->status.mtu;
	saved_valid_pmtud = dst_link->has_valid_mtu;

	log_debug(knet_h, KNET_SUB_PMTUD, "Starting PMTUD for host: %u link: %u", dst_host->host_id, dst_link->link_id);

	errno = 0;
	if (_handle_check_link_pmtud(knet_h, dst_host, dst_link) < 0) {
		if (errno == EDEADLK) {
			log_debug(knet_h, KNET_SUB_PMTUD, "PMTUD for host: %u link: %u has been rescheduled", dst_host->host_id, dst_link->link_id);
			dst_link->status.mtu = saved_pmtud;
			dst_link->has_valid_mtu = saved_valid_pmtud;
			/* logging may have clobbered errno: restore the signal for the caller */
			errno = EDEADLK;
			return dst_link->has_valid_mtu;
		}
		dst_link->has_valid_mtu = 0;
	} else {
		if (dst_link->status.mtu < calc_min_mtu(knet_h)) {
			log_info(knet_h, KNET_SUB_PMTUD,
				 "Invalid MTU detected for host: %u link: %u mtu: %u",
				 dst_host->host_id, dst_link->link_id, dst_link->status.mtu);
			dst_link->has_valid_mtu = 0;
		} else {
			dst_link->has_valid_mtu = 1;
		}
		if (dst_link->has_valid_mtu) {
			if ((saved_pmtud) && (saved_pmtud != dst_link->status.mtu)) {
				log_info(knet_h, KNET_SUB_PMTUD, "PMTUD link change for host: %u link: %u from %u to %u",
					 dst_host->host_id, dst_link->link_id, saved_pmtud, dst_link->status.mtu);
			}
			log_debug(knet_h, KNET_SUB_PMTUD, "PMTUD completed for host: %u link: %u current link mtu: %u",
				  dst_host->host_id, dst_link->link_id, dst_link->status.mtu);

			/*
			 * set pmtud_last, if we can, after we are done with the PMTUd process
			 * because it can take a very long time.
			 */
			dst_link->pmtud_last = clock_now;
			if (!clock_gettime(CLOCK_MONOTONIC, &clock_now)) {
				dst_link->pmtud_last = clock_now;
			}
		}
	}

	if (saved_valid_pmtud != dst_link->has_valid_mtu) {
		_host_dstcache_update_async(knet_h, dst_host);
	}

	return dst_link->has_valid_mtu;
}
void *_handle_pmtud_link_thread(void *data)
{
knet_handle_t knet_h = (knet_handle_t) data;
struct knet_host *dst_host;
struct knet_link *dst_link;
int link_idx;
unsigned int have_mtu;
unsigned int lower_mtu;
int link_has_mtu;
int force_run = 0;
set_thread_status(knet_h, KNET_THREAD_PMTUD, KNET_THREAD_STARTED);
knet_h->data_mtu = calc_min_mtu(knet_h);
/* preparing pmtu buffer */
knet_h->pmtudbuf->kh_version = KNET_HEADER_VERSION;
knet_h->pmtudbuf->kh_type = KNET_HEADER_TYPE_PMTUD;
knet_h->pmtudbuf->kh_node = htons(knet_h->host_id);
while (!shutdown_in_progress(knet_h)) {
usleep(KNET_THREADS_TIMERES);
if (pthread_mutex_lock(&knet_h->pmtud_mutex) != 0) {
log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get mutex lock");
continue;
}
knet_h->pmtud_abort = 0;
knet_h->pmtud_running = 1;
force_run = knet_h->pmtud_forcerun;
knet_h->pmtud_forcerun = 0;
pthread_mutex_unlock(&knet_h->pmtud_mutex);
if (force_run) {
log_debug(knet_h, KNET_SUB_PMTUD, "PMTUd request to rerun has been received");
}
if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) {
log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get read lock");
continue;
}
lower_mtu = KNET_PMTUD_SIZE_V4;
have_mtu = 0;
for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) {
for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) {
dst_link = &dst_host->link[link_idx];
if ((dst_link->status.enabled != 1) ||
(dst_link->status.connected != 1) ||
(dst_host->link[link_idx].transport == KNET_TRANSPORT_LOOPBACK) ||
(!dst_link->last_ping_size) ||
((dst_link->dynamic == KNET_LINK_DYNIP) &&
(dst_link->status.dynconnected != 1)))
continue;
if (!knet_h->manual_mtu) {
link_has_mtu = _handle_check_pmtud(knet_h, dst_host, dst_link, force_run);
if (errno == EDEADLK) {
goto out_unlock;
}
if (link_has_mtu) {
have_mtu = 1;
if (dst_link->status.mtu < lower_mtu) {
lower_mtu = dst_link->status.mtu;
}
}
} else {
link_has_mtu = _calculate_manual_mtu(knet_h, dst_link);
if (link_has_mtu) {
have_mtu = 1;
if (dst_link->status.mtu < lower_mtu) {
lower_mtu = dst_link->status.mtu;
}
}
}
}
}
if (have_mtu) {
if (knet_h->data_mtu != lower_mtu) {
knet_h->data_mtu = lower_mtu;
log_info(knet_h, KNET_SUB_PMTUD, "Global data MTU changed to: %u", knet_h->data_mtu);
if (knet_h->pmtud_notify_fn) {
knet_h->pmtud_notify_fn(knet_h->pmtud_notify_fn_private_data,
knet_h->data_mtu);
}
}
}
out_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
if (pthread_mutex_lock(&knet_h->pmtud_mutex) != 0) {
log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get mutex lock");
} else {
knet_h->pmtud_running = 0;
pthread_mutex_unlock(&knet_h->pmtud_mutex);
}
}
set_thread_status(knet_h, KNET_THREAD_PMTUD, KNET_THREAD_STOPPED);
return NULL;
}
+
+/*
+ * Report the current PMTUd interval (knet_h->pmtud_interval) via *interval.
+ * Both arguments are mandatory (EINVAL otherwise). The value is read under
+ * the global read lock. Returns 0 on success, -1 with errno set on failure.
+ */
+int knet_handle_pmtud_getfreq(knet_handle_t knet_h, unsigned int *interval)
+{
+	int lock_err;
+
+	if ((!knet_h) || (!interval)) {
+		errno = EINVAL;
+		return -1;
+	}
+
+	lock_err = pthread_rwlock_rdlock(&knet_h->global_rwlock);
+	if (lock_err != 0) {
+		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
+			strerror(lock_err));
+		errno = lock_err;
+		return -1;
+	}
+
+	*interval = knet_h->pmtud_interval;
+	pthread_rwlock_unlock(&knet_h->global_rwlock);
+
+	errno = 0;
+	return 0;
+}
+
+/*
+ * Set how often, in seconds, PMTUd re-probes each link.
+ * Accepted range is 1..86400 (one day); anything else gives EINVAL.
+ * The update is made under the global write lock.
+ * Returns 0 on success, -1 with errno set on failure.
+ */
+int knet_handle_pmtud_setfreq(knet_handle_t knet_h, unsigned int interval)
+{
+	int lock_err;
+
+	if (!knet_h) {
+		errno = EINVAL;
+		return -1;
+	}
+
+	if ((interval == 0) || (interval > 86400)) {
+		errno = EINVAL;
+		return -1;
+	}
+
+	lock_err = get_global_wrlock(knet_h);
+	if (lock_err != 0) {
+		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
+			strerror(lock_err));
+		errno = lock_err;
+		return -1;
+	}
+
+	knet_h->pmtud_interval = interval;
+	log_debug(knet_h, KNET_SUB_HANDLE, "PMTUd interval set to: %u seconds", interval);
+	pthread_rwlock_unlock(&knet_h->global_rwlock);
+
+	errno = 0;
+	return 0;
+}
+
+/*
+ * Install (or, with a NULL pmtud_notify_fn, remove) the callback invoked
+ * when the global data MTU changes, together with its private data.
+ * Both fields are replaced under the global write lock.
+ * Returns 0 on success, -1 with errno set on failure.
+ */
+int knet_handle_enable_pmtud_notify(knet_handle_t knet_h,
+				    void *pmtud_notify_fn_private_data,
+				    void (*pmtud_notify_fn) (
+						void *private_data,
+						unsigned int data_mtu))
+{
+	int lock_err;
+
+	if (!knet_h) {
+		errno = EINVAL;
+		return -1;
+	}
+
+	lock_err = get_global_wrlock(knet_h);
+	if (lock_err != 0) {
+		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
+			strerror(lock_err));
+		errno = lock_err;
+		return -1;
+	}
+
+	knet_h->pmtud_notify_fn_private_data = pmtud_notify_fn_private_data;
+	knet_h->pmtud_notify_fn = pmtud_notify_fn;
+
+	if (!knet_h->pmtud_notify_fn) {
+		log_debug(knet_h, KNET_SUB_HANDLE, "pmtud_notify_fn disabled");
+	} else {
+		log_debug(knet_h, KNET_SUB_HANDLE, "pmtud_notify_fn enabled");
+	}
+
+	pthread_rwlock_unlock(&knet_h->global_rwlock);
+
+	errno = 0;
+	return 0;
+}
+
+/*
+ * Set a manual interface MTU (knet_h->manual_mtu) and ask PMTUd to rerun.
+ * Values above KNET_PMTUD_SIZE_V4 are rejected with EINVAL.
+ * Returns 0 on success, -1 with errno set on failure.
+ *
+ * NOTE(review): manual_mtu is written while holding only the global
+ * *read* lock, and the PMTUd thread reads it under its own read lock.
+ * A write lock would serialize this, but it could block for a whole
+ * PMTUd pass. Confirm that this trade-off is intended.
+ */
+int knet_handle_pmtud_set(knet_handle_t knet_h,
+			  unsigned int iface_mtu)
+{
+	int lock_err;
+
+	if (!knet_h) {
+		errno = EINVAL;
+		return -1;
+	}
+
+	if (iface_mtu > KNET_PMTUD_SIZE_V4) {
+		errno = EINVAL;
+		return -1;
+	}
+
+	lock_err = pthread_rwlock_rdlock(&knet_h->global_rwlock);
+	if (lock_err != 0) {
+		log_err(knet_h, KNET_SUB_PMTUD, "Unable to get read lock: %s",
+			strerror(lock_err));
+		errno = lock_err;
+		return -1;
+	}
+
+	log_info(knet_h, KNET_SUB_PMTUD, "MTU manually set to: %u", iface_mtu);
+	knet_h->manual_mtu = iface_mtu;
+	force_pmtud_run(knet_h, KNET_SUB_PMTUD, 0);
+
+	pthread_rwlock_unlock(&knet_h->global_rwlock);
+
+	errno = 0;
+	return 0;
+}
+
+/*
+ * Report the current global data MTU (knet_h->data_mtu) via *data_mtu.
+ * Both arguments are mandatory (EINVAL otherwise). The value is read under
+ * the global read lock. Returns 0 on success, -1 with errno set on failure.
+ */
+int knet_handle_pmtud_get(knet_handle_t knet_h,
+			  unsigned int *data_mtu)
+{
+	int lock_err;
+
+	if ((!knet_h) || (!data_mtu)) {
+		errno = EINVAL;
+		return -1;
+	}
+
+	lock_err = pthread_rwlock_rdlock(&knet_h->global_rwlock);
+	if (lock_err != 0) {
+		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
+			strerror(lock_err));
+		errno = lock_err;
+		return -1;
+	}
+
+	*data_mtu = knet_h->data_mtu;
+	pthread_rwlock_unlock(&knet_h->global_rwlock);
+
+	errno = 0;
+	return 0;
+}
diff --git a/libknet/threads_rx.c b/libknet/threads_rx.c
index bff7dbad..61e2862d 100644
--- a/libknet/threads_rx.c
+++ b/libknet/threads_rx.c
@@ -1,1001 +1,1064 @@
/*
* Copyright (C) 2012-2020 Red Hat, Inc. All rights reserved.
*
* Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
* Federico Simoncelli <fsimon@kronosnet.org>
*
* This software licensed under LGPL-2.0+
*/
#include "config.h"
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <sys/uio.h>
#include <pthread.h>
#include "compat.h"
#include "compress.h"
#include "crypto.h"
#include "host.h"
#include "links.h"
#include "links_acl.h"
#include "logging.h"
#include "transports.h"
#include "transport_common.h"
#include "threads_common.h"
#include "threads_heartbeat.h"
#include "threads_rx.h"
#include "netutils.h"
/*
* RECV
*/
/*
 * Three-way comparison of two timespec values: seconds first, then
 * nanoseconds.
 * Returns 1 if a is later than b, -1 if a is earlier than b, and 0 if
 * both describe the same instant.
 */
static inline int timecmp(struct timespec a, struct timespec b)
{
	if (a.tv_sec == b.tv_sec) {
		/* (x > y) - (x < y) yields 1, 0 or -1 */
		return (a.tv_nsec > b.tv_nsec) - (a.tv_nsec < b.tv_nsec);
	}

	return (a.tv_sec > b.tv_sec) ? 1 : -1;
}
/*
 * Pick the defrag buffer slot (an index into src_host->defrag_buf,
 * 0 .. KNET_MAX_LINK - 1) that should collect the fragments of the packet
 * described by inbuf.
 *
 * Preference order:
 *   1. the slot already reassembling this sequence number;
 *   2. the first unused slot (only for a packet not seen before);
 *   3. otherwise the slot with the oldest last_update, which is released
 *      (in_use = 0) so the caller re-initializes it.
 *
 * Returns the slot index, or -1 with errno = ETIME when the sequence
 * number is already recorded in the defrag circular buffer: its buffer
 * expired or was reclaimed, so reassembling it again is pointless.
 */
static int find_pckt_defrag_buf(knet_handle_t knet_h, struct knet_header *inbuf)
{
	struct knet_host *src_host = knet_h->host_index[inbuf->kh_node];
	int slot;
	int stalest = 0;

	/* 1. a slot already owns this seq_num */
	for (slot = 0; slot < KNET_MAX_LINK; slot++) {
		if ((src_host->defrag_buf[slot].in_use) &&
		    (src_host->defrag_buf[slot].pckt_seq == inbuf->khp_data_seq_num)) {
			return slot;
		}
	}

	/* nobody owns it: if it was seen before, its buffer has expired */
	if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 1, 0)) {
		errno = ETIME;
		return -1;
	}

	/* first sighting: remember it in the defrag circular buffer */
	_seq_num_set(src_host, inbuf->khp_data_seq_num, 1);

	/*
	 * 2. hand out the first free slot. While scanning, track the least
	 * recently updated one; it only matters if every slot is busy.
	 */
	for (slot = 0; slot < KNET_MAX_LINK; slot++) {
		if (!src_host->defrag_buf[slot].in_use) {
			return slot;
		}
		if (timecmp(src_host->defrag_buf[slot].last_update,
			    src_host->defrag_buf[stalest].last_update) < 0) {
			stalest = slot;
		}
	}

	/* 3. all slots busy: recycle the stalest one */
	src_host->defrag_buf[stalest].in_use = 0;
	return stalest;
}
/*
 * Feed one fragment of a fragmented data packet into its defrag buffer.
 *
 * inbuf is the (already decrypted) packet. *len is the length of the
 * fragment payload at inbuf->khp_data_userdata; the caller has already
 * stripped KNET_HEADER_DATA_SIZE. Fragments are numbered from 1 to
 * khp_data_frag_num. Fragment N is stored at offset (N - 1) * frag_size,
 * and the last fragment (seq == num) may be shorter than the others.
 *
 * Returns 0 when the packet is complete: the reassembled payload has been
 * copied back to inbuf->khp_data_userdata and *len holds its total length.
 * Returns 1 when more fragments are needed or the fragment was dropped
 * (duplicate, expired sequence, or malformed fragment metadata).
 */
static int pckt_defrag(knet_handle_t knet_h, struct knet_header *inbuf, ssize_t *len)
{
	struct knet_host_defrag_buf *defrag_buf;
	int defrag_buf_idx;
	size_t frag_offset;

	/*
	 * fragment metadata comes straight off the wire: reject values that
	 * would make the offset math below read or write outside
	 * defrag_buf->buf. Sequence 0 yields a negative offset, seq > num
	 * cannot belong to a well-formed packet, and a fragment must carry
	 * at most KNET_MAX_PACKET_SIZE bytes of payload. Checking before
	 * find_pckt_defrag_buf() avoids evicting a legitimate buffer.
	 */
	if ((inbuf->khp_data_frag_seq == 0) ||
	    (inbuf->khp_data_frag_seq > inbuf->khp_data_frag_num) ||
	    (*len <= 0) || (*len > (ssize_t)KNET_MAX_PACKET_SIZE)) {
		log_debug(knet_h, KNET_SUB_RX, "Discarding malformed fragment");
		return 1;
	}

	defrag_buf_idx = find_pckt_defrag_buf(knet_h, inbuf);
	if (defrag_buf_idx < 0) {
		return 1;
	}

	defrag_buf = &knet_h->host_index[inbuf->kh_node]->defrag_buf[defrag_buf_idx];

	/*
	 * if the buf is not in use, then make sure it's clean
	 */
	if (!defrag_buf->in_use) {
		memset(defrag_buf, 0, sizeof(struct knet_host_defrag_buf));
		defrag_buf->in_use = 1;
		defrag_buf->pckt_seq = inbuf->khp_data_seq_num;
	}

	/*
	 * update timestamp on the buffer
	 */
	clock_gettime(CLOCK_MONOTONIC, &defrag_buf->last_update);

	/*
	 * check if we already received this fragment
	 */
	if (defrag_buf->frag_map[inbuf->khp_data_frag_seq]) {
		/*
		 * if we have received this fragment and we didn't clear the buffer
		 * it means that we don't have all fragments yet
		 */
		return 1;
	}

	/*
	 * we need to handle the last packet with gloves due to its different size
	 */
	if (inbuf->khp_data_frag_seq == inbuf->khp_data_frag_num) {
		defrag_buf->last_frag_size = *len;

		/*
		 * in the event when the last packet arrives first,
		 * we still don't know the offset vs the other fragments (based on MTU),
		 * so we store the fragment at the end of the buffer where it's safe
		 * and take a copy of the len so that we can restore its offset later.
		 * remember we can't use the local MTU for this calculation because pMTU
		 * can be asymmetric between the same hosts.
		 */
		if (!defrag_buf->frag_size) {
			defrag_buf->last_first = 1;
			memmove(defrag_buf->buf + (KNET_MAX_PACKET_SIZE - *len),
				inbuf->khp_data_userdata,
				*len);
		}
	} else {
		defrag_buf->frag_size = *len;
	}

	if (defrag_buf->frag_size) {
		frag_offset = (size_t)(inbuf->khp_data_frag_seq - 1) * (size_t)defrag_buf->frag_size;
		/* inconsistent fragment sizes would push the copy past the buffer: drop the packet */
		if (frag_offset + (size_t)*len > (size_t)KNET_MAX_PACKET_SIZE) {
			log_debug(knet_h, KNET_SUB_RX, "Discarding fragment outside of defrag buffer");
			defrag_buf->in_use = 0;
			return 1;
		}
		memmove(defrag_buf->buf + frag_offset,
			inbuf->khp_data_userdata, *len);
	}

	defrag_buf->frag_recv++;
	defrag_buf->frag_map[inbuf->khp_data_frag_seq] = 1;

	/*
	 * check if we received all the fragments
	 */
	if (defrag_buf->frag_recv == inbuf->khp_data_frag_num) {
		/* offset of the last fragment in the reassembled packet */
		frag_offset = (size_t)(inbuf->khp_data_frag_num - 1) * (size_t)defrag_buf->frag_size;

		if (frag_offset + (size_t)defrag_buf->last_frag_size > (size_t)KNET_MAX_PACKET_SIZE) {
			log_debug(knet_h, KNET_SUB_RX, "Discarding oversized defragmented packet");
			defrag_buf->in_use = 0;
			return 1;
		}

		/*
		 * special case the last pckt
		 */
		if (defrag_buf->last_first) {
			memmove(defrag_buf->buf + frag_offset,
				defrag_buf->buf + (KNET_MAX_PACKET_SIZE - defrag_buf->last_frag_size),
				defrag_buf->last_frag_size);
		}

		/*
		 * recalculate packet length
		 */
		*len = frag_offset + defrag_buf->last_frag_size;

		/*
		 * copy the pckt back in the user data
		 */
		memmove(inbuf->khp_data_userdata, defrag_buf->buf, *len);

		/*
		 * free this buffer
		 */
		defrag_buf->in_use = 0;
		return 0;
	}

	return 1;
}
static void _parse_recv_from_links(knet_handle_t knet_h, int sockfd, const struct knet_mmsghdr *msg)
{
int err = 0, savederrno = 0, stats_err = 0;
ssize_t outlen;
struct knet_host *src_host;
struct knet_link *src_link;
unsigned long long latency_last;
knet_node_id_t dst_host_ids[KNET_MAX_HOST];
size_t dst_host_ids_entries = 0;
int bcast = 1;
uint64_t decrypt_time = 0;
struct timespec recvtime;
struct knet_header *inbuf = msg->msg_hdr.msg_iov->iov_base;
unsigned char *outbuf = (unsigned char *)msg->msg_hdr.msg_iov->iov_base;
ssize_t len = msg->msg_len;
struct knet_hostinfo *knet_hostinfo;
struct iovec iov_out[1];
int8_t channel;
struct sockaddr_storage pckt_src;
seq_num_t recv_seq_num;
int wipe_bufs = 0;
int try_decrypt = 0, decrypted = 0, i;
for (i = 1; i <= KNET_MAX_CRYPTO_INSTANCES; i++) {
if (knet_h->crypto_instance[i]) {
try_decrypt = 1;
break;
}
}
if ((!try_decrypt) && (knet_h->crypto_only == KNET_CRYPTO_RX_DISALLOW_CLEAR_TRAFFIC)) {
log_debug(knet_h, KNET_SUB_RX, "RX thread configured to accept only crypto packets, but no crypto configs are configured!");
return;
}
if (try_decrypt) {
struct timespec start_time;
struct timespec end_time;
clock_gettime(CLOCK_MONOTONIC, &start_time);
if (crypto_authenticate_and_decrypt(knet_h,
(unsigned char *)inbuf,
len,
knet_h->recv_from_links_buf_decrypt,
&outlen) < 0) {
log_debug(knet_h, KNET_SUB_RX, "Unable to decrypt/auth packet");
if (knet_h->crypto_only == KNET_CRYPTO_RX_DISALLOW_CLEAR_TRAFFIC) {
return;
}
log_debug(knet_h, KNET_SUB_RX, "Attempting to process packet as clear data");
} else {
clock_gettime(CLOCK_MONOTONIC, &end_time);
timespec_diff(start_time, end_time, &decrypt_time);
len = outlen;
inbuf = (struct knet_header *)knet_h->recv_from_links_buf_decrypt;
decrypted = 1;
}
}
if (len < (ssize_t)(KNET_HEADER_SIZE + 1)) {
log_debug(knet_h, KNET_SUB_RX, "Packet is too short: %ld", (long)len);
return;
}
if (inbuf->kh_version != KNET_HEADER_VERSION) {
log_debug(knet_h, KNET_SUB_RX, "Packet version does not match");
return;
}
inbuf->kh_node = ntohs(inbuf->kh_node);
src_host = knet_h->host_index[inbuf->kh_node];
if (src_host == NULL) { /* host not found */
log_debug(knet_h, KNET_SUB_RX, "Unable to find source host for this packet");
return;
}
src_link = src_host->link +
(inbuf->khp_ping_link % KNET_MAX_LINK);
if ((inbuf->kh_type & KNET_HEADER_TYPE_PMSK) != 0) {
if (src_link->dynamic == KNET_LINK_DYNIP) {
/*
* cpyaddrport will only copy address and port of the incoming
* packet and strip extra bits such as flow and scopeid
*/
cpyaddrport(&pckt_src, msg->msg_hdr.msg_name);
if (cmpaddr(&src_link->dst_addr, sockaddr_len(&src_link->dst_addr),
&pckt_src, sockaddr_len(&pckt_src)) != 0) {
log_debug(knet_h, KNET_SUB_RX, "host: %u link: %u appears to have changed ip address",
src_host->host_id, src_link->link_id);
memmove(&src_link->dst_addr, &pckt_src, sizeof(struct sockaddr_storage));
if (knet_addrtostr(&src_link->dst_addr, sockaddr_len(msg->msg_hdr.msg_name),
src_link->status.dst_ipaddr, KNET_MAX_HOST_LEN,
src_link->status.dst_port, KNET_MAX_PORT_LEN) != 0) {
log_debug(knet_h, KNET_SUB_RX, "Unable to resolve ???");
snprintf(src_link->status.dst_ipaddr, KNET_MAX_HOST_LEN - 1, "Unknown!!!");
snprintf(src_link->status.dst_port, KNET_MAX_PORT_LEN - 1, "??");
} else {
log_info(knet_h, KNET_SUB_RX,
"host: %u link: %u new connection established from: %s %s",
src_host->host_id, src_link->link_id,
src_link->status.dst_ipaddr, src_link->status.dst_port);
}
}
/*
* transport has already accepted the connection here
* otherwise we would not be receiving packets
*/
transport_link_dyn_connect(knet_h, sockfd, src_link);
}
}
stats_err = pthread_mutex_lock(&src_link->link_stats_mutex);
if (stats_err) {
log_err(knet_h, KNET_SUB_RX, "Unable to get stats mutex lock for host %u link %u: %s",
src_host->host_id, src_link->link_id, strerror(savederrno));
return;
}
switch (inbuf->kh_type) {
case KNET_HEADER_TYPE_HOST_INFO:
case KNET_HEADER_TYPE_DATA:
if (!src_host->status.reachable) {
pthread_mutex_unlock(&src_link->link_stats_mutex);
log_debug(knet_h, KNET_SUB_RX, "Source host %u not reachable yet. Discarding packet.", src_host->host_id);
return;
}
inbuf->khp_data_seq_num = ntohs(inbuf->khp_data_seq_num);
channel = inbuf->khp_data_channel;
src_host->got_data = 1;
src_link->status.stats.rx_data_packets++;
src_link->status.stats.rx_data_bytes += len;
if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 0, 0)) {
pthread_mutex_unlock(&src_link->link_stats_mutex);
if (src_host->link_handler_policy != KNET_LINK_POLICY_ACTIVE) {
log_debug(knet_h, KNET_SUB_RX, "Packet has already been delivered");
}
return;
}
if (inbuf->khp_data_frag_num > 1) {
/*
* len as received from the socket also includes extra stuff
* that the defrag code doesn't care about. So strip it
* here and readd only for repadding once we are done
* defragging
*/
len = len - KNET_HEADER_DATA_SIZE;
if (pckt_defrag(knet_h, inbuf, &len)) {
pthread_mutex_unlock(&src_link->link_stats_mutex);
return;
}
len = len + KNET_HEADER_DATA_SIZE;
}
if (inbuf->khp_data_compress) {
ssize_t decmp_outlen = KNET_DATABUFSIZE_COMPRESS;
struct timespec start_time;
struct timespec end_time;
uint64_t compress_time;
clock_gettime(CLOCK_MONOTONIC, &start_time);
err = decompress(knet_h, inbuf->khp_data_compress,
(const unsigned char *)inbuf->khp_data_userdata,
len - KNET_HEADER_DATA_SIZE,
knet_h->recv_from_links_buf_decompress,
&decmp_outlen);
stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex);
if (stats_err < 0) {
pthread_mutex_unlock(&src_link->link_stats_mutex);
log_err(knet_h, KNET_SUB_RX, "Unable to get mutex lock: %s", strerror(stats_err));
return;
}
clock_gettime(CLOCK_MONOTONIC, &end_time);
timespec_diff(start_time, end_time, &compress_time);
if (!err) {
/* Collect stats */
if (compress_time < knet_h->stats.rx_compress_time_min) {
knet_h->stats.rx_compress_time_min = compress_time;
}
if (compress_time > knet_h->stats.rx_compress_time_max) {
knet_h->stats.rx_compress_time_max = compress_time;
}
knet_h->stats.rx_compress_time_ave =
(knet_h->stats.rx_compress_time_ave * knet_h->stats.rx_compressed_packets +
compress_time) / (knet_h->stats.rx_compressed_packets+1);
knet_h->stats.rx_compressed_packets++;
knet_h->stats.rx_compressed_original_bytes += decmp_outlen;
knet_h->stats.rx_compressed_size_bytes += len - KNET_HEADER_SIZE;
memmove(inbuf->khp_data_userdata, knet_h->recv_from_links_buf_decompress, decmp_outlen);
len = decmp_outlen + KNET_HEADER_DATA_SIZE;
} else {
pthread_mutex_unlock(&knet_h->handle_stats_mutex);
pthread_mutex_unlock(&src_link->link_stats_mutex);
log_warn(knet_h, KNET_SUB_COMPRESS, "Unable to decompress packet (%d): %s",
err, strerror(errno));
return;
}
pthread_mutex_unlock(&knet_h->handle_stats_mutex);
}
if (inbuf->kh_type == KNET_HEADER_TYPE_DATA) {
if (decrypted) {
stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex);
if (stats_err < 0) {
pthread_mutex_unlock(&src_link->link_stats_mutex);
log_err(knet_h, KNET_SUB_RX, "Unable to get mutex lock: %s", strerror(stats_err));
return;
}
/* Only update the crypto overhead for data packets. Mainly to be
consistent with TX */
if (decrypt_time < knet_h->stats.rx_crypt_time_min) {
knet_h->stats.rx_crypt_time_min = decrypt_time;
}
if (decrypt_time > knet_h->stats.rx_crypt_time_max) {
knet_h->stats.rx_crypt_time_max = decrypt_time;
}
knet_h->stats.rx_crypt_time_ave =
(knet_h->stats.rx_crypt_time_ave * knet_h->stats.rx_crypt_packets +
decrypt_time) / (knet_h->stats.rx_crypt_packets+1);
knet_h->stats.rx_crypt_packets++;
pthread_mutex_unlock(&knet_h->handle_stats_mutex);
}
if (knet_h->enabled != 1) /* data forward is disabled */
break;
if (knet_h->dst_host_filter_fn) {
size_t host_idx;
int found = 0;
bcast = knet_h->dst_host_filter_fn(
knet_h->dst_host_filter_fn_private_data,
(const unsigned char *)inbuf->khp_data_userdata,
len - KNET_HEADER_DATA_SIZE,
KNET_NOTIFY_RX,
knet_h->host_id,
inbuf->kh_node,
&channel,
dst_host_ids,
&dst_host_ids_entries);
if (bcast < 0) {
pthread_mutex_unlock(&src_link->link_stats_mutex);
log_debug(knet_h, KNET_SUB_RX, "Error from dst_host_filter_fn: %d", bcast);
return;
}
if ((!bcast) && (!dst_host_ids_entries)) {
pthread_mutex_unlock(&src_link->link_stats_mutex);
log_debug(knet_h, KNET_SUB_RX, "Message is unicast but no dst_host_ids_entries");
return;
}
/* check if we are dst for this packet */
if (!bcast) {
if (dst_host_ids_entries > KNET_MAX_HOST) {
pthread_mutex_unlock(&src_link->link_stats_mutex);
log_debug(knet_h, KNET_SUB_RX, "dst_host_filter_fn returned too many destinations");
return;
}
for (host_idx = 0; host_idx < dst_host_ids_entries; host_idx++) {
if (dst_host_ids[host_idx] == knet_h->host_id) {
found = 1;
break;
}
}
if (!found) {
pthread_mutex_unlock(&src_link->link_stats_mutex);
log_debug(knet_h, KNET_SUB_RX, "Packet is not for us");
return;
}
}
}
}
if (inbuf->kh_type == KNET_HEADER_TYPE_DATA) {
if (!knet_h->sockfd[channel].in_use) {
pthread_mutex_unlock(&src_link->link_stats_mutex);
log_debug(knet_h, KNET_SUB_RX,
"received packet for channel %d but there is no local sock connected",
channel);
return;
}
outlen = 0;
memset(iov_out, 0, sizeof(iov_out));
retry:
iov_out[0].iov_base = (void *) inbuf->khp_data_userdata + outlen;
iov_out[0].iov_len = len - (outlen + KNET_HEADER_DATA_SIZE);
outlen = writev(knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], iov_out, 1);
if ((outlen > 0) && (outlen < (ssize_t)iov_out[0].iov_len)) {
log_debug(knet_h, KNET_SUB_RX,
"Unable to send all data to the application in one go. Expected: %zu Sent: %zd\n",
iov_out[0].iov_len, outlen);
goto retry;
}
if (outlen <= 0) {
knet_h->sock_notify_fn(knet_h->sock_notify_fn_private_data,
knet_h->sockfd[channel].sockfd[0],
channel,
KNET_NOTIFY_RX,
outlen,
errno);
pthread_mutex_unlock(&src_link->link_stats_mutex);
return;
}
if ((size_t)outlen == iov_out[0].iov_len) {
_seq_num_set(src_host, inbuf->khp_data_seq_num, 0);
}
} else { /* HOSTINFO */
knet_hostinfo = (struct knet_hostinfo *)inbuf->khp_data_userdata;
if (knet_hostinfo->khi_bcast == KNET_HOSTINFO_UCAST) {
knet_hostinfo->khi_dst_node_id = ntohs(knet_hostinfo->khi_dst_node_id);
}
if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 0, 0)) {
pthread_mutex_unlock(&src_link->link_stats_mutex);
return;
}
_seq_num_set(src_host, inbuf->khp_data_seq_num, 0);
switch(knet_hostinfo->khi_type) {
case KNET_HOSTINFO_TYPE_LINK_UP_DOWN:
break;
case KNET_HOSTINFO_TYPE_LINK_TABLE:
break;
default:
log_warn(knet_h, KNET_SUB_RX, "Receiving unknown host info message from host %u", src_host->host_id);
break;
}
}
break;
case KNET_HEADER_TYPE_PING:
outlen = KNET_HEADER_PING_SIZE;
inbuf->kh_type = KNET_HEADER_TYPE_PONG;
inbuf->kh_node = htons(knet_h->host_id);
recv_seq_num = ntohs(inbuf->khp_ping_seq_num);
src_link->status.stats.rx_ping_packets++;
src_link->status.stats.rx_ping_bytes += len;
wipe_bufs = 0;
if (!inbuf->khp_ping_timed) {
/*
* we might be receiving this message from all links, but we want
* to process it only the first time
*/
if (recv_seq_num != src_host->untimed_rx_seq_num) {
/*
* cache the untimed seq num
*/
src_host->untimed_rx_seq_num = recv_seq_num;
/*
* if the host has received data in between
* untimed ping, then we don't need to wipe the bufs
*/
if (src_host->got_data) {
src_host->got_data = 0;
wipe_bufs = 0;
} else {
wipe_bufs = 1;
}
}
_seq_num_lookup(src_host, recv_seq_num, 0, wipe_bufs);
} else {
/*
* pings always arrives in bursts over all the link
* catch the first of them to cache the seq num and
* avoid duplicate processing
*/
if (recv_seq_num != src_host->timed_rx_seq_num) {
src_host->timed_rx_seq_num = recv_seq_num;
if (recv_seq_num == 0) {
_seq_num_lookup(src_host, recv_seq_num, 0, 1);
}
}
}
if (knet_h->crypto_in_use_config) {
if (crypto_encrypt_and_sign(knet_h,
(const unsigned char *)inbuf,
outlen,
knet_h->recv_from_links_buf_crypt,
&outlen) < 0) {
log_debug(knet_h, KNET_SUB_RX, "Unable to encrypt pong packet");
break;
}
outbuf = knet_h->recv_from_links_buf_crypt;
stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex);
if (stats_err < 0) {
log_err(knet_h, KNET_SUB_RX, "Unable to get mutex lock: %s", strerror(stats_err));
break;
}
knet_h->stats_extra.tx_crypt_pong_packets++;
pthread_mutex_unlock(&knet_h->handle_stats_mutex);
}
retry_pong:
if (src_link->transport_connected) {
if (transport_get_connection_oriented(knet_h, src_link->transport) == TRANSPORT_PROTO_NOT_CONNECTION_ORIENTED) {
len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL,
(struct sockaddr *) &src_link->dst_addr, sizeof(struct sockaddr_storage));
} else {
len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0);
}
savederrno = errno;
if (len != outlen) {
err = transport_tx_sock_error(knet_h, src_link->transport, src_link->outsock, len, savederrno);
switch(err) {
case -1: /* unrecoverable error */
log_debug(knet_h, KNET_SUB_RX,
"Unable to send pong reply (sock: %d) packet (sendto): %d %s. recorded src ip: %s src port: %s dst ip: %s dst port: %s",
src_link->outsock, errno, strerror(errno),
src_link->status.src_ipaddr, src_link->status.src_port,
src_link->status.dst_ipaddr, src_link->status.dst_port);
src_link->status.stats.tx_pong_errors++;
break;
case 0: /* ignore error and continue */
break;
case 1: /* retry to send those same data */
src_link->status.stats.tx_pong_retries++;
goto retry_pong;
break;
}
}
src_link->status.stats.tx_pong_packets++;
src_link->status.stats.tx_pong_bytes += outlen;
}
break;
case KNET_HEADER_TYPE_PONG:
src_link->status.stats.rx_pong_packets++;
src_link->status.stats.rx_pong_bytes += len;
clock_gettime(CLOCK_MONOTONIC, &src_link->status.pong_last);
memmove(&recvtime, &inbuf->khp_ping_time[0], sizeof(struct timespec));
timespec_diff(recvtime,
src_link->status.pong_last, &latency_last);
if ((latency_last / 1000llu) > src_link->pong_timeout) {
log_debug(knet_h, KNET_SUB_RX,
"Incoming pong packet from host: %u link: %u has higher latency than pong_timeout. Discarding",
src_host->host_id, src_link->link_id);
} else {
/*
* in words : ('previous mean' * '(count -1)') + 'new value') / 'count'
*/
src_link->latency_cur_samples++;
/*
* limit to max_samples (precision)
*/
if (src_link->latency_cur_samples >= src_link->latency_max_samples) {
src_link->latency_cur_samples = src_link->latency_max_samples;
}
src_link->status.latency =
(((src_link->status.latency * (src_link->latency_cur_samples - 1)) + (latency_last / 1000llu)) / src_link->latency_cur_samples);
if (src_link->status.latency < src_link->pong_timeout_adj) {
if (!src_link->status.connected) {
if (src_link->received_pong >= src_link->pong_count) {
log_info(knet_h, KNET_SUB_RX, "host: %u link: %u is up",
src_host->host_id, src_link->link_id);
_link_updown(knet_h, src_host->host_id, src_link->link_id, src_link->status.enabled, 1, 0);
} else {
src_link->received_pong++;
log_debug(knet_h, KNET_SUB_RX, "host: %u link: %u received pong: %u",
src_host->host_id, src_link->link_id, src_link->received_pong);
}
}
}
/* Calculate latency stats */
if (src_link->status.latency > src_link->status.stats.latency_max) {
src_link->status.stats.latency_max = src_link->status.latency;
}
if (src_link->status.latency < src_link->status.stats.latency_min) {
src_link->status.stats.latency_min = src_link->status.latency;
}
/*
* those 2 lines below make all latency average calculations consistent and capped to
* link precision. In future we will kill the one above to keep only this one in
* the stats structure, but for now we leave it around to avoid API/ABI
* breakage as we backport the fixes to stable
*/
src_link->status.stats.latency_ave = src_link->status.latency;
src_link->status.stats.latency_samples = src_link->latency_cur_samples;
}
break;
case KNET_HEADER_TYPE_PMTUD:
src_link->status.stats.rx_pmtu_packets++;
src_link->status.stats.rx_pmtu_bytes += len;
outlen = KNET_HEADER_PMTUD_SIZE;
inbuf->kh_type = KNET_HEADER_TYPE_PMTUD_REPLY;
inbuf->kh_node = htons(knet_h->host_id);
if (knet_h->crypto_in_use_config) {
if (crypto_encrypt_and_sign(knet_h,
(const unsigned char *)inbuf,
outlen,
knet_h->recv_from_links_buf_crypt,
&outlen) < 0) {
log_debug(knet_h, KNET_SUB_RX, "Unable to encrypt PMTUd reply packet");
break;
}
outbuf = knet_h->recv_from_links_buf_crypt;
stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex);
if (stats_err < 0) {
log_err(knet_h, KNET_SUB_RX, "Unable to get mutex lock: %s", strerror(stats_err));
break;
}
knet_h->stats_extra.tx_crypt_pmtu_reply_packets++;
pthread_mutex_unlock(&knet_h->handle_stats_mutex);
}
/* Unlock so we don't deadlock with tx_mutex */
pthread_mutex_unlock(&src_link->link_stats_mutex);
savederrno = pthread_mutex_lock(&knet_h->tx_mutex);
if (savederrno) {
log_err(knet_h, KNET_SUB_RX, "Unable to get TX mutex lock: %s", strerror(savederrno));
goto out_pmtud;
}
retry_pmtud:
if (src_link->transport_connected) {
if (transport_get_connection_oriented(knet_h, src_link->transport) == TRANSPORT_PROTO_NOT_CONNECTION_ORIENTED) {
len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL,
(struct sockaddr *) &src_link->dst_addr, sizeof(struct sockaddr_storage));
} else {
len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0);
}
savederrno = errno;
if (len != outlen) {
err = transport_tx_sock_error(knet_h, src_link->transport, src_link->outsock, len, savederrno);
stats_err = pthread_mutex_lock(&src_link->link_stats_mutex);
if (stats_err < 0) {
log_err(knet_h, KNET_SUB_RX, "Unable to get mutex lock: %s", strerror(stats_err));
break;
}
switch(err) {
case -1: /* unrecoverable error */
log_debug(knet_h, KNET_SUB_RX,
"Unable to send PMTUd reply (sock: %d) packet (sendto): %d %s. recorded src ip: %s src port: %s dst ip: %s dst port: %s",
src_link->outsock, errno, strerror(errno),
src_link->status.src_ipaddr, src_link->status.src_port,
src_link->status.dst_ipaddr, src_link->status.dst_port);
src_link->status.stats.tx_pmtu_errors++;
break;
case 0: /* ignore error and continue */
src_link->status.stats.tx_pmtu_errors++;
break;
case 1: /* retry to send those same data */
src_link->status.stats.tx_pmtu_retries++;
pthread_mutex_unlock(&src_link->link_stats_mutex);
goto retry_pmtud;
break;
}
pthread_mutex_unlock(&src_link->link_stats_mutex);
}
}
pthread_mutex_unlock(&knet_h->tx_mutex);
out_pmtud:
return; /* Don't need to unlock link_stats_mutex */
case KNET_HEADER_TYPE_PMTUD_REPLY:
src_link->status.stats.rx_pmtu_packets++;
src_link->status.stats.rx_pmtu_bytes += len;
/* pmtud_mutex can't be acquired while we hold a link_stats_mutex (ordering) */
pthread_mutex_unlock(&src_link->link_stats_mutex);
if (pthread_mutex_lock(&knet_h->pmtud_mutex) != 0) {
log_debug(knet_h, KNET_SUB_RX, "Unable to get mutex lock");
break;
}
src_link->last_recv_mtu = inbuf->khp_pmtud_size;
pthread_cond_signal(&knet_h->pmtud_cond);
pthread_mutex_unlock(&knet_h->pmtud_mutex);
return;
default:
pthread_mutex_unlock(&src_link->link_stats_mutex);
return;
}
pthread_mutex_unlock(&src_link->link_stats_mutex);
}
/*
 * Apply the generic access lists to one received message.
 *
 * ACLs are only consulted when the handle has use_access_lists set and the
 * transport reports USE_GENERIC_ACL. When check_validate() refuses the
 * sender, the rejection is logged at debug level (with the resolved
 * address when knet_addrtostr() succeeds).
 *
 * Returns 1 if the message must be dropped, 0 if it may be parsed.
 */
static int _rx_rejected_by_acl(knet_handle_t knet_h, int sockfd, int transport, struct knet_mmsghdr *msg)
{
	char src_ipaddr[KNET_MAX_HOST_LEN];
	char src_port[KNET_MAX_PORT_LEN];

	if (!knet_h->use_access_lists) {
		return 0;
	}
	if (transport_get_acl_type(knet_h, transport) != USE_GENERIC_ACL) {
		return 0;
	}
	if (check_validate(knet_h, sockfd, transport, msg->msg_hdr.msg_name)) {
		return 0;
	}

	memset(src_ipaddr, 0, KNET_MAX_HOST_LEN);
	memset(src_port, 0, KNET_MAX_PORT_LEN);

	if (knet_addrtostr(msg->msg_hdr.msg_name, sockaddr_len(msg->msg_hdr.msg_name),
			   src_ipaddr, KNET_MAX_HOST_LEN,
			   src_port, KNET_MAX_PORT_LEN) < 0) {
		log_debug(knet_h, KNET_SUB_RX, "Packet rejected: unable to resolve host/port");
	} else {
		log_debug(knet_h, KNET_SUB_RX, "Packet rejected from %s/%s", src_ipaddr, src_port);
	}

	return 1;
}

/*
 * Drain one readable link socket with a single _recvmmsg() call and feed
 * every data packet to _parse_recv_from_links().
 *
 * Runs under the global read lock. The transport decides, per message,
 * whether the entry is data, out-of-band data, or something that must stop
 * processing of the remaining batch.
 */
static void _handle_recv_from_links(knet_handle_t knet_h, int sockfd, struct knet_mmsghdr *msg)
{
	int rx_status, savederrno;
	int idx, received, transport;

	if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) {
		log_debug(knet_h, KNET_SUB_RX, "Unable to get global read lock");
		return;
	}

	/*
	 * An invalid fd is expected here: another thread may have removed the
	 * link after epoll reported the event but before we took the read lock.
	 */
	if (_is_valid_fd(knet_h, sockfd) < 1) {
		goto exit_unlock;
	}

	transport = knet_h->knet_transport_fd_tracker[sockfd].transport;

	/*
	 * recvmmsg shrinks each msg_namelen to the real sockaddr size, so it
	 * has to be restored to the full buffer size before every call.
	 */
	for (idx = 0; idx < PCKT_RX_BUFS; idx++) {
		msg[idx].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage);
	}

	received = _recvmmsg(sockfd, &msg[0], PCKT_RX_BUFS, MSG_DONTWAIT | MSG_NOSIGNAL);
	savederrno = errno;

	/*
	 * WARNING: the recvmmsg man page does not match the kernel. The kernel
	 * implementation returns:
	 *   -1 on error
	 *    0 if the previous run of recvmmsg recorded an error on the socket
	 *    N number of messages (see exception below).
	 *
	 * If recvmsg fails after one or more frames were already received, the
	 * recvmmsg loop is interrupted and the error is recorded on the socket
	 * (getsockopt(SO_ERROR)); it only becomes visible on the next run.
	 *
	 * Errors therefore need careful, per transport/protocol handling. There
	 * are several layers:
	 * - received < 0 -> error from this run
	 *   received = 0 -> error from a previous run; the socket error must be cleared
	 * - per-transport message data
	 *   example: msg[i].msg_hdr.msg_flags & MSG_NOTIFICATION, or msg_len == EOF
	 *   for SCTP, while for UDP a 0 byte message is perfectly legal.
	 * - NOTE: on SCTP MSG_NOTIFICATION we get PCKT_FRAG_MAX messages and no
	 *   errno set, so the error API must be able to abort the loop below.
	 */
	if (received <= 0) {
		transport_rx_sock_error(knet_h, transport, sockfd, received, savederrno);
		goto exit_unlock;
	}

	for (idx = 0; idx < received; idx++) {
		rx_status = transport_rx_is_data(knet_h, transport, sockfd, &msg[idx]);

		/*
		 * TODO: make this section silent once we are confident
		 * all protocols packet handlers are good
		 */
		if (rx_status == KNET_TRANSPORT_RX_IS_DATA) {
			/* packets refused by the access lists are skipped; the rest of the batch continues */
			if (!_rx_rejected_by_acl(knet_h, sockfd, transport, &msg[idx])) {
				_parse_recv_from_links(knet_h, sockfd, &msg[idx]);
			}
		} else if (rx_status == KNET_TRANSPORT_RX_NOT_DATA_CONTINUE) {
			log_debug(knet_h, KNET_SUB_RX, "Transport reported no data, continue");
		} else if (rx_status == KNET_TRANSPORT_RX_OOB_DATA_CONTINUE) {
			log_debug(knet_h, KNET_SUB_RX, "Transport is processing sock OOB data, continue");
		} else if (rx_status == KNET_TRANSPORT_RX_ERROR) {
			log_debug(knet_h, KNET_SUB_RX, "Transport reported error parsing packet");
			goto exit_unlock;
		} else if (rx_status == KNET_TRANSPORT_RX_NOT_DATA_STOP) {
			log_debug(knet_h, KNET_SUB_RX, "Transport reported no data, stop");
			goto exit_unlock;
		} else if (rx_status == KNET_TRANSPORT_RX_OOB_DATA_STOP) {
			log_debug(knet_h, KNET_SUB_RX, "Transport has completed processing sock OOB data, stop");
			goto exit_unlock;
		}
	}

exit_unlock:
	pthread_rwlock_unlock(&knet_h->global_rwlock);
}
/*
 * RX thread entry point.
 *
 * Prepares PCKT_RX_BUFS receive slots, each pointing to one of the handle's
 * recv_from_links_buf buffers plus a sockaddr_storage for the sender
 * address. It then waits on recv_from_links_epollfd until shutdown is
 * requested, dispatching every ready fd to _handle_recv_from_links().
 */
void *_handle_recv_from_links_thread(void *data)
{
	knet_handle_t knet_h = (knet_handle_t) data;
	struct epoll_event events[KNET_EPOLL_MAX_EVENTS];
	struct sockaddr_storage address[PCKT_RX_BUFS];
	struct knet_mmsghdr msg[PCKT_RX_BUFS];
	struct iovec iov_in[PCKT_RX_BUFS];
	int slot, ready, ev;

	set_thread_status(knet_h, KNET_THREAD_RX, KNET_THREAD_STARTED);

	/* the whole array is zeroed here, so each slot only needs its fields wired */
	memset(&msg, 0, sizeof(msg));

	for (slot = 0; slot < PCKT_RX_BUFS; slot++) {
		struct msghdr *hdr = &msg[slot].msg_hdr;

		iov_in[slot].iov_base = (void *)knet_h->recv_from_links_buf[slot];
		iov_in[slot].iov_len = KNET_DATABUFSIZE;

		hdr->msg_name = &address[slot];
		hdr->msg_namelen = sizeof(struct sockaddr_storage);
		hdr->msg_iov = &iov_in[slot];
		hdr->msg_iovlen = 1;
	}

	while (!shutdown_in_progress(knet_h)) {
		ready = epoll_wait(knet_h->recv_from_links_epollfd, events, KNET_EPOLL_MAX_EVENTS, KNET_THREADS_TIMERES / 1000);

		/*
		 * The RX thread only has to acknowledge that at least one full
		 * loop ran after a queue flush was requested
		 * (see setfwd in handle.c).
		 */
		if (get_thread_flush_queue(knet_h, KNET_THREAD_RX) == KNET_THREAD_QUEUE_FLUSH) {
			set_thread_flush_queue(knet_h, KNET_THREAD_RX, KNET_THREAD_QUEUE_FLUSHED);
		}

		/*
		 * A timeout (0) or an epoll error (< 0) dispatches nothing and
		 * simply loops back to re-check for shutdown.
		 */
		for (ev = 0; ev < ready; ev++) {
			_handle_recv_from_links(knet_h, events[ev].data.fd, msg);
		}
	}

	set_thread_status(knet_h, KNET_THREAD_RX, KNET_THREAD_STOPPED);

	return NULL;
}
+
+/*
+ * knet_recv - read data delivered by knet on a given data channel.
+ *
+ * @knet_h:   knet handle (must not be NULL)
+ * @buff:     destination buffer (must not be NULL)
+ * @buff_len: size of @buff, must be in the range 1..KNET_MAX_PACKET_SIZE
+ * @channel:  data channel, must be in the range 0..KNET_DATAFD_MAX-1
+ *
+ * Performs a single readv() on sockfd[0] of @channel while holding the
+ * handle's global read lock.
+ *
+ * Returns the readv() result: the number of bytes read, 0, or -1 on failure.
+ * On failure errno is EINVAL (bad argument or channel not in use), the
+ * lock error, or the readv() error. errno is set to 0 whenever the call does
+ * not fail.
+ */
+ssize_t knet_recv(knet_handle_t knet_h, char *buff, const size_t buff_len, const int8_t channel)
+{
+	int savederrno = 0;
+	ssize_t err = 0;
+	struct iovec iov_in;
+
+	/* buff_len is unsigned: the only "non-positive" length is 0 */
+	if ((!knet_h) || (buff == NULL) || (buff_len == 0) ||
+	    (buff_len > KNET_MAX_PACKET_SIZE)) {
+		errno = EINVAL;
+		return -1;
+	}
+
+	if ((channel < 0) || (channel >= KNET_DATAFD_MAX)) {
+		errno = EINVAL;
+		return -1;
+	}
+
+	savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
+	if (savederrno) {
+		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
+			strerror(savederrno));
+		errno = savederrno;
+		return -1;
+	}
+
+	if (!knet_h->sockfd[channel].in_use) {
+		savederrno = EINVAL;
+		err = -1;
+		goto out_unlock;
+	}
+
+	memset(&iov_in, 0, sizeof(iov_in));
+	iov_in.iov_base = (void *)buff;
+	iov_in.iov_len = buff_len;
+
+	err = readv(knet_h->sockfd[channel].sockfd[0], &iov_in, 1);
+	savederrno = errno;
+
+out_unlock:
+	pthread_rwlock_unlock(&knet_h->global_rwlock);
+	/*
+	 * err is a byte count, not a 0/-1 status: only a negative value is a
+	 * failure. A successful positive read must not hand a stale errno back
+	 * to the caller.
+	 */
+	errno = (err < 0) ? savederrno : 0;
+	return err;
+}
diff --git a/libknet/threads_tx.c b/libknet/threads_tx.c
index be5fb6b2..852064c2 100644
--- a/libknet/threads_tx.c
+++ b/libknet/threads_tx.c
@@ -1,837 +1,901 @@
/*
* Copyright (C) 2012-2020 Red Hat, Inc. All rights reserved.
*
* Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
* Federico Simoncelli <fsimon@kronosnet.org>
*
* This software licensed under LGPL-2.0+
*/
#include "config.h"
#include <math.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>
#include <sys/uio.h>
#include <errno.h>
#include "compat.h"
#include "compress.h"
#include "crypto.h"
#include "host.h"
#include "link.h"
#include "logging.h"
#include "transports.h"
#include "transport_common.h"
#include "threads_common.h"
#include "threads_heartbeat.h"
#include "threads_tx.h"
#include "netutils.h"
/*
* SEND
*/
/*
 * Send a batch of prepared messages to dst_host over its active links.
 *
 * For every active, non-loopback link the per-link TX stats are updated,
 * then the batch is pushed with _sendmmsg(). The transport's
 * transport_tx_sock_error() decides whether a send error is fatal (-1),
 * ignored (0) or retried (1). A partial send is retried. Two consecutive
 * sends that make no progress abort with EAGAIN.
 *
 * With the round-robin policy and more than one active link, only the
 * first link is used, and the active link list is rotated afterwards.
 *
 * Returns the last transport verdict (0 on success) or -1, with errno set
 * from the last saved error.
 */
static int _dispatch_to_links(knet_handle_t knet_h, struct knet_host *dst_host, struct knet_mmsghdr *msg, int msgs_to_send)
{
	struct knet_link *cur_link = NULL;
	struct knet_mmsghdr *batch;
	unsigned int iov_idx;
	int link_idx, msg_idx;
	int sent_msgs, prev_sent, progress;
	int err = 0, savederrno = 0, locked = 0;

	for (link_idx = 0; link_idx < dst_host->active_link_entries; link_idx++) {
		cur_link = &dst_host->link[dst_host->active_links[link_idx]];
		prev_sent = 0;
		progress = 1;
		locked = 0;

		/* loopback links are never sent through _sendmmsg() here */
		if (cur_link->transport == KNET_TRANSPORT_LOOPBACK) {
			continue;
		}

		savederrno = pthread_mutex_lock(&cur_link->link_stats_mutex);
		if (savederrno) {
			log_err(knet_h, KNET_SUB_TX, "Unable to get stats mutex lock for host %u link %u: %s",
				dst_host->host_id, cur_link->link_id, strerror(savederrno));
			continue;
		}
		locked = 1;

		/* point every message at this link and account for it up front */
		for (msg_idx = 0; msg_idx < msgs_to_send; msg_idx++) {
			msg[msg_idx].msg_hdr.msg_name = &cur_link->dst_addr;
			/* Cast for Linux/BSD compatibility */
			for (iov_idx = 0; iov_idx < (unsigned int)msg[msg_idx].msg_hdr.msg_iovlen; iov_idx++) {
				cur_link->status.stats.tx_data_bytes += msg[msg_idx].msg_hdr.msg_iov[iov_idx].iov_len;
			}
			cur_link->status.stats.tx_data_packets++;
		}

retry:
		batch = &msg[prev_sent];
		sent_msgs = _sendmmsg(cur_link->outsock,
				      transport_get_connection_oriented(knet_h, cur_link->transport),
				      &batch[0], msgs_to_send - prev_sent, MSG_DONTWAIT | MSG_NOSIGNAL);
		savederrno = errno;

		err = transport_tx_sock_error(knet_h, cur_link->transport, cur_link->outsock, sent_msgs, savederrno);
		if (err == -1) {
			/* unrecoverable error */
			cur_link->status.stats.tx_data_errors++;
			goto out_unlock;
		}
		if (err == 1) {
			/* transport asked to resend the same data */
			cur_link->status.stats.tx_data_retries++;
			goto retry;
		}
		/* any other verdict: ignore the error and continue */

		prev_sent += sent_msgs;
		if ((sent_msgs >= 0) && (prev_sent < msgs_to_send)) {
			if (sent_msgs || progress) {
				/* a zero-length send is tolerated once before giving up */
				progress = (sent_msgs != 0);
#ifdef DEBUG
				log_debug(knet_h, KNET_SUB_TX, "Unable to send all (%d/%d) data packets to host %s (%u) link %s:%s (%u)",
					  sent_msgs, msg_idx,
					  dst_host->name, dst_host->host_id,
					  cur_link->status.dst_ipaddr,
					  cur_link->status.dst_port,
					  cur_link->link_id);
#endif
				goto retry;
			}
			/* two sends in a row made no progress */
			savederrno = EAGAIN;
			err = -1;
			goto out_unlock;
		}

		if ((dst_host->link_handler_policy == KNET_LINK_POLICY_RR) &&
		    (dst_host->active_link_entries > 1)) {
			uint8_t rotated_link_id = dst_host->active_links[0];

			/* rotate the active link list so the next packet uses the next link */
			memmove(&dst_host->active_links[0], &dst_host->active_links[1], KNET_MAX_LINK - 1);
			dst_host->active_links[dst_host->active_link_entries - 1] = rotated_link_id;
			break; /* stats mutex is released at out_unlock */
		}

		pthread_mutex_unlock(&cur_link->link_stats_mutex);
		locked = 0;
	}

out_unlock:
	if (locked) {
		pthread_mutex_unlock(&cur_link->link_stats_mutex);
	}
	errno = savederrno;
	return err;
}
static int _parse_recv_from_sock(knet_handle_t knet_h, size_t inlen, int8_t channel, int is_sync)
{
size_t outlen, frag_len;
struct knet_host *dst_host;
knet_node_id_t dst_host_ids_temp[KNET_MAX_HOST];
size_t dst_host_ids_entries_temp = 0;
knet_node_id_t dst_host_ids[KNET_MAX_HOST];
size_t dst_host_ids_entries = 0;
int bcast = 1;
struct knet_hostinfo *knet_hostinfo;
struct iovec iov_out[PCKT_FRAG_MAX][2];
int iovcnt_out = 2;
uint8_t frag_idx;
unsigned int temp_data_mtu;
size_t host_idx;
int send_mcast = 0;
struct knet_header *inbuf;
int savederrno = 0;
int err = 0;
seq_num_t tx_seq_num;
struct knet_mmsghdr msg[PCKT_FRAG_MAX];
int msgs_to_send, msg_idx;
unsigned int i;
int j;
int send_local = 0;
int data_compressed = 0;
size_t uncrypted_frag_size;
int stats_locked = 0, stats_err = 0;
inbuf = knet_h->recv_from_sock_buf;
if ((knet_h->enabled != 1) &&
(inbuf->kh_type != KNET_HEADER_TYPE_HOST_INFO)) { /* data forward is disabled */
log_debug(knet_h, KNET_SUB_TX, "Received data packet but forwarding is disabled");
savederrno = ECANCELED;
err = -1;
goto out_unlock;
}
/*
* move this into a separate function to expand on
* extra switching rules
*/
switch(inbuf->kh_type) {
case KNET_HEADER_TYPE_DATA:
if (knet_h->dst_host_filter_fn) {
bcast = knet_h->dst_host_filter_fn(
knet_h->dst_host_filter_fn_private_data,
(const unsigned char *)inbuf->khp_data_userdata,
inlen,
KNET_NOTIFY_TX,
knet_h->host_id,
knet_h->host_id,
&channel,
dst_host_ids_temp,
&dst_host_ids_entries_temp);
if (bcast < 0) {
log_debug(knet_h, KNET_SUB_TX, "Error from dst_host_filter_fn: %d", bcast);
savederrno = EFAULT;
err = -1;
goto out_unlock;
}
if ((!bcast) && (!dst_host_ids_entries_temp)) {
log_debug(knet_h, KNET_SUB_TX, "Message is unicast but no dst_host_ids_entries");
savederrno = EINVAL;
err = -1;
goto out_unlock;
}
if ((!bcast) &&
(dst_host_ids_entries_temp > KNET_MAX_HOST)) {
log_debug(knet_h, KNET_SUB_TX, "dst_host_filter_fn returned too many destinations");
savederrno = EINVAL;
err = -1;
goto out_unlock;
}
}
/* Send to localhost if appropriate and enabled */
if (knet_h->has_loop_link) {
send_local = 0;
if (bcast) {
send_local = 1;
} else {
for (i=0; i< dst_host_ids_entries_temp; i++) {
if (dst_host_ids_temp[i] == knet_h->host_id) {
send_local = 1;
}
}
}
if (send_local) {
const unsigned char *buf = inbuf->khp_data_userdata;
ssize_t buflen = inlen;
struct knet_link *local_link;
local_link = knet_h->host_index[knet_h->host_id]->link;
local_retry:
err = write(knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], buf, buflen);
if (err < 0) {
log_err(knet_h, KNET_SUB_TRANSP_LOOPBACK, "send local failed. error=%s\n", strerror(errno));
local_link->status.stats.tx_data_errors++;
}
if (err > 0 && err < buflen) {
log_debug(knet_h, KNET_SUB_TRANSP_LOOPBACK, "send local incomplete=%d bytes of %zu\n", err, inlen);
local_link->status.stats.tx_data_retries++;
buf += err;
buflen -= err;
goto local_retry;
}
if (err == buflen) {
local_link->status.stats.tx_data_packets++;
local_link->status.stats.tx_data_bytes += inlen;
}
}
}
break;
case KNET_HEADER_TYPE_HOST_INFO:
knet_hostinfo = (struct knet_hostinfo *)inbuf->khp_data_userdata;
if (knet_hostinfo->khi_bcast == KNET_HOSTINFO_UCAST) {
bcast = 0;
dst_host_ids_temp[0] = knet_hostinfo->khi_dst_node_id;
dst_host_ids_entries_temp = 1;
knet_hostinfo->khi_dst_node_id = htons(knet_hostinfo->khi_dst_node_id);
}
break;
default:
log_warn(knet_h, KNET_SUB_TX, "Receiving unknown messages from socket");
savederrno = ENOMSG;
err = -1;
goto out_unlock;
break;
}
if (is_sync) {
if ((bcast) ||
((!bcast) && (dst_host_ids_entries_temp > 1))) {
log_debug(knet_h, KNET_SUB_TX, "knet_send_sync is only supported with unicast packets for one destination");
savederrno = E2BIG;
err = -1;
goto out_unlock;
}
}
/*
* check destinations hosts before spending time
* in fragmenting/encrypting packets to save
* time processing data for unreachable hosts.
* for unicast, also remap the destination data
* to skip unreachable hosts.
*/
if (!bcast) {
dst_host_ids_entries = 0;
for (host_idx = 0; host_idx < dst_host_ids_entries_temp; host_idx++) {
dst_host = knet_h->host_index[dst_host_ids_temp[host_idx]];
if (!dst_host) {
continue;
}
if (!(dst_host->host_id == knet_h->host_id &&
knet_h->has_loop_link) &&
dst_host->status.reachable) {
dst_host_ids[dst_host_ids_entries] = dst_host_ids_temp[host_idx];
dst_host_ids_entries++;
}
}
if (!dst_host_ids_entries) {
savederrno = EHOSTDOWN;
err = -1;
goto out_unlock;
}
} else {
send_mcast = 0;
for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) {
if (!(dst_host->host_id == knet_h->host_id &&
knet_h->has_loop_link) &&
dst_host->status.reachable) {
send_mcast = 1;
break;
}
}
if (!send_mcast) {
savederrno = EHOSTDOWN;
err = -1;
goto out_unlock;
}
}
if (!knet_h->data_mtu) {
/*
* using MIN_MTU_V4 for data mtu is not completely accurate but safe enough
*/
log_debug(knet_h, KNET_SUB_TX,
"Received data packet but data MTU is still unknown."
" Packet might not be delivered."
" Assuming minimum IPv4 MTU (%d)",
KNET_PMTUD_MIN_MTU_V4);
temp_data_mtu = KNET_PMTUD_MIN_MTU_V4;
} else {
/*
* take a copy of the mtu to avoid value changing under
* our feet while we are sending a fragmented pckt
*/
temp_data_mtu = knet_h->data_mtu;
}
/*
* compress data
*/
if ((knet_h->compress_model > 0) && (inlen > knet_h->compress_threshold)) {
size_t cmp_outlen = KNET_DATABUFSIZE_COMPRESS;
struct timespec start_time;
struct timespec end_time;
uint64_t compress_time;
clock_gettime(CLOCK_MONOTONIC, &start_time);
err = compress(knet_h,
(const unsigned char *)inbuf->khp_data_userdata, inlen,
knet_h->send_to_links_buf_compress, (ssize_t *)&cmp_outlen);
savederrno = errno;
stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex);
if (stats_err < 0) {
log_err(knet_h, KNET_SUB_TX, "Unable to get mutex lock: %s", strerror(stats_err));
err = -1;
savederrno = stats_err;
goto out_unlock;
}
stats_locked = 1;
/* Collect stats */
clock_gettime(CLOCK_MONOTONIC, &end_time);
timespec_diff(start_time, end_time, &compress_time);
if (compress_time < knet_h->stats.tx_compress_time_min) {
knet_h->stats.tx_compress_time_min = compress_time;
}
if (compress_time > knet_h->stats.tx_compress_time_max) {
knet_h->stats.tx_compress_time_max = compress_time;
}
knet_h->stats.tx_compress_time_ave =
(unsigned long long)(knet_h->stats.tx_compress_time_ave * knet_h->stats.tx_compressed_packets +
compress_time) / (knet_h->stats.tx_compressed_packets+1);
if (err < 0) {
log_warn(knet_h, KNET_SUB_COMPRESS, "Compression failed (%d): %s", err, strerror(savederrno));
} else {
knet_h->stats.tx_compressed_packets++;
knet_h->stats.tx_compressed_original_bytes += inlen;
knet_h->stats.tx_compressed_size_bytes += cmp_outlen;
if (cmp_outlen < inlen) {
memmove(inbuf->khp_data_userdata, knet_h->send_to_links_buf_compress, cmp_outlen);
inlen = cmp_outlen;
data_compressed = 1;
}
}
}
if (!stats_locked) {
stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex);
if (stats_err < 0) {
log_err(knet_h, KNET_SUB_TX, "Unable to get mutex lock: %s", strerror(stats_err));
err = -1;
savederrno = stats_err;
goto out_unlock;
}
}
if (knet_h->compress_model > 0 && !data_compressed) {
knet_h->stats.tx_uncompressed_packets++;
}
pthread_mutex_unlock(&knet_h->handle_stats_mutex);
stats_locked = 0;
/*
* prepare the outgoing buffers
*/
frag_len = inlen;
frag_idx = 0;
inbuf->khp_data_bcast = bcast;
inbuf->khp_data_frag_num = ceil((float)inlen / temp_data_mtu);
inbuf->khp_data_channel = channel;
if (data_compressed) {
inbuf->khp_data_compress = knet_h->compress_model;
} else {
inbuf->khp_data_compress = 0;
}
if (pthread_mutex_lock(&knet_h->tx_seq_num_mutex)) {
log_debug(knet_h, KNET_SUB_TX, "Unable to get seq mutex lock");
goto out_unlock;
}
knet_h->tx_seq_num++;
/*
* force seq_num 0 to detect a node that has crashed and rejoining
* the knet instance. seq_num 0 will clear the buffers in the RX
* thread
*/
if (knet_h->tx_seq_num == 0) {
knet_h->tx_seq_num++;
}
/*
* cache the value in locked context
*/
tx_seq_num = knet_h->tx_seq_num;
inbuf->khp_data_seq_num = htons(knet_h->tx_seq_num);
pthread_mutex_unlock(&knet_h->tx_seq_num_mutex);
/*
* forcefully broadcast a ping to all nodes every SEQ_MAX / 8
* pckts.
* this solves 2 problems:
* 1) on TX socket overloads we generate extra pings to keep links alive
* 2) in 3+ nodes setup, where all the traffic is flowing between node 1 and 2,
* node 3+ will be able to keep in sync on the TX seq_num even without
* receiving traffic or pings in betweens. This avoids issues with
* rollover of the circular buffer
*/
if (tx_seq_num % (SEQ_MAX / 8) == 0) {
_send_pings(knet_h, 0);
}
if (inbuf->khp_data_frag_num > 1) {
while (frag_idx < inbuf->khp_data_frag_num) {
/*
* set the iov_base
*/
iov_out[frag_idx][0].iov_base = (void *)knet_h->send_to_links_buf[frag_idx];
iov_out[frag_idx][0].iov_len = KNET_HEADER_DATA_SIZE;
iov_out[frag_idx][1].iov_base = inbuf->khp_data_userdata + (temp_data_mtu * frag_idx);
/*
* set the len
*/
if (frag_len > temp_data_mtu) {
iov_out[frag_idx][1].iov_len = temp_data_mtu;
} else {
iov_out[frag_idx][1].iov_len = frag_len;
}
/*
* copy the frag info on all buffers
*/
knet_h->send_to_links_buf[frag_idx]->kh_type = inbuf->kh_type;
knet_h->send_to_links_buf[frag_idx]->khp_data_seq_num = inbuf->khp_data_seq_num;
knet_h->send_to_links_buf[frag_idx]->khp_data_frag_num = inbuf->khp_data_frag_num;
knet_h->send_to_links_buf[frag_idx]->khp_data_bcast = inbuf->khp_data_bcast;
knet_h->send_to_links_buf[frag_idx]->khp_data_channel = inbuf->khp_data_channel;
knet_h->send_to_links_buf[frag_idx]->khp_data_compress = inbuf->khp_data_compress;
frag_len = frag_len - temp_data_mtu;
frag_idx++;
}
iovcnt_out = 2;
} else {
iov_out[frag_idx][0].iov_base = (void *)inbuf;
iov_out[frag_idx][0].iov_len = frag_len + KNET_HEADER_DATA_SIZE;
iovcnt_out = 1;
}
if (knet_h->crypto_in_use_config) {
struct timespec start_time;
struct timespec end_time;
uint64_t crypt_time;
frag_idx = 0;
while (frag_idx < inbuf->khp_data_frag_num) {
clock_gettime(CLOCK_MONOTONIC, &start_time);
if (crypto_encrypt_and_signv(
knet_h,
iov_out[frag_idx], iovcnt_out,
knet_h->send_to_links_buf_crypt[frag_idx],
(ssize_t *)&outlen) < 0) {
log_debug(knet_h, KNET_SUB_TX, "Unable to encrypt packet");
savederrno = ECHILD;
err = -1;
goto out_unlock;
}
clock_gettime(CLOCK_MONOTONIC, &end_time);
timespec_diff(start_time, end_time, &crypt_time);
stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex);
if (stats_err < 0) {
log_err(knet_h, KNET_SUB_TX, "Unable to get mutex lock: %s", strerror(stats_err));
err = -1;
savederrno = stats_err;
goto out_unlock;
}
if (crypt_time < knet_h->stats.tx_crypt_time_min) {
knet_h->stats.tx_crypt_time_min = crypt_time;
}
if (crypt_time > knet_h->stats.tx_crypt_time_max) {
knet_h->stats.tx_crypt_time_max = crypt_time;
}
knet_h->stats.tx_crypt_time_ave =
(knet_h->stats.tx_crypt_time_ave * knet_h->stats.tx_crypt_packets +
crypt_time) / (knet_h->stats.tx_crypt_packets+1);
uncrypted_frag_size = 0;
for (j=0; j < iovcnt_out; j++) {
uncrypted_frag_size += iov_out[frag_idx][j].iov_len;
}
knet_h->stats.tx_crypt_byte_overhead += (outlen - uncrypted_frag_size);
knet_h->stats.tx_crypt_packets++;
pthread_mutex_unlock(&knet_h->handle_stats_mutex);
iov_out[frag_idx][0].iov_base = knet_h->send_to_links_buf_crypt[frag_idx];
iov_out[frag_idx][0].iov_len = outlen;
frag_idx++;
}
iovcnt_out = 1;
}
memset(&msg, 0, sizeof(msg));
msgs_to_send = inbuf->khp_data_frag_num;
msg_idx = 0;
while (msg_idx < msgs_to_send) {
msg[msg_idx].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage);
msg[msg_idx].msg_hdr.msg_iov = &iov_out[msg_idx][0];
msg[msg_idx].msg_hdr.msg_iovlen = iovcnt_out;
msg_idx++;
}
if (!bcast) {
for (host_idx = 0; host_idx < dst_host_ids_entries; host_idx++) {
dst_host = knet_h->host_index[dst_host_ids[host_idx]];
err = _dispatch_to_links(knet_h, dst_host, &msg[0], msgs_to_send);
savederrno = errno;
if (err) {
goto out_unlock;
}
}
} else {
for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) {
if (dst_host->status.reachable) {
err = _dispatch_to_links(knet_h, dst_host, &msg[0], msgs_to_send);
savederrno = errno;
if (err) {
goto out_unlock;
}
}
}
}
out_unlock:
errno = savederrno;
return err;
}
-/*
- * knet_send_sync - push buff through the TX path synchronously.
- *
- * Instead of writing to the channel's socket, the payload is copied straight
- * into knet_h->recv_from_sock_buf (header type KNET_HEADER_TYPE_DATA) and
- * handed to _parse_recv_from_sock() in the caller's context.
- *
- * Valid input: knet_h and buff non-NULL, 0 < buff_len <= KNET_MAX_PACKET_SIZE,
- * 0 <= channel < KNET_DATAFD_MAX, and the channel must be in use.
- *
- * Takes global_rwlock (read) and then tx_mutex, so it serialises with the
- * TX thread, which uses the same shared buffer under the same locks.
- *
- * Returns the value of _parse_recv_from_sock() (or -1 on validation/lock
- * failure). errno is set on failure and reset to 0 on success.
- */
-int knet_send_sync(knet_handle_t knet_h, const char *buff, const size_t buff_len, const int8_t channel)
-{
- int savederrno = 0, err = 0;
-
- if (!knet_h) {
- errno = EINVAL;
- return -1;
- }
-
- if (buff == NULL) {
- errno = EINVAL;
- return -1;
- }
-
- /* buff_len is a size_t, so this only rejects 0 */
- if (buff_len <= 0) {
- errno = EINVAL;
- return -1;
- }
-
- if (buff_len > KNET_MAX_PACKET_SIZE) {
- errno = EINVAL;
- return -1;
- }
-
- if (channel < 0) {
- errno = EINVAL;
- return -1;
- }
-
- if (channel >= KNET_DATAFD_MAX) {
- errno = EINVAL;
- return -1;
- }
-
- savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
- if (savederrno) {
- log_err(knet_h, KNET_SUB_TX, "Unable to get read lock: %s",
- strerror(savederrno));
- errno = savederrno;
- return -1;
- }
-
- if (!knet_h->sockfd[channel].in_use) {
- savederrno = EINVAL;
- err = -1;
- goto out;
- }
-
- /* recv_from_sock_buf is shared with the TX thread: tx_mutex guards it */
- savederrno = pthread_mutex_lock(&knet_h->tx_mutex);
- if (savederrno) {
- log_err(knet_h, KNET_SUB_TX, "Unable to get TX mutex lock: %s",
- strerror(savederrno));
- err = -1;
- goto out;
- }
-
- knet_h->recv_from_sock_buf->kh_type = KNET_HEADER_TYPE_DATA;
- memmove(knet_h->recv_from_sock_buf->khp_data_userdata, buff, buff_len);
- /* last argument 1 marks this as the synchronous-send path */
- err = _parse_recv_from_sock(knet_h, buff_len, channel, 1);
- savederrno = errno;
-
- pthread_mutex_unlock(&knet_h->tx_mutex);
-
-out:
- pthread_rwlock_unlock(&knet_h->global_rwlock);
-
- errno = err ? savederrno : 0;
- return err;
-}
-
/*
 * Read one message from a readable application/internal fd and feed it into
 * the TX path.
 *
 * msg:     msghdr prepared by the TX thread; its single iovec points at
 *          knet_h->recv_from_sock_buf->khp_data_userdata. It is reused for
 *          every call.
 * sockfd:  the fd epoll reported as readable.
 * channel: data channel owning sockfd, or KNET_INTERNAL_DATA_CHANNEL.
 * type:    knet header type stamped on the packet before parsing.
 *
 * Outcomes:
 *  - data read:         header type is set and _parse_recv_from_sock() is
 *                       called (its return value is not checked here);
 *  - truncated message: logged and dropped;
 *  - 0-byte read/EOF:   application notified via sock_notify_fn, error 0;
 *  - read error:        for data channels the fd is removed from
 *                       send_to_links_epollfd and flagged has_error, then the
 *                       application is notified with the read errno.
 * Notifications are never sent for KNET_INTERNAL_DATA_CHANNEL.
 *
 * The TX thread calls this with global_rwlock (read) and tx_mutex held.
 */
static void _handle_send_to_links(knet_handle_t knet_h, struct msghdr *msg, int sockfd, int8_t channel, int type)
{
	ssize_t inlen = 0;
	int savederrno = 0, docallback = 0;

	if ((channel >= 0) &&
	    (channel < KNET_DATAFD_MAX) &&
	    (!knet_h->sockfd[channel].is_socket)) {
		/* non-socket fds (e.g. pipes) cannot be read with recvmsg() */
		inlen = readv(sockfd, msg->msg_iov, 1);
	} else {
		/*
		 * msg_flags is an output field and is not guaranteed to be
		 * rewritten when recvmsg() fails. msg is reused across calls, so
		 * clear it first: otherwise a stale MSG_TRUNC from an earlier
		 * message would make an error return look like a truncation and
		 * skip the error/notify handling below.
		 */
		msg->msg_flags = 0;
		inlen = recvmsg(sockfd, msg, MSG_DONTWAIT | MSG_NOSIGNAL);
		if (msg->msg_flags & MSG_TRUNC) {
			log_warn(knet_h, KNET_SUB_TX, "Received truncated message from sock %d. Discarding", sockfd);
			return;
		}
	}

	if (inlen == 0) {
		/* EOF / zero-length read: notify without an error code */
		savederrno = 0;
		docallback = 1;
	} else if (inlen < 0) {
		struct epoll_event ev;

		savederrno = errno;
		docallback = 1;
		memset(&ev, 0, sizeof(struct epoll_event));

		if (channel != KNET_INTERNAL_DATA_CHANNEL) {
			if (epoll_ctl(knet_h->send_to_links_epollfd,
				      EPOLL_CTL_DEL, knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], &ev)) {
				/*
				 * log why epoll_ctl() failed; savederrno holds the
				 * read error, which is still what the notify
				 * callback below reports
				 */
				int epoll_errno = errno;

				log_err(knet_h, KNET_SUB_TX, "Unable to del datafd %d from linkfd epoll pool: %s",
					knet_h->sockfd[channel].sockfd[0], strerror(epoll_errno));
			} else {
				knet_h->sockfd[channel].has_error = 1;
			}
		}
		/*
		 * TODO: add error handling for KNET_INTERNAL_DATA_CHANNEL
		 * once we add support for internal knet communication
		 */
	} else {
		knet_h->recv_from_sock_buf->kh_type = type;
		_parse_recv_from_sock(knet_h, inlen, channel, 0);
	}

	if ((docallback) && (channel != KNET_INTERNAL_DATA_CHANNEL)) {
		knet_h->sock_notify_fn(knet_h->sock_notify_fn_private_data,
				       knet_h->sockfd[channel].sockfd[0],
				       channel,
				       KNET_NOTIFY_TX,
				       inlen,
				       savederrno);
	}
}
/*
 * TX thread entry point; data is the knet handle.
 *
 * Waits on send_to_links_epollfd for readable host-info/data fds. For each
 * ready fd it resolves the channel (hostsockfd[0] maps to
 * KNET_INTERNAL_DATA_CHANNEL / KNET_HEADER_TYPE_HOST_INFO, anything else is
 * looked up among in-use data channels) and calls _handle_send_to_links()
 * with global_rwlock held for read and tx_mutex held.
 *
 * Also implements the TX side of the queue-flush handshake: a flush request
 * is acknowledged on the first epoll timeout, or after 100 busy iterations.
 *
 * Runs until shutdown_in_progress() is true; always returns NULL.
 */
void *_handle_send_to_links_thread(void *data)
{
	knet_handle_t knet_h = (knet_handle_t) data;
	struct epoll_event events[KNET_EPOLL_MAX_EVENTS];
	int i, nev, type;
	int flush, flush_queue_limit;
	int8_t channel;
	struct iovec iov_in;
	struct msghdr msg;
	struct sockaddr_storage address;

	set_thread_status(knet_h, KNET_THREAD_TX, KNET_THREAD_STARTED);

	/*
	 * incoming application data is read straight into the payload area
	 * of recv_from_sock_buf
	 */
	memset(&iov_in, 0, sizeof(iov_in));
	iov_in.iov_base = (void *)knet_h->recv_from_sock_buf->khp_data_userdata;
	iov_in.iov_len = KNET_MAX_PACKET_SIZE;

	memset(&msg, 0, sizeof(struct msghdr));
	msg.msg_name = &address;
	msg.msg_namelen = sizeof(struct sockaddr_storage);
	msg.msg_iov = &iov_in;
	msg.msg_iovlen = 1;

	/*
	 * version, fragment sequence and sender node id are filled in once
	 * here; the per-packet header fields are set when each packet is built
	 */
	knet_h->recv_from_sock_buf->kh_version = KNET_HEADER_VERSION;
	knet_h->recv_from_sock_buf->khp_data_frag_seq = 0;
	knet_h->recv_from_sock_buf->kh_node = htons(knet_h->host_id);

	for (i = 0; i < PCKT_FRAG_MAX; i++) {
		knet_h->send_to_links_buf[i]->kh_version = KNET_HEADER_VERSION;
		knet_h->send_to_links_buf[i]->khp_data_frag_seq = i + 1;
		knet_h->send_to_links_buf[i]->kh_node = htons(knet_h->host_id);
	}

	flush_queue_limit = 0;

	while (!shutdown_in_progress(knet_h)) {
		/*
		 * maxevents must not exceed the capacity of events[]: the kernel
		 * may write up to maxevents entries into the array.
		 * NOTE(review): the timeout divides KNET_THREADS_TIMERES by 1000
		 * to get epoll_wait()'s milliseconds, presumably because it is
		 * expressed in microseconds - confirm.
		 */
		nev = epoll_wait(knet_h->send_to_links_epollfd, events, KNET_EPOLL_MAX_EVENTS, KNET_THREADS_TIMERES / 1000);

		flush = get_thread_flush_queue(knet_h, KNET_THREAD_TX);

		/*
		 * we use timeout to detect if thread is shutting down
		 */
		if (nev == 0) {
			/*
			 * ideally we want to communicate that we are done flushing
			 * the queue when we have an epoll timeout event
			 */
			if (flush == KNET_THREAD_QUEUE_FLUSH) {
				set_thread_flush_queue(knet_h, KNET_THREAD_TX, KNET_THREAD_QUEUE_FLUSHED);
				flush_queue_limit = 0;
			}
			continue;
		}

		/*
		 * fall back in case the TX sockets continue to receive traffic
		 * and we never hit an epoll timeout.
		 *
		 * allow up to 100 loops to flush queues, then we give up.
		 * there might be cleaner ways to do it by checking the buffer queue
		 * on each socket, but we have tons of sockets and calculations can go wrong.
		 * Also, why would you disable data forwarding and still send packets?
		 */
		if (flush == KNET_THREAD_QUEUE_FLUSH) {
			if (flush_queue_limit >= 100) {
				log_debug(knet_h, KNET_SUB_TX, "Timeout flushing the TX queue, expect packet loss");
				set_thread_flush_queue(knet_h, KNET_THREAD_TX, KNET_THREAD_QUEUE_FLUSHED);
				flush_queue_limit = 0;
			} else {
				flush_queue_limit++;
			}
		} else {
			flush_queue_limit = 0;
		}

		if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) {
			log_debug(knet_h, KNET_SUB_TX, "Unable to get read lock");
			continue;
		}

		for (i = 0; i < nev; i++) {
			if (events[i].data.fd == knet_h->hostsockfd[0]) {
				type = KNET_HEADER_TYPE_HOST_INFO;
				channel = KNET_INTERNAL_DATA_CHANNEL;
			} else {
				type = KNET_HEADER_TYPE_DATA;
				/* the fd registered in epoll is sockfd[is_created] */
				for (channel = 0; channel < KNET_DATAFD_MAX; channel++) {
					if ((knet_h->sockfd[channel].in_use) &&
					    (knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created] == events[i].data.fd)) {
						break;
					}
				}
				if (channel >= KNET_DATAFD_MAX) {
					log_debug(knet_h, KNET_SUB_TX, "No available channels");
					continue; /* channel not found */
				}
			}
			if (pthread_mutex_lock(&knet_h->tx_mutex) != 0) {
				log_debug(knet_h, KNET_SUB_TX, "Unable to get mutex lock");
				continue;
			}
			_handle_send_to_links(knet_h, &msg, events[i].data.fd, channel, type);
			pthread_mutex_unlock(&knet_h->tx_mutex);
		}

		pthread_rwlock_unlock(&knet_h->global_rwlock);
	}

	set_thread_status(knet_h, KNET_THREAD_TX, KNET_THREAD_STOPPED);

	return NULL;
}
+
+/*
+ * knet_send_sync - run buff through the TX path in the caller's context.
+ *
+ * The payload bypasses the channel socket: it is copied into
+ * knet_h->recv_from_sock_buf as a KNET_HEADER_TYPE_DATA packet and passed
+ * to _parse_recv_from_sock() while holding global_rwlock (read) and
+ * tx_mutex, the same locks the TX thread uses for that buffer.
+ *
+ * Rejects with EINVAL: NULL handle or buffer, empty or oversized
+ * (> KNET_MAX_PACKET_SIZE) buffer, channel outside [0, KNET_DATAFD_MAX),
+ * or a channel that is not in use.
+ *
+ * Returns the result of _parse_recv_from_sock(), or -1 on failure before
+ * that point. errno carries the failure reason, and is 0 on success.
+ */
+int knet_send_sync(knet_handle_t knet_h, const char *buff, const size_t buff_len, const int8_t channel)
+{
+	int rc = 0;
+	int saved_errno = 0;
+
+	/* argument validation, evaluated in the same order as the checks below rely on */
+	if ((knet_h == NULL) ||
+	    (buff == NULL) ||
+	    (buff_len == 0) ||
+	    (buff_len > KNET_MAX_PACKET_SIZE) ||
+	    (channel < 0) ||
+	    (channel >= KNET_DATAFD_MAX)) {
+		errno = EINVAL;
+		return -1;
+	}
+
+	saved_errno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
+	if (saved_errno != 0) {
+		log_err(knet_h, KNET_SUB_TX, "Unable to get read lock: %s",
+			strerror(saved_errno));
+		errno = saved_errno;
+		return -1;
+	}
+
+	if (knet_h->sockfd[channel].in_use) {
+		saved_errno = pthread_mutex_lock(&knet_h->tx_mutex);
+		if (saved_errno == 0) {
+			knet_h->recv_from_sock_buf->kh_type = KNET_HEADER_TYPE_DATA;
+			memmove(knet_h->recv_from_sock_buf->khp_data_userdata, buff, buff_len);
+			rc = _parse_recv_from_sock(knet_h, buff_len, channel, 1);
+			saved_errno = errno;
+			pthread_mutex_unlock(&knet_h->tx_mutex);
+		} else {
+			log_err(knet_h, KNET_SUB_TX, "Unable to get TX mutex lock: %s",
+				strerror(saved_errno));
+			rc = -1;
+		}
+	} else {
+		/* the channel has no socket configured */
+		saved_errno = EINVAL;
+		rc = -1;
+	}
+
+	pthread_rwlock_unlock(&knet_h->global_rwlock);
+
+	errno = rc ? saved_errno : 0;
+	return rc;
+}
+
+/*
+ * knet_send - write buff to a data channel's socket.
+ *
+ * Performs a single writev() of buff to knet_h->sockfd[channel].sockfd[0]
+ * while holding global_rwlock for read. Unlike knet_send_sync(), it does not
+ * call _parse_recv_from_sock() itself.
+ *
+ * Rejects with EINVAL: NULL handle or buffer, empty or oversized
+ * (> KNET_MAX_PACKET_SIZE) buffer, channel outside [0, KNET_DATAFD_MAX),
+ * or a channel that is not in use.
+ *
+ * Returns the writev() result (bytes written) or -1 on error with errno set.
+ * On success errno is reset to 0.
+ */
+ssize_t knet_send(knet_handle_t knet_h, const char *buff, const size_t buff_len, const int8_t channel)
+{
+	int savederrno = 0;
+	ssize_t err = 0;
+	struct iovec iov_out[1];
+
+	if (!knet_h) {
+		errno = EINVAL;
+		return -1;
+	}
+
+	if (buff == NULL) {
+		errno = EINVAL;
+		return -1;
+	}
+
+	/* buff_len is a size_t: 0 is the only invalid value at the low end */
+	if (buff_len == 0) {
+		errno = EINVAL;
+		return -1;
+	}
+
+	if (buff_len > KNET_MAX_PACKET_SIZE) {
+		errno = EINVAL;
+		return -1;
+	}
+
+	if (channel < 0) {
+		errno = EINVAL;
+		return -1;
+	}
+
+	if (channel >= KNET_DATAFD_MAX) {
+		errno = EINVAL;
+		return -1;
+	}
+
+	savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
+	if (savederrno) {
+		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
+			strerror(savederrno));
+		errno = savederrno;
+		return -1;
+	}
+
+	if (!knet_h->sockfd[channel].in_use) {
+		savederrno = EINVAL;
+		err = -1;
+		goto out_unlock;
+	}
+
+	memset(iov_out, 0, sizeof(iov_out));
+
+	iov_out[0].iov_base = (void *)buff;
+	iov_out[0].iov_len = buff_len;
+
+	err = writev(knet_h->sockfd[channel].sockfd[0], iov_out, 1);
+	savederrno = errno;
+
+out_unlock:
+	pthread_rwlock_unlock(&knet_h->global_rwlock);
+	/*
+	 * err is a byte count on success, so only a negative value is an
+	 * error; a successful send must not hand back a stale errno.
+	 */
+	errno = (err < 0) ? savederrno : 0;
+	return err;
+}

File Metadata

Mime Type
text/x-diff
Expires
Mon, Apr 21, 7:11 PM (10 h, 52 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1665418
Default Alt Text
(181 KB)

Event Timeline