Page MenuHomeClusterLabs Projects

No OneTemporary

diff --git a/configure.ac b/configure.ac
index fedb5db0..9bdcda0e 100644
--- a/configure.ac
+++ b/configure.ac
@@ -1,486 +1,488 @@
#
# Copyright (C) 2010-2017 Red Hat, Inc. All rights reserved.
#
# Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
# Federico Simoncelli <fsimon@kronosnet.org>
#
# This software licensed under GPL-2.0+, LGPL-2.0+
#
# -*- Autoconf -*-
# Process this file with autoconf to produce a configure script.
#
AC_PREREQ([2.63])
AC_INIT([kronosnet],
m4_esyscmd([build-aux/git-version-gen .tarball-version]),
[devel@lists.kronosnet.org])
AC_USE_SYSTEM_EXTENSIONS
AM_INIT_AUTOMAKE([1.11.1 dist-bzip2 dist-xz color-tests -Wno-portability subdir-objects])
LT_PREREQ([2.2.6])
LT_INIT
AC_CONFIG_MACRO_DIR([m4])
AC_CONFIG_SRCDIR([kronosnetd/main.c])
AC_CONFIG_HEADERS([config.h])
AC_CANONICAL_HOST
AC_PROG_LIBTOOL
AC_LANG([C])
systemddir=${prefix}/lib/systemd/system
if test "$prefix" = "NONE"; then
prefix="/usr"
if test "$localstatedir" = "\${prefix}/var"; then
localstatedir="/var"
fi
if test "$sysconfdir" = "\${prefix}/etc"; then
sysconfdir="/etc"
fi
if test "$systemddir" = "NONE/lib/systemd/system"; then
systemddir=/lib/systemd/system
fi
if test "$libdir" = "\${exec_prefix}/lib"; then
if test -e /usr/lib64; then
libdir="/usr/lib64"
else
libdir="/usr/lib"
fi
fi
fi
# Checks for programs.
if ! ${MAKE-make} --version /cannot/make/this >/dev/null 2>&1; then
AC_MSG_ERROR(["you don't seem to have GNU make; it is required"])
fi
AC_PROG_AWK
AC_PROG_GREP
AC_PROG_SED
AC_PROG_CPP
AC_PROG_CC
AC_PROG_CC_C99
if test "x$ac_cv_prog_cc_c99" = "xno"; then
AC_MSG_ERROR(["C99 support is required"])
fi
AC_PROG_LN_S
AC_PROG_INSTALL
AC_PROG_MAKE_SET
AC_PROG_CXX
AC_PROG_RANLIB
AC_CHECK_PROGS([PUBLICAN], [publican], [:])
AC_CHECK_PROGS([PKGCONFIG], [pkg-config])
AC_ARG_ENABLE([libknet-sctp],
[ --disable-libknet-sctp : disable libknet SCTP support ],,
[ enable_libknet_sctp="yes" ])
AM_CONDITIONAL([BUILDSCTP], [test x$enable_libknet_sctp = xyes])
AC_ARG_ENABLE([crypto-all],
[ --disable-crypto-all : disable libknet all crypto modules support ],,
[ enable_crypto_all="yes" ])
AC_ARG_ENABLE([crypto-nss],
[ --disable-crypto-nss : disable libknet nss support ],,
[ enable_crypto_nss="$enable_crypto_all" ])
AM_CONDITIONAL([BUILDCRYPTONSS], [test x$enable_crypto_nss = xyes])
AC_ARG_ENABLE([compress-all],
[ --disable-compress-all : disable libknet all compress modules support ],,
[ enable_compress_all="yes" ])
AC_ARG_ENABLE([compress-zlib],
[ --disable-compress-zlib : disable libknet zlib support ],,
[ enable_compress_zlib="$enable_compress_all" ])
AM_CONDITIONAL([BUILDCOMPZLIB], [test x$enable_compress_zlib = xyes])
AC_ARG_ENABLE([compress-lz4],
[ --disable-compress-lz4 : disable libknet lz4 support ],,
[ enable_compress_lz4="$enable_compress_all" ])
AM_CONDITIONAL([BUILDCOMPLZ4], [test x$enable_compress_lz4 = xyes])
AC_ARG_ENABLE([compress-lzo2],
[ --disable-compress-lzo2 : disable libknet lzo2 support ],,
[ enable_compress_lzo2="$enable_compress_all" ])
AM_CONDITIONAL([BUILDCOMPLZO2], [test x$enable_compress_lzo2 = xyes])
AC_ARG_ENABLE([compress-lzma],
[ --disable-compress-lzma : disable libknet lzma support ],,
[ enable_compress_lzma="$enable_compress_all" ])
AM_CONDITIONAL([BUILDCOMPLZMA], [test x$enable_compress_lzma = xyes])
AC_ARG_ENABLE([compress-bzip2],
[ --disable-compress-bzip2 : disable libknet bzip2 support ],,
[ enable_compress_bzip2="$enable_compress_all" ])
AM_CONDITIONAL([BUILDCOMPBZIP2], [test x$enable_compress_bzip2 = xyes])
AC_ARG_ENABLE([poc],
[ --disable-poc : disable building poc code ],,
[ enable_poc="yes" ])
AM_CONDITIONAL([BUILD_POC], [test x$enable_poc = xyes])
AC_ARG_ENABLE([kronosnetd],
[ --enable-kronosnetd : Kronosnetd support ],,
[ enable_kronosnetd="no" ])
AM_CONDITIONAL([BUILD_KRONOSNETD], [test x$enable_kronosnetd = xyes])
AC_ARG_ENABLE([libtap],
[ --enable-libtap : libtap support ],,
[ enable_libtap="no" ])
if test "x$enable_kronosnetd" = xyes; then
enable_libtap=yes
fi
AM_CONDITIONAL([BUILD_LIBTAP], [test x$enable_libtap = xyes])
## local helper functions
# this function checks whether CC supports the options passed as
# args. Global CFLAGS are ignored during this test.
# Usage: cc_supports_flag FLAG...
# Temporarily replaces CPPFLAGS with the given arguments and runs an empty
# program through the autoconf preprocessor test. Returns 0 when that test
# succeeds and 1 otherwise; CPPFLAGS is restored before returning.
cc_supports_flag() {
saveCPPFLAGS="$CPPFLAGS"
CPPFLAGS="$@"
# when CC looks like clang, -Werror is added for the probe
# (presumably so that unknown-option warnings become failures - confirm)
if echo $CC | grep -q clang; then
CPPFLAGS="-Werror $CPPFLAGS"
fi
AC_MSG_CHECKING([whether $CC supports "$@"])
AC_PREPROC_IFELSE([AC_LANG_PROGRAM([])],
[RC=0; AC_MSG_RESULT([yes])],
[RC=1; AC_MSG_RESULT([no])])
CPPFLAGS="$saveCPPFLAGS"
return $RC
}
# helper function to check libs without adding them to LIBS
# Usage: check_lib_no_libs LIBRARY FUNCTION OTHER-LIBRARIES...
# The first two arguments are handed to the library check as library and
# function name, any remaining ones as extra link libraries. LIBS is then
# reset from ac_check_lib_save_LIBS.
# NOTE(review): ac_check_lib_save_LIBS is presumably the value saved by the
# check itself before probing - confirm against the autoconf version in use.
check_lib_no_libs() {
lib_no_libs_arg1=$1
shift
lib_no_libs_arg2=$1
shift
lib_no_libs_args=$@
AC_CHECK_LIB([$lib_no_libs_arg1],
[$lib_no_libs_arg2],,,
[$lib_no_libs_args])
LIBS=$ac_check_lib_save_LIBS
}
# Checks for C features
AC_C_INLINE
# Checks for libraries.
AC_CHECK_LIB([pthread], [pthread_create])
AC_CHECK_LIB([m], [ceil])
AC_CHECK_LIB([rt], [clock_gettime])
+AC_SEARCH_LIBS([dlopen], [dl dld], [AC_SUBST([dl_LIBS], [$ac_res])], [AC_MSG_ERROR([unable to find the dlopen() function])])
+
# workaround pkg-config bootstrapping
PKG_CHECK_MODULES([foobarbaz],[foobarbaz], [AC_MSG_NOTICE([bootstrapping pkg-config])], [AC_MSG_NOTICE([bootstrapping pkg-config])])
# crypto libraries checks
if test "x$enable_crypto_nss" = xyes; then
PKG_CHECK_MODULES([nss],[nss])
AC_DEFINE_UNQUOTED([BUILDCRYPTONSS], [1], [Enable nss crypto])
fi
# compress libraries checks
if test "x$enable_compress_zlib" = xyes; then
PKG_CHECK_MODULES([zlib], [zlib])
AC_DEFINE_UNQUOTED([BUILDCOMPZLIB], [1], [Enable zlib compression])
fi
if test "x$enable_compress_lz4" = xyes; then
PKG_CHECK_MODULES([liblz4], [liblz4])
AC_DEFINE_UNQUOTED([BUILDCOMPLZ4], [1], [Enable lz4 compress])
fi
if test "x$enable_compress_lzo2" = xyes; then
PKG_CHECK_MODULES([lzo2], [lzo2],,
[AC_CHECK_HEADERS([lzo/lzo1x.h],
[AC_CHECK_LIB([lzo2], [lzo1x_decompress_safe],
[AC_SUBST([lzo2_LIBS], [-llzo2])])],
[AC_MSG_ERROR(["missing required lzo/lzo1x.h header"])])])
AC_DEFINE_UNQUOTED([BUILDCOMPLZO2], [1], [Enable lzo2 compress])
fi
if test "x$enable_compress_lzma" = xyes; then
PKG_CHECK_MODULES([liblzma], [liblzma])
AC_DEFINE_UNQUOTED([BUILDCOMPLZMA], [1], [Enable lzma compress])
fi
if test "x$enable_compress_bzip2" = xyes; then
PKG_CHECK_MODULES([bzip2], [bzip2],,
[AC_CHECK_HEADERS([bzlib.h],
[AC_CHECK_LIB([bz2], [BZ2_bzBuffToBuffCompress],
[AC_SUBST([bzip2_LIBS], [-lbz2])])],
[AC_MSG_ERROR(["missing required bzlib.h"])])])
AC_DEFINE_UNQUOTED([BUILDCOMPBZIP2], [1], [Enable bzip2 compress])
fi
# Checks for header files.
AC_CHECK_HEADERS([fcntl.h])
AC_CHECK_HEADERS([stdlib.h])
AC_CHECK_HEADERS([string.h])
AC_CHECK_HEADERS([strings.h])
AC_CHECK_HEADERS([sys/ioctl.h])
AC_CHECK_HEADERS([syslog.h])
AC_CHECK_HEADERS([unistd.h])
AC_CHECK_HEADERS([netinet/in.h])
AC_CHECK_HEADERS([sys/socket.h])
AC_CHECK_HEADERS([arpa/inet.h])
AC_CHECK_HEADERS([netdb.h])
AC_CHECK_HEADERS([limits.h])
AC_CHECK_HEADERS([stdint.h])
AC_CHECK_HEADERS([sys/epoll.h])
if test "x$enable_libknet_sctp" = xyes; then
AC_CHECK_HEADERS([netinet/sctp.h],, [AC_MSG_ERROR(["missing required SCTP headers"])])
fi
# Checks for typedefs, structures, and compiler characteristics.
AC_C_INLINE
AC_TYPE_SIZE_T
AC_TYPE_PID_T
AC_TYPE_SSIZE_T
AC_TYPE_UINT8_T
AC_TYPE_UINT16_T
AC_TYPE_UINT32_T
AC_TYPE_UINT64_T
AC_TYPE_INT32_T
# Checks for library functions.
AC_FUNC_ALLOCA
AC_FUNC_FORK
AC_FUNC_MALLOC
AC_FUNC_REALLOC
AC_CHECK_FUNCS([memset])
AC_CHECK_FUNCS([strdup])
AC_CHECK_FUNCS([strerror])
AC_CHECK_FUNCS([dup2])
AC_CHECK_FUNCS([select])
AC_CHECK_FUNCS([socket])
AC_CHECK_FUNCS([inet_ntoa])
AC_CHECK_FUNCS([memmove])
AC_CHECK_FUNCS([strchr])
AC_CHECK_FUNCS([atexit])
AC_CHECK_FUNCS([ftruncate])
AC_CHECK_FUNCS([strrchr])
AC_CHECK_FUNCS([strstr])
AC_CHECK_FUNCS([clock_gettime])
AC_CHECK_FUNCS([strcasecmp])
AC_CHECK_FUNCS([kevent])
# if neither sys/epoll.h nor kevent are present, we should fail.
if test "x$ac_cv_header_sys_epoll_h" = xno && test "x$ac_cv_func_kevent" = xno; then
AC_MSG_ERROR([Both epoll and kevent unavailable on this OS])
fi
if test "x$ac_cv_header_sys_epoll_h" = xyes && test "x$ac_cv_func_kevent" = xyes; then
AC_MSG_ERROR([Both epoll and kevent available on this OS, please contact the maintainers to fix the code])
fi
# checks (for kronosnetd)
if test "x$enable_kronosnetd" = xyes; then
AC_CHECK_HEADERS([security/pam_appl.h],
[AC_CHECK_LIB([pam], [pam_start])],
[AC_MSG_ERROR([Unable to find LinuxPAM devel files])])
AC_CHECK_HEADERS([security/pam_misc.h],
[AC_CHECK_LIB([pam_misc], [misc_conv])],
[AC_MSG_ERROR([Unable to find LinuxPAM MISC devel files])])
PKG_CHECK_MODULES([libqb], [libqb])
AC_CHECK_LIB([qb], [qb_log_thread_priority_set],
[have_qb_log_thread_priority_set="yes"],
[have_qb_log_thread_priority_set="no"])
if test "x${have_qb_log_thread_priority_set}" = xyes; then
AC_DEFINE_UNQUOTED([HAVE_QB_LOG_THREAD_PRIORITY_SET], [1], [have qb_log_thread_priority_set])
fi
fi
# local options
AC_ARG_ENABLE([debug],
[ --enable-debug enable debug build. ],
[ default="no" ])
AC_ARG_ENABLE([publicandocs],
[ --enable-publicandocs enable docs build. ],
[ default="no" ])
AC_ARG_WITH([initdefaultdir],
[ --with-initdefaultdir : path to /etc/sysconfig/.. or /etc/default dir. ],
[ INITDEFAULTDIR="$withval" ],
[ INITDEFAULTDIR="$sysconfdir/default" ])
AC_ARG_WITH([initddir],
[ --with-initddir=DIR : path to init script directory. ],
[ INITDDIR="$withval" ],
[ INITDDIR="$sysconfdir/init.d" ])
AC_ARG_WITH([systemddir],
[ --with-systemddir=DIR : path to systemd unit files directory. ],
[ SYSTEMDDIR="$withval" ],
[ SYSTEMDDIR="$systemddir" ])
AC_ARG_WITH([syslogfacility],
[ --with-syslogfacility=FACILITY
default syslog facility. ],
[ SYSLOGFACILITY="$withval" ],
[ SYSLOGFACILITY="LOG_DAEMON" ])
AC_ARG_WITH([sysloglevel],
[ --with-sysloglevel=LEVEL
default syslog level. ],
[ SYSLOGLEVEL="$withval" ],
[ SYSLOGLEVEL="LOG_INFO" ])
AC_ARG_WITH([defaultadmgroup],
[ --with-defaultadmgroup=GROUP
define PAM group. Users part of this group will be
allowed to configure kronosnet. Others will only
receive read-only rights. ],
[ DEFAULTADMGROUP="$withval" ],
[ DEFAULTADMGROUP="kronosnetadm" ])
## random vars
LOGDIR=${localstatedir}/log/
RUNDIR=${localstatedir}/run/
DEFAULT_CONFIG_DIR=${sysconfdir}/kronosnet
## do subst
AM_CONDITIONAL([BUILD_DOCS], [test "x${enable_publicandocs}" = xyes])
AM_CONDITIONAL([DEBUG], [test "x${enable_debug}" = xyes])
AC_SUBST([DEFAULT_CONFIG_DIR])
AC_SUBST([INITDEFAULTDIR])
AC_SUBST([INITDDIR])
AC_SUBST([SYSTEMDDIR])
AC_SUBST([LOGDIR])
AC_SUBST([DEFAULTADMGROUP])
AC_DEFINE_UNQUOTED([DEFAULT_CONFIG_DIR],
["$(eval echo ${DEFAULT_CONFIG_DIR})"],
[Default config directory])
AC_DEFINE_UNQUOTED([DEFAULT_CONFIG_FILE],
["$(eval echo ${DEFAULT_CONFIG_DIR}/kronosnetd.conf)"],
[Default config file])
AC_DEFINE_UNQUOTED([LOGDIR],
["$(eval echo ${LOGDIR})"],
[Default logging directory])
AC_DEFINE_UNQUOTED([DEFAULT_LOG_FILE],
["$(eval echo ${LOGDIR}/kronosnetd.log)"],
[Default log file])
AC_DEFINE_UNQUOTED([RUNDIR],
["$(eval echo ${RUNDIR})"],
[Default run directory])
AC_DEFINE_UNQUOTED([SYSLOGFACILITY],
[$(eval echo ${SYSLOGFACILITY})],
[Default syslog facility])
AC_DEFINE_UNQUOTED([SYSLOGLEVEL],
[$(eval echo ${SYSLOGLEVEL})],
[Default syslog level])
AC_DEFINE_UNQUOTED([DEFAULTADMGROUP],
["$(eval echo ${DEFAULTADMGROUP})"],
[Default admin group])
## *FLAGS handling
ENV_CFLAGS="$CFLAGS"
ENV_CPPFLAGS="$CPPFLAGS"
ENV_LDFLAGS="$LDFLAGS"
# debug build stuff
if test "x${enable_debug}" = xyes; then
AC_DEFINE_UNQUOTED([DEBUG], [1], [Compiling Debugging code])
OPT_CFLAGS="-O0"
else
OPT_CFLAGS="-O3"
fi
# gdb flags
if test "x${GCC}" = xyes; then
GDB_FLAGS="-ggdb3"
else
GDB_FLAGS="-g"
fi
# extra warnings
# Each WARNLIST entry is tried as -W<entry> through cc_supports_flag and is
# appended to EXTRA_WARNINGS only when that probe succeeds.
# NOTE(review): "retundent-decls" below looks like a typo for
# "redundant-decls"; as spelled the probe presumably always fails, so that
# warning is never enabled. The list also enables -Werror, so confirm the
# tree builds cleanly with -Wredundant-decls before correcting the spelling.
EXTRA_WARNINGS=""
WARNLIST="
all
extra
unused
shadow
missing-prototypes
missing-declarations
suggest-attribute=noreturn
suggest-attribute=format
strict-prototypes
declaration-after-statement
pointer-arith
write-strings
cast-align
bad-function-cast
missing-format-attribute
float-equal
format=2
format-signedness
format-security
format-nonliteral
no-long-long
unsigned-char
gnu89-inline
no-strict-aliasing
error
address
cpp
overflow
parentheses
sequence-point
switch
shift-overflow=2
overlength-strings
retundent-decls
init-self
uninitialized
unused-but-set-variable
unused-function
unused-result
unused-value
unused-variable
unknown-pragmas
no-unused-parameter
"
for j in $WARNLIST; do
if cc_supports_flag -W$j; then
EXTRA_WARNINGS="$EXTRA_WARNINGS -W$j";
fi
done
CFLAGS="$ENV_CFLAGS $lt_prog_compiler_pic $OPT_CFLAGS $GDB_FLAGS \
$EXTRA_WARNINGS $WERROR_CFLAGS"
CPPFLAGS="$ENV_CPPFLAGS"
LDFLAGS="$ENV_LDFLAGS $lt_prog_compiler_pic -Wl,--as-needed"
AC_CONFIG_FILES([
Makefile
init/Makefile
libtap/Makefile
libtap/libtap.pc
kronosnetd/Makefile
kronosnetd/kronosnetd.logrotate
libknet/Makefile
libknet/libknet.pc
libknet/tests/Makefile
docs/Makefile
poc-code/Makefile
poc-code/iov-hash/Makefile
poc-code/access-list/Makefile
])
AC_OUTPUT
diff --git a/libknet/Makefile.am b/libknet/Makefile.am
index bd0ead5a..334b8818 100644
--- a/libknet/Makefile.am
+++ b/libknet/Makefile.am
@@ -1,98 +1,98 @@
#
# Copyright (C) 2010-2015 Red Hat, Inc. All rights reserved.
#
# Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
# Federico Simoncelli <fsimon@kronosnet.org>
#
# This software licensed under GPL-2.0+, LGPL-2.0+
#
MAINTAINERCLEANFILES = Makefile.in
include $(top_srcdir)/build-aux/check.mk
SYMFILE = libknet_exported_syms
EXTRA_DIST = $(SYMFILE)
SUBDIRS = . tests
libversion = 0:0:0
# override global LIBS that pulls in lots of cruft we don't need here
LIBS =
sources = \
common.c \
compat.c \
compress.c \
compress_zlib.c \
compress_lz4.c \
compress_lzo2.c \
compress_lzma.c \
compress_bzip2.c \
crypto.c \
crypto_nss.c \
handle.c \
host.c \
link.c \
logging.c \
netutils.c \
threads_common.c \
threads_dsthandler.c \
threads_heartbeat.c \
threads_pmtud.c \
threads_rx.c \
threads_tx.c \
transport_udp.c \
transport_sctp.c \
transport_loopback.c \
transport_common.c
include_HEADERS = libknet.h
pkgconfigdir = $(libdir)/pkgconfig
pkgconfig_DATA = libknet.pc
noinst_HEADERS = \
common.h \
compat.h \
compress.h \
compress_zlib.h \
compress_lz4.h \
compress_lzo2.h \
compress_lzma.h \
compress_bzip2.h \
crypto.h \
crypto_nss.h \
host.h \
internals.h \
link.h \
logging.h \
netutils.h \
onwire.h \
threads_common.h \
threads_dsthandler.h \
threads_heartbeat.h \
threads_pmtud.h \
threads_rx.h \
threads_tx.h \
transports.h
lib_LTLIBRARIES = libknet.la
libknet_la_SOURCES = $(sources)
libknet_la_CFLAGS = $(nss_CFLAGS) \
$(zlib_CFLAGS) $(liblz4_CFLAGS) $(lzo2_CFLAGS) $(liblzma_CFLAGS) $(bzip2_CFLAGS)
EXTRA_libknet_la_DEPENDENCIES = $(SYMFILE)
libknet_la_LDFLAGS = -Wl,--version-script=$(srcdir)/$(SYMFILE) \
--export-dynamic \
-version-number $(libversion)
-libknet_la_LIBADD = $(nss_LIBS) \
- $(zlib_LIBS) $(liblz4_LIBS) $(lzo2_LIBS) $(liblzma_LIBS) $(bzip2_LIBS) \
+libknet_la_LIBADD = $(dl_LIBS) \
+ $(nss_LIBS) $(zlib_LIBS) $(liblz4_LIBS) $(liblzma_LIBS) $(bzip2_LIBS) \
-lrt -lpthread -lm
diff --git a/libknet/compress.c b/libknet/compress.c
index 0250ad92..90070012 100644
--- a/libknet/compress.c
+++ b/libknet/compress.c
@@ -1,337 +1,336 @@
/*
* Copyright (C) 2010-2017 Red Hat, Inc. All rights reserved.
*
* Author: Fabio M. Di Nitto <fabbione@kronosnet.org>
*
* This software licensed under GPL-2.0+, LGPL-2.0+
*/
#include "config.h"
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <pthread.h>
#include "internals.h"
#include "compress.h"
#include "logging.h"
#ifdef BUILDCOMPZLIB
#include "compress_zlib.h"
#endif
#ifdef BUILDCOMPLZ4
#include "compress_lz4.h"
#endif
#ifdef BUILDCOMPLZO2
#include "compress_lzo2.h"
#endif
#ifdef BUILDCOMPLZMA
#include "compress_lzma.h"
#endif
#ifdef BUILDCOMPBZIP2
#include "compress_bzip2.h"
#endif
/*
* internal module switch data
*/
/*
* DO NOT CHANGE MODEL_ID HERE OR ONWIRE COMPATIBILITY
* WILL BREAK!
*
* always add before the last NULL/NULL/NULL.
*/
compress_model_t compress_modules_cmds[] = {
{ 0, 1, 0, "none", NULL, NULL, NULL, NULL, NULL },
#ifdef BUILDCOMPZLIB
{ 1, 1, 0, "zlib", NULL, NULL, zlib_val_level, zlib_compress, zlib_decompress },
#else
{ 1, 0, 0, "zlib", NULL, NULL, NULL, NULL, NULL },
#endif
#ifdef BUILDCOMPLZ4
{ 2, 1, 0, "lz4", NULL, NULL, lz4_val_level, lz4_compress, lz4_decompress },
{ 3, 1, 0, "lz4hc", NULL, NULL, lz4hc_val_level, lz4hc_compress, lz4_decompress },
#else
{ 2, 0, 0, "lz4", NULL, NULL, NULL, NULL, NULL },
{ 3, 0, 0, "lz4hc", NULL, NULL, NULL, NULL, NULL },
#endif
#ifdef BUILDCOMPLZO2
{ 4, 1, 0, "lzo2", lzo2_init, lzo2_fini, lzo2_val_level, lzo2_compress, lzo2_decompress },
#else
{ 4, 0, 0, "lzo2", NULL, NULL, NULL, NULL, NULL },
#endif
#ifdef BUILDCOMPLZMA
{ 5, 1, 0, "lzma", NULL, NULL, lzma_val_level, lzma_compress, lzma_decompress },
#else
{ 5, 0, 0, "lzma", NULL, NULL, NULL, NULL, NULL },
#endif
#ifdef BUILDCOMPBZIP2
{ 6, 1, 0, "bzip2", NULL, NULL, bzip2_val_level, bzip2_compress, bzip2_decompress },
#else
{ 6, 0, 0, "bzip2", NULL, NULL, NULL, NULL, NULL },
#endif
{ 255, 0, 0, NULL, NULL, NULL, NULL, NULL, NULL },
};
static int get_model(const char *model)
{
int idx = 0;
while (compress_modules_cmds[idx].model_name != NULL) {
if (!strcmp(compress_modules_cmds[idx].model_name, model)) {
return compress_modules_cmds[idx].model_id;
}
idx++;
}
return -1;
}
/*
 * Return the index of the last named entry in compress_modules_cmds,
 * i.e. the number of entries preceding the NULL-named terminator minus one.
 */
