Page MenuHomeClusterLabs Projects

No OneTemporary

diff --git a/libknet/compress.c b/libknet/compress.c
index c7c0d0e6..8af7beff 100644
--- a/libknet/compress.c
+++ b/libknet/compress.c
@@ -1,481 +1,483 @@
/*
* Copyright (C) 2017-2018 Red Hat, Inc. All rights reserved.
*
* Author: Fabio M. Di Nitto <fabbione@kronosnet.org>
*
* This software licensed under GPL-2.0+, LGPL-2.0+
*/
#include "config.h"
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <pthread.h>
#include <time.h>
#include "internals.h"
#include "compress.h"
#include "compress_model.h"
#include "logging.h"
#include "threads_common.h"
#include "common.h"
/*
* internal module switch data
*/
/*
* DO NOT CHANGE MODEL_ID HERE OR ONWIRE COMPATIBILITY
* WILL BREAK!
*
* Always add new items before the last NULL.
*/
/*
 * Table of known compress models, terminated by the entry whose name is
 * NULL (every table walk in this file stops there).
 *
 * The code in this file reads the fields model_name, model_id, built_in,
 * loaded and ops from each entry.
 * NOTE(review): the initializers below appear to follow that same order
 * (name, id, WITH_COMPRESS_* build flag, loaded flag, ops pointer) --
 * confirm against the compress_model_t declaration.
 */
static compress_model_t compress_modules_cmds[] = {
{ "none" , 0, 0, 0, NULL },
{ "zlib" , 1, WITH_COMPRESS_ZLIB , 0, NULL },
{ "lz4" , 2, WITH_COMPRESS_LZ4 , 0, NULL },
{ "lz4hc", 3, WITH_COMPRESS_LZ4 , 0, NULL },
{ "lzo2" , 4, WITH_COMPRESS_LZO2 , 0, NULL },
{ "lzma" , 5, WITH_COMPRESS_LZMA , 0, NULL },
{ "bzip2", 6, WITH_COMPRESS_BZIP2, 0, NULL },
{ NULL, 255, 0, 0, NULL }
};
/* index of the last named table entry; set by compress_init() */
static int max_model = 0;
/*
 * CLOCK_MONOTONIC time of the most recent load_module() failure.
 * Zeroed by compress_init(); compress_load_lib() uses it to rate limit
 * reload attempts.
 */
static struct timespec last_load_failure;
/*
 * Look up a compress model by name.
 *
 * Returns the model_id of the first table entry whose name matches
 * 'model', or -1 when no entry matches.
 */
static int compress_get_model(const char *model)
{
	int i;

	for (i = 0; compress_modules_cmds[i].model_name != NULL; i++) {
		if (strcmp(compress_modules_cmds[i].model_name, model) == 0) {
			return compress_modules_cmds[i].model_id;
		}
	}

	return -1;
}
/*
 * Return the index of the last named entry in compress_modules_cmds
 * (number of named entries minus one).
 */
static int compress_get_max_model(void)
{
	int count;

	for (count = 0; compress_modules_cmds[count].model_name != NULL; count++) {
		/* nothing to do, only counting the named entries */
	}

	return count - 1;
}
/*
 * Check that a model id is known and built in.
 *
 * Returns 0 when a table entry carries this model_id and has
 * built_in == 1, -1 otherwise.
 */
static int compress_is_valid_model(int compress_model)
{
	int i;

	for (i = 0; compress_modules_cmds[i].model_name != NULL; i++) {
		if (compress_modules_cmds[i].model_id != compress_model) {
			continue;
		}
		if (compress_modules_cmds[i].built_in == 1) {
			return 0;
		}
	}

	return -1;
}
/*
 * Ask the module to validate a compress level.
 *
 * Modules that do not provide a val_level callback accept any level
 * (0 is returned); otherwise the callback result is passed through.
 */
static int val_level(
	knet_handle_t knet_h,
	int compress_model,
	int compress_level)
{
	if (compress_modules_cmds[compress_model].ops->val_level == NULL) {
		return 0;
	}

	return compress_modules_cmds[compress_model].ops->val_level(knet_h, compress_level);
}
/*
 * compress_check_lib_is_init needs to be invoked in a locked context!
 *
 * Returns 1 when the module for cmp_model is loaded and initialized
 * for this handle, 0 otherwise.
 *
 * A module without an .is_init callback does not need per-handle init,
 * so the fake reference stored in compress_int_data is what tells us
 * that this handle has already been accounted for.
 */
static int compress_check_lib_is_init(knet_handle_t knet_h, int cmp_model)
{
	if (compress_modules_cmds[cmp_model].loaded != 1) {
		return 0;
	}

	if (compress_modules_cmds[cmp_model].ops->is_init != NULL) {
		return (compress_modules_cmds[cmp_model].ops->is_init(knet_h, cmp_model) == 1) ? 1 : 0;
	}

	return (knet_h->compress_int_data[cmp_model] != NULL) ? 1 : 0;
}
/*
 * compress_load_lib should _always_ be invoked in write lock context
 *
 * Loads (if needed) the module for cmp_model and initializes it for
 * this handle. Returns 0 on success or when already initialized,
 * -1 on error. errno is EAGAIN when a rate limited retry is refused
 * and EINVAL on module ABI mismatch.
 */
static int compress_load_lib(knet_handle_t knet_h, int cmp_model, int rate_limit)
{
	struct timespec clock_now;
	unsigned long long timediff;

	/*
	 * Paranoia re-check: callers usually test is_init under a read
	 * lock and then swap to the write lock, so another thread may
	 * have completed the init in between.
	 */
	if (compress_check_lib_is_init(knet_h, cmp_model)) {
		return 0;
	}

	/*
	 * decompress can load libraries on demand based on the model
	 * selected by other nodes, so crafted packets could be used to
	 * trigger load attempts in a DoS fashion. After a failed load,
	 * rate limited callers may only retry once the hold-off period
	 * has expired, to avoid a lock DoS slowing down libknet.
	 */
	if ((rate_limit) &&
	    ((last_load_failure.tv_sec != 0) || (last_load_failure.tv_nsec != 0))) {
		clock_gettime(CLOCK_MONOTONIC, &clock_now);
		timespec_diff(last_load_failure, clock_now, &timediff);
		if (timediff < 10000000000) {
			errno = EAGAIN;
			return -1;
		}
	}

	if (compress_modules_cmds[cmp_model].loaded == 0) {
		compress_modules_cmds[cmp_model].ops = load_module (knet_h, "compress", compress_modules_cmds[cmp_model].model_name);
		if (!compress_modules_cmds[cmp_model].ops) {
			/* remember when the load failed for the rate limiter */
			clock_gettime(CLOCK_MONOTONIC, &last_load_failure);
			return -1;
		}
		if (compress_modules_cmds[cmp_model].ops->abi_ver != KNET_COMPRESS_MODEL_ABI) {
			log_err(knet_h, KNET_SUB_COMPRESS,
				"ABI mismatch loading module %s. knet ver: %d, module ver: %d",
				compress_modules_cmds[cmp_model].model_name, KNET_COMPRESS_MODEL_ABI,
				compress_modules_cmds[cmp_model].ops->abi_ver);
			errno = EINVAL;
			return -1;
		}
		compress_modules_cmds[cmp_model].loaded = 1;
	}

	if (compress_modules_cmds[cmp_model].ops->init == NULL) {
		/* no per-handle init: store the fake reference instead */
		knet_h->compress_int_data[cmp_model] = (void *)&"1";
		return 0;
	}

	if (compress_modules_cmds[cmp_model].ops->init(knet_h, cmp_model) < 0) {
		return -1;
	}

	return 0;
}
/*
 * Round-trip a zero filled buffer through the configured module.
 *
 * The buffer is compressed, decompressed back in place and then
 * verified to still be all zeroes. Returns 0 on success, -1 with
 * errno set on failure.
 */
static int compress_lib_test(knet_handle_t knet_h)
{
	unsigned char src[KNET_DATABUFSIZE];
	unsigned char dst[KNET_DATABUFSIZE_COMPRESS];
	ssize_t dst_comp_len = KNET_DATABUFSIZE_COMPRESS;
	ssize_t dst_decomp_len = KNET_DATABUFSIZE;
	unsigned int pos = 0;
	int savederrno = 0;

	memset(src, 0, KNET_DATABUFSIZE);
	memset(dst, 0, KNET_DATABUFSIZE_COMPRESS);

	/*
	 * NOTE: the public compress/decompress entry points cannot be
	 * used here because of locking, so the module is called directly
	 */
	if (compress_modules_cmds[knet_h->compress_model].ops->compress(knet_h, src, KNET_DATABUFSIZE, dst, &dst_comp_len) < 0) {
		savederrno = errno;
		log_err(knet_h, KNET_SUB_COMPRESS, "Unable to compress test buffer. Please check your compression settings: %s", strerror(savederrno));
		errno = savederrno;
		return -1;
	}

	if (compress_modules_cmds[knet_h->compress_model].ops->decompress(knet_h, dst, dst_comp_len, src, &dst_decomp_len) < 0) {
		savederrno = errno;
		log_err(knet_h, KNET_SUB_COMPRESS, "Unable to decompress test buffer. Please check your compression settings: %s", strerror(savederrno));
		errno = savederrno;
		return -1;
	}

	while (pos < KNET_DATABUFSIZE) {
		if (src[pos] != 0) {
			log_err(knet_h, KNET_SUB_COMPRESS, "Decompressed buffer contains incorrect data");
			errno = EINVAL;
			return -1;
		}
		pos++;
	}

	return 0;
}
/*
 * One-time setup of the compress layer: record the highest model
 * index and clear the load failure timestamp.
 *
 * Returns 0 on success, -1 with errno = EINVAL when the table holds
 * more models than KNET_MAX_COMPRESS_METHODS allows.
 */
int compress_init(
	knet_handle_t knet_h)
{
	max_model = compress_get_max_model();

	if (max_model <= KNET_MAX_COMPRESS_METHODS) {
		memset(&last_load_failure, 0, sizeof(struct timespec));
		return 0;
	}

	log_err(knet_h, KNET_SUB_COMPRESS, "Too many compress methods defined in compress.c.");
	errno = EINVAL;
	return -1;
}
/*
 * Apply a compress configuration to a handle.
 *
 * knet_h                   - handle to configure
 * knet_handle_compress_cfg - requested model name, level and threshold
 *
 * For a real model (id > 0) the module is loaded on demand, the level
 * is validated, and a compress/decompress round trip is performed
 * before the configuration is accepted. For model "none" (id 0)
 * compression is switched off on the handle.
 *
 * Returns 0 on success (errno is set to 0), -1 on error with errno
 * set. On error the handle is left with compression disabled.
 */
int compress_cfg(
	knet_handle_t knet_h,
	struct knet_handle_compress_cfg *knet_handle_compress_cfg)
{
	int savederrno = 0, err = 0;
	int cmp_model;

	cmp_model = compress_get_model(knet_handle_compress_cfg->compress_model);
	if (cmp_model < 0) {
		log_err(knet_h, KNET_SUB_COMPRESS, "compress model %s not supported", knet_handle_compress_cfg->compress_model);
		errno = EINVAL;
		return -1;
	}

	log_debug(knet_h, KNET_SUB_COMPRESS,
		  "Initizializing compress module [%s/%d/%u]",
		  knet_handle_compress_cfg->compress_model, knet_handle_compress_cfg->compress_level, knet_handle_compress_cfg->compress_threshold);

	if (cmp_model > 0) {
		if (compress_modules_cmds[cmp_model].built_in == 0) {
			log_err(knet_h, KNET_SUB_COMPRESS, "compress model %s support has not been built in. Please contact your vendor or fix the build", knet_handle_compress_cfg->compress_model);
			errno = EINVAL;
			return -1;
		}

		if (knet_handle_compress_cfg->compress_threshold > KNET_MAX_PACKET_SIZE) {
			log_err(knet_h, KNET_SUB_COMPRESS, "compress threshold cannot be higher than KNET_MAX_PACKET_SIZE (%d).",
				KNET_MAX_PACKET_SIZE);
			errno = EINVAL;
			return -1;
		}

		/* a threshold of 0 selects the library default */
		if (knet_handle_compress_cfg->compress_threshold == 0) {
			knet_h->compress_threshold = KNET_COMPRESS_THRESHOLD;
			log_debug(knet_h, KNET_SUB_COMPRESS, "resetting compression threshold to default (%d)", KNET_COMPRESS_THRESHOLD);
		} else {
			knet_h->compress_threshold = knet_handle_compress_cfg->compress_threshold;
		}

		savederrno = pthread_rwlock_rdlock(&shlib_rwlock);
		if (savederrno) {
			log_err(knet_h, KNET_SUB_COMPRESS, "Unable to get read lock: %s",
				strerror(savederrno));
			errno = savederrno;
			return -1;
		}

		if (!compress_check_lib_is_init(knet_h, cmp_model)) {
			/*
			 * need to switch to write lock, load the lib, and return with a write lock
			 * this is not racy because compress_load_lib is written idempotent.
			 */
			pthread_rwlock_unlock(&shlib_rwlock);
			savederrno = pthread_rwlock_wrlock(&shlib_rwlock);
			if (savederrno) {
				log_err(knet_h, KNET_SUB_COMPRESS, "Unable to get write lock: %s",
					strerror(savederrno));
				errno = savederrno;
				return -1;
			}

			if (compress_load_lib(knet_h, cmp_model, 0) < 0) {
				savederrno = errno;
				log_err(knet_h, KNET_SUB_COMPRESS, "Unable to load library: %s",
					strerror(savederrno));
				err = -1;
				goto out_unlock;
			}
		}

		if (val_level(knet_h, cmp_model, knet_handle_compress_cfg->compress_level) < 0) {
			log_err(knet_h, KNET_SUB_COMPRESS, "compress level %d not supported for model %s",
				knet_handle_compress_cfg->compress_level, knet_handle_compress_cfg->compress_model);
			savederrno = EINVAL;
			err = -1;
			goto out_unlock;
		}

		/* compress_lib_test reads the model from the handle */
		knet_h->compress_model = cmp_model;
		knet_h->compress_level = knet_handle_compress_cfg->compress_level;

		if (compress_lib_test(knet_h) < 0) {
			savederrno = errno;
			err = -1;
			goto out_unlock;
		}

out_unlock:
		pthread_rwlock_unlock(&shlib_rwlock);
	} else {
		/*
		 * model "none": make sure a previously configured model
		 * does not stay active on the handle.
		 */
		knet_h->compress_model = 0;
		knet_h->compress_level = 0;
	}

	if (err) {
		knet_h->compress_model = 0;
		knet_h->compress_level = 0;
	}

	errno = savederrno;
	return err;
}
/*
 * Release per-handle compress module state.
 *
 * knet_h - handle being cleaned up
 * all    - non-zero: finalize every initialized model;
 *          zero: finalize only the model currently set on the handle
 *
 * Runs under the shlib write lock. Modules with a .fini callback are
 * asked to clean up; for the others the fake reference stored in
 * compress_int_data is cleared.
 */
void compress_fini(
	knet_handle_t knet_h,
	int all)
{
	int savederrno = 0;
	int idx = 0;

	savederrno = pthread_rwlock_wrlock(&shlib_rwlock);
	if (savederrno) {
		log_err(knet_h, KNET_SUB_COMPRESS, "Unable to get write lock: %s",
			strerror(savederrno));
		return;
	}

	while (compress_modules_cmds[idx].model_name != NULL) {
		/*
		 * the bounds check must come before compress_int_data[idx]
		 * is read, so that it actually protects the access
		 */
		if ((idx < KNET_MAX_COMPRESS_METHODS) &&
		    (compress_modules_cmds[idx].built_in == 1) &&
		    (compress_modules_cmds[idx].loaded == 1) &&
		    (compress_modules_cmds[idx].model_id > 0) &&
		    (knet_h->compress_int_data[idx] != NULL)) {
			if ((all) || (compress_modules_cmds[idx].model_id == knet_h->compress_model)) {
				if (compress_modules_cmds[idx].ops->fini != NULL) {
					compress_modules_cmds[idx].ops->fini(knet_h, idx);
				} else {
					knet_h->compress_int_data[idx] = NULL;
				}
			}
		}
		idx++;
	}

	pthread_rwlock_unlock(&shlib_rwlock);
	return;
}
/*
 * Compress buf_in into buf_out using the model configured on the handle.
 *
 * No compress_check_lib_is_init call is needed here because
 * compress_cfg already guarantees the module is ready.
 * The module's return value is passed through unchanged.
 */
int compress(
	knet_handle_t knet_h,
	const unsigned char *buf_in,
	const ssize_t buf_in_len,
	unsigned char *buf_out,
	ssize_t *buf_out_len)
{
	int res;

	res = compress_modules_cmds[knet_h->compress_model].ops->compress(knet_h, buf_in, buf_in_len, buf_out, buf_out_len);

	return res;
}
/*
 * Decompress a received buffer.
 *
 * knet_h         - handle
 * compress_model - model id taken from the received packet
 * buf_in/len     - compressed data
 * buf_out/len    - destination buffer and in/out length
 *
 * The module for compress_model is loaded on demand (rate limited,
 * see compress_load_lib). Returns the module's decompress result, or
 * -1 with errno set when the model is unknown, not built in, or the
 * library cannot be locked/loaded.
 */
int decompress(
	knet_handle_t knet_h,
	int compress_model,
	const unsigned char *buf_in,
	const ssize_t buf_in_len,
	unsigned char *buf_out,
	ssize_t *buf_out_len)
{
	int savederrno = 0, err = 0;

	/*
	 * the model id is remote input: reject anything outside the
	 * table (negative ids included) before it is used as an index
	 */
	if ((compress_model < 0) || (compress_model > max_model)) {
		log_err(knet_h, KNET_SUB_COMPRESS, "Received packet with unknown compress model %d", compress_model);
		errno = EINVAL;
		return -1;
	}

	if (compress_is_valid_model(compress_model) < 0) {
		log_err(knet_h, KNET_SUB_COMPRESS, "Received packet compressed with %s but support is not built in this version of libknet. Please contact your distribution vendor or fix the build.", compress_modules_cmds[compress_model].model_name);
		errno = EINVAL;
		return -1;
	}

	savederrno = pthread_rwlock_rdlock(&shlib_rwlock);
	if (savederrno) {
		log_err(knet_h, KNET_SUB_COMPRESS, "Unable to get read lock: %s",
			strerror(savederrno));
		errno = savederrno;
		return -1;
	}

	if (!compress_check_lib_is_init(knet_h, compress_model)) {
		/*
		 * need to switch to write lock, load the lib, and return with a write lock
		 * this is not racy because compress_load_lib is written idempotent.
		 */
		pthread_rwlock_unlock(&shlib_rwlock);
		savederrno = pthread_rwlock_wrlock(&shlib_rwlock);
		if (savederrno) {
			log_err(knet_h, KNET_SUB_COMPRESS, "Unable to get write lock: %s",
				strerror(savederrno));
			errno = savederrno;
			return -1;
		}

		if (compress_load_lib(knet_h, compress_model, 1) < 0) {
			savederrno = errno;
			err = -1;
			log_err(knet_h, KNET_SUB_COMPRESS, "Unable to load library: %s",
				strerror(savederrno));
			goto out_unlock;
		}
	}

	err = compress_modules_cmds[compress_model].ops->decompress(knet_h, buf_in, buf_in_len, buf_out, buf_out_len);
	savederrno = errno;

out_unlock:
	pthread_rwlock_unlock(&shlib_rwlock);
	errno = savederrno;
	return err;
}
/*
 * Report the compress models built into this library.
 *
 * compress_list         - optional output array; when non-NULL the name
 *                         of each built-in model is stored in
 *                         consecutive entries (the caller must provide
 *                         enough room, no bound is checked here)
 * compress_list_entries - mandatory; receives the number of built-in
 *                         models, whether or not compress_list is given
 *
 * Returns -1 with errno = EINVAL when compress_list_entries is NULL,
 * otherwise 0 with errno cleared.
 *
 * NOTE(review): the two lines prefixed with '+' are additions carried
 * by the surrounding diff, not C syntax.
 */
int knet_get_compress_list(struct knet_compress_info *compress_list, size_t *compress_list_entries)
{
int err = 0;
int idx = 0;
int outidx = 0;
if (!compress_list_entries) {
errno = EINVAL;
return -1;
}
/* walk the whole table; only built-in models are reported */
while (compress_modules_cmds[idx].model_name != NULL) {
if (compress_modules_cmds[idx].built_in) {
if (compress_list) {
compress_list[outidx].name = compress_modules_cmds[idx].model_name;
}
outidx++;
}
idx++;
}
*compress_list_entries = outidx;
/* err is never modified above, so errno is always cleared here */
+ if (!err)
+ errno = 0;
return err;
}
diff --git a/libknet/crypto.c b/libknet/crypto.c
index 6b1ead7d..172bac7b 100644
--- a/libknet/crypto.c
+++ b/libknet/crypto.c
@@ -1,212 +1,214 @@
/*
* Copyright (C) 2012-2018 Red Hat, Inc. All rights reserved.
*
* Author: Fabio M. Di Nitto <fabbione@kronosnet.org>
*
* This software licensed under GPL-2.0+, LGPL-2.0+
*/
#include "config.h"
#include <sys/errno.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <time.h>
#include "crypto.h"
#include "crypto_model.h"
#include "internals.h"
#include "logging.h"
#include "common.h"
/*
* internal module switch data
*/
/*
 * Table of known crypto models, terminated by the entry whose name is
 * NULL. The code in this file reads the fields model_name, built_in,
 * loaded and ops, and uses the table index as the model number.
 * NOTE(review): the initializers appear to follow the order
 * (name, WITH_CRYPTO_* build flag, loaded flag, ops pointer) --
 * confirm against the crypto_model_t declaration.
 */
static crypto_model_t crypto_modules_cmds[] = {
{ "nss", WITH_CRYPTO_NSS, 0, NULL },
{ "openssl", WITH_CRYPTO_OPENSSL, 0, NULL },
{ NULL, 0, 0, NULL }
};
/*
 * Look up a crypto model by name.
 *
 * Returns the table index of the matching entry, or -1 when the name
 * is not present in crypto_modules_cmds.
 */
static int crypto_get_model(const char *model)
{
	int i;

	for (i = 0; crypto_modules_cmds[i].model_name != NULL; i++) {
		if (strcmp(crypto_modules_cmds[i].model_name, model) == 0) {
			return i;
		}
	}

	return -1;
}
/*
 * exported API
 */

/*
 * Hand a single buffer to the .crypt callback of the crypto model
 * configured on the handle and pass its result through.
 */
int crypto_encrypt_and_sign (
	knet_handle_t knet_h,
	const unsigned char *buf_in,
	const ssize_t buf_in_len,
	unsigned char *buf_out,
	ssize_t *buf_out_len)
{
	int res;

	res = crypto_modules_cmds[knet_h->crypto_instance->model].ops->crypt(knet_h, buf_in, buf_in_len, buf_out, buf_out_len);

	return res;
}
/*
 * Scatter/gather variant: hand an iovec array to the .cryptv callback
 * of the crypto model configured on the handle and pass its result
 * through.
 */
int crypto_encrypt_and_signv (
	knet_handle_t knet_h,
	const struct iovec *iov_in,
	int iovcnt_in,
	unsigned char *buf_out,
	ssize_t *buf_out_len)
{
	int res;

	res = crypto_modules_cmds[knet_h->crypto_instance->model].ops->cryptv(knet_h, iov_in, iovcnt_in, buf_out, buf_out_len);

	return res;
}
/*
 * Hand a received buffer to the .decrypt callback of the crypto model
 * configured on the handle and pass its result through.
 */
