diff --git a/libknet/Makefile.am b/libknet/Makefile.am index 3127642a..dd16fd40 100644 --- a/libknet/Makefile.am +++ b/libknet/Makefile.am @@ -1,166 +1,167 @@ # # Copyright (C) 2010-2020 Red Hat, Inc. All rights reserved. # # Authors: Fabio M. Di Nitto # Federico Simoncelli # # This software licensed under GPL-2.0+ # MAINTAINERCLEANFILES = Makefile.in include $(top_srcdir)/build-aux/check.mk SYMFILE = libknet_exported_syms EXTRA_DIST = $(SYMFILE) SUBDIRS = . tests # https://www.gnu.org/software/libtool/manual/html_node/Updating-version-info.html libversion = 5:0:4 # override global LIBS that pulls in lots of craft we don't need here LIBS = sources = \ common.c \ compat.c \ compress.c \ crypto.c \ handle.c \ + handle_api.c \ host.c \ links.c \ links_acl.c \ links_acl_ip.c \ links_acl_loopback.c \ logging.c \ netutils.c \ onwire.c \ threads_common.c \ threads_dsthandler.c \ threads_heartbeat.c \ threads_pmtud.c \ threads_rx.c \ threads_tx.c \ transports.c \ transport_common.c \ transport_loopback.c \ transport_udp.c \ transport_sctp.c include_HEADERS = libknet.h pkgconfigdir = $(libdir)/pkgconfig pkgconfig_DATA = libknet.pc noinst_HEADERS = \ common.h \ compat.h \ compress.h \ compress_model.h \ crypto.h \ crypto_model.h \ host.h \ internals.h \ links.h \ links_acl.h \ links_acl_ip.h \ links_acl_loopback.h \ logging.h \ netutils.h \ onwire.h \ threads_common.h \ threads_dsthandler.h \ threads_heartbeat.h \ threads_pmtud.h \ threads_rx.h \ threads_tx.h \ transports.h \ transport_common.h \ transport_loopback.h \ transport_udp.h \ transport_sctp.h lib_LTLIBRARIES = libknet.la libknet_la_SOURCES = $(sources) AM_CFLAGS += $(libqb_CFLAGS) libknet_la_CFLAGS = $(AM_CFLAGS) $(PTHREAD_CFLAGS) EXTRA_libknet_la_DEPENDENCIES = $(SYMFILE) libknet_la_LDFLAGS = $(AM_LDFLAGS) \ -Wl,--version-script=$(srcdir)/$(SYMFILE) \ -Wl,-rpath=$(pkglibdir) \ -version-info $(libversion) libknet_la_LIBADD = $(PTHREAD_LIBS) $(dl_LIBS) $(rt_LIBS) $(m_LIBS) # Prepare empty value for appending pkglib_LTLIBRARIES = # MODULE_LDFLAGS would mean a target-specific variable for Automake MODULELDFLAGS = $(AM_LDFLAGS) -module -avoid-version -export-dynamic if BUILD_COMPRESS_ZSTD pkglib_LTLIBRARIES += compress_zstd.la compress_zstd_la_LDFLAGS = $(MODULELDFLAGS) compress_zstd_la_CFLAGS = $(AM_CFLAGS) $(libzstd_CFLAGS) compress_zstd_la_LIBADD = $(libzstd_LIBS) endif if BUILD_COMPRESS_ZLIB pkglib_LTLIBRARIES += compress_zlib.la compress_zlib_la_LDFLAGS = $(MODULELDFLAGS) compress_zlib_la_CFLAGS = $(AM_CFLAGS) $(zlib_CFLAGS) compress_zlib_la_LIBADD = $(zlib_LIBS) endif if BUILD_COMPRESS_LZ4 pkglib_LTLIBRARIES += compress_lz4.la compress_lz4hc.la compress_lz4_la_LDFLAGS = $(MODULELDFLAGS) compress_lz4_la_CFLAGS = $(AM_CFLAGS) $(liblz4_CFLAGS) compress_lz4_la_LIBADD = $(liblz4_LIBS) compress_lz4hc_la_LDFLAGS = $(MODULELDFLAGS) compress_lz4hc_la_CFLAGS = $(AM_CFLAGS) $(liblz4_CFLAGS) compress_lz4hc_la_LIBADD = $(liblz4_LIBS) endif if BUILD_COMPRESS_LZO2 pkglib_LTLIBRARIES += compress_lzo2.la compress_lzo2_la_LDFLAGS = $(MODULELDFLAGS) compress_lzo2_la_CFLAGS = $(AM_CFLAGS) $(lzo2_CFLAGS) compress_lzo2_la_LIBADD = $(lzo2_LIBS) endif if BUILD_COMPRESS_LZMA pkglib_LTLIBRARIES += compress_lzma.la compress_lzma_la_LDFLAGS = $(MODULELDFLAGS) compress_lzma_la_CFLAGS = $(AM_CFLAGS) $(liblzma_CFLAGS) compress_lzma_la_LIBADD = $(liblzma_LIBS) endif if BUILD_COMPRESS_BZIP2 pkglib_LTLIBRARIES += compress_bzip2.la compress_bzip2_la_LDFLAGS = $(MODULELDFLAGS) compress_bzip2_la_CFLAGS = $(AM_CFLAGS) $(bzip2_CFLAGS) compress_bzip2_la_LIBADD = $(bzip2_LIBS) endif if BUILD_CRYPTO_NSS pkglib_LTLIBRARIES += crypto_nss.la crypto_nss_la_LDFLAGS = $(MODULELDFLAGS) crypto_nss_la_CFLAGS = $(AM_CFLAGS) $(nss_CFLAGS) crypto_nss_la_LIBADD = $(nss_LIBS) endif if BUILD_CRYPTO_OPENSSL pkglib_LTLIBRARIES += crypto_openssl.la crypto_openssl_la_LDFLAGS = $(MODULELDFLAGS) crypto_openssl_la_CFLAGS = $(AM_CFLAGS) $(openssl_CFLAGS) crypto_openssl_la_LIBADD = $(openssl_LIBS) endif diff --git a/libknet/compress.c b/libknet/compress.c index f62861d6..ec6f16d8 100644 --- a/libknet/compress.c +++ b/libknet/compress.c @@ -1,513 +1,545 @@ /* * Copyright (C) 2017-2020 Red Hat, Inc. All rights reserved. * * Author: Fabio M. Di Nitto * * This software licensed under LGPL-2.0+ */ #include "config.h" #include #include #include #include #include #include "internals.h" #include "compress.h" #include "compress_model.h" #include "logging.h" #include "threads_common.h" #include "common.h" /* * internal module switch data */ /* * DO NOT CHANGE MODEL_ID HERE OR ONWIRE COMPATIBILITY * WILL BREAK! * * Always add new items before the last NULL. */ static compress_model_t compress_modules_cmds[KNET_MAX_COMPRESS_METHODS + 1] = { { "none" , 0, 0, 0, NULL }, { "zlib" , 1, WITH_COMPRESS_ZLIB , 0, NULL }, { "lz4" , 2, WITH_COMPRESS_LZ4 , 0, NULL }, { "lz4hc", 3, WITH_COMPRESS_LZ4 , 0, NULL }, { "lzo2" , 4, WITH_COMPRESS_LZO2 , 0, NULL }, { "lzma" , 5, WITH_COMPRESS_LZMA , 0, NULL }, { "bzip2", 6, WITH_COMPRESS_BZIP2, 0, NULL }, { "zstd" , 7, WITH_COMPRESS_ZSTD, 0, NULL }, { NULL, KNET_MAX_COMPRESS_METHODS, 0, 0, NULL } }; static int max_model = 0; static struct timespec last_load_failure; static int compress_get_model(const char *model) { int idx = 0; while (compress_modules_cmds[idx].model_name != NULL) { if (!strcmp(compress_modules_cmds[idx].model_name, model)) { return compress_modules_cmds[idx].model_id; } idx++; } return -1; } static int compress_get_max_model(void) { int idx = 0; while (compress_modules_cmds[idx].model_name != NULL) { idx++; } return idx - 1; } static int compress_is_valid_model(int compress_model) { int idx = 0; while (compress_modules_cmds[idx].model_name != NULL) { if ((compress_model == compress_modules_cmds[idx].model_id) && (compress_modules_cmds[idx].built_in == 1)) { return 0; } idx++; } return -1; } static int val_level( knet_handle_t knet_h, int compress_model, int compress_level) { if (compress_modules_cmds[compress_model].ops->val_level != NULL) { return compress_modules_cmds[compress_model].ops->val_level(knet_h, compress_level); } return 0; } /* * compress_check_lib_is_init needs to be invoked in a locked context! */ static int compress_check_lib_is_init(knet_handle_t knet_h, int cmp_model) { /* * lack of a .is_init function means that the module does not require * init per handle so we use a fake reference in the compress_int_data * to identify that we already increased the libref for this handle */ if (compress_modules_cmds[cmp_model].loaded == 1) { if (compress_modules_cmds[cmp_model].ops->is_init == NULL) { if (knet_h->compress_int_data[cmp_model] != NULL) { return 1; } } else { if (compress_modules_cmds[cmp_model].ops->is_init(knet_h, cmp_model) == 1) { return 1; } } } return 0; } /* * compress_load_lib should _always_ be invoked in write lock context */ static int compress_load_lib(knet_handle_t knet_h, int cmp_model, int rate_limit) { struct timespec clock_now; unsigned long long timediff; /* * checking again for paranoia and because * compress_check_lib_is_init is usually invoked in read context * and we need to switch from read to write locking in between. * another thread might have init the library in the meantime */ if (compress_check_lib_is_init(knet_h, cmp_model)) { return 0; } /* * due to the fact that decompress can load libraries * on demand, depending on the compress model selected * on other nodes, it is possible for an attacker * to send crafted packets to attempt to load libraries * at random in a DoS fashion. * If there is an error loading a library, then we want * to rate_limit a retry to reload the library every X * seconds to avoid a lock DoS that could greatly slow * down libknet. */ if (rate_limit) { if ((last_load_failure.tv_sec != 0) || (last_load_failure.tv_nsec != 0)) { clock_gettime(CLOCK_MONOTONIC, &clock_now); timespec_diff(last_load_failure, clock_now, &timediff); if (timediff < 10000000000) { errno = EAGAIN; return -1; } } } if (compress_modules_cmds[cmp_model].loaded == 0) { compress_modules_cmds[cmp_model].ops = load_module (knet_h, "compress", compress_modules_cmds[cmp_model].model_name); if (!compress_modules_cmds[cmp_model].ops) { clock_gettime(CLOCK_MONOTONIC, &last_load_failure); return -1; } if (compress_modules_cmds[cmp_model].ops->abi_ver != KNET_COMPRESS_MODEL_ABI) { log_err(knet_h, KNET_SUB_COMPRESS, "ABI mismatch loading module %s. knet ver: %d, module ver: %d", compress_modules_cmds[cmp_model].model_name, KNET_COMPRESS_MODEL_ABI, compress_modules_cmds[cmp_model].ops->abi_ver); errno = EINVAL; return -1; } compress_modules_cmds[cmp_model].loaded = 1; } if (compress_modules_cmds[cmp_model].ops->init != NULL) { if (compress_modules_cmds[cmp_model].ops->init(knet_h, cmp_model) < 0) { return -1; } } else { knet_h->compress_int_data[cmp_model] = (void *)&"1"; } return 0; } static int compress_lib_test(knet_handle_t knet_h) { int savederrno = 0; unsigned char src[KNET_DATABUFSIZE]; unsigned char dst[KNET_DATABUFSIZE_COMPRESS]; ssize_t dst_comp_len = KNET_DATABUFSIZE_COMPRESS, dst_decomp_len = KNET_DATABUFSIZE; unsigned int i; int request_level; memset(src, 0, KNET_DATABUFSIZE); memset(dst, 0, KNET_DATABUFSIZE_COMPRESS); /* * NOTE: we cannot use compress and decompress API calls due to locking * so we need to call directly into the modules */ if (compress_modules_cmds[knet_h->compress_model].ops->compress(knet_h, src, KNET_DATABUFSIZE, dst, &dst_comp_len) < 0) { savederrno = errno; log_err(knet_h, KNET_SUB_COMPRESS, "Unable to compress test buffer. Please check your compression settings: %s", strerror(savederrno)); errno = savederrno; return -1; } else if ((long unsigned int)dst_comp_len >= KNET_DATABUFSIZE) { /* * compress not effective, try again using default compression level when available */ request_level = knet_h->compress_level; log_warn(knet_h, KNET_SUB_COMPRESS, "Requested compression level (%d) did not generate any compressed data (source: %zu destination: %zu)", request_level, sizeof(src), dst_comp_len); if ((!compress_modules_cmds[knet_h->compress_model].ops->get_default_level()) || ((knet_h->compress_level = compress_modules_cmds[knet_h->compress_model].ops->get_default_level()) == KNET_COMPRESS_UNKNOWN_DEFAULT)) { log_err(knet_h, KNET_SUB_COMPRESS, "compression %s does not provide a default value", compress_modules_cmds[knet_h->compress_model].model_name); errno = EINVAL; return -1; } else { memset(src, 0, KNET_DATABUFSIZE); memset(dst, 0, KNET_DATABUFSIZE_COMPRESS); dst_comp_len = KNET_DATABUFSIZE_COMPRESS; if (compress_modules_cmds[knet_h->compress_model].ops->compress(knet_h, src, KNET_DATABUFSIZE, dst, &dst_comp_len) < 0) { savederrno = errno; log_err(knet_h, KNET_SUB_COMPRESS, "Unable to compress with default compression level: %s", strerror(savederrno)); errno = savederrno; return -1; } log_warn(knet_h, KNET_SUB_COMPRESS, "Requested compression level (%d) did not work, switching to default (%d)", request_level, knet_h->compress_level); } } if (compress_modules_cmds[knet_h->compress_model].ops->decompress(knet_h, dst, dst_comp_len, src, &dst_decomp_len) < 0) { savederrno = errno; log_err(knet_h, KNET_SUB_COMPRESS, "Unable to decompress test buffer. Please check your compression settings: %s", strerror(savederrno)); errno = savederrno; return -1; } for (i = 0; i < KNET_DATABUFSIZE; i++) { if (src[i] != 0) { log_err(knet_h, KNET_SUB_COMPRESS, "Decompressed buffer contains incorrect data"); errno = EINVAL; return -1; } } return 0; } int compress_init( knet_handle_t knet_h) { max_model = compress_get_max_model(); if (max_model > KNET_MAX_COMPRESS_METHODS) { log_err(knet_h, KNET_SUB_COMPRESS, "Too many compress methods defined in compress.c."); errno = EINVAL; return -1; } memset(&last_load_failure, 0, sizeof(struct timespec)); return 0; } -int compress_cfg( +static int compress_cfg( knet_handle_t knet_h, struct knet_handle_compress_cfg *knet_handle_compress_cfg) { int savederrno = 0, err = 0; int cmp_model; cmp_model = compress_get_model(knet_handle_compress_cfg->compress_model); if (cmp_model < 0) { log_err(knet_h, KNET_SUB_COMPRESS, "compress model %s not supported", knet_handle_compress_cfg->compress_model); errno = EINVAL; return -1; } log_debug(knet_h, KNET_SUB_COMPRESS, "Initizializing compress module [%s/%d/%u]", knet_handle_compress_cfg->compress_model, knet_handle_compress_cfg->compress_level, knet_handle_compress_cfg->compress_threshold); if (cmp_model > 0) { if (compress_modules_cmds[cmp_model].built_in == 0) { log_err(knet_h, KNET_SUB_COMPRESS, "compress model %s support has not been built in. Please contact your vendor or fix the build", knet_handle_compress_cfg->compress_model); errno = EINVAL; return -1; } if (knet_handle_compress_cfg->compress_threshold > KNET_MAX_PACKET_SIZE) { log_err(knet_h, KNET_SUB_COMPRESS, "compress threshold cannot be higher than KNET_MAX_PACKET_SIZE (%d).", KNET_MAX_PACKET_SIZE); errno = EINVAL; return -1; } if (knet_handle_compress_cfg->compress_threshold == 0) { knet_h->compress_threshold = KNET_COMPRESS_THRESHOLD; log_debug(knet_h, KNET_SUB_COMPRESS, "resetting compression threshold to default (%d)", KNET_COMPRESS_THRESHOLD); } else { knet_h->compress_threshold = knet_handle_compress_cfg->compress_threshold; } savederrno = pthread_rwlock_rdlock(&shlib_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_COMPRESS, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } if (!compress_check_lib_is_init(knet_h, cmp_model)) { /* * need to switch to write lock, load the lib, and return with a write lock * this is not racy because compress_load_lib is written idempotent. */ pthread_rwlock_unlock(&shlib_rwlock); savederrno = pthread_rwlock_wrlock(&shlib_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_COMPRESS, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } if (compress_load_lib(knet_h, cmp_model, 0) < 0) { savederrno = errno; log_err(knet_h, KNET_SUB_COMPRESS, "Unable to load library: %s", strerror(savederrno)); err = -1; goto out_unlock; } } if (val_level(knet_h, cmp_model, knet_handle_compress_cfg->compress_level) < 0) { log_err(knet_h, KNET_SUB_COMPRESS, "compress level %d not supported for model %s", knet_handle_compress_cfg->compress_level, knet_handle_compress_cfg->compress_model); savederrno = EINVAL; err = -1; goto out_unlock; } knet_h->compress_model = cmp_model; knet_h->compress_level = knet_handle_compress_cfg->compress_level; if (compress_lib_test(knet_h) < 0) { savederrno = errno; err = -1; goto out_unlock; } out_unlock: pthread_rwlock_unlock(&shlib_rwlock); } if (err) { knet_h->compress_model = 0; knet_h->compress_level = 0; } errno = savederrno; return err; } void compress_fini( knet_handle_t knet_h, int all) { int savederrno = 0; int idx = 0; savederrno = pthread_rwlock_wrlock(&shlib_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_COMPRESS, "Unable to get write lock: %s", strerror(savederrno)); return; } while (idx < KNET_MAX_COMPRESS_METHODS) { if ((compress_modules_cmds[idx].model_name != NULL) && (compress_modules_cmds[idx].built_in == 1) && (compress_modules_cmds[idx].loaded == 1) && (compress_modules_cmds[idx].model_id > 0) && (knet_h->compress_int_data[idx] != NULL)) { if ((all) || (compress_modules_cmds[idx].model_id == knet_h->compress_model)) { if (compress_modules_cmds[idx].ops->fini != NULL) { compress_modules_cmds[idx].ops->fini(knet_h, idx); } else { knet_h->compress_int_data[idx] = NULL; } } } idx++; } pthread_rwlock_unlock(&shlib_rwlock); return; } /* * compress does not require compress_check_lib_is_init * because it's protected by compress_cfg */ int compress( knet_handle_t knet_h, const unsigned char *buf_in, const ssize_t buf_in_len, unsigned char *buf_out, ssize_t *buf_out_len) { return compress_modules_cmds[knet_h->compress_model].ops->compress(knet_h, buf_in, buf_in_len, buf_out, buf_out_len); } int decompress( knet_handle_t knet_h, int compress_model, const unsigned char *buf_in, const ssize_t buf_in_len, unsigned char *buf_out, ssize_t *buf_out_len) { int savederrno = 0, err = 0; if (compress_model > max_model) { log_err(knet_h, KNET_SUB_COMPRESS, "Received packet with unknown compress model %d", compress_model); errno = EINVAL; return -1; } if (compress_is_valid_model(compress_model) < 0) { log_err(knet_h, KNET_SUB_COMPRESS, "Received packet compressed with %s but support is not built in this version of libknet. Please contact your distribution vendor or fix the build.", compress_modules_cmds[compress_model].model_name); errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&shlib_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_COMPRESS, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } if (!compress_check_lib_is_init(knet_h, compress_model)) { /* * need to switch to write lock, load the lib, and return with a write lock * this is not racy because compress_load_lib is written idempotent. */ pthread_rwlock_unlock(&shlib_rwlock); savederrno = pthread_rwlock_wrlock(&shlib_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_COMPRESS, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } if (compress_load_lib(knet_h, compress_model, 1) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_COMPRESS, "Unable to load library: %s", strerror(savederrno)); goto out_unlock; } } err = compress_modules_cmds[compress_model].ops->decompress(knet_h, buf_in, buf_in_len, buf_out, buf_out_len); savederrno = errno; out_unlock: pthread_rwlock_unlock(&shlib_rwlock); errno = savederrno; return err; } +int knet_handle_compress(knet_handle_t knet_h, struct knet_handle_compress_cfg *knet_handle_compress_cfg) +{ + int savederrno = 0; + int err = 0; + + if (!knet_h) { + errno = EINVAL; + return -1; + } + + if (!knet_handle_compress_cfg) { + errno = EINVAL; + return -1; + } + + savederrno = get_global_wrlock(knet_h); + if (savederrno) { + log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s", + strerror(savederrno)); + errno = savederrno; + return -1; + } + + compress_fini(knet_h, 0); + err = compress_cfg(knet_h, knet_handle_compress_cfg); + savederrno = errno; + + pthread_rwlock_unlock(&knet_h->global_rwlock); + errno = err ? savederrno : 0; + return err; +} + int knet_get_compress_list(struct knet_compress_info *compress_list, size_t *compress_list_entries) { int err = 0; int idx = 0; int outidx = 0; if (!compress_list_entries) { errno = EINVAL; return -1; } while (compress_modules_cmds[idx].model_name != NULL) { if (compress_modules_cmds[idx].built_in) { if (compress_list) { compress_list[outidx].name = compress_modules_cmds[idx].model_name; } outidx++; } idx++; } *compress_list_entries = outidx; if (!err) errno = 0; return err; } diff --git a/libknet/compress.h b/libknet/compress.h index 95464e82..c0f28f19 100644 --- a/libknet/compress.h +++ b/libknet/compress.h @@ -1,40 +1,36 @@ /* * Copyright (C) 2017-2020 Red Hat, Inc. All rights reserved. * * Author: Fabio M. Di Nitto * * This software licensed under LGPL-2.0+ */ #ifndef __KNET_COMPRESS_H__ #define __KNET_COMPRESS_H__ #include "internals.h" -int compress_cfg( - knet_handle_t knet_h, - struct knet_handle_compress_cfg *knet_handle_compress_cfg); - int compress_init( knet_handle_t knet_h); void compress_fini( knet_handle_t knet_h, int all); int compress( knet_handle_t knet_h, const unsigned char *buf_in, const ssize_t buf_in_len, unsigned char *buf_out, ssize_t *buf_out_len); int decompress( knet_handle_t knet_h, int compress_model, const unsigned char *buf_in, const ssize_t buf_in_len, unsigned char *buf_out, ssize_t *buf_out_len); #endif diff --git a/libknet/crypto.c b/libknet/crypto.c index 0c475d0f..28d29ef3 100644 --- a/libknet/crypto.c +++ b/libknet/crypto.c @@ -1,319 +1,534 @@ /* * Copyright (C) 2012-2020 Red Hat, Inc. All rights reserved. * * Author: Fabio M. Di Nitto * * This software licensed under LGPL-2.0+ */ #include "config.h" #include #include #include #include #include #include "crypto.h" #include "crypto_model.h" #include "internals.h" #include "logging.h" #include "common.h" /* * internal module switch data */ static crypto_model_t crypto_modules_cmds[] = { { "nss", WITH_CRYPTO_NSS, 0, NULL }, { "openssl", WITH_CRYPTO_OPENSSL, 0, NULL }, { NULL, 0, 0, NULL } }; static int crypto_get_model(const char *model) { int idx = 0; while (crypto_modules_cmds[idx].model_name != NULL) { if (!strcmp(crypto_modules_cmds[idx].model_name, model)) return idx; idx++; } return -1; } /* * exported API */ int crypto_encrypt_and_sign ( knet_handle_t knet_h, const unsigned char *buf_in, const ssize_t buf_in_len, unsigned char *buf_out, ssize_t *buf_out_len) { return crypto_modules_cmds[knet_h->crypto_instance[knet_h->crypto_in_use_config]->model].ops->crypt(knet_h, knet_h->crypto_instance[knet_h->crypto_in_use_config], buf_in, buf_in_len, buf_out, buf_out_len); } int crypto_encrypt_and_signv ( knet_handle_t knet_h, const struct iovec *iov_in, int iovcnt_in, unsigned char *buf_out, ssize_t *buf_out_len) { return crypto_modules_cmds[knet_h->crypto_instance[knet_h->crypto_in_use_config]->model].ops->cryptv(knet_h, knet_h->crypto_instance[knet_h->crypto_in_use_config], iov_in, iovcnt_in, buf_out, buf_out_len); } int crypto_authenticate_and_decrypt ( knet_handle_t knet_h, const unsigned char *buf_in, const ssize_t buf_in_len, unsigned char *buf_out, ssize_t *buf_out_len) { int i, err = 0; int multiple_configs = 0; uint8_t log_level = KNET_LOG_ERR; for (i = 1; i <= KNET_MAX_CRYPTO_INSTANCES; i++) { if (knet_h->crypto_instance[i]) { multiple_configs++; } } /* * attempt to decrypt first with the in-use config * to avoid excessive performance hit. */ if (multiple_configs > 1) { log_level = KNET_LOG_DEBUG; } if (knet_h->crypto_in_use_config) { err = crypto_modules_cmds[knet_h->crypto_instance[knet_h->crypto_in_use_config]->model].ops->decrypt(knet_h, knet_h->crypto_instance[knet_h->crypto_in_use_config], buf_in, buf_in_len, buf_out, buf_out_len, log_level); } else { err = -1; } /* * if we fail, try to use the other configurations */ if (err) { for (i = 1; i <= KNET_MAX_CRYPTO_INSTANCES; i++) { /* * in-use config was already attempted */ if (i == knet_h->crypto_in_use_config) { continue; } if (knet_h->crypto_instance[i]) { log_debug(knet_h, KNET_SUB_CRYPTO, "Alternative crypto configuration found, attempting to decrypt with config %u", i); err = crypto_modules_cmds[knet_h->crypto_instance[i]->model].ops->decrypt(knet_h, knet_h->crypto_instance[i], buf_in, buf_in_len, buf_out, buf_out_len, KNET_LOG_ERR); if (!err) { errno = 0; /* clear errno from previous failures */ return err; } log_debug(knet_h, KNET_SUB_CRYPTO, "Packet failed to decrypt with crypto config %u", i); } } } return err; } -int crypto_use_config( +static int crypto_use_config( knet_handle_t knet_h, uint8_t config_num) { if ((config_num) && (!knet_h->crypto_instance[config_num])) { errno = EINVAL; return -1; } knet_h->crypto_in_use_config = config_num; if (config_num) { knet_h->sec_block_size = knet_h->crypto_instance[config_num]->sec_block_size; knet_h->sec_hash_size = knet_h->crypto_instance[config_num]->sec_hash_size; knet_h->sec_salt_size = knet_h->crypto_instance[config_num]->sec_salt_size; } else { knet_h->sec_block_size = 0; knet_h->sec_hash_size = 0; knet_h->sec_salt_size = 0; } force_pmtud_run(knet_h, KNET_SUB_CRYPTO, 1); return 0; } -int crypto_init( +static int crypto_init( knet_handle_t knet_h, struct knet_handle_crypto_cfg *knet_handle_crypto_cfg, uint8_t config_num) { int err = 0, savederrno = 0; int model = 0; struct crypto_instance *current = NULL, *new = NULL; current = knet_h->crypto_instance[config_num]; model = crypto_get_model(knet_handle_crypto_cfg->crypto_model); if (model < 0) { log_err(knet_h, KNET_SUB_CRYPTO, "model %s not supported", knet_handle_crypto_cfg->crypto_model); return -1; } if (crypto_modules_cmds[model].built_in == 0) { log_err(knet_h, KNET_SUB_CRYPTO, "this version of libknet was built without %s support. Please contact your vendor or fix the build.", knet_handle_crypto_cfg->crypto_model); return -1; } savederrno = pthread_rwlock_wrlock(&shlib_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_CRYPTO, "Unable to get write lock: %s", strerror(savederrno)); return -1; } if (!crypto_modules_cmds[model].loaded) { crypto_modules_cmds[model].ops = load_module (knet_h, "crypto", crypto_modules_cmds[model].model_name); if (!crypto_modules_cmds[model].ops) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_CRYPTO, "Unable to load %s lib", crypto_modules_cmds[model].model_name); goto out; } if (crypto_modules_cmds[model].ops->abi_ver != KNET_CRYPTO_MODEL_ABI) { savederrno = EINVAL; err = -1; log_err(knet_h, KNET_SUB_CRYPTO, "ABI mismatch loading module %s. knet ver: %d, module ver: %d", crypto_modules_cmds[model].model_name, KNET_CRYPTO_MODEL_ABI, crypto_modules_cmds[model].ops->abi_ver); goto out; } crypto_modules_cmds[model].loaded = 1; } log_debug(knet_h, KNET_SUB_CRYPTO, "Initizializing crypto module [%s/%s/%s]", knet_handle_crypto_cfg->crypto_model, knet_handle_crypto_cfg->crypto_cipher_type, knet_handle_crypto_cfg->crypto_hash_type); new = malloc(sizeof(struct crypto_instance)); if (!new) { savederrno = ENOMEM; err = -1; log_err(knet_h, KNET_SUB_CRYPTO, "Unable to allocate memory for crypto instance"); goto out; } /* * if crypto_modules_cmds.ops->init fails, it is expected that * it will clean everything by itself. * crypto_modules_cmds.ops->fini is not invoked on error. */ new->model = model; if (crypto_modules_cmds[model].ops->init(knet_h, new, knet_handle_crypto_cfg)) { savederrno = errno; err = -1; goto out; } out: if (!err) { knet_h->crypto_instance[config_num] = new; if (current) { /* * if we are replacing the current config, we need to enable it right away */ if (knet_h->crypto_in_use_config == config_num) { crypto_use_config(knet_h, config_num); } if (crypto_modules_cmds[current->model].ops->fini != NULL) { crypto_modules_cmds[current->model].ops->fini(knet_h, current); } free(current); } } else { if (new) { free(new); } } pthread_rwlock_unlock(&shlib_rwlock); errno = err ? savederrno : 0; return err; } static void crypto_fini_config( knet_handle_t knet_h, uint8_t config_num) { if (knet_h->crypto_instance[config_num]) { if (crypto_modules_cmds[knet_h->crypto_instance[config_num]->model].ops->fini != NULL) { crypto_modules_cmds[knet_h->crypto_instance[config_num]->model].ops->fini(knet_h, knet_h->crypto_instance[config_num]); } free(knet_h->crypto_instance[config_num]); knet_h->crypto_instance[config_num] = NULL; } } void crypto_fini( knet_handle_t knet_h, uint8_t config_num) { int savederrno = 0, i; savederrno = pthread_rwlock_wrlock(&shlib_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_CRYPTO, "Unable to get write lock: %s", strerror(savederrno)); return; } if (config_num > KNET_MAX_CRYPTO_INSTANCES) { for (i = 1; i <= KNET_MAX_CRYPTO_INSTANCES; i++) { crypto_fini_config(knet_h, i); } } else { crypto_fini_config(knet_h, config_num); } pthread_rwlock_unlock(&shlib_rwlock); return; } +static int _knet_handle_crypto_set_config(knet_handle_t knet_h, + struct knet_handle_crypto_cfg *knet_handle_crypto_cfg, + uint8_t config_num, + uint8_t force) +{ + int savederrno = 0; + int err = 0; + + if (!knet_h) { + errno = EINVAL; + return -1; + } + + if (!knet_handle_crypto_cfg) { + errno = EINVAL; + return -1; + } + + if ((config_num < 1) || (config_num > KNET_MAX_CRYPTO_INSTANCES)) { + errno = EINVAL; + return -1; + } + + savederrno = get_global_wrlock(knet_h); + if (savederrno) { + log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s", + strerror(savederrno)); + errno = savederrno; + return -1; + } + + if ((knet_h->crypto_in_use_config == config_num) && (!force)) { + savederrno = EBUSY; + err = -1; + goto exit_unlock; + } + + if ((!strncmp("none", knet_handle_crypto_cfg->crypto_model, 4)) || + ((!strncmp("none", knet_handle_crypto_cfg->crypto_cipher_type, 4)) && + (!strncmp("none", knet_handle_crypto_cfg->crypto_hash_type, 4)))) { + crypto_fini(knet_h, config_num); + log_debug(knet_h, KNET_SUB_CRYPTO, "crypto config %u is not enabled", config_num); + err = 0; + goto exit_unlock; + } + + if (knet_handle_crypto_cfg->private_key_len < KNET_MIN_KEY_LEN) { + log_debug(knet_h, KNET_SUB_CRYPTO, "private key len too short for config %u (min %d): %u", + config_num, KNET_MIN_KEY_LEN, knet_handle_crypto_cfg->private_key_len); + savederrno = EINVAL; + err = -1; + goto exit_unlock; + } + + if (knet_handle_crypto_cfg->private_key_len > KNET_MAX_KEY_LEN) { + log_debug(knet_h, KNET_SUB_CRYPTO, "private key len too long for config %u (max %d): %u", + config_num, KNET_MAX_KEY_LEN, knet_handle_crypto_cfg->private_key_len); + savederrno = EINVAL; + err = -1; + goto exit_unlock; + } + + err = crypto_init(knet_h, knet_handle_crypto_cfg, config_num); + + if (err) { + err = -2; + savederrno = errno; + } + +exit_unlock: + pthread_rwlock_unlock(&knet_h->global_rwlock); + errno = err ? savederrno : 0; + return err; +} + +int knet_handle_crypto_set_config(knet_handle_t knet_h, + struct knet_handle_crypto_cfg *knet_handle_crypto_cfg, + uint8_t config_num) +{ + return _knet_handle_crypto_set_config(knet_h, knet_handle_crypto_cfg, config_num, 0); +} + +int knet_handle_crypto_rx_clear_traffic(knet_handle_t knet_h, + uint8_t value) +{ + int savederrno = 0; + + if (!knet_h) { + errno = EINVAL; + return -1; + } + + if (value > KNET_CRYPTO_RX_DISALLOW_CLEAR_TRAFFIC) { + errno = EINVAL; + return -1; + } + + savederrno = get_global_wrlock(knet_h); + if (savederrno) { + log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s", + strerror(savederrno)); + errno = savederrno; + return -1; + } + + knet_h->crypto_only = value; + if (knet_h->crypto_only) { + log_debug(knet_h, KNET_SUB_CRYPTO, "Only crypto traffic allowed for RX"); + } else { + log_debug(knet_h, KNET_SUB_CRYPTO, "Both crypto and clear traffic allowed for RX"); + } + + pthread_rwlock_unlock(&knet_h->global_rwlock); + return 0; +} + +int knet_handle_crypto_use_config(knet_handle_t knet_h, + uint8_t config_num) +{ + int savederrno = 0; + int err = 0; + + if (!knet_h) { + errno = EINVAL; + return -1; + } + + if (config_num > KNET_MAX_CRYPTO_INSTANCES) { + errno = EINVAL; + return -1; + } + + savederrno = get_global_wrlock(knet_h); + if (savederrno) { + log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s", + strerror(savederrno)); + errno = savederrno; + return -1; + } + + err = crypto_use_config(knet_h, config_num); + savederrno = errno; + + pthread_rwlock_unlock(&knet_h->global_rwlock); + errno = err ? savederrno : 0; + return err; +} + int knet_get_crypto_list(struct knet_crypto_info *crypto_list, size_t *crypto_list_entries) { int err = 0; int idx = 0; int outidx = 0; if (!crypto_list_entries) { errno = EINVAL; return -1; } while (crypto_modules_cmds[idx].model_name != NULL) { if (crypto_modules_cmds[idx].built_in) { if (crypto_list) { crypto_list[outidx].name = crypto_modules_cmds[idx].model_name; } outidx++; } idx++; } *crypto_list_entries = outidx; if (!err) errno = 0; return err; } + +/* + * compatibility wrapper for 1.x releases + */ +int knet_handle_crypto(knet_handle_t knet_h, struct knet_handle_crypto_cfg *knet_handle_crypto_cfg) +{ + int err = 0; + uint8_t value; + + if (!knet_h) { + errno = EINVAL; + return -1; + } + + value = knet_h->crypto_only; + /* + * configure crypto in slot 1 + */ + err = _knet_handle_crypto_set_config(knet_h, knet_handle_crypto_cfg, 1, 1); + if (err < 0) { + return err; + } + + if ((!strncmp("none", knet_handle_crypto_cfg->crypto_model, 4)) || + ((!strncmp("none", knet_handle_crypto_cfg->crypto_cipher_type, 4)) && + (!strncmp("none", knet_handle_crypto_cfg->crypto_hash_type, 4)))) { + err = knet_handle_crypto_rx_clear_traffic(knet_h, KNET_CRYPTO_RX_ALLOW_CLEAR_TRAFFIC); + if (err < 0) { + return err; + } + + /* + * start using clear traffic + */ + err = knet_handle_crypto_use_config(knet_h, 0); + if (err < 0) { + err = knet_handle_crypto_rx_clear_traffic(knet_h, value); + if (err < 0) { + /* + * force attempt or things will go bad + */ + knet_h->crypto_only = value; + } + } + return err; + } else { + err = knet_handle_crypto_rx_clear_traffic(knet_h, KNET_CRYPTO_RX_DISALLOW_CLEAR_TRAFFIC); + if (err < 0) { + return err; + } + + /* + * start using crypto traffic + */ + err = knet_handle_crypto_use_config(knet_h, 1); + if (err < 0) { + err = knet_handle_crypto_rx_clear_traffic(knet_h, value); + if (err < 0) { + /* + * force attempt or things will go bad + */ + knet_h->crypto_only = value; + } + } + return err; + } +} diff --git a/libknet/crypto.h b/libknet/crypto.h index d04bc484..864ceba3 100644 --- a/libknet/crypto.h +++ b/libknet/crypto.h @@ -1,48 +1,39 @@ /* * Copyright (C) 2012-2020 Red Hat, Inc. All rights reserved. * * Author: Fabio M. Di Nitto * * This software licensed under LGPL-2.0+ */ #ifndef __KNET_CRYPTO_H__ #define __KNET_CRYPTO_H__ #include "internals.h" int crypto_authenticate_and_decrypt ( knet_handle_t knet_h, const unsigned char *buf_in, const ssize_t buf_in_len, unsigned char *buf_out, ssize_t *buf_out_len); int crypto_encrypt_and_sign ( knet_handle_t knet_h, const unsigned char *buf_in, const ssize_t buf_in_len, unsigned char *buf_out, ssize_t *buf_out_len); int crypto_encrypt_and_signv ( knet_handle_t knet_h, const struct iovec *iov_in, int iovcnt_in, unsigned char *buf_out, ssize_t *buf_out_len); -int crypto_use_config ( - knet_handle_t knet_h, - uint8_t config_num); - -int crypto_init( - knet_handle_t knet_h, - struct knet_handle_crypto_cfg *knet_handle_crypto_cfg, - uint8_t config_num); - void crypto_fini( knet_handle_t knet_h, uint8_t config_num); #endif diff --git a/libknet/handle.c b/libknet/handle.c index 71658926..68641d6e 100644 --- a/libknet/handle.c +++ b/libknet/handle.c @@ -1,1923 +1,808 @@ /* * Copyright (C) 2010-2020 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. Di Nitto * Federico Simoncelli * * This software licensed under LGPL-2.0+ */ #include "config.h" #include #include #include #include #include -#include #include #include #include #include "internals.h" #include "crypto.h" #include "links.h" #include "compress.h" #include "compat.h" #include "common.h" #include "threads_common.h" #include "threads_heartbeat.h" #include "threads_pmtud.h" #include "threads_dsthandler.h" #include "threads_rx.h" #include "threads_tx.h" #include "transports.h" #include "transport_common.h" #include "logging.h" static pthread_mutex_t handle_config_mutex = PTHREAD_MUTEX_INITIALIZER; pthread_rwlock_t shlib_rwlock; static uint8_t shlib_wrlock_init = 0; static uint32_t knet_ref = 0; static int _init_shlib_tracker(knet_handle_t knet_h) { int savederrno = 0; if (!shlib_wrlock_init) { savederrno = pthread_rwlock_init(&shlib_rwlock, NULL); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize shared lib rwlock: %s", strerror(savederrno)); errno = savederrno; return -1; } shlib_wrlock_init = 1; } return 0; } static void _fini_shlib_tracker(void) { if (knet_ref == 0) { pthread_rwlock_destroy(&shlib_rwlock); shlib_wrlock_init = 0; } return; } static int _init_locks(knet_handle_t knet_h) { int savederrno = 0; savederrno = pthread_rwlock_init(&knet_h->global_rwlock, NULL); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize list rwlock: %s", strerror(savederrno)); goto exit_fail; } savederrno = pthread_mutex_init(&knet_h->handle_stats_mutex, NULL); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize handle stats mutex: %s", strerror(savederrno)); goto exit_fail; } savederrno = pthread_mutex_init(&knet_h->threads_status_mutex, NULL); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize threads status mutex: %s", strerror(savederrno)); goto exit_fail; } savederrno = pthread_mutex_init(&knet_h->pmtud_mutex, NULL); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize pmtud mutex: %s", strerror(savederrno)); goto exit_fail; } savederrno = pthread_mutex_init(&knet_h->kmtu_mutex, NULL); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize kernel_mtu mutex: %s", strerror(savederrno)); goto exit_fail; } savederrno = pthread_cond_init(&knet_h->pmtud_cond, NULL); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize pmtud conditional mutex: %s", strerror(savederrno)); goto exit_fail; } savederrno = pthread_mutex_init(&knet_h->hb_mutex, NULL); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize hb_thread mutex: %s", strerror(savederrno)); goto exit_fail; } savederrno = pthread_mutex_init(&knet_h->tx_mutex, NULL); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize tx_thread mutex: %s", strerror(savederrno)); goto exit_fail; } savederrno = pthread_mutex_init(&knet_h->backoff_mutex, NULL); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize pong timeout backoff mutex: %s", strerror(savederrno)); goto exit_fail; } savederrno = pthread_mutex_init(&knet_h->tx_seq_num_mutex, NULL); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize tx_seq_num_mutex mutex: %s", strerror(savederrno)); goto exit_fail; } return 0; exit_fail: errno = savederrno; return -1; } static void _destroy_locks(knet_handle_t knet_h) { pthread_rwlock_destroy(&knet_h->global_rwlock); pthread_mutex_destroy(&knet_h->pmtud_mutex); pthread_mutex_destroy(&knet_h->kmtu_mutex); pthread_cond_destroy(&knet_h->pmtud_cond); pthread_mutex_destroy(&knet_h->hb_mutex); pthread_mutex_destroy(&knet_h->tx_mutex); pthread_mutex_destroy(&knet_h->backoff_mutex); pthread_mutex_destroy(&knet_h->tx_seq_num_mutex); pthread_mutex_destroy(&knet_h->threads_status_mutex); pthread_mutex_destroy(&knet_h->handle_stats_mutex); } static int _init_socks(knet_handle_t knet_h) { int savederrno = 0; if (_init_socketpair(knet_h, knet_h->hostsockfd)) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize internal hostsockpair: %s", strerror(savederrno)); goto exit_fail; } if (_init_socketpair(knet_h, knet_h->dstsockfd)) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize internal dstsockpair: %s", strerror(savederrno)); goto exit_fail; } return 0; exit_fail: errno = savederrno; return -1; } static void _close_socks(knet_handle_t knet_h) { _close_socketpair(knet_h, knet_h->dstsockfd); _close_socketpair(knet_h, knet_h->hostsockfd); } static int _init_buffers(knet_handle_t knet_h) { int savederrno = 0; int i; size_t bufsize; for (i = 0; i < PCKT_FRAG_MAX; i++) { bufsize = ceil((float)KNET_MAX_PACKET_SIZE / (i + 1)) + KNET_HEADER_ALL_SIZE; knet_h->send_to_links_buf[i] = malloc(bufsize); if (!knet_h->send_to_links_buf[i]) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory datafd to link buffer: %s", strerror(savederrno)); goto exit_fail; } memset(knet_h->send_to_links_buf[i], 0, bufsize); } for (i = 0; i < PCKT_RX_BUFS; i++) { knet_h->recv_from_links_buf[i] = malloc(KNET_DATABUFSIZE); if (!knet_h->recv_from_links_buf[i]) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for link to datafd buffer: %s", strerror(savederrno)); goto exit_fail; } memset(knet_h->recv_from_links_buf[i], 0, KNET_DATABUFSIZE); } knet_h->recv_from_sock_buf = malloc(KNET_DATABUFSIZE); if (!knet_h->recv_from_sock_buf) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for app to datafd buffer: %s", strerror(savederrno)); goto exit_fail; } memset(knet_h->recv_from_sock_buf, 0, KNET_DATABUFSIZE); knet_h->pingbuf = malloc(KNET_HEADER_PING_SIZE); if (!knet_h->pingbuf) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for hearbeat buffer: %s", strerror(savederrno)); goto exit_fail; } memset(knet_h->pingbuf, 0, KNET_HEADER_PING_SIZE); knet_h->pmtudbuf = malloc(KNET_PMTUD_SIZE_V6 + KNET_HEADER_ALL_SIZE); if (!knet_h->pmtudbuf) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for pmtud buffer: %s", strerror(savederrno)); goto exit_fail; } memset(knet_h->pmtudbuf, 0, KNET_PMTUD_SIZE_V6 + KNET_HEADER_ALL_SIZE); for (i = 0; i < PCKT_FRAG_MAX; i++) { bufsize = ceil((float)KNET_MAX_PACKET_SIZE / (i + 1)) + KNET_HEADER_ALL_SIZE + KNET_DATABUFSIZE_CRYPT_PAD; knet_h->send_to_links_buf_crypt[i] = malloc(bufsize); if (!knet_h->send_to_links_buf_crypt[i]) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for crypto datafd to link buffer: %s", strerror(savederrno)); goto exit_fail; } memset(knet_h->send_to_links_buf_crypt[i], 0, bufsize); } knet_h->recv_from_links_buf_decrypt = malloc(KNET_DATABUFSIZE_CRYPT); if (!knet_h->recv_from_links_buf_decrypt) { savederrno = errno; log_err(knet_h, KNET_SUB_CRYPTO, "Unable to allocate memory for crypto link to datafd buffer: %s", strerror(savederrno)); goto exit_fail; } memset(knet_h->recv_from_links_buf_decrypt, 0, KNET_DATABUFSIZE_CRYPT); knet_h->recv_from_links_buf_crypt = malloc(KNET_DATABUFSIZE_CRYPT); if (!knet_h->recv_from_links_buf_crypt) { savederrno = errno; log_err(knet_h, KNET_SUB_CRYPTO, "Unable to allocate memory for crypto link to datafd buffer: %s", strerror(savederrno)); goto exit_fail; } memset(knet_h->recv_from_links_buf_crypt, 0, KNET_DATABUFSIZE_CRYPT); knet_h->pingbuf_crypt = malloc(KNET_DATABUFSIZE_CRYPT); if (!knet_h->pingbuf_crypt) { savederrno = errno; log_err(knet_h, KNET_SUB_CRYPTO, "Unable to allocate memory for crypto hearbeat buffer: %s", strerror(savederrno)); goto exit_fail; } memset(knet_h->pingbuf_crypt, 0, KNET_DATABUFSIZE_CRYPT); knet_h->pmtudbuf_crypt = malloc(KNET_DATABUFSIZE_CRYPT); if (!knet_h->pmtudbuf_crypt) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for crypto pmtud buffer: %s", strerror(savederrno)); goto exit_fail; } memset(knet_h->pmtudbuf_crypt, 0, KNET_DATABUFSIZE_CRYPT); knet_h->recv_from_links_buf_decompress = malloc(KNET_DATABUFSIZE_COMPRESS); if (!knet_h->recv_from_links_buf_decompress) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for decompress buffer: %s", strerror(savederrno)); goto exit_fail; } memset(knet_h->recv_from_links_buf_decompress, 0, KNET_DATABUFSIZE_COMPRESS); knet_h->send_to_links_buf_compress = malloc(KNET_DATABUFSIZE_COMPRESS); if (!knet_h->send_to_links_buf_compress) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for compress buffer: %s", strerror(savederrno)); goto exit_fail; } memset(knet_h->send_to_links_buf_compress, 0, KNET_DATABUFSIZE_COMPRESS); memset(knet_h->knet_transport_fd_tracker, 0, sizeof(knet_h->knet_transport_fd_tracker)); for (i = 0; i < KNET_MAX_FDS; i++) { knet_h->knet_transport_fd_tracker[i].transport = KNET_MAX_TRANSPORTS; } return 0; exit_fail: errno = savederrno; return -1; } static void _destroy_buffers(knet_handle_t knet_h) { int i; for (i = 0; i < PCKT_FRAG_MAX; i++) { free(knet_h->send_to_links_buf[i]); free(knet_h->send_to_links_buf_crypt[i]); } for (i = 0; i < PCKT_RX_BUFS; i++) { free(knet_h->recv_from_links_buf[i]); } free(knet_h->recv_from_links_buf_decompress); free(knet_h->send_to_links_buf_compress); free(knet_h->recv_from_sock_buf); free(knet_h->recv_from_links_buf_decrypt); free(knet_h->recv_from_links_buf_crypt); free(knet_h->pingbuf); free(knet_h->pingbuf_crypt); free(knet_h->pmtudbuf); free(knet_h->pmtudbuf_crypt); } static int _init_epolls(knet_handle_t knet_h) { struct epoll_event ev; int savederrno = 0; /* * even if the kernel does dynamic allocation with epoll_ctl * we need to reserve one extra for host to host communication */ knet_h->send_to_links_epollfd = epoll_create(KNET_EPOLL_MAX_EVENTS + 1); if (knet_h->send_to_links_epollfd < 0) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to create epoll datafd to link fd: %s", strerror(savederrno)); goto exit_fail; } knet_h->recv_from_links_epollfd = epoll_create(KNET_EPOLL_MAX_EVENTS); if (knet_h->recv_from_links_epollfd < 0) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to create epoll link to datafd fd: %s", strerror(savederrno)); goto exit_fail; } knet_h->dst_link_handler_epollfd = epoll_create(KNET_EPOLL_MAX_EVENTS); if (knet_h->dst_link_handler_epollfd < 0) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to create epoll dst cache fd: %s", strerror(savederrno)); goto exit_fail; } if (_fdset_cloexec(knet_h->send_to_links_epollfd)) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to set CLOEXEC on datafd to link epoll fd: %s", strerror(savederrno)); goto exit_fail; } if (_fdset_cloexec(knet_h->recv_from_links_epollfd)) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to set CLOEXEC on link to datafd epoll fd: %s", strerror(savederrno)); goto exit_fail; } if (_fdset_cloexec(knet_h->dst_link_handler_epollfd)) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to set CLOEXEC on dst cache epoll fd: %s", strerror(savederrno)); goto exit_fail; } memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLIN; ev.data.fd = knet_h->hostsockfd[0]; if (epoll_ctl(knet_h->send_to_links_epollfd, EPOLL_CTL_ADD, knet_h->hostsockfd[0], &ev)) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to add hostsockfd[0] to epoll pool: %s", strerror(savederrno)); goto exit_fail; } memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLIN; ev.data.fd = knet_h->dstsockfd[0]; if (epoll_ctl(knet_h->dst_link_handler_epollfd, EPOLL_CTL_ADD, knet_h->dstsockfd[0], &ev)) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to add dstsockfd[0] to epoll pool: %s", strerror(savederrno)); goto exit_fail; } return 0; exit_fail: errno = savederrno; return -1; } static void _close_epolls(knet_handle_t knet_h) { struct epoll_event ev; int i; memset(&ev, 0, sizeof(struct epoll_event)); for (i = 0; i < KNET_DATAFD_MAX; i++) { if (knet_h->sockfd[i].in_use) { epoll_ctl(knet_h->send_to_links_epollfd, EPOLL_CTL_DEL, knet_h->sockfd[i].sockfd[knet_h->sockfd[i].is_created], &ev); if (knet_h->sockfd[i].sockfd[knet_h->sockfd[i].is_created]) { _close_socketpair(knet_h, knet_h->sockfd[i].sockfd); } } } epoll_ctl(knet_h->send_to_links_epollfd, EPOLL_CTL_DEL, knet_h->hostsockfd[0], &ev); epoll_ctl(knet_h->dst_link_handler_epollfd, EPOLL_CTL_DEL, knet_h->dstsockfd[0], &ev); close(knet_h->send_to_links_epollfd); close(knet_h->recv_from_links_epollfd); close(knet_h->dst_link_handler_epollfd); } static int _start_threads(knet_handle_t knet_h) { int savederrno = 0; pthread_attr_t attr; set_thread_status(knet_h, KNET_THREAD_PMTUD, KNET_THREAD_REGISTERED); savederrno = pthread_attr_init(&attr); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to init pthread attributes: %s", strerror(savederrno)); goto exit_fail; } savederrno = pthread_attr_setstacksize(&attr, KNET_THREAD_STACK_SIZE); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to set stack size attribute: %s", strerror(savederrno)); goto exit_fail; } savederrno = pthread_create(&knet_h->pmtud_link_handler_thread, &attr, _handle_pmtud_link_thread, (void *) knet_h); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to start pmtud link thread: %s", strerror(savederrno)); goto exit_fail; } set_thread_status(knet_h, KNET_THREAD_DST_LINK, KNET_THREAD_REGISTERED); savederrno = pthread_create(&knet_h->dst_link_handler_thread, &attr, _handle_dst_link_handler_thread, (void *) knet_h); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to start dst cache thread: %s", strerror(savederrno)); goto exit_fail; } set_thread_status(knet_h, KNET_THREAD_TX, KNET_THREAD_REGISTERED); savederrno = pthread_create(&knet_h->send_to_links_thread, &attr, _handle_send_to_links_thread, (void *) knet_h); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to start datafd to link thread: %s", strerror(savederrno)); goto exit_fail; } set_thread_status(knet_h, KNET_THREAD_RX, KNET_THREAD_REGISTERED); savederrno = pthread_create(&knet_h->recv_from_links_thread, &attr, _handle_recv_from_links_thread, (void *) knet_h); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to start link to datafd thread: %s", strerror(savederrno)); goto exit_fail; } set_thread_status(knet_h, KNET_THREAD_HB, KNET_THREAD_REGISTERED); savederrno = pthread_create(&knet_h->heartbt_thread, &attr, _handle_heartbt_thread, (void *) knet_h); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to start heartbeat thread: %s", strerror(savederrno)); goto exit_fail; } savederrno = pthread_attr_destroy(&attr); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to destroy pthread attributes: %s", strerror(savederrno)); /* * Do not return error code. Error is not critical. */ } return 0; exit_fail: errno = savederrno; return -1; } static void _stop_threads(knet_handle_t knet_h) { void *retval; wait_all_threads_status(knet_h, KNET_THREAD_STOPPED); if (knet_h->heartbt_thread) { pthread_cancel(knet_h->heartbt_thread); pthread_join(knet_h->heartbt_thread, &retval); } if (knet_h->send_to_links_thread) { pthread_cancel(knet_h->send_to_links_thread); pthread_join(knet_h->send_to_links_thread, &retval); } if (knet_h->recv_from_links_thread) { pthread_cancel(knet_h->recv_from_links_thread); pthread_join(knet_h->recv_from_links_thread, &retval); } if (knet_h->dst_link_handler_thread) { pthread_cancel(knet_h->dst_link_handler_thread); pthread_join(knet_h->dst_link_handler_thread, &retval); } if (knet_h->pmtud_link_handler_thread) { pthread_cancel(knet_h->pmtud_link_handler_thread); pthread_join(knet_h->pmtud_link_handler_thread, &retval); } } knet_handle_t knet_handle_new_ex(knet_node_id_t host_id, int log_fd, uint8_t default_log_level, uint64_t flags) { knet_handle_t knet_h; int savederrno = 0; struct rlimit cur; if (getrlimit(RLIMIT_NOFILE, &cur) < 0) { return NULL; } if ((log_fd < 0) || ((unsigned int)log_fd >= cur.rlim_max)) { errno = EINVAL; return NULL; } /* * validate incoming request */ if ((log_fd) && (default_log_level > KNET_LOG_DEBUG)) { errno = EINVAL; return NULL; } if (flags > KNET_HANDLE_FLAG_PRIVILEGED * 2 - 1) { errno = EINVAL; return NULL; } /* * allocate handle */ knet_h = malloc(sizeof(struct knet_handle)); if (!knet_h) { errno = ENOMEM; return NULL; } memset(knet_h, 0, sizeof(struct knet_handle)); /* * setting up some handle data so that we can use logging * also when initializing the library global locks * and trackers */ knet_h->flags = flags; /* * copy config in place */ knet_h->host_id = host_id; knet_h->logfd = log_fd; if (knet_h->logfd > 0) { memset(&knet_h->log_levels, default_log_level, KNET_MAX_SUBSYSTEMS); } /* * set pmtud default timers */ knet_h->pmtud_interval = KNET_PMTUD_DEFAULT_INTERVAL; /* * set transports reconnect default timers */ knet_h->reconnect_int = KNET_TRANSPORT_DEFAULT_RECONNECT_INTERVAL; /* * Set 'min' stats to the maximum value so the * first value we get is always less */ knet_h->stats.tx_compress_time_min = UINT64_MAX; knet_h->stats.rx_compress_time_min = UINT64_MAX; knet_h->stats.tx_crypt_time_min = UINT64_MAX; knet_h->stats.rx_crypt_time_min = UINT64_MAX; /* * init global shlib tracker */ savederrno = pthread_mutex_lock(&handle_config_mutex); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to get handle mutex lock: %s", strerror(savederrno)); free(knet_h); knet_h = NULL; errno = savederrno; return NULL; } knet_ref++; if (_init_shlib_tracker(knet_h) < 0) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to init handle tracker: %s", strerror(savederrno)); errno = savederrno; pthread_mutex_unlock(&handle_config_mutex); goto exit_fail; } pthread_mutex_unlock(&handle_config_mutex); /* * init main locking structures */ if (_init_locks(knet_h)) { savederrno = errno; goto exit_fail; } /* * init sockets */ if (_init_socks(knet_h)) { savederrno = errno; goto exit_fail; } /* * allocate packet buffers */ if (_init_buffers(knet_h)) { savederrno = errno; goto exit_fail; } if (compress_init(knet_h)) { savederrno = errno; goto exit_fail; } /* * create epoll fds */ if (_init_epolls(knet_h)) { savederrno = errno; goto exit_fail; } /* * start transports */ if (start_all_transports(knet_h)) { savederrno = errno; goto exit_fail; } /* * start internal threads */ if (_start_threads(knet_h)) { savederrno = errno; goto exit_fail; } wait_all_threads_status(knet_h, KNET_THREAD_STARTED); errno = 0; return knet_h; exit_fail: knet_handle_free(knet_h); errno = savederrno; return NULL; } knet_handle_t knet_handle_new(knet_node_id_t host_id, int log_fd, uint8_t default_log_level) { return knet_handle_new_ex(host_id, log_fd, default_log_level, KNET_HANDLE_FLAG_PRIVILEGED); } int knet_handle_free(knet_handle_t knet_h) { int savederrno = 0; if (!knet_h) { errno = EINVAL; return -1; } savederrno = get_global_wrlock(knet_h); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } if (knet_h->host_head != NULL) { savederrno = EBUSY; log_err(knet_h, KNET_SUB_HANDLE, "Unable to free handle: host(s) or listener(s) are still active: %s", strerror(savederrno)); pthread_rwlock_unlock(&knet_h->global_rwlock); errno = savederrno; return -1; } knet_h->fini_in_progress = 1; pthread_rwlock_unlock(&knet_h->global_rwlock); _stop_threads(knet_h); stop_all_transports(knet_h); _close_epolls(knet_h); _destroy_buffers(knet_h); _close_socks(knet_h); crypto_fini(knet_h, KNET_MAX_CRYPTO_INSTANCES + 1); /* values above MAX_CRYPTO will release all crypto resources */ compress_fini(knet_h, 1); _destroy_locks(knet_h); free(knet_h); knet_h = NULL; (void)pthread_mutex_lock(&handle_config_mutex); knet_ref--; _fini_shlib_tracker(); pthread_mutex_unlock(&handle_config_mutex); errno = 0; return 0; } - -int knet_handle_enable_sock_notify(knet_handle_t knet_h, - void *sock_notify_fn_private_data, - void (*sock_notify_fn) ( - void *private_data, - int datafd, - int8_t channel, - uint8_t tx_rx, - int error, - int errorno)) -{ - int savederrno = 0; - - if (!knet_h) { - errno = EINVAL; - return -1; - } - - if (!sock_notify_fn) { - errno = EINVAL; - return -1; - } - - savederrno = get_global_wrlock(knet_h); - if (savederrno) { - log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s", - strerror(savederrno)); - errno = savederrno; - return -1; - } - - knet_h->sock_notify_fn_private_data = sock_notify_fn_private_data; - knet_h->sock_notify_fn = sock_notify_fn; - log_debug(knet_h, KNET_SUB_HANDLE, "sock_notify_fn enabled"); - - pthread_rwlock_unlock(&knet_h->global_rwlock); - - return 0; -} - -int knet_handle_add_datafd(knet_handle_t knet_h, int *datafd, int8_t *channel) -{ - int err = 0, savederrno = 0; - int i; - struct epoll_event ev; - - if (!knet_h) { - errno = EINVAL; - return -1; - } - - if (datafd == NULL) { - errno = EINVAL; - return -1; - } - - if (channel == NULL) { - errno = EINVAL; - return -1; - } - - if (*channel >= KNET_DATAFD_MAX) { - errno = EINVAL; - return -1; - } - - savederrno = get_global_wrlock(knet_h); - if (savederrno) { - log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s", - strerror(savederrno)); - errno = savederrno; - return -1; - } - - if (!knet_h->sock_notify_fn) { - log_err(knet_h, KNET_SUB_HANDLE, "Adding datafd requires sock notify callback enabled!"); - savederrno = EINVAL; - err = -1; - goto out_unlock; - } - - if (*datafd > 0) { - for (i = 0; i < KNET_DATAFD_MAX; i++) { - if ((knet_h->sockfd[i].in_use) && (knet_h->sockfd[i].sockfd[0] == *datafd)) { - log_err(knet_h, KNET_SUB_HANDLE, "requested datafd: %d already exist in index: %d", *datafd, i); - savederrno = EEXIST; - err = -1; - goto out_unlock; - } - } - } - - /* - * auto allocate a channel - */ - if (*channel < 0) { - for (i = 0; i < KNET_DATAFD_MAX; i++) { - if (!knet_h->sockfd[i].in_use) { - *channel = i; - break; - } - } - if (*channel < 0) { - savederrno = EBUSY; - err = -1; - goto out_unlock; - } - } else { - if (knet_h->sockfd[*channel].in_use) { - savederrno = EBUSY; - err = -1; - goto out_unlock; - } - } - - knet_h->sockfd[*channel].is_created = 0; - knet_h->sockfd[*channel].is_socket = 0; - knet_h->sockfd[*channel].has_error = 0; - - if (*datafd > 0) { - int sockopt; - socklen_t sockoptlen = sizeof(sockopt); - - if (_fdset_cloexec(*datafd)) { - savederrno = errno; - err = -1; - log_err(knet_h, KNET_SUB_HANDLE, "Unable to set CLOEXEC on datafd: %s", - strerror(savederrno)); - goto out_unlock; - } - - if (_fdset_nonblock(*datafd)) { - savederrno = errno; - err = -1; - log_err(knet_h, KNET_SUB_HANDLE, "Unable to set NONBLOCK on datafd: %s", - strerror(savederrno)); - goto out_unlock; - } - - knet_h->sockfd[*channel].sockfd[0] = *datafd; - knet_h->sockfd[*channel].sockfd[1] = 0; - - if (!getsockopt(knet_h->sockfd[*channel].sockfd[0], SOL_SOCKET, SO_TYPE, &sockopt, &sockoptlen)) { - knet_h->sockfd[*channel].is_socket = 1; - } - } else { - if (_init_socketpair(knet_h, knet_h->sockfd[*channel].sockfd)) { - savederrno = errno; - err = -1; - goto out_unlock; - } - - knet_h->sockfd[*channel].is_created = 1; - knet_h->sockfd[*channel].is_socket = 1; - *datafd = knet_h->sockfd[*channel].sockfd[0]; - } - - memset(&ev, 0, sizeof(struct epoll_event)); - ev.events = EPOLLIN; - ev.data.fd = knet_h->sockfd[*channel].sockfd[knet_h->sockfd[*channel].is_created]; - - if (epoll_ctl(knet_h->send_to_links_epollfd, - EPOLL_CTL_ADD, knet_h->sockfd[*channel].sockfd[knet_h->sockfd[*channel].is_created], &ev)) { - savederrno = errno; - err = -1; - log_err(knet_h, KNET_SUB_HANDLE, "Unable to add datafd %d to linkfd epoll pool: %s", - knet_h->sockfd[*channel].sockfd[knet_h->sockfd[*channel].is_created], strerror(savederrno)); - if (knet_h->sockfd[*channel].is_created) { - _close_socketpair(knet_h, knet_h->sockfd[*channel].sockfd); - } - goto out_unlock; - } - - knet_h->sockfd[*channel].in_use = 1; - -out_unlock: - pthread_rwlock_unlock(&knet_h->global_rwlock); - errno = err ? savederrno : 0; - return err; -} - -int knet_handle_remove_datafd(knet_handle_t knet_h, int datafd) -{ - int err = 0, savederrno = 0; - int8_t channel = -1; - int i; - struct epoll_event ev; - - if (!knet_h) { - errno = EINVAL; - return -1; - } - - if (datafd <= 0) { - errno = EINVAL; - return -1; - } - - savederrno = get_global_wrlock(knet_h); - if (savederrno) { - log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s", - strerror(savederrno)); - errno = savederrno; - return -1; - } - - for (i = 0; i < KNET_DATAFD_MAX; i++) { - if ((knet_h->sockfd[i].in_use) && - (knet_h->sockfd[i].sockfd[0] == datafd)) { - channel = i; - break; - } - } - - if (channel < 0) { - savederrno = EINVAL; - err = -1; - goto out_unlock; - } - - if (!knet_h->sockfd[channel].has_error) { - memset(&ev, 0, sizeof(struct epoll_event)); - - if (epoll_ctl(knet_h->send_to_links_epollfd, - EPOLL_CTL_DEL, knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], &ev)) { - savederrno = errno; - err = -1; - log_err(knet_h, KNET_SUB_HANDLE, "Unable to del datafd %d from linkfd epoll pool: %s", - knet_h->sockfd[channel].sockfd[0], strerror(savederrno)); - goto out_unlock; - } - } - - if (knet_h->sockfd[channel].is_created) { - _close_socketpair(knet_h, knet_h->sockfd[channel].sockfd); - } - - memset(&knet_h->sockfd[channel], 0, sizeof(struct knet_sock)); - -out_unlock: - pthread_rwlock_unlock(&knet_h->global_rwlock); - errno = err ? savederrno : 0; - return err; -} - -int knet_handle_get_datafd(knet_handle_t knet_h, const int8_t channel, int *datafd) -{ - int err = 0, savederrno = 0; - - if (!knet_h) { - errno = EINVAL; - return -1; - } - - if ((channel < 0) || (channel >= KNET_DATAFD_MAX)) { - errno = EINVAL; - return -1; - } - - if (datafd == NULL) { - errno = EINVAL; - return -1; - } - - savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); - if (savederrno) { - log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s", - strerror(savederrno)); - errno = savederrno; - return -1; - } - - if (!knet_h->sockfd[channel].in_use) { - savederrno = EINVAL; - err = -1; - goto out_unlock; - } - - *datafd = knet_h->sockfd[channel].sockfd[0]; - -out_unlock: - pthread_rwlock_unlock(&knet_h->global_rwlock); - errno = err ? savederrno : 0; - return err; -} - -int knet_handle_get_channel(knet_handle_t knet_h, const int datafd, int8_t *channel) -{ - int err = 0, savederrno = 0; - int i; - - if (!knet_h) { - errno = EINVAL; - return -1; - } - - if (datafd <= 0) { - errno = EINVAL; - return -1; - } - - if (channel == NULL) { - errno = EINVAL; - return -1; - } - - savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); - if (savederrno) { - log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s", - strerror(savederrno)); - errno = savederrno; - return -1; - } - - *channel = -1; - - for (i = 0; i < KNET_DATAFD_MAX; i++) { - if ((knet_h->sockfd[i].in_use) && - (knet_h->sockfd[i].sockfd[0] == datafd)) { - *channel = i; - break; - } - } - - if (*channel < 0) { - savederrno = EINVAL; - err = -1; - goto out_unlock; - } - -out_unlock: - pthread_rwlock_unlock(&knet_h->global_rwlock); - errno = err ? savederrno : 0; - return err; -} - -int knet_handle_enable_filter(knet_handle_t knet_h, - void *dst_host_filter_fn_private_data, - int (*dst_host_filter_fn) ( - void *private_data, - const unsigned char *outdata, - ssize_t outdata_len, - uint8_t tx_rx, - knet_node_id_t this_host_id, - knet_node_id_t src_node_id, - int8_t *channel, - knet_node_id_t *dst_host_ids, - size_t *dst_host_ids_entries)) -{ - int savederrno = 0; - - if (!knet_h) { - errno = EINVAL; - return -1; - } - - savederrno = get_global_wrlock(knet_h); - if (savederrno) { - log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s", - strerror(savederrno)); - errno = savederrno; - return -1; - } - - knet_h->dst_host_filter_fn_private_data = dst_host_filter_fn_private_data; - knet_h->dst_host_filter_fn = dst_host_filter_fn; - if (knet_h->dst_host_filter_fn) { - log_debug(knet_h, KNET_SUB_HANDLE, "dst_host_filter_fn enabled"); - } else { - log_debug(knet_h, KNET_SUB_HANDLE, "dst_host_filter_fn disabled"); - } - - pthread_rwlock_unlock(&knet_h->global_rwlock); - - errno = 0; - return 0; -} - -int knet_handle_setfwd(knet_handle_t knet_h, unsigned int enabled) -{ - int savederrno = 0; - - if (!knet_h) { - errno = EINVAL; - return -1; - } - - if (enabled > 1) { - errno = EINVAL; - return -1; - } - - savederrno = get_global_wrlock(knet_h); - if (savederrno) { - log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s", - strerror(savederrno)); - errno = savederrno; - return -1; - } - - if (enabled) { - knet_h->enabled = enabled; - log_debug(knet_h, KNET_SUB_HANDLE, "Data forwarding is enabled"); - } else { - /* - * notify TX and RX threads to flush the queues - */ - if (set_thread_flush_queue(knet_h, KNET_THREAD_TX, KNET_THREAD_QUEUE_FLUSH) < 0) { - log_debug(knet_h, KNET_SUB_HANDLE, "Unable to request queue flushing for TX thread"); - } - if (set_thread_flush_queue(knet_h, KNET_THREAD_RX, KNET_THREAD_QUEUE_FLUSH) < 0) { - log_debug(knet_h, KNET_SUB_HANDLE, "Unable to request queue flushing for RX thread"); - } - } - - pthread_rwlock_unlock(&knet_h->global_rwlock); - - /* - * when disabling data forward, we need to give time to TX and RX - * to flush the queues. - * - * the TX thread is the main leader here. When there is no more - * data in the TX queue, we will also close traffic for RX. - */ - if (!enabled) { - /* - * this usleep might be unnecessary, but wait_all_threads_flush_queue - * adds extra locking delay. - * - * allow all threads to run free without extra locking interference - * and then we switch to a more active wait in case the scheduler - * has decided to delay one thread or another - */ - usleep(KNET_THREADS_TIMERES * 2); - wait_all_threads_flush_queue(knet_h); - - /* - * all threads have done flushing the queue, we can stop data forwarding - */ - savederrno = get_global_wrlock(knet_h); - if (savederrno) { - log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s", - strerror(savederrno)); - errno = savederrno; - return -1; - } - knet_h->enabled = enabled; - log_debug(knet_h, KNET_SUB_HANDLE, "Data forwarding is disabled"); - pthread_rwlock_unlock(&knet_h->global_rwlock); - } - - errno = 0; - return 0; -} - -int knet_handle_enable_access_lists(knet_handle_t knet_h, unsigned int enabled) -{ - int savederrno = 0; - - if (!knet_h) { - errno = EINVAL; - return -1; - } - - if (enabled > 1) { - errno = EINVAL; - return -1; - } - - savederrno = get_global_wrlock(knet_h); - if (savederrno) { - log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s", - strerror(savederrno)); - errno = savederrno; - return -1; - } - - knet_h->use_access_lists = enabled; - - if (enabled) { - log_debug(knet_h, KNET_SUB_HANDLE, "Links access lists are enabled"); - } else { - log_debug(knet_h, KNET_SUB_HANDLE, "Links access lists are disabled"); - } - - pthread_rwlock_unlock(&knet_h->global_rwlock); - - errno = 0; - return 0; -} - -int knet_handle_pmtud_getfreq(knet_handle_t knet_h, unsigned int *interval) -{ - int savederrno = 0; - - if (!knet_h) { - errno = EINVAL; - return -1; - } - - if (!interval) { - errno = EINVAL; - return -1; - } - - savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); - if (savederrno) { - log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s", - strerror(savederrno)); - errno = savederrno; - return -1; - } - - *interval = knet_h->pmtud_interval; - - pthread_rwlock_unlock(&knet_h->global_rwlock); - - errno = 0; - return 0; -} - -int knet_handle_pmtud_setfreq(knet_handle_t knet_h, unsigned int interval) -{ - int savederrno = 0; - - if (!knet_h) { - errno = EINVAL; - return -1; - } - - if ((!interval) || (interval > 86400)) { - errno = EINVAL; - return -1; - } - - savederrno = get_global_wrlock(knet_h); - if (savederrno) { - log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s", - strerror(savederrno)); - errno = savederrno; - return -1; - } - - knet_h->pmtud_interval = interval; - log_debug(knet_h, KNET_SUB_HANDLE, "PMTUd interval set to: %u seconds", interval); - - pthread_rwlock_unlock(&knet_h->global_rwlock); - - errno = 0; - return 0; -} - -int knet_handle_enable_pmtud_notify(knet_handle_t knet_h, - void *pmtud_notify_fn_private_data, - void (*pmtud_notify_fn) ( - void *private_data, - unsigned int data_mtu)) -{ - int savederrno = 0; - - if (!knet_h) { - errno = EINVAL; - return -1; - } - - savederrno = get_global_wrlock(knet_h); - if (savederrno) { - log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s", - strerror(savederrno)); - errno = savederrno; - return -1; - } - - knet_h->pmtud_notify_fn_private_data = pmtud_notify_fn_private_data; - knet_h->pmtud_notify_fn = pmtud_notify_fn; - if (knet_h->pmtud_notify_fn) { - log_debug(knet_h, KNET_SUB_HANDLE, "pmtud_notify_fn enabled"); - } else { - log_debug(knet_h, KNET_SUB_HANDLE, "pmtud_notify_fn disabled"); - } - - pthread_rwlock_unlock(&knet_h->global_rwlock); - - errno = 0; - return 0; -} - -int knet_handle_pmtud_set(knet_handle_t knet_h, - unsigned int iface_mtu) -{ - int savederrno = 0; - - if (!knet_h) { - errno = EINVAL; - return -1; - } - - if (iface_mtu > KNET_PMTUD_SIZE_V4) { - errno = EINVAL; - return -1; - } - - savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); - if (savederrno) { - log_err(knet_h, KNET_SUB_PMTUD, "Unable to get read lock: %s", - strerror(savederrno)); - errno = savederrno; - return -1; - } - - log_info(knet_h, KNET_SUB_PMTUD, "MTU manually set to: %u", iface_mtu); - - knet_h->manual_mtu = iface_mtu; - - force_pmtud_run(knet_h, KNET_SUB_PMTUD, 0); - - pthread_rwlock_unlock(&knet_h->global_rwlock); - - errno = 0; - return 0; -} - -int knet_handle_pmtud_get(knet_handle_t knet_h, - unsigned int *data_mtu) -{ - int savederrno = 0; - - if (!knet_h) { - errno = EINVAL; - return -1; - } - - if (!data_mtu) { - errno = EINVAL; - return -1; - } - - savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); - if (savederrno) { - log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s", - strerror(savederrno)); - errno = savederrno; - return -1; - } - - *data_mtu = knet_h->data_mtu; - - pthread_rwlock_unlock(&knet_h->global_rwlock); - - errno = 0; - return 0; -} - -static int _knet_handle_crypto_set_config(knet_handle_t knet_h, - struct knet_handle_crypto_cfg *knet_handle_crypto_cfg, - uint8_t config_num, - uint8_t force) -{ - int savederrno = 0; - int err = 0; - - if (!knet_h) { - errno = EINVAL; - return -1; - } - - if (!knet_handle_crypto_cfg) { - errno = EINVAL; - return -1; - } - - if ((config_num < 1) || (config_num > KNET_MAX_CRYPTO_INSTANCES)) { - errno = EINVAL; - return -1; - } - - savederrno = get_global_wrlock(knet_h); - if (savederrno) { - log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s", - strerror(savederrno)); - errno = savederrno; - return -1; - } - - if ((knet_h->crypto_in_use_config == config_num) && (!force)) { - savederrno = EBUSY; - err = -1; - goto exit_unlock; - } - - if ((!strncmp("none", knet_handle_crypto_cfg->crypto_model, 4)) || - ((!strncmp("none", knet_handle_crypto_cfg->crypto_cipher_type, 4)) && - (!strncmp("none", knet_handle_crypto_cfg->crypto_hash_type, 4)))) { - crypto_fini(knet_h, config_num); - log_debug(knet_h, KNET_SUB_CRYPTO, "crypto config %u is not enabled", config_num); - err = 0; - goto exit_unlock; - } - - if (knet_handle_crypto_cfg->private_key_len < KNET_MIN_KEY_LEN) { - log_debug(knet_h, KNET_SUB_CRYPTO, "private key len too short for config %u (min %d): %u", - config_num, KNET_MIN_KEY_LEN, knet_handle_crypto_cfg->private_key_len); - savederrno = EINVAL; - err = -1; - goto exit_unlock; - } - - if (knet_handle_crypto_cfg->private_key_len > KNET_MAX_KEY_LEN) { - log_debug(knet_h, KNET_SUB_CRYPTO, "private key len too long for config %u (max %d): %u", - config_num, KNET_MAX_KEY_LEN, knet_handle_crypto_cfg->private_key_len); - savederrno = EINVAL; - err = -1; - goto exit_unlock; - } - - err = crypto_init(knet_h, knet_handle_crypto_cfg, config_num); - - if (err) { - err = -2; - savederrno = errno; - } - -exit_unlock: - pthread_rwlock_unlock(&knet_h->global_rwlock); - errno = err ? savederrno : 0; - return err; -} - -int knet_handle_crypto_set_config(knet_handle_t knet_h, - struct knet_handle_crypto_cfg *knet_handle_crypto_cfg, - uint8_t config_num) -{ - return _knet_handle_crypto_set_config(knet_h, knet_handle_crypto_cfg, config_num, 0); -} - -int knet_handle_crypto_rx_clear_traffic(knet_handle_t knet_h, - uint8_t value) -{ - int savederrno = 0; - - if (!knet_h) { - errno = EINVAL; - return -1; - } - - if (value > KNET_CRYPTO_RX_DISALLOW_CLEAR_TRAFFIC) { - errno = EINVAL; - return -1; - } - - savederrno = get_global_wrlock(knet_h); - if (savederrno) { - log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s", - strerror(savederrno)); - errno = savederrno; - return -1; - } - - knet_h->crypto_only = value; - if (knet_h->crypto_only) { - log_debug(knet_h, KNET_SUB_CRYPTO, "Only crypto traffic allowed for RX"); - } else { - log_debug(knet_h, KNET_SUB_CRYPTO, "Both crypto and clear traffic allowed for RX"); - } - - pthread_rwlock_unlock(&knet_h->global_rwlock); - return 0; -} - -int knet_handle_crypto_use_config(knet_handle_t knet_h, - uint8_t config_num) -{ - int savederrno = 0; - int err = 0; - - if (!knet_h) { - errno = EINVAL; - return -1; - } - - if (config_num > KNET_MAX_CRYPTO_INSTANCES) { - errno = EINVAL; - return -1; - } - - savederrno = get_global_wrlock(knet_h); - if (savederrno) { - log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s", - strerror(savederrno)); - errno = savederrno; - return -1; - } - - err = crypto_use_config(knet_h, config_num); - savederrno = errno; - - pthread_rwlock_unlock(&knet_h->global_rwlock); - errno = err ? savederrno : 0; - return err; -} - -/* - * compatibility wrapper for 1.x releases - */ -int knet_handle_crypto(knet_handle_t knet_h, struct knet_handle_crypto_cfg *knet_handle_crypto_cfg) -{ - int err = 0; - uint8_t value; - - if (!knet_h) { - errno = EINVAL; - return -1; - } - - value = knet_h->crypto_only; - /* - * configure crypto in slot 1 - */ - err = _knet_handle_crypto_set_config(knet_h, knet_handle_crypto_cfg, 1, 1); - if (err < 0) { - return err; - } - - if ((!strncmp("none", knet_handle_crypto_cfg->crypto_model, 4)) || - ((!strncmp("none", knet_handle_crypto_cfg->crypto_cipher_type, 4)) && - (!strncmp("none", knet_handle_crypto_cfg->crypto_hash_type, 4)))) { - err = knet_handle_crypto_rx_clear_traffic(knet_h, KNET_CRYPTO_RX_ALLOW_CLEAR_TRAFFIC); - if (err < 0) { - return err; - } - - /* - * start using clear traffic - */ - err = knet_handle_crypto_use_config(knet_h, 0); - if (err < 0) { - err = knet_handle_crypto_rx_clear_traffic(knet_h, value); - if (err < 0) { - /* - * force attempt or things will go bad - */ - knet_h->crypto_only = value; - } - } - return err; - } else { - err = knet_handle_crypto_rx_clear_traffic(knet_h, KNET_CRYPTO_RX_DISALLOW_CLEAR_TRAFFIC); - if (err < 0) { - return err; - } - - /* - * start using crypto traffic - */ - err = knet_handle_crypto_use_config(knet_h, 1); - if (err < 0) { - err = knet_handle_crypto_rx_clear_traffic(knet_h, value); - if (err < 0) { - /* - * force attempt or things will go bad - */ - knet_h->crypto_only = value; - } - } - return err; - } -} - -int knet_handle_compress(knet_handle_t knet_h, struct knet_handle_compress_cfg *knet_handle_compress_cfg) -{ - int savederrno = 0; - int err = 0; - - if (!knet_h) { - errno = EINVAL; - return -1; - } - - if (!knet_handle_compress_cfg) { - errno = EINVAL; - return -1; - } - - savederrno = get_global_wrlock(knet_h); - if (savederrno) { - log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s", - strerror(savederrno)); - errno = savederrno; - return -1; - } - - compress_fini(knet_h, 0); - err = compress_cfg(knet_h, knet_handle_compress_cfg); - savederrno = errno; - - pthread_rwlock_unlock(&knet_h->global_rwlock); - errno = err ? savederrno : 0; - return err; -} - -ssize_t knet_recv(knet_handle_t knet_h, char *buff, const size_t buff_len, const int8_t channel) -{ - int savederrno = 0; - ssize_t err = 0; - struct iovec iov_in; - - if (!knet_h) { - errno = EINVAL; - return -1; - } - - if (buff == NULL) { - errno = EINVAL; - return -1; - } - - if (buff_len <= 0) { - errno = EINVAL; - return -1; - } - - if (buff_len > KNET_MAX_PACKET_SIZE) { - errno = EINVAL; - return -1; - } - - if (channel < 0) { - errno = EINVAL; - return -1; - } - - if (channel >= KNET_DATAFD_MAX) { - errno = EINVAL; - return -1; - } - - savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); - if (savederrno) { - log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s", - strerror(savederrno)); - errno = savederrno; - return -1; - } - - if (!knet_h->sockfd[channel].in_use) { - savederrno = EINVAL; - err = -1; - goto out_unlock; - } - - memset(&iov_in, 0, sizeof(iov_in)); - iov_in.iov_base = (void *)buff; - iov_in.iov_len = buff_len; - - err = readv(knet_h->sockfd[channel].sockfd[0], &iov_in, 1); - savederrno = errno; - -out_unlock: - pthread_rwlock_unlock(&knet_h->global_rwlock); - errno = err ? savederrno : 0; - return err; -} - -ssize_t knet_send(knet_handle_t knet_h, const char *buff, const size_t buff_len, const int8_t channel) -{ - int savederrno = 0; - ssize_t err = 0; - struct iovec iov_out[1]; - - if (!knet_h) { - errno = EINVAL; - return -1; - } - - if (buff == NULL) { - errno = EINVAL; - return -1; - } - - if (buff_len <= 0) { - errno = EINVAL; - return -1; - } - - if (buff_len > KNET_MAX_PACKET_SIZE) { - errno = EINVAL; - return -1; - } - - if (channel < 0) { - errno = EINVAL; - return -1; - } - - if (channel >= KNET_DATAFD_MAX) { - errno = EINVAL; - return -1; - } - - savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); - if (savederrno) { - log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s", - strerror(savederrno)); - errno = savederrno; - return -1; - } - - if (!knet_h->sockfd[channel].in_use) { - savederrno = EINVAL; - err = -1; - goto out_unlock; - } - - memset(iov_out, 0, sizeof(iov_out)); - - iov_out[0].iov_base = (void *)buff; - iov_out[0].iov_len = buff_len; - - err = writev(knet_h->sockfd[channel].sockfd[0], iov_out, 1); - savederrno = errno; - -out_unlock: - pthread_rwlock_unlock(&knet_h->global_rwlock); - errno = err ? savederrno : 0; - return err; -} - -int knet_handle_get_stats(knet_handle_t knet_h, struct knet_handle_stats *stats, size_t struct_size) -{ - int err = 0, savederrno = 0; - - if (!knet_h) { - errno = EINVAL; - return -1; - } - - if (!stats) { - errno = EINVAL; - return -1; - } - - savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); - if (savederrno) { - log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s", - strerror(savederrno)); - errno = savederrno; - return -1; - } - - savederrno = pthread_mutex_lock(&knet_h->handle_stats_mutex); - if (savederrno) { - log_err(knet_h, KNET_SUB_HANDLE, "Unable to get mutex lock: %s", - strerror(savederrno)); - err = -1; - goto out_unlock; - } - - if (struct_size > sizeof(struct knet_handle_stats)) { - struct_size = sizeof(struct knet_handle_stats); - } - - memmove(stats, &knet_h->stats, struct_size); - - /* - * TX crypt stats only count the data packets sent, so add in the ping/pong/pmtud figures - * RX is OK as it counts them before they are sorted. - */ - - stats->tx_crypt_packets += knet_h->stats_extra.tx_crypt_ping_packets + - knet_h->stats_extra.tx_crypt_pong_packets + - knet_h->stats_extra.tx_crypt_pmtu_packets + - knet_h->stats_extra.tx_crypt_pmtu_reply_packets; - - /* Tell the caller our full size in case they have an old version */ - stats->size = sizeof(struct knet_handle_stats); - -out_unlock: - pthread_mutex_unlock(&knet_h->handle_stats_mutex); - pthread_rwlock_unlock(&knet_h->global_rwlock); - return err; -} - -int knet_handle_clear_stats(knet_handle_t knet_h, int clear_option) -{ - int savederrno = 0; - - if (!knet_h) { - errno = EINVAL; - return -1; - } - - if (clear_option != KNET_CLEARSTATS_HANDLE_ONLY && - clear_option != KNET_CLEARSTATS_HANDLE_AND_LINK) { - errno = EINVAL; - return -1; - } - - savederrno = get_global_wrlock(knet_h); - if (savederrno) { - log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s", - strerror(savederrno)); - errno = savederrno; - return -1; - } - - memset(&knet_h->stats, 0, sizeof(struct knet_handle_stats)); - memset(&knet_h->stats_extra, 0, sizeof(struct knet_handle_stats_extra)); - if (clear_option == KNET_CLEARSTATS_HANDLE_AND_LINK) { - _link_clear_stats(knet_h); - } - - pthread_rwlock_unlock(&knet_h->global_rwlock); - return 0; -} diff --git a/libknet/handle_api.c b/libknet/handle_api.c new file mode 100644 index 00000000..bf4324f5 --- /dev/null +++ b/libknet/handle_api.c @@ -0,0 +1,602 @@ +/* + * Copyright (C) 2020 Red Hat, Inc. All rights reserved. + * + * Authors: Fabio M. Di Nitto + * Federico Simoncelli + * + * This software licensed under LGPL-2.0+ + */ + +#include "config.h" + +#include +#include +#include +#include +#include +#include + +#include "internals.h" +#include "crypto.h" +#include "links.h" +#include "common.h" +#include "transport_common.h" +#include "logging.h" + +int knet_handle_enable_sock_notify(knet_handle_t knet_h, + void *sock_notify_fn_private_data, + void (*sock_notify_fn) ( + void *private_data, + int datafd, + int8_t channel, + uint8_t tx_rx, + int error, + int errorno)) +{ + int savederrno = 0; + + if (!knet_h) { + errno = EINVAL; + return -1; + } + + if (!sock_notify_fn) { + errno = EINVAL; + return -1; + } + + savederrno = get_global_wrlock(knet_h); + if (savederrno) { + log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s", + strerror(savederrno)); + errno = savederrno; + return -1; + } + + knet_h->sock_notify_fn_private_data = sock_notify_fn_private_data; + knet_h->sock_notify_fn = sock_notify_fn; + log_debug(knet_h, KNET_SUB_HANDLE, "sock_notify_fn enabled"); + + pthread_rwlock_unlock(&knet_h->global_rwlock); + + return 0; +} + +int knet_handle_add_datafd(knet_handle_t knet_h, int *datafd, int8_t *channel) +{ + int err = 0, savederrno = 0; + int i; + struct epoll_event ev; + + if (!knet_h) { + errno = EINVAL; + return -1; + } + + if (datafd == NULL) { + errno = EINVAL; + return -1; + } + + if (channel == NULL) { + errno = EINVAL; + return -1; + } + + if (*channel >= KNET_DATAFD_MAX) { + errno = EINVAL; + return -1; + } + + savederrno = get_global_wrlock(knet_h); + if (savederrno) { + log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s", + strerror(savederrno)); + errno = savederrno; + return -1; + } + + if (!knet_h->sock_notify_fn) { + log_err(knet_h, KNET_SUB_HANDLE, "Adding datafd requires sock notify callback enabled!"); + savederrno = EINVAL; + err = -1; + goto out_unlock; + } + + if (*datafd > 0) { + for (i = 0; i < KNET_DATAFD_MAX; i++) { + if ((knet_h->sockfd[i].in_use) && (knet_h->sockfd[i].sockfd[0] == *datafd)) { + log_err(knet_h, KNET_SUB_HANDLE, "requested datafd: %d already exist in index: %d", *datafd, i); + savederrno = EEXIST; + err = -1; + goto out_unlock; + } + } + } + + /* + * auto allocate a channel + */ + if (*channel < 0) { + for (i = 0; i < KNET_DATAFD_MAX; i++) { + if (!knet_h->sockfd[i].in_use) { + *channel = i; + break; + } + } + if (*channel < 0) { + savederrno = EBUSY; + err = -1; + goto out_unlock; + } + } else { + if (knet_h->sockfd[*channel].in_use) { + savederrno = EBUSY; + err = -1; + goto out_unlock; + } + } + + knet_h->sockfd[*channel].is_created = 0; + knet_h->sockfd[*channel].is_socket = 0; + knet_h->sockfd[*channel].has_error = 0; + + if (*datafd > 0) { + int sockopt; + socklen_t sockoptlen = sizeof(sockopt); + + if (_fdset_cloexec(*datafd)) { + savederrno = errno; + err = -1; + log_err(knet_h, KNET_SUB_HANDLE, "Unable to set CLOEXEC on datafd: %s", + strerror(savederrno)); + goto out_unlock; + } + + if (_fdset_nonblock(*datafd)) { + savederrno = errno; + err = -1; + log_err(knet_h, KNET_SUB_HANDLE, "Unable to set NONBLOCK on datafd: %s", + strerror(savederrno)); + goto out_unlock; + } + + knet_h->sockfd[*channel].sockfd[0] = *datafd; + knet_h->sockfd[*channel].sockfd[1] = 0; + + if (!getsockopt(knet_h->sockfd[*channel].sockfd[0], SOL_SOCKET, SO_TYPE, &sockopt, &sockoptlen)) { + knet_h->sockfd[*channel].is_socket = 1; + } + } else { + if (_init_socketpair(knet_h, knet_h->sockfd[*channel].sockfd)) { + savederrno = errno; + err = -1; + goto out_unlock; + } + + knet_h->sockfd[*channel].is_created = 1; + knet_h->sockfd[*channel].is_socket = 1; + *datafd = knet_h->sockfd[*channel].sockfd[0]; + } + + memset(&ev, 0, sizeof(struct epoll_event)); + ev.events = EPOLLIN; + ev.data.fd = knet_h->sockfd[*channel].sockfd[knet_h->sockfd[*channel].is_created]; + + if (epoll_ctl(knet_h->send_to_links_epollfd, + EPOLL_CTL_ADD, knet_h->sockfd[*channel].sockfd[knet_h->sockfd[*channel].is_created], &ev)) { + savederrno = errno; + err = -1; + log_err(knet_h, KNET_SUB_HANDLE, "Unable to add datafd %d to linkfd epoll pool: %s", + knet_h->sockfd[*channel].sockfd[knet_h->sockfd[*channel].is_created], strerror(savederrno)); + if (knet_h->sockfd[*channel].is_created) { + _close_socketpair(knet_h, knet_h->sockfd[*channel].sockfd); + } + goto out_unlock; + } + + knet_h->sockfd[*channel].in_use = 1; + +out_unlock: + pthread_rwlock_unlock(&knet_h->global_rwlock); + errno = err ? savederrno : 0; + return err; +} + +int knet_handle_remove_datafd(knet_handle_t knet_h, int datafd) +{ + int err = 0, savederrno = 0; + int8_t channel = -1; + int i; + struct epoll_event ev; + + if (!knet_h) { + errno = EINVAL; + return -1; + } + + if (datafd <= 0) { + errno = EINVAL; + return -1; + } + + savederrno = get_global_wrlock(knet_h); + if (savederrno) { + log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s", + strerror(savederrno)); + errno = savederrno; + return -1; + } + + for (i = 0; i < KNET_DATAFD_MAX; i++) { + if ((knet_h->sockfd[i].in_use) && + (knet_h->sockfd[i].sockfd[0] == datafd)) { + channel = i; + break; + } + } + + if (channel < 0) { + savederrno = EINVAL; + err = -1; + goto out_unlock; + } + + if (!knet_h->sockfd[channel].has_error) { + memset(&ev, 0, sizeof(struct epoll_event)); + + if (epoll_ctl(knet_h->send_to_links_epollfd, + EPOLL_CTL_DEL, knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], &ev)) { + savederrno = errno; + err = -1; + log_err(knet_h, KNET_SUB_HANDLE, "Unable to del datafd %d from linkfd epoll pool: %s", + knet_h->sockfd[channel].sockfd[0], strerror(savederrno)); + goto out_unlock; + } + } + + if (knet_h->sockfd[channel].is_created) { + _close_socketpair(knet_h, knet_h->sockfd[channel].sockfd); + } + + memset(&knet_h->sockfd[channel], 0, sizeof(struct knet_sock)); + +out_unlock: + pthread_rwlock_unlock(&knet_h->global_rwlock); + errno = err ? savederrno : 0; + return err; +} + +int knet_handle_get_datafd(knet_handle_t knet_h, const int8_t channel, int *datafd) +{ + int err = 0, savederrno = 0; + + if (!knet_h) { + errno = EINVAL; + return -1; + } + + if ((channel < 0) || (channel >= KNET_DATAFD_MAX)) { + errno = EINVAL; + return -1; + } + + if (datafd == NULL) { + errno = EINVAL; + return -1; + } + + savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); + if (savederrno) { + log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s", + strerror(savederrno)); + errno = savederrno; + return -1; + } + + if (!knet_h->sockfd[channel].in_use) { + savederrno = EINVAL; + err = -1; + goto out_unlock; + } + + *datafd = knet_h->sockfd[channel].sockfd[0]; + +out_unlock: + pthread_rwlock_unlock(&knet_h->global_rwlock); + errno = err ? savederrno : 0; + return err; +} + +int knet_handle_get_channel(knet_handle_t knet_h, const int datafd, int8_t *channel) +{ + int err = 0, savederrno = 0; + int i; + + if (!knet_h) { + errno = EINVAL; + return -1; + } + + if (datafd <= 0) { + errno = EINVAL; + return -1; + } + + if (channel == NULL) { + errno = EINVAL; + return -1; + } + + savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); + if (savederrno) { + log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s", + strerror(savederrno)); + errno = savederrno; + return -1; + } + + *channel = -1; + + for (i = 0; i < KNET_DATAFD_MAX; i++) { + if ((knet_h->sockfd[i].in_use) && + (knet_h->sockfd[i].sockfd[0] == datafd)) { + *channel = i; + break; + } + } + + if (*channel < 0) { + savederrno = EINVAL; + err = -1; + goto out_unlock; + } + +out_unlock: + pthread_rwlock_unlock(&knet_h->global_rwlock); + errno = err ? savederrno : 0; + return err; +} + +int knet_handle_enable_filter(knet_handle_t knet_h, + void *dst_host_filter_fn_private_data, + int (*dst_host_filter_fn) ( + void *private_data, + const unsigned char *outdata, + ssize_t outdata_len, + uint8_t tx_rx, + knet_node_id_t this_host_id, + knet_node_id_t src_node_id, + int8_t *channel, + knet_node_id_t *dst_host_ids, + size_t *dst_host_ids_entries)) +{ + int savederrno = 0; + + if (!knet_h) { + errno = EINVAL; + return -1; + } + + savederrno = get_global_wrlock(knet_h); + if (savederrno) { + log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s", + strerror(savederrno)); + errno = savederrno; + return -1; + } + + knet_h->dst_host_filter_fn_private_data = dst_host_filter_fn_private_data; + knet_h->dst_host_filter_fn = dst_host_filter_fn; + if (knet_h->dst_host_filter_fn) { + log_debug(knet_h, KNET_SUB_HANDLE, "dst_host_filter_fn enabled"); + } else { + log_debug(knet_h, KNET_SUB_HANDLE, "dst_host_filter_fn disabled"); + } + + pthread_rwlock_unlock(&knet_h->global_rwlock); + + errno = 0; + return 0; +} + +int knet_handle_setfwd(knet_handle_t knet_h, unsigned int enabled) +{ + int savederrno = 0; + + if (!knet_h) { + errno = EINVAL; + return -1; + } + + if (enabled > 1) { + errno = EINVAL; + return -1; + } + + savederrno = get_global_wrlock(knet_h); + if (savederrno) { + log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s", + strerror(savederrno)); + errno = savederrno; + return -1; + } + + if (enabled) { + knet_h->enabled = enabled; + log_debug(knet_h, KNET_SUB_HANDLE, "Data forwarding is enabled"); + } else { + /* + * notify TX and RX threads to flush the queues + */ + if (set_thread_flush_queue(knet_h, KNET_THREAD_TX, KNET_THREAD_QUEUE_FLUSH) < 0) { + log_debug(knet_h, KNET_SUB_HANDLE, "Unable to request queue flushing for TX thread"); + } + if (set_thread_flush_queue(knet_h, KNET_THREAD_RX, KNET_THREAD_QUEUE_FLUSH) < 0) { + log_debug(knet_h, KNET_SUB_HANDLE, "Unable to request queue flushing for RX thread"); + } + } + + pthread_rwlock_unlock(&knet_h->global_rwlock); + + /* + * when disabling data forward, we need to give time to TX and RX + * to flush the queues. + * + * the TX thread is the main leader here. When there is no more + * data in the TX queue, we will also close traffic for RX. + */ + if (!enabled) { + /* + * this usleep might be unnecessary, but wait_all_threads_flush_queue + * adds extra locking delay. + * + * allow all threads to run free without extra locking interference + * and then we switch to a more active wait in case the scheduler + * has decided to delay one thread or another + */ + usleep(KNET_THREADS_TIMERES * 2); + wait_all_threads_flush_queue(knet_h); + + /* + * all threads have done flushing the queue, we can stop data forwarding + */ + savederrno = get_global_wrlock(knet_h); + if (savederrno) { + log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s", + strerror(savederrno)); + errno = savederrno; + return -1; + } + knet_h->enabled = enabled; + log_debug(knet_h, KNET_SUB_HANDLE, "Data forwarding is disabled"); + pthread_rwlock_unlock(&knet_h->global_rwlock); + } + + errno = 0; + return 0; +} + +int knet_handle_get_stats(knet_handle_t knet_h, struct knet_handle_stats *stats, size_t struct_size) +{ + int err = 0, savederrno = 0; + + if (!knet_h) { + errno = EINVAL; + return -1; + } + + if (!stats) { + errno = EINVAL; + return -1; + } + + savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); + if (savederrno) { + log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s", + strerror(savederrno)); + errno = savederrno; + return -1; + } + + savederrno = pthread_mutex_lock(&knet_h->handle_stats_mutex); + if (savederrno) { + log_err(knet_h, KNET_SUB_HANDLE, "Unable to get mutex lock: %s", + strerror(savederrno)); + err = -1; + goto out_unlock; + } + + if (struct_size > sizeof(struct knet_handle_stats)) { + struct_size = sizeof(struct knet_handle_stats); + } + + memmove(stats, &knet_h->stats, struct_size); + + /* + * TX crypt stats only count the data packets sent, so add in the ping/pong/pmtud figures + * RX is OK as it counts them before they are sorted. + */ + + stats->tx_crypt_packets += knet_h->stats_extra.tx_crypt_ping_packets + + knet_h->stats_extra.tx_crypt_pong_packets + + knet_h->stats_extra.tx_crypt_pmtu_packets + + knet_h->stats_extra.tx_crypt_pmtu_reply_packets; + + /* Tell the caller our full size in case they have an old version */ + stats->size = sizeof(struct knet_handle_stats); + +out_unlock: + pthread_mutex_unlock(&knet_h->handle_stats_mutex); + pthread_rwlock_unlock(&knet_h->global_rwlock); + return err; +} + +int knet_handle_clear_stats(knet_handle_t knet_h, int clear_option) +{ + int savederrno = 0; + + if (!knet_h) { + errno = EINVAL; + return -1; + } + + if (clear_option != KNET_CLEARSTATS_HANDLE_ONLY && + clear_option != KNET_CLEARSTATS_HANDLE_AND_LINK) { + errno = EINVAL; + return -1; + } + + savederrno = get_global_wrlock(knet_h); + if (savederrno) { + log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s", + strerror(savederrno)); + errno = savederrno; + return -1; + } + + memset(&knet_h->stats, 0, sizeof(struct knet_handle_stats)); + memset(&knet_h->stats_extra, 0, sizeof(struct knet_handle_stats_extra)); + if (clear_option == KNET_CLEARSTATS_HANDLE_AND_LINK) { + _link_clear_stats(knet_h); + } + + pthread_rwlock_unlock(&knet_h->global_rwlock); + return 0; +} + +int knet_handle_enable_access_lists(knet_handle_t knet_h, unsigned int enabled) +{ + int savederrno = 0; + + if (!knet_h) { + errno = EINVAL; + return -1; + } + + if (enabled > 1) { + errno = EINVAL; + return -1; + } + + savederrno = get_global_wrlock(knet_h); + if (savederrno) { + log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s", + strerror(savederrno)); + errno = savederrno; + return -1; + } + + knet_h->use_access_lists = enabled; + + if (enabled) { + log_debug(knet_h, KNET_SUB_HANDLE, "Links access lists are enabled"); + } else { + log_debug(knet_h, KNET_SUB_HANDLE, "Links access lists are disabled"); + } + + pthread_rwlock_unlock(&knet_h->global_rwlock); + + errno = 0; + return 0; +} diff --git a/libknet/threads_pmtud.c b/libknet/threads_pmtud.c index b4432898..d49827aa 100644 --- a/libknet/threads_pmtud.c +++ b/libknet/threads_pmtud.c @@ -1,642 +1,804 @@ /* * Copyright (C) 2015-2020 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. Di Nitto * Federico Simoncelli * * This software licensed under LGPL-2.0+ */ #include "config.h" #include #include #include #include #include "crypto.h" #include "links.h" #include "host.h" #include "logging.h" #include "transports.h" #include "threads_common.h" #include "threads_pmtud.h" static int _calculate_manual_mtu(knet_handle_t knet_h, struct knet_link *dst_link) { size_t ipproto_overhead_len; /* onwire packet overhead (protocol based) */ switch (dst_link->dst_addr.ss_family) { case AF_INET6: ipproto_overhead_len = KNET_PMTUD_OVERHEAD_V6 + dst_link->proto_overhead; break; case AF_INET: ipproto_overhead_len = KNET_PMTUD_OVERHEAD_V4 + dst_link->proto_overhead; break; default: log_debug(knet_h, KNET_SUB_PMTUD, "unknown protocol"); return 0; break; } dst_link->status.mtu = calc_max_data_outlen(knet_h, knet_h->manual_mtu - ipproto_overhead_len); return 1; } static int _handle_check_link_pmtud(knet_handle_t knet_h, struct knet_host *dst_host, struct knet_link *dst_link) { int err, ret, savederrno, mutex_retry_limit, failsafe, use_kernel_mtu, warn_once; uint32_t kernel_mtu; /* record kernel_mtu from EMSGSIZE */ size_t onwire_len; /* current packet onwire size */ size_t ipproto_overhead_len; /* onwire packet overhead (protocol based) */ size_t max_mtu_len; /* max mtu for protocol */ size_t data_len; /* how much data we can send in the packet * generally would be onwire_len - ipproto_overhead_len * needs to be adjusted for crypto */ size_t app_mtu_len; /* real data that we can send onwire */ ssize_t len; /* len of what we were able to sendto onwire */ struct timespec ts, pmtud_crypto_start_ts, pmtud_crypto_stop_ts; unsigned long long pong_timeout_adj_tmp, timediff; int pmtud_crypto_reduce = 1; unsigned char *outbuf = (unsigned char *)knet_h->pmtudbuf; warn_once = 0; mutex_retry_limit = 0; failsafe = 0; knet_h->pmtudbuf->khp_pmtud_link = dst_link->link_id; switch (dst_link->dst_addr.ss_family) { case AF_INET6: max_mtu_len = KNET_PMTUD_SIZE_V6; ipproto_overhead_len = KNET_PMTUD_OVERHEAD_V6 + dst_link->proto_overhead; break; case AF_INET: max_mtu_len = KNET_PMTUD_SIZE_V4; ipproto_overhead_len = KNET_PMTUD_OVERHEAD_V4 + dst_link->proto_overhead; break; default: log_debug(knet_h, KNET_SUB_PMTUD, "PMTUD aborted, unknown protocol"); return -1; break; } dst_link->last_bad_mtu = 0; dst_link->last_good_mtu = dst_link->last_ping_size + ipproto_overhead_len; /* * discovery starts from the top because kernel will * refuse to send packets > current iface mtu. * this saves us some time and network bw. */ onwire_len = max_mtu_len; restart: /* * prevent a race when interface mtu is changed _exactly_ during * the discovery process and it's complex to detect. Easier * to wait the next loop. * 30 is not an arbitrary value. To bisect from 576 to 128000 doesn't * take more than 18/19 steps. */ if (failsafe == 30) { log_err(knet_h, KNET_SUB_PMTUD, "Aborting PMTUD process: Too many attempts. MTU might have changed during discovery."); return -1; } else { failsafe++; } /* * common to all packets */ /* * calculate the application MTU based on current onwire_len minus ipproto_overhead_len */ app_mtu_len = calc_max_data_outlen(knet_h, onwire_len - ipproto_overhead_len); /* * recalculate onwire len back that might be different based * on data padding from crypto layer. */ onwire_len = calc_data_outlen(knet_h, app_mtu_len + KNET_HEADER_ALL_SIZE) + ipproto_overhead_len; /* * calculate the size of what we need to send to sendto(2). * see also onwire.c for packet format explanation. */ data_len = app_mtu_len + knet_h->sec_hash_size + knet_h->sec_salt_size + KNET_HEADER_ALL_SIZE; if (knet_h->crypto_in_use_config) { if (data_len < (knet_h->sec_hash_size + knet_h->sec_salt_size) + 1) { log_debug(knet_h, KNET_SUB_PMTUD, "Aborting PMTUD process: link mtu smaller than crypto header detected (link might have been disconnected)"); return -1; } knet_h->pmtudbuf->khp_pmtud_size = onwire_len; if (crypto_encrypt_and_sign(knet_h, (const unsigned char *)knet_h->pmtudbuf, data_len - (knet_h->sec_hash_size + knet_h->sec_salt_size), knet_h->pmtudbuf_crypt, (ssize_t *)&data_len) < 0) { log_debug(knet_h, KNET_SUB_PMTUD, "Unable to crypto pmtud packet"); return -1; } outbuf = knet_h->pmtudbuf_crypt; if (pthread_mutex_lock(&knet_h->handle_stats_mutex) < 0) { log_err(knet_h, KNET_SUB_PMTUD, "Unable to get mutex lock"); return -1; } knet_h->stats_extra.tx_crypt_pmtu_packets++; pthread_mutex_unlock(&knet_h->handle_stats_mutex); } else { knet_h->pmtudbuf->khp_pmtud_size = onwire_len; } /* link has gone down, aborting pmtud */ if (dst_link->status.connected != 1) { log_debug(knet_h, KNET_SUB_PMTUD, "PMTUD detected host (%u) link (%u) has been disconnected", dst_host->host_id, dst_link->link_id); return -1; } if (dst_link->transport_connected != 1) { log_debug(knet_h, KNET_SUB_PMTUD, "PMTUD detected host (%u) link (%u) has been disconnected", dst_host->host_id, dst_link->link_id); return -1; } if (pthread_mutex_lock(&knet_h->pmtud_mutex) != 0) { log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get mutex lock"); return -1; } if (knet_h->pmtud_abort) { pthread_mutex_unlock(&knet_h->pmtud_mutex); errno = EDEADLK; return -1; } savederrno = pthread_mutex_lock(&knet_h->tx_mutex); if (savederrno) { pthread_mutex_unlock(&knet_h->pmtud_mutex); log_err(knet_h, KNET_SUB_PMTUD, "Unable to get TX mutex lock: %s", strerror(savederrno)); return -1; } savederrno = pthread_mutex_lock(&dst_link->link_stats_mutex); if (savederrno) { pthread_mutex_unlock(&knet_h->pmtud_mutex); pthread_mutex_unlock(&knet_h->tx_mutex); log_err(knet_h, KNET_SUB_PMTUD, "Unable to get stats mutex lock for host %u link %u: %s", dst_host->host_id, dst_link->link_id, strerror(savederrno)); return -1; } retry: if (transport_get_connection_oriented(knet_h, dst_link->transport) == TRANSPORT_PROTO_NOT_CONNECTION_ORIENTED) { len = sendto(dst_link->outsock, outbuf, data_len, MSG_DONTWAIT | MSG_NOSIGNAL, (struct sockaddr *) &dst_link->dst_addr, sizeof(struct sockaddr_storage)); } else { len = sendto(dst_link->outsock, outbuf, data_len, MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0); } savederrno = errno; /* * we cannot hold a lock on kmtu_mutex between resetting * knet_h->kernel_mtu here and below where it's used. * use_kernel_mtu tells us if the knet_h->kernel_mtu was * set to 0 and we can trust its value later. */ use_kernel_mtu = 0; if (pthread_mutex_lock(&knet_h->kmtu_mutex) == 0) { use_kernel_mtu = 1; knet_h->kernel_mtu = 0; pthread_mutex_unlock(&knet_h->kmtu_mutex); } kernel_mtu = 0; err = transport_tx_sock_error(knet_h, dst_link->transport, dst_link->outsock, len, savederrno); switch(err) { case -1: /* unrecoverable error */ log_debug(knet_h, KNET_SUB_PMTUD, "Unable to send pmtu packet (sendto): %d %s", savederrno, strerror(savederrno)); pthread_mutex_unlock(&knet_h->tx_mutex); pthread_mutex_unlock(&knet_h->pmtud_mutex); dst_link->status.stats.tx_pmtu_errors++; pthread_mutex_unlock(&dst_link->link_stats_mutex); return -1; case 0: /* ignore error and continue */ break; case 1: /* retry to send those same data */ dst_link->status.stats.tx_pmtu_retries++; goto retry; break; } pthread_mutex_unlock(&knet_h->tx_mutex); if (len != (ssize_t )data_len) { pthread_mutex_unlock(&dst_link->link_stats_mutex); if (savederrno == EMSGSIZE) { /* * we cannot hold a lock on kmtu_mutex between resetting * knet_h->kernel_mtu and here. * use_kernel_mtu tells us if the knet_h->kernel_mtu was * set to 0 previously and we can trust its value now. */ if (use_kernel_mtu) { use_kernel_mtu = 0; if (pthread_mutex_lock(&knet_h->kmtu_mutex) == 0) { kernel_mtu = knet_h->kernel_mtu; pthread_mutex_unlock(&knet_h->kmtu_mutex); } } if (kernel_mtu > 0) { dst_link->last_bad_mtu = kernel_mtu + 1; } else { dst_link->last_bad_mtu = onwire_len; } } else { log_debug(knet_h, KNET_SUB_PMTUD, "Unable to send pmtu packet len: %zu err: %s", onwire_len, strerror(savederrno)); } } else { dst_link->last_sent_mtu = onwire_len; dst_link->last_recv_mtu = 0; dst_link->status.stats.tx_pmtu_packets++; dst_link->status.stats.tx_pmtu_bytes += data_len; pthread_mutex_unlock(&dst_link->link_stats_mutex); if (clock_gettime(CLOCK_REALTIME, &ts) < 0) { log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get current time: %s", strerror(errno)); pthread_mutex_unlock(&knet_h->pmtud_mutex); return -1; } /* * non fatal, we can wait the next round to reduce the * multiplier */ if (clock_gettime(CLOCK_MONOTONIC, &pmtud_crypto_start_ts) < 0) { log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get current time: %s", strerror(errno)); pmtud_crypto_reduce = 0; } /* * set PMTUd reply timeout to match pong_timeout on a given link * * math: internally pong_timeout is expressed in microseconds, while * the public API exports milliseconds. So careful with the 0's here. * the loop is necessary because we are grabbing the current time just above * and add values to it that could overflow into seconds. */ if (pthread_mutex_lock(&knet_h->backoff_mutex)) { log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get backoff_mutex"); pthread_mutex_unlock(&knet_h->pmtud_mutex); return -1; } if (knet_h->crypto_in_use_config) { /* * crypto, under pressure, is a royal PITA */ pong_timeout_adj_tmp = dst_link->pong_timeout_adj * dst_link->pmtud_crypto_timeout_multiplier; } else { pong_timeout_adj_tmp = dst_link->pong_timeout_adj; } ts.tv_sec += pong_timeout_adj_tmp / 1000000; ts.tv_nsec += (((pong_timeout_adj_tmp) % 1000000) * 1000); while (ts.tv_nsec > 1000000000) { ts.tv_sec += 1; ts.tv_nsec -= 1000000000; } pthread_mutex_unlock(&knet_h->backoff_mutex); knet_h->pmtud_waiting = 1; ret = pthread_cond_timedwait(&knet_h->pmtud_cond, &knet_h->pmtud_mutex, &ts); knet_h->pmtud_waiting = 0; if (knet_h->pmtud_abort) { pthread_mutex_unlock(&knet_h->pmtud_mutex); errno = EDEADLK; return -1; } /* * we cannot use shutdown_in_progress in here because * we already hold the read lock */ if (knet_h->fini_in_progress) { pthread_mutex_unlock(&knet_h->pmtud_mutex); log_debug(knet_h, KNET_SUB_PMTUD, "PMTUD aborted. shutdown in progress"); return -1; } if (ret) { if (ret == ETIMEDOUT) { if ((knet_h->crypto_in_use_config) && (dst_link->pmtud_crypto_timeout_multiplier < KNET_LINK_PMTUD_CRYPTO_TIMEOUT_MULTIPLIER_MAX)) { dst_link->pmtud_crypto_timeout_multiplier = dst_link->pmtud_crypto_timeout_multiplier * 2; pmtud_crypto_reduce = 0; log_debug(knet_h, KNET_SUB_PMTUD, "Increasing PMTUd response timeout multiplier to (%u) for host %u link: %u", dst_link->pmtud_crypto_timeout_multiplier, dst_host->host_id, dst_link->link_id); pthread_mutex_unlock(&knet_h->pmtud_mutex); goto restart; } if (!warn_once) { log_warn(knet_h, KNET_SUB_PMTUD, "possible MTU misconfiguration detected. " "kernel is reporting MTU: %u bytes for " "host %u link %u but the other node is " "not acknowledging packets of this size. ", dst_link->last_sent_mtu, dst_host->host_id, dst_link->link_id); log_warn(knet_h, KNET_SUB_PMTUD, "This can be caused by this node interface MTU " "too big or a network device that does not " "support or has been misconfigured to manage MTU " "of this size, or packet loss. knet will continue " "to run but performances might be affected."); warn_once = 1; } } else { pthread_mutex_unlock(&knet_h->pmtud_mutex); if (mutex_retry_limit == 3) { log_debug(knet_h, KNET_SUB_PMTUD, "PMTUD aborted, unable to get mutex lock"); return -1; } mutex_retry_limit++; goto restart; } } if ((knet_h->crypto_in_use_config) && (pmtud_crypto_reduce == 1) && (dst_link->pmtud_crypto_timeout_multiplier > KNET_LINK_PMTUD_CRYPTO_TIMEOUT_MULTIPLIER_MIN)) { if (!clock_gettime(CLOCK_MONOTONIC, &pmtud_crypto_stop_ts)) { timespec_diff(pmtud_crypto_start_ts, pmtud_crypto_stop_ts, &timediff); if (((pong_timeout_adj_tmp * 1000) / 2) > timediff) { dst_link->pmtud_crypto_timeout_multiplier = dst_link->pmtud_crypto_timeout_multiplier / 2; log_debug(knet_h, KNET_SUB_PMTUD, "Decreasing PMTUd response timeout multiplier to (%u) for host %u link: %u", dst_link->pmtud_crypto_timeout_multiplier, dst_host->host_id, dst_link->link_id); } } else { log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get current time: %s", strerror(errno)); } } if ((dst_link->last_recv_mtu != onwire_len) || (ret)) { dst_link->last_bad_mtu = onwire_len; } else { int found_mtu = 0; if (knet_h->sec_block_size) { if ((onwire_len + knet_h->sec_block_size >= max_mtu_len) || ((dst_link->last_bad_mtu) && (dst_link->last_bad_mtu <= (onwire_len + knet_h->sec_block_size)))) { found_mtu = 1; } } else { if ((onwire_len == max_mtu_len) || ((dst_link->last_bad_mtu) && (dst_link->last_bad_mtu == (onwire_len + 1))) || (dst_link->last_bad_mtu == dst_link->last_good_mtu)) { found_mtu = 1; } } if (found_mtu) { /* * account for IP overhead, knet headers and crypto in PMTU calculation */ dst_link->status.mtu = calc_max_data_outlen(knet_h, onwire_len - ipproto_overhead_len); pthread_mutex_unlock(&knet_h->pmtud_mutex); return 0; } dst_link->last_good_mtu = onwire_len; } } if (kernel_mtu) { onwire_len = kernel_mtu; } else { onwire_len = (dst_link->last_good_mtu + dst_link->last_bad_mtu) / 2; } pthread_mutex_unlock(&knet_h->pmtud_mutex); goto restart; } static int _handle_check_pmtud(knet_handle_t knet_h, struct knet_host *dst_host, struct knet_link *dst_link, int force_run) { uint8_t saved_valid_pmtud; unsigned int saved_pmtud; struct timespec clock_now; unsigned long long diff_pmtud, interval; if (clock_gettime(CLOCK_MONOTONIC, &clock_now) != 0) { log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get monotonic clock"); return 0; } if (!force_run) { interval = knet_h->pmtud_interval * 1000000000llu; /* nanoseconds */ timespec_diff(dst_link->pmtud_last, clock_now, &diff_pmtud); if (diff_pmtud < interval) { return dst_link->has_valid_mtu; } } /* * status.proto_overhead should include all IP/(UDP|SCTP)/knet headers * * please note that it is not the same as link->proto_overhead that * includes only either UDP or SCTP (at the moment) overhead. */ switch (dst_link->dst_addr.ss_family) { case AF_INET6: dst_link->status.proto_overhead = KNET_PMTUD_OVERHEAD_V6 + dst_link->proto_overhead + KNET_HEADER_ALL_SIZE + knet_h->sec_hash_size + knet_h->sec_salt_size; break; case AF_INET: dst_link->status.proto_overhead = KNET_PMTUD_OVERHEAD_V4 + dst_link->proto_overhead + KNET_HEADER_ALL_SIZE + knet_h->sec_hash_size + knet_h->sec_salt_size; break; } saved_pmtud = dst_link->status.mtu; saved_valid_pmtud = dst_link->has_valid_mtu; log_debug(knet_h, KNET_SUB_PMTUD, "Starting PMTUD for host: %u link: %u", dst_host->host_id, dst_link->link_id); errno = 0; if (_handle_check_link_pmtud(knet_h, dst_host, dst_link) < 0) { if (errno == EDEADLK) { log_debug(knet_h, KNET_SUB_PMTUD, "PMTUD for host: %u link: %u has been rescheduled", dst_host->host_id, dst_link->link_id); dst_link->status.mtu = saved_pmtud; dst_link->has_valid_mtu = saved_valid_pmtud; errno = EDEADLK; return dst_link->has_valid_mtu; } dst_link->has_valid_mtu = 0; } else { if (dst_link->status.mtu < calc_min_mtu(knet_h)) { log_info(knet_h, KNET_SUB_PMTUD, "Invalid MTU detected for host: %u link: %u mtu: %u", dst_host->host_id, dst_link->link_id, dst_link->status.mtu); dst_link->has_valid_mtu = 0; } else { dst_link->has_valid_mtu = 1; } if (dst_link->has_valid_mtu) { if ((saved_pmtud) && (saved_pmtud != dst_link->status.mtu)) { log_info(knet_h, KNET_SUB_PMTUD, "PMTUD link change for host: %u link: %u from %u to %u", dst_host->host_id, dst_link->link_id, saved_pmtud, dst_link->status.mtu); } log_debug(knet_h, KNET_SUB_PMTUD, "PMTUD completed for host: %u link: %u current link mtu: %u", dst_host->host_id, dst_link->link_id, dst_link->status.mtu); /* * set pmtud_last, if we can, after we are done with the PMTUd process * because it can take a very long time. */ dst_link->pmtud_last = clock_now; if (!clock_gettime(CLOCK_MONOTONIC, &clock_now)) { dst_link->pmtud_last = clock_now; } } } if (saved_valid_pmtud != dst_link->has_valid_mtu) { _host_dstcache_update_async(knet_h, dst_host); } return dst_link->has_valid_mtu; } void *_handle_pmtud_link_thread(void *data) { knet_handle_t knet_h = (knet_handle_t) data; struct knet_host *dst_host; struct knet_link *dst_link; int link_idx; unsigned int have_mtu; unsigned int lower_mtu; int link_has_mtu; int force_run = 0; set_thread_status(knet_h, KNET_THREAD_PMTUD, KNET_THREAD_STARTED); knet_h->data_mtu = calc_min_mtu(knet_h); /* preparing pmtu buffer */ knet_h->pmtudbuf->kh_version = KNET_HEADER_VERSION; knet_h->pmtudbuf->kh_type = KNET_HEADER_TYPE_PMTUD; knet_h->pmtudbuf->kh_node = htons(knet_h->host_id); while (!shutdown_in_progress(knet_h)) { usleep(KNET_THREADS_TIMERES); if (pthread_mutex_lock(&knet_h->pmtud_mutex) != 0) { log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get mutex lock"); continue; } knet_h->pmtud_abort = 0; knet_h->pmtud_running = 1; force_run = knet_h->pmtud_forcerun; knet_h->pmtud_forcerun = 0; pthread_mutex_unlock(&knet_h->pmtud_mutex); if (force_run) { log_debug(knet_h, KNET_SUB_PMTUD, "PMTUd request to rerun has been received"); } if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) { log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get read lock"); continue; } lower_mtu = KNET_PMTUD_SIZE_V4; have_mtu = 0; for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) { for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) { dst_link = &dst_host->link[link_idx]; if ((dst_link->status.enabled != 1) || (dst_link->status.connected != 1) || (dst_host->link[link_idx].transport == KNET_TRANSPORT_LOOPBACK) || (!dst_link->last_ping_size) || ((dst_link->dynamic == KNET_LINK_DYNIP) && (dst_link->status.dynconnected != 1))) continue; if (!knet_h->manual_mtu) { link_has_mtu = _handle_check_pmtud(knet_h, dst_host, dst_link, force_run); if (errno == EDEADLK) { goto out_unlock; } if (link_has_mtu) { have_mtu = 1; if (dst_link->status.mtu < lower_mtu) { lower_mtu = dst_link->status.mtu; } } } else { link_has_mtu = _calculate_manual_mtu(knet_h, dst_link); if (link_has_mtu) { have_mtu = 1; if (dst_link->status.mtu < lower_mtu) { lower_mtu = dst_link->status.mtu; } } } } } if (have_mtu) { if (knet_h->data_mtu != lower_mtu) { knet_h->data_mtu = lower_mtu; log_info(knet_h, KNET_SUB_PMTUD, "Global data MTU changed to: %u", knet_h->data_mtu); if (knet_h->pmtud_notify_fn) { knet_h->pmtud_notify_fn(knet_h->pmtud_notify_fn_private_data, knet_h->data_mtu); } } } out_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); if (pthread_mutex_lock(&knet_h->pmtud_mutex) != 0) { log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get mutex lock"); } else { knet_h->pmtud_running = 0; pthread_mutex_unlock(&knet_h->pmtud_mutex); } } set_thread_status(knet_h, KNET_THREAD_PMTUD, KNET_THREAD_STOPPED); return NULL; } + +int knet_handle_pmtud_getfreq(knet_handle_t knet_h, unsigned int *interval) +{ + int savederrno = 0; + + if (!knet_h) { + errno = EINVAL; + return -1; + } + + if (!interval) { + errno = EINVAL; + return -1; + } + + savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); + if (savederrno) { + log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s", + strerror(savederrno)); + errno = savederrno; + return -1; + } + + *interval = knet_h->pmtud_interval; + + pthread_rwlock_unlock(&knet_h->global_rwlock); + + errno = 0; + return 0; +} + +int knet_handle_pmtud_setfreq(knet_handle_t knet_h, unsigned int interval) +{ + int savederrno = 0; + + if (!knet_h) { + errno = EINVAL; + return -1; + } + + if ((!interval) || (interval > 86400)) { + errno = EINVAL; + return -1; + } + + savederrno = get_global_wrlock(knet_h); + if (savederrno) { + log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s", + strerror(savederrno)); + errno = savederrno; + return -1; + } + + knet_h->pmtud_interval = interval; + log_debug(knet_h, KNET_SUB_HANDLE, "PMTUd interval set to: %u seconds", interval); + + pthread_rwlock_unlock(&knet_h->global_rwlock); + + errno = 0; + return 0; +} + +int knet_handle_enable_pmtud_notify(knet_handle_t knet_h, + void *pmtud_notify_fn_private_data, + void (*pmtud_notify_fn) ( + void *private_data, + unsigned int data_mtu)) +{ + int savederrno = 0; + + if (!knet_h) { + errno = EINVAL; + return -1; + } + + savederrno = get_global_wrlock(knet_h); + if (savederrno) { + log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s", + strerror(savederrno)); + errno = savederrno; + return -1; + } + + knet_h->pmtud_notify_fn_private_data = pmtud_notify_fn_private_data; + knet_h->pmtud_notify_fn = pmtud_notify_fn; + if (knet_h->pmtud_notify_fn) { + log_debug(knet_h, KNET_SUB_HANDLE, "pmtud_notify_fn enabled"); + } else { + log_debug(knet_h, KNET_SUB_HANDLE, "pmtud_notify_fn disabled"); + } + + pthread_rwlock_unlock(&knet_h->global_rwlock); + + errno = 0; + return 0; +} + +int knet_handle_pmtud_set(knet_handle_t knet_h, + unsigned int iface_mtu) +{ + int savederrno = 0; + + if (!knet_h) { + errno = EINVAL; + return -1; + } + + if (iface_mtu > KNET_PMTUD_SIZE_V4) { + errno = EINVAL; + return -1; + } + + savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); + if (savederrno) { + log_err(knet_h, KNET_SUB_PMTUD, "Unable to get read lock: %s", + strerror(savederrno)); + errno = savederrno; + return -1; + } + + log_info(knet_h, KNET_SUB_PMTUD, "MTU manually set to: %u", iface_mtu); + + knet_h->manual_mtu = iface_mtu; + + force_pmtud_run(knet_h, KNET_SUB_PMTUD, 0); + + pthread_rwlock_unlock(&knet_h->global_rwlock); + + errno = 0; + return 0; +} + +int knet_handle_pmtud_get(knet_handle_t knet_h, + unsigned int *data_mtu) +{ + int savederrno = 0; + + if (!knet_h) { + errno = EINVAL; + return -1; + } + + if (!data_mtu) { + errno = EINVAL; + return -1; + } + + savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); + if (savederrno) { + log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s", + strerror(savederrno)); + errno = savederrno; + return -1; + } + + *data_mtu = knet_h->data_mtu; + + pthread_rwlock_unlock(&knet_h->global_rwlock); + + errno = 0; + return 0; +} diff --git a/libknet/threads_rx.c b/libknet/threads_rx.c index bff7dbad..61e2862d 100644 --- a/libknet/threads_rx.c +++ b/libknet/threads_rx.c @@ -1,1001 +1,1064 @@ /* * Copyright (C) 2012-2020 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. Di Nitto * Federico Simoncelli * * This software licensed under LGPL-2.0+ */ #include "config.h" #include #include #include #include #include #include "compat.h" #include "compress.h" #include "crypto.h" #include "host.h" #include "links.h" #include "links_acl.h" #include "logging.h" #include "transports.h" #include "transport_common.h" #include "threads_common.h" #include "threads_heartbeat.h" #include "threads_rx.h" #include "netutils.h" /* * RECV */ /* * return 1 if a > b * return -1 if b > a * return 0 if they are equal */ static inline int timecmp(struct timespec a, struct timespec b) { if (a.tv_sec != b.tv_sec) { if (a.tv_sec > b.tv_sec) { return 1; } else { return -1; } } else { if (a.tv_nsec > b.tv_nsec) { return 1; } else if (a.tv_nsec < b.tv_nsec) { return -1; } else { return 0; } } } /* * this functions needs to return an index (0 to 7) * to a knet_host_defrag_buf. (-1 on errors) */ static int find_pckt_defrag_buf(knet_handle_t knet_h, struct knet_header *inbuf) { struct knet_host *src_host = knet_h->host_index[inbuf->kh_node]; int i, oldest; /* * check if there is a buffer already in use handling the same seq_num */ for (i = 0; i < KNET_MAX_LINK; i++) { if (src_host->defrag_buf[i].in_use) { if (src_host->defrag_buf[i].pckt_seq == inbuf->khp_data_seq_num) { return i; } } } /* * If there is no buffer that's handling the current seq_num * either it's new or it's been reclaimed already. * check if it's been reclaimed/seen before using the defrag circular * buffer. If the pckt has been seen before, the buffer expired (ETIME) * and there is no point to try to defrag it again. */ if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 1, 0)) { errno = ETIME; return -1; } /* * register the pckt as seen */ _seq_num_set(src_host, inbuf->khp_data_seq_num, 1); /* * see if there is a free buffer */ for (i = 0; i < KNET_MAX_LINK; i++) { if (!src_host->defrag_buf[i].in_use) { return i; } } /* * at this point, there are no free buffers, the pckt is new * and we need to reclaim a buffer, and we will take the one * with the oldest timestamp. It's as good as any. */ oldest = 0; for (i = 0; i < KNET_MAX_LINK; i++) { if (timecmp(src_host->defrag_buf[i].last_update, src_host->defrag_buf[oldest].last_update) < 0) { oldest = i; } } src_host->defrag_buf[oldest].in_use = 0; return oldest; } static int pckt_defrag(knet_handle_t knet_h, struct knet_header *inbuf, ssize_t *len) { struct knet_host_defrag_buf *defrag_buf; int defrag_buf_idx; defrag_buf_idx = find_pckt_defrag_buf(knet_h, inbuf); if (defrag_buf_idx < 0) { return 1; } defrag_buf = &knet_h->host_index[inbuf->kh_node]->defrag_buf[defrag_buf_idx]; /* * if the buf is not is use, then make sure it's clean */ if (!defrag_buf->in_use) { memset(defrag_buf, 0, sizeof(struct knet_host_defrag_buf)); defrag_buf->in_use = 1; defrag_buf->pckt_seq = inbuf->khp_data_seq_num; } /* * update timestamp on the buffer */ clock_gettime(CLOCK_MONOTONIC, &defrag_buf->last_update); /* * check if we already received this fragment */ if (defrag_buf->frag_map[inbuf->khp_data_frag_seq]) { /* * if we have received this fragment and we didn't clear the buffer * it means that we don't have all fragments yet */ return 1; } /* * we need to handle the last packet with gloves due to its different size */ if (inbuf->khp_data_frag_seq == inbuf->khp_data_frag_num) { defrag_buf->last_frag_size = *len; /* * in the event when the last packet arrives first, * we still don't know the offset vs the other fragments (based on MTU), * so we store the fragment at the end of the buffer where it's safe * and take a copy of the len so that we can restore its offset later. * remember we can't use the local MTU for this calculation because pMTU * can be asymettric between the same hosts. */ if (!defrag_buf->frag_size) { defrag_buf->last_first = 1; memmove(defrag_buf->buf + (KNET_MAX_PACKET_SIZE - *len), inbuf->khp_data_userdata, *len); } } else { defrag_buf->frag_size = *len; } if (defrag_buf->frag_size) { memmove(defrag_buf->buf + ((inbuf->khp_data_frag_seq - 1) * defrag_buf->frag_size), inbuf->khp_data_userdata, *len); } defrag_buf->frag_recv++; defrag_buf->frag_map[inbuf->khp_data_frag_seq] = 1; /* * check if we received all the fragments */ if (defrag_buf->frag_recv == inbuf->khp_data_frag_num) { /* * special case the last pckt */ if (defrag_buf->last_first) { memmove(defrag_buf->buf + ((inbuf->khp_data_frag_num - 1) * defrag_buf->frag_size), defrag_buf->buf + (KNET_MAX_PACKET_SIZE - defrag_buf->last_frag_size), defrag_buf->last_frag_size); } /* * recalculate packet lenght */ *len = ((inbuf->khp_data_frag_num - 1) * defrag_buf->frag_size) + defrag_buf->last_frag_size; /* * copy the pckt back in the user data */ memmove(inbuf->khp_data_userdata, defrag_buf->buf, *len); /* * free this buffer */ defrag_buf->in_use = 0; return 0; } return 1; } static void _parse_recv_from_links(knet_handle_t knet_h, int sockfd, const struct knet_mmsghdr *msg) { int err = 0, savederrno = 0, stats_err = 0; ssize_t outlen; struct knet_host *src_host; struct knet_link *src_link; unsigned long long latency_last; knet_node_id_t dst_host_ids[KNET_MAX_HOST]; size_t dst_host_ids_entries = 0; int bcast = 1; uint64_t decrypt_time = 0; struct timespec recvtime; struct knet_header *inbuf = msg->msg_hdr.msg_iov->iov_base; unsigned char *outbuf = (unsigned char *)msg->msg_hdr.msg_iov->iov_base; ssize_t len = msg->msg_len; struct knet_hostinfo *knet_hostinfo; struct iovec iov_out[1]; int8_t channel; struct sockaddr_storage pckt_src; seq_num_t recv_seq_num; int wipe_bufs = 0; int try_decrypt = 0, decrypted = 0, i; for (i = 1; i <= KNET_MAX_CRYPTO_INSTANCES; i++) { if (knet_h->crypto_instance[i]) { try_decrypt = 1; break; } } if ((!try_decrypt) && (knet_h->crypto_only == KNET_CRYPTO_RX_DISALLOW_CLEAR_TRAFFIC)) { log_debug(knet_h, KNET_SUB_RX, "RX thread configured to accept only crypto packets, but no crypto configs are configured!"); return; } if (try_decrypt) { struct timespec start_time; struct timespec end_time; clock_gettime(CLOCK_MONOTONIC, &start_time); if (crypto_authenticate_and_decrypt(knet_h, (unsigned char *)inbuf, len, knet_h->recv_from_links_buf_decrypt, &outlen) < 0) { log_debug(knet_h, KNET_SUB_RX, "Unable to decrypt/auth packet"); if (knet_h->crypto_only == KNET_CRYPTO_RX_DISALLOW_CLEAR_TRAFFIC) { return; } log_debug(knet_h, KNET_SUB_RX, "Attempting to process packet as clear data"); } else { clock_gettime(CLOCK_MONOTONIC, &end_time); timespec_diff(start_time, end_time, &decrypt_time); len = outlen; inbuf = (struct knet_header *)knet_h->recv_from_links_buf_decrypt; decrypted = 1; } } if (len < (ssize_t)(KNET_HEADER_SIZE + 1)) { log_debug(knet_h, KNET_SUB_RX, "Packet is too short: %ld", (long)len); return; } if (inbuf->kh_version != KNET_HEADER_VERSION) { log_debug(knet_h, KNET_SUB_RX, "Packet version does not match"); return; } inbuf->kh_node = ntohs(inbuf->kh_node); src_host = knet_h->host_index[inbuf->kh_node]; if (src_host == NULL) { /* host not found */ log_debug(knet_h, KNET_SUB_RX, "Unable to find source host for this packet"); return; } src_link = src_host->link + (inbuf->khp_ping_link % KNET_MAX_LINK); if ((inbuf->kh_type & KNET_HEADER_TYPE_PMSK) != 0) { if (src_link->dynamic == KNET_LINK_DYNIP) { /* * cpyaddrport will only copy address and port of the incoming * packet and strip extra bits such as flow and scopeid */ cpyaddrport(&pckt_src, msg->msg_hdr.msg_name); if (cmpaddr(&src_link->dst_addr, sockaddr_len(&src_link->dst_addr), &pckt_src, sockaddr_len(&pckt_src)) != 0) { log_debug(knet_h, KNET_SUB_RX, "host: %u link: %u appears to have changed ip address", src_host->host_id, src_link->link_id); memmove(&src_link->dst_addr, &pckt_src, sizeof(struct sockaddr_storage)); if (knet_addrtostr(&src_link->dst_addr, sockaddr_len(msg->msg_hdr.msg_name), src_link->status.dst_ipaddr, KNET_MAX_HOST_LEN, src_link->status.dst_port, KNET_MAX_PORT_LEN) != 0) { log_debug(knet_h, KNET_SUB_RX, "Unable to resolve ???"); snprintf(src_link->status.dst_ipaddr, KNET_MAX_HOST_LEN - 1, "Unknown!!!"); snprintf(src_link->status.dst_port, KNET_MAX_PORT_LEN - 1, "??"); } else { log_info(knet_h, KNET_SUB_RX, "host: %u link: %u new connection established from: %s %s", src_host->host_id, src_link->link_id, src_link->status.dst_ipaddr, src_link->status.dst_port); } } /* * transport has already accepted the connection here * otherwise we would not be receiving packets */ transport_link_dyn_connect(knet_h, sockfd, src_link); } } stats_err = pthread_mutex_lock(&src_link->link_stats_mutex); if (stats_err) { log_err(knet_h, KNET_SUB_RX, "Unable to get stats mutex lock for host %u link %u: %s", src_host->host_id, src_link->link_id, strerror(savederrno)); return; } switch (inbuf->kh_type) { case KNET_HEADER_TYPE_HOST_INFO: case KNET_HEADER_TYPE_DATA: if (!src_host->status.reachable) { pthread_mutex_unlock(&src_link->link_stats_mutex); log_debug(knet_h, KNET_SUB_RX, "Source host %u not reachable yet. Discarding packet.", src_host->host_id); return; } inbuf->khp_data_seq_num = ntohs(inbuf->khp_data_seq_num); channel = inbuf->khp_data_channel; src_host->got_data = 1; src_link->status.stats.rx_data_packets++; src_link->status.stats.rx_data_bytes += len; if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 0, 0)) { pthread_mutex_unlock(&src_link->link_stats_mutex); if (src_host->link_handler_policy != KNET_LINK_POLICY_ACTIVE) { log_debug(knet_h, KNET_SUB_RX, "Packet has already been delivered"); } return; } if (inbuf->khp_data_frag_num > 1) { /* * len as received from the socket also includes extra stuff * that the defrag code doesn't care about. So strip it * here and readd only for repadding once we are done * defragging */ len = len - KNET_HEADER_DATA_SIZE; if (pckt_defrag(knet_h, inbuf, &len)) { pthread_mutex_unlock(&src_link->link_stats_mutex); return; } len = len + KNET_HEADER_DATA_SIZE; } if (inbuf->khp_data_compress) { ssize_t decmp_outlen = KNET_DATABUFSIZE_COMPRESS; struct timespec start_time; struct timespec end_time; uint64_t compress_time; clock_gettime(CLOCK_MONOTONIC, &start_time); err = decompress(knet_h, inbuf->khp_data_compress, (const unsigned char *)inbuf->khp_data_userdata, len - KNET_HEADER_DATA_SIZE, knet_h->recv_from_links_buf_decompress, &decmp_outlen); stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex); if (stats_err < 0) { pthread_mutex_unlock(&src_link->link_stats_mutex); log_err(knet_h, KNET_SUB_RX, "Unable to get mutex lock: %s", strerror(stats_err)); return; } clock_gettime(CLOCK_MONOTONIC, &end_time); timespec_diff(start_time, end_time, &compress_time); if (!err) { /* Collect stats */ if (compress_time < knet_h->stats.rx_compress_time_min) { knet_h->stats.rx_compress_time_min = compress_time; } if (compress_time > knet_h->stats.rx_compress_time_max) { knet_h->stats.rx_compress_time_max = compress_time; } knet_h->stats.rx_compress_time_ave = (knet_h->stats.rx_compress_time_ave * knet_h->stats.rx_compressed_packets + compress_time) / (knet_h->stats.rx_compressed_packets+1); knet_h->stats.rx_compressed_packets++; knet_h->stats.rx_compressed_original_bytes += decmp_outlen; knet_h->stats.rx_compressed_size_bytes += len - KNET_HEADER_SIZE; memmove(inbuf->khp_data_userdata, knet_h->recv_from_links_buf_decompress, decmp_outlen); len = decmp_outlen + KNET_HEADER_DATA_SIZE; } else { pthread_mutex_unlock(&knet_h->handle_stats_mutex); pthread_mutex_unlock(&src_link->link_stats_mutex); log_warn(knet_h, KNET_SUB_COMPRESS, "Unable to decompress packet (%d): %s", err, strerror(errno)); return; } pthread_mutex_unlock(&knet_h->handle_stats_mutex); } if (inbuf->kh_type == KNET_HEADER_TYPE_DATA) { if (decrypted) { stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex); if (stats_err < 0) { pthread_mutex_unlock(&src_link->link_stats_mutex); log_err(knet_h, KNET_SUB_RX, "Unable to get mutex lock: %s", strerror(stats_err)); return; } /* Only update the crypto overhead for data packets. Mainly to be consistent with TX */ if (decrypt_time < knet_h->stats.rx_crypt_time_min) { knet_h->stats.rx_crypt_time_min = decrypt_time; } if (decrypt_time > knet_h->stats.rx_crypt_time_max) { knet_h->stats.rx_crypt_time_max = decrypt_time; } knet_h->stats.rx_crypt_time_ave = (knet_h->stats.rx_crypt_time_ave * knet_h->stats.rx_crypt_packets + decrypt_time) / (knet_h->stats.rx_crypt_packets+1); knet_h->stats.rx_crypt_packets++; pthread_mutex_unlock(&knet_h->handle_stats_mutex); } if (knet_h->enabled != 1) /* data forward is disabled */ break; if (knet_h->dst_host_filter_fn) { size_t host_idx; int found = 0; bcast = knet_h->dst_host_filter_fn( knet_h->dst_host_filter_fn_private_data, (const unsigned char *)inbuf->khp_data_userdata, len - KNET_HEADER_DATA_SIZE, KNET_NOTIFY_RX, knet_h->host_id, inbuf->kh_node, &channel, dst_host_ids, &dst_host_ids_entries); if (bcast < 0) { pthread_mutex_unlock(&src_link->link_stats_mutex); log_debug(knet_h, KNET_SUB_RX, "Error from dst_host_filter_fn: %d", bcast); return; } if ((!bcast) && (!dst_host_ids_entries)) { pthread_mutex_unlock(&src_link->link_stats_mutex); log_debug(knet_h, KNET_SUB_RX, "Message is unicast but no dst_host_ids_entries"); return; } /* check if we are dst for this packet */ if (!bcast) { if (dst_host_ids_entries > KNET_MAX_HOST) { pthread_mutex_unlock(&src_link->link_stats_mutex); log_debug(knet_h, KNET_SUB_RX, "dst_host_filter_fn returned too many destinations"); return; } for (host_idx = 0; host_idx < dst_host_ids_entries; host_idx++) { if (dst_host_ids[host_idx] == knet_h->host_id) { found = 1; break; } } if (!found) { pthread_mutex_unlock(&src_link->link_stats_mutex); log_debug(knet_h, KNET_SUB_RX, "Packet is not for us"); return; } } } } if (inbuf->kh_type == KNET_HEADER_TYPE_DATA) { if (!knet_h->sockfd[channel].in_use) { pthread_mutex_unlock(&src_link->link_stats_mutex); log_debug(knet_h, KNET_SUB_RX, "received packet for channel %d but there is no local sock connected", channel); return; } outlen = 0; memset(iov_out, 0, sizeof(iov_out)); retry: iov_out[0].iov_base = (void *) inbuf->khp_data_userdata + outlen; iov_out[0].iov_len = len - (outlen + KNET_HEADER_DATA_SIZE); outlen = writev(knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], iov_out, 1); if ((outlen > 0) && (outlen < (ssize_t)iov_out[0].iov_len)) { log_debug(knet_h, KNET_SUB_RX, "Unable to send all data to the application in one go. Expected: %zu Sent: %zd\n", iov_out[0].iov_len, outlen); goto retry; } if (outlen <= 0) { knet_h->sock_notify_fn(knet_h->sock_notify_fn_private_data, knet_h->sockfd[channel].sockfd[0], channel, KNET_NOTIFY_RX, outlen, errno); pthread_mutex_unlock(&src_link->link_stats_mutex); return; } if ((size_t)outlen == iov_out[0].iov_len) { _seq_num_set(src_host, inbuf->khp_data_seq_num, 0); } } else { /* HOSTINFO */ knet_hostinfo = (struct knet_hostinfo *)inbuf->khp_data_userdata; if (knet_hostinfo->khi_bcast == KNET_HOSTINFO_UCAST) { knet_hostinfo->khi_dst_node_id = ntohs(knet_hostinfo->khi_dst_node_id); } if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 0, 0)) { pthread_mutex_unlock(&src_link->link_stats_mutex); return; } _seq_num_set(src_host, inbuf->khp_data_seq_num, 0); switch(knet_hostinfo->khi_type) { case KNET_HOSTINFO_TYPE_LINK_UP_DOWN: break; case KNET_HOSTINFO_TYPE_LINK_TABLE: break; default: log_warn(knet_h, KNET_SUB_RX, "Receiving unknown host info message from host %u", src_host->host_id); break; } } break; case KNET_HEADER_TYPE_PING: outlen = KNET_HEADER_PING_SIZE; inbuf->kh_type = KNET_HEADER_TYPE_PONG; inbuf->kh_node = htons(knet_h->host_id); recv_seq_num = ntohs(inbuf->khp_ping_seq_num); src_link->status.stats.rx_ping_packets++; src_link->status.stats.rx_ping_bytes += len; wipe_bufs = 0; if (!inbuf->khp_ping_timed) { /* * we might be receiving this message from all links, but we want * to process it only the first time */ if (recv_seq_num != src_host->untimed_rx_seq_num) { /* * cache the untimed seq num */ src_host->untimed_rx_seq_num = recv_seq_num; /* * if the host has received data in between * untimed ping, then we don't need to wipe the bufs */ if (src_host->got_data) { src_host->got_data = 0; wipe_bufs = 0; } else { wipe_bufs = 1; } } _seq_num_lookup(src_host, recv_seq_num, 0, wipe_bufs); } else { /* * pings always arrives in bursts over all the link * catch the first of them to cache the seq num and * avoid duplicate processing */ if (recv_seq_num != src_host->timed_rx_seq_num) { src_host->timed_rx_seq_num = recv_seq_num; if (recv_seq_num == 0) { _seq_num_lookup(src_host, recv_seq_num, 0, 1); } } } if (knet_h->crypto_in_use_config) { if (crypto_encrypt_and_sign(knet_h, (const unsigned char *)inbuf, outlen, knet_h->recv_from_links_buf_crypt, &outlen) < 0) { log_debug(knet_h, KNET_SUB_RX, "Unable to encrypt pong packet"); break; } outbuf = knet_h->recv_from_links_buf_crypt; stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex); if (stats_err < 0) { log_err(knet_h, KNET_SUB_RX, "Unable to get mutex lock: %s", strerror(stats_err)); break; } knet_h->stats_extra.tx_crypt_pong_packets++; pthread_mutex_unlock(&knet_h->handle_stats_mutex); } retry_pong: if (src_link->transport_connected) { if (transport_get_connection_oriented(knet_h, src_link->transport) == TRANSPORT_PROTO_NOT_CONNECTION_ORIENTED) { len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL, (struct sockaddr *) &src_link->dst_addr, sizeof(struct sockaddr_storage)); } else { len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0); } savederrno = errno; if (len != outlen) { err = transport_tx_sock_error(knet_h, src_link->transport, src_link->outsock, len, savederrno); switch(err) { case -1: /* unrecoverable error */ log_debug(knet_h, KNET_SUB_RX, "Unable to send pong reply (sock: %d) packet (sendto): %d %s. recorded src ip: %s src port: %s dst ip: %s dst port: %s", src_link->outsock, errno, strerror(errno), src_link->status.src_ipaddr, src_link->status.src_port, src_link->status.dst_ipaddr, src_link->status.dst_port); src_link->status.stats.tx_pong_errors++; break; case 0: /* ignore error and continue */ break; case 1: /* retry to send those same data */ src_link->status.stats.tx_pong_retries++; goto retry_pong; break; } } src_link->status.stats.tx_pong_packets++; src_link->status.stats.tx_pong_bytes += outlen; } break; case KNET_HEADER_TYPE_PONG: src_link->status.stats.rx_pong_packets++; src_link->status.stats.rx_pong_bytes += len; clock_gettime(CLOCK_MONOTONIC, &src_link->status.pong_last); memmove(&recvtime, &inbuf->khp_ping_time[0], sizeof(struct timespec)); timespec_diff(recvtime, src_link->status.pong_last, &latency_last); if ((latency_last / 1000llu) > src_link->pong_timeout) { log_debug(knet_h, KNET_SUB_RX, "Incoming pong packet from host: %u link: %u has higher latency than pong_timeout. Discarding", src_host->host_id, src_link->link_id); } else { /* * in words : ('previous mean' * '(count -1)') + 'new value') / 'count' */ src_link->latency_cur_samples++; /* * limit to max_samples (precision) */ if (src_link->latency_cur_samples >= src_link->latency_max_samples) { src_link->latency_cur_samples = src_link->latency_max_samples; } src_link->status.latency = (((src_link->status.latency * (src_link->latency_cur_samples - 1)) + (latency_last / 1000llu)) / src_link->latency_cur_samples); if (src_link->status.latency < src_link->pong_timeout_adj) { if (!src_link->status.connected) { if (src_link->received_pong >= src_link->pong_count) { log_info(knet_h, KNET_SUB_RX, "host: %u link: %u is up", src_host->host_id, src_link->link_id); _link_updown(knet_h, src_host->host_id, src_link->link_id, src_link->status.enabled, 1, 0); } else { src_link->received_pong++; log_debug(knet_h, KNET_SUB_RX, "host: %u link: %u received pong: %u", src_host->host_id, src_link->link_id, src_link->received_pong); } } } /* Calculate latency stats */ if (src_link->status.latency > src_link->status.stats.latency_max) { src_link->status.stats.latency_max = src_link->status.latency; } if (src_link->status.latency < src_link->status.stats.latency_min) { src_link->status.stats.latency_min = src_link->status.latency; } /* * those 2 lines below make all latency average calculations consistent and capped to * link precision. In future we will kill the one above to keep only this one in * the stats structure, but for now we leave it around to avoid API/ABI * breakage as we backport the fixes to stable */ src_link->status.stats.latency_ave = src_link->status.latency; src_link->status.stats.latency_samples = src_link->latency_cur_samples; } break; case KNET_HEADER_TYPE_PMTUD: src_link->status.stats.rx_pmtu_packets++; src_link->status.stats.rx_pmtu_bytes += len; outlen = KNET_HEADER_PMTUD_SIZE; inbuf->kh_type = KNET_HEADER_TYPE_PMTUD_REPLY; inbuf->kh_node = htons(knet_h->host_id); if (knet_h->crypto_in_use_config) { if (crypto_encrypt_and_sign(knet_h, (const unsigned char *)inbuf, outlen, knet_h->recv_from_links_buf_crypt, &outlen) < 0) { log_debug(knet_h, KNET_SUB_RX, "Unable to encrypt PMTUd reply packet"); break; } outbuf = knet_h->recv_from_links_buf_crypt; stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex); if (stats_err < 0) { log_err(knet_h, KNET_SUB_RX, "Unable to get mutex lock: %s", strerror(stats_err)); break; } knet_h->stats_extra.tx_crypt_pmtu_reply_packets++; pthread_mutex_unlock(&knet_h->handle_stats_mutex); } /* Unlock so we don't deadlock with tx_mutex */ pthread_mutex_unlock(&src_link->link_stats_mutex); savederrno = pthread_mutex_lock(&knet_h->tx_mutex); if (savederrno) { log_err(knet_h, KNET_SUB_RX, "Unable to get TX mutex lock: %s", strerror(savederrno)); goto out_pmtud; } retry_pmtud: if (src_link->transport_connected) { if (transport_get_connection_oriented(knet_h, src_link->transport) == TRANSPORT_PROTO_NOT_CONNECTION_ORIENTED) { len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL, (struct sockaddr *) &src_link->dst_addr, sizeof(struct sockaddr_storage)); } else { len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0); } savederrno = errno; if (len != outlen) { err = transport_tx_sock_error(knet_h, src_link->transport, src_link->outsock, len, savederrno); stats_err = pthread_mutex_lock(&src_link->link_stats_mutex); if (stats_err < 0) { log_err(knet_h, KNET_SUB_RX, "Unable to get mutex lock: %s", strerror(stats_err)); break; } switch(err) { case -1: /* unrecoverable error */ log_debug(knet_h, KNET_SUB_RX, "Unable to send PMTUd reply (sock: %d) packet (sendto): %d %s. recorded src ip: %s src port: %s dst ip: %s dst port: %s", src_link->outsock, errno, strerror(errno), src_link->status.src_ipaddr, src_link->status.src_port, src_link->status.dst_ipaddr, src_link->status.dst_port); src_link->status.stats.tx_pmtu_errors++; break; case 0: /* ignore error and continue */ src_link->status.stats.tx_pmtu_errors++; break; case 1: /* retry to send those same data */ src_link->status.stats.tx_pmtu_retries++; pthread_mutex_unlock(&src_link->link_stats_mutex); goto retry_pmtud; break; } pthread_mutex_unlock(&src_link->link_stats_mutex); } } pthread_mutex_unlock(&knet_h->tx_mutex); out_pmtud: return; /* Don't need to unlock link_stats_mutex */ case KNET_HEADER_TYPE_PMTUD_REPLY: src_link->status.stats.rx_pmtu_packets++; src_link->status.stats.rx_pmtu_bytes += len; /* pmtud_mutex can't be acquired while we hold a link_stats_mutex (ordering) */ pthread_mutex_unlock(&src_link->link_stats_mutex); if (pthread_mutex_lock(&knet_h->pmtud_mutex) != 0) { log_debug(knet_h, KNET_SUB_RX, "Unable to get mutex lock"); break; } src_link->last_recv_mtu = inbuf->khp_pmtud_size; pthread_cond_signal(&knet_h->pmtud_cond); pthread_mutex_unlock(&knet_h->pmtud_mutex); return; default: pthread_mutex_unlock(&src_link->link_stats_mutex); return; } pthread_mutex_unlock(&src_link->link_stats_mutex); } static void _handle_recv_from_links(knet_handle_t knet_h, int sockfd, struct knet_mmsghdr *msg) { int err, savederrno; int i, msg_recv, transport; if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) { log_debug(knet_h, KNET_SUB_RX, "Unable to get global read lock"); return; } if (_is_valid_fd(knet_h, sockfd) < 1) { /* * this is normal if a fd got an event and before we grab the read lock * and the link is removed by another thread */ goto exit_unlock; } transport = knet_h->knet_transport_fd_tracker[sockfd].transport; /* * reset msg_namelen to buffer size because after recvmmsg * each msg_namelen will contain sizeof sockaddr_in or sockaddr_in6 */ for (i = 0; i < PCKT_RX_BUFS; i++) { msg[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage); } msg_recv = _recvmmsg(sockfd, &msg[0], PCKT_RX_BUFS, MSG_DONTWAIT | MSG_NOSIGNAL); savederrno = errno; /* * WARNING: man page for recvmmsg is wrong. Kernel implementation here: * recvmmsg can return: * -1 on error * 0 if the previous run of recvmmsg recorded an error on the socket * N number of messages (see exception below). * * If there is an error from recvmsg after receiving a frame or more, the recvmmsg * loop is interrupted, error recorded in the socket (getsockopt(SO_ERROR) and * it will be visibile in the next run. * * Need to be careful how we handle errors at this stage. * * error messages need to be handled on a per transport/protocol base * at this point we have different layers of error handling * - msg_recv < 0 -> error from this run * msg_recv = 0 -> error from previous run and error on socket needs to be cleared * - per-transport message data * example: msg[i].msg_hdr.msg_flags & MSG_NOTIFICATION or msg_len for SCTP == EOF, * but for UDP it is perfectly legal to receive a 0 bytes message.. go figure * - NOTE: on SCTP MSG_NOTIFICATION we get msg_recv == PCKT_FRAG_MAX messages and no * errno set. That means the error api needs to be able to abort the loop below. */ if (msg_recv <= 0) { transport_rx_sock_error(knet_h, transport, sockfd, msg_recv, savederrno); goto exit_unlock; } for (i = 0; i < msg_recv; i++) { err = transport_rx_is_data(knet_h, transport, sockfd, &msg[i]); /* * TODO: make this section silent once we are confident * all protocols packet handlers are good */ switch(err) { case KNET_TRANSPORT_RX_ERROR: /* on error */ log_debug(knet_h, KNET_SUB_RX, "Transport reported error parsing packet"); goto exit_unlock; break; case KNET_TRANSPORT_RX_NOT_DATA_CONTINUE: /* packet is not data and we should continue the packet process loop */ log_debug(knet_h, KNET_SUB_RX, "Transport reported no data, continue"); break; case KNET_TRANSPORT_RX_NOT_DATA_STOP: /* packet is not data and we should STOP the packet process loop */ log_debug(knet_h, KNET_SUB_RX, "Transport reported no data, stop"); goto exit_unlock; break; case KNET_TRANSPORT_RX_IS_DATA: /* packet is data and should be parsed as such */ /* * processing incoming packets vs access lists */ if ((knet_h->use_access_lists) && (transport_get_acl_type(knet_h, transport) == USE_GENERIC_ACL)) { if (!check_validate(knet_h, sockfd, transport, msg[i].msg_hdr.msg_name)) { char src_ipaddr[KNET_MAX_HOST_LEN]; char src_port[KNET_MAX_PORT_LEN]; memset(src_ipaddr, 0, KNET_MAX_HOST_LEN); memset(src_port, 0, KNET_MAX_PORT_LEN); if (knet_addrtostr(msg[i].msg_hdr.msg_name, sockaddr_len(msg[i].msg_hdr.msg_name), src_ipaddr, KNET_MAX_HOST_LEN, src_port, KNET_MAX_PORT_LEN) < 0) { log_debug(knet_h, KNET_SUB_RX, "Packet rejected: unable to resolve host/port"); } else { log_debug(knet_h, KNET_SUB_RX, "Packet rejected from %s/%s", src_ipaddr, src_port); } /* * continue processing the other packets */ continue; } } _parse_recv_from_links(knet_h, sockfd, &msg[i]); break; case KNET_TRANSPORT_RX_OOB_DATA_CONTINUE: log_debug(knet_h, KNET_SUB_RX, "Transport is processing sock OOB data, continue"); break; case KNET_TRANSPORT_RX_OOB_DATA_STOP: log_debug(knet_h, KNET_SUB_RX, "Transport has completed processing sock OOB data, stop"); goto exit_unlock; break; } } exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); } void *_handle_recv_from_links_thread(void *data) { int i, nev; knet_handle_t knet_h = (knet_handle_t) data; struct epoll_event events[KNET_EPOLL_MAX_EVENTS]; struct sockaddr_storage address[PCKT_RX_BUFS]; struct knet_mmsghdr msg[PCKT_RX_BUFS]; struct iovec iov_in[PCKT_RX_BUFS]; set_thread_status(knet_h, KNET_THREAD_RX, KNET_THREAD_STARTED); memset(&msg, 0, sizeof(msg)); for (i = 0; i < PCKT_RX_BUFS; i++) { iov_in[i].iov_base = (void *)knet_h->recv_from_links_buf[i]; iov_in[i].iov_len = KNET_DATABUFSIZE; memset(&msg[i].msg_hdr, 0, sizeof(struct msghdr)); msg[i].msg_hdr.msg_name = &address[i]; msg[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage); msg[i].msg_hdr.msg_iov = &iov_in[i]; msg[i].msg_hdr.msg_iovlen = 1; } while (!shutdown_in_progress(knet_h)) { nev = epoll_wait(knet_h->recv_from_links_epollfd, events, KNET_EPOLL_MAX_EVENTS, KNET_THREADS_TIMERES / 1000); /* * the RX threads only need to notify that there has been at least * one successful run after queue flush has been requested. * See setfwd in handle.c */ if (get_thread_flush_queue(knet_h, KNET_THREAD_RX) == KNET_THREAD_QUEUE_FLUSH) { set_thread_flush_queue(knet_h, KNET_THREAD_RX, KNET_THREAD_QUEUE_FLUSHED); } /* * we use timeout to detect if thread is shutting down */ if (nev == 0) { continue; } for (i = 0; i < nev; i++) { _handle_recv_from_links(knet_h, events[i].data.fd, msg); } } set_thread_status(knet_h, KNET_THREAD_RX, KNET_THREAD_STOPPED); return NULL; } + +ssize_t knet_recv(knet_handle_t knet_h, char *buff, const size_t buff_len, const int8_t channel) +{ + int savederrno = 0; + ssize_t err = 0; + struct iovec iov_in; + + if (!knet_h) { + errno = EINVAL; + return -1; + } + + if (buff == NULL) { + errno = EINVAL; + return -1; + } + + if (buff_len <= 0) { + errno = EINVAL; + return -1; + } + + if (buff_len > KNET_MAX_PACKET_SIZE) { + errno = EINVAL; + return -1; + } + + if (channel < 0) { + errno = EINVAL; + return -1; + } + + if (channel >= KNET_DATAFD_MAX) { + errno = EINVAL; + return -1; + } + + savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); + if (savederrno) { + log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s", + strerror(savederrno)); + errno = savederrno; + return -1; + } + + if (!knet_h->sockfd[channel].in_use) { + savederrno = EINVAL; + err = -1; + goto out_unlock; + } + + memset(&iov_in, 0, sizeof(iov_in)); + iov_in.iov_base = (void *)buff; + iov_in.iov_len = buff_len; + + err = readv(knet_h->sockfd[channel].sockfd[0], &iov_in, 1); + savederrno = errno; + +out_unlock: + pthread_rwlock_unlock(&knet_h->global_rwlock); + errno = err ? savederrno : 0; + return err; +} diff --git a/libknet/threads_tx.c b/libknet/threads_tx.c index be5fb6b2..852064c2 100644 --- a/libknet/threads_tx.c +++ b/libknet/threads_tx.c @@ -1,837 +1,901 @@ /* * Copyright (C) 2012-2020 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. Di Nitto * Federico Simoncelli * * This software licensed under LGPL-2.0+ */ #include "config.h" #include #include #include #include #include #include #include "compat.h" #include "compress.h" #include "crypto.h" #include "host.h" #include "link.h" #include "logging.h" #include "transports.h" #include "transport_common.h" #include "threads_common.h" #include "threads_heartbeat.h" #include "threads_tx.h" #include "netutils.h" /* * SEND */ static int _dispatch_to_links(knet_handle_t knet_h, struct knet_host *dst_host, struct knet_mmsghdr *msg, int msgs_to_send) { int link_idx, msg_idx, sent_msgs, prev_sent, progress; int err = 0, savederrno = 0, locked = 0; unsigned int i; struct knet_mmsghdr *cur; struct knet_link *cur_link; for (link_idx = 0; link_idx < dst_host->active_link_entries; link_idx++) { prev_sent = 0; progress = 1; locked = 0; cur_link = &dst_host->link[dst_host->active_links[link_idx]]; if (cur_link->transport == KNET_TRANSPORT_LOOPBACK) { continue; } savederrno = pthread_mutex_lock(&cur_link->link_stats_mutex); if (savederrno) { log_err(knet_h, KNET_SUB_TX, "Unable to get stats mutex lock for host %u link %u: %s", dst_host->host_id, cur_link->link_id, strerror(savederrno)); continue; } locked = 1; msg_idx = 0; while (msg_idx < msgs_to_send) { msg[msg_idx].msg_hdr.msg_name = &cur_link->dst_addr; /* Cast for Linux/BSD compatibility */ for (i=0; i<(unsigned int)msg[msg_idx].msg_hdr.msg_iovlen; i++) { cur_link->status.stats.tx_data_bytes += msg[msg_idx].msg_hdr.msg_iov[i].iov_len; } cur_link->status.stats.tx_data_packets++; msg_idx++; } retry: cur = &msg[prev_sent]; sent_msgs = _sendmmsg(dst_host->link[dst_host->active_links[link_idx]].outsock, transport_get_connection_oriented(knet_h, dst_host->link[dst_host->active_links[link_idx]].transport), &cur[0], msgs_to_send - prev_sent, MSG_DONTWAIT | MSG_NOSIGNAL); savederrno = errno; err = transport_tx_sock_error(knet_h, dst_host->link[dst_host->active_links[link_idx]].transport, dst_host->link[dst_host->active_links[link_idx]].outsock, sent_msgs, savederrno); switch(err) { case -1: /* unrecoverable error */ cur_link->status.stats.tx_data_errors++; goto out_unlock; break; case 0: /* ignore error and continue */ break; case 1: /* retry to send those same data */ cur_link->status.stats.tx_data_retries++; goto retry; break; } prev_sent = prev_sent + sent_msgs; if ((sent_msgs >= 0) && (prev_sent < msgs_to_send)) { if ((sent_msgs) || (progress)) { if (sent_msgs) { progress = 1; } else { progress = 0; } #ifdef DEBUG log_debug(knet_h, KNET_SUB_TX, "Unable to send all (%d/%d) data packets to host %s (%u) link %s:%s (%u)", sent_msgs, msg_idx, dst_host->name, dst_host->host_id, dst_host->link[dst_host->active_links[link_idx]].status.dst_ipaddr, dst_host->link[dst_host->active_links[link_idx]].status.dst_port, dst_host->link[dst_host->active_links[link_idx]].link_id); #endif goto retry; } if (!progress) { savederrno = EAGAIN; err = -1; goto out_unlock; } } if ((dst_host->link_handler_policy == KNET_LINK_POLICY_RR) && (dst_host->active_link_entries > 1)) { uint8_t cur_link_id = dst_host->active_links[0]; memmove(&dst_host->active_links[0], &dst_host->active_links[1], KNET_MAX_LINK - 1); dst_host->active_links[dst_host->active_link_entries - 1] = cur_link_id; break; } pthread_mutex_unlock(&cur_link->link_stats_mutex); locked = 0; } out_unlock: if (locked) { pthread_mutex_unlock(&cur_link->link_stats_mutex); } errno = savederrno; return err; } static int _parse_recv_from_sock(knet_handle_t knet_h, size_t inlen, int8_t channel, int is_sync) { size_t outlen, frag_len; struct knet_host *dst_host; knet_node_id_t dst_host_ids_temp[KNET_MAX_HOST]; size_t dst_host_ids_entries_temp = 0; knet_node_id_t dst_host_ids[KNET_MAX_HOST]; size_t dst_host_ids_entries = 0; int bcast = 1; struct knet_hostinfo *knet_hostinfo; struct iovec iov_out[PCKT_FRAG_MAX][2]; int iovcnt_out = 2; uint8_t frag_idx; unsigned int temp_data_mtu; size_t host_idx; int send_mcast = 0; struct knet_header *inbuf; int savederrno = 0; int err = 0; seq_num_t tx_seq_num; struct knet_mmsghdr msg[PCKT_FRAG_MAX]; int msgs_to_send, msg_idx; unsigned int i; int j; int send_local = 0; int data_compressed = 0; size_t uncrypted_frag_size; int stats_locked = 0, stats_err = 0; inbuf = knet_h->recv_from_sock_buf; if ((knet_h->enabled != 1) && (inbuf->kh_type != KNET_HEADER_TYPE_HOST_INFO)) { /* data forward is disabled */ log_debug(knet_h, KNET_SUB_TX, "Received data packet but forwarding is disabled"); savederrno = ECANCELED; err = -1; goto out_unlock; } /* * move this into a separate function to expand on * extra switching rules */ switch(inbuf->kh_type) { case KNET_HEADER_TYPE_DATA: if (knet_h->dst_host_filter_fn) { bcast = knet_h->dst_host_filter_fn( knet_h->dst_host_filter_fn_private_data, (const unsigned char *)inbuf->khp_data_userdata, inlen, KNET_NOTIFY_TX, knet_h->host_id, knet_h->host_id, &channel, dst_host_ids_temp, &dst_host_ids_entries_temp); if (bcast < 0) { log_debug(knet_h, KNET_SUB_TX, "Error from dst_host_filter_fn: %d", bcast); savederrno = EFAULT; err = -1; goto out_unlock; } if ((!bcast) && (!dst_host_ids_entries_temp)) { log_debug(knet_h, KNET_SUB_TX, "Message is unicast but no dst_host_ids_entries"); savederrno = EINVAL; err = -1; goto out_unlock; } if ((!bcast) && (dst_host_ids_entries_temp > KNET_MAX_HOST)) { log_debug(knet_h, KNET_SUB_TX, "dst_host_filter_fn returned too many destinations"); savederrno = EINVAL; err = -1; goto out_unlock; } } /* Send to localhost if appropriate and enabled */ if (knet_h->has_loop_link) { send_local = 0; if (bcast) { send_local = 1; } else { for (i=0; i< dst_host_ids_entries_temp; i++) { if (dst_host_ids_temp[i] == knet_h->host_id) { send_local = 1; } } } if (send_local) { const unsigned char *buf = inbuf->khp_data_userdata; ssize_t buflen = inlen; struct knet_link *local_link; local_link = knet_h->host_index[knet_h->host_id]->link; local_retry: err = write(knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], buf, buflen); if (err < 0) { log_err(knet_h, KNET_SUB_TRANSP_LOOPBACK, "send local failed. error=%s\n", strerror(errno)); local_link->status.stats.tx_data_errors++; } if (err > 0 && err < buflen) { log_debug(knet_h, KNET_SUB_TRANSP_LOOPBACK, "send local incomplete=%d bytes of %zu\n", err, inlen); local_link->status.stats.tx_data_retries++; buf += err; buflen -= err; goto local_retry; } if (err == buflen) { local_link->status.stats.tx_data_packets++; local_link->status.stats.tx_data_bytes += inlen; } } } break; case KNET_HEADER_TYPE_HOST_INFO: knet_hostinfo = (struct knet_hostinfo *)inbuf->khp_data_userdata; if (knet_hostinfo->khi_bcast == KNET_HOSTINFO_UCAST) { bcast = 0; dst_host_ids_temp[0] = knet_hostinfo->khi_dst_node_id; dst_host_ids_entries_temp = 1; knet_hostinfo->khi_dst_node_id = htons(knet_hostinfo->khi_dst_node_id); } break; default: log_warn(knet_h, KNET_SUB_TX, "Receiving unknown messages from socket"); savederrno = ENOMSG; err = -1; goto out_unlock; break; } if (is_sync) { if ((bcast) || ((!bcast) && (dst_host_ids_entries_temp > 1))) { log_debug(knet_h, KNET_SUB_TX, "knet_send_sync is only supported with unicast packets for one destination"); savederrno = E2BIG; err = -1; goto out_unlock; } } /* * check destinations hosts before spending time * in fragmenting/encrypting packets to save * time processing data for unreachable hosts. * for unicast, also remap the destination data * to skip unreachable hosts. */ if (!bcast) { dst_host_ids_entries = 0; for (host_idx = 0; host_idx < dst_host_ids_entries_temp; host_idx++) { dst_host = knet_h->host_index[dst_host_ids_temp[host_idx]]; if (!dst_host) { continue; } if (!(dst_host->host_id == knet_h->host_id && knet_h->has_loop_link) && dst_host->status.reachable) { dst_host_ids[dst_host_ids_entries] = dst_host_ids_temp[host_idx]; dst_host_ids_entries++; } } if (!dst_host_ids_entries) { savederrno = EHOSTDOWN; err = -1; goto out_unlock; } } else { send_mcast = 0; for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) { if (!(dst_host->host_id == knet_h->host_id && knet_h->has_loop_link) && dst_host->status.reachable) { send_mcast = 1; break; } } if (!send_mcast) { savederrno = EHOSTDOWN; err = -1; goto out_unlock; } } if (!knet_h->data_mtu) { /* * using MIN_MTU_V4 for data mtu is not completely accurate but safe enough */ log_debug(knet_h, KNET_SUB_TX, "Received data packet but data MTU is still unknown." " Packet might not be delivered." " Assuming minimum IPv4 MTU (%d)", KNET_PMTUD_MIN_MTU_V4); temp_data_mtu = KNET_PMTUD_MIN_MTU_V4; } else { /* * take a copy of the mtu to avoid value changing under * our feet while we are sending a fragmented pckt */ temp_data_mtu = knet_h->data_mtu; } /* * compress data */ if ((knet_h->compress_model > 0) && (inlen > knet_h->compress_threshold)) { size_t cmp_outlen = KNET_DATABUFSIZE_COMPRESS; struct timespec start_time; struct timespec end_time; uint64_t compress_time; clock_gettime(CLOCK_MONOTONIC, &start_time); err = compress(knet_h, (const unsigned char *)inbuf->khp_data_userdata, inlen, knet_h->send_to_links_buf_compress, (ssize_t *)&cmp_outlen); savederrno = errno; stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex); if (stats_err < 0) { log_err(knet_h, KNET_SUB_TX, "Unable to get mutex lock: %s", strerror(stats_err)); err = -1; savederrno = stats_err; goto out_unlock; } stats_locked = 1; /* Collect stats */ clock_gettime(CLOCK_MONOTONIC, &end_time); timespec_diff(start_time, end_time, &compress_time); if (compress_time < knet_h->stats.tx_compress_time_min) { knet_h->stats.tx_compress_time_min = compress_time; } if (compress_time > knet_h->stats.tx_compress_time_max) { knet_h->stats.tx_compress_time_max = compress_time; } knet_h->stats.tx_compress_time_ave = (unsigned long long)(knet_h->stats.tx_compress_time_ave * knet_h->stats.tx_compressed_packets + compress_time) / (knet_h->stats.tx_compressed_packets+1); if (err < 0) { log_warn(knet_h, KNET_SUB_COMPRESS, "Compression failed (%d): %s", err, strerror(savederrno)); } else { knet_h->stats.tx_compressed_packets++; knet_h->stats.tx_compressed_original_bytes += inlen; knet_h->stats.tx_compressed_size_bytes += cmp_outlen; if (cmp_outlen < inlen) { memmove(inbuf->khp_data_userdata, knet_h->send_to_links_buf_compress, cmp_outlen); inlen = cmp_outlen; data_compressed = 1; } } } if (!stats_locked) { stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex); if (stats_err < 0) { log_err(knet_h, KNET_SUB_TX, "Unable to get mutex lock: %s", strerror(stats_err)); err = -1; savederrno = stats_err; goto out_unlock; } } if (knet_h->compress_model > 0 && !data_compressed) { knet_h->stats.tx_uncompressed_packets++; } pthread_mutex_unlock(&knet_h->handle_stats_mutex); stats_locked = 0; /* * prepare the outgoing buffers */ frag_len = inlen; frag_idx = 0; inbuf->khp_data_bcast = bcast; inbuf->khp_data_frag_num = ceil((float)inlen / temp_data_mtu); inbuf->khp_data_channel = channel; if (data_compressed) { inbuf->khp_data_compress = knet_h->compress_model; } else { inbuf->khp_data_compress = 0; } if (pthread_mutex_lock(&knet_h->tx_seq_num_mutex)) { log_debug(knet_h, KNET_SUB_TX, "Unable to get seq mutex lock"); goto out_unlock; } knet_h->tx_seq_num++; /* * force seq_num 0 to detect a node that has crashed and rejoining * the knet instance. seq_num 0 will clear the buffers in the RX * thread */ if (knet_h->tx_seq_num == 0) { knet_h->tx_seq_num++; } /* * cache the value in locked context */ tx_seq_num = knet_h->tx_seq_num; inbuf->khp_data_seq_num = htons(knet_h->tx_seq_num); pthread_mutex_unlock(&knet_h->tx_seq_num_mutex); /* * forcefully broadcast a ping to all nodes every SEQ_MAX / 8 * pckts. * this solves 2 problems: * 1) on TX socket overloads we generate extra pings to keep links alive * 2) in 3+ nodes setup, where all the traffic is flowing between node 1 and 2, * node 3+ will be able to keep in sync on the TX seq_num even without * receiving traffic or pings in betweens. This avoids issues with * rollover of the circular buffer */ if (tx_seq_num % (SEQ_MAX / 8) == 0) { _send_pings(knet_h, 0); } if (inbuf->khp_data_frag_num > 1) { while (frag_idx < inbuf->khp_data_frag_num) { /* * set the iov_base */ iov_out[frag_idx][0].iov_base = (void *)knet_h->send_to_links_buf[frag_idx]; iov_out[frag_idx][0].iov_len = KNET_HEADER_DATA_SIZE; iov_out[frag_idx][1].iov_base = inbuf->khp_data_userdata + (temp_data_mtu * frag_idx); /* * set the len */ if (frag_len > temp_data_mtu) { iov_out[frag_idx][1].iov_len = temp_data_mtu; } else { iov_out[frag_idx][1].iov_len = frag_len; } /* * copy the frag info on all buffers */ knet_h->send_to_links_buf[frag_idx]->kh_type = inbuf->kh_type; knet_h->send_to_links_buf[frag_idx]->khp_data_seq_num = inbuf->khp_data_seq_num; knet_h->send_to_links_buf[frag_idx]->khp_data_frag_num = inbuf->khp_data_frag_num; knet_h->send_to_links_buf[frag_idx]->khp_data_bcast = inbuf->khp_data_bcast; knet_h->send_to_links_buf[frag_idx]->khp_data_channel = inbuf->khp_data_channel; knet_h->send_to_links_buf[frag_idx]->khp_data_compress = inbuf->khp_data_compress; frag_len = frag_len - temp_data_mtu; frag_idx++; } iovcnt_out = 2; } else { iov_out[frag_idx][0].iov_base = (void *)inbuf; iov_out[frag_idx][0].iov_len = frag_len + KNET_HEADER_DATA_SIZE; iovcnt_out = 1; } if (knet_h->crypto_in_use_config) { struct timespec start_time; struct timespec end_time; uint64_t crypt_time; frag_idx = 0; while (frag_idx < inbuf->khp_data_frag_num) { clock_gettime(CLOCK_MONOTONIC, &start_time); if (crypto_encrypt_and_signv( knet_h, iov_out[frag_idx], iovcnt_out, knet_h->send_to_links_buf_crypt[frag_idx], (ssize_t *)&outlen) < 0) { log_debug(knet_h, KNET_SUB_TX, "Unable to encrypt packet"); savederrno = ECHILD; err = -1; goto out_unlock; } clock_gettime(CLOCK_MONOTONIC, &end_time); timespec_diff(start_time, end_time, &crypt_time); stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex); if (stats_err < 0) { log_err(knet_h, KNET_SUB_TX, "Unable to get mutex lock: %s", strerror(stats_err)); err = -1; savederrno = stats_err; goto out_unlock; } if (crypt_time < knet_h->stats.tx_crypt_time_min) { knet_h->stats.tx_crypt_time_min = crypt_time; } if (crypt_time > knet_h->stats.tx_crypt_time_max) { knet_h->stats.tx_crypt_time_max = crypt_time; } knet_h->stats.tx_crypt_time_ave = (knet_h->stats.tx_crypt_time_ave * knet_h->stats.tx_crypt_packets + crypt_time) / (knet_h->stats.tx_crypt_packets+1); uncrypted_frag_size = 0; for (j=0; j < iovcnt_out; j++) { uncrypted_frag_size += iov_out[frag_idx][j].iov_len; } knet_h->stats.tx_crypt_byte_overhead += (outlen - uncrypted_frag_size); knet_h->stats.tx_crypt_packets++; pthread_mutex_unlock(&knet_h->handle_stats_mutex); iov_out[frag_idx][0].iov_base = knet_h->send_to_links_buf_crypt[frag_idx]; iov_out[frag_idx][0].iov_len = outlen; frag_idx++; } iovcnt_out = 1; } memset(&msg, 0, sizeof(msg)); msgs_to_send = inbuf->khp_data_frag_num; msg_idx = 0; while (msg_idx < msgs_to_send) { msg[msg_idx].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage); msg[msg_idx].msg_hdr.msg_iov = &iov_out[msg_idx][0]; msg[msg_idx].msg_hdr.msg_iovlen = iovcnt_out; msg_idx++; } if (!bcast) { for (host_idx = 0; host_idx < dst_host_ids_entries; host_idx++) { dst_host = knet_h->host_index[dst_host_ids[host_idx]]; err = _dispatch_to_links(knet_h, dst_host, &msg[0], msgs_to_send); savederrno = errno; if (err) { goto out_unlock; } } } else { for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) { if (dst_host->status.reachable) { err = _dispatch_to_links(knet_h, dst_host, &msg[0], msgs_to_send); savederrno = errno; if (err) { goto out_unlock; } } } } out_unlock: errno = savederrno; return err; } -int knet_send_sync(knet_handle_t knet_h, const char *buff, const size_t buff_len, const int8_t channel) -{ - int savederrno = 0, err = 0; - - if (!knet_h) { - errno = EINVAL; - return -1; - } - - if (buff == NULL) { - errno = EINVAL; - return -1; - } - - if (buff_len <= 0) { - errno = EINVAL; - return -1; - } - - if (buff_len > KNET_MAX_PACKET_SIZE) { - errno = EINVAL; - return -1; - } - - if (channel < 0) { - errno = EINVAL; - return -1; - } - - if (channel >= KNET_DATAFD_MAX) { - errno = EINVAL; - return -1; - } - - savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); - if (savederrno) { - log_err(knet_h, KNET_SUB_TX, "Unable to get read lock: %s", - strerror(savederrno)); - errno = savederrno; - return -1; - } - - if (!knet_h->sockfd[channel].in_use) { - savederrno = EINVAL; - err = -1; - goto out; - } - - savederrno = pthread_mutex_lock(&knet_h->tx_mutex); - if (savederrno) { - log_err(knet_h, KNET_SUB_TX, "Unable to get TX mutex lock: %s", - strerror(savederrno)); - err = -1; - goto out; - } - - knet_h->recv_from_sock_buf->kh_type = KNET_HEADER_TYPE_DATA; - memmove(knet_h->recv_from_sock_buf->khp_data_userdata, buff, buff_len); - err = _parse_recv_from_sock(knet_h, buff_len, channel, 1); - savederrno = errno; - - pthread_mutex_unlock(&knet_h->tx_mutex); - -out: - pthread_rwlock_unlock(&knet_h->global_rwlock); - - errno = err ? savederrno : 0; - return err; -} - static void _handle_send_to_links(knet_handle_t knet_h, struct msghdr *msg, int sockfd, int8_t channel, int type) { ssize_t inlen = 0; int savederrno = 0, docallback = 0; if ((channel >= 0) && (channel < KNET_DATAFD_MAX) && (!knet_h->sockfd[channel].is_socket)) { inlen = readv(sockfd, msg->msg_iov, 1); } else { inlen = recvmsg(sockfd, msg, MSG_DONTWAIT | MSG_NOSIGNAL); if (msg->msg_flags & MSG_TRUNC) { log_warn(knet_h, KNET_SUB_TX, "Received truncated message from sock %d. Discarding", sockfd); return; } } if (inlen == 0) { savederrno = 0; docallback = 1; } else if (inlen < 0) { struct epoll_event ev; savederrno = errno; docallback = 1; memset(&ev, 0, sizeof(struct epoll_event)); if (channel != KNET_INTERNAL_DATA_CHANNEL) { if (epoll_ctl(knet_h->send_to_links_epollfd, EPOLL_CTL_DEL, knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], &ev)) { log_err(knet_h, KNET_SUB_TX, "Unable to del datafd %d from linkfd epoll pool: %s", knet_h->sockfd[channel].sockfd[0], strerror(savederrno)); } else { knet_h->sockfd[channel].has_error = 1; } } /* * TODO: add error handling for KNET_INTERNAL_DATA_CHANNEL * once we add support for internal knet communication */ } else { knet_h->recv_from_sock_buf->kh_type = type; _parse_recv_from_sock(knet_h, inlen, channel, 0); } if ((docallback) && (channel != KNET_INTERNAL_DATA_CHANNEL)) { knet_h->sock_notify_fn(knet_h->sock_notify_fn_private_data, knet_h->sockfd[channel].sockfd[0], channel, KNET_NOTIFY_TX, inlen, savederrno); } } void *_handle_send_to_links_thread(void *data) { knet_handle_t knet_h = (knet_handle_t) data; struct epoll_event events[KNET_EPOLL_MAX_EVENTS]; int i, nev, type; int flush, flush_queue_limit; int8_t channel; struct iovec iov_in; struct msghdr msg; struct sockaddr_storage address; set_thread_status(knet_h, KNET_THREAD_TX, KNET_THREAD_STARTED); memset(&iov_in, 0, sizeof(iov_in)); iov_in.iov_base = (void *)knet_h->recv_from_sock_buf->khp_data_userdata; iov_in.iov_len = KNET_MAX_PACKET_SIZE; memset(&msg, 0, sizeof(struct msghdr)); msg.msg_name = &address; msg.msg_namelen = sizeof(struct sockaddr_storage); msg.msg_iov = &iov_in; msg.msg_iovlen = 1; knet_h->recv_from_sock_buf->kh_version = KNET_HEADER_VERSION; knet_h->recv_from_sock_buf->khp_data_frag_seq = 0; knet_h->recv_from_sock_buf->kh_node = htons(knet_h->host_id); for (i = 0; i < PCKT_FRAG_MAX; i++) { knet_h->send_to_links_buf[i]->kh_version = KNET_HEADER_VERSION; knet_h->send_to_links_buf[i]->khp_data_frag_seq = i + 1; knet_h->send_to_links_buf[i]->kh_node = htons(knet_h->host_id); } flush_queue_limit = 0; while (!shutdown_in_progress(knet_h)) { nev = epoll_wait(knet_h->send_to_links_epollfd, events, KNET_EPOLL_MAX_EVENTS + 1, KNET_THREADS_TIMERES / 1000); flush = get_thread_flush_queue(knet_h, KNET_THREAD_TX); /* * we use timeout to detect if thread is shutting down */ if (nev == 0) { /* * ideally we want to communicate that we are done flushing * the queue when we have an epoll timeout event */ if (flush == KNET_THREAD_QUEUE_FLUSH) { set_thread_flush_queue(knet_h, KNET_THREAD_TX, KNET_THREAD_QUEUE_FLUSHED); flush_queue_limit = 0; } continue; } /* * fall back in case the TX sockets will continue receive traffic * and we do not hit an epoll timeout. * * allow up to a 100 loops to flush queues, then we give up. * there might be more clean ways to do it by checking the buffer queue * on each socket, but we have tons of sockets and calculations can go wrong. * Also, why would you disable data forwarding and still send packets? */ if (flush == KNET_THREAD_QUEUE_FLUSH) { if (flush_queue_limit >= 100) { log_debug(knet_h, KNET_SUB_TX, "Timeout flushing the TX queue, expect packet loss"); set_thread_flush_queue(knet_h, KNET_THREAD_TX, KNET_THREAD_QUEUE_FLUSHED); flush_queue_limit = 0; } else { flush_queue_limit++; } } else { flush_queue_limit = 0; } if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) { log_debug(knet_h, KNET_SUB_TX, "Unable to get read lock"); continue; } for (i = 0; i < nev; i++) { if (events[i].data.fd == knet_h->hostsockfd[0]) { type = KNET_HEADER_TYPE_HOST_INFO; channel = KNET_INTERNAL_DATA_CHANNEL; } else { type = KNET_HEADER_TYPE_DATA; for (channel = 0; channel < KNET_DATAFD_MAX; channel++) { if ((knet_h->sockfd[channel].in_use) && (knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created] == events[i].data.fd)) { break; } } if (channel >= KNET_DATAFD_MAX) { log_debug(knet_h, KNET_SUB_TX, "No available channels"); continue; /* channel not found */ } } if (pthread_mutex_lock(&knet_h->tx_mutex) != 0) { log_debug(knet_h, KNET_SUB_TX, "Unable to get mutex lock"); continue; } _handle_send_to_links(knet_h, &msg, events[i].data.fd, channel, type); pthread_mutex_unlock(&knet_h->tx_mutex); } pthread_rwlock_unlock(&knet_h->global_rwlock); } set_thread_status(knet_h, KNET_THREAD_TX, KNET_THREAD_STOPPED); return NULL; } + +int knet_send_sync(knet_handle_t knet_h, const char *buff, const size_t buff_len, const int8_t channel) +{ + int savederrno = 0, err = 0; + + if (!knet_h) { + errno = EINVAL; + return -1; + } + + if (buff == NULL) { + errno = EINVAL; + return -1; + } + + if (buff_len <= 0) { + errno = EINVAL; + return -1; + } + + if (buff_len > KNET_MAX_PACKET_SIZE) { + errno = EINVAL; + return -1; + } + + if (channel < 0) { + errno = EINVAL; + return -1; + } + + if (channel >= KNET_DATAFD_MAX) { + errno = EINVAL; + return -1; + } + + savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); + if (savederrno) { + log_err(knet_h, KNET_SUB_TX, "Unable to get read lock: %s", + strerror(savederrno)); + errno = savederrno; + return -1; + } + + if (!knet_h->sockfd[channel].in_use) { + savederrno = EINVAL; + err = -1; + goto out; + } + + savederrno = pthread_mutex_lock(&knet_h->tx_mutex); + if (savederrno) { + log_err(knet_h, KNET_SUB_TX, "Unable to get TX mutex lock: %s", + strerror(savederrno)); + err = -1; + goto out; + } + + knet_h->recv_from_sock_buf->kh_type = KNET_HEADER_TYPE_DATA; + memmove(knet_h->recv_from_sock_buf->khp_data_userdata, buff, buff_len); + err = _parse_recv_from_sock(knet_h, buff_len, channel, 1); + savederrno = errno; + + pthread_mutex_unlock(&knet_h->tx_mutex); + +out: + pthread_rwlock_unlock(&knet_h->global_rwlock); + + errno = err ? savederrno : 0; + return err; +} + +ssize_t knet_send(knet_handle_t knet_h, const char *buff, const size_t buff_len, const int8_t channel) +{ + int savederrno = 0; + ssize_t err = 0; + struct iovec iov_out[1]; + + if (!knet_h) { + errno = EINVAL; + return -1; + } + + if (buff == NULL) { + errno = EINVAL; + return -1; + } + + if (buff_len <= 0) { + errno = EINVAL; + return -1; + } + + if (buff_len > KNET_MAX_PACKET_SIZE) { + errno = EINVAL; + return -1; + } + + if (channel < 0) { + errno = EINVAL; + return -1; + } + + if (channel >= KNET_DATAFD_MAX) { + errno = EINVAL; + return -1; + } + + savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); + if (savederrno) { + log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s", + strerror(savederrno)); + errno = savederrno; + return -1; + } + + if (!knet_h->sockfd[channel].in_use) { + savederrno = EINVAL; + err = -1; + goto out_unlock; + } + + memset(iov_out, 0, sizeof(iov_out)); + + iov_out[0].iov_base = (void *)buff; + iov_out[0].iov_len = buff_len; + + err = writev(knet_h->sockfd[channel].sockfd[0], iov_out, 1); + savederrno = errno; + +out_unlock: + pthread_rwlock_unlock(&knet_h->global_rwlock); + errno = err ? savederrno : 0; + return err; +}