static int get_max_model(void)
{
	int count;

	for (count = 0; compress_modules_cmds[count].model_name != NULL; count++) {
		/* nothing to do, just counting the named entries */
	}

	return count - 1;
}
static int is_valid_model(int compress_model)
{
int idx = 0;
while (compress_modules_cmds[idx].model_name != NULL) {
if ((compress_model == compress_modules_cmds[idx].model_id) &&
(compress_modules_cmds[idx].built_in == 1)) {
return 0;
}
idx++;
}
return -1;
}
/*
 * Forward compress_level to the val_level callback of the module stored at
 * index compress_model and return whatever that callback returns.
 *
 * Neither the index nor the callback pointer is checked here; the caller is
 * responsible for passing a model whose val_level callback is set.
 */
static int val_level(
	knet_handle_t knet_h,
	int compress_model,
	int compress_level)
{
	const compress_model_t *module = &compress_modules_cmds[compress_model];

	return module->val_level(knet_h, compress_level);
}
/*
 * Make sure the compress module at index cmp_model has been initialized,
 * taking shlib_rwlock in the process.
 *
 * Return contract, as implemented below:
 *   0 - the module is marked loaded and shlib_rwlock is STILL HELD on
 *       return (read lock if the module was already loaded on entry, write
 *       lock otherwise). The caller must pthread_rwlock_unlock() it.
 *  -1 - failure; shlib_rwlock is NOT held on return. errno is set here when
 *       a lock could not be taken; when the module init callback fails this
 *       function does not set errno itself.
 *
 * cmp_model is used directly as an index into compress_modules_cmds and is
 * not range checked here.
 */
