diff --git a/configure.ac b/configure.ac index af9b3b1d..bbd127c8 100644 --- a/configure.ac +++ b/configure.ac @@ -1,399 +1,398 @@ # # Copyright (C) 2010-2015 Red Hat, Inc. All rights reserved. # # Authors: Fabio M. Di Nitto # Federico Simoncelli # # This software licensed under GPL-2.0+, LGPL-2.0+ # # -*- Autoconf -*- # Process this file with autoconf to produce a configure script. # AC_PREREQ([2.63]) AC_INIT([kronosnet], m4_esyscmd([build-aux/git-version-gen .tarball-version]), [devel@lists.kronosnet.org]) AC_USE_SYSTEM_EXTENSIONS AM_INIT_AUTOMAKE([1.11.1 dist-bzip2 dist-xz color-tests -Wno-portability subdir-objects]) LT_PREREQ([2.2.6]) LT_INIT AC_CONFIG_MACRO_DIR([m4]) AC_CONFIG_SRCDIR([kronosnetd/main.c]) AC_CONFIG_HEADERS([config.h]) AC_CANONICAL_HOST AC_PROG_LIBTOOL AC_LANG([C]) systemddir=${prefix}/lib/systemd/system if test "$prefix" = "NONE"; then prefix="/usr" if test "$localstatedir" = "\${prefix}/var"; then localstatedir="/var" fi if test "$sysconfdir" = "\${prefix}/etc"; then sysconfdir="/etc" fi if test "$systemddir" = "NONE/lib/systemd/system"; then systemddir=/lib/systemd/system fi if test "$libdir" = "\${exec_prefix}/lib"; then if test -e /usr/lib64; then libdir="/usr/lib64" else libdir="/usr/lib" fi fi fi # Checks for programs. if ! ${MAKE-make} --version /cannot/make/this >/dev/null 2>&1; then AC_MSG_ERROR(["you don't seem to have GNU make; it is required"]) fi AC_PROG_AWK AC_PROG_GREP AC_PROG_SED AC_PROG_CPP AC_PROG_CC AM_PROG_CC_C_O AC_PROG_LN_S AC_PROG_INSTALL AC_PROG_MAKE_SET AC_PROG_CXX AC_PROG_RANLIB AC_CHECK_PROGS([PUBLICAN], [publican], [:]) AC_CHECK_PROGS([PKGCONFIG], [pkg-config]) AC_ARG_ENABLE([poc], [ --enable-poc : build poc code ],, [ enable_poc="yes" ]) AM_CONDITIONAL([BUILD_POC], test x$enable_poc = xyes) AC_ARG_ENABLE([kronosnetd], [ --enable-kronosnetd : Kronosnetd support ],, [ enable_kronosnetd="no" ]) AM_CONDITIONAL([BUILD_KRONOSNETD], test x$enable_kronosnetd = xyes) AC_ARG_ENABLE([libtap], [ --enable-libtap : libtap support ],, [ enable_libtap="no" ]) if test "x$enable_kronosnetd" = xyes; then enable_libtap=yes fi AM_CONDITIONAL([BUILD_LIBTAP], test x$enable_libtap = xyes) AC_ARG_ENABLE([libknet-sctp], [ --enable-libknet-sctp : libknet SCTP support ],, [ enable_libknet_sctp="yes" ]) ## local helper functions # this function checks if CC support options passed as # args. Global CFLAGS are ignored during this test. cc_supports_flag() { saveCPPFLAGS="$CPPFLAGS" CPPFLAGS="$@" if echo $CC | grep -q clang; then CPPFLAGS="-Werror $CPPFLAGS" fi AC_MSG_CHECKING([whether $CC supports "$@"]) AC_PREPROC_IFELSE([AC_LANG_PROGRAM([])], [RC=0; AC_MSG_RESULT([yes])], [RC=1; AC_MSG_RESULT([no])]) CPPFLAGS="$saveCPPFLAGS" return $RC } # helper macro to check libs without adding them to LIBS check_lib_no_libs() { lib_no_libs_arg1=$1 shift lib_no_libs_arg2=$1 shift lib_no_libs_args=$@ AC_CHECK_LIB([$lib_no_libs_arg1], [$lib_no_libs_arg2],,, [$lib_no_libs_args]) LIBS=$ac_check_lib_save_LIBS } # Checks for C features AC_C_INLINE # Checks for libraries. AC_CHECK_LIB([pthread], [pthread_create]) AC_CHECK_LIB([m], [ceil]) AC_CHECK_LIB([rt], [clock_gettime]) PKG_CHECK_MODULES([nss],[nss]) # Checks for header files. AC_CHECK_HEADERS([fcntl.h]) AC_CHECK_HEADERS([stdlib.h]) AC_CHECK_HEADERS([string.h]) AC_CHECK_HEADERS([strings.h]) AC_CHECK_HEADERS([sys/ioctl.h]) AC_CHECK_HEADERS([syslog.h]) AC_CHECK_HEADERS([unistd.h]) AC_CHECK_HEADERS([netinet/in.h]) AC_CHECK_HEADERS([sys/socket.h]) AC_CHECK_HEADERS([arpa/inet.h]) AC_CHECK_HEADERS([netdb.h]) AC_CHECK_HEADERS([limits.h]) AC_CHECK_HEADERS([stdint.h]) AC_CHECK_HEADERS([sys/epoll.h]) if test "x$enable_libknet_sctp" = xyes; then AC_CHECK_HEADERS([netinet/sctp.h],, AC_MSG_ERROR(["missing required SCTP headers"])) fi # Checks for typedefs, structures, and compiler characteristics. AC_C_INLINE AC_TYPE_SIZE_T AC_TYPE_PID_T AC_TYPE_SSIZE_T AC_TYPE_UINT8_T AC_TYPE_UINT16_T AC_TYPE_UINT32_T AC_TYPE_UINT64_T AC_TYPE_INT32_T # Checks for library functions. AC_FUNC_ALLOCA AC_FUNC_FORK AC_FUNC_MALLOC AC_FUNC_REALLOC AC_CHECK_FUNCS([memset]) AC_CHECK_FUNCS([strdup]) AC_CHECK_FUNCS([strerror]) AC_CHECK_FUNCS([dup2]) AC_CHECK_FUNCS([select]) AC_CHECK_FUNCS([socket]) AC_CHECK_FUNCS([inet_ntoa]) AC_CHECK_FUNCS([memmove]) AC_CHECK_FUNCS([strchr]) AC_CHECK_FUNCS([atexit]) AC_CHECK_FUNCS([ftruncate]) AC_CHECK_FUNCS([strrchr]) AC_CHECK_FUNCS([strstr]) AC_CHECK_FUNCS([clock_gettime]) AC_CHECK_FUNCS([strcasecmp]) -AC_CHECK_FUNCS([recvmmsg]) AC_CHECK_FUNCS([kevent]) # if neither sys/epoll.h nor kevent are present, we should fail. if test "x$ac_cv_header_sys_epoll_h" = xno && test "x$ac_cv_func_kevent" = xno; then AC_MSG_ERROR([Both epoll and kevent unavailable on this OS]) fi if test "x$ac_cv_header_sys_epoll_h" = xyes && test "x$ac_cv_func_kevent" = xyes; then AC_MSG_ERROR([Both epoll and kevent available on this OS, please contact the maintainers to fix the code]) fi # checks (for kronosnetd) if test "x$enable_kronosnetd" = xyes; then AC_CHECK_HEADERS([security/pam_appl.h], [AC_CHECK_LIB([pam], [pam_start])], [AC_MSG_ERROR([Unable to find LinuxPAM devel files])]) AC_CHECK_HEADERS([security/pam_misc.h], [AC_CHECK_LIB([pam_misc], [misc_conv])], [AC_MSG_ERROR([Unable to find LinuxPAM MISC devel files])]) PKG_CHECK_MODULES([libqb], [libqb]) AC_CHECK_LIB([qb], [qb_log_thread_priority_set], [have_qb_log_thread_priority_set="yes"], [have_qb_log_thread_priority_set="no"]) if test "x${have_qb_log_thread_priority_set}" = xyes; then AC_DEFINE_UNQUOTED([HAVE_QB_LOG_THREAD_PRIORITY_SET], 1, [have qb_log_thread_priority_set]) fi fi # local options AC_ARG_ENABLE([debug], [ --enable-debug enable debug build. ], [ default="no" ]) AC_ARG_ENABLE([publicandocs], [ --enable-publicandocs enable docs build. ], [ default="no" ]) AC_ARG_WITH([initdefaultdir], [ --with-initdefaultdir : path to /etc/sysconfig/.. or /etc/default dir. ], [ INITDEFAULTDIR="$withval" ], [ INITDEFAULTDIR="$sysconfdir/default" ]) AC_ARG_WITH([initddir], [ --with-initddir=DIR : path to init script directory. ], [ INITDDIR="$withval" ], [ INITDDIR="$sysconfdir/init.d" ]) AC_ARG_WITH([systemddir], [ --with-systemddir=DIR : path to systemd unit files directory. ], [ SYSTEMDDIR="$withval" ], [ SYSTEMDDIR="$systemddir" ]) AC_ARG_WITH([syslogfacility], [ --with-syslogfacility=FACILITY default syslog facility. ], [ SYSLOGFACILITY="$withval" ], [ SYSLOGFACILITY="LOG_DAEMON" ]) AC_ARG_WITH([sysloglevel], [ --with-sysloglevel=LEVEL default syslog level. ], [ SYSLOGLEVEL="$withval" ], [ SYSLOGLEVEL="LOG_INFO" ]) AC_ARG_WITH([defaultadmgroup], [ --with-defaultadmgroup=GROUP define PAM group. Users part of this group will be allowed to configure kronosnet. Others will only receive read-only rights. ], [ DEFAULTADMGROUP="$withval" ], [ DEFAULTADMGROUP="kronosnetadm" ]) ## random vars LOGDIR=${localstatedir}/log/ RUNDIR=${localstatedir}/run/ DEFAULT_CONFIG_DIR=${sysconfdir}/kronosnet ## do subst AM_CONDITIONAL([BUILD_DOCS], [test "x${enable_publicandocs}" = xyes]) AM_CONDITIONAL([DEBUG], [test "x${enable_debug}" = xyes]) AC_SUBST([DEFAULT_CONFIG_DIR]) AC_SUBST([INITDEFAULTDIR]) AC_SUBST([INITDDIR]) AC_SUBST([SYSTEMDDIR]) AC_SUBST([LOGDIR]) AC_SUBST([DEFAULTADMGROUP]) AC_DEFINE_UNQUOTED([DEFAULT_CONFIG_DIR], ["$(eval echo ${DEFAULT_CONFIG_DIR})"], [Default config directory]) AC_DEFINE_UNQUOTED([DEFAULT_CONFIG_FILE], ["$(eval echo ${DEFAULT_CONFIG_DIR}/kronosnetd.conf)"], [Default config file]) AC_DEFINE_UNQUOTED([LOGDIR], ["$(eval echo ${LOGDIR})"], [Default logging directory]) AC_DEFINE_UNQUOTED([DEFAULT_LOG_FILE], ["$(eval echo ${LOGDIR}/kronosnetd.log)"], [Default log file]) AC_DEFINE_UNQUOTED([RUNDIR], ["$(eval echo ${RUNDIR})"], [Default run directory]) AC_DEFINE_UNQUOTED([SYSLOGFACILITY], [$(eval echo ${SYSLOGFACILITY})], [Default syslog facility]) AC_DEFINE_UNQUOTED([SYSLOGLEVEL], [$(eval echo ${SYSLOGLEVEL})], [Default syslog level]) AC_DEFINE_UNQUOTED([DEFAULTADMGROUP], ["$(eval echo ${DEFAULTADMGROUP})"], [Default admin group]) ## *FLAGS handling ENV_CFLAGS="$CFLAGS" ENV_CPPFLAGS="$CPPFLAGS" ENV_LDFLAGS="$LDFLAGS" # debug build stuff if test "x${enable_debug}" = xyes; then AC_DEFINE_UNQUOTED([DEBUG], [1], [Compiling Debugging code]) OPT_CFLAGS="-O0" else OPT_CFLAGS="-O3" fi # gdb flags if test "x${GCC}" = xyes; then GDB_FLAGS="-ggdb3" else GDB_FLAGS="-g" fi # extra warnings EXTRA_WARNINGS="" WARNLIST=" all shadow missing-prototypes missing-declarations strict-prototypes declaration-after-statement pointer-arith write-strings cast-align bad-function-cast missing-format-attribute format=2 format-security format-nonliteral no-long-long unsigned-char gnu89-inline no-strict-aliasing error address cpp overflow parentheses sequence-point switch uninitialized unused-but-set-variable unused-function unused-result unused-value unused-variable " for j in $WARNLIST; do if cc_supports_flag -W$j; then EXTRA_WARNINGS="$EXTRA_WARNINGS -W$j"; fi done CFLAGS="$ENV_CFLAGS $lt_prog_compiler_pic $OPT_CFLAGS $GDB_FLAGS \ $EXTRA_WARNINGS $WERROR_CFLAGS" CPPFLAGS="$ENV_CPPFLAGS" LDFLAGS="$ENV_LDFLAGS $lt_prog_compiler_pic -Wl,--as-needed" AC_CONFIG_FILES([ Makefile init/Makefile libtap/Makefile libtap/libtap.pc kronosnetd/Makefile kronosnetd/kronosnetd.logrotate libknet/Makefile libknet/libknet.pc libknet/tests/Makefile docs/Makefile poc-code/Makefile poc-code/iov-hash/Makefile poc-code/access-list/Makefile ]) AC_OUTPUT diff --git a/libknet/compat.c b/libknet/compat.c index a7b5c244..479db2e7 100644 --- a/libknet/compat.c +++ b/libknet/compat.c @@ -1,159 +1,114 @@ /* * Copyright (C) 2016 Red Hat, Inc. All rights reserved. * * Author: Jan Friesse * * This software licensed under GPL-2.0+, LGPL-2.0+ */ #include "config.h" #include #include #include #include "compat.h" -#ifndef HAVE_RECVMMSG -extern int recvmmsg(int sockfd, struct mmsghdr *msgvec, unsigned int vlen, - unsigned int flags, struct timespec *timeout) -{ -#ifdef SYS_recvmmsg - /* - * For systems where kernel supports recvmmsg but glibc doesn't (RHEL 6) - */ - return (syscall(SYS_recvmmsg, sockfd, msgvec, vlen, flags, timeout)); -#else - /* - * Generic implementation of recvmmsg using recvmsg - */ - unsigned int i; - ssize_t ret; - - if (vlen == 0) { - return (0); - } - - if ((timeout != NULL) || (flags & MSG_WAITFORONE)) { - /* - * Not implemented - */ - errno = EINVAL; - return (-1); - } - - for (i = 0; i < vlen; i++) { - ret = recvmsg(sockfd, &msgvec[i].msg_hdr, flags); - if (ret >= 0) { - msgvec[i].msg_len = ret; - } else { - if (ret == -1 && errno == EAGAIN) { - ret = 0; - } - break ; - } - } - - return ((ret >= 0) ? i : ret); -#endif -} -#endif - #ifndef HAVE_SYS_EPOLL_H #ifdef HAVE_KEVENT /* for FreeBSD which has kevent instead of epoll */ #include #include #include #include static int32_t _poll_to_filter_(int32_t event) { int32_t out = 0; if (event & POLLIN) out |= EVFILT_READ; if (event & POLLOUT) out |= EVFILT_WRITE; return out; } int epoll_create(int size) { return kqueue(); } int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event) { int ret = 0; struct kevent ke; short filters = _poll_to_filter_(event->events); switch (op) { /* The kevent man page says that EV_ADD also does MOD */ case EPOLL_CTL_ADD: case EPOLL_CTL_MOD: EV_SET(&ke, fd, filters, EV_ADD | EV_ENABLE, 0, 0, event->data.ptr); break; case EPOLL_CTL_DEL: EV_SET(&ke, fd, filters, EV_DELETE, 0, 0, event->data.ptr); break; default: errno = EINVAL; return -1; } ret = kevent(epfd, &ke, 1, NULL, 0, NULL); return ret; } int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout_ms) { struct kevent kevents[maxevents]; struct timespec timeout = { 0, 0 }; struct timespec *timeout_ptr = &timeout; uint32_t revents; int event_count; int i; int returned_events; if (timeout_ms != -1) { timeout.tv_sec = timeout_ms/1000; timeout.tv_nsec += (timeout_ms % 1000) * 1000000ULL; } else { timeout_ptr = NULL; } event_count = kevent(epfd, NULL, 0, kevents, maxevents, timeout_ptr); if (event_count == -1) { return -1; } returned_events = 0; for (i = 0; i < event_count; i++) { revents = 0; if (kevents[i].flags & EV_ERROR) { revents |= POLLERR; } if (kevents[i].flags & EV_EOF) { revents |= POLLHUP; } if (kevents[i].filter == EVFILT_READ) { revents |= POLLIN; } if (kevents[i].filter == EVFILT_WRITE) { revents |= POLLOUT; } events[returned_events].events = revents; events[returned_events].data.ptr = kevents[i].udata; returned_events++; } return returned_events; } #endif /* HAVE_KEVENT */ #endif /* HAVE_SYS_EPOLL_H */ diff --git a/libknet/compat.h b/libknet/compat.h index 48297869..6f6cccfc 100644 --- a/libknet/compat.h +++ b/libknet/compat.h @@ -1,66 +1,50 @@ /* * Copyright (C) 2016 Red Hat, Inc. All rights reserved. * * Authors: Jan Friesse * * This software licensed under GPL-2.0+, LGPL-2.0+ */ #ifndef __COMPAT_H__ #define __COMPAT_H__ #include "config.h" #include #include - -/* FreeBSD has recvmmsg but it's a buggy wrapper */ -#ifdef __FreeBSD__ -#define recvmmsg COMPAT_recvmmsg -#undef HAVE_RECVMMSG -#endif - -#ifndef MSG_WAITFORONE -#define MSG_WAITFORONE 0x10000 -#endif - -#ifndef HAVE_RECVMMSG -extern int recvmmsg(int sockfd, struct mmsghdr *msgvec, unsigned int vlen, - unsigned int flags, struct timespec *timeout); -#endif - #ifndef ETIME #define ETIME ETIMEDOUT #endif #ifdef HAVE_SYS_EPOLL_H #include #else #ifdef HAVE_KEVENT #include #define EPOLL_CTL_ADD 1 #define EPOLL_CTL_MOD 2 #define EPOLL_CTL_DEL 3 #define EPOLLIN POLLIN #define EPOLLOUT POLLOUT typedef union epoll_data { void *ptr; int fd; uint32_t u32; uint64_t u64; } epoll_data_t; struct epoll_event { uint32_t events; /* Epoll events */ epoll_data_t data; /* User data variable */ }; int epoll_create(int size); int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout_ms); #endif /* HAVE_KEVENT */ #endif /* HAVE_SYS_EPOLL_H */ #endif /* __COMPAT_H__ */ diff --git a/libknet/threads_rx.c b/libknet/threads_rx.c index d85bd429..7dce1e2e 100644 --- a/libknet/threads_rx.c +++ b/libknet/threads_rx.c @@ -1,727 +1,727 @@ /* * Copyright (C) 2010-2017 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. Di Nitto * Federico Simoncelli * * This software licensed under GPL-2.0+, LGPL-2.0+ */ #include "config.h" #include #include #include #include #include "compat.h" #include "crypto.h" #include "host.h" #include "link.h" #include "logging.h" #include "transports.h" #include "threads_common.h" #include "threads_heartbeat.h" #include "threads_rx.h" #include "netutils.h" /* * RECV */ /* * return 1 if a > b * return -1 if b > a * return 0 if they are equal */ static inline int timecmp(struct timespec a, struct timespec b) { if (a.tv_sec != b.tv_sec) { if (a.tv_sec > b.tv_sec) { return 1; } else { return -1; } } else { if (a.tv_nsec > b.tv_nsec) { return 1; } else if (a.tv_nsec < b.tv_nsec) { return -1; } else { return 0; } } } /* * this functions needs to return an index (0 to 7) * to a knet_host_defrag_buf. (-1 on errors) */ static int find_pckt_defrag_buf(knet_handle_t knet_h, struct knet_header *inbuf) { struct knet_host *src_host = knet_h->host_index[inbuf->kh_node]; int i, oldest; /* * check if there is a buffer already in use handling the same seq_num */ for (i = 0; i < KNET_MAX_LINK; i++) { if (src_host->defrag_buf[i].in_use) { if (src_host->defrag_buf[i].pckt_seq == inbuf->khp_data_seq_num) { return i; } } } /* * If there is no buffer that's handling the current seq_num * either it's new or it's been reclaimed already. * check if it's been reclaimed/seen before using the defrag circular * buffer. If the pckt has been seen before, the buffer expired (ETIME) * and there is no point to try to defrag it again. */ if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 1, 0)) { errno = ETIME; return -1; } /* * register the pckt as seen */ _seq_num_set(src_host, inbuf->khp_data_seq_num, 1); /* * see if there is a free buffer */ for (i = 0; i < KNET_MAX_LINK; i++) { if (!src_host->defrag_buf[i].in_use) { return i; } } /* * at this point, there are no free buffers, the pckt is new * and we need to reclaim a buffer, and we will take the one * with the oldest timestamp. It's as good as any. */ oldest = 0; for (i = 0; i < KNET_MAX_LINK; i++) { if (timecmp(src_host->defrag_buf[i].last_update, src_host->defrag_buf[oldest].last_update) < 0) { oldest = i; } } src_host->defrag_buf[oldest].in_use = 0; return oldest; } static int pckt_defrag(knet_handle_t knet_h, struct knet_header *inbuf, ssize_t *len) { struct knet_host_defrag_buf *defrag_buf; int defrag_buf_idx; defrag_buf_idx = find_pckt_defrag_buf(knet_h, inbuf); if (defrag_buf_idx < 0) { if (errno == ETIME) { log_debug(knet_h, KNET_SUB_RX, "Defrag buffer expired"); } return 1; } defrag_buf = &knet_h->host_index[inbuf->kh_node]->defrag_buf[defrag_buf_idx]; /* * if the buf is not is use, then make sure it's clean */ if (!defrag_buf->in_use) { memset(defrag_buf, 0, sizeof(struct knet_host_defrag_buf)); defrag_buf->in_use = 1; defrag_buf->pckt_seq = inbuf->khp_data_seq_num; } /* * update timestamp on the buffer */ clock_gettime(CLOCK_MONOTONIC, &defrag_buf->last_update); /* * check if we already received this fragment */ if (defrag_buf->frag_map[inbuf->khp_data_frag_seq]) { /* * if we have received this fragment and we didn't clear the buffer * it means that we don't have all fragments yet */ return 1; } /* * we need to handle the last packet with gloves due to its different size */ if (inbuf->khp_data_frag_seq == inbuf->khp_data_frag_num) { defrag_buf->last_frag_size = *len; /* * in the event when the last packet arrives first, * we still don't know the offset vs the other fragments (based on MTU), * so we store the fragment at the end of the buffer where it's safe * and take a copy of the len so that we can restore its offset later. * remember we can't use the local MTU for this calculation because pMTU * can be asymettric between the same hosts. */ if (!defrag_buf->frag_size) { defrag_buf->last_first = 1; memmove(defrag_buf->buf + (KNET_MAX_PACKET_SIZE - *len), inbuf->khp_data_userdata, *len); } } else { defrag_buf->frag_size = *len; } memmove(defrag_buf->buf + ((inbuf->khp_data_frag_seq - 1) * defrag_buf->frag_size), inbuf->khp_data_userdata, *len); defrag_buf->frag_recv++; defrag_buf->frag_map[inbuf->khp_data_frag_seq] = 1; /* * check if we received all the fragments */ if (defrag_buf->frag_recv == inbuf->khp_data_frag_num) { /* * special case the last pckt */ if (defrag_buf->last_first) { memmove(defrag_buf->buf + ((inbuf->khp_data_frag_num - 1) * defrag_buf->frag_size), defrag_buf->buf + (KNET_MAX_PACKET_SIZE - defrag_buf->last_frag_size), defrag_buf->last_frag_size); } /* * recalculate packet lenght */ *len = ((inbuf->khp_data_frag_num - 1) * defrag_buf->frag_size) + defrag_buf->last_frag_size; /* * copy the pckt back in the user data */ memmove(inbuf->khp_data_userdata, defrag_buf->buf, *len); /* * free this buffer */ defrag_buf->in_use = 0; return 0; } return 1; } static void _parse_recv_from_links(knet_handle_t knet_h, int sockfd, const struct knet_mmsghdr *msg) { int err = 0, savederrno = 0; ssize_t outlen; struct knet_host *src_host; struct knet_link *src_link; unsigned long long latency_last; knet_node_id_t dst_host_ids[KNET_MAX_HOST]; size_t dst_host_ids_entries = 0; int bcast = 1; struct timespec recvtime; struct knet_header *inbuf = msg->msg_hdr.msg_iov->iov_base; unsigned char *outbuf = (unsigned char *)msg->msg_hdr.msg_iov->iov_base; ssize_t len = msg->msg_len; struct knet_hostinfo *knet_hostinfo; struct iovec iov_out[1]; int8_t channel; struct sockaddr_storage pckt_src; seq_num_t recv_seq_num; int wipe_bufs = 0; if (knet_h->crypto_instance) { if (crypto_authenticate_and_decrypt(knet_h, (unsigned char *)inbuf, len, knet_h->recv_from_links_buf_decrypt, &outlen) < 0) { log_debug(knet_h, KNET_SUB_RX, "Unable to decrypt/auth packet"); return; } len = outlen; inbuf = (struct knet_header *)knet_h->recv_from_links_buf_decrypt; } if (len < (KNET_HEADER_SIZE + 1)) { log_debug(knet_h, KNET_SUB_RX, "Packet is too short: %ld", len); return; } if (inbuf->kh_version != KNET_HEADER_VERSION) { log_debug(knet_h, KNET_SUB_RX, "Packet version does not match"); return; } inbuf->kh_node = ntohs(inbuf->kh_node); src_host = knet_h->host_index[inbuf->kh_node]; if (src_host == NULL) { /* host not found */ log_debug(knet_h, KNET_SUB_RX, "Unable to find source host for this packet"); return; } src_link = NULL; if ((inbuf->kh_type & KNET_HEADER_TYPE_PMSK) != 0) { src_link = src_host->link + (inbuf->khp_ping_link % KNET_MAX_LINK); if (src_link->dynamic == KNET_LINK_DYNIP) { /* * cpyaddrport will only copy address and port of the incoming * packet and strip extra bits such as flow and scopeid */ cpyaddrport(&pckt_src, msg->msg_hdr.msg_name); if (cmpaddr(&src_link->dst_addr, sockaddr_len(&src_link->dst_addr), &pckt_src, sockaddr_len(&pckt_src)) != 0) { log_debug(knet_h, KNET_SUB_RX, "host: %u link: %u appears to have changed ip address", src_host->host_id, src_link->link_id); memmove(&src_link->dst_addr, &pckt_src, sizeof(struct sockaddr_storage)); if (knet_addrtostr(&src_link->dst_addr, sockaddr_len(msg->msg_hdr.msg_name), src_link->status.dst_ipaddr, KNET_MAX_HOST_LEN, src_link->status.dst_port, KNET_MAX_PORT_LEN) != 0) { log_debug(knet_h, KNET_SUB_RX, "Unable to resolve ???"); snprintf(src_link->status.dst_ipaddr, KNET_MAX_HOST_LEN - 1, "Unknown!!!"); snprintf(src_link->status.dst_port, KNET_MAX_PORT_LEN - 1, "??"); } else { log_info(knet_h, KNET_SUB_RX, "host: %u link: %u new connection established from: %s %s", src_host->host_id, src_link->link_id, src_link->status.dst_ipaddr, src_link->status.dst_port); } } /* * transport has already accepted the connection here * otherwise we would not be receiving packets */ knet_h->transport_ops[src_link->transport_type]->transport_link_dyn_connect(knet_h, sockfd, src_link); } } switch (inbuf->kh_type) { case KNET_HEADER_TYPE_HOST_INFO: case KNET_HEADER_TYPE_DATA: /* * TODO: should we accept data even if we can't reply to the other node? * how would that work with SCTP and guaranteed delivery? */ if (!src_host->status.reachable) { log_debug(knet_h, KNET_SUB_RX, "Source host %u not reachable yet", src_host->host_id); //return; } inbuf->khp_data_seq_num = ntohs(inbuf->khp_data_seq_num); channel = inbuf->khp_data_channel; src_host->got_data = 1; if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 0, 0)) { if (src_host->link_handler_policy != KNET_LINK_POLICY_ACTIVE) { log_debug(knet_h, KNET_SUB_RX, "Packet has already been delivered"); } return; } if (inbuf->khp_data_frag_num > 1) { /* * len as received from the socket also includes extra stuff * that the defrag code doesn't care about. So strip it * here and readd only for repadding once we are done * defragging */ len = len - KNET_HEADER_DATA_SIZE; if (pckt_defrag(knet_h, inbuf, &len)) { return; } len = len + KNET_HEADER_DATA_SIZE; } if (inbuf->kh_type == KNET_HEADER_TYPE_DATA) { if (knet_h->enabled != 1) /* data forward is disabled */ break; if (knet_h->dst_host_filter_fn) { int host_idx; int found = 0; bcast = knet_h->dst_host_filter_fn( knet_h->dst_host_filter_fn_private_data, (const unsigned char *)inbuf->khp_data_userdata, len - KNET_HEADER_DATA_SIZE, KNET_NOTIFY_RX, knet_h->host_id, inbuf->kh_node, &channel, dst_host_ids, &dst_host_ids_entries); if (bcast < 0) { log_debug(knet_h, KNET_SUB_RX, "Error from dst_host_filter_fn: %d", bcast); return; } if ((!bcast) && (!dst_host_ids_entries)) { log_debug(knet_h, KNET_SUB_RX, "Message is unicast but no dst_host_ids_entries"); return; } /* check if we are dst for this packet */ if (!bcast) { for (host_idx = 0; host_idx < dst_host_ids_entries; host_idx++) { if (dst_host_ids[host_idx] == knet_h->host_id) { found = 1; break; } } if (!found) { log_debug(knet_h, KNET_SUB_RX, "Packet is not for us"); return; } } } } if (inbuf->kh_type == KNET_HEADER_TYPE_DATA) { if (!knet_h->sockfd[channel].in_use) { log_debug(knet_h, KNET_SUB_RX, "received packet for channel %d but there is no local sock connected", channel); return; } memset(iov_out, 0, sizeof(iov_out)); iov_out[0].iov_base = (void *) inbuf->khp_data_userdata; iov_out[0].iov_len = len - KNET_HEADER_DATA_SIZE; outlen = writev(knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], iov_out, 1); if (outlen <= 0) { knet_h->sock_notify_fn(knet_h->sock_notify_fn_private_data, knet_h->sockfd[channel].sockfd[0], channel, KNET_NOTIFY_RX, outlen, errno); return; } if (outlen == iov_out[0].iov_len) { _seq_num_set(src_host, inbuf->khp_data_seq_num, 0); } } else { /* HOSTINFO */ knet_hostinfo = (struct knet_hostinfo *)inbuf->khp_data_userdata; if (knet_hostinfo->khi_bcast == KNET_HOSTINFO_UCAST) { bcast = 0; knet_hostinfo->khi_dst_node_id = ntohs(knet_hostinfo->khi_dst_node_id); } if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 0, 0)) { return; } _seq_num_set(src_host, inbuf->khp_data_seq_num, 0); switch(knet_hostinfo->khi_type) { case KNET_HOSTINFO_TYPE_LINK_UP_DOWN: break; case KNET_HOSTINFO_TYPE_LINK_TABLE: break; default: log_warn(knet_h, KNET_SUB_RX, "Receiving unknown host info message from host %u", src_host->host_id); break; } } break; case KNET_HEADER_TYPE_PING: outlen = KNET_HEADER_PING_SIZE; inbuf->kh_type = KNET_HEADER_TYPE_PONG; inbuf->kh_node = htons(knet_h->host_id); recv_seq_num = ntohs(inbuf->khp_ping_seq_num); wipe_bufs = 0; if (!inbuf->khp_ping_timed) { /* * we might be receiving this message from all links, but we want * to process it only the first time */ if (recv_seq_num != src_host->untimed_rx_seq_num) { /* * cache the untimed seq num */ src_host->untimed_rx_seq_num = recv_seq_num; /* * if the host has received data in between * untimed ping, then we don't need to wipe the bufs */ if (src_host->got_data) { src_host->got_data = 0; wipe_bufs = 0; } else { wipe_bufs = 1; } } _seq_num_lookup(src_host, recv_seq_num, 0, wipe_bufs); } else { /* * pings always arrives in bursts over all the link * catch the first of them to cache the seq num and * avoid duplicate processing */ if (recv_seq_num != src_host->timed_rx_seq_num) { src_host->timed_rx_seq_num = recv_seq_num; if (recv_seq_num == 0) { _seq_num_lookup(src_host, recv_seq_num, 0, 1); } } } if (knet_h->crypto_instance) { if (crypto_encrypt_and_sign(knet_h, (const unsigned char *)inbuf, len, knet_h->recv_from_links_buf_crypt, &outlen) < 0) { log_debug(knet_h, KNET_SUB_RX, "Unable to encrypt pong packet"); break; } outbuf = knet_h->recv_from_links_buf_crypt; } retry_pong: len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL, (struct sockaddr *) &src_link->dst_addr, sizeof(struct sockaddr_storage)); savederrno = errno; if (len != outlen) { err = knet_h->transport_ops[src_link->transport_type]->transport_tx_sock_error(knet_h, src_link->outsock, len, savederrno); switch(err) { case -1: /* unrecoverable error */ log_debug(knet_h, KNET_SUB_RX, "Unable to send pong reply (sock: %d) packet (sendto): %d %s. recorded src ip: %s src port: %s dst ip: %s dst port: %s", src_link->outsock, errno, strerror(errno), src_link->status.src_ipaddr, src_link->status.src_port, src_link->status.dst_ipaddr, src_link->status.dst_port); break; case 0: /* ignore error and continue */ break; case 1: /* retry to send those same data */ goto retry_pong; break; } } break; case KNET_HEADER_TYPE_PONG: clock_gettime(CLOCK_MONOTONIC, &src_link->status.pong_last); memmove(&recvtime, &inbuf->khp_ping_time[0], sizeof(struct timespec)); timespec_diff(recvtime, src_link->status.pong_last, &latency_last); src_link->status.latency = ((src_link->status.latency * src_link->latency_exp) + ((latency_last / 1000llu) * (src_link->latency_fix - src_link->latency_exp))) / src_link->latency_fix; if (src_link->status.latency < src_link->pong_timeout) { if (!src_link->status.connected) { if (src_link->received_pong >= src_link->pong_count) { log_info(knet_h, KNET_SUB_RX, "host: %u link: %u is up", src_host->host_id, src_link->link_id); _link_updown(knet_h, src_host->host_id, src_link->link_id, src_link->status.enabled, 1); } else { src_link->received_pong++; log_debug(knet_h, KNET_SUB_RX, "host: %u link: %u received pong: %u", src_host->host_id, src_link->link_id, src_link->received_pong); } } } break; case KNET_HEADER_TYPE_PMTUD: outlen = KNET_HEADER_PMTUD_SIZE; inbuf->kh_type = KNET_HEADER_TYPE_PMTUD_REPLY; inbuf->kh_node = htons(knet_h->host_id); if (knet_h->crypto_instance) { if (crypto_encrypt_and_sign(knet_h, (const unsigned char *)inbuf, len, knet_h->recv_from_links_buf_crypt, &outlen) < 0) { log_debug(knet_h, KNET_SUB_RX, "Unable to encrypt PMTUd reply packet"); break; } outbuf = knet_h->recv_from_links_buf_crypt; } retry_pmtud: len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL, (struct sockaddr *) &src_link->dst_addr, sizeof(struct sockaddr_storage)); if (len != outlen) { err = knet_h->transport_ops[src_link->transport_type]->transport_tx_sock_error(knet_h, src_link->outsock, len, savederrno); switch(err) { case -1: /* unrecoverable error */ log_debug(knet_h, KNET_SUB_RX, "Unable to send PMTUd reply (sock: %d) packet (sendto): %d %s. recorded src ip: %s src port: %s dst ip: %s dst port: %s", src_link->outsock, errno, strerror(errno), src_link->status.src_ipaddr, src_link->status.src_port, src_link->status.dst_ipaddr, src_link->status.dst_port); break; case 0: /* ignore error and continue */ break; case 1: /* retry to send those same data */ goto retry_pmtud; break; } } break; case KNET_HEADER_TYPE_PMTUD_REPLY: if (pthread_mutex_lock(&knet_h->pmtud_mutex) != 0) { log_debug(knet_h, KNET_SUB_RX, "Unable to get mutex lock"); break; } src_link->last_recv_mtu = inbuf->khp_pmtud_size; pthread_cond_signal(&knet_h->pmtud_cond); pthread_mutex_unlock(&knet_h->pmtud_mutex); break; default: return; } } static void _handle_recv_from_links(knet_handle_t knet_h, int sockfd, struct knet_mmsghdr *msg) { int err, savederrno; int i, msg_recv, transport; if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) { log_debug(knet_h, KNET_SUB_RX, "Unable to get global read lock"); return; } if (_is_valid_fd(knet_h, sockfd) < 1) { /* * this is normal if a fd got an event and before we grab the read lock * and the link is removed by another thread */ goto exit_unlock; } transport = knet_h->knet_transport_fd_tracker[sockfd].transport; /* * reset msg_namelen to buffer size because after recvmmsg * each msg_namelen will contain sizeof sockaddr_in or sockaddr_in6 */ for (i = 0; i < PCKT_FRAG_MAX; i++) { msg[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage); } - msg_recv = recvmmsg(sockfd, (struct mmsghdr *)&msg[0], PCKT_FRAG_MAX, MSG_DONTWAIT | MSG_NOSIGNAL, NULL); + msg_recv = _recvmmsg(sockfd, (struct mmsghdr *)&msg[0], PCKT_FRAG_MAX, MSG_DONTWAIT | MSG_NOSIGNAL); savederrno = errno; /* * WARNING: man page for recvmmsg is wrong. Kernel implementation here: * recvmmsg can return: * -1 on error * 0 if the previous run of recvmmsg recorded an error on the socket * N number of messages (see exception below). * * If there is an error from recvmsg after receiving a frame or more, the recvmmsg * loop is interrupted, error recorded in the socket (getsockopt(SO_ERROR) and * it will be visibile in the next run. * * Need to be careful how we handle errors at this stage. * * error messages need to be handled on a per transport/protocol base * at this point we have different layers of error handling * - msg_recv < 0 -> error from this run * msg_recv = 0 -> error from previous run and error on socket needs to be cleared * - per-transport message data * example: msg[i].msg_hdr.msg_flags & MSG_NOTIFICATION or msg_len for SCTP == EOF, * but for UDP it is perfectly legal to receive a 0 bytes message.. go figure * - NOTE: on SCTP MSG_NOTIFICATION we get msg_recv == PCKT_FRAG_MAX messages and no * errno set. That means the error api needs to be able to abort the loop below. */ if (msg_recv <= 0) { knet_h->transport_ops[transport]->transport_rx_sock_error(knet_h, sockfd, msg_recv, savederrno); goto exit_unlock; } for (i = 0; i < msg_recv; i++) { err = knet_h->transport_ops[transport]->transport_rx_is_data(knet_h, sockfd, &msg[i]); /* * TODO: make this section silent once we are confident * all protocols packet handlers are good */ switch(err) { case -1: /* on error */ log_debug(knet_h, KNET_SUB_RX, "Transport reported error parsing packet"); goto exit_unlock; break; case 0: /* packet is not data and we should continue the packet process loop */ log_debug(knet_h, KNET_SUB_RX, "Transport reported no data, continue"); break; case 1: /* packet is not data and we should STOP the packet process loop */ log_debug(knet_h, KNET_SUB_RX, "Transport reported no data, stop"); goto exit_unlock; break; case 2: /* packet is data and should be parsed as such */ _parse_recv_from_links(knet_h, sockfd, &msg[i]); break; } } exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); } void *_handle_recv_from_links_thread(void *data) { int i, nev; knet_handle_t knet_h = (knet_handle_t) data; struct epoll_event events[KNET_EPOLL_MAX_EVENTS]; struct sockaddr_storage address[PCKT_FRAG_MAX]; struct knet_mmsghdr msg[PCKT_FRAG_MAX]; struct iovec iov_in[PCKT_FRAG_MAX]; memset(&msg, 0, sizeof(msg)); for (i = 0; i < PCKT_FRAG_MAX; i++) { iov_in[i].iov_base = (void *)knet_h->recv_from_links_buf[i]; iov_in[i].iov_len = KNET_DATABUFSIZE; memset(&msg[i].msg_hdr, 0, sizeof(struct msghdr)); msg[i].msg_hdr.msg_name = &address[i]; msg[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage); msg[i].msg_hdr.msg_iov = &iov_in[i]; msg[i].msg_hdr.msg_iovlen = 1; } while (!shutdown_in_progress(knet_h)) { nev = epoll_wait(knet_h->recv_from_links_epollfd, events, KNET_EPOLL_MAX_EVENTS, -1); for (i = 0; i < nev; i++) { _handle_recv_from_links(knet_h, events[i].data.fd, msg); } } return NULL; } diff --git a/libknet/threads_tx.c b/libknet/threads_tx.c index 23962c8d..6c2bca8d 100644 --- a/libknet/threads_tx.c +++ b/libknet/threads_tx.c @@ -1,609 +1,609 @@ /* * Copyright (C) 2010-2017 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. Di Nitto * Federico Simoncelli * * This software licensed under GPL-2.0+, LGPL-2.0+ */ #include "config.h" #include #include #include #include #include "compat.h" #include "crypto.h" #include "host.h" #include "link.h" #include "logging.h" #include "transports.h" #include "threads_common.h" #include "threads_heartbeat.h" #include "threads_tx.h" #include "netutils.h" /* * SEND */ static int _dispatch_to_links(knet_handle_t knet_h, struct knet_host *dst_host, struct knet_mmsghdr *msg, int msgs_to_send) { int link_idx, msg_idx, sent_msgs, prev_sent, progress; int err = 0, savederrno = 0; struct knet_mmsghdr *cur; for (link_idx = 0; link_idx < dst_host->active_link_entries; link_idx++) { sent_msgs = 0; prev_sent = 0; progress = 1; msg_idx = 0; while (msg_idx < msgs_to_send) { msg[msg_idx].msg_hdr.msg_name = &dst_host->link[dst_host->active_links[link_idx]].dst_addr; msg_idx++; } retry: cur = &msg[prev_sent]; sent_msgs = _sendmmsg(dst_host->link[dst_host->active_links[link_idx]].outsock, &cur[0], msgs_to_send - prev_sent, MSG_DONTWAIT | MSG_NOSIGNAL); savederrno = errno; err = knet_h->transport_ops[dst_host->link[dst_host->active_links[link_idx]].transport_type]->transport_tx_sock_error(knet_h, dst_host->link[dst_host->active_links[link_idx]].outsock, sent_msgs, savederrno); switch(err) { case -1: /* unrecoverable error */ goto out_unlock; break; case 0: /* ignore error and continue */ break; case 1: /* retry to send those same data */ goto retry; break; } prev_sent = prev_sent + sent_msgs; if ((sent_msgs >= 0) && (prev_sent < msgs_to_send)) { if ((sent_msgs) || (progress)) { if (sent_msgs) { progress = 1; } else { progress = 0; } #ifdef DEBUG log_debug(knet_h, KNET_SUB_TX, "Unable to send all (%d/%d) data packets to host %s (%u) link %s:%s (%u)", sent_msgs, msg_idx, dst_host->name, dst_host->host_id, dst_host->link[dst_host->active_links[link_idx]].status.dst_ipaddr, dst_host->link[dst_host->active_links[link_idx]].status.dst_port, dst_host->link[dst_host->active_links[link_idx]].link_id); #endif goto retry; } if (!progress) { savederrno = EAGAIN; err = -1; goto out_unlock; } } if ((dst_host->link_handler_policy == KNET_LINK_POLICY_RR) && (dst_host->active_link_entries > 1)) { uint8_t cur_link_id = dst_host->active_links[0]; memmove(&dst_host->active_links[0], &dst_host->active_links[1], KNET_MAX_LINK - 1); dst_host->active_links[dst_host->active_link_entries - 1] = cur_link_id; break; } } out_unlock: errno = savederrno; return err; } static int _parse_recv_from_sock(knet_handle_t knet_h, int buf_idx, ssize_t inlen, int8_t channel, int is_sync) { ssize_t outlen, frag_len; struct knet_host *dst_host; knet_node_id_t dst_host_ids_temp[KNET_MAX_HOST]; size_t dst_host_ids_entries_temp = 0; knet_node_id_t dst_host_ids[KNET_MAX_HOST]; size_t dst_host_ids_entries = 0; int bcast = 1; struct knet_hostinfo *knet_hostinfo; struct iovec iov_out[PCKT_FRAG_MAX]; uint8_t frag_idx; unsigned int temp_data_mtu; int host_idx; int send_mcast = 0; struct knet_header *inbuf; int savederrno = 0; int err = 0; seq_num_t tx_seq_num; struct knet_mmsghdr msg[PCKT_FRAG_MAX]; int msgs_to_send, msg_idx; inbuf = knet_h->recv_from_sock_buf[buf_idx]; if ((knet_h->enabled != 1) && (inbuf->kh_type != KNET_HEADER_TYPE_HOST_INFO)) { /* data forward is disabled */ log_debug(knet_h, KNET_SUB_TX, "Received data packet but forwarding is disabled"); savederrno = ECANCELED; err = -1; goto out_unlock; } /* * move this into a separate function to expand on * extra switching rules */ switch(inbuf->kh_type) { case KNET_HEADER_TYPE_DATA: if (knet_h->dst_host_filter_fn) { bcast = knet_h->dst_host_filter_fn( knet_h->dst_host_filter_fn_private_data, (const unsigned char *)inbuf->khp_data_userdata, inlen, KNET_NOTIFY_TX, knet_h->host_id, knet_h->host_id, &channel, dst_host_ids_temp, &dst_host_ids_entries_temp); if (bcast < 0) { log_debug(knet_h, KNET_SUB_TX, "Error from dst_host_filter_fn: %d", bcast); savederrno = EFAULT; err = -1; goto out_unlock; } if ((!bcast) && (!dst_host_ids_entries_temp)) { log_debug(knet_h, KNET_SUB_TX, "Message is unicast but no dst_host_ids_entries"); savederrno = EINVAL; err = -1; goto out_unlock; } } break; case KNET_HEADER_TYPE_HOST_INFO: knet_hostinfo = (struct knet_hostinfo *)inbuf->khp_data_userdata; if (knet_hostinfo->khi_bcast == KNET_HOSTINFO_UCAST) { bcast = 0; dst_host_ids_temp[0] = knet_hostinfo->khi_dst_node_id; dst_host_ids_entries_temp = 1; knet_hostinfo->khi_dst_node_id = htons(knet_hostinfo->khi_dst_node_id); } break; default: log_warn(knet_h, KNET_SUB_TX, "Receiving unknown messages from socket"); savederrno = ENOMSG; err = -1; goto out_unlock; break; } if (is_sync) { if ((bcast) || ((!bcast) && (dst_host_ids_entries_temp > 1))) { log_debug(knet_h, KNET_SUB_TX, "knet_send_sync is only supported with unicast packets for one destination"); savederrno = E2BIG; err = -1; goto out_unlock; } } /* * check destinations hosts before spending time * in fragmenting/encrypting packets to save * time processing data for unrechable hosts. * for unicast, also remap the destination data * to skip unreachable hosts. */ if (!bcast) { dst_host_ids_entries = 0; for (host_idx = 0; host_idx < dst_host_ids_entries_temp; host_idx++) { dst_host = knet_h->host_index[dst_host_ids_temp[host_idx]]; if (!dst_host) { continue; } if (dst_host->status.reachable) { dst_host_ids[dst_host_ids_entries] = dst_host_ids_temp[host_idx]; dst_host_ids_entries++; } } if (!dst_host_ids_entries) { savederrno = EHOSTDOWN; err = -1; goto out_unlock; } } else { send_mcast = 0; for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) { if (dst_host->status.reachable) { send_mcast = 1; break; } } if (!send_mcast) { savederrno = EHOSTDOWN; err = -1; goto out_unlock; } } if (!knet_h->data_mtu) { /* * using MIN_MTU_V4 for data mtu is not completely accurate but safe enough */ log_debug(knet_h, KNET_SUB_TX, "Received data packet but data MTU is still unknown." " Packet might not be delivered." " Assuming mininum IPv4 mtu (%d)", KNET_PMTUD_MIN_MTU_V4); temp_data_mtu = KNET_PMTUD_MIN_MTU_V4; } else { /* * take a copy of the mtu to avoid value changing under * our feet while we are sending a fragmented pckt */ temp_data_mtu = knet_h->data_mtu; } /* * prepare the outgoing buffers */ frag_len = inlen; frag_idx = 0; inbuf->khp_data_bcast = bcast; inbuf->khp_data_frag_num = ceil((float)inlen / temp_data_mtu); inbuf->khp_data_channel = channel; if (pthread_mutex_lock(&knet_h->tx_seq_num_mutex)) { log_debug(knet_h, KNET_SUB_TX, "Unable to get seq mutex lock"); goto out_unlock; } knet_h->tx_seq_num++; /* * force seq_num 0 to detect a node that has crashed and rejoining * the knet instance. seq_num 0 will clear the buffers in the RX * thread */ if (knet_h->tx_seq_num == 0) { knet_h->tx_seq_num++; } /* * cache the value in locked context */ tx_seq_num = knet_h->tx_seq_num; inbuf->khp_data_seq_num = htons(knet_h->tx_seq_num); pthread_mutex_unlock(&knet_h->tx_seq_num_mutex); /* * forcefully broadcast a ping to all nodes every SEQ_MAX / 8 * pckts. * this solves 2 problems: * 1) on TX socket overloads we generate extra pings to keep links alive * 2) in 3+ nodes setup, where all the traffic is flowing between node 1 and 2, * node 3+ will be able to keep in sync on the TX seq_num even without * receiving traffic or pings in betweens. This avoids issues with * rollover of the circular buffer */ if (tx_seq_num % (SEQ_MAX / 8) == 0) { _send_pings(knet_h, 0); } if (inbuf->khp_data_frag_num > 1) { while (frag_idx < inbuf->khp_data_frag_num) { /* * set the iov_base */ iov_out[frag_idx].iov_base = (void *)knet_h->send_to_links_buf[frag_idx]; /* * set the len */ if (frag_len > temp_data_mtu) { iov_out[frag_idx].iov_len = temp_data_mtu + KNET_HEADER_DATA_SIZE; } else { iov_out[frag_idx].iov_len = frag_len + KNET_HEADER_DATA_SIZE; } /* * copy the frag info on all buffers */ knet_h->send_to_links_buf[frag_idx]->kh_type = inbuf->kh_type; knet_h->send_to_links_buf[frag_idx]->khp_data_seq_num = inbuf->khp_data_seq_num; knet_h->send_to_links_buf[frag_idx]->khp_data_frag_num = inbuf->khp_data_frag_num; knet_h->send_to_links_buf[frag_idx]->khp_data_bcast = inbuf->khp_data_bcast; knet_h->send_to_links_buf[frag_idx]->khp_data_channel = inbuf->khp_data_channel; memmove(knet_h->send_to_links_buf[frag_idx]->khp_data_userdata, inbuf->khp_data_userdata + (temp_data_mtu * frag_idx), iov_out[frag_idx].iov_len - KNET_HEADER_DATA_SIZE); frag_len = frag_len - temp_data_mtu; frag_idx++; } } else { iov_out[frag_idx].iov_base = (void *)inbuf; iov_out[frag_idx].iov_len = frag_len + KNET_HEADER_DATA_SIZE; } if (knet_h->crypto_instance) { frag_idx = 0; while (frag_idx < inbuf->khp_data_frag_num) { if (crypto_encrypt_and_sign( knet_h, (const unsigned char *)iov_out[frag_idx].iov_base, iov_out[frag_idx].iov_len, knet_h->send_to_links_buf_crypt[frag_idx], &outlen) < 0) { log_debug(knet_h, KNET_SUB_TX, "Unable to encrypt packet"); savederrno = ECHILD; err = -1; goto out_unlock; } iov_out[frag_idx].iov_base = knet_h->send_to_links_buf_crypt[frag_idx]; iov_out[frag_idx].iov_len = outlen; frag_idx++; } } memset(&msg, 0, sizeof(msg)); msgs_to_send = inbuf->khp_data_frag_num; msg_idx = 0; while (msg_idx < msgs_to_send) { msg[msg_idx].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage); msg[msg_idx].msg_hdr.msg_iov = &iov_out[msg_idx]; msg[msg_idx].msg_hdr.msg_iovlen = 1; msg_idx++; } if (!bcast) { for (host_idx = 0; host_idx < dst_host_ids_entries; host_idx++) { dst_host = knet_h->host_index[dst_host_ids[host_idx]]; err = _dispatch_to_links(knet_h, dst_host, &msg[0], msgs_to_send); savederrno = errno; if (err) { goto out_unlock; } } } else { for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) { if (dst_host->status.reachable) { err = _dispatch_to_links(knet_h, dst_host, &msg[0], msgs_to_send); savederrno = errno; if (err) { goto out_unlock; } } } } out_unlock: errno = savederrno; return err; } int knet_send_sync(knet_handle_t knet_h, const char *buff, const size_t buff_len, const int8_t channel) { int savederrno = 0, err = 0; if (!knet_h) { errno = EINVAL; return -1; } if (buff == NULL) { errno = EINVAL; return -1; } if (buff_len <= 0) { errno = EINVAL; return -1; } if (buff_len > KNET_MAX_PACKET_SIZE) { errno = EINVAL; return -1; } if (channel < 0) { errno = EINVAL; return -1; } if (channel >= KNET_DATAFD_MAX) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_TX, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } if (!knet_h->sockfd[channel].in_use) { savederrno = EINVAL; err = -1; goto out; } savederrno = pthread_mutex_lock(&knet_h->tx_mutex); if (savederrno) { log_err(knet_h, KNET_SUB_TX, "Unable to get TX mutex lock: %s", strerror(savederrno)); err = -1; goto out; } knet_h->recv_from_sock_buf[0]->kh_type = KNET_HEADER_TYPE_DATA; memmove(knet_h->recv_from_sock_buf[0]->khp_data_userdata, buff, buff_len); err = _parse_recv_from_sock(knet_h, 0, buff_len, channel, 1); savederrno = errno; pthread_mutex_unlock(&knet_h->tx_mutex); out: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = savederrno; return err; } static void _handle_send_to_links(knet_handle_t knet_h, int sockfd, int8_t channel, struct knet_mmsghdr *msg, int type) { ssize_t inlen = 0; struct iovec iov_in; int msg_recv, i; int savederrno = 0, docallback = 0; if ((channel >= 0) && (channel < KNET_DATAFD_MAX) && (!knet_h->sockfd[channel].is_socket)) { memset(&iov_in, 0, sizeof(iov_in)); iov_in.iov_base = (void *)knet_h->recv_from_sock_buf[0]->khp_data_userdata; iov_in.iov_len = KNET_MAX_PACKET_SIZE; inlen = readv(sockfd, &iov_in, 1); if (inlen <= 0) { savederrno = errno; docallback = 1; goto out; } msg_recv = 1; knet_h->recv_from_sock_buf[0]->kh_type = type; _parse_recv_from_sock(knet_h, 0, inlen, channel, 0); } else { - msg_recv = recvmmsg(sockfd, (struct mmsghdr *)&msg[0], PCKT_FRAG_MAX, MSG_DONTWAIT | MSG_NOSIGNAL, NULL); + msg_recv = _recvmmsg(sockfd, (struct mmsghdr *)&msg[0], PCKT_FRAG_MAX, MSG_DONTWAIT | MSG_NOSIGNAL); if (msg_recv < 0) { inlen = msg_recv; savederrno = errno; docallback = 1; goto out; } for (i = 0; i < msg_recv; i++) { inlen = msg[i].msg_len; if (inlen == 0) { savederrno = 0; docallback = 1; goto out; break; } knet_h->recv_from_sock_buf[i]->kh_type = type; _parse_recv_from_sock(knet_h, i, inlen, channel, 0); } } out: if (inlen < 0) { struct epoll_event ev; memset(&ev, 0, sizeof(struct epoll_event)); if (epoll_ctl(knet_h->send_to_links_epollfd, EPOLL_CTL_DEL, knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], &ev)) { log_err(knet_h, KNET_SUB_TX, "Unable to del datafd %d from linkfd epoll pool: %s", knet_h->sockfd[channel].sockfd[0], strerror(savederrno)); } else { knet_h->sockfd[channel].has_error = 1; } } if (docallback) { knet_h->sock_notify_fn(knet_h->sock_notify_fn_private_data, knet_h->sockfd[channel].sockfd[0], channel, KNET_NOTIFY_TX, inlen, savederrno); } } void *_handle_send_to_links_thread(void *data) { knet_handle_t knet_h = (knet_handle_t) data; struct epoll_event events[KNET_EPOLL_MAX_EVENTS]; struct sockaddr_storage address[PCKT_FRAG_MAX]; struct knet_mmsghdr msg[PCKT_FRAG_MAX]; struct iovec iov_in[PCKT_FRAG_MAX]; int i, nev, type; int8_t channel; memset(&msg, 0, sizeof(msg)); /* preparing data buffer */ for (i = 0; i < PCKT_FRAG_MAX; i++) { iov_in[i].iov_base = (void *)knet_h->recv_from_sock_buf[i]->khp_data_userdata; iov_in[i].iov_len = KNET_MAX_PACKET_SIZE; memset(&msg[i].msg_hdr, 0, sizeof(struct msghdr)); msg[i].msg_hdr.msg_name = &address[i]; msg[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage); msg[i].msg_hdr.msg_iov = &iov_in[i]; msg[i].msg_hdr.msg_iovlen = 1; knet_h->recv_from_sock_buf[i]->kh_version = KNET_HEADER_VERSION; knet_h->recv_from_sock_buf[i]->khp_data_frag_seq = 0; knet_h->recv_from_sock_buf[i]->kh_node = htons(knet_h->host_id); knet_h->send_to_links_buf[i]->kh_version = KNET_HEADER_VERSION; knet_h->send_to_links_buf[i]->khp_data_frag_seq = i + 1; knet_h->send_to_links_buf[i]->kh_node = htons(knet_h->host_id); } while (!shutdown_in_progress(knet_h)) { nev = epoll_wait(knet_h->send_to_links_epollfd, events, KNET_EPOLL_MAX_EVENTS + 1, -1); if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) { log_debug(knet_h, KNET_SUB_TX, "Unable to get read lock"); continue; } for (i = 0; i < nev; i++) { if (events[i].data.fd == knet_h->hostsockfd[0]) { type = KNET_HEADER_TYPE_HOST_INFO; channel = -1; } else { type = KNET_HEADER_TYPE_DATA; for (channel = 0; channel < KNET_DATAFD_MAX; channel++) { if ((knet_h->sockfd[channel].in_use) && (knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created] == events[i].data.fd)) { break; } } } if (pthread_mutex_lock(&knet_h->tx_mutex) != 0) { log_debug(knet_h, KNET_SUB_TX, "Unable to get mutex lock"); continue; } _handle_send_to_links(knet_h, events[i].data.fd, channel, &msg[0], type); pthread_mutex_unlock(&knet_h->tx_mutex); } pthread_rwlock_unlock(&knet_h->global_rwlock); } return NULL; } diff --git a/libknet/transport_common.c b/libknet/transport_common.c index d56d4e0e..2b7187d7 100644 --- a/libknet/transport_common.c +++ b/libknet/transport_common.c @@ -1,428 +1,450 @@ #include "config.h" #include #include #include #include #include #include #include "libknet.h" #include "compat.h" #include "host.h" #include "link.h" #include "logging.h" #include "common.h" #include "transports.h" /* * reuse Jan Friesse's compat layer as wrapper to drop usage of sendmmsg * * TODO: kill those wrappers once we work on packet delivery guaranteed */ +int _recvmmsg(int sockfd, struct mmsghdr *msgvec, unsigned int vlen, unsigned int flags) +{ + int savederrno = 0, err = 0; + unsigned int i; + + for (i = 0; i < vlen; i++) { + err = recvmsg(sockfd, &msgvec[i].msg_hdr, flags); + savederrno = errno; + if (err >= 0) { + msgvec[i].msg_len = err; + } else { + if (err == -1 && errno == EAGAIN) { + err = 0; + } + break; + } + } + + errno = savederrno; + return ((err >= 0) ? i : err); +} + int _sendmmsg(int sockfd, struct knet_mmsghdr *msgvec, unsigned int vlen, unsigned int flags) { int savederrno = 0, err = 0; unsigned int i; for (i = 0; i < vlen; i++) { err = sendmsg(sockfd, &msgvec[i].msg_hdr, flags); savederrno = errno; if (err >= 0) { msgvec[i].msg_len = err; } else { break; } } errno = savederrno; return ((err >= 0) ? vlen : err); } int _configure_common_socket(knet_handle_t knet_h, int sock, const char *type) { int err = 0, savederrno = 0; int value; if (_fdset_cloexec(sock)) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set %s CLOEXEC socket opts: %s", type, strerror(savederrno)); goto exit_error; } if (_fdset_nonblock(sock)) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set %s NONBLOCK socket opts: %s", type, strerror(savederrno)); goto exit_error; } value = KNET_RING_RCVBUFF; #ifdef SO_RCVBUFFORCE if (setsockopt(sock, SOL_SOCKET, SO_RCVBUFFORCE, &value, sizeof(value)) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set %s receive buffer: %s", type, strerror(savederrno)); goto exit_error; } #else if (setsockopt(sock, SOL_SOCKET, SO_RCVBUF, &value, sizeof(value)) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set %s SO_RECVBUF: %s", type, strerror(savederrno)); goto exit_error; } #endif value = KNET_RING_RCVBUFF; #ifdef SO_SNDBUFFORCE if (setsockopt(sock, SOL_SOCKET, SO_SNDBUFFORCE, &value, sizeof(value)) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set %s send buffer: %s", type, strerror(savederrno)); goto exit_error; } #else if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, &value, sizeof(value)) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set %s SO_SNDBUF: %s", type, strerror(savederrno)); goto exit_error; } #endif exit_error: errno = savederrno; return err; } int _configure_transport_socket(knet_handle_t knet_h, int sock, struct sockaddr_storage *address, const char *type) { int err = 0, savederrno = 0; int value; if (_configure_common_socket(knet_h, sock, type) < 0) { savederrno = errno; err = -1; goto exit_error; } #ifdef IP_FREEBIND value = 1; if (setsockopt(sock, SOL_IP, IP_FREEBIND, &value, sizeof(value)) <0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set FREEBIND on %s socket: %s", type, strerror(savederrno)); goto exit_error; } #endif #ifdef IP_BINDANY /* BSD */ value = 1; if (setsockopt(sock, IPPROTO_IP, IP_BINDANY, &value, sizeof(value)) <0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set BINDANY on %s socket: %s", type, strerror(savederrno)); goto exit_error; } #endif if (address->ss_family == AF_INET6) { value = 1; if (setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, &value, sizeof(value)) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set %s IPv6 only: %s", type, strerror(savederrno)); goto exit_error; } #ifdef IPV6_MTU_DISCOVER value = IPV6_PMTUDISC_PROBE; if (setsockopt(sock, SOL_IPV6, IPV6_MTU_DISCOVER, &value, sizeof(value)) <0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set PMTUDISC on %s socket: %s", type, strerror(savederrno)); goto exit_error; } #else value = 1; if (setsockopt(sock, IPPROTO_IPV6, IPV6_DONTFRAG, &value, sizeof(value)) <0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set DONTFRAG on %s socket: %s", type, strerror(savederrno)); goto exit_error; } #endif } else { #ifdef IP_MTU_DISCOVER value = IP_PMTUDISC_PROBE; if (setsockopt(sock, SOL_IP, IP_MTU_DISCOVER, &value, sizeof(value)) <0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set PMTUDISC on %s socket: %s", type, strerror(savederrno)); goto exit_error; } #else value = 1; if (setsockopt(sock, IPPROTO_IP, IP_DONTFRAG, &value, sizeof(value)) <0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set DONTFRAG on %s socket: %s", type, strerror(savederrno)); goto exit_error; } #endif } value = 1; if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &value, sizeof(value)) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set %s reuseaddr: %s", type, strerror(savederrno)); goto exit_error; } exit_error: errno = savederrno; return err; } int _init_socketpair(knet_handle_t knet_h, int *sock) { int err = 0, savederrno = 0; int i; if (socketpair(AF_UNIX, SOCK_SEQPACKET, 0, sock) != 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize socketpair: %s", strerror(savederrno)); goto exit_fail; } for (i = 0; i < 2; i++) { if (_configure_common_socket(knet_h, sock[i], "local socketpair") < 0) { savederrno = errno; err = -1; goto exit_fail; } } exit_fail: errno = savederrno; return err; } void _close_socketpair(knet_handle_t knet_h, int *sock) { int i; for (i = 0; i < 2; i++) { if (sock[i]) { close(sock[i]); sock[i] = 0; } } } /* * must be called with global read lock * * return -1 on error * return 0 if fd is invalid * return 1 if fd is valid */ int _is_valid_fd(knet_handle_t knet_h, int sockfd) { int ret = 0; if (sockfd < 0) { errno = EINVAL; return -1; } if (sockfd > KNET_MAX_FDS) { errno = EINVAL; return -1; } if (knet_h->knet_transport_fd_tracker[sockfd].transport >= KNET_MAX_TRANSPORTS) { ret = 0; } else { ret = 1; } return ret; } /* * must be called with global write lock */ int _set_fd_tracker(knet_handle_t knet_h, int sockfd, uint8_t transport, uint8_t data_type, void *data) { if (sockfd < 0) { errno = EINVAL; return -1; } if (sockfd > KNET_MAX_FDS) { errno = EINVAL; return -1; } knet_h->knet_transport_fd_tracker[sockfd].transport = transport; knet_h->knet_transport_fd_tracker[sockfd].data_type = data_type; knet_h->knet_transport_fd_tracker[sockfd].data = data; return 0; } /* * public api */ int knet_handle_get_transport_list(knet_handle_t knet_h, struct transport_info *transport_list, size_t *transport_list_entries) { int err = 0, savederrno = 0; int i, count; if (!knet_h) { errno = EINVAL; return -1; } if (!transport_list) { errno = EINVAL; return -1; } if (!transport_list_entries) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } count = 0; /* * we could potentially build this struct * at knet_handle_new init time, but * let's keep it dynamic in case at somepoint * we need to init transports dynamically * at runtime vs init time. */ for (i=0; itransport_ops[i]) { transport_list[count].name = knet_h->transport_ops[i]->transport_name; transport_list[count].id = knet_h->transport_ops[i]->transport_id; count++; } } *transport_list_entries = count; pthread_rwlock_unlock(&knet_h->global_rwlock); return err; } const char *knet_handle_get_transport_name_by_id(knet_handle_t knet_h, uint8_t transport) { int savederrno = 0; const char *name = NULL; if (!knet_h) { errno = EINVAL; return name; } if (transport >= KNET_MAX_TRANSPORTS) { errno = EINVAL; return name; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return name; } if (knet_h->transport_ops[transport]) { name = knet_h->transport_ops[transport]->transport_name; } else { savederrno = ENOENT; } pthread_rwlock_unlock(&knet_h->global_rwlock); errno = savederrno; return name; } uint8_t knet_handle_get_transport_id_by_name(knet_handle_t knet_h, const char *name) { int savederrno = 0; uint8_t err = KNET_MAX_TRANSPORTS; int i; if (!knet_h) { errno = EINVAL; return err; } if (!name) { errno = EINVAL; return err; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return err; } for (i=0; itransport_ops[i]) { if (!strcmp(knet_h->transport_ops[i]->transport_name, name)) { err = knet_h->transport_ops[i]->transport_id; break; } } } if (err == KNET_MAX_TRANSPORTS) { savederrno = EINVAL; } pthread_rwlock_unlock(&knet_h->global_rwlock); errno = savederrno; return err; } diff --git a/libknet/transports.h b/libknet/transports.h index e47f7128..e6f65f8f 100644 --- a/libknet/transports.h +++ b/libknet/transports.h @@ -1,26 +1,27 @@ /* * Copyright (C) 2016 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. Di Nitto * * This software licensed under GPL-2.0+, LGPL-2.0+ */ #ifndef __TRANSPORTS_H__ #define __TRANSPORTS_H__ knet_transport_ops_t *get_udp_transport(void); knet_transport_ops_t *get_sctp_transport(void); int _configure_common_socket(knet_handle_t knet_h, int sock, const char *type); int _configure_transport_socket(knet_handle_t knet_h, int sock, struct sockaddr_storage *address, const char *type); int _init_socketpair(knet_handle_t knet_h, int *sock); void _close_socketpair(knet_handle_t knet_h, int *sock); int _set_fd_tracker(knet_handle_t knet_h, int sockfd, uint8_t transport, uint8_t data_type, void *data); int _is_valid_fd(knet_handle_t knet_h, int sockfd); int _sendmmsg(int sockfd, struct knet_mmsghdr *msgvec, unsigned int vlen, unsigned int flags); +int _recvmmsg(int sockfd, struct mmsghdr *msgvec, unsigned int vlen, unsigned int flags); #endif