int crypto_authenticate_and_decrypt (
	knet_handle_t knet_h,
	const unsigned char *buf_in,
	const ssize_t buf_in_len,
	unsigned char *buf_out,
	ssize_t *buf_out_len)
{
	int res;

	res = crypto_modules_cmds[knet_h->crypto_instance->model].ops->decrypt(knet_h, buf_in, buf_in_len, buf_out, buf_out_len);

	return res;
}
/*
 * Set up crypto for a handle.
 *
 * knet_h                 - handle to configure
 * knet_handle_crypto_cfg - requested model, cipher and hash names
 *
 * Loads the crypto module on first use (under the shlib write lock),
 * checks its ABI version, allocates the per-handle crypto_instance and
 * calls the module's init callback.
 *
 * Returns 0 on success, -1 on error with errno set. On error
 * knet_h->crypto_instance is freed and reset to NULL.
 */
int crypto_init(
	knet_handle_t knet_h,
	struct knet_handle_crypto_cfg *knet_handle_crypto_cfg)
{
	int savederrno = 0;
	int model = 0;

	model = crypto_get_model(knet_handle_crypto_cfg->crypto_model);
	if (model < 0) {
		log_err(knet_h, KNET_SUB_CRYPTO, "model %s not supported", knet_handle_crypto_cfg->crypto_model);
		errno = EINVAL;
		return -1;
	}

	if (crypto_modules_cmds[model].built_in == 0) {
		log_err(knet_h, KNET_SUB_CRYPTO, "this version of libknet was built without %s support. Please contact your vendor or fix the build.", knet_handle_crypto_cfg->crypto_model);
		errno = EINVAL;
		return -1;
	}

	savederrno = pthread_rwlock_wrlock(&shlib_rwlock);
	if (savederrno) {
		log_err(knet_h, KNET_SUB_CRYPTO, "Unable to get write lock: %s",
			strerror(savederrno));
		errno = savederrno;
		return -1;
	}

	if (!crypto_modules_cmds[model].loaded) {
		crypto_modules_cmds[model].ops = load_module (knet_h, "crypto", crypto_modules_cmds[model].model_name);
		if (!crypto_modules_cmds[model].ops) {
			savederrno = errno;
			log_err(knet_h, KNET_SUB_CRYPTO, "Unable to load %s lib", crypto_modules_cmds[model].model_name);
			goto out_err;
		}
		if (crypto_modules_cmds[model].ops->abi_ver != KNET_CRYPTO_MODEL_ABI) {
			log_err(knet_h, KNET_SUB_CRYPTO,
				"ABI mismatch loading module %s. knet ver: %d, module ver: %d",
				crypto_modules_cmds[model].model_name, KNET_CRYPTO_MODEL_ABI,
				crypto_modules_cmds[model].ops->abi_ver);
			savederrno = EINVAL;
			goto out_err;
		}
		crypto_modules_cmds[model].loaded = 1;
	}

	log_debug(knet_h, KNET_SUB_CRYPTO,
		  "Initizializing crypto module [%s/%s/%s]",
		  knet_handle_crypto_cfg->crypto_model,
		  knet_handle_crypto_cfg->crypto_cipher_type,
		  knet_handle_crypto_cfg->crypto_hash_type);

	knet_h->crypto_instance = malloc(sizeof(struct crypto_instance));
	if (!knet_h->crypto_instance) {
		log_err(knet_h, KNET_SUB_CRYPTO, "Unable to allocate memory for crypto instance");
		/*
		 * do not unlock here: out_err releases shlib_rwlock, and
		 * unlocking twice would release a lock we no longer hold
		 */
		savederrno = ENOMEM;
		goto out_err;
	}

	/*
	 * if crypto_modules_cmds.ops->init fails, it is expected that
	 * it will clean everything by itself.
	 * crypto_modules_cmds.ops->fini is not invoked on error.
	 */
	knet_h->crypto_instance->model = model;
	if (crypto_modules_cmds[knet_h->crypto_instance->model].ops->init(knet_h, knet_handle_crypto_cfg)) {
		savederrno = errno;
		goto out_err;
	}

	log_debug(knet_h, KNET_SUB_CRYPTO, "security network overhead: %zu", knet_h->sec_header_size);
	pthread_rwlock_unlock(&shlib_rwlock);
	return 0;

out_err:
	if (knet_h->crypto_instance) {
		free(knet_h->crypto_instance);
		knet_h->crypto_instance = NULL;
	}

	pthread_rwlock_unlock(&shlib_rwlock);
	errno = savederrno;
	return -1;
}
/*
 * Tear down crypto for a handle.
 *
 * Under the shlib write lock: call the module's optional fini
 * callback, then free the crypto_instance and reset the pointer.
 * Nothing is done when no instance is configured or the lock cannot
 * be taken.
 */
void crypto_fini(
	knet_handle_t knet_h)
{
	int lockerr;
	int model;

	lockerr = pthread_rwlock_wrlock(&shlib_rwlock);
	if (lockerr) {
		log_err(knet_h, KNET_SUB_CRYPTO, "Unable to get write lock: %s",
			strerror(lockerr));
		return;
	}

	if (knet_h->crypto_instance != NULL) {
		model = knet_h->crypto_instance->model;
		if (crypto_modules_cmds[model].ops->fini) {
			crypto_modules_cmds[model].ops->fini(knet_h);
		}
		free(knet_h->crypto_instance);
		knet_h->crypto_instance = NULL;
	}

	pthread_rwlock_unlock(&shlib_rwlock);
}
/*
 * Report the crypto models built into this library.
 *
 * crypto_list         - optional output array; when non-NULL the name of
 *                       each built-in model is stored in consecutive
 *                       entries (the caller must provide enough room,
 *                       no bound is checked here)
 * crypto_list_entries - mandatory; receives the number of built-in
 *                       models, whether or not crypto_list is given
 *
 * Returns -1 with errno = EINVAL when crypto_list_entries is NULL,
 * otherwise 0 with errno cleared.
 *
 * NOTE(review): the two lines prefixed with '+' are additions carried
 * by the surrounding diff, not C syntax.
 */
int knet_get_crypto_list(struct knet_crypto_info *crypto_list, size_t *crypto_list_entries)
{
int err = 0;
int idx = 0;
int outidx = 0;
if (!crypto_list_entries) {
errno = EINVAL;
return -1;
}
/* walk the whole table; only built-in models are reported */
while (crypto_modules_cmds[idx].model_name != NULL) {
if (crypto_modules_cmds[idx].built_in) {
if (crypto_list) {
crypto_list[outidx].name = crypto_modules_cmds[idx].model_name;
}
outidx++;
}
idx++;
}
*crypto_list_entries = outidx;
/* err is never modified above, so errno is always cleared here */
+ if (!err)
+ errno = 0;
return err;
}
diff --git a/libknet/handle.c b/libknet/handle.c
index 7932e36d..71bcb569 100644
--- a/libknet/handle.c
+++ b/libknet/handle.c
@@ -1,1604 +1,1613 @@
/*
* Copyright (C) 2010-2018 Red Hat, Inc. All rights reserved.
*
* Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
* Federico Simoncelli <fsimon@kronosnet.org>
*
* This software licensed under GPL-2.0+, LGPL-2.0+
*/
#include "config.h"
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include <pthread.h>
#include <sys/uio.h>
#include <math.h>
#include <sys/time.h>
#include <sys/resource.h>
#include "internals.h"
#include "crypto.h"
#include "links.h"
#include "compress.h"
#include "compat.h"
#include "common.h"
#include "threads_common.h"
#include "threads_heartbeat.h"
#include "threads_pmtud.h"
#include "threads_dsthandler.h"
#include "threads_rx.h"
#include "threads_tx.h"
#include "transports.h"
#include "transport_common.h"
#include "logging.h"
/* NOTE(review): users of this mutex are outside the visible part of the file */
static pthread_mutex_t handle_config_mutex = PTHREAD_MUTEX_INITIALIZER;
/*
 * Process-wide lock taken by the compress and crypto code around module
 * loading and use. Initialized by _init_shlib_tracker(), destroyed by
 * _fini_shlib_tracker().
 */
pthread_rwlock_t shlib_rwlock;
/* set to 1 once shlib_rwlock has been initialized, reset to 0 on destroy */
static uint8_t shlib_wrlock_init = 0;
/*
 * _fini_shlib_tracker() only destroys shlib_rwlock while this is 0.
 * NOTE(review): presumably a count of live handles -- the code that
 * updates it is not visible here, confirm.
 */
static uint32_t knet_ref = 0;
/*
 * Initialize shlib_rwlock the first time it is needed.
 *
 * Returns 0 when the lock is (already) initialized, -1 with errno set
 * to the pthread error code otherwise.
 */
static int _init_shlib_tracker(knet_handle_t knet_h)
{
	int rc;

	if (shlib_wrlock_init) {
		return 0;
	}

	rc = pthread_rwlock_init(&shlib_rwlock, NULL);
	if (rc) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize shared lib rwlock: %s",
			strerror(rc));
		errno = rc;
		return -1;
	}

	shlib_wrlock_init = 1;
	return 0;
}
/*
 * Destroy shlib_rwlock and clear its init flag, but only while
 * knet_ref is 0.
 */
static void _fini_shlib_tracker(void)
{
	if (knet_ref != 0) {
		return;
	}

	pthread_rwlock_destroy(&shlib_rwlock);
	shlib_wrlock_init = 0;
}
/*
 * Initialize every per-handle lock, mutex and condition variable.
 *
 * Stops at the first failure, logs it and returns -1 with errno set to
 * the pthread error code. Locks initialized before the failure are not
 * destroyed here. Returns 0 when everything was initialized.
 */
static int _init_locks(knet_handle_t knet_h)
{
	int rc;

	rc = pthread_rwlock_init(&knet_h->global_rwlock, NULL);
	if (rc) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize list rwlock: %s",
			strerror(rc));
		errno = rc;
		return -1;
	}

	rc = pthread_mutex_init(&knet_h->threads_status_mutex, NULL);
	if (rc) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize threads status mutex: %s",
			strerror(rc));
		errno = rc;
		return -1;
	}

	rc = pthread_mutex_init(&knet_h->pmtud_mutex, NULL);
	if (rc) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize pmtud mutex: %s",
			strerror(rc));
		errno = rc;
		return -1;
	}

	rc = pthread_mutex_init(&knet_h->kmtu_mutex, NULL);
	if (rc) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize kernel_mtu mutex: %s",
			strerror(rc));
		errno = rc;
		return -1;
	}

	rc = pthread_cond_init(&knet_h->pmtud_cond, NULL);
	if (rc) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize pmtud conditional mutex: %s",
			strerror(rc));
		errno = rc;
		return -1;
	}

	rc = pthread_mutex_init(&knet_h->hb_mutex, NULL);
	if (rc) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize hb_thread mutex: %s",
			strerror(rc));
		errno = rc;
		return -1;
	}

	rc = pthread_mutex_init(&knet_h->tx_mutex, NULL);
	if (rc) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize tx_thread mutex: %s",
			strerror(rc));
		errno = rc;
		return -1;
	}

	rc = pthread_mutex_init(&knet_h->backoff_mutex, NULL);
	if (rc) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize pong timeout backoff mutex: %s",
			strerror(rc));
		errno = rc;
		return -1;
	}

	rc = pthread_mutex_init(&knet_h->tx_seq_num_mutex, NULL);
	if (rc) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize tx_seq_num_mutex mutex: %s",
			strerror(rc));
		errno = rc;
		return -1;
	}

	return 0;
}
/*
 * Destroy every lock, mutex and condition variable that _init_locks
 * sets up on the handle. Return values are not checked.
 */
static void _destroy_locks(knet_handle_t knet_h)
{
	/* handle-wide rwlock */
	pthread_rwlock_destroy(&knet_h->global_rwlock);

	/* pmtud related */
	pthread_mutex_destroy(&knet_h->pmtud_mutex);
	pthread_mutex_destroy(&knet_h->kmtu_mutex);
	pthread_cond_destroy(&knet_h->pmtud_cond);

	/* heartbeat / tx related */
	pthread_mutex_destroy(&knet_h->hb_mutex);
	pthread_mutex_destroy(&knet_h->tx_mutex);
	pthread_mutex_destroy(&knet_h->backoff_mutex);
	pthread_mutex_destroy(&knet_h->tx_seq_num_mutex);

	/* threads status */
	pthread_mutex_destroy(&knet_h->threads_status_mutex);
}
/*
 * Create the two internal socketpairs (hostsockfd, dstsockfd).
 *
 * Returns 0 on success, -1 with errno preserved from the failing
 * _init_socketpair call.
 */
static int _init_socks(knet_handle_t knet_h)
{
	int saved;

	if (_init_socketpair(knet_h, knet_h->hostsockfd)) {
		saved = errno;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize internal hostsockpair: %s",
			strerror(saved));
		errno = saved;
		return -1;
	}

	if (_init_socketpair(knet_h, knet_h->dstsockfd)) {
		saved = errno;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize internal dstsockpair: %s",
			strerror(saved));
		errno = saved;
		return -1;
	}

	return 0;
}
/*
 * Close the internal socketpairs, dstsockfd first and hostsockfd second.
 */
static void _close_socks(knet_handle_t knet_h)
{
	_close_socketpair(knet_h, knet_h->dstsockfd);

	_close_socketpair(knet_h, knet_h->hostsockfd);
}
/*
 * Allocate and zero every data buffer used by the handle.
 *
 * Stops at the first failed allocation, logs it and returns -1 with
 * errno taken from malloc. Buffers allocated before the failure are
 * not released here. Returns 0 on success.
 */
static int _init_buffers(knet_handle_t knet_h)
{
	int savederrno = 0;
	int slot;
	size_t len;

	/* plain tx buffers, one per fragment count */
	for (slot = 0; slot < PCKT_FRAG_MAX; slot++) {
		len = ceil((float)KNET_MAX_PACKET_SIZE / (slot + 1)) + KNET_HEADER_ALL_SIZE;
		knet_h->send_to_links_buf[slot] = malloc(len);
		if (!knet_h->send_to_links_buf[slot]) {
			savederrno = errno;
			log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory datafd to link buffer: %s",
				strerror(savederrno));
			goto alloc_failed;
		}
		memset(knet_h->send_to_links_buf[slot], 0, len);
	}

	/* rx buffers */
	for (slot = 0; slot < PCKT_RX_BUFS; slot++) {
		knet_h->recv_from_links_buf[slot] = malloc(KNET_DATABUFSIZE);
		if (!knet_h->recv_from_links_buf[slot]) {
			savederrno = errno;
			log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for link to datafd buffer: %s",
				strerror(savederrno));
			goto alloc_failed;
		}
		memset(knet_h->recv_from_links_buf[slot], 0, KNET_DATABUFSIZE);
	}

	knet_h->recv_from_sock_buf = malloc(KNET_DATABUFSIZE);
	if (!knet_h->recv_from_sock_buf) {
		savederrno = errno;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for app to datafd buffer: %s",
			strerror(savederrno));
		goto alloc_failed;
	}
	memset(knet_h->recv_from_sock_buf, 0, KNET_DATABUFSIZE);

	knet_h->pingbuf = malloc(KNET_HEADER_PING_SIZE);
	if (!knet_h->pingbuf) {
		savederrno = errno;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for hearbeat buffer: %s",
			strerror(savederrno));
		goto alloc_failed;
	}
	memset(knet_h->pingbuf, 0, KNET_HEADER_PING_SIZE);

	knet_h->pmtudbuf = malloc(KNET_PMTUD_SIZE_V6);
	if (!knet_h->pmtudbuf) {
		savederrno = errno;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for pmtud buffer: %s",
			strerror(savederrno));
		goto alloc_failed;
	}
	memset(knet_h->pmtudbuf, 0, KNET_PMTUD_SIZE_V6);

	/* crypto tx buffers, same sizing as the plain ones plus padding */
	for (slot = 0; slot < PCKT_FRAG_MAX; slot++) {
		len = ceil((float)KNET_MAX_PACKET_SIZE / (slot + 1)) + KNET_HEADER_ALL_SIZE + KNET_DATABUFSIZE_CRYPT_PAD;
		knet_h->send_to_links_buf_crypt[slot] = malloc(len);
		if (!knet_h->send_to_links_buf_crypt[slot]) {
			savederrno = errno;
			log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for crypto datafd to link buffer: %s",
				strerror(savederrno));
			goto alloc_failed;
		}
		memset(knet_h->send_to_links_buf_crypt[slot], 0, len);
	}

	knet_h->recv_from_links_buf_decrypt = malloc(KNET_DATABUFSIZE_CRYPT);
	if (!knet_h->recv_from_links_buf_decrypt) {
		savederrno = errno;
		log_err(knet_h, KNET_SUB_CRYPTO, "Unable to allocate memory for crypto link to datafd buffer: %s",
			strerror(savederrno));
		goto alloc_failed;
	}
	memset(knet_h->recv_from_links_buf_decrypt, 0, KNET_DATABUFSIZE_CRYPT);

	knet_h->recv_from_links_buf_crypt = malloc(KNET_DATABUFSIZE_CRYPT);
	if (!knet_h->recv_from_links_buf_crypt) {
		savederrno = errno;
		log_err(knet_h, KNET_SUB_CRYPTO, "Unable to allocate memory for crypto link to datafd buffer: %s",
			strerror(savederrno));
		goto alloc_failed;
	}
	memset(knet_h->recv_from_links_buf_crypt, 0, KNET_DATABUFSIZE_CRYPT);

	knet_h->pingbuf_crypt = malloc(KNET_DATABUFSIZE_CRYPT);
	if (!knet_h->pingbuf_crypt) {
		savederrno = errno;
		log_err(knet_h, KNET_SUB_CRYPTO, "Unable to allocate memory for crypto hearbeat buffer: %s",
			strerror(savederrno));
		goto alloc_failed;
	}
	memset(knet_h->pingbuf_crypt, 0, KNET_DATABUFSIZE_CRYPT);

	knet_h->pmtudbuf_crypt = malloc(KNET_DATABUFSIZE_CRYPT);
	if (!knet_h->pmtudbuf_crypt) {
		savederrno = errno;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for crypto pmtud buffer: %s",
			strerror(savederrno));
		goto alloc_failed;
	}
	memset(knet_h->pmtudbuf_crypt, 0, KNET_DATABUFSIZE_CRYPT);

	/* compress / decompress scratch buffers */
	knet_h->recv_from_links_buf_decompress = malloc(KNET_DATABUFSIZE_COMPRESS);
	if (!knet_h->recv_from_links_buf_decompress) {
		savederrno = errno;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for decompress buffer: %s",
			strerror(savederrno));
		goto alloc_failed;
	}
	memset(knet_h->recv_from_links_buf_decompress, 0, KNET_DATABUFSIZE_COMPRESS);

	knet_h->send_to_links_buf_compress = malloc(KNET_DATABUFSIZE_COMPRESS);
	if (!knet_h->send_to_links_buf_compress) {
		savederrno = errno;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for compress buffer: %s",
			strerror(savederrno));
		goto alloc_failed;
	}
	memset(knet_h->send_to_links_buf_compress, 0, KNET_DATABUFSIZE_COMPRESS);

	/* every byte of the fd tracker is filled with KNET_MAX_TRANSPORTS */
	memset(knet_h->knet_transport_fd_tracker, KNET_MAX_TRANSPORTS, sizeof(knet_h->knet_transport_fd_tracker));

	return 0;

alloc_failed:
	errno = savederrno;
	return -1;
}
/*
 * Free every buffer that _init_buffers allocates on the handle.
 * The pointers are not reset afterwards.
 */
static void _destroy_buffers(knet_handle_t knet_h)
{
	int slot;

	/* per-fragment tx buffers, plain and crypto */
	for (slot = 0; slot < PCKT_FRAG_MAX; slot++) {
		free(knet_h->send_to_links_buf[slot]);
		free(knet_h->send_to_links_buf_crypt[slot]);
	}

	/* rx buffers */
	for (slot = 0; slot < PCKT_RX_BUFS; slot++) {
		free(knet_h->recv_from_links_buf[slot]);
	}

	/* single buffers */
	free(knet_h->recv_from_links_buf_decompress);
	free(knet_h->send_to_links_buf_compress);
	free(knet_h->recv_from_sock_buf);
	free(knet_h->recv_from_links_buf_decrypt);
	free(knet_h->recv_from_links_buf_crypt);
	free(knet_h->pingbuf);
	free(knet_h->pingbuf_crypt);
	free(knet_h->pmtudbuf);
	free(knet_h->pmtudbuf_crypt);
}
/*
 * Create the three epoll instances used by the handle, mark them
 * close-on-exec and register the internal hostsockfd[0] / dstsockfd[0]
 * descriptors for EPOLLIN.
 *
 * Returns 0 on success, -1 with errno preserved from the failing call.
 * Descriptors created before a failure are not closed here.
 */
static int _init_epolls(knet_handle_t knet_h)
{
	struct epoll_event ev;
	int saved;

	/*
	 * even if the kernel does dynamic allocation with epoll_ctl
	 * we need to reserve one extra for host to host communication
	 */
	knet_h->send_to_links_epollfd = epoll_create(KNET_EPOLL_MAX_EVENTS + 1);
	if (knet_h->send_to_links_epollfd < 0) {
		saved = errno;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to create epoll datafd to link fd: %s",
			strerror(saved));
		errno = saved;
		return -1;
	}

	knet_h->recv_from_links_epollfd = epoll_create(KNET_EPOLL_MAX_EVENTS);
	if (knet_h->recv_from_links_epollfd < 0) {
		saved = errno;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to create epoll link to datafd fd: %s",
			strerror(saved));
		errno = saved;
		return -1;
	}

	knet_h->dst_link_handler_epollfd = epoll_create(KNET_EPOLL_MAX_EVENTS);
	if (knet_h->dst_link_handler_epollfd < 0) {
		saved = errno;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to create epoll dst cache fd: %s",
			strerror(saved));
		errno = saved;
		return -1;
	}

	if (_fdset_cloexec(knet_h->send_to_links_epollfd)) {
		saved = errno;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to set CLOEXEC on datafd to link epoll fd: %s",
			strerror(saved));
		errno = saved;
		return -1;
	}

	if (_fdset_cloexec(knet_h->recv_from_links_epollfd)) {
		saved = errno;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to set CLOEXEC on link to datafd epoll fd: %s",
			strerror(saved));
		errno = saved;
		return -1;
	}

	if (_fdset_cloexec(knet_h->dst_link_handler_epollfd)) {
		saved = errno;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to set CLOEXEC on dst cache epoll fd: %s",
			strerror(saved));
		errno = saved;
		return -1;
	}

	/* host to host communication socket */
	memset(&ev, 0, sizeof(struct epoll_event));
	ev.events = EPOLLIN;
	ev.data.fd = knet_h->hostsockfd[0];
	if (epoll_ctl(knet_h->send_to_links_epollfd,
		      EPOLL_CTL_ADD, knet_h->hostsockfd[0], &ev)) {
		saved = errno;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to add hostsockfd[0] to epoll pool: %s",
			strerror(saved));
		errno = saved;
		return -1;
	}

	/* dst cache notification socket */
	memset(&ev, 0, sizeof(struct epoll_event));
	ev.events = EPOLLIN;
	ev.data.fd = knet_h->dstsockfd[0];
	if (epoll_ctl(knet_h->dst_link_handler_epollfd,
		      EPOLL_CTL_ADD, knet_h->dstsockfd[0], &ev)) {
		saved = errno;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to add dstsockfd[0] to epoll pool: %s",
			strerror(saved));
		errno = saved;
		return -1;
	}

	return 0;
}
/*
 * Unregister all descriptors from the handle's epoll instances, close
 * the socketpairs of data fds still in use, then close the three epoll
 * descriptors. Return values are not checked.
 */
static void _close_epolls(knet_handle_t knet_h)
{
	struct epoll_event ev;
	int slot;

	memset(&ev, 0, sizeof(struct epoll_event));

	for (slot = 0; slot < KNET_DATAFD_MAX; slot++) {
		if (!knet_h->sockfd[slot].in_use) {
			continue;
		}

		epoll_ctl(knet_h->send_to_links_epollfd, EPOLL_CTL_DEL, knet_h->sockfd[slot].sockfd[knet_h->sockfd[slot].is_created], &ev);

		if (knet_h->sockfd[slot].sockfd[knet_h->sockfd[slot].is_created]) {
			_close_socketpair(knet_h, knet_h->sockfd[slot].sockfd);
		}
	}

	epoll_ctl(knet_h->send_to_links_epollfd, EPOLL_CTL_DEL, knet_h->hostsockfd[0], &ev);
	epoll_ctl(knet_h->dst_link_handler_epollfd, EPOLL_CTL_DEL, knet_h->dstsockfd[0], &ev);

	close(knet_h->send_to_links_epollfd);
	close(knet_h->recv_from_links_epollfd);
	close(knet_h->dst_link_handler_epollfd);
}
/*
 * Start the handle's worker threads: pmtud, dst cache, tx, rx and
 * heartbeat, in that order.
 *
 * Stops at the first pthread_create failure, logs it and returns -1
 * with errno set to the pthread error code. Threads started before
 * the failure are left running. Returns 0 on success.
 */