static int check_init_lib(knet_handle_t knet_h, int cmp_model)
{
int savederrno = 0;
savederrno = pthread_rwlock_rdlock(&shlib_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_COMPRESS, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
/*
 * if the module is already loaded, we will return
 * and keep the lock to avoid any race condition
 * on other threads potentially unloading or reloading
 */
if (compress_modules_cmds[cmp_model].loaded == 1) {
return 0;
}
/*
 * need to switch to write lock, load the lib, and return with a write lock
 */
pthread_rwlock_unlock(&shlib_rwlock);
savederrno = pthread_rwlock_wrlock(&shlib_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_COMPRESS, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
/*
 * check again after changing locking context
 */
if (compress_modules_cmds[cmp_model].loaded == 0) {
/* modules without an init callback are simply marked as loaded */
if (compress_modules_cmds[cmp_model].init != NULL) {
if (compress_modules_cmds[cmp_model].init(knet_h, cmp_model) < 0) {
/* failure path: the write lock is dropped before returning */
pthread_rwlock_unlock(&shlib_rwlock);
return -1;
}
}
compress_modules_cmds[cmp_model].loaded = 1;
}
return 0;
}
/*
 * Sanity check of the static module table: fails with -1/EINVAL when the
 * index of the last named entry in compress_modules_cmds is greater than
 * KNET_MAX_COMPRESS_METHODS, returns 0 otherwise.
 */
int compress_init(
	knet_handle_t knet_h)
{
	if (get_max_model() <= KNET_MAX_COMPRESS_METHODS) {
		return 0;
	}

	log_err(knet_h, KNET_SUB_COMPRESS, "Too many compress methods defined in compress.c.");
	errno = EINVAL;
	return -1;
}
/*
 * Apply a compression configuration to a handle.
 *
 * knet_handle_compress_cfg supplies the model name, level and threshold.
 * The model name is resolved through get_model(); for any model other than
 * id 0 the module must be built in, is initialized on demand through
 * check_init_lib(), and the level and threshold are validated. A threshold
 * of 0 selects KNET_COMPRESS_THRESHOLD.
 *
 * Returns 0 on success, after storing model, level and threshold in knet_h.
 * Returns -1 with errno set on failure, leaving the model and level stored
 * in knet_h untouched.
 */
int compress_cfg(
	knet_handle_t knet_h,
	struct knet_handle_compress_cfg *knet_handle_compress_cfg)
{
	int savederrno = 0, err = 0;
	int cmp_model;

	log_debug(knet_h, KNET_SUB_COMPRESS,
		  "Initizializing compress module [%s/%d/%u]",
		  knet_handle_compress_cfg->compress_model, knet_handle_compress_cfg->compress_level, knet_handle_compress_cfg->compress_threshold);

	cmp_model = get_model(knet_handle_compress_cfg->compress_model);
	if (cmp_model < 0) {
		log_err(knet_h, KNET_SUB_COMPRESS, "compress model %s not supported", knet_handle_compress_cfg->compress_model);
		errno = EINVAL;
		return -1;
	}

	if (cmp_model > 0) {
		if (compress_modules_cmds[cmp_model].built_in == 0) {
			log_err(knet_h, KNET_SUB_COMPRESS, "compress model %s support has not been built in. Please contact your vendor or fix the build", knet_handle_compress_cfg->compress_model);
			savederrno = EINVAL;
			err = -1;
			goto out;
		}

		if (check_init_lib(knet_h, cmp_model) < 0) {
			savederrno = errno;
			log_err(knet_h, KNET_SUB_COMPRESS, "Unable to load/init shared lib for model %s: %s",
				knet_handle_compress_cfg->compress_model, strerror(savederrno));
			err = -1;
			/*
			 * check_init_lib() returns failure with shlib_rwlock
			 * NOT held, so the unlock at out_unlock must be
			 * skipped: unlocking a rwlock this thread does not
			 * hold is undefined behaviour.
			 */
			goto out;
		}

		/* from here on shlib_rwlock is held and must be released */

		if (val_level(knet_h, cmp_model, knet_handle_compress_cfg->compress_level) < 0) {
			log_err(knet_h, KNET_SUB_COMPRESS, "compress level %d not supported for model %s",
				knet_handle_compress_cfg->compress_level, knet_handle_compress_cfg->compress_model);
			savederrno = EINVAL;
			err = -1;
			goto out_unlock;
		}

		if (knet_handle_compress_cfg->compress_threshold > KNET_MAX_PACKET_SIZE) {
			log_err(knet_h, KNET_SUB_COMPRESS, "compress threshold cannot be higher than KNET_MAX_PACKET_SIZE (%d).",
				KNET_MAX_PACKET_SIZE);
			savederrno = EINVAL;
			err = -1;
			goto out_unlock;
		}

		if (knet_handle_compress_cfg->compress_threshold == 0) {
			knet_h->compress_threshold = KNET_COMPRESS_THRESHOLD;
			log_debug(knet_h, KNET_SUB_COMPRESS, "resetting compression threshold to default (%d)", KNET_COMPRESS_THRESHOLD);
		} else {
			knet_h->compress_threshold = knet_handle_compress_cfg->compress_threshold;
		}

out_unlock:
		pthread_rwlock_unlock(&shlib_rwlock);
	}

out:
	if (!err) {
		knet_h->compress_model = cmp_model;
		knet_h->compress_level = knet_handle_compress_cfg->compress_level;
	}

	errno = savederrno;
	return err;
}
void compress_fini(
- knet_handle_t knet_h,
- int knet_ref)
+ knet_handle_t knet_h)
{
int savederrno;
int idx = 0;
savederrno = pthread_rwlock_wrlock(&shlib_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_COMPRESS, "Unable to get write lock: %s",
strerror(savederrno));
return;
}
while (compress_modules_cmds[idx].model_name != NULL) {
if ((compress_modules_cmds[idx].built_in == 1) &&
(compress_modules_cmds[idx].loaded == 1) &&
(idx < KNET_MAX_COMPRESS_METHODS)) {
if (compress_modules_cmds[idx].fini != NULL) {
- compress_modules_cmds[idx].fini(knet_h, idx, knet_ref);
+ compress_modules_cmds[idx].fini(knet_h, idx);
}
compress_modules_cmds[idx].loaded = 0;
}
idx++;
}
pthread_rwlock_unlock(&shlib_rwlock);
return;
}
/*
 * Compress buf_in (buf_in_len bytes) into buf_out using the model currently
 * configured on knet_h; the module reports the result size through
 * buf_out_len.
 *
 * The module is initialized on demand through check_init_lib(), which on
 * success leaves shlib_rwlock held; it is released here once the module's
 * compress callback has returned.
 *
 * Returns the callback's return value with its errno preserved, or -1 with
 * errno set when the module could not be loaded/initialized.
 */
int compress(
	knet_handle_t knet_h,
	const unsigned char *buf_in,
	const ssize_t buf_in_len,
	unsigned char *buf_out,
	ssize_t *buf_out_len)
{
	int savederrno = 0, err = 0;

	if (check_init_lib(knet_h, knet_h->compress_model) < 0) {
		savederrno = errno;
		log_err(knet_h, KNET_SUB_COMPRESS, "Unable to load/init shared lib (compress) for model %s: %s",
			compress_modules_cmds[knet_h->compress_model].model_name, strerror(savederrno));
		/* logging may have modified errno: hand the original back */
		errno = savederrno;
		return -1;
	}

	err = compress_modules_cmds[knet_h->compress_model].compress(knet_h, buf_in, buf_in_len, buf_out, buf_out_len);
	savederrno = errno;

	pthread_rwlock_unlock(&shlib_rwlock);

	errno = savederrno;
	return err;
}
/*
 * Decompress buf_in (buf_in_len bytes) into buf_out with the module
 * identified by compress_model; the module reports the result size through
 * buf_out_len.
 *
 * compress_model is validated with is_valid_model() before it is used as an
 * index into compress_modules_cmds. The module is initialized on demand
 * through check_init_lib(), which on success leaves shlib_rwlock held; it
 * is released here once the module's decompress callback has returned.
 *
 * Returns the callback's return value with its errno preserved, -1/EINVAL
 * for an unknown or not built-in model, or -1 with errno set when the
 * module could not be loaded/initialized.
 */
