diff --git a/configure.ac b/configure.ac index 25e5ebc0..d9adc50a 100644 --- a/configure.ac +++ b/configure.ac @@ -1,577 +1,584 @@ # # Copyright (C) 2010-2021 Red Hat, Inc. All rights reserved. # # Authors: Fabio M. Di Nitto # Federico Simoncelli # # This software licensed under GPL-2.0+ # # -*- Autoconf -*- # Process this file with autoconf to produce a configure script. # AC_PREREQ([2.63]) AC_INIT([kronosnet], m4_esyscmd([build-aux/git-version-gen .tarball-version .gitarchivever]), [devel@lists.kronosnet.org]) # Don't let AC_PROC_CC (invoked by AC_USE_SYSTEM_EXTENSIONS) replace # undefined CFLAGS with -g -O2, overriding our special OPT_CFLAGS. : ${CFLAGS=""} AC_USE_SYSTEM_EXTENSIONS AM_INIT_AUTOMAKE([1.13 dist-bzip2 dist-xz color-tests -Wno-portability subdir-objects]) LT_PREREQ([2.2.6]) # --enable-new-dtags: Use RUNPATH instead of RPATH. # It is necessary to have this done before libtool does linker detection. # See also: https://github.com/kronosnet/kronosnet/issues/107 # --as-needed: Modern systems have builtin ceil() making -lm superfluous but # AC_SEARCH_LIBS can't detect this because it tests with a false prototype AX_CHECK_LINK_FLAG([-Wl,--enable-new-dtags], [AM_LDFLAGS=-Wl,--enable-new-dtags], [AC_MSG_ERROR(["Linker support for --enable-new-dtags is required"])]) AX_CHECK_LINK_FLAG([-Wl,--as-needed], [AM_LDFLAGS="$AM_LDFLAGS -Wl,--as-needed"]) saved_LDFLAGS="$LDFLAGS" LDFLAGS="$AM_LDFLAGS $LDFLAGS" LT_INIT LDFLAGS="$saved_LDFLAGS" AC_CONFIG_MACRO_DIR([m4]) AC_CONFIG_SRCDIR([libknet/handle.c]) AC_CONFIG_HEADERS([config.h]) AC_CANONICAL_HOST AC_LANG([C]) if test "$prefix" = "NONE"; then prefix="/usr" if test "$localstatedir" = "\${prefix}/var"; then localstatedir="/var" fi if test "$libdir" = "\${exec_prefix}/lib"; then if test -e /usr/lib64; then libdir="/usr/lib64" else libdir="/usr/lib" fi fi fi AC_PROG_AWK AC_PROG_GREP AC_PROG_SED AC_PROG_CPP AC_PROG_CC AC_PROG_CC_C99 if test "x$ac_cv_prog_cc_c99" = "xno"; then AC_MSG_ERROR(["C99 support is required"]) fi AC_PROG_LN_S AC_PROG_INSTALL AC_PROG_MAKE_SET PKG_PROG_PKG_CONFIG AC_CHECK_PROGS([VALGRIND_EXEC], [valgrind]) AM_CONDITIONAL([HAS_VALGRIND], [test x$VALGRIND_EXEC != "x"]) AC_CHECK_PROGS([COVBUILD_EXEC], [cov-build]) AM_CONDITIONAL([HAS_COVBUILD], [test x$COVBUILD_EXEC != "x"]) AC_CHECK_PROGS([COVANALYZE_EXEC], [cov-analyze]) AM_CONDITIONAL([HAS_COVANALYZE], [test x$COVANALYZE_EXEC != "x"]) AC_CHECK_PROGS([COVFORMATERRORS_EXEC], [cov-format-errors]) AM_CONDITIONAL([HAS_COVFORMATERRORS], [test x$COVFORMATERRORS_EXEC != "x"]) # KNET_OPTION_DEFINES(stem,type,detection code) # stem: enters name of option, Automake conditional and preprocessor define # type: compress or crypto, determines where the default comes from AC_DEFUN([KNET_OPTION_DEFINES],[ AC_ARG_ENABLE([$2-$1],[AS_HELP_STRING([--disable-$2-$1],[disable libknet $1 support])],, [enable_$2_$1="$enable_$2_all"]) AM_CONDITIONAL([BUILD_]m4_toupper([$2_$1]),[test "x$enable_$2_$1" = xyes]) if test "x$enable_$2_$1" = xyes; then $3 fi AC_DEFINE_UNQUOTED([WITH_]m4_toupper([$2_$1]), [`test "x$enable_$2_$1" != xyes; echo $?`], $1 $2 [built in]) ]) AC_ARG_ENABLE([man], [AS_HELP_STRING([--disable-man],[disable man page creation])],, [ enable_man="yes" ]) AM_CONDITIONAL([BUILD_MAN], [test x$enable_man = xyes]) AC_ARG_ENABLE([libknet-sctp], [AS_HELP_STRING([--disable-libknet-sctp],[disable libknet SCTP support])],, [ enable_libknet_sctp="yes" ]) AM_CONDITIONAL([BUILD_SCTP], [test x$enable_libknet_sctp = xyes]) AC_ARG_ENABLE([functional-tests], [AS_HELP_STRING([--disable-functional-tests],[disable execution of functional tests, useful for old and slow arches])],, [ enable_functional_tests="yes" ]) AM_CONDITIONAL([RUN_FUN_TESTS], [test x$enable_functional_tests = xyes]) AC_ARG_ENABLE([crypto-all], [AS_HELP_STRING([--disable-crypto-all],[disable libknet all crypto modules support])],, [ enable_crypto_all="yes" ]) KNET_OPTION_DEFINES([nss],[crypto],[PKG_CHECK_MODULES([nss], [nss])]) KNET_OPTION_DEFINES([openssl],[crypto],[PKG_CHECK_MODULES([openssl], [libcrypto])]) # use gcry_mac_open to detect if libgcrypt is new enough KNET_OPTION_DEFINES([gcrypt],[crypto],[ PKG_CHECK_MODULES([gcrypt], [libgcrypt >= 1.8.0],, [AC_CHECK_HEADERS([gcrypt.h], [AC_CHECK_LIB([gcrypt], [gcry_mac_open], [AC_SUBST([gcrypt_LIBS], ["-lgcrypt -ldl -lgpg-error"])])], [AC_MSG_ERROR(["missing required gcrypt.h"])])]) ]) AC_ARG_ENABLE([compress-all], [AS_HELP_STRING([--disable-compress-all],[disable libknet all compress modules support])],, [ enable_compress_all="yes" ]) KNET_OPTION_DEFINES([zstd],[compress],[PKG_CHECK_MODULES([libzstd], [libzstd])]) KNET_OPTION_DEFINES([zlib],[compress],[PKG_CHECK_MODULES([zlib], [zlib])]) KNET_OPTION_DEFINES([lz4],[compress],[PKG_CHECK_MODULES([liblz4], [liblz4])]) KNET_OPTION_DEFINES([lzo2],[compress],[ PKG_CHECK_MODULES([lzo2], [lzo2], [# work around broken pkg-config file in v2.10 AC_SUBST([lzo2_CFLAGS],[`echo $lzo2_CFLAGS | sed 's,/lzo *, ,'`])], [AC_CHECK_HEADERS([lzo/lzo1x.h], [AC_CHECK_LIB([lzo2], [lzo1x_decompress_safe], [AC_SUBST([lzo2_LIBS], [-llzo2])])], [AC_MSG_ERROR(["missing required lzo/lzo1x.h header"])])]) ]) KNET_OPTION_DEFINES([lzma],[compress],[PKG_CHECK_MODULES([liblzma], [liblzma])]) KNET_OPTION_DEFINES([bzip2],[compress],[ PKG_CHECK_MODULES([bzip2], [bzip2],, [AC_CHECK_HEADERS([bzlib.h], [AC_CHECK_LIB([bz2], [BZ2_bzBuffToBuffCompress], [AC_SUBST([bzip2_LIBS], [-lbz2])])], [AC_MSG_ERROR(["missing required bzlib.h"])])]) ]) AC_ARG_ENABLE([install-tests], [AS_HELP_STRING([--enable-install-tests],[install tests])],, [ enable_install_tests="no" ]) AM_CONDITIONAL([INSTALL_TESTS], [test x$enable_install_tests = xyes]) AC_ARG_ENABLE([runautogen], [AS_HELP_STRING([--enable-runautogen],[run autogen.sh])],, [ enable_runautogen="no" ]) AM_CONDITIONAL([BUILD_RUNAUTOGEN], [test x$enable_runautogen = xyes]) override_rpm_debuginfo_option="yes" AC_ARG_ENABLE([rpm-debuginfo], [AS_HELP_STRING([--enable-rpm-debuginfo],[build debuginfo packages])],, [ enable_rpm_debuginfo="no", override_rpm_debuginfo_option="no" ]) AM_CONDITIONAL([BUILD_RPM_DEBUGINFO], [test x$enable_rpm_debuginfo = xyes]) AM_CONDITIONAL([OVERRIDE_RPM_DEBUGINFO], [test x$override_rpm_debuginfo_option = xyes]) AC_ARG_ENABLE([libnozzle], [AS_HELP_STRING([--enable-libnozzle],[libnozzle support])],, [ enable_libnozzle="yes" ]) AM_CONDITIONAL([BUILD_LIBNOZZLE], [test x$enable_libnozzle = xyes]) AC_ARG_ENABLE([rust-bindings], [AS_HELP_STRING([--enable-rust-bindings],[rust bindings support])],, [ enable_rust_bindings="no" ]) AM_CONDITIONAL([BUILD_RUST_BINDINGS], [test x$enable_rust_bindings = xyes]) ## local helper functions # this function checks if CC support options passed as # args. Global CPPFLAGS are ignored during this test. cc_supports_flag() { saveCPPFLAGS="$CPPFLAGS" CPPFLAGS="$@" if echo $CC | grep -q clang; then CPPFLAGS="-Werror $CPPFLAGS" fi AC_MSG_CHECKING([whether $CC supports "$@"]) AC_COMPILE_IFELSE([AC_LANG_PROGRAM([])], [RC=0; AC_MSG_RESULT([yes])], [RC=1; AC_MSG_RESULT([no])]) CPPFLAGS="$saveCPPFLAGS" return $RC } # Checks for libraries. AX_PTHREAD(,[AC_MSG_ERROR([POSIX threads support is required])]) saved_LIBS="$LIBS" LIBS= AC_SEARCH_LIBS([ceil], [m], , [AC_MSG_ERROR([ceil not found])]) AC_SUBST([m_LIBS], [$LIBS]) LIBS= AC_SEARCH_LIBS([clock_gettime], [rt], , [AC_MSG_ERROR([clock_gettime not found])]) AC_SUBST([rt_LIBS], [$LIBS]) LIBS= AC_SEARCH_LIBS([dlopen], [dl dld], , [AC_MSG_ERROR([dlopen not found])]) AC_SUBST([dl_LIBS], [$LIBS]) LIBS="$saved_LIBS" # Check RTLD_DI_ORIGIN (not decalred by musl. glibc has it as an enum so cannot use ifdef) AC_CHECK_DECL([RTLD_DI_ORIGIN], [AC_DEFINE([HAVE_RTLD_DI_ORIGIN], 1, [define when RTLD_DI_ORIGIN is declared])], ,[[#include ]]) # OS detection AC_MSG_CHECKING([for os in ${host_os}]) case "$host_os" in *linux*) AC_DEFINE_UNQUOTED([KNET_LINUX], [1], [Compiling for Linux platform]) AC_MSG_RESULT([Linux]) ;; *bsd*) AC_DEFINE_UNQUOTED([KNET_BSD], [1], [Compiling for BSD platform]) AC_MSG_RESULT([BSD]) ;; *) AC_MSG_ERROR([Unsupported OS? hmmmm]) ;; esac # Checks for header files. AC_CHECK_HEADERS([sys/epoll.h]) AC_CHECK_FUNCS([kevent]) # if neither sys/epoll.h nor kevent are present, we should fail. if test "x$ac_cv_header_sys_epoll_h" = xno && test "x$ac_cv_func_kevent" = xno; then AC_MSG_ERROR([Both epoll and kevent unavailable on this OS]) fi if test "x$ac_cv_header_sys_epoll_h" = xyes && test "x$ac_cv_func_kevent" = xyes; then AC_MSG_ERROR([Both epoll and kevent available on this OS, please contact the maintainers to fix the code]) fi if test "x$enable_libknet_sctp" = xyes; then AC_CHECK_HEADERS([netinet/sctp.h],, [AC_MSG_ERROR(["missing required SCTP headers"])]) fi # Checks for typedefs, structures, and compiler characteristics. AC_C_INLINE AC_TYPE_PID_T AC_TYPE_SIZE_T AC_TYPE_SSIZE_T AC_TYPE_UINT8_T AC_TYPE_UINT16_T AC_TYPE_UINT32_T AC_TYPE_UINT64_T AC_TYPE_INT8_T AC_TYPE_INT16_T AC_TYPE_INT32_T AC_TYPE_INT64_T PKG_CHECK_MODULES([libqb], [libqb]) if test "x$enable_man" = "xyes"; then AC_ARG_VAR([DOXYGEN], [override doxygen executable]) AC_CHECK_PROGS([DOXYGEN], [doxygen], [no]) if test "x$DOXYGEN" = xno; then AC_MSG_ERROR(["Doxygen command not found"]) fi AC_ARG_VAR([DOXYGEN2MAN], [override doxygen2man executable]) # required to detect doxygen2man when libqb is installed # in non standard paths saved_PKG_CONFIG="$PKG_CONFIG" saved_ac_cv_path_PKG_CONFIG="$ac_cv_path_PKG_CONFIG" unset PKG_CONFIG ac_cv_path_PKG_CONFIG AC_PATH_PROG([PKG_CONFIG], [pkg-config]) PKG_CHECK_MODULES([libqb_BUILD], [libqb]) PKG_CHECK_VAR([libqb_BUILD_PREFIX], [libqb], [prefix]) AC_PATH_PROG([DOXYGEN2MAN], [doxygen2man], [no], [$libqb_BUILD_PREFIX/bin$PATH_SEPARATOR$PATH]) PKG_CONFIG="$saved_PKG_CONFIG" ac_cv_path_PKG_CONFIG="$saved_ac_cv_path_PKG_CONFIG" if test "x$DOXYGEN2MAN" = "xno"; then AC_MSG_ERROR(["doxygen2man command not found"]) fi AC_SUBST([DOXYGEN2MAN]) fi # check for rust tools to build bindings if test "x$enable_rust_bindings" = "xyes"; then AC_PATH_PROG([CARGO], [cargo], [no]) if test "x$CARGO" = xno; then AC_MSG_ERROR(["cargo command not found"]) fi AC_PATH_PROG([RUSTC], [rustc], [no]) if test "x$RUSTC" = xno; then AC_MSG_ERROR(["rustc command not found"]) fi AC_PATH_PROG([RUSTDOC], [rustdoc], [no]) if test "x$RUSTDOC" = xno; then AC_MSG_ERROR(["rustdoc command not found"]) fi AC_PATH_PROG([BINDGEN], [bindgen], [no]) if test "x$BINDGEN" = xno; then AC_MSG_ERROR(["bindgen command not found"]) fi AC_PATH_PROG([CLIPPY], [clippy-driver], [no]) if test "x$CLIPPY" = xno; then AC_MSG_ERROR(["clippy-driver command not found"]) fi AC_PATH_PROG([RUSTFMT], [rustfmt], [no]) if test "x$RUSTFMT" = xno; then AC_MSG_WARN(["rustfmt command not found (optional)"]) fi fi # checks for libnozzle if test "x$enable_libnozzle" = xyes; then if `echo $host_os | grep -q linux`; then PKG_CHECK_MODULES([libnl], [libnl-3.0]) PKG_CHECK_MODULES([libnlroute], [libnl-route-3.0 >= 3.3], [], [PKG_CHECK_MODULES([libnlroute], [libnl-route-3.0 < 3.3], [AC_DEFINE_UNQUOTED([LIBNL3_WORKAROUND], [1], [Enable libnl < 3.3 build workaround])], [])]) fi fi # https://www.gnu.org/software/libtool/manual/html_node/Updating-version-info.html knetcurrent="2" knetrevision="0" knetage="0" # c:r:a libknetversion="$knetcurrent:$knetrevision:$knetage" # soname derived from c:r:a # use $VERSION as build info https://semver.org/. build info are incremental automatically knetalpha="-alpha1" libknetrustver="$(($knetcurrent - $knetage)).$knetage.$knetrevision$knetalpha+$VERSION" nozzlecurrent="1" nozzlerevision="0" nozzleage="0" libnozzleversion="$nozzlecurrent:$nozzlerevision:$nozzleage" # nozzle is stable for now nozzlealpha="" libnozzlerustver="$(($nozzlecurrent - $nozzleage)).$nozzleage.$nozzlerevision$nozzlealpha+$VERSION" AC_SUBST([libknetversion]) AC_SUBST([libknetrustver]) AC_SUBST([libnozzleversion]) AC_SUBST([libnozzlerustver]) # local options AC_ARG_ENABLE([debug], [AS_HELP_STRING([--enable-debug],[enable debug build])]) +AC_ARG_ENABLE([onwire-v1-extra-debug], + [AS_HELP_STRING([--enable-onwire-v1-extra-debug],[enable onwire protocol v1 extra debug. WARNING: IT BREAKS ONWIRE COMPATIBILITY! DO NOT USE IN PRODUCTION!])]) + +if test "x${enable_onwire_v1_extra_debug}" = xyes; then + AC_DEFINE_UNQUOTED([ONWIRE_V1_EXTRA_DEBUG], [1], [Enable crc32 checksum for data and packets]) +fi + # for standard crc32 function (used in test suite) PKG_CHECK_MODULES([zlib], [zlib]) AC_ARG_ENABLE([hardening], [AS_HELP_STRING([--disable-hardening],[disable hardening build flags])],, [ enable_hardening="yes" ]) AC_ARG_WITH([sanitizers], [AS_HELP_STRING([--with-sanitizers=...,...], [enable SANitizer build, do *NOT* use for production. Only ASAN/UBSAN/TSAN are currently supported])], [ SANITIZERS="$withval" ], [ SANITIZERS="" ]) AC_ARG_WITH([testdir], [AS_HELP_STRING([--with-testdir=DIR],[path to /usr/lib../kronosnet/tests/ dir where to install the test suite])], [ TESTDIR="$withval" ], [ TESTDIR="$libdir/kronosnet/tests" ]) ## do subst AC_SUBST([TESTDIR]) # debug build stuff if test "x${enable_debug}" = xyes; then AC_DEFINE_UNQUOTED([DEBUG], [1], [Compiling Debugging code]) OPT_CFLAGS="-O0" FORTIFY_CFLAGS="" RUST_FLAGS="" RUST_TARGET_DIR="debug" else OPT_CFLAGS="-O3" FORTIFY_CFLAGS="-D_FORTIFY_SOURCE=2" RUST_FLAGS="--release" RUST_TARGET_DIR="release" fi # Check for availablility of hardening options annocheck=no if test "x${enable_hardening}" = xyes; then # support only gcc for now if echo $CC | grep -q gcc; then ANNOPLUGIN="-fplugin=annobin" annocheck=yes fi HARDENING_CFLAGS_ANNOCHECK="$ANNOPLUGIN -fPIC -DPIC -pie $FORTIFY_CFLAGS -fstack-protector-strong -fexceptions -D_GLIBCXX_ASSERTIONS -Wl,-z,now" HARDENING_CFLAGS="-fstack-clash-protection -fcf-protection=full -mcet -mstackrealign" EXTRA_HARDENING_CFLAGS="" # check for annobin required cflags/ldflags for j in $HARDENING_CFLAGS_ANNOCHECK; do if cc_supports_flag $j; then EXTRA_HARDENING_CFLAGS="$EXTRA_HARDENING_CFLAGS $j" else annocheck=no fi done # check for other hardening cflags/ldflags for j in $HARDENING_CFLAGS; do if cc_supports_flag $j; then EXTRA_HARDENING_CFLAGS="$EXTRA_HARDENING_CFLAGS $j" fi done # check if annocheck binary is available if test "x${annocheck}" = xyes; then AC_CHECK_PROGS([ANNOCHECK_EXEC], [annocheck]) if test "x${ANNOCHECK_EXEC}" = x; then annocheck=no fi fi AM_LDFLAGS="$AM_LDFLAGS $EXTRA_HARDENING_CFLAGS" fi if test "x${enable_debug}" = xyes; then annocheck=no fi AM_CONDITIONAL([HAS_ANNOCHECK], [test "x$annocheck" = "xyes"]) # gdb flags if test "x${GCC}" = xyes; then GDB_CFLAGS="-ggdb3" else GDB_CFLAGS="-g" fi # --- ASAN/UBSAN/TSAN (see man gcc) --- # when using SANitizers, we need to pass the -fsanitize.. # to both CFLAGS and LDFLAGS. The CFLAGS/LDFLAGS must be # specified as first in the list or there will be runtime # issues (for example user has to LD_PRELOAD asan for it to work # properly). if test -n "${SANITIZERS}"; then SANITIZERS=$(echo $SANITIZERS | sed -e 's/,/ /g') for SANITIZER in $SANITIZERS; do case $SANITIZER in asan|ASAN) SANITIZERS_CFLAGS="$SANITIZERS_CFLAGS -fsanitize=address" SANITIZERS_LDFLAGS="$SANITIZERS_LDFLAGS -fsanitize=address -lasan" AC_CHECK_LIB([asan],[main],,AC_MSG_ERROR([Unable to find libasan])) ;; ubsan|UBSAN) SANITIZERS_CFLAGS="$SANITIZERS_CFLAGS -fsanitize=undefined" SANITIZERS_LDFLAGS="$SANITIZERS_LDFLAGS -fsanitize=undefined -lubsan" AC_CHECK_LIB([ubsan],[main],,AC_MSG_ERROR([Unable to find libubsan])) ;; tsan|TSAN) SANITIZERS_CFLAGS="$SANITIZERS_CFLAGS -fsanitize=thread" SANITIZERS_LDFLAGS="$SANITIZERS_LDFLAGS -fsanitize=thread -ltsan" AC_CHECK_LIB([tsan],[main],,AC_MSG_ERROR([Unable to find libtsan])) ;; esac done fi DEFAULT_CFLAGS="-Werror -Wall -Wextra" # manual overrides # generates too much noise for stub APIs UNWANTED_CFLAGS="-Wno-unused-parameter" AC_SUBST([AM_CFLAGS],["$SANITIZERS_CFLAGS $OPT_CFLAGS $GDB_CFLAGS $DEFAULT_CFLAGS $EXTRA_HARDENING_CFLAGS $UNWANTED_CFLAGS"]) LDFLAGS="$SANITIZERS_LDFLAGS $LDFLAGS" AC_SUBST([AM_LDFLAGS]) AC_SUBST([RUST_FLAGS]) AC_SUBST([RUST_TARGET_DIR]) AX_PROG_DATE AS_IF([test "$ax_cv_prog_date_gnu_date:$ax_cv_prog_date_gnu_utc" = yes:yes], [UTC_DATE_AT="date -u -d@"], [AS_IF([test "x$ax_cv_prog_date_bsd_date" = xyes], [UTC_DATE_AT="date -u -r"], [AC_MSG_ERROR([date utility unable to convert epoch to UTC])])]) AC_SUBST([UTC_DATE_AT]) AC_ARG_VAR([SOURCE_EPOCH],[last modification date of the source]) AC_MSG_NOTICE([trying to determine source epoch]) AC_MSG_CHECKING([for source epoch in \$SOURCE_EPOCH]) AS_IF([test -n "$SOURCE_EPOCH"], [AC_MSG_RESULT([yes])], [AC_MSG_RESULT([no]) AC_MSG_CHECKING([for source epoch in source_epoch file]) AS_IF([test -e "$srcdir/source_epoch"], [read SOURCE_EPOCH <"$srcdir/source_epoch" AC_MSG_RESULT([yes])], [AC_MSG_RESULT([no]) AC_MSG_CHECKING([for source epoch baked in by gitattributes export-subst]) SOURCE_EPOCH='$Format:%at$' # template for rewriting by git-archive AS_CASE([$SOURCE_EPOCH], [?Format:*], # was not rewritten [AC_MSG_RESULT([no]) AC_MSG_CHECKING([for source epoch in \$SOURCE_DATE_EPOCH]) AS_IF([test "x$SOURCE_DATE_EPOCH" != x], [SOURCE_EPOCH="$SOURCE_DATE_EPOCH" AC_MSG_RESULT([yes])], [AC_MSG_RESULT([no]) AC_MSG_CHECKING([whether git log can provide a source epoch]) SOURCE_EPOCH=f${SOURCE_EPOCH#\$F} # convert into git log --pretty format SOURCE_EPOCH=$(cd "$srcdir" && git log -1 --pretty=${SOURCE_EPOCH%$} 2>/dev/null) AS_IF([test -n "$SOURCE_EPOCH"], [AC_MSG_RESULT([yes])], [AC_MSG_RESULT([no, using current time and breaking reproducibility]) SOURCE_EPOCH=$(date +%s)])])], [AC_MSG_RESULT([yes])] )]) ]) AC_MSG_NOTICE([using source epoch $($UTC_DATE_AT$SOURCE_EPOCH +'%F %T %Z')]) AC_CONFIG_FILES([ Makefile libnozzle/Makefile libnozzle/libnozzle.pc libnozzle/tests/Makefile libnozzle/bindings/Makefile libnozzle/bindings/rust/Makefile libnozzle/bindings/rust/Cargo.toml libnozzle/bindings/rust/tests/Makefile libnozzle/bindings/rust/tests/Cargo.toml libknet/Makefile libknet/libknet.pc libknet/tests/Makefile libknet/bindings/Makefile libknet/bindings/rust/Makefile libknet/bindings/rust/Cargo.toml libknet/bindings/rust/tests/Makefile libknet/bindings/rust/tests/Cargo.toml man/Makefile man/Doxyfile-knet man/Doxyfile-nozzle ]) if test "x$VERSION" = "xUNKNOWN"; then AC_MSG_ERROR([m4_text_wrap([ configure was unable to determine the source tree's current version. This generally happens when using git archive (or the github download button) generated tarball/zip file. In order to workaround this issue, either use git clone https://github.com/kronosnet/kronosnet.git or use an official release tarball, available at https://kronosnet.org/releases/. Alternatively you can add a compatible version in a .tarball-version file at the top of the source tree, wipe your autom4te.cache dir and generated configure, and rerun autogen.sh. ], [ ], [ ], [76])]) fi AC_OUTPUT diff --git a/libknet/onwire.h b/libknet/onwire.h index 9b2f0d37..d5a8981b 100644 --- a/libknet/onwire.h +++ b/libknet/onwire.h @@ -1,166 +1,175 @@ /* * Copyright (C) 2012-2021 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. Di Nitto * Federico Simoncelli * * This software licensed under LGPL-2.0+ */ #ifndef __KNET_ONWIRE_H__ #define __KNET_ONWIRE_H__ #include #include "libknet.h" /* * data structures to define network packets. * Start from knet_header at the bottom */ /* * Plan is to support MAX_VER with MIN_VER = MAX_VER - 1 * but for the sake of not rewriting the world later on, * let´s make sure we can support a random range of protocol * versions */ #define KNET_HEADER_ONWIRE_MAX_VER 0x01 /* max onwire protocol supported by this build */ #define KNET_HEADER_ONWIRE_MIN_VER 0x01 /* min onwire protocol supported by this build */ /* * Packet types * * adding new DATA types requires the packet to contain * data_seq_num and frag_num/frag_seq in the current data types. * * Changing those data types requires major surgery to thread_tx/thread_rx * and defrag buffer allocation in knet_host_add. * * Also please be aware that frags buffer allocation size is not constant * so you cannot assume each frag is 64K+. * (see handle.c) */ #define KNET_HEADER_TYPE_DATA 0x00 /* pure data packet */ #define KNET_HEADER_TYPE_PING 0x81 /* heartbeat */ #define KNET_HEADER_TYPE_PONG 0x82 /* reply to heartbeat */ #define KNET_HEADER_TYPE_PMTUD 0x83 /* Used to determine Path MTU */ #define KNET_HEADER_TYPE_PMTUD_REPLY 0x84 /* reply from remote host */ /* * KNET_HEADER_TYPE_DATA */ typedef uint16_t seq_num_t; /* data sequence number required to deduplicate pckts */ #define SEQ_MAX UINT16_MAX struct knet_header_payload_data_v1 { seq_num_t khp_data_seq_num; /* pckt seq number used to deduplicate pckts */ uint8_t khp_data_compress; /* identify if user data are compressed */ uint8_t khp_data_pad1; /* make sure to have space in the header to grow features */ uint8_t khp_data_bcast; /* data destination bcast/ucast */ uint8_t khp_data_frag_num; /* number of fragments of this pckt. 1 is not fragmented */ uint8_t khp_data_frag_seq; /* as above, indicates the frag sequence number */ int8_t khp_data_channel; /* transport channel data for localsock <-> knet <-> localsock mapping */ +#ifdef ONWIRE_V1_EXTRA_DEBUG + uint32_t khp_data_checksum; /* debug checksum of all user data */ +#endif uint8_t khp_data_userdata[0]; /* pointer to the real user data */ } __attribute__((packed)); #define khp_data_v1_seq_num kh_payload.khp_data_v1.khp_data_seq_num #define khp_data_v1_frag_num kh_payload.khp_data_v1.khp_data_frag_num #define khp_data_v1_frag_seq kh_payload.khp_data_v1.khp_data_frag_seq #define khp_data_v1_userdata kh_payload.khp_data_v1.khp_data_userdata #define khp_data_v1_bcast kh_payload.khp_data_v1.khp_data_bcast #define khp_data_v1_channel kh_payload.khp_data_v1.khp_data_channel #define khp_data_v1_compress kh_payload.khp_data_v1.khp_data_compress +#ifdef ONWIRE_V1_EXTRA_DEBUG +#define khp_data_v1_checksum kh_payload.khp_data_v1.khp_data_checksum +#endif /* * KNET_HEADER_TYPE_PING / KNET_HEADER_TYPE_PONG */ struct knet_header_payload_ping_v1 { uint8_t khp_ping_link; /* changing khp_ping_link requires changes to thread_rx.c KNET_LINK_DYNIP code handling */ uint32_t khp_ping_time[4]; /* ping timestamp */ seq_num_t khp_ping_seq_num; /* transport host seq_num */ uint8_t khp_ping_timed; /* timed pinged (1) or forced by seq_num (0) */ } __attribute__((packed)); #define khp_ping_v1_link kh_payload.khp_ping_v1.khp_ping_link #define khp_ping_v1_time kh_payload.khp_ping_v1.khp_ping_time #define khp_ping_v1_seq_num kh_payload.khp_ping_v1.khp_ping_seq_num #define khp_ping_v1_timed kh_payload.khp_ping_v1.khp_ping_timed /* * KNET_HEADER_TYPE_PMTUD / KNET_HEADER_TYPE_PMTUD_REPLY */ /* * taken from tracepath6 */ #define KNET_PMTUD_SIZE_V4 65535 #define KNET_PMTUD_SIZE_V6 KNET_PMTUD_SIZE_V4 /* * IPv4/IPv6 header size */ #define KNET_PMTUD_OVERHEAD_V4 20 #define KNET_PMTUD_OVERHEAD_V6 40 #define KNET_PMTUD_MIN_MTU_V4 576 #define KNET_PMTUD_MIN_MTU_V6 1280 struct knet_header_payload_pmtud_v1 { uint8_t khp_pmtud_link; /* link_id */ uint16_t khp_pmtud_size; /* size of the current packet */ uint8_t khp_pmtud_data[0]; /* pointer to empty/random data/fill buffer */ } __attribute__((packed)); #define khp_pmtud_v1_link kh_payload.khp_pmtud_v1.khp_pmtud_link #define khp_pmtud_v1_size kh_payload.khp_pmtud_v1.khp_pmtud_size #define khp_pmtud_v1_data kh_payload.khp_pmtud_v1.khp_pmtud_data /* * PMTUd related functions */ size_t calc_data_outlen(knet_handle_t knet_h, size_t inlen); size_t calc_max_data_outlen(knet_handle_t knet_h, size_t inlen); size_t calc_min_mtu(knet_handle_t knet_h); /* * union to reference possible individual payloads */ union knet_header_payload { struct knet_header_payload_data_v1 khp_data_v1; /* pure data packet struct */ struct knet_header_payload_ping_v1 khp_ping_v1; /* heartbeat packet struct */ struct knet_header_payload_pmtud_v1 khp_pmtud_v1; /* Path MTU discovery packet struct */ } __attribute__((packed)); /* * this header CANNOT change or onwire compat will break! */ struct knet_header { uint8_t kh_version; /* this pckt format/version */ uint8_t kh_type; /* from above defines. Tells what kind of pckt it is */ knet_node_id_t kh_node; /* host id of the source host for this pckt */ uint8_t kh_max_ver; /* max version of the protocol supported by this node */ uint8_t kh_pad1; /* make sure to have space in the header to grow features */ +#ifdef ONWIRE_V1_EXTRA_DEBUG + uint32_t kh_checksum;/* debug checksum per packet */ +#endif union knet_header_payload kh_payload; /* union of potential data struct based on kh_type */ } __attribute__((packed)); /* * extra defines to avoid mingling with sizeof() too much */ #define KNET_HEADER_ALL_SIZE sizeof(struct knet_header) #define KNET_HEADER_SIZE (KNET_HEADER_ALL_SIZE - sizeof(union knet_header_payload)) #define KNET_HEADER_PING_V1_SIZE (KNET_HEADER_SIZE + sizeof(struct knet_header_payload_ping_v1)) #define KNET_HEADER_PMTUD_V1_SIZE (KNET_HEADER_SIZE + sizeof(struct knet_header_payload_pmtud_v1)) #define KNET_HEADER_DATA_V1_SIZE (KNET_HEADER_SIZE + sizeof(struct knet_header_payload_data_v1)) #endif diff --git a/libknet/onwire_v1.c b/libknet/onwire_v1.c index d820286e..c2b52532 100644 --- a/libknet/onwire_v1.c +++ b/libknet/onwire_v1.c @@ -1,216 +1,248 @@ /* * Copyright (C) 2020-2021 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. Di Nitto * * This software licensed under LGPL-2.0+ */ #include "config.h" #include #include #include #include #include #include #include "logging.h" #include "host.h" #include "links.h" #include "onwire_v1.h" int prep_ping_v1(knet_handle_t knet_h, struct knet_link *dst_link, uint8_t onwire_ver, struct timespec clock_now, int timed, ssize_t *outlen) { *outlen = KNET_HEADER_PING_V1_SIZE; /* preparing ping buffer */ knet_h->pingbuf->kh_version = onwire_ver; knet_h->pingbuf->kh_max_ver = knet_h->onwire_max_ver; knet_h->pingbuf->kh_type = KNET_HEADER_TYPE_PING; knet_h->pingbuf->kh_node = htons(knet_h->host_id); knet_h->pingbuf->khp_ping_v1_link = dst_link->link_id; knet_h->pingbuf->khp_ping_v1_timed = timed; memmove(&knet_h->pingbuf->khp_ping_v1_time[0], &clock_now, sizeof(struct timespec)); if (pthread_mutex_lock(&knet_h->tx_seq_num_mutex)) { log_debug(knet_h, KNET_SUB_HEARTBEAT, "Unable to get seq mutex lock"); return -1; } knet_h->pingbuf->khp_ping_v1_seq_num = htons(knet_h->tx_seq_num); pthread_mutex_unlock(&knet_h->tx_seq_num_mutex); +#ifdef ONWIRE_V1_EXTRA_DEBUG + knet_h->pingbuf->kh_checksum = 0; + knet_h->pingbuf->kh_checksum = compute_chksum((const unsigned char*)knet_h->pingbuf, KNET_HEADER_PING_V1_SIZE); +#endif + return 0; } void prep_pong_v1(knet_handle_t knet_h, struct knet_header *inbuf, ssize_t *outlen) { *outlen = KNET_HEADER_PING_V1_SIZE; inbuf->kh_type = KNET_HEADER_TYPE_PONG; inbuf->kh_node = htons(knet_h->host_id); + +#ifdef ONWIRE_V1_EXTRA_DEBUG + inbuf->kh_checksum = 0; + inbuf->kh_checksum = compute_chksum((const unsigned char*)inbuf, KNET_HEADER_PING_V1_SIZE); +#endif } void process_ping_v1(knet_handle_t knet_h, struct knet_host *src_host, struct knet_link *src_link, struct knet_header *inbuf, ssize_t len) { int wipe_bufs = 0; seq_num_t recv_seq_num = ntohs(inbuf->khp_ping_v1_seq_num); if (!inbuf->khp_ping_v1_timed) { /* * we might be receiving this message from all links, but we want * to process it only the first time */ if (recv_seq_num != src_host->untimed_rx_seq_num) { /* * cache the untimed seq num */ src_host->untimed_rx_seq_num = recv_seq_num; /* * if the host has received data in between * untimed ping, then we don't need to wipe the bufs */ if (src_host->got_data) { src_host->got_data = 0; wipe_bufs = 0; } else { wipe_bufs = 1; } } _seq_num_lookup(knet_h, src_host, recv_seq_num, 0, wipe_bufs); } else { /* * pings always arrives in bursts over all the link * catch the first of them to cache the seq num and * avoid duplicate processing */ if (recv_seq_num != src_host->timed_rx_seq_num) { src_host->timed_rx_seq_num = recv_seq_num; if (recv_seq_num == 0) { _seq_num_lookup(knet_h, src_host, recv_seq_num, 0, 1); } } } } void process_pong_v1(knet_handle_t knet_h, struct knet_host *src_host, struct knet_link *src_link, struct knet_header *inbuf, struct timespec *recvtime) { memmove(recvtime, &inbuf->khp_ping_v1_time[0], sizeof(struct timespec)); } struct knet_link *get_link_from_pong_v1(knet_handle_t knet_h, struct knet_host *src_host, struct knet_header *inbuf) { return &src_host->link[inbuf->khp_ping_v1_link]; } -void prep_pmtud_v1(knet_handle_t knet_h, struct knet_link *dst_link, uint8_t onwire_ver, size_t onwire_len) +void prep_pmtud_v1(knet_handle_t knet_h, struct knet_link *dst_link, uint8_t onwire_ver, size_t onwire_len, size_t data_len) { knet_h->pmtudbuf->kh_version = onwire_ver; knet_h->pmtudbuf->kh_max_ver = knet_h->onwire_max_ver; knet_h->pmtudbuf->kh_type = KNET_HEADER_TYPE_PMTUD; knet_h->pmtudbuf->kh_node = htons(knet_h->host_id); knet_h->pmtudbuf->khp_pmtud_v1_link = dst_link->link_id; knet_h->pmtudbuf->khp_pmtud_v1_size = onwire_len; + +#ifdef ONWIRE_V1_EXTRA_DEBUG + knet_h->pmtudbuf->kh_checksum = 0; + knet_h->pmtudbuf->kh_checksum = compute_chksum((const unsigned char*)knet_h->pmtudbuf, data_len); +#endif } void prep_pmtud_reply_v1(knet_handle_t knet_h, struct knet_header *inbuf, ssize_t *outlen) { *outlen = KNET_HEADER_PMTUD_V1_SIZE; inbuf->kh_type = KNET_HEADER_TYPE_PMTUD_REPLY; inbuf->kh_node = htons(knet_h->host_id); + +#ifdef ONWIRE_V1_EXTRA_DEBUG + inbuf->kh_checksum = 0; + inbuf->kh_checksum = compute_chksum((const unsigned char*)inbuf, KNET_HEADER_PMTUD_V1_SIZE); +#endif } void process_pmtud_reply_v1(knet_handle_t knet_h, struct knet_link *src_link, struct knet_header *inbuf) { src_link->last_recv_mtu = inbuf->khp_pmtud_v1_size; } void prep_tx_bufs_v1(knet_handle_t knet_h, - struct knet_header *inbuf, unsigned char *data, size_t inlen, unsigned int temp_data_mtu, + struct knet_header *inbuf, unsigned char *data, size_t inlen, uint32_t data_checksum, unsigned int temp_data_mtu, seq_num_t tx_seq_num, int8_t channel, int bcast, int data_compressed, int *msgs_to_send, struct iovec iov_out[PCKT_FRAG_MAX][2], int *iovcnt_out) { uint8_t frag_idx = 0; size_t frag_len = inlen; /* * prepare the main header */ inbuf->kh_type = KNET_HEADER_TYPE_DATA; inbuf->kh_version = 1; inbuf->kh_max_ver = knet_h->onwire_max_ver; inbuf->kh_node = htons(knet_h->host_id); /* * prepare the data header */ inbuf->khp_data_v1_frag_seq = 0; inbuf->khp_data_v1_bcast = bcast; inbuf->khp_data_v1_frag_num = ceil((float)inlen / temp_data_mtu); inbuf->khp_data_v1_channel = channel; inbuf->khp_data_v1_seq_num = htons(tx_seq_num); if (data_compressed) { inbuf->khp_data_v1_compress = knet_h->compress_model; } else { inbuf->khp_data_v1_compress = 0; } - +#ifdef ONWIRE_V1_EXTRA_DEBUG + inbuf->khp_data_v1_checksum = data_checksum; +#endif /* * handle fragmentation */ if (inbuf->khp_data_v1_frag_num > 1) { while (frag_idx < inbuf->khp_data_v1_frag_num) { /* * set the iov_base */ iov_out[frag_idx][0].iov_base = (void *)knet_h->send_to_links_buf[frag_idx]; iov_out[frag_idx][0].iov_len = KNET_HEADER_DATA_V1_SIZE; iov_out[frag_idx][1].iov_base = data + (temp_data_mtu * frag_idx); /* * set the len */ if (frag_len > temp_data_mtu) { iov_out[frag_idx][1].iov_len = temp_data_mtu; } else { iov_out[frag_idx][1].iov_len = frag_len; } /* * copy the frag info on all buffers */ memmove(knet_h->send_to_links_buf[frag_idx], inbuf, KNET_HEADER_DATA_V1_SIZE); /* * bump the frag */ knet_h->send_to_links_buf[frag_idx]->khp_data_v1_frag_seq = frag_idx + 1; +#ifdef ONWIRE_V1_EXTRA_DEBUG + knet_h->send_to_links_buf[frag_idx]->kh_checksum = 0; + knet_h->send_to_links_buf[frag_idx]->kh_checksum = compute_chksumv(iov_out[frag_idx], 2); +#endif + frag_len = frag_len - temp_data_mtu; frag_idx++; } *iovcnt_out = 2; } else { iov_out[frag_idx][0].iov_base = (void *)inbuf; iov_out[frag_idx][0].iov_len = frag_len + KNET_HEADER_DATA_V1_SIZE; *iovcnt_out = 1; + +#ifdef ONWIRE_V1_EXTRA_DEBUG + inbuf->kh_checksum = 0; + inbuf->kh_checksum = compute_chksumv(iov_out[frag_idx], 1); +#endif } *msgs_to_send = inbuf->khp_data_v1_frag_num; } unsigned char *get_data_v1(knet_handle_t knet_h, struct knet_header *inbuf) { return inbuf->khp_data_v1_userdata; } void get_data_header_info_v1(knet_handle_t knet_h, struct knet_header *inbuf, ssize_t *header_size, int8_t *channel, seq_num_t *seq_num, uint8_t *decompress_type, uint8_t *frags, uint8_t *frag_seq) { *header_size = KNET_HEADER_DATA_V1_SIZE; *channel = inbuf->khp_data_v1_channel; *seq_num = ntohs(inbuf->khp_data_v1_seq_num); *decompress_type = inbuf->khp_data_v1_compress; *frags = inbuf->khp_data_v1_frag_num; *frag_seq = inbuf->khp_data_v1_frag_seq; } diff --git a/libknet/onwire_v1.h b/libknet/onwire_v1.h index c802032a..270c98b1 100644 --- a/libknet/onwire_v1.h +++ b/libknet/onwire_v1.h @@ -1,37 +1,37 @@ /* * Copyright (C) 2020-2021 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. Di Nitto * * This software licensed under LGPL-2.0+ */ #ifndef __KNET_ONWIRE_V1_H__ #define __KNET_ONWIRE_V1_H__ #include #include "internals.h" int prep_ping_v1(knet_handle_t knet_h, struct knet_link *dst_link, uint8_t onwire_ver, struct timespec clock_now, int timed, ssize_t *outlen); void prep_pong_v1(knet_handle_t knet_h, struct knet_header *inbuf, ssize_t *outlen); void process_ping_v1(knet_handle_t knet_h, struct knet_host *src_host, struct knet_link *src_link, struct knet_header *inbuf, ssize_t len); void process_pong_v1(knet_handle_t knet_h, struct knet_host *src_host, struct knet_link *src_link, struct knet_header *inbuf, struct timespec *recvtime); struct knet_link *get_link_from_pong_v1(knet_handle_t knet_h, struct knet_host *src_host, struct knet_header *inbuf); -void prep_pmtud_v1(knet_handle_t knet_h, struct knet_link *dst_link, uint8_t onwire_ver, size_t onwire_len); +void prep_pmtud_v1(knet_handle_t knet_h, struct knet_link *dst_link, uint8_t onwire_ver, size_t onwire_len, size_t data_len); void prep_pmtud_reply_v1(knet_handle_t knet_h, struct knet_header *inbuf, ssize_t *outlen); void process_pmtud_reply_v1(knet_handle_t knet_h, struct knet_link *src_link, struct knet_header *inbuf); void prep_tx_bufs_v1(knet_handle_t knet_h, - struct knet_header *inbuf, unsigned char *data, size_t inlen, unsigned int temp_data_mtu, + struct knet_header *inbuf, unsigned char *data, size_t inlen, uint32_t data_checksum, unsigned int temp_data_mtu, seq_num_t tx_seq_num, int8_t channel, int bcast, int data_compressed, int *msgs_to_send, struct iovec iov_out[PCKT_FRAG_MAX][2], int *iovcnt_out); unsigned char *get_data_v1(knet_handle_t knet_h, struct knet_header *inbuf); void get_data_header_info_v1(knet_handle_t knet_h, struct knet_header *inbuf, ssize_t *header_size, int8_t *channel, seq_num_t *seq_num, uint8_t *decompress_type, uint8_t *frags, uint8_t *frag_seq); #endif diff --git a/libknet/threads_pmtud.c b/libknet/threads_pmtud.c index 18ecd91f..eee9a9d3 100644 --- a/libknet/threads_pmtud.c +++ b/libknet/threads_pmtud.c @@ -1,938 +1,938 @@ /* * Copyright (C) 2015-2021 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. Di Nitto * Federico Simoncelli * * This software licensed under LGPL-2.0+ */ #include "config.h" #include #include #include #include #include "crypto.h" #include "links.h" #include "host.h" #include "logging.h" #include "transports.h" #include "threads_common.h" #include "threads_pmtud.h" #include "onwire_v1.h" static int _calculate_manual_mtu(knet_handle_t knet_h, struct knet_link *dst_link) { size_t ipproto_overhead_len; /* onwire packet overhead (protocol based) */ switch (dst_link->dst_addr.ss_family) { case AF_INET6: ipproto_overhead_len = KNET_PMTUD_OVERHEAD_V6 + dst_link->proto_overhead; break; case AF_INET: ipproto_overhead_len = KNET_PMTUD_OVERHEAD_V4 + dst_link->proto_overhead; break; default: log_debug(knet_h, KNET_SUB_PMTUD, "unknown protocol"); return 0; break; } dst_link->status.mtu = calc_max_data_outlen(knet_h, knet_h->manual_mtu - ipproto_overhead_len); return 1; } static int _handle_check_link_pmtud(knet_handle_t knet_h, struct knet_host *dst_host, struct knet_link *dst_link) { int err, ret, savederrno, mutex_retry_limit, failsafe, use_kernel_mtu, warn_once; uint32_t kernel_mtu; /* record kernel_mtu from EMSGSIZE */ size_t onwire_len; /* current packet onwire size */ size_t ipproto_overhead_len; /* onwire packet overhead (protocol based) */ size_t max_mtu_len; /* max mtu for protocol */ size_t data_len; /* how much data we can send in the packet * generally would be onwire_len - ipproto_overhead_len * needs to be adjusted for crypto */ size_t app_mtu_len; /* real data that we can send onwire */ ssize_t len; /* len of what we were able to sendto onwire */ uint8_t onwire_ver; struct timespec ts, pmtud_crypto_start_ts, pmtud_crypto_stop_ts; unsigned long long pong_timeout_adj_tmp, timediff; int pmtud_crypto_reduce = 1; unsigned char *outbuf = (unsigned char *)knet_h->pmtudbuf; warn_once = 0; mutex_retry_limit = 0; failsafe = 0; switch (dst_link->dst_addr.ss_family) { case AF_INET6: max_mtu_len = KNET_PMTUD_SIZE_V6; ipproto_overhead_len = KNET_PMTUD_OVERHEAD_V6 + dst_link->proto_overhead; break; case AF_INET: max_mtu_len = KNET_PMTUD_SIZE_V4; ipproto_overhead_len = KNET_PMTUD_OVERHEAD_V4 + dst_link->proto_overhead; break; default: log_debug(knet_h, KNET_SUB_PMTUD, "PMTUD aborted, unknown protocol"); return -1; break; } dst_link->last_bad_mtu = 0; dst_link->last_good_mtu = dst_link->last_ping_size + ipproto_overhead_len; /* * discovery starts from the top because kernel will * refuse to send packets > current iface mtu. * this saves us some time and network bw. */ onwire_len = max_mtu_len; /* * cache onwire version for this link / run */ if (pthread_mutex_lock(&knet_h->onwire_mutex)) { log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get onwire mutex lock"); return -1; } onwire_ver = knet_h->onwire_ver; pthread_mutex_unlock(&knet_h->onwire_mutex); restart: /* * prevent a race when interface mtu is changed _exactly_ during * the discovery process and it's complex to detect. Easier * to wait the next loop. * 30 is not an arbitrary value. To bisect from 576 to 128000 doesn't * take more than 18/19 steps. */ if (failsafe == 30) { log_err(knet_h, KNET_SUB_PMTUD, "Aborting PMTUD process: Too many attempts. MTU might have changed during discovery."); return -1; } else { failsafe++; } /* * common to all packets */ /* * calculate the application MTU based on current onwire_len minus ipproto_overhead_len */ app_mtu_len = calc_max_data_outlen(knet_h, onwire_len - ipproto_overhead_len); /* * recalculate onwire len back that might be different based * on data padding from crypto layer. */ onwire_len = calc_data_outlen(knet_h, app_mtu_len + KNET_HEADER_ALL_SIZE) + ipproto_overhead_len; /* * calculate the size of what we need to send to sendto(2). * see also onwire.c for packet format explanation. */ data_len = app_mtu_len + knet_h->sec_hash_size + knet_h->sec_salt_size + KNET_HEADER_ALL_SIZE; if (knet_h->onwire_ver_remap) { - prep_pmtud_v1(knet_h, dst_link, onwire_ver, onwire_len); + prep_pmtud_v1(knet_h, dst_link, onwire_ver, onwire_len, app_mtu_len + KNET_HEADER_ALL_SIZE); } else { switch (onwire_ver) { case 1: - prep_pmtud_v1(knet_h, dst_link, onwire_ver, onwire_len); + prep_pmtud_v1(knet_h, dst_link, onwire_ver, onwire_len, app_mtu_len + KNET_HEADER_ALL_SIZE); break; default: log_warn(knet_h, KNET_SUB_PMTUD, "preparing PMTUD onwire version %u not supported", onwire_ver); return -1; break; } } if (knet_h->crypto_in_use_config) { if (data_len < (knet_h->sec_hash_size + knet_h->sec_salt_size) + 1) { log_debug(knet_h, KNET_SUB_PMTUD, "Aborting PMTUD process: link mtu smaller than crypto header detected (link might have been disconnected)"); return -1; } if (crypto_encrypt_and_sign(knet_h, (const unsigned char *)knet_h->pmtudbuf, data_len - (knet_h->sec_hash_size + knet_h->sec_salt_size), knet_h->pmtudbuf_crypt, (ssize_t *)&data_len) < 0) { log_debug(knet_h, KNET_SUB_PMTUD, "Unable to crypto pmtud packet"); return -1; } outbuf = knet_h->pmtudbuf_crypt; if (pthread_mutex_lock(&knet_h->handle_stats_mutex) < 0) { log_err(knet_h, KNET_SUB_PMTUD, "Unable to get mutex lock"); return -1; } knet_h->stats_extra.tx_crypt_pmtu_packets++; pthread_mutex_unlock(&knet_h->handle_stats_mutex); } /* link has gone down, aborting pmtud */ if (dst_link->status.connected != 1) { log_debug(knet_h, KNET_SUB_PMTUD, "PMTUD detected host (%u) link (%u) has been disconnected", dst_host->host_id, dst_link->link_id); return -1; } if (dst_link->transport_connected != 1) { log_debug(knet_h, KNET_SUB_PMTUD, "PMTUD detected host (%u) link (%u) has been disconnected", dst_host->host_id, dst_link->link_id); return -1; } if (pthread_mutex_lock(&knet_h->pmtud_mutex) != 0) { log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get mutex lock"); return -1; } if (knet_h->pmtud_abort) { pthread_mutex_unlock(&knet_h->pmtud_mutex); errno = EDEADLK; return -1; } savederrno = pthread_mutex_lock(&knet_h->tx_mutex); if (savederrno) { pthread_mutex_unlock(&knet_h->pmtud_mutex); log_err(knet_h, KNET_SUB_PMTUD, "Unable to get TX mutex lock: %s", strerror(savederrno)); return -1; } savederrno = pthread_mutex_lock(&dst_link->link_stats_mutex); if (savederrno) { pthread_mutex_unlock(&knet_h->pmtud_mutex); pthread_mutex_unlock(&knet_h->tx_mutex); log_err(knet_h, KNET_SUB_PMTUD, "Unable to get stats mutex lock for host %u link %u: %s", dst_host->host_id, dst_link->link_id, strerror(savederrno)); return -1; } retry: if (transport_get_connection_oriented(knet_h, dst_link->transport) == TRANSPORT_PROTO_NOT_CONNECTION_ORIENTED) { len = sendto(dst_link->outsock, outbuf, data_len, MSG_DONTWAIT | MSG_NOSIGNAL, (struct sockaddr *) &dst_link->dst_addr, knet_h->knet_transport_fd_tracker[dst_link->outsock].sockaddr_len); } else { len = sendto(dst_link->outsock, outbuf, data_len, MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0); } savederrno = errno; /* * we cannot hold a lock on kmtu_mutex between resetting * knet_h->kernel_mtu here and below where it's used. * use_kernel_mtu tells us if the knet_h->kernel_mtu was * set to 0 and we can trust its value later. */ use_kernel_mtu = 0; if (pthread_mutex_lock(&knet_h->kmtu_mutex) == 0) { use_kernel_mtu = 1; knet_h->kernel_mtu = 0; pthread_mutex_unlock(&knet_h->kmtu_mutex); } kernel_mtu = 0; err = transport_tx_sock_error(knet_h, dst_link->transport, dst_link->outsock, len, savederrno); switch(err) { case KNET_TRANSPORT_SOCK_ERROR_INTERNAL: log_debug(knet_h, KNET_SUB_PMTUD, "Unable to send pmtu packet (sendto): %d %s", savederrno, strerror(savederrno)); pthread_mutex_unlock(&knet_h->tx_mutex); pthread_mutex_unlock(&knet_h->pmtud_mutex); dst_link->status.stats.tx_pmtu_errors++; pthread_mutex_unlock(&dst_link->link_stats_mutex); return -1; break; case KNET_TRANSPORT_SOCK_ERROR_IGNORE: break; case KNET_TRANSPORT_SOCK_ERROR_RETRY: dst_link->status.stats.tx_pmtu_retries++; goto retry; break; } pthread_mutex_unlock(&knet_h->tx_mutex); if (len != (ssize_t )data_len) { pthread_mutex_unlock(&dst_link->link_stats_mutex); if (savederrno == EMSGSIZE) { /* * we cannot hold a lock on kmtu_mutex between resetting * knet_h->kernel_mtu and here. * use_kernel_mtu tells us if the knet_h->kernel_mtu was * set to 0 previously and we can trust its value now. */ if (use_kernel_mtu) { use_kernel_mtu = 0; if (pthread_mutex_lock(&knet_h->kmtu_mutex) == 0) { kernel_mtu = knet_h->kernel_mtu; pthread_mutex_unlock(&knet_h->kmtu_mutex); } } if (kernel_mtu > 0) { dst_link->last_bad_mtu = kernel_mtu + 1; } else { dst_link->last_bad_mtu = onwire_len; } } else { log_debug(knet_h, KNET_SUB_PMTUD, "Unable to send pmtu packet len: %zu err: %s", onwire_len, strerror(savederrno)); } } else { dst_link->last_sent_mtu = onwire_len; dst_link->last_recv_mtu = 0; dst_link->status.stats.tx_pmtu_packets++; dst_link->status.stats.tx_pmtu_bytes += data_len; pthread_mutex_unlock(&dst_link->link_stats_mutex); if (clock_gettime(CLOCK_REALTIME, &ts) < 0) { log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get current time: %s", strerror(errno)); pthread_mutex_unlock(&knet_h->pmtud_mutex); return -1; } /* * non fatal, we can wait the next round to reduce the * multiplier */ if (clock_gettime(CLOCK_MONOTONIC, &pmtud_crypto_start_ts) < 0) { log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get current time: %s", strerror(errno)); pmtud_crypto_reduce = 0; } /* * set PMTUd reply timeout to match pong_timeout on a given link * * math: internally pong_timeout is expressed in microseconds, while * the public API exports milliseconds. So careful with the 0's here. * the loop is necessary because we are grabbing the current time just above * and add values to it that could overflow into seconds. */ if (pthread_mutex_lock(&knet_h->backoff_mutex)) { log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get backoff_mutex"); pthread_mutex_unlock(&knet_h->pmtud_mutex); return -1; } if (knet_h->crypto_in_use_config) { /* * crypto, under pressure, is a royal PITA */ pong_timeout_adj_tmp = dst_link->pong_timeout_adj * dst_link->pmtud_crypto_timeout_multiplier; } else { pong_timeout_adj_tmp = dst_link->pong_timeout_adj; } ts.tv_sec += pong_timeout_adj_tmp / 1000000; ts.tv_nsec += (((pong_timeout_adj_tmp) % 1000000) * 1000); while (ts.tv_nsec > 1000000000) { ts.tv_sec += 1; ts.tv_nsec -= 1000000000; } pthread_mutex_unlock(&knet_h->backoff_mutex); knet_h->pmtud_waiting = 1; ret = pthread_cond_timedwait(&knet_h->pmtud_cond, &knet_h->pmtud_mutex, &ts); knet_h->pmtud_waiting = 0; if (knet_h->pmtud_abort) { pthread_mutex_unlock(&knet_h->pmtud_mutex); errno = EDEADLK; return -1; } /* * we cannot use shutdown_in_progress in here because * we already hold the read lock */ if (knet_h->fini_in_progress) { pthread_mutex_unlock(&knet_h->pmtud_mutex); log_debug(knet_h, KNET_SUB_PMTUD, "PMTUD aborted. shutdown in progress"); return -1; } if (ret) { if (ret == ETIMEDOUT) { if ((knet_h->crypto_in_use_config) && (dst_link->pmtud_crypto_timeout_multiplier < KNET_LINK_PMTUD_CRYPTO_TIMEOUT_MULTIPLIER_MAX)) { dst_link->pmtud_crypto_timeout_multiplier = dst_link->pmtud_crypto_timeout_multiplier * 2; pmtud_crypto_reduce = 0; log_debug(knet_h, KNET_SUB_PMTUD, "Increasing PMTUd response timeout multiplier to (%u) for host %u link: %u", dst_link->pmtud_crypto_timeout_multiplier, dst_host->host_id, dst_link->link_id); pthread_mutex_unlock(&knet_h->pmtud_mutex); goto restart; } if (!warn_once) { log_warn(knet_h, KNET_SUB_PMTUD, "possible MTU misconfiguration detected. " "kernel is reporting MTU: %u bytes for " "host %u link %u but the other node is " "not acknowledging packets of this size. ", dst_link->last_sent_mtu, dst_host->host_id, dst_link->link_id); log_warn(knet_h, KNET_SUB_PMTUD, "This can be caused by this node interface MTU " "too big or a network device that does not " "support or has been misconfigured to manage MTU " "of this size, or packet loss. knet will continue " "to run but performances might be affected."); warn_once = 1; } } else { pthread_mutex_unlock(&knet_h->pmtud_mutex); if (mutex_retry_limit == 3) { log_debug(knet_h, KNET_SUB_PMTUD, "PMTUD aborted, unable to get mutex lock"); return -1; } mutex_retry_limit++; goto restart; } } if ((knet_h->crypto_in_use_config) && (pmtud_crypto_reduce == 1) && (dst_link->pmtud_crypto_timeout_multiplier > KNET_LINK_PMTUD_CRYPTO_TIMEOUT_MULTIPLIER_MIN)) { if (!clock_gettime(CLOCK_MONOTONIC, &pmtud_crypto_stop_ts)) { timespec_diff(pmtud_crypto_start_ts, pmtud_crypto_stop_ts, &timediff); if (((pong_timeout_adj_tmp * 1000) / 2) > timediff) { dst_link->pmtud_crypto_timeout_multiplier = dst_link->pmtud_crypto_timeout_multiplier / 2; log_debug(knet_h, KNET_SUB_PMTUD, "Decreasing PMTUd response timeout multiplier to (%u) for host %u link: %u", dst_link->pmtud_crypto_timeout_multiplier, dst_host->host_id, dst_link->link_id); } } else { log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get current time: %s", strerror(errno)); } } if ((dst_link->last_recv_mtu != onwire_len) || (ret)) { dst_link->last_bad_mtu = onwire_len; } else { int found_mtu = 0; if (knet_h->sec_block_size) { if ((onwire_len + knet_h->sec_block_size >= max_mtu_len) || ((dst_link->last_bad_mtu) && (dst_link->last_bad_mtu <= (onwire_len + knet_h->sec_block_size)))) { found_mtu = 1; } } else { if ((onwire_len == max_mtu_len) || ((dst_link->last_bad_mtu) && (dst_link->last_bad_mtu == (onwire_len + 1))) || (dst_link->last_bad_mtu == dst_link->last_good_mtu)) { found_mtu = 1; } } if (found_mtu) { /* * account for IP overhead, knet headers and crypto in PMTU calculation */ dst_link->status.mtu = calc_max_data_outlen(knet_h, onwire_len - ipproto_overhead_len); pthread_mutex_unlock(&knet_h->pmtud_mutex); return 0; } dst_link->last_good_mtu = onwire_len; } } if (kernel_mtu) { onwire_len = kernel_mtu; } else { onwire_len = (dst_link->last_good_mtu + dst_link->last_bad_mtu) / 2; } pthread_mutex_unlock(&knet_h->pmtud_mutex); goto restart; } static int _handle_check_pmtud(knet_handle_t knet_h, struct knet_host *dst_host, struct knet_link *dst_link, int force_run) { uint8_t saved_valid_pmtud; unsigned int saved_pmtud; struct timespec clock_now; unsigned long long diff_pmtud, interval; if (clock_gettime(CLOCK_MONOTONIC, &clock_now) != 0) { log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get monotonic clock"); return 0; } if (!force_run) { interval = knet_h->pmtud_interval * 1000000000llu; /* nanoseconds */ timespec_diff(dst_link->pmtud_last, clock_now, &diff_pmtud); if (diff_pmtud < interval) { return dst_link->has_valid_mtu; } } /* * status.proto_overhead should include all IP/(UDP|SCTP)/knet headers * * please note that it is not the same as link->proto_overhead that * includes only either UDP or SCTP (at the moment) overhead. */ switch (dst_link->dst_addr.ss_family) { case AF_INET6: dst_link->status.proto_overhead = KNET_PMTUD_OVERHEAD_V6 + dst_link->proto_overhead + KNET_HEADER_ALL_SIZE + knet_h->sec_hash_size + knet_h->sec_salt_size; break; case AF_INET: dst_link->status.proto_overhead = KNET_PMTUD_OVERHEAD_V4 + dst_link->proto_overhead + KNET_HEADER_ALL_SIZE + knet_h->sec_hash_size + knet_h->sec_salt_size; break; } saved_pmtud = dst_link->status.mtu; saved_valid_pmtud = dst_link->has_valid_mtu; log_debug(knet_h, KNET_SUB_PMTUD, "Starting PMTUD for host: %u link: %u", dst_host->host_id, dst_link->link_id); errno = 0; if (_handle_check_link_pmtud(knet_h, dst_host, dst_link) < 0) { if (errno == EDEADLK) { log_debug(knet_h, KNET_SUB_PMTUD, "PMTUD for host: %u link: %u has been rescheduled", dst_host->host_id, dst_link->link_id); dst_link->status.mtu = saved_pmtud; dst_link->has_valid_mtu = saved_valid_pmtud; errno = EDEADLK; return dst_link->has_valid_mtu; } dst_link->has_valid_mtu = 0; } else { if (dst_link->status.mtu < calc_min_mtu(knet_h)) { log_info(knet_h, KNET_SUB_PMTUD, "Invalid MTU detected for host: %u link: %u mtu: %u", dst_host->host_id, dst_link->link_id, dst_link->status.mtu); dst_link->has_valid_mtu = 0; } else { dst_link->has_valid_mtu = 1; } if (dst_link->has_valid_mtu) { if ((saved_pmtud) && (saved_pmtud != dst_link->status.mtu)) { log_info(knet_h, KNET_SUB_PMTUD, "PMTUD link change for host: %u link: %u from %u to %u", dst_host->host_id, dst_link->link_id, saved_pmtud, dst_link->status.mtu); } log_debug(knet_h, KNET_SUB_PMTUD, "PMTUD completed for host: %u link: %u current link mtu: %u", dst_host->host_id, dst_link->link_id, dst_link->status.mtu); /* * set pmtud_last, if we can, after we are done with the PMTUd process * because it can take a very long time. */ dst_link->pmtud_last = clock_now; if (!clock_gettime(CLOCK_MONOTONIC, &clock_now)) { dst_link->pmtud_last = clock_now; } } } if (saved_valid_pmtud != dst_link->has_valid_mtu) { _host_dstcache_update_async(knet_h, dst_host); } return dst_link->has_valid_mtu; } void *_handle_pmtud_link_thread(void *data) { knet_handle_t knet_h = (knet_handle_t) data; struct knet_host *dst_host; struct knet_link *dst_link; int link_idx; unsigned int have_mtu; unsigned int lower_mtu; int link_has_mtu; int force_run = 0; set_thread_status(knet_h, KNET_THREAD_PMTUD, KNET_THREAD_STARTED); knet_h->data_mtu = calc_min_mtu(knet_h); while (!shutdown_in_progress(knet_h)) { usleep(knet_h->threads_timer_res); if (pthread_mutex_lock(&knet_h->pmtud_mutex) != 0) { log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get mutex lock"); continue; } knet_h->pmtud_abort = 0; knet_h->pmtud_running = 1; force_run = knet_h->pmtud_forcerun; knet_h->pmtud_forcerun = 0; pthread_mutex_unlock(&knet_h->pmtud_mutex); if (force_run) { log_debug(knet_h, KNET_SUB_PMTUD, "PMTUd request to rerun has been received"); } if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) { log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get read lock"); continue; } lower_mtu = KNET_PMTUD_SIZE_V4; have_mtu = 0; for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) { for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) { dst_link = &dst_host->link[link_idx]; if ((dst_link->status.enabled != 1) || (dst_link->status.connected != 1) || (dst_host->link[link_idx].transport == KNET_TRANSPORT_LOOPBACK) || (!dst_link->last_ping_size) || ((dst_link->dynamic == KNET_LINK_DYNIP) && (dst_link->status.dynconnected != 1))) continue; if (!knet_h->manual_mtu) { link_has_mtu = _handle_check_pmtud(knet_h, dst_host, dst_link, force_run); if (errno == EDEADLK) { goto out_unlock; } if (link_has_mtu) { have_mtu = 1; if (dst_link->status.mtu < lower_mtu) { lower_mtu = dst_link->status.mtu; } } } else { link_has_mtu = _calculate_manual_mtu(knet_h, dst_link); if (link_has_mtu) { have_mtu = 1; if (dst_link->status.mtu < lower_mtu) { lower_mtu = dst_link->status.mtu; } } } } } if (have_mtu) { if (knet_h->data_mtu != lower_mtu) { knet_h->data_mtu = lower_mtu; log_info(knet_h, KNET_SUB_PMTUD, "Global data MTU changed to: %u", knet_h->data_mtu); if (knet_h->pmtud_notify_fn) { knet_h->pmtud_notify_fn(knet_h->pmtud_notify_fn_private_data, knet_h->data_mtu); } } } out_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); if (pthread_mutex_lock(&knet_h->pmtud_mutex) != 0) { log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get mutex lock"); } else { knet_h->pmtud_running = 0; pthread_mutex_unlock(&knet_h->pmtud_mutex); } } set_thread_status(knet_h, KNET_THREAD_PMTUD, KNET_THREAD_STOPPED); return NULL; } static void send_pmtud_reply(knet_handle_t knet_h, struct knet_link *src_link, struct knet_header *inbuf) { int err = 0, savederrno = 0, stats_err = 0; unsigned char *outbuf = (unsigned char *)inbuf; ssize_t len, outlen; if (knet_h->onwire_ver_remap) { prep_pmtud_reply_v1(knet_h, inbuf, &outlen); } else { switch (inbuf->kh_version) { case 1: prep_pmtud_reply_v1(knet_h, inbuf, &outlen); break; default: log_warn(knet_h, KNET_SUB_PMTUD, "preparing PMTUD reply onwire version %u not supported", inbuf->kh_version); return; break; } } if (knet_h->crypto_in_use_config) { if (crypto_encrypt_and_sign(knet_h, (const unsigned char *)inbuf, outlen, knet_h->recv_from_links_buf_crypt, &outlen) < 0) { log_debug(knet_h, KNET_SUB_PMTUD, "Unable to encrypt PMTUd reply packet"); return; } outbuf = knet_h->recv_from_links_buf_crypt; stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex); if (stats_err < 0) { log_err(knet_h, KNET_SUB_PMTUD, "Unable to get mutex lock: %s", strerror(stats_err)); return; } knet_h->stats_extra.tx_crypt_pmtu_reply_packets++; pthread_mutex_unlock(&knet_h->handle_stats_mutex); } savederrno = pthread_mutex_lock(&knet_h->tx_mutex); if (savederrno) { log_err(knet_h, KNET_SUB_PMTUD, "Unable to get TX mutex lock: %s", strerror(savederrno)); return; } retry: if (src_link->transport_connected) { if (transport_get_connection_oriented(knet_h, src_link->transport) == TRANSPORT_PROTO_NOT_CONNECTION_ORIENTED) { len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL, (struct sockaddr *) &src_link->dst_addr, knet_h->knet_transport_fd_tracker[src_link->outsock].sockaddr_len); } else { len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0); } savederrno = errno; if (len != outlen) { err = transport_tx_sock_error(knet_h, src_link->transport, src_link->outsock, len, savederrno); stats_err = pthread_mutex_lock(&src_link->link_stats_mutex); if (stats_err < 0) { log_err(knet_h, KNET_SUB_PMTUD, "Unable to get mutex lock: %s", strerror(stats_err)); return; } switch(err) { case KNET_TRANSPORT_SOCK_ERROR_INTERNAL: log_debug(knet_h, KNET_SUB_PMTUD, "Unable to send PMTUd reply (sock: %d) packet (sendto): %d %s. recorded src ip: %s src port: %s dst ip: %s dst port: %s", src_link->outsock, errno, strerror(errno), src_link->status.src_ipaddr, src_link->status.src_port, src_link->status.dst_ipaddr, src_link->status.dst_port); src_link->status.stats.tx_pmtu_errors++; break; case KNET_TRANSPORT_SOCK_ERROR_IGNORE: src_link->status.stats.tx_pmtu_errors++; break; case KNET_TRANSPORT_SOCK_ERROR_RETRY: src_link->status.stats.tx_pmtu_retries++; pthread_mutex_unlock(&src_link->link_stats_mutex); goto retry; break; } pthread_mutex_unlock(&src_link->link_stats_mutex); } } pthread_mutex_unlock(&knet_h->tx_mutex); } void process_pmtud(knet_handle_t knet_h, struct knet_link *src_link, struct knet_header *inbuf) { /* * at the moment we don't need to take any extra * actions when processing a PMTUd packet, except * sending a reply */ send_pmtud_reply(knet_h, src_link, inbuf); } void process_pmtud_reply(knet_handle_t knet_h, struct knet_link *src_link, struct knet_header *inbuf) { if (pthread_mutex_lock(&knet_h->pmtud_mutex) != 0) { log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get mutex lock"); return; } if (knet_h->onwire_ver_remap) { process_pmtud_reply_v1(knet_h, src_link, inbuf); } else { switch (inbuf->kh_version) { case 1: process_pmtud_reply_v1(knet_h, src_link, inbuf); break; default: log_warn(knet_h, KNET_SUB_PMTUD, "preparing PMTUD reply onwire version %u not supported", inbuf->kh_version); goto out_unlock; break; } } pthread_cond_signal(&knet_h->pmtud_cond); out_unlock: pthread_mutex_unlock(&knet_h->pmtud_mutex); } int knet_handle_pmtud_getfreq(knet_handle_t knet_h, unsigned int *interval) { int savederrno = 0; if (!_is_valid_handle(knet_h)) { return -1; } if (!interval) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_PMTUD, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } *interval = knet_h->pmtud_interval; pthread_rwlock_unlock(&knet_h->global_rwlock); errno = 0; return 0; } int knet_handle_pmtud_setfreq(knet_handle_t knet_h, unsigned int interval) { int savederrno = 0; if (!_is_valid_handle(knet_h)) { return -1; } if ((!interval) || (interval > 86400)) { errno = EINVAL; return -1; } savederrno = get_global_wrlock(knet_h); if (savederrno) { log_err(knet_h, KNET_SUB_PMTUD, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } knet_h->pmtud_interval = interval; log_debug(knet_h, KNET_SUB_PMTUD, "PMTUd interval set to: %u seconds", interval); pthread_rwlock_unlock(&knet_h->global_rwlock); errno = 0; return 0; } int knet_handle_enable_pmtud_notify(knet_handle_t knet_h, void *pmtud_notify_fn_private_data, void (*pmtud_notify_fn) ( void *private_data, unsigned int data_mtu)) { int savederrno = 0; if (!_is_valid_handle(knet_h)) { return -1; } savederrno = get_global_wrlock(knet_h); if (savederrno) { log_err(knet_h, KNET_SUB_PMTUD, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } knet_h->pmtud_notify_fn_private_data = pmtud_notify_fn_private_data; knet_h->pmtud_notify_fn = pmtud_notify_fn; if (knet_h->pmtud_notify_fn) { log_debug(knet_h, KNET_SUB_PMTUD, "pmtud_notify_fn enabled"); } else { log_debug(knet_h, KNET_SUB_PMTUD, "pmtud_notify_fn disabled"); } pthread_rwlock_unlock(&knet_h->global_rwlock); errno = 0; return 0; } int knet_handle_pmtud_set(knet_handle_t knet_h, unsigned int iface_mtu) { int savederrno = 0; if (!_is_valid_handle(knet_h)) { return -1; } if (iface_mtu > KNET_PMTUD_SIZE_V4) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_PMTUD, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } log_info(knet_h, KNET_SUB_PMTUD, "MTU manually set to: %u", iface_mtu); knet_h->manual_mtu = iface_mtu; force_pmtud_run(knet_h, KNET_SUB_PMTUD, 0); pthread_rwlock_unlock(&knet_h->global_rwlock); errno = 0; return 0; } int knet_handle_pmtud_get(knet_handle_t knet_h, unsigned int *data_mtu) { int savederrno = 0; if (!_is_valid_handle(knet_h)) { return -1; } if (!data_mtu) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_PMTUD, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } *data_mtu = knet_h->data_mtu; pthread_rwlock_unlock(&knet_h->global_rwlock); errno = 0; return 0; } diff --git a/libknet/threads_rx.c b/libknet/threads_rx.c index 3ba09c71..b8f2a7e6 100644 --- a/libknet/threads_rx.c +++ b/libknet/threads_rx.c @@ -1,1178 +1,1207 @@ /* * Copyright (C) 2012-2021 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. Di Nitto * Federico Simoncelli * * This software licensed under LGPL-2.0+ */ #include "config.h" #include #include #include #include #include #include #include "compat.h" #include "compress.h" #include "crypto.h" #include "host.h" #include "links.h" #include "links_acl.h" #include "logging.h" #include "transports.h" #include "transport_common.h" #include "threads_common.h" #include "threads_heartbeat.h" #include "threads_pmtud.h" #include "threads_rx.h" #include "netutils.h" #include "onwire_v1.h" /* * RECV */ /* * return 1 if a > b * return -1 if b > a * return 0 if they are equal */ static inline int _timecmp(struct timespec a, struct timespec b) { if (a.tv_sec != b.tv_sec) { if (a.tv_sec > b.tv_sec) { return 1; } else { return -1; } } else { if (a.tv_nsec > b.tv_nsec) { return 1; } else if (a.tv_nsec < b.tv_nsec) { return -1; } else { return 0; } } } /* * calculate use % of defrag buffers per host * and if % is <= knet_h->defrag_bufs_shrink_threshold for the last second, then half the size */ static void _shrink_defrag_buffers(knet_handle_t knet_h) { struct knet_host *host; struct knet_host_defrag_buf *new_bufs = NULL; struct timespec now; unsigned long long time_diff; /* nanoseconds */ uint16_t i, x, in_use_bufs; uint32_t sum; /* * first run. */ if ((knet_h->defrag_bufs_last_run.tv_sec == 0) && (knet_h->defrag_bufs_last_run.tv_nsec == 0)) { clock_gettime(CLOCK_MONOTONIC, &knet_h->defrag_bufs_last_run); return; } clock_gettime(CLOCK_MONOTONIC, &now); timespec_diff(knet_h->defrag_bufs_last_run, now, &time_diff); if (time_diff < (((unsigned long long)knet_h->defrag_bufs_usage_samples_timespan * 1000000000) / knet_h->defrag_bufs_usage_samples)) { return; } /* * record the last run */ memmove(&knet_h->defrag_bufs_last_run, &now, sizeof(struct timespec)); /* * do the real work: */ for (host = knet_h->host_head; host != NULL; host = host->next) { /* * Update buffer usage stats. We do this for all nodes. */ in_use_bufs = 0; for (i = 0; i < host->allocated_defrag_bufs; i++) { if (host->defrag_bufs[i].in_use) { in_use_bufs++; } } /* * record only % */ host->in_use_defrag_buffers[host->in_use_defrag_buffers_index] = (in_use_bufs * 100 / host->allocated_defrag_bufs); host->in_use_defrag_buffers_index++; /* * make sure to stay within buffer */ if (host->in_use_defrag_buffers_index == knet_h->defrag_bufs_usage_samples) { host->in_use_defrag_buffers_index = 0; } /* * only allow shrinking if we have enough samples */ if (host->in_use_defrag_buffers_samples < knet_h->defrag_bufs_usage_samples) { host->in_use_defrag_buffers_samples++; continue; } /* * only allow shrinking if in use bufs are <= knet_h->defrag_bufs_shrink_threshold% */ if (knet_h->defrag_bufs_reclaim_policy == RECLAIM_POLICY_AVERAGE) { sum = 0; for (i = 0; i < knet_h->defrag_bufs_usage_samples; i++) { sum += host->in_use_defrag_buffers[i]; } sum = sum / knet_h->defrag_bufs_usage_samples; if (sum > knet_h->defrag_bufs_shrink_threshold) { continue; } } else { sum = 0; for (i = 0; i < knet_h->defrag_bufs_usage_samples; i++) { if (host->in_use_defrag_buffers[i] > knet_h->defrag_bufs_shrink_threshold) { sum = 1; } } if (sum) { continue; } } /* * only allow shrinking if allocated bufs > min_defrag_bufs */ if (host->allocated_defrag_bufs == knet_h->defrag_bufs_min) { continue; } /* * compat all the in_use buffers at the beginning. * we the checks above, we are 100% sure they fit */ x = 0; for (i = 0; i < host->allocated_defrag_bufs; i++) { if (host->defrag_bufs[i].in_use) { memmove(&host->defrag_bufs[x], &host->defrag_bufs[i], sizeof(struct knet_host_defrag_buf)); x++; } } /* * memory allocation is not critical. it just means the system is under * memory pressure and we will need to wait our turn to free memory... how odd :) */ new_bufs = realloc(host->defrag_bufs, sizeof(struct knet_host_defrag_buf) * (host->allocated_defrag_bufs / 2)); if (!new_bufs) { log_err(knet_h, KNET_SUB_RX, "Unable to decrease defrag buffers for host %u: %s", host->host_id, strerror(errno)); continue; } host->defrag_bufs = new_bufs; host->allocated_defrag_bufs = host->allocated_defrag_bufs / 2; /* * clear buffer use stats. Old ones are no good for new one */ _clear_defrag_bufs_stats(host); log_debug(knet_h, KNET_SUB_RX, "Defrag buffers for host %u decreased from %u to: %u", host->host_id, host->allocated_defrag_bufs * 2, host->allocated_defrag_bufs); } } /* * check if we can double the defrag buffers. * * return 0 if we cannot reallocate * return 1 if we have more buffers */ static int _realloc_defrag_buffers(knet_handle_t knet_h, struct knet_host *src_host) { struct knet_host_defrag_buf *new_bufs = NULL; int i; /* * max_defrag_bufs is a power of 2 * allocated_defrag_bufs doubles on each iteration. * Sooner or later (and hopefully never) allocated with be == to max. */ if (src_host->allocated_defrag_bufs < knet_h->defrag_bufs_max) { new_bufs = realloc(src_host->defrag_bufs, src_host->allocated_defrag_bufs * 2 * sizeof(struct knet_host_defrag_buf)); if (!new_bufs) { log_err(knet_h, KNET_SUB_RX, "Unable to increase defrag buffers for host %u: %s", src_host->host_id, strerror(errno)); return 0; } /* * keep the math simple here between arrays, pointers and what not. * Init each buffer individually. */ for (i = src_host->allocated_defrag_bufs; i < src_host->allocated_defrag_bufs * 2; i++) { memset(&new_bufs[i], 0, sizeof(struct knet_host_defrag_buf)); } src_host->allocated_defrag_bufs = src_host->allocated_defrag_bufs * 2; src_host->defrag_bufs = new_bufs; /* * clear buffer use stats. Old ones are no good for new one */ _clear_defrag_bufs_stats(src_host); log_debug(knet_h, KNET_SUB_RX, "Defrag buffers for host %u increased from %u to: %u", src_host->host_id, src_host->allocated_defrag_bufs / 2, src_host->allocated_defrag_bufs); return 1; } return 0; } /* * this functions needs to return an index * to a knet_host_defrag_buf. (-1 on errors) */ static int _find_pckt_defrag_buf(knet_handle_t knet_h, struct knet_host *src_host, seq_num_t seq_num) { int i, oldest; uint16_t cur_allocated_defrag_bufs = src_host->allocated_defrag_bufs; /* * check if there is a buffer already in use handling the same seq_num */ for (i = 0; i < src_host->allocated_defrag_bufs; i++) { if (src_host->defrag_bufs[i].in_use) { if (src_host->defrag_bufs[i].pckt_seq == seq_num) { return i; } } } /* * If there is no buffer that's handling the current seq_num * either it's new or it's been reclaimed already. * check if it's been reclaimed/seen before using the defrag circular * buffer. If the pckt has been seen before, the buffer expired (ETIME) * and there is no point to try to defrag it again. */ if (!_seq_num_lookup(knet_h, src_host, seq_num, 1, 0)) { errno = ETIME; return -1; } /* * register the pckt as seen */ _seq_num_set(src_host, seq_num, 1); /* * see if there is a free buffer */ for (i = 0; i < src_host->allocated_defrag_bufs; i++) { if (!src_host->defrag_bufs[i].in_use) { return i; } } /* * check if we can increase num of buffers */ if (_realloc_defrag_buffers(knet_h, src_host)) { return cur_allocated_defrag_bufs + 1; } /* * at this point, there are no free buffers, the pckt is new * and we need to reclaim a buffer, and we will take the one * with the oldest timestamp. It's as good as any. */ oldest = 0; for (i = 0; i < src_host->allocated_defrag_bufs; i++) { if (_timecmp(src_host->defrag_bufs[i].last_update, src_host->defrag_bufs[oldest].last_update) < 0) { oldest = i; } } src_host->defrag_bufs[oldest].in_use = 0; return oldest; } static int _pckt_defrag(knet_handle_t knet_h, struct knet_host *src_host, seq_num_t seq_num, unsigned char *data, ssize_t *len, uint8_t frags, uint8_t frag_seq) { struct knet_host_defrag_buf *defrag_buf; int defrag_buf_idx; defrag_buf_idx = _find_pckt_defrag_buf(knet_h, src_host, seq_num); if (defrag_buf_idx < 0) { return 1; } defrag_buf = &src_host->defrag_bufs[defrag_buf_idx]; /* * if the buf is not is use, then make sure it's clean */ if (!defrag_buf->in_use) { memset(defrag_buf, 0, sizeof(struct knet_host_defrag_buf)); defrag_buf->in_use = 1; defrag_buf->pckt_seq = seq_num; } /* * update timestamp on the buffer */ clock_gettime(CLOCK_MONOTONIC, &defrag_buf->last_update); /* * check if we already received this fragment */ if (defrag_buf->frag_map[frag_seq]) { /* * if we have received this fragment and we didn't clear the buffer * it means that we don't have all fragments yet */ return 1; } /* * we need to handle the last packet with gloves due to its different size */ if (frag_seq == frags) { defrag_buf->last_frag_size = *len; /* * in the event when the last packet arrives first, * we still don't know the offset vs the other fragments (based on MTU), * so we store the fragment at the end of the buffer where it's safe * and take a copy of the len so that we can restore its offset later. * remember we can't use the local MTU for this calculation because pMTU * can be asymettric between the same hosts. */ if (!defrag_buf->frag_size) { defrag_buf->last_first = 1; memmove(defrag_buf->buf + (KNET_MAX_PACKET_SIZE - *len), data, *len); } } else { defrag_buf->frag_size = *len; } if (defrag_buf->frag_size) { memmove(defrag_buf->buf + ((frag_seq - 1) * defrag_buf->frag_size), data, *len); } defrag_buf->frag_recv++; defrag_buf->frag_map[frag_seq] = 1; /* * check if we received all the fragments */ if (defrag_buf->frag_recv == frags) { /* * special case the last pckt */ if (defrag_buf->last_first) { memmove(defrag_buf->buf + ((frags - 1) * defrag_buf->frag_size), defrag_buf->buf + (KNET_MAX_PACKET_SIZE - defrag_buf->last_frag_size), defrag_buf->last_frag_size); } /* * recalculate packet lenght */ *len = ((frags - 1) * defrag_buf->frag_size) + defrag_buf->last_frag_size; /* * copy the pckt back in the user data */ memmove(data, defrag_buf->buf, *len); /* * free this buffer */ defrag_buf->in_use = 0; return 0; } return 1; } static int _handle_data_stats(knet_handle_t knet_h, struct knet_link *src_link, ssize_t len, uint64_t decrypt_time) { int stats_err; /* data stats at the top for consistency with TX */ src_link->status.stats.rx_data_packets++; src_link->status.stats.rx_data_bytes += len; if (decrypt_time) { stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex); if (stats_err < 0) { log_err(knet_h, KNET_SUB_RX, "Unable to get mutex lock: %s", strerror(stats_err)); return -1; } /* Only update the crypto overhead for data packets. Mainly to be consistent with TX */ if (decrypt_time < knet_h->stats.rx_crypt_time_min) { knet_h->stats.rx_crypt_time_min = decrypt_time; } if (decrypt_time > knet_h->stats.rx_crypt_time_max) { knet_h->stats.rx_crypt_time_max = decrypt_time; } knet_h->stats.rx_crypt_time_ave = (knet_h->stats.rx_crypt_time_ave * knet_h->stats.rx_crypt_packets + decrypt_time) / (knet_h->stats.rx_crypt_packets+1); knet_h->stats.rx_crypt_packets++; pthread_mutex_unlock(&knet_h->handle_stats_mutex); } return 0; } static int _decompress_data(knet_handle_t knet_h, uint8_t decompress_type, unsigned char *data, ssize_t *len, ssize_t header_size) { int err = 0, stats_err = 0; if (decompress_type) { ssize_t decmp_outlen = KNET_DATABUFSIZE_COMPRESS; struct timespec start_time; struct timespec end_time; uint64_t decompress_time; clock_gettime(CLOCK_MONOTONIC, &start_time); err = decompress(knet_h, decompress_type, data, *len - header_size, knet_h->recv_from_links_buf_decompress, &decmp_outlen); clock_gettime(CLOCK_MONOTONIC, &end_time); timespec_diff(start_time, end_time, &decompress_time); stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex); if (stats_err < 0) { log_err(knet_h, KNET_SUB_RX, "Unable to get mutex lock: %s", strerror(stats_err)); return -1; } if (!err) { /* Collect stats */ if (decompress_time < knet_h->stats.rx_compress_time_min) { knet_h->stats.rx_compress_time_min = decompress_time; } if (decompress_time > knet_h->stats.rx_compress_time_max) { knet_h->stats.rx_compress_time_max = decompress_time; } knet_h->stats.rx_compress_time_ave = (knet_h->stats.rx_compress_time_ave * knet_h->stats.rx_compressed_packets + decompress_time) / (knet_h->stats.rx_compressed_packets+1); knet_h->stats.rx_compressed_packets++; knet_h->stats.rx_compressed_original_bytes += decmp_outlen; knet_h->stats.rx_compressed_size_bytes += *len - KNET_HEADER_SIZE; memmove(data, knet_h->recv_from_links_buf_decompress, decmp_outlen); *len = decmp_outlen + header_size; } else { knet_h->stats.rx_failed_to_decompress++; pthread_mutex_unlock(&knet_h->handle_stats_mutex); log_warn(knet_h, KNET_SUB_COMPRESS, "Unable to decompress packet (%d): %s", err, strerror(errno)); return -1; } pthread_mutex_unlock(&knet_h->handle_stats_mutex); } return 0; } static int _check_destination(knet_handle_t knet_h, struct knet_header *inbuf, unsigned char *data, ssize_t len, ssize_t header_size, int8_t *channel) { knet_node_id_t dst_host_ids[KNET_MAX_HOST]; size_t dst_host_ids_entries = 0; int bcast = 1; size_t host_idx; int found = 0; if (knet_h->dst_host_filter_fn) { bcast = knet_h->dst_host_filter_fn( knet_h->dst_host_filter_fn_private_data, data, len - header_size, KNET_NOTIFY_RX, knet_h->host_id, inbuf->kh_node, channel, dst_host_ids, &dst_host_ids_entries); if (bcast < 0) { log_debug(knet_h, KNET_SUB_RX, "Error from dst_host_filter_fn: %d", bcast); return -1; } if ((!bcast) && (!dst_host_ids_entries)) { log_debug(knet_h, KNET_SUB_RX, "Message is unicast but no dst_host_ids_entries"); return -1; } /* check if we are dst for this packet */ if (!bcast) { if (dst_host_ids_entries > KNET_MAX_HOST) { log_debug(knet_h, KNET_SUB_RX, "dst_host_filter_fn returned too many destinations"); return -1; } for (host_idx = 0; host_idx < dst_host_ids_entries; host_idx++) { if (dst_host_ids[host_idx] == knet_h->host_id) { found = 1; break; } } if (!found) { log_debug(knet_h, KNET_SUB_RX, "Packet is not for us"); return -1; } } } return 0; } static int _deliver_data(knet_handle_t knet_h, unsigned char *data, ssize_t len, ssize_t header_size, int8_t channel) { struct iovec iov_out[1]; ssize_t outlen = 0; memset(iov_out, 0, sizeof(iov_out)); retry: iov_out[0].iov_base = (void *) data + outlen; iov_out[0].iov_len = len - (outlen + header_size); outlen = writev(knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], iov_out, 1); if ((outlen > 0) && (outlen < (ssize_t)iov_out[0].iov_len)) { log_debug(knet_h, KNET_SUB_RX, "Unable to send all data to the application in one go. Expected: %zu Sent: %zd\n", iov_out[0].iov_len, outlen); goto retry; } if (outlen <= 0) { knet_h->sock_notify_fn(knet_h->sock_notify_fn_private_data, knet_h->sockfd[channel].sockfd[0], channel, KNET_NOTIFY_RX, outlen, errno); return -1; } if ((size_t)outlen != iov_out[0].iov_len) { return -1; } return 0; } static void _process_data(knet_handle_t knet_h, struct knet_host *src_host, struct knet_link *src_link, struct knet_header *inbuf, ssize_t len, uint64_t decrypt_time) { int8_t channel; uint8_t decompress_type = 0; ssize_t header_size; seq_num_t seq_num; uint8_t frags, frag_seq; unsigned char *data; if (_handle_data_stats(knet_h, src_link, len, decrypt_time) < 0) { return; } /* * register host is sending data. Required to determine if we need * to reset circular buffers. (see onwire_v1.c) */ src_host->got_data = 1; if (knet_h->onwire_ver_remap) { get_data_header_info_v1(knet_h, inbuf, &header_size, &channel, &seq_num, &decompress_type, &frags, &frag_seq); data = get_data_v1(knet_h, inbuf); } else { switch (inbuf->kh_version) { case 1: get_data_header_info_v1(knet_h, inbuf, &header_size, &channel, &seq_num, &decompress_type, &frags, &frag_seq); data = get_data_v1(knet_h, inbuf); break; default: log_warn(knet_h, KNET_SUB_RX, "processing data onwire version %u not supported", inbuf->kh_version); return; break; } } if (!_seq_num_lookup(knet_h, src_host, seq_num, 0, 0)) { if (src_host->link_handler_policy != KNET_LINK_POLICY_ACTIVE) { log_debug(knet_h, KNET_SUB_RX, "Packet has already been delivered"); } return; } if (frags > 1) { /* * len as received from the socket also includes extra stuff * that the defrag code doesn't care about. So strip it * here and readd only for repadding once we are done * defragging * * the defrag code assumes that data packets have all the same size * except the last one that might be smaller. * */ len = len - header_size; if (_pckt_defrag(knet_h, src_host, seq_num, data, &len, frags, frag_seq)) { return; } len = len + header_size; } if (_decompress_data(knet_h, decompress_type, data, &len, header_size) < 0) { return; } if (!src_host->status.reachable) { log_debug(knet_h, KNET_SUB_RX, "Source host %u not reachable yet. Discarding packet.", src_host->host_id); return; } if (knet_h->enabled != 1) /* data forward is disabled */ return; if (_check_destination(knet_h, inbuf, data, len, header_size, &channel) < 0) { return; } if (!knet_h->sockfd[channel].in_use) { log_debug(knet_h, KNET_SUB_RX, "received packet for channel %d but there is no local sock connected", channel); return; } +#ifdef ONWIRE_V1_EXTRA_DEBUG + if (inbuf->khp_data_v1_checksum != compute_chksum(data, len - header_size)) { + log_err(knet_h, KNET_SUB_RX, "Received incorrect data checksum after reassembly from host: %u seq: %u", src_host->host_id, seq_num); + /* + * give a chance to the log threads to pick up the message + */ + sleep(1); + abort(); + } +#endif + if (_deliver_data(knet_h, data, len, header_size, channel) < 0) { return; } _seq_num_set(src_host, seq_num, 0); } static struct knet_header *_decrypt_packet(knet_handle_t knet_h, struct knet_header *inbuf, ssize_t *len, uint64_t *decrypt_time) { int try_decrypt = 0; int i = 0; struct timespec start_time; struct timespec end_time; ssize_t outlen; for (i = 1; i <= KNET_MAX_CRYPTO_INSTANCES; i++) { if (knet_h->crypto_instance[i]) { try_decrypt = 1; break; } } if ((!try_decrypt) && (knet_h->crypto_only == KNET_CRYPTO_RX_DISALLOW_CLEAR_TRAFFIC)) { log_debug(knet_h, KNET_SUB_RX, "RX thread configured to accept only crypto packets, but no crypto configs are configured!"); return NULL; } if (try_decrypt) { clock_gettime(CLOCK_MONOTONIC, &start_time); if (crypto_authenticate_and_decrypt(knet_h, (unsigned char *)inbuf, *len, knet_h->recv_from_links_buf_decrypt, &outlen) < 0) { log_debug(knet_h, KNET_SUB_RX, "Unable to decrypt/auth packet"); if (knet_h->crypto_only == KNET_CRYPTO_RX_DISALLOW_CLEAR_TRAFFIC) { return NULL; } log_debug(knet_h, KNET_SUB_RX, "Attempting to process packet as clear data"); } else { clock_gettime(CLOCK_MONOTONIC, &end_time); timespec_diff(start_time, end_time, decrypt_time); *len = outlen; inbuf = (struct knet_header *)knet_h->recv_from_links_buf_decrypt; } } return inbuf; } static int _packet_checks(knet_handle_t knet_h, struct knet_header *inbuf, ssize_t len) { +#ifdef ONWIRE_V1_EXTRA_DEBUG + uint32_t rx_packet_checksum, expected_packet_checksum; +#endif + if (len < (ssize_t)(KNET_HEADER_SIZE + 1)) { log_debug(knet_h, KNET_SUB_RX, "Packet is too short: %ld", (long)len); return -1; } +#ifdef ONWIRE_V1_EXTRA_DEBUG + rx_packet_checksum = inbuf->kh_checksum; + inbuf->kh_checksum = 0; + expected_packet_checksum = compute_chksum((const unsigned char *)inbuf, len); + if (rx_packet_checksum != expected_packet_checksum) { + log_err(knet_h, KNET_SUB_RX, "Received packet with incorrect checksum. Received: %u Expected: %u", rx_packet_checksum, expected_packet_checksum); + /* + * give a chance to the log threads to pick up the message + */ + sleep(1); + abort(); + } +#endif + /* * old versions of knet did not advertise max_ver and max_ver is set to 0. */ if (!inbuf->kh_max_ver) { inbuf->kh_max_ver = 1; } /* * if the node joining max version is lower than the min version * then we reject the node */ if (inbuf->kh_max_ver < knet_h->onwire_min_ver) { log_warn(knet_h, KNET_SUB_RX, "Received packet version %u from node %u, lower than currently minimal supported onwire version. Rejecting.", inbuf->kh_version, inbuf->kh_node); return -1; } /* * if the node joining with version higher than our max version * then we reject the node */ if (inbuf->kh_version > knet_h->onwire_max_ver) { log_warn(knet_h, KNET_SUB_RX, "Received packet version %u from node %u, higher than currently maximum supported onwire version. Rejecting.", inbuf->kh_version, inbuf->kh_node); return -1; } /* * if the node joining with version lower than the current in use version * then we reject the node * * NOTE: should we make this configurable and support downgrades? */ if ((!knet_h->onwire_force_ver) && (inbuf->kh_version < knet_h->onwire_ver) && (inbuf->kh_max_ver > inbuf->kh_version)) { log_warn(knet_h, KNET_SUB_RX, "Received packet version %u from node %u, lower than currently in use onwire version. Rejecting.", inbuf->kh_version, inbuf->kh_node); return -1; } return 0; } static void _handle_dynip(knet_handle_t knet_h, struct knet_host *src_host, struct knet_link *src_link, int sockfd, const struct knet_mmsghdr *msg) { if (src_link->dynamic == KNET_LINK_DYNIP) { if (cmpaddr(&src_link->dst_addr, msg->msg_hdr.msg_name) != 0) { log_debug(knet_h, KNET_SUB_RX, "host: %u link: %u appears to have changed ip address", src_host->host_id, src_link->link_id); memmove(&src_link->dst_addr, msg->msg_hdr.msg_name, sizeof(struct sockaddr_storage)); if (knet_addrtostr(&src_link->dst_addr, sockaddr_len(&src_link->dst_addr), src_link->status.dst_ipaddr, KNET_MAX_HOST_LEN, src_link->status.dst_port, KNET_MAX_PORT_LEN) != 0) { log_debug(knet_h, KNET_SUB_RX, "Unable to resolve ???"); snprintf(src_link->status.dst_ipaddr, KNET_MAX_HOST_LEN - 1, "Unknown!!!"); snprintf(src_link->status.dst_port, KNET_MAX_PORT_LEN - 1, "??"); } else { log_info(knet_h, KNET_SUB_RX, "host: %u link: %u new connection established from: %s %s", src_host->host_id, src_link->link_id, src_link->status.dst_ipaddr, src_link->status.dst_port); } } /* * transport has already accepted the connection here * otherwise we would not be receiving packets */ transport_link_dyn_connect(knet_h, sockfd, src_link); } } /* * processing incoming packets vs access lists */ static int _check_rx_acl(knet_handle_t knet_h, struct knet_link *src_link, const struct knet_mmsghdr *msg) { if (knet_h->use_access_lists) { if (!check_validate(knet_h, src_link, msg->msg_hdr.msg_name)) { char src_ipaddr[KNET_MAX_HOST_LEN]; char src_port[KNET_MAX_PORT_LEN]; memset(src_ipaddr, 0, KNET_MAX_HOST_LEN); memset(src_port, 0, KNET_MAX_PORT_LEN); if (knet_addrtostr(msg->msg_hdr.msg_name, sockaddr_len(msg->msg_hdr.msg_name), src_ipaddr, KNET_MAX_HOST_LEN, src_port, KNET_MAX_PORT_LEN) < 0) { log_debug(knet_h, KNET_SUB_RX, "Packet rejected: unable to resolve host/port"); } else { log_debug(knet_h, KNET_SUB_RX, "Packet rejected from %s/%s", src_ipaddr, src_port); } return 0; } } return 1; } static void _parse_recv_from_links(knet_handle_t knet_h, int sockfd, const struct knet_mmsghdr *msg) { int savederrno = 0, stats_err = 0; struct knet_host *src_host; struct knet_link *src_link; uint64_t decrypt_time = 0; struct knet_header *inbuf = msg->msg_hdr.msg_iov->iov_base; ssize_t len = msg->msg_len; int i, found_link = 0; inbuf = _decrypt_packet(knet_h, inbuf, &len, &decrypt_time); if (!inbuf) { return; } - inbuf->kh_node = ntohs(inbuf->kh_node); - if (_packet_checks(knet_h, inbuf, len) < 0) { return; } + inbuf->kh_node = ntohs(inbuf->kh_node); + /* * determine source host */ src_host = knet_h->host_index[inbuf->kh_node]; if (src_host == NULL) { /* host not found */ log_debug(knet_h, KNET_SUB_RX, "Unable to find source host for this packet"); return; } /* * deteremine source link */ if (inbuf->kh_type == KNET_HEADER_TYPE_PING) { _handle_onwire_version(knet_h, src_host, inbuf); if (knet_h->onwire_ver_remap) { src_link = get_link_from_pong_v1(knet_h, src_host, inbuf); } else { switch (inbuf->kh_version) { case 1: src_link = get_link_from_pong_v1(knet_h, src_host, inbuf); break; default: log_warn(knet_h, KNET_SUB_RX, "Parsing ping onwire version %u not supported", inbuf->kh_version); return; break; } } if (!_check_rx_acl(knet_h, src_link, msg)) { return; } _handle_dynip(knet_h, src_host, src_link, sockfd, msg); } else { /* all other packets */ for (i = 0; i < KNET_MAX_LINK; i++) { src_link = &src_host->link[i]; if (cmpaddr(&src_link->dst_addr, msg->msg_hdr.msg_name) == 0) { found_link = 1; break; } } if (found_link) { /* * this check is currently redundant.. Keep it here for now */ if (!_check_rx_acl(knet_h, src_link, msg)) { return; } } else { log_debug(knet_h, KNET_SUB_RX, "Unable to determine source link for data packet. Discarding packet."); return; } } stats_err = pthread_mutex_lock(&src_link->link_stats_mutex); if (stats_err) { log_err(knet_h, KNET_SUB_RX, "Unable to get stats mutex lock for host %u link %u: %s", src_host->host_id, src_link->link_id, strerror(savederrno)); return; } switch (inbuf->kh_type) { case KNET_HEADER_TYPE_DATA: _process_data(knet_h, src_host, src_link, inbuf, len, decrypt_time); break; case KNET_HEADER_TYPE_PING: process_ping(knet_h, src_host, src_link, inbuf, len); break; case KNET_HEADER_TYPE_PONG: process_pong(knet_h, src_host, src_link, inbuf, len); break; case KNET_HEADER_TYPE_PMTUD: src_link->status.stats.rx_pmtu_packets++; src_link->status.stats.rx_pmtu_bytes += len; /* Unlock so we don't deadlock with tx_mutex */ pthread_mutex_unlock(&src_link->link_stats_mutex); process_pmtud(knet_h, src_link, inbuf); return; /* Don't need to unlock link_stats_mutex */ break; case KNET_HEADER_TYPE_PMTUD_REPLY: src_link->status.stats.rx_pmtu_packets++; src_link->status.stats.rx_pmtu_bytes += len; /* pmtud_mutex can't be acquired while we hold a link_stats_mutex (ordering) */ pthread_mutex_unlock(&src_link->link_stats_mutex); process_pmtud_reply(knet_h, src_link, inbuf); return; break; default: pthread_mutex_unlock(&src_link->link_stats_mutex); return; break; } pthread_mutex_unlock(&src_link->link_stats_mutex); } static void _handle_recv_from_links(knet_handle_t knet_h, int sockfd, struct knet_mmsghdr *msg) { int err, savederrno; int i, msg_recv, transport; if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) { log_debug(knet_h, KNET_SUB_RX, "Unable to get global read lock"); return; } if (_is_valid_fd(knet_h, sockfd) < 1) { /* * this is normal if a fd got an event and before we grab the read lock * and the link is removed by another thread */ goto exit_unlock; } transport = knet_h->knet_transport_fd_tracker[sockfd].transport; /* * reset msg_namelen to buffer size because after recvmmsg * each msg_namelen will contain sizeof sockaddr_in or sockaddr_in6 */ for (i = 0; i < PCKT_RX_BUFS; i++) { msg[i].msg_hdr.msg_namelen = knet_h->knet_transport_fd_tracker[sockfd].sockaddr_len; } msg_recv = _recvmmsg(sockfd, &msg[0], PCKT_RX_BUFS, MSG_DONTWAIT | MSG_NOSIGNAL); savederrno = errno; /* * WARNING: man page for recvmmsg is wrong. Kernel implementation here: * recvmmsg can return: * -1 on error * 0 if the previous run of recvmmsg recorded an error on the socket * N number of messages (see exception below). * * If there is an error from recvmsg after receiving a frame or more, the recvmmsg * loop is interrupted, error recorded in the socket (getsockopt(SO_ERROR) and * it will be visibile in the next run. * * Need to be careful how we handle errors at this stage. * * error messages need to be handled on a per transport/protocol base * at this point we have different layers of error handling * - msg_recv < 0 -> error from this run * msg_recv = 0 -> error from previous run and error on socket needs to be cleared * - per-transport message data * example: msg[i].msg_hdr.msg_flags & MSG_NOTIFICATION or msg_len for SCTP == EOF, * but for UDP it is perfectly legal to receive a 0 bytes message.. go figure * - NOTE: on SCTP MSG_NOTIFICATION we get msg_recv == PCKT_FRAG_MAX messages and no * errno set. That means the error api needs to be able to abort the loop below. */ if (msg_recv <= 0) { transport_rx_sock_error(knet_h, transport, sockfd, msg_recv, savederrno); goto exit_unlock; } for (i = 0; i < msg_recv; i++) { err = transport_rx_is_data(knet_h, transport, sockfd, &msg[i]); /* * TODO: make this section silent once we are confident * all protocols packet handlers are good */ switch(err) { case KNET_TRANSPORT_RX_ERROR: /* on error */ log_debug(knet_h, KNET_SUB_RX, "Transport reported error parsing packet"); goto exit_unlock; break; case KNET_TRANSPORT_RX_NOT_DATA_CONTINUE: /* packet is not data and we should continue the packet process loop */ log_debug(knet_h, KNET_SUB_RX, "Transport reported no data, continue"); break; case KNET_TRANSPORT_RX_NOT_DATA_STOP: /* packet is not data and we should STOP the packet process loop */ log_debug(knet_h, KNET_SUB_RX, "Transport reported no data, stop"); goto exit_unlock; break; case KNET_TRANSPORT_RX_IS_DATA: /* packet is data and should be parsed as such */ _parse_recv_from_links(knet_h, sockfd, &msg[i]); break; case KNET_TRANSPORT_RX_OOB_DATA_CONTINUE: log_debug(knet_h, KNET_SUB_RX, "Transport is processing sock OOB data, continue"); break; case KNET_TRANSPORT_RX_OOB_DATA_STOP: log_debug(knet_h, KNET_SUB_RX, "Transport has completed processing sock OOB data, stop"); goto exit_unlock; break; } } exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); } void *_handle_recv_from_links_thread(void *data) { int i, nev; knet_handle_t knet_h = (knet_handle_t) data; struct epoll_event events[KNET_EPOLL_MAX_EVENTS]; struct sockaddr_storage address[PCKT_RX_BUFS]; struct knet_mmsghdr msg[PCKT_RX_BUFS]; struct iovec iov_in[PCKT_RX_BUFS]; set_thread_status(knet_h, KNET_THREAD_RX, KNET_THREAD_STARTED); memset(&msg, 0, sizeof(msg)); memset(&events, 0, sizeof(events)); for (i = 0; i < PCKT_RX_BUFS; i++) { iov_in[i].iov_base = (void *)knet_h->recv_from_links_buf[i]; iov_in[i].iov_len = KNET_DATABUFSIZE; memset(&msg[i].msg_hdr, 0, sizeof(struct msghdr)); msg[i].msg_hdr.msg_name = &address[i]; msg[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage); /* Real value filled in before actual use */ msg[i].msg_hdr.msg_iov = &iov_in[i]; msg[i].msg_hdr.msg_iovlen = 1; } while (!shutdown_in_progress(knet_h)) { nev = epoll_wait(knet_h->recv_from_links_epollfd, events, KNET_EPOLL_MAX_EVENTS, knet_h->threads_timer_res / 1000); /* * the RX threads only need to notify that there has been at least * one successful run after queue flush has been requested. * See setfwd in handle.c */ if (get_thread_flush_queue(knet_h, KNET_THREAD_RX) == KNET_THREAD_QUEUE_FLUSH) { set_thread_flush_queue(knet_h, KNET_THREAD_RX, KNET_THREAD_QUEUE_FLUSHED); } /* * we use timeout to detect if thread is shutting down */ if (nev == 0) { continue; } for (i = 0; i < nev; i++) { _handle_recv_from_links(knet_h, events[i].data.fd, msg); } _shrink_defrag_buffers(knet_h); } set_thread_status(knet_h, KNET_THREAD_RX, KNET_THREAD_STOPPED); return NULL; } ssize_t knet_recv(knet_handle_t knet_h, char *buff, const size_t buff_len, const int8_t channel) { int savederrno = 0; ssize_t err = 0; struct iovec iov_in; if (!_is_valid_handle(knet_h)) { return -1; } if (buff == NULL) { errno = EINVAL; return -1; } if (buff_len <= 0) { errno = EINVAL; return -1; } if (buff_len > KNET_MAX_PACKET_SIZE) { errno = EINVAL; return -1; } if (channel < 0) { errno = EINVAL; return -1; } if (channel >= KNET_DATAFD_MAX) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } if (!knet_h->sockfd[channel].in_use) { savederrno = EINVAL; err = -1; goto out_unlock; } memset(&iov_in, 0, sizeof(iov_in)); iov_in.iov_base = (void *)buff; iov_in.iov_len = buff_len; err = readv(knet_h->sockfd[channel].sockfd[0], &iov_in, 1); savederrno = errno; out_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = err ? savederrno : 0; return err; } diff --git a/libknet/threads_tx.c b/libknet/threads_tx.c index f240f65d..44473c1f 100644 --- a/libknet/threads_tx.c +++ b/libknet/threads_tx.c @@ -1,994 +1,999 @@ /* * Copyright (C) 2012-2021 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. Di Nitto * Federico Simoncelli * * This software licensed under LGPL-2.0+ */ #include "config.h" #include #include #include #include #include #include "compat.h" #include "compress.h" #include "crypto.h" #include "host.h" #include "link.h" #include "logging.h" #include "transports.h" #include "transport_common.h" #include "threads_common.h" #include "threads_heartbeat.h" #include "threads_tx.h" #include "netutils.h" #include "onwire_v1.h" /* * SEND */ static int _dispatch_to_links(knet_handle_t knet_h, struct knet_host *dst_host, struct knet_mmsghdr *msg, int msgs_to_send) { int link_idx, msg_idx, sent_msgs, prev_sent, progress; int err = 0, savederrno = 0, locked = 0; unsigned int i; struct knet_mmsghdr *cur; struct knet_link *cur_link; for (link_idx = 0; link_idx < dst_host->active_link_entries; link_idx++) { prev_sent = 0; progress = 1; locked = 0; cur_link = &dst_host->link[dst_host->active_links[link_idx]]; if (cur_link->transport == KNET_TRANSPORT_LOOPBACK) { continue; } savederrno = pthread_mutex_lock(&cur_link->link_stats_mutex); if (savederrno) { log_err(knet_h, KNET_SUB_TX, "Unable to get stats mutex lock for host %u link %u: %s", dst_host->host_id, cur_link->link_id, strerror(savederrno)); continue; } locked = 1; msg_idx = 0; while (msg_idx < msgs_to_send) { msg[msg_idx].msg_hdr.msg_name = &cur_link->dst_addr; msg[msg_idx].msg_hdr.msg_namelen = knet_h->knet_transport_fd_tracker[cur_link->outsock].sockaddr_len; /* Cast for Linux/BSD compatibility */ for (i=0; i<(unsigned int)msg[msg_idx].msg_hdr.msg_iovlen; i++) { cur_link->status.stats.tx_data_bytes += msg[msg_idx].msg_hdr.msg_iov[i].iov_len; } cur_link->status.stats.tx_data_packets++; msg_idx++; } retry: cur = &msg[prev_sent]; sent_msgs = _sendmmsg(dst_host->link[dst_host->active_links[link_idx]].outsock, transport_get_connection_oriented(knet_h, dst_host->link[dst_host->active_links[link_idx]].transport), &cur[0], msgs_to_send - prev_sent, MSG_DONTWAIT | MSG_NOSIGNAL); savederrno = errno; err = transport_tx_sock_error(knet_h, dst_host->link[dst_host->active_links[link_idx]].transport, dst_host->link[dst_host->active_links[link_idx]].outsock, sent_msgs, savederrno); switch(err) { case KNET_TRANSPORT_SOCK_ERROR_INTERNAL: cur_link->status.stats.tx_data_errors++; goto out_unlock; break; case KNET_TRANSPORT_SOCK_ERROR_IGNORE: break; case KNET_TRANSPORT_SOCK_ERROR_RETRY: cur_link->status.stats.tx_data_retries++; goto retry; break; } prev_sent = prev_sent + sent_msgs; if ((sent_msgs >= 0) && (prev_sent < msgs_to_send)) { if ((sent_msgs) || (progress)) { if (sent_msgs) { progress = 1; } else { progress = 0; } #ifdef DEBUG log_debug(knet_h, KNET_SUB_TX, "Unable to send all (%d/%d) data packets to host %s (%u) link %s:%s (%u)", sent_msgs, msg_idx, dst_host->name, dst_host->host_id, dst_host->link[dst_host->active_links[link_idx]].status.dst_ipaddr, dst_host->link[dst_host->active_links[link_idx]].status.dst_port, dst_host->link[dst_host->active_links[link_idx]].link_id); #endif goto retry; } if (!progress) { savederrno = EAGAIN; err = -1; goto out_unlock; } } if ((dst_host->link_handler_policy == KNET_LINK_POLICY_RR) && (dst_host->active_link_entries > 1)) { uint8_t cur_link_id = dst_host->active_links[0]; memmove(&dst_host->active_links[0], &dst_host->active_links[1], KNET_MAX_LINK - 1); dst_host->active_links[dst_host->active_link_entries - 1] = cur_link_id; break; } pthread_mutex_unlock(&cur_link->link_stats_mutex); locked = 0; } out_unlock: if (locked) { pthread_mutex_unlock(&cur_link->link_stats_mutex); } errno = savederrno; return err; } static int _dispatch_to_local(knet_handle_t knet_h, unsigned char *data, size_t inlen, int8_t channel) { int err = 0, savederrno = 0; const unsigned char *buf = data; ssize_t buflen = inlen; struct knet_link *local_link = knet_h->host_index[knet_h->host_id]->link; local_retry: err = write(knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], buf, buflen); savederrno = errno; if (err < 0) { log_err(knet_h, KNET_SUB_TRANSP_LOOPBACK, "send local failed. error=%s\n", strerror(errno)); local_link->status.stats.tx_data_errors++; goto out; } if (err > 0 && err < buflen) { log_debug(knet_h, KNET_SUB_TRANSP_LOOPBACK, "send local incomplete=%d bytes of %zu\n", err, inlen); local_link->status.stats.tx_data_retries++; buf += err; buflen -= err; goto local_retry; } if (err == buflen) { local_link->status.stats.tx_data_packets++; local_link->status.stats.tx_data_bytes += inlen; } out: errno = savederrno; return err; } static int _prep_tx_bufs(knet_handle_t knet_h, struct knet_header *inbuf, uint8_t onwire_ver, - unsigned char *data, size_t inlen, + unsigned char *data, size_t inlen, uint32_t data_checksum, seq_num_t tx_seq_num, int8_t channel, int bcast, int data_compressed, int *msgs_to_send, struct iovec iov_out[PCKT_FRAG_MAX][2], int *iovcnt_out) { int err = 0, savederrno = 0; unsigned int temp_data_mtu; if (!knet_h->data_mtu) { /* * using MIN_MTU_V4 for data mtu is not completely accurate but safe enough */ log_debug(knet_h, KNET_SUB_TX, "Received data packet but data MTU is still unknown." " Packet might not be delivered." " Assuming minimum IPv4 MTU (%d)", KNET_PMTUD_MIN_MTU_V4); temp_data_mtu = KNET_PMTUD_MIN_MTU_V4; } else { /* * take a copy of the mtu to avoid value changing under * our feet while we are sending a fragmented pckt */ temp_data_mtu = knet_h->data_mtu; } if (knet_h->onwire_ver_remap) { - prep_tx_bufs_v1(knet_h, inbuf, data, inlen, temp_data_mtu, tx_seq_num, channel, bcast, data_compressed, msgs_to_send, iov_out, iovcnt_out); + prep_tx_bufs_v1(knet_h, inbuf, data, inlen, data_checksum, temp_data_mtu, tx_seq_num, channel, bcast, data_compressed, msgs_to_send, iov_out, iovcnt_out); } else { switch (onwire_ver) { case 1: - prep_tx_bufs_v1(knet_h, inbuf, data, inlen, temp_data_mtu, tx_seq_num, channel, bcast, data_compressed, msgs_to_send, iov_out, iovcnt_out); + prep_tx_bufs_v1(knet_h, inbuf, data, inlen, data_checksum, temp_data_mtu, tx_seq_num, channel, bcast, data_compressed, msgs_to_send, iov_out, iovcnt_out); break; default: /* this should never hit as filters are in place in the calling functions */ log_warn(knet_h, KNET_SUB_TX, "preparing data onwire version %u not supported", onwire_ver); savederrno = EINVAL; err = -1; goto out; break; } } out: errno = savederrno; return err; } static int _compress_data(knet_handle_t knet_h, unsigned char* data, size_t *inlen, int *data_compressed) { int err = 0, savederrno = 0; int stats_locked = 0, stats_err = 0; size_t cmp_outlen = KNET_DATABUFSIZE_COMPRESS; struct timespec start_time; struct timespec end_time; uint64_t compress_time; /* * compress data */ if (knet_h->compress_model > 0) { if (*inlen > knet_h->compress_threshold) { clock_gettime(CLOCK_MONOTONIC, &start_time); err = compress(knet_h, data, *inlen, knet_h->send_to_links_buf_compress, (ssize_t *)&cmp_outlen); savederrno = errno; clock_gettime(CLOCK_MONOTONIC, &end_time); timespec_diff(start_time, end_time, &compress_time); stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex); if (stats_err < 0) { log_err(knet_h, KNET_SUB_TX, "Unable to get mutex lock: %s", strerror(stats_err)); err = -1; savederrno = stats_err; goto out; } stats_locked = 1; /* Collect stats */ if (compress_time < knet_h->stats.tx_compress_time_min) { knet_h->stats.tx_compress_time_min = compress_time; } if (compress_time > knet_h->stats.tx_compress_time_max) { knet_h->stats.tx_compress_time_max = compress_time; } knet_h->stats.tx_compress_time_ave = (unsigned long long)(knet_h->stats.tx_compress_time_ave * knet_h->stats.tx_compressed_packets + compress_time) / (knet_h->stats.tx_compressed_packets+1); if (err < 0) { knet_h->stats.tx_failed_to_compress++; log_warn(knet_h, KNET_SUB_COMPRESS, "Compression failed (%d): %s", err, strerror(savederrno)); } else { knet_h->stats.tx_compressed_packets++; knet_h->stats.tx_compressed_original_bytes += *inlen; knet_h->stats.tx_compressed_size_bytes += cmp_outlen; if (cmp_outlen < *inlen) { memmove(data, knet_h->send_to_links_buf_compress, cmp_outlen); *inlen = cmp_outlen; *data_compressed = 1; } else { knet_h->stats.tx_unable_to_compress++; } } } if (!*data_compressed) { if (!stats_locked) { stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex); if (stats_err < 0) { log_err(knet_h, KNET_SUB_TX, "Unable to get mutex lock: %s", strerror(stats_err)); err = -1; savederrno = stats_err; goto out; } stats_locked = 1; } knet_h->stats.tx_uncompressed_packets++; } if (stats_locked) { pthread_mutex_unlock(&knet_h->handle_stats_mutex); } } out: errno = savederrno; return err; } static int _encrypt_bufs(knet_handle_t knet_h, int msgs_to_send, struct iovec iov_out[PCKT_FRAG_MAX][2], int *iovcnt_out) { int err = 0, savederrno = 0, stats_err = 0; struct timespec start_time; struct timespec end_time; uint64_t crypt_time; uint8_t frag_idx = 0; size_t outlen, uncrypted_frag_size; int j; if (knet_h->crypto_in_use_config) { while (frag_idx < msgs_to_send) { clock_gettime(CLOCK_MONOTONIC, &start_time); if (crypto_encrypt_and_signv( knet_h, iov_out[frag_idx], *iovcnt_out, knet_h->send_to_links_buf_crypt[frag_idx], (ssize_t *)&outlen) < 0) { log_debug(knet_h, KNET_SUB_TX, "Unable to encrypt packet"); savederrno = ECHILD; err = -1; goto out; } clock_gettime(CLOCK_MONOTONIC, &end_time); timespec_diff(start_time, end_time, &crypt_time); stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex); if (stats_err < 0) { log_err(knet_h, KNET_SUB_TX, "Unable to get mutex lock: %s", strerror(stats_err)); err = -1; savederrno = stats_err; goto out; } if (crypt_time < knet_h->stats.tx_crypt_time_min) { knet_h->stats.tx_crypt_time_min = crypt_time; } if (crypt_time > knet_h->stats.tx_crypt_time_max) { knet_h->stats.tx_crypt_time_max = crypt_time; } knet_h->stats.tx_crypt_time_ave = (knet_h->stats.tx_crypt_time_ave * knet_h->stats.tx_crypt_packets + crypt_time) / (knet_h->stats.tx_crypt_packets+1); uncrypted_frag_size = 0; for (j=0; j < *iovcnt_out; j++) { uncrypted_frag_size += iov_out[frag_idx][j].iov_len; } knet_h->stats.tx_crypt_byte_overhead += (outlen - uncrypted_frag_size); knet_h->stats.tx_crypt_packets++; pthread_mutex_unlock(&knet_h->handle_stats_mutex); iov_out[frag_idx][0].iov_base = knet_h->send_to_links_buf_crypt[frag_idx]; iov_out[frag_idx][0].iov_len = outlen; frag_idx++; } *iovcnt_out = 1; } out: errno = savederrno; return err; } static int _get_tx_seq_num(knet_handle_t knet_h, seq_num_t *tx_seq_num) { int savederrno = 0; savederrno = pthread_mutex_lock(&knet_h->tx_seq_num_mutex); if (savederrno) { log_debug(knet_h, KNET_SUB_TX, "Unable to get seq mutex lock"); errno = savederrno; return -1; } knet_h->tx_seq_num++; /* * force seq_num 0 to detect a node that has crashed and rejoining * the knet instance. seq_num 0 will clear the buffers in the RX * thread */ if (knet_h->tx_seq_num == 0) { knet_h->tx_seq_num++; } /* * cache the value in locked context */ *tx_seq_num = knet_h->tx_seq_num; pthread_mutex_unlock(&knet_h->tx_seq_num_mutex); /* * forcefully broadcast a ping to all nodes every SEQ_MAX / 8 * pckts. * this solves 2 problems: * 1) on TX socket overloads we generate extra pings to keep links alive * 2) in 3+ nodes setup, where all the traffic is flowing between node 1 and 2, * node 3+ will be able to keep in sync on the TX seq_num even without * receiving traffic or pings in betweens. This avoids issues with * rollover of the circular buffer */ if (*tx_seq_num % (SEQ_MAX / 8) == 0) { _send_pings(knet_h, 0); } return 0; } static int _get_data_dests(knet_handle_t knet_h, unsigned char* data, size_t inlen, int8_t *channel, int *bcast, int *send_local, knet_node_id_t *dst_host_ids, size_t *dst_host_ids_entries, int is_sync) { int err = 0, savederrno = 0; knet_node_id_t dst_host_ids_temp[KNET_MAX_HOST]; /* store destinations from filter */ size_t dst_host_ids_entries_temp = 0; size_t dst_host_ids_entries_temp2 = 0; /* workaround gcc here */ struct knet_host *dst_host; size_t host_idx; memset(dst_host_ids_temp, 0, sizeof(dst_host_ids_temp)); if (knet_h->dst_host_filter_fn) { *bcast = knet_h->dst_host_filter_fn( knet_h->dst_host_filter_fn_private_data, data, inlen, KNET_NOTIFY_TX, knet_h->host_id, knet_h->host_id, channel, dst_host_ids_temp, &dst_host_ids_entries_temp); if (*bcast < 0) { log_debug(knet_h, KNET_SUB_TX, "Error from dst_host_filter_fn: %d", *bcast); savederrno = EFAULT; err = -1; goto out; } if ((!*bcast) && (!dst_host_ids_entries_temp)) { log_debug(knet_h, KNET_SUB_TX, "Message is unicast but no dst_host_ids_entries"); savederrno = EINVAL; err = -1; goto out; } if ((!*bcast) && (dst_host_ids_entries_temp > KNET_MAX_HOST)) { log_debug(knet_h, KNET_SUB_TX, "dst_host_filter_fn returned too many destinations"); savederrno = EINVAL; err = -1; goto out; } if (is_sync) { if ((*bcast) || ((!*bcast) && (dst_host_ids_entries_temp > 1))) { log_debug(knet_h, KNET_SUB_TX, "knet_send_sync is only supported with unicast packets for one destination"); savederrno = E2BIG; err = -1; goto out; } } } /* * check destinations hosts before spending time * in fragmenting/encrypting packets to save * time processing data for unreachable hosts. * for unicast, also remap the destination data * to skip unreachable hosts. */ if (!*bcast) { *dst_host_ids_entries = dst_host_ids_entries_temp2; for (host_idx = 0; host_idx < dst_host_ids_entries_temp; host_idx++) { dst_host = knet_h->host_index[dst_host_ids_temp[host_idx]]; if (!dst_host) { continue; } if ((dst_host->host_id == knet_h->host_id) && (knet_h->has_loop_link)) { *send_local = 1; } if (!((dst_host->host_id == knet_h->host_id) && (knet_h->has_loop_link)) && dst_host->status.reachable) { dst_host_ids[dst_host_ids_entries_temp2] = dst_host_ids_temp[host_idx]; dst_host_ids_entries_temp2++; } } if ((!dst_host_ids_entries_temp2) && (!*send_local)) { savederrno = EHOSTDOWN; err = -1; goto out; } *dst_host_ids_entries = dst_host_ids_entries_temp2; } else { *bcast = 0; *send_local = 0; for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) { if ((dst_host->host_id == knet_h->host_id) && (knet_h->has_loop_link)) { *send_local = 1; } if (!(dst_host->host_id == knet_h->host_id && knet_h->has_loop_link) && dst_host->status.reachable) { *bcast = 1; } } if ((!*bcast) && (!*send_local)) { savederrno = EHOSTDOWN; err = -1; goto out; } } out: errno = savederrno; return err; } static int _prep_and_send_msgs(knet_handle_t knet_h, int bcast, knet_node_id_t *dst_host_ids, size_t dst_host_ids_entries, int msgs_to_send, struct iovec iov_out[PCKT_FRAG_MAX][2], int iovcnt_out) { int err = 0, savederrno = 0; struct knet_host *dst_host; struct knet_mmsghdr msg[PCKT_FRAG_MAX]; int msg_idx; size_t host_idx; memset(&msg, 0, sizeof(msg)); msg_idx = 0; while (msg_idx < msgs_to_send) { msg[msg_idx].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage); /* this will set properly in _dispatch_to_links() */ msg[msg_idx].msg_hdr.msg_iov = &iov_out[msg_idx][0]; msg[msg_idx].msg_hdr.msg_iovlen = iovcnt_out; msg_idx++; } if (!bcast) { for (host_idx = 0; host_idx < dst_host_ids_entries; host_idx++) { dst_host = knet_h->host_index[dst_host_ids[host_idx]]; err = _dispatch_to_links(knet_h, dst_host, &msg[0], msgs_to_send); savederrno = errno; if (err) { goto out; } } } else { for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) { if (dst_host->status.reachable) { err = _dispatch_to_links(knet_h, dst_host, &msg[0], msgs_to_send); savederrno = errno; if (err) { goto out; } } } } out: errno = savederrno; return err; } static int _parse_recv_from_sock(knet_handle_t knet_h, size_t inlen, int8_t channel, uint8_t onwire_ver, int is_sync) { int err = 0, savederrno = 0; struct knet_header *inbuf = knet_h->recv_from_sock_buf; /* all TX packets are stored here regardless of the onwire */ unsigned char *data; /* onwire neutrual pointer to data to send */ int data_compressed = 0; /* track data compression to fill the header */ seq_num_t tx_seq_num; + uint32_t data_checksum = 0; /* used only for debugging at the moment */ int bcast = 1; /* assume all packets are to be broadcasted unless filter tells us differently */ knet_node_id_t dst_host_ids[KNET_MAX_HOST]; /* store destinations from filter */ size_t dst_host_ids_entries = 0; int send_local = 0; /* send packets to loopback */ struct iovec iov_out[PCKT_FRAG_MAX][2]; int iovcnt_out = 2; int msgs_to_send = 0; if (knet_h->enabled != 1) { log_debug(knet_h, KNET_SUB_TX, "Received data packet but forwarding is disabled"); savederrno = ECANCELED; err = -1; goto out; } if (knet_h->onwire_ver_remap) { data = get_data_v1(knet_h, inbuf); } else { switch (onwire_ver) { case 1: data = get_data_v1(knet_h, inbuf); break; default: /* this should never hit as filters are in place in the calling functions */ log_warn(knet_h, KNET_SUB_TX, "preparing data onwire version %u not supported", onwire_ver); savederrno = EINVAL; err = -1; goto out; break; } } +#ifdef ONWIRE_V1_EXTRA_DEBUG + data_checksum = compute_chksum(data, inlen); +#endif + memset(dst_host_ids, 0, sizeof(dst_host_ids)); err = _get_data_dests(knet_h, data, inlen, &channel, &bcast, &send_local, dst_host_ids, &dst_host_ids_entries, is_sync); if (err < 0) { savederrno = errno; goto out; } /* Send to localhost if appropriate and enabled */ if (send_local) { err = _dispatch_to_local(knet_h, data, inlen, channel); if (err < 0) { savederrno = errno; goto out; } } err = _compress_data(knet_h, data, &inlen, &data_compressed); if (err < 0) { savederrno = errno; goto out; } err = _get_tx_seq_num(knet_h, &tx_seq_num); if (err < 0) { savederrno = errno; goto out; } - err = _prep_tx_bufs(knet_h, inbuf, onwire_ver, data, inlen, tx_seq_num, channel, bcast, data_compressed, &msgs_to_send, iov_out, &iovcnt_out); + err = _prep_tx_bufs(knet_h, inbuf, onwire_ver, data, inlen, data_checksum, tx_seq_num, channel, bcast, data_compressed, &msgs_to_send, iov_out, &iovcnt_out); if (err < 0) { savederrno = errno; goto out; } err = _encrypt_bufs(knet_h, msgs_to_send, iov_out, &iovcnt_out); if (err < 0) { savederrno = errno; goto out; } err = _prep_and_send_msgs(knet_h, bcast, dst_host_ids, dst_host_ids_entries, msgs_to_send, iov_out, iovcnt_out); if (err < 0) { savederrno = errno; goto out; } out: errno = savederrno; return err; } static void _handle_send_to_links(knet_handle_t knet_h, int sockfd, uint8_t onwire_ver, int8_t channel) { ssize_t inlen = 0; int savederrno = 0, docallback = 0; struct iovec iov_in; struct msghdr msg; struct sockaddr_storage address; memset(&iov_in, 0, sizeof(iov_in)); if (knet_h->onwire_ver_remap) { iov_in.iov_base = (void *)get_data_v1(knet_h, knet_h->recv_from_sock_buf); iov_in.iov_len = KNET_MAX_PACKET_SIZE; } else { switch (onwire_ver) { case 1: iov_in.iov_base = (void *)get_data_v1(knet_h, knet_h->recv_from_sock_buf); iov_in.iov_len = KNET_MAX_PACKET_SIZE; break; default: log_warn(knet_h, KNET_SUB_TX, "preparing data onwire version %u not supported", onwire_ver); break; } } memset(&msg, 0, sizeof(struct msghdr)); msg.msg_name = &address; msg.msg_namelen = knet_h->knet_transport_fd_tracker[sockfd].sockaddr_len; msg.msg_iov = &iov_in; msg.msg_iovlen = 1; if ((channel >= 0) && (channel < KNET_DATAFD_MAX) && (!knet_h->sockfd[channel].is_socket)) { inlen = readv(sockfd, msg.msg_iov, 1); } else { inlen = recvmsg(sockfd, &msg, MSG_DONTWAIT | MSG_NOSIGNAL); if (msg.msg_flags & MSG_TRUNC) { log_warn(knet_h, KNET_SUB_TX, "Received truncated message from sock %d. Discarding", sockfd); return; } } if (inlen == 0) { savederrno = 0; docallback = 1; } else if (inlen < 0) { struct epoll_event ev; savederrno = errno; docallback = 1; memset(&ev, 0, sizeof(struct epoll_event)); if (epoll_ctl(knet_h->send_to_links_epollfd, EPOLL_CTL_DEL, knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], &ev)) { log_err(knet_h, KNET_SUB_TX, "Unable to del datafd %d from linkfd epoll pool: %s", knet_h->sockfd[channel].sockfd[0], strerror(savederrno)); } else { knet_h->sockfd[channel].has_error = 1; } } else { _parse_recv_from_sock(knet_h, inlen, channel, onwire_ver, 0); } if (docallback) { knet_h->sock_notify_fn(knet_h->sock_notify_fn_private_data, knet_h->sockfd[channel].sockfd[0], channel, KNET_NOTIFY_TX, inlen, savederrno); } } void *_handle_send_to_links_thread(void *data) { knet_handle_t knet_h = (knet_handle_t) data; struct epoll_event events[KNET_EPOLL_MAX_EVENTS]; int i, nev; int flush, flush_queue_limit; int8_t channel; uint8_t onwire_ver; set_thread_status(knet_h, KNET_THREAD_TX, KNET_THREAD_STARTED); memset(&events, 0, sizeof(events)); flush_queue_limit = 0; while (!shutdown_in_progress(knet_h)) { nev = epoll_wait(knet_h->send_to_links_epollfd, events, KNET_EPOLL_MAX_EVENTS + 1, knet_h->threads_timer_res / 1000); flush = get_thread_flush_queue(knet_h, KNET_THREAD_TX); /* * we use timeout to detect if thread is shutting down */ if (nev == 0) { /* * ideally we want to communicate that we are done flushing * the queue when we have an epoll timeout event */ if (flush == KNET_THREAD_QUEUE_FLUSH) { set_thread_flush_queue(knet_h, KNET_THREAD_TX, KNET_THREAD_QUEUE_FLUSHED); flush_queue_limit = 0; } continue; } /* * fall back in case the TX sockets will continue receive traffic * and we do not hit an epoll timeout. * * allow up to a 100 loops to flush queues, then we give up. * there might be more clean ways to do it by checking the buffer queue * on each socket, but we have tons of sockets and calculations can go wrong. * Also, why would you disable data forwarding and still send packets? */ if (flush == KNET_THREAD_QUEUE_FLUSH) { if (flush_queue_limit >= 100) { log_debug(knet_h, KNET_SUB_TX, "Timeout flushing the TX queue, expect packet loss"); set_thread_flush_queue(knet_h, KNET_THREAD_TX, KNET_THREAD_QUEUE_FLUSHED); flush_queue_limit = 0; } else { flush_queue_limit++; } } else { flush_queue_limit = 0; } if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) { log_debug(knet_h, KNET_SUB_TX, "Unable to get read lock"); continue; } if (pthread_mutex_lock(&knet_h->onwire_mutex)) { log_debug(knet_h, KNET_SUB_TX, "Unable to get onwire mutex lock"); goto out_unlock; } onwire_ver = knet_h->onwire_ver; pthread_mutex_unlock(&knet_h->onwire_mutex); for (i = 0; i < nev; i++) { for (channel = 0; channel < KNET_DATAFD_MAX; channel++) { if ((knet_h->sockfd[channel].in_use) && (knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created] == events[i].data.fd)) { break; } } if (channel >= KNET_DATAFD_MAX) { log_debug(knet_h, KNET_SUB_TX, "No available channels"); continue; /* channel not found */ } if (pthread_mutex_lock(&knet_h->tx_mutex) != 0) { log_debug(knet_h, KNET_SUB_TX, "Unable to get mutex lock"); continue; } _handle_send_to_links(knet_h, events[i].data.fd, onwire_ver, channel); pthread_mutex_unlock(&knet_h->tx_mutex); } out_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); } set_thread_status(knet_h, KNET_THREAD_TX, KNET_THREAD_STOPPED); return NULL; } int knet_send_sync(knet_handle_t knet_h, const char *buff, const size_t buff_len, const int8_t channel) { int savederrno = 0, err = 0; uint8_t onwire_ver; if (!_is_valid_handle(knet_h)) { return -1; } if (buff == NULL) { errno = EINVAL; return -1; } if (buff_len <= 0) { errno = EINVAL; return -1; } if (buff_len > KNET_MAX_PACKET_SIZE) { errno = EINVAL; return -1; } if (channel < 0) { errno = EINVAL; return -1; } if (channel >= KNET_DATAFD_MAX) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_TX, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } if (!knet_h->dst_host_filter_fn) { savederrno = ENETDOWN; err = -1; goto out; } if (!knet_h->sockfd[channel].in_use) { savederrno = EINVAL; err = -1; goto out; } if (pthread_mutex_lock(&knet_h->onwire_mutex)) { log_debug(knet_h, KNET_SUB_TX, "Unable to get onwire mutex lock"); goto out; } onwire_ver = knet_h->onwire_ver; pthread_mutex_unlock(&knet_h->onwire_mutex); savederrno = pthread_mutex_lock(&knet_h->tx_mutex); if (savederrno) { log_err(knet_h, KNET_SUB_TX, "Unable to get TX mutex lock: %s", strerror(savederrno)); err = -1; goto out; } if (knet_h->onwire_ver_remap) { memmove(get_data_v1(knet_h, knet_h->recv_from_sock_buf), buff, buff_len); } else { switch (onwire_ver) { case 1: memmove(get_data_v1(knet_h, knet_h->recv_from_sock_buf), buff, buff_len); break; default: log_warn(knet_h, KNET_SUB_TX, "preparing sync data onwire version %u not supported", onwire_ver); goto out_tx; break; } } err = _parse_recv_from_sock(knet_h, buff_len, channel, onwire_ver, 1); savederrno = errno; out_tx: pthread_mutex_unlock(&knet_h->tx_mutex); out: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = err ? savederrno : 0; return err; } ssize_t knet_send(knet_handle_t knet_h, const char *buff, const size_t buff_len, const int8_t channel) { int savederrno = 0; ssize_t err = 0; struct iovec iov_out[1]; if (!_is_valid_handle(knet_h)) { return -1; } if (buff == NULL) { errno = EINVAL; return -1; } if (buff_len <= 0) { errno = EINVAL; return -1; } if (buff_len > KNET_MAX_PACKET_SIZE) { errno = EINVAL; return -1; } if (channel < 0) { errno = EINVAL; return -1; } if (channel >= KNET_DATAFD_MAX) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } if (!knet_h->sockfd[channel].in_use) { savederrno = EINVAL; err = -1; goto out_unlock; } memset(iov_out, 0, sizeof(iov_out)); iov_out[0].iov_base = (void *)buff; iov_out[0].iov_len = buff_len; err = writev(knet_h->sockfd[channel].sockfd[0], iov_out, 1); savederrno = errno; out_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = err ? savederrno : 0; return err; }