static int _start_threads(knet_handle_t knet_h)
{
	int rc;

	rc = pthread_create(&knet_h->pmtud_link_handler_thread, 0,
			    _handle_pmtud_link_thread, (void *) knet_h);
	if (rc) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to start pmtud link thread: %s",
			strerror(rc));
		errno = rc;
		return -1;
	}

	rc = pthread_create(&knet_h->dst_link_handler_thread, 0,
			    _handle_dst_link_handler_thread, (void *) knet_h);
	if (rc) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to start dst cache thread: %s",
			strerror(rc));
		errno = rc;
		return -1;
	}

	rc = pthread_create(&knet_h->send_to_links_thread, 0,
			    _handle_send_to_links_thread, (void *) knet_h);
	if (rc) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to start datafd to link thread: %s",
			strerror(rc));
		errno = rc;
		return -1;
	}

	rc = pthread_create(&knet_h->recv_from_links_thread, 0,
			    _handle_recv_from_links_thread, (void *) knet_h);
	if (rc) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to start link to datafd thread: %s",
			strerror(rc));
		errno = rc;
		return -1;
	}

	rc = pthread_create(&knet_h->heartbt_thread, 0,
			    _handle_heartbt_thread, (void *) knet_h);
	if (rc) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to start heartbeat thread: %s",
			strerror(rc));
		errno = rc;
		return -1;
	}

	return 0;
}
/*
 * Stop the handle's worker threads.
 *
 * Waits for all threads to report KNET_THREAD_STOPPED, then cancels
 * and joins each thread whose id is set: heartbeat, tx, rx, dst cache
 * and pmtud, in that order.
 */
static void _stop_threads(knet_handle_t knet_h)
{
	void *thread_ret;

	wait_all_threads_status(knet_h, KNET_THREAD_STOPPED);

	if (knet_h->heartbt_thread) {
		pthread_cancel(knet_h->heartbt_thread);
		pthread_join(knet_h->heartbt_thread, &thread_ret);
	}

	if (knet_h->send_to_links_thread) {
		pthread_cancel(knet_h->send_to_links_thread);
		pthread_join(knet_h->send_to_links_thread, &thread_ret);
	}

	if (knet_h->recv_from_links_thread) {
		pthread_cancel(knet_h->recv_from_links_thread);
		pthread_join(knet_h->recv_from_links_thread, &thread_ret);
	}

	if (knet_h->dst_link_handler_thread) {
		pthread_cancel(knet_h->dst_link_handler_thread);
		pthread_join(knet_h->dst_link_handler_thread, &thread_ret);
	}

	if (knet_h->pmtud_link_handler_thread) {
		pthread_cancel(knet_h->pmtud_link_handler_thread);
		pthread_join(knet_h->pmtud_link_handler_thread, &thread_ret);
	}
}
/*
 * Allocate and fully initialise a new knet handle.
 *
 * log_fd must be >= 0 and below the RLIMIT_NOFILE hard limit; when it is
 * non-zero, default_log_level may not exceed KNET_LOG_DEBUG. flags values
 * above (KNET_HANDLE_FLAG_PRIVILEGED * 2 - 1) are rejected.
 *
 * Returns the new handle with errno cleared, or NULL with errno set.
 * Failures from the shlib tracker step onwards unwind through
 * knet_handle_free().
 */
knet_handle_t knet_handle_new_ex(knet_node_id_t host_id,
int log_fd,
uint8_t default_log_level,
uint64_t flags)
{
knet_handle_t knet_h;
int savederrno = 0;
struct rlimit cur;
if (getrlimit(RLIMIT_NOFILE, &cur) < 0) {
return NULL;
}
if ((log_fd < 0) || ((unsigned int)log_fd >= cur.rlim_max)) {
errno = EINVAL;
return NULL;
}
/*
* validate incoming request
*/
if ((log_fd) && (default_log_level > KNET_LOG_DEBUG)) {
errno = EINVAL;
return NULL;
}
if (flags > KNET_HANDLE_FLAG_PRIVILEGED * 2 - 1) {
errno = EINVAL;
return NULL;
}
/*
* allocate handle
*/
knet_h = malloc(sizeof(struct knet_handle));
if (!knet_h) {
errno = ENOMEM;
return NULL;
}
memset(knet_h, 0, sizeof(struct knet_handle));
/*
* setting up some handle data so that we can use logging
* also when initializing the library global locks
* and trackers
*/
knet_h->flags = flags;
/*
* copy config in place
*/
knet_h->host_id = host_id;
knet_h->logfd = log_fd;
if (knet_h->logfd > 0) {
memset(&knet_h->log_levels, default_log_level, KNET_MAX_SUBSYSTEMS);
}
/*
* set pmtud default timers
*/
knet_h->pmtud_interval = KNET_PMTUD_DEFAULT_INTERVAL;
/*
* set transports reconnect default timers
*/
knet_h->reconnect_int = KNET_TRANSPORT_DEFAULT_RECONNECT_INTERVAL;
/*
* Set 'min' stats to the maximum value so the
* first value we get is always less
*/
knet_h->stats.tx_compress_time_min = UINT64_MAX;
knet_h->stats.rx_compress_time_min = UINT64_MAX;
knet_h->stats.tx_crypt_time_min = UINT64_MAX;
knet_h->stats.rx_crypt_time_min = UINT64_MAX;
/*
* init global shlib tracker
*/
savederrno = pthread_mutex_lock(&handle_config_mutex);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get handle mutex lock: %s",
strerror(savederrno));
free(knet_h);
knet_h = NULL;
errno = savederrno;
return NULL;
}
knet_ref++;
if (_init_shlib_tracker(knet_h) < 0) {
savederrno = errno;
log_err(knet_h, KNET_SUB_HANDLE, "Unable to init handles traceker: %s",
strerror(savederrno));
errno = savederrno;
/*
* knet_handle_free() (reached via exit_fail) takes
* handle_config_mutex itself, so release it before unwinding
* or the call would block on a mutex this thread still owns.
*/
+ pthread_mutex_unlock(&handle_config_mutex);
goto exit_fail;
}
pthread_mutex_unlock(&handle_config_mutex);
/*
* init main locking structures
*/
if (_init_locks(knet_h)) {
savederrno = errno;
goto exit_fail;
}
/*
* init sockets
*/
if (_init_socks(knet_h)) {
savederrno = errno;
goto exit_fail;
}
/*
* allocate packet buffers
*/
if (_init_buffers(knet_h)) {
savederrno = errno;
goto exit_fail;
}
if (compress_init(knet_h)) {
savederrno = errno;
goto exit_fail;
}
/*
* create epoll fds
*/
if (_init_epolls(knet_h)) {
savederrno = errno;
goto exit_fail;
}
/*
* start transports
*/
if (start_all_transports(knet_h)) {
savederrno = errno;
goto exit_fail;
}
/*
* start internal threads
*/
if (_start_threads(knet_h)) {
savederrno = errno;
goto exit_fail;
}
wait_all_threads_status(knet_h, KNET_THREAD_RUNNING);
+ errno = 0;
return knet_h;
exit_fail:
/*
* knet_handle_free() may overwrite errno, hence the restore below
*/
knet_handle_free(knet_h);
errno = savederrno;
return NULL;
}
/*
 * Convenience constructor: identical to knet_handle_new_ex() with the
 * flags argument fixed to KNET_HANDLE_FLAG_PRIVILEGED.
 */
knet_handle_t knet_handle_new(knet_node_id_t host_id,
			      int log_fd,
			      uint8_t default_log_level)
{
	const uint64_t handle_flags = KNET_HANDLE_FLAG_PRIVILEGED;

	return knet_handle_new_ex(host_id, log_fd, default_log_level, handle_flags);
}
/*
 * Destroy a handle.
 *
 * Fails with EINVAL on a NULL handle and with EBUSY while host_head is
 * still populated. Otherwise the handle is flagged as shutting down,
 * worker threads and transports are stopped, all per-handle resources
 * are released and the global handle reference count is decremented.
 *
 * Returns 0 with errno cleared on success, -1 with errno set on failure.
 */
int knet_handle_free(knet_handle_t knet_h)
{
int savederrno = 0;
if (!knet_h) {
errno = EINVAL;
return -1;
}
savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
if (knet_h->host_head != NULL) {
savederrno = EBUSY;
log_err(knet_h, KNET_SUB_HANDLE,
"Unable to free handle: host(s) or listener(s) are still active: %s",
strerror(savederrno));
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = savederrno;
return -1;
}
/*
* the flag is raised under the write lock; the lock is dropped
* before the teardown sequence starts
*/
knet_h->fini_in_progress = 1;
pthread_rwlock_unlock(&knet_h->global_rwlock);
_stop_threads(knet_h);
stop_all_transports(knet_h);
_close_epolls(knet_h);
_destroy_buffers(knet_h);
_close_socks(knet_h);
crypto_fini(knet_h);
compress_fini(knet_h, 1);
_destroy_locks(knet_h);
free(knet_h);
knet_h = NULL;
/*
* global (cross-handle) bookkeeping is protected by handle_config_mutex
*/
(void)pthread_mutex_lock(&handle_config_mutex);
knet_ref--;
_fini_shlib_tracker();
pthread_mutex_unlock(&handle_config_mutex);
+ errno = 0;
return 0;
}
/*
 * Install the socket notification callback and its private data.
 *
 * Both knet_h and sock_notify_fn must be non-NULL (EINVAL otherwise);
 * unlike the other callback setters in this file there is no "disable"
 * path. The fields are updated under the global write lock.
 *
 * Returns 0 with errno cleared on success, -1 with errno set on failure.
 */
int knet_handle_enable_sock_notify(knet_handle_t knet_h,
void *sock_notify_fn_private_data,
void (*sock_notify_fn) (
void *private_data,
int datafd,
int8_t channel,
uint8_t tx_rx,
int error,
int errorno))
{
int savederrno = 0, err = 0;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (!sock_notify_fn) {
errno = EINVAL;
return -1;
}
savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
knet_h->sock_notify_fn_private_data = sock_notify_fn_private_data;
knet_h->sock_notify_fn = sock_notify_fn;
log_debug(knet_h, KNET_SUB_HANDLE, "sock_notify_fn enabled");
pthread_rwlock_unlock(&knet_h->global_rwlock);
/*
* err is never modified after initialisation, so this always clears errno
*/
+ errno = err ? savederrno : 0;
return err;
}
/*
 * Attach a data file descriptor to a channel of the handle.
 *
 * *datafd  in:  > 0 to register a caller-owned fd, otherwise a socketpair
 *               is created by the library.
 *          out: for a library-created socketpair, the caller's end
 *               (sockfd[0]).
 * *channel in:  < 0 to pick the first unused channel, otherwise the
 *               requested channel (must be < KNET_DATAFD_MAX).
 *          out: the channel actually assigned.
 *
 * Requires a sock notify callback to be installed first.
 *
 * Returns 0 with errno cleared on success; -1 with errno set to EINVAL
 * (bad arguments / no notify callback), EEXIST (fd already registered),
 * EBUSY (no free channel / channel taken) or the errno of the failing
 * system call.
 */
int knet_handle_add_datafd(knet_handle_t knet_h, int *datafd, int8_t *channel)
{
int err = 0, savederrno = 0;
int i;
struct epoll_event ev;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (datafd == NULL) {
errno = EINVAL;
return -1;
}
if (channel == NULL) {
errno = EINVAL;
return -1;
}
if (*channel >= KNET_DATAFD_MAX) {
errno = EINVAL;
return -1;
}
savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
if (!knet_h->sock_notify_fn) {
log_err(knet_h, KNET_SUB_HANDLE, "Adding datafd requires sock notify callback enabled!");
savederrno = EINVAL;
err = -1;
goto out_unlock;
}
/*
* a caller-supplied fd may be registered on one channel only
*/
if (*datafd > 0) {
for (i = 0; i < KNET_DATAFD_MAX; i++) {
if ((knet_h->sockfd[i].in_use) && (knet_h->sockfd[i].sockfd[0] == *datafd)) {
log_err(knet_h, KNET_SUB_HANDLE, "requested datafd: %d already exist in index: %d", *datafd, i);
savederrno = EEXIST;
err = -1;
goto out_unlock;
}
}
}
/*
* auto allocate a channel
*/
if (*channel < 0) {
for (i = 0; i < KNET_DATAFD_MAX; i++) {
if (!knet_h->sockfd[i].in_use) {
*channel = i;
break;
}
}
if (*channel < 0) {
savederrno = EBUSY;
err = -1;
goto out_unlock;
}
} else {
if (knet_h->sockfd[*channel].in_use) {
savederrno = EBUSY;
err = -1;
goto out_unlock;
}
}
knet_h->sockfd[*channel].is_created = 0;
knet_h->sockfd[*channel].is_socket = 0;
knet_h->sockfd[*channel].has_error = 0;
if (*datafd > 0) {
int sockopt;
socklen_t sockoptlen = sizeof(sockopt);
if (_fdset_cloexec(*datafd)) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_HANDLE, "Unable to set CLOEXEC on datafd: %s",
strerror(savederrno));
goto out_unlock;
}
if (_fdset_nonblock(*datafd)) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_HANDLE, "Unable to set NONBLOCK on datafd: %s",
strerror(savederrno));
goto out_unlock;
}
knet_h->sockfd[*channel].sockfd[0] = *datafd;
knet_h->sockfd[*channel].sockfd[1] = 0;
/*
* SO_TYPE can only be queried on sockets: success marks the fd
* as a socket, failure leaves is_socket at 0
*/
if (!getsockopt(knet_h->sockfd[*channel].sockfd[0], SOL_SOCKET, SO_TYPE, &sockopt, &sockoptlen)) {
knet_h->sockfd[*channel].is_socket = 1;
}
} else {
if (_init_socketpair(knet_h, knet_h->sockfd[*channel].sockfd)) {
savederrno = errno;
err = -1;
goto out_unlock;
}
knet_h->sockfd[*channel].is_created = 1;
knet_h->sockfd[*channel].is_socket = 1;
*datafd = knet_h->sockfd[*channel].sockfd[0];
}
/*
* is_created doubles as the index of the fd to poll:
* 0 -> the caller's fd in sockfd[0], 1 -> sockfd[1] of the
* library-created socketpair
*/
memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLIN;
ev.data.fd = knet_h->sockfd[*channel].sockfd[knet_h->sockfd[*channel].is_created];
if (epoll_ctl(knet_h->send_to_links_epollfd,
EPOLL_CTL_ADD, knet_h->sockfd[*channel].sockfd[knet_h->sockfd[*channel].is_created], &ev)) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_HANDLE, "Unable to add datafd %d to linkfd epoll pool: %s",
knet_h->sockfd[*channel].sockfd[knet_h->sockfd[*channel].is_created], strerror(savederrno));
/*
* only close what the library created; a caller-owned fd is left open
*/
if (knet_h->sockfd[*channel].is_created) {
_close_socketpair(knet_h, knet_h->sockfd[*channel].sockfd);
}
goto out_unlock;
}
knet_h->sockfd[*channel].in_use = 1;
out_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
- errno = savederrno;
+ errno = err ? savederrno : 0;
return err;
}
/*
 * Detach a previously added data fd from the handle.
 *
 * The channel is found by matching datafd against sockfd[0] of the
 * in-use channels. The fd is removed from the send_to_links epoll set,
 * a library-created socketpair is closed, and the channel slot is zeroed.
 *
 * Returns 0 with errno cleared on success; -1 with errno set to EINVAL
 * (bad arguments / fd not registered) or the errno of the failing
 * epoll_ctl() call.
 */
int knet_handle_remove_datafd(knet_handle_t knet_h, int datafd)
{
int err = 0, savederrno = 0;
int8_t channel = -1;
int i;
struct epoll_event ev;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (datafd <= 0) {
errno = EINVAL;
return -1;
}
savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
for (i = 0; i < KNET_DATAFD_MAX; i++) {
if ((knet_h->sockfd[i].in_use) &&
(knet_h->sockfd[i].sockfd[0] == datafd)) {
channel = i;
break;
}
}
if (channel < 0) {
savederrno = EINVAL;
err = -1;
goto out_unlock;
}
/*
* NOTE(review): the epoll removal is skipped when has_error is set;
* presumably the fd was already dropped from the epoll set when the
* error was recorded -- confirm against the code that sets has_error
*/
if (!knet_h->sockfd[channel].has_error) {
memset(&ev, 0, sizeof(struct epoll_event));
if (epoll_ctl(knet_h->send_to_links_epollfd,
EPOLL_CTL_DEL, knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], &ev)) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_HANDLE, "Unable to del datafd %d from linkfd epoll pool: %s",
knet_h->sockfd[channel].sockfd[0], strerror(savederrno));
goto out_unlock;
}
}
if (knet_h->sockfd[channel].is_created) {
_close_socketpair(knet_h, knet_h->sockfd[channel].sockfd);
}
/*
* zeroing the slot also clears in_use, making the channel available again
*/
memset(&knet_h->sockfd[channel], 0, sizeof(struct knet_sock));
out_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
- errno = savederrno;
+ errno = err ? savederrno : 0;
return err;
}
/*
 * Look up the data fd (sockfd[0]) bound to a channel.
 *
 * channel must be within [0, KNET_DATAFD_MAX) and currently in use.
 *
 * Returns 0 with errno cleared and *datafd filled in on success;
 * -1 with errno set (EINVAL for bad arguments or an unused channel).
 */
int knet_handle_get_datafd(knet_handle_t knet_h, const int8_t channel, int *datafd)
{
int err = 0, savederrno = 0;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if ((channel < 0) || (channel >= KNET_DATAFD_MAX)) {
errno = EINVAL;
return -1;
}
if (datafd == NULL) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
if (!knet_h->sockfd[channel].in_use) {
savederrno = EINVAL;
err = -1;
goto out_unlock;
}
*datafd = knet_h->sockfd[channel].sockfd[0];
out_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
- errno = savederrno;
+ errno = err ? savederrno : 0;
return err;
}
/*
 * Reverse lookup: find the channel whose sockfd[0] equals datafd.
 *
 * Returns 0 with errno cleared and *channel filled in on success.
 * On lookup failure returns -1 with errno set to EINVAL and *channel
 * left at -1.
 */
int knet_handle_get_channel(knet_handle_t knet_h, const int datafd, int8_t *channel)
{
int err = 0, savederrno = 0;
int i;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (datafd <= 0) {
errno = EINVAL;
return -1;
}
if (channel == NULL) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
/*
* -1 is the "not found" marker checked after the scan
*/
*channel = -1;
for (i = 0; i < KNET_DATAFD_MAX; i++) {
if ((knet_h->sockfd[i].in_use) &&
(knet_h->sockfd[i].sockfd[0] == datafd)) {
*channel = i;
break;
}
}
if (*channel < 0) {
savederrno = EINVAL;
err = -1;
goto out_unlock;
}
out_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
- errno = savederrno;
+ errno = err ? savederrno : 0;
return err;
}
/*
 * Install or remove the destination host filter callback.
 *
 * Passing a NULL dst_host_filter_fn disables filtering. The callback
 * and its private data are stored under the global write lock.
 *
 * Returns 0 with errno cleared on success, -1 with errno set on failure
 * (EINVAL for a NULL handle, or the lock error).
 */
int knet_handle_enable_filter(knet_handle_t knet_h,
void *dst_host_filter_fn_private_data,
int (*dst_host_filter_fn) (
void *private_data,
const unsigned char *outdata,
ssize_t outdata_len,
uint8_t tx_rx,
knet_node_id_t this_host_id,
knet_node_id_t src_node_id,
int8_t *channel,
knet_node_id_t *dst_host_ids,
size_t *dst_host_ids_entries))
{
int savederrno = 0;
if (!knet_h) {
errno = EINVAL;
return -1;
}
savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
knet_h->dst_host_filter_fn_private_data = dst_host_filter_fn_private_data;
knet_h->dst_host_filter_fn = dst_host_filter_fn;
if (knet_h->dst_host_filter_fn) {
log_debug(knet_h, KNET_SUB_HANDLE, "dst_host_filter_fn enabled");
} else {
log_debug(knet_h, KNET_SUB_HANDLE, "dst_host_filter_fn disabled");
}
pthread_rwlock_unlock(&knet_h->global_rwlock);
+ errno = 0;
return 0;
}
/*
 * Switch data forwarding on (enabled == 1) or off (enabled == 0).
 * Any other value is rejected with EINVAL.
 *
 * Returns 0 with errno cleared on success, -1 with errno set on failure.
 */
int knet_handle_setfwd(knet_handle_t knet_h, unsigned int enabled)
{
int savederrno = 0;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (enabled > 1) {
errno = EINVAL;
return -1;
}
savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
knet_h->enabled = enabled;
if (enabled) {
log_debug(knet_h, KNET_SUB_HANDLE, "Data forwarding is enabled");
} else {
log_debug(knet_h, KNET_SUB_HANDLE, "Data forwarding is disabled");
}
pthread_rwlock_unlock(&knet_h->global_rwlock);
+ errno = 0;
return 0;
}
/*
 * Read the current PMTUd interval into *interval under the global
 * read lock.
 *
 * Returns 0 with errno cleared on success, -1 with errno set on failure
 * (EINVAL for NULL arguments, or the lock error).
 */
int knet_handle_pmtud_getfreq(knet_handle_t knet_h, unsigned int *interval)
{
int savederrno = 0;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (!interval) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
*interval = knet_h->pmtud_interval;
pthread_rwlock_unlock(&knet_h->global_rwlock);
+ errno = 0;
return 0;
}
/*
 * Set the PMTUd interval, in seconds.
 *
 * interval must be within [1, 86400]; anything else is rejected with
 * EINVAL.
 *
 * Returns 0 with errno cleared on success, -1 with errno set on failure.
 */
int knet_handle_pmtud_setfreq(knet_handle_t knet_h, unsigned int interval)
{
int savederrno = 0;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if ((!interval) || (interval > 86400)) {
errno = EINVAL;
return -1;
}
savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
knet_h->pmtud_interval = interval;
log_debug(knet_h, KNET_SUB_HANDLE, "PMTUd interval set to: %u seconds", interval);
pthread_rwlock_unlock(&knet_h->global_rwlock);
+ errno = 0;
return 0;
}
/*
 * Install or remove the PMTUd notification callback.
 *
 * Passing a NULL pmtud_notify_fn disables the notification. The
 * callback and its private data are stored under the global write lock.
 *
 * Returns 0 with errno cleared on success, -1 with errno set on failure.
 */
int knet_handle_enable_pmtud_notify(knet_handle_t knet_h,
void *pmtud_notify_fn_private_data,
void (*pmtud_notify_fn) (
void *private_data,
unsigned int data_mtu))
{
int savederrno = 0;
if (!knet_h) {
errno = EINVAL;
return -1;
}
savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
knet_h->pmtud_notify_fn_private_data = pmtud_notify_fn_private_data;
knet_h->pmtud_notify_fn = pmtud_notify_fn;
if (knet_h->pmtud_notify_fn) {
log_debug(knet_h, KNET_SUB_HANDLE, "pmtud_notify_fn enabled");
} else {
log_debug(knet_h, KNET_SUB_HANDLE, "pmtud_notify_fn disabled");
}
pthread_rwlock_unlock(&knet_h->global_rwlock);
+ errno = 0;
return 0;
}
/*
 * Copy the handle's current data_mtu into *data_mtu under the global
 * read lock.
 *
 * Returns 0 with errno cleared on success, -1 with errno set on failure
 * (EINVAL for NULL arguments, or the lock error).
 */