int decompress(
	knet_handle_t knet_h,
	int compress_model,
	const unsigned char *buf_in,
	const ssize_t buf_in_len,
	unsigned char *buf_out,
	ssize_t *buf_out_len)
{
	int savederrno = 0, err = 0;

	if (is_valid_model(compress_model) < 0) {
		/*
		 * compress_model failed validation, so it must not be used
		 * as an array index: search the table for a name instead and
		 * fall back to a placeholder for ids that are not listed.
		 */
		const char *model_name = "unknown";
		int idx;

		for (idx = 0; compress_modules_cmds[idx].model_name != NULL; idx++) {
			if (compress_modules_cmds[idx].model_id == compress_model) {
				model_name = compress_modules_cmds[idx].model_name;
				break;
			}
		}

		log_err(knet_h, KNET_SUB_COMPRESS, "This build of libknet does not support %s compression. Please contact your distribution vendor or fix the build", model_name);
		errno = EINVAL;
		return -1;
	}

	if (check_init_lib(knet_h, compress_model) < 0) {
		savederrno = errno;
		log_err(knet_h, KNET_SUB_COMPRESS, "Unable to load/init shared lib (decompress) for model %s: %s",
			compress_modules_cmds[compress_model].model_name, strerror(savederrno));
		/* logging may have modified errno: hand the original back */
		errno = savederrno;
		return -1;
	}

	err = compress_modules_cmds[compress_model].decompress(knet_h, buf_in, buf_in_len, buf_out, buf_out_len);
	savederrno = errno;

	pthread_rwlock_unlock(&shlib_rwlock);

	errno = savederrno;
	return err;
}
diff --git a/libknet/compress.h b/libknet/compress.h
index 9d53f54b..f9fe7c44 100644
--- a/libknet/compress.h
+++ b/libknet/compress.h
@@ -1,61 +1,60 @@
/*
* Copyright (C) 2010-2017 Red Hat, Inc. All rights reserved.
*
* Author: Fabio M. Di Nitto <fabbione@kronosnet.org>
*
* This software licensed under GPL-2.0+, LGPL-2.0+
*/
#ifndef __KNET_COMPRESS_H__
#define __KNET_COMPRESS_H__
#include "internals.h"
typedef struct {
uint8_t model_id;
uint8_t built_in;
uint8_t loaded;
const char *model_name;
int (*init) (knet_handle_t knet_h, int method_idx);
- void (*fini) (knet_handle_t knet_h, int method_idx, int knet_ref);
+ void (*fini) (knet_handle_t knet_h, int method_idx);
int (*val_level)(knet_handle_t knet_h,
int compress_level);
int (*compress) (knet_handle_t knet_h,
const unsigned char *buf_in,
const ssize_t buf_in_len,
unsigned char *buf_out,
ssize_t *buf_out_len);
int (*decompress)(knet_handle_t knet_h,
const unsigned char *buf_in,
const ssize_t buf_in_len,
unsigned char *buf_out,
ssize_t *buf_out_len);
} compress_model_t;
int compress_cfg(
knet_handle_t knet_h,
struct knet_handle_compress_cfg *knet_handle_compress_cfg);
int compress_init(
knet_handle_t knet_h);
void compress_fini(
- knet_handle_t knet_h,
- int knet_ref);
+ knet_handle_t knet_h);
int compress(
knet_handle_t knet_h,
const unsigned char *buf_in,
const ssize_t buf_in_len,
unsigned char *buf_out,
ssize_t *buf_out_len);
int decompress(
knet_handle_t knet_h,
int compress_model,
const unsigned char *buf_in,
const ssize_t buf_in_len,
unsigned char *buf_out,
ssize_t *buf_out_len);
#endif
diff --git a/libknet/compress_lzo2.c b/libknet/compress_lzo2.c
index e163a19e..19048e9c 100644
--- a/libknet/compress_lzo2.c
+++ b/libknet/compress_lzo2.c
@@ -1,155 +1,305 @@
/*
* Copyright (C) 2010-2017 Red Hat, Inc. All rights reserved.
*
* Author: Fabio M. Di Nitto <fabbione@kronosnet.org>
*
* This software licensed under GPL-2.0+, LGPL-2.0+
*/
#include "config.h"
#include <stdlib.h>
#include <string.h>
#include <errno.h>
+#include <dlfcn.h>
#ifdef BUILDCOMPLZO2
#include <lzo/lzo1x.h>
#include "internals.h"
#include "compress_lzo2.h"
#include "logging.h"
+/*
+ * global vars for dlopen
+ */
+static int lzo2_libref = 0;
+
+/*
+ * symbols remapping
+ */
+int (*_int_lzo1x_decompress)(const lzo_bytep src, lzo_uint src_len,
+ lzo_bytep dst, lzo_uintp dst_len,
+ lzo_voidp wrkmem /* NOT USED */ );
+int (*_int_lzo1x_1_compress)(const lzo_bytep src, lzo_uint src_len,
+ lzo_bytep dst, lzo_uintp dst_len,
+ lzo_voidp wrkmem);
+int (*_int_lzo1x_1_11_compress)(const lzo_bytep src, lzo_uint src_len,
+ lzo_bytep dst, lzo_uintp dst_len,
+ lzo_voidp wrkmem);
+int (*_int_lzo1x_1_12_compress)(const lzo_bytep src, lzo_uint src_len,
+ lzo_bytep dst, lzo_uintp dst_len,
+ lzo_voidp wrkmem);
+int (*_int_lzo1x_1_15_compress)(const lzo_bytep src, lzo_uint src_len,
+ lzo_bytep dst, lzo_uintp dst_len,
+ lzo_voidp wrkmem);
+
+int (*_int_lzo1x_999_compress)(const lzo_bytep src, lzo_uint src_len,
+ lzo_bytep dst, lzo_uintp dst_len,
+ lzo_voidp wrkmem);
+
+/*
+ * per handle dlopen + internal init
+ */
+struct lzo2_data {
+ void *lib;
+ void *workmem;
+};
+
+static int lzo2_remap_symbols(knet_handle_t knet_h, void *lib)
+{
+ int err = 0;
+ char *error = NULL;
+
+ _int_lzo1x_decompress = dlsym(lib, "lzo1x_decompress");
+ if (!_int_lzo1x_decompress) {
+ error = dlerror();
+ log_err(knet_h, KNET_SUB_LZO2COMP, "unable to map lzo1x_decompress: %s", error);
+ err = -1;
+ goto out;
+ }
+
+ _int_lzo1x_1_compress = dlsym(lib, "lzo1x_1_compress");
+ if (!_int_lzo1x_1_compress) {
+ error = dlerror();
+ log_err(knet_h, KNET_SUB_LZO2COMP, "unable to map lzo1x_1_compress: %s", error);
+ err = -1;
+ goto out;
+ }
+
+ _int_lzo1x_1_11_compress = dlsym(lib, "lzo1x_1_11_compress");
+ if (!_int_lzo1x_1_11_compress) {
+ error = dlerror();
+ log_err(knet_h, KNET_SUB_LZO2COMP, "unable to map lzo1x_1_11_compress: %s", error);
+ err = -1;
+ goto out;
+ }
+
+ _int_lzo1x_1_12_compress = dlsym(lib, "lzo1x_1_12_compress");
+ if (!_int_lzo1x_1_12_compress) {
+ error = dlerror();
+ log_err(knet_h, KNET_SUB_LZO2COMP, "unable to map lzo1x_1_12_compress: %s", error);
+ err = -1;
+ goto out;
+ }
+
+ _int_lzo1x_1_15_compress = dlsym(lib, "lzo1x_1_15_compress");
+ if (!_int_lzo1x_1_15_compress) {
+ error = dlerror();
+ log_err(knet_h, KNET_SUB_LZO2COMP, "unable to map lzo1x_1_15_compress: %s", error);
+ err = -1;
+ goto out;
+ }
+
+ _int_lzo1x_999_compress = dlsym(lib, "lzo1x_999_compress");
+ if (!_int_lzo1x_999_compress) {
+ error = dlerror();
+ log_err(knet_h, KNET_SUB_LZO2COMP, "unable to map lzo1x_999_compress: %s", error);
+ err = -1;
+ goto out;
+ }
+
+out:
+ if (err) {
+ _int_lzo1x_decompress = NULL;
+ _int_lzo1x_1_compress = NULL;
+ _int_lzo1x_1_11_compress = NULL;
+ _int_lzo1x_1_12_compress = NULL;
+ _int_lzo1x_1_15_compress = NULL;
+ _int_lzo1x_999_compress = NULL;
+ errno = EINVAL;
+ }
+ return err;
+}
+
+void lzo2_fini(
+ knet_handle_t knet_h,
+ int method_idx)
+{
+ struct lzo2_data *lzo2_data = knet_h->compress_int_data[method_idx];
+
+ if (lzo2_data) {
+ lzo2_libref--;
+ if (lzo2_data->workmem) {
+ free(lzo2_data->workmem);
+ lzo2_data->workmem = NULL;
+ }
+ if ((lzo2_libref == 0) && (lzo2_data->lib)) {
+ dlclose(lzo2_data->lib);
+ }
+ lzo2_data->lib = NULL;
+ lzo2_data = NULL;
+ if (knet_h->compress_int_data[method_idx]) {
+ free(knet_h->compress_int_data[method_idx]);
+ knet_h->compress_int_data[method_idx] = NULL;
+ }
+ }
+ return;
+}
+
+/*
+ * Per-handle lzo2 setup: allocates the struct lzo2_data slot for
+ * method_idx, dlopen()s liblzo2.so.2, resolves the lzo1x entry points and
+ * allocates the compression work memory.
+ *
+ * Returns 0 on success. Returns -1 with errno set on failure; apart from
+ * the very first allocation failing, everything acquired so far is
+ * released again through lzo2_fini().
+ */
int lzo2_init(
knet_handle_t knet_h,
int method_idx)
{
- if (lzo_init() != LZO_E_OK) {
- log_err(knet_h, KNET_SUB_LZO2COMP, "lzo2 unable to initialize library");
- errno = EPROTO;
+ int err = 0, savederrno = 0;
+ struct lzo2_data *lzo2_data;
+ char *error = NULL;
+
+ knet_h->compress_int_data[method_idx] = malloc(sizeof(struct lzo2_data));
+ if (!knet_h->compress_int_data[method_idx]) {
+ errno = ENOMEM;
return -1;
}
+ lzo2_data = knet_h->compress_int_data[method_idx];
+ memset(lzo2_data, 0, sizeof(struct lzo2_data));
+
+ /*
+ * clear any pending error
+ */
+ dlerror();
+
+ lzo2_data->lib = dlopen("liblzo2.so.2", RTLD_LAZY | RTLD_GLOBAL);
+ error = dlerror();
+ if (error != NULL) {
+ log_err(knet_h, KNET_SUB_LZO2COMP, "unable to dlopen liblzo2.so.2: %s", error);
+ savederrno = EAGAIN;
+ err = -1;
+ goto out;
+ }
+
+ /*
+ * lzo2_remap_symbols() returns 0 on success and -1 on failure, so
+ * failure has to be detected with "< 0"
+ */
+ if (lzo2_remap_symbols(knet_h, lzo2_data->lib) < 0) {
+ savederrno = errno;
+ err = -1;
+ goto out;
+ }
+
/*
* LZO1X_999_MEM_COMPRESS is the highest amount of memory lzo2 can use
*/
- knet_h->compress_int_data[method_idx] = malloc(LZO1X_999_MEM_COMPRESS);
+ lzo2_data->workmem = malloc(LZO1X_999_MEM_COMPRESS);
- if (!knet_h->compress_int_data[method_idx]) {
+ if (!lzo2_data->workmem) {
log_err(knet_h, KNET_SUB_LZO2COMP, "lzo2 unable to allocate work memory");
- errno = ENOMEM;
- return -1;
+ savederrno = ENOMEM;
+ err = -1;
+ goto out;
}
- memset(knet_h->compress_int_data[method_idx], 0, LZO1X_999_MEM_COMPRESS);
-
- return 0;
-}
+ memset(lzo2_data->workmem, 0, LZO1X_999_MEM_COMPRESS);
-void lzo2_fini(
- knet_handle_t knet_h,
- int method_idx,
- int knet_ref)
-{
- if (knet_h->compress_int_data[method_idx]) {
- free(knet_h->compress_int_data[method_idx]);
- knet_h->compress_int_data[method_idx] = NULL;
+out:
+ lzo2_libref++;
+ if (err) {
+ lzo2_fini(knet_h, method_idx);
}
- return;
+ errno = savederrno;
+ return err;
}
/*
 * Report which lzo1x compress routine corresponds to compress_level.
 *
 * Levels 1, 11, 12, 15 and 999 each log a debug message naming the routine;
 * any other value only logs a warning that lzo1x_1_compress is the
 * fallback. Always returns 0, so no level is ever rejected.
 */
int lzo2_val_level(
	knet_handle_t knet_h,
	int compress_level)
{
	if (compress_level == 1) {
		log_debug(knet_h, KNET_SUB_LZO2COMP, "lzo2 will use lzo1x_1_compress internal compress method");
	} else if (compress_level == 11) {
		log_debug(knet_h, KNET_SUB_LZO2COMP, "lzo2 will use lzo1x_1_11_compress internal compress method");
	} else if (compress_level == 12) {
		log_debug(knet_h, KNET_SUB_LZO2COMP, "lzo2 will use lzo1x_1_12_compress internal compress method");
	} else if (compress_level == 15) {
		log_debug(knet_h, KNET_SUB_LZO2COMP, "lzo2 will use lzo1x_1_15_compress internal compress method");
	} else if (compress_level == 999) {
		log_debug(knet_h, KNET_SUB_LZO2COMP, "lzo2 will use lzo1x_999_compress internal compress method");
	} else {
		log_warn(knet_h, KNET_SUB_LZO2COMP, "Unknown lzo2 internal compress method. lzo1x_1_compress will be used as default fallback");
	}

	return 0;
}
/*
 * Compress buf_in_len bytes from buf_in into buf_out using the lzo1x
 * variant selected by knet_h->compress_level. Levels other than
 * 1, 11, 12, 15 and 999 use lzo1x_1_compress.
 *
 * On success returns 0, stores the compressed size in *buf_out_len and
 * sets errno to 0. If the compressor reports anything other than
 * LZO_E_OK, an error is logged and -1 is returned with errno set to
 * EAGAIN; *buf_out_len is left untouched in that case.
 *
 * The capacity of buf_out is not passed to the compressor and is not
 * checked here.
 */
int lzo2_compress(
knet_handle_t knet_h,
const unsigned char *buf_in,
const ssize_t buf_in_len,
unsigned char *buf_out,
ssize_t *buf_out_len)
{
- int lzerr = 0, err = 0;
- int savederrno = 0;
+ int savederrno = 0, lzerr = 0, err = 0;
+ struct lzo2_data *lzo2_data = knet_h->compress_int_data[knet_h->compress_model];
lzo_uint cmp_len;
/* every case passes the same arguments; only the compressor differs */
switch(knet_h->compress_level) {
case 1:
- lzerr = lzo1x_1_compress(buf_in, buf_in_len, buf_out, &cmp_len, knet_h->compress_int_data[knet_h->compress_model]);
+ lzerr = (*_int_lzo1x_1_compress)(buf_in, buf_in_len, buf_out, &cmp_len, lzo2_data->workmem);
break;
case 11:
- lzerr = lzo1x_1_11_compress(buf_in, buf_in_len, buf_out, &cmp_len, knet_h->compress_int_data[knet_h->compress_model]);
+ lzerr = (*_int_lzo1x_1_11_compress)(buf_in, buf_in_len, buf_out, &cmp_len, lzo2_data->workmem);
break;
case 12:
- lzerr = lzo1x_1_12_compress(buf_in, buf_in_len, buf_out, &cmp_len, knet_h->compress_int_data[knet_h->compress_model]);
+ lzerr = (*_int_lzo1x_1_12_compress)(buf_in, buf_in_len, buf_out, &cmp_len, lzo2_data->workmem);
break;
case 15:
- lzerr = lzo1x_1_15_compress(buf_in, buf_in_len, buf_out, &cmp_len, knet_h->compress_int_data[knet_h->compress_model]);
+ lzerr = (*_int_lzo1x_1_15_compress)(buf_in, buf_in_len, buf_out, &cmp_len, lzo2_data->workmem);
break;
case 999:
- lzerr = lzo1x_999_compress(buf_in, buf_in_len, buf_out, &cmp_len, knet_h->compress_int_data[knet_h->compress_model]);
+ lzerr = (*_int_lzo1x_999_compress)(buf_in, buf_in_len, buf_out, &cmp_len, lzo2_data->workmem);
break;
default:
/* same fallback that lzo2_val_level() announces for unknown levels */
- lzerr = lzo1x_1_compress(buf_in, buf_in_len, buf_out, &cmp_len, knet_h->compress_int_data[knet_h->compress_model]);
+ lzerr = (*_int_lzo1x_1_compress)(buf_in, buf_in_len, buf_out, &cmp_len, lzo2_data->workmem);
break;
}
if (lzerr != LZO_E_OK) {
log_err(knet_h, KNET_SUB_LZO2COMP, "lzo2 internal compression error");
savederrno = EAGAIN;
err = -1;
} else {
*buf_out_len = cmp_len;
}
/* errno is always written: 0 on success, EAGAIN on failure */
errno = savederrno;
return err;
}
/*
 * Decompress buf_in_len bytes from buf_in into buf_out with
 * lzo1x_decompress (no work memory is passed).
 *
 * On success returns 0, stores the decompressed size in *buf_out_len and
 * sets errno to 0. If the decompressor reports anything other than
 * LZO_E_OK, an error is logged and -1 is returned with errno set to
 * EAGAIN; *buf_out_len is left untouched in that case.
 *
 * NOTE(review): the capacity of buf_out is not communicated to the
 * decompressor here (decmp_len is not initialised before the call).
 * Presumably the caller guarantees a large enough buffer; confirm
 * whether the bounds-checking lzo1x_decompress_safe variant should be
 * used for data received from the network.
 */
int lzo2_decompress(
knet_handle_t knet_h,
const unsigned char *buf_in,
const ssize_t buf_in_len,
unsigned char *buf_out,
ssize_t *buf_out_len)
{
int lzerr = 0, err = 0;
int savederrno = 0;
lzo_uint decmp_len;
- lzerr = lzo1x_decompress(buf_in, buf_in_len, buf_out, &decmp_len, NULL);
+ lzerr = (*_int_lzo1x_decompress)(buf_in, buf_in_len, buf_out, &decmp_len, NULL);
if (lzerr != LZO_E_OK) {
log_err(knet_h, KNET_SUB_LZO2COMP, "lzo2 internal decompression error");
savederrno = EAGAIN;
err = -1;
} else {
*buf_out_len = decmp_len;
}
/* errno is always written: 0 on success, EAGAIN on failure */
errno = savederrno;
return err;
}
#endif
diff --git a/libknet/compress_lzo2.h b/libknet/compress_lzo2.h
index 82b90c75..c391211b 100644
--- a/libknet/compress_lzo2.h
+++ b/libknet/compress_lzo2.h
@@ -1,41 +1,40 @@
/*
* Copyright (C) 2010-2017 Red Hat, Inc. All rights reserved.
*
* Author: Fabio M. Di Nitto <fabbione@kronosnet.org>
*
* This software licensed under GPL-2.0+, LGPL-2.0+
*/
#ifndef __KNET_COMPRESS_LZO2_H__
#define __KNET_COMPRESS_LZO2_H__
#include "internals.h"
/*
 * Set up lzo2 support for the compression method slot method_idx.
 * Returns 0 on success, -1 with errno set on failure.
 */
int lzo2_init(
knet_handle_t knet_h,
int method_idx);
/*
 * Release what lzo2_init() set up for method_idx. lzo2_init() also calls
 * this itself when initialisation fails.
 */
void lzo2_fini(
knet_handle_t knet_h,
- int method_idx,
- int knet_ref);
+ int method_idx);
/*
 * Log which lzo2 internal compressor compress_level selects.
 * Never rejects a level; always returns 0.
 */
int lzo2_val_level(
knet_handle_t knet_h,
int compress_level);
/*
 * Compress buf_in (buf_in_len bytes) into buf_out.
 * Returns 0 and stores the compressed size in *buf_out_len, or returns
 * -1 with errno set to EAGAIN on compressor failure.
 */
int lzo2_compress(
knet_handle_t knet_h,
const unsigned char *buf_in,
const ssize_t buf_in_len,
unsigned char *buf_out,
ssize_t *buf_out_len);
/*
 * Decompress buf_in (buf_in_len bytes) into buf_out.
 * Returns 0 and stores the decompressed size in *buf_out_len, or returns
 * -1 with errno set to EAGAIN on decompressor failure.
 */
int lzo2_decompress(
knet_handle_t knet_h,
const unsigned char *buf_in,
const ssize_t buf_in_len,
unsigned char *buf_out,
ssize_t *buf_out_len);
#endif
diff --git a/libknet/handle.c b/libknet/handle.c
index fb31c34b..e6c908b8 100644
--- a/libknet/handle.c
+++ b/libknet/handle.c
@@ -1,1622 +1,1622 @@
/*
* Copyright (C) 2010-2015 Red Hat, Inc. All rights reserved.
*
* Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
* Federico Simoncelli <fsimon@kronosnet.org>
*
* This software licensed under GPL-2.0+, LGPL-2.0+
*/
#include "config.h"
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include <pthread.h>
#include <sys/uio.h>
#include <math.h>
#include <sys/time.h>
#include <sys/resource.h>
#include "internals.h"
#include "crypto.h"
#include "compress.h"
#include "compat.h"
#include "common.h"
#include "threads_common.h"
#include "threads_heartbeat.h"
#include "threads_pmtud.h"
#include "threads_dsthandler.h"
#include "threads_rx.h"
#include "threads_tx.h"
#include "transports.h"
#include "logging.h"
/*
 * Serialises handle creation and destruction: taken by
 * knet_handle_new() and knet_handle_free().
 */
static pthread_mutex_t handle_config_mutex = PTHREAD_MUTEX_INITIALIZER;
/*
 * Process-wide rwlock, initialised by _init_handles_tracker() and
 * destroyed by _fini_handles_tracker() once knet_ref drops to 0.
 * Not static: presumably used by other compilation units - TODO confirm.
 */
pthread_rwlock_t shlib_rwlock;
/* non-zero while shlib_rwlock is initialised */
static uint8_t shlib_wrlock_init = 0;
/*
 * UINT8_MAX knet_handles need more than 20GB of RAM
 * and you need to tune ulimit for tons of open file
 * descriptors.
 */
/* slot table for live handles; a NULL entry is a free slot */
static knet_handle_t *knet_handle_tracker[UINT8_MAX];
/* handle counter: incremented in knet_handle_new(), decremented in knet_handle_free() */
static uint8_t knet_ref = 0;
/*
 * One-time setup of the global handle tracker: clears every tracker slot
 * and initialises shlib_rwlock. Does nothing if that has already been done
 * (shlib_wrlock_init is set).
 *
 * Returns 0 on success, -1 with errno set to the pthread_rwlock_init()
 * error code on failure.
 */
static int _init_handles_tracker(knet_handle_t knet_h)
{
	int rc;
	uint8_t slot;

	if (shlib_wrlock_init) {
		return 0;
	}

	for (slot = 0; slot < UINT8_MAX; slot++) {
		knet_handle_tracker[slot] = NULL;
	}

	rc = pthread_rwlock_init(&shlib_rwlock, NULL);
	if (rc != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize shared lib rwlock: %s",
			strerror(rc));
		errno = rc;
		return -1;
	}

	shlib_wrlock_init = 1;
	return 0;
}
/*
 * Tear down the global tracker state once no handle is left:
 * when knet_ref is 0, destroy shlib_rwlock and clear its init flag.
 */
static void _fini_handles_tracker(void)
{
	if (knet_ref != 0) {
		return;
	}

	pthread_rwlock_destroy(&shlib_rwlock);
	shlib_wrlock_init = 0;
}
/*
 * Claim the first free slot in knet_handle_tracker for this handle and
 * record the slot index in knet_h->this_knet_h_id.
 *
 * Returns 0 on success. If every slot is taken, returns -1 with errno set
 * to EBUSY and leaves this_knet_h_id at UINT8_MAX.
 */
static int _register_handle(knet_handle_t knet_h)
{
	uint8_t slot;

	/* UINT8_MAX marks "not registered" for _unregister_handle() */
	knet_h->this_knet_h_id = UINT8_MAX;

	for (slot = 0; slot < UINT8_MAX; slot++) {
		if (knet_handle_tracker[slot] != NULL) {
			continue;
		}

		/*
		 * NOTE(review): this stores the address of the local
		 * parameter, which dangles once we return. It works as a
		 * non-NULL "slot in use" marker but must not be dereferenced.
		 */
		knet_handle_tracker[slot] = &knet_h;
		knet_h->this_knet_h_id = slot;
		return 0;
	}

	errno = EBUSY;
	return -1;
}
/*
 * Release the tracker slot recorded in knet_h->this_knet_h_id.
 * A value of UINT8_MAX means no slot was assigned and nothing is done.
 */
static void _unregister_handle(knet_handle_t knet_h)
{
	if (knet_h->this_knet_h_id == UINT8_MAX) {
		return;
	}

	knet_handle_tracker[knet_h->this_knet_h_id] = NULL;
}
/*
 * Initialise the per-handle synchronisation objects: global_rwlock,
 * pmtud_mutex, pmtud_cond, hb_mutex, tx_mutex and tx_seq_num_mutex.
 *
 * lock_init_done is set as soon as global_rwlock is ready, i.e. before
 * the remaining objects are initialised.
 *
 * Returns 0 on success; on the first failure logs the error and returns
 * -1 with errno set to the pthread error code. Objects initialised before
 * the failure are not destroyed here.
 */
static int _init_locks(knet_handle_t knet_h)
{
	int rc;

	rc = pthread_rwlock_init(&knet_h->global_rwlock, NULL);
	if (rc != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize list rwlock: %s",
			strerror(rc));
		goto fail;
	}

	knet_h->lock_init_done = 1;

	rc = pthread_mutex_init(&knet_h->pmtud_mutex, NULL);
	if (rc != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize pmtud mutex: %s",
			strerror(rc));
		goto fail;
	}

	rc = pthread_cond_init(&knet_h->pmtud_cond, NULL);
	if (rc != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize pmtud conditional mutex: %s",
			strerror(rc));
		goto fail;
	}

	rc = pthread_mutex_init(&knet_h->hb_mutex, NULL);
	if (rc != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize hb_thread mutex: %s",
			strerror(rc));
		goto fail;
	}

	rc = pthread_mutex_init(&knet_h->tx_mutex, NULL);
	if (rc != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize tx_thread mutex: %s",
			strerror(rc));
		goto fail;
	}

	rc = pthread_mutex_init(&knet_h->tx_seq_num_mutex, NULL);
	if (rc != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize tx_seq_num_mutex mutex: %s",
			strerror(rc));
		goto fail;
	}

	return 0;

