diff --git a/configure.ac b/configure.ac index a213eef9..214a426d 100644 --- a/configure.ac +++ b/configure.ac @@ -1,394 +1,393 @@ # # Copyright (C) 2010-2015 Red Hat, Inc. All rights reserved. # # Authors: Fabio M. Di Nitto # Federico Simoncelli # # This software licensed under GPL-2.0+, LGPL-2.0+ # # -*- Autoconf -*- # Process this file with autoconf to produce a configure script. # AC_PREREQ([2.63]) AC_INIT([kronosnet], m4_esyscmd([build-aux/git-version-gen .tarball-version]), [devel@lists.kronosnet.org]) AC_USE_SYSTEM_EXTENSIONS AM_INIT_AUTOMAKE([1.11.1 dist-bzip2 dist-xz color-tests -Wno-portability]) # Usage of subdir-objects breaks make maintainer-clean targets. # Not using it spits out some warnings at ./autogen time and we can live with those for now # AM_INIT_AUTOMAKE([1.11.1 dist-bzip2 dist-xz color-tests -Wno-portability subdir-objects]) LT_PREREQ([2.2.6]) LT_INIT AC_CONFIG_MACRO_DIR([m4]) AC_CONFIG_SRCDIR([kronosnetd/main.c]) AC_CONFIG_HEADERS([config.h]) AC_CANONICAL_HOST AC_PROG_LIBTOOL AC_LANG([C]) systemddir=${prefix}/lib/systemd/system if test "$prefix" = "NONE"; then prefix="/usr" if test "$localstatedir" = "\${prefix}/var"; then localstatedir="/var" fi if test "$sysconfdir" = "\${prefix}/etc"; then sysconfdir="/etc" fi if test "$systemddir" = "NONE/lib/systemd/system"; then systemddir=/lib/systemd/system fi if test "$libdir" = "\${exec_prefix}/lib"; then if test -e /usr/lib64; then libdir="/usr/lib64" else libdir="/usr/lib" fi fi fi # Checks for programs. if ! ${MAKE-make} --version /cannot/make/this >/dev/null 2>&1; then AC_MSG_ERROR(["you don't seem to have GNU make; it is required"]) fi AC_PROG_AWK AC_PROG_GREP AC_PROG_SED AC_PROG_CPP AC_PROG_CC AM_PROG_CC_C_O AC_PROG_LN_S AC_PROG_INSTALL AC_PROG_MAKE_SET AC_PROG_CXX AC_PROG_RANLIB AC_CHECK_PROGS([PUBLICAN], [publican], [:]) AC_CHECK_PROGS([PKGCONFIG], [pkg-config]) AC_ARG_ENABLE([kronosnetd], [ --enable-kronosnetd : Kronosnetd support ],, [ enable_kronosnetd="no" ]) AM_CONDITIONAL([BUILD_KRONOSNETD], test x$enable_kronosnetd = xyes) AC_ARG_ENABLE([libtap], [ --enable-libtap : libtap support ],, [ enable_libtap="no" ]) if test "x$enable_kronosnetd" = xyes; then enable_libtap=yes fi AM_CONDITIONAL([BUILD_LIBTAP], test x$enable_libtap = xyes) AC_ARG_ENABLE([libknet-sctp], [ --enable-libknet-sctp : libknet SCTP support ],, [ enable_libknet_sctp="yes" ]) -AM_CONDITIONAL([BUILD_LIBKNET_SCTP], test x$enable_libknet_sctp = xyes) ## local helper functions # this function checks if CC support options passed as # args. Global CFLAGS are ignored during this test. cc_supports_flag() { saveCPPFLAGS="$CPPFLAGS" CPPFLAGS="$@" if echo $CC | grep -q clang; then CPPFLAGS="-Werror $CPPFLAGS" fi AC_MSG_CHECKING([whether $CC supports "$@"]) AC_PREPROC_IFELSE([AC_LANG_PROGRAM([])], [RC=0; AC_MSG_RESULT([yes])], [RC=1; AC_MSG_RESULT([no])]) CPPFLAGS="$saveCPPFLAGS" return $RC } # helper macro to check libs without adding them to LIBS check_lib_no_libs() { lib_no_libs_arg1=$1 shift lib_no_libs_arg2=$1 shift lib_no_libs_args=$@ AC_CHECK_LIB([$lib_no_libs_arg1], [$lib_no_libs_arg2],,, [$lib_no_libs_args]) LIBS=$ac_check_lib_save_LIBS } # Checks for C features AC_C_INLINE # Checks for libraries. AC_CHECK_LIB([pthread], [pthread_create]) AC_CHECK_LIB([m], [ceil]) AC_CHECK_LIB([rt], [clock_gettime]) PKG_CHECK_MODULES([nss],[nss]) # Checks for header files. AC_CHECK_HEADERS([fcntl.h]) AC_CHECK_HEADERS([stdlib.h]) AC_CHECK_HEADERS([string.h]) AC_CHECK_HEADERS([strings.h]) AC_CHECK_HEADERS([sys/ioctl.h]) AC_CHECK_HEADERS([syslog.h]) AC_CHECK_HEADERS([unistd.h]) AC_CHECK_HEADERS([netinet/in.h]) AC_CHECK_HEADERS([sys/socket.h]) AC_CHECK_HEADERS([arpa/inet.h]) AC_CHECK_HEADERS([netdb.h]) AC_CHECK_HEADERS([limits.h]) AC_CHECK_HEADERS([stdint.h]) if test "x$enable_libknet_sctp" = xyes; then AC_CHECK_HEADERS([netinet/sctp.h],, AC_MSG_ERROR(["missing required SCTP headers"])) fi # Checks for typedefs, structures, and compiler characteristics. AC_C_INLINE AC_TYPE_SIZE_T AC_TYPE_PID_T AC_TYPE_SSIZE_T AC_TYPE_UINT8_T AC_TYPE_UINT16_T AC_TYPE_UINT32_T AC_TYPE_UINT64_T AC_TYPE_INT32_T # Checks for library functions. AC_FUNC_ALLOCA AC_FUNC_FORK AC_FUNC_MALLOC AC_FUNC_REALLOC AC_CHECK_FUNCS([memset]) AC_CHECK_FUNCS([strdup]) AC_CHECK_FUNCS([strerror]) AC_CHECK_FUNCS([dup2]) AC_CHECK_FUNCS([select]) AC_CHECK_FUNCS([socket]) AC_CHECK_FUNCS([inet_ntoa]) AC_CHECK_FUNCS([memmove]) AC_CHECK_FUNCS([strchr]) AC_CHECK_FUNCS([atexit]) AC_CHECK_FUNCS([ftruncate]) AC_CHECK_FUNCS([strrchr]) AC_CHECK_FUNCS([strstr]) AC_CHECK_FUNCS([clock_gettime]) AC_CHECK_FUNCS([strcasecmp]) AC_CHECK_FUNCS([sendmmsg]) AC_CHECK_FUNCS([recvmmsg]) # Check entries in specific structs AC_CHECK_MEMBER([struct mmsghdr.msg_hdr], [AC_DEFINE_UNQUOTED([HAVE_MMSGHDR], [1], [struct mmsghdr exists])], [], [[#include ]]) # checks (for kronosnetd) if test "x$enable_kronosnetd" = xyes; then AC_CHECK_HEADERS([security/pam_appl.h], [AC_CHECK_LIB([pam], [pam_start])], [AC_MSG_ERROR([Unable to find LinuxPAM devel files])]) AC_CHECK_HEADERS([security/pam_misc.h], [AC_CHECK_LIB([pam_misc], [misc_conv])], [AC_MSG_ERROR([Unable to find LinuxPAM MISC devel files])]) PKG_CHECK_MODULES([libqb], [libqb]) AC_CHECK_LIB([qb], [qb_log_thread_priority_set], [have_qb_log_thread_priority_set="yes"], [have_qb_log_thread_priority_set="no"]) if test "x${have_qb_log_thread_priority_set}" = xyes; then AC_DEFINE_UNQUOTED([HAVE_QB_LOG_THREAD_PRIORITY_SET], 1, [have qb_log_thread_priority_set]) fi fi # local options AC_ARG_ENABLE([debug], [ --enable-debug enable debug build. ], [ default="no" ]) AC_ARG_ENABLE([publicandocs], [ --enable-publicandocs enable docs build. ], [ default="no" ]) AC_ARG_WITH([initdefaultdir], [ --with-initdefaultdir : path to /etc/sysconfig/.. or /etc/default dir. ], [ INITDEFAULTDIR="$withval" ], [ INITDEFAULTDIR="$sysconfdir/default" ]) AC_ARG_WITH([initddir], [ --with-initddir=DIR : path to init script directory. ], [ INITDDIR="$withval" ], [ INITDDIR="$sysconfdir/init.d" ]) AC_ARG_WITH([systemddir], [ --with-systemddir=DIR : path to systemd unit files directory. ], [ SYSTEMDDIR="$withval" ], [ SYSTEMDDIR="$systemddir" ]) AC_ARG_WITH([syslogfacility], [ --with-syslogfacility=FACILITY default syslog facility. ], [ SYSLOGFACILITY="$withval" ], [ SYSLOGFACILITY="LOG_DAEMON" ]) AC_ARG_WITH([sysloglevel], [ --with-sysloglevel=LEVEL default syslog level. ], [ SYSLOGLEVEL="$withval" ], [ SYSLOGLEVEL="LOG_INFO" ]) AC_ARG_WITH([defaultadmgroup], [ --with-defaultadmgroup=GROUP define PAM group. Users part of this group will be allowed to configure kronosnet. Others will only receive read-only rights. ], [ DEFAULTADMGROUP="$withval" ], [ DEFAULTADMGROUP="kronosnetadm" ]) ## random vars LOGDIR=${localstatedir}/log/ RUNDIR=${localstatedir}/run/ DEFAULT_CONFIG_DIR=${sysconfdir}/kronosnet ## do subst AM_CONDITIONAL([BUILD_DOCS], [test "x${enable_publicandocs}" = xyes]) AM_CONDITIONAL([DEBUG], [test "x${enable_debug}" = xyes]) AC_SUBST([DEFAULT_CONFIG_DIR]) AC_SUBST([INITDEFAULTDIR]) AC_SUBST([INITDDIR]) AC_SUBST([SYSTEMDDIR]) AC_SUBST([LOGDIR]) AC_SUBST([DEFAULTADMGROUP]) AC_DEFINE_UNQUOTED([DEFAULT_CONFIG_DIR], ["$(eval echo ${DEFAULT_CONFIG_DIR})"], [Default config directory]) AC_DEFINE_UNQUOTED([DEFAULT_CONFIG_FILE], ["$(eval echo ${DEFAULT_CONFIG_DIR}/kronosnetd.conf)"], [Default config file]) AC_DEFINE_UNQUOTED([LOGDIR], ["$(eval echo ${LOGDIR})"], [Default logging directory]) AC_DEFINE_UNQUOTED([DEFAULT_LOG_FILE], ["$(eval echo ${LOGDIR}/kronosnetd.log)"], [Default log file]) AC_DEFINE_UNQUOTED([RUNDIR], ["$(eval echo ${RUNDIR})"], [Default run directory]) AC_DEFINE_UNQUOTED([SYSLOGFACILITY], [$(eval echo ${SYSLOGFACILITY})], [Default syslog facility]) AC_DEFINE_UNQUOTED([SYSLOGLEVEL], [$(eval echo ${SYSLOGLEVEL})], [Default syslog level]) AC_DEFINE_UNQUOTED([DEFAULTADMGROUP], ["$(eval echo ${DEFAULTADMGROUP})"], [Default admin group]) ## *FLAGS handling ENV_CFLAGS="$CFLAGS" ENV_CPPFLAGS="$CPPFLAGS" ENV_LDFLAGS="$LDFLAGS" # debug build stuff if test "x${enable_debug}" = xyes; then AC_DEFINE_UNQUOTED([DEBUG], [1], [Compiling Debugging code]) OPT_CFLAGS="-O0" else OPT_CFLAGS="-O3" fi # gdb flags if test "x${GCC}" = xyes; then GDB_FLAGS="-ggdb3" else GDB_FLAGS="-g" fi # extra warnings EXTRA_WARNINGS="" WARNLIST=" all shadow missing-prototypes missing-declarations strict-prototypes declaration-after-statement pointer-arith write-strings cast-align bad-function-cast missing-format-attribute format=2 format-security format-nonliteral no-long-long unsigned-char gnu89-inline no-strict-aliasing error address cpp overflow parentheses sequence-point switch uninitialized unused-but-set-variable unused-function unused-result unused-value unused-variable " for j in $WARNLIST; do if cc_supports_flag -W$j; then EXTRA_WARNINGS="$EXTRA_WARNINGS -W$j"; fi done CFLAGS="$ENV_CFLAGS $lt_prog_compiler_pic $OPT_CFLAGS $GDB_FLAGS \ $EXTRA_WARNINGS $WERROR_CFLAGS" CPPFLAGS="$ENV_CPPFLAGS" LDFLAGS="$ENV_LDFLAGS $lt_prog_compiler_pic -Wl,--as-needed" AC_CONFIG_FILES([ Makefile common/Makefile init/Makefile libtap/Makefile libtap/libtap.pc kronosnetd/Makefile kronosnetd/kronosnetd.logrotate libknet/Makefile libknet/libknet.pc libknet/tests/Makefile docs/Makefile poc-code/Makefile poc-code/iov-hash/Makefile poc-code/access-list/Makefile ]) AC_OUTPUT diff --git a/libknet/Makefile.am b/libknet/Makefile.am index 84ac8882..b76a220f 100644 --- a/libknet/Makefile.am +++ b/libknet/Makefile.am @@ -1,87 +1,84 @@ # # Copyright (C) 2010-2015 Red Hat, Inc. All rights reserved. # # Authors: Fabio M. Di Nitto # Federico Simoncelli # # This software licensed under GPL-2.0+, LGPL-2.0+ # MAINTAINERCLEANFILES = Makefile.in include $(top_srcdir)/build-aux/check.mk SYMFILE = libknet_exported_syms EXTRA_DIST = $(SYMFILE) SUBDIRS = . tests libversion = 0:0:0 # override global LIBS that pulls in lots of craft we don't need here LIBS = sources = \ common.c \ compat.c \ crypto.c \ handle.c \ host.c \ listener.c \ link.c \ logging.c \ nsscrypto.c \ threads_common.c \ threads_dsthandler.c \ threads_heartbeat.c \ threads_pmtud.c \ threads_send_recv.c \ transport_udp.c \ + transport_sctp.c \ transport_common.c -if BUILD_LIBKNET_SCTP -sources += transport_sctp.c -endif - if DEBUG sources += ../common/netutils.c endif include_HEADERS = libknet.h pkgconfigdir = $(libdir)/pkgconfig pkgconfig_DATA = libknet.pc noinst_HEADERS = \ common.h \ compat.h \ crypto.h \ host.h \ internals.h \ link.h \ listener.h \ logging.h \ nsscrypto.h \ onwire.h \ threads_common.h \ threads_dsthandler.h \ threads_heartbeat.h \ threads_pmtud.h \ threads_send_recv.h \ transports.h lib_LTLIBRARIES = libknet.la libknet_la_SOURCES = $(sources) libknet_la_CFLAGS = $(nss_CFLAGS) EXTRA_libknet_la_DEPENDENCIES = $(SYMFILE) libknet_la_LDFLAGS = -Wl,--version-script=$(srcdir)/$(SYMFILE) \ --export-dynamic \ -version-number $(libversion) libknet_la_LIBADD = $(nss_LIBS) -lrt -lpthread -lm diff --git a/libknet/handle.c b/libknet/handle.c index 69e0e3c1..fef199cf 100644 --- a/libknet/handle.c +++ b/libknet/handle.c @@ -1,1501 +1,1499 @@ /* * Copyright (C) 2010-2015 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. Di Nitto * Federico Simoncelli * * This software licensed under GPL-2.0+, LGPL-2.0+ */ #include "config.h" #include #include #include #include #include #include #include #include #include #include #include "internals.h" #include "crypto.h" #include "common.h" #include "threads_common.h" #include "threads_heartbeat.h" #include "threads_pmtud.h" #include "threads_dsthandler.h" #include "threads_send_recv.h" #include "transports.h" #include "logging.h" static pthread_mutex_t handle_config_mutex = PTHREAD_MUTEX_INITIALIZER; static int _init_locks(knet_handle_t knet_h) { int savederrno = 0; savederrno = pthread_rwlock_init(&knet_h->global_rwlock, NULL); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize list rwlock: %s", strerror(savederrno)); goto exit_fail; } knet_h->lock_init_done = 1; savederrno = pthread_rwlock_init(&knet_h->listener_rwlock, NULL); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize listener rwlock: %s", strerror(savederrno)); goto exit_fail; } savederrno = pthread_rwlock_init(&knet_h->host_rwlock, NULL); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize host rwlock: %s", strerror(savederrno)); goto exit_fail; } savederrno = pthread_mutex_init(&knet_h->host_mutex, NULL); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize host mutex: %s", strerror(savederrno)); goto exit_fail; } savederrno = pthread_cond_init(&knet_h->host_cond, NULL); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize host conditional mutex: %s", strerror(savederrno)); goto exit_fail; } savederrno = pthread_mutex_init(&knet_h->pmtud_mutex, NULL); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize pmtud mutex: %s", strerror(savederrno)); goto exit_fail; } savederrno = pthread_cond_init(&knet_h->pmtud_cond, NULL); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize pmtud conditional mutex: %s", strerror(savederrno)); goto exit_fail; } savederrno = pthread_mutex_init(&knet_h->tx_mutex, NULL); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize tx_thread mutex: %s", strerror(savederrno)); goto exit_fail; } return 0; exit_fail: errno = savederrno; return -1; } static void _destroy_locks(knet_handle_t knet_h) { knet_h->lock_init_done = 0; pthread_rwlock_destroy(&knet_h->global_rwlock); pthread_rwlock_destroy(&knet_h->listener_rwlock); pthread_rwlock_destroy(&knet_h->host_rwlock); pthread_mutex_destroy(&knet_h->host_mutex); pthread_cond_destroy(&knet_h->host_cond); pthread_mutex_destroy(&knet_h->pmtud_mutex); pthread_cond_destroy(&knet_h->pmtud_cond); pthread_mutex_destroy(&knet_h->tx_mutex); } static int _init_socketpair(knet_handle_t knet_h, int *sock) { int savederrno = 0; int value; int i; if (socketpair(AF_UNIX, SOCK_SEQPACKET, 0, sock) != 0) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize socketpair: %s", strerror(savederrno)); goto exit_fail; } for (i = 0; i < 2; i++) { if (_fdset_cloexec(sock[i])) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to set CLOEXEC on sock[%d]: %s", i, strerror(savederrno)); goto exit_fail; } if (_fdset_nonblock(sock[i])) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to set NONBLOCK on sock[%d]: %s", i, strerror(savederrno)); goto exit_fail; } value = KNET_RING_RCVBUFF; if (setsockopt(sock[i], SOL_SOCKET, SO_RCVBUFFORCE, &value, sizeof(value)) < 0) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to set receive buffer on sock[%d]: %s", i, strerror(savederrno)); goto exit_fail; } value = KNET_RING_RCVBUFF; if (setsockopt(sock[i], SOL_SOCKET, SO_SNDBUFFORCE, &value, sizeof(value)) < 0) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to set send buffer on sock[%d]: %s", i, strerror(savederrno)); goto exit_fail; } } return 0; exit_fail: errno = savederrno; return -1; } static void _close_socketpair(knet_handle_t knet_h, int *sock) { int i; for (i = 0; i < 2; i++) { if (sock[i]) { close(sock[i]); sock[i] = 0; } } } static int _init_socks(knet_handle_t knet_h) { int savederrno = 0; if (_init_socketpair(knet_h, knet_h->hostsockfd)) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize internal hostsockpair: %s", strerror(savederrno)); goto exit_fail; } if (_init_socketpair(knet_h, knet_h->dstsockfd)) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize internal dstsockpair: %s", strerror(savederrno)); goto exit_fail; } return 0; exit_fail: errno = savederrno; return -1; } static void _close_socks(knet_handle_t knet_h) { _close_socketpair(knet_h, knet_h->dstsockfd); _close_socketpair(knet_h, knet_h->hostsockfd); } static int _init_buffers(knet_handle_t knet_h) { int savederrno = 0; int i; size_t bufsize; for (i = 0; i < PCKT_FRAG_MAX; i++) { bufsize = ceil((float)KNET_MAX_PACKET_SIZE / (i + 1)) + KNET_HEADER_ALL_SIZE; knet_h->send_to_links_buf[i] = malloc(bufsize); if (!knet_h->send_to_links_buf[i]) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory datafd to link buffer: %s", strerror(savederrno)); goto exit_fail; } memset(knet_h->send_to_links_buf[i], 0, bufsize); knet_h->recv_from_sock_buf[i] = malloc(KNET_DATABUFSIZE); if (!knet_h->recv_from_sock_buf[i]) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for app to datafd buffer: %s", strerror(savederrno)); goto exit_fail; } memset(knet_h->recv_from_sock_buf[i], 0, KNET_DATABUFSIZE); knet_h->recv_from_links_buf[i] = malloc(KNET_DATABUFSIZE); if (!knet_h->recv_from_links_buf[i]) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for link to datafd buffer: %s", strerror(savederrno)); goto exit_fail; } memset(knet_h->recv_from_links_buf[i], 0, KNET_DATABUFSIZE); } knet_h->pingbuf = malloc(KNET_HEADER_PING_SIZE); if (!knet_h->pingbuf) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for hearbeat buffer: %s", strerror(savederrno)); goto exit_fail; } memset(knet_h->pingbuf, 0, KNET_HEADER_PING_SIZE); knet_h->pmtudbuf = malloc(KNET_PMTUD_SIZE_V6); if (!knet_h->pmtudbuf) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for pmtud buffer: %s", strerror(savederrno)); goto exit_fail; } memset(knet_h->pmtudbuf, 0, KNET_PMTUD_SIZE_V6); for (i = 0; i < PCKT_FRAG_MAX; i++) { bufsize = ceil((float)KNET_MAX_PACKET_SIZE / (i + 1)) + KNET_HEADER_ALL_SIZE + KNET_DATABUFSIZE_CRYPT_PAD; knet_h->send_to_links_buf_crypt[i] = malloc(bufsize); if (!knet_h->send_to_links_buf_crypt[i]) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for crypto datafd to link buffer: %s", strerror(savederrno)); goto exit_fail; } memset(knet_h->send_to_links_buf_crypt[i], 0, bufsize); } knet_h->recv_from_links_buf_decrypt = malloc(KNET_DATABUFSIZE_CRYPT); if (!knet_h->recv_from_links_buf_decrypt) { savederrno = errno; log_err(knet_h, KNET_SUB_CRYPTO, "Unable to allocate memory for crypto link to datafd buffer: %s", strerror(savederrno)); goto exit_fail; } memset(knet_h->recv_from_links_buf_decrypt, 0, KNET_DATABUFSIZE_CRYPT); knet_h->recv_from_links_buf_crypt = malloc(KNET_DATABUFSIZE_CRYPT); if (!knet_h->recv_from_links_buf_crypt) { savederrno = errno; log_err(knet_h, KNET_SUB_CRYPTO, "Unable to allocate memory for crypto link to datafd buffer: %s", strerror(savederrno)); goto exit_fail; } memset(knet_h->recv_from_links_buf_crypt, 0, KNET_DATABUFSIZE_CRYPT); knet_h->pingbuf_crypt = malloc(KNET_DATABUFSIZE_CRYPT); if (!knet_h->pingbuf_crypt) { savederrno = errno; log_err(knet_h, KNET_SUB_CRYPTO, "Unable to allocate memory for crypto hearbeat buffer: %s", strerror(savederrno)); goto exit_fail; } memset(knet_h->pingbuf_crypt, 0, KNET_DATABUFSIZE_CRYPT); knet_h->pmtudbuf_crypt = malloc(KNET_DATABUFSIZE_CRYPT); if (!knet_h->pmtudbuf_crypt) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for crypto pmtud buffer: %s", strerror(savederrno)); goto exit_fail; } memset(knet_h->pmtudbuf_crypt, 0, KNET_DATABUFSIZE_CRYPT); return 0; exit_fail: errno = savederrno; return -1; } static void _destroy_buffers(knet_handle_t knet_h) { int i; for (i = 0; i < PCKT_FRAG_MAX; i++) { free(knet_h->send_to_links_buf[i]); free(knet_h->recv_from_sock_buf[i]); free(knet_h->send_to_links_buf_crypt[i]); free(knet_h->recv_from_links_buf[i]); } free(knet_h->recv_from_links_buf_decrypt); free(knet_h->recv_from_links_buf_crypt); free(knet_h->pingbuf); free(knet_h->pingbuf_crypt); free(knet_h->pmtudbuf); free(knet_h->pmtudbuf_crypt); } static int _init_epolls(knet_handle_t knet_h) { struct epoll_event ev; int savederrno = 0; /* * even if the kernel does dynamic allocation with epoll_ctl * we need to reserve one extra for host to host communication */ knet_h->send_to_links_epollfd = epoll_create(KNET_EPOLL_MAX_EVENTS + 1); if (knet_h->send_to_links_epollfd < 0) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to create epoll datafd to link fd: %s", strerror(savederrno)); goto exit_fail; } knet_h->recv_from_links_epollfd = epoll_create(KNET_EPOLL_MAX_EVENTS); if (knet_h->recv_from_links_epollfd < 0) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to create epoll link to datafd fd: %s", strerror(savederrno)); goto exit_fail; } knet_h->dst_link_handler_epollfd = epoll_create(KNET_EPOLL_MAX_EVENTS); if (knet_h->dst_link_handler_epollfd < 0) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to create epoll dst cache fd: %s", strerror(savederrno)); goto exit_fail; } if (_fdset_cloexec(knet_h->send_to_links_epollfd)) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to set CLOEXEC on datafd to link epoll fd: %s", strerror(savederrno)); goto exit_fail; } if (_fdset_cloexec(knet_h->recv_from_links_epollfd)) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to set CLOEXEC on link to datafd epoll fd: %s", strerror(savederrno)); goto exit_fail; } if (_fdset_cloexec(knet_h->dst_link_handler_epollfd)) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to set CLOEXEC on dst cache epoll fd: %s", strerror(savederrno)); goto exit_fail; } memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLIN; ev.data.fd = knet_h->hostsockfd[0]; if (epoll_ctl(knet_h->send_to_links_epollfd, EPOLL_CTL_ADD, knet_h->hostsockfd[0], &ev)) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to add hostsockfd[0] to epoll pool: %s", strerror(savederrno)); goto exit_fail; } memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLIN; ev.data.fd = knet_h->dstsockfd[0]; if (epoll_ctl(knet_h->dst_link_handler_epollfd, EPOLL_CTL_ADD, knet_h->dstsockfd[0], &ev)) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to add dstsockfd[0] to epoll pool: %s", strerror(savederrno)); goto exit_fail; } return 0; exit_fail: errno = savederrno; return -1; } static void _close_epolls(knet_handle_t knet_h) { struct epoll_event ev; int i; memset(&ev, 0, sizeof(struct epoll_event)); for (i = 0; i < KNET_DATAFD_MAX; i++) { if (knet_h->sockfd[i].in_use) { epoll_ctl(knet_h->send_to_links_epollfd, EPOLL_CTL_DEL, knet_h->sockfd[i].sockfd[knet_h->sockfd[i].is_created], &ev); if (knet_h->sockfd[i].sockfd[knet_h->sockfd[i].is_created]) { _close_socketpair(knet_h, knet_h->sockfd[i].sockfd); } } } epoll_ctl(knet_h->send_to_links_epollfd, EPOLL_CTL_DEL, knet_h->hostsockfd[0], &ev); epoll_ctl(knet_h->dst_link_handler_epollfd, EPOLL_CTL_DEL, knet_h->dstsockfd[0], &ev); close(knet_h->send_to_links_epollfd); close(knet_h->recv_from_links_epollfd); close(knet_h->dst_link_handler_epollfd); } static int _start_transports(knet_handle_t knet_h) { int i, savederrno = 0, err = 0; for (i=0; itransport_ops[i] = get_udp_transport(); break; -#ifdef HAVE_NETINET_SCTP_H case KNET_TRANSPORT_SCTP: knet_h->transport_ops[i] = get_sctp_transport(); break; -#endif } if ((knet_h->transport_ops[i]) && (knet_h->transport_ops[i]->handle_allocate)) { knet_h->transport_ops[i]->handle_allocate(knet_h, &knet_h->transports[i]); if (!knet_h->transports[i]) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Failed to allocate transport handle for %s: %s", knet_h->transport_ops[i]->transport_name, strerror(savederrno)); err = -1; goto out; } } } out: errno = savederrno; return err; } static void _stop_transports(knet_handle_t knet_h) { int i; for (i=0; itransport_ops[i]) && (knet_h->transport_ops[i]->handle_free)) { knet_h->transport_ops[i]->handle_free(knet_h, knet_h->transports[i]); } } } static int _start_threads(knet_handle_t knet_h) { int savederrno = 0; savederrno = pthread_create(&knet_h->pmtud_link_handler_thread, 0, _handle_pmtud_link_thread, (void *) knet_h); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to start pmtud link thread: %s", strerror(savederrno)); goto exit_fail; } savederrno = pthread_create(&knet_h->dst_link_handler_thread, 0, _handle_dst_link_handler_thread, (void *) knet_h); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to start dst cache thread: %s", strerror(savederrno)); goto exit_fail; } savederrno = pthread_create(&knet_h->send_to_links_thread, 0, _handle_send_to_links_thread, (void *) knet_h); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to start datafd to link thread: %s", strerror(savederrno)); goto exit_fail; } savederrno = pthread_create(&knet_h->recv_from_links_thread, 0, _handle_recv_from_links_thread, (void *) knet_h); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to start link to datafd thread: %s", strerror(savederrno)); goto exit_fail; } savederrno = pthread_create(&knet_h->heartbt_thread, 0, _handle_heartbt_thread, (void *) knet_h); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to start heartbeat thread: %s", strerror(savederrno)); goto exit_fail; } return 0; exit_fail: errno = savederrno; return -1; } static void _stop_threads(knet_handle_t knet_h) { void *retval; /* * allow threads to catch on shutdown request * and release locks before we stop them. * this isn't the most efficent way to handle it * but it works good enough for now */ sleep(1); pthread_mutex_lock(&knet_h->host_mutex); pthread_cond_signal(&knet_h->host_cond); pthread_mutex_unlock(&knet_h->host_mutex); if (knet_h->heartbt_thread) { pthread_cancel(knet_h->heartbt_thread); pthread_join(knet_h->heartbt_thread, &retval); } if (knet_h->send_to_links_thread) { pthread_cancel(knet_h->send_to_links_thread); pthread_join(knet_h->send_to_links_thread, &retval); } if (knet_h->recv_from_links_thread) { pthread_cancel(knet_h->recv_from_links_thread); pthread_join(knet_h->recv_from_links_thread, &retval); } if (knet_h->dst_link_handler_thread) { pthread_cancel(knet_h->dst_link_handler_thread); pthread_join(knet_h->dst_link_handler_thread, &retval); } pthread_mutex_lock(&knet_h->pmtud_mutex); pthread_cond_signal(&knet_h->pmtud_cond); pthread_mutex_unlock(&knet_h->pmtud_mutex); sleep(1); if (knet_h->pmtud_link_handler_thread) { pthread_cancel(knet_h->pmtud_link_handler_thread); pthread_join(knet_h->pmtud_link_handler_thread, &retval); } } knet_handle_t knet_handle_new(uint16_t host_id, int log_fd, uint8_t default_log_level) { knet_handle_t knet_h; int savederrno = 0; struct rlimit cur; if (getrlimit(RLIMIT_NOFILE, &cur) < 0) { return NULL; } if ((log_fd < 0) || (log_fd >= cur.rlim_max)) { errno = EINVAL; return NULL; } /* * validate incoming request */ if ((log_fd) && (default_log_level > KNET_LOG_DEBUG)) { errno = EINVAL; return NULL; } /* * allocate handle */ knet_h = malloc(sizeof(struct knet_handle)); if (!knet_h) { errno = ENOMEM; return NULL; } memset(knet_h, 0, sizeof(struct knet_handle)); savederrno = pthread_mutex_lock(&handle_config_mutex); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to get handle mutex lock: %s", strerror(savederrno)); errno = savederrno; goto exit_fail; } /* * copy config in place */ knet_h->host_id = host_id; knet_h->logfd = log_fd; if (knet_h->logfd > 0) { memset(&knet_h->log_levels, default_log_level, KNET_MAX_SUBSYSTEMS); } /* * set pmtud default timers */ knet_h->pmtud_interval = KNET_PMTUD_DEFAULT_INTERVAL; /* * init main locking structures */ if (_init_locks(knet_h)) { savederrno = errno; goto exit_fail; } /* * init sockets */ if (_init_socks(knet_h)) { savederrno = errno; goto exit_fail; } /* * allocate packet buffers */ if (_init_buffers(knet_h)) { savederrno = errno; goto exit_fail; } /* * create epoll fds */ if (_init_epolls(knet_h)) { savederrno = errno; goto exit_fail; } /* * start transports */ if (_start_transports(knet_h)) { savederrno = errno; goto exit_fail; } /* * start internal threads */ if (_start_threads(knet_h)) { savederrno = errno; goto exit_fail; } pthread_mutex_unlock(&handle_config_mutex); return knet_h; exit_fail: pthread_mutex_unlock(&handle_config_mutex); knet_handle_free(knet_h); errno = savederrno; return NULL; } int knet_handle_free(knet_handle_t knet_h) { int savederrno = 0; savederrno = pthread_mutex_lock(&handle_config_mutex); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to get handle mutex lock: %s", strerror(savederrno)); errno = savederrno; return -1; } if (!knet_h) { pthread_mutex_unlock(&handle_config_mutex); errno = EINVAL; return -1; } if (!knet_h->lock_init_done) { goto exit_nolock; } savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s", strerror(savederrno)); pthread_mutex_unlock(&handle_config_mutex); errno = savederrno; return -1; } if (knet_h->host_head != NULL) { savederrno = EBUSY; log_err(knet_h, KNET_SUB_HANDLE, "Unable to free handle: host(s) or listener(s) are still active: %s", strerror(savederrno)); pthread_rwlock_unlock(&knet_h->global_rwlock); pthread_mutex_unlock(&handle_config_mutex); errno = savederrno; return -1; } knet_h->fini_in_progress = 1; pthread_rwlock_unlock(&knet_h->global_rwlock); _stop_threads(knet_h); _stop_transports(knet_h); _close_epolls(knet_h); _destroy_buffers(knet_h); _close_socks(knet_h); crypto_fini(knet_h); _destroy_locks(knet_h); exit_nolock: free(knet_h); knet_h = NULL; pthread_mutex_unlock(&handle_config_mutex); return 0; } int knet_handle_enable_sock_notify(knet_handle_t knet_h, void *sock_notify_fn_private_data, void (*sock_notify_fn) ( void *private_data, int datafd, int8_t channel, uint8_t tx_rx, int error, int errorno)) { int savederrno = 0, err = 0; if (!knet_h) { errno = EINVAL; return -1; } if (!sock_notify_fn) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } knet_h->sock_notify_fn_private_data = sock_notify_fn_private_data; knet_h->sock_notify_fn = sock_notify_fn; log_debug(knet_h, KNET_SUB_HANDLE, "sock_notify_fn enabled"); pthread_rwlock_unlock(&knet_h->global_rwlock); return err; } int knet_handle_add_datafd(knet_handle_t knet_h, int *datafd, int8_t *channel) { int err = 0, savederrno = 0; int i; struct epoll_event ev; if (!knet_h) { errno = EINVAL; return -1; } if (datafd == NULL) { errno = EINVAL; return -1; } if (channel == NULL) { errno = EINVAL; return -1; } if (*channel >= KNET_DATAFD_MAX) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } if (!knet_h->sock_notify_fn) { log_err(knet_h, KNET_SUB_HANDLE, "Adding datafd requires sock notify callback enabled!"); savederrno = EINVAL; err = -1; goto out_unlock; } if (*datafd > 0) { for (i = 0; i < KNET_DATAFD_MAX; i++) { if ((knet_h->sockfd[i].in_use) && (knet_h->sockfd[i].sockfd[0] == *datafd)) { log_err(knet_h, KNET_SUB_HANDLE, "requested datafd: %d already exist in index: %d", *datafd, i); savederrno = EEXIST; err = -1; goto out_unlock; } } } /* * auto allocate a channel */ if (*channel < 0) { for (i = 0; i < KNET_DATAFD_MAX; i++) { if (!knet_h->sockfd[i].in_use) { *channel = i; break; } } if (*channel < 0) { savederrno = EBUSY; err = -1; goto out_unlock; } } else { if (knet_h->sockfd[*channel].in_use) { savederrno = EBUSY; err = -1; goto out_unlock; } } knet_h->sockfd[*channel].is_created = 0; knet_h->sockfd[*channel].is_socket = 0; knet_h->sockfd[*channel].has_error = 0; if (*datafd > 0) { int sockopt; socklen_t sockoptlen = sizeof(sockopt); if (_fdset_cloexec(*datafd)) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_HANDLE, "Unable to set CLOEXEC on datafd: %s", strerror(savederrno)); goto out_unlock; } if (_fdset_nonblock(*datafd)) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_HANDLE, "Unable to set NONBLOCK on datafd: %s", strerror(savederrno)); goto out_unlock; } knet_h->sockfd[*channel].sockfd[0] = *datafd; knet_h->sockfd[*channel].sockfd[1] = 0; if (!getsockopt(knet_h->sockfd[*channel].sockfd[0], SOL_SOCKET, SO_TYPE, &sockopt, &sockoptlen)) { knet_h->sockfd[*channel].is_socket = 1; } } else { if (_init_socketpair(knet_h, knet_h->sockfd[*channel].sockfd)) { savederrno = errno; err = -1; goto out_unlock; } knet_h->sockfd[*channel].is_created = 1; knet_h->sockfd[*channel].is_socket = 1; *datafd = knet_h->sockfd[*channel].sockfd[0]; } memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLIN; ev.data.fd = knet_h->sockfd[*channel].sockfd[knet_h->sockfd[*channel].is_created]; if (epoll_ctl(knet_h->send_to_links_epollfd, EPOLL_CTL_ADD, knet_h->sockfd[*channel].sockfd[knet_h->sockfd[*channel].is_created], &ev)) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_HANDLE, "Unable to add datafd %d to linkfd epoll pool: %s", knet_h->sockfd[*channel].sockfd[knet_h->sockfd[*channel].is_created], strerror(savederrno)); if (knet_h->sockfd[*channel].is_created) { _close_socketpair(knet_h, knet_h->sockfd[*channel].sockfd); } goto out_unlock; } knet_h->sockfd[*channel].in_use = 1; out_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = savederrno; return err; } int knet_handle_remove_datafd(knet_handle_t knet_h, int datafd) { int err = 0, savederrno = 0; int8_t channel = -1; int i; struct epoll_event ev; if (!knet_h) { errno = EINVAL; return -1; } if (datafd <= 0) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } for (i = 0; i < KNET_DATAFD_MAX; i++) { if ((knet_h->sockfd[i].in_use) && (knet_h->sockfd[i].sockfd[0] == datafd)) { channel = i; break; } } if (channel < 0) { savederrno = EINVAL; err = -1; goto out_unlock; } if (!knet_h->sockfd[channel].has_error) { memset(&ev, 0, sizeof(struct epoll_event)); if (epoll_ctl(knet_h->send_to_links_epollfd, EPOLL_CTL_DEL, knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], &ev)) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_HANDLE, "Unable to del datafd %d from linkfd epoll pool: %s", knet_h->sockfd[channel].sockfd[0], strerror(savederrno)); goto out_unlock; } } if (knet_h->sockfd[channel].is_created) { _close_socketpair(knet_h, knet_h->sockfd[channel].sockfd); } memset(&knet_h->sockfd[channel], 0, sizeof(struct knet_sock)); out_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = savederrno; return err; } int knet_handle_get_datafd(knet_handle_t knet_h, const int8_t channel, int *datafd) { int err = 0, savederrno = 0; if (!knet_h) { errno = EINVAL; return -1; } if ((channel < 0) || (channel >= KNET_DATAFD_MAX)) { errno = EINVAL; return -1; } if (datafd == NULL) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } if (!knet_h->sockfd[channel].in_use) { savederrno = EINVAL; err = -1; goto out_unlock; } *datafd = knet_h->sockfd[channel].sockfd[0]; out_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = savederrno; return err; } int knet_handle_get_channel(knet_handle_t knet_h, const int datafd, int8_t *channel) { int err = 0, savederrno = 0; int i; if (!knet_h) { errno = EINVAL; return -1; } if (datafd <= 0) { errno = EINVAL; return -1; } if (channel == NULL) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } *channel = -1; for (i = 0; i < KNET_DATAFD_MAX; i++) { if ((knet_h->sockfd[i].in_use) && (knet_h->sockfd[i].sockfd[0] == datafd)) { *channel = i; break; } } if (*channel < 0) { savederrno = EINVAL; err = -1; goto out_unlock; } out_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = savederrno; return err; } int knet_handle_enable_filter(knet_handle_t knet_h, void *dst_host_filter_fn_private_data, int (*dst_host_filter_fn) ( void *private_data, const unsigned char *outdata, ssize_t outdata_len, uint8_t tx_rx, uint16_t this_host_id, uint16_t src_node_id, int8_t *channel, uint16_t *dst_host_ids, size_t *dst_host_ids_entries)) { int savederrno = 0; if (!knet_h) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } knet_h->dst_host_filter_fn_private_data = dst_host_filter_fn_private_data; knet_h->dst_host_filter_fn = dst_host_filter_fn; if (knet_h->dst_host_filter_fn) { log_debug(knet_h, KNET_SUB_HANDLE, "dst_host_filter_fn enabled"); } else { log_debug(knet_h, KNET_SUB_HANDLE, "dst_host_filter_fn disabled"); } pthread_rwlock_unlock(&knet_h->global_rwlock); return 0; } int knet_handle_setfwd(knet_handle_t knet_h, unsigned int enabled) { int savederrno = 0; if (!knet_h) { errno = EINVAL; return -1; } if ((enabled < 0) || (enabled > 1)) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } knet_h->enabled = enabled; if (enabled) { log_debug(knet_h, KNET_SUB_HANDLE, "Data forwarding is enabled"); } else { log_debug(knet_h, KNET_SUB_HANDLE, "Data forwarding is disabled"); } pthread_rwlock_unlock(&knet_h->global_rwlock); return 0; } int knet_handle_pmtud_getfreq(knet_handle_t knet_h, unsigned int *interval) { int savederrno = 0; if (!knet_h) { errno = EINVAL; return -1; } if (!interval) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } *interval = knet_h->pmtud_interval; pthread_rwlock_unlock(&knet_h->global_rwlock); return 0; } int knet_handle_pmtud_setfreq(knet_handle_t knet_h, unsigned int interval) { int savederrno = 0; if (!knet_h) { errno = EINVAL; return -1; } if ((!interval) || (interval > 86400)) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } knet_h->pmtud_interval = interval; log_debug(knet_h, KNET_SUB_HANDLE, "PMTUd interval set to: %u seconds", interval); pthread_rwlock_unlock(&knet_h->global_rwlock); return 0; } int knet_handle_enable_pmtud_notify(knet_handle_t knet_h, void *pmtud_notify_fn_private_data, void (*pmtud_notify_fn) ( void *private_data, unsigned int data_mtu)) { int savederrno = 0; if (!knet_h) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } knet_h->pmtud_notify_fn_private_data = pmtud_notify_fn_private_data; knet_h->pmtud_notify_fn = pmtud_notify_fn; if (knet_h->pmtud_notify_fn) { log_debug(knet_h, KNET_SUB_HANDLE, "pmtud_notify_fn enabled"); } else { log_debug(knet_h, KNET_SUB_HANDLE, "pmtud_notify_fn disabled"); } pthread_rwlock_unlock(&knet_h->global_rwlock); return 0; } int knet_handle_pmtud_get(knet_handle_t knet_h, unsigned int *data_mtu) { int savederrno = 0; if (!knet_h) { errno = EINVAL; return -1; } if (!data_mtu) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } *data_mtu = knet_h->data_mtu; pthread_rwlock_unlock(&knet_h->global_rwlock); return 0; } int knet_handle_crypto(knet_handle_t knet_h, struct knet_handle_crypto_cfg *knet_handle_crypto_cfg) { int savederrno = 0; int err = 0; if (!knet_h) { errno = EINVAL; return -1; } if (!knet_handle_crypto_cfg) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } crypto_fini(knet_h); if ((!strncmp("none", knet_handle_crypto_cfg->crypto_model, 4)) || ((!strncmp("none", knet_handle_crypto_cfg->crypto_cipher_type, 4)) && (!strncmp("none", knet_handle_crypto_cfg->crypto_hash_type, 4)))) { log_debug(knet_h, KNET_SUB_CRYPTO, "crypto is not enabled"); err = 0; goto exit_unlock; } if (knet_handle_crypto_cfg->private_key_len < KNET_MIN_KEY_LEN) { log_debug(knet_h, KNET_SUB_CRYPTO, "private key len too short (min %u): %u", KNET_MIN_KEY_LEN, knet_handle_crypto_cfg->private_key_len); savederrno = EINVAL; err = -1; goto exit_unlock; } if (knet_handle_crypto_cfg->private_key_len > KNET_MAX_KEY_LEN) { log_debug(knet_h, KNET_SUB_CRYPTO, "private key len too long (max %u): %u", KNET_MAX_KEY_LEN, knet_handle_crypto_cfg->private_key_len); savederrno = EINVAL; err = -1; goto exit_unlock; } err = crypto_init(knet_h, knet_handle_crypto_cfg); if (err) { err = -2; } exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = savederrno; return err; } ssize_t knet_recv(knet_handle_t knet_h, char *buff, const size_t buff_len, const int8_t channel) { int savederrno = 0; ssize_t err = 0; struct iovec iov_in; if (!knet_h) { errno = EINVAL; return -1; } if (buff == NULL) { errno = EINVAL; return -1; } if (buff_len <= 0) { errno = EINVAL; return -1; } if (buff_len > KNET_MAX_PACKET_SIZE) { errno = EINVAL; return -1; } if (channel < 0) { errno = EINVAL; return -1; } if (channel >= KNET_DATAFD_MAX) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } if (!knet_h->sockfd[channel].in_use) { savederrno = EINVAL; err = -1; goto out_unlock; } memset(&iov_in, 0, sizeof(iov_in)); iov_in.iov_base = (void *)buff; iov_in.iov_len = buff_len; err = readv(knet_h->sockfd[channel].sockfd[0], &iov_in, 1); savederrno = errno; out_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = savederrno; return err; } ssize_t knet_send(knet_handle_t knet_h, const char *buff, const size_t buff_len, const int8_t channel) { int savederrno = 0; ssize_t err = 0; struct iovec iov_out[1]; if (!knet_h) { errno = EINVAL; return -1; } if (buff == NULL) { errno = EINVAL; return -1; } if (buff_len <= 0) { errno = EINVAL; return -1; } if (buff_len > KNET_MAX_PACKET_SIZE) { errno = EINVAL; return -1; } if (channel < 0) { errno = EINVAL; return -1; } if (channel >= KNET_DATAFD_MAX) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } if (!knet_h->sockfd[channel].in_use) { savederrno = EINVAL; err = -1; goto out_unlock; } memset(iov_out, 0, sizeof(iov_out)); iov_out[0].iov_base = (void *)buff; iov_out[0].iov_len = buff_len; err = writev(knet_h->sockfd[channel].sockfd[0], iov_out, 1); savederrno = errno; out_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = savederrno; return err; } diff --git a/libknet/transport_sctp.c b/libknet/transport_sctp.c index 0ad299c6..e0aa9395 100644 --- a/libknet/transport_sctp.c +++ b/libknet/transport_sctp.c @@ -1,704 +1,706 @@ #include "config.h" -#include #include #include #include #include -#include #include #include #include -#include #include -#include -#include -#include "libknet.h" #include "host.h" #include "link.h" #include "logging.h" #include "common.h" #include "transports.h" +#ifdef HAVE_NETINET_SCTP_H +#include + /* * https://en.wikipedia.org/wiki/SCTP_packet_structure */ #define KNET_PMTUD_SCTP_OVERHEAD_COMMON 12 #define KNET_PMTUD_SCTP_OVERHEAD_DATA_CHUNK 16 #define KNET_PMTUD_SCTP_OVERHEAD KNET_PMTUD_SCTP_OVERHEAD_COMMON + KNET_PMTUD_SCTP_OVERHEAD_DATA_CHUNK /* Time to sleep before reconnection attempts. in microseconds */ #define KNET_SCTP_SLEEP_TIME 1000000 #define MAX_ACCEPTED_SOCKS 256 typedef struct sctp_handle_info { knet_handle_t knet_handle; int connect_epollfd; int listen_epollfd; pthread_t connect_thread; pthread_t listen_thread; pthread_rwlock_t links_list_lock; struct knet_list_head links_list; } sctp_handle_info_t; typedef struct sctp_link_info { knet_transport_t transport; knet_handle_t knet_handle; struct knet_link *link; int sendrecv_sock; int listen_sock; int accepted_socks[MAX_ACCEPTED_SOCKS]; struct sockaddr_storage dst_address; struct knet_list_head list; int on_epoll; } sctp_link_info_t; - static int _configure_sctp_socket(knet_handle_t knet_h, int sock, struct sockaddr_storage *address, const char *type) { int err = 0; int value; int savederrno; struct sctp_event_subscribe events; if (_configure_transport_socket(knet_h, sock, address, type) < 0) { err = -1; goto exit_error; } value = 1; if (setsockopt(sock, SOL_SCTP, SCTP_NODELAY, &value, sizeof(value)) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSPORT_T, "Unable to set sctp nodelay: %s", strerror(savederrno)); goto exit_error; } /* Events we want notifications for */ memset(&events, 0, sizeof (events)); events.sctp_data_io_event = 1; events.sctp_association_event = 1; events.sctp_send_failure_event = 1; events.sctp_address_event = 1; events.sctp_peer_error_event = 1; events.sctp_shutdown_event = 1; if (setsockopt(sock, IPPROTO_SCTP, SCTP_EVENTS, &events, sizeof (events)) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_SCTP_LINK_T, "Unable to enable %s events: %s", type, strerror(savederrno)); goto exit_error; } err = 0; exit_error: return err; } /* Listener received a new connection */ static void _handle_incoming_sctp(sctp_handle_info_t *handle_info, sctp_link_info_t *info) { knet_handle_t knet_h = handle_info->knet_handle; int new_fd; int i; struct epoll_event ev; struct sockaddr_storage ss; socklen_t sock_len = sizeof(ss); new_fd = accept(info->listen_sock, (struct sockaddr *)&ss, &sock_len); if (new_fd < 0) { log_warn(knet_h, KNET_SUB_SCTP_LINK_T, "Incoming: accept error: %s", strerror(errno)); return; } if (_fdset_cloexec(new_fd)) { log_debug(knet_h, KNET_SUB_SCTP_LINK_T, "Incoming: unable to set cloexec opts: %s", strerror(errno)); return; } /* Keep a track of all accepted FDs */ for (i=0; iaccepted_socks[i] == -1) { info->accepted_socks[i] = new_fd; break; } } if (i == MAX_ACCEPTED_SOCKS) { log_err(knet_h, KNET_SUB_SCTP_LINK_T, "Incoming: too many connections!"); close(new_fd); return; } memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLIN; ev.data.fd = new_fd; if (epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_ADD, new_fd, &ev)) { log_err(knet_h, KNET_SUB_SCTP_LINK_T, "Incoming: unable to add accepted socket %d to epoll pool: %s", new_fd, strerror(errno)); info->accepted_socks[i] = -1; close(new_fd); } else { char *print_str[2]; _transport_addrtostr((struct sockaddr *)&ss, sizeof(ss), print_str); log_debug(knet_h, KNET_SUB_SCTP_LINK_T, "Incoming: accepted new fd %d for %s (listen fd: %d). index: %d", new_fd, print_str[0], info->listen_sock, i); _transport_addrtostr_free(print_str); } } static int _create_connect_socket(knet_handle_t knet_h, sctp_handle_info_t *handle_info, sctp_link_info_t *info, int do_close) { int sendrecv_sock; int savederrno = EINVAL; struct epoll_event ev; char *print_str[2]; memset(&ev, 0, sizeof(struct epoll_event)); if (do_close || info->sendrecv_sock != -1) { if (info->on_epoll) { ev.events = EPOLLOUT; ev.data.ptr = info; if (epoll_ctl(handle_info->connect_epollfd, EPOLL_CTL_DEL, info->sendrecv_sock, &ev)) { log_err(knet_h, KNET_SUB_SCTP_LINK_T, "Unable to remove connected socket from the epoll pool: %s", strerror(errno)); } } close(info->sendrecv_sock); info->on_epoll = 0; sendrecv_sock = socket(info->dst_address.ss_family, SOCK_STREAM, IPPROTO_SCTP); if (sendrecv_sock < 0) { savederrno = errno; sendrecv_sock = -1; log_err(knet_h, KNET_SUB_SCTP_LINK_T, "Unable to create send/recv socket: %s", strerror(savederrno)); goto exit_error; } if (_configure_sctp_socket(knet_h, sendrecv_sock, &info->dst_address, "send/recv") < 0) { /* Error already reported */ goto exit_error; } } else { sendrecv_sock = info->sendrecv_sock; } if (connect(sendrecv_sock, (struct sockaddr *)&info->dst_address, sizeof(struct sockaddr_storage)) < 0) { if (errno != EINPROGRESS && errno != EISCONN) { savederrno = errno; sendrecv_sock = -1; log_err(knet_h, KNET_SUB_SCTP_LINK_T, "Unable to connect SCTP socket: %s", strerror(savederrno)); goto exit_error; } } memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLOUT; ev.data.ptr = info; if (epoll_ctl(handle_info->connect_epollfd, EPOLL_CTL_ADD, sendrecv_sock, &ev)) { savederrno = errno; sendrecv_sock = -1; log_err(knet_h, KNET_SUB_SCTP_LINK_T, "Unable to add send/recv to epoll pool: %s", strerror(savederrno)); goto exit_error; } info->on_epoll = 1; _transport_addrtostr((struct sockaddr *)&info->dst_address, sizeof(struct sockaddr_storage), print_str); log_debug(knet_h, KNET_SUB_SCTP_LINK_T, "New connect attempt to %s on fd %d", print_str[0], sendrecv_sock); _transport_addrtostr_free(print_str); exit_error: return sendrecv_sock; } /* Connect completed or failed */ static void _handle_connected_sctp(sctp_handle_info_t *handle_info, sctp_link_info_t *info) { knet_handle_t knet_h = handle_info->knet_handle; struct epoll_event ev; int err; char *print_str[2]; unsigned int status, len = sizeof(status); int fd = info->sendrecv_sock; err = getsockopt(fd, SOL_SOCKET, SO_ERROR, &status, &len); if (err || status) { if (err) { log_err(knet_h, KNET_SUB_SCTP_LINK_T, "SCTP getsockopt() on connecting socket %d failed: %s", fd, strerror(errno)); } else { _transport_addrtostr((struct sockaddr *)&info->dst_address, sizeof(struct sockaddr_storage), print_str); log_info(knet_h, KNET_SUB_SCTP_LINK_T, "SCTP connect on %d to %s failed: %s", fd, print_str[0], strerror(status)); _transport_addrtostr_free(print_str); /* Retry connect */ usleep(KNET_SCTP_SLEEP_TIME); /* No need to create a new socket if connect failed, * just retry connect */ info->sendrecv_sock = _create_connect_socket(knet_h, handle_info, info, 0); } return; } /* Connected - Remove us from the connect epoll */ ev.events = EPOLLOUT; ev.data.ptr = info; if (epoll_ctl(handle_info->connect_epollfd, EPOLL_CTL_DEL, fd, &ev)) { log_err(knet_h, KNET_SUB_SCTP_LINK_T, "Unable to remove connected socket %d from epoll pool: %s", fd, strerror(errno)); } info->on_epoll = 0; ev.events = EPOLLIN; ev.data.fd = fd; if (epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_ADD, fd, &ev)) { log_err(knet_h, KNET_SUB_SCTP_LINK_T, "Unable to add connected socket to epoll pool: %s", strerror(errno)); } _transport_addrtostr((struct sockaddr *)&info->dst_address, sizeof(struct sockaddr_storage), print_str); log_debug(knet_h, KNET_SUB_SCTP_LINK_T, "SCTP handler fd %d now connected to %s", fd, print_str[0]); _transport_addrtostr_free(print_str); } static void *_sctp_listen_thread(void *data) { int i, nev; sctp_handle_info_t *handle_info = (sctp_handle_info_t*) data; knet_handle_t knet_h = handle_info->knet_handle; struct epoll_event events[KNET_EPOLL_MAX_EVENTS]; while (!knet_h->fini_in_progress) { nev = epoll_wait(handle_info->listen_epollfd, events, KNET_EPOLL_MAX_EVENTS, -1); if (knet_h->fini_in_progress) { break; } if (nev < 0) { log_debug(knet_h, KNET_SUB_SCTP_LINK_T, "SCTP listen handler EPOLL ERROR: %s", strerror(errno)); continue; } /* Sort out which FD has an incoming connection */ for (i = 0; i < nev; i++) { _handle_incoming_sctp(handle_info, events[i].data.ptr); } } return NULL; } static void *_sctp_connect_thread(void *data) { int i, nev; sctp_handle_info_t *handle_info = (sctp_handle_info_t*) data; knet_handle_t knet_h = handle_info->knet_handle; struct epoll_event events[KNET_EPOLL_MAX_EVENTS]; while (!knet_h->fini_in_progress) { nev = epoll_wait(handle_info->connect_epollfd, events, KNET_EPOLL_MAX_EVENTS, -1); if (knet_h->fini_in_progress) { break; } if (nev < 0) { log_debug(knet_h, KNET_SUB_SCTP_LINK_T, "SCTP connect handler EPOLL ERROR: %s", strerror(errno)); continue; } /* Sort out which FD has a connection */ for (i = 0; i < nev; i++) { _handle_connected_sctp(handle_info, events[i].data.ptr); } } return NULL; } /* * EOF on the socket, find the link and set it waiting for connect() again * Returns -1 if the fd is not known to us. * The fd is already removed from the main epoll by the time we get here. */ static int sctp_handle_fd_eof(knet_handle_t knet_h, int sock_fd) { sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP]; sctp_link_info_t *info; int ret = -1; int i; /* Not us */ if (!handle_info) { return ret; } pthread_rwlock_rdlock(&handle_info->links_list_lock); knet_list_for_each_entry(info, &handle_info->links_list, list) { if (sock_fd == info->sendrecv_sock) { pthread_rwlock_unlock(&handle_info->links_list_lock); log_info(knet_h, KNET_SUB_SCTP_LINK_T, "Restarting connect for closed socket %d", sock_fd); /* Restart the connect() attempts */ info->sendrecv_sock = _create_connect_socket(knet_h, handle_info, info, 1); info->link->outsock = info->sendrecv_sock; return 0; } /* Accepted socket - just close it */ for (i=0; iaccepted_socks[i]) { log_info(knet_h, KNET_SUB_SCTP_LINK_T, "Closing accepted socket %d", sock_fd); close(sock_fd); info->accepted_socks[i] = -1; pthread_rwlock_unlock(&handle_info->links_list_lock); return 0; } } } pthread_rwlock_unlock(&handle_info->links_list_lock); log_info(knet_h, KNET_SUB_SCTP_LINK_T, "Cannot find link_info for EOF socket %d", sock_fd); return -1; } static int sctp_handle_allocate(knet_handle_t knet_h, knet_transport_t *transport) { sctp_handle_info_t *handle_info; int savederrno; handle_info = malloc(sizeof(sctp_handle_info_t)); if (!handle_info) { return -1; } handle_info->knet_handle = knet_h; knet_list_init(&handle_info->links_list); pthread_rwlock_init(&handle_info->links_list_lock, NULL); handle_info->listen_epollfd = epoll_create(KNET_EPOLL_MAX_EVENTS + 1); if (handle_info->listen_epollfd < 0) { savederrno = errno; log_err(knet_h, KNET_SUB_SCTP_LINK_T, "Unable to create epoll listen fd: %s", strerror(savederrno)); goto exit_fail; } handle_info->connect_epollfd = epoll_create(KNET_EPOLL_MAX_EVENTS + 1); if (handle_info->connect_epollfd < 0) { savederrno = errno; log_err(knet_h, KNET_SUB_SCTP_LINK_T, "Unable to create epoll connect fd: %s", strerror(savederrno)); goto exit_fail; } /* Start connect & listener threads */ savederrno = pthread_create(&handle_info->listen_thread, NULL, _sctp_listen_thread, handle_info); if (savederrno) { log_err(knet_h, KNET_SUB_SCTP_LINK_T, "Unable to start sctp listen thread: %s", strerror(savederrno)); goto exit_fail; } savederrno = pthread_create(&handle_info->connect_thread, NULL, _sctp_connect_thread, handle_info); if (savederrno) { log_err(knet_h, KNET_SUB_SCTP_LINK_T, "Unable to start sctp connect thread: %s", strerror(savederrno)); goto exit_fail; } *transport = handle_info; return 0; exit_fail: errno = savederrno; return -1; } static int sctp_handle_free(knet_handle_t knet_h, knet_transport_t transport) { sctp_handle_info_t *handle_info; void *thread_status; if (!transport) { errno = EINVAL; return -1; } handle_info = transport; if (handle_info->listen_thread) { pthread_cancel(handle_info->listen_thread); pthread_join(handle_info->listen_thread, &thread_status); } if (handle_info->connect_thread) { pthread_cancel(handle_info->connect_thread); pthread_join(handle_info->connect_thread, &thread_status); } free(handle_info); return 0; } static int sctp_link_listener_start(knet_handle_t knet_h, knet_transport_link_t transport_link, uint8_t link_id, struct sockaddr_storage *address, struct sockaddr_storage *dst_address) { int listen_sock; int savederrno = EINVAL; struct epoll_event ev; int err; sctp_link_info_t *info; sctp_handle_info_t *handle_info; char *print_str[2]; info = (sctp_link_info_t *)transport_link; handle_info = info->transport; listen_sock = socket(address->ss_family, SOCK_STREAM, IPPROTO_SCTP); if (listen_sock < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_SCTP_LINK_T, "Unable to create listener socket: %s", strerror(savederrno)); goto exit_error; } if (_configure_sctp_socket(knet_h, listen_sock, address, "listener") < 0) { /* Error already reported */ goto exit_error; } if (bind(listen_sock, (struct sockaddr *)address, sizeof(struct sockaddr_storage)) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_SCTP_LINK_T, "Unable to bind listener socket: %s", strerror(savederrno)); goto exit_error; } if (listen(listen_sock, 5) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_SCTP_LINK_T, "Unable to listen on listener socket: %s", strerror(savederrno)); goto exit_error; } memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLIN; ev.data.ptr = info; if (epoll_ctl(handle_info->listen_epollfd, EPOLL_CTL_ADD, listen_sock, &ev)) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_SCTP_LINK_T, "Unable to add listener to epoll pool: %s", strerror(savederrno)); goto exit_error; } info->listen_sock = listen_sock; _transport_addrtostr((struct sockaddr *)address, sizeof(struct sockaddr_storage), print_str); log_debug(knet_h, KNET_SUB_SCTP_LINK_T, "Listening on fd %d for %s", listen_sock, print_str[0]); _transport_addrtostr_free(print_str); return 0; exit_error: errno = savederrno; return err; } static int sctp_link_allocate(knet_handle_t knet_h, knet_transport_t transport, struct knet_link *link, knet_transport_link_t *transport_link, uint8_t link_id, struct sockaddr_storage *address, struct sockaddr_storage *dst_address, int *send_sock) { int savederrno = EINVAL; int err; int i; sctp_link_info_t *info; sctp_handle_info_t *handle_info; info = malloc(sizeof(sctp_link_info_t)); if (!info) { return -1; } info->knet_handle = knet_h; memcpy(&info->dst_address, dst_address, sizeof(struct sockaddr_storage)); handle_info = transport; info->link = link; info->on_epoll = 0; info->sendrecv_sock = -1; for (i=0; i< MAX_ACCEPTED_SOCKS; i++) { info->accepted_socks[i] = -1; } info->sendrecv_sock = _create_connect_socket(knet_h, handle_info, info, 1); if (info->sendrecv_sock == -1) { free(info); err = -1; goto exit_error; } info->transport = transport; pthread_rwlock_wrlock(&handle_info->links_list_lock); knet_list_add(&info->list, &handle_info->links_list); pthread_rwlock_unlock(&handle_info->links_list_lock); *transport_link = (knet_transport_link_t *)info; *send_sock = info->sendrecv_sock; return 0; exit_error: errno = savederrno; return err; } static int sctp_link_free(knet_transport_link_t transport) { sctp_link_info_t *info = (sctp_link_info_t *)transport; sctp_handle_info_t *handle_info = info->transport; int i; struct epoll_event ev; memset(&ev, 0, sizeof(struct epoll_event)); if (info->on_epoll) { ev.events = EPOLLOUT; ev.data.ptr = info; if (epoll_ctl(handle_info->connect_epollfd, EPOLL_CTL_DEL, info->sendrecv_sock, &ev)) { log_err(handle_info->knet_handle, KNET_SUB_SCTP_LINK_T, "Unable to remove connected socket from the epoll pool: %s", strerror(errno)); } } ev.events = EPOLLIN; ev.data.ptr = info; if (epoll_ctl(handle_info->listen_epollfd, EPOLL_CTL_DEL, info->listen_sock, &ev)) { log_err(handle_info->knet_handle, KNET_SUB_SCTP_LINK_T, "Unable to add listener to epoll pool: %s", strerror(errno)); } close(info->sendrecv_sock); close(info->listen_sock); for (i=0; i< MAX_ACCEPTED_SOCKS; i++) { if (info->accepted_socks[i] > -1) { close(info->accepted_socks[i]); } } pthread_rwlock_wrlock(&handle_info->links_list_lock); knet_list_del(&info->list); pthread_rwlock_unlock(&handle_info->links_list_lock); /* Remove from epoll */ free(transport); return 0; } static int sctp_handle_fd_notification(knet_handle_t knet_h, int sockfd, struct iovec *iov, size_t iovlen) { struct sctp_assoc_change *sac; union sctp_notification *snp; sctp_link_info_t *info; sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP]; char *print_str[2]; int i; /* Find the link associated with this fd */ pthread_rwlock_rdlock(&handle_info->links_list_lock); knet_list_for_each_entry(info, &handle_info->links_list, list) { if (sockfd == info->sendrecv_sock) { for (i=0; i< iovlen; i++) { snp = iov[i].iov_base; switch (snp->sn_header.sn_type) { case SCTP_ASSOC_CHANGE: sac = &snp->sn_assoc_change; if (sac->sac_state == SCTP_COMM_LOST) { _transport_addrtostr((struct sockaddr *)&info->dst_address, sizeof(struct sockaddr_storage), print_str); log_debug(knet_h, KNET_SUB_SCTP_LINK_T, "SCTP shutdown, reconnecting sock %d to %s", sockfd, print_str[0]); _transport_addrtostr_free(print_str); _create_connect_socket(knet_h, handle_info, info, 1); } break; case SCTP_SEND_FAILED: break; case SCTP_PEER_ADDR_CHANGE: break; case SCTP_REMOTE_ERROR: break; case SCTP_SHUTDOWN_EVENT: _transport_addrtostr((struct sockaddr *)&info->dst_address, sizeof(struct sockaddr_storage), print_str); log_debug(knet_h, KNET_SUB_SCTP_LINK_T, "SCTP shutdown, reconnecting sock %d to %s", sockfd, print_str[0]); _transport_addrtostr_free(print_str); _create_connect_socket(knet_h, handle_info, info, 1); break; default: log_debug(knet_h, KNET_SUB_SCTP_LINK_T, "unknown SCTP event type: %hu\n", snp->sn_header.sn_type); break; } pthread_rwlock_unlock(&handle_info->links_list_lock); return 0; } } } pthread_rwlock_unlock(&handle_info->links_list_lock); return -1; } static int sctp_link_get_mtu_overhead(knet_transport_t transport) { return KNET_PMTUD_SCTP_OVERHEAD; } static knet_transport_ops_t sctp_transport_ops = { .handle_allocate = sctp_handle_allocate, .handle_free = sctp_handle_free, .handle_fd_eof = sctp_handle_fd_eof, .handle_fd_notification = sctp_handle_fd_notification, .link_allocate = sctp_link_allocate, .link_listener_start = sctp_link_listener_start, .link_free = sctp_link_free, .link_get_mtu_overhead = sctp_link_get_mtu_overhead, .transport_name = "SCTP", }; - knet_transport_ops_t *get_sctp_transport() { + return &sctp_transport_ops; } +#else // HAVE_NETINET_SCTP_H +knet_transport_ops_t *get_sctp_transport() +{ + return NULL; +} +#endif diff --git a/libknet/transports.h b/libknet/transports.h index 4b74839b..b790befe 100644 --- a/libknet/transports.h +++ b/libknet/transports.h @@ -1,25 +1,22 @@ /* * Copyright (C) 2016 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. Di Nitto * * This software licensed under GPL-2.0+, LGPL-2.0+ */ #ifndef __TRANSPORTS_H__ #define __TRANSPORTS_H__ knet_transport_ops_t *get_udp_transport(void); - -#ifdef HAVE_NETINET_SCTP_H knet_transport_ops_t *get_sctp_transport(void); -#endif int _configure_transport_socket(knet_handle_t knet_h, int sock, struct sockaddr_storage *address, const char *type); void _close_socket(knet_handle_t knet_h, int sockfd); void _handle_socket_notification(knet_handle_t knet_h, int sockfd, struct iovec *iov, size_t iovlen); int _transport_addrtostr(const struct sockaddr *sa, socklen_t salen, char *str[2]); void _transport_addrtostr_free(char *str[2]); #endif