Page MenuHomeClusterLabs Projects

No OneTemporary

diff --git a/libknet/common.c b/libknet/common.c
index c908e23f..be46f23e 100644
--- a/libknet/common.c
+++ b/libknet/common.c
@@ -1,156 +1,156 @@
/*
* Copyright (C) 2010-2019 Red Hat, Inc. All rights reserved.
*
* Author: Fabio M. Di Nitto <fabbione@kronosnet.org>
*
* This software licensed under GPL-2.0+, LGPL-2.0+
*/
#include "config.h"
#include <unistd.h>
#include <fcntl.h>
#include <dlfcn.h>
#include <errno.h>
#include <string.h>
#include <sys/param.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <stdio.h>
#include <stdlib.h>
#include "logging.h"
#include "common.h"
/*
 * Turn on the close-on-exec flag for fd, keeping whatever other
 * descriptor flags are already set.
 *
 * Returns 0 on success, -1 if reading or writing the flags fails
 * (errno is whatever fcntl() left behind).
 */
int _fdset_cloexec(int fd)
{
	const int current = fcntl(fd, F_GETFD, 0);

	if (current < 0) {
		return -1;
	}

	return (fcntl(fd, F_SETFD, current | FD_CLOEXEC) < 0) ? -1 : 0;
}
/*
 * Switch fd to non-blocking mode, keeping the other file status
 * flags untouched.
 *
 * Returns 0 on success, -1 if reading or writing the flags fails
 * (errno is whatever fcntl() left behind).
 */
int _fdset_nonblock(int fd)
{
	const int current = fcntl(fd, F_GETFL, 0);

	if (current < 0) {
		return -1;
	}

	return (fcntl(fd, F_SETFL, current | O_NONBLOCK) < 0) ? -1 : 0;
}
/*
 * Open the shared library `libname` with dlopen() and return its handle.
 *
 * The library is opened with RTLD_NOW | RTLD_GLOBAL plus `extra_flags`.
 * If dlerror() reports a problem after dlopen(), the error is logged,
 * errno is set to EAGAIN and NULL is returned.
 *
 * Everything after the successful dlopen() is informational only: the
 * function logs the directory the library was loaded from and, if the
 * resolved path is a symlink, where that link points. Failures in this
 * part are logged but the handle from dlopen() is still returned.
 */
static void *open_lib(knet_handle_t knet_h, const char *libname, int extra_flags)
{
void *ret = NULL;
char *error = NULL;
char dir[MAXPATHLEN], path[MAXPATHLEN * 2], link[MAXPATHLEN];
struct stat sb;
/*
* clear any pending error
*/
dlerror();
ret = dlopen(libname, RTLD_NOW | RTLD_GLOBAL | extra_flags);
error = dlerror();
if (error != NULL) {
log_err(knet_h, KNET_SUB_COMMON, "unable to dlopen %s: %s",
libname, error);
errno = EAGAIN;
return NULL;
}
/* zero all buffers so they are NUL-terminated even if only partially filled */
memset(dir, 0, sizeof(dir));
memset(link, 0, sizeof(link));
memset(path, 0, sizeof(path));
if (dlinfo(ret, RTLD_DI_ORIGIN, &dir) < 0) {
/*
* should we dlclose and return error?
*/
error = dlerror();
log_warn(knet_h, KNET_SUB_COMMON, "unable to dlinfo %s: %s",
libname, error);
} else {
snprintf(path, sizeof(path), "%s/%s", dir, libname);
log_info(knet_h, KNET_SUB_COMMON, "%s has been loaded from %s", libname, path);
/*
* try to resolve the library and check if it is a symlink and to where.
* we can't prevent symlink attacks but at least we can log where the library
* has been loaded from
*/
if (lstat(path, &sb) < 0) {
log_debug(knet_h, KNET_SUB_COMMON, "Unable to stat %s: %s", path, strerror(errno));
goto out;
}
if (S_ISLNK(sb.st_mode)) {
/*
 * The -/+ pair below is the change carried by this diff: the new call
 * reads at most sizeof(link)-1 bytes, so the last byte of the zeroed
 * buffer is never overwritten. readlink() does not append a
 * terminating NUL on its own.
 */
- if (readlink(path, link, sizeof(link)) < 0) {
+ if (readlink(path, link, sizeof(link)-1) < 0) {
log_debug(knet_h, KNET_SUB_COMMON, "Unable to readlink %s: %s", path, strerror(errno));
goto out;
}
/*
* symlink is relative to the directory
*/
if (link[0] != '/') {
snprintf(path, sizeof(path), "%s/%s", dir, link);
log_info(knet_h, KNET_SUB_COMMON, "%s/%s is a symlink to %s", dir, libname, path);
} else {
log_info(knet_h, KNET_SUB_COMMON, "%s/%s is a symlink to %s", dir, libname, link);
}
}
}
out:
return ret;
}
/*
 * Load the plugin "<type>_<name>.so" and return its operations table.
 *
 * The module must export two symbols:
 *   - "log_msg": a pointer slot that is filled with this library's
 *     log_msg so the module logs through it;
 *   - "<type>_model": the operations table handed back to the caller.
 *
 * Returns the address of "<type>_model" on success. Returns NULL on
 * failure: errno is whatever open_lib() set when the library cannot be
 * opened, or EINVAL when a required symbol is missing. In the latter
 * case the library handle is closed again so that repeated attempts do
 * not accumulate open references to an unusable module.
 */
void *load_module(knet_handle_t knet_h, const char *type, const char *name)
{
	void *module, *ops;
	log_msg_t **log_msg_sym;
	char soname[MAXPATHLEN], opsname[MAXPATHLEN];

	snprintf(soname, sizeof soname, "%s_%s.so", type, name);

	module = open_lib(knet_h, soname, 0);
	if (!module) {
		return NULL;
	}

	log_msg_sym = dlsym(module, "log_msg");
	if (!log_msg_sym) {
		/* dlerror() is consumed here, before dlclose() can replace it */
		log_err(knet_h, KNET_SUB_COMMON, "unable to map symbol 'log_msg' in module %s: %s",
			soname, dlerror());
		goto out_close;
	}
	*log_msg_sym = log_msg;

	snprintf(opsname, sizeof opsname, "%s_model", type);

	ops = dlsym(module, opsname);
	if (!ops) {
		log_err(knet_h, KNET_SUB_COMMON, "unable to map symbol 'model' in module %s: %s",
			soname, dlerror());
		goto out_close;
	}

	return ops;

out_close:
	dlclose(module);
	/* set errno last so dlclose() cannot disturb it */
	errno = EINVAL;
	return NULL;
}
diff --git a/libknet/compress.c b/libknet/compress.c
index 7eab454b..864828f3 100644
--- a/libknet/compress.c
+++ b/libknet/compress.c
@@ -1,484 +1,484 @@
/*
* Copyright (C) 2017-2019 Red Hat, Inc. All rights reserved.
*
* Author: Fabio M. Di Nitto <fabbione@kronosnet.org>
*
* This software licensed under GPL-2.0+, LGPL-2.0+
*/
#include "config.h"
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <pthread.h>
#include <time.h>
#include "internals.h"
#include "compress.h"
#include "compress_model.h"
#include "logging.h"
#include "threads_common.h"
#include "common.h"
/*
* internal module switch data
*/
/*
* DO NOT CHANGE MODEL_ID HERE OR ONWIRE COMPATIBILITY
* WILL BREAK!
*
* Always add new items before the last NULL.
*/
/*
 * Table of known compression models, terminated by an entry whose
 * model_name is NULL.
 *
 * NOTE(review): the columns appear to be model_name, model_id, built_in
 * (the WITH_COMPRESS_* build flag), loaded and ops, judging by how those
 * fields are used in this file; confirm against compress_model_t.
 * Several functions in this file index the table directly with a model
 * id, so each model_id has to match its position in the table.
 */
static compress_model_t compress_modules_cmds[] = {
{ "none" , 0, 0, 0, NULL },
{ "zlib" , 1, WITH_COMPRESS_ZLIB , 0, NULL },
{ "lz4" , 2, WITH_COMPRESS_LZ4 , 0, NULL },
{ "lz4hc", 3, WITH_COMPRESS_LZ4 , 0, NULL },
{ "lzo2" , 4, WITH_COMPRESS_LZO2 , 0, NULL },
{ "lzma" , 5, WITH_COMPRESS_LZMA , 0, NULL },
{ "bzip2", 6, WITH_COMPRESS_BZIP2, 0, NULL },
{ "zstd" , 7, WITH_COMPRESS_ZSTD, 0, NULL },
{ NULL, 255, 0, 0, NULL }
};
/* index of the last real table entry; computed by compress_init() */
static int max_model = 0;
/* CLOCK_MONOTONIC time of the last failed module load; all zero means "never" */
static struct timespec last_load_failure;
static int compress_get_model(const char *model)
{
int idx = 0;
while (compress_modules_cmds[idx].model_name != NULL) {
if (!strcmp(compress_modules_cmds[idx].model_name, model)) {
return compress_modules_cmds[idx].model_id;
}
idx++;
}
return -1;
}
/*
 * Return the index of the last real entry in compress_modules_cmds,
 * i.e. the number of entries before the NULL-named terminator, minus one.
 */
static int compress_get_max_model(void)
{
	int count;

	for (count = 0; compress_modules_cmds[count].model_name != NULL; count++) {
		/* just counting */
	}

	return count - 1;
}
static int compress_is_valid_model(int compress_model)
{
int idx = 0;
while (compress_modules_cmds[idx].model_name != NULL) {
if ((compress_model == compress_modules_cmds[idx].model_id) &&
(compress_modules_cmds[idx].built_in == 1)) {
return 0;
}
idx++;
}
return -1;
}
/*
 * Ask the module for `compress_model` whether `compress_level` is
 * acceptable.
 *
 * Modules that do not provide a val_level callback accept every level,
 * in which case 0 is returned. Otherwise the callback's result is
 * returned unchanged. The module's ops table must already be loaded.
 */
static int val_level(
	knet_handle_t knet_h,
	int compress_model,
	int compress_level)
{
	const compress_model_t *entry = &compress_modules_cmds[compress_model];

	if (entry->ops->val_level == NULL) {
		return 0;
	}

	return entry->ops->val_level(knet_h, compress_level);
}
/*
 * compress_check_lib_is_init needs to be invoked in a locked context!
 *
 * Returns 1 when the module for cmp_model is loaded and already
 * initialised for this handle, 0 otherwise.
 */
static int compress_check_lib_is_init(knet_handle_t knet_h, int cmp_model)
{
	const compress_model_t *entry = &compress_modules_cmds[cmp_model];

	if (entry->loaded != 1) {
		return 0;
	}

	/*
	 * a module without an .is_init function does not need per-handle
	 * init, so a fake reference stored in compress_int_data marks that
	 * this handle has already been accounted for
	 */
	if (entry->ops->is_init == NULL) {
		return (knet_h->compress_int_data[cmp_model] != NULL) ? 1 : 0;
	}

	return (entry->ops->is_init(knet_h, cmp_model) == 1) ? 1 : 0;
}
/*
 * compress_load_lib should _always_ be invoked in write lock context
 *
 * Loads (if needed) and initialises the module for cmp_model on this
 * handle. Returns 0 on success or when the module was already
 * initialised, -1 with errno set on failure.
 */
static int compress_load_lib(knet_handle_t knet_h, int cmp_model, int rate_limit)
{
	compress_model_t *entry = &compress_modules_cmds[cmp_model];
	const unsigned long long retry_interval_ns = 10000000000ULL;
	struct timespec clock_now;
	unsigned long long timediff;

	/*
	 * re-check under the write lock: callers usually test with only the
	 * read lock held and then switch locks, so another thread may have
	 * initialised the library in between
	 */
	if (compress_check_lib_is_init(knet_h, cmp_model)) {
		return 0;
	}

	/*
	 * decompress can load libraries on demand based on the model chosen
	 * by other nodes, so crafted packets could be used to trigger load
	 * attempts over and over. After a failed load, further attempts are
	 * refused for a while to avoid a lock DoS that would slow libknet.
	 */
	if (rate_limit &&
	    ((last_load_failure.tv_sec != 0) || (last_load_failure.tv_nsec != 0))) {
		clock_gettime(CLOCK_MONOTONIC, &clock_now);
		timespec_diff(last_load_failure, clock_now, &timediff);
		if (timediff < retry_interval_ns) {
			errno = EAGAIN;
			return -1;
		}
	}

	if (entry->loaded == 0) {
		entry->ops = load_module (knet_h, "compress", entry->model_name);
		if (!entry->ops) {
			clock_gettime(CLOCK_MONOTONIC, &last_load_failure);
			return -1;
		}
		if (entry->ops->abi_ver != KNET_COMPRESS_MODEL_ABI) {
			log_err(knet_h, KNET_SUB_COMPRESS,
				"ABI mismatch loading module %s. knet ver: %d, module ver: %d",
				entry->model_name, KNET_COMPRESS_MODEL_ABI,
				entry->ops->abi_ver);
			errno = EINVAL;
			return -1;
		}
		entry->loaded = 1;
	}

	if (entry->ops->init == NULL) {
		/* no per-handle init: leave a marker so the handle counts as initialised */
		knet_h->compress_int_data[cmp_model] = (void *)&"1";
		return 0;
	}

	if (entry->ops->init(knet_h, cmp_model) < 0) {
		return -1;
	}

	return 0;
}
/*
 * Round-trip self test for the compression model set on the handle.
 *
 * A zero-filled buffer of KNET_DATABUFSIZE bytes is compressed and then
 * decompressed again through the module's own callbacks; the result must
 * still be all zeroes.
 *
 * Returns 0 on success. Returns -1 with errno set to the callback's
 * errno when compress/decompress fails, or to EINVAL when the
 * decompressed data is wrong.
 */
static int compress_lib_test(knet_handle_t knet_h)
{
	unsigned char src[KNET_DATABUFSIZE];
	unsigned char dst[KNET_DATABUFSIZE_COMPRESS];
	ssize_t dst_comp_len = KNET_DATABUFSIZE_COMPRESS;
	ssize_t dst_decomp_len = KNET_DATABUFSIZE;
	int savederrno = 0;
	unsigned int pos;

	memset(src, 0, sizeof(src));
	memset(dst, 0, sizeof(dst));

	/*
	 * NOTE: the public compress/decompress entry points cannot be used
	 * here because of locking, so the module is called directly
	 */
	if (compress_modules_cmds[knet_h->compress_model].ops->compress(knet_h, src, KNET_DATABUFSIZE, dst, &dst_comp_len) < 0) {
		savederrno = errno;
		log_err(knet_h, KNET_SUB_COMPRESS, "Unable to compress test buffer. Please check your compression settings: %s", strerror(savederrno));
		errno = savederrno;
		return -1;
	}

	if (compress_modules_cmds[knet_h->compress_model].ops->decompress(knet_h, dst, dst_comp_len, src, &dst_decomp_len) < 0) {
		savederrno = errno;
		log_err(knet_h, KNET_SUB_COMPRESS, "Unable to decompress test buffer. Please check your compression settings: %s", strerror(savederrno));
		errno = savederrno;
		return -1;
	}

	for (pos = 0; pos < KNET_DATABUFSIZE; pos++) {
		if (src[pos] == 0) {
			continue;
		}
		log_err(knet_h, KNET_SUB_COMPRESS, "Decompressed buffer contains incorrect data");
		errno = EINVAL;
		return -1;
	}

	return 0;
}
/*
 * One-time setup of the compression layer.
 *
 * Computes max_model from the module table and clears the "last load
 * failure" timestamp. Returns 0 on success, or -1 with errno = EINVAL
 * when the table holds more models than KNET_MAX_COMPRESS_METHODS allows.
 */
int compress_init(
	knet_handle_t knet_h)
{
	max_model = compress_get_max_model();

	if (max_model <= KNET_MAX_COMPRESS_METHODS) {
		memset(&last_load_failure, 0, sizeof(last_load_failure));
		return 0;
	}

	log_err(knet_h, KNET_SUB_COMPRESS, "Too many compress methods defined in compress.c.");
	errno = EINVAL;
	return -1;
}
/*
 * Apply a compression configuration to the handle.
 *
 * The model name is resolved first; an unknown name fails with EINVAL.
 * For model id 0 nothing else is done here and 0 is returned. For any
 * other model the function:
 *   - rejects models that are not built in and thresholds above
 *     KNET_MAX_PACKET_SIZE (EINVAL);
 *   - stores the threshold on the handle (0 selects
 *     KNET_COMPRESS_THRESHOLD);
 *   - loads/initialises the module if needed, under shlib_rwlock;
 *   - validates the level, stores model and level on the handle and runs
 *     compress_lib_test().
 *
 * Returns 0 on success, -1 with errno set on failure. Once the lock has
 * been taken, any failure resets the handle's compress_model and
 * compress_level to 0.
 */
int compress_cfg(
knet_handle_t knet_h,
struct knet_handle_compress_cfg *knet_handle_compress_cfg)
{
int savederrno = 0, err = 0;
int cmp_model;
cmp_model = compress_get_model(knet_handle_compress_cfg->compress_model);
if (cmp_model < 0) {
log_err(knet_h, KNET_SUB_COMPRESS, "compress model %s not supported", knet_handle_compress_cfg->compress_model);
errno = EINVAL;
return -1;
}
log_debug(knet_h, KNET_SUB_COMPRESS,
"Initizializing compress module [%s/%d/%u]",
knet_handle_compress_cfg->compress_model, knet_handle_compress_cfg->compress_level, knet_handle_compress_cfg->compress_threshold);
if (cmp_model > 0) {
if (compress_modules_cmds[cmp_model].built_in == 0) {
log_err(knet_h, KNET_SUB_COMPRESS, "compress model %s support has not been built in. Please contact your vendor or fix the build", knet_handle_compress_cfg->compress_model);
errno = EINVAL;
return -1;
}
if (knet_handle_compress_cfg->compress_threshold > KNET_MAX_PACKET_SIZE) {
log_err(knet_h, KNET_SUB_COMPRESS, "compress threshold cannot be higher than KNET_MAX_PACKET_SIZE (%d).",
KNET_MAX_PACKET_SIZE);
errno = EINVAL;
return -1;
}
/* note: the threshold is written to the handle before the checks below run */
if (knet_handle_compress_cfg->compress_threshold == 0) {
knet_h->compress_threshold = KNET_COMPRESS_THRESHOLD;
log_debug(knet_h, KNET_SUB_COMPRESS, "resetting compression threshold to default (%d)", KNET_COMPRESS_THRESHOLD);
} else {
knet_h->compress_threshold = knet_handle_compress_cfg->compress_threshold;
}
savederrno = pthread_rwlock_rdlock(&shlib_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_COMPRESS, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
if (!compress_check_lib_is_init(knet_h, cmp_model)) {
/*
* need to switch to write lock, load the lib, and return with a write lock
* this is not racy because compress_load_lib is written idempotent.
*/
pthread_rwlock_unlock(&shlib_rwlock);
savederrno = pthread_rwlock_wrlock(&shlib_rwlock);
if (savederrno) {
/* no lock is held at this point, so returning directly is correct */
log_err(knet_h, KNET_SUB_COMPRESS, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
if (compress_load_lib(knet_h, cmp_model, 0) < 0) {
savederrno = errno;
log_err(knet_h, KNET_SUB_COMPRESS, "Unable to load library: %s",
strerror(savederrno));
err = -1;
goto out_unlock;
}
}
if (val_level(knet_h, cmp_model, knet_handle_compress_cfg->compress_level) < 0) {
log_err(knet_h, KNET_SUB_COMPRESS, "compress level %d not supported for model %s",
knet_handle_compress_cfg->compress_level, knet_handle_compress_cfg->compress_model);
savederrno = EINVAL;
err = -1;
goto out_unlock;
}
/* compress_lib_test() reads the model from the handle, so set it first */
knet_h->compress_model = cmp_model;
knet_h->compress_level = knet_handle_compress_cfg->compress_level;
if (compress_lib_test(knet_h) < 0) {
savederrno = errno;
err = -1;
goto out_unlock;
}
/* releases whichever lock (read or write) is held at this point */
out_unlock:
pthread_rwlock_unlock(&shlib_rwlock);
}
if (err) {
knet_h->compress_model = 0;
knet_h->compress_level = 0;
}
errno = savederrno;
return err;
}
/*
 * Release per-handle compression state.
 *
 * Under the shlib write lock, every table entry that is built in,
 * loaded, has model_id > 0 and has per-handle data is considered. When
 * `all` is non-zero each such entry is finalised; otherwise only the
 * entry matching the handle's current compress_model is. Finalising
 * means calling the module's fini callback, or clearing the handle's
 * compress_int_data slot when the module has no such callback.
 *
 * If the write lock cannot be taken the error is logged and nothing is
 * released.
 */
void compress_fini(
knet_handle_t knet_h,
int all)
{
int savederrno = 0;
int idx = 0;
savederrno = pthread_rwlock_wrlock(&shlib_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_COMPRESS, "Unable to get write lock: %s",
strerror(savederrno));
return;
}
while (compress_modules_cmds[idx].model_name != NULL) {
/*
 * The -/+ lines below are the change carried by this diff: the bounds
 * check on idx moves to the front of the && chain, so short-circuit
 * evaluation skips compress_int_data[idx] whenever idx is not below
 * KNET_MAX_COMPRESS_METHODS.
 * NOTE(review): presumably compress_int_data has
 * KNET_MAX_COMPRESS_METHODS entries; confirm in the handle definition.
 */
- if ((compress_modules_cmds[idx].built_in == 1) &&
+ if ((idx < KNET_MAX_COMPRESS_METHODS) && /* check idx first so we don't read bad data */
+ (compress_modules_cmds[idx].built_in == 1) &&
(compress_modules_cmds[idx].loaded == 1) &&
(compress_modules_cmds[idx].model_id > 0) &&
- (knet_h->compress_int_data[idx] != NULL) &&
- (idx < KNET_MAX_COMPRESS_METHODS)) {
+ (knet_h->compress_int_data[idx] != NULL)) {
if ((all) || (compress_modules_cmds[idx].model_id == knet_h->compress_model)) {
if (compress_modules_cmds[idx].ops->fini != NULL) {
compress_modules_cmds[idx].ops->fini(knet_h, idx);
} else {
knet_h->compress_int_data[idx] = NULL;
}
}
}
idx++;
}
pthread_rwlock_unlock(&shlib_rwlock);
return;
}
/*
 * Compress buf_in into buf_out using the model configured on the handle.
 *
 * No compress_check_lib_is_init call is needed here: compress_cfg has
 * already made sure the module is loaded. The module callback's return
 * value is passed straight back to the caller.
 */
int compress(
	knet_handle_t knet_h,
	const unsigned char *buf_in,
	const ssize_t buf_in_len,
	unsigned char *buf_out,
	ssize_t *buf_out_len)
{
	const compress_model_t *active = &compress_modules_cmds[knet_h->compress_model];

	return active->ops->compress(knet_h, buf_in, buf_in_len, buf_out, buf_out_len);
}
/*
 * Decompress buf_in into buf_out using the module for `compress_model`.
 *
 * The module is loaded on demand (rate limited after a failed load, see
 * compress_load_lib).
 *
 * Returns the module's decompress result on success paths. Returns -1
 * with errno set when:
 *   - compress_model is outside the table (EINVAL). Negative ids are
 *     rejected as well, because the id is used as an array index below;
 *   - the model is not built in (EINVAL);
 *   - a lock cannot be taken (errno = the pthread error);
 *   - the module cannot be loaded (errno from compress_load_lib).
 */
int decompress(
	knet_handle_t knet_h,
	int compress_model,
	const unsigned char *buf_in,
	const ssize_t buf_in_len,
	unsigned char *buf_out,
	ssize_t *buf_out_len)
{
	int savederrno = 0, err = 0;

	if ((compress_model < 0) || (compress_model > max_model)) {
		log_err(knet_h, KNET_SUB_COMPRESS, "Received packet with unknown compress model %d", compress_model);
		errno = EINVAL;
		return -1;
	}

	if (compress_is_valid_model(compress_model) < 0) {
		log_err(knet_h, KNET_SUB_COMPRESS, "Received packet compressed with %s but support is not built in this version of libknet. Please contact your distribution vendor or fix the build.", compress_modules_cmds[compress_model].model_name);
		errno = EINVAL;
		return -1;
	}

	savederrno = pthread_rwlock_rdlock(&shlib_rwlock);
	if (savederrno) {
		log_err(knet_h, KNET_SUB_COMPRESS, "Unable to get read lock: %s",
			strerror(savederrno));
		errno = savederrno;
		return -1;
	}

	if (!compress_check_lib_is_init(knet_h, compress_model)) {
		/*
		 * need to switch to write lock, load the lib, and return with a write lock
		 * this is not racy because compress_load_lib is written idempotent.
		 */
		pthread_rwlock_unlock(&shlib_rwlock);
		savederrno = pthread_rwlock_wrlock(&shlib_rwlock);
		if (savederrno) {
			/* no lock is held here, so a direct return is correct */
			log_err(knet_h, KNET_SUB_COMPRESS, "Unable to get write lock: %s",
				strerror(savederrno));
			errno = savederrno;
			return -1;
		}

		if (compress_load_lib(knet_h, compress_model, 1) < 0) {
			savederrno = errno;
			err = -1;
			log_err(knet_h, KNET_SUB_COMPRESS, "Unable to load library: %s",
				strerror(savederrno));
			goto out_unlock;
		}
	}

	err = compress_modules_cmds[compress_model].ops->decompress(knet_h, buf_in, buf_in_len, buf_out, buf_out_len);
	savederrno = errno;

out_unlock:
	/* releases whichever lock (read or write) is held at this point */
	pthread_rwlock_unlock(&shlib_rwlock);

	errno = savederrno;
	return err;
}
int knet_get_compress_list(struct knet_compress_info *compress_list, size_t *compress_list_entries)
{
int err = 0;
int idx = 0;
int outidx = 0;
if (!compress_list_entries) {
errno = EINVAL;
return -1;
}
while (compress_modules_cmds[idx].model_name != NULL) {
if (compress_modules_cmds[idx].built_in) {
if (compress_list) {
compress_list[outidx].name = compress_modules_cmds[idx].model_name;
}
outidx++;
}
idx++;
}
*compress_list_entries = outidx;
if (!err)
errno = 0;
return err;
}
diff --git a/libknet/crypto.c b/libknet/crypto.c
index 419f9ccc..41d67c95 100644
--- a/libknet/crypto.c
+++ b/libknet/crypto.c
@@ -1,214 +1,213 @@
/*
* Copyright (C) 2012-2019 Red Hat, Inc. All rights reserved.
*
* Author: Fabio M. Di Nitto <fabbione@kronosnet.org>
*
* This software licensed under GPL-2.0+, LGPL-2.0+
*/
#include "config.h"
#include <sys/errno.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <time.h>
#include "crypto.h"
#include "crypto_model.h"
#include "internals.h"
#include "logging.h"
#include "common.h"
/*
* internal module switch data
*/
/*
 * Table of known crypto models, terminated by an entry whose model_name
 * is NULL. A model is identified by its index in this table (see
 * crypto_get_model).
 *
 * NOTE(review): the columns appear to be model_name, built_in (the
 * WITH_CRYPTO_* build flag), loaded and ops, judging by how those fields
 * are used in this file; confirm against crypto_model_t.
 */
static crypto_model_t crypto_modules_cmds[] = {
{ "nss", WITH_CRYPTO_NSS, 0, NULL },
{ "openssl", WITH_CRYPTO_OPENSSL, 0, NULL },
{ NULL, 0, 0, NULL }
};
/*
 * Translate a crypto model name into its index in crypto_modules_cmds.
 *
 * Returns the index of the entry whose name matches `model` exactly, or
 * -1 when no entry matches.
 */
static int crypto_get_model(const char *model)
{
	int pos;

	for (pos = 0; crypto_modules_cmds[pos].model_name != NULL; pos++) {
		if (strcmp(crypto_modules_cmds[pos].model_name, model) == 0) {
			return pos;
		}
	}

	return -1;
}
/*
* exported API
*/
/*
 * Forward a single-buffer request to the crypt callback of the crypto
 * model active on the handle and return that callback's result.
 * The handle must have a crypto instance configured.
 */
int crypto_encrypt_and_sign (
	knet_handle_t knet_h,
	const unsigned char *buf_in,
	const ssize_t buf_in_len,
	unsigned char *buf_out,
	ssize_t *buf_out_len)
{
	const crypto_model_t *active = &crypto_modules_cmds[knet_h->crypto_instance->model];

	return active->ops->crypt(knet_h, buf_in, buf_in_len, buf_out, buf_out_len);
}
/*
 * Forward a scatter/gather (iovec) request to the cryptv callback of the
 * crypto model active on the handle and return that callback's result.
 * The handle must have a crypto instance configured.
 */
int crypto_encrypt_and_signv (
	knet_handle_t knet_h,
	const struct iovec *iov_in,
	int iovcnt_in,
	unsigned char *buf_out,
	ssize_t *buf_out_len)
{
	const crypto_model_t *active = &crypto_modules_cmds[knet_h->crypto_instance->model];

	return active->ops->cryptv(knet_h, iov_in, iovcnt_in, buf_out, buf_out_len);
}
/*
 * Forward a request to the decrypt callback of the crypto model active
 * on the handle and return that callback's result.
 * The handle must have a crypto instance configured.
 */
int crypto_authenticate_and_decrypt (
	knet_handle_t knet_h,
	const unsigned char *buf_in,
	const ssize_t buf_in_len,
	unsigned char *buf_out,
	ssize_t *buf_out_len)
{
	const crypto_model_t *active = &crypto_modules_cmds[knet_h->crypto_instance->model];

	return active->ops->decrypt(knet_h, buf_in, buf_in_len, buf_out, buf_out_len);
}
/*
 * Set up crypto on the handle according to knet_handle_crypto_cfg.
 *
 * Steps: resolve the model name, refuse models that are unknown or not
 * built in, take the shlib write lock, load the module on first use
 * (checking its ABI version), allocate knet_h->crypto_instance and call
 * the module's init callback.
 *
 * Returns 0 on success. Returns -1 on failure; every path that goes
 * through out_err frees the crypto instance, releases the lock and sets
 * errno. The three early returns before the lock is held do not set
 * errno themselves.
 */
int crypto_init(
knet_handle_t knet_h,
struct knet_handle_crypto_cfg *knet_handle_crypto_cfg)
{
int savederrno = 0;
int model = 0;
model = crypto_get_model(knet_handle_crypto_cfg->crypto_model);
if (model < 0) {
log_err(knet_h, KNET_SUB_CRYPTO, "model %s not supported", knet_handle_crypto_cfg->crypto_model);
return -1;
}
if (crypto_modules_cmds[model].built_in == 0) {
log_err(knet_h, KNET_SUB_CRYPTO, "this version of libknet was built without %s support. Please contact your vendor or fix the build.", knet_handle_crypto_cfg->crypto_model);
return -1;
}
savederrno = pthread_rwlock_wrlock(&shlib_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_CRYPTO, "Unable to get write lock: %s",
strerror(savederrno));
return -1;
}
if (!crypto_modules_cmds[model].loaded) {
crypto_modules_cmds[model].ops = load_module (knet_h, "crypto", crypto_modules_cmds[model].model_name);
if (!crypto_modules_cmds[model].ops) {
savederrno = errno;
log_err(knet_h, KNET_SUB_CRYPTO, "Unable to load %s lib", crypto_modules_cmds[model].model_name);
goto out_err;
}
if (crypto_modules_cmds[model].ops->abi_ver != KNET_CRYPTO_MODEL_ABI) {
log_err(knet_h, KNET_SUB_CRYPTO,
"ABI mismatch loading module %s. knet ver: %d, module ver: %d",
crypto_modules_cmds[model].model_name, KNET_CRYPTO_MODEL_ABI,
crypto_modules_cmds[model].ops->abi_ver);
savederrno = EINVAL;
goto out_err;
}
crypto_modules_cmds[model].loaded = 1;
}
log_debug(knet_h, KNET_SUB_CRYPTO,
"Initizializing crypto module [%s/%s/%s]",
knet_handle_crypto_cfg->crypto_model,
knet_handle_crypto_cfg->crypto_cipher_type,
knet_handle_crypto_cfg->crypto_hash_type);
knet_h->crypto_instance = malloc(sizeof(struct crypto_instance));
if (!knet_h->crypto_instance) {
log_err(knet_h, KNET_SUB_CRYPTO, "Unable to allocate memory for crypto instance");
/*
 * The line removed below unlocked shlib_rwlock here; out_err unlocks it
 * as well, so keeping it would release the lock twice.
 */
- pthread_rwlock_unlock(&shlib_rwlock);
savederrno = ENOMEM;
goto out_err;
}
/*
* if crypto_modules_cmds.ops->init fails, it is expected that
* it will clean everything by itself.
* crypto_modules_cmds.ops->fini is not invoked on error.
*/
knet_h->crypto_instance->model = model;
if (crypto_modules_cmds[knet_h->crypto_instance->model].ops->init(knet_h, knet_handle_crypto_cfg)) {
savederrno = errno;
goto out_err;
}
log_debug(knet_h, KNET_SUB_CRYPTO, "security network overhead: %zu", knet_h->sec_header_size);
pthread_rwlock_unlock(&shlib_rwlock);
return 0;
/* common failure exit: drop the instance, release the lock, report errno */
out_err:
if (knet_h->crypto_instance) {
free(knet_h->crypto_instance);
knet_h->crypto_instance = NULL;
}
pthread_rwlock_unlock(&shlib_rwlock);
errno = savederrno;
return -1;
}
/*
 * Tear down crypto on the handle.
 *
 * Under the shlib write lock: if a crypto instance exists, the module's
 * fini callback is invoked when it has one, then the instance is freed
 * and the handle's pointer cleared. If the lock cannot be taken the
 * error is logged and nothing is released.
 */
void crypto_fini(
	knet_handle_t knet_h)
{
	int lock_err = pthread_rwlock_wrlock(&shlib_rwlock);

	if (lock_err) {
		log_err(knet_h, KNET_SUB_CRYPTO, "Unable to get write lock: %s",
			strerror(lock_err));
		return;
	}

	if (knet_h->crypto_instance != NULL) {
		const crypto_model_t *active = &crypto_modules_cmds[knet_h->crypto_instance->model];

		if (active->ops->fini != NULL) {
			active->ops->fini(knet_h);
		}
		free(knet_h->crypto_instance);
		knet_h->crypto_instance = NULL;
	}

	pthread_rwlock_unlock(&shlib_rwlock);
}
int knet_get_crypto_list(struct knet_crypto_info *crypto_list, size_t *crypto_list_entries)
{
int err = 0;
int idx = 0;
int outidx = 0;
if (!crypto_list_entries) {
errno = EINVAL;
return -1;
}
while (crypto_modules_cmds[idx].model_name != NULL) {
if (crypto_modules_cmds[idx].built_in) {
if (crypto_list) {
crypto_list[outidx].name = crypto_modules_cmds[idx].model_name;
}
outidx++;
}
idx++;
}
*crypto_list_entries = outidx;
if (!err)
errno = 0;
return err;
}
diff --git a/libknet/crypto_nss.c b/libknet/crypto_nss.c
index a17ff628..640b560f 100644
--- a/libknet/crypto_nss.c
+++ b/libknet/crypto_nss.c
@@ -1,842 +1,842 @@
/*
* Copyright (C) 2012-2019 Red Hat, Inc. All rights reserved.
*
* Author: Fabio M. Di Nitto <fabbione@kronosnet.org>
*
* This software licensed under GPL-2.0+, LGPL-2.0+
*/
#define KNET_MODULE
#include "config.h"
#include <errno.h>
#include <stdlib.h>
#include <nss.h>
#include <nspr.h>
#include <pk11pub.h>
#include <pkcs11.h>
#include <prerror.h>
#include <blapit.h>
#include <hasht.h>
#include <pthread.h>
#include <secerr.h>
#include <prinit.h>
#include "crypto_model.h"
#include "logging.h"
/* non-zero once the NSS database has been initialised by this module */
static int nss_db_is_init = 0;

/*
 * Process-exit cleanup: shuts NSS down and, if NSPR is initialised,
 * releases its arenas and cleans it up. Does nothing when NSS was never
 * initialised by this module.
 */
static void nss_atexit_handler(void)
{
	if (!nss_db_is_init) {
		return;
	}

	NSS_Shutdown();

	if (PR_Initialized()) {
		PL_ArenaFinish();
		PR_Cleanup();
	}
}
/*
* crypto definitions and conversion tables
*/
/*
 * Number of random bytes generated for each encrypted buffer. They are
 * written at the start of the output and used as the IV parameter
 * (see encrypt_nss).
 */
#define SALT_SIZE 16
/*
* These are defined in newer NSS versions. For older ones, we define our own.
*/
#ifndef AES_256_KEY_LENGTH
#define AES_256_KEY_LENGTH 32
#endif
#ifndef AES_192_KEY_LENGTH
#define AES_192_KEY_LENGTH 24
#endif
#ifndef AES_128_KEY_LENGTH
#define AES_128_KEY_LENGTH 16
#endif
/*
 * Cipher types supported by the NSS backend. The values double as
 * indexes into the three lookup tables that follow.
 */
enum nsscrypto_crypt_t {
	CRYPTO_CIPHER_TYPE_NONE   = 0,
	CRYPTO_CIPHER_TYPE_AES256 = 1,
	CRYPTO_CIPHER_TYPE_AES192 = 2,
	CRYPTO_CIPHER_TYPE_AES128 = 3
};

/* NSS mechanism per cipher type; init_nss_crypto imports no key when the entry is 0 */
CK_MECHANISM_TYPE cipher_to_nss[] = {
	[CRYPTO_CIPHER_TYPE_NONE]   = 0,
	[CRYPTO_CIPHER_TYPE_AES256] = CKM_AES_CBC_PAD,
	[CRYPTO_CIPHER_TYPE_AES192] = CKM_AES_CBC_PAD,
	[CRYPTO_CIPHER_TYPE_AES128] = CKM_AES_CBC_PAD
};

/* key length in bytes per cipher type */
size_t nsscipher_key_len[] = {
	[CRYPTO_CIPHER_TYPE_NONE]   = 0,
	[CRYPTO_CIPHER_TYPE_AES256] = AES_256_KEY_LENGTH,
	[CRYPTO_CIPHER_TYPE_AES192] = AES_192_KEY_LENGTH,
	[CRYPTO_CIPHER_TYPE_AES128] = AES_128_KEY_LENGTH
};

/* cipher block length per cipher type */
size_t nsscypher_block_len[] = {
	[CRYPTO_CIPHER_TYPE_NONE]   = 0,
	[CRYPTO_CIPHER_TYPE_AES256] = AES_BLOCK_SIZE,
	[CRYPTO_CIPHER_TYPE_AES192] = AES_BLOCK_SIZE,
	[CRYPTO_CIPHER_TYPE_AES128] = AES_BLOCK_SIZE
};
/*
* hash definitions and conversion tables
*/
/*
 * Hash types supported by the NSS backend. The values double as indexes
 * into the two lookup tables that follow.
 */
enum nsscrypto_hash_t {
	CRYPTO_HASH_TYPE_NONE   = 0,
	CRYPTO_HASH_TYPE_MD5    = 1,
	CRYPTO_HASH_TYPE_SHA1   = 2,
	CRYPTO_HASH_TYPE_SHA256 = 3,
	CRYPTO_HASH_TYPE_SHA384 = 4,
	CRYPTO_HASH_TYPE_SHA512 = 5
};

/* NSS HMAC mechanism per hash type */
CK_MECHANISM_TYPE hash_to_nss[] = {
	[CRYPTO_HASH_TYPE_NONE]   = 0,
	[CRYPTO_HASH_TYPE_MD5]    = CKM_MD5_HMAC,
	[CRYPTO_HASH_TYPE_SHA1]   = CKM_SHA_1_HMAC,
	[CRYPTO_HASH_TYPE_SHA256] = CKM_SHA256_HMAC,
	[CRYPTO_HASH_TYPE_SHA384] = CKM_SHA384_HMAC,
	[CRYPTO_HASH_TYPE_SHA512] = CKM_SHA512_HMAC
};

/* digest length per hash type */
size_t nsshash_len[] = {
	[CRYPTO_HASH_TYPE_NONE]   = 0,
	[CRYPTO_HASH_TYPE_MD5]    = MD5_LENGTH,
	[CRYPTO_HASH_TYPE_SHA1]   = SHA1_LENGTH,
	[CRYPTO_HASH_TYPE_SHA256] = SHA256_LENGTH,
	[CRYPTO_HASH_TYPE_SHA384] = SHA384_LENGTH,
	[CRYPTO_HASH_TYPE_SHA512] = SHA512_LENGTH
};
/*
 * Purpose a symmetric key is imported for (see nssimport_symmetric_key):
 * encryption/decryption or signing.
 */
enum sym_key_type {
SYM_KEY_TYPE_CRYPT,
SYM_KEY_TYPE_HASH
};
/*
 * Per-handle state of the NSS crypto backend, reached through
 * knet_h->crypto_instance->model_instance.
 */
struct nsscrypto_instance {
/* key imported for SYM_KEY_TYPE_CRYPT (set by init_nss_crypto) */
PK11SymKey *nss_sym_key;
/* presumably the key imported for SYM_KEY_TYPE_HASH; its setter is not in view, TODO confirm */
PK11SymKey *nss_sym_key_sign;
/* raw key material and its length in bytes */
unsigned char *private_key;
unsigned int private_key_len;
/* index into the cipher lookup tables (cipher_to_nss, nsscipher_key_len) */
int crypto_cipher_type;
/* index into the hash lookup tables (hash_to_nss) */
int crypto_hash_type;
};
/*
* crypt/decrypt functions
*/
/*
 * Map a cipher name ("none", "aes256", "aes192", "aes128") to the
 * matching CRYPTO_CIPHER_TYPE_* value. The match is exact and
 * case-sensitive. Returns -1 for any other name.
 */
static int nssstring_to_crypto_cipher_type(const char* crypto_cipher_type)
{
	static const struct {
		const char *name;
		int type;
	} known_ciphers[] = {
		{ "none",   CRYPTO_CIPHER_TYPE_NONE   },
		{ "aes256", CRYPTO_CIPHER_TYPE_AES256 },
		{ "aes192", CRYPTO_CIPHER_TYPE_AES192 },
		{ "aes128", CRYPTO_CIPHER_TYPE_AES128 }
	};
	unsigned int pos;

	for (pos = 0; pos < sizeof(known_ciphers) / sizeof(known_ciphers[0]); pos++) {
		if (strcmp(crypto_cipher_type, known_ciphers[pos].name) == 0) {
			return known_ciphers[pos].type;
		}
	}

	return -1;
}
/*
 * Import the handle's private key into NSS as a symmetric key.
 *
 * key_type selects what the key is for:
 *   SYM_KEY_TYPE_CRYPT - cipher mechanism and key length taken from the
 *                        instance's cipher type, usable for
 *                        encrypt/decrypt;
 *   SYM_KEY_TYPE_HASH  - HMAC mechanism taken from the instance's hash
 *                        type, full private key length, usable for
 *                        signing.
 *
 * The key is not imported directly. A temporary wrapping key is
 * generated, the zero-padded private key is encrypted with it, and the
 * result is unwrapped into the final key (see the FIPS comment below).
 *
 * Returns the imported key, or NULL on any failure (the reason is
 * logged). The temporary context, wrapping key and slot are released on
 * every path.
 */
static PK11SymKey *nssimport_symmetric_key(knet_handle_t knet_h, enum sym_key_type key_type)
{
struct nsscrypto_instance *instance = knet_h->crypto_instance->model_instance;
SECItem key_item;
PK11SlotInfo *slot;
PK11SymKey *res_key;
CK_MECHANISM_TYPE cipher;
CK_ATTRIBUTE_TYPE operation;
CK_MECHANISM_TYPE wrap_mechanism;
int wrap_key_len;
PK11SymKey *wrap_key;
PK11Context *wrap_key_crypt_context;
SECItem tmp_sec_item;
SECItem wrapped_key;
int wrapped_key_len;
int wrap_key_block_size;
unsigned char wrapped_key_data[KNET_MAX_KEY_LEN];
unsigned char pad_key_data[KNET_MAX_KEY_LEN];
memset(&key_item, 0, sizeof(key_item));
/* everything the exit path may release starts out as NULL */
slot = NULL;
wrap_key = NULL;
res_key = NULL;
wrap_key_crypt_context = NULL;
if (instance->private_key_len > sizeof(pad_key_data)) {
log_err(knet_h, KNET_SUB_NSSCRYPTO, "Import symmetric key failed. Private key is too long");
goto exit_res_key;
}
/* copy the key into a zero-filled buffer of KNET_MAX_KEY_LEN bytes */
memset(pad_key_data, 0, sizeof(pad_key_data));
memcpy(pad_key_data, instance->private_key, instance->private_key_len);
key_item.type = siBuffer;
key_item.data = pad_key_data;
switch (key_type) {
case SYM_KEY_TYPE_CRYPT:
key_item.len = nsscipher_key_len[instance->crypto_cipher_type];
cipher = cipher_to_nss[instance->crypto_cipher_type];
operation = CKA_ENCRYPT|CKA_DECRYPT;
break;
case SYM_KEY_TYPE_HASH:
key_item.len = instance->private_key_len;
cipher = hash_to_nss[instance->crypto_hash_type];
operation = CKA_SIGN;
break;
default:
log_err(knet_h, KNET_SUB_NSSCRYPTO, "Import symmetric key failed. Unknown keyimport request");
goto exit_res_key;
break;
}
slot = PK11_GetBestSlot(cipher, NULL);
if (slot == NULL) {
log_err(knet_h, KNET_SUB_NSSCRYPTO, "Unable to find security slot (%d): %s",
PR_GetError(), PR_ErrorToString(PR_GetError(), PR_LANGUAGE_I_DEFAULT));
goto exit_res_key;
}
/*
* Without FIPS it would be possible to just use
* res_key = PK11_ImportSymKey(slot, cipher, PK11_OriginUnwrap, operation, &key_item, NULL);
* with FIPS NSS Level 2 certification has to be "workarounded" (so it becomes Level 1) by using
* following method:
* 1. Generate wrap key
* 2. Encrypt authkey with wrap key
* 3. Unwrap encrypted authkey using wrap key
*/
/*
* Generate wrapping key
*/
wrap_mechanism = PK11_GetBestWrapMechanism(slot);
wrap_key_len = PK11_GetBestKeyLength(slot, wrap_mechanism);
wrap_key = PK11_KeyGen(slot, wrap_mechanism, NULL, wrap_key_len, NULL);
if (wrap_key == NULL) {
log_err(knet_h, KNET_SUB_NSSCRYPTO, "Unable to generate wrapping key (%d): %s",
PR_GetError(), PR_ErrorToString(PR_GetError(), PR_LANGUAGE_I_DEFAULT));
goto exit_res_key;
}
/*
* Encrypt authkey with wrapping key
*/
/*
* Key must be padded to a block size
*/
wrap_key_block_size = PK11_GetBlockSize(wrap_mechanism, 0);
if (wrap_key_block_size < 0) {
log_err(knet_h, KNET_SUB_NSSCRYPTO, "Unable to get wrap key block size (%d): %s",
PR_GetError(), PR_ErrorToString(PR_GetError(), PR_LANGUAGE_I_DEFAULT));
goto exit_res_key;
}
if (sizeof(pad_key_data) % wrap_key_block_size != 0) {
log_err(knet_h, KNET_SUB_NSSCRYPTO, "Padded key buffer size (%zu) is not dividable by "
"wrap key block size (%u).", sizeof(pad_key_data), (unsigned int)wrap_key_block_size);
goto exit_res_key;
}
/*
* Initialization of IV is not needed because PK11_GetBestWrapMechanism should return ECB mode
*/
memset(&tmp_sec_item, 0, sizeof(tmp_sec_item));
wrap_key_crypt_context = PK11_CreateContextBySymKey(wrap_mechanism, CKA_ENCRYPT,
wrap_key, &tmp_sec_item);
if (wrap_key_crypt_context == NULL) {
log_err(knet_h, KNET_SUB_NSSCRYPTO, "Unable to create encrypt context (%d): %s",
PR_GetError(), PR_ErrorToString(PR_GetError(), PR_LANGUAGE_I_DEFAULT));
goto exit_res_key;
}
/* the whole padded buffer is encrypted, not just key_item.len bytes */
wrapped_key_len = (int)sizeof(wrapped_key_data);
if (PK11_CipherOp(wrap_key_crypt_context, wrapped_key_data, &wrapped_key_len,
sizeof(wrapped_key_data), key_item.data, sizeof(pad_key_data)) != SECSuccess) {
log_err(knet_h, KNET_SUB_NSSCRYPTO, "Unable to encrypt authkey (%d): %s",
PR_GetError(), PR_ErrorToString(PR_GetError(), PR_LANGUAGE_I_DEFAULT));
goto exit_res_key;
}
if (PK11_Finalize(wrap_key_crypt_context) != SECSuccess) {
log_err(knet_h, KNET_SUB_NSSCRYPTO, "Unable to finalize encryption of authkey (%d): %s",
PR_GetError(), PR_ErrorToString(PR_GetError(), PR_LANGUAGE_I_DEFAULT));
goto exit_res_key;
}
/*
* Finally unwrap sym key
*/
memset(&tmp_sec_item, 0, sizeof(tmp_sec_item));
/* only data and len of wrapped_key are filled in here */
wrapped_key.data = wrapped_key_data;
wrapped_key.len = wrapped_key_len;
res_key = PK11_UnwrapSymKey(wrap_key, wrap_mechanism, &tmp_sec_item, &wrapped_key,
cipher, operation, key_item.len);
if (res_key == NULL) {
log_err(knet_h, KNET_SUB_NSSCRYPTO, "Failure to import key into NSS (%d): %s",
PR_GetError(), PR_ErrorToString(PR_GetError(), PR_LANGUAGE_I_DEFAULT));
if (PR_GetError() == SEC_ERROR_BAD_DATA) {
/*
* Maximum key length for FIPS enabled softtoken is limited to
* MAX_KEY_LEN (pkcs11i.h - 256) and checked in NSC_UnwrapKey. Returned
* error is CKR_TEMPLATE_INCONSISTENT which is mapped to SEC_ERROR_BAD_DATA.
*/
log_err(knet_h, KNET_SUB_NSSCRYPTO, "Secret key is probably too long. "
"Try reduce it to 256 bytes");
}
goto exit_res_key;
}
/* shared exit for success and failure: release the temporaries, return res_key */
exit_res_key:
if (wrap_key_crypt_context != NULL) {
PK11_DestroyContext(wrap_key_crypt_context, PR_TRUE);
}
if (wrap_key != NULL) {
PK11_FreeSymKey(wrap_key);
}
if (slot != NULL) {
PK11_FreeSlot(slot);
}
return (res_key);
}
/*
 * Import the encryption key for the handle's NSS instance.
 *
 * Nothing is imported when the configured cipher type maps to mechanism
 * 0 in cipher_to_nss. Returns 0 on success (including the "nothing to
 * import" case), or -1 with errno = ENXIO when NSS fails to import the
 * key.
 */
static int init_nss_crypto(knet_handle_t knet_h)
{
	struct nsscrypto_instance *instance = knet_h->crypto_instance->model_instance;

	if (cipher_to_nss[instance->crypto_cipher_type] != 0) {
		instance->nss_sym_key = nssimport_symmetric_key(knet_h, SYM_KEY_TYPE_CRYPT);
		if (instance->nss_sym_key == NULL) {
			errno = ENXIO; /* NSS reported error */
			return -1;
		}
	}

	return 0;
}
/*
 * Encrypt the data described by iov/iovcnt into buf_out.
 *
 * Output layout: SALT_SIZE bytes of freshly generated random salt (also used
 * as the IV parameter of the cipher), immediately followed by the ciphertext
 * of all iov segments processed in order as one continuous stream.
 *
 * On success *buf_out_len is set to the total number of bytes written
 * (salt + ciphertext) and 0 is returned. On any NSS failure the error is
 * logged and -1 is returned; *buf_out_len is left untouched.
 */
static int encrypt_nss(
	knet_handle_t knet_h,
	const struct iovec *iov,
	int iovcnt,
	unsigned char *buf_out,
	ssize_t *buf_out_len)
{
	struct nsscrypto_instance *instance = knet_h->crypto_instance->model_instance;
	PK11Context*	crypt_context = NULL;
	SECItem		crypt_param;
	SECItem		*nss_sec_param = NULL;
	int		tmp_outlen = 0, tmp1_outlen = 0;
	unsigned int	tmp2_outlen = 0;
	unsigned char	*salt = buf_out;
	unsigned char	*data = buf_out + SALT_SIZE;
	int		err = -1;
	int		i;

	if (PK11_GenerateRandom(salt, SALT_SIZE) != SECSuccess) {
		log_err(knet_h, KNET_SUB_NSSCRYPTO, "Failure to generate a random number (err %d): %s",
			PR_GetError(), PR_ErrorToString(PR_GetError(), PR_LANGUAGE_I_DEFAULT));
		goto out;
	}

	crypt_param.type = siBuffer;
	crypt_param.data = salt;
	crypt_param.len = SALT_SIZE;

	nss_sec_param = PK11_ParamFromIV(cipher_to_nss[instance->crypto_cipher_type],
					 &crypt_param);
	if (nss_sec_param == NULL) {
		log_err(knet_h, KNET_SUB_NSSCRYPTO, "Failure to set up PKCS11 param (err %d): %s",
			PR_GetError(), PR_ErrorToString(PR_GetError(), PR_LANGUAGE_I_DEFAULT));
		goto out;
	}

	/*
	 * Create cipher context for encryption
	 */
	crypt_context = PK11_CreateContextBySymKey(cipher_to_nss[instance->crypto_cipher_type],
						   CKA_ENCRYPT,
						   instance->nss_sym_key,
						   nss_sec_param);
	if (!crypt_context) {
		log_err(knet_h, KNET_SUB_NSSCRYPTO, "PK11_CreateContext failed (encrypt) crypt_type=%d (err %d): %s",
			(int)cipher_to_nss[instance->crypto_cipher_type],
			PR_GetError(), PR_ErrorToString(PR_GetError(), PR_LANGUAGE_I_DEFAULT));
		goto out;
	}

	for (i=0; i<iovcnt; i++) {
		/*
		 * Append after the ciphertext produced by the previous segments
		 * and shrink the available space accordingly. Writing every
		 * segment at offset 0 would overwrite earlier output whenever
		 * iovcnt > 1, while the length bookkeeping keeps growing.
		 */
		if (PK11_CipherOp(crypt_context, data + tmp1_outlen,
				  &tmp_outlen,
				  KNET_DATABUFSIZE_CRYPT - tmp1_outlen,
				  (unsigned char *)iov[i].iov_base,
				  iov[i].iov_len) != SECSuccess) {
			log_err(knet_h, KNET_SUB_NSSCRYPTO, "PK11_CipherOp failed (encrypt) crypt_type=%d (err %d): %s",
				(int)cipher_to_nss[instance->crypto_cipher_type],
				PR_GetError(), PR_ErrorToString(PR_GetError(), PR_LANGUAGE_I_DEFAULT));
			goto out;
		}
		tmp1_outlen = tmp1_outlen + tmp_outlen;
	}

	/* flush whatever the cipher still holds after the last segment */
	if (PK11_DigestFinal(crypt_context, data + tmp1_outlen,
			     &tmp2_outlen, KNET_DATABUFSIZE_CRYPT - tmp1_outlen) != SECSuccess) {
		log_err(knet_h, KNET_SUB_NSSCRYPTO, "PK11_DigestFinal failed (encrypt) crypt_type=%d (err %d): %s",
			(int)cipher_to_nss[instance->crypto_cipher_type],
			PR_GetError(), PR_ErrorToString(PR_GetError(), PR_LANGUAGE_I_DEFAULT));
		goto out;
	}

	*buf_out_len = tmp1_outlen + tmp2_outlen + SALT_SIZE;

	err = 0;

out:
	if (crypt_context) {
		PK11_DestroyContext(crypt_context, PR_TRUE);
	}
	if (nss_sec_param) {
		SECITEM_FreeItem(nss_sec_param, PR_TRUE);
	}
	return err;
}
/*
 * Decrypt a packet produced by encrypt_nss().
 *
 * buf_in starts with SALT_SIZE bytes of salt (passed to NSS as the context
 * parameter), followed by the ciphertext. The plaintext is written to
 * buf_out and its length stored in *buf_out_len.
 *
 * Returns 0 on success, -1 (after logging) when the packet is too short or
 * any NSS call fails.
 */
static int decrypt_nss (
	knet_handle_t knet_h,
	const unsigned char *buf_in,
	const ssize_t buf_in_len,
	unsigned char *buf_out,
	ssize_t *buf_out_len)
{
	struct nsscrypto_instance *nss = knet_h->crypto_instance->model_instance;
	unsigned char *iv = (unsigned char *)buf_in;
	unsigned char *ciphertext = iv + SALT_SIZE;
	int ciphertext_len = buf_in_len - SALT_SIZE;
	PK11Context *ctx = NULL;
	SECItem iv_item;
	int op_len = 0;
	unsigned int final_len = 0;
	int rv = -1;

	/* there must be at least one byte of payload after the salt */
	if (ciphertext_len <= 0) {
		log_err(knet_h, KNET_SUB_NSSCRYPTO, "Packet is too short");
		return rv;
	}

	/* Create cipher context for decryption */
	iv_item.type = siBuffer;
	iv_item.data = iv;
	iv_item.len = SALT_SIZE;

	ctx = PK11_CreateContextBySymKey(cipher_to_nss[nss->crypto_cipher_type],
					 CKA_DECRYPT,
					 nss->nss_sym_key, &iv_item);
	if (!ctx) {
		log_err(knet_h, KNET_SUB_NSSCRYPTO, "PK11_CreateContext (decrypt) failed (err %d): %s",
			PR_GetError(), PR_ErrorToString(PR_GetError(), PR_LANGUAGE_I_DEFAULT));
		return rv;
	}

	if (PK11_CipherOp(ctx, buf_out, &op_len,
			  KNET_DATABUFSIZE_CRYPT, ciphertext, ciphertext_len) != SECSuccess) {
		log_err(knet_h, KNET_SUB_NSSCRYPTO, "PK11_CipherOp (decrypt) failed (err %d): %s",
			PR_GetError(), PR_ErrorToString(PR_GetError(), PR_LANGUAGE_I_DEFAULT));
	} else if (PK11_DigestFinal(ctx, buf_out + op_len, &final_len,
				    KNET_DATABUFSIZE_CRYPT - op_len) != SECSuccess) {
		log_err(knet_h, KNET_SUB_NSSCRYPTO, "PK11_DigestFinal (decrypt) failed (err %d): %s",
			PR_GetError(), PR_ErrorToString(PR_GetError(), PR_LANGUAGE_I_DEFAULT));
	} else {
		*buf_out_len = op_len + final_len;
		rv = 0;
	}

	PK11_DestroyContext(ctx, PR_TRUE);
	return rv;
}
/*
* hash/hmac/digest functions
*/
/*
 * Translate a hash name ("none", "md5", "sha1", "sha256", "sha384",
 * "sha512") into the matching CRYPTO_HASH_TYPE_* value.
 *
 * Returns -1 when the name is not recognised. The comparison is
 * case-sensitive.
 */
static int nssstring_to_crypto_hash_type(const char* crypto_hash_type)
{
	const struct {
		const char *name;
		int type;
	} known_hashes[] = {
		{ "none",   CRYPTO_HASH_TYPE_NONE },
		{ "md5",    CRYPTO_HASH_TYPE_MD5 },
		{ "sha1",   CRYPTO_HASH_TYPE_SHA1 },
		{ "sha256", CRYPTO_HASH_TYPE_SHA256 },
		{ "sha384", CRYPTO_HASH_TYPE_SHA384 },
		{ "sha512", CRYPTO_HASH_TYPE_SHA512 },
	};
	size_t idx;

	for (idx = 0; idx < sizeof(known_hashes) / sizeof(known_hashes[0]); idx++) {
		if (strcmp(crypto_hash_type, known_hashes[idx].name) == 0) {
			return known_hashes[idx].type;
		}
	}

	return -1;
}
/*
 * Import the symmetric key used for signing (hash/HMAC).
 *
 * Nothing is imported when the configured hash type has no entry in
 * hash_to_nss[]. Returns 0 on success (or when there is nothing to do),
 * -1 with errno set to ENXIO when the key import fails.
 */
static int init_nss_hash(knet_handle_t knet_h)
{
	struct nsscrypto_instance *nss = knet_h->crypto_instance->model_instance;

	if (hash_to_nss[nss->crypto_hash_type]) {
		nss->nss_sym_key_sign = nssimport_symmetric_key(knet_h, SYM_KEY_TYPE_HASH);
		if (!nss->nss_sym_key_sign) {
			errno = ENXIO; /* NSS reported error */
			return -1;
		}
	}

	return 0;
}
/*
 * Compute the signature of buf (buf_len bytes) with the signing key and
 * write it to hash.
 *
 * The output is bounded by nsshash_len[] for the configured hash type, so
 * the caller must provide at least that many bytes at hash.
 *
 * Returns 0 on success, -1 (after logging) when any NSS call fails.
 */
static int calculate_nss_hash(
	knet_handle_t knet_h,
	const unsigned char *buf,
	const size_t buf_len,
	unsigned char *hash)
{
	struct nsscrypto_instance *nss = knet_h->crypto_instance->model_instance;
	PK11Context *ctx = NULL;
	SECItem no_param;
	unsigned int written = 0;
	int rv = -1;

	/* Now do the digest */
	no_param.type = siBuffer;
	no_param.data = 0;
	no_param.len = 0;

	ctx = PK11_CreateContextBySymKey(hash_to_nss[nss->crypto_hash_type],
					 CKA_SIGN,
					 nss->nss_sym_key_sign,
					 &no_param);
	if (!ctx) {
		log_err(knet_h, KNET_SUB_NSSCRYPTO, "PK11_CreateContext failed (hash) hash_type=%d (err %d): %s",
			(int)hash_to_nss[nss->crypto_hash_type],
			PR_GetError(), PR_ErrorToString(PR_GetError(), PR_LANGUAGE_I_DEFAULT));
		return rv;
	}

	if (PK11_DigestBegin(ctx) != SECSuccess) {
		log_err(knet_h, KNET_SUB_NSSCRYPTO, "PK11_DigestBegin failed (hash) hash_type=%d (err %d): %s",
			(int)hash_to_nss[nss->crypto_hash_type],
			PR_GetError(), PR_ErrorToString(PR_GetError(), PR_LANGUAGE_I_DEFAULT));
	} else if (PK11_DigestOp(ctx, buf, buf_len) != SECSuccess) {
		log_err(knet_h, KNET_SUB_NSSCRYPTO, "PK11_DigestOp failed (hash) hash_type=%d (err %d): %s",
			(int)hash_to_nss[nss->crypto_hash_type],
			PR_GetError(), PR_ErrorToString(PR_GetError(), PR_LANGUAGE_I_DEFAULT));
	} else if (PK11_DigestFinal(ctx, hash,
				    &written, nsshash_len[nss->crypto_hash_type]) != SECSuccess) {
		log_err(knet_h, KNET_SUB_NSSCRYPTO, "PK11_DigestFinale failed (hash) hash_type=%d (err %d): %s",
			(int)hash_to_nss[nss->crypto_hash_type],
			PR_GetError(), PR_ErrorToString(PR_GetError(), PR_LANGUAGE_I_DEFAULT));
	} else {
		rv = 0;
	}

	PK11_DestroyContext(ctx, PR_TRUE);
	return rv;
}
/*
* global/glue nss functions
*/
/*
 * One-stop NSS setup for a handle.
 *
 * Registers nss_atexit_handler once per process, initialises NSS without a
 * database once (tracked by nss_db_is_init), then imports the encryption
 * and signing keys.
 *
 * Returns 0 on success, -1 on failure. errno is EAGAIN for atexit/NSS
 * initialisation failures; key import failures keep the errno set by
 * init_nss_crypto()/init_nss_hash().
 */
static int init_nss(knet_handle_t knet_h)
{
	static int at_exit_registered = 0;

	if (!at_exit_registered) {
		if (atexit(nss_atexit_handler) != 0) {
			log_err(knet_h, KNET_SUB_NSSCRYPTO, "Unable to register NSS atexit handler");
			errno = EAGAIN;
			return -1;
		}
		at_exit_registered = 1;
	}

	if (!nss_db_is_init) {
		if (NSS_NoDB_Init(NULL) != SECSuccess) {
			log_err(knet_h, KNET_SUB_NSSCRYPTO, "NSS DB initialization failed (err %d): %s",
				PR_GetError(), PR_ErrorToString(PR_GetError(), PR_LANGUAGE_I_DEFAULT));
			errno = EAGAIN;
			return -1;
		}
		nss_db_is_init = 1;
	}

	/* short-circuit keeps the original order: crypto key first, then hash key */
	if ((init_nss_crypto(knet_h) < 0) || (init_nss_hash(knet_h) < 0)) {
		return -1;
	}

	return 0;
}
/*
* exported API
*/
/*
 * Encrypt (when a cipher is configured) and sign (when a hash is
 * configured) the scattered input iov_in[0..iovcnt_in-1] into buf_out.
 *
 * Without a cipher the segments are copied back to back into buf_out.
 * With a hash, the signature of everything written so far is appended.
 * *buf_out_len receives the total number of bytes written.
 *
 * Returns 0 on success, -1 when encryption or hashing fails.
 */
static int nsscrypto_encrypt_and_signv (
	knet_handle_t knet_h,
	const struct iovec *iov_in,
	int iovcnt_in,
	unsigned char *buf_out,
	ssize_t *buf_out_len)
{
	struct nsscrypto_instance *nss = knet_h->crypto_instance->model_instance;

	if (cipher_to_nss[nss->crypto_cipher_type]) {
		if (encrypt_nss(knet_h, iov_in, iovcnt_in, buf_out, buf_out_len) < 0) {
			return -1;
		}
	} else {
		ssize_t copied = 0;
		int seg;

		/* plain copy: concatenate all segments */
		for (seg = 0; seg < iovcnt_in; seg++) {
			memmove(buf_out + copied, iov_in[seg].iov_base, iov_in[seg].iov_len);
			copied = copied + iov_in[seg].iov_len;
		}
		*buf_out_len = copied;
	}

	if (hash_to_nss[nss->crypto_hash_type]) {
		/* the signature lands right after the payload */
		if (calculate_nss_hash(knet_h, buf_out, *buf_out_len, buf_out + *buf_out_len) < 0) {
			return -1;
		}
		*buf_out_len = *buf_out_len + nsshash_len[nss->crypto_hash_type];
	}

	return 0;
}
/*
 * Single-buffer convenience wrapper: wraps buf_in/buf_in_len in a
 * one-element iovec and forwards to nsscrypto_encrypt_and_signv().
 */
static int nsscrypto_encrypt_and_sign (
	knet_handle_t knet_h,
	const unsigned char *buf_in,
	const ssize_t buf_in_len,
	unsigned char *buf_out,
	ssize_t *buf_out_len)
{
	struct iovec single;

	memset(&single, 0, sizeof(single));
	single.iov_len = buf_in_len;
	single.iov_base = (unsigned char *)buf_in;

	return nsscrypto_encrypt_and_signv(knet_h, &single, 1, buf_out, buf_out_len);
}
/*
 * Verify the signature of an incoming packet (when a hash is configured)
 * and decrypt it (when a cipher is configured).
 *
 * With a hash, the last nsshash_len[] bytes of buf_in are the digest of the
 * bytes before them; the digest is recomputed and compared. Without a
 * cipher, the authenticated payload is copied to buf_out unchanged.
 * *buf_out_len receives the number of bytes written to buf_out.
 *
 * Returns 0 on success, -1 when the packet size is invalid, the digest does
 * not match, or hashing/decryption fails.
 */
static int nsscrypto_authenticate_and_decrypt (
	knet_handle_t knet_h,
	const unsigned char *buf_in,
	const ssize_t buf_in_len,
	unsigned char *buf_out,
	ssize_t *buf_out_len)
{
	struct nsscrypto_instance *instance = knet_h->crypto_instance->model_instance;
	ssize_t temp_len = buf_in_len;

	if (hash_to_nss[instance->crypto_hash_type]) {
		unsigned char	tmp_hash[nsshash_len[instance->crypto_hash_type]];
		ssize_t		temp_buf_len = buf_in_len - nsshash_len[instance->crypto_hash_type];
		volatile unsigned char digest_diff = 0;
		size_t		pos;

		if ((temp_buf_len <= 0) || (temp_buf_len > KNET_MAX_PACKET_SIZE)) {
			log_err(knet_h, KNET_SUB_NSSCRYPTO, "Incorrect packet size.");
			return -1;
		}

		if (calculate_nss_hash(knet_h, buf_in, temp_buf_len, tmp_hash) < 0) {
			return -1;
		}

		/*
		 * Compare the digests in constant time: always walk the whole
		 * digest and accumulate differences, so the time taken does not
		 * reveal how many leading bytes of a forged digest were correct
		 * (memcmp stops at the first mismatch).
		 */
		for (pos = 0; pos < sizeof(tmp_hash); pos++) {
			digest_diff |= (unsigned char)(tmp_hash[pos] ^ buf_in[temp_buf_len + pos]);
		}

		if (digest_diff != 0) {
			log_err(knet_h, KNET_SUB_NSSCRYPTO, "Digest does not match");
			return -1;
		}

		/* strip the digest: only the authenticated payload is processed below */
		temp_len = temp_len - nsshash_len[instance->crypto_hash_type];
		*buf_out_len = temp_len;
	}

	if (cipher_to_nss[instance->crypto_cipher_type]) {
		if (decrypt_nss(knet_h, buf_in, temp_len, buf_out, buf_out_len) < 0) {
			return -1;
		}
	} else {
		memmove(buf_out, buf_in, temp_len);
		*buf_out_len = temp_len;
	}

	return 0;
}
/*
 * Release everything owned by the NSS model instance of this handle:
 * both symmetric keys (when present) and the instance itself, then reset
 * the handle's model_instance pointer and sec_header_size.
 *
 * Does nothing when no model instance is attached.
 */
static void nsscrypto_fini(
	knet_handle_t knet_h)
{
	struct nsscrypto_instance *nss = knet_h->crypto_instance->model_instance;

	if (!nss) {
		return;
	}

	if (nss->nss_sym_key) {
		PK11_FreeSymKey(nss->nss_sym_key);
		nss->nss_sym_key = NULL;
	}

	if (nss->nss_sym_key_sign) {
		PK11_FreeSymKey(nss->nss_sym_key_sign);
		nss->nss_sym_key_sign = NULL;
	}

	free(nss);
	knet_h->crypto_instance->model_instance = NULL;
	knet_h->sec_header_size = 0;
}
/*
 * Set up the NSS crypto model for a handle.
 *
 * Allocates the model instance, resolves the cipher and hash names from
 * knet_handle_crypto_cfg, initialises NSS and imports the keys, then
 * computes the security header sizes stored in the handle.
 *
 * Returns 0 on success. On failure returns -1 with errno set; every path
 * after the allocation goes through out_err, which tears the instance down
 * via nsscrypto_fini().
 */
static int nsscrypto_init(
knet_handle_t knet_h,
struct knet_handle_crypto_cfg *knet_handle_crypto_cfg)
{
struct nsscrypto_instance *nsscrypto_instance = NULL;
int savederrno;
log_debug(knet_h, KNET_SUB_NSSCRYPTO,
"Initizializing nss crypto module [%s/%s]",
knet_handle_crypto_cfg->crypto_cipher_type,
knet_handle_crypto_cfg->crypto_hash_type);
knet_h->crypto_instance->model_instance = malloc(sizeof(struct nsscrypto_instance));
if (!knet_h->crypto_instance->model_instance) {
log_err(knet_h, KNET_SUB_NSSCRYPTO, "Unable to allocate memory for nss model instance");
/*
 * diff hunk: this path returns directly without going through out_err,
 * so the new line sets errno itself instead of savederrno.
 */
- savederrno = ENOMEM;
+ errno = ENOMEM;
return -1;
}
nsscrypto_instance = knet_h->crypto_instance->model_instance;
memset(nsscrypto_instance, 0, sizeof(struct nsscrypto_instance));
/* a negative type means the configured name was not recognised */
nsscrypto_instance->crypto_cipher_type = nssstring_to_crypto_cipher_type(knet_handle_crypto_cfg->crypto_cipher_type);
if (nsscrypto_instance->crypto_cipher_type < 0) {
log_err(knet_h, KNET_SUB_NSSCRYPTO, "unknown crypto cipher type requested");
savederrno = ENXIO;
goto out_err;
}
nsscrypto_instance->crypto_hash_type = nssstring_to_crypto_hash_type(knet_handle_crypto_cfg->crypto_hash_type);
if (nsscrypto_instance->crypto_hash_type < 0) {
log_err(knet_h, KNET_SUB_NSSCRYPTO, "unknown crypto hash type requested");
savederrno = ENXIO;
goto out_err;
}
/* encryption without a hash is rejected */
if ((nsscrypto_instance->crypto_cipher_type > 0) &&
(nsscrypto_instance->crypto_hash_type == 0)) {
log_err(knet_h, KNET_SUB_NSSCRYPTO, "crypto communication requires hash specified");
savederrno = EINVAL;
goto out_err;
}
/* the key pointer is stored, not copied */
nsscrypto_instance->private_key = knet_handle_crypto_cfg->private_key;
nsscrypto_instance->private_key_len = knet_handle_crypto_cfg->private_key_len;
if (init_nss(knet_h) < 0) {
savederrno = errno;
goto out_err;
}
/* accumulate the per-packet overhead: hash length, then cipher padding and salt */
knet_h->sec_header_size = 0;
if (nsscrypto_instance->crypto_hash_type > 0) {
knet_h->sec_header_size += nsshash_len[nsscrypto_instance->crypto_hash_type];
knet_h->sec_hash_size = nsshash_len[nsscrypto_instance->crypto_hash_type];
}
if (nsscrypto_instance->crypto_cipher_type > 0) {
int block_size;
/* prefer the block length from the table; ask NSS only when the table has 0 */
if (nsscypher_block_len[nsscrypto_instance->crypto_cipher_type]) {
block_size = nsscypher_block_len[nsscrypto_instance->crypto_cipher_type];
} else {
/*
 * NOTE(review): crypto_cipher_type is passed here directly, while the
 * other NSS calls in this file pass cipher_to_nss[crypto_cipher_type].
 * Confirm whether the NSS mechanism value was intended.
 */
block_size = PK11_GetBlockSize(nsscrypto_instance->crypto_cipher_type, NULL);
if (block_size < 0) {
savederrno = ENXIO;
goto out_err;
}
}
knet_h->sec_header_size += (block_size * 2);
knet_h->sec_header_size += SALT_SIZE;
knet_h->sec_salt_size = SALT_SIZE;
knet_h->sec_block_size = block_size;
}
return 0;
out_err:
/* release the instance (and any imported keys), then report the saved error */
nsscrypto_fini(knet_h);
errno = savederrno;
return -1;
}
/*
 * Operations table exported by this crypto module: the ABI version followed
 * by the init, fini, encrypt-and-sign (single buffer and iovec variants)
 * and authenticate-and-decrypt entry points defined in this file.
 * The initializer is positional, so the order must match crypto_ops_t.
 */
crypto_ops_t crypto_model = {
KNET_CRYPTO_MODEL_ABI,
nsscrypto_init,
nsscrypto_fini,
nsscrypto_encrypt_and_sign,
nsscrypto_encrypt_and_signv,
nsscrypto_authenticate_and_decrypt
};
diff --git a/libknet/handle.c b/libknet/handle.c
index 268d6106..fd26beab 100644
--- a/libknet/handle.c
+++ b/libknet/handle.c
@@ -1,1652 +1,1651 @@
/*
* Copyright (C) 2010-2019 Red Hat, Inc. All rights reserved.
*
* Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
* Federico Simoncelli <fsimon@kronosnet.org>
*
* This software licensed under GPL-2.0+, LGPL-2.0+
*/
#include "config.h"
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include <pthread.h>
#include <sys/uio.h>
#include <math.h>
#include <sys/time.h>
#include <sys/resource.h>
#include "internals.h"
#include "crypto.h"
#include "links.h"
#include "compress.h"
#include "compat.h"
#include "common.h"
#include "threads_common.h"
#include "threads_heartbeat.h"
#include "threads_pmtud.h"
#include "threads_dsthandler.h"
#include "threads_rx.h"
#include "threads_tx.h"
#include "transports.h"
#include "transport_common.h"
#include "logging.h"
/*
 * Held by knet_handle_new_ex() and knet_handle_free() while they update
 * knet_ref and set up / tear down the shared library tracker.
 */
static pthread_mutex_t handle_config_mutex = PTHREAD_MUTEX_INITIALIZER;
/* process-wide rwlock, initialised by _init_shlib_tracker() (not static) */
pthread_rwlock_t shlib_rwlock;
/* non-zero once shlib_rwlock has been initialised */
static uint8_t shlib_wrlock_init = 0;
/* incremented in knet_handle_new_ex(), decremented in knet_handle_free() */
static uint32_t knet_ref = 0;
/*
 * Initialise the process-wide shlib_rwlock the first time it is needed.
 *
 * Returns 0 when the lock is (already) initialised, -1 with errno set to
 * the pthread_rwlock_init() error otherwise.
 */
static int _init_shlib_tracker(knet_handle_t knet_h)
{
	int rc;

	if (shlib_wrlock_init) {
		return 0;
	}

	rc = pthread_rwlock_init(&shlib_rwlock, NULL);
	if (rc != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize shared lib rwlock: %s",
			strerror(rc));
		errno = rc;
		return -1;
	}

	shlib_wrlock_init = 1;
	return 0;
}
/*
 * Destroy shlib_rwlock once knet_ref has dropped to zero;
 * otherwise leave it in place.
 */
static void _fini_shlib_tracker(void)
{
	if (knet_ref != 0) {
		return;
	}

	pthread_rwlock_destroy(&shlib_rwlock);
	shlib_wrlock_init = 0;
}
/*
 * Initialise every lock and condition variable embedded in the handle.
 *
 * Stops at the first failure: the error is logged, errno is set to the
 * pthread error code and -1 is returned. Locks initialised before the
 * failing one are not destroyed here. Returns 0 when all succeed.
 */
static int _init_locks(knet_handle_t knet_h)
{
	int rc;

	rc = pthread_rwlock_init(&knet_h->global_rwlock, NULL);
	if (rc != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize list rwlock: %s",
			strerror(rc));
		errno = rc;
		return -1;
	}

	rc = pthread_mutex_init(&knet_h->threads_status_mutex, NULL);
	if (rc != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize threads status mutex: %s",
			strerror(rc));
		errno = rc;
		return -1;
	}

	rc = pthread_mutex_init(&knet_h->pmtud_mutex, NULL);
	if (rc != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize pmtud mutex: %s",
			strerror(rc));
		errno = rc;
		return -1;
	}

	rc = pthread_mutex_init(&knet_h->kmtu_mutex, NULL);
	if (rc != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize kernel_mtu mutex: %s",
			strerror(rc));
		errno = rc;
		return -1;
	}

	rc = pthread_cond_init(&knet_h->pmtud_cond, NULL);
	if (rc != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize pmtud conditional mutex: %s",
			strerror(rc));
		errno = rc;
		return -1;
	}

	rc = pthread_mutex_init(&knet_h->hb_mutex, NULL);
	if (rc != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize hb_thread mutex: %s",
			strerror(rc));
		errno = rc;
		return -1;
	}

	rc = pthread_mutex_init(&knet_h->tx_mutex, NULL);
	if (rc != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize tx_thread mutex: %s",
			strerror(rc));
		errno = rc;
		return -1;
	}

	rc = pthread_mutex_init(&knet_h->backoff_mutex, NULL);
	if (rc != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize pong timeout backoff mutex: %s",
			strerror(rc));
		errno = rc;
		return -1;
	}

	rc = pthread_mutex_init(&knet_h->tx_seq_num_mutex, NULL);
	if (rc != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize tx_seq_num_mutex mutex: %s",
			strerror(rc));
		errno = rc;
		return -1;
	}

	return 0;
}
/*
 * Destroy every lock and condition variable set up by _init_locks().
 * The objects are independent of each other; they are released here in
 * the same order in which _init_locks() creates them.
 */
static void _destroy_locks(knet_handle_t knet_h)
{
	pthread_rwlock_destroy(&knet_h->global_rwlock);
	pthread_mutex_destroy(&knet_h->threads_status_mutex);
	pthread_mutex_destroy(&knet_h->pmtud_mutex);
	pthread_mutex_destroy(&knet_h->kmtu_mutex);
	pthread_cond_destroy(&knet_h->pmtud_cond);
	pthread_mutex_destroy(&knet_h->hb_mutex);
	pthread_mutex_destroy(&knet_h->tx_mutex);
	pthread_mutex_destroy(&knet_h->backoff_mutex);
	pthread_mutex_destroy(&knet_h->tx_seq_num_mutex);
}
/*
 * Create the two internal socketpairs of the handle (hostsockfd, then
 * dstsockfd).
 *
 * Returns 0 on success; on failure logs the error and returns -1 with
 * errno preserved from _init_socketpair().
 */
static int _init_socks(knet_handle_t knet_h)
{
	int saved;

	if (_init_socketpair(knet_h, knet_h->hostsockfd)) {
		saved = errno;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize internal hostsockpair: %s",
			strerror(saved));
		errno = saved;
		return -1;
	}

	if (_init_socketpair(knet_h, knet_h->dstsockfd)) {
		saved = errno;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize internal dstsockpair: %s",
			strerror(saved));
		errno = saved;
		return -1;
	}

	return 0;
}
/*
 * Close the internal socketpairs of the handle: dstsockfd first,
 * then hostsockfd.
 */
static void _close_socks(knet_handle_t knet_h)
{
	int *pairs[] = { knet_h->dstsockfd, knet_h->hostsockfd };
	size_t idx;

	for (idx = 0; idx < sizeof(pairs) / sizeof(pairs[0]); idx++) {
		_close_socketpair(knet_h, pairs[idx]);
	}
}
/*
 * Allocate and zero all packet buffers of the handle (plain, crypto and
 * compression variants) and reset the transport fd tracker so that every
 * slot carries KNET_MAX_TRANSPORTS as its transport value.
 *
 * Stops at the first failed allocation: the error is logged and -1 is
 * returned with errno as left by malloc(). Buffers allocated before the
 * failure are not released here. Returns 0 on success.
 */
static int _init_buffers(knet_handle_t knet_h)
{
	int saved;
	int idx;
	size_t len;

	/* per-fragment TX buffers: the more fragments, the smaller each one */
	for (idx = 0; idx < PCKT_FRAG_MAX; idx++) {
		len = ceil((float)KNET_MAX_PACKET_SIZE / (idx + 1)) + KNET_HEADER_ALL_SIZE;
		knet_h->send_to_links_buf[idx] = malloc(len);
		if (!knet_h->send_to_links_buf[idx]) {
			saved = errno;
			log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory datafd to link buffer: %s",
				strerror(saved));
			errno = saved;
			return -1;
		}
		memset(knet_h->send_to_links_buf[idx], 0, len);
	}

	for (idx = 0; idx < PCKT_RX_BUFS; idx++) {
		knet_h->recv_from_links_buf[idx] = malloc(KNET_DATABUFSIZE);
		if (!knet_h->recv_from_links_buf[idx]) {
			saved = errno;
			log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for link to datafd buffer: %s",
				strerror(saved));
			errno = saved;
			return -1;
		}
		memset(knet_h->recv_from_links_buf[idx], 0, KNET_DATABUFSIZE);
	}

	knet_h->recv_from_sock_buf = malloc(KNET_DATABUFSIZE);
	if (!knet_h->recv_from_sock_buf) {
		saved = errno;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for app to datafd buffer: %s",
			strerror(saved));
		errno = saved;
		return -1;
	}
	memset(knet_h->recv_from_sock_buf, 0, KNET_DATABUFSIZE);

	knet_h->pingbuf = malloc(KNET_HEADER_PING_SIZE);
	if (!knet_h->pingbuf) {
		saved = errno;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for hearbeat buffer: %s",
			strerror(saved));
		errno = saved;
		return -1;
	}
	memset(knet_h->pingbuf, 0, KNET_HEADER_PING_SIZE);

	knet_h->pmtudbuf = malloc(KNET_PMTUD_SIZE_V6);
	if (!knet_h->pmtudbuf) {
		saved = errno;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for pmtud buffer: %s",
			strerror(saved));
		errno = saved;
		return -1;
	}
	memset(knet_h->pmtudbuf, 0, KNET_PMTUD_SIZE_V6);

	/* crypto TX buffers: same sizing as above plus the crypto padding */
	for (idx = 0; idx < PCKT_FRAG_MAX; idx++) {
		len = ceil((float)KNET_MAX_PACKET_SIZE / (idx + 1)) + KNET_HEADER_ALL_SIZE + KNET_DATABUFSIZE_CRYPT_PAD;
		knet_h->send_to_links_buf_crypt[idx] = malloc(len);
		if (!knet_h->send_to_links_buf_crypt[idx]) {
			saved = errno;
			log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for crypto datafd to link buffer: %s",
				strerror(saved));
			errno = saved;
			return -1;
		}
		memset(knet_h->send_to_links_buf_crypt[idx], 0, len);
	}

	knet_h->recv_from_links_buf_decrypt = malloc(KNET_DATABUFSIZE_CRYPT);
	if (!knet_h->recv_from_links_buf_decrypt) {
		saved = errno;
		log_err(knet_h, KNET_SUB_CRYPTO, "Unable to allocate memory for crypto link to datafd buffer: %s",
			strerror(saved));
		errno = saved;
		return -1;
	}
	memset(knet_h->recv_from_links_buf_decrypt, 0, KNET_DATABUFSIZE_CRYPT);

	knet_h->recv_from_links_buf_crypt = malloc(KNET_DATABUFSIZE_CRYPT);
	if (!knet_h->recv_from_links_buf_crypt) {
		saved = errno;
		log_err(knet_h, KNET_SUB_CRYPTO, "Unable to allocate memory for crypto link to datafd buffer: %s",
			strerror(saved));
		errno = saved;
		return -1;
	}
	memset(knet_h->recv_from_links_buf_crypt, 0, KNET_DATABUFSIZE_CRYPT);

	knet_h->pingbuf_crypt = malloc(KNET_DATABUFSIZE_CRYPT);
	if (!knet_h->pingbuf_crypt) {
		saved = errno;
		log_err(knet_h, KNET_SUB_CRYPTO, "Unable to allocate memory for crypto hearbeat buffer: %s",
			strerror(saved));
		errno = saved;
		return -1;
	}
	memset(knet_h->pingbuf_crypt, 0, KNET_DATABUFSIZE_CRYPT);

	knet_h->pmtudbuf_crypt = malloc(KNET_DATABUFSIZE_CRYPT);
	if (!knet_h->pmtudbuf_crypt) {
		saved = errno;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for crypto pmtud buffer: %s",
			strerror(saved));
		errno = saved;
		return -1;
	}
	memset(knet_h->pmtudbuf_crypt, 0, KNET_DATABUFSIZE_CRYPT);

	knet_h->recv_from_links_buf_decompress = malloc(KNET_DATABUFSIZE_COMPRESS);
	if (!knet_h->recv_from_links_buf_decompress) {
		saved = errno;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for decompress buffer: %s",
			strerror(saved));
		errno = saved;
		return -1;
	}
	memset(knet_h->recv_from_links_buf_decompress, 0, KNET_DATABUFSIZE_COMPRESS);

	knet_h->send_to_links_buf_compress = malloc(KNET_DATABUFSIZE_COMPRESS);
	if (!knet_h->send_to_links_buf_compress) {
		saved = errno;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for compress buffer: %s",
			strerror(saved));
		errno = saved;
		return -1;
	}
	memset(knet_h->send_to_links_buf_compress, 0, KNET_DATABUFSIZE_COMPRESS);

	/* every tracker slot starts out with KNET_MAX_TRANSPORTS as its transport */
	memset(knet_h->knet_transport_fd_tracker, 0, sizeof(knet_h->knet_transport_fd_tracker));
	for (idx = 0; idx < KNET_MAX_FDS; idx++) {
		knet_h->knet_transport_fd_tracker[idx].transport = KNET_MAX_TRANSPORTS;
	}

	return 0;
}
/*
 * Free every buffer allocated by _init_buffers(). Pointers that were never
 * allocated are NULL in a zeroed handle and free(NULL) is a no-op.
 */
static void _destroy_buffers(knet_handle_t knet_h)
{
	int idx;

	for (idx = 0; idx < PCKT_FRAG_MAX; idx++) {
		free(knet_h->send_to_links_buf[idx]);
	}
	for (idx = 0; idx < PCKT_FRAG_MAX; idx++) {
		free(knet_h->send_to_links_buf_crypt[idx]);
	}
	for (idx = 0; idx < PCKT_RX_BUFS; idx++) {
		free(knet_h->recv_from_links_buf[idx]);
	}

	/* compression buffers */
	free(knet_h->recv_from_links_buf_decompress);
	free(knet_h->send_to_links_buf_compress);

	/* application / crypto RX buffers */
	free(knet_h->recv_from_sock_buf);
	free(knet_h->recv_from_links_buf_decrypt);
	free(knet_h->recv_from_links_buf_crypt);

	/* heartbeat and pmtud buffers */
	free(knet_h->pingbuf);
	free(knet_h->pingbuf_crypt);
	free(knet_h->pmtudbuf);
	free(knet_h->pmtudbuf_crypt);
}
/*
 * Create the three epoll instances of the handle, mark them close-on-exec
 * and register the internal sockets: hostsockfd[0] on the send_to_links
 * epoll and dstsockfd[0] on the dst_link_handler epoll.
 *
 * Stops at the first failure: the error is logged and -1 is returned with
 * errno preserved. Descriptors created before the failure are not closed
 * here. Returns 0 on success.
 */
static int _init_epolls(knet_handle_t knet_h)
{
	struct epoll_event event;
	int saved;

	/*
	 * even if the kernel does dynamic allocation with epoll_ctl
	 * we need to reserve one extra for host to host communication
	 */
	knet_h->send_to_links_epollfd = epoll_create(KNET_EPOLL_MAX_EVENTS + 1);
	if (knet_h->send_to_links_epollfd < 0) {
		saved = errno;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to create epoll datafd to link fd: %s",
			strerror(saved));
		errno = saved;
		return -1;
	}

	knet_h->recv_from_links_epollfd = epoll_create(KNET_EPOLL_MAX_EVENTS);
	if (knet_h->recv_from_links_epollfd < 0) {
		saved = errno;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to create epoll link to datafd fd: %s",
			strerror(saved));
		errno = saved;
		return -1;
	}

	knet_h->dst_link_handler_epollfd = epoll_create(KNET_EPOLL_MAX_EVENTS);
	if (knet_h->dst_link_handler_epollfd < 0) {
		saved = errno;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to create epoll dst cache fd: %s",
			strerror(saved));
		errno = saved;
		return -1;
	}

	if (_fdset_cloexec(knet_h->send_to_links_epollfd)) {
		saved = errno;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to set CLOEXEC on datafd to link epoll fd: %s",
			strerror(saved));
		errno = saved;
		return -1;
	}

	if (_fdset_cloexec(knet_h->recv_from_links_epollfd)) {
		saved = errno;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to set CLOEXEC on link to datafd epoll fd: %s",
			strerror(saved));
		errno = saved;
		return -1;
	}

	if (_fdset_cloexec(knet_h->dst_link_handler_epollfd)) {
		saved = errno;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to set CLOEXEC on dst cache epoll fd: %s",
			strerror(saved));
		errno = saved;
		return -1;
	}

	/* watch the host socketpair for readable data */
	memset(&event, 0, sizeof(struct epoll_event));
	event.events = EPOLLIN;
	event.data.fd = knet_h->hostsockfd[0];
	if (epoll_ctl(knet_h->send_to_links_epollfd,
		      EPOLL_CTL_ADD, knet_h->hostsockfd[0], &event)) {
		saved = errno;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to add hostsockfd[0] to epoll pool: %s",
			strerror(saved));
		errno = saved;
		return -1;
	}

	/* watch the dst socketpair for readable data */
	memset(&event, 0, sizeof(struct epoll_event));
	event.events = EPOLLIN;
	event.data.fd = knet_h->dstsockfd[0];
	if (epoll_ctl(knet_h->dst_link_handler_epollfd,
		      EPOLL_CTL_ADD, knet_h->dstsockfd[0], &event)) {
		saved = errno;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to add dstsockfd[0] to epoll pool: %s",
			strerror(saved));
		errno = saved;
		return -1;
	}

	return 0;
}
/*
 * Undo _init_epolls(): unregister every in-use data fd (closing its
 * socketpair when the tracked descriptor is non-zero), unregister the
 * internal sockets and close the three epoll descriptors.
 * Return values of epoll_ctl()/close() are deliberately not checked.
 */
static void _close_epolls(knet_handle_t knet_h)
{
	struct epoll_event event;
	int chan;

	memset(&event, 0, sizeof(struct epoll_event));

	for (chan = 0; chan < KNET_DATAFD_MAX; chan++) {
		if (!knet_h->sockfd[chan].in_use) {
			continue;
		}

		/* is_created selects which end of the pair is registered */
		epoll_ctl(knet_h->send_to_links_epollfd, EPOLL_CTL_DEL,
			  knet_h->sockfd[chan].sockfd[knet_h->sockfd[chan].is_created], &event);
		if (knet_h->sockfd[chan].sockfd[knet_h->sockfd[chan].is_created]) {
			_close_socketpair(knet_h, knet_h->sockfd[chan].sockfd);
		}
	}

	epoll_ctl(knet_h->send_to_links_epollfd, EPOLL_CTL_DEL, knet_h->hostsockfd[0], &event);
	epoll_ctl(knet_h->dst_link_handler_epollfd, EPOLL_CTL_DEL, knet_h->dstsockfd[0], &event);

	close(knet_h->send_to_links_epollfd);
	close(knet_h->recv_from_links_epollfd);
	close(knet_h->dst_link_handler_epollfd);
}
/*
 * Start the internal worker threads of the handle, in this order:
 * pmtud, dst link handler, TX, RX, heartbeat. Each thread is marked
 * KNET_THREAD_REGISTERED just before it is created.
 *
 * Stops at the first pthread_create() failure: the error is logged, errno
 * is set to the pthread error code and -1 is returned. Threads already
 * started are left running. Returns 0 when all threads were created.
 */
static int _start_threads(knet_handle_t knet_h)
{
	int rc;

	set_thread_status(knet_h, KNET_THREAD_PMTUD, KNET_THREAD_REGISTERED);
	rc = pthread_create(&knet_h->pmtud_link_handler_thread, 0,
			    _handle_pmtud_link_thread, (void *) knet_h);
	if (rc != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to start pmtud link thread: %s",
			strerror(rc));
		errno = rc;
		return -1;
	}

	set_thread_status(knet_h, KNET_THREAD_DST_LINK, KNET_THREAD_REGISTERED);
	rc = pthread_create(&knet_h->dst_link_handler_thread, 0,
			    _handle_dst_link_handler_thread, (void *) knet_h);
	if (rc != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to start dst cache thread: %s",
			strerror(rc));
		errno = rc;
		return -1;
	}

	set_thread_status(knet_h, KNET_THREAD_TX, KNET_THREAD_REGISTERED);
	rc = pthread_create(&knet_h->send_to_links_thread, 0,
			    _handle_send_to_links_thread, (void *) knet_h);
	if (rc != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to start datafd to link thread: %s",
			strerror(rc));
		errno = rc;
		return -1;
	}

	set_thread_status(knet_h, KNET_THREAD_RX, KNET_THREAD_REGISTERED);
	rc = pthread_create(&knet_h->recv_from_links_thread, 0,
			    _handle_recv_from_links_thread, (void *) knet_h);
	if (rc != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to start link to datafd thread: %s",
			strerror(rc));
		errno = rc;
		return -1;
	}

	set_thread_status(knet_h, KNET_THREAD_HB, KNET_THREAD_REGISTERED);
	rc = pthread_create(&knet_h->heartbt_thread, 0,
			    _handle_heartbt_thread, (void *) knet_h);
	if (rc != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to start heartbeat thread: %s",
			strerror(rc));
		errno = rc;
		return -1;
	}

	return 0;
}
/*
 * Wait until all threads report KNET_THREAD_STOPPED, then cancel and join
 * every thread that was actually created (non-zero thread id), in this
 * order: heartbeat, TX, RX, dst link handler, pmtud.
 */
static void _stop_threads(knet_handle_t knet_h)
{
	void *thread_rv;
	size_t idx;

	wait_all_threads_status(knet_h, KNET_THREAD_STOPPED);

	/* snapshot the ids only after the wait, as they are read at that point */
	pthread_t workers[] = {
		knet_h->heartbt_thread,
		knet_h->send_to_links_thread,
		knet_h->recv_from_links_thread,
		knet_h->dst_link_handler_thread,
		knet_h->pmtud_link_handler_thread,
	};

	for (idx = 0; idx < sizeof(workers) / sizeof(workers[0]); idx++) {
		if (workers[idx]) {
			pthread_cancel(workers[idx]);
			pthread_join(workers[idx], &thread_rv);
		}
	}
}
/*
 * Allocate and fully initialise a new knet handle.
 *
 * host_id           - id of the local node, stored in the handle
 * log_fd            - fd used for logging; must be >= 0 and below the
 *                     RLIMIT_NOFILE hard limit. Log levels are only set
 *                     up when it is > 0.
 * default_log_level - initial level for every subsystem; must not exceed
 *                     KNET_LOG_DEBUG when log_fd is non-zero
 * flags             - KNET_HANDLE_FLAG_* bitmask
 *
 * Initialisation order: global shlib tracker, locks, internal sockets,
 * buffers, compression, epoll fds, transports, threads. The call returns
 * once all threads report KNET_THREAD_STARTED.
 *
 * Returns the new handle with errno set to 0, or NULL with errno set on
 * failure. After the handle has been allocated, failures are cleaned up
 * through knet_handle_free().
 */
knet_handle_t knet_handle_new_ex(knet_node_id_t host_id,
				 int log_fd,
				 uint8_t default_log_level,
				 uint64_t flags)
{
	knet_handle_t knet_h;
	int savederrno = 0;
	struct rlimit cur;

	if (getrlimit(RLIMIT_NOFILE, &cur) < 0) {
		return NULL;
	}

	if ((log_fd < 0) || ((unsigned int)log_fd >= cur.rlim_max)) {
		errno = EINVAL;
		return NULL;
	}

	/*
	 * validate incoming request
	 */
	if ((log_fd) && (default_log_level > KNET_LOG_DEBUG)) {
		errno = EINVAL;
		return NULL;
	}

	/* reject any bit above the highest known flag */
	if (flags > KNET_HANDLE_FLAG_PRIVILEGED * 2 - 1) {
		errno = EINVAL;
		return NULL;
	}

	/*
	 * allocate handle
	 */
	knet_h = malloc(sizeof(struct knet_handle));
	if (!knet_h) {
		errno = ENOMEM;
		return NULL;
	}
	memset(knet_h, 0, sizeof(struct knet_handle));

	/*
	 * setting up some handle data so that we can use logging
	 * also when initializing the library global locks
	 * and trackers
	 */
	knet_h->flags = flags;

	/*
	 * copy config in place
	 */
	knet_h->host_id = host_id;
	knet_h->logfd = log_fd;
	if (knet_h->logfd > 0) {
		memset(&knet_h->log_levels, default_log_level, KNET_MAX_SUBSYSTEMS);
	}

	/*
	 * set pmtud default timers
	 */
	knet_h->pmtud_interval = KNET_PMTUD_DEFAULT_INTERVAL;

	/*
	 * set transports reconnect default timers
	 */
	knet_h->reconnect_int = KNET_TRANSPORT_DEFAULT_RECONNECT_INTERVAL;

	/*
	 * Set 'min' stats to the maximum value so the
	 * first value we get is always less
	 */
	knet_h->stats.tx_compress_time_min = UINT64_MAX;
	knet_h->stats.rx_compress_time_min = UINT64_MAX;
	knet_h->stats.tx_crypt_time_min = UINT64_MAX;
	knet_h->stats.rx_crypt_time_min = UINT64_MAX;

	/*
	 * init global shlib tracker
	 */
	savederrno = pthread_mutex_lock(&handle_config_mutex);
	if (savederrno) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get handle mutex lock: %s",
			strerror(savederrno));
		free(knet_h);
		knet_h = NULL;
		errno = savederrno;
		return NULL;
	}

	knet_ref++;

	if (_init_shlib_tracker(knet_h) < 0) {
		savederrno = errno;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to init handles traceker: %s",
			strerror(savederrno));
		/*
		 * Release the config mutex before bailing out: the exit_fail
		 * path calls knet_handle_free(), which locks the very same
		 * (non-recursive) mutex and would otherwise deadlock.
		 */
		pthread_mutex_unlock(&handle_config_mutex);
		goto exit_fail;
	}

	pthread_mutex_unlock(&handle_config_mutex);

	/*
	 * init main locking structures
	 */
	if (_init_locks(knet_h)) {
		savederrno = errno;
		goto exit_fail;
	}

	/*
	 * init sockets
	 */
	if (_init_socks(knet_h)) {
		savederrno = errno;
		goto exit_fail;
	}

	/*
	 * allocate packet buffers
	 */
	if (_init_buffers(knet_h)) {
		savederrno = errno;
		goto exit_fail;
	}

	if (compress_init(knet_h)) {
		savederrno = errno;
		goto exit_fail;
	}

	/*
	 * create epoll fds
	 */
	if (_init_epolls(knet_h)) {
		savederrno = errno;
		goto exit_fail;
	}

	/*
	 * start transports
	 */
	if (start_all_transports(knet_h)) {
		savederrno = errno;
		goto exit_fail;
	}

	/*
	 * start internal threads
	 */
	if (_start_threads(knet_h)) {
		savederrno = errno;
		goto exit_fail;
	}

	wait_all_threads_status(knet_h, KNET_THREAD_STARTED);

	errno = 0;
	return knet_h;

exit_fail:
	/* knet_handle_free() may change errno, so restore the original cause */
	knet_handle_free(knet_h);
	errno = savederrno;
	return NULL;
}
/*
 * Create a handle with the default flag set (KNET_HANDLE_FLAG_PRIVILEGED).
 * See knet_handle_new_ex() for the meaning of the arguments and the
 * return value.
 */
knet_handle_t knet_handle_new(knet_node_id_t host_id,
			      int log_fd,
			      uint8_t default_log_level)
{
	const uint64_t default_flags = KNET_HANDLE_FLAG_PRIVILEGED;

	return knet_handle_new_ex(host_id, log_fd, default_log_level, default_flags);
}
/*
 * Tear down a handle and release all its resources.
 *
 * Fails with EINVAL for a NULL handle, with the lock error when the global
 * write lock cannot be taken, and with EBUSY while hosts are still
 * configured (host_head != NULL). In those cases -1 is returned and the
 * handle is left intact.
 *
 * Otherwise threads and transports are stopped, epolls, buffers, sockets,
 * crypto, compression and locks are released, the handle is freed and the
 * global reference count is dropped. Returns 0 with errno set to 0.
 */
int knet_handle_free(knet_handle_t knet_h)
{
	int rc;

	if (!knet_h) {
		errno = EINVAL;
		return -1;
	}

	rc = get_global_wrlock(knet_h);
	if (rc != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
			strerror(rc));
		errno = rc;
		return -1;
	}

	if (knet_h->host_head != NULL) {
		rc = EBUSY;
		log_err(knet_h, KNET_SUB_HANDLE,
			"Unable to free handle: host(s) or listener(s) are still active: %s",
			strerror(rc));
		pthread_rwlock_unlock(&knet_h->global_rwlock);
		errno = rc;
		return -1;
	}

	/* flag the shutdown while still holding the write lock */
	knet_h->fini_in_progress = 1;
	pthread_rwlock_unlock(&knet_h->global_rwlock);

	_stop_threads(knet_h);
	stop_all_transports(knet_h);
	_close_epolls(knet_h);
	_destroy_buffers(knet_h);
	_close_socks(knet_h);
	crypto_fini(knet_h);
	compress_fini(knet_h, 1);
	_destroy_locks(knet_h);

	free(knet_h);
	knet_h = NULL;

	/* drop the global reference; the tracker is torn down with the last handle */
	(void)pthread_mutex_lock(&handle_config_mutex);
	knet_ref--;
	_fini_shlib_tracker();
	pthread_mutex_unlock(&handle_config_mutex);

	errno = 0;
	return 0;
}
/*
 * Register the socket notification callback and its private data on the
 * handle. A callback is mandatory: passing NULL is rejected.
 *
 * Returns 0 with errno cleared on success.
 * Returns -1 with errno set to EINVAL when knet_h or sock_notify_fn is
 * NULL, or to the get_global_wrlock() error when the write lock cannot
 * be taken.
 */
int knet_handle_enable_sock_notify(knet_handle_t knet_h,
				   void *sock_notify_fn_private_data,
				   void (*sock_notify_fn) (
						void *private_data,
						int datafd,
						int8_t channel,
						uint8_t tx_rx,
						int error,
						int errorno))
{
	int savederrno = 0;

	if ((!knet_h) || (!sock_notify_fn)) {
		errno = EINVAL;
		return -1;
	}

	savederrno = get_global_wrlock(knet_h);
	if (savederrno) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
			strerror(savederrno));
		errno = savederrno;
		return -1;
	}

	knet_h->sock_notify_fn_private_data = sock_notify_fn_private_data;
	knet_h->sock_notify_fn = sock_notify_fn;
	log_debug(knet_h, KNET_SUB_HANDLE, "sock_notify_fn enabled");

	pthread_rwlock_unlock(&knet_h->global_rwlock);

	/*
	 * clear errno on success, like the other handle setters in this
	 * file, so callers never see a stale value
	 */
	errno = 0;
	return 0;
}
/*
 * Attach a data file descriptor to a channel of the handle.
 *
 * *datafd  > 0: use the caller supplied fd (CLOEXEC and NONBLOCK are set
 *               on it); any other value asks for a socketpair to be
 *               created, and the application end is returned in *datafd.
 * *channel < 0: pick the first channel that is not in use and return it
 *               in *channel; otherwise use the requested channel.
 *
 * *datafd and *channel are only written once the fd has been added to
 * the epoll set, so a failed call never hands back a channel that was
 * not allocated or the number of an fd that has already been closed.
 *
 * Returns 0 with errno cleared on success.
 * Returns -1 with errno set to:
 *   EINVAL - NULL argument, *channel >= KNET_DATAFD_MAX or no sock
 *            notify callback registered
 *   EEXIST - the supplied fd is already registered on a channel
 *   EBUSY  - requested channel in use, or no free channel available
 *   the error from get_global_wrlock(), _fdset_cloexec(),
 *   _fdset_nonblock(), _init_socketpair() or epoll_ctl()
 */
int knet_handle_add_datafd(knet_handle_t knet_h, int *datafd, int8_t *channel)
{
	int err = 0, savederrno = 0;
	int i;
	int8_t chan;		/* channel being set up, published on success only */
	int watched_fd;		/* fd handed to epoll for this channel */
	struct epoll_event ev;

	if (!knet_h) {
		errno = EINVAL;
		return -1;
	}

	if ((datafd == NULL) || (channel == NULL)) {
		errno = EINVAL;
		return -1;
	}

	if (*channel >= KNET_DATAFD_MAX) {
		errno = EINVAL;
		return -1;
	}

	savederrno = get_global_wrlock(knet_h);
	if (savederrno) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
			strerror(savederrno));
		errno = savederrno;
		return -1;
	}

	if (!knet_h->sock_notify_fn) {
		log_err(knet_h, KNET_SUB_HANDLE, "Adding datafd requires sock notify callback enabled!");
		savederrno = EINVAL;
		err = -1;
		goto out_unlock;
	}

	/*
	 * a caller supplied fd can be registered only once
	 */
	if (*datafd > 0) {
		for (i = 0; i < KNET_DATAFD_MAX; i++) {
			if ((knet_h->sockfd[i].in_use) && (knet_h->sockfd[i].sockfd[0] == *datafd)) {
				log_err(knet_h, KNET_SUB_HANDLE, "requested datafd: %d already exist in index: %d", *datafd, i);
				savederrno = EEXIST;
				err = -1;
				goto out_unlock;
			}
		}
	}

	chan = *channel;

	if (chan < 0) {
		/*
		 * auto allocate a channel
		 */
		for (i = 0; i < KNET_DATAFD_MAX; i++) {
			if (!knet_h->sockfd[i].in_use) {
				chan = (int8_t)i;
				break;
			}
		}
		if (chan < 0) {
			savederrno = EBUSY;
			err = -1;
			goto out_unlock;
		}
	} else if (knet_h->sockfd[chan].in_use) {
		savederrno = EBUSY;
		err = -1;
		goto out_unlock;
	}

	knet_h->sockfd[chan].is_created = 0;
	knet_h->sockfd[chan].is_socket = 0;
	knet_h->sockfd[chan].has_error = 0;

	if (*datafd > 0) {
		int sockopt;
		socklen_t sockoptlen = sizeof(sockopt);

		if (_fdset_cloexec(*datafd)) {
			savederrno = errno;
			err = -1;
			log_err(knet_h, KNET_SUB_HANDLE, "Unable to set CLOEXEC on datafd: %s",
				strerror(savederrno));
			goto out_unlock;
		}

		if (_fdset_nonblock(*datafd)) {
			savederrno = errno;
			err = -1;
			log_err(knet_h, KNET_SUB_HANDLE, "Unable to set NONBLOCK on datafd: %s",
				strerror(savederrno));
			goto out_unlock;
		}

		knet_h->sockfd[chan].sockfd[0] = *datafd;
		knet_h->sockfd[chan].sockfd[1] = 0;

		/*
		 * SO_TYPE can only be queried on sockets: success means
		 * the supplied fd is a socket
		 */
		if (!getsockopt(knet_h->sockfd[chan].sockfd[0], SOL_SOCKET, SO_TYPE, &sockopt, &sockoptlen)) {
			knet_h->sockfd[chan].is_socket = 1;
		}
	} else {
		if (_init_socketpair(knet_h, knet_h->sockfd[chan].sockfd)) {
			savederrno = errno;
			err = -1;
			goto out_unlock;
		}
		knet_h->sockfd[chan].is_created = 1;
		knet_h->sockfd[chan].is_socket = 1;
	}

	/*
	 * index 0 for a caller supplied fd, index 1 (our end of the
	 * socketpair) when the pair was created here
	 */
	watched_fd = knet_h->sockfd[chan].sockfd[knet_h->sockfd[chan].is_created];

	memset(&ev, 0, sizeof(struct epoll_event));
	ev.events = EPOLLIN;
	ev.data.fd = watched_fd;

	if (epoll_ctl(knet_h->send_to_links_epollfd,
		      EPOLL_CTL_ADD, watched_fd, &ev)) {
		savederrno = errno;
		err = -1;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to add datafd %d to linkfd epoll pool: %s",
			watched_fd, strerror(savederrno));
		if (knet_h->sockfd[chan].is_created) {
			_close_socketpair(knet_h, knet_h->sockfd[chan].sockfd);
		}
		goto out_unlock;
	}

	knet_h->sockfd[chan].in_use = 1;

	/*
	 * everything worked: publish the results to the caller
	 */
	*channel = chan;
	*datafd = knet_h->sockfd[chan].sockfd[0];

out_unlock:
	pthread_rwlock_unlock(&knet_h->global_rwlock);
	errno = err ? savederrno : 0;
	return err;
}
/*
 * Detach a previously added data fd from the handle.
 *
 * The fd is looked up among the in-use channels by comparing it with
 * sockfd[0]. Unless the channel is flagged has_error, its fd is removed
 * from the send_to_links epoll set first; a socketpair created by knet
 * is closed, and the channel slot is zeroed.
 *
 * Returns 0 with errno cleared on success.
 * Returns -1 with errno set to EINVAL (NULL handle, datafd <= 0 or
 * datafd not found), the get_global_wrlock() error, or the epoll_ctl()
 * error.
 */
int knet_handle_remove_datafd(knet_handle_t knet_h, int datafd)
{
	int rc = 0, saved = 0;
	int slot;
	int found = -1;
	struct knet_sock *sock;
	struct epoll_event ev;

	if (knet_h == NULL) {
		errno = EINVAL;
		return -1;
	}

	if (datafd <= 0) {
		errno = EINVAL;
		return -1;
	}

	saved = get_global_wrlock(knet_h);
	if (saved != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
			strerror(saved));
		errno = saved;
		return -1;
	}

	for (slot = 0; slot < KNET_DATAFD_MAX; slot++) {
		if (!knet_h->sockfd[slot].in_use) {
			continue;
		}
		if (knet_h->sockfd[slot].sockfd[0] != datafd) {
			continue;
		}
		found = slot;
		break;
	}

	if (found < 0) {
		saved = EINVAL;
		rc = -1;
		goto out_unlock;
	}

	sock = &knet_h->sockfd[found];

	/*
	 * channels flagged has_error skip the epoll removal
	 */
	if (!sock->has_error) {
		memset(&ev, 0, sizeof(struct epoll_event));

		if (epoll_ctl(knet_h->send_to_links_epollfd,
			      EPOLL_CTL_DEL, sock->sockfd[sock->is_created], &ev)) {
			saved = errno;
			rc = -1;
			log_err(knet_h, KNET_SUB_HANDLE, "Unable to del datafd %d from linkfd epoll pool: %s",
				sock->sockfd[0], strerror(saved));
			goto out_unlock;
		}
	}

	if (sock->is_created) {
		_close_socketpair(knet_h, sock->sockfd);
	}

	memset(sock, 0, sizeof(struct knet_sock));

out_unlock:
	pthread_rwlock_unlock(&knet_h->global_rwlock);
	errno = rc ? saved : 0;
	return rc;
}
/*
 * Look up the data fd (sockfd[0]) registered on a channel.
 *
 * Returns 0 with errno cleared and *datafd filled in on success.
 * Returns -1 with errno set to EINVAL (NULL handle, channel out of
 * range, NULL datafd or channel not in use) or to the
 * pthread_rwlock_rdlock() error.
 */
int knet_handle_get_datafd(knet_handle_t knet_h, const int8_t channel, int *datafd)
{
	int rc;
	int lock_err;

	if (knet_h == NULL) {
		errno = EINVAL;
		return -1;
	}

	if (!((channel >= 0) && (channel < KNET_DATAFD_MAX))) {
		errno = EINVAL;
		return -1;
	}

	if (!datafd) {
		errno = EINVAL;
		return -1;
	}

	lock_err = pthread_rwlock_rdlock(&knet_h->global_rwlock);
	if (lock_err != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
			strerror(lock_err));
		errno = lock_err;
		return -1;
	}

	if (knet_h->sockfd[channel].in_use) {
		*datafd = knet_h->sockfd[channel].sockfd[0];
		rc = 0;
	} else {
		rc = -1;
	}

	pthread_rwlock_unlock(&knet_h->global_rwlock);

	errno = (rc == 0) ? 0 : EINVAL;
	return rc;
}
/*
 * Reverse lookup: find the channel whose sockfd[0] equals datafd.
 *
 * Returns 0 with errno cleared and *channel set on success.
 * Returns -1 with errno set to EINVAL (NULL handle, datafd <= 0, NULL
 * channel or fd not registered) or to the pthread_rwlock_rdlock()
 * error. When the fd is not registered *channel is set to -1.
 */
int knet_handle_get_channel(knet_handle_t knet_h, const int datafd, int8_t *channel)
{
	int lock_err;
	int idx;
	int8_t match = -1;

	if (knet_h == NULL) {
		errno = EINVAL;
		return -1;
	}

	if (datafd <= 0) {
		errno = EINVAL;
		return -1;
	}

	if (!channel) {
		errno = EINVAL;
		return -1;
	}

	lock_err = pthread_rwlock_rdlock(&knet_h->global_rwlock);
	if (lock_err != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
			strerror(lock_err));
		errno = lock_err;
		return -1;
	}

	for (idx = 0; idx < KNET_DATAFD_MAX; idx++) {
		if (!knet_h->sockfd[idx].in_use) {
			continue;
		}
		if (knet_h->sockfd[idx].sockfd[0] == datafd) {
			match = idx;
			break;
		}
	}

	/* -1 when nothing matched, the channel index otherwise */
	*channel = match;

	pthread_rwlock_unlock(&knet_h->global_rwlock);

	if (match < 0) {
		errno = EINVAL;
		return -1;
	}

	errno = 0;
	return 0;
}
/*
 * Install (or, with a NULL function pointer, remove) the destination
 * host filter callback and its private data.
 *
 * Returns 0 with errno cleared on success.
 * Returns -1 with errno set to EINVAL for a NULL handle, or to the
 * get_global_wrlock() error.
 */
int knet_handle_enable_filter(knet_handle_t knet_h,
			      void *dst_host_filter_fn_private_data,
			      int (*dst_host_filter_fn) (
					void *private_data,
					const unsigned char *outdata,
					ssize_t outdata_len,
					uint8_t tx_rx,
					knet_node_id_t this_host_id,
					knet_node_id_t src_node_id,
					int8_t *channel,
					knet_node_id_t *dst_host_ids,
					size_t *dst_host_ids_entries))
{
	int lock_err;

	if (knet_h == NULL) {
		errno = EINVAL;
		return -1;
	}

	lock_err = get_global_wrlock(knet_h);
	if (lock_err != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
			strerror(lock_err));
		errno = lock_err;
		return -1;
	}

	knet_h->dst_host_filter_fn_private_data = dst_host_filter_fn_private_data;
	knet_h->dst_host_filter_fn = dst_host_filter_fn;

	if (!knet_h->dst_host_filter_fn) {
		log_debug(knet_h, KNET_SUB_HANDLE, "dst_host_filter_fn disabled");
	} else {
		log_debug(knet_h, KNET_SUB_HANDLE, "dst_host_filter_fn enabled");
	}

	pthread_rwlock_unlock(&knet_h->global_rwlock);

	errno = 0;
	return 0;
}
/*
 * Store the data forwarding switch (0 or 1) in knet_h->enabled.
 *
 * Returns 0 with errno cleared on success.
 * Returns -1 with errno set to EINVAL (NULL handle or enabled > 1) or
 * to the get_global_wrlock() error.
 */
int knet_handle_setfwd(knet_handle_t knet_h, unsigned int enabled)
{
	int lock_err;

	if ((knet_h == NULL) || (enabled > 1)) {
		errno = EINVAL;
		return -1;
	}

	lock_err = get_global_wrlock(knet_h);
	if (lock_err != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
			strerror(lock_err));
		errno = lock_err;
		return -1;
	}

	knet_h->enabled = enabled;

	if (!enabled) {
		log_debug(knet_h, KNET_SUB_HANDLE, "Data forwarding is disabled");
	} else {
		log_debug(knet_h, KNET_SUB_HANDLE, "Data forwarding is enabled");
	}

	pthread_rwlock_unlock(&knet_h->global_rwlock);

	errno = 0;
	return 0;
}
/*
 * Store the access list switch (0 or 1) in knet_h->use_access_lists.
 *
 * Returns 0 with errno cleared on success.
 * Returns -1 with errno set to EINVAL (NULL handle or enabled > 1) or
 * to the get_global_wrlock() error.
 */
int knet_handle_enable_access_lists(knet_handle_t knet_h, unsigned int enabled)
{
	int lock_err;

	if ((knet_h == NULL) || (enabled > 1)) {
		errno = EINVAL;
		return -1;
	}

	lock_err = get_global_wrlock(knet_h);
	if (lock_err != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
			strerror(lock_err));
		errno = lock_err;
		return -1;
	}

	knet_h->use_access_lists = enabled;

	if (!enabled) {
		log_debug(knet_h, KNET_SUB_HANDLE, "Links access lists are disabled");
	} else {
		log_debug(knet_h, KNET_SUB_HANDLE, "Links access lists are enabled");
	}

	pthread_rwlock_unlock(&knet_h->global_rwlock);

	errno = 0;
	return 0;
}
/*
 * Read knet_h->pmtud_interval into *interval under the global read lock.
 *
 * Returns 0 with errno cleared on success.
 * Returns -1 with errno set to EINVAL (NULL handle or NULL interval) or
 * to the pthread_rwlock_rdlock() error.
 */
int knet_handle_pmtud_getfreq(knet_handle_t knet_h, unsigned int *interval)
{
	int lock_err;

	if ((knet_h == NULL) || (interval == NULL)) {
		errno = EINVAL;
		return -1;
	}

	lock_err = pthread_rwlock_rdlock(&knet_h->global_rwlock);
	if (lock_err != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
			strerror(lock_err));
		errno = lock_err;
		return -1;
	}

	*interval = knet_h->pmtud_interval;

	pthread_rwlock_unlock(&knet_h->global_rwlock);

	errno = 0;
	return 0;
}
/*
 * Set the PMTUd interval, in seconds. Accepted range is 1..86400.
 *
 * Returns 0 with errno cleared on success.
 * Returns -1 with errno set to EINVAL (NULL handle or interval out of
 * range) or to the get_global_wrlock() error.
 */
int knet_handle_pmtud_setfreq(knet_handle_t knet_h, unsigned int interval)
{
	int lock_err;

	if (knet_h == NULL) {
		errno = EINVAL;
		return -1;
	}

	if ((interval == 0) || (interval > 86400)) {
		errno = EINVAL;
		return -1;
	}

	lock_err = get_global_wrlock(knet_h);
	if (lock_err != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
			strerror(lock_err));
		errno = lock_err;
		return -1;
	}

	knet_h->pmtud_interval = interval;
	log_debug(knet_h, KNET_SUB_HANDLE, "PMTUd interval set to: %u seconds", interval);

	pthread_rwlock_unlock(&knet_h->global_rwlock);

	errno = 0;
	return 0;
}
/*
 * Install (or, with a NULL function pointer, remove) the PMTUd
 * notification callback and its private data.
 *
 * Returns 0 with errno cleared on success.
 * Returns -1 with errno set to EINVAL for a NULL handle, or to the
 * get_global_wrlock() error.
 */
int knet_handle_enable_pmtud_notify(knet_handle_t knet_h,
				    void *pmtud_notify_fn_private_data,
				    void (*pmtud_notify_fn) (
						void *private_data,
						unsigned int data_mtu))
{
	int lock_err;

	if (knet_h == NULL) {
		errno = EINVAL;
		return -1;
	}

	lock_err = get_global_wrlock(knet_h);
	if (lock_err != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
			strerror(lock_err));
		errno = lock_err;
		return -1;
	}

	knet_h->pmtud_notify_fn_private_data = pmtud_notify_fn_private_data;
	knet_h->pmtud_notify_fn = pmtud_notify_fn;

	if (!knet_h->pmtud_notify_fn) {
		log_debug(knet_h, KNET_SUB_HANDLE, "pmtud_notify_fn disabled");
	} else {
		log_debug(knet_h, KNET_SUB_HANDLE, "pmtud_notify_fn enabled");
	}

	pthread_rwlock_unlock(&knet_h->global_rwlock);

	errno = 0;
	return 0;
}
/*
 * Read knet_h->data_mtu into *data_mtu under the global read lock.
 *
 * Returns 0 with errno cleared on success.
 * Returns -1 with errno set to EINVAL (NULL handle or NULL data_mtu) or
 * to the pthread_rwlock_rdlock() error.
 */
int knet_handle_pmtud_get(knet_handle_t knet_h,
			  unsigned int *data_mtu)
{
	int lock_err;

	if ((knet_h == NULL) || (data_mtu == NULL)) {
		errno = EINVAL;
		return -1;
	}

	lock_err = pthread_rwlock_rdlock(&knet_h->global_rwlock);
	if (lock_err != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
			strerror(lock_err));
		errno = lock_err;
		return -1;
	}

	*data_mtu = knet_h->data_mtu;

	pthread_rwlock_unlock(&knet_h->global_rwlock);

	errno = 0;
	return 0;
}
/*
 * Replace the crypto configuration of the handle.
 *
 * The current crypto instance is always finalized first. Crypto is then
 * left disabled when the model starts with "none", or when both cipher
 * and hash types start with "none". Otherwise the private key length is
 * validated against KNET_MIN_KEY_LEN / KNET_MAX_KEY_LEN and
 * crypto_init() is called.
 *
 * Returns 0 with errno cleared on success (including "crypto disabled").
 * Returns -1 with errno set to EINVAL (NULL argument or bad key length)
 * or to the get_global_wrlock() error.
 * Returns -2 with errno taken from crypto_init() when it fails.
 */
int knet_handle_crypto(knet_handle_t knet_h, struct knet_handle_crypto_cfg *knet_handle_crypto_cfg)
{
	int rc = 0;
	int saved = 0;
	int crypto_off = 0;
	struct knet_handle_crypto_cfg *cfg = knet_handle_crypto_cfg;

	if ((knet_h == NULL) || (cfg == NULL)) {
		errno = EINVAL;
		return -1;
	}

	saved = get_global_wrlock(knet_h);
	if (saved != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
			strerror(saved));
		errno = saved;
		return -1;
	}

	crypto_fini(knet_h);

	if (strncmp("none", cfg->crypto_model, 4) == 0) {
		crypto_off = 1;
	} else if ((strncmp("none", cfg->crypto_cipher_type, 4) == 0) &&
		   (strncmp("none", cfg->crypto_hash_type, 4) == 0)) {
		crypto_off = 1;
	}

	if (crypto_off) {
		log_debug(knet_h, KNET_SUB_CRYPTO, "crypto is not enabled");
		rc = 0;
		goto exit_unlock;
	}

	if (cfg->private_key_len < KNET_MIN_KEY_LEN) {
		log_debug(knet_h, KNET_SUB_CRYPTO, "private key len too short (min %d): %u",
			  KNET_MIN_KEY_LEN, cfg->private_key_len);
		saved = EINVAL;
		rc = -1;
		goto exit_unlock;
	}

	if (cfg->private_key_len > KNET_MAX_KEY_LEN) {
		log_debug(knet_h, KNET_SUB_CRYPTO, "private key len too long (max %d): %u",
			  KNET_MAX_KEY_LEN, cfg->private_key_len);
		saved = EINVAL;
		rc = -1;
		goto exit_unlock;
	}

	if (crypto_init(knet_h, cfg)) {
		/* any crypto_init() failure is reported to the caller as -2 */
		rc = -2;
		saved = errno;
	}

exit_unlock:
	pthread_rwlock_unlock(&knet_h->global_rwlock);
	errno = rc ? saved : 0;
	return rc;
}
/*
 * Replace the compression configuration of the handle: the current one
 * is finalized with compress_fini(knet_h, 0) and the new one applied
 * through compress_cfg().
 *
 * Returns the compress_cfg() result; errno is cleared when that result
 * is 0 and otherwise carries the errno observed right after the call.
 * Returns -1 with errno set to EINVAL for a NULL argument, or to the
 * get_global_wrlock() error.
 */
int knet_handle_compress(knet_handle_t knet_h, struct knet_handle_compress_cfg *knet_handle_compress_cfg)
{
	int rc;
	int saved;

	if ((knet_h == NULL) || (knet_handle_compress_cfg == NULL)) {
		errno = EINVAL;
		return -1;
	}

	saved = get_global_wrlock(knet_h);
	if (saved != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
			strerror(saved));
		errno = saved;
		return -1;
	}

	compress_fini(knet_h, 0);

	rc = compress_cfg(knet_h, knet_handle_compress_cfg);
	saved = errno;

	pthread_rwlock_unlock(&knet_h->global_rwlock);

	if (rc) {
		errno = saved;
	} else {
		errno = 0;
	}
	return rc;
}
/*
 * Read one chunk of data from the data fd (sockfd[0]) of a channel.
 *
 * buff/buff_len describe the destination buffer; buff_len must be in
 * the range 1..KNET_MAX_PACKET_SIZE. channel must be in the range
 * 0..KNET_DATAFD_MAX-1 and in use.
 *
 * Returns the readv() result: the number of bytes read (errno cleared),
 * or -1 with errno set by readv().
 * Returns -1 with errno set to EINVAL on invalid arguments or when the
 * channel is not in use, or to the pthread_rwlock_rdlock() error.
 */
ssize_t knet_recv(knet_handle_t knet_h, char *buff, const size_t buff_len, const int8_t channel)
{
	int savederrno = 0;
	ssize_t err = 0;
	struct iovec iov_in;

	if (!knet_h) {
		errno = EINVAL;
		return -1;
	}

	if (buff == NULL) {
		errno = EINVAL;
		return -1;
	}

	if ((buff_len == 0) || (buff_len > KNET_MAX_PACKET_SIZE)) {
		errno = EINVAL;
		return -1;
	}

	if ((channel < 0) || (channel >= KNET_DATAFD_MAX)) {
		errno = EINVAL;
		return -1;
	}

	savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
	if (savederrno) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
			strerror(savederrno));
		errno = savederrno;
		return -1;
	}

	if (!knet_h->sockfd[channel].in_use) {
		savederrno = EINVAL;
		err = -1;
		goto out_unlock;
	}

	memset(&iov_in, 0, sizeof(iov_in));
	iov_in.iov_base = (void *)buff;
	iov_in.iov_len = buff_len;

	err = readv(knet_h->sockfd[channel].sockfd[0], &iov_in, 1);
	savederrno = errno;

out_unlock:
	pthread_rwlock_unlock(&knet_h->global_rwlock);
	/*
	 * err is a byte count on success: only a negative value is a
	 * failure. Testing "err != 0" would leak a stale errno to the
	 * caller after every successful read.
	 */
	errno = (err < 0) ? savederrno : 0;
	return err;
}
/*
 * Write one chunk of data to the data fd (sockfd[0]) of a channel.
 *
 * buff/buff_len describe the source buffer; buff_len must be in the
 * range 1..KNET_MAX_PACKET_SIZE. channel must be in the range
 * 0..KNET_DATAFD_MAX-1 and in use.
 *
 * Returns the writev() result: the number of bytes written (errno
 * cleared), or -1 with errno set by writev().
 * Returns -1 with errno set to EINVAL on invalid arguments or when the
 * channel is not in use, or to the pthread_rwlock_rdlock() error.
 */
ssize_t knet_send(knet_handle_t knet_h, const char *buff, const size_t buff_len, const int8_t channel)
{
	int savederrno = 0;
	ssize_t err = 0;
	struct iovec iov_out[1];

	if (!knet_h) {
		errno = EINVAL;
		return -1;
	}

	if (buff == NULL) {
		errno = EINVAL;
		return -1;
	}

	if ((buff_len == 0) || (buff_len > KNET_MAX_PACKET_SIZE)) {
		errno = EINVAL;
		return -1;
	}

	if ((channel < 0) || (channel >= KNET_DATAFD_MAX)) {
		errno = EINVAL;
		return -1;
	}

	savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
	if (savederrno) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
			strerror(savederrno));
		errno = savederrno;
		return -1;
	}

	if (!knet_h->sockfd[channel].in_use) {
		savederrno = EINVAL;
		err = -1;
		goto out_unlock;
	}

	memset(iov_out, 0, sizeof(iov_out));
	/* iovec has no const variant: the buffer is only read by writev() */
	iov_out[0].iov_base = (void *)buff;
	iov_out[0].iov_len = buff_len;

	err = writev(knet_h->sockfd[channel].sockfd[0], iov_out, 1);
	savederrno = errno;

out_unlock:
	pthread_rwlock_unlock(&knet_h->global_rwlock);
	/*
	 * err is a byte count on success: only a negative value is a
	 * failure. Testing "err != 0" would leak a stale errno to the
	 * caller after every successful write.
	 */
	errno = (err < 0) ? savederrno : 0;
	return err;
}
/*
 * Copy the handle statistics into a caller supplied structure.
 *
 * struct_size is the size of the structure as the caller knows it; it
 * is clamped to sizeof(struct knet_handle_stats) and no more than that
 * many bytes are ever written to *stats. The copy is assembled in a
 * local snapshot first so that the tx_crypt_packets adjustment and the
 * size field never touch memory beyond what the caller provided.
 *
 * Returns 0 with errno cleared on success.
 * Returns -1 with errno set to EINVAL (NULL handle or NULL stats) or to
 * the get_global_wrlock() error.
 */
int knet_handle_get_stats(knet_handle_t knet_h, struct knet_handle_stats *stats, size_t struct_size)
{
	int savederrno = 0;
	struct knet_handle_stats snapshot;

	if (!knet_h) {
		errno = EINVAL;
		return -1;
	}

	if (!stats) {
		errno = EINVAL;
		return -1;
	}

	savederrno = get_global_wrlock(knet_h);
	if (savederrno) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
			strerror(savederrno));
		errno = savederrno;
		return -1;
	}

	if (struct_size > sizeof(struct knet_handle_stats)) {
		struct_size = sizeof(struct knet_handle_stats);
	}

	memmove(&snapshot, &knet_h->stats, sizeof(struct knet_handle_stats));

	/*
	 * TX crypt stats only count the data packets sent, so add in the ping/pong/pmtud figures
	 * RX is OK as it counts them before they are sorted.
	 */
	snapshot.tx_crypt_packets += knet_h->stats_extra.tx_crypt_ping_packets +
				     knet_h->stats_extra.tx_crypt_pong_packets +
				     knet_h->stats_extra.tx_crypt_pmtu_packets +
				     knet_h->stats_extra.tx_crypt_pmtu_reply_packets;

	/* Tell the caller our full size in case they have an old version */
	snapshot.size = sizeof(struct knet_handle_stats);

	/* hand over only as many bytes as the caller's structure can hold */
	memmove(stats, &snapshot, struct_size);

	pthread_rwlock_unlock(&knet_h->global_rwlock);

	errno = 0;
	return 0;
}
/*
 * Zero the handle statistics (stats and stats_extra) and, when
 * clear_option is KNET_CLEARSTATS_HANDLE_AND_LINK, also the per-link
 * statistics through _link_clear_stats().
 *
 * Returns 0 with errno cleared on success.
 * Returns -1 with errno set to EINVAL (NULL handle or unknown
 * clear_option) or to the get_global_wrlock() error.
 */
int knet_handle_clear_stats(knet_handle_t knet_h, int clear_option)
{
	int savederrno = 0;

	if (!knet_h) {
		errno = EINVAL;
		return -1;
	}

	if (clear_option != KNET_CLEARSTATS_HANDLE_ONLY &&
	    clear_option != KNET_CLEARSTATS_HANDLE_AND_LINK) {
		errno = EINVAL;
		return -1;
	}

	savederrno = get_global_wrlock(knet_h);
	if (savederrno) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
			strerror(savederrno));
		errno = savederrno;
		return -1;
	}

	memset(&knet_h->stats, 0, sizeof(struct knet_handle_stats));
	memset(&knet_h->stats_extra, 0, sizeof(struct knet_handle_stats_extra));

	if (clear_option == KNET_CLEARSTATS_HANDLE_AND_LINK) {
		_link_clear_stats(knet_h);
	}

	pthread_rwlock_unlock(&knet_h->global_rwlock);

	/*
	 * clear errno on success, like the other handle calls in this
	 * file, so callers never see a stale value
	 */
	errno = 0;
	return 0;
}
-
diff --git a/libknet/threads_pmtud.c b/libknet/threads_pmtud.c
index c5a2b276..b4ee632d 100644
--- a/libknet/threads_pmtud.c
+++ b/libknet/threads_pmtud.c
@@ -1,569 +1,568 @@
/*
* Copyright (C) 2015-2019 Red Hat, Inc. All rights reserved.
*
* Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
* Federico Simoncelli <fsimon@kronosnet.org>
*
* This software licensed under GPL-2.0+, LGPL-2.0+
*/
#include "config.h"
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <pthread.h>
#include "crypto.h"
#include "links.h"
#include "host.h"
#include "logging.h"
#include "transports.h"
#include "threads_common.h"
#include "threads_pmtud.h"
/*
 * Run one path MTU discovery pass on a single link.
 *
 * Probe packets of decreasing/increasing on-wire size are sent on the
 * link, bisecting between dst_link->last_good_mtu and
 * dst_link->last_bad_mtu, until the largest size acknowledged by the
 * peer is found. The result is stored in dst_link->status.mtu.
 *
 * Returns 0 when an MTU has been found.
 * Returns -1 on any failure or abort. When the run was aborted because
 * knet_h->pmtud_abort is set, errno is set to EDEADLK before returning.
 *
 * Locking: pmtud_mutex is taken for every probe and is released on
 * every return path; tx_mutex is held only around the send itself.
 */
static int _handle_check_link_pmtud(knet_handle_t knet_h, struct knet_host *dst_host, struct knet_link *dst_link)
{
	int err, ret, savederrno, mutex_retry_limit, failsafe, use_kernel_mtu, warn_once;
	uint32_t kernel_mtu;		/* record kernel_mtu from EMSGSIZE */
	size_t onwire_len;		/* current packet onwire size */
	size_t overhead_len;		/* onwire packet overhead (protocol based) */
	size_t max_mtu_len;		/* max mtu for protocol */
	size_t data_len;		/* how much data we can send in the packet
					 * generally would be onwire_len - overhead_len
					 * needs to be adjusted for crypto
					 */
	size_t pad_len;			/* crypto packet pad size, needs to move into crypto.c callbacks */
	ssize_t len;			/* len of what we were able to sendto onwire */
	struct timespec ts;
	unsigned long long pong_timeout_adj_tmp;
	unsigned char *outbuf = (unsigned char *)knet_h->pmtudbuf;

	warn_once = 0;

	mutex_retry_limit = 0;
	failsafe = 0;

	dst_link->last_bad_mtu = 0;

	knet_h->pmtudbuf->khp_pmtud_link = dst_link->link_id;

	switch (dst_link->dst_addr.ss_family) {
		case AF_INET6:
			max_mtu_len = KNET_PMTUD_SIZE_V6;
			overhead_len = KNET_PMTUD_OVERHEAD_V6 + dst_link->proto_overhead;
			dst_link->last_good_mtu = dst_link->last_ping_size + overhead_len;
			break;
		case AF_INET:
			max_mtu_len = KNET_PMTUD_SIZE_V4;
			overhead_len = KNET_PMTUD_OVERHEAD_V4 + dst_link->proto_overhead;
			dst_link->last_good_mtu = dst_link->last_ping_size + overhead_len;
			break;
		default:
			log_debug(knet_h, KNET_SUB_PMTUD, "PMTUD aborted, unknown protocol");
			return -1;
			break;
	}

	/*
	 * discovery starts from the top because kernel will
	 * refuse to send packets > current iface mtu.
	 * this saves us some time and network bw.
	 */
	onwire_len = max_mtu_len;

restart:

	/*
	 * prevent a race when interface mtu is changed _exactly_ during
	 * the discovery process and it's complex to detect. Easier
	 * to wait the next loop.
	 * 30 is not an arbitrary value. To bisect from 576 to 128000 doesn't
	 * take more than 18/19 steps.
	 */
	if (failsafe == 30) {
		log_err(knet_h, KNET_SUB_PMTUD,
			"Aborting PMTUD process: Too many attempts. MTU might have changed during discovery.");
		return -1;
	} else {
		failsafe++;
	}

	data_len = onwire_len - overhead_len;

	if (knet_h->crypto_instance) {

		if (knet_h->sec_block_size) {
			/* round the payload up to a whole number of cipher blocks */
			pad_len = knet_h->sec_block_size - (data_len % knet_h->sec_block_size);
			if (pad_len == knet_h->sec_block_size) {
				pad_len = 0;
			}
			data_len = data_len + pad_len;
		}

		data_len = data_len + (knet_h->sec_hash_size + knet_h->sec_salt_size + knet_h->sec_block_size);

		if (knet_h->sec_block_size) {
			while (data_len + overhead_len >= max_mtu_len) {
				data_len = data_len - knet_h->sec_block_size;
			}
		}

		if (dst_link->last_bad_mtu) {
			while (data_len + overhead_len >= dst_link->last_bad_mtu) {
				data_len = data_len - (knet_h->sec_hash_size + knet_h->sec_salt_size + knet_h->sec_block_size);
			}
		}

		if (data_len < (knet_h->sec_hash_size + knet_h->sec_salt_size + knet_h->sec_block_size) + 1) {
			log_debug(knet_h, KNET_SUB_PMTUD, "Aborting PMTUD process: link mtu smaller than crypto header detected (link might have been disconnected)");
			return -1;
		}

		onwire_len = data_len + overhead_len;
		knet_h->pmtudbuf->khp_pmtud_size = onwire_len;

		if (crypto_encrypt_and_sign(knet_h,
					    (const unsigned char *)knet_h->pmtudbuf,
					    data_len - (knet_h->sec_hash_size + knet_h->sec_salt_size + knet_h->sec_block_size),
					    knet_h->pmtudbuf_crypt,
					    (ssize_t *)&data_len) < 0) {
			log_debug(knet_h, KNET_SUB_PMTUD, "Unable to crypto pmtud packet");
			return -1;
		}

		outbuf = knet_h->pmtudbuf_crypt;
		knet_h->stats_extra.tx_crypt_pmtu_packets++;
	} else {
		knet_h->pmtudbuf->khp_pmtud_size = onwire_len;
	}

	/* link has gone down, aborting pmtud */
	if (dst_link->status.connected != 1) {
		log_debug(knet_h, KNET_SUB_PMTUD, "PMTUD detected host (%u) link (%u) has been disconnected", dst_host->host_id, dst_link->link_id);
		return -1;
	}

	if (dst_link->transport_connected != 1) {
		log_debug(knet_h, KNET_SUB_PMTUD, "PMTUD detected host (%u) link (%u) has been disconnected", dst_host->host_id, dst_link->link_id);
		return -1;
	}

	if (pthread_mutex_lock(&knet_h->pmtud_mutex) != 0) {
		log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get mutex lock");
		return -1;
	}

	if (knet_h->pmtud_abort) {
		pthread_mutex_unlock(&knet_h->pmtud_mutex);
		errno = EDEADLK;
		return -1;
	}

	savederrno = pthread_mutex_lock(&knet_h->tx_mutex);
	if (savederrno) {
		log_err(knet_h, KNET_SUB_PMTUD, "Unable to get TX mutex lock: %s", strerror(savederrno));
		/*
		 * pmtud_mutex is held at this point: release it, or every
		 * later PMTUd run would block on it forever
		 */
		pthread_mutex_unlock(&knet_h->pmtud_mutex);
		return -1;
	}

retry:
	if (transport_get_connection_oriented(knet_h, dst_link->transport) == TRANSPORT_PROTO_NOT_CONNECTION_ORIENTED) {
		len = sendto(dst_link->outsock, outbuf, data_len, MSG_DONTWAIT | MSG_NOSIGNAL,
			     (struct sockaddr *) &dst_link->dst_addr, sizeof(struct sockaddr_storage));
	} else {
		len = sendto(dst_link->outsock, outbuf, data_len, MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0);
	}
	savederrno = errno;

	/*
	 * we cannot hold a lock on kmtu_mutex between resetting
	 * knet_h->kernel_mtu here and below where it's used.
	 * use_kernel_mtu tells us if the knet_h->kernel_mtu was
	 * set to 0 and we can trust its value later.
	 */
	use_kernel_mtu = 0;

	if (pthread_mutex_lock(&knet_h->kmtu_mutex) == 0) {
		use_kernel_mtu = 1;
		knet_h->kernel_mtu = 0;
		pthread_mutex_unlock(&knet_h->kmtu_mutex);
	}

	kernel_mtu = 0;

	err = transport_tx_sock_error(knet_h, dst_link->transport, dst_link->outsock, len, savederrno);
	switch(err) {
		case -1: /* unrecoverable error */
			log_debug(knet_h, KNET_SUB_PMTUD, "Unable to send pmtu packet (sendto): %d %s", savederrno, strerror(savederrno));
			pthread_mutex_unlock(&knet_h->tx_mutex);
			pthread_mutex_unlock(&knet_h->pmtud_mutex);
			dst_link->status.stats.tx_pmtu_errors++;
			return -1;
		case 0: /* ignore error and continue */
			break;
		case 1: /* retry to send those same data */
			dst_link->status.stats.tx_pmtu_retries++;
			goto retry;
			break;
	}

	pthread_mutex_unlock(&knet_h->tx_mutex);

	if (len != (ssize_t )data_len) {
		if (savederrno == EMSGSIZE) {
			/*
			 * we cannot hold a lock on kmtu_mutex between resetting
			 * knet_h->kernel_mtu and here.
			 * use_kernel_mtu tells us if the knet_h->kernel_mtu was
			 * set to 0 previously and we can trust its value now.
			 */
			if (use_kernel_mtu) {
				use_kernel_mtu = 0;
				if (pthread_mutex_lock(&knet_h->kmtu_mutex) == 0) {
					kernel_mtu = knet_h->kernel_mtu;
					pthread_mutex_unlock(&knet_h->kmtu_mutex);
				}
			}
			if (kernel_mtu > 0) {
				dst_link->last_bad_mtu = kernel_mtu + 1;
			} else {
				dst_link->last_bad_mtu = onwire_len;
			}
		} else {
			log_debug(knet_h, KNET_SUB_PMTUD, "Unable to send pmtu packet len: %zu err: %s", onwire_len, strerror(savederrno));
		}

	} else {
		dst_link->last_sent_mtu = onwire_len;
		dst_link->last_recv_mtu = 0;
		dst_link->status.stats.tx_pmtu_packets++;
		dst_link->status.stats.tx_pmtu_bytes += data_len;

		if (clock_gettime(CLOCK_REALTIME, &ts) < 0) {
			log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get current time: %s", strerror(errno));
			pthread_mutex_unlock(&knet_h->pmtud_mutex);
			return -1;
		}

		/*
		 * set PMTUd reply timeout to match pong_timeout on a given link
		 *
		 * math: internally pong_timeout is expressed in microseconds, while
		 * the public API exports milliseconds. So careful with the 0's here.
		 * the loop is necessary because we are grabbing the current time just above
		 * and add values to it that could overflow into seconds.
		 */

		if (pthread_mutex_lock(&knet_h->backoff_mutex)) {
			log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get backoff_mutex");
			pthread_mutex_unlock(&knet_h->pmtud_mutex);
			return -1;
		}

		if (knet_h->crypto_instance) {
			/*
			 * crypto, under pressure, is a royal PITA
			 */
			pong_timeout_adj_tmp = dst_link->pong_timeout_adj * 2;
		} else {
			pong_timeout_adj_tmp = dst_link->pong_timeout_adj;
		}

		ts.tv_sec += pong_timeout_adj_tmp / 1000000;
		ts.tv_nsec += (((pong_timeout_adj_tmp) % 1000000) * 1000);
		/*
		 * tv_nsec must end up strictly below one second: a value of
		 * exactly 1000000000 is not a valid timespec for
		 * pthread_cond_timedwait and has to be carried over too
		 */
		while (ts.tv_nsec >= 1000000000) {
			ts.tv_sec += 1;
			ts.tv_nsec -= 1000000000;
		}

		pthread_mutex_unlock(&knet_h->backoff_mutex);

		knet_h->pmtud_waiting = 1;

		ret = pthread_cond_timedwait(&knet_h->pmtud_cond, &knet_h->pmtud_mutex, &ts);

		knet_h->pmtud_waiting = 0;

		if (knet_h->pmtud_abort) {
			pthread_mutex_unlock(&knet_h->pmtud_mutex);
			errno = EDEADLK;
			return -1;
		}

		if (shutdown_in_progress(knet_h)) {
			pthread_mutex_unlock(&knet_h->pmtud_mutex);
			log_debug(knet_h, KNET_SUB_PMTUD, "PMTUD aborted. shutdown in progress");
			return -1;
		}

		if (ret) {
			if (ret == ETIMEDOUT) {
				if (!warn_once) {
					log_warn(knet_h, KNET_SUB_PMTUD,
						 "possible MTU misconfiguration detected. "
						 "kernel is reporting MTU: %u bytes for "
						 "host %u link %u but the other node is "
						 "not acknowledging packets of this size. ",
						 dst_link->last_sent_mtu,
						 dst_host->host_id,
						 dst_link->link_id);
					log_warn(knet_h, KNET_SUB_PMTUD,
						 "This can be caused by this node interface MTU "
						 "too big or a network device that does not "
						 "support or has been misconfigured to manage MTU "
						 "of this size, or packet loss. knet will continue "
						 "to run but performances might be affected.");
					warn_once = 1;
				}
			} else {
				pthread_mutex_unlock(&knet_h->pmtud_mutex);
				if (mutex_retry_limit == 3) {
					log_debug(knet_h, KNET_SUB_PMTUD, "PMTUD aborted, unable to get mutex lock");
					return -1;
				}
				mutex_retry_limit++;
				goto restart;
			}
		}

		if ((dst_link->last_recv_mtu != onwire_len) || (ret)) {
			dst_link->last_bad_mtu = onwire_len;
		} else {
			int found_mtu = 0;

			if (knet_h->sec_block_size) {
				if ((onwire_len + knet_h->sec_block_size >= max_mtu_len) ||
				   ((dst_link->last_bad_mtu) && (dst_link->last_bad_mtu <= (onwire_len + knet_h->sec_block_size)))) {
					found_mtu = 1;
				}
			} else {
				if ((onwire_len == max_mtu_len) ||
				    ((dst_link->last_bad_mtu) && (dst_link->last_bad_mtu == (onwire_len + 1))) ||
				     (dst_link->last_bad_mtu == dst_link->last_good_mtu)) {
					found_mtu = 1;
				}
			}

			if (found_mtu) {
				/*
				 * account for IP overhead, knet headers and crypto in PMTU calculation
				 */
				dst_link->status.mtu = onwire_len - dst_link->status.proto_overhead;
				pthread_mutex_unlock(&knet_h->pmtud_mutex);
				return 0;
			}

			dst_link->last_good_mtu = onwire_len;
		}
	}

	/*
	 * next probe size: what the kernel told us, or the midpoint of
	 * the current good/bad window
	 */
	if (kernel_mtu) {
		onwire_len = kernel_mtu;
	} else {
		onwire_len = (dst_link->last_good_mtu + dst_link->last_bad_mtu) / 2;
	}

	pthread_mutex_unlock(&knet_h->pmtud_mutex);

	goto restart;
}
/*
 * Decide whether PMTUd has to run on a link and, if so, run it and
 * validate the result.
 *
 * min_mtu is an in/out value: it is overwritten with the link MTU when
 * the run is skipped because the interval has not elapsed, and lowered
 * to the link MTU when a valid MTU smaller than *min_mtu is found.
 * force_run != 0 bypasses the interval check.
 *
 * Returns dst_link->has_valid_mtu, or 0 when the monotonic clock cannot
 * be read. When the discovery was aborted with EDEADLK the previous MTU
 * state is restored and errno is left set to EDEADLK for the caller.
 */
static int _handle_check_pmtud(knet_handle_t knet_h, struct knet_host *dst_host, struct knet_link *dst_link, unsigned int *min_mtu, int force_run)
{
uint8_t saved_valid_pmtud;
unsigned int saved_pmtud;
struct timespec clock_now;
unsigned long long diff_pmtud, interval;
if (clock_gettime(CLOCK_MONOTONIC, &clock_now) != 0) {
log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get monotonic clock");
return 0;
}
/*
 * unless forced, skip the run until pmtud_interval has elapsed
 * NOTE(review): timespec_diff() presumably stores the elapsed
 * nanoseconds since pmtud_last in diff_pmtud - confirm against its
 * definition
 */
if (!force_run) {
interval = knet_h->pmtud_interval * 1000000000llu; /* nanoseconds */
timespec_diff(dst_link->pmtud_last, clock_now, &diff_pmtud);
if (diff_pmtud < interval) {
*min_mtu = dst_link->status.mtu;
return dst_link->has_valid_mtu;
}
}
/*
 * total per-packet overhead for this link: IP family overhead,
 * transport overhead, knet headers and crypto header
 */
switch (dst_link->dst_addr.ss_family) {
case AF_INET6:
dst_link->status.proto_overhead = KNET_PMTUD_OVERHEAD_V6 + dst_link->proto_overhead + KNET_HEADER_ALL_SIZE + knet_h->sec_header_size;
break;
case AF_INET:
dst_link->status.proto_overhead = KNET_PMTUD_OVERHEAD_V4 + dst_link->proto_overhead + KNET_HEADER_ALL_SIZE + knet_h->sec_header_size;
break;
}
/*
 * remember the current state so it can be restored on EDEADLK and
 * compared with the new one at the end
 */
saved_pmtud = dst_link->status.mtu;
saved_valid_pmtud = dst_link->has_valid_mtu;
log_debug(knet_h, KNET_SUB_PMTUD, "Starting PMTUD for host: %u link: %u", dst_host->host_id, dst_link->link_id);
/*
 * errno is cleared here so that EDEADLK after the call can only come
 * from this discovery run
 */
errno = 0;
if (_handle_check_link_pmtud(knet_h, dst_host, dst_link) < 0) {
if (errno == EDEADLK) {
log_debug(knet_h, KNET_SUB_PMTUD, "PMTUD for host: %u link: %u has been rescheduled", dst_host->host_id, dst_link->link_id);
dst_link->status.mtu = saved_pmtud;
dst_link->has_valid_mtu = saved_valid_pmtud;
/* set again after logging: the caller checks errno for EDEADLK */
errno = EDEADLK;
return dst_link->has_valid_mtu;
}
dst_link->has_valid_mtu = 0;
} else {
dst_link->has_valid_mtu = 1;
/*
 * sanity check: MTU plus overhead must fall inside the bounds of
 * the address family, otherwise the result is discarded
 */
switch (dst_link->dst_addr.ss_family) {
case AF_INET6:
if (((dst_link->status.mtu + dst_link->status.proto_overhead) < KNET_PMTUD_MIN_MTU_V6) ||
((dst_link->status.mtu + dst_link->status.proto_overhead) > KNET_PMTUD_SIZE_V6)) {
log_debug(knet_h, KNET_SUB_PMTUD,
"PMTUD detected an IPv6 MTU out of bound value (%u) for host: %u link: %u.",
dst_link->status.mtu + dst_link->status.proto_overhead, dst_host->host_id, dst_link->link_id);
dst_link->has_valid_mtu = 0;
}
break;
case AF_INET:
if (((dst_link->status.mtu + dst_link->status.proto_overhead) < KNET_PMTUD_MIN_MTU_V4) ||
((dst_link->status.mtu + dst_link->status.proto_overhead) > KNET_PMTUD_SIZE_V4)) {
log_debug(knet_h, KNET_SUB_PMTUD,
"PMTUD detected an IPv4 MTU out of bound value (%u) for host: %u link: %u.",
dst_link->status.mtu + dst_link->status.proto_overhead, dst_host->host_id, dst_link->link_id);
dst_link->has_valid_mtu = 0;
}
break;
}
if (dst_link->has_valid_mtu) {
if ((saved_pmtud) && (saved_pmtud != dst_link->status.mtu)) {
log_info(knet_h, KNET_SUB_PMTUD, "PMTUD link change for host: %u link: %u from %u to %u",
dst_host->host_id, dst_link->link_id, saved_pmtud, dst_link->status.mtu);
}
log_debug(knet_h, KNET_SUB_PMTUD, "PMTUD completed for host: %u link: %u current link mtu: %u",
dst_host->host_id, dst_link->link_id, dst_link->status.mtu);
if (dst_link->status.mtu < *min_mtu) {
*min_mtu = dst_link->status.mtu;
}
/*
* set pmtud_last, if we can, after we are done with the PMTUd process
* because it can take a very long time.
*/
/* fall back to the timestamp taken on entry if the clock cannot be re-read */
dst_link->pmtud_last = clock_now;
if (!clock_gettime(CLOCK_MONOTONIC, &clock_now)) {
dst_link->pmtud_last = clock_now;
}
}
}
/* MTU validity changed for this link: resync the host destination cache */
if (saved_valid_pmtud != dst_link->has_valid_mtu) {
_host_dstcache_update_sync(knet_h, dst_host);
}
return dst_link->has_valid_mtu;
}
/*
 * PMTUd worker thread.
 *
 * data is the knet handle served by this thread. Until shutdown is in
 * progress the thread wakes up every KNET_THREADS_TIMERES, walks all
 * hosts/links, runs _handle_check_pmtud() on every eligible link and,
 * if at least one link reported an MTU, stores the lowest value found in
 * knet_h->data_mtu (calling pmtud_notify_fn, when set, if the value changed).
 *
 * Always returns NULL.
 */
void *_handle_pmtud_link_thread(void *data)
{
	knet_handle_t knet_h = (knet_handle_t) data;
	struct knet_host *dst_host;
	struct knet_link *dst_link;
	int link_idx;
	unsigned int min_mtu, have_mtu;
	unsigned int lower_mtu;
	int link_has_mtu;
	int force_run = 0;

	set_thread_status(knet_h, KNET_THREAD_PMTUD, KNET_THREAD_STARTED);

	/* initial data MTU, derived from KNET_PMTUD_MIN_MTU_V4 minus our own overhead */
	knet_h->data_mtu = KNET_PMTUD_MIN_MTU_V4 - KNET_HEADER_ALL_SIZE - knet_h->sec_header_size;

	/* preparing pmtu buffer */
	knet_h->pmtudbuf->kh_version = KNET_HEADER_VERSION;
	knet_h->pmtudbuf->kh_type = KNET_HEADER_TYPE_PMTUD;
	knet_h->pmtudbuf->kh_node = htons(knet_h->host_id);

	while (!shutdown_in_progress(knet_h)) {
		usleep(KNET_THREADS_TIMERES);

		/*
		 * flag the run as started and consume any pending force-run
		 * request while holding pmtud_mutex
		 */
		if (pthread_mutex_lock(&knet_h->pmtud_mutex) != 0) {
			log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get mutex lock");
			continue;
		}
		knet_h->pmtud_abort = 0;
		knet_h->pmtud_running = 1;
		force_run = knet_h->pmtud_forcerun;
		knet_h->pmtud_forcerun = 0;
		pthread_mutex_unlock(&knet_h->pmtud_mutex);

		if (force_run) {
			log_debug(knet_h, KNET_SUB_PMTUD, "PMTUd request to rerun has been received");
		}

		if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) {
			log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get read lock");
			continue;
		}

		lower_mtu = KNET_PMTUD_SIZE_V4;
		min_mtu = KNET_PMTUD_SIZE_V4 - KNET_HEADER_ALL_SIZE - knet_h->sec_header_size;
		have_mtu = 0;

		for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) {
			for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) {
				dst_link = &dst_host->link[link_idx];

				/*
				 * only links that are enabled, connected, not loopback,
				 * have a recorded ping size and (for dynamic links) are
				 * dynconnected get probed
				 */
				if ((dst_link->status.enabled != 1) ||
				    (dst_link->status.connected != 1) ||
				    (dst_host->link[link_idx].transport == KNET_TRANSPORT_LOOPBACK) ||
				    (!dst_link->last_ping_size) ||
				    ((dst_link->dynamic == KNET_LINK_DYNIP) &&
				     (dst_link->status.dynconnected != 1)))
					continue;

				/*
				 * errno == EDEADLK after the call makes us abandon this run.
				 * errno is never reset to 0 by library calls, so clear it
				 * first: otherwise a stale EDEADLK left by an earlier call
				 * would be mistaken for a fresh one.
				 */
				errno = 0;
				link_has_mtu = _handle_check_pmtud(knet_h, dst_host, dst_link, &min_mtu, force_run);
				if (errno == EDEADLK) {
					goto out_unlock;
				}
				if (link_has_mtu) {
					have_mtu = 1;
					if (min_mtu < lower_mtu) {
						lower_mtu = min_mtu;
					}
				}
			}
		}

		if (have_mtu) {
			if (knet_h->data_mtu != lower_mtu) {
				knet_h->data_mtu = lower_mtu;
				log_info(knet_h, KNET_SUB_PMTUD, "Global data MTU changed to: %u", knet_h->data_mtu);

				if (knet_h->pmtud_notify_fn) {
					knet_h->pmtud_notify_fn(knet_h->pmtud_notify_fn_private_data,
								knet_h->data_mtu);
				}
			}
		}
out_unlock:
		pthread_rwlock_unlock(&knet_h->global_rwlock);

		/* the run is over (completed or abandoned): clear the running flag */
		if (pthread_mutex_lock(&knet_h->pmtud_mutex) != 0) {
			log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get mutex lock");
		} else {
			knet_h->pmtud_running = 0;
			pthread_mutex_unlock(&knet_h->pmtud_mutex);
		}
	}

	set_thread_status(knet_h, KNET_THREAD_PMTUD, KNET_THREAD_STOPPED);

	return NULL;
}
diff --git a/libknet/threads_rx.c b/libknet/threads_rx.c
index ae39b38e..626cbc4c 100644
--- a/libknet/threads_rx.c
+++ b/libknet/threads_rx.c
@@ -1,880 +1,879 @@
/*
* Copyright (C) 2012-2019 Red Hat, Inc. All rights reserved.
*
* Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
* Federico Simoncelli <fsimon@kronosnet.org>
*
* This software licensed under GPL-2.0+, LGPL-2.0+
*/
#include "config.h"
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <sys/uio.h>
#include <pthread.h>
#include "compat.h"
#include "compress.h"
#include "crypto.h"
#include "host.h"
#include "links.h"
#include "links_acl.h"
#include "logging.h"
#include "transports.h"
#include "transport_common.h"
#include "threads_common.h"
#include "threads_heartbeat.h"
#include "threads_rx.h"
#include "netutils.h"
/*
* RECV
*/
/*
 * Three-way comparison of two timestamps.
 *
 * Seconds are compared first; nanoseconds only break a tie.
 *
 * return 1 if a is later than b
 * return -1 if a is earlier than b
 * return 0 if both are identical
 */
static inline int timecmp(struct timespec a, struct timespec b)
{
	if (a.tv_sec > b.tv_sec) {
		return 1;
	}
	if (a.tv_sec < b.tv_sec) {
		return -1;
	}

	/* same second: the nanosecond part decides */
	if (a.tv_nsec > b.tv_nsec) {
		return 1;
	}
	if (a.tv_nsec < b.tv_nsec) {
		return -1;
	}
	return 0;
}
/*
 * Select the defrag buffer of the sending host that must receive a
 * fragment of the packet described by inbuf.
 *
 * Returns an index (0 to KNET_MAX_LINK - 1) into the host's defrag_buf
 * array, or -1 with errno set to ETIME when the packet has already been
 * seen by the defrag tracking and no buffer is handling it anymore.
 */
static int find_pckt_defrag_buf(knet_handle_t knet_h, struct knet_header *inbuf)
{
	struct knet_host *src_host = knet_h->host_index[inbuf->kh_node];
	int idx, victim;

	/*
	 * a buffer that is already collecting fragments for this seq_num wins
	 */
	for (idx = 0; idx < KNET_MAX_LINK; idx++) {
		if ((src_host->defrag_buf[idx].in_use) &&
		    (src_host->defrag_buf[idx].pckt_seq == inbuf->khp_data_seq_num)) {
			return idx;
		}
	}

	/*
	 * No buffer is handling this seq_num: the packet is either new or
	 * its buffer has been reclaimed already. The defrag circular buffer
	 * tells the two apart. A packet seen before means the buffer expired
	 * (ETIME) and trying to defrag it again is pointless.
	 */
	if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 1, 0)) {
		errno = ETIME;
		return -1;
	}

	/*
	 * new packet: record it as seen
	 */
	_seq_num_set(src_host, inbuf->khp_data_seq_num, 1);

	/*
	 * prefer a buffer that is not in use
	 */
	for (idx = 0; idx < KNET_MAX_LINK; idx++) {
		if (!src_host->defrag_buf[idx].in_use) {
			return idx;
		}
	}

	/*
	 * every buffer is busy and the packet is new: reclaim the buffer
	 * with the oldest timestamp (ties keep the lowest index).
	 */
	victim = 0;
	for (idx = 1; idx < KNET_MAX_LINK; idx++) {
		if (timecmp(src_host->defrag_buf[idx].last_update,
			    src_host->defrag_buf[victim].last_update) < 0) {
			victim = idx;
		}
	}
	src_host->defrag_buf[victim].in_use = 0;

	return victim;
}
/*
 * Store one fragment of a fragmented data packet and, once all fragments
 * have been collected, rebuild the complete payload in
 * inbuf->khp_data_userdata.
 *
 * len is in/out: on entry it is the size of this fragment's payload, after
 * a completed reassembly it is the size of the whole rebuilt payload.
 *
 * Returns 0 when the packet has been fully reassembled, 1 when the fragment
 * has been dropped or more fragments are still needed.
 */
static int pckt_defrag(knet_handle_t knet_h, struct knet_header *inbuf, ssize_t *len)
{
	struct knet_host_defrag_buf *defrag_buf;
	int defrag_buf_idx;

	/*
	 * The fragment header comes off the wire, validate it before it is
	 * used to compute offsets into the defrag buffer. Fragments are
	 * numbered from 1 to frag_num (offsets below are (frag_seq - 1) based),
	 * so 0 would yield a negative offset and anything above frag_num can
	 * never be part of this packet.
	 */
	if ((inbuf->khp_data_frag_seq < 1) ||
	    (inbuf->khp_data_frag_seq > inbuf->khp_data_frag_num)) {
		log_debug(knet_h, KNET_SUB_RX, "Invalid fragment sequence (%u of %u), dropping",
			  (unsigned int)inbuf->khp_data_frag_seq,
			  (unsigned int)inbuf->khp_data_frag_num);
		return 1;
	}

	/*
	 * a last fragment arriving first is parked at
	 * buf + (KNET_MAX_PACKET_SIZE - len): len must fit in that range
	 */
	if ((*len < 0) || (*len > KNET_MAX_PACKET_SIZE)) {
		log_debug(knet_h, KNET_SUB_RX, "Invalid fragment size (%ld), dropping", (long)*len);
		return 1;
	}

	defrag_buf_idx = find_pckt_defrag_buf(knet_h, inbuf);
	if (defrag_buf_idx < 0) {
		if (errno == ETIME) {
			log_debug(knet_h, KNET_SUB_RX, "Defrag buffer expired");
		}
		return 1;
	}

	defrag_buf = &knet_h->host_index[inbuf->kh_node]->defrag_buf[defrag_buf_idx];

	/*
	 * if the buf is not in use, then make sure it's clean
	 */
	if (!defrag_buf->in_use) {
		memset(defrag_buf, 0, sizeof(struct knet_host_defrag_buf));
		defrag_buf->in_use = 1;
		defrag_buf->pckt_seq = inbuf->khp_data_seq_num;
	}

	/*
	 * update timestamp on the buffer
	 */
	clock_gettime(CLOCK_MONOTONIC, &defrag_buf->last_update);

	/*
	 * check if we already received this fragment
	 */
	if (defrag_buf->frag_map[inbuf->khp_data_frag_seq]) {
		/*
		 * if we have received this fragment and we didn't clear the buffer
		 * it means that we don't have all fragments yet
		 */
		return 1;
	}

	/*
	 * we need to handle the last packet with gloves due to its different size
	 */
	if (inbuf->khp_data_frag_seq == inbuf->khp_data_frag_num) {
		defrag_buf->last_frag_size = *len;

		/*
		 * in the event when the last packet arrives first,
		 * we still don't know the offset vs the other fragments (based on MTU),
		 * so we store the fragment at the end of the buffer where it's safe
		 * and take a copy of the len so that we can restore its offset later.
		 * remember we can't use the local MTU for this calculation because pMTU
		 * can be asymmetric between the same hosts.
		 */
		if (!defrag_buf->frag_size) {
			defrag_buf->last_first = 1;
			memmove(defrag_buf->buf + (KNET_MAX_PACKET_SIZE - *len),
				inbuf->khp_data_userdata,
				*len);
		}
	} else {
		defrag_buf->frag_size = *len;
	}

	memmove(defrag_buf->buf + ((inbuf->khp_data_frag_seq - 1) * defrag_buf->frag_size),
		inbuf->khp_data_userdata, *len);

	defrag_buf->frag_recv++;
	defrag_buf->frag_map[inbuf->khp_data_frag_seq] = 1;

	/*
	 * check if we received all the fragments
	 */
	if (defrag_buf->frag_recv == inbuf->khp_data_frag_num) {
		/*
		 * special case the last pckt: move it from the end of the
		 * buffer to its final offset
		 */
		if (defrag_buf->last_first) {
			memmove(defrag_buf->buf + ((inbuf->khp_data_frag_num - 1) * defrag_buf->frag_size),
				defrag_buf->buf + (KNET_MAX_PACKET_SIZE - defrag_buf->last_frag_size),
				defrag_buf->last_frag_size);
		}

		/*
		 * recalculate packet length
		 */
		*len = ((inbuf->khp_data_frag_num - 1) * defrag_buf->frag_size) + defrag_buf->last_frag_size;

		/*
		 * copy the pckt back in the user data
		 */
		memmove(inbuf->khp_data_userdata, defrag_buf->buf, *len);

		/*
		 * free this buffer
		 */
		defrag_buf->in_use = 0;
		return 0;
	}

	return 1;
}
/*
 * Process one packet received from a link socket.
 *
 * knet_h is the handle, sockfd the socket the packet arrived on and msg the
 * received message (payload in msg_iov, sender address in msg_name).
 *
 * The packet is decrypted/authenticated when crypto is configured, then its
 * size, header version and source host are validated. For dynamic-IP links
 * the stored destination address is refreshed from the packet source.
 * Finally the packet is handled according to kh_type:
 *  - DATA / HOST_INFO: duplicate detection, defrag, decompress, then DATA is
 *    filtered and written to the local channel socket;
 *  - PING: a PONG is sent back on the same link;
 *  - PONG: link latency and up status are updated;
 *  - PMTUD: a PMTUD_REPLY is sent back under tx_mutex;
 *  - PMTUD_REPLY: the received size is recorded and pmtud_cond signalled.
 * Any other type is silently ignored.
 */
static void _parse_recv_from_links(knet_handle_t knet_h, int sockfd, const struct knet_mmsghdr *msg)
{
	int err = 0, savederrno = 0;
	ssize_t outlen;
	struct knet_host *src_host;
	struct knet_link *src_link;
	unsigned long long latency_last;
	knet_node_id_t dst_host_ids[KNET_MAX_HOST];
	size_t dst_host_ids_entries = 0;
	int bcast = 1;
	int was_decrypted = 0;
	uint64_t crypt_time = 0;
	struct timespec recvtime;
	struct knet_header *inbuf = msg->msg_hdr.msg_iov->iov_base;
	unsigned char *outbuf = (unsigned char *)msg->msg_hdr.msg_iov->iov_base;
	ssize_t len = msg->msg_len;
	struct knet_hostinfo *knet_hostinfo;
	struct iovec iov_out[1];
	int8_t channel;
	struct sockaddr_storage pckt_src;
	seq_num_t recv_seq_num;
	int wipe_bufs = 0;

	if (knet_h->crypto_instance) {
		struct timespec start_time;
		struct timespec end_time;

		clock_gettime(CLOCK_MONOTONIC, &start_time);
		if (crypto_authenticate_and_decrypt(knet_h,
						    (unsigned char *)inbuf,
						    len,
						    knet_h->recv_from_links_buf_decrypt,
						    &outlen) < 0) {
			log_debug(knet_h, KNET_SUB_RX, "Unable to decrypt/auth packet");
			return;
		}
		clock_gettime(CLOCK_MONOTONIC, &end_time);
		timespec_diff(start_time, end_time, &crypt_time);

		if (crypt_time < knet_h->stats.rx_crypt_time_min) {
			knet_h->stats.rx_crypt_time_min = crypt_time;
		}
		if (crypt_time > knet_h->stats.rx_crypt_time_max) {
			knet_h->stats.rx_crypt_time_max = crypt_time;
		}

		/* from here on work on the decrypted copy */
		len = outlen;
		inbuf = (struct knet_header *)knet_h->recv_from_links_buf_decrypt;
		was_decrypted++;
	}

	if (len < (ssize_t)(KNET_HEADER_SIZE + 1)) {
		log_debug(knet_h, KNET_SUB_RX, "Packet is too short: %ld", (long)len);
		return;
	}

	if (inbuf->kh_version != KNET_HEADER_VERSION) {
		log_debug(knet_h, KNET_SUB_RX, "Packet version does not match");
		return;
	}

	inbuf->kh_node = ntohs(inbuf->kh_node);
	src_host = knet_h->host_index[inbuf->kh_node];
	if (src_host == NULL) { /* host not found */
		log_debug(knet_h, KNET_SUB_RX, "Unable to find source host for this packet");
		return;
	}

	src_link = NULL;

	/* the modulo keeps the link index inside the host's link array */
	src_link = src_host->link +
		   (inbuf->khp_ping_link % KNET_MAX_LINK);
	if ((inbuf->kh_type & KNET_HEADER_TYPE_PMSK) != 0) {
		if (src_link->dynamic == KNET_LINK_DYNIP) {
			/*
			 * cpyaddrport will only copy address and port of the incoming
			 * packet and strip extra bits such as flow and scopeid
			 */
			cpyaddrport(&pckt_src, msg->msg_hdr.msg_name);

			if (cmpaddr(&src_link->dst_addr, sockaddr_len(&src_link->dst_addr),
				    &pckt_src, sockaddr_len(&pckt_src)) != 0) {
				log_debug(knet_h, KNET_SUB_RX, "host: %u link: %u appears to have changed ip address",
					  src_host->host_id, src_link->link_id);
				memmove(&src_link->dst_addr, &pckt_src, sizeof(struct sockaddr_storage));
				if (knet_addrtostr(&src_link->dst_addr, sockaddr_len(msg->msg_hdr.msg_name),
						   src_link->status.dst_ipaddr, KNET_MAX_HOST_LEN,
						   src_link->status.dst_port, KNET_MAX_PORT_LEN) != 0) {
					log_debug(knet_h, KNET_SUB_RX, "Unable to resolve ???");
					snprintf(src_link->status.dst_ipaddr, KNET_MAX_HOST_LEN - 1, "Unknown!!!");
					snprintf(src_link->status.dst_port, KNET_MAX_PORT_LEN - 1, "??");
				} else {
					log_info(knet_h, KNET_SUB_RX,
						 "host: %u link: %u new connection established from: %s %s",
						 src_host->host_id, src_link->link_id,
						 src_link->status.dst_ipaddr, src_link->status.dst_port);
				}
			}
			/*
			 * transport has already accepted the connection here
			 * otherwise we would not be receiving packets
			 */
			transport_link_dyn_connect(knet_h, sockfd, src_link);
		}
	}

	switch (inbuf->kh_type) {
	case KNET_HEADER_TYPE_HOST_INFO:
	case KNET_HEADER_TYPE_DATA:
		/*
		 * TODO: should we accept data even if we can't reply to the other node?
		 *       how would that work with SCTP and guaranteed delivery?
		 */

		if (!src_host->status.reachable) {
			log_debug(knet_h, KNET_SUB_RX, "Source host %u not reachable yet", src_host->host_id);
			//return;
		}
		inbuf->khp_data_seq_num = ntohs(inbuf->khp_data_seq_num);
		channel = inbuf->khp_data_channel;
		src_host->got_data = 1;

		if (src_link) {
			src_link->status.stats.rx_data_packets++;
			src_link->status.stats.rx_data_bytes += len;
		}

		/* a failed lookup means this seq_num has been handled already */
		if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 0, 0)) {
			if (src_host->link_handler_policy != KNET_LINK_POLICY_ACTIVE) {
				log_debug(knet_h, KNET_SUB_RX, "Packet has already been delivered");
			}
			return;
		}

		if (inbuf->khp_data_frag_num > 1) {
			/*
			 * len as received from the socket also includes extra stuff
			 * that the defrag code doesn't care about. So strip it
			 * here and readd only for repadding once we are done
			 * defragging
			 */
			len = len - KNET_HEADER_DATA_SIZE;
			if (pckt_defrag(knet_h, inbuf, &len)) {
				return;
			}
			len = len + KNET_HEADER_DATA_SIZE;
		}

		if (inbuf->khp_data_compress) {
			ssize_t decmp_outlen = KNET_DATABUFSIZE_COMPRESS;
			struct timespec start_time;
			struct timespec end_time;
			uint64_t compress_time;

			clock_gettime(CLOCK_MONOTONIC, &start_time);
			err = decompress(knet_h, inbuf->khp_data_compress,
					 (const unsigned char *)inbuf->khp_data_userdata,
					 len - KNET_HEADER_DATA_SIZE,
					 knet_h->recv_from_links_buf_decompress,
					 &decmp_outlen);
			if (!err) {
				/* Collect stats */
				clock_gettime(CLOCK_MONOTONIC, &end_time);
				timespec_diff(start_time, end_time, &compress_time);

				if (compress_time < knet_h->stats.rx_compress_time_min) {
					knet_h->stats.rx_compress_time_min = compress_time;
				}
				if (compress_time > knet_h->stats.rx_compress_time_max) {
					knet_h->stats.rx_compress_time_max = compress_time;
				}
				knet_h->stats.rx_compress_time_ave =
					(knet_h->stats.rx_compress_time_ave * knet_h->stats.rx_compressed_packets +
					 compress_time) / (knet_h->stats.rx_compressed_packets+1);

				knet_h->stats.rx_compressed_packets++;
				knet_h->stats.rx_compressed_original_bytes += decmp_outlen;
				knet_h->stats.rx_compressed_size_bytes += len - KNET_HEADER_SIZE;

				/* replace the compressed payload with the decompressed one */
				memmove(inbuf->khp_data_userdata, knet_h->recv_from_links_buf_decompress, decmp_outlen);
				len = decmp_outlen + KNET_HEADER_DATA_SIZE;
			} else {
				log_warn(knet_h, KNET_SUB_COMPRESS, "Unable to decompress packet (%d): %s",
					 err, strerror(errno));
				return;
			}
		}

		if (inbuf->kh_type == KNET_HEADER_TYPE_DATA) {
			if (knet_h->enabled != 1) /* data forward is disabled */
				break;

			/* Only update the crypto overhead for data packets. Mainly to be
			   consistent with TX */
			knet_h->stats.rx_crypt_time_ave =
				(knet_h->stats.rx_crypt_time_ave * knet_h->stats.rx_crypt_packets +
				 crypt_time) / (knet_h->stats.rx_crypt_packets+1);
			knet_h->stats.rx_crypt_packets++;

			if (knet_h->dst_host_filter_fn) {
				size_t host_idx;
				int found = 0;

				bcast = knet_h->dst_host_filter_fn(
						knet_h->dst_host_filter_fn_private_data,
						(const unsigned char *)inbuf->khp_data_userdata,
						len - KNET_HEADER_DATA_SIZE,
						KNET_NOTIFY_RX,
						knet_h->host_id,
						inbuf->kh_node,
						&channel,
						dst_host_ids,
						&dst_host_ids_entries);
				if (bcast < 0) {
					log_debug(knet_h, KNET_SUB_RX, "Error from dst_host_filter_fn: %d", bcast);
					return;
				}

				if ((!bcast) && (!dst_host_ids_entries)) {
					log_debug(knet_h, KNET_SUB_RX, "Message is unicast but no dst_host_ids_entries");
					return;
				}

				/* check if we are dst for this packet */
				if (!bcast) {
					if (dst_host_ids_entries > KNET_MAX_HOST) {
						log_debug(knet_h, KNET_SUB_RX, "dst_host_filter_fn returned too many destinations");
						return;
					}
					for (host_idx = 0; host_idx < dst_host_ids_entries; host_idx++) {
						if (dst_host_ids[host_idx] == knet_h->host_id) {
							found = 1;
							break;
						}
					}
					if (!found) {
						log_debug(knet_h, KNET_SUB_RX, "Packet is not for us");
						return;
					}
				}
			}
		}

		if (inbuf->kh_type == KNET_HEADER_TYPE_DATA) {
			if (!knet_h->sockfd[channel].in_use) {
				log_debug(knet_h, KNET_SUB_RX,
					  "received packet for channel %d but there is no local sock connected",
					  channel);
				return;
			}

			memset(iov_out, 0, sizeof(iov_out));
			iov_out[0].iov_base = (void *) inbuf->khp_data_userdata;
			iov_out[0].iov_len = len - KNET_HEADER_DATA_SIZE;

			outlen = writev(knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], iov_out, 1);
			if (outlen <= 0) {
				knet_h->sock_notify_fn(knet_h->sock_notify_fn_private_data,
						       knet_h->sockfd[channel].sockfd[0],
						       channel,
						       KNET_NOTIFY_RX,
						       outlen,
						       errno);
				return;
			}
			/* only a complete write marks the packet as delivered */
			if ((size_t)outlen == iov_out[0].iov_len) {
				_seq_num_set(src_host, inbuf->khp_data_seq_num, 0);
			}
		} else { /* HOSTINFO */
			knet_hostinfo = (struct knet_hostinfo *)inbuf->khp_data_userdata;
			if (knet_hostinfo->khi_bcast == KNET_HOSTINFO_UCAST) {
				knet_hostinfo->khi_dst_node_id = ntohs(knet_hostinfo->khi_dst_node_id);
			}
			if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 0, 0)) {
				return;
			}
			_seq_num_set(src_host, inbuf->khp_data_seq_num, 0);
			switch(knet_hostinfo->khi_type) {
			case KNET_HOSTINFO_TYPE_LINK_UP_DOWN:
				break;
			case KNET_HOSTINFO_TYPE_LINK_TABLE:
				break;
			default:
				log_warn(knet_h, KNET_SUB_RX, "Receiving unknown host info message from host %u", src_host->host_id);
				break;
			}
		}
		break;
	case KNET_HEADER_TYPE_PING:
		/* the received buffer is turned into the PONG reply in place */
		outlen = KNET_HEADER_PING_SIZE;
		inbuf->kh_type = KNET_HEADER_TYPE_PONG;
		inbuf->kh_node = htons(knet_h->host_id);
		recv_seq_num = ntohs(inbuf->khp_ping_seq_num);
		src_link->status.stats.rx_ping_packets++;
		src_link->status.stats.rx_ping_bytes += len;

		wipe_bufs = 0;

		if (!inbuf->khp_ping_timed) {
			/*
			 * we might be receiving this message from all links, but we want
			 * to process it only the first time
			 */
			if (recv_seq_num != src_host->untimed_rx_seq_num) {
				/*
				 * cache the untimed seq num
				 */
				src_host->untimed_rx_seq_num = recv_seq_num;
				/*
				 * if the host has received data in between
				 * untimed ping, then we don't need to wipe the bufs
				 */
				if (src_host->got_data) {
					src_host->got_data = 0;
					wipe_bufs = 0;
				} else {
					wipe_bufs = 1;
				}
			}
			_seq_num_lookup(src_host, recv_seq_num, 0, wipe_bufs);
		} else {
			/*
			 * pings always arrives in bursts over all the link
			 * catch the first of them to cache the seq num and
			 * avoid duplicate processing
			 */
			if (recv_seq_num != src_host->timed_rx_seq_num) {
				src_host->timed_rx_seq_num = recv_seq_num;

				if (recv_seq_num == 0) {
					_seq_num_lookup(src_host, recv_seq_num, 0, 1);
				}
			}
		}

		if (knet_h->crypto_instance) {
			if (crypto_encrypt_and_sign(knet_h,
						    (const unsigned char *)inbuf,
						    outlen,
						    knet_h->recv_from_links_buf_crypt,
						    &outlen) < 0) {
				log_debug(knet_h, KNET_SUB_RX, "Unable to encrypt pong packet");
				break;
			}
			outbuf = knet_h->recv_from_links_buf_crypt;
			knet_h->stats_extra.tx_crypt_pong_packets++;
		}

retry_pong:
		if (transport_get_connection_oriented(knet_h, src_link->transport) == TRANSPORT_PROTO_NOT_CONNECTION_ORIENTED) {
			len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL,
				     (struct sockaddr *) &src_link->dst_addr, sizeof(struct sockaddr_storage));
		} else {
			len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0);
		}
		savederrno = errno;
		if (len != outlen) {
			err = transport_tx_sock_error(knet_h, src_link->transport, src_link->outsock, len, savederrno);
			switch(err) {
			case -1: /* unrecoverable error */
				/*
				 * report the errno captured right after sendto():
				 * errno itself may have been modified since
				 */
				log_debug(knet_h, KNET_SUB_RX,
					  "Unable to send pong reply (sock: %d) packet (sendto): %d %s. recorded src ip: %s src port: %s dst ip: %s dst port: %s",
					  src_link->outsock, savederrno, strerror(savederrno),
					  src_link->status.src_ipaddr, src_link->status.src_port,
					  src_link->status.dst_ipaddr, src_link->status.dst_port);
				src_link->status.stats.tx_pong_errors++;
				break;
			case 0: /* ignore error and continue */
				break;
			case 1: /* retry to send those same data */
				src_link->status.stats.tx_pong_retries++;
				goto retry_pong;
				break;
			}
		}
		src_link->status.stats.tx_pong_packets++;
		src_link->status.stats.tx_pong_bytes += outlen;
		break;
	case KNET_HEADER_TYPE_PONG:
		src_link->status.stats.rx_pong_packets++;
		src_link->status.stats.rx_pong_bytes += len;
		clock_gettime(CLOCK_MONOTONIC, &src_link->status.pong_last);

		/* the pong carries back the timestamp stored in the ping */
		memmove(&recvtime, &inbuf->khp_ping_time[0], sizeof(struct timespec));
		timespec_diff(recvtime,
			      src_link->status.pong_last, &latency_last);

		/* weighted average of the previous latency and the new sample */
		src_link->status.latency =
			((src_link->status.latency * src_link->latency_exp) +
			 ((latency_last / 1000llu) *
			  (src_link->latency_fix - src_link->latency_exp))) /
			src_link->latency_fix;

		if (src_link->status.latency < src_link->pong_timeout_adj) {
			if (!src_link->status.connected) {
				if (src_link->received_pong >= src_link->pong_count) {
					log_info(knet_h, KNET_SUB_RX, "host: %u link: %u is up",
						 src_host->host_id, src_link->link_id);
					_link_updown(knet_h, src_host->host_id, src_link->link_id, src_link->status.enabled, 1);
				} else {
					src_link->received_pong++;
					log_debug(knet_h, KNET_SUB_RX, "host: %u link: %u received pong: %u",
						  src_host->host_id, src_link->link_id, src_link->received_pong);
				}
			}
		}
		/* Calculate latency stats */
		if (src_link->status.latency > src_link->status.stats.latency_max) {
			src_link->status.stats.latency_max = src_link->status.latency;
		}
		if (src_link->status.latency < src_link->status.stats.latency_min) {
			src_link->status.stats.latency_min = src_link->status.latency;
		}
		src_link->status.stats.latency_ave =
			(src_link->status.stats.latency_ave * src_link->status.stats.latency_samples +
			 src_link->status.latency) / (src_link->status.stats.latency_samples+1);
		src_link->status.stats.latency_samples++;

		break;
	case KNET_HEADER_TYPE_PMTUD:
		src_link->status.stats.rx_pmtu_packets++;
		src_link->status.stats.rx_pmtu_bytes += len;

		/* the received buffer is turned into the PMTUd reply in place */
		outlen = KNET_HEADER_PMTUD_SIZE;
		inbuf->kh_type = KNET_HEADER_TYPE_PMTUD_REPLY;
		inbuf->kh_node = htons(knet_h->host_id);

		if (knet_h->crypto_instance) {
			if (crypto_encrypt_and_sign(knet_h,
						    (const unsigned char *)inbuf,
						    outlen,
						    knet_h->recv_from_links_buf_crypt,
						    &outlen) < 0) {
				log_debug(knet_h, KNET_SUB_RX, "Unable to encrypt PMTUd reply packet");
				break;
			}
			outbuf = knet_h->recv_from_links_buf_crypt;
			knet_h->stats_extra.tx_crypt_pmtu_reply_packets++;
		}

		savederrno = pthread_mutex_lock(&knet_h->tx_mutex);
		if (savederrno) {
			log_err(knet_h, KNET_SUB_RX, "Unable to get TX mutex lock: %s", strerror(savederrno));
			goto out_pmtud;
		}
retry_pmtud:
		if (transport_get_connection_oriented(knet_h, src_link->transport) == TRANSPORT_PROTO_NOT_CONNECTION_ORIENTED) {
			len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL,
				     (struct sockaddr *) &src_link->dst_addr, sizeof(struct sockaddr_storage));
		} else {
			len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0);
		}
		savederrno = errno;
		if (len != outlen) {
			err = transport_tx_sock_error(knet_h, src_link->transport, src_link->outsock, len, savederrno);
			switch(err) {
			case -1: /* unrecoverable error */
				/*
				 * report the errno captured right after sendto():
				 * errno itself may have been modified since
				 */
				log_debug(knet_h, KNET_SUB_RX,
					  "Unable to send PMTUd reply (sock: %d) packet (sendto): %d %s. recorded src ip: %s src port: %s dst ip: %s dst port: %s",
					  src_link->outsock, savederrno, strerror(savederrno),
					  src_link->status.src_ipaddr, src_link->status.src_port,
					  src_link->status.dst_ipaddr, src_link->status.dst_port);

				src_link->status.stats.tx_pmtu_errors++;
				break;
			case 0: /* ignore error and continue */
				src_link->status.stats.tx_pmtu_errors++;
				break;
			case 1: /* retry to send those same data */
				src_link->status.stats.tx_pmtu_retries++;
				goto retry_pmtud;
				break;
			}
		}
		pthread_mutex_unlock(&knet_h->tx_mutex);
out_pmtud:
		break;
	case KNET_HEADER_TYPE_PMTUD_REPLY:
		src_link->status.stats.rx_pmtu_packets++;
		src_link->status.stats.rx_pmtu_bytes += len;

		if (pthread_mutex_lock(&knet_h->pmtud_mutex) != 0) {
			log_debug(knet_h, KNET_SUB_RX, "Unable to get mutex lock");
			break;
		}
		/* record the confirmed size and wake up whoever waits on pmtud_cond */
		src_link->last_recv_mtu = inbuf->khp_pmtud_size;
		pthread_cond_signal(&knet_h->pmtud_cond);
		pthread_mutex_unlock(&knet_h->pmtud_mutex);
		break;
	default:
		return;
	}
}
/*
 * Read one batch of packets from a link socket and pass every data packet
 * that survives the transport and access-list checks to
 * _parse_recv_from_links().
 *
 * knet_h is the handle, sockfd the socket that reported an event and msg an
 * array of PCKT_RX_BUFS receive slots prepared by the caller.
 * The whole operation runs under the global read lock.
 */
static void _handle_recv_from_links(knet_handle_t knet_h, int sockfd, struct knet_mmsghdr *msg)
{
	int verdict, recv_errno;
	int slot, received, transport;

	if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) {
		log_debug(knet_h, KNET_SUB_RX, "Unable to get global read lock");
		return;
	}

	/*
	 * an invalid fd is normal here: the fd may have reported an event and
	 * the link may have been removed by another thread before we got the
	 * read lock
	 */
	if (_is_valid_fd(knet_h, sockfd) < 1) {
		goto exit_unlock;
	}

	transport = knet_h->knet_transport_fd_tracker[sockfd].transport;

	/*
	 * after recvmmsg each msg_namelen holds sizeof sockaddr_in or
	 * sockaddr_in6, so restore the full buffer size before every call
	 */
	for (slot = 0; slot < PCKT_RX_BUFS; slot++) {
		msg[slot].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage);
	}

	received = _recvmmsg(sockfd, &msg[0], PCKT_RX_BUFS, MSG_DONTWAIT | MSG_NOSIGNAL);
	recv_errno = errno;

	/*
	 * WARNING: the man page for recvmmsg is wrong. The kernel implementation
	 * can return:
	 *   -1 on error
	 *    0 if the previous run of recvmmsg recorded an error on the socket
	 *    N number of messages (see exception below).
	 *
	 * If recvmsg fails after one or more frames have been received, the
	 * recvmmsg loop is interrupted, the error is recorded in the socket
	 * (getsockopt(SO_ERROR)) and becomes visible in the next run.
	 *
	 * Errors must be handled per transport/protocol, at different layers:
	 * - received < 0 -> error from this run
	 *   received = 0 -> error from previous run, the socket error needs
	 *                   to be cleared
	 * - per-transport message data
	 *   example: msg[i].msg_hdr.msg_flags & MSG_NOTIFICATION or msg_len == EOF
	 *            for SCTP, while for UDP a 0 bytes message is perfectly legal
	 * - NOTE: on SCTP MSG_NOTIFICATION we get PCKT_FRAG_MAX messages and no
	 *         errno set, hence the error api must be able to abort the loop
	 *         below.
	 */
	if (received <= 0) {
		transport_rx_sock_error(knet_h, transport, sockfd, received, recv_errno);
		goto exit_unlock;
	}

	for (slot = 0; slot < received; slot++) {
		verdict = transport_rx_is_data(knet_h, transport, sockfd, &msg[slot]);

		/*
		 * TODO: make this section silent once we are confident
		 *       all protocols packet handlers are good
		 */
		if (verdict == -1) { /* on error */
			log_debug(knet_h, KNET_SUB_RX, "Transport reported error parsing packet");
			goto exit_unlock;
		}
		if (verdict == 0) { /* not data, keep processing the batch */
			log_debug(knet_h, KNET_SUB_RX, "Transport reported no data, continue");
			continue;
		}
		if (verdict == 1) { /* not data, STOP processing the batch */
			log_debug(knet_h, KNET_SUB_RX, "Transport reported no data, stop");
			goto exit_unlock;
		}
		if (verdict != 2) { /* any other value: nothing to do for this slot */
			continue;
		}

		/*
		 * the packet is data: check it against the access lists first
		 */
		if ((knet_h->use_access_lists) &&
		    (transport_get_acl_type(knet_h, transport) == USE_GENERIC_ACL) &&
		    (!check_validate(knet_h, sockfd, transport, msg[slot].msg_hdr.msg_name))) {
			char src_ipaddr[KNET_MAX_HOST_LEN];
			char src_port[KNET_MAX_PORT_LEN];

			memset(src_ipaddr, 0, KNET_MAX_HOST_LEN);
			memset(src_port, 0, KNET_MAX_PORT_LEN);
			knet_addrtostr(msg[slot].msg_hdr.msg_name, sockaddr_len(msg[slot].msg_hdr.msg_name),
				       src_ipaddr, KNET_MAX_HOST_LEN,
				       src_port, KNET_MAX_PORT_LEN);

			log_debug(knet_h, KNET_SUB_RX, "Packet rejected from %s/%s", src_ipaddr, src_port);
			/*
			 * the remaining packets still get processed
			 */
			continue;
		}

		_parse_recv_from_links(knet_h, sockfd, &msg[slot]);
	}

exit_unlock:
	pthread_rwlock_unlock(&knet_h->global_rwlock);
}
/*
 * RX worker thread.
 *
 * data is the knet handle. The thread prepares PCKT_RX_BUFS receive slots
 * backed by knet_h->recv_from_links_buf, then waits on the RX epoll fd and
 * calls _handle_recv_from_links() for every fd that reported an event,
 * until shutdown is in progress.
 *
 * Always returns NULL.
 */
void *_handle_recv_from_links_thread(void *data)
{
	knet_handle_t knet_h = (knet_handle_t) data;
	struct epoll_event events[KNET_EPOLL_MAX_EVENTS];
	struct sockaddr_storage address[PCKT_RX_BUFS];
	struct knet_mmsghdr msg[PCKT_RX_BUFS];
	struct iovec iov_in[PCKT_RX_BUFS];
	int slot, ready, ev;

	set_thread_status(knet_h, KNET_THREAD_RX, KNET_THREAD_STARTED);

	memset(&msg, 0, sizeof(msg));

	/* one iovec and one address buffer per receive slot */
	slot = 0;
	while (slot < PCKT_RX_BUFS) {
		iov_in[slot].iov_base = (void *)knet_h->recv_from_links_buf[slot];
		iov_in[slot].iov_len = KNET_DATABUFSIZE;

		memset(&msg[slot].msg_hdr, 0, sizeof(struct msghdr));

		msg[slot].msg_hdr.msg_name = &address[slot];
		msg[slot].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage);
		msg[slot].msg_hdr.msg_iov = &iov_in[slot];
		msg[slot].msg_hdr.msg_iovlen = 1;

		slot++;
	}

	while (!shutdown_in_progress(knet_h)) {
		/*
		 * the timeout is what lets us notice that the thread is
		 * shutting down
		 */
		ready = epoll_wait(knet_h->recv_from_links_epollfd, events, KNET_EPOLL_MAX_EVENTS, KNET_THREADS_TIMERES / 1000);

		/* timeout or error: nothing to dispatch in this round */
		if (ready <= 0) {
			continue;
		}

		for (ev = 0; ev < ready; ev++) {
			_handle_recv_from_links(knet_h, events[ev].data.fd, msg);
		}
	}

	set_thread_status(knet_h, KNET_THREAD_RX, KNET_THREAD_STOPPED);

	return NULL;
}
diff --git a/libknet/threads_tx.c b/libknet/threads_tx.c
index e987eb13..80969067 100644
--- a/libknet/threads_tx.c
+++ b/libknet/threads_tx.c
@@ -1,749 +1,748 @@
/*
* Copyright (C) 2012-2019 Red Hat, Inc. All rights reserved.
*
* Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
* Federico Simoncelli <fsimon@kronosnet.org>
*
* This software licensed under GPL-2.0+, LGPL-2.0+
*/
#include "config.h"
#include <math.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>
#include <sys/uio.h>
#include <errno.h>
#include "compat.h"
#include "compress.h"
#include "crypto.h"
#include "host.h"
#include "link.h"
#include "logging.h"
#include "transports.h"
#include "transport_common.h"
#include "threads_common.h"
#include "threads_heartbeat.h"
#include "threads_tx.h"
#include "netutils.h"
/*
* SEND
*/
/*
 * Send a batch of prepared messages to dst_host over its active links.
 *
 * msg holds msgs_to_send messages. For every active link that is not a
 * loopback link the messages are addressed to the link destination, accounted
 * in the link TX stats and sent with _sendmmsg(), retrying while the
 * transport asks for it or while partial sends keep making progress.
 * With the round-robin policy (and more than one active link) only the first
 * link is used and the active link list is rotated by one.
 *
 * Returns 0 on success, -1 on failure. errno is set to the errno captured
 * after the last _sendmmsg() call, or to EAGAIN when a partial send stopped
 * making progress.
 */
static int _dispatch_to_links(knet_handle_t knet_h, struct knet_host *dst_host, struct knet_mmsghdr *msg, int msgs_to_send)
{
	int link_idx, msg_idx, sent_msgs, prev_sent, progress;
	int err = 0, savederrno = 0;
	unsigned int i;
	struct knet_mmsghdr *cur;
	struct knet_link *cur_link;

	for (link_idx = 0; link_idx < dst_host->active_link_entries; link_idx++) {
		prev_sent = 0;
		progress = 1;

		cur_link = &dst_host->link[dst_host->active_links[link_idx]];

		if (cur_link->transport == KNET_TRANSPORT_LOOPBACK) {
			continue;
		}

		/* address every message to this link and account it in the stats */
		msg_idx = 0;
		while (msg_idx < msgs_to_send) {
			msg[msg_idx].msg_hdr.msg_name = &cur_link->dst_addr;

			/* Cast for Linux/BSD compatibility */
			for (i=0; i<(unsigned int)msg[msg_idx].msg_hdr.msg_iovlen; i++) {
				cur_link->status.stats.tx_data_bytes += msg[msg_idx].msg_hdr.msg_iov[i].iov_len;
			}
			cur_link->status.stats.tx_data_packets++;
			msg_idx++;
		}

retry:
		/* resume from the first message that has not been sent yet */
		cur = &msg[prev_sent];

		sent_msgs = _sendmmsg(cur_link->outsock,
				      transport_get_connection_oriented(knet_h, cur_link->transport),
				      &cur[0], msgs_to_send - prev_sent, MSG_DONTWAIT | MSG_NOSIGNAL);
		savederrno = errno;

		err = transport_tx_sock_error(knet_h, cur_link->transport, cur_link->outsock, sent_msgs, savederrno);
		switch(err) {
		case -1: /* unrecoverable error */
			cur_link->status.stats.tx_data_errors++;
			goto out_unlock;
			break;
		case 0: /* ignore error and continue */
			break;
		case 1: /* retry to send those same data */
			cur_link->status.stats.tx_data_retries++;
			goto retry;
			break;
		}

		prev_sent = prev_sent + sent_msgs;

		if ((sent_msgs >= 0) && (prev_sent < msgs_to_send)) {
			/*
			 * partial send: retry as long as the last call sent something,
			 * plus one extra attempt after the first call that sent nothing
			 */
			if ((sent_msgs) || (progress)) {
				if (sent_msgs) {
					progress = 1;
				} else {
					progress = 0;
				}
#ifdef DEBUG
				log_debug(knet_h, KNET_SUB_TX, "Unable to send all (%d/%d) data packets to host %s (%u) link %s:%s (%u)",
					  sent_msgs, msg_idx,
					  dst_host->name, dst_host->host_id,
					  cur_link->status.dst_ipaddr,
					  cur_link->status.dst_port,
					  cur_link->link_id);
#endif
				goto retry;
			}
			if (!progress) {
				savederrno = EAGAIN;
				err = -1;
				goto out_unlock;
			}
		}

		/*
		 * round-robin: rotate the active link list by one and stop after
		 * the first link
		 */
		if ((dst_host->link_handler_policy == KNET_LINK_POLICY_RR) &&
		    (dst_host->active_link_entries > 1)) {
			uint8_t cur_link_id = dst_host->active_links[0];

			memmove(&dst_host->active_links[0], &dst_host->active_links[1], KNET_MAX_LINK - 1);
			dst_host->active_links[dst_host->active_link_entries - 1] = cur_link_id;

			break;
		}
	}

out_unlock:
	errno = savederrno;
	return err;
}
/*
 * Take the packet currently stored in knet_h->recv_from_sock_buf and
 * transmit it to its destination host(s).
 *
 * knet_h:  knet handle
 * inlen:   number of payload bytes in recv_from_sock_buf->khp_data_userdata
 * channel: data channel the payload came from; passed to the destination
 *          filter and copied into the outgoing header
 * is_sync: when non-zero only a unicast packet with a single destination
 *          is accepted (anything else fails with E2BIG)
 *
 * Processing order: destination selection (filter callback or host info),
 * optional local loopback delivery, reachability check, optional
 * compression, fragmentation, optional encryption, dispatch to links.
 *
 * Returns the value of err and sets errno to savederrno on exit.
 * Both callers visible in this file invoke it while holding the global
 * read lock and tx_mutex.
 */
static int _parse_recv_from_sock(knet_handle_t knet_h, size_t inlen, int8_t channel, int is_sync)
{
size_t outlen, frag_len;
struct knet_host *dst_host;
knet_node_id_t dst_host_ids_temp[KNET_MAX_HOST];
size_t dst_host_ids_entries_temp = 0;
knet_node_id_t dst_host_ids[KNET_MAX_HOST];
size_t dst_host_ids_entries = 0;
int bcast = 1;
struct knet_hostinfo *knet_hostinfo;
struct iovec iov_out[PCKT_FRAG_MAX][2];
int iovcnt_out = 2;
uint8_t frag_idx;
unsigned int temp_data_mtu;
size_t host_idx;
int send_mcast = 0;
struct knet_header *inbuf;
int savederrno = 0;
int err = 0;
seq_num_t tx_seq_num;
struct knet_mmsghdr msg[PCKT_FRAG_MAX];
int msgs_to_send, msg_idx;
unsigned int i;
int j;
int send_local = 0;
int data_compressed = 0;
size_t uncrypted_frag_size;
inbuf = knet_h->recv_from_sock_buf;
/*
 * host info packets are let through even when data forwarding is disabled
 */
if ((knet_h->enabled != 1) &&
(inbuf->kh_type != KNET_HEADER_TYPE_HOST_INFO)) { /* data forward is disabled */
log_debug(knet_h, KNET_SUB_TX, "Received data packet but forwarding is disabled");
savederrno = ECANCELED;
err = -1;
goto out_unlock;
}
/*
* move this into a separate function to expand on
* extra switching rules
*/
switch(inbuf->kh_type) {
case KNET_HEADER_TYPE_DATA:
/*
 * without a filter callback the packet stays broadcast (bcast == 1).
 * with a filter, its return value selects broadcast (non-zero) or
 * unicast (0) and it fills the temporary destination list.
 */
if (knet_h->dst_host_filter_fn) {
bcast = knet_h->dst_host_filter_fn(
knet_h->dst_host_filter_fn_private_data,
(const unsigned char *)inbuf->khp_data_userdata,
inlen,
KNET_NOTIFY_TX,
knet_h->host_id,
knet_h->host_id,
&channel,
dst_host_ids_temp,
&dst_host_ids_entries_temp);
if (bcast < 0) {
log_debug(knet_h, KNET_SUB_TX, "Error from dst_host_filter_fn: %d", bcast);
savederrno = EFAULT;
err = -1;
goto out_unlock;
}
if ((!bcast) && (!dst_host_ids_entries_temp)) {
log_debug(knet_h, KNET_SUB_TX, "Message is unicast but no dst_host_ids_entries");
savederrno = EINVAL;
err = -1;
goto out_unlock;
}
/*
 * NOTE(review): this check runs after the callback has already written
 * into dst_host_ids_temp, so it can only detect an overrun, not prevent it.
 */
if ((!bcast) &&
(dst_host_ids_entries_temp > KNET_MAX_HOST)) {
log_debug(knet_h, KNET_SUB_TX, "dst_host_filter_fn returned too many destinations");
savederrno = EINVAL;
err = -1;
goto out_unlock;
}
}
/* Send to localhost if appropriate and enabled */
if (knet_h->has_loop_link) {
send_local = 0;
if (bcast) {
send_local = 1;
} else {
for (i=0; i< dst_host_ids_entries_temp; i++) {
if (dst_host_ids_temp[i] == knet_h->host_id) {
send_local = 1;
}
}
}
if (send_local) {
const unsigned char *buf = inbuf->khp_data_userdata;
ssize_t buflen = inlen;
struct knet_link *local_link;
local_link = knet_h->host_index[knet_h->host_id]->link;
/*
 * write the payload straight back to the channel socket selected by
 * is_created; short writes are retried after a brief sleep until the
 * remaining bytes are written or write() fails.
 */
local_retry:
err = write(knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], buf, buflen);
/*
 * NOTE(review): a failed local write is only logged and counted; err
 * stays negative here and processing continues with the remote send.
 */
if (err < 0) {
log_err(knet_h, KNET_SUB_TRANSP_LOOPBACK, "send local failed. error=%s\n", strerror(errno));
local_link->status.stats.tx_data_errors++;
}
if (err > 0 && err < buflen) {
log_debug(knet_h, KNET_SUB_TRANSP_LOOPBACK, "send local incomplete=%d bytes of %zu\n", err, inlen);
local_link->status.stats.tx_data_retries++;
buf += err;
buflen -= err;
usleep(KNET_THREADS_TIMERES / 16);
goto local_retry;
}
if (err == buflen) {
local_link->status.stats.tx_data_packets++;
local_link->status.stats.tx_data_bytes += inlen;
}
}
}
break;
case KNET_HEADER_TYPE_HOST_INFO:
knet_hostinfo = (struct knet_hostinfo *)inbuf->khp_data_userdata;
if (knet_hostinfo->khi_bcast == KNET_HOSTINFO_UCAST) {
bcast = 0;
/*
 * keep the node id in its current byte order for the local lookup,
 * then convert the copy inside the packet with htons()
 */
dst_host_ids_temp[0] = knet_hostinfo->khi_dst_node_id;
dst_host_ids_entries_temp = 1;
knet_hostinfo->khi_dst_node_id = htons(knet_hostinfo->khi_dst_node_id);
}
break;
default:
log_warn(knet_h, KNET_SUB_TX, "Receiving unknown messages from socket");
savederrno = ENOMSG;
err = -1;
goto out_unlock;
break;
}
if (is_sync) {
if ((bcast) ||
((!bcast) && (dst_host_ids_entries_temp > 1))) {
log_debug(knet_h, KNET_SUB_TX, "knet_send_sync is only supported with unicast packets for one destination");
savederrno = E2BIG;
err = -1;
goto out_unlock;
}
}
/*
* check destinations hosts before spending time
* in fragmenting/encrypting packets to save
* time processing data for unreachable hosts.
* for unicast, also remap the destination data
* to skip unreachable hosts.
*/
if (!bcast) {
dst_host_ids_entries = 0;
for (host_idx = 0; host_idx < dst_host_ids_entries_temp; host_idx++) {
dst_host = knet_h->host_index[dst_host_ids_temp[host_idx]];
if (!dst_host) {
continue;
}
/*
 * keep reachable hosts, except ourselves when a loopback link exists
 * (the local copy was already handled above)
 */
if (!(dst_host->host_id == knet_h->host_id &&
knet_h->has_loop_link) &&
dst_host->status.reachable) {
dst_host_ids[dst_host_ids_entries] = dst_host_ids_temp[host_idx];
dst_host_ids_entries++;
}
}
if (!dst_host_ids_entries) {
savederrno = EHOSTDOWN;
err = -1;
goto out_unlock;
}
} else {
/*
 * broadcast: one reachable host (other than our own loopback) is enough
 * to justify preparing the packet
 */
send_mcast = 0;
for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) {
if (!(dst_host->host_id == knet_h->host_id &&
knet_h->has_loop_link) &&
dst_host->status.reachable) {
send_mcast = 1;
break;
}
}
if (!send_mcast) {
savederrno = EHOSTDOWN;
err = -1;
goto out_unlock;
}
}
if (!knet_h->data_mtu) {
/*
* using MIN_MTU_V4 for data mtu is not completely accurate but safe enough
*/
log_debug(knet_h, KNET_SUB_TX,
"Received data packet but data MTU is still unknown."
" Packet might not be delivered."
" Assuming minimum IPv4 MTU (%d)",
KNET_PMTUD_MIN_MTU_V4);
temp_data_mtu = KNET_PMTUD_MIN_MTU_V4;
} else {
/*
* take a copy of the mtu to avoid value changing under
* our feet while we are sending a fragmented pckt
*/
temp_data_mtu = knet_h->data_mtu;
}
/*
* compress data
*/
if ((knet_h->compress_model > 0) && (inlen > knet_h->compress_threshold)) {
size_t cmp_outlen = KNET_DATABUFSIZE_COMPRESS;
struct timespec start_time;
struct timespec end_time;
uint64_t compress_time;
clock_gettime(CLOCK_MONOTONIC, &start_time);
/*
 * NOTE(review): cmp_outlen is a size_t passed through a (ssize_t *) cast;
 * this relies on both types having the same size.
 */
err = compress(knet_h,
(const unsigned char *)inbuf->khp_data_userdata, inlen,
knet_h->send_to_links_buf_compress, (ssize_t *)&cmp_outlen);
/*
 * a compression failure is not fatal: it is logged and the packet goes
 * out uncompressed
 */
if (err < 0) {
log_warn(knet_h, KNET_SUB_COMPRESS, "Compression failed (%d): %s", err, strerror(errno));
} else {
/* Collect stats */
clock_gettime(CLOCK_MONOTONIC, &end_time);
timespec_diff(start_time, end_time, &compress_time);
if (compress_time < knet_h->stats.tx_compress_time_min) {
knet_h->stats.tx_compress_time_min = compress_time;
}
if (compress_time > knet_h->stats.tx_compress_time_max) {
knet_h->stats.tx_compress_time_max = compress_time;
}
knet_h->stats.tx_compress_time_ave =
(unsigned long long)(knet_h->stats.tx_compress_time_ave * knet_h->stats.tx_compressed_packets +
compress_time) / (knet_h->stats.tx_compressed_packets+1);
knet_h->stats.tx_compressed_packets++;
knet_h->stats.tx_compressed_original_bytes += inlen;
knet_h->stats.tx_compressed_size_bytes += cmp_outlen;
/*
 * only use the compressed form when it is actually smaller
 */
if (cmp_outlen < inlen) {
memmove(inbuf->khp_data_userdata, knet_h->send_to_links_buf_compress, cmp_outlen);
inlen = cmp_outlen;
data_compressed = 1;
}
}
}
if (knet_h->compress_model > 0 && !data_compressed) {
knet_h->stats.tx_uncompressed_packets++;
}
/*
* prepare the outgoing buffers
*/
frag_len = inlen;
frag_idx = 0;
inbuf->khp_data_bcast = bcast;
/*
 * NOTE(review): the fragment count is not checked against PCKT_FRAG_MAX
 * here, although iov_out/msg are sized by it; presumably the maximum packet
 * size and the minimum MTU guarantee the bound - confirm.
 */
inbuf->khp_data_frag_num = ceil((float)inlen / temp_data_mtu);
inbuf->khp_data_channel = channel;
if (data_compressed) {
inbuf->khp_data_compress = knet_h->compress_model;
} else {
inbuf->khp_data_compress = 0;
}
/*
 * NOTE(review): on lock failure this jumps out without setting err or
 * savederrno, so the caller may see a stale err value and errno 0.
 */
if (pthread_mutex_lock(&knet_h->tx_seq_num_mutex)) {
log_debug(knet_h, KNET_SUB_TX, "Unable to get seq mutex lock");
goto out_unlock;
}
knet_h->tx_seq_num++;
/*
* force seq_num 0 to detect a node that has crashed and rejoining
* the knet instance. seq_num 0 will clear the buffers in the RX
* thread
*/
if (knet_h->tx_seq_num == 0) {
knet_h->tx_seq_num++;
}
/*
* cache the value in locked context
*/
tx_seq_num = knet_h->tx_seq_num;
inbuf->khp_data_seq_num = htons(knet_h->tx_seq_num);
pthread_mutex_unlock(&knet_h->tx_seq_num_mutex);
/*
* forcefully broadcast a ping to all nodes every SEQ_MAX / 8
* pckts.
* this solves 2 problems:
* 1) on TX socket overloads we generate extra pings to keep links alive
* 2) in 3+ nodes setup, where all the traffic is flowing between node 1 and 2,
* node 3+ will be able to keep in sync on the TX seq_num even without
* receiving traffic or pings in betweens. This avoids issues with
* rollover of the circular buffer
*/
if (tx_seq_num % (SEQ_MAX / 8) == 0) {
_send_pings(knet_h, 0);
}
/*
 * fragmented packet: every fragment uses two iovecs, a per-fragment header
 * from send_to_links_buf[] plus a slice of the user data.
 * single packet: one iovec covering the header and payload stored in inbuf.
 */
if (inbuf->khp_data_frag_num > 1) {
while (frag_idx < inbuf->khp_data_frag_num) {
/*
* set the iov_base
*/
iov_out[frag_idx][0].iov_base = (void *)knet_h->send_to_links_buf[frag_idx];
iov_out[frag_idx][0].iov_len = KNET_HEADER_DATA_SIZE;
iov_out[frag_idx][1].iov_base = inbuf->khp_data_userdata + (temp_data_mtu * frag_idx);
/*
* set the len
*/
if (frag_len > temp_data_mtu) {
iov_out[frag_idx][1].iov_len = temp_data_mtu;
} else {
iov_out[frag_idx][1].iov_len = frag_len;
}
/*
* copy the frag info on all buffers
*/
knet_h->send_to_links_buf[frag_idx]->kh_type = inbuf->kh_type;
knet_h->send_to_links_buf[frag_idx]->khp_data_seq_num = inbuf->khp_data_seq_num;
knet_h->send_to_links_buf[frag_idx]->khp_data_frag_num = inbuf->khp_data_frag_num;
knet_h->send_to_links_buf[frag_idx]->khp_data_bcast = inbuf->khp_data_bcast;
knet_h->send_to_links_buf[frag_idx]->khp_data_channel = inbuf->khp_data_channel;
knet_h->send_to_links_buf[frag_idx]->khp_data_compress = inbuf->khp_data_compress;
/*
 * on the last fragment this unsigned subtraction may wrap; the value
 * is not used again after the loop ends
 */
frag_len = frag_len - temp_data_mtu;
frag_idx++;
}
iovcnt_out = 2;
} else {
iov_out[frag_idx][0].iov_base = (void *)inbuf;
iov_out[frag_idx][0].iov_len = frag_len + KNET_HEADER_DATA_SIZE;
iovcnt_out = 1;
}
/*
 * with crypto enabled every fragment is encrypted/signed into its own
 * send_to_links_buf_crypt[] buffer, which then replaces the first iovec;
 * after this stage each fragment is sent from a single iovec.
 */
if (knet_h->crypto_instance) {
struct timespec start_time;
struct timespec end_time;
uint64_t crypt_time;
frag_idx = 0;
while (frag_idx < inbuf->khp_data_frag_num) {
clock_gettime(CLOCK_MONOTONIC, &start_time);
if (crypto_encrypt_and_signv(
knet_h,
iov_out[frag_idx], iovcnt_out,
knet_h->send_to_links_buf_crypt[frag_idx],
(ssize_t *)&outlen) < 0) {
log_debug(knet_h, KNET_SUB_TX, "Unable to encrypt packet");
savederrno = ECHILD;
err = -1;
goto out_unlock;
}
clock_gettime(CLOCK_MONOTONIC, &end_time);
timespec_diff(start_time, end_time, &crypt_time);
if (crypt_time < knet_h->stats.tx_crypt_time_min) {
knet_h->stats.tx_crypt_time_min = crypt_time;
}
if (crypt_time > knet_h->stats.tx_crypt_time_max) {
knet_h->stats.tx_crypt_time_max = crypt_time;
}
knet_h->stats.tx_crypt_time_ave =
(knet_h->stats.tx_crypt_time_ave * knet_h->stats.tx_crypt_packets +
crypt_time) / (knet_h->stats.tx_crypt_packets+1);
/*
 * overhead = encrypted size minus the sum of the plain iovec lengths
 */
uncrypted_frag_size = 0;
for (j=0; j < iovcnt_out; j++) {
uncrypted_frag_size += iov_out[frag_idx][j].iov_len;
}
knet_h->stats.tx_crypt_byte_overhead += (outlen - uncrypted_frag_size);
knet_h->stats.tx_crypt_packets++;
iov_out[frag_idx][0].iov_base = knet_h->send_to_links_buf_crypt[frag_idx];
iov_out[frag_idx][0].iov_len = outlen;
frag_idx++;
}
iovcnt_out = 1;
}
/*
 * build one message header per fragment pointing at its iovec(s)
 */
memset(&msg, 0, sizeof(msg));
msgs_to_send = inbuf->khp_data_frag_num;
msg_idx = 0;
while (msg_idx < msgs_to_send) {
msg[msg_idx].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage);
msg[msg_idx].msg_hdr.msg_iov = &iov_out[msg_idx][0];
msg[msg_idx].msg_hdr.msg_iovlen = iovcnt_out;
msg_idx++;
}
/*
 * dispatch: unicast walks the filtered destination list, broadcast walks
 * every reachable host. The first dispatch error aborts the whole send.
 */
if (!bcast) {
for (host_idx = 0; host_idx < dst_host_ids_entries; host_idx++) {
dst_host = knet_h->host_index[dst_host_ids[host_idx]];
err = _dispatch_to_links(knet_h, dst_host, &msg[0], msgs_to_send);
savederrno = errno;
if (err) {
goto out_unlock;
}
}
} else {
for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) {
if (dst_host->status.reachable) {
err = _dispatch_to_links(knet_h, dst_host, &msg[0], msgs_to_send);
savederrno = errno;
if (err) {
goto out_unlock;
}
}
}
}
out_unlock:
errno = savederrno;
return err;
}
/*
 * Synchronously send one data packet on a data channel.
 *
 * knet_h:   knet handle (must not be NULL)
 * buff:     payload to send (must not be NULL)
 * buff_len: payload size, 1..KNET_MAX_PACKET_SIZE
 * channel:  data channel, 0..KNET_DATAFD_MAX-1, and it must be in use
 *
 * The payload is copied into the handle's recv_from_sock_buf and pushed
 * through _parse_recv_from_sock() in sync mode while holding the global
 * read lock and the TX mutex.
 *
 * Returns the result of _parse_recv_from_sock(), or -1 on argument/locking
 * errors. errno is set to the error code on failure and to 0 on success.
 */
int knet_send_sync(knet_handle_t knet_h, const char *buff, const size_t buff_len, const int8_t channel)
{
	int rc = -1;
	int saved = 0;

	/* every invalid argument is reported the same way: EINVAL */
	if ((!knet_h) ||
	    (buff == NULL) ||
	    (buff_len <= 0) ||
	    (buff_len > KNET_MAX_PACKET_SIZE) ||
	    (channel < 0) ||
	    (channel >= KNET_DATAFD_MAX)) {
		errno = EINVAL;
		return -1;
	}

	saved = pthread_rwlock_rdlock(&knet_h->global_rwlock);
	if (saved) {
		log_err(knet_h, KNET_SUB_TX, "Unable to get read lock: %s",
			strerror(saved));
		errno = saved;
		return -1;
	}

	if (knet_h->sockfd[channel].in_use) {
		saved = pthread_mutex_lock(&knet_h->tx_mutex);
		if (saved) {
			log_err(knet_h, KNET_SUB_TX, "Unable to get TX mutex lock: %s",
				strerror(saved));
		} else {
			/* stage the payload in the shared TX buffer and send it */
			knet_h->recv_from_sock_buf->kh_type = KNET_HEADER_TYPE_DATA;
			memmove(knet_h->recv_from_sock_buf->khp_data_userdata, buff, buff_len);
			rc = _parse_recv_from_sock(knet_h, buff_len, channel, 1);
			saved = errno;
			pthread_mutex_unlock(&knet_h->tx_mutex);
		}
	} else {
		/* channel was never configured */
		saved = EINVAL;
	}

	pthread_rwlock_unlock(&knet_h->global_rwlock);

	if (rc) {
		errno = saved;
	} else {
		errno = 0;
	}
	return rc;
}
/*
 * Read one packet from sockfd into the TX buffer described by msg and
 * forward it, or handle EOF/errors on the socket.
 *
 * knet_h:  knet handle
 * msg:     message header whose iovec points into recv_from_sock_buf
 * sockfd:  file descriptor that became readable
 * channel: data channel index, or a negative value when sockfd is not a
 *          data channel (the TX thread passes -1 for the host-info socket)
 * type:    header type to stamp on the packet (data or host info)
 *
 * For data channels, EOF (0 bytes) and read errors are reported to the
 * application via sock_notify_fn; on read errors the channel socket is
 * also removed from the TX epoll set and flagged with has_error.
 *
 * When channel is outside 0..KNET_DATAFD_MAX-1 there is no sockfd[] entry
 * to update and no application datafd to notify, so EOF/errors are only
 * logged. This avoids indexing knet_h->sockfd[] out of bounds.
 */
static void _handle_send_to_links(knet_handle_t knet_h, struct msghdr *msg, int sockfd, int8_t channel, int type)
{
	ssize_t inlen = 0;
	int savederrno = 0;
	int is_datafd = ((channel >= 0) && (channel < KNET_DATAFD_MAX));

	if ((is_datafd) &&
	    (!knet_h->sockfd[channel].is_socket)) {
		inlen = readv(sockfd, msg->msg_iov, 1);
	} else {
		inlen = recvmsg(sockfd, msg, MSG_DONTWAIT | MSG_NOSIGNAL);
	}

	if (inlen < 0) {
		/* capture errno before any other call can overwrite it */
		savederrno = errno;
	}

	if (inlen > 0) {
		knet_h->recv_from_sock_buf->kh_type = type;
		_parse_recv_from_sock(knet_h, inlen, channel, 0);
		return;
	}

	if (!is_datafd) {
		/* not a data channel: nothing in knet_h->sockfd[] belongs to it */
		if (inlen < 0) {
			log_err(knet_h, KNET_SUB_TX, "Unable to read from internal socket %d: %s",
				sockfd, strerror(savederrno));
		} else {
			log_debug(knet_h, KNET_SUB_TX, "Internal socket %d returned no data", sockfd);
		}
		return;
	}

	if (inlen < 0) {
		struct epoll_event ev;

		/* stop polling a broken channel socket */
		memset(&ev, 0, sizeof(struct epoll_event));
		if (epoll_ctl(knet_h->send_to_links_epollfd,
			      EPOLL_CTL_DEL, knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], &ev)) {
			/* report why the removal failed, not the earlier read error */
			log_err(knet_h, KNET_SUB_TX, "Unable to del datafd %d from linkfd epoll pool: %s",
				knet_h->sockfd[channel].sockfd[0], strerror(errno));
		} else {
			knet_h->sockfd[channel].has_error = 1;
		}
	}

	/* EOF (inlen == 0, error 0) or read error: tell the application */
	knet_h->sock_notify_fn(knet_h->sock_notify_fn_private_data,
			       knet_h->sockfd[channel].sockfd[0],
			       channel,
			       KNET_NOTIFY_TX,
			       inlen,
			       savederrno);
}
/*
 * TX thread entry point.
 *
 * data: the knet handle (knet_handle_t) this thread serves.
 *
 * Prepares the static parts of the TX buffers, then loops until shutdown
 * is requested: waits on send_to_links_epollfd, maps each ready fd to the
 * host-info socket or to a data channel, and hands it to
 * _handle_send_to_links() while holding the global read lock and tx_mutex.
 *
 * Always returns NULL.
 */
void *_handle_send_to_links_thread(void *data)
{
	knet_handle_t knet_h = (knet_handle_t) data;
	struct epoll_event events[KNET_EPOLL_MAX_EVENTS];
	/*
	 * epoll_wait must never be allowed to return more events than the
	 * array can hold, so derive the limit from the array itself.
	 */
	const int max_events = (int)(sizeof(events) / sizeof(events[0]));
	int i, nev, type;
	int8_t channel;
	struct iovec iov_in;
	struct msghdr msg;
	struct sockaddr_storage address;

	set_thread_status(knet_h, KNET_THREAD_TX, KNET_THREAD_STARTED);

	/* incoming payload lands directly in the user data area of the TX buffer */
	memset(&iov_in, 0, sizeof(iov_in));
	iov_in.iov_base = (void *)knet_h->recv_from_sock_buf->khp_data_userdata;
	iov_in.iov_len = KNET_MAX_PACKET_SIZE;

	memset(&msg, 0, sizeof(struct msghdr));
	msg.msg_name = &address;
	msg.msg_namelen = sizeof(struct sockaddr_storage);
	msg.msg_iov = &iov_in;
	msg.msg_iovlen = 1;

	/* header fields that never change are filled in once */
	knet_h->recv_from_sock_buf->kh_version = KNET_HEADER_VERSION;
	knet_h->recv_from_sock_buf->khp_data_frag_seq = 0;
	knet_h->recv_from_sock_buf->kh_node = htons(knet_h->host_id);

	for (i = 0; i < PCKT_FRAG_MAX; i++) {
		knet_h->send_to_links_buf[i]->kh_version = KNET_HEADER_VERSION;
		knet_h->send_to_links_buf[i]->khp_data_frag_seq = i + 1;
		knet_h->send_to_links_buf[i]->kh_node = htons(knet_h->host_id);
	}

	while (!shutdown_in_progress(knet_h)) {
		nev = epoll_wait(knet_h->send_to_links_epollfd, events, max_events, KNET_THREADS_TIMERES / 1000);

		/*
		 * we use timeout to detect if thread is shutting down.
		 * a negative return (e.g. EINTR) carries no events either.
		 */
		if (nev <= 0) {
			continue;
		}

		if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) {
			log_debug(knet_h, KNET_SUB_TX, "Unable to get read lock");
			continue;
		}

		for (i = 0; i < nev; i++) {
			if (events[i].data.fd == knet_h->hostsockfd[0]) {
				/* internal host-info socket: not tied to any data channel */
				type = KNET_HEADER_TYPE_HOST_INFO;
				channel = -1;
			} else {
				type = KNET_HEADER_TYPE_DATA;
				/* find the in-use channel that owns this fd */
				for (channel = 0; channel < KNET_DATAFD_MAX; channel++) {
					if ((knet_h->sockfd[channel].in_use) &&
					    (knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created] == events[i].data.fd)) {
						break;
					}
				}
				if (channel >= KNET_DATAFD_MAX) {
					log_debug(knet_h, KNET_SUB_TX, "No available channels");
					continue; /* channel not found */
				}
			}
			if (pthread_mutex_lock(&knet_h->tx_mutex) != 0) {
				log_debug(knet_h, KNET_SUB_TX, "Unable to get mutex lock");
				continue;
			}
			_handle_send_to_links(knet_h, &msg, events[i].data.fd, channel, type);
			pthread_mutex_unlock(&knet_h->tx_mutex);
		}

		pthread_rwlock_unlock(&knet_h->global_rwlock);
	}

	set_thread_status(knet_h, KNET_THREAD_TX, KNET_THREAD_STOPPED);

	return NULL;
}
diff --git a/libknet/transport_common.c b/libknet/transport_common.c
index 3c3c439a..fe40ad88 100644
--- a/libknet/transport_common.c
+++ b/libknet/transport_common.c
@@ -1,443 +1,443 @@
/*
* Copyright (C) 2016-2019 Red Hat, Inc. All rights reserved.
*
* Author: Fabio M. Di Nitto <fabbione@kronosnet.org>
*
* This software licensed under GPL-2.0+, LGPL-2.0+
*/
#include "config.h"
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <pthread.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/ip.h>
#include "libknet.h"
#include "compat.h"
#include "host.h"
#include "link.h"
#include "logging.h"
#include "common.h"
#include "transport_common.h"
/*
* reuse Jan Friesse's compat layer as wrapper to drop usage of sendmmsg
*
* TODO: kill those wrappers once we work on packet delivery guarantees
*/
/*
 * recvmmsg() replacement built on top of recvmsg().
 *
 * Receives up to vlen messages from sockfd into msgvec, storing the size of
 * each received message in msgvec[n].msg_len, and stops at the first
 * recvmsg() failure.
 *
 * Returns the number of messages received when at least one was read,
 * otherwise the result of the only recvmsg() call (0 when vlen is 0).
 * errno is left as set by the last recvmsg() call, except that
 * EAGAIN/EWOULDBLOCK after at least one received message is cleared to 0.
 */
int _recvmmsg(int sockfd, struct knet_mmsghdr *msgvec, unsigned int vlen, unsigned int flags)
{
	unsigned int received = 0;
	int rc = 0;
	int last_errno = 0;

	while (received < vlen) {
		rc = recvmsg(sockfd, &msgvec[received].msg_hdr, flags);
		last_errno = errno;

		if (rc < 0) {
			/* running dry after a partial batch is not an error */
			if ((received > 0) &&
			    ((last_errno == EAGAIN) || (last_errno == EWOULDBLOCK))) {
				last_errno = 0;
			}
			break;
		}

		msgvec[received].msg_len = rc;
		received++;
	}

	errno = last_errno;

	if (received > 0) {
		return (int)received;
	}
	return rc;
}
/*
 * sendmmsg() replacement built on top of sendmsg().
 *
 * Sends up to vlen messages from msgvec on sockfd and stops at the first
 * sendmsg() failure. For connection oriented transports the destination
 * address of every message is cleared before sending.
 *
 * Returns the number of messages sent when at least one went out,
 * otherwise the result of the only sendmsg() call (0 when vlen is 0).
 * errno is left as set by the last sendmsg() call.
 */
int _sendmmsg(int sockfd, int connection_oriented, struct knet_mmsghdr *msgvec, unsigned int vlen, unsigned int flags)
{
	unsigned int sent = 0;
	int rc = 0;
	int last_errno = 0;

	while (sent < vlen) {
		struct msghdr *hdr = &msgvec[sent].msg_hdr;

		/* a connected socket must not be given a destination address */
		if (connection_oriented == TRANSPORT_PROTO_IS_CONNECTION_ORIENTED) {
			hdr->msg_name = NULL;
			hdr->msg_namelen = 0;
		}

		rc = sendmsg(sockfd, hdr, flags);
		last_errno = errno;
		if (rc < 0) {
			break;
		}
		sent++;
	}

	errno = last_errno;

	if (sent > 0) {
		return (int)sent;
	}
	return rc;
}
/*
 * Assume neither of these constants can ever be zero.
 *
 * When the platform headers do not provide the *FORCE socket options they
 * are defined to 0 here, and _configure_sockbuf() treats a zero "force"
 * argument as "no forced variant available".
 */
#ifndef SO_RCVBUFFORCE
#define SO_RCVBUFFORCE 0
#endif
#ifndef SO_SNDBUFFORCE
#define SO_SNDBUFFORCE 0
#endif
/*
 * Set a socket buffer size and verify that the kernel accepted it.
 *
 * knet_h: knet handle (used for logging and for the PRIVILEGED flag)
 * sock:   socket to configure
 * option: regular buffer option (SO_RCVBUF / SO_SNDBUF)
 * force:  matching "force" option, or 0 when the platform has none
 * target: requested buffer size
 *
 * The size is first requested through `option` and read back. If the
 * kernel capped it below `target`, a privileged handle retries through
 * `force`; an unprivileged handle logs the problem and carries on.
 *
 * Returns 0 on success (including the tolerated unprivileged case),
 * -1 with errno set on failure. ENAMETOOLONG is used to report that the
 * requested size could not be obtained.
 */
static int _configure_sockbuf(knet_handle_t knet_h, int sock, int option, int force, int target)
{
	int saved = 0;
	int actual;
	socklen_t actual_len = sizeof actual;
	int privileged;

	if (setsockopt(sock, SOL_SOCKET, option, &target, sizeof target) != 0) {
		saved = errno;
		log_err(knet_h, KNET_SUB_TRANSPORT,
			"Error setting socket buffer via option %d to value %d: %s\n",
			option, target, strerror(saved));
		errno = saved;
		return -1;
	}

	if (getsockopt(sock, SOL_SOCKET, option, &actual, &actual_len) != 0) {
		saved = errno;
		log_err(knet_h, KNET_SUB_TRANSPORT,
			"Error getting socket buffer via option %d: %s\n",
			option, strerror(saved));
		errno = saved;
		return -1;
	}

	if (actual_len != sizeof actual) {
		log_err(knet_h, KNET_SUB_TRANSPORT,
			"Socket option %d returned unexpected size %u\n",
			option, actual_len);
		errno = ERANGE;
		return -1;
	}

	/* the kernel granted at least what was asked for */
	if (actual >= target) {
		return 0;
	}

	privileged = ((knet_h->flags & KNET_HANDLE_FLAG_PRIVILEGED) != 0);

	if (force && privileged) {
		/* capped, but we are allowed to override the system limit */
		if (setsockopt(sock, SOL_SOCKET, force, &target, sizeof target) >= 0) {
			return 0;
		}
		saved = errno;
		log_err(knet_h, KNET_SUB_TRANSPORT,
			"Failed to set socket buffer via force option %d: %s",
			force, strerror(saved));
		errno = (saved == EPERM) ? ENAMETOOLONG : saved;
		return -1;
	}

	log_err(knet_h, KNET_SUB_TRANSPORT,
		"Failed to set socket buffer via option %d to value %d: capped at %d",
		option, target, actual);

	if (privileged) {
		/* privileged handle but no force option on this platform */
		errno = ENAMETOOLONG;
		return -1;
	}

	log_err(knet_h, KNET_SUB_TRANSPORT,
		"Continuing regardless, as the handle is not privileged."
		" Expect poor performance!");
	return 0;
}
/*
 * Apply the socket settings shared by every knet socket.
 *
 * knet_h: knet handle
 * sock:   socket to configure
 * flags:  link flags; KNET_LINK_FLAG_TRAFFICHIPRIO requests high priority
 *         / low delay traffic marking where the platform supports it
 * type:   human readable socket description used in log messages
 *
 * Sets close-on-exec and non-blocking mode, sizes the receive and send
 * buffers to KNET_RING_RCVBUFF and, when requested, raises the traffic
 * priority.
 *
 * Returns 0 on success, -1 with errno set on the first failure.
 */
int _configure_common_socket(knet_handle_t knet_h, int sock, uint64_t flags, const char *type)
{
	int rc = 0;
	int saved = 0;
	int optval;

	if (_fdset_cloexec(sock) != 0) {
		saved = errno;
		rc = -1;
		log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set %s CLOEXEC socket opts: %s",
			type, strerror(saved));
		goto done;
	}

	if (_fdset_nonblock(sock) != 0) {
		saved = errno;
		rc = -1;
		log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set %s NONBLOCK socket opts: %s",
			type, strerror(saved));
		goto done;
	}

	if (_configure_sockbuf(knet_h, sock, SO_RCVBUF, SO_RCVBUFFORCE, KNET_RING_RCVBUFF) != 0) {
		saved = errno;
		rc = -1;
		log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set %s receive buffer: %s",
			type, strerror(saved));
		goto done;
	}

	/* the send buffer is sized with the same constant as the receive buffer */
	if (_configure_sockbuf(knet_h, sock, SO_SNDBUF, SO_SNDBUFFORCE, KNET_RING_RCVBUFF) != 0) {
		saved = errno;
		rc = -1;
		log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set %s send buffer: %s",
			type, strerror(saved));
		goto done;
	}

	/* everything below only applies to high priority links */
	if (!(flags & KNET_LINK_FLAG_TRAFFICHIPRIO)) {
		goto done;
	}

#ifdef KNET_LINUX
#ifdef SO_PRIORITY
	optval = 6; /* TC_PRIO_INTERACTIVE */
	if (setsockopt(sock, SOL_SOCKET, SO_PRIORITY, &optval, sizeof(optval)) < 0) {
		saved = errno;
		rc = -1;
		log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set %s priority: %s",
			type, strerror(saved));
		goto done;
	}
	log_debug(knet_h, KNET_SUB_TRANSPORT, "TC_PRIO_INTERACTIVE enabled on socket: %i", sock);
#else
	log_debug(knet_h, KNET_SUB_TRANSPORT, "TC_PRIO_INTERACTIVE not available in this build/platform");
#endif
#endif

#if defined(IP_TOS) && defined(IPTOS_LOWDELAY)
	optval = IPTOS_LOWDELAY;
	if (setsockopt(sock, IPPROTO_IP, IP_TOS, &optval, sizeof(optval)) < 0) {
		saved = errno;
		rc = -1;
		log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set %s priority: %s",
			type, strerror(saved));
		goto done;
	}
	log_debug(knet_h, KNET_SUB_TRANSPORT, "IPTOS_LOWDELAY enabled on socket: %i", sock);
#else
	log_debug(knet_h, KNET_SUB_TRANSPORT, "IPTOS_LOWDELAY not available in this build/platform");
#endif

done:
	errno = saved;
	return rc;
}
/*
 * Configure a socket used by a network transport.
 *
 * knet_h:  knet handle
 * sock:    socket to configure
 * address: address the socket will be used with; only ss_family is read,
 *          to choose between the IPv6 and the non-IPv6 option set
 * flags:   link flags, forwarded to _configure_common_socket()
 * type:    human readable socket description used in log messages
 *
 * On top of the common settings this enables, where the platform provides
 * them: binding to not-yet-available addresses (FREEBIND / BINDANY),
 * IPv6-only mode, path MTU probing / don't-fragment, and SO_REUSEADDR.
 *
 * Returns 0 on success, -1 with errno set on the first failure.
 */
int _configure_transport_socket(knet_handle_t knet_h, int sock, struct sockaddr_storage *address, uint64_t flags, const char *type)
{
	int rc = 0;
	int saved = 0;
	int optval;

	if (_configure_common_socket(knet_h, sock, flags, type) < 0) {
		saved = errno;
		rc = -1;
		goto done;
	}

#ifdef KNET_LINUX
#ifdef IP_FREEBIND
	optval = 1;
	if (setsockopt(sock, SOL_IP, IP_FREEBIND, &optval, sizeof(optval)) < 0) {
		saved = errno;
		rc = -1;
		log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set FREEBIND on %s socket: %s",
			type, strerror(saved));
		goto done;
	}
	log_debug(knet_h, KNET_SUB_TRANSPORT, "FREEBIND enabled on socket: %i", sock);
#else
	log_debug(knet_h, KNET_SUB_TRANSPORT, "FREEBIND not available in this build/platform");
#endif
#endif

#ifdef KNET_BSD
#ifdef IP_BINDANY /* BSD */
	optval = 1;
	if (setsockopt(sock, IPPROTO_IP, IP_BINDANY, &optval, sizeof(optval)) < 0) {
		saved = errno;
		rc = -1;
		log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set BINDANY on %s socket: %s",
			type, strerror(saved));
		goto done;
	}
	log_debug(knet_h, KNET_SUB_TRANSPORT, "BINDANY enabled on socket: %i", sock);
#else
	log_debug(knet_h, KNET_SUB_TRANSPORT, "BINDANY not available in this build/platform");
#endif
#endif

	if (address->ss_family != AF_INET6) {
		/* non-IPv6 socket: path MTU probing / don't fragment */
#ifdef KNET_LINUX
#ifdef IP_MTU_DISCOVER
		optval = IP_PMTUDISC_PROBE;
		if (setsockopt(sock, SOL_IP, IP_MTU_DISCOVER, &optval, sizeof(optval)) < 0) {
			saved = errno;
			rc = -1;
			log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set PMTUDISC on %s socket: %s",
				type, strerror(saved));
			goto done;
		}
		log_debug(knet_h, KNET_SUB_TRANSPORT, "PMTUDISC enabled on socket: %i", sock);
#else
		log_debug(knet_h, KNET_SUB_TRANSPORT, "PMTUDISC not available in this build/platform");
#endif
#endif
#ifdef KNET_BSD
#ifdef IP_DONTFRAG
		optval = 1;
		if (setsockopt(sock, IPPROTO_IP, IP_DONTFRAG, &optval, sizeof(optval)) < 0) {
			saved = errno;
			rc = -1;
			log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set DONTFRAG on %s socket: %s",
				type, strerror(saved));
			goto done;
		}
		log_debug(knet_h, KNET_SUB_TRANSPORT, "DONTFRAG enabled on socket: %i", sock);
#else
		log_debug(knet_h, KNET_SUB_TRANSPORT, "DONTFRAG not available in this build/platform");
#endif
#endif
	} else {
		/* IPv6 socket: never fall back to v4-mapped addresses */
		optval = 1;
		if (setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY,
			       &optval, sizeof(optval)) < 0) {
			saved = errno;
			rc = -1;
			log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set %s IPv6 only: %s",
				type, strerror(saved));
			goto done;
		}
#ifdef KNET_LINUX
#ifdef IPV6_MTU_DISCOVER
		optval = IPV6_PMTUDISC_PROBE;
		if (setsockopt(sock, SOL_IPV6, IPV6_MTU_DISCOVER, &optval, sizeof(optval)) < 0) {
			saved = errno;
			rc = -1;
			log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set PMTUDISC on %s socket: %s",
				type, strerror(saved));
			goto done;
		}
		log_debug(knet_h, KNET_SUB_TRANSPORT, "IPV6_MTU_DISCOVER enabled on socket: %i", sock);
#else
		log_debug(knet_h, KNET_SUB_TRANSPORT, "IPV6_MTU_DISCOVER not available in this build/platform");
#endif
#endif
#ifdef IPV6_DONTFRAG
		optval = 1;
		if (setsockopt(sock, IPPROTO_IPV6, IPV6_DONTFRAG, &optval, sizeof(optval)) < 0) {
			saved = errno;
			rc = -1;
			log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set DONTFRAG on %s socket: %s",
				type, strerror(saved));
			goto done;
		}
		log_debug(knet_h, KNET_SUB_TRANSPORT, "IPV6_DONTFRAG enabled on socket: %i", sock);
#else
		log_debug(knet_h, KNET_SUB_TRANSPORT, "IPV6_DONTFRAG not available in this build/platform");
#endif
	}

	optval = 1;
	if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)) < 0) {
		saved = errno;
		rc = -1;
		log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set %s reuseaddr: %s",
			type, strerror(saved));
		goto done;
	}

done:
	errno = saved;
	return rc;
}
/*
 * Create a local AF_UNIX/SOCK_SEQPACKET socket pair and apply the common
 * knet socket configuration to both ends.
 *
 * knet_h: knet handle
 * sock:   array of two ints that receives the descriptors
 *
 * Returns 0 on success, -1 with errno set on failure. Descriptors that
 * were already created are not closed here when configuration fails.
 */
int _init_socketpair(knet_handle_t knet_h, int *sock)
{
	int saved = 0;
	int end;

	if (socketpair(AF_UNIX, SOCK_SEQPACKET, 0, sock) != 0) {
		saved = errno;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize socketpair: %s",
			strerror(saved));
		errno = saved;
		return -1;
	}

	end = 0;
	while (end < 2) {
		if (_configure_common_socket(knet_h, sock[end], 0, "local socketpair") < 0) {
			saved = errno;
			errno = saved;
			return -1;
		}
		end++;
	}

	errno = saved;
	return 0;
}
/*
 * Close both ends of a socket pair.
 *
 * knet_h: knet handle (not used)
 * sock:   array of two descriptors; a value of 0 marks an end as unused,
 *         and every closed end is reset to 0
 */
void _close_socketpair(knet_handle_t knet_h, int *sock)
{
	int *fd;

	for (fd = sock; fd < sock + 2; fd++) {
		if (*fd == 0) {
			continue;
		}
		close(*fd);
		*fd = 0;
	}
}
/*
 * Check whether sockfd is currently tracked by a transport.
 *
 * must be called with global read lock
 *
 * return -1 on error (sockfd outside the accepted range, errno = EINVAL)
 * return 0 if fd is invalid (tracker entry holds transport >= KNET_MAX_TRANSPORTS)
 * return 1 if fd is valid
 */
int _is_valid_fd(knet_handle_t knet_h, int sockfd)
{
int ret = 0;
if (sockfd < 0) {
errno = EINVAL;
return -1;
}
/*
 * diff hunk: the upper bound check changes from '>' to '>=' so that
 * sockfd == KNET_MAX_FDS is rejected before it is used as an index
 * (presumably knet_transport_fd_tracker has KNET_MAX_FDS entries - confirm)
 */
- if (sockfd > KNET_MAX_FDS) {
+ if (sockfd >= KNET_MAX_FDS) {
errno = EINVAL;
return -1;
}
/* an entry whose transport is not a real transport id marks an untracked fd */
if (knet_h->knet_transport_fd_tracker[sockfd].transport >= KNET_MAX_TRANSPORTS) {
ret = 0;
} else {
ret = 1;
}
return ret;
}
/*
 * Record which transport owns sockfd, together with a transport specific
 * data type tag and data pointer.
 *
 * must be called with global write lock
 *
 * return -1 with errno = EINVAL if sockfd is outside the accepted range
 * return 0 on success
 */
int _set_fd_tracker(knet_handle_t knet_h, int sockfd, uint8_t transport, uint8_t data_type, void *data)
{
if (sockfd < 0) {
errno = EINVAL;
return -1;
}
/*
 * diff hunk: the upper bound check changes from '>' to '>=' so that
 * sockfd == KNET_MAX_FDS is rejected before it is used as an index
 * (presumably knet_transport_fd_tracker has KNET_MAX_FDS entries - confirm)
 */
- if (sockfd > KNET_MAX_FDS) {
+ if (sockfd >= KNET_MAX_FDS) {
errno = EINVAL;
return -1;
}
knet_h->knet_transport_fd_tracker[sockfd].transport = transport;
knet_h->knet_transport_fd_tracker[sockfd].data_type = data_type;
knet_h->knet_transport_fd_tracker[sockfd].data = data;
return 0;
}
diff --git a/libknet/transport_sctp.c b/libknet/transport_sctp.c
index bdfc98d1..2c1cdcc9 100644
--- a/libknet/transport_sctp.c
+++ b/libknet/transport_sctp.c
@@ -1,1547 +1,1547 @@
/*
* Copyright (C) 2016-2019 Red Hat, Inc. All rights reserved.
*
* Author: Christine Caulfield <ccaulfie@redhat.com>
*
* This software licensed under GPL-2.0+, LGPL-2.0+
*/
#include "config.h"
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include <pthread.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <stdlib.h>
#include "compat.h"
#include "host.h"
#include "links.h"
#include "links_acl.h"
#include "links_acl_ip.h"
#include "logging.h"
#include "common.h"
#include "transport_common.h"
#include "threads_common.h"
#ifdef HAVE_NETINET_SCTP_H
#include <netinet/sctp.h>
#include "transport_sctp.h"
/*
 * Per-handle state of the SCTP transport; the code below reads it from
 * knet_h->transports[KNET_TRANSPORT_SCTP].
 */
typedef struct sctp_handle_info {
/* presumably the lists of all listener / connect link infos of this handle - confirm */
struct knet_list_head listen_links_list;
struct knet_list_head connect_links_list;
/* epoll set that connecting sockets are added to with EPOLLOUT */
int connect_epollfd;
int connectsockfd[2];
int listen_epollfd;
int listensockfd[2];
/* presumably the worker threads serving the two epoll sets - confirm */
pthread_t connect_thread;
pthread_t listen_thread;
/* buffer and length passed to setsockopt(SCTP_EVENTS) for every SCTP socket */
socklen_t event_subscribe_kernel_size;
char *event_subscribe_buffer;
} sctp_handle_info_t;
/*
* use by fd_tracker data type
*/
#define SCTP_NO_LINK_INFO 0
#define SCTP_LISTENER_LINK_INFO 1
#define SCTP_ACCEPTED_LINK_INFO 2
#define SCTP_CONNECT_LINK_INFO 3
/*
* this value is per listener
*/
#define MAX_ACCEPTED_SOCKS 256
/*
 * State of one SCTP listener.
 */
typedef struct sctp_listen_link_info {
struct knet_list_head list;
/* the listening socket itself */
int listen_sock;
/* sockets accepted from this listener, at most MAX_ACCEPTED_SOCKS */
int accepted_socks[MAX_ACCEPTED_SOCKS];
struct sockaddr_storage src_address;
/* presumably non-zero while the socket is registered on the listener / RX epoll set - confirm */
int on_listener_epoll;
int on_rx_epoll;
} sctp_listen_link_info_t;
/*
 * State of one socket accepted from a listener.
 */
typedef struct sctp_accepted_link_info {
/* presumably a reassembly buffer for partially read messages and its fill level - confirm */
char mread_buf[KNET_DATABUFSIZE];
ssize_t mread_len;
/* listener this socket was accepted from */
sctp_listen_link_info_t *link_info;
} sctp_accepted_link_info_t ;
/*
 * State of the outgoing (connecting) side of one SCTP link; stored in
 * kn_link->transport_link.
 */
typedef struct sctp_connect_link_info {
struct knet_list_head list;
sctp_listen_link_info_t *listener;
/* link this connection belongs to */
struct knet_link *link;
struct sockaddr_storage dst_address;
/* connecting socket; -1 once it has been closed */
int connect_sock;
/* non-zero while connect_sock is registered on the handle's connect epoll set */
int on_connected_epoll;
int on_rx_epoll;
/* reset to 0 when a new connect socket is created; presumably a "close requested" flag - confirm */
int close_sock;
} sctp_connect_link_info_t;
/*
* socket handling functions
*
* those functions do NOT perform locking. locking
* should be handled in the right context from callers
*/
/*
* sockets are removed from rx_epoll from callers
* see also error handling functions
*/
/*
 * Tear down the connecting socket of an SCTP link.
 *
 * knet_h:  knet handle
 * kn_link: link whose transport_link holds the sctp_connect_link_info_t
 *
 * Removes the socket from the connect epoll set (when registered), clears
 * its fd tracker entry, closes it and marks it as closed (-1).
 *
 * Cleanup is best effort: a failure in one step is logged and reported to
 * the caller, but the remaining steps still run, so the socket is always
 * closed. In particular a failing _set_fd_tracker() no longer jumps back
 * to the cleanup label, which would retry the same failing call forever.
 *
 * Returns 0 on success, -1 with errno set to the last error seen.
 */
static int _close_connect_socket(knet_handle_t knet_h, struct knet_link *kn_link)
{
	int err = 0, savederrno = 0;
	sctp_connect_link_info_t *info = kn_link->transport_link;
	sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP];
	struct epoll_event ev;

	if (info->on_connected_epoll) {
		memset(&ev, 0, sizeof(struct epoll_event));
		ev.events = EPOLLOUT;
		ev.data.fd = info->connect_sock;
		if (epoll_ctl(handle_info->connect_epollfd, EPOLL_CTL_DEL, info->connect_sock, &ev)) {
			savederrno = errno;
			err = -1;
			log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to remove connected socket from the epoll pool: %s",
				strerror(savederrno));
		} else {
			info->on_connected_epoll = 0;
		}
	}

	if (info->connect_sock != -1) {
		/* KNET_MAX_TRANSPORTS marks the tracker entry as unused */
		if (_set_fd_tracker(knet_h, info->connect_sock, KNET_MAX_TRANSPORTS, SCTP_NO_LINK_INFO, NULL) < 0) {
			savederrno = errno;
			err = -1;
			log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to set fd tracker: %s",
				strerror(savederrno));
		}
		close(info->connect_sock);
		info->connect_sock = -1;
	}

	errno = savederrno;
	return err;
}
/*
 * Subscribe a socket to SCTP event notifications.
 *
 * knet_h: knet handle; the subscription buffer and its length come from
 *         the handle's SCTP transport state
 * sock:   SCTP socket
 * type:   human readable socket description used in the error message
 *
 * Returns 0 on success, -1 with errno set on failure.
 */
static int _enable_sctp_notifications(knet_handle_t knet_h, int sock, const char *type)
{
	sctp_handle_info_t *sctp_info = knet_h->transports[KNET_TRANSPORT_SCTP];
	int saved = 0;
	int rc = 0;

	rc = setsockopt(sock, IPPROTO_SCTP, SCTP_EVENTS,
			sctp_info->event_subscribe_buffer,
			sctp_info->event_subscribe_kernel_size);
	if (rc < 0) {
		saved = errno;
		rc = -1;
		log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to enable %s events: %s",
			type, strerror(saved));
	} else {
		rc = 0;
	}

	errno = saved;
	return rc;
}
/*
 * Apply the full knet configuration to an SCTP socket.
 *
 * knet_h:  knet handle
 * sock:    SCTP socket
 * address: address the socket will be used with
 * flags:   link flags
 * type:    human readable socket description used in log messages
 *
 * Runs the generic transport socket setup, then enables SCTP_NODELAY and
 * the SCTP event notifications.
 *
 * Returns 0 on success, -1 with errno set on the first failure.
 */
static int _configure_sctp_socket(knet_handle_t knet_h, int sock, struct sockaddr_storage *address, uint64_t flags, const char *type)
{
	int saved = 0;
	int nodelay = 1;
	/* some platforms expose SCTP options under SOL_SCTP, others under IPPROTO_SCTP */
#ifdef SOL_SCTP
	int sctp_level = SOL_SCTP;
#else
	int sctp_level = IPPROTO_SCTP;
#endif

	if (_configure_transport_socket(knet_h, sock, address, flags, type) < 0) {
		saved = errno;
		errno = saved;
		return -1;
	}

	if (setsockopt(sock, sctp_level, SCTP_NODELAY, &nodelay, sizeof(nodelay)) < 0) {
		saved = errno;
		log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set sctp nodelay: %s",
			strerror(saved));
		errno = saved;
		return -1;
	}

	if (_enable_sctp_notifications(knet_h, sock, type) < 0) {
		saved = errno;
		errno = saved;
		return -1;
	}

	errno = saved;
	return 0;
}
/*
 * Start (or continue) a non-blocking connect on the link's connect socket.
 *
 * knet_h:  knet handle
 * kn_link: link to connect; its transport_link holds the connect info
 *
 * EALREADY, EINPROGRESS and EISCONN from connect() are not treated as
 * errors. Unless it is already registered, the socket is then added to
 * the connect epoll set with EPOLLOUT.
 *
 * Returns 0 on success, -1 with errno set on failure.
 */
static int _reconnect_socket(knet_handle_t knet_h, struct knet_link *kn_link)
{
	sctp_connect_link_info_t *conn = kn_link->transport_link;
	sctp_handle_info_t *sctp_info = knet_h->transports[KNET_TRANSPORT_SCTP];
	struct epoll_event ev;
	int saved = 0;
	int rc;

	rc = connect(conn->connect_sock, (struct sockaddr *)&kn_link->dst_addr, sockaddr_len(&kn_link->dst_addr));
	if (rc < 0) {
		switch (errno) {
		case EALREADY:
		case EINPROGRESS:
		case EISCONN:
			/* connection is pending or already established */
			break;
		default:
			saved = errno;
			log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to connect SCTP socket %d: %s",
				conn->connect_sock, strerror(saved));
			errno = saved;
			return -1;
		}
	}

	if (conn->on_connected_epoll) {
		/* already being watched, nothing more to do */
		errno = saved;
		return 0;
	}

	memset(&ev, 0, sizeof(struct epoll_event));
	ev.events = EPOLLOUT;
	ev.data.fd = conn->connect_sock;
	if (epoll_ctl(sctp_info->connect_epollfd, EPOLL_CTL_ADD, conn->connect_sock, &ev)) {
		saved = errno;
		log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to add send/recv to epoll pool: %s",
			strerror(saved));
		errno = saved;
		return -1;
	}
	conn->on_connected_epoll = 1;

	errno = saved;
	return 0;
}
/*
 * Create, configure and start connecting the outgoing socket of an SCTP
 * link.
 *
 * knet_h:  knet handle
 * kn_link: link to create the socket for; its transport_link holds the
 *          sctp_connect_link_info_t that receives the new descriptor
 *
 * On success the new socket is stored in info->connect_sock, registered
 * in the fd tracker and a non-blocking connect has been started.
 *
 * On failure everything done so far is undone: the socket is removed from
 * the connect epoll set, its fd tracker entry is cleared, it is closed and
 * info->connect_sock is reset to -1, so no stale descriptor number is left
 * behind for later cleanup code to act on.
 *
 * Returns 0 on success, -1 with errno set on failure.
 */
static int _create_connect_socket(knet_handle_t knet_h, struct knet_link *kn_link)
{
	int err = 0, savederrno = 0;
	sctp_connect_link_info_t *info = kn_link->transport_link;
	sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP];
	struct epoll_event ev;
	int connect_sock;
	int published = 0; /* set once the fd is visible via info and the fd tracker */

	connect_sock = socket(kn_link->dst_addr.ss_family, SOCK_STREAM, IPPROTO_SCTP);
	if (connect_sock < 0) {
		savederrno = errno;
		err = -1;
		log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to create send/recv socket: %s",
			strerror(savederrno));
		goto exit_error;
	}

	if (_configure_sctp_socket(knet_h, connect_sock, &kn_link->dst_addr, kn_link->flags, "SCTP connect") < 0) {
		savederrno = errno;
		err = -1;
		goto exit_error;
	}

	if (_set_fd_tracker(knet_h, connect_sock, KNET_TRANSPORT_SCTP, SCTP_CONNECT_LINK_INFO, info) < 0) {
		savederrno = errno;
		err = -1;
		log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to set fd tracker: %s",
			strerror(savederrno));
		goto exit_error;
	}

	info->connect_sock = connect_sock;
	info->close_sock = 0;
	published = 1;

	if (_reconnect_socket(knet_h, kn_link) < 0) {
		savederrno = errno;
		err = -1;
		goto exit_error;
	}

exit_error:
	if (err) {
		if (connect_sock >= 0) {
			if (info->on_connected_epoll) {
				/* best effort: the fd is closed right below anyway */
				memset(&ev, 0, sizeof(struct epoll_event));
				epoll_ctl(handle_info->connect_epollfd, EPOLL_CTL_DEL, connect_sock, &ev);
				info->on_connected_epoll = 0;
			}
			if (published) {
				/* withdraw the descriptor before its number can be reused */
				_set_fd_tracker(knet_h, connect_sock, KNET_MAX_TRANSPORTS, SCTP_NO_LINK_INFO, NULL);
				info->connect_sock = -1;
			}
			close(connect_sock);
		}
	}
	errno = savederrno;
	return err;
}
/*
 * Decide what to do after a send on sockfd.
 *
 * Returns:
 *    0 when recv_err is not negative (no error to handle)
 *    1 when the error is EAGAIN on a usable socket; the global lock is
 *      released for a short sleep and re-acquired as read lock first
 *   -1 for any other error, or when the socket is not connected / not
 *      on the RX epoll
 */
int sctp_transport_tx_sock_error(knet_handle_t knet_h, int sockfd, int recv_err, int recv_errno)
{
	void *tracked = knet_h->knet_transport_fd_tracker[sockfd].data;

	if (recv_err >= 0) {
		return 0;
	}

	if (knet_h->knet_transport_fd_tracker[sockfd].data_type == SCTP_CONNECT_LINK_INFO) {
		sctp_connect_link_info_t *conn = tracked;

		if (!conn->link->transport_connected) {
			return -1;
		}
	} else if (knet_h->knet_transport_fd_tracker[sockfd].data_type == SCTP_ACCEPTED_LINK_INFO) {
		sctp_accepted_link_info_t *accepted = tracked;
		sctp_listen_link_info_t *listener = accepted->link_info;

		if ((listener->listen_sock != sockfd) && (!listener->on_rx_epoll)) {
			return -1;
		}
	}

	if (recv_errno != EAGAIN) {
		return -1;
	}

#ifdef DEBUG
	log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Sock: %d is overloaded. Slowing TX down", sockfd);
#endif
	/* the lock must not be held across the sleep */
	pthread_rwlock_unlock(&knet_h->global_rwlock);
	usleep(KNET_THREADS_TIMERES / 16);
	pthread_rwlock_rdlock(&knet_h->global_rwlock);
	return 1;
}
/*
* socket error management functions
*
* both called with global read lock.
*
* NOTE: we need to remove the fd from the epoll as soon as possible
* even before we notify the respective thread to take care of it
* because scheduling can make it so that this thread will overload
* and the threads supposed to take care of the error will never
* be able to take action.
* we CANNOT handle FDs here directly (close/reconnect/etc) due
* to locking context. We need to delegate that to their respective
* management threads within global write lock.
*
* this function is called from:
* - RX thread with recv_err <= 0 directly on recvmmsg error
* - transport_rx_is_data when msg_len == 0 (recv_err = 1)
* - transport_rx_is_data on notification (recv_err = 2)
*
* basically this small abuse of recv_err is to detect notifications
* generated by sockets created by listen().
*/
/*
 * Take sockfd off the RX epoll. Logs and returns -1 when epoll_ctl
 * fails, 0 otherwise.
 */
static int _sctp_rx_epoll_remove(knet_handle_t knet_h, int sockfd)
{
	struct epoll_event event;

	memset(&event, 0, sizeof(struct epoll_event));
	event.events = EPOLLIN;
	event.data.fd = sockfd;
	if (epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_DEL, sockfd, &event)) {
		log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to remove EOFed socket from epoll pool: %s",
			strerror(errno));
		return -1;
	}
	return 0;
}

/*
 * React to an error/notification on a receiving socket.
 *
 * For connect sockets and accepted sockets (unless recv_err == 1, the
 * 0 byte read that follows a notification) the fd is removed from the
 * RX epoll and its number is sent over the matching socketpair so the
 * connect or listen thread can deal with it.
 *
 * Returns -1 if the fd could not be removed from the RX epoll, otherwise
 * 0 after releasing the global lock for a short sleep and taking it
 * back as read lock.
 */
int sctp_transport_rx_sock_error(knet_handle_t knet_h, int sockfd, int recv_err, int recv_errno)
{
	sctp_handle_info_t *sctp_h = knet_h->transports[KNET_TRANSPORT_SCTP];
	void *tracked = knet_h->knet_transport_fd_tracker[sockfd].data;

	if (knet_h->knet_transport_fd_tracker[sockfd].data_type == SCTP_CONNECT_LINK_INFO) {
		sctp_connect_link_info_t *conn = tracked;

		/*
		 * connect sockets always have notifications enabled, so the
		 * 0 byte read (recv_err == 1) is only the echo of a
		 * notification that was already handled
		 */
		if (recv_err != 1) {
			conn->link->transport_connected = 0;
			if (conn->on_rx_epoll) {
				if (_sctp_rx_epoll_remove(knet_h, sockfd) < 0) {
					return -1;
				}
				conn->on_rx_epoll = 0;
			}
			log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Notifying connect thread that sockfd %d received an error", sockfd);
			if (sendto(sctp_h->connectsockfd[1], &sockfd, sizeof(int), MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0) != sizeof(int)) {
				log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to notify connect thread: %s", strerror(errno));
			}
		}
	} else if (knet_h->knet_transport_fd_tracker[sockfd].data_type == SCTP_ACCEPTED_LINK_INFO) {
		sctp_accepted_link_info_t *accepted = tracked;
		sctp_listen_link_info_t *listener = accepted->link_info;

		if (listener->listen_sock == sockfd) {
			/* the listen() socket itself produced a notification */
			log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Received stray notification for listen() socket %d", sockfd);
		} else if (recv_err != 1) {
			if (listener->on_rx_epoll) {
				if (_sctp_rx_epoll_remove(knet_h, sockfd) < 0) {
					return -1;
				}
				listener->on_rx_epoll = 0;
			}
			log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Notifying listen thread that sockfd %d received an error", sockfd);
			if (sendto(sctp_h->listensockfd[1], &sockfd, sizeof(int), MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0) != sizeof(int)) {
				log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to notify listen thread: %s", strerror(errno));
			}
		}
	} else {
		log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Received unknown notification? %d", sockfd);
	}

	/*
	 * give the management threads a chance to pick the message up;
	 * the lock is dropped for the duration of the sleep
	 */
	pthread_rwlock_unlock(&knet_h->global_rwlock);
	usleep(KNET_THREADS_TIMERES / 2);
	pthread_rwlock_rdlock(&knet_h->global_rwlock);
	return 0;
}
/*
* NOTE: sctp_transport_rx_is_data is called with global rdlock
* delegate any FD error management to sctp_transport_rx_sock_error
* and keep this code to parsing incoming data only
*/
/*
 * Classify one message received on sockfd and, for data, reassemble
 * records that arrived split across several reads.
 *
 * Return values, as produced by the code below:
 *   0 - a partial data read (no MSG_EOR) was appended to mread_buf, or
 *       a complete notification was walked through
 *   1 - a 0 byte data message was seen (sctp_transport_rx_sock_error is
 *       invoked with recv_err = 1 first), or a notification arrived
 *       without MSG_EOR
 *   2 - a complete data record is available in the iovec and
 *       msg->msg_len holds its length
 *
 * NOTE(review): the tracker data is read as sctp_accepted_link_info_t
 * without looking at data_type, and nothing here bounds mread_len
 * against the size of mread_buf - confirm both against the struct
 * definitions and the caller.
 */
int sctp_transport_rx_is_data(knet_handle_t knet_h, int sockfd, struct knet_mmsghdr *msg)
{
size_t i;
struct iovec *iov = msg->msg_hdr.msg_iov;
size_t iovlen = msg->msg_hdr.msg_iovlen;
struct sctp_assoc_change *sac;
union sctp_notification *snp;
sctp_accepted_link_info_t *info = knet_h->knet_transport_fd_tracker[sockfd].data;
/* regular data path: the message is not an SCTP event notification */
if (!(msg->msg_hdr.msg_flags & MSG_NOTIFICATION)) {
if (msg->msg_len == 0) {
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "received 0 bytes len packet: %d", sockfd);
/*
* NOTE: with event notification enabled, we receive error twice:
* 1) from the event notification
* 2) followed by a 0 byte msg_len
*
* This is generally not a problem if not for causing extra
* handling for the same issue. Should we drop notifications
* and keep the code generic (handle all errors via msg_len = 0)
* or keep the duplication as safety measure, or drop msg_len = 0
* handling (what about sockets without events enabled?)
*/
sctp_transport_rx_sock_error(knet_h, sockfd, 1, 0);
return 1;
}
/*
* missing MSG_EOR has to be treated as a short read
* from the socket and we need to fill in the mread buf
* while we wait for MSG_EOR
*/
if (!(msg->msg_hdr.msg_flags & MSG_EOR)) {
/*
* copy the incoming data into mread_buf + mread_len (incremental)
* and increase mread_len
*/
memmove(info->mread_buf + info->mread_len, iov->iov_base, msg->msg_len);
info->mread_len = info->mread_len + msg->msg_len;
return 0;
}
/*
* got EOR.
* if mread_len is > 0 we are completing a packet from short reads
* complete reassembling the packet in mread_buf, copy it back in the iov
* and set the iov/msg len numbers (size) correctly
*/
if (info->mread_len) {
/*
* add last fragment to mread_buf
*/
memmove(info->mread_buf + info->mread_len, iov->iov_base, msg->msg_len);
info->mread_len = info->mread_len + msg->msg_len;
/*
* move all back into the iovec
*/
memmove(iov->iov_base, info->mread_buf, info->mread_len);
msg->msg_len = info->mread_len;
info->mread_len = 0;
}
return 2;
}
/* notification path: an event without MSG_EOR is not parsed */
if (!(msg->msg_hdr.msg_flags & MSG_EOR)) {
return 1;
}
/* the base of every iovec entry is interpreted as a notification */
for (i=0; i< iovlen; i++) {
snp = iov[i].iov_base;
switch (snp->sn_header.sn_type) {
case SCTP_ASSOC_CHANGE:
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] sctp assoc change");
sac = &snp->sn_assoc_change;
/* only a lost association is escalated (recv_err = 2) */
if (sac->sac_state == SCTP_COMM_LOST) {
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] sctp assoc change: comm_lost");
sctp_transport_rx_sock_error(knet_h, sockfd, 2, 0);
}
break;
case SCTP_SHUTDOWN_EVENT:
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] sctp shutdown event");
sctp_transport_rx_sock_error(knet_h, sockfd, 2, 0);
break;
/* the remaining event types are only logged */
case SCTP_SEND_FAILED:
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] sctp send failed");
break;
case SCTP_PEER_ADDR_CHANGE:
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] sctp peer addr change");
break;
case SCTP_REMOTE_ERROR:
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] sctp remote error");
break;
default:
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] unknown sctp event type: %hu\n", snp->sn_header.sn_type);
break;
}
}
return 0;
}
/*
* connect / outgoing socket management thread
*/
/*
* _handle_connected_sctp* are called with a global write lock
* from the connect_thread
*/
/*
 * Handle an event reported by the connect epoll for connect_sock.
 *
 * Steps performed below:
 *  - read the pending socket error (SO_ERROR) into status
 *  - if the link info is flagged close_sock, close the connect socket
 *    and create a new one
 *  - if status reports an error, log it and retry connect() on the
 *    link's socket
 *  - otherwise mark the link connected: remove the fd from the connect
 *    epoll, publish it as kn_link->outsock and add it to the RX epoll
 *
 * NOTE(review): after the socket has been recreated the rest of the
 * function keeps using the connect_sock value and the status that were
 * obtained before the recreation, while outsock is taken from
 * info->connect_sock - confirm this is intended.
 */
static void _handle_connected_sctp(knet_handle_t knet_h, int connect_sock)
{
int err;
struct epoll_event ev;
unsigned int status, len = sizeof(status);
sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP];
sctp_connect_link_info_t *info = knet_h->knet_transport_fd_tracker[connect_sock].data;
struct knet_link *kn_link = info->link;
err = getsockopt(connect_sock, SOL_SOCKET, SO_ERROR, &status, &len);
if (err) {
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "SCTP getsockopt() on connecting socket %d failed: %s",
connect_sock, strerror(errno));
return;
}
/* close_sock is set by _handle_connected_sctp_errors() */
if (info->close_sock) {
if (_close_connect_socket(knet_h, kn_link) < 0) {
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to close sock %d from _handle_connected_sctp: %s", connect_sock, strerror(errno));
return;
}
info->close_sock = 0;
if (_create_connect_socket(knet_h, kn_link) < 0) {
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to recreate connecting sock! %s", strerror(errno));
return;
}
}
if (status) {
log_info(knet_h, KNET_SUB_TRANSP_SCTP, "SCTP connect on %d to %s port %s failed: %s",
connect_sock, kn_link->status.dst_ipaddr, kn_link->status.dst_port,
strerror(status));
/*
* No need to create a new socket if connect failed,
* just retry connect
*/
/* the result of the retry is not checked here */
_reconnect_socket(knet_h, info->link);
return;
}
/*
* Connected - Remove us from the connect epoll
*/
memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLOUT;
ev.data.fd = connect_sock;
if (epoll_ctl(handle_info->connect_epollfd, EPOLL_CTL_DEL, connect_sock, &ev)) {
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to remove connected socket %d from epoll pool: %s",
connect_sock, strerror(errno));
}
/* the flag is cleared even when the removal above failed */
info->on_connected_epoll = 0;
kn_link->transport_connected = 1;
kn_link->outsock = info->connect_sock;
memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLIN;
ev.data.fd = connect_sock;
if (epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_ADD, connect_sock, &ev)) {
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to add connected socket to epoll pool: %s",
strerror(errno));
}
/* the flag is set even when the registration above failed */
info->on_rx_epoll = 1;
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "SCTP handler fd %d now connected to %s port %s",
connect_sock,
kn_link->status.dst_ipaddr, kn_link->status.dst_port);
}
/*
 * Consume one fd number from the connect socketpair and, if it still
 * refers to a tracked socket, flag the link's socket for closing, mark
 * the link as not connected and retry the connection.
 */
static void _handle_connected_sctp_errors(knet_handle_t knet_h)
{
	sctp_handle_info_t *sctp_h = knet_h->transports[KNET_TRANSPORT_SCTP];
	sctp_connect_link_info_t *link_info;
	int failed_fd = -1;
	ssize_t got;

	got = recv(sctp_h->connectsockfd[0], &failed_fd, sizeof(int), MSG_DONTWAIT | MSG_NOSIGNAL);
	if (got != sizeof(int)) {
		log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Short read on connectsockfd");
		return;
	}

	if (_is_valid_fd(knet_h, failed_fd) < 1) {
		log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Received stray notification for connected socket fd error");
		return;
	}

	log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Processing connected error on socket: %d", failed_fd);

	link_info = knet_h->knet_transport_fd_tracker[failed_fd].data;
	link_info->close_sock = 1;
	link_info->link->transport_connected = 0;

	/* the outcome of the retry is not inspected */
	_reconnect_socket(knet_h, link_info->link);
}
/*
 * Thread body managing outgoing connections: waits on the connect epoll
 * and, with the global write lock held, dispatches every ready fd to
 * the error handler (socketpair end) or to the connect handler.
 */
static void *_sctp_connect_thread(void *data)
{
	knet_handle_t knet_h = (knet_handle_t) data;
	sctp_handle_info_t *sctp_h = knet_h->transports[KNET_TRANSPORT_SCTP];
	struct epoll_event ready[KNET_EPOLL_MAX_EVENTS];
	int lock_err;
	int idx, count;

	set_thread_status(knet_h, KNET_THREAD_SCTP_CONN, KNET_THREAD_STARTED);

	while (!shutdown_in_progress(knet_h)) {
		/* the timeout lets the loop notice a shutdown request */
		count = epoll_wait(sctp_h->connect_epollfd, ready, KNET_EPOLL_MAX_EVENTS, KNET_THREADS_TIMERES / 1000);
		if (count == 0) {
			continue;
		}
		if (count < 0) {
			log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "SCTP connect handler EPOLL ERROR: %s",
				  strerror(errno));
			continue;
		}

		lock_err = get_global_wrlock(knet_h);
		if (lock_err) {
			log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to get write lock: %s",
				strerror(lock_err));
			continue;
		}

		/*
		 * the same fd may be reported more than once; the handlers
		 * can safely be invoked repeatedly
		 */
		for (idx = 0; idx < count; idx++) {
			const int fd = ready[idx].data.fd;

			if (fd == sctp_h->connectsockfd[0]) {
				log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Received notification from rx_error for connected socket");
				_handle_connected_sctp_errors(knet_h);
				continue;
			}
			if (_is_valid_fd(knet_h, fd) != 1) {
				log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Received stray notification for dead fd %d\n", fd);
				continue;
			}
			_handle_connected_sctp(knet_h, fd);
		}

		pthread_rwlock_unlock(&knet_h->global_rwlock);

		/*
		 * the handlers can queue new events for this very thread:
		 * pause between rounds so other threads get scheduled
		 */
		usleep(knet_h->reconnect_int * 1000);
	}

	set_thread_status(knet_h, KNET_THREAD_SCTP_CONN, KNET_THREAD_STOPPED);

	return NULL;
}
/*
* listen/incoming connections management thread
*/
/*
* Listener received a new connection
* called with a write lock from main thread
*/
/*
 * Accept a pending connection on listen_sock.
 *
 * The accepted fd is checked against the access lists (when enabled),
 * recorded in the listener's accepted_socks table, configured,
 * registered in the fd tracker as SCTP_ACCEPTED_LINK_INFO and added to
 * the RX epoll.
 *
 * Nothing is returned; on exit errno is 0 on success or describes the
 * failure. On failure every resource acquired here is released again.
 */
static void _handle_incoming_sctp(knet_handle_t knet_h, int listen_sock)
{
	int err = 0, savederrno = 0;
	int new_fd;
	int i = -1;
	sctp_listen_link_info_t *info = knet_h->knet_transport_fd_tracker[listen_sock].data;
	struct epoll_event ev;
	struct sockaddr_storage ss;
	socklen_t sock_len = sizeof(ss);
	char addr_str[KNET_MAX_HOST_LEN];
	char port_str[KNET_MAX_PORT_LEN];
	sctp_accepted_link_info_t *accept_info = NULL;

	new_fd = accept(listen_sock, (struct sockaddr *)&ss, &sock_len);
	if (new_fd < 0) {
		savederrno = errno;
		err = -1;
		log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Incoming: accept error: %s", strerror(savederrno));
		goto exit_error;
	}

	if (knet_addrtostr(&ss, sizeof(ss),
			   addr_str, KNET_MAX_HOST_LEN,
			   port_str, KNET_MAX_PORT_LEN) < 0) {
		savederrno = errno;
		err = -1;
		log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Incoming: unable to gather socket info");
		goto exit_error;
	}

	log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Incoming: received connection from: %s port: %s",
		  addr_str, port_str);

	if (knet_h->use_access_lists) {
		if (!check_validate(knet_h, listen_sock, KNET_TRANSPORT_SCTP, &ss)) {
			/* nothing else has been set up yet: just drop the fd */
			savederrno = EINVAL;
			log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Connection rejected from %s/%s", addr_str, port_str);
			close(new_fd);
			errno = savederrno;
			return;
		}
	}

	/*
	 * Keep a track of all accepted FDs
	 */
	for (i = 0; i < MAX_ACCEPTED_SOCKS; i++) {
		if (info->accepted_socks[i] == -1) {
			info->accepted_socks[i] = new_fd;
			break;
		}
	}

	if (i == MAX_ACCEPTED_SOCKS) {
		/* record the error where the exit path will not overwrite it */
		savederrno = EBUSY;
		err = -1;
		log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Incoming: too many connections!");
		goto exit_error;
	}

	if (_configure_common_socket(knet_h, new_fd, 0, "SCTP incoming") < 0) { /* Inherit flags from listener? */
		savederrno = errno;
		err = -1;
		goto exit_error;
	}

	if (_enable_sctp_notifications(knet_h, new_fd, "Incoming connection") < 0) {
		savederrno = errno;
		err = -1;
		goto exit_error;
	}

	accept_info = malloc(sizeof(sctp_accepted_link_info_t));
	if (!accept_info) {
		savederrno = errno;
		err = -1;
		goto exit_error;
	}
	memset(accept_info, 0, sizeof(sctp_accepted_link_info_t));

	accept_info->link_info = info;

	if (_set_fd_tracker(knet_h, new_fd, KNET_TRANSPORT_SCTP, SCTP_ACCEPTED_LINK_INFO, accept_info) < 0) {
		savederrno = errno;
		err = -1;
		log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to set fd tracker: %s",
			strerror(savederrno));
		goto exit_error;
	}

	memset(&ev, 0, sizeof(struct epoll_event));
	ev.events = EPOLLIN;
	ev.data.fd = new_fd;
	if (epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_ADD, new_fd, &ev)) {
		savederrno = errno;
		err = -1;
		log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Incoming: unable to add accepted socket %d to epoll pool: %s",
			new_fd, strerror(savederrno));
		goto exit_error;
	}
	info->on_rx_epoll = 1;

	log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Incoming: accepted new fd %d for %s/%s (listen fd: %d). index: %d",
		  new_fd, addr_str, port_str, info->listen_sock, i);

exit_error:
	if (err) {
		/*
		 * i is -1 when no slot was searched for and
		 * MAX_ACCEPTED_SOCKS when the table is full: only a real
		 * slot index may be written to
		 */
		if ((i >= 0) && (i < MAX_ACCEPTED_SOCKS)) {
			info->accepted_socks[i] = -1;
		}
		/* accept() itself may have failed: no fd to release then */
		if (new_fd >= 0) {
			_set_fd_tracker(knet_h, new_fd, KNET_MAX_TRANSPORTS, SCTP_NO_LINK_INFO, NULL);
			close(new_fd);
		}
		free(accept_info);
	}
	errno = savederrno;
	return;
}
/*
* Listen thread received a notification of a bad socket that needs closing
* called with a write lock from main thread
*/
/*
 * Consume one fd number from the listen socketpair and tear down the
 * matching accepted socket: dynamic links that use the fd as outbound
 * socket are reset, then the fd is removed from the tracker and from
 * the listener's accepted_socks table, its info is freed and the fd is
 * closed.
 */
static void _handle_listen_sctp_errors(knet_handle_t knet_h)
{
	int sockfd = -1;
	sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP];
	sctp_accepted_link_info_t *accept_info;
	sctp_listen_link_info_t *info;
	struct knet_host *host;
	int link_idx;
	int i;

	if (recv(handle_info->listensockfd[0], &sockfd, sizeof(int), MSG_DONTWAIT | MSG_NOSIGNAL) != sizeof(int)) {
		log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Short read on listensockfd");
		return;
	}

	if (_is_valid_fd(knet_h, sockfd) < 1) {
		log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Received stray notification for listen socket fd error");
		return;
	}

	/*
	 * the fd number may have been closed and reused for another kind
	 * of socket since the notification was queued: the tracker data
	 * can only be used as accepted link info when it is tagged so
	 */
	if (knet_h->knet_transport_fd_tracker[sockfd].data_type != SCTP_ACCEPTED_LINK_INFO) {
		log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Received stray notification for listen socket fd error");
		return;
	}

	log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Processing listen error on socket: %d", sockfd);

	accept_info = knet_h->knet_transport_fd_tracker[sockfd].data;
	info = accept_info->link_info;

	/*
	 * clear all links using this accepted socket as
	 * outbound dynamically connected socket
	 */
	for (host = knet_h->host_head; host != NULL; host = host->next) {
		for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) {
			if ((host->link[link_idx].dynamic == KNET_LINK_DYNIP) &&
			    (host->link[link_idx].outsock == sockfd)) {
				log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Found dynamic connection on host %d link %d (%d)",
					  host->host_id, link_idx, sockfd);
				host->link[link_idx].status.dynconnected = 0;
				host->link[link_idx].transport_connected = 0;
				host->link[link_idx].outsock = 0;
				memset(&host->link[link_idx].dst_addr, 0, sizeof(struct sockaddr_storage));
			}
		}
	}

	for (i = 0; i < MAX_ACCEPTED_SOCKS; i++) {
		if (sockfd == info->accepted_socks[i]) {
			log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Closing accepted socket %d", sockfd);
			_set_fd_tracker(knet_h, sockfd, KNET_MAX_TRANSPORTS, SCTP_NO_LINK_INFO, NULL);
			info->accepted_socks[i] = -1;
			free(accept_info);
			close(sockfd);
			/* accept_info is gone: stop scanning the table */
			break;
		}
	}
}
/*
 * Thread body managing incoming connections: waits on the listen epoll
 * and, with the global write lock held, dispatches every ready fd to
 * the error handler (socketpair end) or to the accept handler.
 */
static void *_sctp_listen_thread(void *data)
{
	knet_handle_t knet_h = (knet_handle_t) data;
	sctp_handle_info_t *sctp_h = knet_h->transports[KNET_TRANSPORT_SCTP];
	struct epoll_event ready[KNET_EPOLL_MAX_EVENTS];
	int lock_err;
	int idx, count;

	set_thread_status(knet_h, KNET_THREAD_SCTP_LISTEN, KNET_THREAD_STARTED);

	while (!shutdown_in_progress(knet_h)) {
		/* the timeout lets the loop notice a shutdown request */
		count = epoll_wait(sctp_h->listen_epollfd, ready, KNET_EPOLL_MAX_EVENTS, KNET_THREADS_TIMERES / 1000);
		if (count == 0) {
			continue;
		}
		if (count < 0) {
			log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "SCTP listen handler EPOLL ERROR: %s",
				  strerror(errno));
			continue;
		}

		lock_err = get_global_wrlock(knet_h);
		if (lock_err) {
			log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to get write lock: %s",
				strerror(lock_err));
			continue;
		}

		/* work out what each ready fd is asking for */
		for (idx = 0; idx < count; idx++) {
			const int fd = ready[idx].data.fd;

			if (fd == sctp_h->listensockfd[0]) {
				log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Received notification from rx_error for listener/accepted socket");
				_handle_listen_sctp_errors(knet_h);
				continue;
			}
			if (_is_valid_fd(knet_h, fd) != 1) {
				log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Received listen notification from invalid socket");
				continue;
			}
			_handle_incoming_sctp(knet_h, fd);
		}

		pthread_rwlock_unlock(&knet_h->global_rwlock);
	}

	set_thread_status(knet_h, KNET_THREAD_SCTP_LISTEN, KNET_THREAD_STOPPED);

	return NULL;
}
/*
* sctp_link_listener_start/stop are called in global write lock
* context from set_config and clear_config.
*/
/*
 * Return the listener serving kn_link->src_addr, creating it if needed.
 *
 * An existing listener with the same source address is reused: only the
 * link's destination is added to its access list. Otherwise a new
 * listening socket is created, configured, bound, tracked, given the
 * default access list entry and added to the listen epoll.
 *
 * Returns the listener info, or NULL on failure with errno set. On
 * failure every resource acquired here is released again.
 */
static sctp_listen_link_info_t *sctp_link_listener_start(knet_handle_t knet_h, struct knet_link *kn_link)
{
	int err = 0, savederrno = 0;
	int listen_sock = -1;
	int tracked = 0;	/* fd tracker entry has been installed */
	struct epoll_event ev;
	sctp_listen_link_info_t *info = NULL;
	sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP];

	/*
	 * Only allocate a new listener if src address is different
	 */
	knet_list_for_each_entry(info, &handle_info->listen_links_list, list) {
		if (memcmp(&info->src_address, &kn_link->src_addr, sizeof(struct sockaddr_storage)) == 0) {
			if ((check_add(knet_h, info->listen_sock, KNET_TRANSPORT_SCTP, -1,
				       &kn_link->dst_addr, &kn_link->dst_addr, CHECK_TYPE_ADDRESS, CHECK_ACCEPT) < 0) && (errno != EEXIST)) {
				return NULL;
			}
			return info;
		}
	}

	info = malloc(sizeof(sctp_listen_link_info_t));
	if (!info) {
		savederrno = errno;
		err = -1;
		goto exit_error;
	}

	memset(info, 0, sizeof(sctp_listen_link_info_t));
	/* -1 marks a free slot in the accepted sockets table */
	memset(info->accepted_socks, -1, sizeof(info->accepted_socks));
	memmove(&info->src_address, &kn_link->src_addr, sizeof(struct sockaddr_storage));

	listen_sock = socket(kn_link->src_addr.ss_family, SOCK_STREAM, IPPROTO_SCTP);
	if (listen_sock < 0) {
		savederrno = errno;
		err = -1;
		log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to create listener socket: %s",
			strerror(savederrno));
		goto exit_error;
	}

	if (_configure_sctp_socket(knet_h, listen_sock, &kn_link->src_addr, kn_link->flags, "SCTP listener") < 0) {
		savederrno = errno;
		err = -1;
		goto exit_error;
	}

	if (bind(listen_sock, (struct sockaddr *)&kn_link->src_addr, sockaddr_len(&kn_link->src_addr)) < 0) {
		savederrno = errno;
		err = -1;
		log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to bind listener socket: %s",
			strerror(savederrno));
		goto exit_error;
	}

	if (listen(listen_sock, 5) < 0) {
		savederrno = errno;
		err = -1;
		log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to listen on listener socket: %s",
			strerror(savederrno));
		goto exit_error;
	}

	if (_set_fd_tracker(knet_h, listen_sock, KNET_TRANSPORT_SCTP, SCTP_LISTENER_LINK_INFO, info) < 0) {
		savederrno = errno;
		err = -1;
		log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to set fd tracker: %s",
			strerror(savederrno));
		goto exit_error;
	}
	tracked = 1;

	if ((check_add(knet_h, listen_sock, KNET_TRANSPORT_SCTP, -1,
		       &kn_link->dst_addr, &kn_link->dst_addr, CHECK_TYPE_ADDRESS, CHECK_ACCEPT) < 0) && (errno != EEXIST)) {
		savederrno = errno;
		err = -1;
		log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to configure default access lists: %s",
			strerror(savederrno));
		goto exit_error;
	}

	memset(&ev, 0, sizeof(struct epoll_event));
	ev.events = EPOLLIN;
	ev.data.fd = listen_sock;
	if (epoll_ctl(handle_info->listen_epollfd, EPOLL_CTL_ADD, listen_sock, &ev)) {
		savederrno = errno;
		err = -1;
		log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to add listener to epoll pool: %s",
			strerror(savederrno));
		goto exit_error;
	}
	info->on_listener_epoll = 1;

	info->listen_sock = listen_sock;
	knet_list_add(&info->list, &handle_info->listen_links_list);

	log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Listening on fd %d for %s:%s", listen_sock, kn_link->status.src_ipaddr, kn_link->status.src_port);

exit_error:
	if (err) {
		/* info is NULL when the allocation itself failed */
		if (listen_sock >= 0) {
			if ((info) && (info->on_listener_epoll)) {
				memset(&ev, 0, sizeof(struct epoll_event));
				ev.events = EPOLLIN;
				ev.data.fd = listen_sock;
				epoll_ctl(handle_info->listen_epollfd, EPOLL_CTL_DEL, listen_sock, &ev);
			}
			if (tracked) {
				/* info is about to be freed: untrack the fd */
				_set_fd_tracker(knet_h, listen_sock, KNET_MAX_TRANSPORTS, SCTP_NO_LINK_INFO, NULL);
			}
			check_rmall(knet_h, listen_sock, KNET_TRANSPORT_SCTP);
			close(listen_sock);
		}
		free(info);
		info = NULL;
	}
	errno = savederrno;
	return info;
}
/*
 * Detach kn_link from its listener and destroy the listener when no
 * other link refers to it.
 *
 * The link's destination is always removed from the listener access
 * list. When another link still shares the listener, only the link's
 * reference is dropped and -1/EBUSY is returned. Otherwise the listening
 * socket and every accepted socket are removed from their epoll,
 * untracked and closed, and the listener is freed.
 *
 * Returns 0 on success (errno reset to 0), -1 on failure with errno set.
 */
static int sctp_link_listener_stop(knet_handle_t knet_h, struct knet_link *kn_link)
{
	sctp_handle_info_t *sctp_h = knet_h->transports[KNET_TRANSPORT_SCTP];
	sctp_connect_link_info_t *own_info = kn_link->transport_link;
	sctp_listen_link_info_t *listener = own_info->listener;
	struct knet_host *host;
	struct epoll_event event;
	int shared = 0;
	int saved;
	int link_idx, slot;

	/* is any other configured link served by the same listener? */
	for (host = knet_h->host_head; host != NULL; host = host->next) {
		for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) {
			struct knet_link *other = &host->link[link_idx];
			sctp_connect_link_info_t *other_info;

			if (other == kn_link) {
				continue;
			}
			other_info = other->transport_link;
			if ((other_info) && (other_info->listener == listener)) {
				shared = 1;
				break;
			}
		}
	}

	if ((check_rm(knet_h, listener->listen_sock, KNET_TRANSPORT_SCTP,
		      &kn_link->dst_addr, &kn_link->dst_addr, CHECK_TYPE_ADDRESS, CHECK_ACCEPT) < 0) && (errno != ENOENT)) {
		log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to remove default access lists for %d", listener->listen_sock);
	}

	if (shared) {
		own_info->listener = NULL;
		log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "SCTP listener socket %d still in use", listener->listen_sock);
		errno = EBUSY;
		return -1;
	}

	if (listener->on_listener_epoll) {
		memset(&event, 0, sizeof(event));
		event.events = EPOLLIN;
		event.data.fd = listener->listen_sock;
		if (epoll_ctl(sctp_h->listen_epollfd, EPOLL_CTL_DEL, listener->listen_sock, &event)) {
			saved = errno;
			log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to remove listener to epoll pool: %s",
				strerror(saved));
			errno = saved;
			return -1;
		}
		listener->on_listener_epoll = 0;
	}

	if (_set_fd_tracker(knet_h, listener->listen_sock, KNET_MAX_TRANSPORTS, SCTP_NO_LINK_INFO, NULL) < 0) {
		saved = errno;
		log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to set fd tracker: %s",
			strerror(saved));
		errno = saved;
		return -1;
	}

	check_rmall(knet_h, listener->listen_sock, KNET_TRANSPORT_SCTP);

	close(listener->listen_sock);

	/* release every socket that was accepted through this listener */
	for (slot = 0; slot < MAX_ACCEPTED_SOCKS; slot++) {
		const int accepted = listener->accepted_socks[slot];

		if (accepted < 0) {
			continue;
		}

		memset(&event, 0, sizeof(event));
		event.events = EPOLLIN;
		event.data.fd = accepted;
		if (epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_DEL, accepted, &event)) {
			/* logged only: the teardown goes on */
			log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to remove EOFed socket from epoll pool: %s",
				strerror(errno));
		}
		listener->on_rx_epoll = 0;

		free(knet_h->knet_transport_fd_tracker[accepted].data);
		close(accepted);

		if (_set_fd_tracker(knet_h, accepted, KNET_MAX_TRANSPORTS, SCTP_NO_LINK_INFO, NULL) < 0) {
			saved = errno;
			log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to set fd tracker: %s",
				strerror(saved));
			errno = saved;
			return -1;
		}
		listener->accepted_socks[slot] = -1;
	}

	knet_list_del(&listener->list);
	free(listener);
	own_info->listener = NULL;

	errno = 0;
	return 0;
}
/*
* Links config/clear. Both called with global wrlock from link_set_config/clear_config
*/
/*
 * Set up the SCTP transport state of a link: allocate the per-link
 * info, attach (or create) the listener for the link's source address
 * and, for static links, create the outgoing connect socket.
 *
 * Returns 0 on success, -1 on failure with errno set. On failure
 * kn_link->transport_link is left NULL.
 */
int sctp_transport_link_set_config(knet_handle_t knet_h, struct knet_link *kn_link)
{
	int savederrno = 0, err = 0;
	sctp_connect_link_info_t *info;
	sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP];

	info = malloc(sizeof(sctp_connect_link_info_t));
	if (!info) {
		/* an allocation failure must not be reported as success */
		savederrno = errno;
		err = -1;
		goto exit_error;
	}

	memset(info, 0, sizeof(sctp_connect_link_info_t));

	kn_link->transport_link = info;
	info->link = kn_link;

	memmove(&info->dst_address, &kn_link->dst_addr, sizeof(struct sockaddr_storage));
	info->on_connected_epoll = 0;
	info->connect_sock = -1;

	info->listener = sctp_link_listener_start(knet_h, kn_link);
	if (!info->listener) {
		savederrno = errno;
		err = -1;
		goto exit_error;
	}

	if (kn_link->dynamic == KNET_LINK_STATIC) {
		if (_create_connect_socket(knet_h, kn_link) < 0) {
			savederrno = errno;
			err = -1;
			goto exit_error;
		}
		kn_link->outsock = info->connect_sock;
	}

	knet_list_add(&info->list, &handle_info->connect_links_list);

exit_error:
	if (err) {
		if (info) {
			/*
			 * the only failure that can leave a valid number in
			 * connect_sock is _create_connect_socket(), which
			 * closes the socket itself before returning: closing
			 * it again here would be a double close. Only make
			 * sure the tracker does not keep a pointer to info,
			 * which is freed below.
			 */
			if (info->connect_sock >= 0) {
				_set_fd_tracker(knet_h, info->connect_sock, KNET_MAX_TRANSPORTS, SCTP_NO_LINK_INFO, NULL);
				info->connect_sock = -1;
			}
			if (info->listener) {
				sctp_link_listener_stop(knet_h, kn_link);
			}
			kn_link->transport_link = NULL;
			free(info);
		}
	}
	errno = savederrno;
	return err;
}
/*
* called with global wrlock
*/
/*
 * Tear down the SCTP transport state of a link: release its listener
 * reference, take the connect socket off the RX epoll, close it and
 * free the per-link info.
 *
 * A listener that is still shared with other links (EBUSY) is not an
 * error.
 *
 * Returns 0 on success (errno reset to 0), -1 on failure with errno
 * set; EINVAL when the link or its transport info is missing.
 */
int sctp_transport_link_clear_config(knet_handle_t knet_h, struct knet_link *kn_link)
{
	sctp_connect_link_info_t *link_info;
	struct epoll_event event;
	int saved;

	if ((!kn_link) || (!kn_link->transport_link)) {
		errno = EINVAL;
		return -1;
	}
	link_info = kn_link->transport_link;

	if ((sctp_link_listener_stop(knet_h, kn_link) < 0) && (errno != EBUSY)) {
		saved = errno;
		log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to remove listener trasport: %s",
			strerror(saved));
		errno = saved;
		return -1;
	}

	if (link_info->on_rx_epoll) {
		memset(&event, 0, sizeof(event));
		event.events = EPOLLIN;
		event.data.fd = link_info->connect_sock;
		if (epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_DEL, link_info->connect_sock, &event)) {
			saved = errno;
			log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to remove connected socket from epoll pool: %s",
				strerror(saved));
			errno = saved;
			return -1;
		}
		link_info->on_rx_epoll = 0;
	}

	if (_close_connect_socket(knet_h, kn_link) < 0) {
		saved = errno;
		log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to close connected socket: %s",
			strerror(saved));
		errno = saved;
		return -1;
	}

	knet_list_del(&link_info->list);
	free(link_info);
	kn_link->transport_link = NULL;

	errno = 0;
	return 0;
}
/*
* transport_free and transport_init are
* called only from knet_handle_new and knet_handle_free.
* all resources (hosts/links) should have been already freed at this point
* and they are called in a write locked context, hence they
* don't need their own locking.
*/
/*
 * Release the handle-wide SCTP transport state: stop the listen and
 * connect threads, unregister and close the notification socketpairs,
 * close both epoll fds and free the handle info.
 *
 * Returns 0 on success, -1 with errno = EINVAL when the SCTP transport
 * was never initialized on this handle.
 */
int sctp_transport_free(knet_handle_t knet_h)
{
	sctp_handle_info_t *sctp_h = knet_h->transports[KNET_TRANSPORT_SCTP];
	struct epoll_event event;
	void *join_result;

	if (!sctp_h) {
		errno = EINVAL;
		return -1;
	}

	/*
	 * both lists are expected to be empty by now; leftovers are only
	 * reported
	 */
	if (!knet_list_empty(&sctp_h->listen_links_list)) {
		log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Internal error. listen links list is not empty");
	}
	if (!knet_list_empty(&sctp_h->connect_links_list)) {
		log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Internal error. connect links list is not empty");
	}

	if (sctp_h->listen_thread) {
		pthread_cancel(sctp_h->listen_thread);
		pthread_join(sctp_h->listen_thread, &join_result);
	}

	if (sctp_h->connect_thread) {
		pthread_cancel(sctp_h->connect_thread);
		pthread_join(sctp_h->connect_thread, &join_result);
	}

	/* unregister the receiving ends of the socketpairs */
	if (sctp_h->listensockfd[0] >= 0) {
		memset(&event, 0, sizeof(event));
		event.events = EPOLLIN;
		event.data.fd = sctp_h->listensockfd[0];
		epoll_ctl(sctp_h->listen_epollfd, EPOLL_CTL_DEL, sctp_h->listensockfd[0], &event);
	}

	if (sctp_h->connectsockfd[0] >= 0) {
		memset(&event, 0, sizeof(event));
		event.events = EPOLLIN;
		event.data.fd = sctp_h->connectsockfd[0];
		epoll_ctl(sctp_h->connect_epollfd, EPOLL_CTL_DEL, sctp_h->connectsockfd[0], &event);
	}

	_close_socketpair(knet_h, sctp_h->connectsockfd);
	_close_socketpair(knet_h, sctp_h->listensockfd);

	if (sctp_h->listen_epollfd >= 0) {
		close(sctp_h->listen_epollfd);
	}

	if (sctp_h->connect_epollfd >= 0) {
		close(sctp_h->connect_epollfd);
	}

	free(sctp_h->event_subscribe_buffer);
	free(sctp_h);
	knet_h->transports[KNET_TRANSPORT_SCTP] = NULL;

	return 0;
}
/*
 * Query the size the kernel uses for struct sctp_event_subscribe and
 * prepare a buffer of that size with the notifications of interest
 * enabled. The buffer and its size are stored in the SCTP handle info.
 *
 * Returns 0 on success, and also when SCTP is not supported by the
 * system (nothing is prepared in that case); otherwise the errno value
 * describing the failure.
 */
static int _sctp_subscribe_init(knet_handle_t knet_h)
{
	int test_socket, savederrno;
	sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP];
	char dummy_events[100];
	struct sctp_event_subscribe *events;
	/* Below we set the first 6 fields of this expanding struct.
	 * SCTP_EVENTS is deprecated, but SCTP_EVENT is not available
	 * on Linux; on the other hand, FreeBSD and old Linux does not
	 * accept small transfers, so we can't simply use this minimum
	 * everywhere.  Thus we query and store the native size. */
	const unsigned int subscribe_min = 6;

	test_socket = socket(PF_INET, SOCK_STREAM, IPPROTO_SCTP);
	if (test_socket < 0) {
		if (errno == EPROTONOSUPPORT) {
			log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "SCTP not supported, skipping initialization");
			return 0;
		}
		savederrno = errno;
		log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to create test socket: %s",
			strerror(savederrno));
		return savederrno;
	}

	handle_info->event_subscribe_kernel_size = sizeof dummy_events;
	if (getsockopt(test_socket, IPPROTO_SCTP, SCTP_EVENTS, &dummy_events,
		       &handle_info->event_subscribe_kernel_size)) {
		/* grab the error before close() gets a chance to replace it */
		savederrno = errno;
		close(test_socket);
		log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to query kernel size of struct sctp_event_subscribe: %s",
			strerror(savederrno));
		return savederrno;
	}
	close(test_socket);

	if (handle_info->event_subscribe_kernel_size < subscribe_min) {
		savederrno = ERANGE;
		log_err(knet_h, KNET_SUB_TRANSP_SCTP,
			"No kernel support for the necessary notifications: struct sctp_event_subscribe is %u bytes, %u needed",
			handle_info->event_subscribe_kernel_size, subscribe_min);
		return savederrno;
	}

	events = malloc(handle_info->event_subscribe_kernel_size);
	if (!events) {
		savederrno = errno;
		log_err(knet_h, KNET_SUB_TRANSP_SCTP,
			"Failed to allocate event subscribe buffer: %s", strerror(savederrno));
		return savederrno;
	}

	memset(events, 0, handle_info->event_subscribe_kernel_size);
	events->sctp_data_io_event = 1;
	events->sctp_association_event = 1;
	events->sctp_address_event = 1;
	events->sctp_send_failure_event = 1;
	events->sctp_peer_error_event = 1;
	events->sctp_shutdown_event = 1;
	handle_info->event_subscribe_buffer = (char *)events;
	log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Size of struct sctp_event_subscribe is %u in kernel, %zu in user space",
		  handle_info->event_subscribe_kernel_size, sizeof(struct sctp_event_subscribe));

	return 0;
}
/*
 * Allocate and initialize the per-handle SCTP transport state:
 * the event subscription buffer, the listen/connect epoll fds, the
 * two control socketpairs and the listen/connect threads.
 *
 * Returns 0 on success. On failure returns -1 with errno set, after
 * sctp_transport_free() has released whatever was set up so far.
 * Fails with EEXIST if the SCTP transport is already initialized.
 */
int sctp_transport_init(knet_handle_t knet_h)
{
	int err = 0, savederrno = 0;
	sctp_handle_info_t *handle_info;
	struct epoll_event ev;

	if (knet_h->transports[KNET_TRANSPORT_SCTP]) {
		errno = EEXIST;
		return -1;
	}

	handle_info = malloc(sizeof(sctp_handle_info_t));
	if (!handle_info) {
		return -1;
	}

	memset(handle_info, 0,sizeof(sctp_handle_info_t));

	/*
	 * The error path below hands a partially initialized handle to
	 * sctp_transport_free(), which treats any fd >= 0 as owned.
	 * Mark every fd as invalid and initialize the lists up front so
	 * an early failure does not end up closing fd 0 or tripping the
	 * list sanity checks.
	 */
	handle_info->listen_epollfd = -1;
	handle_info->connect_epollfd = -1;
	handle_info->listensockfd[0] = -1;
	handle_info->listensockfd[1] = -1;
	handle_info->connectsockfd[0] = -1;
	handle_info->connectsockfd[1] = -1;
	knet_list_init(&handle_info->listen_links_list);
	knet_list_init(&handle_info->connect_links_list);

	/*
	 * must be published before _sctp_subscribe_init, that reads
	 * the handle info back from knet_h
	 */
	knet_h->transports[KNET_TRANSPORT_SCTP] = handle_info;

	savederrno = _sctp_subscribe_init(knet_h);
	if (savederrno) {
		err = -1;
		goto exit_fail;
	}

	handle_info->listen_epollfd = epoll_create(KNET_EPOLL_MAX_EVENTS + 1);
	if (handle_info->listen_epollfd < 0) {
		savederrno = errno;
		err = -1;
		log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to create epoll listen fd: %s",
			strerror(savederrno));
		goto exit_fail;
	}

	if (_fdset_cloexec(handle_info->listen_epollfd)) {
		savederrno = errno;
		err = -1;
		log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to set CLOEXEC on listen_epollfd: %s",
			strerror(savederrno));
		goto exit_fail;
	}

	handle_info->connect_epollfd = epoll_create(KNET_EPOLL_MAX_EVENTS + 1);
	if (handle_info->connect_epollfd < 0) {
		savederrno = errno;
		err = -1;
		log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to create epoll connect fd: %s",
			strerror(savederrno));
		goto exit_fail;
	}

	if (_fdset_cloexec(handle_info->connect_epollfd)) {
		savederrno = errno;
		err = -1;
		log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to set CLOEXEC on connect_epollfd: %s",
			strerror(savederrno));
		goto exit_fail;
	}

	if (_init_socketpair(knet_h, handle_info->connectsockfd) < 0) {
		savederrno = errno;
		err = -1;
		log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to init connect socketpair: %s",
			strerror(savederrno));
		goto exit_fail;
	}

	memset(&ev, 0, sizeof(struct epoll_event));
	ev.events = EPOLLIN;
	ev.data.fd = handle_info->connectsockfd[0];
	if (epoll_ctl(handle_info->connect_epollfd, EPOLL_CTL_ADD, handle_info->connectsockfd[0], &ev)) {
		savederrno = errno;
		err = -1;
		log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to add connectsockfd[0] to connect epoll pool: %s",
			strerror(savederrno));
		goto exit_fail;
	}

	if (_init_socketpair(knet_h, handle_info->listensockfd) < 0) {
		savederrno = errno;
		err = -1;
		log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to init listen socketpair: %s",
			strerror(savederrno));
		goto exit_fail;
	}

	memset(&ev, 0, sizeof(struct epoll_event));
	ev.events = EPOLLIN;
	ev.data.fd = handle_info->listensockfd[0];
	if (epoll_ctl(handle_info->listen_epollfd, EPOLL_CTL_ADD, handle_info->listensockfd[0], &ev)) {
		savederrno = errno;
		err = -1;
		log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to add listensockfd[0] to listen epoll pool: %s",
			strerror(savederrno));
		goto exit_fail;
	}

	/*
	 * Start connect & listener threads
	 */
	set_thread_status(knet_h, KNET_THREAD_SCTP_LISTEN, KNET_THREAD_REGISTERED);
	savederrno = pthread_create(&handle_info->listen_thread, 0, _sctp_listen_thread, (void *) knet_h);
	if (savederrno) {
		err = -1;
		log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to start sctp listen thread: %s",
			strerror(savederrno));
		goto exit_fail;
	}

	set_thread_status(knet_h, KNET_THREAD_SCTP_CONN, KNET_THREAD_REGISTERED);
	savederrno = pthread_create(&handle_info->connect_thread, 0, _sctp_connect_thread, (void *) knet_h);
	if (savederrno) {
		err = -1;
		log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to start sctp connect thread: %s",
			strerror(savederrno));
		goto exit_fail;
	}

exit_fail:
	/*
	 * the label is reached on success too: only tear down on error
	 */
	if (err < 0) {
		sctp_transport_free(knet_h);
	}
	errno = savederrno;
	return err;
}
/*
 * Record sockfd as the outgoing socket of a dynamic link and flag the
 * link as both dynamically connected and transport connected.
 *
 * Always returns 0.
 */
int sctp_transport_link_dyn_connect(knet_handle_t knet_h, int sockfd, struct knet_link *kn_link)
{
	kn_link->transport_connected = 1;
	kn_link->status.dynconnected = 1;
	kn_link->outsock = sockfd;

	return 0;
}
/*
 * Return the listening socket of the listener attached to this link's
 * SCTP connect info.
 */
int sctp_transport_link_get_acl_fd(knet_handle_t knet_h, struct knet_link *kn_link)
{
	sctp_connect_link_info_t *connect_info = kn_link->transport_link;
	sctp_listen_link_info_t *listen_info = connect_info->listener;

	return listen_info->listen_sock;
}
#endif
diff --git a/libknet/transport_udp.c b/libknet/transport_udp.c
index e243a913..3fd69ee4 100644
--- a/libknet/transport_udp.c
+++ b/libknet/transport_udp.c
@@ -1,445 +1,446 @@
/*
* Copyright (C) 2016-2019 Red Hat, Inc. All rights reserved.
*
* Author: Christine Caulfield <ccaulfie@redhat.com>
*
* This software licensed under GPL-2.0+, LGPL-2.0+
*/
#include "config.h"
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <stdlib.h>
#include <netinet/in.h>
#include <netinet/ip.h>
#include <netinet/ip_icmp.h>
#if defined (IP_RECVERR) || defined (IPV6_RECVERR)
#include <linux/errqueue.h>
#endif
#include "libknet.h"
#include "compat.h"
#include "host.h"
#include "link.h"
#include "logging.h"
#include "common.h"
#include "transport_common.h"
#include "transport_udp.h"
#include "threads_common.h"
/*
* Per-handle state of the UDP transport, stored in
* knet_h->transports[KNET_TRANSPORT_UDP].
*/
typedef struct udp_handle_info {
struct knet_list_head links_list; /* list of udp_link_info_t, one per UDP socket created by link_set_config */
} udp_handle_info_t;
/*
* State of one UDP socket. Links that share the same local address
* point to the same udp_link_info_t via kn_link->transport_link.
*/
typedef struct udp_link_info {
struct knet_list_head list; /* entry in udp_handle_info_t links_list */
struct sockaddr_storage local_address; /* copy of the link src_addr the socket was created for */
int socket_fd; /* the UDP socket */
int on_epoll; /* set to 1 once socket_fd has been added to recv_from_links_epollfd */
} udp_link_info_t;
/*
* Attach a UDP socket to kn_link.
*
* If a socket already exists for the same local (source) address it is
* shared with the new link, otherwise a new socket is created, configured,
* bound to kn_link->src_addr, added to the recv_from_links epoll set,
* registered with the fd tracker and stored in the handle links list.
*
* Returns 0 on success, -1 on error with errno set from savederrno.
*
* NOTE(review): when malloc fails savederrno is still 0, so errno is
* reset to 0 on return -- confirm whether callers expect ENOMEM here.
*/
int udp_transport_link_set_config(knet_handle_t knet_h, struct knet_link *kn_link)
{
int err = 0, savederrno = 0;
int sock = -1;
struct epoll_event ev;
udp_link_info_t *info;
udp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_UDP];
#if defined (IP_RECVERR) || defined (IPV6_RECVERR)
int value;
#endif
/*
* Only allocate a new link if the local address is different
*/
knet_list_for_each_entry(info, &handle_info->links_list, list) {
/*
* byte-wise comparison of the whole sockaddr_storage
*/
if (memcmp(&info->local_address, &kn_link->src_addr, sizeof(struct sockaddr_storage)) == 0) {
log_debug(knet_h, KNET_SUB_TRANSP_UDP, "Re-using existing UDP socket for new link");
kn_link->outsock = info->socket_fd;
kn_link->transport_link = info;
kn_link->transport_connected = 1;
return 0;
}
}
info = malloc(sizeof(udp_link_info_t));
if (!info) {
err = -1;
goto exit_error;
}
+ memset(info, 0, sizeof(udp_link_info_t));
sock = socket(kn_link->src_addr.ss_family, SOCK_DGRAM, 0);
if (sock < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_UDP, "Unable to create listener socket: %s",
strerror(savederrno));
goto exit_error;
}
/*
* no log_err here: this function does not report the failure itself
*/
if (_configure_transport_socket(knet_h, sock, &kn_link->src_addr, kn_link->flags, "UDP") < 0) {
savederrno = errno;
err = -1;
goto exit_error;
}
/*
* enable the socket error queue for the address family in use, so that
* extended errors can later be read with MSG_ERRQUEUE
*/
#ifdef IP_RECVERR
if (kn_link->src_addr.ss_family == AF_INET) {
value = 1;
if (setsockopt(sock, SOL_IP, IP_RECVERR, &value, sizeof(value)) <0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_UDP, "Unable to set RECVERR on socket: %s",
strerror(savederrno));
goto exit_error;
}
log_debug(knet_h, KNET_SUB_TRANSP_UDP, "IP_RECVERR enabled on socket: %i", sock);
}
#else
log_debug(knet_h, KNET_SUB_TRANSP_UDP, "IP_RECVERR not available in this build/platform");
#endif
#ifdef IPV6_RECVERR
if (kn_link->src_addr.ss_family == AF_INET6) {
value = 1;
if (setsockopt(sock, SOL_IPV6, IPV6_RECVERR, &value, sizeof(value)) <0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_UDP, "Unable to set RECVERR on socket: %s",
strerror(savederrno));
goto exit_error;
}
log_debug(knet_h, KNET_SUB_TRANSP_UDP, "IPV6_RECVERR enabled on socket: %i", sock);
}
#else
log_debug(knet_h, KNET_SUB_TRANSP_UDP, "IPV6_RECVERR not available in this build/platform");
#endif
if (bind(sock, (struct sockaddr *)&kn_link->src_addr, sockaddr_len(&kn_link->src_addr))) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_UDP, "Unable to bind listener socket: %s",
strerror(savederrno));
goto exit_error;
}
memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLIN;
ev.data.fd = sock;
if (epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_ADD, sock, &ev)) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_UDP, "Unable to add listener to epoll pool: %s",
strerror(savederrno));
goto exit_error;
}
/*
* remember the epoll registration so the error path can undo it
*/
info->on_epoll = 1;
if (_set_fd_tracker(knet_h, sock, KNET_TRANSPORT_UDP, 0, info) < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_UDP, "Unable to set fd tracker: %s",
strerror(savederrno));
goto exit_error;
}
memmove(&info->local_address, &kn_link->src_addr, sizeof(struct sockaddr_storage));
info->socket_fd = sock;
knet_list_add(&info->list, &handle_info->links_list);
kn_link->outsock = sock;
kn_link->transport_link = info;
kn_link->transport_connected = 1;
/*
* reached on success as well: cleanup only happens when err is set
*/
exit_error:
if (err) {
if (info) {
if (info->on_epoll) {
epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_DEL, sock, &ev);
}
free(info);
}
if (sock >= 0) {
close(sock);
}
}
errno = savederrno;
return err;
}
/*
 * Detach kn_link from its UDP socket and release the socket when no
 * other link is using it.
 *
 * Returns 0 on success. Returns -1 with errno set to EBUSY when
 * another link still shares the same udp_link_info_t (nothing is
 * released in that case), or with the errno of the failing call when
 * the socket cannot be removed from epoll or from the fd tracker.
 */
int udp_transport_link_clear_config(knet_handle_t knet_h, struct knet_link *kn_link)
{
	int err = 0, savederrno = 0;
	int found = 0;
	struct knet_host *host;
	int link_idx;
	udp_link_info_t *info = kn_link->transport_link;
	struct epoll_event ev;

	/*
	 * look for any other link, on any host, sharing this socket.
	 * One match is enough, stop scanning hosts as soon as we have it.
	 */
	for (host = knet_h->host_head; host != NULL && !found; host = host->next) {
		for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) {
			if (&host->link[link_idx] == kn_link)
				continue;

			if (host->link[link_idx].transport_link == info) {
				found = 1;
				break;
			}
		}
	}

	if (found) {
		log_debug(knet_h, KNET_SUB_TRANSP_UDP, "UDP socket %d still in use", info->socket_fd);
		savederrno = EBUSY;
		err = -1;
		goto exit_error;
	}

	if (info->on_epoll) {
		memset(&ev, 0, sizeof(struct epoll_event));
		ev.events = EPOLLIN;
		ev.data.fd = info->socket_fd;

		if (epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_DEL, info->socket_fd, &ev) < 0) {
			savederrno = errno;
			err = -1;
			log_err(knet_h, KNET_SUB_TRANSP_UDP, "Unable to remove UDP socket from epoll poll: %s",
				strerror(savederrno));
			goto exit_error;
		}
		info->on_epoll = 0;
	}

	/*
	 * KNET_MAX_TRANSPORTS with a NULL payload is what this file passes
	 * to drop the tracker entry for the fd
	 */
	if (_set_fd_tracker(knet_h, info->socket_fd, KNET_MAX_TRANSPORTS, 0, NULL) < 0) {
		savederrno = errno;
		err = -1;
		log_err(knet_h, KNET_SUB_TRANSP_UDP, "Unable to set fd tracker: %s",
			strerror(savederrno));
		goto exit_error;
	}

	close(info->socket_fd);
	knet_list_del(&info->list);
	free(kn_link->transport_link);

exit_error:
	errno = savederrno;
	return err;
}
/*
 * Release the per-handle UDP transport state.
 *
 * Returns 0 on success. Returns -1 with errno set to EINVAL when the
 * UDP transport was never initialized, and -1 (errno untouched) when
 * links are still registered, in which case nothing is freed.
 */
int udp_transport_free(knet_handle_t knet_h)
{
	udp_handle_info_t *udp_info = knet_h->transports[KNET_TRANSPORT_UDP];

	if (udp_info == NULL) {
		errno = EINVAL;
		return -1;
	}

	/*
	 * keep it here while we debug list usage and such
	 */
	if (!knet_list_empty(&udp_info->links_list)) {
		log_err(knet_h, KNET_SUB_TRANSP_UDP, "Internal error. handle list is not empty");
		return -1;
	}

	free(udp_info);
	knet_h->transports[KNET_TRANSPORT_UDP] = NULL;

	return 0;
}
/*
 * Allocate the per-handle UDP transport state and publish it in
 * knet_h->transports[KNET_TRANSPORT_UDP] with an empty links list.
 *
 * Returns 0 on success, -1 with errno set to EEXIST when the UDP
 * transport is already initialized, -1 when the allocation fails.
 */
int udp_transport_init(knet_handle_t knet_h)
{
	udp_handle_info_t *udp_info;

	if (knet_h->transports[KNET_TRANSPORT_UDP] != NULL) {
		errno = EEXIST;
		return -1;
	}

	udp_info = malloc(sizeof(*udp_info));
	if (udp_info == NULL) {
		return -1;
	}
	memset(udp_info, 0, sizeof(*udp_info));

	knet_list_init(&udp_info->links_list);
	knet_h->transports[KNET_TRANSPORT_UDP] = udp_info;

	return 0;
}
/*
* Drain the error queue (MSG_ERRQUEUE) of sockfd and act on the
* extended socket errors found in the control messages.
*
* - locally generated EMSGSIZE: store the reported value (ee_info) in
*   knet_h->kernel_mtu and ask PMTUd to rerun if it is not running
* - ICMP / ICMP6 errors: only logged at debug level
*
* Returns -1 with errno set when the very first recvmsg fails (nothing
* was queued), 0 once at least one message has been read and a later
* recvmsg fails. When neither IP_RECVERR nor IPV6_RECVERR is available
* at build time the function is a stub that returns 0.
*/
#if defined (IP_RECVERR) || defined (IPV6_RECVERR)
static int read_errs_from_sock(knet_handle_t knet_h, int sockfd)
{
int err = 0, savederrno = 0;
int got_err = 0;
char buffer[1024];
struct iovec iov;
struct msghdr msg;
struct cmsghdr *cmsg;
struct sock_extended_err *sock_err;
struct icmphdr icmph;
struct sockaddr_storage remote;
struct sockaddr_storage *origin;
char addr_str[KNET_MAX_HOST_LEN];
char port_str[KNET_MAX_PORT_LEN];
char addr_remote_str[KNET_MAX_HOST_LEN];
char port_remote_str[KNET_MAX_PORT_LEN];
/*
* NOTE(review): msg is filled in once, before the loop. recvmsg is
* documented to update msg_namelen / msg_controllen on return, so later
* iterations may run with the sizes left by the previous call -- confirm
* whether they should be reset per iteration.
*/
iov.iov_base = &icmph;
iov.iov_len = sizeof(icmph);
msg.msg_name = (void*)&remote;
msg.msg_namelen = sizeof(remote);
msg.msg_iov = &iov;
msg.msg_iovlen = 1;
msg.msg_flags = 0;
msg.msg_control = buffer;
msg.msg_controllen = sizeof(buffer);
/*
* the only way out of this loop is a failing recvmsg
*/
for (;;) {
err = recvmsg(sockfd, &msg, MSG_ERRQUEUE);
savederrno = errno;
if (err < 0) {
if (!got_err) {
errno = savederrno;
return -1;
} else {
return 0;
}
}
got_err = 1;
for (cmsg = CMSG_FIRSTHDR(&msg);cmsg; cmsg = CMSG_NXTHDR(&msg, cmsg)) {
if (((cmsg->cmsg_level == SOL_IP) && (cmsg->cmsg_type == IP_RECVERR)) ||
((cmsg->cmsg_level == SOL_IPV6 && (cmsg->cmsg_type == IPV6_RECVERR)))) {
sock_err = (struct sock_extended_err*)(void *)CMSG_DATA(cmsg);
if (sock_err) {
switch (sock_err->ee_origin) {
case SO_EE_ORIGIN_NONE: /* no origin */
case SO_EE_ORIGIN_LOCAL: /* local source (EMSGSIZE) */
if (sock_err->ee_errno == EMSGSIZE) {
if (pthread_mutex_lock(&knet_h->kmtu_mutex) != 0) {
log_debug(knet_h, KNET_SUB_TRANSP_UDP, "Unable to get mutex lock");
/*
* lock not held here: kernel_mtu is reset without it, and the
* break leaves the switch, skipping the PMTUd notification
*/
knet_h->kernel_mtu = 0;
break;
} else {
knet_h->kernel_mtu = sock_err->ee_info;
pthread_mutex_unlock(&knet_h->kmtu_mutex);
}
/*
* we can only try to take a lock here. This part of the code
* can be invoked by any thread, including PMTUd that is already
* holding a lock at that stage.
* If PMTUd is holding the lock, most likely it is already running
* and we don't need to notify it back.
*/
if (!pthread_mutex_trylock(&knet_h->pmtud_mutex)) {
if (!knet_h->pmtud_running) {
if (!knet_h->pmtud_forcerun) {
log_debug(knet_h, KNET_SUB_TRANSP_UDP, "Notifying PMTUd to rerun");
knet_h->pmtud_forcerun = 1;
}
}
pthread_mutex_unlock(&knet_h->pmtud_mutex);
}
}
/*
* those errors are way too noisy
*/
break;
case SO_EE_ORIGIN_ICMP: /* ICMP */
case SO_EE_ORIGIN_ICMP6: /* ICMP6 */
/*
* origin is the address taken from SO_EE_OFFENDER, remote is the
* address filled in by recvmsg in msg_name
*/
origin = (struct sockaddr_storage *)(void *)SO_EE_OFFENDER(sock_err);
- if (knet_addrtostr(origin, sizeof(origin),
+ if (knet_addrtostr(origin, sizeof(*origin),
addr_str, KNET_MAX_HOST_LEN,
port_str, KNET_MAX_PORT_LEN) < 0) {
log_debug(knet_h, KNET_SUB_TRANSP_UDP, "Received ICMP error from unknown source: %s", strerror(sock_err->ee_errno));
} else {
if (knet_addrtostr(&remote, sizeof(remote),
addr_remote_str, KNET_MAX_HOST_LEN,
port_remote_str, KNET_MAX_PORT_LEN) < 0) {
log_debug(knet_h, KNET_SUB_TRANSP_UDP, "Received ICMP error from %s: %s destination unknown", addr_str, strerror(sock_err->ee_errno));
} else {
log_debug(knet_h, KNET_SUB_TRANSP_UDP, "Received ICMP error from %s: %s %s", addr_str, strerror(sock_err->ee_errno), addr_remote_str);
}
}
break;
}
} else {
log_debug(knet_h, KNET_SUB_TRANSP_UDP, "No data in MSG_ERRQUEUE");
}
}
}
}
}
#else
/*
* no socket error queue support in this build: nothing to read
*/
static int read_errs_from_sock(knet_handle_t knet_h, int sockfd)
{
return 0;
}
#endif
/*
 * RX error hook for UDP sockets: when the receive failed with EAGAIN,
 * go and read the socket error queue. Any other errno is ignored.
 *
 * Always returns 0.
 */
int udp_transport_rx_sock_error(knet_handle_t knet_h, int sockfd, int recv_err, int recv_errno)
{
	if (recv_errno != EAGAIN) {
		return 0;
	}

	read_errs_from_sock(knet_h, sockfd);

	return 0;
}
/*
 * TX error hook for UDP sockets.
 *
 * Return value, by outcome of the send:
 *   no error (recv_err >= 0)  ->  0
 *   EMSGSIZE                  ->  0, after reading the socket error queue
 *   EINVAL, EPERM             -> -1
 *   ENOBUFS, EAGAIN           ->  1, after a short sleep to slow TX down
 *   anything else             ->  1, after reading the socket error queue
 */
int udp_transport_tx_sock_error(knet_handle_t knet_h, int sockfd, int recv_err, int recv_errno)
{
	if (recv_err >= 0) {
		return 0;
	}

	switch (recv_errno) {
	case EMSGSIZE:
		read_errs_from_sock(knet_h, sockfd);
		return 0;
	case EINVAL:
	case EPERM:
		return -1;
	case ENOBUFS:
	case EAGAIN:
#ifdef DEBUG
		log_debug(knet_h, KNET_SUB_TRANSP_UDP, "Sock: %d is overloaded. Slowing TX down", sockfd);
#endif
		usleep(KNET_THREADS_TIMERES / 16);
		break;
	default:
		read_errs_from_sock(knet_h, sockfd);
		break;
	}

	return 1;
}
/*
 * Classify a received message by its length:
 * 0 for a zero length message, 2 otherwise.
 */
int udp_transport_rx_is_data(knet_handle_t knet_h, int sockfd, struct knet_mmsghdr *msg)
{
	return (msg->msg_len == 0) ? 0 : 2;
}
/*
 * Flag a dynamic link as connected. sockfd is not used by the UDP
 * transport.
 *
 * Always returns 0.
 */
int udp_transport_link_dyn_connect(knet_handle_t knet_h, int sockfd, struct knet_link *kn_link)
{
	kn_link->status.dynconnected = 1;

	return 0;
}
/*
 * Return the fd to use for ACL handling of this link: for UDP it is
 * the link's outgoing socket.
 */
int udp_transport_link_get_acl_fd(knet_handle_t knet_h, struct knet_link *kn_link)
{
	int acl_fd = kn_link->outsock;

	return acl_fd;
}

File Metadata

Mime Type
text/x-diff
Expires
Mon, Feb 24, 3:47 PM (1 h, 42 m ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1464311
Default Alt Text
(224 KB)

Event Timeline