fail:
	errno = rc;
	return -1;
}
/*
 * Destroy every synchronisation object set up by _init_locks() and clear
 * lock_init_done. Return values of the destroy calls are ignored.
 */
static void _destroy_locks(knet_handle_t knet_h)
{
/* cleared first: knet_handle_free() uses this flag to decide whether the locks exist */
knet_h->lock_init_done = 0;
pthread_rwlock_destroy(&knet_h->global_rwlock);
pthread_mutex_destroy(&knet_h->pmtud_mutex);
pthread_cond_destroy(&knet_h->pmtud_cond);
pthread_mutex_destroy(&knet_h->hb_mutex);
pthread_mutex_destroy(&knet_h->tx_mutex);
pthread_mutex_destroy(&knet_h->tx_seq_num_mutex);
}
/*
 * Create the two internal socket pairs of a handle: hostsockfd first,
 * then dstsockfd.
 *
 * Returns 0 on success; on failure logs the error and returns -1 with
 * errno preserved from _init_socketpair(). A pair created before the
 * failure is not closed here.
 */
static int _init_socks(knet_handle_t knet_h)
{
	int rc;

	if (_init_socketpair(knet_h, knet_h->hostsockfd) != 0) {
		rc = errno;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize internal hostsockpair: %s",
			strerror(rc));
		errno = rc;
		return -1;
	}

	if (_init_socketpair(knet_h, knet_h->dstsockfd) != 0) {
		rc = errno;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize internal dstsockpair: %s",
			strerror(rc));
		errno = rc;
		return -1;
	}

	return 0;
}
/*
 * Close the internal socket pairs created by _init_socks(), in the
 * reverse order of their creation.
 */
static void _close_socks(knet_handle_t knet_h)
{
_close_socketpair(knet_h, knet_h->dstsockfd);
_close_socketpair(knet_h, knet_h->hostsockfd);
}
/*
 * Allocate and zero all packet buffers owned by a handle (plain, crypto
 * and compress variants) and fill knet_transport_fd_tracker with
 * KNET_MAX_TRANSPORTS.
 *
 * Returns 0 on success. On the first failed allocation logs the error and
 * returns -1 with errno taken from malloc(). Buffers allocated before the
 * failure are not released here.
 */
static int _init_buffers(knet_handle_t knet_h)
{
	int saved = 0;
	int idx;
	size_t len;

	/* TX fragment buffers: buffer idx holds 1/(idx + 1) of a max-size packet plus headers */
	for (idx = 0; idx < PCKT_FRAG_MAX; idx++) {
		len = ceil((float)KNET_MAX_PACKET_SIZE / (idx + 1)) + KNET_HEADER_ALL_SIZE;
		knet_h->send_to_links_buf[idx] = malloc(len);
		if (knet_h->send_to_links_buf[idx] == NULL) {
			saved = errno;
			log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory datafd to link buffer: %s",
				strerror(saved));
			goto fail;
		}
		memset(knet_h->send_to_links_buf[idx], 0, len);
	}

	for (idx = 0; idx < PCKT_RX_BUFS; idx++) {
		knet_h->recv_from_links_buf[idx] = malloc(KNET_DATABUFSIZE);
		if (knet_h->recv_from_links_buf[idx] == NULL) {
			saved = errno;
			log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for link to datafd buffer: %s",
				strerror(saved));
			goto fail;
		}
		memset(knet_h->recv_from_links_buf[idx], 0, KNET_DATABUFSIZE);
	}

	knet_h->recv_from_sock_buf = malloc(KNET_DATABUFSIZE);
	if (knet_h->recv_from_sock_buf == NULL) {
		saved = errno;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for app to datafd buffer: %s",
			strerror(saved));
		goto fail;
	}
	memset(knet_h->recv_from_sock_buf, 0, KNET_DATABUFSIZE);

	knet_h->pingbuf = malloc(KNET_HEADER_PING_SIZE);
	if (knet_h->pingbuf == NULL) {
		saved = errno;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for hearbeat buffer: %s",
			strerror(saved));
		goto fail;
	}
	memset(knet_h->pingbuf, 0, KNET_HEADER_PING_SIZE);

	knet_h->pmtudbuf = malloc(KNET_PMTUD_SIZE_V6);
	if (knet_h->pmtudbuf == NULL) {
		saved = errno;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for pmtud buffer: %s",
			strerror(saved));
		goto fail;
	}
	memset(knet_h->pmtudbuf, 0, KNET_PMTUD_SIZE_V6);

	/* crypto TX fragment buffers: same sizing as above plus crypto padding */
	for (idx = 0; idx < PCKT_FRAG_MAX; idx++) {
		len = ceil((float)KNET_MAX_PACKET_SIZE / (idx + 1)) + KNET_HEADER_ALL_SIZE + KNET_DATABUFSIZE_CRYPT_PAD;
		knet_h->send_to_links_buf_crypt[idx] = malloc(len);
		if (knet_h->send_to_links_buf_crypt[idx] == NULL) {
			saved = errno;
			log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for crypto datafd to link buffer: %s",
				strerror(saved));
			goto fail;
		}
		memset(knet_h->send_to_links_buf_crypt[idx], 0, len);
	}

	knet_h->recv_from_links_buf_decrypt = malloc(KNET_DATABUFSIZE_CRYPT);
	if (knet_h->recv_from_links_buf_decrypt == NULL) {
		saved = errno;
		log_err(knet_h, KNET_SUB_CRYPTO, "Unable to allocate memory for crypto link to datafd buffer: %s",
			strerror(saved));
		goto fail;
	}
	memset(knet_h->recv_from_links_buf_decrypt, 0, KNET_DATABUFSIZE_CRYPT);

	knet_h->recv_from_links_buf_crypt = malloc(KNET_DATABUFSIZE_CRYPT);
	if (knet_h->recv_from_links_buf_crypt == NULL) {
		saved = errno;
		log_err(knet_h, KNET_SUB_CRYPTO, "Unable to allocate memory for crypto link to datafd buffer: %s",
			strerror(saved));
		goto fail;
	}
	memset(knet_h->recv_from_links_buf_crypt, 0, KNET_DATABUFSIZE_CRYPT);

	knet_h->pingbuf_crypt = malloc(KNET_DATABUFSIZE_CRYPT);
	if (knet_h->pingbuf_crypt == NULL) {
		saved = errno;
		log_err(knet_h, KNET_SUB_CRYPTO, "Unable to allocate memory for crypto hearbeat buffer: %s",
			strerror(saved));
		goto fail;
	}
	memset(knet_h->pingbuf_crypt, 0, KNET_DATABUFSIZE_CRYPT);

	knet_h->pmtudbuf_crypt = malloc(KNET_DATABUFSIZE_CRYPT);
	if (knet_h->pmtudbuf_crypt == NULL) {
		saved = errno;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for crypto pmtud buffer: %s",
			strerror(saved));
		goto fail;
	}
	memset(knet_h->pmtudbuf_crypt, 0, KNET_DATABUFSIZE_CRYPT);

	knet_h->recv_from_links_buf_decompress = malloc(KNET_DATABUFSIZE_COMPRESS);
	if (knet_h->recv_from_links_buf_decompress == NULL) {
		saved = errno;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for decompress buffer: %s",
			strerror(saved));
		goto fail;
	}
	memset(knet_h->recv_from_links_buf_decompress, 0, KNET_DATABUFSIZE_COMPRESS);

	knet_h->send_to_links_buf_compress = malloc(KNET_DATABUFSIZE_COMPRESS);
	if (knet_h->send_to_links_buf_compress == NULL) {
		saved = errno;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for compress buffer: %s",
			strerror(saved));
		goto fail;
	}
	memset(knet_h->send_to_links_buf_compress, 0, KNET_DATABUFSIZE_COMPRESS);

	/* every tracker byte is set to KNET_MAX_TRANSPORTS, not to zero */
	memset(knet_h->knet_transport_fd_tracker, KNET_MAX_TRANSPORTS, sizeof(knet_h->knet_transport_fd_tracker));

	return 0;

fail:
	errno = saved;
	return -1;
}
/*
 * Release every buffer allocated by _init_buffers().
 * The pointers themselves are not reset.
 */
static void _destroy_buffers(knet_handle_t knet_h)
{
	int idx;

	/* per-fragment TX buffers, plain and crypto */
	for (idx = 0; idx < PCKT_FRAG_MAX; idx++) {
		free(knet_h->send_to_links_buf[idx]);
		free(knet_h->send_to_links_buf_crypt[idx]);
	}

	/* RX buffers */
	for (idx = 0; idx < PCKT_RX_BUFS; idx++) {
		free(knet_h->recv_from_links_buf[idx]);
	}

	/* compress / decompress scratch buffers */
	free(knet_h->recv_from_links_buf_decompress);
	free(knet_h->send_to_links_buf_compress);

	/* application-side RX buffer */
	free(knet_h->recv_from_sock_buf);

	/* crypto RX buffers */
	free(knet_h->recv_from_links_buf_decrypt);
	free(knet_h->recv_from_links_buf_crypt);

	/* heartbeat and PMTUd buffers, plain and crypto */
	free(knet_h->pingbuf);
	free(knet_h->pingbuf_crypt);
	free(knet_h->pmtudbuf);
	free(knet_h->pmtudbuf_crypt);
}
/*
 * Create the three epoll instances of a handle (send_to_links,
 * recv_from_links, dst_link_handler), mark them close-on-exec and
 * register hostsockfd[0] and dstsockfd[0] for EPOLLIN on the
 * send_to_links and dst_link_handler instances respectively.
 *
 * Returns 0 on success; on the first failure logs the error and returns
 * -1 with errno preserved. Descriptors created before the failure are not
 * closed here.
 */