int knet_handle_pmtud_get(knet_handle_t knet_h,
unsigned int *data_mtu)
{
int savederrno = 0;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (!data_mtu) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
*data_mtu = knet_h->data_mtu;
pthread_rwlock_unlock(&knet_h->global_rwlock);
+ errno = 0;
return 0;
}
/*
 * (Re)configure crypto on the handle.
 *
 * The existing crypto state is always finalised first. Crypto stays
 * disabled (return 0) when the model starts with "none", or when both
 * the cipher type and the hash type start with "none". Otherwise the
 * private key length must lie within
 * [KNET_MIN_KEY_LEN, KNET_MAX_KEY_LEN] before crypto_init() is called.
 *
 * Returns:
 *    0  success (errno cleared)
 *   -1  invalid arguments / key length, or lock failure (errno set)
 *   -2  crypto_init() failed (errno set); crypto_fini() has already
 *       run at that point
 */
int knet_handle_crypto(knet_handle_t knet_h, struct knet_handle_crypto_cfg *knet_handle_crypto_cfg)
{
int savederrno = 0;
int err = 0;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (!knet_handle_crypto_cfg) {
errno = EINVAL;
return -1;
}
savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
crypto_fini(knet_h);
/*
* only the first 4 characters are compared, so any value beginning
* with "none" matches
*/
if ((!strncmp("none", knet_handle_crypto_cfg->crypto_model, 4)) ||
((!strncmp("none", knet_handle_crypto_cfg->crypto_cipher_type, 4)) &&
(!strncmp("none", knet_handle_crypto_cfg->crypto_hash_type, 4)))) {
log_debug(knet_h, KNET_SUB_CRYPTO, "crypto is not enabled");
err = 0;
goto exit_unlock;
}
if (knet_handle_crypto_cfg->private_key_len < KNET_MIN_KEY_LEN) {
log_debug(knet_h, KNET_SUB_CRYPTO, "private key len too short (min %d): %u",
KNET_MIN_KEY_LEN, knet_handle_crypto_cfg->private_key_len);
savederrno = EINVAL;
err = -1;
goto exit_unlock;
}
if (knet_handle_crypto_cfg->private_key_len > KNET_MAX_KEY_LEN) {
log_debug(knet_h, KNET_SUB_CRYPTO, "private key len too long (max %d): %u",
KNET_MAX_KEY_LEN, knet_handle_crypto_cfg->private_key_len);
savederrno = EINVAL;
err = -1;
goto exit_unlock;
}
err = crypto_init(knet_h, knet_handle_crypto_cfg);
if (err) {
/*
* -2 lets callers tell an init failure apart from the -1
* argument/validation errors above
*/
err = -2;
savederrno = errno;
}
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
- errno = savederrno;
+ errno = err ? savederrno : 0;
return err;
}
/*
 * (Re)configure compression on the handle.
 *
 * The current compression state is finalised with compress_fini(knet_h, 0)
 * and the new configuration is then applied through compress_cfg(), all
 * under the global write lock.
 *
 * Returns the compress_cfg() result: 0 with errno cleared on success,
 * non-zero with errno preserved from compress_cfg() on failure.
 * NULL arguments and lock failures return -1 with errno set.
 */
int knet_handle_compress(knet_handle_t knet_h, struct knet_handle_compress_cfg *knet_handle_compress_cfg)
{
int savederrno = 0;
int err = 0;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (!knet_handle_compress_cfg) {
errno = EINVAL;
return -1;
}
savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
compress_fini(knet_h, 0);
err = compress_cfg(knet_h, knet_handle_compress_cfg);
/*
* capture errno before the unlock call has a chance to change it
*/
savederrno = errno;
pthread_rwlock_unlock(&knet_h->global_rwlock);
- errno = savederrno;
+ errno = err ? savederrno : 0;
return err;
}
/*
 * Read data from a channel's data fd into buff.
 *
 * buff must be non-NULL, buff_len within [1, KNET_MAX_PACKET_SIZE] and
 * channel within [0, KNET_DATAFD_MAX) and in use. The read is a single
 * readv() on the channel's sockfd[0], performed under the global read
 * lock.
 *
 * Returns the readv() result (number of bytes read, possibly 0) with
 * errno cleared, or -1 with errno set on failure.
 */
ssize_t knet_recv(knet_handle_t knet_h, char *buff, const size_t buff_len, const int8_t channel)
{
int savederrno = 0;
ssize_t err = 0;
struct iovec iov_in;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (buff == NULL) {
errno = EINVAL;
return -1;
}
if (buff_len <= 0) {
errno = EINVAL;
return -1;
}
if (buff_len > KNET_MAX_PACKET_SIZE) {
errno = EINVAL;
return -1;
}
if (channel < 0) {
errno = EINVAL;
return -1;
}
if (channel >= KNET_DATAFD_MAX) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
if (!knet_h->sockfd[channel].in_use) {
savederrno = EINVAL;
err = -1;
goto out_unlock;
}
memset(&iov_in, 0, sizeof(iov_in));
iov_in.iov_base = (void *)buff;
iov_in.iov_len = buff_len;
err = readv(knet_h->sockfd[channel].sockfd[0], &iov_in, 1);
savederrno = errno;
out_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
/*
* err carries a byte count here: only a negative value is a failure.
* Testing for non-zero would treat every successful read as an error
* and hand the caller a stale errno.
*/
- errno = savederrno;
+ errno = err < 0 ? savederrno : 0;
return err;
}
/*
 * Write data from buff to a channel's data fd.
 *
 * buff must be non-NULL, buff_len within [1, KNET_MAX_PACKET_SIZE] and
 * channel within [0, KNET_DATAFD_MAX) and in use. The write is a single
 * writev() on the channel's sockfd[0], performed under the global read
 * lock.
 *
 * Returns the writev() result (number of bytes written) with errno
 * cleared, or -1 with errno set on failure.
 */
ssize_t knet_send(knet_handle_t knet_h, const char *buff, const size_t buff_len, const int8_t channel)
{
int savederrno = 0;
ssize_t err = 0;
struct iovec iov_out[1];
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (buff == NULL) {
errno = EINVAL;
return -1;
}
if (buff_len <= 0) {
errno = EINVAL;
return -1;
}
if (buff_len > KNET_MAX_PACKET_SIZE) {
errno = EINVAL;
return -1;
}
if (channel < 0) {
errno = EINVAL;
return -1;
}
if (channel >= KNET_DATAFD_MAX) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
if (!knet_h->sockfd[channel].in_use) {
savederrno = EINVAL;
err = -1;
goto out_unlock;
}
memset(iov_out, 0, sizeof(iov_out));
iov_out[0].iov_base = (void *)buff;
iov_out[0].iov_len = buff_len;
err = writev(knet_h->sockfd[channel].sockfd[0], iov_out, 1);
savederrno = errno;
out_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
/*
* err carries a byte count here: only a negative value is a failure.
* Testing for non-zero would treat every successful write as an error
* and hand the caller a stale errno.
*/
- errno = savederrno;
+ errno = err < 0 ? savederrno : 0;
return err;
}
/*
 * Copy the handle statistics into the caller's buffer.
 *
 * struct_size is the size of the caller's struct knet_handle_stats; at
 * most sizeof(struct knet_handle_stats) bytes are copied, so callers
 * built against a smaller struct receive a truncated copy.
 * stats->size is set to the library's full struct size.
 *
 * Returns 0 with errno cleared on success, -1 with errno set on failure
 * (EINVAL for NULL arguments, or the lock error).
 */
int knet_handle_get_stats(knet_handle_t knet_h, struct knet_handle_stats *stats, size_t struct_size)
{
int savederrno = 0;
int err = 0;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (!stats) {
errno = EINVAL;
return -1;
}
savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
if (struct_size > sizeof(struct knet_handle_stats)) {
struct_size = sizeof(struct knet_handle_stats);
}
memmove(stats, &knet_h->stats, struct_size);
/*
* TX crypt stats only count the data packets sent, so add in the ping/pong/pmtud figures
* RX is OK as it counts them before they are sorted.
*/
/*
* NOTE(review): the two writes below do not consult struct_size;
* they assume tx_crypt_packets and size both lie within the
* caller's (possibly smaller) struct -- confirm against the
* struct layout
*/
stats->tx_crypt_packets += knet_h->stats_extra.tx_crypt_ping_packets +
knet_h->stats_extra.tx_crypt_pong_packets +
knet_h->stats_extra.tx_crypt_pmtu_packets +
knet_h->stats_extra.tx_crypt_pmtu_reply_packets;
/* Tell the caller our full size in case they have an old version */
stats->size = sizeof(struct knet_handle_stats);
pthread_rwlock_unlock(&knet_h->global_rwlock);
/*
* err is never modified after initialisation, so this always clears errno
*/
- errno = savederrno;
+ errno = err ? savederrno : 0;
return err;
}
/*
 * Reset the handle statistics (stats and stats_extra) to zero.
 *
 * clear_option must be KNET_CLEARSTATS_HANDLE_ONLY or
 * KNET_CLEARSTATS_HANDLE_AND_LINK; the latter also calls
 * _link_clear_stats().
 *
 * Returns 0 with errno cleared on success, -1 with errno set on failure
 * (EINVAL for bad arguments, or the lock error).
 */
int knet_handle_clear_stats(knet_handle_t knet_h, int clear_option)
{
int savederrno = 0;
int err = 0;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (clear_option != KNET_CLEARSTATS_HANDLE_ONLY &&
clear_option != KNET_CLEARSTATS_HANDLE_AND_LINK) {
errno = EINVAL;
return -1;
}
savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
memset(&knet_h->stats, 0, sizeof(struct knet_handle_stats));
memset(&knet_h->stats_extra, 0, sizeof(struct knet_handle_stats_extra));
if (clear_option == KNET_CLEARSTATS_HANDLE_AND_LINK) {
_link_clear_stats(knet_h);
}
pthread_rwlock_unlock(&knet_h->global_rwlock);
/*
* err is never modified after initialisation, so this always clears errno
*/
- errno = savederrno;
+ errno = err ? savederrno : 0;
return err;
}
diff --git a/libknet/host.c b/libknet/host.c
index 9f6648e8..00e2298e 100644
--- a/libknet/host.c
+++ b/libknet/host.c
@@ -1,711 +1,712 @@
/*
* Copyright (C) 2010-2018 Red Hat, Inc. All rights reserved.
*
* Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
* Federico Simoncelli <fsimon@kronosnet.org>
*
* This software licensed under GPL-2.0+, LGPL-2.0+
*/
#include "config.h"
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <pthread.h>
#include <stdio.h>
#include "host.h"
#include "internals.h"
#include "logging.h"
#include "threads_common.h"
/*
 * Rebuild the flat host_ids array (and its entry count) from the
 * host linked list, in list order.
 */
static void _host_list_update(knet_handle_t knet_h)
{
	struct knet_host *cur = knet_h->host_head;

	knet_h->host_ids_entries = 0;

	while (cur != NULL) {
		knet_h->host_ids[knet_h->host_ids_entries] = cur->host_id;
		knet_h->host_ids_entries++;
		cur = cur->next;
	}
}
/*
 * Register a new host with the handle.
 *
 * A zeroed struct knet_host is allocated, its name defaults to the
 * decimal host_id, every link slot gets its link_id and a latency_min
 * of UINT32_MAX, and the host is pushed onto the head of the host list
 * and into host_index.
 *
 * Returns 0 with errno cleared on success; -1 with errno set to EINVAL
 * (NULL handle), EEXIST (host_id already present), the malloc() errno,
 * or the lock error.
 */