static int _init_epolls(knet_handle_t knet_h)
{
	struct epoll_event event;
	int saved = 0;

	/*
	 * even if the kernel does dynamic allocation with epoll_ctl
	 * we need to reserve one extra for host to host communication
	 */
	knet_h->send_to_links_epollfd = epoll_create(KNET_EPOLL_MAX_EVENTS + 1);
	if (knet_h->send_to_links_epollfd < 0) {
		saved = errno;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to create epoll datafd to link fd: %s",
			strerror(saved));
		goto fail;
	}

	knet_h->recv_from_links_epollfd = epoll_create(KNET_EPOLL_MAX_EVENTS);
	if (knet_h->recv_from_links_epollfd < 0) {
		saved = errno;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to create epoll link to datafd fd: %s",
			strerror(saved));
		goto fail;
	}

	knet_h->dst_link_handler_epollfd = epoll_create(KNET_EPOLL_MAX_EVENTS);
	if (knet_h->dst_link_handler_epollfd < 0) {
		saved = errno;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to create epoll dst cache fd: %s",
			strerror(saved));
		goto fail;
	}

	if (_fdset_cloexec(knet_h->send_to_links_epollfd) != 0) {
		saved = errno;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to set CLOEXEC on datafd to link epoll fd: %s",
			strerror(saved));
		goto fail;
	}

	if (_fdset_cloexec(knet_h->recv_from_links_epollfd) != 0) {
		saved = errno;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to set CLOEXEC on link to datafd epoll fd: %s",
			strerror(saved));
		goto fail;
	}

	if (_fdset_cloexec(knet_h->dst_link_handler_epollfd) != 0) {
		saved = errno;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to set CLOEXEC on dst cache epoll fd: %s",
			strerror(saved));
		goto fail;
	}

	/* host-to-host socket feeds the send_to_links epoll */
	memset(&event, 0, sizeof(struct epoll_event));
	event.events = EPOLLIN;
	event.data.fd = knet_h->hostsockfd[0];
	if (epoll_ctl(knet_h->send_to_links_epollfd,
		      EPOLL_CTL_ADD, knet_h->hostsockfd[0], &event) != 0) {
		saved = errno;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to add hostsockfd[0] to epoll pool: %s",
			strerror(saved));
		goto fail;
	}

	/* dst socket feeds the dst_link_handler epoll */
	memset(&event, 0, sizeof(struct epoll_event));
	event.events = EPOLLIN;
	event.data.fd = knet_h->dstsockfd[0];
	if (epoll_ctl(knet_h->dst_link_handler_epollfd,
		      EPOLL_CTL_ADD, knet_h->dstsockfd[0], &event) != 0) {
		saved = errno;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to add dstsockfd[0] to epoll pool: %s",
			strerror(saved));
		goto fail;
	}

	return 0;

fail:
	errno = saved;
	return -1;
}
/*
 * Undo _init_epolls(): drop every in-use data fd from the send_to_links
 * epoll, drop the internal host/dst sockets from their epolls, then close
 * the three epoll descriptors. All return values are ignored.
 */
static void _close_epolls(knet_handle_t knet_h)
{
	struct epoll_event event;
	struct knet_sock *sock;
	int idx;

	memset(&event, 0, sizeof(struct epoll_event));

	for (idx = 0; idx < KNET_DATAFD_MAX; idx++) {
		sock = &knet_h->sockfd[idx];

		if (!sock->in_use) {
			continue;
		}

		/* sockfd[is_created] is the descriptor that was registered with epoll */
		epoll_ctl(knet_h->send_to_links_epollfd, EPOLL_CTL_DEL, sock->sockfd[sock->is_created], &event);

		/*
		 * NOTE(review): this tests the fd value, not is_created, so a
		 * caller-supplied datafd also goes through _close_socketpair(),
		 * unlike knet_handle_remove_datafd(). Confirm this is intended.
		 */
		if (sock->sockfd[sock->is_created]) {
			_close_socketpair(knet_h, sock->sockfd);
		}
	}

	epoll_ctl(knet_h->send_to_links_epollfd, EPOLL_CTL_DEL, knet_h->hostsockfd[0], &event);
	epoll_ctl(knet_h->dst_link_handler_epollfd, EPOLL_CTL_DEL, knet_h->dstsockfd[0], &event);

	close(knet_h->send_to_links_epollfd);
	close(knet_h->recv_from_links_epollfd);
	close(knet_h->dst_link_handler_epollfd);
}
/*
 * Look up the ops table of every known transport (UDP, SCTP, loopback)
 * and run its transport_init() hook. Slots without an ops table are
 * skipped.
 *
 * Returns 0 with errno cleared on success. On the first transport_init()
 * failure logs the error and returns -1 with errno preserved; transports
 * initialised before the failure are left as they are.
 */
static int _start_transports(knet_handle_t knet_h)
{
	int idx;
	int saved;

	for (idx = 0; idx < KNET_MAX_TRANSPORTS; idx++) {
		switch (idx) {
			case KNET_TRANSPORT_UDP:
				knet_h->transport_ops[idx] = get_udp_transport();
				break;
			case KNET_TRANSPORT_SCTP:
				knet_h->transport_ops[idx] = get_sctp_transport();
				break;
			case KNET_TRANSPORT_LOOPBACK:
				knet_h->transport_ops[idx] = get_loopback_transport();
				break;
		}

		if (!knet_h->transport_ops[idx]) {
			continue;
		}

		if (knet_h->transport_ops[idx]->transport_init(knet_h) < 0) {
			saved = errno;
			log_err(knet_h, KNET_SUB_HANDLE, "Failed to allocate transport handle for %s: %s",
				knet_h->transport_ops[idx]->transport_name,
				strerror(saved));
			errno = saved;
			return -1;
		}
	}

	errno = 0;
	return 0;
}
/*
 * Run the transport_free() hook of every transport that has an ops table
 * installed in knet_h->transport_ops.
 */
static void _stop_transports(knet_handle_t knet_h)
{
	int idx;

	for (idx = 0; idx < KNET_MAX_TRANSPORTS; idx++) {
		if (!knet_h->transport_ops[idx]) {
			continue;
		}
		knet_h->transport_ops[idx]->transport_free(knet_h);
	}
}
/*
 * Spawn the internal worker threads of a handle, in this order:
 * pmtud link handler, dst link handler, send_to_links, recv_from_links,
 * heartbeat. Each thread receives the handle as its argument.
 *
 * Returns 0 on success; on the first pthread_create() failure logs the
 * error and returns -1 with errno set to the pthread error code. Threads
 * started before the failure keep running.
 */
static int _start_threads(knet_handle_t knet_h)
{
	int rc;

	rc = pthread_create(&knet_h->pmtud_link_handler_thread, NULL,
			    _handle_pmtud_link_thread, (void *) knet_h);
	if (rc != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to start pmtud link thread: %s",
			strerror(rc));
		goto fail;
	}

	rc = pthread_create(&knet_h->dst_link_handler_thread, NULL,
			    _handle_dst_link_handler_thread, (void *) knet_h);
	if (rc != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to start dst cache thread: %s",
			strerror(rc));
		goto fail;
	}

	rc = pthread_create(&knet_h->send_to_links_thread, NULL,
			    _handle_send_to_links_thread, (void *) knet_h);
	if (rc != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to start datafd to link thread: %s",
			strerror(rc));
		goto fail;
	}

	rc = pthread_create(&knet_h->recv_from_links_thread, NULL,
			    _handle_recv_from_links_thread, (void *) knet_h);
	if (rc != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to start link to datafd thread: %s",
			strerror(rc));
		goto fail;
	}

	rc = pthread_create(&knet_h->heartbt_thread, NULL,
			    _handle_heartbt_thread, (void *) knet_h);
	if (rc != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to start heartbeat thread: %s",
			strerror(rc));
		goto fail;
	}

	return 0;

fail:
	errno = rc;
	return -1;
}
/*
 * Cancel and join the internal worker threads of a handle.
 *
 * Threads whose pthread_t field is zero are skipped. The pmtud thread is
 * handled last: its condition variable is signalled first and it is given
 * another second before being cancelled. Return values of the pthread
 * calls are ignored. This function sleeps for two seconds in total.
 */
static void _stop_threads(knet_handle_t knet_h)
{
void *retval;
/*
 * allow threads to notice the shutdown request
 * and release locks before we stop them.
 * this isn't the most efficient way to handle it
 * but it works well enough for now
 */
sleep(1);
if (knet_h->heartbt_thread) {
pthread_cancel(knet_h->heartbt_thread);
pthread_join(knet_h->heartbt_thread, &retval);
}
if (knet_h->send_to_links_thread) {
pthread_cancel(knet_h->send_to_links_thread);
pthread_join(knet_h->send_to_links_thread, &retval);
}
if (knet_h->recv_from_links_thread) {
pthread_cancel(knet_h->recv_from_links_thread);
pthread_join(knet_h->recv_from_links_thread, &retval);
}
if (knet_h->dst_link_handler_thread) {
pthread_cancel(knet_h->dst_link_handler_thread);
pthread_join(knet_h->dst_link_handler_thread, &retval);
}
/* signal pmtud_cond under its mutex, presumably to wake a waiting pmtud thread - verify against the thread body */
pthread_mutex_lock(&knet_h->pmtud_mutex);
pthread_cond_signal(&knet_h->pmtud_cond);
pthread_mutex_unlock(&knet_h->pmtud_mutex);
/* give the pmtud thread time to react to the signal before cancelling it */
sleep(1);
if (knet_h->pmtud_link_handler_thread) {
pthread_cancel(knet_h->pmtud_link_handler_thread);
pthread_join(knet_h->pmtud_link_handler_thread, &retval);
}
}
/*
 * Allocate and fully initialise a new knet handle.
 *
 * host_id            node id stored in the handle
 * log_fd             descriptor used for logging; must be >= 0 and below
 *                    the RLIMIT_NOFILE hard limit. 0 disables the
 *                    default_log_level validation and setup.
 * default_log_level  initial level for all subsystems when log_fd > 0;
 *                    must not exceed KNET_LOG_DEBUG when log_fd is set.
 *
 * Returns the new handle, or NULL with errno set on failure.
 *
 * Handle creation is serialised with handle_config_mutex. The handle is
 * counted in knet_ref as soon as the global tracker is ready, so that the
 * knet_ref decrement done by knet_handle_free() on the failure path stays
 * balanced. Failures that happen before that point release the bare
 * allocation directly instead of going through knet_handle_free().
 */
knet_handle_t knet_handle_new(knet_node_id_t host_id,
			      int log_fd,
			      uint8_t default_log_level)
{
	knet_handle_t knet_h;
	int savederrno = 0;
	struct rlimit cur;

	if (getrlimit(RLIMIT_NOFILE, &cur) < 0) {
		return NULL;
	}

	if ((log_fd < 0) || ((unsigned int)log_fd >= cur.rlim_max)) {
		errno = EINVAL;
		return NULL;
	}

	/*
	 * validate incoming request
	 */
	if ((log_fd) && (default_log_level > KNET_LOG_DEBUG)) {
		errno = EINVAL;
		return NULL;
	}

	/*
	 * allocate handle
	 */
	knet_h = malloc(sizeof(struct knet_handle));
	if (!knet_h) {
		errno = ENOMEM;
		return NULL;
	}
	memset(knet_h, 0, sizeof(struct knet_handle));

	savederrno = pthread_mutex_lock(&handle_config_mutex);
	if (savederrno) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get handle mutex lock: %s",
			strerror(savederrno));
		/*
		 * the mutex is not held and nothing but the allocation
		 * exists yet: do not unlock, do not call knet_handle_free()
		 */
		free(knet_h);
		errno = savederrno;
		return NULL;
	}

	/*
	 * copy config in place
	 */
	knet_h->host_id = host_id;
	knet_h->logfd = log_fd;
	if (knet_h->logfd > 0) {
		memset(&knet_h->log_levels, default_log_level, KNET_MAX_SUBSYSTEMS);
	}

	/*
	 * set pmtud default timers
	 */
	knet_h->pmtud_interval = KNET_PMTUD_DEFAULT_INTERVAL;

	/*
	 * set transports reconnect default timers
	 */
	knet_h->reconnect_int = KNET_TRANSPORT_DEFAULT_RECONNECT_INTERVAL;

	/*
	 * Set 'min' stats to the maximum value so the
	 * first value we get is always less
	 */
	knet_h->stats.tx_compress_time_min = UINT64_MAX;
	knet_h->stats.rx_compress_time_min = UINT64_MAX;
	knet_h->stats.tx_crypt_time_min = UINT64_MAX;
	knet_h->stats.rx_crypt_time_min = UINT64_MAX;

	/*
	 * init global handles tracker
	 */
	if (_init_handles_tracker(knet_h) < 0) {
		savederrno = errno;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to init handles traceker: %s",
			strerror(savederrno));
		/*
		 * the handle has not been counted in knet_ref and the
		 * shared rwlock may not exist: release it directly
		 */
		pthread_mutex_unlock(&handle_config_mutex);
		free(knet_h);
		errno = savederrno;
		return NULL;
	}

	/*
	 * count the handle now: every later failure goes through
	 * knet_handle_free(), which decrements knet_ref unconditionally
	 */
	knet_ref++;

	if (_register_handle(knet_h) < 0) {
		savederrno = errno;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to register handle: %s",
			strerror(savederrno));
		errno = savederrno;
		goto exit_fail;
	}

	/*
	 * init main locking structures
	 */
	if (_init_locks(knet_h)) {
		savederrno = errno;
		goto exit_fail;
	}

	/*
	 * init sockets
	 */
	if (_init_socks(knet_h)) {
		savederrno = errno;
		goto exit_fail;
	}

	/*
	 * allocate packet buffers
	 */
	if (_init_buffers(knet_h)) {
		savederrno = errno;
		goto exit_fail;
	}

	if (compress_init(knet_h)) {
		savederrno = errno;
		goto exit_fail;
	}

	/*
	 * create epoll fds
	 */
	if (_init_epolls(knet_h)) {
		savederrno = errno;
		goto exit_fail;
	}

	/*
	 * start transports
	 */
	if (_start_transports(knet_h)) {
		savederrno = errno;
		goto exit_fail;
	}

	/*
	 * start internal threads
	 */
	if (_start_threads(knet_h)) {
		savederrno = errno;
		goto exit_fail;
	}

	pthread_mutex_unlock(&handle_config_mutex);
	return knet_h;

exit_fail:
	/* knet_handle_free() takes handle_config_mutex itself */
	pthread_mutex_unlock(&handle_config_mutex);
	knet_handle_free(knet_h);
	errno = savederrno;
	return NULL;
}
/*
 * Tear down and release a handle created by knet_handle_new().
 *
 * Returns 0 on success. Returns -1 with errno set when:
 *  - handle_config_mutex or the handle's global_rwlock cannot be taken
 *    (errno is the pthread error code),
 *  - knet_h is NULL (EINVAL),
 *  - hosts are still configured on the handle (EBUSY).
 * In all -1 cases the handle is left untouched.
 *
 * The whole operation runs under handle_config_mutex.
 */
int knet_handle_free(knet_handle_t knet_h)
{
int savederrno = 0;
savederrno = pthread_mutex_lock(&handle_config_mutex);
if (savederrno) {
/* NOTE(review): knet_h is handed to log_err before the NULL check below - confirm log_err tolerates NULL */
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get handle mutex lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
if (!knet_h) {
pthread_mutex_unlock(&handle_config_mutex);
errno = EINVAL;
return -1;
}
/* locks were never set up (early failure in knet_handle_new): skip the full teardown */
if (!knet_h->lock_init_done) {
goto exit_nolock;
}
savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
strerror(savederrno));
pthread_mutex_unlock(&handle_config_mutex);
errno = savederrno;
return -1;
}
/* refuse to free a handle that still has hosts attached */
if (knet_h->host_head != NULL) {
savederrno = EBUSY;
log_err(knet_h, KNET_SUB_HANDLE,
"Unable to free handle: host(s) or listener(s) are still active: %s",
strerror(savederrno));
pthread_rwlock_unlock(&knet_h->global_rwlock);
pthread_mutex_unlock(&handle_config_mutex);
errno = savederrno;
return -1;
}
/* flag is set under the write lock, then the lock is dropped before the threads are stopped */
knet_h->fini_in_progress = 1;
pthread_rwlock_unlock(&knet_h->global_rwlock);
_stop_threads(knet_h);
_stop_transports(knet_h);
_close_epolls(knet_h);
_destroy_buffers(knet_h);
_close_socks(knet_h);
crypto_fini(knet_h);
- compress_fini(knet_h, knet_ref - 1);
+ compress_fini(knet_h);
_destroy_locks(knet_h);
_unregister_handle(knet_h);
/* handles that jump here are released without any of the teardown steps above, including _unregister_handle() */
exit_nolock:
free(knet_h);
knet_h = NULL;
/* the handle counter is decremented on every successful free; the global tracker is torn down when it reaches 0 */
knet_ref--;
_fini_handles_tracker();
pthread_mutex_unlock(&handle_config_mutex);
return 0;
}
/*
 * Install the socket notification callback and its private data on the
 * handle. Unlike the other callback setters in this file, a NULL callback
 * is rejected.
 *
 * Returns 0 on success, or -1 with errno set to EINVAL (NULL handle or
 * NULL callback) or to the pthread error code if the handle write lock
 * cannot be taken.
 */
int knet_handle_enable_sock_notify(knet_handle_t knet_h,
				   void *sock_notify_fn_private_data,
				   void (*sock_notify_fn) (
						void *private_data,
						int datafd,
						int8_t channel,
						uint8_t tx_rx,
						int error,
						int errorno))
{
	int rc;

	if ((!knet_h) || (!sock_notify_fn)) {
		errno = EINVAL;
		return -1;
	}

	rc = pthread_rwlock_wrlock(&knet_h->global_rwlock);
	if (rc != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
			strerror(rc));
		errno = rc;
		return -1;
	}

	knet_h->sock_notify_fn_private_data = sock_notify_fn_private_data;
	knet_h->sock_notify_fn = sock_notify_fn;
	log_debug(knet_h, KNET_SUB_HANDLE, "sock_notify_fn enabled");

	pthread_rwlock_unlock(&knet_h->global_rwlock);

	return 0;
}
/*
 * Attach a data file descriptor to a channel of the handle.
 *
 * datafd   in/out. If *datafd > 0 the caller-supplied descriptor is used
 *          (it is switched to CLOEXEC and NONBLOCK). Otherwise a socket
 *          pair is created and *datafd receives its sockfd[0] end.
 * channel  in/out. If *channel < 0 the first free channel is picked and
 *          written back; otherwise the requested channel must be free.
 *          *channel must be below KNET_DATAFD_MAX.
 *
 * Requires a sock notify callback to be installed beforehand.
 *
 * Returns 0 on success with errno set to 0. Returns -1 with errno set to:
 *  EINVAL  NULL handle/datafd/channel, channel out of range, or no sock
 *          notify callback,
 *  EEXIST  *datafd is already attached to an in-use channel,
 *  EBUSY   no free channel, or the requested channel is in use,
 *  or the error reported by the failing lock/fd/socketpair/epoll call.
 */
int knet_handle_add_datafd(knet_handle_t knet_h, int *datafd, int8_t *channel)
{
int err = 0, savederrno = 0;
int i;
struct epoll_event ev;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (datafd == NULL) {
errno = EINVAL;
return -1;
}
if (channel == NULL) {
errno = EINVAL;
return -1;
}
if (*channel >= KNET_DATAFD_MAX) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
if (!knet_h->sock_notify_fn) {
log_err(knet_h, KNET_SUB_HANDLE, "Adding datafd requires sock notify callback enabled!");
savederrno = EINVAL;
err = -1;
goto out_unlock;
}
/* a caller-supplied fd must not already be attached to another channel */
if (*datafd > 0) {
for (i = 0; i < KNET_DATAFD_MAX; i++) {
if ((knet_h->sockfd[i].in_use) && (knet_h->sockfd[i].sockfd[0] == *datafd)) {
log_err(knet_h, KNET_SUB_HANDLE, "requested datafd: %d already exist in index: %d", *datafd, i);
savederrno = EEXIST;
err = -1;
goto out_unlock;
}
}
}
/*
 * auto allocate a channel
 */
if (*channel < 0) {
for (i = 0; i < KNET_DATAFD_MAX; i++) {
if (!knet_h->sockfd[i].in_use) {
*channel = i;
break;
}
}
/* still negative: every channel is in use */
if (*channel < 0) {
savederrno = EBUSY;
err = -1;
goto out_unlock;
}
} else {
if (knet_h->sockfd[*channel].in_use) {
savederrno = EBUSY;
err = -1;
goto out_unlock;
}
}
/* reset the per-channel flags before filling the slot */
knet_h->sockfd[*channel].is_created = 0;
knet_h->sockfd[*channel].is_socket = 0;
knet_h->sockfd[*channel].has_error = 0;
if (*datafd > 0) {
int sockopt;
socklen_t sockoptlen = sizeof(sockopt);
if (_fdset_cloexec(*datafd)) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_HANDLE, "Unable to set CLOEXEC on datafd: %s",
strerror(savederrno));
goto out_unlock;
}
if (_fdset_nonblock(*datafd)) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_HANDLE, "Unable to set NONBLOCK on datafd: %s",
strerror(savederrno));
goto out_unlock;
}
knet_h->sockfd[*channel].sockfd[0] = *datafd;
knet_h->sockfd[*channel].sockfd[1] = 0;
/* a successful SO_TYPE query is used to detect that the fd is a socket */
if (!getsockopt(knet_h->sockfd[*channel].sockfd[0], SOL_SOCKET, SO_TYPE, &sockopt, &sockoptlen)) {
knet_h->sockfd[*channel].is_socket = 1;
}
} else {
if (_init_socketpair(knet_h, knet_h->sockfd[*channel].sockfd)) {
savederrno = errno;
err = -1;
goto out_unlock;
}
knet_h->sockfd[*channel].is_created = 1;
knet_h->sockfd[*channel].is_socket = 1;
/* the caller gets end [0]; end [1] is the one registered with epoll below */
*datafd = knet_h->sockfd[*channel].sockfd[0];
}
/*
 * is_created doubles as the index of the fd to poll: 0 for a
 * caller-supplied fd, 1 for the internal end of a created socket pair
 */
memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLIN;
ev.data.fd = knet_h->sockfd[*channel].sockfd[knet_h->sockfd[*channel].is_created];
if (epoll_ctl(knet_h->send_to_links_epollfd,
EPOLL_CTL_ADD, knet_h->sockfd[*channel].sockfd[knet_h->sockfd[*channel].is_created], &ev)) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_HANDLE, "Unable to add datafd %d to linkfd epoll pool: %s",
knet_h->sockfd[*channel].sockfd[knet_h->sockfd[*channel].is_created], strerror(savederrno));
/* only a socket pair created here is closed; a caller-supplied fd stays open */
if (knet_h->sockfd[*channel].is_created) {
_close_socketpair(knet_h, knet_h->sockfd[*channel].sockfd);
}
goto out_unlock;
}
/* the slot is marked in use only after every step succeeded */
knet_h->sockfd[*channel].in_use = 1;
out_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = savederrno;
return err;
}
/*
 * Detach a data file descriptor from the handle.
 *
 * The channel is found by matching datafd against sockfd[0] of the in-use
 * channels. Unless the channel is flagged has_error, its polled fd is
 * removed from the send_to_links epoll first. A socket pair created by
 * knet_handle_add_datafd() is closed; a caller-supplied fd is left open.
 * The channel slot is then cleared.
 *
 * Returns 0 on success with errno set to 0. Returns -1 with errno set to
 * EINVAL (NULL handle, datafd <= 0, or datafd not attached), or to the
 * error of the failing lock / epoll_ctl call. If epoll_ctl fails the
 * channel is left attached.
 */
int knet_handle_remove_datafd(knet_handle_t knet_h, int datafd)
{
	int rc = 0, saved = 0;
	int8_t channel = -1;
	int idx;
	struct epoll_event event;
	struct knet_sock *sock;

	if ((!knet_h) || (datafd <= 0)) {
		errno = EINVAL;
		return -1;
	}

	saved = pthread_rwlock_wrlock(&knet_h->global_rwlock);
	if (saved != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
			strerror(saved));
		errno = saved;
		return -1;
	}

	for (idx = 0; idx < KNET_DATAFD_MAX; idx++) {
		if (!knet_h->sockfd[idx].in_use) {
			continue;
		}
		if (knet_h->sockfd[idx].sockfd[0] == datafd) {
			channel = idx;
			break;
		}
	}

	if (channel < 0) {
		saved = EINVAL;
		rc = -1;
		goto unlock;
	}

	sock = &knet_h->sockfd[channel];

	if (!sock->has_error) {
		memset(&event, 0, sizeof(struct epoll_event));

		if (epoll_ctl(knet_h->send_to_links_epollfd,
			      EPOLL_CTL_DEL, sock->sockfd[sock->is_created], &event) != 0) {
			saved = errno;
			rc = -1;
			log_err(knet_h, KNET_SUB_HANDLE, "Unable to del datafd %d from linkfd epoll pool: %s",
				sock->sockfd[0], strerror(saved));
			goto unlock;
		}
	}

	if (sock->is_created) {
		_close_socketpair(knet_h, sock->sockfd);
	}

	memset(sock, 0, sizeof(struct knet_sock));

unlock:
	pthread_rwlock_unlock(&knet_h->global_rwlock);
	errno = saved;
	return rc;
}
/*
 * Look up the data fd (sockfd[0]) attached to a channel.
 *
 * Returns 0 and stores the fd in *datafd (errno set to 0), or -1 with
 * errno set to EINVAL (NULL handle, channel out of range, NULL datafd, or
 * channel not in use) or to the pthread error code if the handle read
 * lock cannot be taken.
 */
int knet_handle_get_datafd(knet_handle_t knet_h, const int8_t channel, int *datafd)
{
	int rc = 0, saved = 0;

	if ((!knet_h) ||
	    (channel < 0) || (channel >= KNET_DATAFD_MAX) ||
	    (datafd == NULL)) {
		errno = EINVAL;
		return -1;
	}

	saved = pthread_rwlock_rdlock(&knet_h->global_rwlock);
	if (saved != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
			strerror(saved));
		errno = saved;
		return -1;
	}

	if (knet_h->sockfd[channel].in_use) {
		*datafd = knet_h->sockfd[channel].sockfd[0];
	} else {
		saved = EINVAL;
		rc = -1;
	}

	pthread_rwlock_unlock(&knet_h->global_rwlock);
	errno = saved;
	return rc;
}
/*
 * Look up the channel a data fd is attached to, matching datafd against
 * sockfd[0] of the in-use channels.
 *
 * Returns 0 and stores the channel in *channel (errno set to 0). Returns
 * -1 with errno set to EINVAL for a NULL handle, datafd <= 0, a NULL
 * channel pointer, or when the fd is not attached (in which case *channel
 * is set to -1), or to the pthread error code if the handle read lock
 * cannot be taken.
 */
int knet_handle_get_channel(knet_handle_t knet_h, const int datafd, int8_t *channel)
{
	int rc = 0, saved = 0;
	int idx;
	int8_t found = -1;

	if ((!knet_h) || (datafd <= 0) || (channel == NULL)) {
		errno = EINVAL;
		return -1;
	}

	saved = pthread_rwlock_rdlock(&knet_h->global_rwlock);
	if (saved != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
			strerror(saved));
		errno = saved;
		return -1;
	}

	for (idx = 0; idx < KNET_DATAFD_MAX; idx++) {
		if (!knet_h->sockfd[idx].in_use) {
			continue;
		}
		if (knet_h->sockfd[idx].sockfd[0] == datafd) {
			found = idx;
			break;
		}
	}

	*channel = found;

	if (found < 0) {
		saved = EINVAL;
		rc = -1;
	}

	pthread_rwlock_unlock(&knet_h->global_rwlock);
	errno = saved;
	return rc;
}
/*
 * Install (or, with a NULL callback, remove) the destination host filter
 * callback and its private data.
 *
 * Returns 0 on success, or -1 with errno set to EINVAL (NULL handle) or
 * to the pthread error code if the handle write lock cannot be taken.
 */
int knet_handle_enable_filter(knet_handle_t knet_h,
			      void *dst_host_filter_fn_private_data,
			      int (*dst_host_filter_fn) (
					void *private_data,
					const unsigned char *outdata,
					ssize_t outdata_len,
					uint8_t tx_rx,
					knet_node_id_t this_host_id,
					knet_node_id_t src_node_id,
					int8_t *channel,
					knet_node_id_t *dst_host_ids,
					size_t *dst_host_ids_entries))
{
	int rc;

	if (!knet_h) {
		errno = EINVAL;
		return -1;
	}

	rc = pthread_rwlock_wrlock(&knet_h->global_rwlock);
	if (rc != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
			strerror(rc));
		errno = rc;
		return -1;
	}

	knet_h->dst_host_filter_fn_private_data = dst_host_filter_fn_private_data;
	knet_h->dst_host_filter_fn = dst_host_filter_fn;

	if (!knet_h->dst_host_filter_fn) {
		log_debug(knet_h, KNET_SUB_HANDLE, "dst_host_filter_fn disabled");
	} else {
		log_debug(knet_h, KNET_SUB_HANDLE, "dst_host_filter_fn enabled");
	}

	pthread_rwlock_unlock(&knet_h->global_rwlock);

	return 0;
}
/*
 * Switch data forwarding on (enabled == 1) or off (enabled == 0) by
 * storing the flag in knet_h->enabled.
 *
 * Returns 0 on success, or -1 with errno set to EINVAL (NULL handle or
 * enabled > 1) or to the pthread error code if the handle write lock
 * cannot be taken.
 */