int knet_host_add(knet_handle_t knet_h, knet_node_id_t host_id)
{
int savederrno = 0, err = 0;
struct knet_host *host = NULL;
uint8_t link_idx;
if (!knet_h) {
errno = EINVAL;
return -1;
}
savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_HOST, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
if (knet_h->host_index[host_id]) {
err = -1;
savederrno = EEXIST;
log_err(knet_h, KNET_SUB_HOST, "Unable to add host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
host = malloc(sizeof(struct knet_host));
if (!host) {
err = -1;
savederrno = errno;
log_err(knet_h, KNET_SUB_HOST, "Unable to allocate memory for host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
memset(host, 0, sizeof(struct knet_host));
/*
* set host_id
*/
host->host_id = host_id;
/*
* set default host->name to host_id for logging
*/
snprintf(host->name, KNET_MAX_HOST_LEN - 1, "%u", host_id);
/*
* initialize links internal data
*/
for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) {
host->link[link_idx].link_id = link_idx;
host->link[link_idx].status.stats.latency_min = UINT32_MAX;
}
/*
* add new host to the index
*/
knet_h->host_index[host_id] = host;
/*
* add new host to host list
*/
if (knet_h->host_head) {
host->next = knet_h->host_head;
}
knet_h->host_head = host;
_host_list_update(knet_h);
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
/*
* on both error paths host is still NULL, so this free() is a no-op
* safety net
*/
if (err < 0) {
free(host);
}
- errno = savederrno;
+ errno = err ? savederrno : 0;
return err;
}
/*
 * Remove a host from the handle and free it.
 *
 * The host must exist and must not have any configured link left.
 *
 * Returns 0 with errno cleared on success; -1 with errno set to EINVAL
 * (NULL handle / unknown host), EBUSY (links still configured), or the
 * lock error.
 */
int knet_host_remove(knet_handle_t knet_h, knet_node_id_t host_id)
{
int savederrno = 0, err = 0;
struct knet_host *host, *removed;
uint8_t link_idx;
if (!knet_h) {
errno = EINVAL;
return -1;
}
savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_HOST, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
host = knet_h->host_index[host_id];
if (!host) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_HOST, "Unable to remove host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
/*
* if links are configured we cannot release the host
*/
for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) {
if (host->link[link_idx].configured) {
err = -1;
savederrno = EBUSY;
log_err(knet_h, KNET_SUB_HOST, "Unable to remove host %u, links are still configured: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
}
removed = NULL;
/*
* removing host from list
*/
if (knet_h->host_head->host_id == host_id) {
removed = knet_h->host_head;
knet_h->host_head = removed->next;
} else {
/*
* host is reused as the list cursor from here on
*/
for (host = knet_h->host_head; host->next != NULL; host = host->next) {
if (host->next->host_id == host_id) {
removed = host->next;
host->next = removed->next;
break;
}
}
}
knet_h->host_index[host_id] = NULL;
free(removed);
_host_list_update(knet_h);
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
- errno = savederrno;
+ errno = err ? savederrno : 0;
return err;
}
/*
 * Assign a name to an existing host.
 *
 * name must be non-NULL, shorter than KNET_MAX_HOST_LEN and not already
 * used by any host in the list (the target host included).
 *
 * Returns 0 with errno cleared on success; -1 with errno set to EINVAL
 * (NULL handle / unknown host / NULL or too long name), EEXIST
 * (duplicate name), or the lock error.
 */
int knet_host_set_name(knet_handle_t knet_h, knet_node_id_t host_id, const char *name)
{
int savederrno = 0, err = 0;
struct knet_host *host;
if (!knet_h) {
errno = EINVAL;
return -1;
}
savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_HOST, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
if (!knet_h->host_index[host_id]) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_HOST, "Unable to find host %u to set name: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
if (!name) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_HOST, "Unable to set name for host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
if (strlen(name) >= KNET_MAX_HOST_LEN) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_HOST, "Requested name for host %u is too long: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
/*
* names must be unique across all hosts
*/
for (host = knet_h->host_head; host != NULL; host = host->next) {
if (!strncmp(host->name, name, KNET_MAX_HOST_LEN - 1)) {
err = -1;
savederrno = EEXIST;
log_err(knet_h, KNET_SUB_HOST, "Duplicated name found on host_id %u",
host->host_id);
goto exit_unlock;
}
}
snprintf(knet_h->host_index[host_id]->name, KNET_MAX_HOST_LEN - 1, "%s", name);
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
- errno = savederrno;
+ errno = err ? savederrno : 0;
return err;
}
/*
 * Copy the name of host_id into the caller's buffer.
 *
 * Up to KNET_MAX_HOST_LEN bytes (terminator included) are written, so
 * name must point to a buffer of at least that size.
 *
 * Returns 0 with errno cleared on success; -1 with errno set to EINVAL
 * (NULL arguments / unknown host) or the lock error.
 */
int knet_host_get_name_by_host_id(knet_handle_t knet_h, knet_node_id_t host_id,
char *name)
{
int savederrno = 0, err = 0;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (!name) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_HOST, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
if (!knet_h->host_index[host_id]) {
savederrno = EINVAL;
err = -1;
log_debug(knet_h, KNET_SUB_HOST, "Host %u not found", host_id);
goto exit_unlock;
}
snprintf(name, KNET_MAX_HOST_LEN, "%s", knet_h->host_index[host_id]->name);
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
- errno = savederrno;
+ errno = err ? savederrno : 0;
return err;
}
/*
 * Find the id of the first host in the list whose name matches name
 * (compared over at most KNET_MAX_HOST_LEN characters).
 *
 * Returns 0 with errno cleared and *host_id filled in on success;
 * -1 with errno set to EINVAL (NULL arguments), ENOENT (no such name),
 * or the lock error. *host_id is untouched when nothing matches.
 */
int knet_host_get_id_by_host_name(knet_handle_t knet_h, const char *name,
knet_node_id_t *host_id)
{
int savederrno = 0, err = 0, found = 0;
struct knet_host *host;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (!name) {
errno = EINVAL;
return -1;
}
if (!host_id) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_HOST, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
for (host = knet_h->host_head; host != NULL; host = host->next) {
if (!strncmp(name, host->name, KNET_MAX_HOST_LEN)) {
found = 1;
*host_id = host->host_id;
break;
}
}
if (!found) {
savederrno = ENOENT;
err = -1;
}
pthread_rwlock_unlock(&knet_h->global_rwlock);
- errno = savederrno;
+ errno = err ? savederrno : 0;
return err;
}
/*
 * Export the list of known host ids.
 *
 * The whole internal host_ids array (sizeof(knet_h->host_ids) bytes) is
 * copied regardless of how many entries are valid, so host_ids must
 * point to a buffer at least that large. *host_ids_entries receives the
 * number of valid entries.
 *
 * Returns 0 with errno cleared on success, -1 with errno set on failure
 * (EINVAL for NULL arguments, or the lock error).
 */
int knet_host_get_host_list(knet_handle_t knet_h,
knet_node_id_t *host_ids, size_t *host_ids_entries)
{
int savederrno = 0, err = 0;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if ((!host_ids) || (!host_ids_entries)) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_HOST, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
memmove(host_ids, knet_h->host_ids, sizeof(knet_h->host_ids));
*host_ids_entries = knet_h->host_ids_entries;
pthread_rwlock_unlock(&knet_h->global_rwlock);
/*
* err is never modified after initialisation, so this always clears errno
*/
- errno = savederrno;
+ errno = err ? savederrno : 0;
return err;
}
/*
 * Change the link switching policy of a host.
 *
 * policy may not exceed KNET_LINK_POLICY_RR. The new policy is stored
 * and the dst cache is asked to refresh; if that request fails the
 * previous policy is restored and the call fails.
 *
 * Returns 0 with errno cleared on success; -1 with errno set to EINVAL
 * (NULL handle / bad policy / unknown host), the errno left by
 * _host_dstcache_update_async(), or the lock error.
 */
int knet_host_set_policy(knet_handle_t knet_h, knet_node_id_t host_id,
uint8_t policy)
{
int savederrno = 0, err = 0;
uint8_t old_policy;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (policy > KNET_LINK_POLICY_RR) {
errno = EINVAL;
return -1;
}
savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_HOST, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
if (!knet_h->host_index[host_id]) {
err = -1;
savederrno = EINVAL;
- log_err(knet_h, KNET_SUB_HOST, "Unable to set name for host %u: %s",
+ log_err(knet_h, KNET_SUB_HOST, "Unable to set policy for host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
old_policy = knet_h->host_index[host_id]->link_handler_policy;
knet_h->host_index[host_id]->link_handler_policy = policy;
if (_host_dstcache_update_async(knet_h, knet_h->host_index[host_id])) {
savederrno = errno;
err = -1;
knet_h->host_index[host_id]->link_handler_policy = old_policy;
log_debug(knet_h, KNET_SUB_HOST, "Unable to update switch cache for host %u: %s",
host_id, strerror(savederrno));
/*
* the policy was rolled back: skip the success message below
*/
+ goto exit_unlock;
}
log_debug(knet_h, KNET_SUB_HOST, "Host %u has new switching policy: %u", host_id, policy);
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
- errno = savederrno;
+ errno = err ? savederrno : 0;
return err;
}
/*
 * Read the link switching policy of a host into *policy.
 *
 * Returns 0 with errno cleared on success; -1 with errno set to EINVAL
 * (NULL arguments / unknown host) or the lock error.
 */
int knet_host_get_policy(knet_handle_t knet_h, knet_node_id_t host_id,
uint8_t *policy)
{
int savederrno = 0, err = 0;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (!policy) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_HOST, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
if (!knet_h->host_index[host_id]) {
err = -1;
savederrno = EINVAL;
- log_err(knet_h, KNET_SUB_HOST, "Unable to get name for host %u: %s",
+ log_err(knet_h, KNET_SUB_HOST, "Unable to get policy for host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
*policy = knet_h->host_index[host_id]->link_handler_policy;
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
- errno = savederrno;
+ errno = err ? savederrno : 0;
return err;
}
/*
 * Copy the status structure of a host into *status.
 *
 * Returns 0 with errno cleared on success; -1 with errno set to EINVAL
 * (NULL arguments / unknown host) or the lock error.
 */
int knet_host_get_status(knet_handle_t knet_h, knet_node_id_t host_id,
struct knet_host_status *status)
{
int savederrno = 0, err = 0;
struct knet_host *host;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (!status) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_HOST, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
host = knet_h->host_index[host_id];
if (!host) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_HOST, "Unable to find host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
memmove(status, &host->status, sizeof(struct knet_host_status));
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
- errno = savederrno;
+ errno = err ? savederrno : 0;
return err;
}
/*
 * Install or remove the host status change callback.
 *
 * Passing a NULL host_status_change_notify_fn disables the
 * notification. The callback and its private data are stored under the
 * global write lock.
 *
 * Returns 0 with errno cleared on success, -1 with errno set on failure
 * (EINVAL for a NULL handle, or the lock error).
 */
int knet_host_enable_status_change_notify(knet_handle_t knet_h,
void *host_status_change_notify_fn_private_data,
void (*host_status_change_notify_fn) (
void *private_data,
knet_node_id_t host_id,
uint8_t reachable,
uint8_t remote,
uint8_t external))
{
int savederrno = 0;
if (!knet_h) {
errno = EINVAL;
return -1;
}
savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_HOST, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
knet_h->host_status_change_notify_fn_private_data = host_status_change_notify_fn_private_data;
knet_h->host_status_change_notify_fn = host_status_change_notify_fn;
if (knet_h->host_status_change_notify_fn) {
log_debug(knet_h, KNET_SUB_HOST, "host_status_change_notify_fn enabled");
} else {
log_debug(knet_h, KNET_SUB_HOST, "host_status_change_notify_fn disabled");
}
pthread_rwlock_unlock(&knet_h->global_rwlock);
+ errno = 0;
return 0;
}
/*
 * Write one message to the host pipe (knet_h->hostsockfd[1]) without
 * blocking.
 *
 * knet_h  - knet handle (not checked for NULL here)
 * data    - buffer to send
 * datalen - number of bytes in data
 *
 * Returns 0 when the whole buffer was written, or when fini_in_progress is
 * set and the message is silently dropped.  Returns -1 when sendto() fails
 * (errno is the value left by sendto()) or when fewer than datalen bytes
 * were written.
 */
int _send_host_info(knet_handle_t knet_h, const void *data, const size_t datalen)
{
	int savederrno = 0;
	ssize_t ret = 0;

	if (knet_h->fini_in_progress) {
		return 0;
	}

	ret = sendto(knet_h->hostsockfd[1], data, datalen, MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0);
	if (ret < 0) {
		/* logging may overwrite errno: hand the sendto() value back to the caller */
		savederrno = errno;
		log_debug(knet_h, KNET_SUB_HOST, "Unable to write data to hostpipe. Error: %s", strerror(savederrno));
		errno = savederrno;
		return -1;
	}

	if ((size_t)ret != datalen) {
		log_debug(knet_h, KNET_SUB_HOST, "Unable to write all data to hostpipe. Expected: %zu, Written: %zd.", datalen, ret);
		return -1;
	}

	return 0;
}
/*
 * Reset the receive tracking state of a host: store rx_seq_num as the
 * current receive sequence number, zero both circular buffers and zero
 * every defrag buffer slot.
 */
static void _clear_cbuffers(struct knet_host *host, seq_num_t rx_seq_num)
{
	int slot = 0;

	host->rx_seq_num = rx_seq_num;

	memset(host->circular_buffer, 0, KNET_CBUFFER_SIZE);
	memset(host->circular_buffer_defrag, 0, KNET_CBUFFER_SIZE);

	while (slot < KNET_MAX_LINK) {
		memset(&host->defrag_buf[slot], 0, sizeof(struct knet_host_defrag_buf));
		slot++;
	}
}
/*
 * Check if a given packet seq num is recorded in the host's circular
 * buffers, moving the receive window forward when it is not covered.
 *
 * host       - host whose circular_buffer, circular_buffer_defrag and
 *              rx_seq_num are read and possibly updated
 * seq_num    - sequence number to look up
 * defrag_buf - 0 -> use normal cbuf, 1 -> use the defrag buffer lookup
 * clear_buf  - non-zero -> reset all buffers to seq_num (_clear_cbuffers)
 *              before doing the lookup
 *
 * Returns 1 when the slot for seq_num holds 0, or when the window had to
 * be moved to reach seq_num; returns 0 when the slot is non-zero
 * (slots are set to 1 by _seq_num_set).
 */
int _seq_num_lookup(struct knet_host *host, seq_num_t seq_num, int defrag_buf, int clear_buf)
{
size_t i, j; /* circular buffer indexes */
seq_num_t seq_dist;
char *dst_cbuf = host->circular_buffer;
char *dst_cbuf_defrag = host->circular_buffer_defrag;
seq_num_t *dst_seq_num = &host->rx_seq_num;
if (clear_buf) {
_clear_cbuffers(host, seq_num);
}
/* distance between seq_num and the last recorded rx_seq_num */
if (seq_num < *dst_seq_num) {
seq_dist = (SEQ_MAX - seq_num) + *dst_seq_num;
} else {
seq_dist = *dst_seq_num - seq_num;
}
/* j is the slot that belongs to seq_num */
j = seq_num % KNET_CBUFFER_SIZE;
if (seq_dist < KNET_CBUFFER_SIZE) { /* seq num is in ring buffer */
if (!defrag_buf) {
return (dst_cbuf[j] == 0) ? 1 : 0;
} else {
return (dst_cbuf_defrag[j] == 0) ? 1 : 0;
}
} else if (seq_dist <= SEQ_MAX - KNET_CBUFFER_SIZE) {
/* too far from the window: wipe both buffers and restart from seq_num */
memset(dst_cbuf, 0, KNET_CBUFFER_SIZE);
memset(dst_cbuf_defrag, 0, KNET_CBUFFER_SIZE);
*dst_seq_num = seq_num;
}
/* cleaning up circular buffer */
/*
 * i is the slot right after the current rx_seq_num; slots i..j (inclusive)
 * are zeroed in both buffers, in two pieces when the range wraps (i > j)
 */
i = (*dst_seq_num + 1) % KNET_CBUFFER_SIZE;
if (i > j) {
memset(dst_cbuf + i, 0, KNET_CBUFFER_SIZE - i);
memset(dst_cbuf, 0, j + 1);
memset(dst_cbuf_defrag + i, 0, KNET_CBUFFER_SIZE - i);
memset(dst_cbuf_defrag, 0, j + 1);
} else {
memset(dst_cbuf + i, 0, j - i + 1);
memset(dst_cbuf_defrag + i, 0, j - i + 1);
}
*dst_seq_num = seq_num;
return 1;
}
/*
 * Mark seq_num in one of the host's circular buffers by writing 1 into
 * slot (seq_num % KNET_CBUFFER_SIZE).
 *
 * defrag_buf - 0 selects circular_buffer, anything else selects
 *              circular_buffer_defrag
 */
void _seq_num_set(struct knet_host *host, seq_num_t seq_num, int defrag_buf)
{
	char *cbuf = defrag_buf ? host->circular_buffer_defrag : host->circular_buffer;

	cbuf[seq_num % KNET_CBUFFER_SIZE] = 1;
}
/*
 * Queue a destination cache update for a host by writing its host_id to
 * knet_h->dstsockfd[1] without blocking.
 *
 * Returns 0 when exactly sizeof(host_id) bytes were written, otherwise
 * logs the failure and returns -1 with errno holding the value observed
 * right after sendto().
 */
int _host_dstcache_update_async(knet_handle_t knet_h, struct knet_host *host)
{
	knet_node_id_t host_id = host->host_id;
	ssize_t written;
	int savederrno;

	written = sendto(knet_h->dstsockfd[1], &host_id, sizeof(host_id),
			 MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0);
	if (written == (ssize_t)sizeof(host_id)) {
		return 0;
	}

	/* keep errno intact across the logging call */
	savederrno = errno;
	log_debug(knet_h, KNET_SUB_HOST, "Unable to write to dstpipefd[1]: %s",
		  strerror(savederrno));
	errno = savederrno;

	return -1;
}
/*
 * Rebuild host->active_links / host->active_link_entries from the current
 * state of every link, then update host->status.reachable and call the
 * registered host_status_change_notify_fn when reachability changed.
 *
 * A link is considered usable when it is enabled, connected and has a
 * valid MTU.  Always returns 0.
 */
int _host_dstcache_update_sync(knet_handle_t knet_h, struct knet_host *host)
{
int link_idx;
int best_priority = -1;
int reachable = 0;
/* the local host with a loopback link always has exactly one active entry */
if (knet_h->host_id == host->host_id && knet_h->has_loop_link) {
host->active_link_entries = 1;
return 0;
}
host->active_link_entries = 0;
for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) {
if (host->link[link_idx].status.enabled != 1) /* link is not enabled */
continue;
if (host->link[link_idx].status.connected != 1) /* link is not connected */
continue;
if (host->link[link_idx].has_valid_mtu != 1) /* link does not have valid MTU */
continue;
if (host->link_handler_policy == KNET_LINK_POLICY_PASSIVE) {
/* for passive we look for the only active link with higher priority */
if (host->link[link_idx].priority > best_priority) {
host->active_links[0] = link_idx;
best_priority = host->link[link_idx].priority;
}
host->active_link_entries = 1;
} else {
/* for RR and ACTIVE we need to copy all available links */
host->active_links[host->active_link_entries] = link_idx;
host->active_link_entries++;
}
}
/*
 * NOTE(review): in the passive case active_links[0] is only assigned inside
 * the loop above, so when no link qualified the "best link" logged here is
 * whatever value was left from an earlier call.
 */
if (host->link_handler_policy == KNET_LINK_POLICY_PASSIVE) {
log_debug(knet_h, KNET_SUB_HOST, "host: %u (passive) best link: %u (pri: %u)",
host->host_id, host->link[host->active_links[0]].link_id,
host->link[host->active_links[0]].priority);
} else {
log_debug(knet_h, KNET_SUB_HOST, "host: %u has %u active links",
host->host_id, host->active_link_entries);
}
/* no active links, we can clean the circular buffers and indexes */
if (!host->active_link_entries) {
log_warn(knet_h, KNET_SUB_HOST, "host: %u has no active links", host->host_id);
_clear_cbuffers(host, 0);
} else {
reachable = 1;
}
/* notify only on an actual transition of the reachable flag */
if (host->status.reachable != reachable) {
host->status.reachable = reachable;
if (knet_h->host_status_change_notify_fn) {
knet_h->host_status_change_notify_fn(
knet_h->host_status_change_notify_fn_private_data,
host->host_id,
host->status.reachable,
host->status.remote,
host->status.external);
}
}
return 0;
}
diff --git a/libknet/links.c b/libknet/links.c
index 6185d4df..ba28dbd9 100644
--- a/libknet/links.c
+++ b/libknet/links.c
@@ -1,1086 +1,1086 @@
/*
* Copyright (C) 2012-2018 Red Hat, Inc. All rights reserved.
*
* Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
* Federico Simoncelli <fsimon@kronosnet.org>
*
* This software licensed under GPL-2.0+, LGPL-2.0+
*/
#include "config.h"
#include <errno.h>
#include <netdb.h>
#include <string.h>
#include <pthread.h>
#include "internals.h"
#include "logging.h"
#include "links.h"
#include "transports.h"
#include "host.h"
#include "threads_common.h"
/*
 * Record a change of the enabled/connected state of one link.
 *
 * knet_h    - knet handle; host_index[host_id] is dereferenced without a
 *             NULL check, so the host must exist
 * host_id   - index into knet_h->host_index
 * link_id   - index into the host's link array
 * enabled   - new value for link->status.enabled
 * connected - new value for link->status.connected
 *
 * When either value differs from the stored one, the new state is saved,
 * an asynchronous destination cache update is requested (its result is
 * not checked), dynconnected is cleared for a link that is no longer
 * connected, and the up/down counters and timestamp history are updated.
 *
 * Always returns 0.
 */
int _link_updown(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
		 unsigned int enabled, unsigned int connected)
{
	struct knet_link *link = &knet_h->host_index[host_id]->link[link_id];

	if ((link->status.enabled == enabled) &&
	    (link->status.connected == connected)) {
		return 0;
	}

	link->status.enabled = enabled;
	link->status.connected = connected;

	_host_dstcache_update_async(knet_h, knet_h->host_index[host_id]);

	if ((link->status.dynconnected) &&
	    (!link->status.connected)) {
		link->status.dynconnected = 0;
	}

	/*
	 * The timestamp history is used as a ring.  The index must wrap as
	 * soon as it reaches MAX_LINK_EVENTS: the history arrays are expected
	 * to hold MAX_LINK_EVENTS entries, so wrapping only when the index
	 * exceeds MAX_LINK_EVENTS would let one write land past the last slot.
	 */
	if (connected) {
		time(&link->status.stats.last_up_times[link->status.stats.last_up_time_index]);
		link->status.stats.up_count++;
		if (++link->status.stats.last_up_time_index >= MAX_LINK_EVENTS) {
			link->status.stats.last_up_time_index = 0;
		}
	} else {
		time(&link->status.stats.last_down_times[link->status.stats.last_down_time_index]);
		link->status.stats.down_count++;
		if (++link->status.stats.last_down_time_index >= MAX_LINK_EVENTS) {
			link->status.stats.last_down_time_index = 0;
		}
	}

	return 0;
}
/*
 * Zero the statistics (status.stats) of every link of every host that is
 * present in knet_h->host_index.  Empty host slots are skipped.
 */
void _link_clear_stats(knet_handle_t knet_h)
{
	uint32_t hidx;

	for (hidx = 0; hidx < KNET_MAX_HOST; hidx++) {
		struct knet_host *cur_host = knet_h->host_index[hidx];
		uint8_t lidx;

		if (cur_host) {
			for (lidx = 0; lidx < KNET_MAX_LINK; lidx++) {
				struct knet_link *cur_link = &cur_host->link[lidx];

				memset(&cur_link->status.stats, 0, sizeof(struct knet_link_stats));
			}
		}
	}
}
/*
 * Configure a link of a host: addresses, transport, flags and default
 * ping/pong timers.
 *
 * knet_h    - knet handle; NULL is rejected with EINVAL
 * host_id   - index into knet_h->host_index
 * link_id   - link to configure; must be < KNET_MAX_LINK
 * transport - transport type; must be < KNET_MAX_TRANSPORTS
 * src_addr  - source address; NULL is rejected with EINVAL
 * dst_addr  - destination address; NULL makes the link KNET_LINK_DYNIP,
 *             otherwise the link is KNET_LINK_STATIC
 * flags     - stored verbatim in link->flags
 *
 * Returns 0 on success, -1 on failure with errno set: EINVAL for bad
 * arguments, unknown host, loopback misuse or an address that cannot be
 * converted to a string; EBUSY when the link is already configured or
 * enabled; otherwise the errno left by the failing call.
 * All work after argument validation runs under the global write lock.
 */
int knet_link_set_config(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
uint8_t transport,
struct sockaddr_storage *src_addr,
struct sockaddr_storage *dst_addr,
uint64_t flags)
{
int savederrno = 0, err = 0, i;
struct knet_host *host;
struct knet_link *link;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (link_id >= KNET_MAX_LINK) {
errno = EINVAL;
return -1;
}
if (!src_addr) {
errno = EINVAL;
return -1;
}
if (transport >= KNET_MAX_TRANSPORTS) {
errno = EINVAL;
return -1;
}
savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_LINK, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
/* a loopback link is only accepted for the local host id */
if (transport == KNET_TRANSPORT_LOOPBACK && knet_h->host_id != host_id) {
log_err(knet_h, KNET_SUB_LINK, "Cannot create loopback link to remote node");
err = -1;
savederrno = EINVAL;
goto exit_unlock;
}
/* once the local host has a loopback link, no further link may be added to it */
if (knet_h->host_id == host_id && knet_h->has_loop_link) {
log_err(knet_h, KNET_SUB_LINK, "Cannot create more than 1 link when loopback is active");
err = -1;
savederrno = EINVAL;
goto exit_unlock;
}
host = knet_h->host_index[host_id];
if (!host) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
/* conversely, loopback is refused when any other link is already configured */
if (transport == KNET_TRANSPORT_LOOPBACK && knet_h->host_id == host_id) {
for (i=0; i<KNET_MAX_LINK; i++) {
if (host->link[i].configured) {
log_err(knet_h, KNET_SUB_LINK, "Cannot add loopback link when other links are already configured.");
err = -1;
savederrno = EINVAL;
goto exit_unlock;
}
}
}
link = &host->link[link_id];
if (link->configured != 0) {
err =-1;
savederrno = EBUSY;
log_err(knet_h, KNET_SUB_LINK, "Host %u link %u is currently configured: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
if (link->status.enabled != 0) {
err =-1;
savederrno = EBUSY;
log_err(knet_h, KNET_SUB_LINK, "Host %u link %u is currently in use: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
/* store the source address and its printable form for status reporting */
memmove(&link->src_addr, src_addr, sizeof(struct sockaddr_storage));
err = knet_addrtostr(src_addr, sizeof(struct sockaddr_storage),
link->status.src_ipaddr, KNET_MAX_HOST_LEN,
link->status.src_port, KNET_MAX_PORT_LEN);
if (err) {
/* EAI_SYSTEM means the real cause is in errno; any other code is mapped to EINVAL */
if (err == EAI_SYSTEM) {
savederrno = errno;
log_warn(knet_h, KNET_SUB_LINK,
"Unable to resolve host: %u link: %u source addr/port: %s",
host_id, link_id, strerror(savederrno));
} else {
savederrno = EINVAL;
log_warn(knet_h, KNET_SUB_LINK,
"Unable to resolve host: %u link: %u source addr/port: %s",
host_id, link_id, gai_strerror(err));
}
err = -1;
goto exit_unlock;
}
if (!dst_addr) {
link->dynamic = KNET_LINK_DYNIP;
} else {
link->dynamic = KNET_LINK_STATIC;
memmove(&link->dst_addr, dst_addr, sizeof(struct sockaddr_storage));
err = knet_addrtostr(dst_addr, sizeof(struct sockaddr_storage),
link->status.dst_ipaddr, KNET_MAX_HOST_LEN,
link->status.dst_port, KNET_MAX_PORT_LEN);
if (err) {
if (err == EAI_SYSTEM) {
savederrno = errno;
log_warn(knet_h, KNET_SUB_LINK,
"Unable to resolve host: %u link: %u destination addr/port: %s",
host_id, link_id, strerror(savederrno));
} else {
savederrno = EINVAL;
log_warn(knet_h, KNET_SUB_LINK,
"Unable to resolve host: %u link: %u destination addr/port: %s",
host_id, link_id, gai_strerror(err));
}
err = -1;
goto exit_unlock;
}
}
/* default heartbeat parameters; same formulas as knet_link_set_ping_timers */
link->pong_count = KNET_LINK_DEFAULT_PONG_COUNT;
link->has_valid_mtu = 0;
link->ping_interval = KNET_LINK_DEFAULT_PING_INTERVAL * 1000; /* microseconds */
link->pong_timeout = KNET_LINK_DEFAULT_PING_TIMEOUT * 1000; /* microseconds */
link->pong_timeout_backoff = KNET_LINK_PONG_TIMEOUT_BACKOFF;
link->pong_timeout_adj = link->pong_timeout * link->pong_timeout_backoff; /* microseconds */
link->latency_fix = KNET_LINK_DEFAULT_PING_PRECISION;
link->latency_exp = KNET_LINK_DEFAULT_PING_PRECISION - \
((link->ping_interval * KNET_LINK_DEFAULT_PING_PRECISION) / 8000000);
link->flags = flags;
if (transport_link_set_config(knet_h, link, transport) < 0) {
savederrno = errno;
err = -1;
goto exit_unlock;
}
link->configured = 1;
log_debug(knet_h, KNET_SUB_LINK, "host: %u link: %u is configured",
host_id, link_id);
/* a loopback link makes the host reachable immediately; other links start from the minimum IPv4 MTU */
if (transport == KNET_TRANSPORT_LOOPBACK) {
knet_h->has_loop_link = 1;
knet_h->loop_link = link_id;
host->status.reachable = 1;
link->status.mtu = KNET_PMTUD_SIZE_V6;
} else {
link->status.mtu = KNET_PMTUD_MIN_MTU_V4 - KNET_HEADER_ALL_SIZE - knet_h->sec_header_size;
link->has_valid_mtu = 1;
}
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
- errno = savederrno;
+ errno = err ? savederrno : 0;
return err;
}
/*
 * Read back the configuration of a link.
 *
 * knet_h    - knet handle; NULL is rejected with EINVAL
 * host_id   - index into knet_h->host_index
 * link_id   - link to query; must be < KNET_MAX_LINK
 * transport - output; receives link->transport_type
 * src_addr  - output; receives a copy of the source address
 * dst_addr  - output; filled only for a KNET_LINK_STATIC link, in which
 *             case it must not be NULL
 * dynamic   - output; 0 for a static link, 1 otherwise
 * flags     - output; receives link->flags
 *
 * Returns 0 on success, -1 on failure with errno set (EINVAL for NULL
 * outputs, out-of-range link, unknown host, unconfigured link or a
 * missing dst_addr on a static link; or the rwlock error).
 * The link is read while holding the global read lock.
 */
int knet_link_get_config(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
uint8_t *transport,
struct sockaddr_storage *src_addr,
struct sockaddr_storage *dst_addr,
uint8_t *dynamic,
uint64_t *flags)
{
int savederrno = 0, err = 0;
struct knet_host *host;
struct knet_link *link;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (link_id >= KNET_MAX_LINK) {
errno = EINVAL;
return -1;
}
if (!src_addr) {
errno = EINVAL;
return -1;
}
if (!dynamic) {
errno = EINVAL;
return -1;
}
if (!transport) {
errno = EINVAL;
return -1;
}
if (!flags) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_LINK, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
host = knet_h->host_index[host_id];
if (!host) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
link = &host->link[link_id];
if (!link->configured) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
/* dst_addr can only be validated here, once the link type is known */
if ((link->dynamic == KNET_LINK_STATIC) && (!dst_addr)) {
savederrno = EINVAL;
err = -1;
goto exit_unlock;
}
memmove(src_addr, &link->src_addr, sizeof(struct sockaddr_storage));
*transport = link->transport_type;
*flags = link->flags;
if (link->dynamic == KNET_LINK_STATIC) {
*dynamic = 0;
memmove(dst_addr, &link->dst_addr, sizeof(struct sockaddr_storage));
} else {
*dynamic = 1;
}
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
- errno = savederrno;
+ errno = err ? savederrno : 0;
return err;
}
/*
 * Remove the configuration of a link and reset its structure to zero.
 *
 * knet_h  - knet handle; NULL is rejected with EINVAL
 * host_id - index into knet_h->host_index
 * link_id - link to clear; must be < KNET_MAX_LINK
 *
 * The link must be configured and not enabled.  Returns 0 on success,
 * -1 on failure with errno set (EINVAL for bad arguments, unknown host or
 * unconfigured link; EBUSY when the link is still enabled; otherwise the
 * errno left by transport_link_clear_config).
 * Runs under the global write lock.
 */
int knet_link_clear_config(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id)
{
int savederrno = 0, err = 0;
struct knet_host *host;
struct knet_link *link;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (link_id >= KNET_MAX_LINK) {
errno = EINVAL;
return -1;
}
savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_LINK, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
host = knet_h->host_index[host_id];
if (!host) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
link = &host->link[link_id];
if (link->configured != 1) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "Host %u link %u is not configured: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
if (link->status.enabled != 0) {
err = -1;
savederrno = EBUSY;
log_err(knet_h, KNET_SUB_LINK, "Host %u link %u is currently in use: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
/* a transport failure with errno == EBUSY is tolerated and the wipe proceeds */
if ((transport_link_clear_config(knet_h, link) < 0) &&
(errno != EBUSY)) {
savederrno = errno;
err = -1;
goto exit_unlock;
}
/* wipe everything, then restore the only field that must survive */
memset(link, 0, sizeof(struct knet_link));
link->link_id = link_id;
if (knet_h->has_loop_link && host_id == knet_h->host_id && link_id == knet_h->loop_link) {
knet_h->has_loop_link = 0;
if (host->active_link_entries == 0) {
host->status.reachable = 0;
}
}
log_debug(knet_h, KNET_SUB_LINK, "host: %u link: %u config has been wiped",
host_id, link_id);
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
- errno = savederrno;
+ errno = err ? savederrno : 0;
return err;
}
/*
 * Enable or disable a configured link.
 *
 * knet_h  - knet handle; NULL is rejected with EINVAL
 * host_id - index into knet_h->host_index
 * link_id - link to change; must be < KNET_MAX_LINK
 * enabled - 0 to disable, 1 to enable; larger values are rejected with
 *           EINVAL
 *
 * Setting the state the link already has is a successful no-op.
 * Otherwise the change is applied through _link_updown, keeping the
 * current connected state.  Returns 0 on success, -1 on failure with
 * errno set.  Runs under the global write lock.
 */
int knet_link_set_enable(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
unsigned int enabled)
{
int savederrno = 0, err = 0;
struct knet_host *host;
struct knet_link *link;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (link_id >= KNET_MAX_LINK) {
errno = EINVAL;
return -1;
}
if (enabled > 1) {
errno = EINVAL;
return -1;
}
savederrno = get_global_wrlock(knet_h);
if (savederrno) {
/* this is the write lock; the message used to say "read lock" */
log_err(knet_h, KNET_SUB_LINK, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
host = knet_h->host_index[host_id];
if (!host) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
link = &host->link[link_id];
if (!link->configured) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
if (link->status.enabled == enabled) {
err = 0;
goto exit_unlock;
}
err = _link_updown(knet_h, host_id, link_id, enabled, link->status.connected);
savederrno = errno;
/* only the disable path logs below */
if (enabled) {
goto exit_unlock;
}
log_debug(knet_h, KNET_SUB_LINK, "host: %u link: %u is disabled",
host_id, link_id);
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
- errno = savederrno;
+ errno = err ? savederrno : 0;
return err;
}
/*
 * Report whether a configured link is enabled.
 *
 * knet_h  - knet handle; NULL is rejected with EINVAL
 * host_id - index into knet_h->host_index
 * link_id - link to query; must be < KNET_MAX_LINK
 * enabled - output; receives link->status.enabled, NULL is rejected with
 *           EINVAL
 *
 * Returns 0 on success, -1 on failure with errno set (EINVAL for bad
 * arguments, unknown host or unconfigured link; or the rwlock error).
 * The link is read while holding the global read lock.
 */
int knet_link_get_enable(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
unsigned int *enabled)
{
int savederrno = 0, err = 0;
struct knet_host *host;
struct knet_link *link;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (link_id >= KNET_MAX_LINK) {
errno = EINVAL;
return -1;
}
if (!enabled) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_LINK, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
host = knet_h->host_index[host_id];
if (!host) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
link = &host->link[link_id];
if (!link->configured) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
*enabled = link->status.enabled;
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
- errno = savederrno;
+ errno = err ? savederrno : 0;
return err;
}
/*
 * Set the pong count of a configured link.
 *
 * knet_h     - knet handle; NULL is rejected with EINVAL
 * host_id    - index into knet_h->host_index
 * link_id    - link to change; must be < KNET_MAX_LINK
 * pong_count - new value for link->pong_count; 0 is rejected with EINVAL
 *
 * Returns 0 on success, -1 on failure with errno set (EINVAL for bad
 * arguments, unknown host or unconfigured link; or the lock error).
 * Runs under the global write lock.
 */
int knet_link_set_pong_count(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
uint8_t pong_count)
{
int savederrno = 0, err = 0;
struct knet_host *host;
struct knet_link *link;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (link_id >= KNET_MAX_LINK) {
errno = EINVAL;
return -1;
}
if (pong_count < 1) {
errno = EINVAL;
return -1;
}
savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_LINK, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
host = knet_h->host_index[host_id];
if (!host) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
link = &host->link[link_id];
if (!link->configured) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
link->pong_count = pong_count;
log_debug(knet_h, KNET_SUB_LINK,
"host: %u link: %u pong count update: %u",
host_id, link_id, link->pong_count);
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
- errno = savederrno;
+ errno = err ? savederrno : 0;
return err;
}
/*
 * Read the pong count of a configured link.
 *
 * knet_h     - knet handle; NULL is rejected with EINVAL
 * host_id    - index into knet_h->host_index
 * link_id    - link to query; must be < KNET_MAX_LINK
 * pong_count - output; receives link->pong_count, NULL is rejected with
 *              EINVAL
 *
 * Returns 0 on success, -1 on failure with errno set (EINVAL for bad
 * arguments, unknown host or unconfigured link; or the rwlock error).
 * The link is read while holding the global read lock.
 */
int knet_link_get_pong_count(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
uint8_t *pong_count)
{
int savederrno = 0, err = 0;
struct knet_host *host;
struct knet_link *link;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (link_id >= KNET_MAX_LINK) {
errno = EINVAL;
return -1;
}
if (!pong_count) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_LINK, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
host = knet_h->host_index[host_id];
if (!host) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
link = &host->link[link_id];
if (!link->configured) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
*pong_count = link->pong_count;
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
- errno = savederrno;
+ errno = err ? savederrno : 0;
return err;
}
/*
 * Set the ping interval, pong timeout and latency precision of a
 * configured link.
 *
 * knet_h    - knet handle; NULL is rejected with EINVAL
 * host_id   - index into knet_h->host_index
 * link_id   - link to change; must be < KNET_MAX_LINK
 * interval  - ping interval; stored multiplied by 1000 (the stored value
 *             is annotated as microseconds); 0 is rejected with EINVAL
 * timeout   - pong timeout; stored multiplied by 1000; 0 is rejected with
 *             ENOSYS (NOTE(review): differs from the EINVAL used by the
 *             other checks - confirm this is intentional)
 * precision - stored as latency_fix and used to derive latency_exp; 0 is
 *             rejected with EINVAL
 *
 * Returns 0 on success, -1 on failure with errno set.
 * Runs under the global write lock.
 */
int knet_link_set_ping_timers(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
time_t interval, time_t timeout, unsigned int precision)
{
int savederrno = 0, err = 0;
struct knet_host *host;
struct knet_link *link;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (link_id >= KNET_MAX_LINK) {
errno = EINVAL;
return -1;
}
if (!interval) {
errno = EINVAL;
return -1;
}
if (!timeout) {
errno = ENOSYS;
return -1;
}
if (!precision) {
errno = EINVAL;
return -1;
}
savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_LINK, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
host = knet_h->host_index[host_id];
if (!host) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
link = &host->link[link_id];
if (!link->configured) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
link->ping_interval = interval * 1000; /* microseconds */
link->pong_timeout = timeout * 1000; /* microseconds */
link->latency_fix = precision;
/* same formula knet_link_set_config uses for the default timers */
link->latency_exp = precision - \
((link->ping_interval * precision) / 8000000);
log_debug(knet_h, KNET_SUB_LINK,
"host: %u link: %u timeout update - interval: %llu timeout: %llu precision: %u",
host_id, link_id, link->ping_interval, link->pong_timeout, precision);
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
- errno = savederrno;
+ errno = err ? savederrno : 0;
return err;
}
/*
 * Read the ping interval, pong timeout and latency precision of a
 * configured link.
 *
 * knet_h    - knet handle; NULL is rejected with EINVAL
 * host_id   - index into knet_h->host_index
 * link_id   - link to query; must be < KNET_MAX_LINK
 * interval  - output; receives link->ping_interval / 1000
 * timeout   - output; receives link->pong_timeout / 1000
 * precision - output; receives link->latency_fix
 *
 * A NULL output pointer is rejected with EINVAL.  Returns 0 on success,
 * -1 on failure with errno set (EINVAL for bad arguments, unknown host or
 * unconfigured link; or the rwlock error).
 * The link is read while holding the global read lock.
 */
int knet_link_get_ping_timers(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
time_t *interval, time_t *timeout, unsigned int *precision)
{
int savederrno = 0, err = 0;
struct knet_host *host;
struct knet_link *link;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (link_id >= KNET_MAX_LINK) {
errno = EINVAL;
return -1;
}
if (!interval) {
errno = EINVAL;
return -1;
}
if (!timeout) {
errno = EINVAL;
return -1;
}
if (!precision) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_LINK, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
host = knet_h->host_index[host_id];
if (!host) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
link = &host->link[link_id];
if (!link->configured) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
*interval = link->ping_interval / 1000; /* undo the *1000 applied when the value was stored */
*timeout = link->pong_timeout / 1000;
*precision = link->latency_fix;
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
- errno = savederrno;
+ errno = err ? savederrno : 0;
return err;
}
/*
 * Change the priority of a configured link and rebuild the host's active
 * link list accordingly.
 *
 * knet_h   - knet handle; NULL is rejected with EINVAL
 * host_id  - index into knet_h->host_index
 * link_id  - link to change; must be < KNET_MAX_LINK
 * priority - new value for link->priority
 *
 * Setting the priority the link already has is a successful no-op.
 * When _host_dstcache_update_sync reports a failure the previous priority
 * is restored.  Returns 0 on success, -1 on failure with errno set.
 * Runs under the global write lock.
 */
int knet_link_set_priority(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
uint8_t priority)
{
int savederrno = 0, err = 0;
struct knet_host *host;
struct knet_link *link;
uint8_t old_priority;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (link_id >= KNET_MAX_LINK) {
errno = EINVAL;
return -1;
}
savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_LINK, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
host = knet_h->host_index[host_id];
if (!host) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
link = &host->link[link_id];
if (!link->configured) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
/* remembered so the change can be rolled back below */
old_priority = link->priority;
if (link->priority == priority) {
err = 0;
goto exit_unlock;
}
link->priority = priority;
if (_host_dstcache_update_sync(knet_h, host)) {
savederrno = errno;
log_debug(knet_h, KNET_SUB_LINK,
"Unable to update link priority (host: %u link: %u priority: %u): %s",
host_id, link_id, link->priority, strerror(savederrno));
link->priority = old_priority;
err = -1;
goto exit_unlock;
}
log_debug(knet_h, KNET_SUB_LINK,
"host: %u link: %u priority set to: %u",
host_id, link_id, link->priority);
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
- errno = savederrno;
+ errno = err ? savederrno : 0;
return err;
}
/*
 * Read the priority of a configured link.
 *
 * knet_h   - knet handle; NULL is rejected with EINVAL
 * host_id  - index into knet_h->host_index
 * link_id  - link to query; must be < KNET_MAX_LINK
 * priority - output; receives link->priority, NULL is rejected with
 *            EINVAL
 *
 * Returns 0 on success, -1 on failure with errno set (EINVAL for bad
 * arguments, unknown host or unconfigured link; or the rwlock error).
 * The link is read while holding the global read lock.
 */
int knet_link_get_priority(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
uint8_t *priority)
{
int savederrno = 0, err = 0;
struct knet_host *host;
struct knet_link *link;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (link_id >= KNET_MAX_LINK) {
errno = EINVAL;
return -1;
}
if (!priority) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_LINK, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
host = knet_h->host_index[host_id];
if (!host) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
link = &host->link[link_id];
if (!link->configured) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
*priority = link->priority;
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
- errno = savederrno;
+ errno = err ? savederrno : 0;
return err;
}
/*
 * List the ids of all configured links of a host.
 *
 * knet_h           - knet handle; NULL is rejected with EINVAL
 * host_id          - index into knet_h->host_index
 * link_ids         - output array; one entry is written per configured
 *                    link, so up to KNET_MAX_LINK entries may be written
 *                    (no size is passed in, the caller must provide room)
 * link_ids_entries - output; receives the number of entries written
 *
 * A NULL output pointer is rejected with EINVAL.  Returns 0 on success,
 * -1 on failure with errno set (EINVAL for bad arguments or an unknown
 * host; or the rwlock error).
 * The links are scanned while holding the global read lock.
 */
int knet_link_get_link_list(knet_handle_t knet_h, knet_node_id_t host_id,
uint8_t *link_ids, size_t *link_ids_entries)
{
int savederrno = 0, err = 0, i, count = 0;
struct knet_host *host;
struct knet_link *link;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (!link_ids) {
errno = EINVAL;
return -1;
}
if (!link_ids_entries) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_LINK, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
host = knet_h->host_index[host_id];
if (!host) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
/* the array index of a link is reported as its id */
for (i = 0; i < KNET_MAX_LINK; i++) {
link = &host->link[i];
if (!link->configured) {
continue;
}
link_ids[count] = i;
count++;
}
*link_ids_entries = count;
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
- errno = savederrno;
+ errno = err ? savederrno : 0;
return err;
}
/*
 * Copy the status of a configured link to the caller and fill in the
 * derived "total" counters.
 *
 * knet_h      - knet handle; NULL is rejected with EINVAL
 * host_id     - index into knet_h->host_index
 * link_id     - link to query; must be < KNET_MAX_LINK
 * status      - output buffer; NULL is rejected with EINVAL
 * struct_size - number of bytes copied from link->status into status
 *
 * After the copy, the rx/tx totals in status->stats are computed as the
 * sum of the data, ping, pong and pmtu counters, and status->size is set
 * to sizeof(struct knet_link_status).
 *
 * Returns 0 on success, -1 on failure with errno set (EINVAL for bad
 * arguments, unknown host or unconfigured link; or the lock error).
 *
 * NOTE(review): struct_size is used unchecked.  The totals and the size
 * field are written regardless of struct_size, and a struct_size larger
 * than sizeof(struct knet_link_status) would copy past link->status -
 * confirm the caller contract.
 */
int knet_link_get_status(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
struct knet_link_status *status, size_t struct_size)
{
int savederrno = 0, err = 0;
struct knet_host *host;
struct knet_link *link;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (link_id >= KNET_MAX_LINK) {
errno = EINVAL;
return -1;
}
if (!status) {
errno = EINVAL;
return -1;
}
/*
 * NOTE(review): the global write lock is taken here although the failure
 * message below says "read lock" - confirm which of the two is intended.
 */
savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_LINK, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
host = knet_h->host_index[host_id];
if (!host) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
link = &host->link[link_id];
if (!link->configured) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
memmove(status, &link->status, struct_size);
/* Calculate totals - no point in doing this on-the-fly */
status->stats.rx_total_packets =
status->stats.rx_data_packets +
status->stats.rx_ping_packets +
status->stats.rx_pong_packets +
status->stats.rx_pmtu_packets;
status->stats.tx_total_packets =
status->stats.tx_data_packets +
status->stats.tx_ping_packets +
status->stats.tx_pong_packets +
status->stats.tx_pmtu_packets;
status->stats.rx_total_bytes =
status->stats.rx_data_bytes +
status->stats.rx_ping_bytes +
status->stats.rx_pong_bytes +
status->stats.rx_pmtu_bytes;
status->stats.tx_total_bytes =
status->stats.tx_data_bytes +
status->stats.tx_ping_bytes +
status->stats.tx_pong_bytes +
status->stats.tx_pmtu_bytes;
status->stats.tx_total_errors =
status->stats.tx_data_errors +
status->stats.tx_ping_errors +
status->stats.tx_pong_errors +
status->stats.tx_pmtu_errors;
status->stats.tx_total_retries =
status->stats.tx_data_retries +
status->stats.tx_ping_retries +
status->stats.tx_pong_retries +
status->stats.tx_pmtu_retries;
/* Tell the caller our full size in case they have an old version */
status->size = sizeof(struct knet_link_status);
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
- errno = savederrno;
+ errno = err ? savederrno : 0;
return err;
}
diff --git a/libknet/logging.c b/libknet/logging.c
index 543269a5..14c6eff5 100644
--- a/libknet/logging.c
+++ b/libknet/logging.c
@@ -1,241 +1,247 @@
/*
* Copyright (C) 2010-2018 Red Hat, Inc. All rights reserved.
*
* Author: Fabio M. Di Nitto <fabbione@kronosnet.org>
*
* This software licensed under GPL-2.0+, LGPL-2.0+
*/
#include "config.h"
#include <strings.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <stdarg.h>
#include <errno.h>
#include <stdio.h>
#include "internals.h"
#include "logging.h"
#include "threads_common.h"
/*
 * Mapping between printable subsystem names and KNET_SUB_* ids.
 *
 * The lookup helpers in this file scan the table from the top and stop at
 * the KNET_SUB_UNKNOWN entry, which therefore acts as the terminator.
 */
static struct pretty_names subsystem_names[] =
{
{ "common", KNET_SUB_COMMON },
{ "handle", KNET_SUB_HANDLE },
{ "host", KNET_SUB_HOST },
{ "listener", KNET_SUB_LISTENER },
{ "link", KNET_SUB_LINK },
{ "transport", KNET_SUB_TRANSPORT },
{ "crypto", KNET_SUB_CRYPTO },
{ "compress", KNET_SUB_COMPRESS },
{ "filter", KNET_SUB_FILTER },
{ "dstcache", KNET_SUB_DSTCACHE },
{ "heartbeat", KNET_SUB_HEARTBEAT },
{ "pmtud", KNET_SUB_PMTUD },
{ "tx", KNET_SUB_TX },
{ "rx", KNET_SUB_RX },
{ "loopback", KNET_SUB_TRANSP_LOOPBACK },
{ "udp", KNET_SUB_TRANSP_UDP },
{ "sctp", KNET_SUB_TRANSP_SCTP },
{ "nsscrypto", KNET_SUB_NSSCRYPTO },
{ "opensslcrypto", KNET_SUB_OPENSSLCRYPTO },
{ "zlibcomp", KNET_SUB_ZLIBCOMP },
{ "lz4comp", KNET_SUB_LZ4COMP },
{ "lz4hccomp", KNET_SUB_LZ4HCCOMP },
{ "lzo2comp", KNET_SUB_LZO2COMP },
{ "lzmacomp", KNET_SUB_LZMACOMP },
{ "bzip2comp", KNET_SUB_BZIP2COMP },
{ "unknown", KNET_SUB_UNKNOWN } /* unknown MUST always be last in this array */
};
/*
 * Translate a subsystem id into its printable name.
 *
 * The table is scanned up to (not including) the KNET_SUB_UNKNOWN entry.
 * Returns the matching name from subsystem_names, or the literal
 * "unknown" when no entry matches.
 */
const char *knet_log_get_subsystem_name(uint8_t subsystem)
{
unsigned int i;
for (i = 0; i < KNET_MAX_SUBSYSTEMS; i++) {
if (subsystem_names[i].val == KNET_SUB_UNKNOWN) {
break;
}
if (subsystem_names[i].val == subsystem) {
+ errno = 0;
return subsystem_names[i].name;
}
}
return "unknown";
}
/*
 * Translate a subsystem name into its id, ignoring case.
 *
 * name is passed straight to strcasecmp and is not checked for NULL.
 * The table is scanned up to (not including) the KNET_SUB_UNKNOWN entry.
 * Returns the matching id, or KNET_SUB_UNKNOWN when no entry matches.
 */
uint8_t knet_log_get_subsystem_id(const char *name)
{
unsigned int i;
for (i = 0; i < KNET_MAX_SUBSYSTEMS; i++) {
if (subsystem_names[i].val == KNET_SUB_UNKNOWN) {
break;
}
if (strcasecmp(name, subsystem_names[i].name) == 0) {
+ errno = 0;
return subsystem_names[i].val;
}
}
return KNET_SUB_UNKNOWN;
}
/*
 * Tell whether a subsystem id is listed in subsystem_names.
 *
 * The scan stops at the KNET_SUB_UNKNOWN terminator, which itself counts
 * as a valid id when it is the one being looked for.
 *
 * Returns 0 when the id is found, -1 otherwise.
 */
static int is_valid_subsystem(uint8_t subsystem)
{
	unsigned int idx = 0;

	while (idx < KNET_MAX_SUBSYSTEMS) {
		if (subsystem_names[idx].val == subsystem) {
			return 0;
		}
		/* reaching the terminator without a match ends the search */
		if (subsystem_names[idx].val == KNET_SUB_UNKNOWN) {
			break;
		}
		idx++;
	}

	return -1;
}
/*
 * Mapping between printable log level names and KNET_LOG_* values.
 *
 * The lookup helpers in this file walk indexes 0..KNET_LOG_DEBUG of this
 * table, so it must keep one entry for each of those indexes.
 */
static struct pretty_names loglevel_names[] =
{
{ "ERROR", KNET_LOG_ERR },
{ "WARNING", KNET_LOG_WARN },
{ "info", KNET_LOG_INFO },
{ "debug", KNET_LOG_DEBUG }
};
/*
 * Translate a log level value into its printable name.
 *
 * Returns the matching name from loglevel_names, or the literal "ERROR"
 * when no entry matches.
 */
const char *knet_log_get_loglevel_name(uint8_t level)
{
unsigned int i;
for (i = 0; i <= KNET_LOG_DEBUG; i++) {
if (loglevel_names[i].val == level) {
+ errno = 0;
return loglevel_names[i].name;
}
}
return "ERROR";
}
/*
 * Translate a log level name into its value, ignoring case.
 *
 * name is passed straight to strcasecmp and is not checked for NULL.
 * Returns the matching value, or KNET_LOG_ERR when no entry matches.
 */
uint8_t knet_log_get_loglevel_id(const char *name)
{
unsigned int i;
for (i = 0; i <= KNET_LOG_DEBUG; i++) {
if (strcasecmp(name, loglevel_names[i].name) == 0) {
+ errno = 0;
return loglevel_names[i].val;
}
}
return KNET_LOG_ERR;
}
/*
 * Set the log level of one subsystem on a handle.
 *
 * Returns 0 with errno cleared on success. Returns -1 with errno set to
 * EINVAL when knet_h is NULL, the subsystem is not in subsystem_names or
 * level is above KNET_LOG_DEBUG, or to the error reported by
 * get_global_wrlock() when the lock cannot be taken.
 */
int knet_log_set_loglevel(knet_handle_t knet_h, uint8_t subsystem,
uint8_t level)
{
int savederrno = 0;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (is_valid_subsystem(subsystem) < 0) {
errno = EINVAL;
return -1;
}
if (level > KNET_LOG_DEBUG) {
errno = EINVAL;
return -1;
}
/* the level table is modified only while holding the global write lock */
savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, subsystem, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
knet_h->log_levels[subsystem] = level;
pthread_rwlock_unlock(&knet_h->global_rwlock);
+ errno = 0;
return 0;
}
/*
 * Read the log level of one subsystem from a handle.
 *
 * Returns 0 with errno cleared and *level filled in on success. Returns
 * -1 with errno set to EINVAL when knet_h is NULL, the subsystem is not in
 * subsystem_names or level is NULL, or to the pthread_rwlock_rdlock()
 * error when the lock cannot be taken.
 */
int knet_log_get_loglevel(knet_handle_t knet_h, uint8_t subsystem,
uint8_t *level)
{
int savederrno = 0;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (is_valid_subsystem(subsystem) < 0) {
errno = EINVAL;
return -1;
}
if (!level) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
/* NOTE(review): a read lock is requested above but the message says "write lock" */
log_err(knet_h, subsystem, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
*level = knet_h->log_levels[subsystem];
pthread_rwlock_unlock(&knet_h->global_rwlock);
+ errno = 0;
return 0;
}
/*
 * Format a log message and write it to the handle's log file descriptor.
 *
 * The message is dropped silently when:
 *  - knet_h is NULL,
 *  - subsystem equals KNET_MAX_SUBSYSTEMS,
 *  - msglevel is above the level configured for the subsystem,
 *  - the handle has no usable logfd (<= 0),
 *  - write() fails or reports 0 bytes.
 *
 * The text is formatted with vsnprintf into the fixed-size msg.msg field,
 * so overlong messages are truncated. The whole struct knet_log_msg is
 * then written to logfd.
 */
void log_msg(knet_handle_t knet_h, uint8_t subsystem, uint8_t msglevel,
	     const char *fmt, ...)
{
	va_list ap;
	struct knet_log_msg msg;
	size_t byte_cnt = 0;
	ssize_t len;

	if ((!knet_h) ||
	    (subsystem == KNET_MAX_SUBSYSTEMS) ||
	    (msglevel > knet_h->log_levels[subsystem])) {
		return;
	}

	if (knet_h->logfd <= 0) {
		return;
	}

	memset(&msg, 0, sizeof(struct knet_log_msg));
	msg.subsystem = subsystem;
	msg.msglevel = msglevel;

	va_start(ap, fmt);
#ifdef __clang__
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wformat-nonliteral"
#endif
	vsnprintf(msg.msg, sizeof(msg.msg), fmt, ap);
#ifdef __clang__
#pragma clang diagnostic pop
#endif
	va_end(ap);

	while (byte_cnt < sizeof(struct knet_log_msg)) {
		/*
		 * resume after the bytes already written: restarting from
		 * the beginning of msg after a short write would corrupt
		 * the record seen by the reader
		 */
		len = write(knet_h->logfd, (const char *)&msg + byte_cnt,
			    sizeof(struct knet_log_msg) - byte_cnt);
		if (len <= 0) {
			return;
		}
		byte_cnt += (size_t)len;
	}
}
diff --git a/libknet/netutils.c b/libknet/netutils.c
index 69c704a9..f30054ba 100644
--- a/libknet/netutils.c
+++ b/libknet/netutils.c
@@ -1,167 +1,176 @@
/*
* Copyright (C) 2010-2018 Red Hat, Inc. All rights reserved.
*
* Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
* Federico Simoncelli <fsimon@kronosnet.org>
*
* This software licensed under GPL-2.0+, LGPL-2.0+
*/
#include "config.h"
#include <stdlib.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
#include <errno.h>
#include <string.h>
#include "internals.h"
#include "netutils.h"
/*
 * Compare the first 12 bytes of an IPv6 address with the IPv4-mapped
 * prefix (ten zero bytes followed by 0xff 0xff).
 *
 * Returns the memcmp() result: 0 when the prefix matches (the address IS
 * IPv4-mapped) and non-zero when it does not. salen is not used.
 */
static int is_v4_mapped(const struct sockaddr_storage *ss, socklen_t salen)
{
	static const unsigned char v4_mapped_prefix[12] = {
		0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
		0x00, 0x00, 0x00, 0x00, 0xff, 0xff
	};
	const struct sockaddr_in6 *in6 = (const struct sockaddr_in6 *)ss;

	(void)salen;

	return memcmp(&in6->sin6_addr, v4_mapped_prefix, sizeof(v4_mapped_prefix));
}
/*
 * Compare two socket addresses.
 *
 * Same family: returns memcmp() over the first sslen1 bytes of both
 * structures.
 *
 * Different families: an AF_INET6 side must carry the IPv4-mapped prefix,
 * otherwise 1 is returned. The 4 IPv4 bytes of each side are then
 * compared (the last 4 bytes of the IPv6 address, or sin_addr for the
 * other side) and the memcmp() result is returned.
 *
 * Any family other than AF_INET6 is read as a struct sockaddr_in.
 */
int cmpaddr(const struct sockaddr_storage *ss1, socklen_t sslen1,
	    const struct sockaddr_storage *ss2, socklen_t sslen2)
{
	const char *v4_bytes1;
	const char *v4_bytes2;

	if (ss1->ss_family == ss2->ss_family) {
		return memcmp(ss1, ss2, sslen1);
	}

	/* is_v4_mapped() is non-zero when the mapped prefix is missing */
	if (ss1->ss_family != AF_INET6) {
		v4_bytes1 = (const char *)&((const struct sockaddr_in *)ss1)->sin_addr;
	} else if (is_v4_mapped(ss1, sslen1)) {
		return 1;
	} else {
		/* the embedded IPv4 address follows the 12 byte prefix */
		v4_bytes1 = (const char *)&((const struct sockaddr_in6 *)ss1)->sin6_addr + 12;
	}

	if (ss2->ss_family != AF_INET6) {
		v4_bytes2 = (const char *)&((const struct sockaddr_in *)ss2)->sin_addr;
	} else if (is_v4_mapped(ss2, sslen2)) {
		return 1;
	} else {
		v4_bytes2 = (const char *)&((const struct sockaddr_in6 *)ss2)->sin6_addr + 12;
	}

	return memcmp(v4_bytes1, v4_bytes2, 4);
}
/*
 * Copy family, port and address from src into dst.
 *
 * dst is zeroed first. For AF_INET6 only ss_family, sin6_port and
 * sin6_addr are copied, so flowinfo and scope id stay zero. For any other
 * family the first sizeof(struct sockaddr_in) bytes of src are copied.
 *
 * Always returns 0.
 */
int cpyaddrport(struct sockaddr_storage *dst, const struct sockaddr_storage *src)
{
	struct sockaddr_in6 *out6 = (struct sockaddr_in6 *)dst;
	const struct sockaddr_in6 *in6 = (const struct sockaddr_in6 *)src;

	memset(dst, 0, sizeof(*dst));

	if (src->ss_family != AF_INET6) {
		memmove(dst, src, sizeof(struct sockaddr_in));
		return 0;
	}

	dst->ss_family = src->ss_family;
	memmove(&out6->sin6_port, &in6->sin6_port, sizeof(in_port_t));
	memmove(&out6->sin6_addr, &in6->sin6_addr, sizeof(struct in6_addr));

	return 0;
}
/*
 * Return the size of the concrete sockaddr structure stored in ss:
 * sizeof(struct sockaddr_in) for AF_INET, sizeof(struct sockaddr_in6)
 * for every other family.
 */
socklen_t sockaddr_len(const struct sockaddr_storage *ss)
{
	return (ss->ss_family == AF_INET) ? sizeof(struct sockaddr_in)
					  : sizeof(struct sockaddr_in6);
}
/*
* exported APIs
*/
/*
 * Convert a numeric host/port string pair into a socket address.
 *
 * Argument errors (NULL host, port or ss, or sslen of 0) return -1 with
 * errno set to EINVAL. Otherwise the return value is the getaddrinfo()
 * result: 0 on success (errno is cleared) or the getaddrinfo() error code
 * on failure.
 *
 * Only numeric input is accepted (AI_NUMERICHOST | AI_NUMERICSERV). At
 * most sslen bytes of the first result are copied into ss.
 */
int knet_strtoaddr(const char *host, const char *port, struct sockaddr_storage *ss, socklen_t sslen)
{
int err;
struct addrinfo hints;
struct addrinfo *result = NULL;
if (!host) {
errno = EINVAL;
return -1;
}
if (!port) {
errno = EINVAL;
return -1;
}
if (!ss) {
errno = EINVAL;
return -1;
}
if (!sslen) {
errno = EINVAL;
return -1;
}
memset(&hints, 0, sizeof(struct addrinfo));
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_DGRAM;
hints.ai_flags = AI_NUMERICHOST | AI_NUMERICSERV;
err = getaddrinfo(host, port, &hints, &result);
if (!err) {
/* never copy more than the caller's buffer can hold */
memmove(ss, result->ai_addr,
(sslen < result->ai_addrlen) ? sslen : result->ai_addrlen);
freeaddrinfo(result);
}
+ if (!err)
+ errno = 0;
return err;
}
/*
 * Convert a socket address into numeric host and port strings.
 *
 * Argument errors (NULL ss, addr_buf or port_buf, or sslen of 0) return
 * -1 with errno set to EINVAL. Otherwise the return value is the
 * getnameinfo() result: 0 on success (errno is cleared) or the
 * getnameinfo() error code on failure.
 *
 * The address length handed to getnameinfo() comes from sockaddr_len(ss),
 * not from sslen.
 */
int knet_addrtostr(const struct sockaddr_storage *ss, socklen_t sslen,
char *addr_buf, size_t addr_buf_size,
char *port_buf, size_t port_buf_size)
{
+ int err;
+
if (!ss) {
errno = EINVAL;
return -1;
}
if (!sslen) {
errno = EINVAL;
return -1;
}
if (!addr_buf) {
errno = EINVAL;
return -1;
}
if (!port_buf) {
errno = EINVAL;
return -1;
}
- return getnameinfo((struct sockaddr *)ss, sockaddr_len(ss), addr_buf, addr_buf_size,
- port_buf, port_buf_size,
- NI_NUMERICHOST | NI_NUMERICSERV);
+ err = getnameinfo((struct sockaddr *)ss, sockaddr_len(ss),
+ addr_buf, addr_buf_size,
+ port_buf, port_buf_size,
+ NI_NUMERICHOST | NI_NUMERICSERV);
+
+ if (!err)
+ errno = 0;
+ return err;
}
diff --git a/libknet/threads_tx.c b/libknet/threads_tx.c
index 81082f43..3ab075ca 100644
--- a/libknet/threads_tx.c
+++ b/libknet/threads_tx.c
@@ -1,751 +1,751 @@
/*
* Copyright (C) 2012-2018 Red Hat, Inc. All rights reserved.
*
* Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
* Federico Simoncelli <fsimon@kronosnet.org>
*
* This software licensed under GPL-2.0+, LGPL-2.0+
*/
#include "config.h"
#include <math.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>
#include <sys/uio.h>
#include <errno.h>
#include "compat.h"
#include "compress.h"
#include "crypto.h"
#include "host.h"
#include "link.h"
#include "logging.h"
#include "transports.h"
#include "transport_common.h"
#include "threads_common.h"
#include "threads_heartbeat.h"
#include "threads_tx.h"
#include "netutils.h"
/*
* SEND
*/
/*
 * Send msgs_to_send prepared messages to dst_host over its active links.
 *
 * Links using KNET_TRANSPORT_LOOPBACK are skipped. For every other active
 * link the destination address is stored in each message, the TX
 * statistics are updated and the batch is handed to _sendmmsg(). The
 * outcome of each send is classified by transport_tx_sock_error():
 *   -1 stop and return the error, 0 carry on, 1 send the same data again.
 *
 * When the link policy is KNET_LINK_POLICY_RR and more than one link is
 * active, the active link list is rotated by one and the loop stops
 * after the first link that completed.
 *
 * Returns the last value of err and sets errno to savederrno.
 */
static int _dispatch_to_links(knet_handle_t knet_h, struct knet_host *dst_host, struct knet_mmsghdr *msg, int msgs_to_send)
{
int link_idx, msg_idx, sent_msgs, prev_sent, progress;
int err = 0, savederrno = 0;
unsigned int i;
struct knet_mmsghdr *cur;
struct knet_link *cur_link;
for (link_idx = 0; link_idx < dst_host->active_link_entries; link_idx++) {
sent_msgs = 0;
prev_sent = 0;
progress = 1;
cur_link = &dst_host->link[dst_host->active_links[link_idx]];
if (cur_link->transport_type == KNET_TRANSPORT_LOOPBACK) {
continue;
}
/*
 * point every message at this link and account bytes/packets;
 * the counters are updated before the send is attempted
 */
msg_idx = 0;
while (msg_idx < msgs_to_send) {
msg[msg_idx].msg_hdr.msg_name = &cur_link->dst_addr;
/* Cast for Linux/BSD compatibility */
for (i=0; i<(unsigned int)msg[msg_idx].msg_hdr.msg_iovlen; i++) {
cur_link->status.stats.tx_data_bytes += msg[msg_idx].msg_hdr.msg_iov[i].iov_len;
}
cur_link->status.stats.tx_data_packets++;
msg_idx++;
}
retry:
/* resume from the first message that has not been sent yet */
cur = &msg[prev_sent];
sent_msgs = _sendmmsg(dst_host->link[dst_host->active_links[link_idx]].outsock,
&cur[0], msgs_to_send - prev_sent, MSG_DONTWAIT | MSG_NOSIGNAL);
savederrno = errno;
err = transport_tx_sock_error(knet_h, dst_host->link[dst_host->active_links[link_idx]].transport_type, dst_host->link[dst_host->active_links[link_idx]].outsock, sent_msgs, savederrno);
switch(err) {
case -1: /* unrecoverable error */
cur_link->status.stats.tx_data_errors++;
goto out_unlock;
break;
case 0: /* ignore error and continue */
break;
case 1: /* retry to send those same data */
cur_link->status.stats.tx_data_retries++;
goto retry;
break;
}
prev_sent = prev_sent + sent_msgs;
/*
 * partial send: keep retrying while messages are going out. One
 * extra attempt is allowed after a send that moved nothing; a second
 * consecutive empty send gives up with EAGAIN.
 */
if ((sent_msgs >= 0) && (prev_sent < msgs_to_send)) {
if ((sent_msgs) || (progress)) {
if (sent_msgs) {
progress = 1;
} else {
progress = 0;
}
#ifdef DEBUG
log_debug(knet_h, KNET_SUB_TX, "Unable to send all (%d/%d) data packets to host %s (%u) link %s:%s (%u)",
sent_msgs, msg_idx,
dst_host->name, dst_host->host_id,
dst_host->link[dst_host->active_links[link_idx]].status.dst_ipaddr,
dst_host->link[dst_host->active_links[link_idx]].status.dst_port,
dst_host->link[dst_host->active_links[link_idx]].link_id);
#endif
goto retry;
}
if (!progress) {
savederrno = EAGAIN;
err = -1;
goto out_unlock;
}
}
if ((dst_host->link_handler_policy == KNET_LINK_POLICY_RR) &&
(dst_host->active_link_entries > 1)) {
/*
 * round-robin: move the link just used to the end of the list.
 * The memmove length is a byte count - assumes active_links holds
 * one-byte entries; TODO confirm against the struct definition.
 */
uint8_t cur_link_id = dst_host->active_links[0];
memmove(&dst_host->active_links[0], &dst_host->active_links[1], KNET_MAX_LINK - 1);
dst_host->active_links[dst_host->active_link_entries - 1] = cur_link_id;
break;
}
}
out_unlock:
errno = savederrno;
return err;
}
/*
 * Process the packet stored in knet_h->recv_from_sock_buf and send it to
 * its destination hosts.
 *
 * inlen is the payload length, channel the data channel it was read
 * from, and is_sync is non-zero when called from knet_send_sync().
 *
 * Steps, in order:
 *  1. pick the destinations (optional dst_host_filter_fn for data
 *     packets, or the hostinfo header), including local delivery when a
 *     loopback link is configured;
 *  2. drop unreachable destinations and fail with EHOSTDOWN if none is
 *     left;
 *  3. optionally compress the payload;
 *  4. assign a TX sequence number and split the payload in fragments of
 *     at most the data MTU;
 *  5. optionally encrypt every fragment;
 *  6. hand the resulting messages to _dispatch_to_links() per host.
 *
 * Returns err and sets errno to savederrno on exit.
 */
static int _parse_recv_from_sock(knet_handle_t knet_h, size_t inlen, int8_t channel, int is_sync)
{
size_t outlen, frag_len;
struct knet_host *dst_host;
knet_node_id_t dst_host_ids_temp[KNET_MAX_HOST];
size_t dst_host_ids_entries_temp = 0;
knet_node_id_t dst_host_ids[KNET_MAX_HOST];
size_t dst_host_ids_entries = 0;
int bcast = 1;
struct knet_hostinfo *knet_hostinfo;
struct iovec iov_out[PCKT_FRAG_MAX][2];
int iovcnt_out = 2;
uint8_t frag_idx;
unsigned int temp_data_mtu;
size_t host_idx;
int send_mcast = 0;
struct knet_header *inbuf;
int savederrno = 0;
int err = 0;
seq_num_t tx_seq_num;
struct knet_mmsghdr msg[PCKT_FRAG_MAX];
int msgs_to_send, msg_idx;
unsigned int i;
int j;
int send_local = 0;
int data_compressed = 0;
size_t uncrypted_frag_size;
inbuf = knet_h->recv_from_sock_buf;
if ((knet_h->enabled != 1) &&
(inbuf->kh_type != KNET_HEADER_TYPE_HOST_INFO)) { /* data forward is disabled */
log_debug(knet_h, KNET_SUB_TX, "Received data packet but forwarding is disabled");
savederrno = ECANCELED;
err = -1;
goto out_unlock;
}
/*
* move this into a separate function to expand on
* extra switching rules
*/
switch(inbuf->kh_type) {
case KNET_HEADER_TYPE_DATA:
if (knet_h->dst_host_filter_fn) {
bcast = knet_h->dst_host_filter_fn(
knet_h->dst_host_filter_fn_private_data,
(const unsigned char *)inbuf->khp_data_userdata,
inlen,
KNET_NOTIFY_TX,
knet_h->host_id,
knet_h->host_id,
&channel,
dst_host_ids_temp,
&dst_host_ids_entries_temp);
if (bcast < 0) {
log_debug(knet_h, KNET_SUB_TX, "Error from dst_host_filter_fn: %d", bcast);
savederrno = EFAULT;
err = -1;
goto out_unlock;
}
if ((!bcast) && (!dst_host_ids_entries_temp)) {
log_debug(knet_h, KNET_SUB_TX, "Message is unicast but no dst_host_ids_entries");
savederrno = EINVAL;
err = -1;
goto out_unlock;
}
if ((!bcast) &&
(dst_host_ids_entries_temp > KNET_MAX_HOST)) {
log_debug(knet_h, KNET_SUB_TX, "dst_host_filter_fn returned too many destinations");
savederrno = EINVAL;
err = -1;
goto out_unlock;
}
}
/* Send to localhost if appropriate and enabled */
if (knet_h->has_loop_link) {
send_local = 0;
if (bcast) {
send_local = 1;
} else {
for (i=0; i< dst_host_ids_entries_temp; i++) {
if (dst_host_ids_temp[i] == knet_h->host_id) {
send_local = 1;
}
}
}
if (send_local) {
const unsigned char *buf = inbuf->khp_data_userdata;
ssize_t buflen = inlen;
struct knet_link *local_link;
local_link = knet_h->host_index[knet_h->host_id]->link;
/*
 * local delivery writes the payload straight to the channel fd;
 * a short write is retried after a brief sleep.
 * Note that err temporarily holds the write() byte count here.
 */
local_retry:
err = write(knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], buf, buflen);
if (err < 0) {
log_err(knet_h, KNET_SUB_TRANSP_LOOPBACK, "send local failed. error=%s\n", strerror(errno));
local_link->status.stats.tx_data_errors++;
}
if (err > 0 && err < buflen) {
log_debug(knet_h, KNET_SUB_TRANSP_LOOPBACK, "send local incomplete=%d bytes of %zu\n", err, inlen);
local_link->status.stats.tx_data_retries++;
buf += err;
buflen -= err;
usleep(KNET_THREADS_TIMERES / 16);
goto local_retry;
}
if (err == buflen) {
local_link->status.stats.tx_data_packets++;
local_link->status.stats.tx_data_bytes += inlen;
}
}
}
break;
case KNET_HEADER_TYPE_HOST_INFO:
knet_hostinfo = (struct knet_hostinfo *)inbuf->khp_data_userdata;
if (knet_hostinfo->khi_bcast == KNET_HOSTINFO_UCAST) {
bcast = 0;
dst_host_ids_temp[0] = knet_hostinfo->khi_dst_node_id;
dst_host_ids_entries_temp = 1;
/* the id is stored back in network byte order after being read */
knet_hostinfo->khi_dst_node_id = htons(knet_hostinfo->khi_dst_node_id);
}
break;
default:
log_warn(knet_h, KNET_SUB_TX, "Receiving unknown messages from socket");
savederrno = ENOMSG;
err = -1;
goto out_unlock;
break;
}
if (is_sync) {
if ((bcast) ||
((!bcast) && (dst_host_ids_entries_temp > 1))) {
log_debug(knet_h, KNET_SUB_TX, "knet_send_sync is only supported with unicast packets for one destination");
savederrno = E2BIG;
err = -1;
goto out_unlock;
}
}
/*
* check destinations hosts before spending time
* in fragmenting/encrypting packets to save
* time processing data for unreachable hosts.
* for unicast, also remap the destination data
* to skip unreachable hosts.
*/
if (!bcast) {
dst_host_ids_entries = 0;
for (host_idx = 0; host_idx < dst_host_ids_entries_temp; host_idx++) {
dst_host = knet_h->host_index[dst_host_ids_temp[host_idx]];
if (!dst_host) {
continue;
}
/* the local host is skipped when a loopback link exists */
if (!(dst_host->host_id == knet_h->host_id &&
knet_h->has_loop_link) &&
dst_host->status.reachable) {
dst_host_ids[dst_host_ids_entries] = dst_host_ids_temp[host_idx];
dst_host_ids_entries++;
}
}
if (!dst_host_ids_entries) {
savederrno = EHOSTDOWN;
err = -1;
goto out_unlock;
}
} else {
send_mcast = 0;
for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) {
if (!(dst_host->host_id == knet_h->host_id &&
knet_h->has_loop_link) &&
dst_host->status.reachable) {
send_mcast = 1;
break;
}
}
if (!send_mcast) {
savederrno = EHOSTDOWN;
err = -1;
goto out_unlock;
}
}
if (!knet_h->data_mtu) {
/*
* using MIN_MTU_V4 for data mtu is not completely accurate but safe enough
*/
log_debug(knet_h, KNET_SUB_TX,
"Received data packet but data MTU is still unknown."
" Packet might not be delivered."
" Assuming minimum IPv4 MTU (%d)",
KNET_PMTUD_MIN_MTU_V4);
temp_data_mtu = KNET_PMTUD_MIN_MTU_V4;
} else {
/*
* take a copy of the mtu to avoid value changing under
* our feet while we are sending a fragmented pckt
*/
temp_data_mtu = knet_h->data_mtu;
}
/*
* compress data
*/
if ((knet_h->compress_model > 0) && (inlen > knet_h->compress_threshold)) {
size_t cmp_outlen = KNET_DATABUFSIZE_COMPRESS;
struct timespec start_time;
struct timespec end_time;
uint64_t compress_time;
clock_gettime(CLOCK_MONOTONIC, &start_time);
err = compress(knet_h,
(const unsigned char *)inbuf->khp_data_userdata, inlen,
knet_h->send_to_links_buf_compress, (ssize_t *)&cmp_outlen);
if (err < 0) {
/* a compression failure is not fatal: the data goes out uncompressed */
knet_h->stats.tx_failed_to_compress++;
log_warn(knet_h, KNET_SUB_COMPRESS, "Compression failed (%d): %s", err, strerror(errno));
} else {
/* Collect stats */
clock_gettime(CLOCK_MONOTONIC, &end_time);
timespec_diff(start_time, end_time, &compress_time);
if (compress_time < knet_h->stats.tx_compress_time_min) {
knet_h->stats.tx_compress_time_min = compress_time;
}
if (compress_time > knet_h->stats.tx_compress_time_max) {
knet_h->stats.tx_compress_time_max = compress_time;
}
knet_h->stats.tx_compress_time_ave =
(unsigned long long)(knet_h->stats.tx_compress_time_ave * knet_h->stats.tx_compressed_packets +
compress_time) / (knet_h->stats.tx_compressed_packets+1);
knet_h->stats.tx_compressed_packets++;
knet_h->stats.tx_compressed_original_bytes += inlen;
knet_h->stats.tx_compressed_size_bytes += cmp_outlen;
/* the compressed form is used only when it is actually smaller */
if (cmp_outlen < inlen) {
memmove(inbuf->khp_data_userdata, knet_h->send_to_links_buf_compress, cmp_outlen);
inlen = cmp_outlen;
data_compressed = 1;
} else {
knet_h->stats.tx_unable_to_compress++;
}
}
}
if (knet_h->compress_model > 0 && !data_compressed) {
knet_h->stats.tx_uncompressed_packets++;
}
/*
* prepare the outgoing buffers
*/
frag_len = inlen;
frag_idx = 0;
inbuf->khp_data_bcast = bcast;
inbuf->khp_data_frag_num = ceil((float)inlen / temp_data_mtu);
inbuf->khp_data_channel = channel;
if (data_compressed) {
inbuf->khp_data_compress = knet_h->compress_model;
} else {
inbuf->khp_data_compress = 0;
}
/*
 * NOTE(review): on lock failure neither err nor savederrno is updated
 * before leaving, so the caller gets whatever err holds at this point
 * and errno is set to the current savederrno - confirm this is intended.
 */
if (pthread_mutex_lock(&knet_h->tx_seq_num_mutex)) {
log_debug(knet_h, KNET_SUB_TX, "Unable to get seq mutex lock");
goto out_unlock;
}
knet_h->tx_seq_num++;
/*
* force seq_num 0 to detect a node that has crashed and rejoining
* the knet instance. seq_num 0 will clear the buffers in the RX
* thread
*/
if (knet_h->tx_seq_num == 0) {
knet_h->tx_seq_num++;
}
/*
* cache the value in locked context
*/
tx_seq_num = knet_h->tx_seq_num;
inbuf->khp_data_seq_num = htons(knet_h->tx_seq_num);
pthread_mutex_unlock(&knet_h->tx_seq_num_mutex);
/*
* forcefully broadcast a ping to all nodes every SEQ_MAX / 8
* pckts.
* this solves 2 problems:
* 1) on TX socket overloads we generate extra pings to keep links alive
* 2) in 3+ nodes setup, where all the traffic is flowing between node 1 and 2,
* node 3+ will be able to keep in sync on the TX seq_num even without
* receiving traffic or pings in betweens. This avoids issues with
* rollover of the circular buffer
*/
if (tx_seq_num % (SEQ_MAX / 8) == 0) {
_send_pings(knet_h, 0);
}
/*
 * fragmented packets use two iovecs per fragment (per-fragment header
 * buffer + slice of the payload); a single-fragment packet is sent as
 * one iovec covering header and payload in inbuf
 */
if (inbuf->khp_data_frag_num > 1) {
while (frag_idx < inbuf->khp_data_frag_num) {
/*
* set the iov_base
*/
iov_out[frag_idx][0].iov_base = (void *)knet_h->send_to_links_buf[frag_idx];
iov_out[frag_idx][0].iov_len = KNET_HEADER_DATA_SIZE;
iov_out[frag_idx][1].iov_base = inbuf->khp_data_userdata + (temp_data_mtu * frag_idx);
/*
* set the len
*/
if (frag_len > temp_data_mtu) {
iov_out[frag_idx][1].iov_len = temp_data_mtu;
} else {
iov_out[frag_idx][1].iov_len = frag_len;
}
/*
* copy the frag info on all buffers
*/
knet_h->send_to_links_buf[frag_idx]->kh_type = inbuf->kh_type;
knet_h->send_to_links_buf[frag_idx]->khp_data_seq_num = inbuf->khp_data_seq_num;
knet_h->send_to_links_buf[frag_idx]->khp_data_frag_num = inbuf->khp_data_frag_num;
knet_h->send_to_links_buf[frag_idx]->khp_data_bcast = inbuf->khp_data_bcast;
knet_h->send_to_links_buf[frag_idx]->khp_data_channel = inbuf->khp_data_channel;
knet_h->send_to_links_buf[frag_idx]->khp_data_compress = inbuf->khp_data_compress;
frag_len = frag_len - temp_data_mtu;
frag_idx++;
}
iovcnt_out = 2;
} else {
iov_out[frag_idx][0].iov_base = (void *)inbuf;
iov_out[frag_idx][0].iov_len = frag_len + KNET_HEADER_DATA_SIZE;
iovcnt_out = 1;
}
if (knet_h->crypto_instance) {
struct timespec start_time;
struct timespec end_time;
uint64_t crypt_time;
frag_idx = 0;
while (frag_idx < inbuf->khp_data_frag_num) {
clock_gettime(CLOCK_MONOTONIC, &start_time);
if (crypto_encrypt_and_signv(
knet_h,
iov_out[frag_idx], iovcnt_out,
knet_h->send_to_links_buf_crypt[frag_idx],
(ssize_t *)&outlen) < 0) {
log_debug(knet_h, KNET_SUB_TX, "Unable to encrypt packet");
savederrno = ECHILD;
err = -1;
goto out_unlock;
}
clock_gettime(CLOCK_MONOTONIC, &end_time);
timespec_diff(start_time, end_time, &crypt_time);
if (crypt_time < knet_h->stats.tx_crypt_time_min) {
knet_h->stats.tx_crypt_time_min = crypt_time;
}
if (crypt_time > knet_h->stats.tx_crypt_time_max) {
knet_h->stats.tx_crypt_time_max = crypt_time;
}
knet_h->stats.tx_crypt_time_ave =
(knet_h->stats.tx_crypt_time_ave * knet_h->stats.tx_crypt_packets +
crypt_time) / (knet_h->stats.tx_crypt_packets+1);
uncrypted_frag_size = 0;
for (j=0; j < iovcnt_out; j++) {
uncrypted_frag_size += iov_out[frag_idx][j].iov_len;
}
knet_h->stats.tx_crypt_byte_overhead += (outlen - uncrypted_frag_size);
knet_h->stats.tx_crypt_packets++;
/* from here on the fragment is a single encrypted buffer */
iov_out[frag_idx][0].iov_base = knet_h->send_to_links_buf_crypt[frag_idx];
iov_out[frag_idx][0].iov_len = outlen;
frag_idx++;
}
iovcnt_out = 1;
}
/* one message per fragment; the destination address is filled in per link later */
memset(&msg, 0, sizeof(msg));
msgs_to_send = inbuf->khp_data_frag_num;
msg_idx = 0;
while (msg_idx < msgs_to_send) {
msg[msg_idx].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage);
msg[msg_idx].msg_hdr.msg_iov = &iov_out[msg_idx][0];
msg[msg_idx].msg_hdr.msg_iovlen = iovcnt_out;
msg_idx++;
}
/* the first host that fails to dispatch aborts the whole send */
if (!bcast) {
for (host_idx = 0; host_idx < dst_host_ids_entries; host_idx++) {
dst_host = knet_h->host_index[dst_host_ids[host_idx]];
err = _dispatch_to_links(knet_h, dst_host, &msg[0], msgs_to_send);
savederrno = errno;
if (err) {
goto out_unlock;
}
}
} else {
for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) {
if (dst_host->status.reachable) {
err = _dispatch_to_links(knet_h, dst_host, &msg[0], msgs_to_send);
savederrno = errno;
if (err) {
goto out_unlock;
}
}
}
}
out_unlock:
errno = savederrno;
return err;
}
/*
 * Synchronously send a buffer on a data channel.
 *
 * The buffer is copied into knet_h->recv_from_sock_buf and processed by
 * _parse_recv_from_sock() in sync mode while holding the global read
 * lock and the TX mutex.
 *
 * Returns the _parse_recv_from_sock() result. errno is cleared when the
 * return value is 0, otherwise it carries the saved error. Argument
 * errors return -1 with errno set to EINVAL: NULL handle or buffer,
 * buff_len of 0 or above KNET_MAX_PACKET_SIZE, channel outside
 * 0..KNET_DATAFD_MAX-1, or a channel that is not in use.
 */
int knet_send_sync(knet_handle_t knet_h, const char *buff, const size_t buff_len, const int8_t channel)
{
int savederrno = 0, err = 0;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (buff == NULL) {
errno = EINVAL;
return -1;
}
/* buff_len is a size_t, so this check only rejects 0 */
if (buff_len <= 0) {
errno = EINVAL;
return -1;
}
if (buff_len > KNET_MAX_PACKET_SIZE) {
errno = EINVAL;
return -1;
}
if (channel < 0) {
errno = EINVAL;
return -1;
}
if (channel >= KNET_DATAFD_MAX) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_TX, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
if (!knet_h->sockfd[channel].in_use) {
savederrno = EINVAL;
err = -1;
goto out;
}
/* the TX mutex protects the shared recv_from_sock_buf used below */
savederrno = pthread_mutex_lock(&knet_h->tx_mutex);
if (savederrno) {
log_err(knet_h, KNET_SUB_TX, "Unable to get TX mutex lock: %s",
strerror(savederrno));
err = -1;
goto out;
}
knet_h->recv_from_sock_buf->kh_type = KNET_HEADER_TYPE_DATA;
memmove(knet_h->recv_from_sock_buf->khp_data_userdata, buff, buff_len);
err = _parse_recv_from_sock(knet_h, buff_len, channel, 1);
savederrno = errno;
pthread_mutex_unlock(&knet_h->tx_mutex);
out:
pthread_rwlock_unlock(&knet_h->global_rwlock);
- errno = savederrno;
+ errno = err ? savederrno : 0;
return err;
}
/*
 * Read one packet from sockfd and push it down the TX path.
 *
 * A valid channel whose is_socket flag is clear is read with readv();
 * everything else is read with recvmsg(). On success the packet is
 * tagged with type and passed to _parse_recv_from_sock(). When the read
 * returns 0 or fails, sock_notify_fn is invoked with the read result
 * and the saved errno; on failure the channel fd is also removed from
 * the TX epoll set and flagged with has_error.
 *
 * NOTE(review): the error and notification paths index
 * knet_h->sockfd[channel] without re-checking channel. The TX thread
 * passes channel = -1 for the host socket, which would be a negative
 * index here - confirm whether that path can be reached.
 */
static void _handle_send_to_links(knet_handle_t knet_h, struct msghdr *msg, int sockfd, int8_t channel, int type)
{
	ssize_t inlen;
	int savederrno = 0;
	int use_readv;

	use_readv = (channel >= 0) &&
		    (channel < KNET_DATAFD_MAX) &&
		    (!knet_h->sockfd[channel].is_socket);

	if (use_readv) {
		inlen = readv(sockfd, msg->msg_iov, 1);
	} else {
		inlen = recvmsg(sockfd, msg, MSG_DONTWAIT | MSG_NOSIGNAL);
	}

	/* data available: forward it, no notification needed */
	if (inlen > 0) {
		knet_h->recv_from_sock_buf->kh_type = type;
		_parse_recv_from_sock(knet_h, inlen, channel, 0);
		return;
	}

	if (inlen < 0) {
		struct epoll_event ev;

		savederrno = errno;

		/* stop polling a broken fd */
		memset(&ev, 0, sizeof(ev));
		if (epoll_ctl(knet_h->send_to_links_epollfd,
			      EPOLL_CTL_DEL, knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], &ev)) {
			log_err(knet_h, KNET_SUB_TX, "Unable to del datafd %d from linkfd epoll pool: %s",
				knet_h->sockfd[channel].sockfd[0], strerror(savederrno));
		} else {
			knet_h->sockfd[channel].has_error = 1;
		}
	}

	/* inlen <= 0: tell the application about EOF (0) or the error */
	knet_h->sock_notify_fn(knet_h->sock_notify_fn_private_data,
			       knet_h->sockfd[channel].sockfd[0],
			       channel,
			       KNET_NOTIFY_TX,
			       inlen,
			       savederrno);
}
/*
 * TX thread entry point.
 *
 * data is the knet handle. The thread prepares the shared receive and
 * per-fragment header buffers once, then loops on epoll_wait() until
 * shutdown_in_progress() reports true. Every ready fd is mapped either
 * to the host socket (KNET_HEADER_TYPE_HOST_INFO, channel -1) or to the
 * data channel that owns it (KNET_HEADER_TYPE_DATA) and is processed by
 * _handle_send_to_links() while holding the global read lock and the TX
 * mutex.
 *
 * Always returns NULL.
 */
void *_handle_send_to_links_thread(void *data)
{
	knet_handle_t knet_h = (knet_handle_t) data;
	/*
	 * epoll_wait() below asks for up to KNET_EPOLL_MAX_EVENTS + 1
	 * events, so the array must provide that many slots or the kernel
	 * could write past its end.
	 */
	struct epoll_event events[KNET_EPOLL_MAX_EVENTS + 1];
	int i, nev, type;
	int8_t channel;
	struct iovec iov_in;
	struct msghdr msg;
	struct sockaddr_storage address;

	set_thread_status(knet_h, KNET_THREAD_TX, KNET_THREAD_RUNNING);

	/* incoming payloads land directly in the userdata area of the shared buffer */
	memset(&iov_in, 0, sizeof(iov_in));
	iov_in.iov_base = (void *)knet_h->recv_from_sock_buf->khp_data_userdata;
	iov_in.iov_len = KNET_MAX_PACKET_SIZE;

	memset(&msg, 0, sizeof(struct msghdr));
	msg.msg_name = &address;
	msg.msg_namelen = sizeof(struct sockaddr_storage);
	msg.msg_iov = &iov_in;
	msg.msg_iovlen = 1;

	/* header fields that never change are set once up front */
	knet_h->recv_from_sock_buf->kh_version = KNET_HEADER_VERSION;
	knet_h->recv_from_sock_buf->khp_data_frag_seq = 0;
	knet_h->recv_from_sock_buf->kh_node = htons(knet_h->host_id);

	for (i = 0; i < PCKT_FRAG_MAX; i++) {
		knet_h->send_to_links_buf[i]->kh_version = KNET_HEADER_VERSION;
		knet_h->send_to_links_buf[i]->khp_data_frag_seq = i + 1;
		knet_h->send_to_links_buf[i]->kh_node = htons(knet_h->host_id);
	}

	while (!shutdown_in_progress(knet_h)) {
		nev = epoll_wait(knet_h->send_to_links_epollfd, events,
				 (int)(sizeof(events) / sizeof(events[0])),
				 KNET_THREADS_TIMERES / 1000);

		/*
		 * we use timeout to detect if thread is shutting down
		 */
		if (nev == 0) {
			continue;
		}

		if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) {
			log_debug(knet_h, KNET_SUB_TX, "Unable to get read lock");
			continue;
		}

		/* a negative nev (epoll_wait error) simply skips this loop */
		for (i = 0; i < nev; i++) {
			if (events[i].data.fd == knet_h->hostsockfd[0]) {
				type = KNET_HEADER_TYPE_HOST_INFO;
				channel = -1;
			} else {
				type = KNET_HEADER_TYPE_DATA;
				/* find the in-use channel that owns this fd */
				for (channel = 0; channel < KNET_DATAFD_MAX; channel++) {
					if ((knet_h->sockfd[channel].in_use) &&
					    (knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created] == events[i].data.fd)) {
						break;
					}
				}
				if (channel >= KNET_DATAFD_MAX) {
					log_debug(knet_h, KNET_SUB_TX, "No available channels");
					continue; /* channel not found */
				}
			}
			if (pthread_mutex_lock(&knet_h->tx_mutex) != 0) {
				log_debug(knet_h, KNET_SUB_TX, "Unable to get mutex lock");
				continue;
			}
			_handle_send_to_links(knet_h, &msg, events[i].data.fd, channel, type);
			pthread_mutex_unlock(&knet_h->tx_mutex);
		}

		pthread_rwlock_unlock(&knet_h->global_rwlock);
	}

	set_thread_status(knet_h, KNET_THREAD_TX, KNET_THREAD_STOPPED);

	return NULL;
}
diff --git a/libknet/transports.c b/libknet/transports.c
index 67ebc6ef..b685212f 100644
--- a/libknet/transports.c
+++ b/libknet/transports.c
@@ -1,268 +1,272 @@
/*
* Copyright (C) 2017-2018 Red Hat, Inc. All rights reserved.
*
* Author: Fabio M. Di Nitto <fabbione@kronosnet.org>
*
* This software licensed under GPL-2.0+, LGPL-2.0+
*/
#include "config.h"
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <pthread.h>
#include <sys/types.h>
#include <sys/socket.h>
#include "libknet.h"
#include "compat.h"
#include "host.h"
#include "link.h"
#include "logging.h"
#include "common.h"
#include "transports.h"
#include "transport_loopback.h"
#include "transport_udp.h"
#include "transport_sctp.h"
#include "threads_common.h"
/*
 * empty_module supplies the remaining initializer fields of a transport
 * entry that has no implementation: two zeros followed by eight NULL
 * callbacks. The macro text also carries the closing brace and comma of
 * the entry, so it must be the last thing written in that entry.
 */
#define empty_module 0, 0, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL },
/*
 * Table of known transports. The wrappers in this file index it
 * directly with the transport id. The entry with a NULL name ends the
 * table for the loops that walk it by name. SCTP is filled in only when
 * HAVE_NETINET_SCTP_H is defined, otherwise it is an empty_module entry.
 */
static knet_transport_ops_t transport_modules_cmd[KNET_MAX_TRANSPORTS] = {
{ "LOOPBACK", KNET_TRANSPORT_LOOPBACK, 1, KNET_PMTUD_LOOPBACK_OVERHEAD, loopback_transport_init, loopback_transport_free, loopback_transport_link_set_config, loopback_transport_link_clear_config, loopback_transport_link_dyn_connect, loopback_transport_rx_sock_error, loopback_transport_tx_sock_error, loopback_transport_rx_is_data },
{ "UDP", KNET_TRANSPORT_UDP, 1, KNET_PMTUD_UDP_OVERHEAD, udp_transport_init, udp_transport_free, udp_transport_link_set_config, udp_transport_link_clear_config, udp_transport_link_dyn_connect, udp_transport_rx_sock_error, udp_transport_tx_sock_error, udp_transport_rx_is_data },
{ "SCTP", KNET_TRANSPORT_SCTP,
#ifdef HAVE_NETINET_SCTP_H
1, KNET_PMTUD_SCTP_OVERHEAD, sctp_transport_init, sctp_transport_free, sctp_transport_link_set_config, sctp_transport_link_clear_config, sctp_transport_link_dyn_connect, sctp_transport_rx_sock_error, sctp_transport_tx_sock_error, sctp_transport_rx_is_data },
#else
empty_module
#endif
{ NULL, KNET_MAX_TRANSPORTS, empty_module
};
/*
* transport wrappers
*/
/*
 * Initialize every built-in transport, in table order.
 *
 * Stops at the first transport whose transport_init() fails: the failure
 * is logged and -1 is returned with errno set to the value seen right
 * after the failing call. Returns 0 with errno cleared when every
 * built-in transport was initialized.
 */
int start_all_transports(knet_handle_t knet_h)
{
	int idx;
	int savederrno;

	for (idx = 0; transport_modules_cmd[idx].transport_name != NULL; idx++) {
		if (!transport_modules_cmd[idx].built_in) {
			continue;
		}
		if (transport_modules_cmd[idx].transport_init(knet_h) < 0) {
			/* keep the init error: logging may overwrite errno */
			savederrno = errno;
			log_err(knet_h, KNET_SUB_HANDLE,
				"Failed to allocate transport handle for %s: %s",
				transport_modules_cmd[idx].transport_name,
				strerror(savederrno));
			errno = savederrno;
			return -1;
		}
	}

	errno = 0;
	return 0;
}
/*
 * Call transport_free() on every built-in transport, in table order.
 */
void stop_all_transports(knet_handle_t knet_h)
{
	int idx;

	for (idx = 0; transport_modules_cmd[idx].transport_name != NULL; idx++) {
		if (transport_modules_cmd[idx].built_in) {
			transport_modules_cmd[idx].transport_free(knet_h);
		}
	}
}
/*
 * Bind a link to a transport and let the transport configure it.
 *
 * Returns -1 with errno set to EINVAL when the transport is not built
 * in. Otherwise the link is marked as not connected, its transport type
 * and protocol overhead are recorded, and the result of the transport's
 * own transport_link_set_config() callback is returned.
 *
 * transport is used directly as an index into transport_modules_cmd.
 */
int transport_link_set_config(knet_handle_t knet_h, struct knet_link *kn_link, uint8_t transport)
{
	knet_transport_ops_t *ops = &transport_modules_cmd[transport];

	if (!ops->built_in) {
		errno = EINVAL;
		return -1;
	}

	kn_link->transport_connected = 0;
	kn_link->transport_type = transport;
	kn_link->proto_overhead = ops->transport_mtu_overhead;

	return ops->transport_link_set_config(knet_h, kn_link);
}
/*
 * Forward to the transport_link_clear_config() callback of the transport
 * recorded in kn_link->transport_type and return its result.
 */
int transport_link_clear_config(knet_handle_t knet_h, struct knet_link *kn_link)
{
	knet_transport_ops_t *ops = &transport_modules_cmd[kn_link->transport_type];

	return ops->transport_link_clear_config(knet_h, kn_link);
}
/*
 * Forward to the transport_link_dyn_connect() callback of the transport
 * recorded in kn_link->transport_type and return its result.
 */
int transport_link_dyn_connect(knet_handle_t knet_h, int sockfd, struct knet_link *kn_link)
{
	knet_transport_ops_t *ops = &transport_modules_cmd[kn_link->transport_type];

	return ops->transport_link_dyn_connect(knet_h, sockfd, kn_link);
}
/*
 * Forward a receive result (recv_err) and its errno (recv_errno) to the
 * transport_rx_sock_error() callback of the given transport and return
 * its result.
 */
int transport_rx_sock_error(knet_handle_t knet_h, uint8_t transport, int sockfd, int recv_err, int recv_errno)
{
	knet_transport_ops_t *ops = &transport_modules_cmd[transport];

	return ops->transport_rx_sock_error(knet_h, sockfd, recv_err, recv_errno);
}
/*
 * Forward a send result (recv_err) and its errno (recv_errno) to the
 * transport_tx_sock_error() callback of the given transport and return
 * its result.
 */
int transport_tx_sock_error(knet_handle_t knet_h, uint8_t transport, int sockfd, int recv_err, int recv_errno)
{
	knet_transport_ops_t *ops = &transport_modules_cmd[transport];

	return ops->transport_tx_sock_error(knet_h, sockfd, recv_err, recv_errno);
}
/*
 * Forward a received message to the transport_rx_is_data() callback of
 * the given transport and return its result.
 */
int transport_rx_is_data(knet_handle_t knet_h, uint8_t transport, int sockfd, struct knet_mmsghdr *msg)
{
	knet_transport_ops_t *ops = &transport_modules_cmd[transport];

	return ops->transport_rx_is_data(knet_h, sockfd, msg);
}
/*
* public api
*/
/*
 * List the built-in transports.
 *
 * transport_list may be NULL, in which case only the number of built-in
 * transports is stored in *transport_list_entries. When it is not NULL,
 * name and id of every built-in transport are written to it in table
 * order; the function does not receive the capacity of the array, so
 * the caller must size it.
 *
 * Returns -1 with errno set to EINVAL when transport_list_entries is
 * NULL, otherwise 0 with errno cleared.
 */
int knet_get_transport_list(struct knet_transport_info *transport_list,
size_t *transport_list_entries)
{
int err = 0;
int idx = 0;
int outidx = 0;
if (!transport_list_entries) {
errno = EINVAL;
return -1;
}
while (transport_modules_cmd[idx].transport_name != NULL) {
if (transport_modules_cmd[idx].built_in) {
if (transport_list) {
transport_list[outidx].name = transport_modules_cmd[idx].transport_name;
transport_list[outidx].id = transport_modules_cmd[idx].transport_id;
}
/* counted even when there is no output array */
outidx++;
}
idx++;
}
*transport_list_entries = outidx;
+ if (!err)
+ errno = 0;
return err;
}
/*
 * Return the name of a built-in transport given its id.
 *
 * Returns NULL with errno set to EINVAL when transport equals
 * KNET_MAX_TRANSPORTS, or to ENOENT when the table entry has no name or
 * is not built in. On success the name is returned and errno is cleared.
 */
const char *knet_get_transport_name_by_id(uint8_t transport)
{
int savederrno = 0;
const char *name = NULL;
if (transport == KNET_MAX_TRANSPORTS) {
errno = EINVAL;
return name;
}
if ((transport_modules_cmd[transport].transport_name) &&
(transport_modules_cmd[transport].built_in)) {
name = transport_modules_cmd[transport].transport_name;
} else {
savederrno = ENOENT;
}
- errno = savederrno;
+ errno = name ? 0 : savederrno;
return name;
}
/*
 * Return the id of a built-in transport given its name.
 *
 * The comparison is case-sensitive (strcmp) and only built-in transports
 * are considered. Returns KNET_MAX_TRANSPORTS with errno set to EINVAL
 * when name is NULL or no built-in transport matches. On success the id
 * is returned and errno is cleared.
 */
uint8_t knet_get_transport_id_by_name(const char *name)
{
int savederrno = 0;
uint8_t err = KNET_MAX_TRANSPORTS;
int i, found;
if (!name) {
errno = EINVAL;
return err;
}
i = 0;
found = 0;
while (transport_modules_cmd[i].transport_name != NULL) {
if (transport_modules_cmd[i].built_in) {
if (!strcmp(transport_modules_cmd[i].transport_name, name)) {
err = transport_modules_cmd[i].transport_id;
found = 1;
break;
}
}
i++;
}
if (!found) {
savederrno = EINVAL;
}
- errno = savederrno;
+ errno = err == KNET_MAX_TRANSPORTS ? savederrno : 0;
return err;
}
/*
 * Set the transport reconnect interval, in milliseconds, on a handle.
 *
 * Values below 1000 or above 60000 are accepted but trigger a warning.
 * Returns 0 with errno cleared on success. Returns -1 with errno set to
 * EINVAL when knet_h is NULL or msecs is 0, or to the error reported by
 * get_global_wrlock() when the lock cannot be taken.
 */
int knet_handle_set_transport_reconnect_interval(knet_handle_t knet_h, uint32_t msecs)
{
int savederrno = 0;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (!msecs) {
errno = EINVAL;
return -1;
}
if (msecs < 1000) {
log_warn(knet_h, KNET_SUB_HANDLE, "reconnect internval below 1 sec (%u msecs) might be too aggressive", msecs);
}
if (msecs > 60000) {
log_warn(knet_h, KNET_SUB_HANDLE, "reconnect internval above 1 minute (%u msecs) could cause long delays in network convergiance", msecs);
}
savederrno = get_global_wrlock(knet_h);
if (savederrno) {
/* NOTE(review): a write lock is requested above but the message says "read lock" */
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
knet_h->reconnect_int = msecs;
pthread_rwlock_unlock(&knet_h->global_rwlock);
+ errno = 0;
return 0;
}
/*
 * Read the transport reconnect interval, in milliseconds, from a handle.
 *
 * Returns 0 with errno cleared and *msecs filled in on success. Returns
 * -1 with errno set to EINVAL when knet_h or msecs is NULL, or to the
 * pthread_rwlock_rdlock() error when the lock cannot be taken.
 */
int knet_handle_get_transport_reconnect_interval(knet_handle_t knet_h, uint32_t *msecs)
{
int savederrno = 0;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (!msecs) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
*msecs = knet_h->reconnect_int;
pthread_rwlock_unlock(&knet_h->global_rwlock);
+ errno = 0;
return 0;
}

File Metadata

Mime Type
text/x-diff
Expires
Wed, Feb 26, 2:40 PM (8 h, 36 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1465447
Default Alt Text
(139 KB)

Event Timeline