int knet_handle_setfwd(knet_handle_t knet_h, unsigned int enabled)
{
	int rc;

	if ((!knet_h) || (enabled > 1)) {
		errno = EINVAL;
		return -1;
	}

	rc = pthread_rwlock_wrlock(&knet_h->global_rwlock);
	if (rc != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
			strerror(rc));
		errno = rc;
		return -1;
	}

	knet_h->enabled = enabled;

	if (!enabled) {
		log_debug(knet_h, KNET_SUB_HANDLE, "Data forwarding is disabled");
	} else {
		log_debug(knet_h, KNET_SUB_HANDLE, "Data forwarding is enabled");
	}

	pthread_rwlock_unlock(&knet_h->global_rwlock);

	return 0;
}
/*
 * Read the current PMTUd interval of the handle into *interval.
 *
 * Returns 0 on success, or -1 with errno set to EINVAL (NULL handle or
 * NULL interval) or to the pthread error code if the handle read lock
 * cannot be taken.
 */
int knet_handle_pmtud_getfreq(knet_handle_t knet_h, unsigned int *interval)
{
	int rc;

	if ((!knet_h) || (!interval)) {
		errno = EINVAL;
		return -1;
	}

	rc = pthread_rwlock_rdlock(&knet_h->global_rwlock);
	if (rc != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
			strerror(rc));
		errno = rc;
		return -1;
	}

	*interval = knet_h->pmtud_interval;

	pthread_rwlock_unlock(&knet_h->global_rwlock);

	return 0;
}
/*
 * Store a new PMTUd interval in knet_h->pmtud_interval while holding the
 * global write lock. The debug log reports the value in seconds.
 *
 * Returns 0 on success. Returns -1 with errno = EINVAL when knet_h is NULL
 * or interval is outside the range 1..86400, or with errno set to the value
 * returned by pthread_rwlock_wrlock() when the lock cannot be taken.
 */
int knet_handle_pmtud_setfreq(knet_handle_t knet_h, unsigned int interval)
{
	int lock_err;

	if (knet_h == NULL) {
		errno = EINVAL;
		return -1;
	}

	/* accepted range is 1..86400 inclusive */
	if ((interval == 0) || (interval > 86400)) {
		errno = EINVAL;
		return -1;
	}

	lock_err = pthread_rwlock_wrlock(&knet_h->global_rwlock);
	if (lock_err != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
			strerror(lock_err));
		errno = lock_err;
		return -1;
	}

	knet_h->pmtud_interval = interval;
	log_debug(knet_h, KNET_SUB_HANDLE, "PMTUd interval set to: %u seconds", interval);

	pthread_rwlock_unlock(&knet_h->global_rwlock);

	return 0;
}
/*
 * Record the PMTUd notification callback and its private data pointer in
 * the handle while holding the global write lock. A NULL pmtud_notify_fn is
 * stored as-is and logged as "disabled".
 *
 * Returns 0 on success. Returns -1 with errno = EINVAL when knet_h is NULL,
 * or with errno set to the value returned by pthread_rwlock_wrlock() when
 * the lock cannot be taken.
 */
int knet_handle_enable_pmtud_notify(knet_handle_t knet_h,
				    void *pmtud_notify_fn_private_data,
				    void (*pmtud_notify_fn) (
						void *private_data,
						unsigned int data_mtu))
{
	int lock_err;

	if (knet_h == NULL) {
		errno = EINVAL;
		return -1;
	}

	lock_err = pthread_rwlock_wrlock(&knet_h->global_rwlock);
	if (lock_err != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
			strerror(lock_err));
		errno = lock_err;
		return -1;
	}

	knet_h->pmtud_notify_fn_private_data = pmtud_notify_fn_private_data;
	knet_h->pmtud_notify_fn = pmtud_notify_fn;

	if (knet_h->pmtud_notify_fn == NULL) {
		log_debug(knet_h, KNET_SUB_HANDLE, "pmtud_notify_fn disabled");
	} else {
		log_debug(knet_h, KNET_SUB_HANDLE, "pmtud_notify_fn enabled");
	}

	pthread_rwlock_unlock(&knet_h->global_rwlock);

	return 0;
}
/*
 * Copy knet_h->data_mtu into *data_mtu while holding the global read lock.
 *
 * Returns 0 on success. Returns -1 with errno = EINVAL when knet_h or
 * data_mtu is NULL, or with errno set to the value returned by
 * pthread_rwlock_rdlock() when the lock cannot be taken.
 */
int knet_handle_pmtud_get(knet_handle_t knet_h,
			  unsigned int *data_mtu)
{
	int lock_err;

	if ((knet_h == NULL) || (data_mtu == NULL)) {
		errno = EINVAL;
		return -1;
	}

	lock_err = pthread_rwlock_rdlock(&knet_h->global_rwlock);
	if (lock_err != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
			strerror(lock_err));
		errno = lock_err;
		return -1;
	}

	*data_mtu = knet_h->data_mtu;

	pthread_rwlock_unlock(&knet_h->global_rwlock);

	return 0;
}
/*
 * Apply a crypto configuration to a knet handle under the global write lock.
 *
 * The request is validated BEFORE crypto_fini() is called on the current
 * crypto state, so a configuration rejected with EINVAL leaves whatever
 * crypto setup the handle had before the call untouched.
 *
 * A configuration counts as "crypto off" when crypto_model starts with
 * "none", or when both crypto_cipher_type and crypto_hash_type start with
 * "none" (only the first 4 characters are compared).
 *
 * Returns:
 *    0  on success, including the "crypto off" case where crypto_fini() is
 *       called and nothing is re-initialised; errno is set to 0.
 *   -1  with errno = EINVAL when knet_h or knet_handle_crypto_cfg is NULL,
 *       or when private_key_len is below KNET_MIN_KEY_LEN or above
 *       KNET_MAX_KEY_LEN; with errno set to the pthread_rwlock_wrlock()
 *       return value when the lock cannot be taken.
 *   -2  when crypto_init() reports a failure. errno is set to 0 on this
 *       path, and crypto_fini() has already been called by then.
 */
int knet_handle_crypto(knet_handle_t knet_h, struct knet_handle_crypto_cfg *knet_handle_crypto_cfg)
{
	int savederrno = 0;
	int err = 0;

	if (!knet_h) {
		errno = EINVAL;
		return -1;
	}

	if (!knet_handle_crypto_cfg) {
		errno = EINVAL;
		return -1;
	}

	savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock);
	if (savederrno) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
			strerror(savederrno));
		errno = savederrno;
		return -1;
	}

	if ((!strncmp("none", knet_handle_crypto_cfg->crypto_model, 4)) ||
	    ((!strncmp("none", knet_handle_crypto_cfg->crypto_cipher_type, 4)) &&
	     (!strncmp("none", knet_handle_crypto_cfg->crypto_hash_type, 4)))) {
		/* explicit request to run without crypto */
		crypto_fini(knet_h);
		log_debug(knet_h, KNET_SUB_CRYPTO, "crypto is not enabled");
		err = 0;
		goto exit_unlock;
	}

	/*
	 * Reject bad key sizes while the current crypto state is still in
	 * place: a caller mistake must not strip crypto from the handle.
	 */
	if (knet_handle_crypto_cfg->private_key_len < KNET_MIN_KEY_LEN) {
		log_debug(knet_h, KNET_SUB_CRYPTO, "private key len too short (min %d): %u",
			  KNET_MIN_KEY_LEN, knet_handle_crypto_cfg->private_key_len);
		savederrno = EINVAL;
		err = -1;
		goto exit_unlock;
	}

	if (knet_handle_crypto_cfg->private_key_len > KNET_MAX_KEY_LEN) {
		log_debug(knet_h, KNET_SUB_CRYPTO, "private key len too long (max %d): %u",
			  KNET_MAX_KEY_LEN, knet_handle_crypto_cfg->private_key_len);
		savederrno = EINVAL;
		err = -1;
		goto exit_unlock;
	}

	/* the request passed validation: replace the current crypto state */
	crypto_fini(knet_h);

	err = crypto_init(knet_h, knet_handle_crypto_cfg);
	if (err) {
		/* any crypto_init() failure is reported to the caller as -2 */
		err = -2;
	}

exit_unlock:
	pthread_rwlock_unlock(&knet_h->global_rwlock);
	errno = savederrno;
	return err;
}
/*
 * Run compress_cfg() for the given configuration while holding the global
 * write lock.
 *
 * Returns the value returned by compress_cfg(); errno on return is the
 * value errno had right after compress_cfg() completed. Returns -1 with
 * errno = EINVAL when knet_h or knet_handle_compress_cfg is NULL, or with
 * errno set to the value returned by pthread_rwlock_wrlock() when the lock
 * cannot be taken.
 */
int knet_handle_compress(knet_handle_t knet_h, struct knet_handle_compress_cfg *knet_handle_compress_cfg)
{
	int saved_errno;
	int rc;

	if ((knet_h == NULL) || (knet_handle_compress_cfg == NULL)) {
		errno = EINVAL;
		return -1;
	}

	saved_errno = pthread_rwlock_wrlock(&knet_h->global_rwlock);
	if (saved_errno != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
			strerror(saved_errno));
		errno = saved_errno;
		return -1;
	}

	rc = compress_cfg(knet_h, knet_handle_compress_cfg);
	/* capture errno now: the unlock below must not disturb what the caller sees */
	saved_errno = errno;

	pthread_rwlock_unlock(&knet_h->global_rwlock);

	errno = saved_errno;
	return rc;
}
/*
 * Read from the descriptor knet_h->sockfd[channel].sockfd[0] into buff using
 * a single-element readv(), while holding the global read lock.
 *
 * Returns the readv() result, with errno set to the value errno had right
 * after readv(). Returns -1 with errno = EINVAL when knet_h or buff is NULL,
 * buff_len is 0 or larger than KNET_MAX_PACKET_SIZE, channel is outside
 * 0..KNET_DATAFD_MAX-1, or the channel is not marked in_use. Returns -1 with
 * errno set to the value returned by pthread_rwlock_rdlock() when the lock
 * cannot be taken.
 */
ssize_t knet_recv(knet_handle_t knet_h, char *buff, const size_t buff_len, const int8_t channel)
{
	struct iovec iov_in = { .iov_base = buff, .iov_len = buff_len };
	ssize_t nread;
	int saved_errno;

	if ((knet_h == NULL) || (buff == NULL)) {
		errno = EINVAL;
		return -1;
	}

	if ((buff_len == 0) || (buff_len > KNET_MAX_PACKET_SIZE)) {
		errno = EINVAL;
		return -1;
	}

	if ((channel < 0) || (channel >= KNET_DATAFD_MAX)) {
		errno = EINVAL;
		return -1;
	}

	saved_errno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
	if (saved_errno != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
			strerror(saved_errno));
		errno = saved_errno;
		return -1;
	}

	if (knet_h->sockfd[channel].in_use) {
		nread = readv(knet_h->sockfd[channel].sockfd[0], &iov_in, 1);
		saved_errno = errno;
	} else {
		/* channel index is valid but the channel is not in use */
		nread = -1;
		saved_errno = EINVAL;
	}

	pthread_rwlock_unlock(&knet_h->global_rwlock);

	errno = saved_errno;
	return nread;
}
/*
 * Write buff to the descriptor knet_h->sockfd[channel].sockfd[0] using a
 * single-element writev(), while holding the global read lock.
 *
 * Returns the writev() result, with errno set to the value errno had right
 * after writev(). Returns -1 with errno = EINVAL when knet_h or buff is
 * NULL, buff_len is 0 or larger than KNET_MAX_PACKET_SIZE, channel is
 * outside 0..KNET_DATAFD_MAX-1, or the channel is not marked in_use.
 * Returns -1 with errno set to the value returned by pthread_rwlock_rdlock()
 * when the lock cannot be taken.
 */
ssize_t knet_send(knet_handle_t knet_h, const char *buff, const size_t buff_len, const int8_t channel)
{
	/* struct iovec has no const-qualified base pointer, hence the cast */
	struct iovec iov_out[1] = {
		{ .iov_base = (void *)buff, .iov_len = buff_len }
	};
	ssize_t nwritten;
	int saved_errno;

	if ((knet_h == NULL) || (buff == NULL)) {
		errno = EINVAL;
		return -1;
	}

	if ((buff_len == 0) || (buff_len > KNET_MAX_PACKET_SIZE)) {
		errno = EINVAL;
		return -1;
	}

	if ((channel < 0) || (channel >= KNET_DATAFD_MAX)) {
		errno = EINVAL;
		return -1;
	}

	saved_errno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
	if (saved_errno != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
			strerror(saved_errno));
		errno = saved_errno;
		return -1;
	}

	if (knet_h->sockfd[channel].in_use) {
		nwritten = writev(knet_h->sockfd[channel].sockfd[0], iov_out, 1);
		saved_errno = errno;
	} else {
		/* channel index is valid but the channel is not in use */
		nwritten = -1;
		saved_errno = EINVAL;
	}

	pthread_rwlock_unlock(&knet_h->global_rwlock);

	errno = saved_errno;
	return nwritten;
}
/*
 * Copy up to struct_size bytes of knet_h->stats into *stats while holding
 * the global write lock. The copy is capped at
 * sizeof(struct knet_handle_stats), and stats->size is then set to that full
 * size.
 *
 * Returns 0 with errno set to 0 on success. Returns -1 with errno = EINVAL
 * when knet_h or stats is NULL, or with errno set to the value returned by
 * pthread_rwlock_wrlock() when the lock cannot be taken.
 */
int knet_handle_get_stats(knet_handle_t knet_h, struct knet_handle_stats *stats, size_t struct_size)
{
	const size_t full_size = sizeof(struct knet_handle_stats);
	int lock_err;

	if ((knet_h == NULL) || (stats == NULL)) {
		errno = EINVAL;
		return -1;
	}

	lock_err = pthread_rwlock_wrlock(&knet_h->global_rwlock);
	if (lock_err != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
			strerror(lock_err));
		errno = lock_err;
		return -1;
	}

	/* never copy more than this build's definition of the structure */
	memmove(stats, &knet_h->stats,
		(struct_size > full_size) ? full_size : struct_size);

	/* Report our full structure size so a caller built against an older layout can tell */
	stats->size = full_size;

	pthread_rwlock_unlock(&knet_h->global_rwlock);

	/* the success path always leaves errno cleared */
	errno = 0;
	return 0;
}

File Metadata

Mime Type
text/x-diff
Expires
Thu, Feb 27, 4:03 AM (1 d, 11 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1466137
Default Alt Text
(75 KB)

Event Timeline